Introduction
Otter Streams bridges the gap between machine learning model deployment and Apache Flink's streaming SQL runtime. Rather than embedding model-serving logic inside custom Java operators, data engineers register ML models as standard Flink SQL functions and call them inline in any Flink SQL query — no Java required at query time.
The otter-stream-sql module exposes two primary extension points: a
scalar UDF (MLInferenceFunction) for synchronous per-row
inference, and a lookup table function (MLInferenceLookupFunction)
for temporal joins. Both share the same ModelCache and inference engine
abstractions defined in ml-inference-core. Models are downloaded from MinIO
(or any S3-compatible store) at engine initialisation time and held in a Caffeine LRU cache
for the session lifetime.
Key properties of the SQL path:

- Models are registered with a standard CREATE FUNCTION statement — no Java operator is required in the query itself.
- The ModelCache singleton avoids repeated engine initialisation across task slots and query re-runs within a session.
- Switching inference engines is a matter of changing the model.format DDL option; the SQL query is unchanged.
- The eval() method uses standard Flink type-extraction patterns — no custom TypeInformation factories or annotations are required.

Quick Start
The shortest path to running ML inference in Flink SQL has three steps: build the shaded JAR, upload it to the Studio, then register and call the function.
Clone the repository and run the Maven package goal. The shade plugin produces a
self-contained *-flink-udf.jar that bundles ml-inference-core
and all other runtime dependencies, while leaving Flink itself as a provided dependency.
git clone https://github.com/martourez21/otter-streams.git
cd otter-streams
mvn clean install -DskipTests

# The artifact you need — note the -flink-udf classifier suffix:
ls otter-stream-sql/target/otter-stream-sql-1.0.17-flink-udf.jar
Open the Str:::Lab Studio UDF Manager, navigate to the Upload JAR tab, and
drag otter-stream-sql-1.0.17-flink-udf.jar into the drop zone. The Studio
copies the file to /var/www/udf-jars/ on the Gateway container and
automatically executes ADD JAR in the active session.
Create the connector table first — this triggers the MinIO download and warms
ModelCache. Then register the scalar function and run the smoke test.
-- 1. Download the model from MinIO into ModelCache
CREATE TEMPORARY TABLE fraud_model_source (score DOUBLE)
WITH (
'connector' = 'ml-inference',
'model.name' = 'fraud-detector',
'model.path' = 's3a://ml-models/fraud-detector/v1/',
'model.format' = 'tensorflow-savedmodel',
'model.s3.endpoint' = 'http://minio:9000',
'model.s3.access-key' = 'minioadmin',
'model.s3.secret-key' = 'minioadmin',
'model.s3.path-style' = 'true',
'cache.enabled' = 'true'
);
-- 2. Register the scalar UDF
CREATE TEMPORARY FUNCTION IF NOT EXISTS ml_score
AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction'
LANGUAGE JAVA;
-- 3. Smoke test — should return a DOUBLE between 0.0 and 1.0
SELECT ml_score(
MAP['amount', '320.00', 'hour_of_day', '22', 'is_international', 'true'],
'fraud-detector'
) AS score;
If ml_score returns NULL, the model was not found in
ModelCache. This usually means the connector table has not been created yet, or the
MinIO download failed. Check the TaskManager logs for a download exception and verify that
model.s3.path-style = 'true' is set — MinIO requires path-style access.
Architecture
Otter Streams is built on two parallel tracks: a DataStream API path for
low-level operator control and an SQL / Table API path for declarative
pipelines. Both converge on the same InferenceEngine abstraction and share
the ModelCache singleton. Models are fetched from MinIO (or any S3-compatible
store) once per session and reused across task slots. Flink is always a
provided dependency — it is never bundled in the shaded JAR, so the same
artifact runs across Flink 1.15 through 1.20 without recompilation.
This provided-scope constraint is deliberate: the shaded JAR bundles every runtime
dependency except Flink and SLF4J, which the Gateway supplies at runtime. Engine
modules are optional — include only the ones your pipeline requires to keep the JAR size
manageable.
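In the module POM, the provided-scope arrangement looks roughly like this — a sketch only, since the exact artifactIds and version properties in otter-stream-sql/pom.xml are not reproduced here:

```xml
<!-- Sketch: Flink and SLF4J stay in provided scope so the shade plugin
     excludes them from the *-flink-udf.jar. ArtifactIds are illustrative. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>${slf4j.version}</version>
  <scope>provided</scope>
</dependency>
```

Because nothing Flink-specific is baked into the artifact, the same JAR can be dropped onto any Gateway in the supported 1.15–1.20 range.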
UDF Class Reference — MLInferenceFunction
MLInferenceFunction is the primary entry point for SQL-based inference. It
extends Flink's ScalarFunction and performs synchronous per-row inference by
delegating to the named engine in ModelCache.
Method signature
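The signature itself is not reproduced in this section; based on the parameter table below and the throws InferenceException clause discussed under Building the Shaded JAR, it has approximately this shape (the Double return type is assumed from the DOUBLE score in the smoke test):

```java
// Sketch of the expected shape — not copied from the source tree.
public class MLInferenceFunction extends ScalarFunction {

    // Performs synchronous per-row inference by delegating to the
    // named engine held in ModelCache.
    public Double eval(Map<String, Object> features, String modelName)
            throws InferenceException {
        // look up the engine for modelName and run inference
        ...
    }
}
```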
Parameters
| # | Name | Java type | SQL type (Studio) | Description |
|---|---|---|---|---|
| 1 | features | Map<String, Object> | MAP<STRING,STRING> | Feature map passed to the inference engine. Keys are feature names; values are cast to the correct numeric types internally by the engine. |
| 2 | modelName | String | STRING | Logical model name that must match the model.name DDL option used when the connector table was created. |
Flink resolves the eval() signature via reflection at registration time.
Parameter types are not written into the CREATE FUNCTION
DDL — they are inferred automatically. The entries in the Studio parameter builder are
advisory only; they populate the usage preview snippet but have no effect on the generated
DDL or on runtime behaviour.
Registering the UDF in Str:::Lab Studio
The Studio UDF Manager (Register UDF tab) generates and executes the session-scoped DDL automatically. Fill in the three required fields below; leave all other settings at their defaults.
Required fields
- Function name: ml_score
- Language: JAVA
- Class name: com.codedstreams.otterstreams.sql.udf.MLInferenceFunction
- Method: eval — Flink resolves the signature via reflection; it is not written into the DDL

Parameter entries — advisory only
| # | Name | Type to select in Studio dropdown |
|---|---|---|
| 1 | features | MAP<STRING,STRING> |
| 2 | modelName | STRING |
DDL the Studio executes
-- Loads the shaded JAR onto the Gateway session classpath
ADD JAR '/var/www/udf-jars/otter-stream-sql-1.0.17-flink-udf.jar';

-- Registers the scalar function
CREATE TEMPORARY FUNCTION IF NOT EXISTS ml_score
AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction'
LANGUAGE JAVA;
ADD JAR and
CREATE TEMPORARY FUNCTION are bound to the active Gateway session. After any
session renewal or disconnect you must re-register. The Studio UDF Manager handles this
automatically when you click Register Function at the start of a new session.
SQL Usage Examples
Basic per-row inference
SELECT
transaction_id,
amount,
ml_score(features, 'fraud-detector') AS fraud_score
FROM transactions;
Filtering without double evaluation
Use CROSS JOIN LATERAL to name the score once and reference it in both the
SELECT list and the WHERE clause. This prevents Flink from
invoking the UDF twice per row, which would double the inference cost.
INSERT INTO fraud_alerts
SELECT
transaction_id,
fraud_score,
CASE
WHEN fraud_score >= 0.85 THEN 'CRITICAL'
WHEN fraud_score >= 0.65 THEN 'HIGH'
ELSE 'MEDIUM'
END AS risk_tier,
CURRENT_TIMESTAMP AS detected_at
FROM transactions
CROSS JOIN LATERAL (
SELECT COALESCE(ml_score(features, 'fraud-detector'), 0.0) AS fraud_score
) scores
WHERE fraud_score >= 0.40;
Inline feature map construction
If the upstream producer does not pre-assemble a MAP column, build one inline
using Flink's MAP[key, value, ...] constructor. Cast all values to
STRING to satisfy the UDF type; the engine casts them back to numerics
internally.
SELECT
transaction_id,
ml_score(
MAP[
'amount', CAST(amount AS STRING),
'hour_of_day', CAST(hour_of_day AS STRING),
'is_international', CAST(is_international AS STRING),
'card_type', card_type,
'currency', currency
],
'fraud-detector'
) AS fraud_score
FROM transactions;
Table Connector — ml-inference
The ml-inference connector is discovered at runtime via Java SPI from
META-INF/services/org.apache.flink.table.factories.Factory. Its primary role is
to preload models from MinIO into ModelCache before any
streaming query runs, so that the first rows processed do not incur model-download latency.
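The SPI registration is a plain service file inside the shaded JAR. Its content is a single fully-qualified factory class name; the class name shown here is illustrative — inspect the actual JAR to confirm it:

```
# META-INF/services/org.apache.flink.table.factories.Factory
# (ServiceLoader treats lines starting with '#' as comments.)
# Hypothetical factory class name — verify against the real artifact:
com.codedstreams.otterstreams.sql.connector.MLInferenceTableFactory
```

This is also why the ServicesResourceTransformer in the shade configuration matters: without it, service files from bundled dependencies would overwrite each other and the connector would not be discovered.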
Preloading a TensorFlow model from MinIO
CREATE TEMPORARY TABLE fraud_model_source (
score DOUBLE,
confidence DOUBLE
) WITH (
'connector' = 'ml-inference',
'model.name' = 'fraud-detector',
'model.path' = 's3a://ml-models/fraud-detector/v1/',
'model.format' = 'tensorflow-savedmodel',
'model.s3.endpoint' = 'http://minio:9000',
'model.s3.access-key' = 'minioadmin',
'model.s3.secret-key' = 'minioadmin',
'model.s3.path-style' = 'true',
'cache.enabled' = 'true',
'cache.ttl-minutes' = '60'
);
-- DESCRIBE confirms the connector initialised without error.
-- If the MinIO download failed you will see the exception here
-- rather than silently at inference time.
DESCRIBE fraud_model_source;
Preloading an XGBoost model from MinIO
CREATE TEMPORARY TABLE anomaly_model_source (
score DOUBLE
) WITH (
'connector' = 'ml-inference',
'model.name' = 'anomaly-detector',
'model.path' = 's3a://ml-models/anomaly-detector/v2/model.json',
'model.format' = 'xgboost-json',
'model.s3.endpoint' = 'http://minio:9000',
'model.s3.access-key' = 'minioadmin',
'model.s3.secret-key' = 'minioadmin',
'model.s3.path-style' = 'true',
'cache.enabled' = 'true'
);
Lookup join (temporal join)
SELECT
t.transaction_id,
t.amount,
p.score AS fraud_score,
p.confidence AS model_confidence
FROM transactions AS t
JOIN fraud_model_source FOR SYSTEM_TIME AS OF t.proc_time AS p
ON t.feature_key = p.feature_key
WHERE p.score > 0.5;
For the full DDL option reference see API Reference → Connector DDL Options.
Building the Shaded JAR
The otter-stream-sql module ships as a thin JAR by default. When the Flink SQL
Gateway registers MLInferenceFunction it reflects over all method signatures,
including the throws InferenceException clause on eval(). If
InferenceException is not on the Gateway classpath, the JVM raises
NoClassDefFoundError and the registration fails with a
ValidationException citing implementation errors.
The fix is to build a shaded JAR that bundles all runtime dependencies except Flink and
SLF4J. Add the following plugin configuration to otter-stream-sql/pom.xml:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals><goal>shade</goal></goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>flink-udf</shadedClassifierName>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<!-- Provided by the Gateway at runtime -->
<exclude>org.apache.flink:*</exclude>
<!-- SLF4J API only; implementation supplied by Flink -->
<exclude>org.slf4j:slf4j-api</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- Merges SPI service files from all bundled JARs -->
<transformer implementation=
"org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
Build and verify
cd otter-stream-sql
mvn clean package -DskipTests

# All four lines must appear in the output:
jar tf target/otter-stream-sql-1.0.17-flink-udf.jar \
  | grep -E 'InferenceException|InferenceEngine|ModelCache|caffeine'
tensorflow-core-platform includes
platform-native shared libraries that add several hundred megabytes to the JAR. If you are
not using TensorFlow-format models, exclude the module from the shade to keep the artifact
small: add <exclude>org.tensorflow:*</exclude> to the
<excludes> block.
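In the shade configuration shown earlier, that exclusion is one extra line in the <artifactSet> block — a sketch:

```xml
<artifactSet>
  <excludes>
    <exclude>org.apache.flink:*</exclude>
    <exclude>org.slf4j:slf4j-api</exclude>
    <!-- Drop TensorFlow platform natives when no TF-format models are served -->
    <exclude>org.tensorflow:*</exclude>
  </excludes>
</artifactSet>
```

Rebuild and re-run the jar tf verification afterwards; the TensorFlow classes should be gone while InferenceException, InferenceEngine, ModelCache, and caffeine remain.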
Troubleshooting
The most common failure is NoClassDefFoundError: InferenceException during
CREATE FUNCTION. Flink's reflection scanner walks every method signature on the
UDF class, including exception types declared in throws clauses. Any class it
cannot resolve causes the validation to abort with an implementation errors message.
SHOW JARS; -- otter-stream-sql-1.0.17-flink-udf.jar must appear in the output
If the JAR is absent, navigate to the Studio Register UDF tab, click SHOW JARS, then enter the JAR path and click ADD JAR.
jar tf otter-stream-sql-1.0.17-flink-udf.jar | grep InferenceException
# Expected output:
# com/codedstreams/otterstreams/inference/exception/InferenceException.class
If the class is absent you are using the plain thin JAR, not the
*-flink-udf.jar shaded artifact. Rebuild with the shade plugin
configuration shown above, then re-upload the correct file.
DROP TEMPORARY FUNCTION IF EXISTS ml_score;

CREATE TEMPORARY FUNCTION ml_score
AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction'
LANGUAGE JAVA;
SHOW USER FUNCTIONS; -- ml_score must appear in the list
Error Reference
| Error | Likely cause | Fix |
|---|---|---|
| NoClassDefFoundError: InferenceException | ml-inference-core not bundled in the JAR | Rebuild with the shade plugin; upload the *-flink-udf.jar classifier artifact, not the plain JAR |
| ClassNotFoundException: MLInferenceFunction | ADD JAR was skipped, or the JAR path on the Gateway container is wrong | Run SHOW JARS; use the Studio path chips to confirm the correct container path |
| Function ml_score already exists | Function registered earlier in the same session under the same name | Use IF NOT EXISTS, or DROP TEMPORARY FUNCTION then recreate |
| NoSuchMethodError (Scala) | Scala binary version mismatch between the JAR and the Flink cluster | Rebuild against the cluster's Scala binary version — 2.12 for Flink 1.17 |
| ml_score returns NULL for every row | Model name not found in ModelCache; connector table not created yet | Create the connector table to trigger the MinIO download; verify the model name matches exactly |
| MinIO download fails silently | Wrong endpoint, credentials, or bucket; path-style access not enabled | Set 'model.s3.path-style' = 'true'; inspect TaskManager logs for the underlying exception |