Один из возможных подходов к построению микросервисных приложений состоит в организации общения сервисов между собой посредством шины сообщений. Такая архитектура называется “Event-driven architecture”. Её преймущество в слабой связности компонентов. В этой небольшой статье я делюсь методом добавления интеграции с шиной на основе RabbitMQ в приложение, написанное на Flask.

В проекте, над которым я сейчас работаю, есть необходимость использования отдельно стоящих компонентов для работы с видео-файлами. Их необходимо загружать из хранилища, обрабатывать и сохранять результаты в хранилище. Эта функция довольно обособлена и никак непосредственно не связана с основным приложением. Её удобно хранить отдельно, масштабировать отдельно, выделять ей дисковое пространство отдельно, а также поручить разработку отдельной команде.

У меня уже накопился определенный опыт работы в таких условиях и я решил снова объединить компоненты через шину сообщений. Основное приложение написано на Flask и запускается под управлением сервера Gunicorn. Flask в этом уравнении роли не играет. Любое WSGI приложение будет одинаково запускаться под Gunicorn.

Что происходит во время запуска? Как и во многих современных серверах, во время запуска приложение инициализируется в одном центральном master-процессе (происходит предзагрузка), а затем производится fork необходимого количества рабочих процессов. За счет того, что код приложения загружается в память до разветвления, эта память разделяется между процессами (а не копируется), что позволяет держать лишь одну копию программы и данных в памяти для всех рабочих процессов.

Для подключения к очереди и потребления событий нам понадобися небольшая утилита. До сих пор я сталкивался и использовал для этого Pika – имплементацию AMQP 0.9.1, написанную на Python. Но недавно наткнулся на очень интересный проект Kombu от создателей самой известной библиотеки для фоновых задач в мире Python – Celery. В отличие от Pika, Kombu предоставляет абстракции публикатора и потребителя поверх нескольких популярных реализаций очередей (Redis, RabbitMQ, Amazon SQS и другие). Она самостоятельно решает вопросы восстановления подключений, декларирования очередей и многое другое. Поэтому и ещё потому, что эта библиотека используется в большом и популярном проекте, я решил ее попробовать в своей работе.

Итак, есть веб-приложение под Gunicorn и очередь под RabbitMQ, к которой мы желаем подключиться с помощью Kombu. Как это сделать?

Есть несколько способов. Вот основные подходы:

  • Написать маленькое приложение-прицеп, которое мы бы разместили в виде вспомогательного процесса в том же контейнере, которое бы слушало очередь, брало бы поступившие сообщения и отправляло бы во Flask приложение по HTTP. С этим подходом есть нюанс. Docker (Containerd) устроены так, что в контейнерах приложений они следят только за главным процессом (PID 1), и если он падает, то перезапускают контейнер. Если умерает какой-либо еще процесс, контейнер перезапущен не будет. Надежность такого подхода сильно страдает.

  • Запустить потребителя (нескольких потребителей) очереди в рамках основного процесса или как под-процесс. Это подход мне нравится чуточку больше. Он лишен недостатка процесса-прицепа.

Для того, чтобы запустить нитки потребителей нужно выбрать хуки у Gunicorn, к которым можно подключиться. Я выбрал on_starting для инициализации ниток и on_exit для остановки (не забывайте закрывать соединения на выходе; это позволит RabbitMQ быстрей понять кто из потребителей еще доступен).

Файл конфигурации Gunicorn выглядит так.

# gunicorn.conf.py

from app.event_bus import start_consumers, stop_consumers


def on_starting(_):
    """
    This hook is called right before Gunicorn master process is initialized.
    We use this opportunity to start bus consumers.
    """
    start_consumers()


def on_exit(_):
    """
    This hook is called on application exit.
    We use it to stop bus consumers.
    """
    stop_consumers()

В app/event_bus.py собрана функциональность работы потребителей очереди:

# app/event_bus.py

import logging
from threading import Thread

from kombu import Connection, Queue, Consumer
from kombu.mixins import ConsumerMixin

NUM_CONSUMERS = 3
consumers = []

queue = Queue('myapp')
connection = Connection('amqp://')


class C(Thread, ConsumerMixin):
    """
    Consumer thread. Listens for messages in the queue and handles them.
    """

    def __init__(self, n, connection, queue):
        Thread.__init__(self)
        self.n = n
        self.connection = connection
        self.queue = queue

    def get_consumers(self, _, channel):
        return [Consumer(channel, [self.queue], callbacks=[self.on_message])]

    def on_message(self, body, message):
        # TODO: parse JSON message, log receival and handle

        logging.info("Message received by %d: %s", self.n, body)

        message.ack()

    def run(self) -> None:
        ConsumerMixin.run(self)

    def finish(self):
        self.should_stop = True


def start_consumers():
    """Starts consumer threads."""
    if len(consumers) > 0:
        return

    for i in range(NUM_CONSUMERS):
        consumer = C(i, connection, queue)
        consumer.start()

        consumers.append(consumer)


def stop_consumers():
    """Stops consumer threads."""
    for consumer in consumers:
        consumer.finish()

    for consumer in consumers:
        consumer.join()

    consumers.clear()

Альтернативный вариант мог бы стартовать под-процесс для потребителей очереди. Пока я остановился на этом и хочу побольше с ним поработать, чтобы понять хорош он или нет.

Еще стоит отметить, что если в app/event_bus.py будут импортироваться и подгружаться другие модули приложения, то перестанет работать (такой удобный в разработке) hot reload. Изменения в коде не будут подтягиваться без перезагрузки всего приложения. Все дело в механизме предзагрузки.