PySpark для пользователей Panda | На пути к науке о данных

PySpark для пользователей Panda | На пути к науке о данных


Реальная проблема при работе с очень большими наборами данных. Под «очень большими» я подразумеваю данные, превышающие емкость оперативной памяти машины.

Некоторые из основных проблем, с которыми сталкиваются пользователи Panda, включают:

Барьеры в памяти

Pandas необходимо хранить весь обрабатываемый набор данных в оперативной памяти (ОЗУ) компьютера. Он не может легко обрабатывать данные, хранящиеся на жестком диске, пока он не будет загружен в первый раз, и если эти данные слишком велики для вашей памяти, вы столкнетесь с проблемами.

Например, если вы попытаетесь загрузить CSV-файл размером 100 ГБ в Pandas на стандартном ноутбуке с 16 ГБ ОЗУ, код немедленно выйдет из строя.

И это не просто соотношение 1:1. Из-за типа данных и накладных расходов на объекты для pandas обычно требуется объем оперативной памяти, кратный размеру файла на диске. При 16 ГБ ОЗУ максимальный размер файла может составлять всего 3–4 ГБ.

однопоточное выполнение

Pandas был разработан для удобства и анализа, а не для простого масштабирования производительности. По умолчанию pandas выполняет операции на одном ядре ЦП.. Даже если пользователь запускает свой код на мощном сервере с 64 ядрами, Pandas в основном будет использовать только одно, оставляя остальные простаивать.

Нетерпеливое выполнение против ленивой оценки

Pandas использует активное выполнение, то есть выполняет вычисления сразу после запуска кода. Инструменты для работы с большими данными (например, Apache Spark) используют отложенную оценку. Последнее часто более эффективно, чем нетерпеливое выполнение, поскольку, когда для выполнения задачи требуется ряд шагов, ленивая оценка может увидеть все шаги и требуемый конечный результат и соответствующим образом оптимизировать. Нетерпеливое исполнение не может этого сделать. Он слепо выполняет каждый шаг по очереди, несмотря ни на что.

пределы вертикального масштабирования

Чтобы панды могли работать с большими наборами данных, вам придется полагаться на вертикальное масштабирование (покупка более дорогого компьютера с большим объемом оперативной памяти и более быстрым процессором). Но это может завести вас только до определенного момента. Например, у Панды нет встроенной способности «разговаривать» с группой. Он не может распространять фрейм данных на несколько машин.

так что делать?

Как обычно в мире ИТ, существует множество решений. Три наиболее популярных варианта:

1/ сумерки или луч

Это сторонние библиотеки, которые помогают писать распределенный код, который может работать на кластерах компьютеров. Хотя они пытаются имитировать API Pandas, у них все же есть тонкие различия и ограничения, которые могут потребовать рефакторинга кода.

2/ Spark: еще один механизм распределенных вычислений. Требуется другой синтаксис и другая ментальная модель.

3/ СУРБД: требует помещения ваших данных в базу данных и изучения SQL.

Реализация всех вышеперечисленных вариантов требует большой работы, но в оставшейся части статьи я сосредоточусь на варианте 2.

Итак, допустим, я убедил вас или, по крайней мере, возбудил ваш интерес, и вы подумываете о переносе некоторых или всех существующих вами процессов обработки на основе панд в PySpark. Каким должен быть ваш следующий шаг? Что ж, вам нужно начать конвертировать часть или всю вашу кодовую базу. Это может быть сложно, но не волнуйтесь, я вас прикрою.

Продолжайте читать, пока я проведу вас через несколько примеров фрагментов кода, которые показывают некоторые типичные операции обработки данных, от простых до более сложных. Я уверен, что вы узнаете некоторые из этих шаблонов в своем коде. Я покажу вам, как работает pandas, и воспроизведу это в PySpark, предоставив сравнение результатов и времени между ними.

настройка среды разработки

Я использую Ubuntu на WSL2. Сначала мы настроим для этой работы отдельную среду разработки, чтобы наши проекты были крутыми и не мешали друг другу. Для этой части я использую conda, но вы можете использовать любой метод, к которому вы привыкли.

Установите pyspark, pandas и т. д.

(base) $ conda create -n pandas_to_pyspark python=3.11 -y
(base) $ conda activate pandas_to_pyspark
(pands_to_pyspark) $ conda install jupyter polars pyarrow pandas -y
(pands_to_pyspark) $ conda install -c conda-forge pyspark

Чтобы проверить правильность установки PySpark, введите команду pyspark в окне терминала.

(pands_to_pyspark) pyspark

