CoderCastrov logo
CoderCastrov
Очередь

Наше решение для очередей

Наше решение для очередей
просмотров
7 мин чтение
#Очередь

В YipitData мы создали систему очередей, которая хорошо работает для парсинга веб-страниц. Когда мы начали разрабатывать эту систему в 2014 году, мы не смогли найти ничего на рынке, что было бы подходящим. Большинство решений, которые мы нашли, работали либо как FIFO, либо как LIFO, но нам требовалась большая гибкость.

Проблемы парсинга веб-страниц

Парсинг веб-страниц кажется простым, но нам пришлось быть креативными, чтобы создать масштабируемое и надежное решение.

Например, представьте, что вам нужно получить все ссылки с веб-страницы, а затем спарсить каждую страницу и сохранить некоторую информацию в базу данных. Вы можете написать код вроде этого:

for link in links:
    page = get_page(link)
    save_data(page)

Этот код работает в небольшом масштабе, и многие люди пишут парсеры именно таким образом. Однако, когда вы запускаете парсеры в большом масштабе, многое может пойти не так, и надежное решение должно учитывать проблемы, такие как:

  • изменение веб-сайта
  • внезапное завершение серверов
  • использование слишком большого объема памяти и сбои
  • возникновение исключений
  • ошибки, которые возникают только на определенной части данных

Если что-либо из перечисленного происходит с предыдущим кодом, вам придется начинать сначала. Если у вас есть миллион URL-адресов для парсинга, может быть слишком дорого и затратно по времени повторять всю работу, когда что-то идет не так. Предыдущее решение не подходит для нашего бизнеса.

Парсинг в YipitData

Наши парсеры состоят из нескольких задач. Каждая задача имеет короткое время жизни и может быть выполнена в отдельности, поэтому если что-то происходит с частью парсера, эту часть можно повторно выполнить независимо. Вот как мы бы написали предыдущее решение с использованием нашей платформы:

Самое большое отличие заключается в функции парсить_ссылку(), которая задерживает вызов функции парсить_листинг(). Этот пример идентичен обычной очереди FIFO, однако у нас есть возможность планировать выполнение задач в любое будущее время (с помощью параметра seconds). У нас есть отдельная очередь для каждой функции, помеченной декоратором @подзадача.

Важно, чтобы мы не выполняли лишние парсинги при сбое системы. Если что-то идет не так с функцией парсить_листинг(), мы не хотим добавлять одни и те же задачи в очередь, что означает, что наше решение для очереди должно иметь встроенную уникальность. Следующий код только перепланирует ссылку в очереди парсить_ссылку - он не добавляет дубликаты в очередь:

Наша система очередей не является FIFO или LIFO, она имеет встроенную уникальность, и элементы могут перемещаться. У нас есть гибкая система очередей с приоритетом и отложенной уникальностью.

Построен на Redis

Мы воспользовались богатыми структурами данных Redis, чтобы создать наше решение для очередей, в основном из-за сортированных множеств и последовательных свойств Redis (отсутствие одновременного чтения/записи). Каждая из наших очередей представляет собой сортированное множество в Redis, где оценка представляет запланированное время, а элемент - данные. Вся логика очередей находится на стороне клиента, но выполняется на стороне Redis - благодаря последовательным свойствам Redis нет необходимости в координации задач, что упрощает решение. Мы создали фреймворк для очередей на Python и возможно в будущем выпустим его в открытый доступ.

Например, следующий код означает, что мы хотим добавить элемент http://example.com в очередь scrape_link и он будет доступен через 0 секунд:

Предыдущий код практически эквивалентен ZADD scrape_link 0 "http://example.com" в Redis (мы делаем больше вещей за кулисами).

Redis Logo

Redis и Lua скрипты

Мы реализуем механизм доставки хотя бы один раз, что означает, что элемент может быть обработан более одного раза. Это необходимо, потому что если элемент извлекается из очереди, но никогда не подтверждается завершение задачи (задача занимает слишком много времени или завершается аварийно), он будет доступен в очереди в будущем для повторной попытки. Мы используем скрипты Lua для создания атомарных пользовательских команд, таких как команда для удаления элемента из очереди и перепланировки его в будущем (в случае, если задача никогда не завершается).

Сводка функций

Ниже приведены функции нашей системы:

  • Каждая очередь является очередью задач
  • Каждая очередь имеет элементы, упорядоченные по временной метке
  • При добавлении элемента в очередь, если он уже существует, его временная метка перезаписывается
  • Элементы в очереди уникальны (не дублируются)
  • Очереди могут сохранять элементы бесконечно (неограниченный срок хранения)
  • Поддержка очереди мертвых писем (не объясняется в этой статье)
  • Очереди имеют количество элементов
  • Очереди имеют количество просроченных элементов

