Apache Airflow¶
Workflow orchestration and pipeline scheduling.
| Property | Value |
|---|---|
| URL | https://orchestrator.akko.local |
| Authentication | Keycloak SSO (akko realm) |
| Helm sub-chart | airflow (Apache community chart) |
Overview¶
Apache Airflow is the orchestration engine of the AKKO platform. It schedules and monitors data pipelines as Directed Acyclic Graphs (DAGs), providing a visual UI for tracking runs, inspecting logs, and managing task dependencies.
Airflow is pre-configured with:
- A Trino connection for SQL queries against the Iceberg catalog
- A Spark Connect integration for distributed ETL jobs
- The OpenLineage Airflow provider for lineage tracking
- A demo DAG (`akko_e2e_pipeline`) that exercises the full lakehouse flow
Pre-Configured DAG: akko_e2e_pipeline¶
The platform ships with an end-to-end demo DAG that demonstrates the complete lakehouse pipeline:
Pipeline Steps¶
| Task ID | Description |
|---|---|
| `upload_csv_to_minio` | Generates 12 sample sales rows (products, regions, prices) and uploads a CSV to `s3://akko-warehouse/raw/sales/sales_sample.csv` |
| `spark_csv_to_iceberg` | Connects to Spark via Spark Connect (`sc://spark-connect:15002`), reads the CSV, adds a `revenue` column, and writes to `iceberg.demo.sales` partitioned by `region` |
| `verify_with_trino` | Queries the Iceberg table through Trino to verify row counts, revenue by region, and top products |
| `pipeline_summary` | Logs a summary with total rows ingested, total revenue, and regions covered |
Running the DAG¶
1. Open https://orchestrator.akko.local
2. Log in with Keycloak (e.g., `alice` / admin password)
3. Find `akko_e2e_pipeline` in the DAG list
4. Toggle the DAG to Active (unpause)
5. Click Trigger DAG to run it manually
Manual trigger only

The demo DAG has `schedule=None`; it does not run on a timer. Trigger it manually from the UI or via the Airflow CLI, as sketched below.
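For example, assuming the Airflow CLI is reachable through the same deployment used in the log-tailing command further down (`deploy/akko-airflow` in the `akko` namespace), a manual run could look like:

```bash
# Unpause the DAG, then trigger a run (standard Airflow CLI commands)
kubectl exec -n akko deploy/akko-airflow -- airflow dags unpause akko_e2e_pipeline
kubectl exec -n akko deploy/akko-airflow -- airflow dags trigger akko_e2e_pipeline
```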
OpenLineage Integration¶
Airflow is configured with the OpenLineage provider, which emits lineage events for every task execution. This tracks which datasets are read and written by each task, enabling end-to-end lineage visibility.
Currently, lineage events are sent to the console transport (logged to stdout). This is useful for development and debugging.
```bash
# Lineage events appear in the Airflow worker logs
kubectl logs -f deploy/akko-airflow -n akko | grep openlineage
```
Future: HTTP transport
A planned improvement is to switch from console transport to HTTP transport, sending lineage events directly to OpenMetadata or Marquez for centralized lineage visualization.
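As a rough sketch of what that switch could look like, using the OpenLineage provider's `AIRFLOW__OPENLINEAGE__TRANSPORT` setting (the Marquez URL is a hypothetical in-cluster endpoint, not an existing AKKO service):

```bash
# Today: console transport, events logged to stdout
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "console"}'

# Planned: HTTP transport pointing at a lineage backend (hypothetical URL)
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}'
```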
DAGs Directory¶
DAGs are stored in the `airflow/dags/` directory on the host and mounted into the Airflow container.
To add a new DAG, create a `.py` file in this directory. Airflow detects new DAG files automatically (default scan interval: 30 seconds).
Writing a Custom DAG¶
A minimal AKKO DAG that queries Trino:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def query_trino(**kwargs):
    # Import inside the task so the scheduler does not need trino at DAG-parse time
    from trino.dbapi import connect

    conn = connect(host="akko-trino", port=8080, user="airflow", catalog="iceberg")
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM iceberg.analytics.customers")
    count = cursor.fetchone()[0]
    print(f"Customer count: {count}")


with DAG(
    dag_id="my_custom_dag",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
    tags=["akko", "custom"],
) as dag:
    PythonOperator(
        task_id="query_trino",
        python_callable=query_trino,
    )
```
A minimal AKKO DAG task that uses Spark Connect (wire it into a `PythonOperator` exactly as in the Trino example above):

```python
def spark_job(**kwargs):
    # Import inside the task to keep DAG parsing fast
    from pyspark.sql import SparkSession

    # Spark Connect endpoint (gRPC), see the connections table below
    spark = SparkSession.builder.remote("sc://akko-akko-spark-connect:15002").getOrCreate()
    try:
        df = spark.sql("SELECT * FROM iceberg.analytics.accounts LIMIT 10")
        df.show()
    finally:
        spark.stop()
```
Connections¶
Airflow connects to other AKKO services using cluster-internal service hostnames:
| Service | Internal Host | Port | Protocol |
|---|---|---|---|
| Trino | `akko-trino` | 8080 | HTTP |
| Spark Connect | `akko-akko-spark-connect` | 15002 | gRPC |
| Object storage (MinIO) | `akko-minio` | 9000 | HTTP (S3) |
| PostgreSQL | `akko-postgresql` | 5432 | TCP |
Object storage credentials

The demo DAG reads object storage credentials from the environment variables `MINIO_ROOT_USER` and `MINIO_ROOT_PASSWORD`, which are injected from Kubernetes Secrets.
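For illustration, a task can pick these variables up via `os.environ` and reach MinIO with any S3-compatible client. A minimal sketch using boto3 (the helper name is hypothetical; the endpoint and bucket come from the connections table above and the demo DAG):

```python
import os

import boto3


def upload_csv_to_minio_example(local_path: str, key: str) -> None:
    """Hypothetical helper: upload a local file to the AKKO warehouse bucket."""
    s3 = boto3.client(
        "s3",
        endpoint_url="http://akko-minio:9000",  # internal MinIO endpoint
        aws_access_key_id=os.environ["MINIO_ROOT_USER"],          # injected from a Kubernetes Secret
        aws_secret_access_key=os.environ["MINIO_ROOT_PASSWORD"],  # injected from a Kubernetes Secret
    )
    s3.upload_file(local_path, "akko-warehouse", key)
```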
Authentication¶
Airflow uses Keycloak SSO for web UI authentication. Users log in through the akko realm and are mapped to Airflow roles.
Known Issues¶
Airflow user must be in the Trino `akko-admin` group

The `airflow` user that connects to Trino must have sufficient privileges. In AKKO's Trino RBAC configuration, the `airflow` user needs to be in the `akko-admin` group to perform DDL operations (CREATE TABLE, DROP TABLE) during pipeline execution. Read-only DAGs can use lower-privilege groups.
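If the Trino RBAC setup uses Trino's file group provider, membership is expressed as one `group:user,user` line per group. A minimal sketch (the file path and the other member are assumptions, not AKKO's actual config):

```text
# group file referenced by Trino's file group provider
akko-admin:alice,airflow
```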
Prometheus metrics not available natively

Airflow's /health endpoint returns JSON, not the Prometheus exposition format. To collect Airflow metrics in Prometheus, enable Airflow's StatsD metrics (install the `statsd` extra, e.g. `pip install 'apache-airflow[statsd]'`) and deploy a statsd_exporter sidecar to translate them. See the Monitoring page for details.
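A sketch of the corresponding Airflow settings, assuming a statsd_exporter sidecar on localhost (host, port, and prefix are illustrative; the variables map to Airflow's `[metrics]` config section):

```bash
AIRFLOW__METRICS__STATSD_ON=True
AIRFLOW__METRICS__STATSD_HOST=localhost  # statsd_exporter sidecar
AIRFLOW__METRICS__STATSD_PORT=9125       # statsd_exporter's default StatsD listen port
AIRFLOW__METRICS__STATSD_PREFIX=airflow
```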
DAG serialization delay
After creating a new DAG file, it may take up to 30 seconds for Airflow to detect and parse it. Check the scheduler logs if a new DAG does not appear.