Python 3.11.14 | packaged by conda-forge | (main, Oct 22 2025, 22:46:25) [GCC 14.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
WARNING: Using incubator modules: jdk.incubator.vector
WARNING: package sun.security.action not in java.base
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/15 16:15:21 WARN Utils: Your hostname, tpr-desktop, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/15 16:15:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/15 16:15:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARNING: A terminally deprecated method in sun.misc.Unsafe has been called
WARNING: sun.misc.Unsafe::arrayBaseOffset has been called by org.apache.spark.unsafe.Platform (file:/home/tom/miniconda3/envs/pandas_to_pyspark/lib/python3.11/site-packages/pyspark/jars/spark-unsafe_2.13-4.1.1.jar)
WARNING: Please consider reporting this to the maintainers of class org.apache.spark.unsafe.Platform
WARNING: sun.misc.Unsafe::arrayBaseOffset will be removed in a future release
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 4.1.1
      /_/

Using Python version 3.11.14 (main, Oct 22 2025 22:46:25)
Spark context Web UI available at http://10.255.255.254:4040
Spark context available as 'sc' (master = local[*], app id = local-1768493723158).
SparkSession available as 'spark'.
>>>

Если вы не видите баннер приветствия Spark, что-то пошло не так, и вам следует еще раз проверить установку.

Получение нашего образца набора данных

Для наших целей нам не нужны какие-то сложные наборы. Достаточно набора синтетических данных о продажах со следующей схемой:

  • order_id (целое)
  • order_date(дата)
  • customer_id (целое)
  • имя_клиента (строка)
  • Product_id (целое)
  • имя_продукта(строка)
  • категория (стр)
  • количество (целое)
  • цена (плавающая)
  • итого(с плавающей запятой)

Нашими входными данными будет CSV-файл на 30 миллионов записей. Вот программа Python для генерации тестовых данных:

import polars as pl
import random
from datetime import datetime, timedelta

# Generate fake data
def generate_fake_data(num_records):
    random.seed(42)
    
    product_names = ['Laptop', 'Smartphone', 'Desk', 'Chair', 'Monitor', 
                     'Printer', 'Paper', 'Pen', 'Notebook', 'Coffee Maker']
    categories = ['Electronics', 'Electronics', 'Office', 'Office', 'Electronics',
                  'Electronics', 'Office', 'Office', 'Office', 'Electronics']
    
    data = {
        'order_id': range(num_records),
        'order_date': [datetime(2023, 1, 1) + timedelta(days=random.randint(0, 364)) 
                       for _ in range(num_records)],
        'customer_id': [random.randint(100, 999) for _ in range(num_records)],
        'customer_name': [f'Customer_{random.randint(0, 99999)}' for _ in range(num_records)],
        'product_id': [random.randint(200, 209) for _ in range(num_records)],
        'product_name': [random.choice(product_names) for _ in range(num_records)],
        'category': [random.choice(categories) for _ in range(num_records)],
        'quantity': [random.randint(1, 10) for _ in range(num_records)],
        'price': [round(random.uniform(1.99, 999.99), 2) for _ in range(num_records)]
    }
    
    df = pl.DataFrame(data)
    df = df.with_columns((pl.col('price') * pl.col('quantity')).alias('total'))
    
    return df
# Generate 30 million records
num_records = 30000000
df = generate_fake_data(num_records)
# Save to CSV
df.write_csv('/mnt/d/sales_data/sales_data_30m.csv')
print('CSV file with fake sales data has been created.')

Первые несколько строк моего файла тестовых данных выглядели следующим образом.

order_id,order_date,customer_id,customer_name,product_id,product_name,category,quantity,price,total
0,2023-11-24T00:00:00.000000,434,Customer_46318,201,Notebook,Office,6,925.68,5554.08
1,2023-02-27T00:00:00.000000,495,Customer_26514,203,Coffee Maker,Office,3,676.44,2029.3200000000002
2,2023-01-13T00:00:00.000000,377,Customer_56676,204,Pen,Electronics,10,533.2,5332.0
3,2023-05-21T00:00:00.000000,272,Customer_13772,209,Notebook,Electronics,5,752.0,3760.0
4,2023-05-06T00:00:00.000000,490,Customer_23118,206,Coffee Maker,Electronics,3,747.46,2242.38
5,2023-04-25T00:00:00.000000,515,Customer_88284,202,Desk,Electronics,10,886.22,8862.2
6,2023-03-13T00:00:00.000000,885,Customer_47303,200,Desk,Electronics,1,38.97,38.97
7,2023-02-22T00:00:00.000000,598,Customer_90712,203,Desk,Electronics,5,956.31,4781.549999999999
8,2023-12-13T00:00:00.000000,781,Customer_32943,205,Coffee Maker,Electronics,7,258.25,1807.75
9,2023-10-07T00:00:00.000000,797,Customer_40215,208,Pen,Electronics,8,464.81,3718.48
10,2023-02-14T00:00:00.000000,333,Customer_18388,209,Monitor,Electronics,1,478.95,478.95

пример кода

Запустите блокнот Jupyter:

(pands_to_pyspark) $ jupyter notebook

Данные и два набора кодов, которые я буду запускать, находятся на моем настольном ПК. Я покажу выходные данные обоих запусков кода, чтобы вы могли убедиться, что они делают одно и то же, и я укажу время (в секундах), чтобы вы могли сравнить производительность. Сначала код и вывод pandas, затем код и вывод Spark.

Фрагменты кода кратки и хорошо прокомментированы, поэтому, если вы уже являетесь программистом Pandas, вам будет довольно легко следить за тем, что происходит в коде PySpark, если вы еще с ним не знакомы.

Чтобы внести ясность: поскольку набор входных данных, который я буду использовать, не является «большими данными», время следует рассматривать как второстепенное значение.

Пример 1 – Загрузка данных из CSV

Мы начнем с простой операции — просто прочитаем наш входной файл данных CSV и отсортируем его по столбцам order_date и order_id перед отображением первых и последних пяти записей.

Вот код панды.

import pandas as pd
import time

# 1. Define Path (WSL format)
file_path = "/mnt/d/sales_data/sales_data_30m.csv"

print(f"Starting process for {file_path}...")

# --- LOAD PHASE ---
start_load = time.time()
df = pd.read_csv(file_path)
end_load = time.time()

print(f"Loading complete. Time taken: {end_load - start_load:.2f} seconds")

# --- SORT PHASE ---
start_sort = time.time()
# Note: Sorting by two columns at once
df_sorted = df.sort_values(by=['order_date', 'order_id'])
end_sort = time.time()

print(f"Sorting complete. Time taken: {end_sort - start_sort:.2f} seconds")

# --- DISPLAY ---
print("\n" + "="*30)
print("TOP 5 RECORDS")
print(df_sorted.head(5))

print("\nBOTTOM 5 RECORDS")
print(df_sorted.tail(5))
print("="*30)

total_time = end_sort - start_load
print(f"\nTotal Execution Time: {total_time:.2f} seconds")

Вот результат.

(pands_to_pyspark) $ python ex1_pandas.py

Starting process for /mnt/d/sales_data/sales_data_30m.csv...
Loading complete. Time taken: 34.02 seconds
Sorting complete. Time taken: 7.00 seconds

==============================
TOP 5 RECORDS
      order_id                  order_date  customer_id   customer_name  ...     category quantity   price    total
179        179  2023-01-01T00:00:00.000000          350  Customer_93033  ...       Office        5  640.16  3200.80
520        520  2023-01-01T00:00:00.000000          858  Customer_31280  ...  Electronics        3  841.21  2523.63
557        557  2023-01-01T00:00:00.000000          651  Customer_95137  ...       Office        7   75.66   529.62
1080      1080  2023-01-01T00:00:00.000000          303  Customer_87422  ...  Electronics       10   98.34   983.40
2023      2023  2023-01-01T00:00:00.000000          838  Customer_95193  ...       Office        4  427.96  1711.84

[5 rows x 10 columns]

BOTTOM 5 RECORDS
          order_id                  order_date  customer_id   customer_name  ...     category quantity   price    total
29997832  29997832  2023-12-31T00:00:00.000000          831  Customer_49372  ...  Electronics        6  418.86  2513.16
29997903  29997903  2023-12-31T00:00:00.000000          449  Customer_17384  ...       Office        3  494.29  1482.87
29998337  29998337  2023-12-31T00:00:00.000000          649  Customer_24018  ...  Electronics        5  241.71  1208.55
29999674  29999674  2023-12-31T00:00:00.000000          105  Customer_39890  ...       Office        1   94.97    94.97
29999933  29999933  2023-12-31T00:00:00.000000          572  Customer_38794  ...       Office        8  375.36  3002.88

[5 rows x 10 columns]
==============================

Total Execution Time: 41.03 seconds

Вот эквивалентный код Spark и результаты обработки.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
import time
import pandas as pd

start_overall = time.time()

# 1. Initialize with explicit Memory and Shuffle tuning
spark = SparkSession.builder \
    .appName("OptimizedSpark") \
    .config("spark.sql.shuffle.partitions", "16") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# 2. Define Manual Schema (Skips the double-read of inferSchema)
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", DateType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("customer_name", StringType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("total", DoubleType(), True)
])