Проблемы с Redis

Redis - это база данных в памяти, что означает, что все данные находятся в памяти в любой момент времени. Когда в системе есть миллионы элементов в нескольких очередях, это может занимать несколько гигабайт памяти. Память ограничена, дорога и мы не можем изменить размер памяти сервера на лету (сервер должен быть заменен на более крупный, и это ручная задача).

Мы используем Redis из-за его богатых структур данных и гибкости, а не из-за его скорости работы в памяти. Если однажды вам потребуется добавить в очередь вдвое больше элементов, это может означать, что вам нужно вдвое больше места в памяти. Это неприятно для нас, потому что иногда мы не можем предсказать эти требования и своевременно их настроить - мы видели, как очереди растут слишком быстро, и Redis исчерпывает память. Мы напишем отдельный пост в блоге, где подробнее расскажем об этих и других проблемах.

В общем, нам нужно решение, которое поддерживает отложенные уникальные приоритетные очереди, имеет практически бесконечное хранилище и низкие операционные затраты.

Readypipe

Мы создаем платформу для парсинга веб-сайтов под названием Readypipe. Мы являемся клиентами нашей собственной платформы, но также предлагаем ее внешним клиентам в качестве услуги. Ранее упомянутые проблемы были менее важными, когда мы были единственными клиентами, потому что мы знали, какие типы проектов поддерживают наши очереди. Однако, работая с внешними клиентами, мы не можем предсказать, сколько мощности очереди им понадобится, что делает эластичное решение с низкими операционными затратами необходимостью, а не просто опцией.

Решения, которые не подходят нам

AWS SQS

AWS SQS имеет несколько типов очередей и обеспечивает эластичное хранение, но ни один из них не удовлетворяет нашим требованиям:

  • Стандартные очереди и FIFO-очереди: Отсутствие уникальности элементов, отсутствие задержки сообщений, невозможность изменения порядка элементов
  • Очереди с задержкой: Максимальная задержка 15 минут, отсутствие уникальности элементов, невозможность изменения порядка элементов

Мы могли бы создать решение на основе SQS, но это потребовало бы нескольких хаков, таких как удаление элементов из очереди, а затем их повторное добавление, если они не готовы к выполнению - это создало бы некоторую сложность, потребовало бы согласования задач и затруднило бы инспекцию очереди (например, подсчет количества элементов, просроченных по времени).

RabbitMQ, ZeroMQ

Те же проблемы, что и у SQS.

Apache Kafka

Kafka - это отказоустойчивая платформа потоковой обработки. Хотя люди используют темы в качестве очередей, они являются неизменяемыми журналами фиксации. Kafka не подходит для нас из-за неизменяемости сообщений и отсутствия возможности установки пользовательского порядка элементов, но мы можем создать нашу систему очередей поверх него с использованием нескольких тем - задержки не будут гранулированными, и это потребует согласования задач на стороне клиента. Для нас Kafka имеет те же концептуальные проблемы, что и SQS.

Celery, RQ, Resque и другое программное обеспечение для очередей задач

Все эти решения построены поверх предыдущего программного обеспечения (например, Redis, SQS, RabbitMQ, Kafka), и они не реализуют необходимые настройки, которые нам нужны поверх этих решений.

Наш последний эксперимент и будущая работа

Redis хорошо справляется с нашим решением, и миграция на другую систему связана с большими рисками; у нас есть сотни проектов, построенных на основе текущего поведения, и изменение любого поведения текущей системы может привести к непредвиденным проблемам в наших проектах (трудно протестировать разные решения для каждого проекта). Основные проблемы, с которыми мы сталкиваемся с Redis, это ограниченное пространство хранения и затраты на сервер. Было бы замечательно, если бы мы могли иметь поведение Redis через тот же API, но хранить данные по-другому. Мы не смогли найти проект, который бы полностью удовлетворял нашим требованиям во время наших исследований в 2018 году, и мы начали проект, чтобы заполнить этот пробел.

Мы начали DRedis, клон Redis с открытым исходным кодом, основанный на диске. Он реализует тот же сетевой протокол и те же команды, что и Redis, но хранит данные на файловой системе. Использование файловой системы имеет свои проблемы, и мы расскажем все подробности нашего исследования и реализации в отдельной публикации. Мы используем его в нескольких проектах в производстве и постоянно вносим улучшения. Проект доступен на Github под лицензией MIT.

Благодарности

Благодарим Мингвэя Гу и Джима Шилдса за их тщательные обзоры.