OS
Otter Streams v1.0.17
Flink 1.15–1.20 GitHub

Introduction

Otter Streams bridges the gap between machine learning model deployment and Apache Flink's streaming SQL runtime. Rather than embedding model-serving logic inside custom Java operators, data engineers register ML models as standard Flink SQL functions and call them inline in any Flink SQL query — no Java required at query time.

The otter-stream-sql module exposes two primary extension points: a scalar UDF (MLInferenceFunction) for synchronous per-row inference, and a lookup table function (MLInferenceLookupFunction) for temporal joins. Both share the same ModelCache and inference engine abstractions defined in ml-inference-core. Models are downloaded from MinIO (or any S3-compatible store) at engine initialisation time and held in a Caffeine LRU cache for the session lifetime.

Zero-code SQL deployment
Register any supported model format with a single CREATE FUNCTION statement. No Java operator required in the query itself.
MinIO / S3 model loading
The connector downloads SavedModel directories and single-file formats (ONNX, XGBoost, PMML) from any S3-compatible endpoint at table-creation time.
LRU model caching
The Caffeine-backed ModelCache singleton avoids repeated engine initialisation across task slots and query re-runs within a session.
Framework-agnostic
Swap between ONNX, TensorFlow, XGBoost, and PMML engines by changing the model.format DDL option. The SQL query is unchanged.
Flink reflection-compatible
The eval() method uses standard Flink type-extraction patterns. No custom TypeInformation factories or annotations are required.
Production observability
All engines emit Micrometer metrics — inference latency, request count, cache hit rate, and error rate — exportable to Prometheus, InfluxDB, or Datadog.

Quick Start

The shortest path to running ML inference in Flink SQL has three steps: build the shaded JAR, upload it to the Studio, then register and call the function.

1
Build the shaded JAR

Clone the repository and run the Maven package goal. The shade plugin produces a self-contained *-flink-udf.jar that bundles ml-inference-core and all other runtime dependencies, while leaving Flink itself as a provided dependency.

Shell
git clone https://github.com/martourez21/otter-streams.git
cd otter-streams
mvn clean install -DskipTests

# The artifact you need — note the -flink-udf classifier suffix:
ls otter-stream-sql/target/otter-stream-sql-1.0.17-flink-udf.jar
2
Upload the JAR in Studio

Open the Str:::Lab Studio UDF Manager, navigate to the Upload JAR tab, and drag otter-stream-sql-1.0.17-flink-udf.jar into the drop zone. The Studio copies the file to /var/www/udf-jars/ on the Gateway container and automatically executes ADD JAR in the active session.

3
Preload the model and register the UDF

Create the connector table first — this triggers the MinIO download and warms ModelCache. Then register the scalar function and run the smoke test.

SQL
-- 1. Download the model from MinIO into ModelCache
CREATE TEMPORARY TABLE fraud_model_source (score DOUBLE)
WITH (
    'connector'           = 'ml-inference',
    'model.name'          = 'fraud-detector',
    'model.path'          = 's3a://ml-models/fraud-detector/v1/',
    'model.format'        = 'tensorflow-savedmodel',
    'model.s3.endpoint'   = 'http://minio:9000',
    'model.s3.access-key' = 'minioadmin',
    'model.s3.secret-key' = 'minioadmin',
    'model.s3.path-style' = 'true',
    'cache.enabled'       = 'true'
);

-- 2. Register the scalar UDF
CREATE TEMPORARY FUNCTION IF NOT EXISTS ml_score
AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction'
LANGUAGE JAVA;

-- 3. Smoke test — should return a DOUBLE between 0.0 and 1.0
SELECT ml_score(
    MAP['amount', '320.00', 'hour_of_day', '22', 'is_international', 'true'],
    'fraud-detector'
) AS score;
i
If the smoke-test returns NULL the model was not found in ModelCache. This usually means the connector table was not created yet, or the MinIO download failed. Check the TaskManager logs for a download exception and verify that model.s3.path-style = 'true' is set — MinIO requires path-style access.

