MinIO Pipeline Demo
This is the reference integration test for Str:::Lab Studio. It demonstrates the complete lifecycle:
uploading a model to MinIO, loading it into the Otter Streams ModelCache via a
pre-load job, registering the UDF, and running a streaming inference query.
Studio runs ADD JAR and CREATE FUNCTION for you if you use the Register UDF tab;
these blocks show the raw SQL equivalents for reference and for pipeline automation.
Prerequisites
| Requirement | Value / Notes |
|---|---|
| Shaded JAR uploaded | otter-stream-sql-1.0.17-flink-udf.jar in /var/www/udf-jars/ |
| MinIO endpoint | Accessible from the Flink TaskManagers (not just from the browser) |
| MinIO bucket | ml-models (adjust to your bucket name) |
| Model path | ml-models/fraud-detector/v1/ — contains saved_model.pb + variables/ |
| Flink session | Active gateway session in Studio — check the session indicator |
| Kafka topics | transactions (input), fraud-alerts (output) |
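The "Model path" prerequisite can be satisfied with a short script. A minimal sketch using the `minio` Python client (`pip install minio`); the endpoint and credentials mirror the demo defaults and are assumptions about your deployment:

```python
# Sketch: upload a TensorFlow SavedModel directory to MinIO so the
# preload job can find it. Endpoint/credentials below are the demo
# defaults, not guaranteed to match your deployment.
from pathlib import Path

BUCKET = "ml-models"
PREFIX = "fraud-detector/v1"

def object_keys(model_dir: str, prefix: str = PREFIX) -> dict:
    """Map every file under model_dir to the MinIO object key it should get."""
    root = Path(model_dir)
    return {
        f"{prefix}/{p.relative_to(root).as_posix()}": str(p)
        for p in root.rglob("*")
        if p.is_file()
    }

def upload(model_dir: str) -> None:
    from minio import Minio  # lazy import keeps object_keys() dependency-free
    client = Minio("minio:9000", access_key="minioadmin",
                   secret_key="minioadmin", secure=False)
    if not client.bucket_exists(BUCKET):
        client.make_bucket(BUCKET)
    for key, path in object_keys(model_dir).items():
        client.fput_object(BUCKET, key, path)

if __name__ == "__main__":
    upload("./fraud-detector/v1")  # directory containing saved_model.pb + variables/
```

The key layout produced by `object_keys` matches the `ml-models/fraud-detector/v1/` prefix the preload table expects.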
MinIO Configuration
The otter-stream-sql module uses the MinIO Java client bundled inside the shaded JAR
to download model files at engine initialisation time. Pass the connection details through
the DDL WITH options or through Flink's configuration properties.
Set session-level MinIO properties
These SET statements write into the Flink session configuration and are picked up
by the MinioModelLoader inside the SQL module. Run them once at the start of
every Studio session.
-- ── Section 1: MinIO connection properties ──────────────────────────────
-- Adjust endpoint, credentials, and bucket to match your deployment.
-- These are session-scoped: re-run after any session renewal.
SET 'otter.minio.endpoint' = 'http://minio:9000';
SET 'otter.minio.access-key' = 'minioadmin';
SET 'otter.minio.secret-key' = 'minioadmin';
SET 'otter.minio.bucket' = 'ml-models';
SET 'otter.model.cache.max-size' = '20';
SET 'otter.model.cache.ttl-minutes' = '60';

-- Verify the properties are visible to the session
SET;
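The two cache properties above bound the ModelCache by entry count and age. A conceptual Python sketch of those semantics (LRU eviction plus per-entry TTL); class and method names are illustrative, not the actual Otter Streams API:

```python
# Conceptual model of what 'otter.model.cache.max-size' and
# 'otter.model.cache.ttl-minutes' control: bounded size with LRU
# eviction, plus expiry by insertion age. Illustrative only.
import time
from collections import OrderedDict

class TtlLruCache:
    def __init__(self, max_size: int, ttl_seconds: float, clock=time.monotonic):
        self.max_size = max_size
        self.ttl = ttl_seconds
        self.clock = clock
        self._entries = OrderedDict()  # key -> (value, inserted_at)

    def put(self, key, value):
        self._entries.pop(key, None)
        self._entries[key] = (value, self.clock())
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # evict least recently used

    def get(self, key):
        item = self._entries.get(key)
        if item is None:
            return None
        value, inserted_at = item
        if self.clock() - inserted_at > self.ttl:
            del self._entries[key]  # expired: caller must reload from MinIO
            return None
        self._entries.move_to_end(key)  # refresh LRU position
        return value
```

An expired or evicted entry behaves like a cache miss: the next `ml_score` call triggers a fresh MinIO download.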
JAR and UDF Registration
If you are running this demo manually through the Studio SQL editor rather than the UDF Manager, execute the following block to load the JAR and register the function. The Studio UDF Manager does this automatically when you click Register Function.
-- ── Section 2: Load JAR and register scalar UDF ─────────────────────────
-- Load the shaded JAR onto the Gateway session classpath.
-- Path must exist on the SQL Gateway container filesystem.
ADD JAR '/var/www/udf-jars/otter-stream-sql-1.0.17-flink-udf.jar';

-- Confirm the JAR is visible
SHOW JARS;

-- Register the ML inference scalar function
CREATE TEMPORARY FUNCTION IF NOT EXISTS ml_score
  AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction'
  LANGUAGE JAVA;

-- Confirm registration
SHOW USER FUNCTIONS;
Model Preloading
The ModelCache is populated lazily: the first call to
ml_score(features, 'fraud-detector') that finds no cached engine will attempt to
load the model from MinIO. For production pipelines this cold-start latency is undesirable.
The preferred pattern is to declare a connector table that forces the engine to load during
table creation, before any streaming query begins.
-- ── Section 3: Preload the model into ModelCache ────────────────────────
-- Creating this connector table triggers MinIO download and engine init.
-- The table itself is not queried directly in this demo; it exists solely
-- to warm the ModelCache before the streaming pipeline starts.
CREATE TEMPORARY TABLE fraud_model_source (
score DOUBLE,
confidence DOUBLE
) WITH (
'connector' = 'ml-inference',
-- Logical name used in ml_score(features, '<model-name>')
'model.name' = 'fraud-detector',
-- MinIO path: s3a://<bucket>/<prefix>/
-- The loader downloads saved_model.pb + variables/ from this prefix.
'model.path' = 's3a://ml-models/fraud-detector/v1/',
'model.format' = 'tensorflow-savedmodel',
'model.version' = 'v1',
-- MinIO / S3-compatible storage settings
'model.s3.endpoint' = 'http://minio:9000',
'model.s3.access-key'= 'minioadmin',
'model.s3.secret-key'= 'minioadmin',
'model.s3.path-style'= 'true',
-- Cache behaviour
'cache.enabled' = 'true',
'cache.ttl-minutes' = '60',
-- Inference settings
'batch.size' = '1',
'async.enabled' = 'false'
);
-- A lightweight describe confirms the connector initialised without error.
-- If the model failed to download you will see an error here rather than
-- silently at inference time.
DESCRIBE fraud_model_source;
Source and Sink Tables
-- ── Section 4: Kafka source table (transactions) ─────────────────────────
-- Adjust bootstrap.servers and topic to match your Kafka deployment.
CREATE TEMPORARY TABLE transactions (
transaction_id STRING,
user_id STRING,
merchant_id STRING,
amount DOUBLE,
currency STRING,
card_type STRING,
hour_of_day INT,
day_of_week INT,
is_international BOOLEAN,
-- Feature map pre-assembled upstream (or computed in the query below)
features MAP<STRING, STRING>,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'otter-fraud-demo',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.timestamp-format.standard'= 'ISO-8601',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
-- ── Kafka sink table (fraud alerts) ───────────────────────────────────────
CREATE TEMPORARY TABLE fraud_alerts (
alert_id STRING,
transaction_id STRING,
user_id STRING,
amount DOUBLE,
fraud_score DOUBLE,
risk_tier STRING,
detected_at TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'fraud-alerts',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
Full Streaming Pipeline
This is the core inference pipeline. It reads from the transactions Kafka topic,
builds a feature map inline, calls ml_score to obtain a fraud probability,
classifies the result into risk tiers, and writes qualifying events to fraud-alerts.
Note that the features map is assembled using MAP[...] syntax so
the pipeline works even when the upstream producer has not pre-assembled it. In practice you
would either pre-assemble the map in the producer or use a Flink SQL computed column.
-- ── Section 5: Real-time fraud inference pipeline ───────────────────────
-- Writes scored events with risk_score > 0.40 to the fraud-alerts topic.
-- Adjust the threshold to suit your operating point.
INSERT INTO fraud_alerts
SELECT
-- Deterministic alert ID from transaction ID + score bucket
CONCAT(transaction_id, '-', CAST(CAST(FLOOR(fraud_score * 10) AS INT) AS STRING))
AS alert_id,
transaction_id,
user_id,
amount,
fraud_score,
-- Risk tier classification downstream of the model score
CASE
WHEN fraud_score >= 0.85 THEN 'CRITICAL'
WHEN fraud_score >= 0.65 THEN 'HIGH'
WHEN fraud_score >= 0.40 THEN 'MEDIUM'
ELSE 'LOW'
END AS risk_tier,
CURRENT_TIMESTAMP AS detected_at
FROM (
SELECT
transaction_id,
user_id,
amount,
-- Build feature map from individual columns.
-- All values are cast to STRING because the UDF accepts MAP<STRING,STRING>.
-- The inference engine casts them back to the correct numeric types internally.
MAP[
'amount', CAST(amount AS STRING),
'hour_of_day', CAST(hour_of_day AS STRING),
'day_of_week', CAST(day_of_week AS STRING),
'is_international', CAST(is_international AS STRING),
'card_type', card_type,
'currency', currency
] AS feature_map,
event_time
FROM transactions
) enriched
-- Call the registered UDF; COALESCE guards against NULL when the
-- model name is not yet in ModelCache (should not happen after preload).
CROSS JOIN LATERAL (
SELECT COALESCE(
ml_score(feature_map, 'fraud-detector'),
0.0
) AS fraud_score
) scores
WHERE fraud_score >= 0.40;
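The alert-ID and risk-tier expressions above are deterministic mappings, so their thresholds can be unit-tested outside Flink before editing the SQL. A pure-Python mirror (function names are ours, not part of Otter Streams):

```python
# Mirror of the CASE expression and the alert-ID concatenation in the
# fraud pipeline, for testing threshold changes in isolation.
import math

def risk_tier(fraud_score: float) -> str:
    if fraud_score >= 0.85:
        return "CRITICAL"
    if fraud_score >= 0.65:
        return "HIGH"
    if fraud_score >= 0.40:
        return "MEDIUM"
    return "LOW"

def alert_id(transaction_id: str, fraud_score: float) -> str:
    # Deterministic: transaction ID plus the tenths bucket of the score.
    return f"{transaction_id}-{math.floor(fraud_score * 10)}"
```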
The CROSS JOIN LATERAL pattern evaluates the scalar UDF once per row and makes
the result available as a named column (fraud_score) that can be referenced in
both the SELECT list and the WHERE clause without calling the function twice.
It is equivalent to a WITH subquery but avoids the double evaluation that Flink
may apply to scalar UDFs referenced in both positions.
Verification Queries
Run these in the Studio SQL editor to confirm the pipeline is healthy. Queries 1–4 are metadata commands that do not start new jobs; query 5 runs a short streaming aggregation over the Kafka source, so cancel it once you have checked the count.
-- ── Section 6: Verification ──────────────────────────────────────────────
-- 1. Confirm the shaded JAR is on the classpath
SHOW JARS;
-- Expected: .../otter-stream-sql-1.0.17-flink-udf.jar
-- 2. Confirm the UDF is registered
SHOW USER FUNCTIONS;
-- Expected: ml_score appears in the list
-- 3. Smoke-test the UDF with a static feature map.
-- If the model is loaded this returns a DOUBLE between 0.0 and 1.0.
-- If it returns NULL the model name was not found in ModelCache.
SELECT ml_score(
MAP['amount', '250.00',
'hour_of_day', '23',
'day_of_week', '6',
'is_international', 'true',
'card_type', 'CREDIT',
'currency', 'USD'],
'fraud-detector'
) AS smoke_test_score;
-- 4. Confirm the connector table is accessible (model loaded from MinIO)
DESCRIBE fraud_model_source;
-- 5. Count messages received on the transactions topic in the last 60 s
--    (starts a short-lived streaming query; cancel it after checking)
SELECT COUNT(*) AS events_in_last_60s
FROM transactions
WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '60' SECOND;
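The same checks can be scripted against the Flink SQL Gateway REST API (available since Flink 1.16) instead of the Studio editor. A hedged sketch; the gateway host and port are assumptions about your deployment, and the `requests` package is required:

```python
# Sketch: run a Section 6 check (e.g. SHOW USER FUNCTIONS) through the
# Flink SQL Gateway REST API. Gateway URL is an assumption.
GATEWAY = "http://sql-gateway:8083/v1"

def statements_url(session_handle: str) -> str:
    return f"{GATEWAY}/sessions/{session_handle}/statements"

def result_url(session_handle: str, operation_handle: str, token: int = 0) -> str:
    return f"{GATEWAY}/sessions/{session_handle}/operations/{operation_handle}/result/{token}"

def run_check(statement: str = "SHOW USER FUNCTIONS") -> dict:
    import requests  # lazy import keeps the URL helpers dependency-free
    session = requests.post(f"{GATEWAY}/sessions", json={}).json()["sessionHandle"]
    op = requests.post(statements_url(session),
                       json={"statement": statement}).json()["operationHandle"]
    return requests.get(result_url(session, op)).json()
```

This is useful for CI smoke tests where no browser session is available.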
Fraud Detection — Extended Example
A more complete version of the pipeline. It adds windowed aggregation for velocity features,
a sentinel value for rows where the model returned NULL (which a production pipeline would
route to a dead-letter sink), and a Flink view that abstracts the feature-building logic
away from the inference query.
-- ── Extended fraud detection with velocity features ──────────────────────
-- Step A: Create a view that computes velocity features using a 5-minute
-- tumbling window before the inference step.
CREATE TEMPORARY VIEW transaction_velocity AS
SELECT
user_id,
COUNT(*) AS txn_count_5m,
SUM(amount) AS total_amount_5m,
MAX(amount) AS max_amount_5m,
COUNT(DISTINCT merchant_id) AS distinct_merchants_5m,
TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start
FROM transactions
GROUP BY
user_id,
TUMBLE(event_time, INTERVAL '5' MINUTE);
-- Step B: Join raw transactions with velocity window and run inference.
-- The join key is user_id; window_start aligns to the tumble boundary.
CREATE TEMPORARY VIEW scored_transactions AS
SELECT
t.transaction_id,
t.user_id,
t.amount,
t.event_time,
COALESCE(
ml_score(
MAP[
'amount', CAST(t.amount AS STRING),
'hour_of_day', CAST(t.hour_of_day AS STRING),
'day_of_week', CAST(t.day_of_week AS STRING),
'is_international', CAST(t.is_international AS STRING),
'card_type', t.card_type,
'currency', t.currency,
-- Velocity features from the window join
'txn_count_5m', CAST(v.txn_count_5m AS STRING),
'total_amount_5m', CAST(v.total_amount_5m AS STRING),
'max_amount_5m', CAST(v.max_amount_5m AS STRING),
'distinct_merchants_5m', CAST(v.distinct_merchants_5m AS STRING)
],
'fraud-detector'
),
-1.0 -- sentinel: model returned NULL (engine not loaded)
) AS fraud_score
FROM transactions t
LEFT JOIN transaction_velocity v
ON t.user_id = v.user_id
AND t.event_time >= v.window_start
AND t.event_time < v.window_start + INTERVAL '5' MINUTE;
-- Step C: Route high-confidence fraud to the alerts sink. Rows carrying
-- the -1.0 sentinel are excluded by the score filter; a production
-- pipeline would add a second INSERT routing them to a dead-letter topic.
INSERT INTO fraud_alerts
SELECT
CONCAT(transaction_id, '-alert') AS alert_id,
transaction_id,
user_id,
amount,
fraud_score,
CASE
WHEN fraud_score >= 0.85 THEN 'CRITICAL'
WHEN fraud_score >= 0.65 THEN 'HIGH'
ELSE 'MEDIUM'
END AS risk_tier,
event_time AS detected_at
FROM scored_transactions
WHERE fraud_score >= 0.40;
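The Step A tumbling-window features can be reproduced offline to validate the model's feature expectations against historical data. A small mirror (helper names are ours, not part of Otter Streams):

```python
# Offline mirror of the 5-minute tumbling-window velocity features
# computed by the transaction_velocity view.
from collections import defaultdict

WINDOW_MS = 5 * 60 * 1000  # 5-minute tumble, matching the SQL

def tumble_start(ts_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Align a timestamp to the start of its tumbling window."""
    return ts_ms - ts_ms % size_ms

def velocity_features(txns):
    """txns: iterable of dicts with user_id, merchant_id, amount, ts_ms."""
    buckets = defaultdict(list)
    for t in txns:
        buckets[(t["user_id"], tumble_start(t["ts_ms"]))].append(t)
    return {
        key: {
            "txn_count_5m": len(rows),
            "total_amount_5m": sum(r["amount"] for r in rows),
            "max_amount_5m": max(r["amount"] for r in rows),
            "distinct_merchants_5m": len({r["merchant_id"] for r in rows}),
        }
        for key, rows in buckets.items()
    }
```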
Anomaly Detection — IoT Sensors
A second demo using an XGBoost model stored in MinIO. The model is a binary classifier
trained on sensor telemetry; it flags readings that deviate significantly from the expected
distribution. The model file is a JSON-format XGBoost model at
ml-models/anomaly-detector/v2/model.json.
-- ── IoT anomaly detection with XGBoost model from MinIO ─────────────────
-- 1. Preload the XGBoost model from MinIO
CREATE TEMPORARY TABLE anomaly_model_source (
score DOUBLE
) WITH (
'connector' = 'ml-inference',
'model.name' = 'anomaly-detector',
'model.path' = 's3a://ml-models/anomaly-detector/v2/model.json',
'model.format' = 'xgboost-json',
'model.s3.endpoint' = 'http://minio:9000',
'model.s3.access-key' = 'minioadmin',
'model.s3.secret-key' = 'minioadmin',
'model.s3.path-style' = 'true',
'cache.enabled' = 'true'
);
-- 2. Sensor readings source
CREATE TEMPORARY TABLE sensor_readings (
device_id STRING,
temperature DOUBLE,
pressure DOUBLE,
humidity DOUBLE,
vibration DOUBLE,
rpm DOUBLE,
reading_time TIMESTAMP(3),
WATERMARK FOR reading_time AS reading_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'sensor-readings',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'otter-anomaly-demo',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
-- 3. Anomaly alerts sink
CREATE TEMPORARY TABLE anomaly_alerts (
device_id STRING,
anomaly_score DOUBLE,
severity STRING,
reading_time TIMESTAMP(3),
detected_at TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'anomaly-alerts',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- 4. Sliding window aggregation then inference
INSERT INTO anomaly_alerts
SELECT
device_id,
anomaly_score,
CASE
WHEN anomaly_score >= 0.90 THEN 'CRITICAL'
WHEN anomaly_score >= 0.70 THEN 'HIGH'
ELSE 'MEDIUM'
END AS severity,
window_end AS reading_time,
CURRENT_TIMESTAMP AS detected_at
FROM (
SELECT
device_id,
HOP_END(reading_time, INTERVAL '30' SECOND, INTERVAL '5' MINUTE) AS window_end,
COALESCE(
ml_score(
MAP[
'temperature', CAST(AVG(temperature) AS STRING),
'pressure', CAST(AVG(pressure) AS STRING),
'humidity', CAST(AVG(humidity) AS STRING),
'vibration', CAST(AVG(vibration) AS STRING),
'rpm', CAST(AVG(rpm) AS STRING)
],
'anomaly-detector'
),
0.0
) AS anomaly_score
FROM sensor_readings
GROUP BY
device_id,
HOP(reading_time, INTERVAL '30' SECOND, INTERVAL '5' MINUTE)
) windowed
WHERE anomaly_score >= 0.60;
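With HOP(reading_time, INTERVAL '30' SECOND, INTERVAL '5' MINUTE), every reading contributes to size/slide = 10 overlapping windows. A small helper (ours, for illustration) that enumerates them:

```python
# Enumerate the sliding (HOP) windows that contain a given event time,
# mirroring HOP(reading_time, INTERVAL '30' SECOND, INTERVAL '5' MINUTE).
# Window starts fall on multiples of the slide; bounds are [start, end).
def hop_windows(ts_ms: int, slide_ms: int = 30_000, size_ms: int = 300_000):
    last_start = ts_ms - ts_ms % slide_ms          # latest start <= ts_ms
    starts = range(last_start, ts_ms - size_ms, -slide_ms)
    return [(s, s + size_ms) for s in starts]
```

Because each reading lands in ten windows, the sink can emit up to ten alerts per anomalous reading; downstream deduplication may be needed.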
Str:::Lab Studio Tips
The SELECT ml_score(MAP[...], 'fraud-detector') smoke test in Section 6
is the fastest way to confirm end-to-end connectivity — JAR loaded, UDF registered,
model in cache, engine producing output — without starting a streaming job.
If ml_score returns NULL, the model name was not found in
ModelCache. Re-run the connector table creation statement to trigger a fresh
MinIO download, and check the Flink TaskManager logs for download errors.
Declare the features argument as MAP<STRING, STRING>, the closest
available SQL type. The UDF accepts
Map<String, Object> at runtime; Flink's type system handles the
coercion automatically.
Always set 'model.s3.path-style' = 'true' when connecting to MinIO.
Virtual-hosted style (bucket.host) requires DNS configuration that is
typically not present in local deployments.
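The addressing difference behind the path-style tip, as a tiny illustration (the function is ours, not a MinIO API):

```python
# Why path-style matters: the same bucket/key resolves to different URLs
# under the two S3 addressing modes. A MinIO container on a local Docker
# network has no wildcard DNS, so only the path-style form resolves.
def object_url(endpoint: str, bucket: str, key: str, path_style: bool) -> str:
    if path_style:
        return f"{endpoint}/{bucket}/{key}"        # http://host:port/bucket/key
    scheme, host = endpoint.split("://", 1)
    return f"{scheme}://{bucket}.{host}/{key}"     # http://bucket.host:port/key
```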