Aggregate Operators
Description and Use Cases
The Aggregate Operator performs aggregation operations on data items grouped by a key. It maintains and updates an aggregate value for each key based on incoming data, enabling cumulative computations such as sums, averages, or custom aggregations.
Use Cases:
- Counting: Tracking the number of occurrences of each key.
- Summation: Calculating the total sum of values per key.
- Averaging: Computing the average value per key.
- Custom Aggregations: Implementing complex aggregation logic tailored to specific requirements.
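Each use case above comes down to a different aggregation function folded over the items that share a key. The following is a minimal sketch in plain C#, independent of Cortex: `KeyedFold` and `UseCases` are hypothetical names introduced for illustration, and an in-memory `Dictionary` stands in for the state store.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not a Cortex type): keeps one aggregate value per key.
public static class KeyedFold
{
    public static Dictionary<TKey, TAgg> Aggregate<TItem, TKey, TAgg>(
        IEnumerable<TItem> items,
        Func<TItem, TKey> keySelector,
        Func<TAgg, TItem, TAgg> aggregateFunction)
    {
        var state = new Dictionary<TKey, TAgg>();
        foreach (var item in items)
        {
            var key = keySelector(item);
            // A key seen for the first time starts from default(TAgg), e.g. 0 for int.
            state.TryGetValue(key, out var current);
            state[key] = aggregateFunction(current, item);
        }
        return state;
    }
}

// Hypothetical examples of the aggregation functions behind each use case.
public static class UseCases
{
    // Counting: add 1 for every occurrence of the key.
    public static Dictionary<string, int> CountWords(IEnumerable<string> words) =>
        KeyedFold.Aggregate<string, string, int>(words, w => w, (count, w) => count + 1);

    // Summation: add each value to the running total for its key.
    public static Dictionary<string, double> SumByKey(
        IEnumerable<(string Key, double Value)> items) =>
        KeyedFold.Aggregate<(string Key, double Value), string, double>(
            items, i => i.Key, (sum, i) => sum + i.Value);

    // Averaging: track (Sum, Count) per key, then divide at read time.
    public static Dictionary<string, double> AverageByKey(
        IEnumerable<(string Key, double Value)> items)
    {
        var running = KeyedFold.Aggregate<(string Key, double Value), string, (double Sum, int Count)>(
            items, i => i.Key, (acc, i) => (acc.Sum + i.Value, acc.Count + 1));

        var averages = new Dictionary<string, double>();
        foreach (var kv in running)
        {
            averages[kv.Key] = kv.Value.Sum / kv.Value.Count;
        }
        return averages;
    }
}
```

Averaging is the one case where the stored aggregate differs from the reported value: the state holds a sum and a count so that each new item can update it in constant time.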
Implementation Guide
To implement the Aggregate Operator, follow these steps:
- Define the Key Selector and Aggregation Function:
  - Key Selector: Determines how to group data items.
  - Aggregation Function: Defines how to update the aggregate value based on incoming data.
- Configure the State Store:
  - Use a state store (e.g., RocksDbStateStore) to maintain aggregate states.
- Integrate the Operator into the Stream:
  - Use the Aggregate method provided by the StreamBuilder to add the operator to the pipeline.
- Handle Telemetry (Optional):
  - Configure telemetry to monitor aggregation metrics and performance.
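The steps above can be sketched without the library to show how the pieces relate. This is an illustration under stated assumptions, not Cortex's implementation: `InMemoryStore` and `AggregateStep` are hypothetical types, the store is a plain dictionary rather than RocksDB, and a single `ProcessedCount` counter stands in for telemetry.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a state store; real stores such as
// RocksDbStateStore have their own API and persistence behavior.
public sealed class InMemoryStore<TKey, TValue>
{
    private readonly Dictionary<TKey, TValue> _data = new Dictionary<TKey, TValue>();

    // Returns default(TValue) when the key has no aggregate yet.
    public TValue Get(TKey key) => _data.TryGetValue(key, out var value) ? value : default;

    public void Put(TKey key, TValue value) => _data[key] = value;
}

// Hypothetical operator: one key selector, one aggregation function, one store.
public sealed class AggregateStep<TIn, TKey, TAgg>
{
    private readonly Func<TIn, TKey> _keySelector;
    private readonly Func<TAgg, TIn, TAgg> _aggregateFunction;
    private readonly InMemoryStore<TKey, TAgg> _store;

    // Minimal telemetry placeholder: number of items processed so far.
    public long ProcessedCount { get; private set; }

    public AggregateStep(
        Func<TIn, TKey> keySelector,
        Func<TAgg, TIn, TAgg> aggregateFunction,
        InMemoryStore<TKey, TAgg> store)
    {
        _keySelector = keySelector;
        _aggregateFunction = aggregateFunction;
        _store = store;
    }

    // Reads the current aggregate for the item's key, applies the
    // aggregation function, writes the result back, and returns it.
    public KeyValuePair<TKey, TAgg> Process(TIn item)
    {
        var key = _keySelector(item);
        var updated = _aggregateFunction(_store.Get(key), item);
        _store.Put(key, updated);
        ProcessedCount++;
        return new KeyValuePair<TKey, TAgg>(key, updated);
    }
}
```

With `w => w` as the key selector and `(count, w) => count + 1` as the aggregation function, processing "apple", "banana", "apple" leaves the store holding 2 for "apple" and 1 for "banana", with `ProcessedCount` at 3.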
Code Example
The following example demonstrates the Aggregate Operator by counting the number of occurrences of each word in a stream of strings.
using Cortex.States.RocksDb;
using Cortex.Streams;
using System;

class Program
{
    static void Main(string[] args)
    {
        // Initialize a RocksDbStateStore for word counts
        var wordCountStore = new RocksDbStateStore<string, int>("WordCountStore", "/path/to/rocksdb");

        // Create and configure the stream with an Aggregate operator
        var stream = StreamBuilder<string, string>.CreateNewStream("WordCountStream")
            .Stream()
            .AggregateSilently(
                keySelector: word => word, // Group by the word itself
                aggregateFunction: (currentCount, word) => currentCount + 1, // Increment count
                stateStoreName: "WordCountStore",
                stateStore: wordCountStore
            )
            .Sink(msg => Console.WriteLine($"Word: {msg}, processed")) // Log each processed item
            .Build();

        // Start the stream
        stream.Start();

        // Emit data into the stream
        var words = new[] { "apple", "banana", "apple", "orange", "banana", "apple" };
        foreach (var word in words)
        {
            stream.Emit(word);
        }

        // Stop the stream after processing
        stream.Stop();
    }
}

Explanation:
- State Store Initialization: A RocksDbStateStore named "WordCountStore" is initialized to persist word counts.
- Stream Configuration:
  - Aggregate Operator: Groups incoming words and increments their counts.
- Data Emission: The stream processes the words, updating counts accordingly.
- Stream Lifecycle: The stream is started, data is emitted, and then the stream is stopped.