file_path = "/mnt/d/sales_data/sales_data_30m.csv"
print(f"Processing {file_path} with Optimized Spark...")

# --- LOAD ---
start_load = time.time()
# No inferSchema!
df = spark.read.csv(file_path, header=True, schema=schema)
print(f"LOAD INITIATED. (Time taken: {time.time() - start_load:.2f}s)")

# --- SORT ---
start_sort = time.time()
# Sorting 30M rows
df_sorted = df.orderBy(["order_date", "order_id"])

# Force the sort with a light action (NOT cache)
row_count = df_sorted.count()
end_sort = time.time()

print(f"SORT COMPLETE. Rows: {row_count}")
print(f"   Time taken: {end_sort - start_sort:.2f} seconds")

# --- DISPLAY ---
print("\n" + "="*80)
print("TOP 5 RECORDS")
print(df_sorted.limit(5).toPandas().to_string(index=False))

print("\nBOTTOM 5 RECORDS")
tail_data = df_sorted.tail(5)
print(pd.DataFrame(tail_data, columns=df.columns).to_string(index=False))
print("="*80)

print(f"\nTotal Execution Time: {time.time() - start_overall:.2f} seconds")
spark.stop()

И вывод.

(pands_to_pyspark) $ spark-submit ex1_spark.py 2> /dev/null
Processing /mnt/d/sales_data/sales_data_30m.csv with Optimized Spark...
LOAD INITIATED. (Time taken: 0.72s)
SORT COMPLETE. Rows: 30000000
   Time taken: 5.65 seconds

