📋 What's Implemented
Complete Core Framework
Async inference, caching, metrics, and configuration system with multi-source model loading support.
Multi-Framework Support
ONNX, TensorFlow, PyTorch, XGBoost, and PMML with native execution engines.
Flink SQL Integration
Full Flink SQL support, including UDFs, table functions, and a SQL API for ML inference.
Performance Features
Batch processing, async operations, intelligent caching, and GPU acceleration.
New Feature: Otter Streams now supports both DataStream and Flink SQL APIs!
📦 Installation
Clone and Build
# Clone the repository
git clone https://github.com/martourez21/otter-streams.git
cd otter-streams
# Build the project
mvn clean install -DskipTests
# Or use the setup script
chmod +x setup.sh
./setup.sh
Maven Dependencies
&lt;!-- Core framework (required) --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;com.codedstreams&lt;/groupId&gt;
    &lt;artifactId&gt;ml-inference-core&lt;/artifactId&gt;
    &lt;version&gt;1.0.16&lt;/version&gt;
&lt;/dependency&gt;

&lt;!-- Flink SQL integration --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;com.codedstreams&lt;/groupId&gt;
    &lt;artifactId&gt;otter-stream-sql&lt;/artifactId&gt;
    &lt;version&gt;1.0.16&lt;/version&gt;
&lt;/dependency&gt;

&lt;!-- Add framework-specific modules as needed --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;com.codedstreams&lt;/groupId&gt;
    &lt;artifactId&gt;otter-stream-onnx&lt;/artifactId&gt;
    &lt;version&gt;1.0.16&lt;/version&gt;
&lt;/dependency&gt;
🤖 Model Preparation
Export Your Model to ONNX (Recommended)
# PyTorch to ONNX
import torch

torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=['input'],
    output_names=['output']
)

# TensorFlow to ONNX
import tf2onnx

model_proto, _ = tf2onnx.convert.from_keras(model, input_signature=spec)
with open("model.onnx", "wb") as f:
    f.write(model_proto.SerializeToString())

# Scikit-learn to ONNX
from skl2onnx import to_onnx

onnx_model = to_onnx(model, X[:1].astype(np.float32))
with open("model.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())
Supported Formats
ONNX: .onnx files (most compatible format)
TensorFlow: saved_model.pb files in a saved_model directory
PyTorch: .pt and .pth files (TorchScript)
XGBoost: .model, .bin, and .json files
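To illustrate how these formats map onto loader selection, here is a small standalone helper that guesses a format from a file name. It is a sketch, not part of the framework: the ModelFormat enum below is a local stand-in for the framework's enum of the same name, and the matching rules simply mirror the list above.

```java
import java.util.Locale;

public class FormatGuesser {
    // Local stand-in for the framework's ModelFormat enum (illustrative only).
    enum ModelFormat { ONNX, TENSORFLOW, PYTORCH, XGBOOST }

    /** Guess a ModelFormat from a model file name, per the supported-formats list. */
    static ModelFormat guess(String path) {
        String p = path.toLowerCase(Locale.ROOT);
        if (p.endsWith(".onnx")) return ModelFormat.ONNX;
        if (p.endsWith("saved_model.pb")) return ModelFormat.TENSORFLOW;
        if (p.endsWith(".pt") || p.endsWith(".pth")) return ModelFormat.PYTORCH;
        if (p.endsWith(".model") || p.endsWith(".bin") || p.endsWith(".json")) {
            return ModelFormat.XGBOOST;
        }
        throw new IllegalArgumentException("Unsupported model file: " + path);
    }

    public static void main(String[] args) {
        System.out.println(guess("models/fraud_model.onnx")); // ONNX
    }
}
```

In practice you would pass the detected format to ModelConfig.builder().format(...) rather than hard-coding it.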
🔧 Flink Integration
Set Up Dependencies
Add Otter Streams to your Flink project:
<!-- pom.xml -->
<dependency>
<groupId>com.codedstreams</groupId>
<artifactId>ml-inference-core</artifactId>
<version>1.0.16</version>
</dependency>
<dependency>
<groupId>com.codedstreams</groupId>
<artifactId>otter-stream-onnx</artifactId>
<version>1.0.16</version>
</dependency>
Configure Your Model
Create an inference configuration with your model details:
InferenceConfig config = InferenceConfig.builder()
    .modelConfig(ModelConfig.builder()
        .modelId("fraud-detection")
        .modelPath("models/fraud_model.onnx")
        .format(ModelFormat.ONNX)
        .modelName("fraud_predictor")
        .modelVersion("1.0")
        .build())
    .batchSize(32)
    .timeout(Duration.ofSeconds(5))
    .maxRetries(3)
    .enableMetrics(true)
    .build();
Create Inference Function
Instantiate the async inference function with your configuration:
AsyncModelInferenceFunction&lt;Map&lt;String, Object&gt;, InferenceResult&gt; inferenceFunction =
    new AsyncModelInferenceFunction&lt;&gt;(
        config,
        cfg -&gt; new OnnxInferenceEngine()
    );
Apply to DataStream
Integrate with your Flink streaming pipeline:
DataStream&lt;InferenceResult&gt; predictions = AsyncDataStream.unorderedWait(
    transactionStream,
    inferenceFunction,
    5000,                  // async timeout
    TimeUnit.MILLISECONDS,
    100                    // max concurrent requests
);
Process Results
Handle the inference results in your downstream processing:
predictions
    .filter(result -&gt; result.getScore() &gt; 0.8)
    .map(result -&gt; createAlert(result))
    .addSink(new AlertSink());
Quick Test:
- Run with a small sample dataset first
- Verify model loads correctly
- Check that inference latency meets your requirements
- Monitor Flink metrics for backpressure
Flink SQL Integration
New Feature: Use ML inference directly in Flink SQL with our new SQL module!
Add SQL Dependency
Include the SQL module in your project:
<!-- pom.xml -->
<dependency>
<groupId>com.codedstreams</groupId>
<artifactId>otter-stream-sql</artifactId>
<version>1.0.16</version>
</dependency>
Register ML Functions
Register ML inference functions in your Flink table environment:
-- Register ML inference as a UDF
CREATE FUNCTION FraudDetector
AS 'com.codedstreams.otter.MLInferenceUDF'
USING JAR '/path/to/otter-stream-sql.jar';
Use in SQL Queries
Call ML inference functions directly in your SQL statements:
-- Simple scalar function
SELECT
  transaction_id,
  amount,
  FraudDetector(amount, category, merchant) AS fraud_score
FROM transactions
WHERE FraudDetector(amount, category, merchant) &gt; 0.8;
Complete SQL Example
Create a complete streaming ML inference pipeline:
-- Streaming ML inference pipeline
CREATE TABLE fraud_alerts (
  alert_id STRING,
  transaction_id STRING,
  fraud_score DOUBLE,
  detection_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'fraud-alerts',
  'format' = 'json'
);

INSERT INTO fraud_alerts
SELECT
  UUID() AS alert_id,
  transaction_id,
  FraudDetector(amount, category, merchant) AS fraud_score,
  CURRENT_TIMESTAMP AS detection_time
FROM transactions
WHERE FraudDetector(amount, category, merchant) &gt; 0.8;
Learn More:
For complete SQL documentation, visit the Flink SQL Integration Guide.
Examples
Run the Fraud Detection Example
# Run DataStream example
mvn exec:java -pl otter-stream-examples \
-Dexec.mainClass="com.flinkml.inference.examples.FraudDetectionExample"
# Run SQL example
mvn exec:java -pl otter-stream-examples \
-Dexec.mainClass="com.flinkml.inference.examples.FraudDetectionSQLExample"
More Examples
🎭 Fraud Detection
Real-time transaction analysis with feature extraction and ML scoring.
🤖 Sentiment Analysis
Streaming text classification with ONNX models.
📈 Anomaly Detection
Time-series monitoring with automatic alerting.
📊 SQL ML Pipeline
Complete ML inference pipeline using Flink SQL API.
⚙️ Configuration
Tip: Most configuration options have sensible defaults. Start with the basics and customize as needed.
Complete Configuration Example
InferenceConfig config = InferenceConfig.builder()
    // Model configuration
    .modelConfig(ModelConfig.builder()
        .modelId("my-model")
        .modelPath("/path/to/model.onnx")
        .format(ModelFormat.ONNX)
        .modelName("production-model")
        .modelVersion("2.0")
        .build())
    // Performance settings
    .batchSize(64)
    .batchTimeout(Duration.ofMillis(100))
    .parallelism(4)
    .maxConcurrentRequests(200)
    .queueSize(1000)
    // Caching
    .enableCaching(true)
    .cacheSize(10000)
    .cacheTtl(Duration.ofMinutes(30))
    // Error handling
    .maxRetries(3)
    .retryDelay(Duration.ofMillis(100))
    .timeout(Duration.ofSeconds(10))
    // Monitoring
    .enableMetrics(true)
    .metricsPrefix("myapp.ml.inference")
    .collectLatencyMetrics(true)
    .collectThroughputMetrics(true)
    .collectErrorMetrics(true)
    .collectCacheMetrics(true)
    .metricsExportInterval(Duration.ofSeconds(30))
    // Resource management
    .gpuAcceleration(true)
    .cpuThreads(4)
    .memoryLimit(2048) // MB
    .build();
Common Configuration Scenarios
Development
Small batch size, no caching, minimal resources. Focus on correctness.
Production
Large batch size, caching enabled, full monitoring. Focus on performance.
SQL Workloads
Optimized for declarative queries with automatic batching and caching.
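The development and production scenarios above might look like this in code. This is a sketch reusing only builder options already shown in the complete configuration example; the specific values are illustrative starting points, not tuned recommendations.

```java
// Development: small batches, no caching, metrics on for debugging correctness.
InferenceConfig devConfig = InferenceConfig.builder()
    .modelConfig(modelConfig)
    .batchSize(8)
    .enableCaching(false)
    .enableMetrics(true)
    .build();

// Production: large batches, caching enabled, full monitoring.
InferenceConfig prodConfig = InferenceConfig.builder()
    .modelConfig(modelConfig)
    .batchSize(64)
    .batchTimeout(Duration.ofMillis(100))
    .enableCaching(true)
    .cacheSize(10000)
    .cacheTtl(Duration.ofMinutes(30))
    .enableMetrics(true)
    .collectLatencyMetrics(true)
    .collectThroughputMetrics(true)
    .build();
```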
⚡ Performance Optimization
Important: Always measure performance in your specific environment before optimizing.
Step-by-Step Optimization Guide
Establish Performance Baseline
Start with default settings and measure current performance:
// Default configuration for baseline
InferenceConfig baselineConfig = InferenceConfig.builder()
    .modelConfig(modelConfig)
    .batchSize(1) // Start with single-record inference
    .enableMetrics(true)
    .build();
Measure These Metrics:
- Single inference latency
- Memory usage per prediction
- CPU utilization
- Throughput with your expected load
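One simple way to capture the latency side of this baseline is a wall-clock measurement around your inference call. The sketch below is self-contained and framework-independent; the `infer` supplier stands in for your actual engine call, and the dummy workload in `main` is only there so the example runs.

```java
import java.util.Arrays;
import java.util.function.Supplier;

public class LatencyBaseline {
    /** Run `infer` n times; return per-call latencies in milliseconds, sorted ascending. */
    static double[] measure(Supplier<?> infer, int n) {
        double[] latencies = new double[n];
        for (int i = 0; i < n; i++) {
            long start = System.nanoTime();
            infer.get(); // stand-in for your real inference call
            latencies[i] = (System.nanoTime() - start) / 1_000_000.0;
        }
        Arrays.sort(latencies);
        return latencies;
    }

    /** Nearest-rank percentile over a sorted array, p in (0, 100]. */
    static double percentile(double[] sorted, double p) {
        int idx = (int) Math.ceil(p / 100.0 * sorted.length) - 1;
        return sorted[Math.max(0, idx)];
    }

    public static void main(String[] args) {
        double[] lat = measure(() -> Math.sqrt(42.0), 1000); // dummy workload
        System.out.printf("p50=%.3fms p95=%.3fms p99=%.3fms%n",
            percentile(lat, 50), percentile(lat, 95), percentile(lat, 99));
    }
}
```

The p50/p95/p99 numbers line up with the latency metrics the framework exports, so the baseline is directly comparable to production dashboards later.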
Optimize Batch Processing
Find the optimal batch size for your model and hardware:
// Test different batch sizes
int[] batchSizes = {8, 16, 32, 64, 128};
for (int batchSize : batchSizes) {
    InferenceConfig testConfig = baselineConfig.toBuilder()
        .batchSize(batchSize)
        .batchTimeout(Duration.ofMillis(100))
        .build();
    // Run a performance test and record metrics
}
Optimization Targets:
- Latency-sensitive: Smaller batches (8-32)
- Throughput-focused: Larger batches (64-128)
- Memory-constrained: Balance batch size with available RAM
Implement Caching
Enable caching for repeated patterns in your data:
// Enable caching with optimized settings
InferenceConfig cachedConfig = baselineConfig.toBuilder()
    .enableCaching(true)
    .cacheSize(10000) // cache capacity (entries)
    .cacheTtl(Duration.ofMinutes(30))
    .cacheKeyStrategy("feature-hash") // choose based on your data
    .build();
When to Use Caching:
- Deterministic models with repeated inputs
- Expensive feature computation
- High-throughput scenarios with pattern repetition
- When cache hit rate exceeds 20%
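A quick way to check whether the 20% hit-rate threshold is even reachable is to measure how much your inputs repeat in a representative sample. The standalone sketch below is not part of the framework; it computes an upper bound on the hit rate (the fraction of lookups that are repeats of an earlier key):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class CacheHitEstimator {
    /** Upper-bound cache hit rate estimate: fraction of sample keys that are repeats. */
    static double estimateHitRate(List<String> sampleKeys) {
        if (sampleKeys.isEmpty()) return 0.0;
        long distinct = new HashSet<>(sampleKeys).size();
        return 1.0 - (double) distinct / sampleKeys.size();
    }

    public static void main(String[] args) {
        // 3 distinct keys out of 6 lookups: at most half could be cache hits.
        List<String> keys = Arrays.asList("a", "b", "a", "c", "a", "b");
        System.out.printf("estimated max hit rate: %.0f%%%n",
            estimateHitRate(keys) * 100);
    }
}
```

If this estimate already falls below 20% on a realistic sample, caching is unlikely to pay for its memory cost.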
Scale with Parallelism
Configure parallelism based on your cluster resources:
// Scale based on available resources
int availableCores = Runtime.getRuntime().availableProcessors();
InferenceConfig parallelConfig = baselineConfig.toBuilder()
    .parallelism(availableCores * 2) // 2 threads per core
    .maxConcurrentRequests(availableCores * 100)
    .queueSize(10000) // buffer for traffic spikes
    .build();
Monitoring Points:
- Watch for Flink backpressure indicators
- Monitor CPU utilization (target 70-80%)
- Check queue size doesn't keep growing
- Adjust based on your specific workload
Advanced Tuning
Apply hardware-specific optimizations:
// Production-ready high-performance config
InferenceConfig productionConfig = baselineConfig.toBuilder()
    .batchSize(64)                   // from step 2 results
    .batchTimeout(Duration.ofMillis(50))
    .enableCaching(true)             // from step 3
    .parallelism(availableCores * 2) // from step 4
    .gpuAcceleration(true)           // if a GPU is available
    .cpuThreads(availableCores)      // match physical cores
    .memoryLimit(4096)               // MB, adjust based on model size
    .build();
Final Validation Checklist:
- ✅ Performance meets SLA requirements
- ✅ Memory usage stable under load
- ✅ No persistent backpressure
- ✅ Cache hit rate acceptable
- ✅ Error rate below threshold (e.g., 0.1%)
📊 Monitoring & Observability
Available Metrics
⏱️ Latency Metrics
Track prediction time, preprocessing, and postprocessing.
inference.latency.p50, inference.latency.p95, inference.latency.p99
📈 Throughput Metrics
Monitor requests per second and batch efficiency.
inference.throughput.requests, inference.throughput.batches, inference.queue.size
🔍 Error Metrics
Track failures, retries, and timeout rates.
inference.errors.total, inference.errors.rate, inference.retries.count
📊 SQL Metrics
Monitor SQL query performance with ML inference.
sql.ml.queries.total, sql.ml.latency.avg, sql.ml.cache.hit_rate
🔍 Troubleshooting Guide
Need Help? Check the GitHub Issues or contact the developer.
Systematic Problem Resolution
Model Loading Failures (High Priority)
Symptoms: Application fails to start, model-related exceptions
Resolution Steps:
- Check file permissions: ensure the model file is readable
  ls -la models/fraud_model.onnx
- Verify model format: confirm it is a supported format
  file models/fraud_model.onnx
- Test model locally: use the test utility
  mvn exec:java -Dexec.mainClass="com.flinkml.inference.util.ModelValidator"
- Check dependencies: ensure all required libraries are installed
Slow Inference Performance (Medium Priority)
Symptoms: High latency, low throughput, Flink backpressure
Optimization Sequence:
- Check baseline: Measure single inference time
- Increase batch size: Start with 32, test up to 128
- Enable caching: If you have repeated patterns
- Check hardware: CPU utilization, memory, I/O
- Profile model: Identify bottlenecks in model execution
Memory Issues (High Priority)
Symptoms: OutOfMemory errors, high GC activity, unstable performance
Memory Management Steps:
- Reduce batch size: Start with 16 or 32
- Monitor heap usage: Use JVM metrics or VisualVM
- Adjust Flink memory: Increase task manager memory
  taskmanager.memory.process.size: 4096m
- Enable result caching: Reduces repeated computations
- Check for memory leaks: Use heap dumps if persistent
Connection Problems (Medium Priority)
Symptoms: Remote inference failures, timeouts, network errors
Network Troubleshooting:
- Test connectivity: Ping the remote service
- Increase timeouts: Start with 30-second timeout
- Implement retries: Add exponential backoff
- Add circuit breaker: Prevent cascade failures
- Check firewall rules: Ensure ports are open
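Steps 2 and 3 above (longer timeouts plus retries with exponential backoff) can be sketched as a small generic helper. This is illustrative and independent of the framework's own maxRetries/retryDelay settings; it simply shows the backoff pattern.

```java
import java.util.concurrent.Callable;

public class RetryWithBackoff {
    /**
     * Invoke `call`, retrying up to maxRetries additional times on failure.
     * Sleeps initialDelayMs before the first retry, doubling the delay each time.
     */
    static <T> T retry(Callable<T> call, int maxRetries, long initialDelayMs)
            throws Exception {
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    Thread.sleep(delay);
                    delay *= 2; // exponential backoff
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        final int[] attempts = {0};
        // Simulated remote call: fails twice, then succeeds on the third attempt.
        String result = retry(() -> {
            if (++attempts[0] < 3) throw new RuntimeException("transient failure");
            return "ok";
        }, 5, 1);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

In production you would typically cap the maximum delay and add jitter; pairing this with a circuit breaker (step 4) prevents retries from amplifying an outage.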
🚀 Next Steps
1️⃣ Try the Example
Run the fraud detection example to see the framework in action.
2️⃣ Load Your Model
Export your model to ONNX and test it with the framework.
3️⃣ Try SQL API
Experiment with ML inference using Flink SQL declarative API.
4️⃣ Contribute!
We welcome contributions! Check out our GitHub repository.
Ready to dive deeper? Explore the Architecture Documentation, SQL Integration Guide, and Example Gallery for more advanced use cases.