Quick Start: Real-Time ML with Apache Flink

Getting Started with Otter Streams

Learn how to integrate real-time machine learning inference into your Apache Flink streaming applications in just 5 minutes

What's Implemented

Complete Core Framework

Async inference, caching, metrics, and configuration system with multi-source model loading support.
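Async inference in Flink-style pipelines is typically built on handing a blocking model call to a dedicated thread pool via CompletableFuture. A minimal stand-alone sketch of that pattern (the scorer and threshold here are illustrative, not the framework's API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncScoreDemo {

    // Hypothetical stand-in for a model call (illustrative only)
    static double score(double amount) {
        return amount > 1000 ? 0.9 : 0.1;
    }

    // Hand the blocking model call to a thread pool and return a future;
    // this is the basic pattern Flink's async operators build on
    static CompletableFuture<Double> scoreAsync(double amount, Executor pool) {
        return CompletableFuture.supplyAsync(() -> score(amount), pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        double risk = scoreAsync(1500.0, pool).join();
        System.out.println(risk); // prints 0.9
        pool.shutdown();
    }
}
```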

🧠 Multi-Framework Support

ONNX, TensorFlow, PyTorch, XGBoost, and PMML with native execution engines.

🌐 Flexible Deployment

Local, HTTP, AWS SageMaker, and custom remote inference endpoints.

Performance Features

Batch processing, async operations, intelligent caching, and GPU acceleration.
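The caching idea can be pictured as a small LRU map keyed by input, so repeated events skip a model call entirely. A stand-alone illustrative sketch, not the framework's actual cache implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative LRU inference-result cache sketch;
// not the framework's actual cache implementation
public class InferenceCache<K, V> extends LinkedHashMap<K, V> {

    private final int maxEntries;

    public InferenceCache(int maxEntries) {
        super(16, 0.75f, true); // access-order, so iteration order tracks recency
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries; // evict the least recently used entry beyond capacity
    }

    public static void main(String[] args) {
        InferenceCache<String, Double> cache = new InferenceCache<>(2);
        cache.put("tx-1", 0.12);
        cache.put("tx-2", 0.87);
        cache.get("tx-1");       // touch tx-1 so it becomes most recently used
        cache.put("tx-3", 0.05); // evicts tx-2, the least recently used entry
        System.out.println(cache.keySet()); // prints [tx-1, tx-3]
    }
}
```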

📦 Installation

Clone and Build

```bash
# Clone the repository
git clone https://github.com/martourez21/otter-streams.git
cd otter-streams

# Build the project
mvn clean install -DskipTests

# Or use the setup script
chmod +x setup.sh
./setup.sh
```

Maven Dependencies

```xml
<!-- Core framework (required) -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>ml-inference-core</artifactId>
    <version>1.0.0</version>
</dependency>

<!-- Add framework-specific modules as needed -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-onnx</artifactId>
    <version>1.0.0</version>
</dependency>
```
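If you build with Gradle instead of Maven, the equivalent dependency declarations (assuming the same coordinates are published) would be:

```groovy
dependencies {
    // Core framework (required)
    implementation 'com.codedstreams:ml-inference-core:1.0.0'

    // Add framework-specific modules as needed
    implementation 'com.codedstreams:otter-stream-onnx:1.0.0'
}
```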

🤖 Model Preparation

Export Your Model to ONNX (Recommended)

```python
# PyTorch to ONNX
import torch

torch.onnx.export(
    model,            # trained torch.nn.Module
    dummy_input,      # example input tensor with the expected shape
    "model.onnx",
    input_names=['input'],
    output_names=['output']
)

# TensorFlow to ONNX
import tf2onnx

model_proto, _ = tf2onnx.convert.from_keras(model, input_signature=spec)
with open("model.onnx", "wb") as f:
    f.write(model_proto.SerializeToString())

# Scikit-learn to ONNX
import numpy as np
from skl2onnx import to_onnx

onnx_model = to_onnx(model, X[:1].astype(np.float32))
with open("model.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())
```

Supported Formats

ONNX (Ready)

.onnx files: the most compatible format

TensorFlow (Ready)

saved_model.pb files inside a saved_model directory

PyTorch (Ready)

.pt and .pth files (TorchScript)

XGBoost (Ready)

.model, .bin, and .json files

🔧 Flink Integration

Basic Usage Example

```java
import com.flinkml.inference.config.*;
import com.flinkml.inference.function.*;
import com.flinkml.inference.onnx.*;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.Map;
import java.util.concurrent.TimeUnit;

// 1. Configure your model
InferenceConfig config = InferenceConfig.builder()
    .modelConfig(ModelConfig.builder()
        .modelId("fraud-detection")
        .modelPath("models/fraud_model.onnx")
        .format(ModelFormat.ONNX)
        .modelName("fraud_predictor")
        .modelVersion("1.0")
        .build())
    .batchSize(32)
    .timeout(5000)           // Per-request timeout in milliseconds
    .maxRetries(3)
    .enableMetrics(true)
    .build();

// 2. Create the inference function
AsyncModelInferenceFunction<Map<String, Object>, InferenceResult> inferenceFunction =
    new AsyncModelInferenceFunction<>(
        config,
        cfg -> new OnnxInferenceEngine()
    );

// 3. Apply it to a Flink stream
DataStream<InferenceResult> predictions = AsyncDataStream.unorderedWait(
    transactionStream,
    inferenceFunction,
    5000,                    // Async timeout
    TimeUnit.MILLISECONDS,
    100                      // Max concurrent requests
);
```

💡 Examples

Run the Fraud Detection Example

```bash
# Run the fraud detection example
mvn exec:java -pl otter-stream-examples \
  -Dexec.mainClass="com.flinkml.inference.examples.FraudDetectionExample"
```

More Examples

🎭 Fraud Detection

Real-time transaction analysis with feature extraction and ML scoring.

🤖 Sentiment Analysis

Streaming text classification with ONNX models.

📈 Anomaly Detection

Time-series monitoring with automatic alerting.

Next Steps

1️⃣ Try the Example

Run the fraud detection example to see the framework in action.

2️⃣ Load Your Model

Export your model to ONNX and test it with the framework.

3️⃣ Integrate with Flink

Add ML inference to your existing Flink streaming job.

4️⃣ Contribute!

We welcome contributions! Check out our GitHub repository.