================================================================================
TOP 5 RECORDS
 order_id order_date  customer_id  customer_name  product_id product_name    category  quantity  price   total
      179 2023-01-01          350 Customer_93033         207         Desk      Office         5 640.16 3200.80
      520 2023-01-01          858 Customer_31280         201          Pen Electronics         3 841.21 2523.63
      557 2023-01-01          651 Customer_95137         209      Printer      Office         7  75.66  529.62
     1080 2023-01-01          303 Customer_87422         204   Smartphone Electronics        10  98.34  983.40
     2023 2023-01-01          838 Customer_95193         201        Paper      Office         4 427.96 1711.84

BOTTOM 5 RECORDS
 order_id order_date  customer_id  customer_name  product_id product_name    category  quantity  price   total
 29997832 2023-12-31          831 Customer_49372         201        Chair Electronics         6 418.86 2513.16
 29997903 2023-12-31          449 Customer_17384         205         Desk      Office         3 494.29 1482.87
 29998337 2023-12-31          649 Customer_24018         201   Smartphone Electronics         5 241.71 1208.55
 29999674 2023-12-31          105 Customer_39890         203        Chair      Office         1  94.97   94.97
 29999933 2023-12-31          572 Customer_38794         201         Desk      Office         8 375.36 3002.88
================================================================================

Total Execution Time: 36.12 seconds

Пример 2. Преобразование файла CSV в паркет.

В этом примере мы прочитаем тот же входной CSV-файл с 30 МБ записей, а затем перепишем его как файл Parquet.

Как и прежде, мы начнем с кода и вывода pandas.

import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import time

csv_file = "/mnt/d/sales_data/sales_data_30m.csv"
parquet_file = "/mnt/d/sales_data/sales_data_pandas_30m.parquet"
chunk_size = 1_000_000  # Process 1 million rows at a time

print(f"Starting memory-efficient conversion...")
start_total = time.time()

# 1. Create a CSV reader object (this doesn't load data yet)
reader = pd.read_csv(csv_file, chunksize=chunk_size)

parquet_writer = None

for i, chunk in enumerate(reader):
    start_chunk = time.time()

    # Convert Pandas chunk to PyArrow Table
    table = pa.Table.from_pandas(chunk)

    # Initialize the writer on the first chunk
    if parquet_writer is None:
        parquet_writer = pq.ParquetWriter(parquet_file, table.schema, compression='snappy')

    # Write this chunk to the file
    parquet_writer.write_table(table)

    print(f"Processed chunk {i+1} (Rows {i*chunk_size} to {(i+1)*chunk_size}) in {time.time() - start_chunk:.2f}s")

# 2. Close the writer
if parquet_writer:
    parquet_writer.close()

print("\n" + "="*40)
print(f"Conversion Complete!")
print(f"Total Time: {time.time() - start_total:.2f} seconds")
print("="*40)

выход.

(pands_to_pyspark) $ python ex2_pandas.py

