Skip to content
GitHubDiscord

Window Operators Overview

Windowing is a fundamental concept in stream processing that allows you to group and aggregate unbounded streams of data into finite, manageable chunks. Cortex.Streams provides a comprehensive windowing system that enables real-time analytics, aggregations, and event processing.

In stream processing, data flows continuously and indefinitely. Windows help you:

  • Aggregate data over time: Calculate metrics like sums, averages, and counts for specific time periods
  • Detect patterns: Identify trends or anomalies within bounded time frames
  • Reduce memory usage: Process data in chunks rather than keeping all data in memory
  • Generate timely insights: Emit results at regular intervals or when specific conditions are met

Cortex.Streams supports three fundamental window types:

Window TypeDescriptionUse Case
Tumbling WindowFixed-size, non-overlapping windowsHourly/daily reports, batch aggregations
Sliding WindowFixed-size, overlapping windowsMoving averages, trend detection
Session WindowDynamic windows based on activity gapsUser session analysis, activity tracking

Each window type comes in two variants:

Simple window operations with automatic triggering at window end:

  • TumblingWindow<TKey>(...)
  • SlidingWindow<TKey>(...)
  • SessionWindow<TKey>(...)

Full control over triggers, state modes, and late data handling:

  • AdvancedTumblingWindow<TKey>(...)
  • AdvancedSlidingWindow<TKey>(...)
  • AdvancedSessionWindow<TKey>(...)
using Cortex.Streams;
using Cortex.Streams.Operators.Windows;
using Cortex.States;

// Define a simple event
public record SensorReading(string SensorId, double Temperature, DateTime Timestamp);

// Create a stream with a 5-minute tumbling window
var stream = StreamBuilder<SensorReading>
    .CreateNewStream("Temperature Monitor")
    .Stream()
    .TumblingWindow<string>(
        keySelector: reading => reading.SensorId,
        timestampSelector: reading => reading.Timestamp,
        windowSize: TimeSpan.FromMinutes(5))
    .Map(windowResult => new
    {
        SensorId = windowResult.Key,
        AverageTemp = windowResult.Items.Average(r => r.Temperature),
        WindowStart = windowResult.WindowStart,
        WindowEnd = windowResult.WindowEnd
    })
    .Sink(result => Console.WriteLine(
        $"Sensor {result.SensorId}: Avg Temp = {result.AverageTemp:F2}°C " +
        $"[{result.WindowStart:HH:mm} - {result.WindowEnd:HH:mm}]"))
    .Build();

stream.Start();

All windows emit WindowResult<TKey, TValue> objects containing:

public class WindowResult<TKey, TValue>
{
    public TKey Key { get; }                    // Partition key
    public DateTime WindowStart { get; }        // Window start time
    public DateTime WindowEnd { get; }          // Window end time
    public IReadOnlyList<TValue> Items { get; } // Items in the window
    public WindowEmissionType EmissionType { get; } // Early, OnTime, Late, or Retraction
    public bool IsFinal { get; }                // True if window is closed
    public DateTime EmissionTime { get; }       // When result was emitted
    public int EmissionSequence { get; }        // Emission counter for updates
}

Determines how data is partitioned into separate windows:

// Separate windows per sensor
keySelector: reading => reading.SensorId

// Separate windows per user
keySelector: event => event.UserId

// Global window (single partition)
keySelector: _ => "global"

Extracts the event time used for window assignment:

// Use event's timestamp
timestampSelector: reading => reading.Timestamp

// Use current time (processing time)
timestampSelector: _ => DateTime.UtcNow

Windows use state stores to maintain data between events:

// In-memory store (default)
var store = new InMemoryStateStore<string, List<SensorReading>>();

// Or use persistent stores like RocksDB
var rocksStore = new RocksDbStateStore<string, List<SensorReading>>(
    path: "./window-state",
    name: "temperature-windows");