await async lambda в ActionBlock

У меня есть класс Receiver с ActionBlock:

public class Receiver<T> : IReceiver<T>
{

  private ActionBlock<T> _receiver;

  public Task<bool> Send(T item) 
  {
     if(_receiver!=null)
        return _receiver.SendAsync(item);

     //Do some other stuff her
  }

  public void Register (Func<T, Task> receiver)
  {
    _receiver = new ActionBlock<T> (receiver);
  }

  //...
}

Register-Action для ActionBlock - это асинхронный метод с оператором await:

private static async Task Writer(int num)
{
   Console.WriteLine("start " + num);
   await Task.Delay(500);
   Console.WriteLine("end " + num);
}

Теперь я хочу синхронно ждать (если установлено условие), пока метод действия не завершится, чтобы получить эксклюзивное поведение:

var receiver = new Receiver<int>();
receiver.Register((Func<int, Task) Writer);
receiver.Send(5).Wait(); //does not wait the action-await here!

Проблема в том, когда "await Task.Delay (500);" выполняется оператор "Receiver.Post (5) .Wait ();" больше не ждет.

Я пробовал несколько вариантов (TaskCompletionSource, ContinueWith, ...), но это не работает.

Есть у кого идеи как решить проблему?


person obi111    schedule 04.12.2012    source источник
comment
Не могли бы вы изменить свой код, заменив _receiver на TransformBlock и поместив следующее действие в новый ActionBlock, связанный с _receiver?   -  person svick    schedule 05.12.2012
comment
Вы можете привести мне небольшой пример кода? Я не понимаю, как этот рефакторинг должен решить проблему исключительного поведения.   -  person obi111    schedule 05.12.2012


Ответы (1)


ActionBlock по умолчанию применяет эксклюзивное поведение (одновременно обрабатывается только один элемент). Если под «исключительным поведением» вы имеете в виду что-то еще, вы можете использовать TaskCompletionSource, чтобы уведомить отправителя о завершении действия:

... use ActionBlock<Tuple<int, TaskCompletionSource<object>>> and Receiver<Tuple<int, TaskCompletionSource<object>>>
var receiver = new Receiver<Tuple<int, TaskCompletionSource<object>>>();
receiver.Register((Func<Tuple<int, TaskCompletionSource<object>>, Task) Writer);
var tcs = new TaskCompletionSource<object>();
receiver.Send(Tuple.Create(5, tcs));
tcs.Task.Wait(); // if you must

private static async Task Writer(int num, TaskCompletionSource<object> tcs)
{
  Console.WriteLine("start " + num);
  await Task.Delay(500);
  Console.WriteLine("end " + num);
  tcs.SetResult(null);
}

В качестве альтернативы вы можете использовать AsyncLock (включен в мою библиотеку AsyncEx):

private static AsyncLock mutex = new AsyncLock();

private static async Task Writer(int num)
{
  using (await mutex.LockAsync())
  {
    Console.WriteLine("start " + num);
    await Task.Delay(500);
    Console.WriteLine("end " + num);
  }
}
person Stephen Cleary    schedule 04.12.2012
comment
да, вы правы, что ActionBlock обеспечивает эксклюзивное поведение, но если зарегистрированные действия являются асинхронными, это больше не является настоящим эксклюзивом. да, ваше решение должно работать, но я не хочу добавлять параметр TaskCompletionSource, потому что это действие является точкой входа для эксклюзивной логики - поэтому, если пользователь не вызывает tcs.SetResult, он больше не работает ... - person obi111; 04.12.2012
comment
В этом случае вы можете использовать AsyncLock. См. Обновленный ответ для образца кода. Вы больше не знаете, когда элемент будет обработан, но каждый элемент будет обрабатываться по отдельности (включая async обработку). - person Stephen Cleary; 04.12.2012
comment
хорошо, спасибо, я думаю, это именно то, что мне нужно - я попробую! - person obi111; 04.12.2012