Aller au contenu

Apache Spark

Aperçu

Apache Spark fournit le traitement distribué de données pour AKKO, avec une intégration native d'Iceberg via le catalogue REST Polaris. Spark est principalement utilisé via le protocole gRPC Spark Connect depuis les notebooks JupyterHub avec PySpark.

Architecture

  image akko-spark (personnalisée)
  ├── JAR runtime Iceberg
  ├── JAR bundle AWS Iceberg
  ├── JAR Spark Connect
  ├── Listener OpenLineage Spark
  └── Agent OpenMetadata Spark

  ┌─────────────────┐
  │  spark-master    │  Gestionnaire de cluster (Spark Standalone)
  │  :8090 (Web UI)  │
  └────────┬────────┘
  ┌────────┴────────┐
  │  spark-worker    │  Noeud exécuteur
  └─────────────────┘
  ┌────────┴────────┐
  │  spark-connect   │  Serveur gRPC (:15002)
  │  (type Thrift)   │
  └─────────────────┘
           │ gRPC (sc://)
  ┌────────┴────────┐
  │   Notebooks      │  Client PySpark
  │  (akko-notebook) │
  └─────────────────┘

Les trois services (spark-master, spark-worker, spark-connect) utilisent la même image Docker personnalisée : akko-spark.

Image personnalisée

L'image akko-spark est construite à partir de apache/spark:3.5.1-python3 avec des JARs supplémentaires intégrés :

docker/spark/Dockerfile
FROM apache/spark:3.5.1-python3

USER root

# JARs Iceberg + AWS
RUN wget -q -P /opt/spark/jars/ \
    iceberg-spark-runtime-3.5_2.12-1.5.2.jar \
    iceberg-aws-bundle-1.5.2.jar \
    spark-connect_2.12-3.5.1.jar

# Agent OpenLineage + OpenMetadata
RUN wget -q -P /opt/spark/jars/ \
    openlineage-spark_2.12-1.28.0.jar \
    openmetadata-spark-agent.jar

USER spark
JAR Version Fonction
iceberg-spark-runtime 1.5.2 Support du format de table Iceberg
iceberg-aws-bundle 1.5.2 Support du système de fichiers S3
spark-connect 3.5.1 Serveur gRPC Spark Connect
openlineage-spark 1.28.0 Émission d'événements OpenLineage
openmetadata-spark-agent 1.0 Transport de lignage vers OpenMetadata

Les JARs doivent être dans l'image Docker

Les JARs Spark doivent être intégrés dans l'image Docker lors de la construction. L'utilisation de --jars ou --packages au moment de l'exécution provoque des conflits de ClassLoader et des échecs imprévisibles. Reconstruisez toujours l'image lors de l'ajout ou de la mise à jour de JARs :

docker build -t akko-spark:2026.03 \
  -f docker/spark/Dockerfile docker/spark/

Spark Connect

Spark Connect est un protocole basé sur gRPC (introduit dans Spark 3.4) qui découple le client (PySpark dans les notebooks) du serveur (cluster Spark). Cela signifie :

  • Aucun JAR nécessaire dans l'image notebook
  • Client léger : seul le package pip pyspark est requis
  • Exécution côté serveur avec API DataFrame côté client

Le serveur Spark Connect écoute sur le port 15002 (gRPC).

Connexion depuis les notebooks

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .remote("sc://akko-akko-spark-connect:15002") \
    .getOrCreate()

# Interroger les tables Iceberg
df = spark.sql("SELECT * FROM iceberg.analytics.transactions")
df.show()

Variable d'environnement

La variable d'environnement SPARK_REMOTE est pré-définie dans les conteneurs notebook à sc://akko-akko-spark-connect:15002. PySpark l'utilisera automatiquement si aucun .remote() n'est spécifié :

# Ceci fonctionne également (utilise la variable d'environnement SPARK_REMOTE)
spark = SparkSession.builder.getOrCreate()

Intégration Iceberg

Spark lit et écrit les tables Iceberg via le catalogue REST Polaris. La configuration du catalogue est passée via les propriétés Spark dans les valeurs Helm :

Propriété Valeur
spark.sql.catalog.iceberg org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type rest
spark.sql.catalog.iceberg.uri http://akko-akko-polaris:8181/api/catalog
spark.sql.catalog.iceberg.warehouse akko-warehouse
spark.sql.catalog.iceberg.credential root:{POLARIS_ROOT_SECRET}
spark.sql.catalog.iceberg.scope PRINCIPAL_ROLE:ALL

Exemple : écriture de tables Iceberg

# Créer un namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.analytics")

# Créer une table
spark.sql("""
    CREATE TABLE IF NOT EXISTS iceberg.analytics.transactions (
        id BIGINT,
        customer_id INT,
        amount DECIMAL(10,2),
        transaction_date DATE
    ) USING iceberg
""")

# Insérer des données
spark.sql("""
    INSERT INTO iceberg.analytics.transactions VALUES
    (1, 101, 250.00, DATE '2026-01-15'),
    (2, 102, 1500.00, DATE '2026-01-16')
""")

object storage comme backend de stockage S3

Les fichiers de données des tables Iceberg (Parquet) sont stockés dans object storage, le stockage objet compatible S3 d'AKKO. La configuration S3 est passée via les propriétés Spark :

Propriété Valeur
spark.sql.catalog.iceberg.s3.endpoint http://akko-minio:9000
spark.sql.catalog.iceberg.s3.path-style-access true
spark.sql.catalog.iceberg.s3.access-key-id {MINIO_ROOT_USER}
spark.sql.catalog.iceberg.s3.secret-access-key {MINIO_ROOT_PASSWORD}

OpenLineage

L'image Spark inclut le JAR du listener OpenLineage, qui émet des événements de lignage (début de job, complétion, échec) pour chaque job Spark. Ces événements capturent les jeux de données en entrée/sortie, permettant le suivi du lignage de données de bout en bout.

Résumé des services

Service Port Protocole Fonction
spark-master 8090 (Web UI), 7077 (cluster) HTTP, Spark Gestionnaire de cluster
spark-worker -- Spark Exécuteur de tâches
spark-connect 15002 gRPC Serveur Spark Connect face au client

Problèmes connus

Points d'attention importants

  • Les JARs doivent être dans l'image Docker : N'utilisez jamais --jars ou --packages. Les problèmes de ClassLoader provoqueront des échecs silencieux. Reconstruisez toujours l'image.
  • SerializedLambda sur .collect() : L'appel à .collect() sur les tables de métadonnées Iceberg (par ex. snapshots, history) échoue avec une erreur SerializedLambda en mode Spark Connect. Utilisez .show() à la place :

    # Ceci échoue :
    spark.sql("SELECT * FROM iceberg.analytics.transactions.snapshots").collect()
    
    # Ceci fonctionne :
    spark.sql("SELECT * FROM iceberg.analytics.transactions.snapshots").show()
    
  • Reconstruire après modification du Dockerfile : Après modification du Dockerfile, reconstruisez les images avec helm/scripts/build-images.sh et redéployez avec helm upgrade akko helm/akko/ -n akko.

  • Distribution d'identifiants Polaris : object storage ne supporte pas STS AssumeRole. Le catalogue Polaris est configuré avec stsUnavailable=true pour contourner la distribution d'identifiants et utiliser les identifiants S3 directs.