Асинхронные операции ввода-вывода

Операции ввода-вывода в Windows

В процессе выполнения устройством операции ввода-вывода поток исполнения, передавший запрос, простаивает, поэтому Windows переводит его в спящее состояние, чтобы не расходовать процессорное время. Однако при этом поток продолжает занимать место в памяти своими структурами. После завершение операции ввода-выводы Windows пробуждает поток, ставит его в очередь процессора.

Можно представить реализацию веб-приложения, в которой для каждого пришедшего на сервер клиентского запроса формируется запрос к базе данных. В случае синхронного запроса к базе поток из пула окажется заблокированным на время, необходимое для чтения данных из базы. Если в это время придёт ещё один клиентский запрос, пул создаст новый поток. Таким образом потребление ресурсов продолжит расти. Проблема усугубится тем, что при получении результатов из базы несколько потоков могут разблокироваться одновременно, и их число может оказаться выше числа процессоров, что приведёт к частым переключениям контекста.

image

В качестве альтернативы есть способ асинхронного чтения данных.

image

В случае асинхронных операций ввода-вывода в веб-приложении поток не блокируется, а возвращается в пул и может заняться обработкой других клиентский запросов. В этом случае все запросы может обработать один поток. Полученный от базы ответ окажется в очереди пула потоков, то есть его обработает тот же поток. Если элементы в пуле будут появляться быстрее, чем поток сможет их обработать, пул быстро создаст по одному потоку на каждый процессор.

Однако при блокировке потока (выполнение синхронной операции ввода-вывода, вызове метода Thread.Sleep() или ожидании, связанном с блокировкой потока в рамках синхронизации) пул будет уведомлен, что один из потоков прекратил работу. В этом случае создастся дополнительный поток взамен заблокированного, что приведёт к дополнительным затратам времени и памяти. Кроме того, позднее поток может быть разблокирован и процессор окажется перегруженным, провоцируя частые переключения контекста. Эта проблема решается средствами пула. Завершившим и вернувшимся в пул потокам не дают обрабатывать новые элементы, пока загрузка процессора не достигнет определённого уровня. Если впоследствии выяснится, что потоков больше, чем нужно, лишние самоуничтожатся.

Асинхронный ввод-вывод предоставляет и другие преимущества. CLR в начале сборки мусора приостанавливает все потоки, следовательно, чем меньше потоков, тем быстрее произойдёт сборка мусора. Также при сборке будет просматриваться меньше стеков потоков. Также если потоки не были заблокированы, то они окажутся в пуле наверху стека, и поиск корней не займёт много времени. По аналогичным же причинам ускоряется и отладка приложений.

Асинхронные функции C#

Асинхронные операции являются ключом к созданию высокопроизводительных масштабируемых приложений, выполняющих множество операций при помощи небольшого числа потоков. Вместе с пулом потоков они позволяют эффективно задействовать все процессоры в системе. Для этого разработчики CLR разработали модель программирования, основанную на Task и асинхронных функциях языка C#.

private static async Task<String> IssueClientRequestAsync(String serverName, String message)
{
  using (var pipe = new NamedPipeClientStream(serverName, "PipeName", PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough))
  {
    pipe.Connect(); // Прежде чем задавать ReadMode, необходимо
    pipe.ReadMode = PipeTransmissionMode.Message; // вызвать Connect

    // Асинхронная отправка данных серверу
    Byte[] request = Encoding.UTF8.GetBytes(message);
    await pipe.WriteAsync(request, 0, request.Length);

    // Асинхронное чтение ответа сервера
    Byte[] response = new Byte[1000];
    Int32 bytesRead = await pipe.ReadAsync(response, 0, response.Length);
    return Encoding.UTF8.GetString(response, 0, bytesRead);
  } // Закрытие канала
}

Асинхронная функция помечается ключевым словом async. Такой метод преобразуется компилятором в конечных автомат. Это позволяет потоку выполнить часть кода в конечном автомате, а затем вернуть управления без выполнения всего метода до завершения. Когда выполнение метода доходит до строки с ключевым словом await, вызывается метод Task.ContinueWith() с передачей метода, возобновляющее выполнение конечного автомата, после чего поток возвращает управление из асинхронного метода.

В будущем драйвер сетевого устройства завершит запись данных в канал, поток из пула оповестит об этом Task, что приведёт к активизации метода обратного вызова ContinueWith(), заставляющего поток возобновить выполнение конечного автомат. Если конкретнее, поток заново входит в асинхронный метод, но в точке оператора await. Теперь выполнится сгенерированный компилятором код, запрашивающий состояние объекта Task. Если операция завершается успешно, оператор await возвращает результат. Аналогичная ситуация произойдёт со следующим оператором await. После чего выполнится оставшаяся часть кода, а сборщик мусора при необходимости освободит память.

