Otter Streams v1.0.17 · Flink 1.15–1.20

InferenceEngine<C>

Package: com.codedstream.otterstream.inference.engine

The top-level contract that all ML framework adapters implement. Parameterised on a configuration record type C that carries framework-specific settings (e.g., ONNX execution provider, TensorFlow signature name).

| Method | Return | Description |
|---|---|---|
| `initialize(C config)` | `void` | Loads the model from the path specified in `config`. Called once. Throws `InferenceException` on failure. |
| `infer(Map<String,Object> inputs)` | `InferenceResult` | Runs a single inference pass. Thread-safe in all built-in implementations. |
| `close()` | `void` | Releases native resources (ONNX sessions, TF bundles, etc.). |

InferenceResult

Package: com.codedstream.otterstream.inference.model

Immutable value object returned by InferenceEngine.infer(). Carries the output tensors as a named map, a success flag, and optional latency metadata.

| Method | Return | Description |
|---|---|---|
| `isSuccess()` | `boolean` | `false` if inference threw internally and the error was swallowed. |
| `getOutputs()` | `Map<String,Object>` | Output tensors keyed by name. Values are arrays (`float[]`, `double[]`, `int[]`) or scalars. |
| `getLatencyMs()` | `long` | Wall-clock inference time in milliseconds; `0` if not instrumented. |
| `getErrorMessage()` | `String` | Non-null only when `isSuccess()` is `false`. |
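The caller-side contract above can be sketched with a simplified stand-in for the real type (the `ToyResult` record below is an illustration, not the shipped `InferenceResult`): check the success flag first, then extract a numeric value from the output map.

```java
import java.util.Map;

// Simplified stand-in for InferenceResult (hypothetical, for illustration only).
record ToyResult(boolean success, Map<String, Object> outputs,
                 long latencyMs, String errorMessage) {}

class ResultHandlingSketch {
    // Mirrors the recommended caller pattern: consult the success flag
    // before touching the output map.
    static Double extractScore(ToyResult result, String outputName) {
        if (!result.success()) {
            System.err.println("inference failed: " + result.errorMessage());
            return null;
        }
        Object value = result.outputs().get(outputName);
        if (value instanceof float[] arr && arr.length > 0) {
            return (double) arr[0];          // tensor output: take the first element
        }
        if (value instanceof Number n) {
            return n.doubleValue();          // scalar output
        }
        return null;                         // absent or non-numeric
    }

    public static void main(String[] args) {
        ToyResult ok = new ToyResult(true, Map.of("score", new float[]{0.5f}), 3L, null);
        System.out.println(extractScore(ok, "score")); // prints 0.5
    }
}
```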

InferenceException

Package: com.codedstream.otterstream.inference.exception

Checked exception declared on InferenceEngine.infer() and MLInferenceFunction.eval(). Wraps engine-specific errors (ONNX runtime errors, TensorFlow session errors, network errors for remote engines).

> **Warning:** This is the class that triggers the `NoClassDefFoundError`. Because it appears in the `throws` clause of `eval()`, the JVM must be able to resolve it at function-registration time, so it must be bundled in the shaded JAR.
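One way to confirm the class actually made it into the submitted JAR is a `Class.forName` probe at client startup; a minimal sketch:

```java
// Quick diagnostic for the shading problem described above: verify that a
// class is resolvable on the current classpath before registering the function.
class ClasspathCheck {
    static boolean isResolvable(String fqcn) {
        try {
            // initialize=false: we only care about resolution, not static init.
            Class.forName(fqcn, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // If this prints false inside your Flink client, the shaded JAR
        // is missing the exception class.
        System.out.println(isResolvable(
            "com.codedstream.otterstream.inference.exception.InferenceException"));
    }
}
```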

ModelCache

Package: com.codedstreams.otterstreams.sql.loader

Thread-safe LRU singleton backed by Caffeine. Keyed by String modelName. Entries expire after a configurable TTL (default 30 minutes). The singleton is initialised on first access via ModelCache.getInstance(), which is called lazily inside MLInferenceFunction.eval().

| Method | Return | Description |
|---|---|---|
| `getInstance()` | `ModelCache` | Returns the process-level singleton. Safe to call from multiple task slots. |
| `getEngine(String name)` | `InferenceEngine<?>` | Returns the cached engine, or `null` if not present. |
| `putEngine(String name, InferenceEngine<?> engine)` | `void` | Registers an initialised engine. Called by the connector during table creation. |
| `invalidate(String name)` | `void` | Removes a single entry, triggering a fresh load on the next access. |
| `invalidateAll()` | `void` | Clears the entire cache. Useful during session renewal. |
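The shipped cache is backed by Caffeine; purely to illustrate the LRU-plus-TTL semantics described above, here is a self-contained sketch (the real `ModelCache` is not implemented this way):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative LRU cache with a per-entry TTL. The shipped ModelCache is
// backed by Caffeine; this sketch only mirrors its observable semantics.
class LruTtlCache<V> {
    private record Entry<V>(V value, long writtenAtMs) {}

    private final long ttlMs;
    private final LinkedHashMap<String, Entry<V>> map;

    LruTtlCache(int maxSize, long ttlMs) {
        this.ttlMs = ttlMs;
        // accessOrder=true makes iteration order least-recently-used first,
        // so removeEldestEntry evicts the LRU entry once maxSize is exceeded.
        this.map = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Entry<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    synchronized V get(String name) {
        Entry<V> e = map.get(name);
        if (e == null) return null;
        if (System.currentTimeMillis() - e.writtenAtMs() > ttlMs) {
            map.remove(name);   // expired: next access triggers a fresh load
            return null;
        }
        return e.value();
    }

    synchronized void put(String name, V value) {
        map.put(name, new Entry<>(value, System.currentTimeMillis()));
    }

    synchronized void invalidate(String name) { map.remove(name); }
    synchronized void invalidateAll() { map.clear(); }
}
```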

InferenceConfig

Package: com.codedstream.otterstream.inference.config

Builder-pattern configuration for the async DataStream function wrapper. Not used by the SQL path directly — the SQL path reads config from DDL WITH options via SqlInferenceConfig.

```java
InferenceConfig config = InferenceConfig.builder()
    .modelConfig(ModelConfig.builder()
        .modelId("fraud-detector")
        .modelPath("s3a://ml-models/fraud-detector/v1/")
        .format(ModelFormat.TENSORFLOW_SAVEDMODEL)
        .build())
    .batchSize(32)
    .timeout(Duration.ofSeconds(5))
    .maxRetries(3)
    .enableCaching(true)
    .cacheSize(10_000)
    .cacheTtl(Duration.ofMinutes(30))
    .enableMetrics(true)
    .metricsPrefix("myapp.ml")
    .build();
```

ModelConfig

Package: com.codedstream.otterstream.inference.config

| Field | Type | Required | Description |
|---|---|---|---|
| `modelId` | `String` | Yes | Logical name; used as the `ModelCache` key. |
| `modelPath` | `String` | Yes | URI: local path, `s3a://`, `http://`, `hdfs://`. |
| `format` | `ModelFormat` | Yes | Enum: `ONNX`, `TENSORFLOW_SAVEDMODEL`, `XGBOOST_JSON`, `XGBOOST_BINARY`, `PMML`, `PYTORCH_TORCHSCRIPT`. |
| `modelVersion` | `String` | No | Informational; included in metrics tags. |
| `modelOptions` | `Map<String,Object>` | No | Framework-specific overrides (e.g., ONNX execution providers). |

MLInferenceFunction

Package: com.codedstreams.otterstreams.sql.udf

Extends: org.apache.flink.table.functions.ScalarFunction

| Aspect | Detail |
|---|---|
| Entry method | `Double eval(Map<String,Object> features, String modelName) throws InferenceException` |
| Serialisation | The `modelCache` field is `transient`; re-initialised via the singleton on the first call in each task slot. |
| Null handling | Returns `null` when the model is absent from the cache, inference fails, outputs are empty, or the output value is non-numeric. |
| Thread safety | Safe — `ModelCache` and all engine implementations are thread-safe. |
| SQL name | Registered as `ml_score` by convention (configurable in `CREATE FUNCTION`). |
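Registration and a call from SQL might look like this (the class name comes from the package above; the table and column names are illustrative, and the `MAP[...]` constructor requires the feature values to share one SQL type):

```sql
-- Register the scalar function under its conventional name.
CREATE FUNCTION ml_score
  AS 'com.codedstreams.otterstreams.sql.udf.MLInferenceFunction';

-- Score each event; MAP[...] builds the feature-map argument.
SELECT
  txn_id,
  ml_score(MAP['amount', amount, 'merchant_risk', merchant_risk],
           'fraud-detector') AS fraud_score
FROM transactions;
```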

MLInferenceLookupFunction

Package: com.codedstreams.otterstreams.sql.connector

Extends: org.apache.flink.table.functions.TableFunction<RowData>

Used as the runtime provider for the ml-inference connector in lookup join mode. The eval(Object... keys) method converts positional lookup keys to a feature map, runs inference, and calls collect() with a two-field row containing score (DOUBLE) and confidence (DOUBLE).
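A lookup join against an `ml-inference` table would then surface both output fields; a sketch with hypothetical table and key names:

```sql
-- t.proc_time is a processing-time attribute on the probe side;
-- the ON clause supplies the positional lookup keys passed to eval().
SELECT t.txn_id, m.score, m.confidence
FROM transactions AS t
JOIN fraud_model FOR SYSTEM_TIME AS OF t.proc_time AS m
  ON t.txn_id = m.txn_id;
```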

MLInferenceDynamicTableFactory

Package: com.codedstreams.otterstreams.sql.connector

Implements: org.apache.flink.table.factories.DynamicTableSourceFactory

Discovered via Java SPI from META-INF/services/org.apache.flink.table.factories.Factory. Factory identifier: ml-inference. Creates MLInferenceDynamicTableSource instances and triggers model loading from the configured source (MinIO, S3, HTTP, local) during table creation.
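For SPI discovery to survive shading, the services file must be retained (and merged, if other factories are bundled). Based on the package above, its expected content is a single line:

```
# META-INF/services/org.apache.flink.table.factories.Factory
com.codedstreams.otterstreams.sql.connector.MLInferenceDynamicTableFactory
```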

SqlInferenceConfig

Package: com.codedstreams.otterstreams.sql.config

Parses Flink DDL WITH options into a typed configuration record. Created by SqlInferenceConfig.fromOptions(Map<String,String> options).

| DDL option | Java field | Default |
|---|---|---|
| `model.name` | `modelName` | required |
| `model.path` | `modelPath` | required |
| `model.format` | `modelFormat` | `tensorflow-savedmodel` |
| `model.version` | `modelVersion` | `latest` |
| `model.s3.endpoint` | `s3Endpoint` | `null` |
| `model.s3.access-key` | `s3AccessKey` | `null` |
| `model.s3.secret-key` | `s3SecretKey` | `null` |
| `model.s3.path-style` | `s3PathStyle` | `true` |
| `cache.enabled` | `cacheEnabled` | `true` |
| `cache.ttl-minutes` | `cacheTtlMinutes` | `30` |
| `batch.size` | `batchSize` | `1` |
| `async.enabled` | `asyncEnabled` | `false` |
| `retry.max-attempts` | `maxRetries` | `3` |
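The defaulting behaviour in the table above amounts to plain map lookups; a simplified stand-in for a subset of the options (the real `SqlInferenceConfig` presumably validates more):

```java
import java.util.Map;

// Illustrative subset of SqlInferenceConfig.fromOptions: required options
// throw when absent, optional ones fall back to the documented defaults.
record SqlOptionsSketch(String modelName, String modelFormat,
                        boolean cacheEnabled, int cacheTtlMinutes) {

    static SqlOptionsSketch fromOptions(Map<String, String> opts) {
        String name = opts.get("model.name");
        if (name == null) {
            throw new IllegalArgumentException("model.name is required");
        }
        return new SqlOptionsSketch(
            name,
            opts.getOrDefault("model.format", "tensorflow-savedmodel"),
            Boolean.parseBoolean(opts.getOrDefault("cache.enabled", "true")),
            Integer.parseInt(opts.getOrDefault("cache.ttl-minutes", "30")));
    }
}
```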

Connector DDL Options — ml-inference

Complete reference for the 'connector' = 'ml-inference' DDL options used in CREATE TABLE statements.

| Option | Required | Example value | Notes |
|---|---|---|---|
| `connector` | required | `'ml-inference'` | Must be exactly this string. |
| `model.name` | required | `'fraud-detector'` | Key used in `ml_score(..., 'fraud-detector')`. |
| `model.path` | required | `'s3a://ml-models/fraud/v1/'` | URI; supports `s3a://`, `http://`, `file://`. |
| `model.format` | optional | `'tensorflow-savedmodel'` | Also: `onnx`, `xgboost-json`, `xgboost-binary`, `pmml`. |
| `model.version` | optional | `'v1'` | Informational; included in log output. |
| `model.s3.endpoint` | optional | `'http://minio:9000'` | Required when using MinIO or non-AWS S3. |
| `model.s3.access-key` | optional | `'minioadmin'` | S3 access key ID. |
| `model.s3.secret-key` | optional | `'minioadmin'` | S3 secret access key. |
| `model.s3.path-style` | optional | `'true'` | Set `true` for MinIO. |
| `cache.enabled` | optional | `'true'` | Default `true`. |
| `cache.ttl-minutes` | optional | `'60'` | Cache entry TTL. Default 30. |
| `batch.size` | optional | `'32'` | Inference batch size. Default 1. |
| `async.enabled` | optional | `'true'` | Enables non-blocking inference. Default `false`. |
| `retry.max-attempts` | optional | `'3'` | Max retries on transient errors. |
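Putting the options together, a MinIO-backed table definition might look like this (the schema, endpoint, bucket, and credentials are placeholders for your own deployment):

```sql
CREATE TABLE fraud_model (
  txn_id     BIGINT,
  score      DOUBLE,
  confidence DOUBLE
) WITH (
  'connector'           = 'ml-inference',
  'model.name'          = 'fraud-detector',
  'model.path'          = 's3a://ml-models/fraud/v1/',
  'model.format'        = 'tensorflow-savedmodel',
  'model.s3.endpoint'   = 'http://minio:9000',
  'model.s3.access-key' = 'minioadmin',
  'model.s3.secret-key' = 'minioadmin',
  'model.s3.path-style' = 'true',
  'cache.ttl-minutes'   = '60'
);
```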