Kit de Démarrage Ingénieur Données¶
Ce guide couvre les workflows essentiels d'ingénierie de données sur AKKO : construire des tables Iceberg avec Spark, interroger avec Trino, orchestrer avec Airflow et gouverner avec OpenMetadata.
Architecture du Pipeline de Données¶
AKKO fournit un pipeline d'architecture Medallion complet :
Sources --> Spark (Bronze) --> Spark (Silver) --> Spark (Gold) --> Trino --> Superset
| | |
v v v
Iceberg Iceberg Iceberg
(MinIO) (MinIO) (MinIO)
| | |
+-------- Catalogue REST Polaris -------+
|
OpenMetadata
(lignage + qualité)
Composants clés :
- Spark Connect -- moteur de calcul distribué pour l'ETL
- Apache Iceberg -- format de table ouvert (ACID, voyage dans le temps, évolution de schéma)
- Apache Polaris -- catalogue REST pour les tables Iceberg
- MinIO -- stockage objets compatible S3 (data lake)
- Trino -- moteur de requêtes SQL fédéré
- Airflow -- orchestration et planification de pipelines
Création de Tables Iceberg via Spark Connect¶
Connectez-vous à Spark depuis un notebook JupyterHub :
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.remote("sc://spark-connect:15002") \
.getOrCreate()
# Créer un namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS polaris.analytics")
# Créer une table Iceberg
spark.sql("""
CREATE TABLE IF NOT EXISTS polaris.analytics.events (
event_id BIGINT,
event_type STRING,
user_id STRING,
payload STRING,
event_timestamp TIMESTAMP,
processing_date DATE
)
USING iceberg
PARTITIONED BY (processing_date)
""")
# Insérer des données
spark.sql("""
INSERT INTO polaris.analytics.events VALUES
(1, 'login', 'user_001', '{"ip": "10.0.0.1"}', TIMESTAMP '2026-03-13 08:30:00', DATE '2026-03-13'),
(2, 'purchase', 'user_002', '{"amount": 99.99}', TIMESTAMP '2026-03-13 09:15:00', DATE '2026-03-13')
""")
# Interroger la table
spark.sql("SELECT * FROM polaris.analytics.events").show()
Fonctionnalités Iceberg¶
# Voyage dans le temps -- interroger les snapshots historiques
spark.sql("SELECT * FROM polaris.analytics.events VERSION AS OF 1").show()
# Évolution de schéma -- ajouter des colonnes sans réécrire les données
spark.sql("ALTER TABLE polaris.analytics.events ADD COLUMN region STRING")
# Gestion des snapshots
spark.sql("SELECT * FROM polaris.analytics.events.snapshots").show()
Utilisez .show() au lieu de .collect()
En mode Spark Connect, .collect() sur les tables de métadonnées Iceberg peut lever une erreur SerializedLambda. Utilisez .show() à la place.
Requêtes via Trino¶
Trino fournit du SQL fédéré à travers les tables Iceberg et PostgreSQL.
Requêtes Iceberg¶
-- Interroger les tables Iceberg via le catalogue Polaris
SELECT event_type, COUNT(*) as cnt
FROM iceberg.analytics.events
GROUP BY event_type;
-- Joindre des données Iceberg + PostgreSQL
SELECT e.event_type, e.user_id, u.full_name
FROM iceberg.analytics.events e
JOIN postgresql.public.users u ON e.user_id = u.user_id;
Catalogues Disponibles dans Trino¶
| Catalogue | Backend | Cas d'Usage |
|---|---|---|
iceberg |
Catalogue REST Polaris (MinIO) | Tables du lakehouse |
postgresql |
akko-postgresql-data | Données opérationnelles/transactionnelles |
system |
Trino interne | Métadonnées du cluster |
Connexion depuis Python¶
from trino.dbapi import connect
conn = connect(host="trino", port=8080, user="alice",
catalog="iceberg", schema="analytics")
cursor = conn.cursor()
cursor.execute("SELECT * FROM events LIMIT 100")
for row in cursor.fetchall():
print(row)
Planification avec les DAGs Airflow¶
Airflow orchestre vos pipelines de données. Les DAGs sont stockés dans airflow/dags/.
Exemple de Structure de DAG¶
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule="@daily",
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["analytics", "iceberg"]
)
def analytics_pipeline():
@task
def extract():
"""Extraire les données des systèmes sources."""
pass
@task
def transform(raw_data):
"""Appliquer la logique métier (Bronze -> Silver -> Gold)."""
pass
@task
def quality_check():
"""Exécuter les assertions de qualité via OpenMetadata."""
pass
@task
def publish():
"""Mettre à jour les vues Trino et les datasets Superset."""
pass
raw = extract()
clean = transform(raw)
quality_check() >> publish()
analytics_pipeline()
DAG Pré-construit¶
Le DAG akko_e2e_pipeline.py démontre le flux complet : ingestion CSV dans Iceberg via Spark, contrôles de qualité et rafraîchissement des datasets Superset.
Supervision des DAGs¶
- Accédez à l'interface Airflow via
https://airflow.<domaine> - Vérifiez le statut des exécutions, les logs des tâches et les graphes d'exécution
- Les tâches en échec déclenchent des notifications Alertmanager (si configuré)
Gouvernance des Données avec OpenMetadata¶
OpenMetadata fournit un catalogue de données centralisé avec lignage, qualité et gestion de glossaire.
Ingestion Automatique¶
AKKO pré-configure des pipelines d'ingestion pour :
- Trino -- découvre tous les schémas et tables
- Airflow -- capture les métadonnées des DAGs et le lignage
- Superset -- indexe les dashboards et graphiques
Lignage des Données¶
OpenMetadata trace automatiquement le flux de données :
Qualité des Données¶
Définissez des tests de qualité dans OpenMetadata :
- Pourcentage de valeurs nulles par colonne < 5%
- Contrainte d'unicité sur
event_id - Fraîcheur : table mise à jour dans les dernières 24 heures
- Assertions SQL personnalisées
Enrichir le Catalogue¶
Utilisez les scripts d'enrichissement pour ajouter du contexte métier :
# V1 : tags, glossaire, propriétaires, descriptions
python openmetadata/enrich_catalog.py
# V2 : domaines, produits de données, dashboards, pipelines, qualité, lignage
python openmetadata/enrich_catalog_v2.py
Glossaire & Tags¶
- Créez des termes de glossaire métier (ex: "PII", "Revenu", "ID Client")
- Taguez les tables et colonnes pour la classification et la conformité
- Assignez des propriétaires de données par dataset pour la responsabilisation
Accédez à OpenMetadata via https://openmetadata.<domaine>.