Так как асинхронные функции возвращают управление до того, как их конечный автомат отработает до завершения, выполнение метода, вызвавшего асинхронную функцию, продолжится сразу после того, как функция выполнит свой первый оператор await. Вызывающая сторона узнает, что выполнение конечного автомата завершилось, так как при пометке метода ключевым словом async компилятор генерирует код, создающий Task в начале выполнения автомата, который завершится при завершении конечного автомата.

Для асинхронных функций действует ряд ограничений:

  • Метод Main() не может быть асинхронным. Кроме того, асинхронными не могут быть конструкторы, методы доступа свойств и методы доступа событий.
  • Асинхронная функция не может иметь параметры out и ref.
  • Оператор await не может использоваться в блоках catch, finally или unsafe.
  • Не допускается установление блокировки, поддерживающей владение потоком или рекурсию, до операции await и её снятие после этого оператора. Это объясняется тем, что один поток может выполнять код до await, а другой - после. При использовании await с конструкцией C# lock компилятор выдаёт сообщение об ошибке. Если вместо этого явно вызвать методы Monitor.Enter() и Monitor.Exit(), то код скомпилируется, то во время выполнения возникнет исключение.
  • В выражениях запросов оператор await может использоваться только в первом выражении коллекции условия from или в выражении коллекции условия join.

Обо всех этих ограничениях сообщает компилятор.

Преобразование асинхронной функции в конечный автомат

Исходный код:

internal sealed class Type1 { }
internal sealed class Type2 { }
private static async Task<Type1> Method1Async() { /* Асинхронная операция, возвращающая объект Type1 */ }
private static async Task<Type2> Method2Async() { /* Асинхронная операция, возвращающая объект Type2 */ }

private static async Task<String> MyMethodAsync(Int32 argument)
{
  Int32 local = argument;
  try
  {
    Type1 result1 = await Method1Async();
    for (Int32 x = 0; x < 3; x++)
    {
      Type2 result2 = await Method2Async();
    }
  }
  catch (Exception)
  {
    Console.WriteLine("Catch");
  }
  finally
  {
    Console.WriteLine("Finally");
  }
  return "Done";
}

Код конечного автомата:

// Атрибут AsyncStateMachine обозначает асинхронный метод (полезно для инструментов, использующих отражение);
// тип указывает, какая структура реализует конечный автомат.
[DebuggerStepThrough, AsyncStateMachine(typeof(StateMachine))]
private static Task<String> MyMethodAsync(Int32 argument)
{
  // Создание экземпляра конечного автомата и его инициализация
  StateMachine stateMachine = new StateMachine()
  {
    // Создание построителя, возвращающего Task<String>.
    // Конечный автомат обращается к построителю для назначения завершения задания или выдачи исключения.
    m_builder = AsyncTaskMethodBuilder<String>.Create(),
    m_state = ­1, // инициализация местонахождения
    m_argument = argument // Копирование аргументов в поля конечного автомата
  };

  // Начало выполнения конечного автомата.
  stateMachine.m_builder.Start(ref stateMachine);
  return stateMachine.m_builder.Task; // Возвращение задания конечного автомата
} 

// Структура конечного автомата
[CompilerGenerated, StructLayout(LayoutKind.Auto)]
private struct StateMachine : IAsyncStateMachine
{
  // Поля для построителя конечного автомата (Task) и его местонахождения
  public AsyncTaskMethodBuilder<String> m_builder;
  public Int32 m_state;

  // Аргумент и локальные переменные становятся полями:
  public Int32 m_argument, m_local, m_x;
  public Type1 m_resultType1;
  public Type2 m_resultType2;

  // Одно поле на каждый тип Awaiter.
  // В любой момент времени важно только одно из этих полей.
  // В нем хранится ссылка на последний выполненный экземпляр await, который завершается асинхронно:
  private TaskAwaiter<Type1> m_awaiterType1;
  private TaskAwaiter<Type2> m_awaiterType2;

