Узнайте, как легко распараллеливать задачи в Python для повышения производительности.

Celery — это фреймворк асинхронной очереди задач, написанный на Python. Celery упрощает выполнение фоновых задач, а также предоставляет инструменты для параллельного выполнения и координации задач.

В прошлом посте мы коснулись основных принципов фреймворка Celery для Python. Вы можете проверить последний пост здесь.

В этом посте мы обсудим способ организации рабочих процессов в Celery и легкое параллельное выполнение задач.

Когда задействовано много фоновых задач, может быть сложно отслеживать и координировать их выполнение. Если задачи независимы и порядок выполнения не важен, их можно выполнять параллельно без каких-либо последствий. Celery предоставляет возможность как разработать рабочий процесс для координации, так и выполнять задачи параллельно. Излишне говорить, что параллельное выполнение обеспечивает значительный прирост производительности и должно быть реализовано, когда это возможно.

В этом посте мы рассмотрим следующие темы:

  • Получение результатов фоновых задач
  • Получение доступа к NewsAPI
  • Согласование задач с подписями
  • Код
  • Ориентиры
  • Заключение

К концу этого поста вы узнаете, как сделать практически любой код Python быстрее, а также как структурировать задачи в процессе.

Давайте приступим!

Получение результатов фоновых задач

Фоновую задачу можно определить следующим образом:

@celery.task()
def add(num1, num2):
    return num1 + num2

Фоновая задача может быть выполнена со следующим синтаксисом:

add.delay(4, 4)

Чтобы получить результат задачи, нам нужно использовать серверную часть, которая позволяет Celery сохранять результаты.
Доступно много вариантов, но для этого поста мы будем использовать Redis.
Обратите внимание: Redis не поддерживается в Windows.

Redis — это хранилище с открытым исходным кодом (под лицензией BSD), хранилище структур данных в памяти, используемое в качестве базы данных, кэша и брокера сообщений, согласно redis.

Хранилище данных в памяти означает, что операции чтения и записи в Redis выполняются особенно быстро, что делает его подходящим для задач, требующих частых операций чтения и записи.

Руководство по Linux по установке Redis.

Для MacOS вы можете установить Redis с помощью homebrew.
Откройте терминал и выполните:

brew install redis

Затем запустите службу с помощью:

brew services start redis

Для проверки работы службы вы можете запустить:

brew services list

Экземпляр Celery теперь можно определить следующим образом:

from celery import Celery
celery = Celery(
        'calc',
        backend='redis://localhost',
        broker='pyamqp://guest@localhost//'
    )

Результат фоновой задачи можно легко получить с помощью get() следующим образом:

res = add.delay(4, 4)
res.get()
>> 8

Получение доступа к NewsAPI

Для целей руководства нам нужно получить данные из вызова API.
Мы будем получать данные из NewsAPI. Зарегистрироваться для получения ключа API можно здесь. После того, как вы зарегистрируетесь, вы можете просмотреть документацию по вызову API топовых заголовков здесь. Мы будем использовать главную конечную точку заголовков и называть ее кодовым названием одной страны за раз.

Предположим, нам нужно получить главные заголовки из пяти стран. Вместо пяти последовательных вызовов было бы гораздо быстрее ускорить процесс, распараллелив вызовы. Но сначала нам нужно взглянуть на signatures в Celery и способ согласования результатов с функцией chord.

Координация задач в Celery

Подпись упаковывает аргументы, аргументы ключевых слов и параметры выполнения одного вызова задачи таким образом, что ее можно передать функциям или даже сериализовать и отправить по сети. из Сельдерей

По сути, сигнатуры позволяют нам создавать и передавать ссылки на объекты задачи Celery.
Это означает, что мы можем сделать что-то вроде:

task = add.s(2, 2)

Затем мы можем просто вызвать функцию с delay() и вызвать для нее get(), как и раньше:

res = task.delay()
res.get()
>> 8

Теперь вам может быть интересно, зачем нам вообще нужны подписи. Подписи позволяют нам объединять команды и использовать функции рабочего процесса, предоставляемые Celery. Предположим, мы хотим вызвать add() несколько раз параллельно и сохранить результаты. Как бы вы это сделали? Без подписи невозможно сохранить ссылку на объект задачи Celery.

Функция group в Celery принимает подписи в качестве аргументов и выполняет их параллельно, сохраняя результаты в списке. Если мы хотим выполнять add() вызовов параллельно, мы можем сделать это с group() следующим образом:

g = group(add.s(1, 1), add.s(3, 3))
res = g()
res.get()
>> [2, 6]

Еще одна полезная функция — chord(). Это похоже на команду group(), но позволяет нам определить обратный вызов. Таким образом, вместо возврата со списком результатов функция обратного вызова вызывается со списком результатов в качестве аргумента. Мы сами определяем функцию обратного вызова. Функция обратного вызова вызывается, когда все задачи выполнены. Это означает, что нам не нужно отслеживать, какая задача все еще выполняется и все ли завершены или нет.

Функция chord() очень полезна в тех случаях, когда необходимо манипулировать результатами после их получения. Например, если результаты предыдущих задач необходимо объединить для создания отчета в формате PDF. Или если данные, полученные из нескольких вызовов API, необходимо объединить.

Теперь мы будем параллельно получать главные заголовки из NewsAPI и агрегировать результаты вызовов.