Architecture

Otter Streams is built on two parallel tracks: a DataStream API path for low-level operator control and an SQL / Table API path for declarative pipelines. Both converge on the same InferenceEngine abstraction and share the ModelCache singleton. Models are fetched from MinIO (or any S3-compatible store) once per session and reused across task slots. Flink is always a provided dependency — it is never bundled in the shaded JAR, so the same artifact runs across Flink 1.15 through 1.20 without recompilation.

Otter Streams — System Architecture
flowchart TB subgraph SOURCES[" Data Sources "] direction LR K[("Kafka\nTopic")] S[("S3 /\nHDFS")] DB[("Database\nCDC")] end subgraph FLINK[" Apache Flink Cluster "] direction TB subgraph JM["Job Manager"] JMC["Job Coordination\n& Scheduling"] end subgraph TM["Task Managers (N slots)"] direction TB subgraph DS_PATH["DataStream API path"] direction LR SRC["SourceFunction\n/ KafkaSource"] ASYNC["AsyncDataStream\n.unorderedWait()"] AMIF["AsyncModelInference\nFunction"] SINK_DS["DataStream\nSink"] SRC --> ASYNC --> AMIF --> SINK_DS end subgraph SQL_PATH["Flink SQL / Table API path"] direction LR GW["SQL Gateway\nSession"] UDF["MLInference\nFunction\n(ScalarFunction)"] LKP["MLInference\nLookupFunction\n(TableFunction)"] CONN["ml-inference\nConnector\n(DynamicTableSource)"] SINK_SQL["Table /\nKafka Sink"] GW --> UDF GW --> LKP GW --> CONN UDF --> SINK_SQL LKP --> SINK_SQL end subgraph CORE["ml-inference-core (shared by both paths)"] direction LR MC["ModelCache\nCaffeine LRU\nsingleton"] IE["InferenceEngine<C>\ninterface"] IR["InferenceResult\n+ InferenceException"] MC --> IE --> IR end AMIF -- "getEngine(name)" --> MC UDF -- "getEngine(name)" --> MC LKP -- "getEngine(name)" --> MC CONN -- "putEngine(name, engine)" --> MC end end subgraph ENGINES[" Engine Modules (include only what you need) "] direction LR ONNX["otter-stream-onnx\nONNX Runtime 1.23.2\nCPU / CUDA / TensorRT"] TF["otter-stream-tensorflow\nSavedModel format\nGPU acceleration"] XGB["otter-streams-xgboost\nXGBoost4J 3.1.1\nbinary / JSON / UBJ"] PMML["otter-stream-pmml\nJPMML 1.5.16\nregression / trees / NN"] REM["otter-stream-remote\nHTTP / gRPC\nSageMaker · Vertex AI"] end subgraph STORE[" Model Store "] direction LR MINIO[("MinIO\nS3-compatible\nbucket")] S3AWS[("AWS S3")] LOCAL[("Local / HDFS\nfilesystem")] end SOURCES --> DS_PATH SOURCES --> SQL_PATH IE -- "implements" --> ONNX IE -- "implements" --> TF IE -- "implements" --> XGB IE -- "implements" --> PMML IE -- "implements" --> REM CONN -- "MinioModelLoader\ndownloads on\ntable creation" --> MINIO AMIF -- "ModelConfig\n.modelPath" --> MINIO MINIO -.->|"also supports"| S3AWS MINIO -.->|"also supports"| LOCAL classDef flinkBox fill:#1a1f28,stroke:#0091d5,color:#c9d1d9,rx:6 classDef coreBox fill:#0a2a3d,stroke:#0091d5,color:#a5d6ff,rx:6 classDef engineBox fill:#0d1f0d,stroke:#00875a,color:#a7f3d0,rx:6 classDef storeBox fill:#2a1a0d,stroke:#b45309,color:#fde68a,rx:6 classDef srcBox fill:#1a1a2a,stroke:#5c6878,color:#b0bac8,rx:6 classDef pathBox fill:#111827,stroke:#2c3340,color:#8492a6,rx:4 class FLINK,JM,TM flinkBox class CORE coreBox class ENGINES,ONNX,TF,XGB,PMML,REM engineBox class STORE,MINIO,S3AWS,LOCAL storeBox class SOURCES,K,S,DB srcBox class DS_PATH,SQL_PATH pathBox
Both the DataStream and SQL paths share a single ModelCache and InferenceEngine abstraction. Models are downloaded from MinIO at initialisation time and held for the session lifetime.

