Overview
Str:::Lab Studio provides two dedicated UI panels that sit above the raw SQL editor
and make working with the ml-inference connector ergonomic:
Feature Engineering Manager:
- Define and name feature columns from stream fields
- Apply built-in transforms (cast, normalise, encode)
- Generate the MAP[key, CAST(col AS STRING), ...] SQL expression
- Preview the feature map with sample data
- Save feature sets as reusable templates

Inference Manager:
- Register the shaded JAR and create the UDF in one click
- Define connector tables (model preload from MinIO)
- Set the score threshold and risk tier labels
- Generate the full inference SQL with CROSS JOIN LATERAL
- Monitor live inference latency and error rate
Feature Engineering Manager
The Feature Engineering Manager lets you build the feature map that will be passed to
ml_score() without writing the MAP[...] SQL by hand. Open it
from the Studio top bar: Studio → Feature Engineering.
Step-by-step: building a fraud detection feature set
In the Source Table dropdown choose transactions (the Kafka
source table you created). The manager reads the table schema and lists all available
columns on the left panel.
Drag or click the columns you want to include. For each column the manager creates a feature entry with three fields: Feature Name, Source Column, and Transform.
| Feature name | Source column | Transform | Notes |
|---|---|---|---|
| amount | amount | CAST to STRING | Engine casts back to float internally |
| hour_of_day | hour_of_day | CAST to STRING | |
| day_of_week | day_of_week | CAST to STRING | |
| is_international | is_international | CAST to STRING | Boolean → 'true'/'false' |
| card_type | card_type | None (already STRING) | |
| currency | currency | None (already STRING) | |
Give the feature set a name: fraud_features_v1. Click Save Template. The manager stores it as a session-level named expression you can reference in future queries.
Click Copy SQL. The manager outputs the following ready-to-paste expression:
-- Feature set: fraud_features_v1
-- Generated by Str:::Lab Studio Feature Engineering Manager
MAP[
'amount', CAST(amount AS STRING),
'hour_of_day', CAST(hour_of_day AS STRING),
'day_of_week', CAST(day_of_week AS STRING),
'is_international', CAST(is_international AS STRING),
'card_type', card_type,
'currency', currency
] AS fraud_features
Click Preview Features. The manager runs a bounded SELECT against the table and shows a sample of the feature maps produced by the expression, so you can verify the values before running the pipeline.
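Under the hood the preview is equivalent to a bounded query along these lines (a sketch only; the exact statement the manager issues and the LIMIT value are assumptions, not shown in the UI):

```sql
-- Sketch of the bounded preview query (LIMIT value is illustrative)
SELECT
  transaction_id,
  MAP[
    'amount', CAST(amount AS STRING),
    'hour_of_day', CAST(hour_of_day AS STRING),
    'is_international', CAST(is_international AS STRING),
    'card_type', card_type
  ] AS fraud_features
FROM transactions
LIMIT 10;
```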
Note: a BOOLEAN column cast to STRING produces 'true' / 'false'
in Flink SQL. If your model was trained expecting '1' / '0',
use CASE WHEN is_international THEN '1' ELSE '0' END in the transform
field instead.
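Applied inside the feature map, that transform would look like this (a minimal sketch using a subset of the fraud_features_v1 columns):

```sql
MAP[
  'amount', CAST(amount AS STRING),
  -- boolean encoded as '1'/'0' instead of 'true'/'false'
  'is_international', CASE WHEN is_international THEN '1' ELSE '0' END,
  'card_type', card_type
] AS fraud_features
```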
Inference Manager
The Inference Manager handles the model-side setup: JAR registration, UDF creation, MinIO connector table, and pipeline query generation. Open it from: Studio → Inference Manager.
Step-by-step: registering the fraud model
In the JAR Management tab click Upload JAR and select
otter-stream-sql-1.0.17-flink-udf.jar. The Studio copies it to
/var/www/udf-jars/ on the Gateway container and runs
ADD JAR automatically.
Note: upload the shaded JAR (the one ending in
*-flink-udf.jar), not the plain thin JAR. The shaded artifact
bundles ml-inference-core so that InferenceException
is on the Gateway classpath at registration time.
Switch to the Register UDF tab. Fill in these three fields and click Register Function:
| Field | Value |
|---|---|
| Function Name | ml_score |
| Language | JAVA |
| Class Name | com.codedstreams.otterstreams.sql.udf.MLInferenceFunction |

The Studio generates and executes this DDL in the active session:
CREATE TEMPORARY FUNCTION IF NOT EXISTS ml_score AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction' LANGUAGE JAVA;
In the Model Sources tab click Add Model and fill in the form:
| Field | Value |
|---|---|
| Model Name | fraud-detector |
| Model Path | s3a://ml-models/fraud-detector/v1/ |
| Format | tensorflow-savedmodel |
| MinIO Endpoint | http://minio:9000 |
| Access Key | minioadmin |
| Secret Key | minioadmin (stored in session config — not persisted to disk) |
| Path Style | true (required for MinIO) |
| Cache TTL | 60 minutes |
Click Load Model. The Studio creates the connector table and verifies the MinIO download:
CREATE TEMPORARY TABLE fraud_model_source (
score DOUBLE,
confidence DOUBLE
) WITH (
'connector' = 'ml-inference',
'model.name' = 'fraud-detector',
'model.path' = 's3a://ml-models/fraud-detector/v1/',
'model.format' = 'tensorflow-savedmodel',
'model.s3.endpoint' = 'http://minio:9000',
'model.s3.access-key' = 'minioadmin',
'model.s3.secret-key' = 'minioadmin',
'model.s3.path-style' = 'true',
'cache.enabled' = 'true',
'cache.ttl-minutes' = '60'
);
DESCRIBE fraud_model_source;
A green status indicator confirms the model was downloaded from MinIO and is
resident in ModelCache. A red indicator means the download failed —
check the TaskManager logs linked from the status tooltip.
In the Pipeline Builder tab:
- Select Source Table: transactions
- Select Feature Set: fraud_features_v1 (from the Feature Engineering Manager)
- Select Model: fraud-detector
- Set Score Column Name: fraud_score
- Set Score Threshold: 0.40
- Configure Risk Tiers: CRITICAL ≥ 0.85, HIGH ≥ 0.65, MEDIUM ≥ 0.40
- Select Sink Table: fraud_alerts
Click Generate SQL. The Inference Manager produces the complete pipeline query shown in the next section.
Click Execute Pipeline. The Studio submits the INSERT job to the Flink
cluster. The Metrics panel in the Inference Manager shows live inference
latency (P50, P95, P99), requests/sec, cache hit rate, and error rate — all
sourced from the Micrometer metrics emitted by ml-inference-core.
SQL Connector Demo — Full Generated SQL
Below is the complete SQL that the Str:::Lab Studio Inference Manager generates and executes. You can paste this directly into any Flink SQL Gateway session or Studio SQL editor to reproduce the end-to-end pipeline without using the UI panels.
-- ── SESSION PROPERTIES ────────────────────────────────────────────────────
-- Set MinIO credentials for the model loader.
-- Re-run after any session renewal.
SET 'otter.minio.endpoint' = 'http://minio:9000';
SET 'otter.minio.access-key' = 'minioadmin';
SET 'otter.minio.secret-key' = 'minioadmin';
SET 'otter.model.cache.max-size' = '20';
SET 'otter.model.cache.ttl-minutes' = '60';
-- List all session properties to confirm
SET;
-- ── JAR AND UDF ───────────────────────────────────────────────────────────
ADD JAR '/var/www/udf-jars/otter-stream-sql-1.0.17-flink-udf.jar';
CREATE TEMPORARY FUNCTION IF NOT EXISTS ml_score
  AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction'
  LANGUAGE JAVA;
SHOW JARS;
SHOW USER FUNCTIONS;
-- ── MODEL PRELOAD ──────────────────────────────────────────────────────────
-- Creating this table triggers the MinIO download and populates ModelCache.
-- Run this BEFORE starting the streaming pipeline to avoid cold-start latency.
CREATE TEMPORARY TABLE fraud_model_source (
score DOUBLE,
confidence DOUBLE
) WITH (
'connector' = 'ml-inference',
'model.name' = 'fraud-detector',
'model.path' = 's3a://ml-models/fraud-detector/v1/',
'model.format' = 'tensorflow-savedmodel',
'model.version' = 'v1',
'model.s3.endpoint' = 'http://minio:9000',
'model.s3.access-key' = 'minioadmin',
'model.s3.secret-key' = 'minioadmin',
'model.s3.path-style' = 'true',
'cache.enabled' = 'true',
'cache.ttl-minutes' = '60',
'batch.size' = '1',
'async.enabled' = 'false'
);
-- Confirm the connector initialised (model loaded from MinIO)
DESCRIBE fraud_model_source;
-- ── KAFKA SOURCE: transactions ─────────────────────────────────────────────
CREATE TEMPORARY TABLE transactions (
transaction_id STRING,
user_id STRING,
merchant_id STRING,
amount DOUBLE,
currency STRING,
card_type STRING,
hour_of_day INT,
day_of_week INT,
is_international BOOLEAN,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'otter-fraud-demo',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
-- ── KAFKA SINK: fraud alerts ────────────────────────────────────────────────
CREATE TEMPORARY TABLE fraud_alerts (
alert_id STRING,
transaction_id STRING,
user_id STRING,
amount DOUBLE,
fraud_score DOUBLE,
risk_tier STRING,
detected_at TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'fraud-alerts',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
-- ── FEATURE ENGINEERING VIEW ──────────────────────────────────────────────
-- This view encapsulates the feature map construction generated by the
-- Str:::Lab Studio Feature Engineering Manager (feature set: fraud_features_v1).
-- Separating feature engineering into a view makes the inference query cleaner
-- and lets you reuse or version the feature set independently.
CREATE TEMPORARY VIEW fraud_feature_view AS
SELECT
transaction_id,
user_id,
amount,
event_time,
-- Feature map: all values cast to STRING for the UDF
-- The inference engine coerces them back to the correct numeric types
MAP[
'amount', CAST(amount AS STRING),
'hour_of_day', CAST(hour_of_day AS STRING),
'day_of_week', CAST(day_of_week AS STRING),
'is_international', CAST(is_international AS STRING),
'card_type', card_type,
'currency', currency
] AS fraud_features
FROM transactions;
-- ── STREAMING INFERENCE PIPELINE ──────────────────────────────────────────
-- Reads from the feature view, calls ml_score once per row via CROSS JOIN
-- LATERAL (prevents double evaluation), applies risk tiering, and writes
-- qualifying events to the fraud_alerts Kafka topic.
INSERT INTO fraud_alerts
SELECT
-- Deterministic alert ID: transaction_id + score bucket
CONCAT(transaction_id, '-', CAST(FLOOR(fraud_score * 10) AS STRING)) AS alert_id,
transaction_id,
user_id,
amount,
fraud_score,
CASE
WHEN fraud_score >= 0.85 THEN 'CRITICAL'
WHEN fraud_score >= 0.65 THEN 'HIGH'
WHEN fraud_score >= 0.40 THEN 'MEDIUM'
ELSE 'LOW'
END AS risk_tier,
CURRENT_TIMESTAMP AS detected_at
FROM fraud_feature_view
-- CROSS JOIN LATERAL evaluates the UDF once and names the result,
-- so we can reference fraud_score in both SELECT and WHERE without
-- calling the function twice per row.
CROSS JOIN LATERAL (
SELECT COALESCE(
ml_score(fraud_features, 'fraud-detector'),
0.0
) AS fraud_score
) scores
WHERE fraud_score >= 0.40;
-- ── VERIFICATION ──────────────────────────────────────────────────────────
-- 1. Confirm JAR is loaded
SHOW JARS;
-- Expected: otter-stream-sql-1.0.17-flink-udf.jar
-- 2. Confirm UDF is registered
SHOW USER FUNCTIONS;
-- Expected: ml_score appears in the list
-- 3. Smoke-test: static feature map — should return a DOUBLE in [0.0, 1.0]
-- A NULL result means the model is NOT in ModelCache (re-run Section 3)
SELECT ml_score(
MAP[
'amount', '350.00',
'hour_of_day', '23',
'day_of_week', '6',
'is_international', 'true',
'card_type', 'CREDIT',
'currency', 'USD'
],
'fraud-detector'
) AS smoke_test_score;
-- 4. Confirm the connector table is healthy (model loaded from MinIO)
DESCRIBE fraud_model_source;
-- 5. Count rows produced to the alert topic in the last 60 s
-- (requires the INSERT pipeline from Section 6 to be running)
SELECT COUNT(*) AS alerts_in_last_60s
FROM fraud_alerts
WHERE detected_at >= CURRENT_TIMESTAMP - INTERVAL '60' SECOND;
Table API / Non-Studio Platforms
The ml-inference connector works on any platform that runs the Flink
Table / SQL API — not just Str:::Lab Studio. Here is how to use it programmatically
via the Java Table API, for platforms that do not have a Studio UI.
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TableApiFraudPipeline {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// ── 1. Register the JAR on the classpath ────────────────────────────
// On non-Studio platforms, add the JAR to the Flink classpath at
// job submission time, or use the SQL ADD JAR statement.
tEnv.executeSql(
"ADD JAR '/opt/flink/usrlib/otter-stream-sql-1.0.17-flink-udf.jar'"
);
// ── 2. Register the UDF ─────────────────────────────────────────────
tEnv.executeSql(
"CREATE TEMPORARY FUNCTION IF NOT EXISTS ml_score " +
"AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction' " +
"LANGUAGE JAVA"
);
// ── 3. Preload the model via connector table ─────────────────────────
tEnv.executeSql(
"CREATE TEMPORARY TABLE fraud_model_source (score DOUBLE) WITH (" +
" 'connector' = 'ml-inference'," +
" 'model.name' = 'fraud-detector'," +
" 'model.path' = 's3a://ml-models/fraud-detector/v1/'," +
" 'model.format' = 'tensorflow-savedmodel'," +
" 'model.s3.endpoint' = 'http://minio:9000'," +
" 'model.s3.access-key' = 'minioadmin'," +
" 'model.s3.secret-key' = 'minioadmin'," +
" 'model.s3.path-style' = 'true'," +
" 'cache.enabled' = 'true'," +
" 'cache.ttl-minutes' = '60'" +
")"
);
// ── 4. Source table ─────────────────────────────────────────────────
tEnv.executeSql(
"CREATE TEMPORARY TABLE transactions (" +
" transaction_id STRING," +
" user_id STRING," +
" amount DOUBLE," +
" hour_of_day INT," +
" is_international BOOLEAN," +
" card_type STRING," +
" currency STRING," +
" event_time TIMESTAMP(3)," +
" WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'transactions'," +
" 'properties.bootstrap.servers' = 'kafka:9092'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'format' = 'json'" +
")"
);
// ── 5. Run inference via Table API expression ────────────────────────
Table scored = tEnv.sqlQuery(
"SELECT " +
" transaction_id," +
" user_id," +
" amount," +
" fraud_score," +
" CASE " +
" WHEN fraud_score >= 0.85 THEN 'CRITICAL' " +
" WHEN fraud_score >= 0.65 THEN 'HIGH' " +
" ELSE 'MEDIUM' " +
" END AS risk_tier " +
"FROM transactions " +
"CROSS JOIN LATERAL (" +
" SELECT COALESCE(ml_score(" +
" MAP['amount', CAST(amount AS STRING)," +
" 'hour_of_day', CAST(hour_of_day AS STRING)," +
" 'is_international', CAST(is_international AS STRING)," +
" 'card_type', card_type," +
" 'currency', currency]," +
" 'fraud-detector'" +
" ), 0.0) AS fraud_score" +
") scores " +
"WHERE fraud_score >= 0.40"
);
// ── 6. Write to Kafka sink ───────────────────────────────────────────
tEnv.executeSql(
"CREATE TEMPORARY TABLE fraud_alerts (" +
" transaction_id STRING, user_id STRING," +
" amount DOUBLE, fraud_score DOUBLE, risk_tier STRING" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'fraud-alerts'," +
" 'properties.bootstrap.servers' = 'kafka:9092'," +
" 'format' = 'json'" +
")"
);
scored.executeInsert("fraud_alerts");
}
}
End-to-End Studio Walkthrough — Quick Reference
1. Run mvn clean package -DskipTests inside otter-stream-sql/. Collect target/otter-stream-sql-1.0.17-flink-udf.jar.
2. Studio → Inference Manager → JAR Management → Upload JAR.
3. Studio → Feature Engineering → add columns, set transforms, name and save as fraud_features_v1.
4. Studio → Inference Manager → Register UDF → fill three fields → Register Function.
5. Studio → Inference Manager → Model Sources → Add Model → fill MinIO details → Load Model. Wait for green status.
6. Studio → Inference Manager → Pipeline Builder → select source/feature set/model/sink → Generate SQL → Execute Pipeline.
7. Run the Section 7 verification queries. Watch the Metrics panel for live inference latency and cache hit rate.
Studio Tips
If ml_score returns NULL, the model is not resident in ModelCache. Re-run
the connector table creation (Section 3). Then check the TaskManager logs for any
MinIO download exception: verify the endpoint, credentials, and
model.s3.path-style = 'true'.
Calling ml_score(...) in both SELECT and WHERE without LATERAL
causes Flink to call the UDF twice per row, doubling inference cost. Always use
CROSS JOIN LATERAL (...) scores to name the score once.
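For contrast, a minimal sketch using the names from the pipeline above:

```sql
-- Anti-pattern: ml_score runs twice per row (once in SELECT, once in WHERE)
SELECT transaction_id,
       ml_score(fraud_features, 'fraud-detector') AS fraud_score
FROM fraud_feature_view
WHERE ml_score(fraud_features, 'fraud-detector') >= 0.40;

-- Preferred: LATERAL names the result once; SELECT and WHERE reuse it
SELECT transaction_id, fraud_score
FROM fraud_feature_view
CROSS JOIN LATERAL (
  SELECT ml_score(fraud_features, 'fraud-detector') AS fraud_score
) scores
WHERE fraud_score >= 0.40;
```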
Wrapping the feature map in a CREATE TEMPORARY VIEW (Section 5) keeps
the inference pipeline query clean and lets you version or A/B test feature sets
independently of the model.
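For instance, a second feature set can live alongside the first as its own view. The fraud_feature_view_v2 name and the extra amount_log key below are illustrative assumptions, not output of the Studio:

```sql
CREATE TEMPORARY VIEW fraud_feature_view_v2 AS
SELECT
  transaction_id,
  user_id,
  amount,
  event_time,
  MAP[
    'amount', CAST(amount AS STRING),
    -- hypothetical extra feature for an A/B experiment
    'amount_log', CAST(LN(amount + 1) AS STRING),
    'hour_of_day', CAST(hour_of_day AS STRING),
    'currency', currency
  ] AS fraud_features
FROM transactions;
```

Pointing the pipeline at the v2 view changes the features the model receives without touching JAR registration, UDF creation, or the connector table.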
Always set 'model.s3.path-style' = 'true' for MinIO. Virtual-hosted-style
addressing (bucket.host) requires DNS configuration that is typically absent in
local and staging MinIO deployments.