InferenceEngine<C>
Package: com.codedstream.otterstream.inference.engine
The top-level contract that all ML framework adapters implement. Parameterised on a configuration
record type C that carries framework-specific settings (e.g., ONNX execution
provider, TensorFlow signature name).
| Method | Return | Description |
|---|---|---|
| initialize(C config) | void | Loads the model from the path specified in config. Called once. Throws InferenceException on failure. |
| infer(Map&lt;String,Object&gt; inputs) | InferenceResult | Runs a single inference pass; declared to throw InferenceException. Thread-safe in all built-in implementations. |
| close() | void | Releases native resources (ONNX sessions, TF bundles, etc.). |
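To make the lifecycle concrete, here is a minimal, hypothetical adapter sketch. The interface and result type are restated locally in simplified form so the snippet compiles on its own; the stub engine, its String config type, and all values are illustrative and not part of the library.

```java
import java.util.Map;

// Simplified local restatement of the contract, for illustration only. The real
// interface lives in com.codedstream.otterstream.inference.engine and declares
// InferenceException rather than Exception.
interface Engine<C> {
    void initialize(C config) throws Exception;               // load the model once
    Result infer(Map<String, Object> inputs) throws Exception; // one inference pass
    void close();                                              // release native resources
}

// Minimal stand-in for InferenceResult: a success flag plus named outputs.
record Result(boolean success, Map<String, Object> outputs) {}

// Hypothetical adapter that returns a constant score, standing in for a real
// framework binding (ONNX session, TF bundle, ...).
class ConstantScoreEngine implements Engine<String> {
    private String modelPath;

    @Override public void initialize(String config) {
        this.modelPath = config;  // a real adapter would load the model from here
    }

    @Override public Result infer(Map<String, Object> inputs) {
        return new Result(true, Map.of("score", 0.5d));
    }

    @Override public void close() { /* nothing native to release in the stub */ }
}
```

The expected call sequence is the one the table describes: initialize once, infer many times (concurrently, for the built-in implementations), close when done.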
InferenceResult
Package: com.codedstream.otterstream.inference.model
Immutable value object returned by InferenceEngine.infer(). Carries the output
tensors as a named map, a success flag, and optional latency metadata.
| Method | Return | Description |
|---|---|---|
| isSuccess() | boolean | False if inference threw internally and the error was swallowed. |
| getOutputs() | Map&lt;String,Object&gt; | Output tensors keyed by name. Values are arrays (float[], double[], int[]) or scalars. |
| getLatencyMs() | long | Wall-clock inference time in milliseconds. 0 if not instrumented. |
| getErrorMessage() | String | Non-null only when isSuccess() is false. |
InferenceException
Package: com.codedstream.otterstream.inference.exception
Checked exception declared on InferenceEngine.infer() and
MLInferenceFunction.eval(). Wraps engine-specific errors (ONNX runtime errors,
TensorFlow session errors, network errors for remote engines).
Because InferenceException appears in the throws clause of eval(), the JVM must be able to resolve it at function-registration time; if the class is missing from the job classpath, registration fails with a NoClassDefFoundError. It must therefore be bundled in the shaded JAR.
ModelCache
Package: com.codedstreams.otterstreams.sql.loader
Thread-safe LRU singleton backed by Caffeine. Keyed by String modelName.
Entries expire after a configurable TTL (default 30 minutes). The singleton is initialised
on first access via ModelCache.getInstance(), which is called lazily inside
MLInferenceFunction.eval().
| Method | Return | Description |
|---|---|---|
| getInstance() | ModelCache | Returns the process-level singleton. Safe to call from multiple task slots. |
| getEngine(String name) | InferenceEngine&lt;?&gt; | Returns the cached engine or null if not present. |
| putEngine(String name, InferenceEngine&lt;?&gt; engine) | void | Registers an initialised engine. Called by the connector during table creation. |
| invalidate(String name) | void | Removes a single entry, triggering a fresh load on the next access. |
| invalidateAll() | void | Clears the entire cache. Useful during session renewal. |
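The surface above can be sketched with a plain ConcurrentHashMap stand-in. This is a behavioural illustration only: the real class is Caffeine-backed with LRU eviction and TTL expiry, which the sketch omits, and the sketch stores Object where the real cache stores InferenceEngine&lt;?&gt;.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Behavioural sketch of the ModelCache surface (illustration; the real class
// is a Caffeine-backed LRU with a configurable TTL, default 30 minutes).
class ModelCacheSketch {
    private static final ModelCacheSketch INSTANCE = new ModelCacheSketch();
    private final Map<String, Object> engines = new ConcurrentHashMap<>();

    static ModelCacheSketch getInstance() { return INSTANCE; }

    Object getEngine(String name)              { return engines.get(name); }
    void putEngine(String name, Object engine) { engines.put(name, engine); }
    void invalidate(String name)               { engines.remove(name); }
    void invalidateAll()                       { engines.clear(); }
}
```

Note the division of labour described above: the connector writes via putEngine during table creation, while MLInferenceFunction.eval() only reads and returns null when the name is absent.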
InferenceConfig
Package: com.codedstream.otterstream.inference.config
Builder-pattern configuration for the async DataStream function wrapper. Not used by the SQL
path directly — the SQL path reads config from DDL WITH options via
SqlInferenceConfig.
```java
InferenceConfig config = InferenceConfig.builder()
    .modelConfig(ModelConfig.builder()
        .modelId("fraud-detector")
        .modelPath("s3a://ml-models/fraud-detector/v1/")
        .format(ModelFormat.TENSORFLOW_SAVEDMODEL)
        .build())
    .batchSize(32)
    .timeout(Duration.ofSeconds(5))
    .maxRetries(3)
    .enableCaching(true)
    .cacheSize(10_000)
    .cacheTtl(Duration.ofMinutes(30))
    .enableMetrics(true)
    .metricsPrefix("myapp.ml")
    .build();
```
ModelConfig
Package: com.codedstream.otterstream.inference.config
| Field | Type | Required | Description |
|---|---|---|---|
| modelId | String | Yes | Logical name; used as the ModelCache key. |
| modelPath | String | Yes | URI: local path, s3a://, http://, hdfs://. |
| format | ModelFormat | Yes | Enum: ONNX, TENSORFLOW_SAVEDMODEL, XGBOOST_JSON, XGBOOST_BINARY, PMML, PYTORCH_TORCHSCRIPT. |
| modelVersion | String | No | Informational; included in metrics tags. |
| modelOptions | Map&lt;String,Object&gt; | No | Framework-specific overrides (e.g., ONNX execution providers). |
MLInferenceFunction
Package: com.codedstreams.otterstreams.sql.udf
Extends: org.apache.flink.table.functions.ScalarFunction
| Aspect | Detail |
|---|---|
| Entry method | Double eval(Map<String,Object> features, String modelName) throws InferenceException |
| Serialisation | modelCache field is transient; re-initialised via singleton on first call in each task slot. |
| Null handling | Returns null when the model is absent from cache, inference fails, outputs are empty, or the output value is non-numeric. |
| Thread safety | Safe — ModelCache and all engine implementations are thread-safe. |
| SQL name | Registered as ml_score by convention (configurable in CREATE FUNCTION). |
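A hypothetical registration and call, assuming the function is registered under the conventional name ml_score; the table, column names, and feature keys are illustrative, and the MAP constructor shown assumes features of a single SQL type:

```sql
CREATE FUNCTION ml_score
  AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction';

SELECT
  txn_id,
  ml_score(MAP['amount', amount, 'velocity', velocity], 'fraud-detector') AS score
FROM transactions;
```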
MLInferenceLookupFunction
Package: com.codedstreams.otterstreams.sql.connector
Extends: org.apache.flink.table.functions.TableFunction<RowData>
Used as the runtime provider for the ml-inference connector in lookup join mode.
The eval(Object... keys) method converts positional lookup keys to a feature map,
runs inference, and calls collect() with a two-field row containing
score (DOUBLE) and confidence (DOUBLE).
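In lookup-join mode this surfaces in SQL as a processing-time temporal join. A sketch, with illustrative table, key, and time-attribute names:

```sql
SELECT t.txn_id, m.score, m.confidence
FROM transactions AS t
JOIN fraud_model FOR SYSTEM_TIME AS OF t.proc_time AS m
  ON t.amount = m.amount;
```

The join keys on the right-hand side become the positional lookup keys passed to eval(Object... keys).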
MLInferenceDynamicTableFactory
Package: com.codedstreams.otterstreams.sql.connector
Implements: org.apache.flink.table.factories.DynamicTableSourceFactory
Discovered via Java SPI from META-INF/services/org.apache.flink.table.factories.Factory.
Factory identifier: ml-inference. Creates MLInferenceDynamicTableSource
instances and triggers model loading from the configured source (MinIO, S3, HTTP, local) during
table creation.
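For SPI discovery to work, the shaded JAR must carry the services file named after the Factory interface, containing the fully-qualified factory class:

```
# META-INF/services/org.apache.flink.table.factories.Factory
com.codedstreams.otterstreams.sql.connector.MLInferenceDynamicTableFactory
```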
SqlInferenceConfig
Package: com.codedstreams.otterstreams.sql.config
Parses Flink DDL WITH options into a typed configuration record. Created by
SqlInferenceConfig.fromOptions(Map<String,String> options).
| DDL option | Java field | Default |
|---|---|---|
| model.name | modelName | required |
| model.path | modelPath | required |
| model.format | modelFormat | tensorflow-savedmodel |
| model.version | modelVersion | latest |
| model.s3.endpoint | s3Endpoint | null |
| model.s3.access-key | s3AccessKey | null |
| model.s3.secret-key | s3SecretKey | null |
| model.s3.path-style | s3PathStyle | true |
| cache.enabled | cacheEnabled | true |
| cache.ttl-minutes | cacheTtlMinutes | 30 |
| batch.size | batchSize | 1 |
| async.enabled | asyncEnabled | false |
| retry.max-attempts | maxRetries | 3 |
Connector DDL Options — ml-inference
Complete reference for the 'connector' = 'ml-inference' DDL options used in
CREATE TABLE statements.
| Option | Required | Example value | Notes |
|---|---|---|---|
| connector | required | 'ml-inference' | Must be exactly this string. |
| model.name | required | 'fraud-detector' | Key used in ml_score(..., 'fraud-detector'). |
| model.path | required | 's3a://ml-models/fraud/v1/' | URI; supports s3a://, http://, file://. |
| model.format | optional | 'tensorflow-savedmodel' | Also: onnx, xgboost-json, xgboost-binary, pmml. |
| model.version | optional | 'v1' | Informational; included in log output. |
| model.s3.endpoint | optional | 'http://minio:9000' | Required when using MinIO or non-AWS S3. |
| model.s3.access-key | optional | 'minioadmin' | S3 access key ID. |
| model.s3.secret-key | optional | 'minioadmin' | S3 secret access key. |
| model.s3.path-style | optional | 'true' | Set true for MinIO. |
| cache.enabled | optional | 'true' | Default true. |
| cache.ttl-minutes | optional | '60' | Cache entry TTL. Default 30. |
| batch.size | optional | '32' | Inference batch size. Default 1. |
| async.enabled | optional | 'true' | Enables non-blocking inference. Default false. |
| retry.max-attempts | optional | '3' | Max retries on transient errors. |
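Putting the options together, a MinIO-backed table might be declared as follows. The column schema is illustrative; score and confidence correspond to the two fields emitted by MLInferenceLookupFunction:

```sql
CREATE TABLE fraud_model (
  amount     DOUBLE,
  score      DOUBLE,
  confidence DOUBLE
) WITH (
  'connector'           = 'ml-inference',
  'model.name'          = 'fraud-detector',
  'model.path'          = 's3a://ml-models/fraud/v1/',
  'model.format'        = 'tensorflow-savedmodel',
  'model.s3.endpoint'   = 'http://minio:9000',
  'model.s3.access-key' = 'minioadmin',
  'model.s3.secret-key' = 'minioadmin',
  'model.s3.path-style' = 'true',
  'cache.ttl-minutes'   = '60'
);
```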