The critical design constraint is that Flink is always provided scope. The shaded JAR bundles every runtime dependency except Flink and SLF4J, so the same artifact works across Flink 1.15 through 1.20 without recompilation. Engine modules are optional — include only the ones your pipeline requires to keep the JAR size manageable.

UDF Class Reference — MLInferenceFunction

MLInferenceFunction is the primary entry point for SQL-based inference. It extends Flink's ScalarFunction and performs synchronous per-row inference by delegating to the named engine in ModelCache.

Method signature

Class
com.codedstreams.otterstreams.sql.udf.MLInferenceFunction
Extends
org.apache.flink.table.functions.ScalarFunction
Entry method
eval(Map<String, Object> features, String modelName)
Return type
Double — nullable; NULL when model absent, inference fails, or output is non-numeric
Declared exception
throws InferenceException — this class must be present in the shaded JAR
Serialisation
modelCache field is transient; lazily re-initialised per task slot via singleton getInstance()

Parameters

#NameJava typeSQL type (Studio)Description
1 features Map<String, Object> MAP<STRING,STRING> Feature map passed to the inference engine. Keys are feature names; values are cast to the correct numeric types internally by the engine.
2 modelName String STRING Logical model name that must match the model.name DDL option used when the connector table was created.
i
Flink discovers the eval() signature via reflection at registration time. Parameter types are not written into the CREATE FUNCTION DDL — they are inferred automatically. The entries in the Studio parameter builder are advisory only; they populate the usage preview snippet but have no effect on the generated DDL or on runtime behaviour.

Registering the UDF in Str:::Lab Studio

The Studio UDF Manager (Register UDF tab) generates and executes the session-scoped DDL automatically. Fill in the three required fields below; leave all other settings at their defaults.

Required fields

Function Name
ml_score
Language
JAVA
Class / Module Path
com.codedstreams.otterstreams.sql.udf.MLInferenceFunction
Method (informational)
eval — Flink resolves via reflection; not written into DDL
Scope
Temporary (session-scoped)

Parameter entries — advisory only

#NameType to select in Studio dropdown
1featuresMAP<STRING,STRING>
2modelNameSTRING

DDL the Studio executes

SQL session DDL — auto-generated by Studio
-- Loads the shaded JAR onto the Gateway session classpath
ADD JAR '/var/www/udf-jars/otter-stream-sql-1.0.17-flink-udf.jar';

-- Registers the scalar function
CREATE TEMPORARY FUNCTION IF NOT EXISTS ml_score
AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction'
LANGUAGE JAVA;
!
Session scope. Both ADD JAR and CREATE TEMPORARY FUNCTION are bound to the active Gateway session. After any session renewal or disconnect you must re-register. The Studio UDF Manager handles this automatically when you click Register Function at the start of a new session.

SQL Usage Examples

Basic per-row inference

SQL
SELECT
    transaction_id,
    amount,
    ml_score(features, 'fraud-detector') AS fraud_score
FROM transactions;

Filtering without double evaluation

Use CROSS JOIN LATERAL to name the score once and reference it in both the SELECT list and the WHERE clause. This prevents Flink from invoking the UDF twice per row, which would double the inference cost.

