
Data Engineer Starter Kit

This guide covers the core data engineering workflows on AKKO: building Iceberg tables with Spark, querying with Trino, orchestrating with Airflow, and governing with OpenMetadata.


Data Pipeline Architecture

AKKO provides a complete Medallion architecture pipeline:

Sources --> Spark (Bronze) --> Spark (Silver) --> Spark (Gold) --> Trino --> Superset
              |                    |                  |
              v                    v                  v
           Iceberg             Iceberg            Iceberg
           (MinIO)             (MinIO)            (MinIO)
              |                    |                  |
              +-------- Polaris REST Catalog ---------+
                                   |
                              OpenMetadata
                           (lineage + quality)

Key components:

  • Spark Connect -- distributed compute engine for ETL
  • Apache Iceberg -- open table format (ACID, time-travel, schema evolution)
  • Apache Polaris -- REST catalog for Iceberg tables
  • MinIO -- S3-compatible object storage (data lake)
  • Trino -- federated SQL query engine
  • Airflow -- pipeline orchestration and scheduling
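
For context, the sketch below shows roughly how a plain Spark session is wired to the Polaris REST catalog and MinIO. On AKKO the Spark Connect server already ships with equivalent settings, so you normally never write this yourself; the endpoints, warehouse name, and omitted credentials here are illustrative assumptions.

from pyspark.sql import SparkSession

# Illustrative wiring only -- endpoints and warehouse name are assumptions,
# and Polaris authentication settings are omitted. Requires the
# iceberg-spark-runtime jar on the classpath.
spark = (
    SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.polaris.type", "rest")  # Polaris speaks the Iceberg REST protocol
    .config("spark.sql.catalog.polaris.uri", "http://polaris:8181/api/catalog")
    .config("spark.sql.catalog.polaris.warehouse", "akko")
    .config("spark.sql.catalog.polaris.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.polaris.s3.endpoint", "http://minio:9000")
    .getOrCreate()
)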

Creating Iceberg Tables via Spark Connect

Connect to Spark from a JupyterHub notebook:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .remote("sc://spark-connect:15002") \
    .getOrCreate()

# Create a namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS polaris.analytics")

# Create an Iceberg table
spark.sql("""
    CREATE TABLE IF NOT EXISTS polaris.analytics.events (
        event_id BIGINT,
        event_type STRING,
        user_id STRING,
        payload STRING,
        event_timestamp TIMESTAMP,
        processing_date DATE
    )
    USING iceberg
    PARTITIONED BY (processing_date)
""")

# Insert data
spark.sql("""
    INSERT INTO polaris.analytics.events VALUES
    (1, 'login', 'user_001', '{"ip": "10.0.0.1"}', TIMESTAMP '2026-03-13 08:30:00', DATE '2026-03-13'),
    (2, 'purchase', 'user_002', '{"amount": 99.99}', TIMESTAMP '2026-03-13 09:15:00', DATE '2026-03-13')
""")

# Query the table
spark.sql("SELECT * FROM polaris.analytics.events").show()

Iceberg Features

# Time travel -- query a historical snapshot (the literal 1 is a placeholder;
# real snapshot IDs come from the snapshots metadata table below)
spark.sql("SELECT * FROM polaris.analytics.events VERSION AS OF 1").show()

# Schema evolution -- add columns without rewriting data
spark.sql("ALTER TABLE polaris.analytics.events ADD COLUMN region STRING")

# Snapshot management
spark.sql("SELECT * FROM polaris.analytics.events.snapshots").show()

Use .show() instead of .collect()

In Spark Connect mode, .collect() on Iceberg metadata tables may throw a SerializedLambda error. Use .show() instead.
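
Snapshots accumulate with every write, so Iceberg ships Spark maintenance procedures. A sketch, assuming the Polaris deployment allows maintenance calls and using Iceberg's standard system procedure namespace:

# Expire snapshots older than a cutoff to reclaim storage in MinIO
spark.sql("""
    CALL polaris.system.expire_snapshots(
        table => 'analytics.events',
        older_than => TIMESTAMP '2026-03-01 00:00:00'
    )
""").show()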


Querying via Trino

Trino provides federated SQL across Iceberg tables and PostgreSQL.

Iceberg Queries

-- Query Iceberg tables through the Polaris catalog
SELECT event_type, COUNT(*) as cnt
FROM iceberg.analytics.events
GROUP BY event_type;

-- Join Iceberg + PostgreSQL data
SELECT e.event_type, e.user_id, u.full_name
FROM iceberg.analytics.events e
JOIN postgresql.public.users u ON e.user_id = u.user_id;

Available Catalogs in Trino

Catalog      Backend                        Use Case
iceberg      Polaris REST catalog (MinIO)   Lakehouse tables
postgresql   akko-postgresql-data           Operational/transactional data
system       Trino internal                 Cluster metadata

Connecting from Python

from trino.dbapi import connect

conn = connect(host="trino", port=8080, user="alice",
               catalog="iceberg", schema="analytics")
cursor = conn.cursor()
cursor.execute("SELECT * FROM events LIMIT 100")
for row in cursor.fetchall():
    print(row)
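
The same connection works for discovery; SHOW CATALOGS and SHOW SCHEMAS are standard Trino statements, so the output should match the catalog table above:

# List the catalogs and schemas visible to this session
cursor.execute("SHOW CATALOGS")
print(cursor.fetchall())   # e.g. [['iceberg'], ['postgresql'], ['system']]

cursor.execute("SHOW SCHEMAS FROM iceberg")
print(cursor.fetchall())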

Scheduling with Airflow DAGs

Airflow orchestrates your data pipelines. DAGs are stored in airflow/dags/.

Example DAG Structure

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["analytics", "iceberg"]
)
def analytics_pipeline():

    @task
    def extract():
        """Extract data from source systems."""
        # Spark Connect job
        pass

    @task
    def transform(raw_data):
        """Apply business logic (Bronze -> Silver -> Gold)."""
        pass

    @task
    def quality_check():
        """Run data quality assertions via OpenMetadata."""
        pass

    @task
    def publish():
        """Update Trino views and Superset datasets."""
        pass

    raw = extract()
    clean = transform(raw)
    clean >> quality_check() >> publish()

analytics_pipeline()

Pre-built DAG

The akko_e2e_pipeline.py DAG demonstrates the full flow: ingest CSV into Iceberg via Spark, run quality checks, and refresh Superset datasets.
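
The ingest step amounts to a Spark Connect job along these lines (a sketch only -- the bucket, file name, and target table are illustrative, not the DAG's exact code):

from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://spark-connect:15002").getOrCreate()

# Read a CSV from the MinIO data lake and land it as an Iceberg table
df = spark.read.csv("s3a://landing/events.csv", header=True, inferSchema=True)
df.writeTo("polaris.analytics.events_raw").createOrReplace()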

Monitoring DAGs

  • Access the Airflow UI at https://airflow.<domain>
  • Check DAG run status, task logs, and execution graphs
  • Failed tasks trigger Alertmanager notifications (if configured)

Data Governance with OpenMetadata

OpenMetadata provides a centralized data catalog with lineage, quality, and glossary management.

Automatic Ingestion

AKKO pre-configures ingestion pipelines for:

  • Trino -- discovers all schemas and tables
  • Airflow -- captures DAG metadata and lineage
  • Superset -- indexes dashboards and charts

Data Lineage

OpenMetadata automatically traces data flow:

Spark Job --> Iceberg Table --> Trino View --> Superset Dashboard

Data Quality

Define quality tests in OpenMetadata:

  • Column null percentage < 5%
  • Unique constraint on event_id
  • Freshness: table updated in the last 24 hours
  • Custom SQL assertions
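
These tests are defined in the OpenMetadata UI or via its ingestion workflows. As a plain-SQL illustration of what the first check asserts (OpenMetadata's profiler runs its own queries, not this one), using the Trino client from earlier:

# Null-percentage check on user_id, expressed as plain SQL
cursor.execute("""
    SELECT 100.0 * count_if(user_id IS NULL) / count(*) AS null_pct
    FROM iceberg.analytics.events
""")
null_pct = cursor.fetchone()[0]
assert null_pct < 5, f"user_id null percentage too high: {null_pct}%"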

Enriching the Catalog

Use the enrichment scripts to add business context:

# V1: tags, glossary, owners, descriptions
python openmetadata/enrich_catalog.py

# V2: domains, data products, dashboards, pipelines, quality, lineage
python openmetadata/enrich_catalog_v2.py

Glossary & Tags

  • Create business glossary terms (e.g., "PII", "Revenue", "Customer ID")
  • Tag tables and columns for classification and compliance
  • Assign data owners per dataset for accountability

Access OpenMetadata at https://openmetadata.<domain>.