Trino AI Plugin

The trino-ai-functions plugin adds 23 native scalar AI functions to Trino under the akko_ai_* prefix. Unlike connector-based approaches, these functions work on any Trino catalog — Iceberg, PostgreSQL, Hive, Delta, S3, etc.

-- Works on any source
SELECT name, akko_ai_sentiment(description) FROM iceberg.banking.transactions;
SELECT akko_ai_pii(comment)                 FROM postgresql.public.reviews;

Why the akko_ai_* prefix

Trino 466+ ships its own 7 ai_* functions in the llm catalog. AKKO uses the akko_ai_* prefix to avoid collision and to make the differentiation self-evident (cache, OPA, multimodal, audit). See AI / Trino AI Functions and ADR-026.

Architecture

Trino SQL query
  └─→ trino-ai-functions plugin (Java, in Trino JVM)
       └─→ HTTPS / HTTP/2 (java.net.http)
            └─→ akko-ai-service (FastAPI + OPA check)
                 └─→ LiteLLM gateway
                      └─→ Ollama / vLLM / OpenAI / etc.

The plugin runs inside the Trino coordinator JVM. Every akko_ai_*() SQL call is a function invocation that:

  1. Builds an HTTP GET to the AI Service.
  2. Forwards the calling user via X-Trino-User header plus the function label via X-Akko-Ai-Function: akko_ai_<name>.
  3. Uses bearer token authentication (AKKO_AI_SERVICE_TOKEN).
  4. Returns the parsed result, or NULL on any error.
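
The four steps above can be sketched as follows. This is an illustration in Python, not the plugin's Java code: the `/functions/<name>` route, the `text` query parameter, and the helper names `build_request` and `call_or_none` are assumptions made for the example, and only the header names, the GET method, the bearer token, and the NULL-on-error behaviour come from the list above.

```python
import json
import urllib.parse
import urllib.request


def build_request(base_url, function, user, token, text):
    """Build (but do not send) the request for one akko_ai_* call."""
    # Hypothetical route and query parameter: the real endpoint layout
    # is not documented on this page.
    query = urllib.parse.urlencode({"text": text})
    url = f"{base_url}/functions/{function}?{query}"
    headers = {
        "X-Trino-User": user,                         # step 2: calling user
        "X-Akko-Ai-Function": f"akko_ai_{function}",  # step 2: function label
        "Authorization": f"Bearer {token}",           # step 3: AKKO_AI_SERVICE_TOKEN
    }
    return urllib.request.Request(url, headers=headers, method="GET")  # step 1


def call_or_none(request, timeout_s=30.0, opener=urllib.request.urlopen):
    """Send the request and return the parsed JSON body, or None on any error."""
    try:
        with opener(request, timeout=timeout_s) as response:
            return json.loads(response.read().decode("utf-8"))
    except Exception:
        # Step 4: every failure becomes NULL (None here) instead of raising.
        return None
```

Passing `opener` as a parameter keeps the sketch testable without a running AI Service; in real use the default `urllib.request.urlopen` would be kept.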

Functions (23 total)

| Function | Purpose | Example |
|---|---|---|
| akko_ai_ask(question[, context]) | Question answering | akko_ai_ask('What is a lakehouse?') |
| akko_ai_sentiment(text) | Sentiment | akko_ai_sentiment('I love this') → POSITIVE |
| akko_ai_classify(text, categories) | Classification | akko_ai_classify('server down', 'bug,feature') |
| akko_ai_summarize(text[, n]) | Summarization | akko_ai_summarize('long text', 2) |
| akko_ai_translate(text[, lang]) | Translation | akko_ai_translate('Hello', 'French') |
| akko_ai_entities(text) | NER | akko_ai_entities('Tim Cook at Apple') |
| akko_ai_anomaly(value[, context]) | Anomaly detection | akko_ai_anomaly('150000', 'avg 45000') |
| akko_ai_sql(question[, schema]) | NL → SQL | akko_ai_sql('Top customers by revenue') |
| akko_ai_risk(profile) | Risk scoring 0-100 | akko_ai_risk('balance=-5000, inactive') |
| akko_ai_pii(text) | PII redaction | akko_ai_pii('Jean, jean@mail.com') |
| akko_ai_sensitivity(text) | GDPR class | akko_ai_sensitivity('SSN: 123...') |
| akko_ai_language(text) | Language detection | akko_ai_language('Bonjour') |
| akko_ai_keywords(text[, n]) | Keyword extraction | akko_ai_keywords('ML fraud detection', 3) |
| akko_ai_ocr(image) | Image → text | akko_ai_ocr(pdf_bytes) |
| akko_ai_describe_image(image[, prompt]) | Image captioning | akko_ai_describe_image(img) |
| akko_ai_parse_document(pdf) | PDF → structured JSON | akko_ai_parse_document(invoice) |
| akko_ai_transcribe(audio) | Audio → text | akko_ai_transcribe(call_bytes) |
| akko_ai_embed(text) | 768-dim embedding | akko_ai_embed('how to refund a charge') |
| akko_ai_similarity(vec_a, vec_b) | Cosine similarity (local) | akko_ai_similarity(a.embedding, b.embedding) |
| akko_ai_search(embedding, query) | Semantic search score (cached) | akko_ai_search(doc.embedding, 'refund policy') |

Plus operational helpers: akko_ai_stats(), akko_ai_health(), akko_ai_version(), akko_ai_cache_clear(), akko_ai_cb_reset().

Semantic search pattern

akko_ai_search(embedding, query_text) is the canonical way to rank rows by semantic relevance. Internally it calls akko_ai_embed(query_text) exactly once per worker (LRU cache, 1024 distinct queries) then computes cosine similarity locally against each row — no HTTP call per row:

SELECT id, text, akko_ai_search(embedding, 'how do I refund a charge?') AS score
FROM iceberg.kb.documents
ORDER BY score DESC
LIMIT 10;
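
The embed-once-then-compare-locally idea can be sketched in Python as below. The plugin is written in Java, so this is only an illustration of the logic: `embed` is a made-up stand-in for akko_ai_embed that returns a tiny 3-dimensional vector, and the function names are hypothetical. The cache size of 1024 and the cosine scoring follow the description above.

```python
import math
from functools import lru_cache


def embed(text):
    """Hypothetical stand-in for akko_ai_embed: a tiny deterministic vector."""
    return (float(len(text)), float(text.count(" ")), 1.0)


@lru_cache(maxsize=1024)  # 1024 distinct queries, as described above
def cached_query_embedding(query):
    """Embed the query text once; later calls with the same text hit the cache."""
    return embed(query)


def cosine(a, b):
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def search(row_embedding, query):
    """Score one row: cached query embedding, then a purely local comparison."""
    return cosine(row_embedding, cached_query_embedding(query))
```

Scoring many rows against the same query text triggers a single `embed` call; `cached_query_embedding.cache_info()` shows one miss and the remaining calls as hits.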

Equivalent, but 10 to 1000× slower (re-embeds the query for every row):

-- DON'T do this in production
SELECT id, text, akko_ai_similarity(embedding, akko_ai_embed('how do I refund a charge?')) AS score
FROM iceberg.kb.documents ORDER BY score DESC LIMIT 10;

Storage-agnostic: the embedding column can live in Iceberg, Delta, Hive, PostgreSQL (vector or ARRAY<DOUBLE>), Hudi — any Trino catalog that exposes ARRAY<DOUBLE>.

Production-readiness features

  • Resilience — Circuit breaker (5 failures → OPEN, 60 s recovery), retry with exponential backoff (3 attempts), per-request timeout
  • Performance — HTTP/2 multiplexing, connection pool, Caffeine LRU cache (10 k entries, 1 h TTL, user-scoped)
  • Observability — HdrHistogram latency tracking (overall + per-function), JMX MBean exposed via Trino's jmx catalog and jmx_exporter for Prometheus, Tempo spans at the AI Service
  • Security — Bearer token from K8s Secret, X-Trino-User forwarding, HTTPS with optional cert validation skip for dev, OPA per-function allow-list enforced server-side
  • Robustness — All errors → SQL NULL via @SqlNullable (Trino is never destabilised)
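
The resilience bullet can be illustrated with a minimal circuit breaker and retry loop. This is a Python sketch of the general technique, not the plugin's Java implementation: the class and function names are hypothetical, counting consecutive failures is an assumption, and the 0.1 s base delay is invented for the example. The threshold of 5, the 60 s recovery, and the 3 attempts come from the list above.

```python
import time


class CircuitBreaker:
    """CLOSED -> OPEN after `threshold` failures; OPEN -> HALF_OPEN after `recovery_s`."""

    def __init__(self, threshold=5, recovery_s=60.0, clock=time.monotonic):
        self.threshold = threshold
        self.recovery_s = recovery_s
        self.clock = clock
        self.failures = 0      # consecutive failures (assumption)
        self.opened_at = None  # None means the circuit is not open

    @property
    def state(self):
        if self.opened_at is None:
            return "CLOSED"
        if self.clock() - self.opened_at >= self.recovery_s:
            return "HALF_OPEN"
        return "OPEN"

    def allow(self):
        return self.state != "OPEN"

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        if self.state == "HALF_OPEN":
            self.opened_at = self.clock()  # trial call failed: reopen
            return
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = self.clock()


def call_with_resilience(breaker, fn, max_attempts=3, base_delay_s=0.1, sleep=time.sleep):
    """Run fn with retries; return its result, or None if blocked or exhausted."""
    for attempt in range(max_attempts):
        if not breaker.allow():
            return None  # circuit is OPEN: fail fast
        try:
            result = fn()
        except Exception:
            breaker.record_failure()
            if attempt < max_attempts - 1:
                sleep(base_delay_s * 2 ** attempt)  # exponential backoff
            continue
        breaker.record_success()
        return result
    return None
```

Returning `None` on every failure path matches the robustness bullet: the caller sees a NULL-like value rather than an exception.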

Monitoring

From SQL:

SELECT akko_ai_stats();
-- JSON with circuit_breaker, latency p50/p95/p99, cache hit rate, per-function breakdown

SELECT * FROM jmx.current."dev.akko.trino.ai:type=aihttpclient";
-- 18 attributes including totals, latency percentiles, plugin version

For Prometheus, use the jmx_exporter agent attached to the Trino JVM. Metrics appear as:

trino_dev_akko_trino_ai_aihttpclient_total_requests
trino_dev_akko_trino_ai_aihttpclient_total_errors
trino_dev_akko_trino_ai_aihttpclient_latency_p95_micros
trino_dev_akko_trino_ai_aihttpclient_circuit_breaker_trips
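
As a rough illustration of how these metrics might be consumed, the Python sketch below parses Prometheus text-format lines and derives an error ratio from the two counters listed above. The sample values are made up, the helper names are hypothetical, and the parser only handles un-labelled `name value` lines.

```python
# Made-up sample values, for illustration only.
SAMPLE = """\
# TYPE trino_dev_akko_trino_ai_aihttpclient_total_requests gauge
trino_dev_akko_trino_ai_aihttpclient_total_requests 200.0
trino_dev_akko_trino_ai_aihttpclient_total_errors 10.0
"""


def parse_metrics(exposition_text):
    """Parse un-labelled `name value` lines from Prometheus text format."""
    metrics = {}
    for line in exposition_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        name, _, rest = line.partition(" ")
        metrics[name] = float(rest.split()[0])  # ignore an optional timestamp
    return metrics


def error_ratio(metrics, prefix="trino_dev_akko_trino_ai_aihttpclient_"):
    """Errors divided by requests; 0.0 when no requests were recorded."""
    total = metrics.get(prefix + "total_requests", 0.0)
    errors = metrics.get(prefix + "total_errors", 0.0)
    return errors / total if total else 0.0
```

In a real deployment the same ratio would more likely be computed in Prometheus itself; the sketch only shows how the metric names relate to each other.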

Configuration

All settings are supplied as Trino coordinator environment variables (set in helm/akko/values.yaml under trino.env):

| Variable | Default | Purpose |
|---|---|---|
| AKKO_AI_SERVICE_URL | http://akko-akko-ai-service:8000 | AI Service base URL |
| AKKO_AI_SERVICE_TOKEN | (from akko-trino-ai Secret) | Bearer token |
| AKKO_AI_TIMEOUT_MS | 30000 | Per-request timeout (ms) |
| AKKO_AI_CB_THRESHOLD | 5 | Failures before circuit opens |
| AKKO_AI_CB_RECOVERY_MS | 60000 | Recovery time before HALF_OPEN (ms) |
| AKKO_AI_RETRY_MAX | 3 | Max retry attempts |
| AKKO_AI_POOL_SIZE | 16 | HTTP thread pool size |
| AKKO_AI_CACHE_SIZE | 10000 | LRU cache max entries |
| AKKO_AI_CACHE_TTL_S | 3600 | Cache entry TTL (seconds) |
| AKKO_AI_VERIFY_TLS | true | Verify TLS certificates; set to false to skip verification (dev only) |
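
To show how the variables and defaults above fit together, here is a small Python sketch that reads them into a typed config object. The plugin itself is Java, so `AiClientConfig` and `load_config` are hypothetical names used only for illustration. The sketch treats AKKO_AI_VERIFY_TLS=true as "verify certificates", which is an assumption based on the variable's name, and it leaves out AKKO_AI_SERVICE_TOKEN because that value comes from a Secret rather than a default.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class AiClientConfig:
    service_url: str
    timeout_ms: int
    cb_threshold: int
    cb_recovery_ms: int
    retry_max: int
    pool_size: int
    cache_size: int
    cache_ttl_s: int
    verify_tls: bool


def load_config(env=None):
    """Read AKKO_AI_* variables, falling back to the defaults in the table."""
    env = os.environ if env is None else env

    def integer(name, default):
        return int(env.get(name, default))

    return AiClientConfig(
        service_url=env.get("AKKO_AI_SERVICE_URL", "http://akko-akko-ai-service:8000"),
        timeout_ms=integer("AKKO_AI_TIMEOUT_MS", 30000),
        cb_threshold=integer("AKKO_AI_CB_THRESHOLD", 5),
        cb_recovery_ms=integer("AKKO_AI_CB_RECOVERY_MS", 60000),
        retry_max=integer("AKKO_AI_RETRY_MAX", 3),
        pool_size=integer("AKKO_AI_POOL_SIZE", 16),
        cache_size=integer("AKKO_AI_CACHE_SIZE", 10000),
        cache_ttl_s=integer("AKKO_AI_CACHE_TTL_S", 3600),
        # Assumption: "true" means certificates are verified.
        verify_tls=env.get("AKKO_AI_VERIFY_TLS", "true").strip().lower() == "true",
    )
```

Calling `load_config({})` yields the documented defaults, while `load_config({"AKKO_AI_TIMEOUT_MS": "5000"})` overrides a single value.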

See also