Custom Operators
While Cortex provides a variety of built-in operators, developers can create custom operators to extend the platform’s functionality and cater to specific processing needs.
Creating Custom Operators
Section titled “Creating Custom Operators”To create a custom operator, follow these steps:
- Implement the
IOperatorInterface:- Define the processing logic by implementing the
ProcessandSetNextmethods.
- Define the processing logic by implementing the
- Optionally Implement
IStatefulOperator:- If the operator needs to maintain state, implement the
IStatefulOperatorinterface.
- If the operator needs to maintain state, implement the
- Optionally Implement
ITelemetryEnabled:- For telemetry integration, implement the
ITelemetryEnabledinterface.
- For telemetry integration, implement the
- Integrate the Custom Operator into the Stream:
- Use the
Map,Filter, or other relevant methods to add the custom operator to the pipeline.
- Use the
Code Example: Custom Logging Operator
Section titled “Code Example: Custom Logging Operator”The following example demonstrates creating a custom operator that logs each data item processed.
using Cortex.Streams.Operators;
using Cortex.Telemetry;
using System;
public class LoggingOperator<T> : IOperator, ITelemetryEnabled
{
private IOperator _nextOperator;
private ITelemetryProvider _telemetryProvider;
private ICounter _logCounter;
public void SetTelemetryProvider(ITelemetryProvider telemetryProvider)
{
_telemetryProvider = telemetryProvider;
if (_telemetryProvider != null)
{
var metrics = _telemetryProvider.GetMetricsProvider();
_logCounter = metrics.CreateCounter($"logging_operator_processed_{typeof(T).Name}", "Number of items processed by LoggingOperator");
}
}
public void Process(object input)
{
T data = (T)input;
Console.WriteLine($"LoggingOperator: Processing data - {data}");
_logCounter?.Increment();
_nextOperator?.Process(input);
}
public void SetNext(IOperator nextOperator)
{
_nextOperator = nextOperator;
if (_nextOperator is ITelemetryEnabled telemetryEnabled)
{
telemetryEnabled.SetTelemetryProvider(_telemetryProvider);
}
}
}Integrating the Custom Operator:
using Cortex.Streams;
using Cortex.Streams.Extensions; // Namespace where StreamBuilderExtensions is defined
using System;
class Program
{
static void Main(string[] args)
{
// Initialize the custom logging operator
var loggingOperator = new LoggingOperator<string>();
// Create and configure the stream with Map, LoggingOperator, and Sink using the extension method
var stream = StreamBuilder<string, string>.CreateNewStream("CustomOperatorStream")
.Stream()
.Map(message => $"Transformed: {message}") // Example transformation
.UseOperator<string, string, string>(loggingOperator) // Add custom LoggingOperator
.Sink(x => Console.WriteLine(x)) // Sink to console
.Build();
// Start the stream
stream.Start();
// Emit data into the stream
stream.Emit("CustomEvent1");
stream.Emit("CustomEvent2");
// Stop the stream after processing
stream.Stop();
}
}Output:
LoggingOperator: Processing data - Transformed: CustomEvent1
Transformed: CustomEvent1
LoggingOperator: Processing data - Transformed: CustomEvent2
Transformed: CustomEvent2Explanation:
- Custom Operator Definition: The
LoggingOperatorlogs each data item it processes and increments a telemetry counter. - Stream Configuration:
- Map Operator: Transforms incoming messages.
- Custom Logging Operator: Logs the transformed messages.
- Sink Operator: Outputs the final data to the console.
- Data Emission: Emits two custom events that pass through the transformation, logging, and sink stages.
- Stream Lifecycle: The stream is started, data is emitted and processed, and then the stream is stopped.