Масштабирование вывода ML на Databricks: гибкое или секционированное? Соленый или нет? | На пути к науке о данных

Масштабирование вывода ML на Databricks: гибкое или секционированное? Соленый или нет? | На пути к науке о данных


Введение

Непрерывная переменная для четырех разных продуктов. Конвейер машинного обучения был построен в Databricks и состоит из двух основных компонентов.

  1. Подготовка функций в SQL с бессерверными вычислениями.
  2. Ансамблевая оценка нескольких сотен моделей с использованием кластеров заданий для управления вычислительной мощностью.

При первой попытке 420-ядерный кластер потратил около 10 часов на обработку всего 18 разделов.

цель Настройте потоки данных, чтобы максимизировать использование кластера и обеспечить масштабируемость. Оценка выполняется на четырех наборах моделей машинного обучения, по одному на каждый продукт. Однако мы сосредоточимся на Как сохраняются данные как получится Насколько равенством мы можем воспользоваться? гадать. Мы не будем концентрироваться на внутренней работе самой оценки.

Если файловых разделов очень мало, кластеру потребуется много времени для сканирования больших файлов, и в этот момент, если не будет выполнено перераспределение (это означает дополнительную задержку в сети и перетасовку данных), вы можете предполагать еще больший набор строк в каждом разделе. Это приводит к увеличению рабочего времени.

Масштабирование вывода ML на Databricks: гибкое или секционированное? Соленый или нет? | На пути к науке о данных
Рисунок 1. Не бойтесь добавить немного соли к своим данным, если вам нужно. Фото Фаран Рафи на Unsplash

Однако у предприятий ограниченное терпение в отношении реализации конвейеров машинного обучения, которые оказывают прямое влияние на организацию. Поэтому испытания ограничены.

В этой статье мы рассмотрим наш сценарий с объектными данными, затем предоставим обзор вывода ML, а также представим результаты и обсуждение производительности вывода на основе четырех сценариев обработки набора данных:

  1. Секционированная таблица, без соли, без ограничения количества строк в разделе (несоленый и разделенный)
  2. Секционированная таблица, соленая, с ограничением строки в 1 М. (соленый и разделенный)
  3. Таблица с жидкостной кластеризацией, без соли, без ограничения количества строк в разделе (без соли и жидкости)
  4. Таблица с жидкими кластерами, соленая, с ограничением в 1 млн строк. (соленый и жидкий)

ландшафт данных

Набор данных содержит функции, которые набор моделей ML использует для вывода. Он содержит около 550 миллионов строк и содержит четыре продукта, указанные в атрибуте. ProductLine: :

  • Продукт А: ~ 10,45 млн (1,9%)
  • Продукт Б: ~4,4 млн (0,8%)
  • Продукт С: ~100 млн (17,6%)
  • Продукт Д: ~354 млн (79,7%)

Затем у него есть еще одна особенность низкой мощности. attrB, Он содержит только два различных значения и используется в качестве фильтра для извлечения подмножеств набора данных для каждой части системы ML.

более того, RunDate Регистрирует дату создания объектов. Это всего лишь приложения. Наконец, набор данных считывается с помощью следующего запроса:

SELECT
  Id,
  ProductLine,
  AttrB,
  AttrC,
  RunDate,
  {model_features}
FROM
  catalog.schema.FeatureStore
WHERE
  ProductLine = :product AND
  AttrB = :attributeB AND
  RunDate = :RunDate

реализация соли

Здесь соление генерируется динамически. Его цель — распределить данные по объёму. Это означает, что более крупные продукты получают больше ведер, а более мелкие продукты получают меньше ведер. Например, учитывая пропорции в сценарии данных, продукт D должен получить примерно 80% корзин.

Мы делаем это для того, чтобы лучше оценить время выполнения и максимизировать использование кластера.

# Calculate percentage of each (ProductLine, AttrB) based on row counts
brand_cat_counts = df_demand_price_grid_load.groupBy(
   "ProductLine", "AttrB"
).count()
total_count = df_demand_price_grid_load.count()
brand_cat_percents = brand_cat_counts.withColumn(
   "percent", F.col("count") / F.lit(total_count)
)