  // Сам конечный автомат
  void IAsyncStateMachine.MoveNext()
  {
    String result = null; // Результат Task

    // Вставленный компилятором блок try гарантирует завершение задания конечного автомата
    try
    {
      Boolean executeFinally = true; // Логический выход из блока 'try'

      if (m_state == ­1)
      {                       // Если метод конечного автомата выполняется впервые
        m_local = m_argument; // Выполнить начало исходного метода
      }

      // Блок try из исходного кода
      try
      {
        TaskAwaiter<Type1> awaiterType1;
        TaskAwaiter<Type2> awaiterType2;

        switch (m_state)
        {
          case ­1: // Начало исполнения кода в 'try'
          // вызвать Method1Async и получить его объект ожидания
          awaiterType1 = Method1Async().GetAwaiter();

          if (!awaiterType1.IsCompleted)
          {
            m_state = 0; // 'Method1Async' завершается асинхронно
            m_awaiterType1 = awaiterType1; // Сохранить объект ожидания до возвращения
            // Приказать объекту ожидания вызвать MoveNext после завершения операции
            m_builder.AwaitUnsafeOnCompleted(ref awaiterType1, ref this);

            // Предыдущая строка вызывает метод OnCompleted объекта awaiterType1, что приводит к вызову ContinueWith(t => MoveNext()) для Task.
            // При завершении Task ContinueWith вызывает MoveNext
            executeFinally = false; // Без логического выхода из блока 'try'
            return; // Поток возвращает управление вызывающей стороне
          }

          // 'Method1Async' завершается синхронно.
          break;

          case 0: // 'Method1Async' завершается асинхронно
            awaiterType1 = m_awaiterType1; // Восстановление последнего объекта ожидания
            break; 

          case 1: // 'Method2Async' завершается асинхронно
            awaiterType2 = m_awaiterType2; // Восстановление последнего объекта ожидания
            goto ForLoopEpilog;
        }

        // После первого await сохраняем результат и запускаем цикл 'for'
        m_resultType1 = awaiterType1.GetResult(); // Получение результата

        ForLoopPrologue:
          m_x = 0; // Инициализация цикла 'for'
          goto ForLoopBody; // Переход к телу цикла 'for'

        ForLoopEpilog:
          m_resultType2 = awaiterType2.GetResult();
          m_x++; // Увеличение x после каждой итерации

        // Переход к телу цикла 'for'
        ForLoopBody:
          if (m_x < 3)
          { // Условие цикла 'for'
            // Вызов Method2Async и получение объекта ожидания
            awaiterType2 = Method2Async().GetAwaiter();

            if (!awaiterType2.IsCompleted)
            {
              m_state = 1; // 'Method2Async' завершается асинхронно
              m_awaiterType2 = awaiterType2; // Сохранение объекта ожидания до возвращения

              // Приказываем вызвать MoveNext при завершении операции
              m_builder.AwaitUnsafeOnCompleted(ref awaiterType2, ref this);
              executeFinally = false; // Без логического выхода из блока 'try'
              return; // Поток возвращает управление вызывающей стороне
            }

            // 'Method2Async' завершается синхронно
            goto ForLoopEpilog; // Синхронное завершение, возврат
          }
      }
      catch (Exception)
      {
        Console.WriteLine("Catch");
      }
      finally
      {
        // Каждый раз, когда блок физически выходит из 'try', выполняется 'finally'.
        // Этот код должен выполняться только при логическом выходе из 'try'.
        if (executeFinally)
        {
          Console.WriteLine("Finally");
        }
      }
      result = "Done"; // То, что в конечном итоге должна вернуть асинхронная функция.
    } 
    catch (Exception exception)
    {
      // Необработанное исключение: задание конечного автомата  завершается с исключением.
      m_builder.SetException(exception);
      return;
    }

    // Исключения нет: задание конечного автомата завершается с результатом
    m_builder.SetResult(result);
  }
}

Каждый раз, когда в коде используется оператор await, компилятор берёт указанный операнд и пытается вызвать для него метод GetAwaiter(). Он может быть как экземплярным, так и методом расширения. Объект, возвращаемый при вызове этого метода, называется объектом ожидания (awaiter).

После того, как конечный автомат получает объект ожидания, он запрашивает его свойство IsCompleted. Если операция завершается синхронно, возвращается true и конечный автомат продолжает выполнение. Он вызывает метод GetResult()? который либо выдаёт исключение, либо возвращает результат. Конечный автомат продолжает выполнение для обработки результата.

Если операция завершается асинхронно, то возвращается false. В этом случае вызывается метод OnCompleted() объекта ожидания, передавая ему делегат метода MoveNext() конечного автомата. И теперь конечный автомат позволяет своему потоку вернуть управление в сходную точку, чтобы тот мог продолжить выполнение другого кода. В будущем объект ожидания узнает о своём завершении и вызывает делегата, что приводит к исполнению MoveNext(). По полям конечного автомата определяется способ перехода к правильной точке кода, что создаёт иллюзию продолжения выполнения метода с того места, с которого он был прерван. На этой стадии код вызывает GetResult() и продолжает выполнение для обработки результата.

Расширяемость асинхронных функций

Представление всех видов асинхронных операций одним типом Task позволяет реализовывать комбинаторы (Task.WhenAny() и Task.WhenAll()) и другие полезные операции.

Наряду с гибкостью асинхронные функции предоставляют ещё одну точку расширения: компилятор вызывает GetAwaiter() для операнда, использовавшегося с await. Таким образом, операнд не обязан быть Task, он может относиться к любому типу. содержащему GetAwaiter().

