Apache Spark¶
Aperçu¶
Apache Spark fournit le traitement distribué de données pour AKKO, avec une intégration native d'Iceberg via le catalogue REST Polaris. Spark est principalement utilisé via le protocole gRPC Spark Connect depuis les notebooks JupyterHub avec PySpark.
Architecture¶
image akko-spark (personnalisée)
├── JAR runtime Iceberg
├── JAR bundle AWS Iceberg
├── JAR Spark Connect
├── Listener OpenLineage Spark
└── Agent OpenMetadata Spark
┌─────────────────┐
│ spark-master │ Gestionnaire de cluster (Spark Standalone)
│ :8090 (Web UI) │
└────────┬────────┘
│
┌────────┴────────┐
│ spark-worker │ Noeud exécuteur
└─────────────────┘
│
┌────────┴────────┐
│ spark-connect │ Serveur gRPC (:15002)
│ (type Thrift) │
└─────────────────┘
▲
│ gRPC (sc://)
┌────────┴────────┐
│ Notebooks │ Client PySpark
│ (akko-notebook) │
└─────────────────┘
Les trois services (spark-master, spark-worker, spark-connect) utilisent la
même image Docker personnalisée : akko-spark.
Image personnalisée¶
L'image akko-spark est construite à partir de apache/spark:3.5.1-python3 avec
des JARs supplémentaires intégrés :
FROM apache/spark:3.5.1-python3
USER root
# JARs Iceberg + AWS
RUN wget -q -P /opt/spark/jars/ \
iceberg-spark-runtime-3.5_2.12-1.5.2.jar \
iceberg-aws-bundle-1.5.2.jar \
spark-connect_2.12-3.5.1.jar
# Agent OpenLineage + OpenMetadata
RUN wget -q -P /opt/spark/jars/ \
openlineage-spark_2.12-1.28.0.jar \
openmetadata-spark-agent.jar
USER spark
| JAR | Version | Fonction |
|---|---|---|
iceberg-spark-runtime |
1.5.2 | Support du format de table Iceberg |
iceberg-aws-bundle |
1.5.2 | Support du système de fichiers S3 |
spark-connect |
3.5.1 | Serveur gRPC Spark Connect |
openlineage-spark |
1.28.0 | Émission d'événements OpenLineage |
openmetadata-spark-agent |
1.0 | Transport de lignage vers OpenMetadata |
Les JARs doivent être dans l'image Docker
Les JARs Spark doivent être intégrés dans l'image Docker lors de la construction.
L'utilisation de --jars ou --packages au moment de l'exécution provoque des conflits
de ClassLoader et des échecs imprévisibles. Reconstruisez toujours l'image lors de l'ajout
ou de la mise à jour de JARs :
Spark Connect¶
Spark Connect est un protocole basé sur gRPC (introduit dans Spark 3.4) qui découple le client (PySpark dans les notebooks) du serveur (cluster Spark). Cela signifie :
- Aucun JAR nécessaire dans l'image notebook
- Client léger : seul le package pip
pysparkest requis - Exécution côté serveur avec API DataFrame côté client
Le serveur Spark Connect écoute sur le port 15002 (gRPC).
Connexion depuis les notebooks¶
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.remote("sc://akko-akko-spark-connect:15002") \
.getOrCreate()
# Interroger les tables Iceberg
df = spark.sql("SELECT * FROM iceberg.analytics.transactions")
df.show()
Variable d'environnement
La variable d'environnement SPARK_REMOTE est pré-définie dans les conteneurs
notebook à sc://akko-akko-spark-connect:15002. PySpark l'utilisera automatiquement si
aucun .remote() n'est spécifié :
Intégration Iceberg¶
Spark lit et écrit les tables Iceberg via le catalogue REST Polaris. La configuration du catalogue est passée via les propriétés Spark dans les valeurs Helm :
| Propriété | Valeur |
|---|---|
spark.sql.catalog.iceberg |
org.apache.iceberg.spark.SparkCatalog |
spark.sql.catalog.iceberg.type |
rest |
spark.sql.catalog.iceberg.uri |
http://akko-akko-polaris:8181/api/catalog |
spark.sql.catalog.iceberg.warehouse |
akko-warehouse |
spark.sql.catalog.iceberg.credential |
root:{POLARIS_ROOT_SECRET} |
spark.sql.catalog.iceberg.scope |
PRINCIPAL_ROLE:ALL |
Exemple : écriture de tables Iceberg¶
# Créer un namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.analytics")
# Créer une table
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg.analytics.transactions (
id BIGINT,
customer_id INT,
amount DECIMAL(10,2),
transaction_date DATE
) USING iceberg
""")
# Insérer des données
spark.sql("""
INSERT INTO iceberg.analytics.transactions VALUES
(1, 101, 250.00, DATE '2026-01-15'),
(2, 102, 1500.00, DATE '2026-01-16')
""")
object storage comme backend de stockage S3¶
Les fichiers de données des tables Iceberg (Parquet) sont stockés dans object storage, le stockage objet compatible S3 d'AKKO. La configuration S3 est passée via les propriétés Spark :
| Propriété | Valeur |
|---|---|
spark.sql.catalog.iceberg.s3.endpoint |
http://akko-minio:9000 |
spark.sql.catalog.iceberg.s3.path-style-access |
true |
spark.sql.catalog.iceberg.s3.access-key-id |
{MINIO_ROOT_USER} |
spark.sql.catalog.iceberg.s3.secret-access-key |
{MINIO_ROOT_PASSWORD} |
OpenLineage¶
L'image Spark inclut le JAR du listener OpenLineage, qui émet des événements de lignage (début de job, complétion, échec) pour chaque job Spark. Ces événements capturent les jeux de données en entrée/sortie, permettant le suivi du lignage de données de bout en bout.
Résumé des services¶
| Service | Port | Protocole | Fonction |
|---|---|---|---|
spark-master |
8090 (Web UI), 7077 (cluster) | HTTP, Spark | Gestionnaire de cluster |
spark-worker |
-- | Spark | Exécuteur de tâches |
spark-connect |
15002 | gRPC | Serveur Spark Connect face au client |
Problèmes connus¶
Points d'attention importants
- Les JARs doivent être dans l'image Docker : N'utilisez jamais
--jarsou--packages. Les problèmes de ClassLoader provoqueront des échecs silencieux. Reconstruisez toujours l'image. -
SerializedLambdasur.collect(): L'appel à.collect()sur les tables de métadonnées Iceberg (par ex.snapshots,history) échoue avec une erreurSerializedLambdaen mode Spark Connect. Utilisez.show()à la place : -
Reconstruire après modification du Dockerfile : Après modification du Dockerfile, reconstruisez les images avec
helm/scripts/build-images.shet redéployez avechelm upgrade akko helm/akko/ -n akko. - Distribution d'identifiants Polaris : object storage ne supporte pas STS AssumeRole.
Le catalogue Polaris est configuré avec
stsUnavailable=truepour contourner la distribution d'identifiants et utiliser les identifiants S3 directs.