# Collect percentages as dicts with string keys (this will later determine
# the number of salt buckets each product receives
brand_cat_percent_dict = {
   f"{row['ProductLine']}|{row['AttrB']}": row['percent']
   for row in brand_cat_percents.collect()
}

# Collect counts as dicts with string keys (this will help
# to add an additional bucket if counts is not divisible by the number of 
# buckets for the product
brand_cat_count_dict = {
   f"{row['ProductLine']}|{row['AttrB']}": row['count']
   for row in brand_cat_percents.collect()
}

# Helper to flatten key-value pairs for create_map
def dict_to_map_expr(d):
   expr = []
   for k, v in d.items():
       expr.append(F.lit(k))
       expr.append(F.lit(v))
   return expr

percent_case = F.create_map(*dict_to_map_expr(brand_cat_percent_dict))
count_case = F.create_map(*dict_to_map_expr(brand_cat_count_dict))

# Add string key column in pyspark
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
   "product_cat_key",
   F.concat_ws("|", F.col("ProductLine"), F.col("AttrB"))
)

df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
   "percent", percent_case.getItem(F.col("product_cat_key"))
).withColumn(
   "product_count", count_case.getItem(F.col("product_cat_key"))
)

# Set min/max buckets
min_buckets = 10
max_buckets = 1160

# Calculate buckets per row based on (BrandName, price_delta_cat) percentage
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
   "buckets_base",
   (F.lit(min_buckets) + (F.col("percent") * (max_buckets - min_buckets))).cast("int")
)

# Add an extra bucket if brand_count is not divisible by buckets_base
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
   "buckets",
   F.when(
       (F.col("product_count") % F.col("buckets_base")) != 0,
       F.col("buckets_base") + 1
   ).otherwise(F.col("buckets_base"))
)

# Generate salt per row based on (ProductLine, AttrB) bucket count
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
   "salt",
   (F.rand(seed=42) * F.col("buckets")).cast("int")
)

# Perform the repartition using the core attributes and the salt column
df_demand_price_grid_load = df_demand_price_grid_load.repartition(
   1200, "AttrB", "ProductLine", "salt"
).drop("product_cat_key", "percent", "brand_count", "buckets_base", "buckets", "salt")

Наконец, мы сохраняем наш набор данных в таблицу объектов и добавляем максимальное количество строк на раздел. Это сделано для того, чтобы Spark не создавал разделы со слишком большим количеством строк, что он может сделать, даже если мы уже вычислили соль.

Почему мы реализуем 1 миллион строк? Основное внимание уделяется времени оценки модели, а не размеру файла. После нескольких тестов с 1М, 1,5М, 2М первый тест дает в нашем случае лучшую производительность. Опять же, у этого проекта большие бюджетные и временные ограничения, поэтому мы должны максимально использовать наши ресурсы.

df_demand_price_grid_load.write\
   .mode("overwrite")\
   .option("replaceWhere", f"RunDate = '{params['RunDate']}'")\
   .option("maxRecordsPerFile", 1_000_000) \
   .partitionBy("RunDate", "price_delta_cat", "BrandName") \
   .saveAsTable(f"{params['catalog_revauto']}.{params['schema_revenueautomation']}.demand_features_price_grid")

Почему бы просто не положиться на адаптивное выполнение запросов (AQE) Spark?

Помните, что основное внимание уделяется времени оценки, а не таким измерениям, как размер файла для обычных запросов Spark SQL. На самом деле нашей первой попыткой было использование только AQE. Как вы увидите по результатам, время выполнения было очень нежелательным и не позволило максимизировать загрузку кластера с учетом нашего соотношения данных.

вывод машинного обучения

Существует конвейер с 4 задачами, по одной на продукт. Каждая задача выполняет следующие общие шаги:

  • Загружает функции из соответствующего продукта
  • Загружает подмножество модели машинного обучения для соответствующего продукта.
  • Оценки на половину подмножеств, усеченных на AttrB
  • Оценки на вторую половину сокращены на AttrB
  • Сохраняет данные в таблицу результатов

