
Getting Started with Otter Streams

Learn how to integrate real-time machine learning inference into your Apache Flink streaming applications

📋 What's Implemented

Complete Core Framework

Async inference, caching, metrics, and configuration system with multi-source model loading support.

🧠 Multi-Framework Support

ONNX, TensorFlow, PyTorch, XGBoost, and PMML with native execution engines.

📊 Flink SQL Integration

Full Flink SQL support with UDFs, table functions, and SQL API for ML inference.

⚡ Performance Features

Batch processing, async operations, intelligent caching, and GPU acceleration.

New Feature: Otter Streams now supports both DataStream and Flink SQL APIs!

📦 Installation

Clone and Build

```bash
# Clone the repository
git clone https://github.com/martourez21/otter-streams.git
cd otter-streams

# Build the project
mvn clean install -DskipTests

# Or use the setup script
chmod +x setup.sh
./setup.sh
```

Maven Dependencies

```xml
<!-- Core framework (required) -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>ml-inference-core</artifactId>
    <version>1.0.16</version>
</dependency>

<!-- Flink SQL integration -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-sql</artifactId>
    <version>1.0.16</version>
</dependency>

<!-- Add framework-specific modules as needed -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-onnx</artifactId>
    <version>1.0.16</version>
</dependency>
```

🤖 Model Preparation

Export Your Model to ONNX (Recommended)

```python
# PyTorch to ONNX
import torch
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=['input'],
    output_names=['output']
)

# TensorFlow to ONNX
import tf2onnx
model_proto, _ = tf2onnx.convert.from_keras(model, input_signature=spec)
with open("model.onnx", "wb") as f:
    f.write(model_proto.SerializeToString())

# Scikit-learn to ONNX
import numpy as np
from skl2onnx import to_onnx
onnx_model = to_onnx(model, X[:1].astype(np.float32))
with open("model.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())
```

Supported Formats

  • ONNX: .onnx files (most compatible format)
  • TensorFlow: saved_model.pb files in a saved_model directory
  • PyTorch: .pt and .pth files (TorchScript)
  • XGBoost: .model, .bin, and .json files
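As a quick illustration of the mapping above, here is a small helper that guesses the framework from a model file's extension. This is purely illustrative: Otter Streams resolves formats through its own `ModelFormat` enum, not through a function like this.

```python
from pathlib import Path

# Hypothetical helper for illustration only; the extension-to-format
# mapping mirrors the "Supported Formats" list above.
EXTENSION_TO_FORMAT = {
    ".onnx": "ONNX",
    ".pb": "TensorFlow",
    ".pt": "PyTorch",
    ".pth": "PyTorch",
    ".model": "XGBoost",
    ".bin": "XGBoost",
    ".json": "XGBoost",
}

def guess_model_format(path: str) -> str:
    """Map a model file's extension to one of the formats listed above."""
    ext = Path(path).suffix.lower()
    if ext not in EXTENSION_TO_FORMAT:
        raise ValueError(f"Unsupported model file: {path}")
    return EXTENSION_TO_FORMAT[ext]
```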

🔧 Flink Integration

Step 1: Set Up Dependencies

Add Otter Streams to your Flink project:

```xml
<!-- pom.xml -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>ml-inference-core</artifactId>
    <version>1.0.16</version>
</dependency>
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-onnx</artifactId>
    <version>1.0.16</version>
</dependency>
```
Step 2: Configure Your Model

Create an inference configuration with your model details:

```java
InferenceConfig config = InferenceConfig.builder()
    .modelConfig(ModelConfig.builder()
        .modelId("fraud-detection")
        .modelPath("models/fraud_model.onnx")
        .format(ModelFormat.ONNX)
        .modelName("fraud_predictor")
        .modelVersion("1.0")
        .build())
    .batchSize(32)
    .timeout(Duration.ofMillis(5000))
    .maxRetries(3)
    .enableMetrics(true)
    .build();
```
Step 3: Create the Inference Function

Instantiate the async inference function with your configuration:

```java
AsyncModelInferenceFunction<Map<String, Object>, InferenceResult> inferenceFunction =
    new AsyncModelInferenceFunction<>(
        config,
        cfg -> new OnnxInferenceEngine()
    );
```
Step 4: Apply to the DataStream

Integrate with your Flink streaming pipeline:

```java
DataStream<InferenceResult> predictions = AsyncDataStream.unorderedWait(
    transactionStream,
    inferenceFunction,
    5000,                    // Async timeout
    TimeUnit.MILLISECONDS,
    100                      // Max concurrent requests
);
```
Step 5: Process Results

Handle the inference results in your downstream processing:

```java
predictions
    .filter(result -> result.getScore() > 0.8)
    .map(result -> createAlert(result))
    .addSink(new AlertSink());
```

Quick Test:

  • Run with a small sample dataset first
  • Verify model loads correctly
  • Check inference latency meets your requirements
  • Monitor Flink metrics for backpressure

Flink SQL Integration

New Feature: Use ML inference directly in Flink SQL with our new SQL module!

Step 1: Add the SQL Dependency

Include the SQL module in your project:

```xml
<!-- pom.xml -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-sql</artifactId>
    <version>1.0.16</version>
</dependency>
```
Step 2: Register ML Functions

Register ML inference functions in your Flink table environment:

```sql
-- Register ML inference as a UDF
CREATE FUNCTION FraudDetector
AS 'com.codedstreams.otter.MLInferenceUDF'
USING JAR '/path/to/otter-stream-sql.jar';
```
Step 3: Use in SQL Queries

Call ML inference functions directly in your SQL statements:

```sql
-- Simple scalar function
SELECT
    transaction_id,
    amount,
    FraudDetector(amount, category, merchant) as fraud_score
FROM transactions
WHERE FraudDetector(amount, category, merchant) > 0.8;
```
Step 4: Complete SQL Example

Create a complete streaming ML inference pipeline:

```sql
-- Streaming ML inference pipeline
CREATE TABLE fraud_alerts (
    alert_id STRING,
    transaction_id STRING,
    fraud_score DOUBLE,
    detection_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'fraud-alerts',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

INSERT INTO fraud_alerts
SELECT
    UUID() as alert_id,
    transaction_id,
    FraudDetector(amount, category, merchant) as fraud_score,
    CURRENT_TIMESTAMP as detection_time
FROM transactions
WHERE FraudDetector(amount, category, merchant) > 0.8;
```

Learn More:

For complete SQL documentation, visit the Flink SQL Integration Guide.

Examples

Run the Fraud Detection Example

```bash
# Run DataStream example
mvn exec:java -pl otter-stream-examples \
  -Dexec.mainClass="com.flinkml.inference.examples.FraudDetectionExample"

# Run SQL example
mvn exec:java -pl otter-stream-examples \
  -Dexec.mainClass="com.flinkml.inference.examples.FraudDetectionSQLExample"
```

More Examples

🎭 Fraud Detection

Real-time transaction analysis with feature extraction and ML scoring.

🤖 Sentiment Analysis

Streaming text classification with ONNX models.

📈 Anomaly Detection

Time-series monitoring with automatic alerting.

📊 SQL ML Pipeline

Complete ML inference pipeline using Flink SQL API.

⚙️ Configuration

Tip: Most configuration options have sensible defaults. Start with the basics and customize as needed.

Complete Configuration Example

```java
InferenceConfig config = InferenceConfig.builder()
    // Model configuration
    .modelConfig(ModelConfig.builder()
        .modelId("my-model")
        .modelPath("/path/to/model.onnx")
        .format(ModelFormat.ONNX)
        .modelName("production-model")
        .modelVersion("2.0")
        .build())

    // Performance settings
    .batchSize(64)
    .batchTimeout(Duration.ofMillis(100))
    .parallelism(4)
    .maxConcurrentRequests(200)
    .queueSize(1000)

    // Caching
    .enableCaching(true)
    .cacheSize(10000)
    .cacheTtl(Duration.ofMinutes(30))

    // Error handling
    .maxRetries(3)
    .retryDelay(Duration.ofMillis(100))
    .timeout(Duration.ofSeconds(10))

    // Monitoring
    .enableMetrics(true)
    .metricsPrefix("myapp.ml.inference")
    .collectLatencyMetrics(true)
    .collectThroughputMetrics(true)
    .collectErrorMetrics(true)
    .collectCacheMetrics(true)
    .metricsExportInterval(Duration.ofSeconds(30))

    // Resource management
    .gpuAcceleration(true)
    .cpuThreads(4)
    .memoryLimit(2048) // MB

    .build();
```

Common Configuration Scenarios

Development

Small batch size, no caching, minimal resources. Focus on correctness.

Production

Large batch size, caching enabled, full monitoring. Focus on performance.

SQL Workloads

Optimized for declarative queries with automatic batching and caching.
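The three scenarios can be thought of as overrides applied to a shared baseline. A minimal sketch in Python: the keys mirror the Java builder options in this guide, but the values are illustrative starting points, not framework defaults.

```python
# Sketch: scenario presets as plain dicts. Illustrative values only;
# tune them using the optimization guide below.
BASELINE = {
    "batchSize": 1,
    "enableCaching": False,
    "enableMetrics": True,
    "parallelism": 1,
}

PRESETS = {
    "development": {"batchSize": 8},
    "production": {"batchSize": 64, "enableCaching": True, "parallelism": 8},
    "sql": {"batchSize": 32, "enableCaching": True},
}

def build_config(scenario: str) -> dict:
    # Merge the scenario's overrides onto the shared baseline.
    return {**BASELINE, **PRESETS[scenario]}
```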

⚡ Performance Optimization

Important: Always measure performance in your specific environment before optimizing.

Step-by-Step Optimization Guide

Step 1: Establish a Performance Baseline

Start with default settings and measure current performance:

```java
// Default configuration for baseline
InferenceConfig baselineConfig = InferenceConfig.builder()
    .modelConfig(modelConfig)
    .batchSize(1)      // Start with single inference
    .enableMetrics(true)
    .build();
```

Measure These Metrics:

  • Single inference latency
  • Memory usage per prediction
  • CPU utilization
  • Throughput with your expected load
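The latency percentiles referenced throughout this guide (p50/p95/p99) can be computed from raw per-request samples with the Python standard library, for example:

```python
import statistics

def latency_percentiles(samples_ms):
    """Compute p50/p95/p99 from raw per-request latencies (milliseconds)."""
    # quantiles(n=100) returns 99 cut points; index i holds the
    # (i + 1)-th percentile.
    qs = statistics.quantiles(samples_ms, n=100)
    return {"p50": qs[49], "p95": qs[94], "p99": qs[98]}
```

Feed it the per-record latencies collected during the baseline run and record the three numbers before changing any setting.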
Step 2: Optimize Batch Processing

Find the optimal batch size for your model and hardware:

```java
// Test different batch sizes
int[] batchSizes = {8, 16, 32, 64, 128};
for (int batchSize : batchSizes) {
    InferenceConfig testConfig = baselineConfig.toBuilder()
        .batchSize(batchSize)
        .batchTimeout(Duration.ofMillis(100))
        .build();

    // Run performance test and record metrics
}
```

Optimization Targets:

  • Latency-sensitive: Smaller batches (8-32)
  • Throughput-focused: Larger batches (64-128)
  • Memory-constrained: Balance batch size with available RAM
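The sweep described above can also be driven outside Flink with a plain micro-benchmark. A sketch, where `predict_batch` is a stand-in for the real engine call (substitute your model):

```python
import time

def benchmark_batch_sizes(predict_batch, records, batch_sizes):
    """Run the same records through a batched predict function at several
    batch sizes and report throughput in records per second."""
    results = {}
    for batch_size in batch_sizes:
        start = time.perf_counter()
        for i in range(0, len(records), batch_size):
            predict_batch(records[i:i + batch_size])
        elapsed = time.perf_counter() - start
        results[batch_size] = len(records) / elapsed
    return results
```

Pick the smallest batch size whose throughput still meets your latency budget; throughput alone favors the largest batch.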
Step 3: Implement Caching

Enable caching for repeated patterns in your data:

```java
// Enable caching with optimized settings
InferenceConfig cachedConfig = baselineConfig.toBuilder()
    .enableCaching(true)
    .cacheSize(10000)          // Cache capacity
    .cacheTtl(Duration.ofMinutes(30))
    .cacheKeyStrategy("feature-hash") // Choose based on your data
    .build();
```

When to Use Caching:

  • Deterministic models with repeated inputs
  • Expensive feature computation
  • High-throughput scenarios with pattern repetition
  • When cache hit rate exceeds 20%
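To make the "feature-hash" idea concrete, here is a minimal sketch of a TTL cache keyed by a stable hash of the input features. This is an illustration of the concept only; the framework's cache is configured through `enableCaching`/`cacheSize`/`cacheTtl`, not this class.

```python
import hashlib
import json
import time

class FeatureHashCache:
    """TTL cache keyed by a stable hash of the input feature dict."""

    def __init__(self, max_size=10_000, ttl_seconds=1800):
        self.max_size = max_size
        self.ttl = ttl_seconds
        self._store = {}  # key -> (expires_at, value)

    @staticmethod
    def key_for(features: dict) -> str:
        # Serialize with sorted keys so equal feature dicts always
        # produce the same cache key regardless of insertion order.
        blob = json.dumps(features, sort_keys=True).encode()
        return hashlib.sha256(blob).hexdigest()

    def get(self, features):
        entry = self._store.get(self.key_for(features))
        if entry and entry[0] > time.monotonic():
            return entry[1]
        return None  # missing or expired

    def put(self, features, value):
        if len(self._store) >= self.max_size:
            # Simple eviction: drop the oldest-inserted entry.
            self._store.pop(next(iter(self._store)))
        self._store[self.key_for(features)] = (time.monotonic() + self.ttl, value)
```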
Step 4: Scale with Parallelism

Configure parallelism based on your cluster resources:

```java
// Scale based on available resources
int availableCores = Runtime.getRuntime().availableProcessors();
InferenceConfig parallelConfig = baselineConfig.toBuilder()
    .parallelism(availableCores * 2)     // 2 threads per core
    .maxConcurrentRequests(availableCores * 100)
    .queueSize(10000)                    // Buffer for spikes
    .build();
```

Monitoring Points:

  • Watch for Flink backpressure indicators
  • Monitor CPU utilization (target 70-80%)
  • Check queue size doesn't keep growing
  • Adjust based on your specific workload
Step 5: Advanced Tuning

Apply hardware-specific optimizations:

```java
// Production-ready high-performance config
InferenceConfig productionConfig = baselineConfig.toBuilder()
    .batchSize(64)                    // From step 2 results
    .batchTimeout(Duration.ofMillis(50))
    .enableCaching(true)              // From step 3
    .parallelism(availableCores * 2)  // From step 4
    .gpuAcceleration(true)            // If GPU available
    .cpuThreads(availableCores)       // Match physical cores
    .memoryLimit(4096)                // MB, adjust based on model size
    .build();
```

Final Validation Checklist:

  • ✅ Performance meets SLA requirements
  • ✅ Memory usage stable under load
  • ✅ No persistent backpressure
  • ✅ Cache hit rate acceptable
  • ✅ Error rate below threshold (e.g., 0.1%)

📊 Monitoring & Observability

Available Metrics

⏱️ Latency Metrics

Track prediction time, preprocessing, and postprocessing.

  • inference.latency.p50
  • inference.latency.p95
  • inference.latency.p99

📈 Throughput Metrics

Monitor requests per second and batch efficiency.

  • inference.throughput.requests
  • inference.throughput.batches
  • inference.queue.size

🔍 Error Metrics

Track failures, retries, and timeout rates.

  • inference.errors.total
  • inference.errors.rate
  • inference.retries.count

📊 SQL Metrics

Monitor SQL query performance with ML inference.

  • sql.ml.queries.total
  • sql.ml.latency.avg
  • sql.ml.cache.hit_rate

🔍 Troubleshooting Guide

Need Help? Check the GitHub Issues or contact the developer.

Systematic Problem Resolution

1. Model Loading Failures (High Priority)

Symptoms: Application fails to start, model-related exceptions

Resolution Steps:

  1. Check file permissions: Ensure the model file is readable
     `ls -la models/fraud_model.onnx`
  2. Verify model format: Confirm it is a supported format
     `file models/fraud_model.onnx`
  3. Test model locally: Use the test utility
     `mvn exec:java -Dexec.mainClass="com.flinkml.inference.util.ModelValidator"`
  4. Check dependencies: Ensure all required libraries are on the classpath
2. Slow Inference Performance (Medium Priority)

Symptoms: High latency, low throughput, Flink backpressure

Optimization Sequence:

  1. Check baseline: Measure single inference time
  2. Increase batch size: Start with 32, test up to 128
  3. Enable caching: If you have repeated patterns
  4. Check hardware: CPU utilization, memory, I/O
  5. Profile model: Identify bottlenecks in model execution
3. Memory Issues (High Priority)

Symptoms: OutOfMemory errors, high GC activity, unstable performance

Memory Management Steps:

  1. Reduce batch size: Start with 16 or 32
  2. Monitor heap usage: Use JVM metrics or VisualVM
  3. Adjust Flink memory: Increase task manager memory
     `taskmanager.memory.process.size: 4096m`
  4. Enable result caching: Reduces repeated computation
  5. Check for memory leaks: Take heap dumps if the problem persists
4. Connection Problems (Medium Priority)

Symptoms: Remote inference failures, timeouts, network errors

Network Troubleshooting:

  1. Test connectivity: Ping the remote service
  2. Increase timeouts: Start with 30-second timeout
  3. Implement retries: Add exponential backoff
  4. Add circuit breaker: Prevent cascade failures
  5. Check firewall rules: Ensure ports are open
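Steps 2 and 3 above can be sketched together as a retry wrapper with exponential backoff and jitter. A minimal illustration (`call` is any zero-argument function that raises on failure; a real deployment would also narrow the caught exception types and add the circuit breaker from step 4):

```python
import random
import time

def call_with_backoff(call, max_retries=3, base_delay=0.1, max_delay=5.0):
    """Retry a remote call with exponential backoff plus jitter."""
    for attempt in range(max_retries + 1):
        try:
            return call()
        except Exception:
            if attempt == max_retries:
                raise  # Out of retries: propagate the last failure.
            # Exponential backoff: base, 2x base, 4x base, ... capped,
            # with +/-50% jitter to avoid synchronized retry storms.
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay * random.uniform(0.5, 1.5))
```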

🚀 Next Steps

1️⃣ Try the Example

Run the fraud detection example to see the framework in action.

2️⃣ Load Your Model

Export your model to ONNX and test it with the framework.

3️⃣ Try SQL API

Experiment with ML inference using Flink SQL declarative API.

4️⃣ Contribute!

We welcome contributions! Check out our GitHub repository.

Ready to dive deeper? Explore the Architecture Documentation, SQL Integration Guide, and Example Gallery for more advanced use cases.