Otter Streams v1.0.17
Str:::Lab Studio

Overview

Str:::Lab Studio provides two dedicated UI panels that sit above the raw SQL editor and make working with the ml-inference connector ergonomic:

Feature Engineering Manager
  • Define and name feature columns from stream fields
  • Apply built-in transforms (cast, normalise, encode)
  • Generate the MAP[key, CAST(col AS STRING), ...] SQL expression
  • Preview feature map with sample data
  • Save feature sets as reusable templates
Inference Manager
  • Register the shaded JAR and create the UDF in one click
  • Define connector tables (model preload from MinIO)
  • Set score threshold and risk tier labels
  • Generate the full inference SQL with CROSS JOIN LATERAL
  • Monitor live inference latency and error rate
Note
Both panels generate standard Flink SQL. Everything shown here can also be run by pasting the generated SQL directly into the Studio SQL editor, or into any other Flink SQL Gateway session. See the Table API / Non-Studio section for the raw Java Table API equivalent.

Feature Engineering Manager

The Feature Engineering Manager lets you build the feature map that will be passed to ml_score() without writing the MAP[...] SQL by hand. Open it from the Studio top bar: Studio → Feature Engineering.

Step-by-step: building a fraud detection feature set

Select the source table

In the Source Table dropdown choose transactions (the Kafka source table you created). The manager reads the table schema and lists all available columns on the left panel.

Add features from the column list

Drag or click the columns you want to include. For each column the manager creates a feature entry with three fields: Feature Name, Source Column, and Transform.

| Feature name     | Source column    | Transform             | Notes                                  |
|------------------|------------------|-----------------------|----------------------------------------|
| amount           | amount           | CAST to STRING        | Engine casts back to float internally  |
| hour_of_day      | hour_of_day      | CAST to STRING        |                                        |
| day_of_week      | day_of_week      | CAST to STRING        |                                        |
| is_international | is_international | CAST to STRING        | Boolean → 'true'/'false'               |
| card_type        | card_type        | None (already STRING) |                                        |
| currency         | currency         | None (already STRING) |                                        |
Name and save the feature set

Give the feature set a name: fraud_features_v1. Click Save Template. The manager stores it as a session-level named expression you can reference in future queries.

Copy the generated MAP expression

Click Copy SQL. The manager outputs the following ready-to-paste expression:

SQL — Generated by Feature Engineering Manager
-- Feature set: fraud_features_v1
-- Generated by Str:::Lab Studio Feature Engineering Manager
MAP[
    'amount',           CAST(amount           AS STRING),
    'hour_of_day',      CAST(hour_of_day      AS STRING),
    'day_of_week',      CAST(day_of_week      AS STRING),
    'is_international', CAST(is_international AS STRING),
    'card_type',        card_type,
    'currency',         currency
] AS fraud_features
Preview with sample data

Click Preview Features. The manager runs a bounded SELECT against the table and shows a sample of the feature maps produced by the expression, so you can verify the values before running the pipeline.
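The exact query the panel issues is internal to Studio, but a hand-written equivalent is easy to run in the SQL editor. The sketch below (column names taken from the fraud_features_v1 feature set; the LIMIT makes the streaming SELECT terminate after a few rows) shows the same idea:

```sql
-- Bounded preview of the generated feature map (sketch, not the exact
-- query Studio runs): inspect a handful of feature maps before wiring
-- the expression into the inference pipeline.
SELECT
    transaction_id,
    MAP[
        'amount',           CAST(amount           AS STRING),
        'hour_of_day',      CAST(hour_of_day      AS STRING),
        'is_international', CAST(is_international AS STRING)
    ] AS fraud_features_preview
FROM transactions
LIMIT 5;
```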

Warning
String encoding of booleans produces 'true' / 'false' in Flink SQL. If your model was trained expecting '1' / '0', use CASE WHEN is_international THEN '1' ELSE '0' END in the transform field instead.
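If your model does expect numeric booleans, the full feature map with that CASE transform substituted in would look like this (only the is_international entry changes from the generated expression above):

```sql
MAP[
    'amount',           CAST(amount      AS STRING),
    'hour_of_day',      CAST(hour_of_day AS STRING),
    'day_of_week',      CAST(day_of_week AS STRING),
    -- Encode the boolean as '1' / '0' instead of 'true' / 'false'
    'is_international', CASE WHEN is_international THEN '1' ELSE '0' END,
    'card_type',        card_type,
    'currency',         currency
] AS fraud_features
```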

Inference Manager

The Inference Manager handles the model-side setup: JAR registration, UDF creation, MinIO connector table, and pipeline query generation. Open it from: Studio → Inference Manager.

Step-by-step: registering the fraud model

Upload the shaded JAR

In the JAR Management tab click Upload JAR and select otter-stream-sql-1.0.17-flink-udf.jar. The Studio copies it to /var/www/udf-jars/ on the Gateway container and runs ADD JAR automatically.

Warning
You must upload the shaded classifier JAR (*-flink-udf.jar), not the plain thin JAR. The shaded artifact bundles ml-inference-core so that InferenceException is on the Gateway classpath at registration time.
Register the UDF

Switch to the Register UDF tab. Fill in these three fields and click Register Function:

Function Name
ml_score
Language
JAVA
Class Path
com.codedstreams.otterstreams.sql.udf.MLInferenceFunction

The Studio generates and executes this DDL in the active session:

SQL
CREATE TEMPORARY FUNCTION IF NOT EXISTS ml_score
AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction'
LANGUAGE JAVA;
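Because the DDL uses IF NOT EXISTS, re-running it after uploading a newer JAR is a no-op: the session keeps the old function. To pick up a new implementation in the same session, drop the function first. This is standard Flink SQL, sketched here for the ml_score registration:

```sql
-- Replace an existing registration after uploading a new JAR version.
DROP TEMPORARY FUNCTION IF EXISTS ml_score;

CREATE TEMPORARY FUNCTION ml_score
AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction'
LANGUAGE JAVA;
```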
Configure the MinIO model source

In the Model Sources tab click Add Model and fill in the form:

| Field          | Value                                                      |
|----------------|------------------------------------------------------------|
| Model Name     | fraud-detector                                             |
| Model Path     | s3a://ml-models/fraud-detector/v1/                         |
| Format         | tensorflow-savedmodel                                      |
| MinIO Endpoint | http://minio:9000                                          |
| Access Key     | minioadmin                                                 |
| Secret Key     | minioadmin (stored in session config — not persisted to disk) |
| Path Style     | true (required for MinIO)                                  |
| Cache TTL      | 60 minutes                                                 |

Click Load Model. The Studio creates the connector table and verifies the MinIO download:

SQL — Generated by Inference Manager — Model Source
CREATE TEMPORARY TABLE fraud_model_source (
    score      DOUBLE,
    confidence DOUBLE
) WITH (
    'connector'           = 'ml-inference',
    'model.name'          = 'fraud-detector',
    'model.path'          = 's3a://ml-models/fraud-detector/v1/',
    'model.format'        = 'tensorflow-savedmodel',
    'model.s3.endpoint'   = 'http://minio:9000',
    'model.s3.access-key' = 'minioadmin',
    'model.s3.secret-key' = 'minioadmin',
    'model.s3.path-style' = 'true',
    'cache.enabled'       = 'true',
    'cache.ttl-minutes'   = '60'
);

DESCRIBE fraud_model_source;

A green status indicator confirms the model was downloaded from MinIO and is resident in ModelCache. A red indicator means the download failed — check the TaskManager logs linked from the status tooltip.
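On a red indicator, fix the endpoint or credentials and then recreate the connector table, since table creation is what triggers the MinIO download. A minimal recovery sketch:

```sql
-- Remove the failed connector table, then re-run the
-- CREATE TEMPORARY TABLE fraud_model_source DDL shown above
-- with corrected options to re-trigger the download.
DROP TEMPORARY TABLE IF EXISTS fraud_model_source;
```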

Configure the inference pipeline

In the Pipeline Builder tab:

  • Select Source Table: transactions
  • Select Feature Set: fraud_features_v1 (from the Feature Engineering Manager)
  • Select Model: fraud-detector
  • Set Score Column Name: fraud_score
  • Set Score Threshold: 0.40
  • Configure Risk Tiers: CRITICAL ≥ 0.85, HIGH ≥ 0.65, MEDIUM ≥ 0.40
  • Select Sink Table: fraud_alerts

Click Generate SQL. The Inference Manager produces the complete pipeline query shown in the next section.

Run and monitor

Click Execute Pipeline. The Studio submits the INSERT job to the Flink cluster. The Metrics panel in the Inference Manager shows live inference latency (P50, P95, P99), requests/sec, cache hit rate, and error rate — all sourced from the Micrometer metrics emitted by ml-inference-core.

SQL Connector Demo — Full Generated SQL

Below is the complete SQL that the Str:::Lab Studio Inference Manager generates and executes. You can paste this directly into any Flink SQL Gateway session or Studio SQL editor to reproduce the end-to-end pipeline without using the UI panels.

Section 1 — Session Properties
SQL
-- ── SESSION PROPERTIES ────────────────────────────────────────────────────
-- Set MinIO credentials for the model loader.
-- Re-run after any session renewal.
SET 'otter.minio.endpoint'          = 'http://minio:9000';
SET 'otter.minio.access-key'        = 'minioadmin';
SET 'otter.minio.secret-key'        = 'minioadmin';
SET 'otter.model.cache.max-size'    = '20';
SET 'otter.model.cache.ttl-minutes' = '60';

SET;
Section 2 — JAR + UDF Registration
SQL
-- ── JAR AND UDF ────────────────────────────────────────────────────────────
ADD JAR '/var/www/udf-jars/otter-stream-sql-1.0.17-flink-udf.jar';

CREATE TEMPORARY FUNCTION IF NOT EXISTS ml_score
AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction'
LANGUAGE JAVA;

SHOW JARS;
SHOW USER FUNCTIONS;
Section 3 — Model Preload (MinIO connector table)
SQL
-- ── MODEL PRELOAD ──────────────────────────────────────────────────────────
-- Creating this table triggers the MinIO download and populates ModelCache.
-- Run this BEFORE starting the streaming pipeline to avoid cold-start latency.
CREATE TEMPORARY TABLE fraud_model_source (
    score      DOUBLE,
    confidence DOUBLE
) WITH (
    'connector'           = 'ml-inference',
    'model.name'          = 'fraud-detector',
    'model.path'          = 's3a://ml-models/fraud-detector/v1/',
    'model.format'        = 'tensorflow-savedmodel',
    'model.version'       = 'v1',
    'model.s3.endpoint'   = 'http://minio:9000',
    'model.s3.access-key' = 'minioadmin',
    'model.s3.secret-key' = 'minioadmin',
    'model.s3.path-style' = 'true',
    'cache.enabled'       = 'true',
    'cache.ttl-minutes'   = '60',
    'batch.size'          = '1',
    'async.enabled'       = 'false'
);

-- Confirm the connector initialised (model loaded from MinIO)
DESCRIBE fraud_model_source;
Section 4 — Source and Sink Tables
SQL
-- ── KAFKA SOURCE: transactions ─────────────────────────────────────────────
CREATE TEMPORARY TABLE transactions (
    transaction_id   STRING,
    user_id          STRING,
    merchant_id      STRING,
    amount           DOUBLE,
    currency         STRING,
    card_type        STRING,
    hour_of_day      INT,
    day_of_week      INT,
    is_international BOOLEAN,
    event_time       TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector'                      = 'kafka',
    'topic'                          = 'transactions',
    'properties.bootstrap.servers'   = 'kafka:9092',
    'properties.group.id'            = 'otter-fraud-demo',
    'scan.startup.mode'              = 'latest-offset',
    'format'                         = 'json',
    'json.timestamp-format.standard' = 'ISO-8601',
    'json.fail-on-missing-field'     = 'false',
    'json.ignore-parse-errors'       = 'true'
);

-- ── KAFKA SINK: fraud alerts ────────────────────────────────────────────────
CREATE TEMPORARY TABLE fraud_alerts (
    alert_id       STRING,
    transaction_id STRING,
    user_id        STRING,
    amount         DOUBLE,
    fraud_score    DOUBLE,
    risk_tier      STRING,
    detected_at    TIMESTAMP(3)
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'fraud-alerts',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format'                       = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);
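The fraud_alerts table above is write-oriented (latest-offset reads would miss alerts already produced). To inspect what the pipeline has written, one option is a second table over the same topic that reads from the earliest offset; a sketch using standard Kafka connector options:

```sql
-- Read-back table over the fraud-alerts topic for ad-hoc inspection.
CREATE TEMPORARY TABLE fraud_alerts_readback (
    alert_id       STRING,
    transaction_id STRING,
    user_id        STRING,
    amount         DOUBLE,
    fraud_score    DOUBLE,
    risk_tier      STRING,
    detected_at    TIMESTAMP(3)
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'fraud-alerts',
    'properties.bootstrap.servers' = 'kafka:9092',
    'scan.startup.mode'            = 'earliest-offset',
    'format'                       = 'json'
);

SELECT * FROM fraud_alerts_readback LIMIT 10;
```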
Section 5 — Feature Engineering View
SQL
-- ── FEATURE ENGINEERING VIEW ──────────────────────────────────────────────
-- This view encapsulates the feature map construction generated by the
-- Str:::Lab Studio Feature Engineering Manager (feature set: fraud_features_v1).
-- Separating feature engineering into a view makes the inference query cleaner
-- and lets you reuse or version the feature set independently.

CREATE TEMPORARY VIEW fraud_feature_view AS
SELECT
    transaction_id,
    user_id,
    amount,
    event_time,
    -- Feature map: all values cast to STRING for the UDF
    -- The inference engine coerces them back to the correct numeric types
    MAP[
        'amount',           CAST(amount           AS STRING),
        'hour_of_day',      CAST(hour_of_day      AS STRING),
        'day_of_week',      CAST(day_of_week      AS STRING),
        'is_international', CAST(is_international AS STRING),
        'card_type',        card_type,
        'currency',         currency
    ] AS fraud_features
FROM transactions;
Section 6 — Streaming Inference Pipeline
SQL
-- ── STREAMING INFERENCE PIPELINE ──────────────────────────────────────────
-- Reads from the feature view, calls ml_score once per row via CROSS JOIN
-- LATERAL (prevents double evaluation), applies risk tiering, and writes
-- qualifying events to the fraud_alerts Kafka topic.

INSERT INTO fraud_alerts
SELECT
    -- Deterministic alert ID: transaction_id + score bucket
    CONCAT(transaction_id, '-', CAST(FLOOR(fraud_score * 10) AS STRING)) AS alert_id,
    transaction_id,
    user_id,
    amount,
    fraud_score,
    CASE
        WHEN fraud_score >= 0.85 THEN 'CRITICAL'
        WHEN fraud_score >= 0.65 THEN 'HIGH'
        WHEN fraud_score >= 0.40 THEN 'MEDIUM'
        ELSE                          'LOW'
    END AS risk_tier,
    CURRENT_TIMESTAMP AS detected_at
FROM fraud_feature_view
-- CROSS JOIN LATERAL evaluates the UDF once and names the result,
-- so we can reference fraud_score in both SELECT and WHERE without
-- calling the function twice per row.
CROSS JOIN LATERAL (
    SELECT COALESCE(
        ml_score(fraud_features, 'fraud-detector'),
        0.0
    ) AS fraud_score
) scores
WHERE fraud_score >= 0.40;
Section 7 — Verification
SQL
-- ── VERIFICATION ──────────────────────────────────────────────────────────

-- 1. Confirm JAR is loaded
SHOW JARS;
-- Expected: otter-stream-sql-1.0.17-flink-udf.jar

-- 2. Confirm UDF is registered
SHOW USER FUNCTIONS;
-- Expected: ml_score appears in the list

-- 3. Smoke-test: static feature map — should return a DOUBLE in [0.0, 1.0]
--    A NULL result means the model is NOT in ModelCache (re-run Section 3)
SELECT ml_score(
    MAP[
        'amount',           '350.00',
        'hour_of_day',      '23',
        'day_of_week',      '6',
        'is_international', 'true',
        'card_type',        'CREDIT',
        'currency',         'USD'
    ],
    'fraud-detector'
) AS smoke_test_score;

-- 4. Confirm the connector table is healthy (model loaded from MinIO)
DESCRIBE fraud_model_source;

-- 5. Count rows produced to the alert topic in the last 60 s
--    (requires the INSERT pipeline from Section 6 to be running; note that
--     this runs as a continuous streaming query, so cancel it when finished)
SELECT COUNT(*) AS alerts_in_last_60s
FROM fraud_alerts
WHERE detected_at >= CURRENT_TIMESTAMP - INTERVAL '60' SECOND;

Table API / Non-Studio Platforms

The ml-inference connector works on any platform that runs the Flink Table / SQL API — not just Str:::Lab Studio. Here is how to use it programmatically via the Java Table API, for platforms that do not have a Studio UI.

TableApiFraudPipeline.java
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TableApiFraudPipeline {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv    = StreamTableEnvironment.create(env);

        // ── 1. Register the JAR on the classpath ────────────────────────────
        //    On non-Studio platforms, add the JAR to the Flink classpath at
        //    job submission time, or use the SQL ADD JAR statement.
        tEnv.executeSql(
            "ADD JAR '/opt/flink/usrlib/otter-stream-sql-1.0.17-flink-udf.jar'"
        );

        // ── 2. Register the UDF ─────────────────────────────────────────────
        tEnv.executeSql(
            "CREATE TEMPORARY FUNCTION IF NOT EXISTS ml_score " +
            "AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction' " +
            "LANGUAGE JAVA"
        );

        // ── 3. Preload the model via connector table ─────────────────────────
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE fraud_model_source (score DOUBLE, confidence DOUBLE) WITH (" +
            "  'connector'           = 'ml-inference',"           +
            "  'model.name'          = 'fraud-detector',"         +
            "  'model.path'          = 's3a://ml-models/fraud-detector/v1/'," +
            "  'model.format'        = 'tensorflow-savedmodel',"  +
            "  'model.s3.endpoint'   = 'http://minio:9000',"      +
            "  'model.s3.access-key' = 'minioadmin',"             +
            "  'model.s3.secret-key' = 'minioadmin',"             +
            "  'model.s3.path-style' = 'true',"                   +
            "  'cache.enabled'       = 'true',"                   +
            "  'cache.ttl-minutes'   = '60'"                      +
            ")"
        );

        // ── 4. Source table ─────────────────────────────────────────────────
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE transactions (" +
            "  transaction_id   STRING,"  +
            "  user_id          STRING,"  +
            "  amount           DOUBLE,"  +
            "  hour_of_day      INT,"     +
            "  is_international BOOLEAN," +
            "  card_type        STRING,"  +
            "  currency         STRING,"  +
            "  event_time       TIMESTAMP(3)," +
            "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
            ") WITH (" +
            "  'connector' = 'kafka',"                                    +
            "  'topic'     = 'transactions',"                             +
            "  'properties.bootstrap.servers' = 'kafka:9092',"           +
            "  'scan.startup.mode' = 'latest-offset',"                   +
            "  'format' = 'json'"                                         +
            ")"
        );

        // ── 5. Run inference via Table API expression ────────────────────────
        Table scored = tEnv.sqlQuery(
            "SELECT " +
            "  transaction_id," +
            "  user_id," +
            "  amount," +
            "  fraud_score," +
            "  CASE " +
            "    WHEN fraud_score >= 0.85 THEN 'CRITICAL' " +
            "    WHEN fraud_score >= 0.65 THEN 'HIGH' " +
            "    ELSE 'MEDIUM' " +
            "  END AS risk_tier " +
            "FROM transactions " +
            "CROSS JOIN LATERAL (" +
            "  SELECT COALESCE(ml_score(" +
            "    MAP['amount', CAST(amount AS STRING)," +
            "        'hour_of_day', CAST(hour_of_day AS STRING)," +
            "        'is_international', CAST(is_international AS STRING)," +
            "        'card_type', card_type," +
            "        'currency', currency]," +
            "    'fraud-detector'" +
            "  ), 0.0) AS fraud_score" +
            ") scores " +
            "WHERE fraud_score >= 0.40"
        );

        // ── 6. Write to Kafka sink ───────────────────────────────────────────
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE fraud_alerts (" +
            "  transaction_id STRING, user_id STRING," +
            "  amount DOUBLE, fraud_score DOUBLE, risk_tier STRING" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic'     = 'fraud-alerts'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'format'    = 'json'" +
            ")"
        );

        scored.executeInsert("fraud_alerts");
    }
}

End-to-End Studio Walkthrough — Quick Reference

1. Build the shaded JAR (once per version): run mvn clean package -DskipTests inside otter-stream-sql/. Collect target/otter-stream-sql-1.0.17-flink-udf.jar.
2. Upload the JAR in Studio: Studio → Inference Manager → JAR Management → Upload JAR.
3. Define the feature set: Studio → Feature Engineering → add columns, set transforms, name and save as fraud_features_v1.
4. Register the UDF: Studio → Inference Manager → Register UDF → fill three fields → Register Function.
5. Load the model from MinIO: Studio → Inference Manager → Model Sources → Add Model → fill MinIO details → Load Model. Wait for green status.
6. Build and run the pipeline: Studio → Inference Manager → Pipeline Builder → select source/feature set/model/sink → Generate SQL → Execute Pipeline.
7. Smoke-test and monitor: run the Section 7 verification queries. Watch the Metrics panel for live inference latency and cache hit rate.

Studio Tips

Session renewal resets JAR + UDF
After any disconnect or session renewal, re-run Sections 1 and 2 (or use the Inference Manager buttons). The connector table from Section 3 also needs to be recreated — its creation is what triggers the MinIO download.
NULL score = cache miss
If the smoke test returns NULL the model is not in ModelCache. Re-run the connector table creation (Section 3). Then check TaskManager logs for any MinIO download exception — verify endpoint, credentials, and model.s3.path-style = 'true'.
Feature Engineering templates persist in session
Feature set templates are stored in the Studio session state. They are lost on session renewal. Export them as SQL views before ending a session so you can recreate them quickly next time.
CROSS JOIN LATERAL avoids double evaluation
Referencing ml_score(...) in both SELECT and WHERE without LATERAL causes Flink to call the UDF twice per row, doubling inference cost. Always use CROSS JOIN LATERAL (...) scores to name the score once.
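For contrast, this is the anti-pattern the tip warns against. Because ml_score appears in both clauses, the planner can invoke the UDF twice per row (unless it happens to deduplicate the deterministic call):

```sql
-- Anti-pattern: ml_score referenced in both SELECT and WHERE,
-- potentially doubling inference cost per row. Prefer the
-- CROSS JOIN LATERAL form from Section 6.
SELECT
    transaction_id,
    ml_score(fraud_features, 'fraud-detector') AS fraud_score
FROM fraud_feature_view
WHERE ml_score(fraud_features, 'fraud-detector') >= 0.40;
```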
Feature view separates concerns
Creating the feature map as a CREATE TEMPORARY VIEW (Section 5) keeps the inference pipeline query clean and lets you version or A/B test feature sets independently of the model.
Use path-style for MinIO
Always set 'model.s3.path-style' = 'true'. Virtual-hosted style (bucket.host) requires DNS configuration that is typically absent in local and staging MinIO deployments.