SQL
INSERT INTO fraud_alerts
SELECT
    transaction_id,
    fraud_score,
    CASE
        WHEN fraud_score >= 0.85 THEN 'CRITICAL'
        WHEN fraud_score >= 0.65 THEN 'HIGH'
        ELSE                          'MEDIUM'
    END AS risk_tier,
    CURRENT_TIMESTAMP AS detected_at
FROM transactions
CROSS JOIN LATERAL (
    SELECT COALESCE(ml_score(features, 'fraud-detector'), 0.0) AS fraud_score
) scores
WHERE fraud_score >= 0.40;

Inline feature map construction

If the upstream producer does not pre-assemble a MAP column, build one inline using Flink's MAP[key, value, ...] constructor. Cast all values to STRING to satisfy the UDF type; the engine casts them back to numerics internally.

SQL
SELECT
    transaction_id,
    ml_score(
        MAP[
            'amount',           CAST(amount           AS STRING),
            'hour_of_day',      CAST(hour_of_day      AS STRING),
            'is_international', CAST(is_international AS STRING),
            'card_type',        card_type,
            'currency',         currency
        ],
        'fraud-detector'
    ) AS fraud_score
FROM transactions;

Table Connector — ml-inference

The ml-inference connector is discovered at runtime via Java SPI from META-INF/services/org.apache.flink.table.factories.Factory. Its primary role is to preload models from MinIO into ModelCache before any streaming query runs, so that the first rows processed do not incur model-download latency.

Preloading a TensorFlow model from MinIO

SQL
CREATE TEMPORARY TABLE fraud_model_source (
    score      DOUBLE,
    confidence DOUBLE
) WITH (
    'connector'           = 'ml-inference',
    'model.name'          = 'fraud-detector',
    'model.path'          = 's3a://ml-models/fraud-detector/v1/',
    'model.format'        = 'tensorflow-savedmodel',
    'model.s3.endpoint'   = 'http://minio:9000',
    'model.s3.access-key' = 'minioadmin',
    'model.s3.secret-key' = 'minioadmin',
    'model.s3.path-style' = 'true',
    'cache.enabled'       = 'true',
    'cache.ttl-minutes'   = '60'
);

-- DESCRIBE confirms the connector initialised without error.
-- If the MinIO download failed you will see the exception here
-- rather than silently at inference time.
DESCRIBE fraud_model_source;

Preloading an XGBoost model from MinIO

SQL
CREATE TEMPORARY TABLE anomaly_model_source (
    score DOUBLE
) WITH (
    'connector'           = 'ml-inference',
    'model.name'          = 'anomaly-detector',
    'model.path'          = 's3a://ml-models/anomaly-detector/v2/model.json',
    'model.format'        = 'xgboost-json',
    'model.s3.endpoint'   = 'http://minio:9000',
    'model.s3.access-key' = 'minioadmin',
    'model.s3.secret-key' = 'minioadmin',
    'model.s3.path-style' = 'true',
    'cache.enabled'       = 'true'
);

Lookup join (temporal join)

SQL
SELECT
    t.transaction_id,
    t.amount,
    p.score      AS fraud_score,
    p.confidence AS model_confidence
FROM transactions AS t
JOIN fraud_model_source FOR SYSTEM_TIME AS OF t.proc_time AS p
  ON t.feature_key = p.feature_key
WHERE p.score > 0.5;

For the full DDL option reference see API Reference → Connector DDL Options.

Building the Shaded JAR

The otter-stream-sql module ships as a thin JAR by default. When the Flink SQL Gateway registers MLInferenceFunction it reflects over all method signatures, including the throws InferenceException clause on eval(). If InferenceException is not on the Gateway classpath the JVM raises NoClassDefFoundError and the registration fails with a ValidationException: due to implementation errors.

The fix is to build a shaded JAR that bundles all runtime dependencies except Flink and SLF4J. Add the following plugin configuration to otter-stream-sql/pom.xml:

