Apache Airflow

Workflow orchestration and pipeline scheduling.

URL: https://orchestrator.akko.local
Authentication: Keycloak SSO (akko realm)
Helm sub-chart: airflow (Apache community chart)

Overview

Apache Airflow is the orchestration engine of the AKKO platform. It schedules and monitors data pipelines as Directed Acyclic Graphs (DAGs), providing a visual UI for tracking runs, inspecting logs, and managing task dependencies.

Airflow is pre-configured with:

  • A Trino connection for SQL queries against the Iceberg catalog
  • A Spark Connect integration for distributed ETL jobs
  • The OpenLineage Airflow provider for lineage tracking
  • A demo DAG (akko_e2e_pipeline) that exercises the full lakehouse flow
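
As an illustration, a task can resolve the pre-configured Trino connection through Airflow's connection registry. The connection id trino_default below is an assumption, not confirmed by this page; check Admin > Connections in the UI for the actual id:

from airflow.hooks.base import BaseHook

def show_trino_conn():
    # "trino_default" is an assumed connection id
    conn = BaseHook.get_connection("trino_default")
    print(f"Trino endpoint: {conn.host}:{conn.port}")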

Pre-Configured DAG: akko_e2e_pipeline

The platform ships with an end-to-end demo DAG that demonstrates the complete lakehouse pipeline:

CSV (in-memory) --> object storage (S3) --> Spark (ETL) --> Iceberg table --> Trino (verify)

Pipeline Steps

  • upload_csv_to_minio: Generates 12 sample sales rows (products, regions, prices) and uploads a CSV to s3://akko-warehouse/raw/sales/sales_sample.csv
  • spark_csv_to_iceberg: Connects to Spark via Spark Connect (sc://spark-connect:15002), reads the CSV, adds a revenue column, and writes to iceberg.demo.sales partitioned by region
  • verify_with_trino: Queries the Iceberg table through Trino to verify row counts, revenue by region, and top products
  • pipeline_summary: Logs a summary with total rows ingested, total revenue, and regions covered
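
For orientation, the spark_csv_to_iceberg step could look roughly like the sketch below. The price and quantity column names are assumptions about the sample data; the endpoint, path, and table name come from the task list above (s3a is the URI scheme Spark's S3 connector expects):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.remote("sc://spark-connect:15002").getOrCreate()

# Read the raw CSV uploaded by the previous task
df = (spark.read.option("header", True).option("inferSchema", True)
      .csv("s3a://akko-warehouse/raw/sales/sales_sample.csv"))

# Derive the revenue column (price and quantity are assumed names)
df = df.withColumn("revenue", F.col("price") * F.col("quantity"))

# Write to the Iceberg table, partitioned by region
df.writeTo("iceberg.demo.sales").partitionedBy(F.col("region")).createOrReplace()
spark.stop()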

Running the DAG

  1. Open https://orchestrator.akko.local
  2. Log in through Keycloak (e.g., user alice with the admin password)
  3. Find akko_e2e_pipeline in the DAG list
  4. Toggle the DAG to Active (unpause)
  5. Click Trigger DAG to run it manually

Manual trigger only

The demo DAG has schedule=None, so it does not run on a timer. Trigger it manually from the UI or with the Airflow CLI (airflow dags trigger akko_e2e_pipeline).
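
A run can also be triggered programmatically through Airflow's stable REST API. This sketch assumes the API is exposed at the UI host and that HTTP basic auth is enabled for API calls, which may not hold under the Keycloak setup:

import requests

# POST /api/v1/dags/{dag_id}/dagRuns starts a new run
resp = requests.post(
    "https://orchestrator.akko.local/api/v1/dags/akko_e2e_pipeline/dagRuns",
    json={"conf": {}},
    auth=("alice", "PASSWORD"),  # assumption: basic auth enabled for the API
    # verify="/path/to/ca.pem" may be needed if the .local cert is self-signed
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])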


OpenLineage Integration

Airflow is configured with the OpenLineage provider, which emits lineage events for every task execution. This tracks which datasets are read and written by each task, enabling end-to-end lineage visibility.

Currently, lineage events are sent to the console transport (logged to stdout). This is useful for development and debugging.

# Lineage events appear in the Airflow worker logs
kubectl logs -f deploy/akko-airflow -n akko | grep openlineage
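
The OpenLineage provider selects its transport through Airflow's [openlineage] config section. How this deployment sets it is an assumption, but a console transport corresponds to a setting along these lines:

# Airflow configuration via environment variable
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "console"}'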

Future: HTTP transport

A planned improvement is to switch from console transport to HTTP transport, sending lineage events directly to OpenMetadata or Marquez for centralized lineage visualization.
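
That switch would amount to changing the transport setting to something like the following; the Marquez URL is a placeholder:

AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}'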


DAGs Directory

DAGs are stored in the airflow/dags/ directory on the host and mounted into the Airflow container:

airflow/
└── dags/
    └── akko_e2e_pipeline.py    # End-to-end demo DAG

To add a new DAG, simply create a .py file in this directory. Airflow detects new DAG files automatically (default scan interval: 30 seconds).

Writing a Custom DAG

A minimal AKKO DAG that queries Trino:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def query_trino(**kwargs):
    from trino.dbapi import connect
    conn = connect(host="akko-trino", port=8080, user="airflow", catalog="iceberg")
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM iceberg.analytics.customers")
    count = cursor.fetchone()[0]
    print(f"Customer count: {count}")

with DAG(
    dag_id="my_custom_dag",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
    tags=["akko", "custom"],
) as dag:
    PythonOperator(
        task_id="query_trino",
        python_callable=query_trino,
    )

The task callable for a minimal AKKO DAG that uses Spark Connect (the DAG wiring follows the code):

def spark_job(**kwargs):
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.remote("sc://akko-akko-spark-connect:15002").getOrCreate()
    try:
        df = spark.sql("SELECT * FROM iceberg.analytics.accounts LIMIT 10")
        df.show()
    finally:
        spark.stop()
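
Wired into a DAG the same way as the Trino example, reusing its imports (the dag_id is illustrative):

with DAG(
    dag_id="my_spark_dag",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
    tags=["akko", "custom"],
) as dag:
    PythonOperator(
        task_id="spark_job",
        python_callable=spark_job,
    )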

Connections

Airflow connects to other AKKO services using cluster-internal service hostnames:

  • Trino: akko-trino, port 8080 (HTTP)
  • Spark Connect: akko-akko-spark-connect, port 15002 (gRPC)
  • Object storage (MinIO): akko-minio, port 9000 (HTTP, S3 API)
  • PostgreSQL: akko-postgresql, port 5432 (TCP)

Object storage credentials

The demo DAG reads object storage credentials from environment variables MINIO_ROOT_USER and MINIO_ROOT_PASSWORD, which are injected from Kubernetes Secrets.
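
A custom task can pick up the same credentials; the sketch below uploads a local file with boto3 against the in-cluster endpoint (boto3 being available in the Airflow image is an assumption):

import os
import boto3

# Credentials are injected from Kubernetes Secrets (see above)
s3 = boto3.client(
    "s3",
    endpoint_url="http://akko-minio:9000",
    aws_access_key_id=os.environ["MINIO_ROOT_USER"],
    aws_secret_access_key=os.environ["MINIO_ROOT_PASSWORD"],
)
# Illustrative local path; bucket and key mirror the demo DAG
s3.upload_file("/tmp/sales_sample.csv", "akko-warehouse", "raw/sales/sales_sample.csv")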


Authentication

Airflow uses Keycloak SSO for web UI authentication. Users log in through the AKKO realm and are mapped to Airflow roles.


Known Issues

Airflow user must be in Trino akko-admin group

The airflow user that connects to Trino must have sufficient privileges. In AKKO's Trino RBAC configuration, the airflow user needs to be in the akko-admin group to perform DDL operations (CREATE TABLE, DROP TABLE) during pipeline execution. Read-only DAGs can use lower-privilege groups.
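
With Trino's file-based group provider, that membership is a single line in the group file. The file paths below are assumptions about AKKO's Trino config; the format is Trino's standard one:

# etc/group-provider.properties
group-provider.name=file
file.group-file=/etc/trino/groups.txt

# /etc/trino/groups.txt (group:user1,user2)
akko-admin:airflow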

Prometheus metrics not available natively

Airflow's /health endpoint returns JSON, not Prometheus format. To collect Airflow metrics in Prometheus, enable Airflow's StatsD metrics (shipped as the apache-airflow[statsd] extra rather than a separate provider package) and deploy a statsd_exporter sidecar to translate them. See the Monitoring page for details.
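
On the Airflow side, the statsd_exporter route needs a few [metrics] settings; the sidecar host and port here are assumptions (9125 is statsd_exporter's default StatsD port):

AIRFLOW__METRICS__STATSD_ON="True"
AIRFLOW__METRICS__STATSD_HOST="localhost"
AIRFLOW__METRICS__STATSD_PORT="9125"
AIRFLOW__METRICS__STATSD_PREFIX="airflow"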

DAG serialization delay

After creating a new DAG file, it may take up to 30 seconds for Airflow to detect and parse it. Check the scheduler logs if a new DAG does not appear.