
Stream Performance & Async Processing

This guide covers the performance optimization features in Cortex.Streams, including buffered async processing, backpressure handling, and high-throughput configurations.


Cortex.Streams provides optional performance features for high-throughput scenarios:

| Feature | Description |
| --- | --- |
| Buffered Processing | Internal bounded buffer with async consumers |
| Backpressure Handling | Configurable strategies when the buffer is full |
| Async Emission | Non-blocking EmitAsync and EmitAndForget |
| Batch Processing | Process multiple items per batch for throughput |
| Parallel Consumers | Multiple concurrent processing tasks |
| Buffer Statistics | Real-time monitoring of buffer state |
  • ✅ Non-blocking emission - Producers don’t wait for pipeline completion
  • ✅ Backpressure control - Handle overload scenarios gracefully
  • ✅ Higher throughput - Batch processing and parallel consumers
  • ✅ Backward compatible - Opt-in features; existing code works unchanged

Without any performance configuration, streams work synchronously:

var stream = StreamBuilder<int>.CreateNewStream("BasicStream")
    .Stream()
    .Map(x => x * 2)
    .Sink(Console.WriteLine)
    .Build();

stream.Start();
stream.Emit(42);        // Blocks until processing completes
await stream.EmitAsync(42); // Runs on thread pool, still waits
stream.Stop();

Enable buffered processing for non-blocking emission:

var stream = StreamBuilder<int>.CreateNewStream("FastStream")
    .WithPerformanceOptions(new StreamPerformanceOptions
    {
        EnableBufferedProcessing = true,
        BufferCapacity = 10_000,
        BackpressureStrategy = BackpressureStrategy.Block
    })
    .Stream()
    .Map(x => x * 2)
    .Sink(ProcessItem)
    .Build();

stream.Start();

// Non-blocking - returns immediately after buffering
stream.EmitAndForget(42);

// Async - awaits buffering (not processing)
await stream.EmitAsync(42);

// Graceful shutdown - waits for buffer to drain
await stream.StopAsync();

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| EnableBufferedProcessing | bool | false | Enable internal buffer and async consumers |
| BufferCapacity | int | 10,000 | Maximum items in buffer |
| BackpressureStrategy | BackpressureStrategy | Block | Behavior when buffer is full |
| BatchSize | int | 1 | Items per batch (1 = immediate processing) |
| BatchTimeout | TimeSpan | 100 ms | Max wait time for a batch to fill |
| ConcurrencyLevel | int | 1 | Number of parallel consumer tasks |
| BlockingTimeout | TimeSpan | 30 s | Timeout for blocking operations |
| OnItemDropped | Action<object, DropReason> | null | Callback when items are dropped |

var options = new StreamPerformanceOptions
{
    EnableBufferedProcessing = true,
    BufferCapacity = 50_000,
    BackpressureStrategy = BackpressureStrategy.DropOldest,
    BatchSize = 100,
    BatchTimeout = TimeSpan.FromMilliseconds(50),
    ConcurrencyLevel = Environment.ProcessorCount,
    BlockingTimeout = TimeSpan.FromSeconds(60),
    OnItemDropped = (item, reason) => 
        Console.WriteLine($"Dropped: {item}, Reason: {reason}")
};

var stream = StreamBuilder<Event>.CreateNewStream("CustomStream")
    .WithPerformanceOptions(options)
    .Stream()
    // ... pipeline
    .Build();

When the internal buffer reaches capacity, the backpressure strategy determines behavior:

Waits for space to become available. Best for scenarios where data loss is unacceptable.

BackpressureStrategy = BackpressureStrategy.Block,
BlockingTimeout = TimeSpan.FromSeconds(30) // Throws after timeout

Behavior:

  • EmitAsync blocks (asynchronously) until space available
  • EmitAndForget blocks synchronously
  • Throws OperationCanceledException on timeout
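Under the Block strategy, a producer can bound its own wait with a CancellationToken. A minimal sketch (the `stream`, `item`, and `logger` names are placeholders for your own instances):

```csharp
// Sketch: bounded wait under the Block strategy.
// `stream`, `item`, and `logger` are placeholders for your own instances.
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
    await stream.EmitAsync(item, cts.Token); // waits for buffer space
}
catch (OperationCanceledException)
{
    // Buffer stayed full past our deadline: shed load, retry later, or alert
    logger.Warn("Emit timed out under backpressure");
}
```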

Silently drops incoming items when the buffer is full, preserving already-buffered data. Best for feeds where it is acceptable to skip new items while the consumer catches up.

BackpressureStrategy = BackpressureStrategy.DropNewest,
OnItemDropped = (item, reason) => metrics.IncrementDropped()

Behavior:

  • Emission calls never block or throw
  • Excess incoming items are silently dropped (EmitAndForget returns false for them)
  • Use the OnItemDropped callback to track dropped items
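To quantify how much data is being skipped, the drop callback can feed a thread-safe counter. A sketch (the counter is our own addition, not part of the library):

```csharp
// Sketch: count drops with an atomic counter (droppedCount is our own field)
long droppedCount = 0;

var options = new StreamPerformanceOptions
{
    EnableBufferedProcessing = true,
    BackpressureStrategy = BackpressureStrategy.DropNewest,
    OnItemDropped = (item, reason) => Interlocked.Increment(ref droppedCount)
};

// Later, e.g. from a metrics timer:
var dropped = Interlocked.Read(ref droppedCount);
```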

Removes oldest items to make room for new ones. Best for keeping the most recent data.

BackpressureStrategy = BackpressureStrategy.DropOldest,
OnItemDropped = (item, reason) => LogDropped(item)

Behavior:

  • New items always accepted
  • Oldest buffered items are evicted
  • Callback receives evicted items

Throws BufferFullException when buffer is full. Best for explicit failure handling.

BackpressureStrategy = BackpressureStrategy.ThrowException

Behavior:

  • EmitAndForget throws BufferFullException
  • EmitAsync throws BufferFullException
  • Caller must handle the exception

try
{
    stream.EmitAndForget(item);
}
catch (BufferFullException ex)
{
    Console.WriteLine($"Buffer full! Capacity: {ex.BufferCapacity}");
    // Implement retry logic or alternative handling
}

Blocks until the entire pipeline processes the item. Unchanged from previous behavior.

stream.Emit(item); // Blocks until Sink completes

Asynchronously emits an item:

  • Without buffering: Runs pipeline on thread pool, awaits completion
  • With buffering: Awaits buffer space, returns when buffered (not processed)

await stream.EmitAsync(item);
await stream.EmitAsync(item, cancellationToken);

Fire-and-forget emission. Requires buffered processing to be enabled.

bool accepted = stream.EmitAndForget(item);
// Returns immediately
// accepted = true if buffered, false if dropped (DropNewest/DropOldest)
// Throws BufferFullException if strategy is ThrowException

Efficiently emit multiple items:

var items = Enumerable.Range(1, 1000);
await stream.EmitBatchAsync(items);
await stream.EmitBatchAsync(items, cancellationToken);
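For very large sources, it can help to emit in bounded chunks rather than one huge enumerable. A sketch assuming .NET 6+ (`Chunk` is a standard LINQ operator; `largeSource` and `LoadEvents` are placeholders):

```csharp
// Sketch: feed a large source in bounded chunks (Chunk requires .NET 6+)
IEnumerable<Event> largeSource = LoadEvents(); // placeholder for your data source

foreach (var chunk in largeSource.Chunk(1_000))
{
    await stream.EmitBatchAsync(chunk);
}
```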

Returns real-time buffer metrics (only when buffered processing is enabled):

BufferStatistics stats = stream.GetBufferStatistics();

if (stats != null)
{
    Console.WriteLine($"Current Count: {stats.CurrentCount}");
    Console.WriteLine($"Capacity: {stats.Capacity}");
    Console.WriteLine($"Utilization: {stats.UtilizationPercent:F1}%");
    Console.WriteLine($"Total Enqueued: {stats.TotalEnqueued}");
    Console.WriteLine($"Total Processed: {stats.TotalProcessed}");
    Console.WriteLine($"Total Dropped: {stats.TotalDropped}");
}
| Property | Type | Description |
| --- | --- | --- |
| CurrentCount | int | Items currently in buffer |
| Capacity | int | Maximum buffer capacity |
| TotalEnqueued | long | Total items added since start |
| TotalProcessed | long | Total items successfully processed |
| TotalDropped | long | Total items dropped due to backpressure |
| UtilizationPercent | double | Current buffer utilization (0-100) |

// Periodic monitoring
var timer = new Timer(_ =>
{
    var stats = stream.GetBufferStatistics();
    if (stats != null && stats.UtilizationPercent > 80)
    {
        logger.Warn($"Buffer utilization high: {stats.UtilizationPercent:F1}%");
    }
}, null, TimeSpan.Zero, TimeSpan.FromSeconds(5));

Optimized for maximum throughput with parallel processing:

var options = StreamPerformanceOptions.HighThroughput(
    bufferCapacity: 100_000,      // Default: 100,000
    concurrencyLevel: 8           // Default: Environment.ProcessorCount
);

// Equivalent to:
new StreamPerformanceOptions
{
    EnableBufferedProcessing = true,
    BufferCapacity = 100_000,
    BackpressureStrategy = BackpressureStrategy.Block,
    BatchSize = 100,
    BatchTimeout = TimeSpan.FromMilliseconds(50),
    ConcurrencyLevel = 8,
    BlockingTimeout = TimeSpan.FromSeconds(60)
}

Best for: Log ingestion, metrics collection, high-volume event processing

Optimized for minimal latency with immediate processing:

var options = StreamPerformanceOptions.LowLatency(
    bufferCapacity: 10_000        // Default: 10,000
);

// Equivalent to:
new StreamPerformanceOptions
{
    EnableBufferedProcessing = true,
    BufferCapacity = 10_000,
    BackpressureStrategy = BackpressureStrategy.Block,
    BatchSize = 1,                // Process immediately
    ConcurrencyLevel = 1,         // Single consumer for ordering
    BlockingTimeout = TimeSpan.FromSeconds(30)
}

Best for: Real-time notifications, interactive applications

Optimized for scenarios where latest data matters most:

var options = StreamPerformanceOptions.DropOldest(
    bufferCapacity: 10_000,
    onItemDropped: (item, reason) => metrics.Track("dropped", item)
);

// Equivalent to:
new StreamPerformanceOptions
{
    EnableBufferedProcessing = true,
    BufferCapacity = 10_000,
    BackpressureStrategy = BackpressureStrategy.DropOldest,
    BatchSize = 1,
    ConcurrencyLevel = 1,
    OnItemDropped = onItemDropped
}

Best for: Stock tickers, sensor data, real-time dashboards


| Scenario | Recommended Strategy |
| --- | --- |
| Financial transactions | Block - never lose data |
| Real-time metrics | DropOldest - keep the latest |
| Log aggregation | Block with a large buffer |
| Live video frames | DropNewest - skip if behind |
| Critical alerts | ThrowException - explicit handling |

// Rule of thumb: Buffer should hold 2-5 seconds of peak throughput
var peakItemsPerSecond = 10_000;
var bufferSeconds = 3;
var bufferCapacity = peakItemsPerSecond * bufferSeconds; // 30,000

3. Use Batch Processing for High Throughput

// For I/O-bound sinks (database, network)
var options = new StreamPerformanceOptions
{
    EnableBufferedProcessing = true,
    BatchSize = 100,                    // Batch writes
    BatchTimeout = TimeSpan.FromMilliseconds(100)
};

// Set up alerts
if (stats.UtilizationPercent > 90)
{
    // Scale up consumers or reduce input rate
}

if (stats.TotalDropped > previousDropped)
{
    // Data loss occurring - investigate
}
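The fragment above assumes `stats` and `previousDropped` already exist; a self-contained version of the drop-delta check might look like this (the field and method names are our own):

```csharp
// Sketch: periodic drop-delta check; _previousDropped is our own field
private long _previousDropped;

private void CheckForDataLoss()
{
    var stats = stream.GetBufferStatistics();
    if (stats == null) return; // buffering not enabled

    var dropsSinceLastCheck = stats.TotalDropped - _previousDropped;
    if (dropsSinceLastCheck > 0)
    {
        logger.Error($"{dropsSinceLastCheck} items dropped since last check");
    }
    _previousDropped = stats.TotalDropped;
}
```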

Always use StopAsync to ensure all buffered items are processed:

// ❌ Bad - may lose buffered items
stream.Stop();

// ✅ Good - waits for buffer to drain
await stream.StopAsync();

// ✅ Good - with timeout
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await stream.StopAsync(cts.Token);

When using ConcurrencyLevel > 1, ensure your operators are thread-safe:

// ❌ Bad - not thread-safe
var list = new List<int>();
.Sink(x => list.Add(x))

// ✅ Good - thread-safe collection
var bag = new ConcurrentBag<int>();
.Sink(x => bag.Add(x))

// ✅ Good - atomic operations
var counter = 0;
.Sink(x => Interlocked.Increment(ref counter))

From Synchronous to Async (Minimal Change)


Existing code continues to work unchanged:

// Before (still works)
stream.Emit(item);

// After (same behavior, async wrapper)
await stream.EmitAsync(item);

Add performance options without changing pipeline logic:

// Before
var stream = StreamBuilder<int>.CreateNewStream("MyStream")
    .Stream()
    .Map(x => x * 2)
    .Sink(ProcessItem)
    .Build();

// After (just add WithPerformanceOptions)
var stream = StreamBuilder<int>.CreateNewStream("MyStream")
    .WithPerformanceOptions(StreamPerformanceOptions.LowLatency())
    .Stream()
    .Map(x => x * 2)
    .Sink(ProcessItem)
    .Build();

Replace synchronous stop with async for graceful shutdown:

// Before
stream.Stop();

// After
await stream.StopAsync();

| Method | Description |
| --- | --- |
| void Emit(TIn value) | Synchronous emission (blocks until processed) |
| Task EmitAsync(TIn value, CancellationToken ct) | Async emission |
| Task EmitBatchAsync(IEnumerable<TIn> values, CancellationToken ct) | Batch emission |
| bool EmitAndForget(TIn value) | Fire-and-forget (requires buffering) |
| void Start() | Start the stream |
| void Stop() | Stop immediately |
| Task StopAsync(CancellationToken ct) | Graceful async stop |
| BufferStatistics GetBufferStatistics() | Buffer metrics (null when buffering is disabled) |

| Method | Description |
| --- | --- |
| WithPerformanceOptions(StreamPerformanceOptions) | Configure performance options |
| WithErrorHandling(StreamExecutionOptions) | Configure error handling |
| WithTelemetry(ITelemetryProvider) | Configure telemetry |

| Exception | When Thrown |
| --- | --- |
| BufferFullException | Buffer full with the ThrowException strategy |
| OperationCanceledException | Blocking timeout or cancellation |
| InvalidOperationException | EmitAndForget without buffering enabled |
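A producer that must survive all three failure modes can catch them together. A sketch (names are placeholders; which exception actually surfaces depends on your configured strategy):

```csharp
// Sketch: defensive emission covering the exceptions listed above
try
{
    await stream.EmitAsync(item, cancellationToken);
}
catch (BufferFullException ex)        // ThrowException strategy
{
    logger.Warn($"Buffer full (capacity {ex.BufferCapacity}); item deferred");
}
catch (OperationCanceledException)    // Block timeout or external cancellation
{
    logger.Warn("Emission cancelled or timed out");
}
catch (InvalidOperationException)     // misconfiguration, e.g. buffering disabled
{
    throw; // a configuration bug: surface it rather than swallow it
}
```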

using Cortex.Streams;
using Cortex.Streams.Performance;

public class OrderProcessor
{
    private readonly IStream<Order, ProcessedOrder> _stream;

    public OrderProcessor()
    {
        _stream = StreamBuilder<Order>.CreateNewStream("OrderProcessor")
            .WithPerformanceOptions(new StreamPerformanceOptions
            {
                EnableBufferedProcessing = true,
                BufferCapacity = 50_000,
                BackpressureStrategy = BackpressureStrategy.Block,
                ConcurrencyLevel = 4,
                OnItemDropped = (item, reason) => 
                    Logger.Warn($"Order dropped: {((Order)item).Id}")
            })
            .WithErrorHandling(new StreamExecutionOptions
            {
                ErrorHandlingStrategy = ErrorHandlingStrategy.Retry,
                MaxRetries = 3,
                RetryDelay = TimeSpan.FromSeconds(1)
            })
            .Stream()
            .Filter(order => order.IsValid)
            .Map(order => EnrichOrder(order))
            .Map(order => ProcessOrder(order))
            .Sink(order => SaveToDatabase(order))
            .Build();
    }

    public void Start() => _stream.Start();

    public async Task StopAsync() => await _stream.StopAsync();

    public async Task SubmitOrderAsync(Order order)
    {
        await _stream.EmitAsync(order);
    }

    public void SubmitOrderFireAndForget(Order order)
    {
        if (!_stream.EmitAndForget(order))
        {
            Logger.Warn($"Order {order.Id} was dropped");
        }
    }

    public void LogStats()
    {
        var stats = _stream.GetBufferStatistics();
        if (stats != null)
        {
            Logger.Info($"Buffer: {stats.CurrentCount}/{stats.Capacity} " +
                       $"({stats.UtilizationPercent:F1}%), " +
                       $"Processed: {stats.TotalProcessed}, " +
                       $"Dropped: {stats.TotalDropped}");
        }
    }
}