Data Engineer Starter Kit¶
This guide covers the core data engineering workflows on AKKO: building Iceberg tables with Spark, querying with Trino, orchestrating with Airflow, and governing with OpenMetadata.
Data Pipeline Architecture¶
AKKO provides a complete Medallion architecture pipeline:
Sources --> Spark (Bronze) --> Spark (Silver) --> Spark (Gold) --> Trino --> Superset
                  |                  |                 |
                  v                  v                 v
               Iceberg            Iceberg           Iceberg
               (MinIO)            (MinIO)           (MinIO)
                  |                  |                 |
                  +------- Polaris REST Catalog ------+
                                     |
                               OpenMetadata
                          (lineage + quality)
Key components (a wiring sketch follows the list):
- Spark Connect -- distributed compute engine for ETL
- Apache Iceberg -- open table format (ACID, time-travel, schema evolution)
- Apache Polaris -- REST catalog for Iceberg tables
- MinIO -- S3-compatible object storage (data lake)
- Trino -- federated SQL query engine
- Airflow -- pipeline orchestration and scheduling
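These components are tied together through Spark's Iceberg catalog configuration. The sketch below shows how a Spark session typically points at a Polaris REST catalog backed by MinIO; on AKKO the Spark Connect server already carries this configuration, so the catalog name, endpoints, credential, and warehouse here are illustrative assumptions rather than the platform's actual values.

from pyspark.sql import SparkSession

# Sketch only: AKKO's Spark Connect server is pre-configured; every value below
# (catalog name, URIs, credential, warehouse) is an assumption.
spark = (
    SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.polaris.type", "rest")
    .config("spark.sql.catalog.polaris.uri", "http://polaris:8181/api/catalog")
    .config("spark.sql.catalog.polaris.warehouse", "akko")  # Polaris catalog name (assumed)
    .config("spark.sql.catalog.polaris.credential", "<client-id>:<client-secret>")
    .config("spark.sql.catalog.polaris.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.polaris.s3.endpoint", "http://minio:9000")
    .getOrCreate()
)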
Creating Iceberg Tables via Spark Connect¶
Connect to Spark from a JupyterHub notebook:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.remote("sc://spark-connect:15002") \
.getOrCreate()
# Create a namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS polaris.analytics")
# Create an Iceberg table
spark.sql("""
CREATE TABLE IF NOT EXISTS polaris.analytics.events (
event_id BIGINT,
event_type STRING,
user_id STRING,
payload STRING,
event_timestamp TIMESTAMP,
processing_date DATE
)
USING iceberg
PARTITIONED BY (processing_date)
""")
# Insert data
spark.sql("""
INSERT INTO polaris.analytics.events VALUES
(1, 'login', 'user_001', '{"ip": "10.0.0.1"}', TIMESTAMP '2026-03-13 08:30:00', DATE '2026-03-13'),
(2, 'purchase', 'user_002', '{"amount": 99.99}', TIMESTAMP '2026-03-13 09:15:00', DATE '2026-03-13')
""")
# Query the table
spark.sql("SELECT * FROM polaris.analytics.events").show()
Iceberg Features¶
# Time travel -- query a historical snapshot (real snapshot IDs come from the snapshots metadata table below)
spark.sql("SELECT * FROM polaris.analytics.events VERSION AS OF 1").show()
# Schema evolution -- add columns without rewriting data
spark.sql("ALTER TABLE polaris.analytics.events ADD COLUMN region STRING")
# Snapshot management
spark.sql("SELECT * FROM polaris.analytics.events.snapshots").show()
Use .show() instead of .collect()
In Spark Connect mode, .collect() on Iceberg metadata tables may throw a SerializedLambda error. Use .show() instead.
Querying via Trino¶
Trino provides federated SQL across Iceberg tables and PostgreSQL.
Iceberg Queries¶
-- Query Iceberg tables through the Polaris catalog
SELECT event_type, COUNT(*) as cnt
FROM iceberg.analytics.events
GROUP BY event_type;
-- Join Iceberg + PostgreSQL data
SELECT e.event_type, e.user_id, u.full_name
FROM iceberg.analytics.events e
JOIN postgresql.public.users u ON e.user_id = u.user_id;
Available Catalogs in Trino¶
| Catalog | Backend | Use Case |
|---|---|---|
| `iceberg` | Polaris REST catalog (MinIO) | Lakehouse tables |
| `postgresql` | akko-postgresql-data | Operational/transactional data |
| `system` | Trino internal | Cluster metadata |
Connecting from Python¶
from trino.dbapi import connect
conn = connect(host="trino", port=8080, user="alice",
catalog="iceberg", schema="analytics")
cursor = conn.cursor()
cursor.execute("SELECT * FROM events LIMIT 100")
for row in cursor.fetchall():
print(row)
Scheduling with Airflow DAGs¶
Airflow orchestrates your data pipelines. DAGs are stored in airflow/dags/.
Example DAG Structure¶
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule="@daily",
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["analytics", "iceberg"]
)
def analytics_pipeline():
@task
def extract():
"""Extract data from source systems."""
# Spark Connect job
pass
@task
def transform(raw_data):
"""Apply business logic (Bronze -> Silver -> Gold)."""
pass
@task
def quality_check():
"""Run data quality assertions via OpenMetadata."""
pass
@task
def publish():
"""Update Trino views and Superset datasets."""
pass
    raw = extract()
    clean = transform(raw)
    clean >> quality_check() >> publish()
analytics_pipeline()
Pre-built DAG¶
The akko_e2e_pipeline.py DAG demonstrates the full flow: ingest CSV into Iceberg via Spark, run quality checks, and refresh Superset datasets.
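The core of that flow is the Spark ingest step. Below is a minimal sketch of such a step over Spark Connect; the landing path, target table, and write mode are illustrative assumptions, not the actual contents of akko_e2e_pipeline.py.

from pyspark.sql import SparkSession

# Sketch of a CSV -> Iceberg (Bronze) ingest over Spark Connect.
# The landing path and target table name are hypothetical placeholders.
spark = SparkSession.builder.remote("sc://spark-connect:15002").getOrCreate()

raw = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("s3a://landing/events/*.csv")  # assumed landing location in MinIO
)

# Create or replace the Bronze table in the Polaris catalog; a production
# pipeline would more likely .append() to an existing table.
raw.writeTo("polaris.analytics.events_bronze").using("iceberg").createOrReplace()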
Monitoring DAGs¶
- Access the Airflow UI at https://airflow.<domain>
- Check DAG run status, task logs, and execution graphs (or query runs programmatically via the REST API, as sketched below)
- Failed tasks trigger Alertmanager notifications (if configured)
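A minimal sketch of pulling recent run status over the Airflow REST API, assuming the API is enabled and reachable in-cluster with basic auth; the hostname and credentials are placeholders:

import requests

AIRFLOW_API = "http://airflow-webserver:8080/api/v1"  # assumed in-cluster service name

# List the five most recent runs of the pre-built pipeline
resp = requests.get(
    f"{AIRFLOW_API}/dags/akko_e2e_pipeline/dagRuns",
    params={"limit": 5, "order_by": "-execution_date"},
    auth=("admin", "admin"),  # placeholder credentials
    timeout=10,
)
resp.raise_for_status()

for run in resp.json()["dag_runs"]:
    print(run["dag_run_id"], run["state"])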
Data Governance with OpenMetadata¶
OpenMetadata provides a centralized data catalog with lineage, quality, and glossary management.
Automatic Ingestion¶
AKKO pre-configures ingestion pipelines for:
- Trino -- discovers all schemas and tables
- Airflow -- captures DAG metadata and lineage
- Superset -- indexes dashboards and charts
Data Lineage¶
OpenMetadata automatically traces data flow across the stack: Spark jobs --> Iceberg tables --> Trino queries --> Superset dashboards.
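A table's lineage graph can also be fetched over the API. A minimal sketch, assuming a JWT bot token; the host, token, fully qualified table name, and exact endpoint path are assumptions and may differ by OpenMetadata version:

import requests

OM_API = "https://openmetadata.<domain>/api"
TOKEN = "<jwt-bot-token>"  # placeholder bot token
fqn = "trino.iceberg.analytics.events"  # hypothetical fully qualified table name

resp = requests.get(
    f"{OM_API}/v1/lineage/table/name/{fqn}",
    params={"upstreamDepth": 2, "downstreamDepth": 2},
    headers={"Authorization": f"Bearer {TOKEN}"},
    timeout=10,
)
resp.raise_for_status()
lineage = resp.json()
print(len(lineage.get("nodes", [])), "related entities in the lineage graph")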
Data Quality¶
Define quality tests in OpenMetadata:
- Column null percentage < 5%
- Unique constraint on event_id
- Freshness: table updated within the last 24 hours
- Custom SQL assertions (a sketch of one expressed as plain SQL follows below)
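OpenMetadata runs these tests natively; the same assertions can also be expressed as plain SQL against Trino, for example from an Airflow task. A minimal sketch (not OpenMetadata's own test syntax), reusing the connection settings shown earlier:

from trino.dbapi import connect

# Sketch of two custom SQL assertions run directly against Trino.
conn = connect(host="trino", port=8080, user="alice",
               catalog="iceberg", schema="analytics")
cur = conn.cursor()

# Null-percentage check on user_id (threshold mirrors the 5% rule above)
cur.execute("""
    SELECT CAST(count_if(user_id IS NULL) AS DOUBLE) / count(*)
    FROM events
""")
null_pct = cur.fetchone()[0]
assert null_pct < 0.05, f"user_id null ratio too high: {null_pct:.2%}"

# Uniqueness check on event_id
cur.execute("SELECT count(*) - count(DISTINCT event_id) FROM events")
duplicates = cur.fetchone()[0]
assert duplicates == 0, f"{duplicates} duplicate event_id values"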
Enriching the Catalog¶
Use the enrichment scripts to add business context:
# V1: tags, glossary, owners, descriptions
python openmetadata/enrich_catalog.py
# V2: domains, data products, dashboards, pipelines, quality, lineage
python openmetadata/enrich_catalog_v2.py
Glossary & Tags¶
- Create business glossary terms (e.g., "PII", "Revenue", "Customer ID")
- Tag tables and columns for classification and compliance
- Assign data owners per dataset for accountability
Access OpenMetadata at https://openmetadata.<domain>.