Starting memory-efficient conversion...
Processed chunk 1 (Rows 0 to 1000000) in 4.82s
Processed chunk 2 (Rows 1000000 to 2000000) in 0.40s
Processed chunk 3 (Rows 2000000 to 3000000) in 0.39s
Processed chunk 4 (Rows 3000000 to 4000000) in 0.36s
Processed chunk 5 (Rows 4000000 to 5000000) in 0.43s
Processed chunk 6 (Rows 5000000 to 6000000) in 0.45s
Processed chunk 7 (Rows 6000000 to 7000000) in 0.35s
Processed chunk 8 (Rows 7000000 to 8000000) in 0.34s
Processed chunk 9 (Rows 8000000 to 9000000) in 0.36s
Processed chunk 10 (Rows 9000000 to 10000000) in 0.36s
Processed chunk 11 (Rows 10000000 to 11000000) in 0.37s
Processed chunk 12 (Rows 11000000 to 12000000) in 0.41s
Processed chunk 13 (Rows 12000000 to 13000000) in 0.48s
Processed chunk 14 (Rows 13000000 to 14000000) in 0.43s
Processed chunk 15 (Rows 14000000 to 15000000) in 0.38s
Processed chunk 16 (Rows 15000000 to 16000000) in 0.35s
Processed chunk 17 (Rows 16000000 to 17000000) in 0.34s
Processed chunk 18 (Rows 17000000 to 18000000) in 0.35s
Processed chunk 19 (Rows 18000000 to 19000000) in 0.36s
Processed chunk 20 (Rows 19000000 to 20000000) in 0.35s
Processed chunk 21 (Rows 20000000 to 21000000) in 0.34s
Processed chunk 22 (Rows 21000000 to 22000000) in 0.34s
Processed chunk 23 (Rows 22000000 to 23000000) in 0.34s
Processed chunk 24 (Rows 23000000 to 24000000) in 0.36s
Processed chunk 25 (Rows 24000000 to 25000000) in 0.36s
Processed chunk 26 (Rows 25000000 to 26000000) in 0.35s
Processed chunk 27 (Rows 26000000 to 27000000) in 0.36s
Processed chunk 28 (Rows 27000000 to 28000000) in 0.35s
Processed chunk 29 (Rows 28000000 to 29000000) in 0.35s
Processed chunk 30 (Rows 29000000 to 30000000) in 0.34s

========================================
Conversion Complete!
Total Time: 43.30 seconds
========================================

А теперь о PySpark.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
import time

# Start the overall timer immediately
start_overall = time.time()

# 1. Initialize Spark with high memory configuration
spark = SparkSession.builder \
    .appName("EfficientParquetConversion") \
    .config("spark.driver.memory", "8g") \
    .master("local[*]") \
    .getOrCreate()

# Silence logs
spark.sparkContext.setLogLevel("ERROR")

# 2. Explicitly define the Schema (Most efficient for CSV)
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", DateType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("customer_name", StringType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("total", DoubleType(), True)
])

csv_path = "/mnt/d/sales_data/sales_data_30m.csv"
parquet_path = "/mnt/d/sales_data/sales_data_parquet"

print(f"Starting Spark conversion to {parquet_path}...")

# 3. Read the CSV using the defined schema
start_proc = time.time()
df = spark.read.csv(csv_path, header=True, schema=schema)

# 4. Write to Parquet (Overwrite if exists)
df.write.mode("overwrite").parquet(parquet_path)
end_proc = time.time()

print("-" * 40)
print(f"CONVERSION COMPLETE")
print(f"Processing Time (Read + Write): {end_proc - start_proc:.2f} seconds")
print(f"Total Execution Time (incl. Spark startup): {time.time() - start_overall:.2f} seconds")
print("-" * 40)

spark.stop()

Я могу подтвердить, что содержимое файла паркета, созданного Pandas и PySpark, было одинаковым.

(pands_to_pyspark) $ spark-submit --driver-memory 8g ex2_spark.py 2> /dev/null
Starting Spark conversion to /mnt/d/sales_data/sales_data_parquet...
----------------------------------------
CONVERSION COMPLETE
Processing Time (Read + Write): 21.62 seconds
Total Execution Time (incl. Spark startup): 23.26 seconds
----------------------------------------

Пример 3. Сведение данных

Прочтите файлы Parquet, которые мы только что создали, и рассчитайте общий объем продаж по названию продукта на дату заказа.

Панда.

import pandas as pd
from timeit import default_timer as timer

parquet_path = r'/mnt/d/sales_data/sales_data_pandas_30m.parquet'

start = timer()

# Read the Parquet file
df = pd.read_parquet(parquet_path)

# 1) Make order_date a proper date
# Convert to datetime then extract the date component
df["order_date"] = pd.to_datetime(df["order_date"]).dt.date