XML otter-stream-sql/pom.xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>${maven-shade-plugin.version}</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>flink-udf</shadedClassifierName>
        <createDependencyReducedPom>false</createDependencyReducedPom>
        <artifactSet>
          <excludes>
            <!-- Provided by the Gateway at runtime -->
            <exclude>org.apache.flink:*</exclude>
            <!-- SLF4J API only; implementation supplied by Flink -->
            <exclude>org.slf4j:slf4j-api</exclude>
          </excludes>
        </artifactSet>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
        <transformers>
          <!-- Merges SPI service files from all bundled JARs -->
          <transformer implementation=
            "org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

Build and verify

Shell
cd otter-stream-sql
mvn clean package -DskipTests

# All four lines must appear in the output:
jar tf target/otter-stream-sql-1.0.17-flink-udf.jar \
  | grep -E 'InferenceException|InferenceEngine|ModelCache|caffeine'
i
TensorFlow native binaries. tensorflow-core-platform includes platform-native shared libraries that add several hundred megabytes to the JAR. If you are not using TensorFlow-format models, exclude the module from the shade to keep the artifact small: add <exclude>org.tensorflow:*</exclude> to the <excludes> block.

Troubleshooting

The most common failure is NoClassDefFoundError: InferenceException during CREATE FUNCTION. Flink's reflection scanner walks every method signature on the UDF class, including exception types declared in throws clauses. Any class it cannot resolve causes the validation to abort with an implementation errors message.

Caused by: java.lang.NoClassDefFoundError: com/codedstream/otterstream/inference/exception/InferenceException at java.base/java.lang.Class.getDeclaredMethods0(Native Method) at org.apache.flink.table.functions.UserDefinedFunctionHelper .validateClass(UserDefinedFunctionHelper.java:439) Caused by: org.apache.flink.table.api.ValidationException: Could not register temporary catalog function 'default_catalog.default_database.ml_score' due to implementation errors.
1
Confirm the JAR is on the session classpath
SQL
SHOW JARS;
-- otter-stream-sql-1.0.17-flink-udf.jar must appear in the output

If the JAR is absent, navigate to the Studio Register UDF tab, click SHOW JARS, then enter the JAR path and click ADD JAR.

2
Verify the shaded JAR contains the missing class
Shell
jar tf otter-stream-sql-1.0.17-flink-udf.jar | grep InferenceException
# Expected output:
# com/codedstream/otterstream/inference/exception/InferenceException.class

If the class is absent you are using the plain thin JAR, not the *-flink-udf.jar shaded artifact. Rebuild with the shade plugin configuration shown above, then re-upload the correct file.

3
Drop and recreate a stale registration
SQL
DROP TEMPORARY FUNCTION IF EXISTS ml_score;

CREATE TEMPORARY FUNCTION ml_score
AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction'
LANGUAGE JAVA;
4
Confirm successful registration
SQL
SHOW USER FUNCTIONS;
-- ml_score must appear in the list

Error Reference

ErrorLikely causeFix
NoClassDefFoundError: InferenceException ml-inference-core not bundled in the JAR Rebuild with shade plugin; upload the *-flink-udf.jar classifier artifact, not the plain JAR
ClassNotFoundException: MLInferenceFunction ADD JAR was skipped, or the JAR path on the Gateway container is wrong Run SHOW JARS; use the Studio path chips to confirm the correct container path
already exists Function registered earlier in the same session under the same name Use IF NOT EXISTS, or DROP TEMPORARY FUNCTION then recreate
NoSuchMethodError (Scala) Scala binary version mismatch between the JAR and the Flink cluster Rebuild against Flink's Scala binary version — 2.12 for Flink 1.17
ml_score returns NULL for every row Model name not found in ModelCache; connector table not created yet Create the connector table to trigger MinIO download; verify the model name matches exactly
MinIO download fails silently Wrong endpoint, credentials, or bucket; path-style access not enabled Set 'model.s3.path-style' = 'true'; inspect TaskManager logs for the underlying exception