Давайте кодировать!

Если вам интересно, вы можете ознакомиться с полным списком функций рабочего процесса, предоставляемых Celery, здесь.

Определите приложение Celery

Убедитесь, что вы заменили <API_KEY> ключом API в своей учетной записи newsAPI.

Мы определяем приложение Celery, а затем определяем две задачи Celery с нотацией @. В getNewsData мы выполняем простой запрос GET, чтобы получить главные заголовки из страны, указанной в качестве аргумента. Это функция, которая будет распараллелена.

Мы хотим получить результаты всех вызовов и объединить их. Для этого мы определяем genReport как функцию обратного вызова. В genReport мы просто конвертируем объекты JSON в кадры данных Pandas, чтобы их можно было легко объединить.

Как только они объединены, мы сбрасываем индекс, чтобы избежать ошибок приведения JSON, а затем возвращаем объединенный dataframe как JSON. Тип возвращаемого значения функций задачи Celery должен быть сериализуемым в формате JSON, поэтому мы возвращаемся к JSON.

Определить API

Не забудьте заменить <API_KEY> своим ключом API.

Мы получаем список стран, из которых мы хотим получать новости, а затем создаем список сигнатур задач. Каждая функция подписи является ссылкой на задачу getNewsData со страной, указанной в качестве аргумента. Список сигнатурных функций передается в качестве аргумента функции chord. Наконец, функция genReport передается как функция обратного вызова.

Функция chord будет выполнять задачи параллельно, а затем вызывать genReport со списком результатов.

Тест и тесты

Большой! Теперь давайте протестируем код и убедимся, что все в порядке.

Убедитесь, что служба Redis запущена. Откройте окно терминала в корне проекта, а затем запустите приложение Flask с помощью:

python3 newsapi.py

Затем откройте новое окно терминала и запустите приложение Celery с помощью:

celery -A celerynews worker --loglevel=INFO

Все должно работать так:

Давайте вызовем API с помощью Postman и посмотрим, работает ли он!
Мы кодируем данные как объект JSON, передавая список стран, указав каждую страну, используя ее код страны. Убедитесь, что вы выбрали POST в качестве типа запроса и вставьте правильный URL-адрес: http://0.0.0.0:5000/api/send

Нажмите «Отправить», и вы должны увидеть ответ через секунду или две.

Я подумал, что было бы интересно посмотреть, насколько быстрее это распараллеливание на самом деле. В файле кода под названием «Медленная версия» есть некоторый закомментированный код. Вы можете разместить код в разделе «Быстрая версия» в комментариях и закомментировать код в разделе «Медленная версия». Сохраните файл после комментирования, и Flask автоматически перезапустит веб-сервер.

Теперь мы можем продублировать вкладку в Postman, которую мы использовали ранее, и снова нажать «Отправить».

Вы заметите, что для получения ответа требуется немного больше времени. Мы можем проверить правый верхний угол окна ответа в Postman, чтобы проверить, сколько времени занял ответ. Если вы сравните это с быстрой версией, вы заметите, что она примерно в 1,5 раза быстрее.

В моем случае распараллеливание с помощью функции chord привело к тому, что код был более чем в 2,5 раза быстрее, чем если бы я ее не использовал. Это довольно значительный прирост производительности, учитывая, что задействовано всего 5 вызовов API. При большем количестве вызовов прирост производительности будет еще больше, поскольку время отклика будет расти линейно, если вызовы выполняются последовательно.

Больше, чем просто вызовы API

Надеюсь, вам понравилось содержание этой статьи!

Мы увидели, как получить результаты фоновых задач, используя Redis в качестве внутреннего хранилища для Celery. Кроме того, мы увидели использование подписей в Celery и то, как координировать и распараллеливать задачи с помощью встроенных функций Celery, таких как chord и group.

Тема очень интересная и полезная. Я считаю, что это заслуживает большего, чем просто один пост. Итак, прежде чем уйти, я хотел бы коснуться еще нескольких вариантов использования.

Я использовал вызовы API в этом руководстве из-за простоты демонстрации. Однако этот метод может быть использован для множества различных и разнообразных случаев. Пока отдельные задачи неупорядочены, то есть — если порядок выполнения не имеет значения, их можно распараллелить.

На ум приходят следующие три случая:

  • Задачи машинного обучения — обучение нескольких разных моделей на одном и том же наборе данных.
  • Задачи парсинга веб-страниц.
  • Скрипты автоматизации

В первом случае разные модели можно обучать независимо друг от друга. Затем можно рассчитать баллы проверки на основе накопленных результатов. Затем модель, которая дает наилучший результат проверки, можно сохранить для использования в будущем. В этом случае функция chord идеально подходит для использования.

Во втором случае несколько веб-сайтов могут быть очищены независимо друг от друга, а результаты сохранены. Если результаты нужно просто сохранить без дальнейших манипуляций, мы можем использовать что-то вроде функции group.

Наконец, различные функции, доступные в Celery, можно использовать в сценариях автоматизации. Сценарии автоматизации часто требуют множества повторяющихся задач, которые необходимо выполнять и координировать. Использование встроенных функций рабочего процесса Celery для написания скриптов привело бы к более чистому и удобному в сопровождении коду.

Это все для этого поста. Надеюсь, вы чему-то научились.
До следующего раза!

Первоначально опубликовано на https://haseebkamal.com.