Aller au contenu

Kit de Démarrage Ingénieur Données

Ce guide couvre les workflows essentiels d'ingénierie de données sur AKKO : construire des tables Iceberg avec Spark, interroger avec Trino, orchestrer avec Airflow et gouverner avec OpenMetadata.


Architecture du Pipeline de Données

AKKO fournit un pipeline d'architecture Medallion complet :

Sources --> Spark (Bronze) --> Spark (Silver) --> Spark (Gold) --> Trino --> Superset
              |                    |                  |
              v                    v                  v
           Iceberg             Iceberg            Iceberg
           (MinIO)             (MinIO)            (MinIO)
              |                    |                  |
              +-------- Catalogue REST Polaris -------+
                                   |
                              OpenMetadata
                           (lignage + qualité)

Composants clés :

  • Spark Connect -- moteur de calcul distribué pour l'ETL
  • Apache Iceberg -- format de table ouvert (ACID, voyage dans le temps, évolution de schéma)
  • Apache Polaris -- catalogue REST pour les tables Iceberg
  • MinIO -- stockage objets compatible S3 (data lake)
  • Trino -- moteur de requêtes SQL fédéré
  • Airflow -- orchestration et planification de pipelines

Création de Tables Iceberg via Spark Connect

Connectez-vous à Spark depuis un notebook JupyterHub :

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .remote("sc://spark-connect:15002") \
    .getOrCreate()

# Créer un namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS polaris.analytics")

# Créer une table Iceberg
spark.sql("""
    CREATE TABLE IF NOT EXISTS polaris.analytics.events (
        event_id BIGINT,
        event_type STRING,
        user_id STRING,
        payload STRING,
        event_timestamp TIMESTAMP,
        processing_date DATE
    )
    USING iceberg
    PARTITIONED BY (processing_date)
""")

# Insérer des données
spark.sql("""
    INSERT INTO polaris.analytics.events VALUES
    (1, 'login', 'user_001', '{"ip": "10.0.0.1"}', TIMESTAMP '2026-03-13 08:30:00', DATE '2026-03-13'),
    (2, 'purchase', 'user_002', '{"amount": 99.99}', TIMESTAMP '2026-03-13 09:15:00', DATE '2026-03-13')
""")

# Interroger la table
spark.sql("SELECT * FROM polaris.analytics.events").show()

Fonctionnalités Iceberg

# Voyage dans le temps -- interroger les snapshots historiques
spark.sql("SELECT * FROM polaris.analytics.events VERSION AS OF 1").show()

# Évolution de schéma -- ajouter des colonnes sans réécrire les données
spark.sql("ALTER TABLE polaris.analytics.events ADD COLUMN region STRING")

# Gestion des snapshots
spark.sql("SELECT * FROM polaris.analytics.events.snapshots").show()

Utilisez .show() au lieu de .collect()

En mode Spark Connect, .collect() sur les tables de métadonnées Iceberg peut lever une erreur SerializedLambda. Utilisez .show() à la place.


Requêtes via Trino

Trino fournit du SQL fédéré à travers les tables Iceberg et PostgreSQL.

Requêtes Iceberg

-- Interroger les tables Iceberg via le catalogue Polaris
SELECT event_type, COUNT(*) as cnt
FROM iceberg.analytics.events
GROUP BY event_type;

-- Joindre des données Iceberg + PostgreSQL
SELECT e.event_type, e.user_id, u.full_name
FROM iceberg.analytics.events e
JOIN postgresql.public.users u ON e.user_id = u.user_id;

Catalogues Disponibles dans Trino

Catalogue Backend Cas d'Usage
iceberg Catalogue REST Polaris (MinIO) Tables du lakehouse
postgresql akko-postgresql-data Données opérationnelles/transactionnelles
system Trino interne Métadonnées du cluster

Connexion depuis Python

from trino.dbapi import connect

conn = connect(host="trino", port=8080, user="alice",
               catalog="iceberg", schema="analytics")
cursor = conn.cursor()
cursor.execute("SELECT * FROM events LIMIT 100")
for row in cursor.fetchall():
    print(row)

Planification avec les DAGs Airflow

Airflow orchestre vos pipelines de données. Les DAGs sont stockés dans airflow/dags/.

Exemple de Structure de DAG

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["analytics", "iceberg"]
)
def analytics_pipeline():

    @task
    def extract():
        """Extraire les données des systèmes sources."""
        pass

    @task
    def transform(raw_data):
        """Appliquer la logique métier (Bronze -> Silver -> Gold)."""
        pass

    @task
    def quality_check():
        """Exécuter les assertions de qualité via OpenMetadata."""
        pass

    @task
    def publish():
        """Mettre à jour les vues Trino et les datasets Superset."""
        pass

    raw = extract()
    clean = transform(raw)
    quality_check() >> publish()

analytics_pipeline()

DAG Pré-construit

Le DAG akko_e2e_pipeline.py démontre le flux complet : ingestion CSV dans Iceberg via Spark, contrôles de qualité et rafraîchissement des datasets Superset.

Supervision des DAGs

  • Accédez à l'interface Airflow via https://airflow.<domaine>
  • Vérifiez le statut des exécutions, les logs des tâches et les graphes d'exécution
  • Les tâches en échec déclenchent des notifications Alertmanager (si configuré)

Gouvernance des Données avec OpenMetadata

OpenMetadata fournit un catalogue de données centralisé avec lignage, qualité et gestion de glossaire.

Ingestion Automatique

AKKO pré-configure des pipelines d'ingestion pour :

  • Trino -- découvre tous les schémas et tables
  • Airflow -- capture les métadonnées des DAGs et le lignage
  • Superset -- indexe les dashboards et graphiques

Lignage des Données

OpenMetadata trace automatiquement le flux de données :

Job Spark --> Table Iceberg --> Vue Trino --> Dashboard Superset

Qualité des Données

Définissez des tests de qualité dans OpenMetadata :

  • Pourcentage de valeurs nulles par colonne < 5%
  • Contrainte d'unicité sur event_id
  • Fraîcheur : table mise à jour dans les dernières 24 heures
  • Assertions SQL personnalisées

Enrichir le Catalogue

Utilisez les scripts d'enrichissement pour ajouter du contexte métier :

# V1 : tags, glossaire, propriétaires, descriptions
python openmetadata/enrich_catalog.py

# V2 : domaines, produits de données, dashboards, pipelines, qualité, lignage
python openmetadata/enrich_catalog_v2.py

Glossaire & Tags

  • Créez des termes de glossaire métier (ex: "PII", "Revenu", "ID Client")
  • Taguez les tables et colonnes pour la classification et la conformité
  • Assignez des propriétaires de données par dataset pour la responsabilisation

Accédez à OpenMetadata via https://openmetadata.<domaine>.