# 2) Pivot (sum)
# Pandas pivot_table handles the aggregation (sum) and the shape simultaneously
pivot = df.pivot_table(
    values="total",
    index="order_date",
    columns="product_name",
    aggfunc="sum"
)

# 3) Sort rows by date (Pandas index)
pivot = pivot.sort_index()

# 4) Enforce a consistent column order (alphabetical product columns)
# pivot_table already sorts columns by default, but we can be explicit
pivot = pivot.reindex(sorted(pivot.columns), axis=1)

# 5) (Optional) Replace nulls with 0
# pivot = pivot.fillna(0)

end = timer()

print(f"Pandas: read + standardized pivot took {end - start:.2f} seconds")
print(pivot.head(5))

вывод панд.

(pandas_pysaprk) $ python ex3_pandas.py
Pandas: read + standardized pivot took 9.98 seconds
product_name        Chair  Coffee Maker         Desk       Laptop  ...        Paper          Pen      Printer   Smartphone
order_date                                                         ...                                                  
2023-01-01    22041864.51   22596967.46  22228235.43  22319250.97  ...  22778128.78  22690394.34  22747419.90  22848102.42
2023-01-02    22702337.42   21960074.98  23539803.82  23332945.56  ...  22414013.44  22378123.52  22494364.89  22321919.79
2023-01-03    22626028.85   22651440.10  22930421.42  22938328.34  ...  22880161.09  21607713.73  22937117.72  22262604.28
2023-01-04    22605466.70   22652219.77  22463371.43  22506729.47  ...  23097987.72  22327386.63  22922449.38  22673066.75
2023-01-05    22581240.40   23004302.70  22511769.34  22882968.52  ...  22058769.99  22379327.80  22946133.94  22988219.48

[5 rows x 10 columns]

ПиСпарк.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from timeit import default_timer as timer

# Initialize Spark
spark = SparkSession.builder \
    .appName("SparkPivotBenchmark") \
    .config("spark.driver.memory", "8g") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

parquet_path = '/mnt/d/sales_data/sales_data_parquet'
start = timer()

# 1. Read the Parquet file
df = spark.read.parquet(parquet_path)

# 2. Make order_date a proper date
# We cast the column to DateType
df = df.withColumn("order_date", F.col("order_date").cast("date"))

# 3. Pivot (sum)
# Spark's pivot is much faster if you provide the unique values (product_names)
# but it can also infer them automatically as shown below
pivot_df = df.groupBy("order_date") \
    .pivot("product_name") \
    .agg(F.sum("total"))

# 4. Sort rows by date
pivot_df = pivot_df.orderBy("order_date")

# 5. Enforce consistent column order (alphabetical product columns)
# The first column is 'order_date', the rest are the pivoted products
columns = pivot_df.columns
product_cols = sorted([c for c in columns if c != "order_date"])
pivot_df = pivot_df.select(["order_date"] + product_cols)

# 6. Replace nulls with 0
pivot_df = pivot_df.na.fill(0)

# Trigger an action to measure actual performance (count of pivoted days)
row_count = pivot_df.count()
end = timer()

print(f"PySpark: read + standardized pivot took {end - start:.2f} seconds")
print(f"Total days processed: {row_count}")

# 7. Display top 5
pivot_df.show(5)

spark.stop()

Вывод PySpark.

(pandas_pyspark) $ spark-submit --driver-memory 8g ex3_spark.py 2> /dev/null
PySpark: read + standardized pivot took 3.54 seconds
Total days processed: 365
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|order_date|               Chair|        Coffee Maker|                Desk|              Laptop|             Monitor|            Notebook|               Paper|                 Pen|             Printer|          Smartphone|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|2023-01-01|2.2041864510000005E7|2.2596967459999997E7|       2.222823543E7|2.2319250969999995E7|       2.309861159E7|2.2687765309999995E7|2.2778128780000005E7|2.2690394339999996E7|        2.27474199E7|2.2848102419999998E7|
|2023-01-02|       2.270233742E7|2.1960074980000004E7|2.3539803819999993E7|2.3332945560000006E7|2.2441403840000004E7|       2.282151253E7|       2.241401344E7|2.2378123520000003E7|       2.249436489E7|       2.232191979E7|
|2023-01-03|2.2626028849999998E7|        2.26514401E7|       2.293042142E7|       2.293832834E7|       2.290862974E7|2.2432433990000006E7|2.2880161090000004E7|2.1607713730000008E7|       2.293711772E7|       2.226260428E7|
|2023-01-04|2.2605466699999996E7|2.2652219770000003E7|       2.246337143E7| 2.250672947000001E7|2.1930874809999995E7|2.3261865149999995E7|       2.309798772E7|2.2327386629999995E7|2.2922449380000003E7|2.2673066749999996E7|
|2023-01-05|2.2581240400000002E7|2.3004302700000003E7|       2.251176934E7|2.2882968520000003E7|       2.284090005E7|       2.272256243E7|2.2058769990000002E7|2.2379327800000004E7|2.2946133940000005E7|       2.298821948E7|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows

