Aller au contenu

Apache Airflow

Orchestration de workflows et planification de pipelines.

URL https://orchestrator.akko.local
Authentification Keycloak SSO (realm akko)
Service Docker airflow

Aperçu

Apache Airflow est le moteur d'orchestration de la plateforme AKKO. Il planifie et supervise les pipelines de données sous forme de graphes acycliques dirigés (DAGs), offrant une interface visuelle pour suivre les exécutions, inspecter les journaux et gérer les dépendances entre tâches.

Airflow est pré-configuré avec :

  • Une connexion Trino pour les requêtes SQL contre le catalogue Iceberg
  • Une intégration Spark Connect pour les jobs ETL distribués
  • Le provider OpenLineage pour Airflow pour le suivi du lignage
  • Un DAG de démonstration (akko_e2e_pipeline) qui exerce le flux complet du lakehouse

DAG pré-configuré : akko_e2e_pipeline

La plateforme est livrée avec un DAG de démonstration de bout en bout qui illustre le pipeline lakehouse complet :

CSV (en mémoire) --> object storage (S3) --> Spark (ETL) --> table Iceberg --> Trino (vérification)

Étapes du pipeline

ID de tâche Description
upload_csv_to_minio Génère 12 lignes de ventes échantillon (produits, régions, prix) et téléverse un CSV vers s3://akko-warehouse/raw/sales/sales_sample.csv
spark_csv_to_iceberg Se connecte à Spark via Spark Connect (sc://spark-connect:15002), lit le CSV, ajoute une colonne revenue et écrit dans iceberg.demo.sales partitionné par région
verify_with_trino Interroge la table Iceberg via Trino pour vérifier le nombre de lignes, le revenu par région et les meilleurs produits
pipeline_summary Journalise un résumé avec le total de lignes ingérées, le revenu total et les régions couvertes

Exécuter le DAG

  1. Ouvrez https://orchestrator.akko.local
  2. Connectez-vous avec Keycloak (par ex. alice / mot de passe admin)
  3. Trouvez akko_e2e_pipeline dans la liste des DAGs
  4. Basculez le DAG sur Actif (reprendre)
  5. Cliquez sur Trigger DAG pour l'exécuter manuellement

Déclenchement manuel uniquement

Le DAG de démonstration a schedule=None -- il ne s'exécute pas sur une minuterie. Déclenchez-le manuellement depuis l'interface ou via la CLI Airflow.


Intégration OpenLineage

Airflow est configuré avec le provider OpenLineage, qui émet des événements de lignage pour chaque exécution de tâche. Cela suit quels jeux de données sont lus et écrits par chaque tâche, permettant une visibilité de lignage de bout en bout.

Actuellement, les événements de lignage sont envoyés au transport console (journalisés sur stdout). Ceci est utile pour le développement et le débogage.

# Les événements de lignage apparaissent dans les journaux du worker Airflow
kubectl logs -f deploy/akko-airflow -n akko | grep openlineage

À venir : transport HTTP

Une amélioration prévue consiste à passer du transport console au transport HTTP, envoyant les événements de lignage directement à OpenMetadata ou Marquez pour une visualisation centralisée du lignage.


Répertoire des DAGs

Les DAGs sont stockés dans le répertoire airflow/dags/ sur l'hôte et montés dans le conteneur Airflow :

airflow/
└── dags/
    └── akko_e2e_pipeline.py    # DAG de démonstration de bout en bout

Pour ajouter un nouveau DAG, créez simplement un fichier .py dans ce répertoire. Airflow détecte automatiquement les nouveaux fichiers DAG (intervalle de scan par défaut : 30 secondes).

Écrire un DAG personnalisé

Un DAG AKKO minimal qui interroge Trino :

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def query_trino(**kwargs):
    from trino.dbapi import connect
    conn = connect(host="trino", port=8080, user="airflow", catalog="iceberg")
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM iceberg.analytics.customers")
    count = cursor.fetchone()[0]
    print(f"Customer count: {count}")

with DAG(
    dag_id="my_custom_dag",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
    tags=["akko", "custom"],
) as dag:
    PythonOperator(
        task_id="query_trino",
        python_callable=query_trino,
    )

Un DAG AKKO minimal qui utilise Spark Connect :

def spark_job(**kwargs):
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.remote("sc://spark-connect:15002").getOrCreate()
    try:
        df = spark.sql("SELECT * FROM iceberg.analytics.accounts LIMIT 10")
        df.show()
    finally:
        spark.stop()

Connexions

Airflow se connecte aux autres services AKKO en utilisant les noms d'hôtes internes Docker :

Service Hôte interne Port Protocole
Trino trino 8080 HTTP
Spark Connect spark-connect 15002 gRPC
object storage minio 9000 HTTP (S3)
PostgreSQL postgres 5432 TCP

Identifiants object storage

Le DAG de démonstration lit les identifiants object storage depuis les variables d'environnement MINIO_ROOT_USER et MINIO_ROOT_PASSWORD, qui sont injectées depuis les Secrets Kubernetes.


Authentification

Airflow utilise Keycloak SSO pour l'authentification de l'interface web. Les utilisateurs se connectent via le realm AKKO et sont associés aux rôles Airflow.


Problèmes connus

L'utilisateur Airflow doit être dans le groupe Trino akko-admin

L'utilisateur airflow qui se connecte à Trino doit disposer de privilèges suffisants. Dans la configuration RBAC de Trino d'AKKO, l'utilisateur airflow doit être dans le groupe akko-admin pour effectuer des opérations DDL (CREATE TABLE, DROP TABLE) lors de l'exécution du pipeline. Les DAGs en lecture seule peuvent utiliser des groupes à privilèges réduits.

Les métriques Prometheus ne sont pas disponibles nativement

Le point d'accès /health d'Airflow renvoie du JSON, pas du format Prometheus. Pour collecter les métriques Airflow dans Prometheus, déployez un sidecar statsd_exporter ou installez le package apache-airflow-providers-statsd. Consultez la page Supervision pour plus de détails.

Délai de sérialisation des DAGs

Après la création d'un nouveau fichier DAG, Airflow peut mettre jusqu'à 30 secondes pour le détecter et l'analyser. Vérifiez les journaux du scheduler si un nouveau DAG n'apparaît pas.