Мы сосредоточимся на одном из этапов оценки, чтобы не перегружать эту статью цифрами, хотя второй этап очень похож по структуре и результатам. Кроме того, вы можете увидеть DAG для оценки на рисунке 2.

Рисунок 2. DAG для искровой стадии оценки ML. Собственное авторство.

Это кажется довольно простым, но время выполнения может варьироваться в зависимости от того, как вы сохраняете данные и размера вашего кластера.

конфигурация кластера

На этапе оценки, который мы анализируем, существует один кластер для каждого продукта, который также адаптирован к ограничениям инфраструктуры проекта и распределению данных:

  • Продукт А: 35 сотрудников (Standard_DS14v2, 420 ядер)
  • Продукт Б: 5 рабочих (Standard_DS14v2, 70 ядер)
  • Продукт C: 1 рабочий (Standard_DS14v2, 14 ядер)
  • Продукт D: 1 рабочий (Standard_DS14v2, 14 ядер)

Кроме того, AdaptiveQueryExecution включен по умолчанию, что позволит Spark решить, как лучше всего сохранить данные с учетом предоставленного вами контекста.

Результаты и обсуждение

Вы увидите количество разделов на файл продукта и среднее количество строк на раздел для каждого сценария, что даст вам представление о том, сколько строк система ML будет прогнозировать для каждого задания Spark. Кроме того, мы представляем метрики пользовательского интерфейса Spark для наблюдения за производительностью во время выполнения и визуализации распределения данных во время вывода. Мы будем выполнять часть пользовательского интерфейса Spark только для продукта D, который является самым крупным, чтобы не включать много информации. Более того, в зависимости от сценария оценка продукта D становится узким местом во время выполнения. Вторая причина заключалась в том, что именно на этом были сосредоточены основные результаты.

несоленый и Разделение

На рисунке 3 вы можете видеть, что средний раздел файла содержит миллионы строк, а это означает, что времени выполнения достаточно для одного исполнителя. Продукт C в среднем является самым большим: в одном разделе содержится более 45 миллионов строк. Самый маленький продукт — B со средним числом строк около 12 миллионов.

Рисунок 3. Среднее количество строк в разделе в сравнении с продуктами.

Рисунок 4. Покажите количество делений на товар, всего 26 на все. При рассмотрении продукта D у нас очень мало 18 разделов из 420 доступных ядер, и в среднем каждый раздел будет содержать около 40 миллионов строк.

Рисунок 4. Общее количество файловых разделов на продукт

Взгляните на рисунок 5. В общей сложности кластер проработал 9,9 часов и он все еще не был завершен, потому что нам нужно было закончить работу, потому что она становилась дорогой и блокировала чужие тесты.

Рисунок 5. Краткое описание этапа оценки разделенного набора данных без содержания соли для продукта D.

Из сводной статистики готовых работ на рисунке 6 мы видим, что разделение по Продукту D было сильно искажено. Максимальный размер ввода составлял ~ 56 МБ, а время выполнения — 7,8 часа.

Рисунок 6. Сводная статистика для оценки исполнителей на секционированных и несекционированных наборах данных.

несоленый и жидкий

В этом сценарии мы можем увидеть очень похожие результаты с точки зрения среднего количества строк на раздел файла и количества разделов на продукт, как показано на рисунках 7 и 8 соответственно.

Рисунок 7. Среднее количество строк в разделе в сравнении с продуктами

Продукт D имеет 19 файловых разделов, что по-прежнему намного меньше 420 ядер.

Рисунок 8. Общее количество файловых разделов на продукт

Мы уже могли догадаться, что этот эксперимент будет очень дорогим, поэтому я решил пропустить проверку гипотез для этого сценария. Тогда в идеальной ситуации мы продвигаем его вперед, но на моей доске остается очередь билетов.

соленый и Разделение

После применения процесса объединения и перераспределения мы получаем в среднем ~2,5 млн записей на раздел для продуктов A и B и ~1 млн для продуктов C и D, как показано на рисунке 9.

Рисунок 9. Среднее количество строк в разделе в сравнении с продуктами

Кроме того, на рисунке 10 мы видим, что количество разделов файла для продукта D увеличилось примерно до 860, что дает 430 на каждый шаг вывода.