Пример 4 – оконная аналитика с LAG/LEAD

В моем последнем примере кода мы рассчитаем СУММУ всех заказов на дату заказа, а затем воспользуемся функцией LAG/LEAD для расчета процентного изменения общего количества заказов за последовательные даты заказов.

Панда.

import pandas as pd
from timeit import default_timer as timer

parquet_path = '/mnt/d/sales_data/sales_data_pandas_30m.parquet'

start = timer()

# 1. Read the Parquet file
df = pd.read_parquet(parquet_path)

# 2. Normalize order_date
# Pandas to_datetime is generally flexible enough to handle multiple formats
# automatically, which replaces the manual pl.coalesce logic.
df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce').dt.date

# 3. Group by date and aggregate
result_pandas = df.groupby("order_date")["total"].sum().reset_index()

# 4. Sort by date
result_pandas = result_pandas.sort_values("order_date")

# 5. Analytic functions (Lag and Lead)
# In Pandas, shift(1) is lag, shift(-1) is lead
result_pandas["total_lag"] = result_pandas["total"].shift(1)
result_pandas["total_lead"] = result_pandas["total"].shift(-1)

# 6. Calculate Percent Changes
# We use Series operations which handle the 'None/NaN' and 'divide by zero'
# logic similar to pl.when().otherwise()
result_pandas["percent_change_from_lag"] = (
    (result_pandas["total"] - result_pandas["total_lag"]) * 100 / result_pandas["total_lag"]
)

result_pandas["percent_change_from_lead"] = (
    (result_pandas["total"] - result_pandas["total_lead"]) * 100 / result_pandas["total_lead"]
)

end = timer()

print(f"Pandas: read + analytic (lag/lead) took {end - start:.2f} seconds")
print(result_pandas.head(10).to_string(index=False))

вывод панд.

(pandas_pyspark) $ python ex4_pandas.py
Pandas: read + analytic (lag/lead) took 8.99 seconds
order_date        total    total_lag   total_lead  percent_change_from_lag  percent_change_from_lead
2023-01-01 226036740.71          NaN 226406499.79                      NaN                 -0.163316
2023-01-02 226406499.79 226036740.71 226174879.26                 0.163584                  0.102408
2023-01-03 226174879.26 226406499.79 226441417.81                -0.102303                 -0.117708
2023-01-04 226441417.81 226174879.26 226916194.65                 0.117846                 -0.209230
2023-01-05 226916194.65 226441417.81 226990804.43                 0.209669                 -0.032869
2023-01-06 226990804.43 226916194.65 225973424.85                 0.032880                  0.450221
2023-01-07 225973424.85 226990804.43 227894370.99                -0.448203                 -0.842911
2023-01-08 227894370.99 225973424.85 227111347.09                 0.850076                  0.344775
2023-01-09 227111347.09 227894370.99 226271884.19                -0.343591                  0.370997
2023-01-10 226271884.19 227111347.09 226635543.97                -0.369626                 -0.160460

ПиСпарк.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from timeit import default_timer as timer

# Initialize Spark
spark = SparkSession.builder \
    .appName("SparkAnalyticBenchmark") \
    .config("spark.driver.memory", "8g") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Path to the Parquet file

parquet_path = '/mnt/d/sales_data/sales_data_parquet'

start = timer()

# 1. Read the Parquet file
df = spark.read.parquet(parquet_path)

# 2. Normalize order_date
# Spark's to_date is efficient; coalesce handles multiple potential formats if needed
df = df.withColumn("order_date", F.to_date(F.col("order_date")))

# 3. Group by date and aggregate
daily_revenue = df.groupBy("order_date").agg(F.sum("total").alias("total"))

# 4. Define the Window for Analytic functions
# We must order by date for lag/lead to make sense
window_spec = Window.orderBy("order_date")

# 5. Apply Lag and Lead
# lag(col, 1) = previous row; lead(col, 1) = next row
daily_revenue = daily_revenue.withColumn("total_lag", F.lag("total", 1).over(window_spec))
daily_revenue = daily_revenue.withColumn("total_lead", F.lead("total", 1).over(window_spec))

