Apache Airflow¶
Orchestration de workflows et planification de pipelines.
| URL | https://orchestrator.akko.local |
| Authentification | Keycloak SSO (realm akko) |
| Service Docker | airflow |
Aperçu¶
Apache Airflow est le moteur d'orchestration de la plateforme AKKO. Il planifie et supervise les pipelines de données sous forme de graphes acycliques dirigés (DAGs), offrant une interface visuelle pour suivre les exécutions, inspecter les journaux et gérer les dépendances entre tâches.
Airflow est pré-configuré avec :
- Une connexion Trino pour les requêtes SQL contre le catalogue Iceberg
- Une intégration Spark Connect pour les jobs ETL distribués
- Le provider OpenLineage pour Airflow pour le suivi du lignage
- Un DAG de démonstration (
akko_e2e_pipeline) qui exerce le flux complet du lakehouse
DAG pré-configuré : akko_e2e_pipeline¶
La plateforme est livrée avec un DAG de démonstration de bout en bout qui illustre le pipeline lakehouse complet :
Étapes du pipeline¶
| ID de tâche | Description |
|---|---|
upload_csv_to_minio |
Génère 12 lignes de ventes échantillon (produits, régions, prix) et téléverse un CSV vers s3://akko-warehouse/raw/sales/sales_sample.csv |
spark_csv_to_iceberg |
Se connecte à Spark via Spark Connect (sc://spark-connect:15002), lit le CSV, ajoute une colonne revenue et écrit dans iceberg.demo.sales partitionné par région |
verify_with_trino |
Interroge la table Iceberg via Trino pour vérifier le nombre de lignes, le revenu par région et les meilleurs produits |
pipeline_summary |
Journalise un résumé avec le total de lignes ingérées, le revenu total et les régions couvertes |
Exécuter le DAG¶
- Ouvrez
https://orchestrator.akko.local - Connectez-vous avec Keycloak (par ex.
alice/ mot de passe admin) - Trouvez
akko_e2e_pipelinedans la liste des DAGs - Basculez le DAG sur Actif (reprendre)
- Cliquez sur Trigger DAG pour l'exécuter manuellement
Déclenchement manuel uniquement
Le DAG de démonstration a schedule=None -- il ne s'exécute pas sur une minuterie. Déclenchez-le manuellement depuis l'interface ou via la CLI Airflow.
Intégration OpenLineage¶
Airflow est configuré avec le provider OpenLineage, qui émet des événements de lignage pour chaque exécution de tâche. Cela suit quels jeux de données sont lus et écrits par chaque tâche, permettant une visibilité de lignage de bout en bout.
Actuellement, les événements de lignage sont envoyés au transport console (journalisés sur stdout). Ceci est utile pour le développement et le débogage.
# Les événements de lignage apparaissent dans les journaux du worker Airflow
kubectl logs -f deploy/akko-airflow -n akko | grep openlineage
À venir : transport HTTP
Une amélioration prévue consiste à passer du transport console au transport HTTP, envoyant les événements de lignage directement à OpenMetadata ou Marquez pour une visualisation centralisée du lignage.
Répertoire des DAGs¶
Les DAGs sont stockés dans le répertoire airflow/dags/ sur l'hôte et montés dans le conteneur Airflow :
Pour ajouter un nouveau DAG, créez simplement un fichier .py dans ce répertoire. Airflow détecte automatiquement les nouveaux fichiers DAG (intervalle de scan par défaut : 30 secondes).
Écrire un DAG personnalisé¶
Un DAG AKKO minimal qui interroge Trino :
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def query_trino(**kwargs):
from trino.dbapi import connect
conn = connect(host="trino", port=8080, user="airflow", catalog="iceberg")
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM iceberg.analytics.customers")
count = cursor.fetchone()[0]
print(f"Customer count: {count}")
with DAG(
dag_id="my_custom_dag",
start_date=datetime(2025, 1, 1),
schedule=None,
catchup=False,
tags=["akko", "custom"],
) as dag:
PythonOperator(
task_id="query_trino",
python_callable=query_trino,
)
Un DAG AKKO minimal qui utilise Spark Connect :
def spark_job(**kwargs):
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://spark-connect:15002").getOrCreate()
try:
df = spark.sql("SELECT * FROM iceberg.analytics.accounts LIMIT 10")
df.show()
finally:
spark.stop()
Connexions¶
Airflow se connecte aux autres services AKKO en utilisant les noms d'hôtes internes Docker :
| Service | Hôte interne | Port | Protocole |
|---|---|---|---|
| Trino | trino |
8080 |
HTTP |
| Spark Connect | spark-connect |
15002 |
gRPC |
| object storage | minio |
9000 |
HTTP (S3) |
| PostgreSQL | postgres |
5432 |
TCP |
Identifiants object storage
Le DAG de démonstration lit les identifiants object storage depuis les variables d'environnement MINIO_ROOT_USER et MINIO_ROOT_PASSWORD, qui sont injectées depuis les Secrets Kubernetes.
Authentification¶
Airflow utilise Keycloak SSO pour l'authentification de l'interface web. Les utilisateurs se connectent via le realm AKKO et sont associés aux rôles Airflow.
Problèmes connus¶
L'utilisateur Airflow doit être dans le groupe Trino akko-admin
L'utilisateur airflow qui se connecte à Trino doit disposer de privilèges suffisants. Dans la configuration RBAC de Trino d'AKKO, l'utilisateur airflow doit être dans le groupe akko-admin pour effectuer des opérations DDL (CREATE TABLE, DROP TABLE) lors de l'exécution du pipeline. Les DAGs en lecture seule peuvent utiliser des groupes à privilèges réduits.
Les métriques Prometheus ne sont pas disponibles nativement
Le point d'accès /health d'Airflow renvoie du JSON, pas du format Prometheus. Pour collecter les métriques Airflow dans Prometheus, déployez un sidecar statsd_exporter ou installez le package apache-airflow-providers-statsd. Consultez la page Supervision pour plus de détails.
Délai de sérialisation des DAGs
Après la création d'un nouveau fichier DAG, Airflow peut mettre jusqu'à 30 secondes pour le détecter et l'analyser. Vérifiez les journaux du scheduler si un nouveau DAG n'apparaît pas.