Otter Streams v1.0.17
Flink 1.15–1.20

MinIO Pipeline Demo

This is the reference integration test for Str:::Lab Studio. It demonstrates the complete lifecycle: uploading a model to MinIO, loading it into the Otter Streams ModelCache via a pre-load job, registering the UDF, and running a streaming inference query.

Note: All SQL blocks below are designed to be pasted directly into the Str:::Lab Studio SQL editor, one section at a time. Run each section in order. The Studio UDF Manager handles ADD JAR and CREATE FUNCTION for you if you use the Register UDF tab; these blocks show the raw SQL equivalents, for reference and for pipeline automation.

Prerequisites

Shaded JAR uploaded: otter-stream-sql-1.0.17-flink-udf.jar in /var/www/udf-jars/
MinIO endpoint: accessible from the Flink TaskManagers (not just from the browser)
MinIO bucket: ml-models (adjust to your bucket name)
Model path: ml-models/fraud-detector/v1/ (contains saved_model.pb + variables/)
Flink session: active gateway session in Studio (check the session indicator)
Kafka topics: transactions (input), fraud-alerts (output)
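The first lifecycle step, uploading the model to MinIO, happens outside SQL. A minimal Python sketch using the standard MinIO Python SDK is shown below; the endpoint, credentials, bucket, and prefix are the demo values from the table above, so adjust them to your deployment. The path mapping is separated from the upload so it can be inspected without a running MinIO.

```python
# Sketch: upload a TensorFlow SavedModel directory to MinIO so the
# preload connector can later fetch it. Demo endpoint/credentials
# are assumptions matching the prerequisites above.
import os

def model_objects(local_dir, prefix):
    """Map every file under local_dir to its MinIO object name."""
    for root, _dirs, files in os.walk(local_dir):
        for name in files:
            local_path = os.path.join(root, name)
            rel = os.path.relpath(local_path, local_dir)
            yield local_path, f"{prefix}/{rel.replace(os.sep, '/')}"

def upload_model(local_dir, bucket="ml-models", prefix="fraud-detector/v1"):
    # Deferred import keeps the path mapping above testable offline.
    from minio import Minio
    client = Minio("minio:9000", access_key="minioadmin",
                   secret_key="minioadmin", secure=False)
    if not client.bucket_exists(bucket):
        client.make_bucket(bucket)
    for local_path, object_name in model_objects(local_dir, prefix):
        client.fput_object(bucket, object_name, local_path)
```

Running `upload_model("/tmp/fraud_model")` places saved_model.pb and the variables/ directory under ml-models/fraud-detector/v1/, which is exactly the prefix the preload connector table in Section 3 points at.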

MinIO Configuration

The otter-stream-sql module uses the MinIO Java client bundled inside the shaded JAR to download model files at engine initialisation time. Pass the connection details through the DDL WITH options or through Flink's configuration properties.

Set session-level MinIO properties

These SET statements write into the Flink session configuration and are picked up by the MinioModelLoader inside the SQL module. Run them once at the start of every Studio session.

SQL Section 1 of 6 — Session properties
-- ── Section 1: MinIO connection properties ──────────────────────────────
-- Adjust endpoint, credentials, and bucket to match your deployment.
-- These are session-scoped: re-run after any session renewal.

SET 'otter.minio.endpoint'          = 'http://minio:9000';
SET 'otter.minio.access-key'        = 'minioadmin';
SET 'otter.minio.secret-key'        = 'minioadmin';
SET 'otter.minio.bucket'            = 'ml-models';
SET 'otter.model.cache.max-size'    = '20';
SET 'otter.model.cache.ttl-minutes' = '60';

-- Verify the properties are visible to the session
SET;
Warning: credentials in DDL. For production use, inject MinIO credentials via Flink's secrets management or environment variables rather than hardcoding them in SQL. The properties shown here match a typical local or staging MinIO deployment.
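The two cache properties above imply a bounded, time-limited cache: at most 20 engines, each evicted 60 minutes after loading. A small Python sketch of those semantics (illustrative only, not the actual ModelCache implementation):

```python
# TTL + LRU cache mirroring otter.model.cache.max-size and
# otter.model.cache.ttl-minutes. The eviction policy shown here is an
# assumption about ModelCache, not its actual code.
import time
from collections import OrderedDict

class TtlLruCache:
    def __init__(self, max_size=20, ttl_minutes=60, clock=time.monotonic):
        self.max_size = max_size
        self.ttl = ttl_minutes * 60
        self.clock = clock
        self._entries = OrderedDict()  # name -> (engine, loaded_at)

    def put(self, name, engine):
        self._entries[name] = (engine, self.clock())
        self._entries.move_to_end(name)
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # evict least recently used

    def get(self, name):
        item = self._entries.get(name)
        if item is None:
            return None  # miss: caller falls back to a MinIO load
        engine, loaded_at = item
        if self.clock() - loaded_at > self.ttl:
            del self._entries[name]  # expired: force a fresh download
            return None
        self._entries.move_to_end(name)  # refresh LRU position
        return engine
```

The practical consequence: after the TTL elapses, the next ml_score call pays a MinIO download again, which is why long-running pipelines should size the TTL against their model update cadence.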

JAR and UDF Registration

If you are running this demo manually through the Studio SQL editor rather than the UDF Manager, execute the following block to load the JAR and register the function. The Studio UDF Manager does this automatically when you click Register Function.

SQL Section 2 of 6 — JAR and UDF
-- ── Section 2: Load JAR and register scalar UDF ─────────────────────────

-- Load the shaded JAR onto the Gateway session classpath.
-- Path must exist on the SQL Gateway container filesystem.
ADD JAR '/var/www/udf-jars/otter-stream-sql-1.0.17-flink-udf.jar';

-- Confirm the JAR is visible
SHOW JARS;

-- Register the ML inference scalar function
CREATE TEMPORARY FUNCTION IF NOT EXISTS ml_score
AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction'
LANGUAGE JAVA;

-- Confirm registration
SHOW USER FUNCTIONS;

Model Preloading

ModelCache is populated lazily — the first call to ml_score(features, 'fraud-detector') that finds no cached engine will attempt to load the model from MinIO. For production pipelines this cold-start latency is undesirable. The preferred pattern is to declare a connector table that forces the engine to load during table creation, before any streaming query begins.

SQL Section 3 of 6 — Model preload via connector table
-- ── Section 3: Preload the model into ModelCache ────────────────────────
-- Creating this connector table triggers MinIO download and engine init.
-- The table itself is not queried directly in this demo; it exists solely
-- to warm the ModelCache before the streaming pipeline starts.

CREATE TEMPORARY TABLE fraud_model_source (
    score      DOUBLE,
    confidence DOUBLE
) WITH (
    'connector'          = 'ml-inference',

    -- Logical name used in ml_score(features, 'fraud-detector')
    'model.name'         = 'fraud-detector',

    -- MinIO path: s3a://<bucket>/<model-name>/<version>/
    -- The loader downloads saved_model.pb + variables/ from this prefix.
    'model.path'         = 's3a://ml-models/fraud-detector/v1/',
    'model.format'       = 'tensorflow-savedmodel',
    'model.version'      = 'v1',

    -- MinIO / S3-compatible storage settings
    'model.s3.endpoint'  = 'http://minio:9000',
    'model.s3.access-key'= 'minioadmin',
    'model.s3.secret-key'= 'minioadmin',
    'model.s3.path-style'= 'true',

    -- Cache behaviour
    'cache.enabled'      = 'true',
    'cache.ttl-minutes'  = '60',

    -- Inference settings
    'batch.size'         = '1',
    'async.enabled'      = 'false'
);

-- A lightweight describe confirms the connector initialised without error.
-- If the model failed to download you will see an error here rather than
-- silently at inference time.
DESCRIBE fraud_model_source;
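The model.path option splits into a bucket and an object prefix; everything under the prefix (saved_model.pb plus variables/) is downloaded at engine init. A sketch of that split, under the assumption that the loader parses the URI this way:

```python
# Sketch: split a model.path like 's3a://ml-models/fraud-detector/v1/'
# into the bucket and object prefix the loader downloads from.
# This parsing is an assumption about MinioModelLoader, not its code.
def parse_s3a_path(path):
    if not path.startswith("s3a://"):
        raise ValueError(f"not an s3a path: {path}")
    bucket, _, prefix = path[len("s3a://"):].partition("/")
    return bucket, prefix

# For a SavedModel the loader then needs every object under the prefix:
#   <prefix>saved_model.pb and <prefix>variables/*
```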

Source and Sink Tables

SQL Section 4 of 6 — Kafka source and sink
-- ── Section 4: Kafka source table (transactions) ─────────────────────────
-- Adjust bootstrap.servers and topic to match your Kafka deployment.

CREATE TEMPORARY TABLE transactions (
    transaction_id   STRING,
    user_id          STRING,
    merchant_id      STRING,
    amount           DOUBLE,
    currency         STRING,
    card_type        STRING,
    hour_of_day      INT,
    day_of_week      INT,
    is_international BOOLEAN,
    -- Feature map pre-assembled upstream (or computed in the query below)
    features         MAP<STRING, STRING>,
    event_time       TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector'                     = 'kafka',
    'topic'                         = 'transactions',
    'properties.bootstrap.servers'  = 'kafka:9092',
    'properties.group.id'           = 'otter-fraud-demo',
    'scan.startup.mode'             = 'latest-offset',
    'format'                        = 'json',
    'json.timestamp-format.standard'= 'ISO-8601',
    'json.fail-on-missing-field'    = 'false',
    'json.ignore-parse-errors'      = 'true'
);

-- ── Kafka sink table (fraud alerts) ───────────────────────────────────────
CREATE TEMPORARY TABLE fraud_alerts (
    alert_id         STRING,
    transaction_id   STRING,
    user_id          STRING,
    amount           DOUBLE,
    fraud_score      DOUBLE,
    risk_tier        STRING,
    detected_at      TIMESTAMP(3)
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'fraud-alerts',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format'                       = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

Full Streaming Pipeline

This is the core inference pipeline. It reads from the transactions Kafka topic, builds a feature map inline, calls ml_score to obtain a fraud probability, classifies the result into risk tiers, and writes qualifying events to fraud-alerts.

Note that the features map is assembled using MAP[...] syntax so the pipeline works even when the upstream producer has not pre-assembled it. In practice you would either pre-assemble the map in the producer or use a Flink SQL computed column.
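The MAP[...] expression is the SQL analogue of building a string-valued dict: every numeric or boolean feature is rendered as a string to satisfy the MAP&lt;STRING, STRING&gt; signature, and the engine parses the values back internally. A Python mirror of that assembly (the exact spelling of the SQL boolean cast may differ from Python's):

```python
# Pure-Python mirror of the MAP[...] feature assembly in Section 5.
def build_feature_map(txn):
    return {
        "amount":           str(txn["amount"]),
        "hour_of_day":      str(txn["hour_of_day"]),
        "day_of_week":      str(txn["day_of_week"]),
        # lowercase 'true'/'false' here; the SQL CAST spelling may differ
        "is_international": str(txn["is_international"]).lower(),
        "card_type":        txn["card_type"],
        "currency":         txn["currency"],
    }
```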

SQL Section 5 of 6 — Streaming inference pipeline
-- ── Section 5: Real-time fraud inference pipeline ───────────────────────
-- Writes scored events with risk_score > 0.40 to the fraud-alerts topic.
-- Adjust the threshold to suit your operating point.

INSERT INTO fraud_alerts
SELECT
    -- Deterministic alert ID from transaction ID + score bucket
    CONCAT(transaction_id, '-', CAST(FLOOR(fraud_score * 10) AS STRING))
        AS alert_id,
    transaction_id,
    user_id,
    amount,
    fraud_score,

    -- Risk tier classification downstream of the model score
    CASE
        WHEN fraud_score >= 0.85 THEN 'CRITICAL'
        WHEN fraud_score >= 0.65 THEN 'HIGH'
        WHEN fraud_score >= 0.40 THEN 'MEDIUM'
        ELSE                          'LOW'
    END AS risk_tier,

    CURRENT_TIMESTAMP AS detected_at

FROM (
    SELECT
        transaction_id,
        user_id,
        amount,

        -- Build feature map from individual columns.
        -- All values are cast to STRING because the UDF accepts MAP<STRING,STRING>.
        -- The inference engine casts them back to the correct numeric types internally.
        MAP[
            'amount',           CAST(amount          AS STRING),
            'hour_of_day',      CAST(hour_of_day     AS STRING),
            'day_of_week',      CAST(day_of_week     AS STRING),
            'is_international', CAST(is_international AS STRING),
            'card_type',        card_type,
            'currency',         currency
        ] AS feature_map,

        event_time

    FROM transactions
) enriched

-- Call the registered UDF; COALESCE guards against NULL when the
-- model name is not yet in ModelCache (should not happen after preload).
CROSS JOIN LATERAL (
    SELECT COALESCE(
        ml_score(feature_map, 'fraud-detector'),
        0.0
    ) AS fraud_score
) scores

WHERE fraud_score >= 0.40;
Note: The CROSS JOIN LATERAL pattern evaluates the scalar UDF once per row and exposes the result as a named column (fraud_score) that can be referenced in both the SELECT list and the WHERE clause without calling the function twice. It is equivalent to a WITH subquery but avoids the double evaluation that Flink may apply to scalar UDFs referenced in both positions.
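The alert_id and risk_tier expressions in Section 5 are pure functions of the score, so their thresholds can be unit-tested outside Flink. A direct Python mirror:

```python
# Pure-Python mirror of the alert_id and risk_tier expressions in
# Section 5, useful for checking thresholds without a running job.
import math

def alert_id(transaction_id, fraud_score):
    # Deterministic: same transaction + same score bucket -> same ID
    return f"{transaction_id}-{math.floor(fraud_score * 10)}"

def risk_tier(fraud_score):
    if fraud_score >= 0.85:
        return "CRITICAL"
    if fraud_score >= 0.65:
        return "HIGH"
    if fraud_score >= 0.40:
        return "MEDIUM"
    return "LOW"
```

Note that with the WHERE fraud_score >= 0.40 filter in place, the 'LOW' branch never reaches the sink; it exists so the CASE expression is total.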

Verification Queries

Run these in the Studio SQL editor to confirm the pipeline is healthy. Checks 1–4 are metadata or single-row queries against the current session state; check 5 runs a streaming SELECT over the Kafka source, so cancel it once you have seen a result.

SQL Section 6 of 6 — Health checks
-- ── Section 6: Verification ──────────────────────────────────────────────

-- 1. Confirm the shaded JAR is on the classpath
SHOW JARS;
-- Expected: .../otter-stream-sql-1.0.17-flink-udf.jar

-- 2. Confirm the UDF is registered
SHOW USER FUNCTIONS;
-- Expected: ml_score appears in the list

-- 3. Smoke-test the UDF with a static feature map.
--    If the model is loaded this returns a DOUBLE between 0.0 and 1.0.
--    If it returns NULL the model name was not found in ModelCache.
SELECT ml_score(
    MAP['amount', '250.00',
        'hour_of_day', '23',
        'day_of_week', '6',
        'is_international', 'true',
        'card_type', 'CREDIT',
        'currency', 'USD'],
    'fraud-detector'
) AS smoke_test_score;

-- 4. Confirm the connector table is accessible (model loaded from MinIO)
DESCRIBE fraud_model_source;

-- 5. Count messages received on the transactions topic in the last 60 s
--    (requires the streaming job from Section 5 to be running)
SELECT COUNT(*) AS events_in_last_60s
FROM transactions
WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '60' SECOND;

Fraud Detection — Extended Example

A more complete version of the pipeline that adds windowed aggregation for velocity features, a dead-letter sink for rows where the model returned NULL, and a Flink view that abstracts the feature-building logic away from the inference query.

SQL fraud_detection_extended.sql
-- ── Extended fraud detection with velocity features ──────────────────────

-- Step A: Create a view that computes velocity features using a 5-minute
-- tumbling window before the inference step.
CREATE TEMPORARY VIEW transaction_velocity AS
SELECT
    user_id,
    COUNT(*)                                     AS txn_count_5m,
    SUM(amount)                                  AS total_amount_5m,
    MAX(amount)                                  AS max_amount_5m,
    COUNT(DISTINCT merchant_id)                  AS distinct_merchants_5m,
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start
FROM transactions
GROUP BY
    user_id,
    TUMBLE(event_time, INTERVAL '5' MINUTE);

-- Step B: Join raw transactions with velocity window and run inference.
-- The join key is user_id; window_start aligns to the tumble boundary.
CREATE TEMPORARY VIEW scored_transactions AS
SELECT
    t.transaction_id,
    t.user_id,
    t.amount,
    t.event_time,
    COALESCE(
        ml_score(
            MAP[
                'amount',                CAST(t.amount               AS STRING),
                'hour_of_day',           CAST(t.hour_of_day          AS STRING),
                'day_of_week',           CAST(t.day_of_week          AS STRING),
                'is_international',      CAST(t.is_international      AS STRING),
                'card_type',             t.card_type,
                'currency',              t.currency,
                -- Velocity features from the window join
                'txn_count_5m',          CAST(v.txn_count_5m          AS STRING),
                'total_amount_5m',       CAST(v.total_amount_5m       AS STRING),
                'max_amount_5m',         CAST(v.max_amount_5m         AS STRING),
                'distinct_merchants_5m', CAST(v.distinct_merchants_5m AS STRING)
            ],
            'fraud-detector'
        ),
        -1.0   -- sentinel: model returned NULL (engine not loaded)
    ) AS fraud_score
FROM transactions t
LEFT JOIN transaction_velocity v
  ON t.user_id      = v.user_id
 AND t.event_time  >= v.window_start
 AND t.event_time   < v.window_start + INTERVAL '5' MINUTE;

-- Step C: Route high-confidence fraud to the alerts sink,
-- and model errors (sentinel -1.0) to a dead-letter topic.
INSERT INTO fraud_alerts
SELECT
    CONCAT(transaction_id, '-alert') AS alert_id,
    transaction_id,
    user_id,
    amount,
    fraud_score,
    CASE
        WHEN fraud_score >= 0.85 THEN 'CRITICAL'
        WHEN fraud_score >= 0.65 THEN 'HIGH'
        ELSE                          'MEDIUM'
    END AS risk_tier,
    event_time AS detected_at
FROM scored_transactions
WHERE fraud_score >= 0.40;
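The Step B join pairs each transaction with the single 5-minute tumbling window containing its event_time. The window alignment and the join predicate can be checked with a small sketch:

```python
# Sketch of the Step B join semantics: tumbling windows aligned to the
# epoch, each transaction matching exactly the window it falls in.
# Times are epoch seconds for simplicity.
TUMBLE_SECONDS = 5 * 60

def tumble_start(epoch_seconds):
    # Flink aligns tumbling windows to the epoch
    return epoch_seconds - (epoch_seconds % TUMBLE_SECONDS)

def joins_window(event_time, window_start):
    # Mirrors: t.event_time >= v.window_start
    #      AND t.event_time <  v.window_start + INTERVAL '5' MINUTE
    return window_start <= event_time < window_start + TUMBLE_SECONDS
```

Because the velocity view only emits when a window closes, the LEFT JOIN leaves the velocity features NULL for the earliest transactions; the feature map then carries the string 'None' equivalents, which the model must tolerate or which you should COALESCE upstream.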

Anomaly Detection — IoT Sensors

A second demo using an XGBoost model stored in MinIO. The model is a binary classifier trained on sensor telemetry; it flags readings that deviate significantly from the expected distribution. The model file is a JSON-format XGBoost model at ml-models/anomaly-detector/v2/model.json.

SQL anomaly_detection.sql
-- ── IoT anomaly detection with XGBoost model from MinIO ─────────────────

-- 1. Preload the XGBoost model from MinIO
CREATE TEMPORARY TABLE anomaly_model_source (
    score DOUBLE
) WITH (
    'connector'           = 'ml-inference',
    'model.name'          = 'anomaly-detector',
    'model.path'          = 's3a://ml-models/anomaly-detector/v2/model.json',
    'model.format'        = 'xgboost-json',
    'model.s3.endpoint'   = 'http://minio:9000',
    'model.s3.access-key' = 'minioadmin',
    'model.s3.secret-key' = 'minioadmin',
    'model.s3.path-style' = 'true',
    'cache.enabled'       = 'true'
);

-- 2. Sensor readings source
CREATE TEMPORARY TABLE sensor_readings (
    device_id    STRING,
    temperature  DOUBLE,
    pressure     DOUBLE,
    humidity     DOUBLE,
    vibration    DOUBLE,
    rpm          DOUBLE,
    reading_time TIMESTAMP(3),
    WATERMARK FOR reading_time AS reading_time - INTERVAL '10' SECOND
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'sensor-readings',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id'          = 'otter-anomaly-demo',
    'scan.startup.mode'            = 'latest-offset',
    'format'                       = 'json'
);

-- 3. Anomaly alerts sink
CREATE TEMPORARY TABLE anomaly_alerts (
    device_id      STRING,
    anomaly_score  DOUBLE,
    severity       STRING,
    reading_time   TIMESTAMP(3),
    detected_at    TIMESTAMP(3)
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'anomaly-alerts',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format'                       = 'json'
);

-- 4. Sliding window aggregation then inference
INSERT INTO anomaly_alerts
SELECT
    device_id,
    anomaly_score,
    CASE
        WHEN anomaly_score >= 0.90 THEN 'CRITICAL'
        WHEN anomaly_score >= 0.70 THEN 'HIGH'
        ELSE                           'MEDIUM'
    END AS severity,
    window_end AS reading_time,
    CURRENT_TIMESTAMP AS detected_at
FROM (
    SELECT
        device_id,
        HOP_END(reading_time, INTERVAL '30' SECOND, INTERVAL '5' MINUTE) AS window_end,
        COALESCE(
            ml_score(
                MAP[
                    'temperature', CAST(AVG(temperature) AS STRING),
                    'pressure',    CAST(AVG(pressure)    AS STRING),
                    'humidity',    CAST(AVG(humidity)    AS STRING),
                    'vibration',   CAST(AVG(vibration)   AS STRING),
                    'rpm',         CAST(AVG(rpm)         AS STRING)
                ],
                'anomaly-detector'
            ),
            0.0
        ) AS anomaly_score
    FROM sensor_readings
    GROUP BY
        device_id,
        HOP(reading_time, INTERVAL '30' SECOND, INTERVAL '5' MINUTE)
) windowed
WHERE anomaly_score >= 0.60;
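The HOP window in step 4 has a 5-minute size and a 30-second slide, so every reading contributes to size/slide = 10 overlapping windows, and a fresh anomaly score is emitted every 30 seconds. A sketch of which windows a reading lands in:

```python
# Sketch of the HOP (sliding) window in step 4: size 5 minutes,
# slide 30 seconds. Times are epoch seconds for simplicity.
SLIDE, SIZE = 30, 5 * 60

def hop_windows(epoch_seconds):
    """All (start, end) windows containing the given reading time."""
    # Latest window start at or before the reading, aligned to the slide
    last_start = epoch_seconds - (epoch_seconds % SLIDE)
    starts = range(last_start - SIZE + SLIDE, last_start + SLIDE, SLIDE)
    return [(s, s + SIZE) for s in starts]
```

This overlap is also why the HOP variant costs roughly 10x the aggregation state of an equivalent tumbling window; tune the slide against how quickly you need anomaly scores refreshed.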

Str:::Lab Studio Tips

Session renewal resets the classpath
After any session renewal or disconnect, re-run Sections 1 and 2 (properties + ADD JAR + CREATE FUNCTION) before executing inference queries. The Studio UDF Manager automates this if you use it rather than raw SQL.
Preload before the pipeline
Always run Section 3 (connector table creation) before Section 5 (the INSERT pipeline). This warms the ModelCache from MinIO. If you skip this step the first rows processed will experience cold-start latency while the model downloads.
Smoke-test with a static map
The static SELECT ml_score(MAP[...], 'fraud-detector') query in Section 6 is the fastest way to confirm end-to-end connectivity — JAR loaded, UDF registered, model in cache, engine producing output — without starting a streaming job.
NULL score means cache miss
If ml_score returns NULL the model name was not found in ModelCache. Re-run the connector table creation statement to trigger a fresh MinIO download. Check the Flink TaskManager logs for download errors.
MAP type compatibility
The Studio parameter builder shows MAP<STRING,STRING> as the closest available type for the features argument. The UDF accepts Map<String, Object> at runtime; Flink's type system handles the coercion automatically.
MinIO path-style access
Always set 'model.s3.path-style' = 'true' when connecting to MinIO. Virtual-hosted style (bucket.host) requires DNS configuration that is typically not present in local deployments.
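The difference between the two addressing modes comes down to how the client forms the request URL for the same object; a sketch of both shapes:

```python
# Sketch: the two URL shapes an S3 client can generate for one object.
# MinIO without wildcard DNS only answers the path-style form.
def object_url(endpoint, bucket, key, path_style=True):
    scheme, host = endpoint.split("://", 1)
    if path_style:
        # e.g. http://minio:9000/ml-models/...
        return f"{scheme}://{host}/{bucket}/{key}"
    # e.g. http://ml-models.minio:9000/... -- needs DNS for bucket.host
    return f"{scheme}://{bucket}.{host}/{key}"
```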