# 6. Calculate Percent Changes
# We use F.when() to handle nulls and avoid division by zero
daily_revenue = daily_revenue.withColumn(
    "percent_change_from_lag",
    F.when((F.col("total_lag").isNotNull()) & (F.col("total_lag") != 0),
           (F.col("total") - F.col("total_lag")) * 100 / F.col("total_lag"))
    .otherwise(None)
)

daily_revenue = daily_revenue.withColumn(
    "percent_change_from_lead",
    F.when((F.col("total_lead").isNotNull()) & (F.col("total_lead") != 0),
           (F.col("total") - F.col("total_lead")) * 100 / F.col("total_lead"))
    .otherwise(None)
)

# 7. Final Sort and Action
result_spark = daily_revenue.orderBy("order_date")

# Trigger action to measure performance
row_count = result_spark.count()
end = timer()

print(f"PySpark: read + analytic (lag/lead) took {end - start:.2f} seconds")
print(f"Total days processed: {row_count}")

# Display top 10
result_spark.show(10)

spark.stop()

Вывод PySpark.

(pandas_pyspark) $ spark-submit --driver-memory 8g ex4_spark.py 2> /dev/null
PySpark: read + analytic (lag/lead) took 4.05 seconds
Total days processed: 365
+----------+--------------------+--------------------+--------------------+-----------------------+------------------------+
|order_date|               total|           total_lag|          total_lead|percent_change_from_lag|percent_change_from_lead|
+----------+--------------------+--------------------+--------------------+-----------------------+------------------------+
|2023-01-01|      2.2603674071E8|                NULL|2.2640649979000002E8|                   NULL|    -0.16331645970543143|
|2023-01-02|2.2640649979000002E8|      2.2603674071E8|      2.2617487926E8|    0.16358361868011784|     0.10240771687724477|
|2023-01-03|      2.2617487926E8|2.2640649979000002E8|2.2644141781000003E8|    -0.1023029507610723|    -0.11770750800707579|
|2023-01-04|2.2644141781000003E8|      2.2617487926E8|2.2691619464999998E8|    0.11784622185810545|     -0.2092300378702583|
|2023-01-05|2.2691619464999998E8|2.2644141781000003E8|2.2699080442999995E8|    0.20966872782889678|    -0.03286907599068832|
|2023-01-06|2.2699080442999995E8|2.2691619464999998E8| 2.259734248499999E8|   0.032879883304517334|     0.45022089684898775|
|2023-01-07| 2.259734248499999E8|2.2699080442999995E8|2.2789437099000004E8|    -0.4482029933127909|     -0.8429107448575048|
|2023-01-08|2.2789437099000004E8| 2.259734248499999E8|2.2711134708999988E8|     0.8500761278788644|       0.344775331586518|
|2023-01-09|2.2711134708999988E8|2.2789437099000004E8|2.2627188419000003E8|   -0.34359071555765364|     0.37099744097899573|
|2023-01-10|2.2627188419000003E8|2.2711134708999988E8|2.2663554396999997E8|    -0.3696261374678007|     -0.1604601703817825|
+----------+--------------------+--------------------+--------------------+-----------------------+------------------------+
only showing top 10 rows

Краткое содержание

В этой статье я объяснил, что существует несколько путей обновления вашей системы, если данные, с которыми вы работаете, начинают вторгаться на территорию «больших данных», так что их становится сложно (или невозможно) обрабатывать с использованием существующей базы кода pandas.

Я привел три распространенных варианта: распределенные библиотеки, такие как Dask или Ray, перемещение ваших данных в СУБД и запрос к ним с помощью SQL или использование библиотеки распределенных вычислений — Spark.

Сосредоточив внимание на последнем, я обрисовал аргументы в пользу PySpark, затем использовал четыре реальных примера типичных задач обработки данных, для которых регулярно используется pandas, а также эквивалентный код PySpark для каждой из них.

Хотя тесты времени показали некоторые улучшения во времени выполнения PySpark по сравнению с Pandas, это не было основной задачей. В конце концов, даже более крупные наборы данных панды вообще не смогут их обработать, не говоря уже о том, чтобы обработать их в течение определенного периода времени.

Вместо этого основная цель этой статьи заключалась в том, чтобы показать вам, насколько это относительно просто:

  • Быстро настройте и запустите среду Spark.
  • Чтобы гарантировать, что большие данные не должны ограничивать ваши возможности обработки, реплицируйте распространенные операции с данными Pandas на языке PySpark.

Преодолевая разрыв между однопоточным анализом и масштабируемой обработкой больших данных, вы можете уверенно трансформировать свой рабочий процесс, поскольку ваши данные перерастают возможности вашего локального оборудования.

Leave a Reply

Your email address will not be published. Required fields are marked *