Getting Started with Otter Streams
Learn how to integrate real-time machine learning inference into your Apache Flink streaming applications in just 5 minutes
What's Implemented
Complete Core Framework
Async inference, caching, metrics, and configuration system with multi-source model loading support.
Multi-Framework Support
ONNX, TensorFlow, PyTorch, XGBoost, and PMML with native execution engines.
Flexible Deployment
Local, HTTP, AWS SageMaker, and custom remote inference endpoints.
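For the HTTP and custom remote options, each record's features travel over the wire to an inference endpoint. As a rough standalone sketch (the endpoint URL and JSON payload below are hypothetical, and this is not the framework's built-in client), such a request could be assembled with the JDK's own `java.net.http`:

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

public class RemoteInferenceSketch {
    // Build a POST request carrying the feature vector as JSON.
    // The endpoint URL and payload shape here are illustrative only.
    public static HttpRequest buildRequest(String endpoint, String featuresJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(endpoint))
                .timeout(Duration.ofMillis(5000))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(featuresJson))
                .build();
    }
}
```

Sending it through an `HttpClient` (synchronously or via `sendAsync`) is what a custom remote engine would do under the hood.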
Performance Features
Batch processing, async operations, intelligent caching, and GPU acceleration.
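The caching mentioned above lets repeated inputs skip the model call entirely. As a minimal illustration of the idea (not the framework's actual cache implementation), an LRU cache can be built on `LinkedHashMap` in a few lines:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a tiny LRU prediction cache in the spirit of the
// framework's result caching. The real eviction policy may differ.
public class PredictionCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public PredictionCache(int maxEntries) {
        super(16, 0.75f, true); // access-order: gets refresh an entry's recency
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries; // evict the least recently used entry
    }
}
```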
📦 Installation
Clone and Build
# Clone the repository
git clone https://github.com/martourez21/otter-streams.git
cd otter-streams
# Build the project
mvn clean install -DskipTests
# Or use the setup script
chmod +x setup.sh
./setup.sh
Maven Dependencies
<!-- Core framework (required) -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>ml-inference-core</artifactId>
    <version>1.0.0</version>
</dependency>

<!-- Add framework-specific modules as needed -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-onnx</artifactId>
    <version>1.0.0</version>
</dependency>
🤖 Model Preparation
Export Your Model to ONNX (Recommended)
# PyTorch to ONNX
import torch

torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=['input'],
    output_names=['output']
)

# TensorFlow to ONNX
import tf2onnx

model_proto, _ = tf2onnx.convert.from_keras(model, input_signature=spec)
with open("model.onnx", "wb") as f:
    f.write(model_proto.SerializeToString())

# Scikit-learn to ONNX
import numpy as np
from skl2onnx import to_onnx

onnx_model = to_onnx(model, X[:1].astype(np.float32))
with open("model.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())
Supported Formats
ONNX
.onnx files - most compatible format
TensorFlow
saved_model.pb files in a saved_model directory
PyTorch
.pt, .pth files (TorchScript)
XGBoost
.model, .bin, .json files
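When models arrive from mixed sources, it can help to route each file to the right engine by extension. The helper below is hypothetical (the framework's own format handling may work differently) and simply mirrors the extension table above:

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not part of the framework's API: infer a model
// format name from a file path based on the extensions listed above.
public class FormatDetector {
    private static final Map<String, String> BY_EXTENSION = Map.of(
            "onnx", "ONNX",
            "pb", "TENSORFLOW",
            "pt", "PYTORCH",
            "pth", "PYTORCH",
            "model", "XGBOOST",
            "bin", "XGBOOST",
            "json", "XGBOOST");

    public static String detect(String path) {
        String ext = path.substring(path.lastIndexOf('.') + 1).toLowerCase(Locale.ROOT);
        return BY_EXTENSION.getOrDefault(ext, "UNKNOWN");
    }
}
```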
🔧 Flink Integration
Basic Usage Example
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

import com.flinkml.inference.config.*;
import com.flinkml.inference.function.*;
import com.flinkml.inference.onnx.*;

// 1. Configure your model
InferenceConfig config = InferenceConfig.builder()
        .modelConfig(ModelConfig.builder()
                .modelId("fraud-detection")
                .modelPath("models/fraud_model.onnx")
                .format(ModelFormat.ONNX)
                .modelName("fraud_predictor")
                .modelVersion("1.0")
                .build())
        .batchSize(32)
        .timeout(5000)
        .maxRetries(3)
        .enableMetrics(true)
        .build();

// 2. Create the inference function
AsyncModelInferenceFunction<Map<String, Object>, InferenceResult> inferenceFunction =
        new AsyncModelInferenceFunction<>(
                config,
                cfg -> new OnnxInferenceEngine()
        );

// 3. Apply it to the Flink stream
DataStream<InferenceResult> predictions = AsyncDataStream.unorderedWait(
        transactionStream,
        inferenceFunction,
        5000,                  // async timeout in milliseconds
        TimeUnit.MILLISECONDS,
        100                    // max concurrent requests
);
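The last two arguments to unorderedWait govern latency and back-pressure: each call must complete within the timeout, and at most 100 requests are in flight at once. That capacity behaves conceptually like a semaphore around the inference calls; the sketch below is standalone and not Flink's internals:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch of AsyncDataStream's capacity semantics: no more
// than `capacity` inference calls run concurrently, however many
// requests are submitted.
public class CapacitySketch {
    public static int maxObservedConcurrency(int requests, int capacity) {
        Semaphore permits = new Semaphore(capacity);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(requests);
        CompletableFuture<?>[] futures = new CompletableFuture<?>[requests];
        for (int i = 0; i < requests; i++) {
            futures[i] = CompletableFuture.runAsync(() -> {
                permits.acquireUninterruptibly(); // blocks once capacity is reached
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(10);             // stand-in for a model call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                inFlight.decrementAndGet();
                permits.release();
            }, pool);
        }
        CompletableFuture.allOf(futures).join();
        pool.shutdown();
        return peak.get();
    }
}
```

Raising the capacity increases throughput at the cost of memory for buffered in-flight records, so tune it together with the model's latency.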
💡 Examples
Run the Fraud Detection Example
# Run the fraud detection example
mvn exec:java -pl otter-stream-examples \
-Dexec.mainClass="com.flinkml.inference.examples.FraudDetectionExample"
More Examples
🎭 Fraud Detection
Real-time transaction analysis with feature extraction and ML scoring.
🤖 Sentiment Analysis
Streaming text classification with ONNX models.
📈 Anomaly Detection
Time-series monitoring with automatic alerting.
Next Steps
1️⃣ Try the Example
Run the fraud detection example to see the framework in action.
2️⃣ Load Your Model
Export your model to ONNX and test it with the framework.
3️⃣ Integrate with Flink
Add ML inference to your existing Flink streaming job.
4️⃣ Contribute!
We welcome contributions! Check out our GitHub repository.