What's Implemented
Complete Core Framework
Async inference, caching, metrics, and a configuration system with multi-source model loading.
Multi-Framework Support
ONNX, TensorFlow, PyTorch, XGBoost, and PMML with native execution engines.
Flink SQL Integration
Full Flink SQL support with UDFs, table functions, and SQL API for ML inference.
Performance Features
Batch processing, async operations, intelligent caching, and GPU acceleration.
New Feature: Otter Streams now supports both DataStream and Flink SQL APIs! Check out the SQL documentation for details.
📦 Installation
Clone and Build
# Clone the repository
git clone https://github.com/martourez21/otter-streams.git
cd otter-streams
# Build the project
mvn clean install -DskipTests
# Or use the setup script
chmod +x setup.sh
./setup.sh
Maven Dependencies
<!-- Core framework (required) -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>ml-inference-core</artifactId>
    <version>1.0.16</version>
</dependency>

<!-- Flink SQL Integration -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-sql</artifactId>
    <version>1.0.16</version>
</dependency>

<!-- Add framework-specific modules as needed -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-onnx</artifactId>
    <version>1.0.16</version>
</dependency>
🤖 Model Preparation
Export Your Model to ONNX (Recommended)
# PyTorch to ONNX (model is your torch.nn.Module, dummy_input a sample input tensor)
import torch

torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=['input'],
    output_names=['output']
)
# TensorFlow (Keras) to ONNX (spec is your model's tf.TensorSpec input signature)
import tf2onnx

model_proto, _ = tf2onnx.convert.from_keras(model, input_signature=spec)
with open("model.onnx", "wb") as f:
    f.write(model_proto.SerializeToString())

# Scikit-learn to ONNX (X is the training feature array)
import numpy as np
from skl2onnx import to_onnx

onnx_model = to_onnx(model, X[:1].astype(np.float32))
with open("model.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())
Supported Formats
Ready ONNX
.onnx files - Most compatible format
Ready TensorFlow
saved_model.pb files inside a saved_model directory
Ready PyTorch
.pt, .pth files (TorchScript)
Ready XGBoost
.model, .bin, .json files
🔧 Flink Integration
Set Up Dependencies
Add Otter Streams to your Flink project:
<!-- pom.xml -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>ml-inference-core</artifactId>
    <version>1.0.16</version>
</dependency>
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-onnx</artifactId>
    <version>1.0.16</version>
</dependency>
Configure Your Model
Create an inference configuration with your model details:
InferenceConfig config = InferenceConfig.builder()
    .modelConfig(ModelConfig.builder()
        .modelId("fraud-detection")
        .modelPath("models/fraud_model.onnx")
        .format(ModelFormat.ONNX)
        .modelName("fraud_predictor")
        .modelVersion("1.0")
        .build())
    .batchSize(32)
    .timeout(5000) // milliseconds
    .maxRetries(3)
    .enableMetrics(true)
    .build();
Create Inference Function
Instantiate the async inference function with your configuration:
AsyncModelInferenceFunction<Map<String, Object>, InferenceResult> inferenceFunction =
    new AsyncModelInferenceFunction<>(
        config,
        cfg -> new OnnxInferenceEngine()
    );
Apply to DataStream
Integrate with your Flink streaming pipeline:
DataStream<InferenceResult> predictions = AsyncDataStream.unorderedWait(
    transactionStream,
    inferenceFunction,
    5000,                  // Async timeout
    TimeUnit.MILLISECONDS,
    100                    // Max concurrent requests
);
Process Results
Handle the inference results in your downstream processing:
predictions
    .filter(result -> result.getScore() > 0.8)
    .map(result -> createAlert(result))
    .addSink(new AlertSink());
Quick Test:
- Run with a small sample dataset first
- Verify model loads correctly
- Check inference latency meets your requirements
- Monitor Flink metrics for backpressure
New Feature: Use ML inference directly in Flink SQL with our new SQL module. See full SQL documentation for more examples.
Add SQL Dependency
Include the SQL module in your project:
<!-- pom.xml -->
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>otter-stream-sql</artifactId>
    <version>1.0.16</version>
</dependency>
Register ML Functions
Register ML inference functions in your Flink table environment:
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Register ML inference as a UDF
tEnv.createTemporarySystemFunction(
    "ML_PREDICT",
    new MLInferenceUDF("models/fraud_model.onnx")
);

// Or register as a table function
tEnv.createTemporarySystemFunction(
    "ML_PREDICT_TABLE",
    new MLInferenceTableFunction("models/fraud_model.onnx")
);
Use in SQL Queries
Call ML inference functions directly in your SQL statements:
-- Simple scalar function
SELECT
transaction_id,
amount,
ML_PREDICT(amount, category, merchant) as fraud_score
FROM transactions
WHERE ML_PREDICT(amount, category, merchant) > 0.8;
-- Table function for multiple outputs
SELECT
t.transaction_id,
p.score,
p.confidence,
p.reason
FROM transactions t,
LATERAL TABLE(ML_PREDICT_TABLE(t.amount, t.category, t.merchant)) AS p(score, confidence, reason)
WHERE p.score > 0.8;
Configure via SQL Hints
Configure inference behavior using SQL hints:
-- Configure batch size and timeout via hints
SELECT
/*+ INFERENCE_OPTIONS('batch_size'='64', 'timeout_ms'='5000') */
transaction_id,
ML_PREDICT(amount, category, merchant) as fraud_score
FROM transactions;
-- Use different model versions
SELECT
/*+ MODEL_CONFIG('model_version'='v2.1', 'enable_cache'='true') */
user_id,
ML_PREDICT(features) as prediction
FROM user_features;
Create Views with ML
Create materialized views with ML predictions for easy access:
-- Create view with real-time predictions
CREATE VIEW fraud_alerts AS
SELECT
transaction_id,
user_id,
amount,
ML_PREDICT(amount, category, merchant) as fraud_score,
CASE
WHEN ML_PREDICT(amount, category, merchant) > 0.9 THEN 'HIGH_RISK'
WHEN ML_PREDICT(amount, category, merchant) > 0.7 THEN 'MEDIUM_RISK'
ELSE 'LOW_RISK'
END as risk_level
FROM transactions;
-- Query the view
SELECT * FROM fraud_alerts WHERE risk_level = 'HIGH_RISK';
SQL Best Practices:
- Use batch inference for better performance with large datasets
- Enable caching for repeated predictions on similar inputs
- Monitor SQL plan to ensure optimal execution
- Consider materialized views for frequent ML queries
📊 Flink SQL Integration
Tip: The Flink SQL API provides a declarative way to use ML inference, making it easier to integrate with existing SQL-based workflows and tools.
Available SQL Functions
ML_PREDICT()
Scalar Function: Returns a single prediction value for each input row.
ML_PREDICT(feature1, feature2, ...) → prediction
ML_PREDICT_TABLE()
Table Function: Returns multiple output columns for complex models.
ML_PREDICT_TABLE(features) → (score, confidence, label)
ML_BATCH_PREDICT()
Aggregate Function: Batch prediction for windowed aggregations.
ML_BATCH_PREDICT(features) OVER (PARTITION BY ...)
Complete SQL Example
-- Register ML model as a catalog function
CREATE FUNCTION FraudDetector
AS 'com.codedstreams.otter.MLInferenceUDF'
USING JAR '/path/to/otter-stream-sql.jar';
-- Create streaming table
CREATE TABLE transactions (
transaction_id STRING,
user_id STRING,
amount DECIMAL(10, 2),
category STRING,
merchant STRING,
transaction_time TIMESTAMP(3),
WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- Real-time fraud detection with ML
CREATE TABLE fraud_alerts (
alert_id STRING,
transaction_id STRING,
user_id STRING,
amount DECIMAL(10, 2),
fraud_score DOUBLE,
detection_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'fraud-alerts',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- Streaming ML inference pipeline
INSERT INTO fraud_alerts
SELECT
UUID() as alert_id,
transaction_id,
user_id,
amount,
FraudDetector(amount, category, merchant) as fraud_score,
CURRENT_TIMESTAMP as detection_time
FROM transactions
WHERE FraudDetector(amount, category, merchant) > 0.8;
Learn More:
For complete SQL documentation, including UDF development, optimization, and best practices, visit the Flink SQL Integration Guide.
💡 Examples
Run the Fraud Detection Example
# Run DataStream example
mvn exec:java -pl otter-stream-examples \
-Dexec.mainClass="com.flinkml.inference.examples.FraudDetectionExample"
# Run SQL example
mvn exec:java -pl otter-stream-examples \
-Dexec.mainClass="com.flinkml.inference.examples.FraudDetectionSQLExample"
More Examples
🎭 Fraud Detection
Real-time transaction analysis with feature extraction and ML scoring.
🤖 Sentiment Analysis
Streaming text classification with ONNX models.
📈 Anomaly Detection
Time-series monitoring with automatic alerting.
📊 SQL ML Pipeline
Complete ML inference pipeline using Flink SQL API.
⚙️ Configuration
Tip: Most configuration options have sensible defaults. Start with the basics and customize as needed.
Complete Configuration Example
InferenceConfig config = InferenceConfig.builder()
    // Model configuration
    .modelConfig(ModelConfig.builder()
        .modelId("my-model")
        .modelPath("/path/to/model.onnx")
        .format(ModelFormat.ONNX)
        .modelName("production-model")
        .modelVersion("2.0")
        .build())
    // Performance settings
    .batchSize(64)
    .batchTimeout(Duration.ofMillis(100))
    .parallelism(4)
    .maxConcurrentRequests(200)
    .queueSize(1000)
    // Caching
    .enableCaching(true)
    .cacheSize(10000)
    .cacheTtl(Duration.ofMinutes(30))
    // Error handling
    .maxRetries(3)
    .retryDelay(Duration.ofMillis(100))
    .timeout(Duration.ofSeconds(10))
    // Monitoring
    .enableMetrics(true)
    .metricsPrefix("myapp.ml.inference")
    .collectLatencyMetrics(true)
    .collectThroughputMetrics(true)
    .collectErrorMetrics(true)
    .collectCacheMetrics(true)
    .metricsExportInterval(Duration.ofSeconds(30))
    // Resource management
    .gpuAcceleration(true)
    .cpuThreads(4)
    .memoryLimit(2048) // MB
    .build();
Common Configuration Scenarios
🧪 Development
Small batch size, no caching, minimal resources. Focus on correctness.
🚀 Production
Large batch size, caching enabled, full monitoring. Focus on performance.
📊 SQL Workloads
Optimized for declarative queries with automatic batching and caching.
⚡ Performance Optimization
Important: Always measure performance in your specific environment before optimizing.
Performance Checklist
📈 Batch Optimization
Find the optimal batch size for your model (usually between 16 and 128).
- Start with batch size 32
- Measure latency vs throughput
- Adjust based on your SLA
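The trade-off these steps explore can be sketched with a simple cost model; the per-batch overhead and per-record cost below are illustrative assumptions, not measurements from Otter Streams:

```java
// Sketch: throughput vs. latency for candidate batch sizes, assuming a fixed
// per-batch overhead plus a per-record inference cost (numbers are made up).
public class BatchSizing {

    // Hypothetical cost model: latency(batch) = overheadMs + perRecordMs * batch
    static double batchLatencyMs(int batch, double overheadMs, double perRecordMs) {
        return overheadMs + perRecordMs * batch;
    }

    // Records per second sustained at this batch size under the model above.
    static double throughputPerSec(int batch, double overheadMs, double perRecordMs) {
        return batch / batchLatencyMs(batch, overheadMs, perRecordMs) * 1000.0;
    }

    public static void main(String[] args) {
        double overheadMs = 8.0, perRecordMs = 0.25; // assumed costs
        for (int batch : new int[]{16, 32, 64, 128}) {
            System.out.printf("batch=%d latency=%.1fms throughput=%.0f rec/s%n",
                    batch,
                    batchLatencyMs(batch, overheadMs, perRecordMs),
                    throughputPerSec(batch, overheadMs, perRecordMs));
        }
    }
}
```

Under such a model, throughput keeps rising with batch size while per-batch latency grows linearly, which is why the checklist says to stop increasing the batch once latency approaches your SLA.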
💾 Caching Strategy
Enable caching for repeated predictions with similar inputs.
- Result caching for deterministic models
- Feature caching for expensive preprocessing
- Model caching for frequent reloads
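A result cache of the kind enableCaching/cacheSize/cacheTtl configure can be sketched as a size-bounded LRU map with per-entry expiry. The class below is an illustrative stand-in, not the framework's actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of a bounded TTL result cache: LRU eviction past maxSize, lazy expiry on read.
public class TtlResultCache<K, V> {

    private record Entry<T>(T value, long expiresAtNanos) {}

    private final int maxSize;
    private final long ttlNanos;
    private final Map<K, Entry<V>> entries;

    public TtlResultCache(int maxSize, long ttlMillis) {
        this.maxSize = maxSize;
        this.ttlNanos = ttlMillis * 1_000_000L;
        // Access-ordered LinkedHashMap gives a simple LRU eviction policy.
        this.entries = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > TtlResultCache.this.maxSize;
            }
        };
    }

    public synchronized V get(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null; // miss
        }
        if (System.nanoTime() > e.expiresAtNanos) {
            entries.remove(key); // expired: treat as a miss
            return null;
        }
        return e.value();
    }

    public synchronized void put(K key, V value) {
        entries.put(key, new Entry<>(value, System.nanoTime() + ttlNanos));
    }
}
```

Lazy expiry on read keeps the sketch free of a background cleanup thread, at the cost of expired entries lingering until touched or evicted.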
📊 SQL Optimization
Optimize SQL queries for ML inference.
- Use batch prediction functions
- Enable query plan caching
- Consider materialized views
Performance Tuning Example
// High-performance configuration
InferenceConfig config = InferenceConfig.builder()
.modelConfig(modelConfig)
// Batch settings for throughput
.batchSize(64)
.batchTimeout(Duration.ofMillis(50))
// Parallel execution
.parallelism(Runtime.getRuntime().availableProcessors())
.maxConcurrentRequests(500)
.queueSize(5000)
// Caching for repeated patterns
.enableCaching(true)
.cacheSize(50000)
.cacheTtl(Duration.ofHours(1))
// GPU acceleration if available
.gpuAcceleration(true)
// Memory optimization
.memoryLimit(4096)
.cpuThreads(8)
.build();
📊 Monitoring & Observability
Available Metrics
⏱️ Latency Metrics
Track prediction time, preprocessing, and postprocessing.
- inference.latency.p50
- inference.latency.p95
- inference.latency.p99
📈 Throughput Metrics
Monitor requests per second and batch efficiency.
- inference.throughput.requests
- inference.throughput.batches
- inference.queue.size
📉 Error Metrics
Track failures, retries, and timeout rates.
- inference.errors.total
- inference.errors.rate
- inference.retries.count
📊 SQL Metrics
Monitor SQL query performance with ML inference.
- sql.ml.queries.total
- sql.ml.latency.avg
- sql.ml.cache.hit_rate
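To make the latency percentiles concrete, this is the standard nearest-rank computation a metrics backend typically applies to a window of samples (the sample values here are invented):

```java
import java.util.Arrays;

// Sketch: deriving values like inference.latency.p50/p95 from raw samples
// with the nearest-rank percentile method.
public class LatencyPercentiles {

    static double percentile(double[] samples, double p) {
        double[] sorted = samples.clone();
        Arrays.sort(sorted);
        // Nearest rank: ceil(p/100 * n), converted to a 0-based index.
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(0, rank - 1)];
    }

    public static void main(String[] args) {
        double[] latenciesMs = {11, 12, 12, 13, 14, 14, 15, 16, 80, 95};
        System.out.println("p50=" + percentile(latenciesMs, 50) + "ms");
        System.out.println("p95=" + percentile(latenciesMs, 95) + "ms");
    }
}
```

The gap between p50 (14 ms in this made-up window) and p95/p99 is usually what exposes batching stalls or GC pauses that an average would hide.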
Integration with Monitoring Systems
// Enable comprehensive monitoring
InferenceConfig config = InferenceConfig.builder()
.modelConfig(modelConfig)
// Enable all metrics
.enableMetrics(true)
.metricsPrefix("myapp.ml.inference")
// Detailed metrics collection
.collectLatencyMetrics(true)
.collectThroughputMetrics(true)
.collectErrorMetrics(true)
.collectCacheMetrics(true)
.collectMemoryMetrics(true)
.collectGpuMetrics(true)
.collectSqlMetrics(true)
// Export to Prometheus
.metricsExporter("prometheus")
.metricsExportInterval(Duration.ofSeconds(15))
// Alert thresholds
.alertOnHighLatency(Duration.ofMillis(1000))
.alertOnHighErrorRate(0.05) // 5%
.alertOnQueueBackpressure(1000)
.build();
🔍 Troubleshooting Guide
Need Help? Check the GitHub Issues or contact the developer.
Common Issues and Solutions
Model Loading Failures High Priority
Symptoms: Application fails to start, model-related exceptions
Resolution Steps:
- Check file permissions: Ensure the model file is readable
  ls -la models/fraud_model.onnx
- Verify model format: Confirm it is a supported format
  file models/fraud_model.onnx
- Test model locally: Use the test utility
  mvn exec:java -Dexec.mainClass="com.flinkml.inference.util.ModelValidator"
- Check dependencies: Ensure all required libraries are installed
SQL Function Errors Medium Priority
Symptoms: SQL queries fail with function not found or type mismatch errors
SQL Troubleshooting:
- Verify function registration: Ensure UDF is properly registered
- Check parameter types: Match SQL types with Java types
- Test function isolation: Test UDF outside of Flink first
- Review SQL plan: Use EXPLAIN to see function execution plan
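Step 3 above ("test UDF outside of Flink first") works because a scalar UDF is ultimately a class with a public eval() method called once per row, so its logic can be exercised as a plain Java call. The stub below is a hypothetical stand-in with the same shape as ML_PREDICT, not the real MLInferenceUDF:

```java
// Sketch: exercising scalar-UDF logic without a Flink cluster.
public class UdfIsolationCheck {

    // Hypothetical stand-in mirroring ML_PREDICT(amount, category, merchant);
    // a real UDF would delegate to the inference engine instead of this rule.
    static class StubFraudUdf {
        public double eval(double amount, String category, String merchant) {
            if (amount < 0) {
                throw new IllegalArgumentException("negative amount");
            }
            return amount > 10_000 ? 0.9 : 0.1; // dummy score
        }
    }

    public static void main(String[] args) {
        StubFraudUdf udf = new StubFraudUdf();
        System.out.println("low=" + udf.eval(120.0, "groceries", "acme"));
        System.out.println("high=" + udf.eval(25_000.0, "electronics", "acme"));
    }
}
```

Checking parameter and return types at this level (step 2's advice) catches most "function not found" and type-mismatch errors before the SQL planner ever sees the function.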
Performance Issues Medium Priority
Symptoms: High latency, low throughput, Flink backpressure
Optimization Sequence:
- Check baseline: Measure single inference time
- Increase batch size: Start with 32, test up to 128
- Enable caching: If you have repeated patterns
- Check hardware: CPU utilization, memory, I/O
- Profile model: Identify bottlenecks in model execution
Memory Issues High Priority
Symptoms: OutOfMemory errors, high GC activity, unstable performance
Memory Management Steps:
- Reduce batch size: Start with 16 or 32
- Monitor heap usage: Use JVM metrics or VisualVM
- Adjust Flink memory: Increase task manager memory
  taskmanager.memory.process.size: 4096m
- Enable result caching: Reduces repeated computations
- Check for memory leaks: Use heap dumps if persistent
Debug Mode
// Enable debug logging
InferenceConfig config = InferenceConfig.builder()
.modelConfig(modelConfig)
// Debug settings
.enableDebug(true)
.logLevel("DEBUG")
.logPredictions(true) // Log input/output
.logLatencies(true) // Detailed timing
.logErrors(true) // Full error stack traces
.logCacheActivity(true) // Cache hits/misses
.logSqlQueries(true) // SQL query logging
// Performance profiling
.profileMode(true)
.profileSampleRate(0.01) // 1% of requests
.profileOutput("profiles/")
// Health checks
.healthCheckInterval(Duration.ofMinutes(5))
.enableSelfTest(true)
.build();
Next Steps
1️⃣ Try the Example
Run the fraud detection example to see the framework in action.
2️⃣ Load Your Model
Export your model to ONNX and test it with the framework.
3️⃣ Try the SQL API
Experiment with ML inference using the declarative Flink SQL API.
4️⃣ Contribute!
We welcome contributions! Check out our GitHub repository.
Ready to dive deeper? Explore the Architecture Documentation, SQL Integration Guide, and Example Gallery for more advanced use cases.