
Getting Started with Otter Streams

Learn how to integrate real-time machine learning inference into your Apache Flink streaming applications with both DataStream and SQL APIs

What's Implemented

✅ Complete Core Framework

Async inference, caching, metrics, and configuration system with multi-source model loading support.

🧠 Multi-Framework Support

ONNX, TensorFlow, PyTorch, XGBoost, and PMML with native execution engines.

📊 Flink SQL Integration

Full Flink SQL support with UDFs, table functions, and a SQL API for ML inference.

⚡ Performance Features

Batch processing, async operations, intelligent caching, and GPU acceleration.

New Feature: Otter Streams now supports both DataStream and Flink SQL APIs! Check out the SQL documentation for details.

📦 Installation

Clone and Build

bash
# Clone the repository
git clone https://github.com/martourez21/otter-streams.git
cd otter-streams

# Build the project
mvn clean install -DskipTests

# Or use the setup script
chmod +x setup.sh
./setup.sh

Maven Dependencies

xml
<!-- Core framework (required) -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>ml-inference-core</artifactId>
    <version>1.0.16</version>
</dependency>

<!-- Flink SQL Integration -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-sql</artifactId>
    <version>1.0.16</version>
</dependency>

<!-- Add framework-specific modules as needed -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-onnx</artifactId>
    <version>1.0.16</version>
</dependency>

🤖 Model Preparation

Export Your Model to ONNX (Recommended)

python
# PyTorch to ONNX
import torch
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=['input'],
    output_names=['output']
)

# TensorFlow to ONNX
import tf2onnx
model_proto, _ = tf2onnx.convert.from_keras(model, input_signature=spec)
with open("model.onnx", "wb") as f:
    f.write(model_proto.SerializeToString())

# Scikit-learn to ONNX
from skl2onnx import to_onnx
onnx_model = to_onnx(model, X[:1].astype(np.float32))
with open("model.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())

Supported Formats

  • ONNX (Ready): .onnx files, the most compatible format
  • TensorFlow (Ready): saved_model.pb files in a saved_model directory
  • PyTorch (Ready): .pt and .pth files (TorchScript)
  • XGBoost (Ready): .model, .bin, and .json files

🔧 Flink Integration

1. Set Up Dependencies

Add Otter Streams to your Flink project:

xml
<!-- pom.xml -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>ml-inference-core</artifactId>
    <version>1.0.16</version>
</dependency>
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-onnx</artifactId>
    <version>1.0.16</version>
</dependency>

2. Configure Your Model

Create an inference configuration with your model details:

java
InferenceConfig config = InferenceConfig.builder()
    .modelConfig(ModelConfig.builder()
        .modelId("fraud-detection")
        .modelPath("models/fraud_model.onnx")
        .format(ModelFormat.ONNX)
        .modelName("fraud_predictor")
        .modelVersion("1.0")
        .build())
    .batchSize(32)
    .timeout(5000)
    .maxRetries(3)
    .enableMetrics(true)
    .build();

3. Create Inference Function

Instantiate the async inference function with your configuration:

java
AsyncModelInferenceFunction<Map<String, Object>, InferenceResult> inferenceFunction =
    new AsyncModelInferenceFunction<>(
        config,
        cfg -> new OnnxInferenceEngine()
    );

4. Apply to DataStream

Integrate with your Flink streaming pipeline:

java
DataStream<InferenceResult> predictions = AsyncDataStream.unorderedWait(
    transactionStream,
    inferenceFunction,
    5000,                    // Async timeout
    TimeUnit.MILLISECONDS,
    100                      // Max concurrent requests
);

5. Process Results

Handle the inference results in your downstream processing:

java
predictions
    .filter(result -> result.getScore() > 0.8)
    .map(result -> createAlert(result))
    .addSink(new AlertSink());

Quick Test:

  • Run with a small sample dataset first
  • Verify model loads correctly
  • Check inference latency meets your requirements
  • Monitor Flink metrics for backpressure
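
A quick way to check the latency item on this list is to time standalone predictions before wiring the model into the full pipeline. The sketch below is framework-agnostic Python; `predict` is a stand-in for whatever inference call you are validating, not an Otter Streams API:

```python
import time
import statistics

def measure_latency(predict, samples, warmup=5):
    """Time individual predictions and report simple latency stats in ms."""
    # Warm-up runs: first calls often pay one-time costs (lazy loading, JIT)
    for s in samples[:warmup]:
        predict(s)
    latencies = []
    for s in samples:
        start = time.perf_counter()
        predict(s)
        latencies.append((time.perf_counter() - start) * 1000.0)
    latencies.sort()
    return {
        "p50_ms": statistics.median(latencies),
        "mean_ms": statistics.fmean(latencies),
        "max_ms": latencies[-1],
    }

# Example with a trivial stand-in prediction function
stats = measure_latency(lambda x: x * 2, list(range(100)))
```

Compare the reported p50 against your SLA; if the max is far above the median, expect occasional timeouts and size `maxRetries` accordingly.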

New Feature: Use ML inference directly in Flink SQL with our new SQL module. See the full SQL documentation for more examples.

📊 Flink SQL Quick Start

1. Add SQL Dependency

Include the SQL module in your project:

xml
<!-- pom.xml -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-sql</artifactId>
    <version>1.0.16</version>
</dependency>

2. Register ML Functions

Register ML inference functions in your Flink table environment:

java
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Register ML inference as a UDF
tEnv.createTemporarySystemFunction(
    "ML_PREDICT",
    new MLInferenceUDF("models/fraud_model.onnx")
);

// Or register as a table function
tEnv.createTemporarySystemFunction(
    "ML_PREDICT_TABLE",
    new MLInferenceTableFunction("models/fraud_model.onnx")
);

3. Use in SQL Queries

Call ML inference functions directly in your SQL statements:

sql
-- Simple scalar function
SELECT
    transaction_id,
    amount,
    ML_PREDICT(amount, category, merchant) as fraud_score
FROM transactions
WHERE ML_PREDICT(amount, category, merchant) > 0.8;

-- Table function for multiple outputs
SELECT
    t.transaction_id,
    p.score,
    p.confidence,
    p.reason
FROM transactions t,
LATERAL TABLE(ML_PREDICT_TABLE(t.amount, t.category, t.merchant)) AS p(score, confidence, reason)
WHERE p.score > 0.8;

4. Configure via SQL Hints

Configure inference behavior using SQL hints:

sql
-- Configure batch size and timeout via hints
SELECT
    /*+ INFERENCE_OPTIONS('batch_size'='64', 'timeout_ms'='5000') */
    transaction_id,
    ML_PREDICT(amount, category, merchant) as fraud_score
FROM transactions;

-- Use different model versions
SELECT
    /*+ MODEL_CONFIG('model_version'='v2.1', 'enable_cache'='true') */
    user_id,
    ML_PREDICT(features) as prediction
FROM user_features;

5. Create Views with ML

Create materialized views with ML predictions for easy access:

sql
-- Create view with real-time predictions
CREATE VIEW fraud_alerts AS
SELECT
    transaction_id,
    user_id,
    amount,
    ML_PREDICT(amount, category, merchant) as fraud_score,
    CASE
        WHEN ML_PREDICT(amount, category, merchant) > 0.9 THEN 'HIGH_RISK'
        WHEN ML_PREDICT(amount, category, merchant) > 0.7 THEN 'MEDIUM_RISK'
        ELSE 'LOW_RISK'
    END as risk_level
FROM transactions;

-- Query the view
SELECT * FROM fraud_alerts WHERE risk_level = 'HIGH_RISK';
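
The CASE expression in the view is plain threshold bucketing. If fraud scores are also consumed outside SQL, keeping an equivalent function in application code avoids the two paths drifting apart. A minimal Python mirror of the thresholds above (0.9 and 0.7, copied from the view):

```python
def risk_level(fraud_score: float) -> str:
    """Mirror of the SQL CASE expression: bucket a fraud score into a risk level."""
    if fraud_score > 0.9:
        return "HIGH_RISK"
    if fraud_score > 0.7:
        return "MEDIUM_RISK"
    return "LOW_RISK"
```

Note the strict comparisons: a score of exactly 0.9 falls into MEDIUM_RISK, matching the SQL semantics.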

SQL Best Practices:

  • Use batch inference for better performance with large datasets
  • Enable caching for repeated predictions on similar inputs
  • Monitor the SQL plan to ensure optimal execution
  • Consider materialized views for frequent ML queries

📊 Flink SQL Integration

Tip: The Flink SQL API provides a declarative way to use ML inference, making it easier to integrate with existing SQL-based workflows and tools.

Available SQL Functions

ML_PREDICT()

Scalar Function: Returns a single prediction value for each input row.

ML_PREDICT(feature1, feature2, ...) → prediction

ML_PREDICT_TABLE()

Table Function: Returns multiple output columns for complex models.

ML_PREDICT_TABLE(features) → (score, confidence, label)

ML_BATCH_PREDICT()

Aggregate Function: Batch prediction for windowed aggregations.

ML_BATCH_PREDICT(features) OVER (PARTITION BY ...)

Complete SQL Example

sql
-- Register ML model as a catalog function
CREATE FUNCTION FraudDetector
AS 'com.codedstreams.otter.MLInferenceUDF'
USING JAR '/path/to/otter-stream-sql.jar';

-- Create streaming table
CREATE TABLE transactions (
    transaction_id STRING,
    user_id STRING,
    amount DECIMAL(10, 2),
    category STRING,
    merchant STRING,
    transaction_time TIMESTAMP(3),
    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'transactions',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Real-time fraud detection with ML
CREATE TABLE fraud_alerts (
    alert_id STRING,
    transaction_id STRING,
    user_id STRING,
    amount DECIMAL(10, 2),
    fraud_score DOUBLE,
    detection_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'fraud-alerts',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Streaming ML inference pipeline
INSERT INTO fraud_alerts
SELECT
    UUID() as alert_id,
    transaction_id,
    user_id,
    amount,
    FraudDetector(amount, category, merchant) as fraud_score,
    CURRENT_TIMESTAMP as detection_time
FROM transactions
WHERE FraudDetector(amount, category, merchant) > 0.8;

Learn More:

For complete SQL documentation, including UDF development, optimization, and best practices, visit the Flink SQL Integration Guide.

💡 Examples

Run the Fraud Detection Example

bash
# Run DataStream example
mvn exec:java -pl otter-stream-examples \
  -Dexec.mainClass="com.flinkml.inference.examples.FraudDetectionExample"

# Run SQL example
mvn exec:java -pl otter-stream-examples \
  -Dexec.mainClass="com.flinkml.inference.examples.FraudDetectionSQLExample"

More Examples

🎭 Fraud Detection

Real-time transaction analysis with feature extraction and ML scoring.

🤖 Sentiment Analysis

Streaming text classification with ONNX models.

📈 Anomaly Detection

Time-series monitoring with automatic alerting.

📊 SQL ML Pipeline

Complete ML inference pipeline using Flink SQL API.

⚙️ Configuration

Tip: Most configuration options have sensible defaults. Start with the basics and customize as needed.

Complete Configuration Example

java
InferenceConfig config = InferenceConfig.builder()
    // Model configuration
    .modelConfig(ModelConfig.builder()
        .modelId("my-model")
        .modelPath("/path/to/model.onnx")
        .format(ModelFormat.ONNX)
        .modelName("production-model")
        .modelVersion("2.0")
        .build())

    // Performance settings
    .batchSize(64)
    .batchTimeout(Duration.ofMillis(100))
    .parallelism(4)
    .maxConcurrentRequests(200)
    .queueSize(1000)

    // Caching
    .enableCaching(true)
    .cacheSize(10000)
    .cacheTtl(Duration.ofMinutes(30))

    // Error handling
    .maxRetries(3)
    .retryDelay(Duration.ofMillis(100))
    .timeout(Duration.ofSeconds(10))

    // Monitoring
    .enableMetrics(true)
    .metricsPrefix("myapp.ml.inference")
    .collectLatencyMetrics(true)
    .collectThroughputMetrics(true)
    .collectErrorMetrics(true)
    .collectCacheMetrics(true)
    .metricsExportInterval(Duration.ofSeconds(30))

    // Resource management
    .gpuAcceleration(true)
    .cpuThreads(4)
    .memoryLimit(2048) // MB

    .build();
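
The maxRetries and retryDelay options describe a retry-with-fixed-delay policy. How Otter Streams applies it internally is not shown here; as a mental model, the semantics are roughly the following Python sketch (the names and the fixed-delay behavior are assumptions for illustration, not framework internals):

```python
import time

def call_with_retries(fn, max_retries=3, retry_delay_s=0.1):
    """Call fn, retrying up to max_retries times with a fixed delay between attempts."""
    last_error = None
    for attempt in range(max_retries + 1):  # one initial try plus max_retries retries
        try:
            return fn()
        except Exception as e:
            last_error = e
            if attempt < max_retries:
                time.sleep(retry_delay_s)
    raise last_error  # all attempts exhausted: surface the last failure
```

With maxRetries(3), a transient failure can consume up to four inference attempts per record, so budget the async timeout accordingly.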

Common Configuration Scenarios

🧪 Development

Small batch size, no caching, minimal resources. Focus on correctness.

🚀 Production

Large batch size, caching enabled, full monitoring. Focus on performance.

📊 SQL Workloads

Optimized for declarative queries with automatic batching and caching.

⚡ Performance Optimization

Important: Always measure performance in your specific environment before optimizing.

Performance Checklist

📊 Batch Optimization

Find optimal batch size for your model (usually between 16-128).

  • Start with batch size 32
  • Measure latency vs throughput
  • Adjust based on your SLA
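
The trade-off behind these steps can be seen with a toy cost model: each batch pays a fixed overhead plus a per-item cost, so larger batches raise throughput but also raise the latency of every item in the batch. The numbers below are illustrative only, not measurements of any real model:

```python
def batch_tradeoff(batch_size, overhead_ms=5.0, per_item_ms=0.5):
    """Toy cost model: latency of one batch and the resulting items/second."""
    batch_latency_ms = overhead_ms + per_item_ms * batch_size
    throughput = batch_size / (batch_latency_ms / 1000.0)
    return batch_latency_ms, throughput

# Under this model, throughput rises with batch size while per-batch latency grows
for size in (1, 32, 128):
    latency, tput = batch_tradeoff(size)
    print(f"batch={size:>3}  latency={latency:.1f} ms  throughput={tput:.0f} items/s")
```

Replace the overhead and per-item constants with numbers measured on your model; the crossover point where latency violates your SLA is the practical upper bound on batch size.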

💾 Caching Strategy

Enable caching for repeated predictions with similar inputs.

  • Result caching for deterministic models
  • Feature caching for expensive preprocessing
  • Model caching for frequent reloads
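
Result caching for a deterministic model reduces to a bounded map from input features to predictions with per-entry expiry, which is what cacheSize and cacheTtl configure. A minimal Python sketch of that idea (the framework's actual cache is presumably more sophisticated; this only illustrates the LRU-plus-TTL mechanics):

```python
import time
from collections import OrderedDict

class TtlLruCache:
    """Bounded cache with per-entry TTL; evicts the least-recently-used entry when full."""
    def __init__(self, max_size, ttl_s, clock=time.monotonic):
        self.max_size = max_size
        self.ttl_s = ttl_s
        self.clock = clock          # injectable for testing
        self._data = OrderedDict()  # key -> (value, expiry_time)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expiry = entry
        if self.clock() >= expiry:
            del self._data[key]     # expired: drop the entry and report a miss
            return None
        self._data.move_to_end(key) # mark as recently used
        return value

    def put(self, key, value):
        self._data[key] = (value, self.clock() + self.ttl_s)
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the LRU entry
```

The TTL matters even for deterministic models: it bounds how stale a cached prediction can be after a model version is swapped.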

📊 SQL Optimization

Optimize SQL queries for ML inference.

  • Use batch prediction functions
  • Enable query plan caching
  • Consider materialized views

Performance Tuning Example

java
// High-performance configuration
InferenceConfig config = InferenceConfig.builder()
    .modelConfig(modelConfig)

    // Batch settings for throughput
    .batchSize(64)
    .batchTimeout(Duration.ofMillis(50))

    // Parallel execution
    .parallelism(Runtime.getRuntime().availableProcessors())
    .maxConcurrentRequests(500)
    .queueSize(5000)

    // Caching for repeated patterns
    .enableCaching(true)
    .cacheSize(50000)
    .cacheTtl(Duration.ofHours(1))

    // GPU acceleration if available
    .gpuAcceleration(true)

    // Memory optimization
    .memoryLimit(4096)
    .cpuThreads(8)

    .build();

📊 Monitoring & Observability

Available Metrics

โฑ๏ธ Latency Metrics

Track prediction time, preprocessing, and postprocessing.

  • inference.latency.p50
  • inference.latency.p95
  • inference.latency.p99

📈 Throughput Metrics

Monitor requests per second and batch efficiency.

  • inference.throughput.requests
  • inference.throughput.batches
  • inference.queue.size

🔍 Error Metrics

Track failures, retries, and timeout rates.

  • inference.errors.total
  • inference.errors.rate
  • inference.retries.count

📊 SQL Metrics

Monitor SQL query performance with ML inference.

  • sql.ml.queries.total
  • sql.ml.latency.avg
  • sql.ml.cache.hit_rate
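
If you export raw latency samples rather than pre-aggregated values, percentiles like inference.latency.p95 can be recomputed downstream. One common convention is the nearest-rank method, sketched here in Python (other systems use interpolated percentiles, so expect small differences):

```python
def percentile(samples, pct):
    """Nearest-rank percentile: pct in (0, 100]; samples need not be sorted."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    # Nearest rank = ceil(pct/100 * n); -(-a // b) is ceiling division for ints
    rank = max(1, -(-len(ordered) * pct // 100))
    return ordered[int(rank) - 1]

# Example: a small batch of latency samples in milliseconds
latencies_ms = [12, 15, 11, 90, 13, 14, 250, 16, 12, 13]
p50 = percentile(latencies_ms, 50)
p95 = percentile(latencies_ms, 95)
p99 = percentile(latencies_ms, 99)
```

With only a handful of samples, p95 and p99 collapse onto the same worst observations, which is why tail percentiles need large sample windows to be meaningful.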

Integration with Monitoring Systems

java
// Enable comprehensive monitoring
InferenceConfig config = InferenceConfig.builder()
    .modelConfig(modelConfig)

    // Enable all metrics
    .enableMetrics(true)
    .metricsPrefix("myapp.ml.inference")

    // Detailed metrics collection
    .collectLatencyMetrics(true)
    .collectThroughputMetrics(true)
    .collectErrorMetrics(true)
    .collectCacheMetrics(true)
    .collectMemoryMetrics(true)
    .collectGpuMetrics(true)
    .collectSqlMetrics(true)

    // Export to Prometheus
    .metricsExporter("prometheus")
    .metricsExportInterval(Duration.ofSeconds(15))

    // Alert thresholds
    .alertOnHighLatency(Duration.ofMillis(1000))
    .alertOnHighErrorRate(0.05) // 5%
    .alertOnQueueBackpressure(1000)

    .build();
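
A threshold like alertOnHighErrorRate(0.05) implies an error rate tracked over some recent window of requests. The framework's windowing policy is not specified here; a fixed-size sliding window is one reasonable mental model, sketched in Python:

```python
from collections import deque

class ErrorRateWindow:
    """Track the error rate over the last `window` request outcomes."""
    def __init__(self, window=1000, threshold=0.05):
        self.outcomes = deque(maxlen=window)  # True = error, False = success
        self.threshold = threshold

    def record(self, is_error: bool):
        self.outcomes.append(is_error)        # oldest outcome falls off automatically

    def error_rate(self) -> float:
        if not self.outcomes:
            return 0.0
        return sum(self.outcomes) / len(self.outcomes)

    def should_alert(self) -> bool:
        return self.error_rate() > self.threshold
```

A small window reacts quickly but is noisy; a large window smooths spikes but delays alerts. Size it relative to your request rate.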

🔍 Troubleshooting Guide

Need Help? Check the GitHub Issues or contact the developer.

Common Issues and Solutions

1. Model Loading Failures (High Priority)

Symptoms: Application fails to start, model-related exceptions

Resolution Steps:

  1. Check file permissions: Ensure model file is readable
    ls -la models/fraud_model.onnx
  2. Verify model format: Confirm it's a supported format
    file models/fraud_model.onnx
  3. Test model locally: Use the test utility
    mvn exec:java -Dexec.mainClass="com.flinkml.inference.util.ModelValidator"
  4. Check dependencies: Ensure all required libraries are installed

2. SQL Function Errors (Medium Priority)

Symptoms: SQL queries fail with function not found or type mismatch errors

SQL Troubleshooting:

  1. Verify function registration: Ensure UDF is properly registered
  2. Check parameter types: Match SQL types with Java types
  3. Test function isolation: Test UDF outside of Flink first
  4. Review SQL plan: Use EXPLAIN to see function execution plan

3. Performance Issues (Medium Priority)

Symptoms: High latency, low throughput, Flink backpressure

Optimization Sequence:

  1. Check baseline: Measure single inference time
  2. Increase batch size: Start with 32, test up to 128
  3. Enable caching: If you have repeated patterns
  4. Check hardware: CPU utilization, memory, I/O
  5. Profile model: Identify bottlenecks in model execution

4. Memory Issues (High Priority)

Symptoms: OutOfMemory errors, high GC activity, unstable performance

Memory Management Steps:

  1. Reduce batch size: Start with 16 or 32
  2. Monitor heap usage: Use JVM metrics or VisualVM
  3. Adjust Flink memory: Increase task manager memory
    taskmanager.memory.process.size: 4096m
  4. Enable result caching: Reduces repeated computations
  5. Check for memory leaks: Use heap dumps if persistent

Debug Mode

java
// Enable debug logging
InferenceConfig config = InferenceConfig.builder()
    .modelConfig(modelConfig)

    // Debug settings
    .enableDebug(true)
    .logLevel("DEBUG")
    .logPredictions(true)        // Log input/output
    .logLatencies(true)          // Detailed timing
    .logErrors(true)             // Full error stack traces
    .logCacheActivity(true)      // Cache hits/misses
    .logSqlQueries(true)         // SQL query logging

    // Performance profiling
    .profileMode(true)
    .profileSampleRate(0.01)     // 1% of requests
    .profileOutput("profiles/")

    // Health checks
    .healthCheckInterval(Duration.ofMinutes(5))
    .enableSelfTest(true)

    .build();

Next Steps

1๏ธโƒฃ Try the Example

Run the fraud detection example to see the framework in action.

2๏ธโƒฃ Load Your Model

Export your model to ONNX and test it with the framework.

3๏ธโƒฃ Try SQL API

Experiment with ML inference using Flink SQL declarative API.

4๏ธโƒฃ Contribute!

We welcome contributions! Check out our GitHub repository.

Ready to dive deeper? Explore the Architecture Documentation, SQL Integration Guide, and Example Gallery for more advanced use cases.