Асинхронные функции и обработчики событий

Для асинхронных функций возможно определение возвращаемого типа как void. Это особый случай, который поддерживается компилятором для реализации асинхронного обработчика события. В таком случае асинхронна функция превращается в конечный автомат, но не создаёт объект Task, из-за чего нельзя узнать о завершённости задания.

Метод Main() нельзя пометить ключевым словом async и внутри него нельзя использовать операторы await, так как в этом случае управление вернётся сразу же после выполнения первого оператора await, а поскольку метод Main() не асинхронный, то он не создаёт таску и нельзя будет отследить его завершённость. Для предотвращения подобного поведения компилятор выдаёт ошибку.

Асинхронные функции FCL

Асинхронные функции легко освоить они просты в использовании и поддерживаются многими типами FCL. Кроме того, они сразу видны в коде, так как чаще всего имя метода снабжается суффиксом Async.

До модели асинхронных функций (TAP, Task-based Asynchronous Pattern) существовали также и другие подходы:

  • EAP (Event-based Asynchronous Pattern) — подход основан на событиях, которые срабатывают по завершении операции и обычного метода, вызывающего эту операцию.
  • APM (Asynchronous Programming Model) — основан на 2 методах: BeginSmth() возвращает интерфейс IAsyncResult, метод EndSmth() принимает IAsyncResult (если к моменту вызова EndSmth() операция не завершена, поток блокируется).

В некоторых классах FCL можно встретить APM подход, так как не всё успели переписать под TAP. Если необходимо воспользоваться одним из таких классов, то его можно привести к новой модели с помощью метода Task.Factory.FromAsync(). Если же используется EAP, то в FCL нет вспомогательных методов-адаптеров, поэтому адаптировать придётся вручную.

Асинхронные функции и исключения

При использовании await с Task вместо AggregateException выдаётся первое внутреннее исключение, чтобы поведение кода соответствовало ожиданиям разработчиков.

Другие возможности асинхронных функций

В разделе рассказывается подробнее про отладку асинхронных функций.

С помощью Task.Run() можно запустить асинхронную функцию в потоке. отличном от вызывающего.

Кроме того, в разделе описывается использование асинхронных лямбда-выражений и способы обхода предупреждений компилятора в случае, если внутри асинхронного метода не используется оператор await.

Потоковые модели приложений

Консольные приложение и Windows-службы не навязывают никакой потоковой модели. Приложение с графическим интерфейсом предлагают такую модель, в которой обновлять окно может только создавший его поток, GUI-потоки обычно порождают асинхронные операции.

Приложения ASP.NET позволяют любому потоку делать всё, что угодно. Порождённая одним потоком пула асинхронная операция может заканчиваться другим потоком, который обрабатывает её результат. При этом при переходе между потоками должен сохраняться контекст.

При разработке библиотек классов стоит принимать во внимание класс SynchronizationContext, что позволит создать высокопроизводительный код. Кроме того, при разработке стоит сделать всё возможное, чтобы пользователи библиотеки избежали взаимной блокировки. Для решения этих проблем классы Task и Task<TResult> предоставляют метод ConfigureAwait(). Если передать в этот метод false, то оператор await не запрашивает контекст синхронизации вызывающего потока, а когда поток пула завершает выполнение задания, происходит простое завершение.

Асинхронная реализация сервера

В разделе описывается построение асинхронных серверов с хорошей масштабируемостью для:

  • Web Forms ASP.NET.
  • Асинхронных MVC-контроллеров ASP.NET.
  • Асинхронного обработчика ASP.NET.
  • Асинхронной службы WCF.

Отмена операций ввода-вывода

В общем случае не существует возможности отмены затянувшейся операции ввода-вывода, потому что отменить посланный на сервер запрос уже не удастся. Кроме того, может возникнуть ситуация гонки. Если флаг отмены придёт в тот момент, когда сервер пришлёт ответ, то как быть? В разделе Рихтер предлагает реализовать метод расширения, который бы завершал такой Task при отмене.

Некоторые операции ввода-вывода должны выполняться синхронно

В разделе описываются ситуации, для которых недоступны асинхронные операции и как их можно избежать.

Проблемы FileStream

При работе с FileStream стоит заранее выбрать синхронным или асинхронным будет ввод-вывод файлов и установить соответствующий флаг. При этом стоит использовать соответствующий (синхронный или асинхронный) метод.

Приоритеты запросов ввода-вывода

Windows позволяет указать приоритет потока при выполнении запросов ввода-вывода. В FCL данная функциональность не реализована, на данный момент преимуществами данного функционала можно воспользоваться при помощи механизма P/Invoking.