Class AsyncModelInferenceFunction<IN,OUT>
- java.lang.Object
-
- org.apache.flink.api.common.functions.AbstractRichFunction
-
- com.codedstream.otterstream.inference.function.AsyncModelInferenceFunction<IN,OUT>
-
- Type Parameters:
IN- input record type from Flink streamOUT- output record type to Flink stream
- All Implemented Interfaces:
Serializable,org.apache.flink.api.common.functions.Function,org.apache.flink.api.common.functions.RichFunction,org.apache.flink.streaming.api.functions.async.AsyncFunction<IN,OUT>
public class AsyncModelInferenceFunction<IN,OUT> extends org.apache.flink.api.common.functions.AbstractRichFunction implements org.apache.flink.streaming.api.functions.async.AsyncFunction<IN,OUT>Asynchronous function for performing ML inference in Apache Flink streams.This function enables non-blocking inference operations in Flink pipelines, allowing high throughput by processing multiple requests concurrently without blocking the Flink operator thread.
Key Features:
- Non-blocking async I/O for better throughput
- Automatic retry on failures
- Timeout handling
- Metrics collection
- Generic input/output transformation
Usage Example:
// Define your input type (e.g., sensor reading) DataStream<SensorReading> input = ...; // Configure inference ModelConfig modelConfig = ModelConfig.builder() .modelId("anomaly-detector") .modelPath("/models/anomaly.onnx") .format(ModelFormat.ONNX) .build(); InferenceConfig config = InferenceConfig.builder() .modelConfig(modelConfig) .batchSize(32) .timeout(Duration.ofSeconds(5)) .build(); // Create engine factory Function<InferenceConfig, InferenceEngine<?>> engineFactory = cfg -> new OnnxInferenceEngine(); // Apply async inference AsyncDataStream.unorderedWait( input, new AsyncModelInferenceFunction<>(config, engineFactory), 5000, // timeout TimeUnit.MILLISECONDS, 100 // capacity );Custom Feature Extraction:
Override
extractFeatures(Object)to customize how inputs are converted to model features:{@code public class CustomInferenceFunction extends AsyncModelInferenceFunction{ - Since:
- 1.0.0
- Author:
- Nestor Martourez, Sr Software and Data Streaming Engineer @ CodedStreams
- See Also:
AsyncDataStream, Serialized Form
-
-
Constructor Summary
Constructors Constructor Description AsyncModelInferenceFunction(InferenceConfig inferenceConfig, Function<InferenceConfig,InferenceEngine<?>> engineFactory)Constructs async inference function.
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description voidasyncInvoke(IN input, org.apache.flink.streaming.api.functions.async.ResultFuture<OUT> resultFuture)Performs asynchronous inference on input record.protected Map<String,Object>extractFeatures(IN input)Extracts model input features from the input record.protected voidinitializeEngine()Initializes the inference engine lazily.voidtimeout(IN input, org.apache.flink.streaming.api.functions.async.ResultFuture<OUT> resultFuture)Called when inference timeout occurs.protected OUTtransformResult(IN input, InferenceResult result)Transforms inference result into output record.
-
-
-
Constructor Detail
-
AsyncModelInferenceFunction
public AsyncModelInferenceFunction(InferenceConfig inferenceConfig, Function<InferenceConfig,InferenceEngine<?>> engineFactory)
Constructs async inference function.- Parameters:
inferenceConfig- configuration for inference operationsengineFactory- factory function to create inference engine
-
-
Method Detail
-
asyncInvoke
public void asyncInvoke(IN input, org.apache.flink.streaming.api.functions.async.ResultFuture<OUT> resultFuture) throws Exception
Performs asynchronous inference on input record.
-
timeout
public void timeout(IN input, org.apache.flink.streaming.api.functions.async.ResultFuture<OUT> resultFuture) throws Exception
Called when inference timeout occurs.
-
initializeEngine
protected void initializeEngine() throws InferenceExceptionInitializes the inference engine lazily.Called on first use in each Flink TaskManager.
- Throws:
InferenceException- if initialization fails
-
extractFeatures
protected Map<String,Object> extractFeatures(IN input)
Extracts model input features from the input record.Override this to customize feature extraction for your use case.
- Parameters:
input- the input record- Returns:
- map of feature name to feature value
-
transformResult
protected OUT transformResult(IN input, InferenceResult result)
Transforms inference result into output record.Override this to customize result transformation for your use case.
- Parameters:
input- the original input recordresult- the inference result- Returns:
- transformed output record
-
-