Рисунок 10. Общее количество файловых разделов на продукт

В результате для оценки продукта D с 360 задачами потребуется 3 часа, как показано на рисунке 11.

Рисунок 11. Краткое изложение шагов вывода для разделенных и соленых наборов данных

Если посмотреть на сводную статистику на рис. 12, распределение выглядит сбалансированным: время выполнения около 1,7, но максимальная задача занимает 3 часа, что стоит изучить подробнее в будущем.

Рисунок 12. Сводная статистика для оценки исполнителей на разделенных и соленых наборах данных.

Большим преимуществом является то, что Salt распределяет данные по соотношению продуктов. Если бы у нас было больше ресурсов, мы могли бы увеличить количество разделов в случайном порядке. repartition() И добавьте работников в соответствии с пропорцией данных. Это гарантирует предсказуемое масштабирование нашего процесса.

соленый и жидкий

Этот сценарий сочетает в себе два самых сильных рычага, которые мы обнаружили на данный момент:

Соление для управления размером файлов и параллелизмом, а также гибкая кластеризация для организации связанных данных без строгих границ разделов.

После применения той же стратегии «соления» и ограничения количества строк в 1 миллион на секцию таблица жидких кластеров показывает аналогичный средний размер раздела в случае с «солом» и «разделением», как показано на рисунке 13. Продукты C и D остаются близкими к целевому значению — 1 миллион строк, в то время как продукты A и B находятся немного выше этого диапазона.

Рисунок 13. Среднее количество строк в разделе в сравнении с продуктами

Однако основное различие заключается в том, как эти разделы распределяются и используются Spark. Как показано на рисунке 14, продукт D снова достигает большого количества файловых разделов, что обеспечивает достаточный параллелизм для насыщения доступных ядер во время вывода.

Рисунок 14. Общее количество файловых разделов на продукт.

В отличие от своего секционированного аналога, жидкая кластеризация позволяет Spark со временем адаптировать макет файла, сохраняя при этом преимущества соли. Это приводит к более равномерному распределению работы между исполнителями с меньшим количеством резких выбросов как в размере входных данных, так и в продолжительности задачи.

Из сводной статистики на рисунке 15 мы видим, что большинство задач выполняются в сжатые сроки выполнения, а максимальная продолжительность задачи ниже, чем в сценарии с солью и секционированием. Это указывает на меньшую асимметрию и лучшую балансировку нагрузки в кластере.

Рисунок 15. Краткое изложение шагов вывода для наборов данных с кластерами жидкостей и соленой воды
Рисунок 16. Сводная статистика для оценки исполнителей на жидких кластеризованных и соленых наборах данных.

Важным побочным эффектом является то, что гибкая кластеризация сохраняет локальность данных для отфильтрованных столбцов, не налагая строгих границ разделов. Это позволяет Spark по-прежнему получать выгоду от пропуска данных, в то время как соль гарантирует, что ни один исполнитель не будет перегружен миллионами строк.

общий, соленый и жидкий представляет собой наиболее надежную настройку: она максимизирует параллелизм, минимизирует асимметрию и минимизирует операционный риск при увеличении рабочей нагрузки по оценке или изменении конфигурации кластера.

ключевые выводы

  • Масштабируемость вывода часто ограничивается расположением данных, а не сложностью модели. Неправильный размер файловых разделов может привести к тому, что сотни ядер будут простаивать, в то время как некоторые исполнители будут обрабатывать миллионы строк.
  • Одного разделения недостаточно для оценки массы. Без контроля размера файла многораздельные таблицы все равно могут создавать массивные разделы, что приводит к длительным и нестабильным операциям.
  • Соление — эффективный инструмент для раскрытия общности. Введение соляного ключа и ограничение количества строк для каждого раздела значительно увеличивает количество выполняемых задач и стабилизирует время выполнения.
  • Жидкостная кластеризация обеспечивает соленость за счет уменьшения асимметрии без жестких границ. Это позволяет Spark со временем адаптировать макет файла, делая систему более гибкой по мере роста данных.

Leave a Reply

Your email address will not be published. Required fields are marked *