Подключаем очередь сообщений к Flask-приложению под управлением Gunicorn
Один из возможных подходов к построению микросервисных приложений состоит в организации общения сервисов между собой посредством шины сообщений. Такая архитектура называется “Event-driven architecture”. Её преймущество в слабой связности компонентов. В этой небольшой статье я делюсь методом добавления интеграции с шиной на основе RabbitMQ в приложение, написанное на Flask.
В проекте, над которым я сейчас работаю, есть необходимость использования отдельно стоящих компонентов для работы с видео-файлами. Их необходимо загружать из хранилища, обрабатывать и сохранять результаты в хранилище. Эта функция довольно обособлена и никак непосредственно не связана с основным приложением. Её удобно хранить отдельно, масштабировать отдельно, выделять ей дисковое пространство отдельно, а также поручить разработку отдельной команде.
У меня уже накопился определенный опыт работы в таких условиях и я решил снова объединить компоненты через шину сообщений. Основное приложение написано на Flask и запускается под управлением сервера Gunicorn. Flask в этом уравнении роли не играет. Любое WSGI приложение будет одинаково запускаться под Gunicorn.
Что происходит во время запуска? Как и во многих современных серверах, во время запуска приложение инициализируется в одном центральном master-процессе (происходит предзагрузка), а затем производится fork необходимого количества рабочих процессов. За счет того, что код приложения загружается в память до разветвления, эта память разделяется между процессами (а не копируется), что позволяет держать лишь одну копию программы и данных в памяти для всех рабочих процессов.
Для подключения к очереди и потребления событий нам понадобися небольшая утилита. До сих пор я сталкивался и использовал для этого Pika – имплементацию AMQP 0.9.1, написанную на Python. Но недавно наткнулся на очень интересный проект Kombu от создателей самой известной библиотеки для фоновых задач в мире Python – Celery. В отличие от Pika, Kombu предоставляет абстракции публикатора и потребителя поверх нескольких популярных реализаций очередей (Redis, RabbitMQ, Amazon SQS и другие). Она самостоятельно решает вопросы восстановления подключений, декларирования очередей и многое другое. Поэтому и ещё потому, что эта библиотека используется в большом и популярном проекте, я решил ее попробовать в своей работе.
Итак, есть веб-приложение под Gunicorn и очередь под RabbitMQ, к которой мы желаем подключиться с помощью Kombu. Как это сделать?
Есть несколько способов. Вот основные подходы:
-
Написать маленькое приложение-прицеп, которое мы бы разместили в виде вспомогательного процесса в том же контейнере, которое бы слушало очередь, брало бы поступившие сообщения и отправляло бы во Flask приложение по HTTP. С этим подходом есть нюанс. Docker (Containerd) устроены так, что в контейнерах приложений они следят только за главным процессом (PID 1), и если он падает, то перезапускают контейнер. Если умерает какой-либо еще процесс, контейнер перезапущен не будет. Надежность такого подхода сильно страдает.
-
Запустить потребителя (нескольких потребителей) очереди в рамках основного процесса или как под-процесс. Это подход мне нравится чуточку больше. Он лишен недостатка процесса-прицепа.
Для того, чтобы запустить нитки потребителей нужно выбрать хуки у Gunicorn, к которым можно подключиться. Я выбрал on_starting
для инициализации ниток и on_exit
для остановки (не забывайте закрывать соединения на выходе; это позволит RabbitMQ быстрей понять кто из потребителей еще доступен).
Файл конфигурации Gunicorn выглядит так.
# gunicorn.conf.py
from app.event_bus import start_consumers, stop_consumers
def on_starting(_):
"""
This hook is called right before Gunicorn master process is initialized.
We use this opportunity to start bus consumers.
"""
start_consumers()
def on_exit(_):
"""
This hook is called on application exit.
We use it to stop bus consumers.
"""
stop_consumers()
В app/event_bus.py
собрана функциональность работы потребителей очереди:
# app/event_bus.py
import logging
from threading import Thread
from kombu import Connection, Queue, Consumer
from kombu.mixins import ConsumerMixin
NUM_CONSUMERS = 3
consumers = []
queue = Queue('myapp')
connection = Connection('amqp://')
class C(Thread, ConsumerMixin):
"""
Consumer thread. Listens for messages in the queue and handles them.
"""
def __init__(self, n, connection, queue):
Thread.__init__(self)
self.n = n
self.connection = connection
self.queue = queue
def get_consumers(self, _, channel):
return [Consumer(channel, [self.queue], callbacks=[self.on_message])]
def on_message(self, body, message):
# TODO: parse JSON message, log receival and handle
logging.info("Message received by %d: %s", self.n, body)
message.ack()
def run(self) -> None:
ConsumerMixin.run(self)
def finish(self):
self.should_stop = True
def start_consumers():
"""Starts consumer threads."""
if len(consumers) > 0:
return
for i in range(NUM_CONSUMERS):
consumer = C(i, connection, queue)
consumer.start()
consumers.append(consumer)
def stop_consumers():
"""Stops consumer threads."""
for consumer in consumers:
consumer.finish()
for consumer in consumers:
consumer.join()
consumers.clear()
Альтернативный вариант мог бы стартовать под-процесс для потребителей очереди. Пока я остановился на этом и хочу побольше с ним поработать, чтобы понять хорош он или нет.
Еще стоит отметить, что если в app/event_bus.py
будут импортироваться и подгружаться другие модули приложения, то перестанет работать (такой удобный в разработке) hot reload. Изменения в коде не будут подтягиваться без перезагрузки всего приложения. Все дело в механизме предзагрузки.