Class AsyncModelInferenceFunction<IN,OUT>

  • Type Parameters:
    IN - input record type from Flink stream
    OUT - output record type to Flink stream

  • All Implemented Interfaces:
    Serializable, org.apache.flink.api.common.functions.Function, org.apache.flink.api.common.functions.RichFunction, org.apache.flink.streaming.api.functions.async.AsyncFunction<IN,OUT>

    public class AsyncModelInferenceFunction<IN,OUT>
    extends org.apache.flink.api.common.functions.AbstractRichFunction
    implements org.apache.flink.streaming.api.functions.async.AsyncFunction<IN,OUT>
    Asynchronous function for performing ML inference in Apache Flink streams.

    This function enables non-blocking inference operations in Flink pipelines, allowing high throughput by processing multiple requests concurrently without blocking the Flink operator thread.

    Key Features:

    • Non-blocking async I/O for better throughput
    • Automatic retry on failures
    • Timeout handling
    • Metrics collection
    • Generic input/output transformation

    Usage Example:

    
     // Define your input type (e.g., sensor reading)
     DataStream<SensorReading> input = ...;
    
     // Configure inference
     ModelConfig modelConfig = ModelConfig.builder()
         .modelId("anomaly-detector")
         .modelPath("/models/anomaly.onnx")
         .format(ModelFormat.ONNX)
         .build();
    
     InferenceConfig config = InferenceConfig.builder()
         .modelConfig(modelConfig)
         .batchSize(32)
         .timeout(Duration.ofSeconds(5))
         .build();
    
     // Create engine factory
     Function<InferenceConfig, InferenceEngine<?>> engineFactory =
         cfg -> new OnnxInferenceEngine();
    
     // Apply async inference
     AsyncDataStream.unorderedWait(
         input,
         new AsyncModelInferenceFunction<>(config, engineFactory),
         5000,                  // timeout per request
         TimeUnit.MILLISECONDS,
         100                    // capacity: max concurrent async requests
     );
     

    Custom Feature Extraction:

    Override extractFeatures(Object) to customize how inputs are converted to model features:

     public class CustomInferenceFunction<IN, OUT>
         extends AsyncModelInferenceFunction<IN, OUT> {

         @Override
         protected Map<String, Object> extractFeatures(IN input) {
             // build the named feature map the model expects; "value" is illustrative
             return Map.of("value", input);
         }
     }
    Since:
    1.0.0
    Author:
    Nestor Martourez, Sr Software and Data Streaming Engineer @ CodedStreams
    See Also:
    AsyncDataStream, Serialized Form
    • Constructor Detail

      • AsyncModelInferenceFunction

        public AsyncModelInferenceFunction(InferenceConfig inferenceConfig,
                                           Function<InferenceConfig,InferenceEngine<?>> engineFactory)
        Constructs an async inference function.
        Parameters:
        inferenceConfig - configuration for inference operations
        engineFactory - factory function to create the inference engine
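        Note that java.util.function.Function is not Serializable on its own; because Flink serializes rich functions when distributing a job, a factory stored as a field of this function typically needs to be serializable too. A minimal sketch of the usual intersection-cast workaround (the Integer types and the round-trip helper below are illustrative, not part of this API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class SerializableFactory {

    // An intersection cast makes the compiled lambda implement
    // Serializable in addition to Function. Integer stands in for the
    // real InferenceConfig / InferenceEngine types here.
    static final Function<Integer, Integer> FACTORY =
        (Function<Integer, Integer> & Serializable) cfg -> cfg * 2;

    // Round-trip through Java serialization, roughly what Flink does
    // when it ships operator instances to TaskManagers.
    @SuppressWarnings("unchecked")
    public static Function<Integer, Integer> roundTrip() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(FACTORY);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Function<Integer, Integer>) in.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```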
    • Method Detail

      • asyncInvoke

        public void asyncInvoke​(IN input,
                                org.apache.flink.streaming.api.functions.async.ResultFuture<OUT> resultFuture)
                         throws Exception
        Performs asynchronous inference on the input record.
        Specified by:
        asyncInvoke in interface org.apache.flink.streaming.api.functions.async.AsyncFunction<IN,​OUT>
        Parameters:
        input - the input record
        resultFuture - callback to complete with result
        Throws:
        Exception - if processing fails
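        The contract here follows Flink's standard async I/O pattern: hand the blocking inference call to another thread and complete the result future from a callback, never blocking the operator thread. A self-contained sketch of that pattern (the ResultFuture interface below is a minimal stand-in for Flink's, and score() is a hypothetical model call):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class AsyncInvokeSketch {

    // Minimal stand-in for Flink's
    // org.apache.flink.streaming.api.functions.async.ResultFuture.
    public interface ResultFuture<OUT> {
        void complete(Collection<OUT> results);
        void completeExceptionally(Throwable error);
    }

    // Hypothetical blocking model call; in the real function this is
    // the inference engine created by the engine factory.
    static double score(double input) {
        return input * 0.5;
    }

    // The shape asyncInvoke implementations typically take: run the
    // blocking call on another thread and complete the result future
    // from the callback, so the calling operator thread never blocks.
    public static void asyncInvoke(double input, ResultFuture<Double> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> score(input))
            .whenComplete((result, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else {
                    resultFuture.complete(Collections.singleton(result));
                }
            });
    }
}
```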
      • timeout

        public void timeout​(IN input,
                            org.apache.flink.streaming.api.functions.async.ResultFuture<OUT> resultFuture)
                     throws Exception
        Called when an inference request times out.
        Specified by:
        timeout in interface org.apache.flink.streaming.api.functions.async.AsyncFunction<IN,​OUT>
        Parameters:
        input - the input record that timed out
        resultFuture - callback to complete with error
        Throws:
        Exception - if handling fails
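        By default a timeout completes the future with an error; a subclass can instead emit a fallback record so the pipeline keeps flowing. A self-contained sketch of that trade-off using plain CompletableFuture (slowScore() and the NaN sentinel are illustrative assumptions, not this API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TimeoutFallback {

    // Deliberately slow hypothetical model call, for illustration only.
    static double slowScore(double input) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return input * 0.5;
    }

    // Race the inference against a deadline and substitute a sentinel
    // value on timeout instead of failing the record; whether that
    // trade-off is acceptable depends on the job.
    public static double scoreWithFallback(double input, long timeoutMillis) {
        return CompletableFuture
            .supplyAsync(() -> slowScore(input))
            .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .exceptionally(error -> Double.NaN)  // fallback on timeout or failure
            .join();
    }
}
```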
      • initializeEngine

        protected void initializeEngine()
                                 throws InferenceException
        Initializes the inference engine lazily.

        Called on first use in each Flink TaskManager.

        Throws:
        InferenceException - if initialization fails
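        The "first use in each Flink TaskManager" contract is typically met with a lazy, thread-safe initializer over a transient field, so the engine is rebuilt after deserialization rather than shipped with the job. A minimal sketch under that assumption (the Object engine and the counter are illustrative only):

```java
public class LazyEngineHolder {

    // transient: the engine is re-created on each TaskManager rather
    // than serialized along with the function.
    private transient Object engine;
    int initCount = 0;

    // Hypothetical engine construction; stands in for invoking the
    // configured engine factory.
    private Object createEngine() {
        initCount++;
        return new Object();
    }

    // Lazy, thread-safe initialization on first use: concurrent first
    // calls on the same instance initialize exactly once.
    synchronized Object engine() {
        if (engine == null) {
            engine = createEngine();
        }
        return engine;
    }
}
```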
      • extractFeatures

        protected Map<String,​Object> extractFeatures​(IN input)
        Extracts model input features from the input record.

        Override this to customize feature extraction for your use case.

        Parameters:
        input - the input record
        Returns:
        map of feature name to feature value
      • transformResult

        protected OUT transformResult​(IN input,
                                      InferenceResult result)
        Transforms inference result into output record.

        Override this to customize result transformation for your use case.

        Parameters:
        input - the original input record
        result - the inference result
        Returns:
        transformed output record
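        A common override keeps the original record and attaches the model output so downstream operators see both. A sketch of that pattern; note the InferenceResult stand-in below (a name-to-value output map with a "score" key) is an assumption, not the library's documented API:

```java
import java.util.Map;

public class TransformResultSketch {

    // Stand-in for the library's InferenceResult; exposing outputs as
    // a name->value map is an assumption made for this sketch.
    public record InferenceResult(Map<String, Object> outputs) {}

    // Hypothetical enriched output pairing the original value with its score.
    public record Scored(double input, double score) {}

    // Typical transformResult override: carry the original record
    // forward together with the model output.
    public static Scored transformResult(double input, InferenceResult result) {
        Object score = result.outputs().getOrDefault("score", Double.NaN);
        return new Scored(input, (Double) score);
    }
}
```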