
Amazon S3

The Cortex.Streams.S3 package provides sink operators for writing stream data to Amazon S3: S3SinkOperator for per‑record uploads and S3SinkBulkOperator for batched uploads. Both use the AWS SDK's TransferUtility for efficient transfers and serialize records to JSON by default.

Add the NuGet package:

dotnet add package Cortex.Streams.S3

You must also reference the AWSSDK.S3 package and configure AWS credentials, for example through the default credential provider chain (environment variables, the shared credentials file, or an instance profile).
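
For example, you can construct the client with an explicit region; the region below is a placeholder, and credentials are still resolved by the SDK:

using Amazon;
using Amazon.S3;

// Explicit region; credentials are resolved via the default provider chain.
// RegionEndpoint.USEast1 is a placeholder, use your bucket's region.
var s3Client = new AmazonS3Client(RegionEndpoint.USEast1);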

S3SinkOperator uploads each input object as an individual file in an S3 bucket. The constructor takes the bucket name, folder path inside the bucket, an IAmazonS3 client and an optional serializer. If no serializer is supplied, a default JSON serializer is used. The operator maintains a TransferUtility and must be started before processing.

When Process is called, the input is serialized, a unique file name (a GUID with .json extension) is generated and the data is uploaded using TransferUtility.UploadAsync. Errors during upload are caught and logged to the console. Stopping the operator disposes the TransferUtility and client.

using Cortex.Streams;
using Cortex.Streams.S3;
using Amazon.S3;

// S3 client; credentials and region come from the default provider chain
var s3Client = new AmazonS3Client();
var sink = new S3SinkOperator<MyEvent>(
    bucketName: "my-bucket",
    folderPath: "events",
    s3Client: s3Client);

var stream = StreamBuilder<MyEvent, MyEvent>
    .CreateNewStream("S3SingleSink")
    .Source()
    .Sink(sink)
    .Build();

stream.Start();
stream.Emit(new MyEvent { Id = 1, Value = "hello" });
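
For reference, each Process call is roughly equivalent to the following sketch (a simplified illustration of the behaviour described above, assuming the default JSON serializer; it is not the actual implementation):

using System.Text;
using System.Text.Json;
using Amazon.S3.Transfer;

// Rough equivalent of a single per-record upload: serialize the message,
// build a unique key under the folder, and upload it with TransferUtility.
async Task UploadSingleRecordAsync<T>(
    TransferUtility transferUtility, T message, string bucketName, string folderPath)
{
    var json = JsonSerializer.Serialize(message);               // default JSON serialization
    var key = $"{folderPath}/{Guid.NewGuid()}.json";            // unique file name per record
    using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));
    await transferUtility.UploadAsync(stream, bucketName, key);
}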

S3SinkBulkOperator batches messages and writes them to S3 as JSON‑Lines files. The constructor takes the bucket name, folder path, IAmazonS3 client, optional serializer, and optional batchSize and flushInterval settings. When not supplied, the batch size defaults to 100 and the flush interval to 10 seconds. A timer flushes any buffered messages at the interval.

Each call to Process serializes the object and adds it to an in‑memory buffer. When the buffer reaches the batch size, the buffered objects are concatenated with new‑line separators and uploaded as a .jsonl file using TransferUtility. Stop flushes any remaining messages and disposes the timer.

using Cortex.Streams;
using Cortex.Streams.S3;
using Amazon.S3;

var s3Client = new AmazonS3Client();
var sink = new S3SinkBulkOperator<MyEvent>(
    bucketName: "my-bucket",
    folderPath: "events",
    s3Client: s3Client,
    batchSize: 50,                              // flush after 50 buffered records...
    flushInterval: TimeSpan.FromSeconds(15));   // ...or every 15 seconds, whichever comes first

var stream = StreamBuilder<MyEvent, MyEvent>
    .CreateNewStream("S3BulkSink")
    .Source()
    .Sink(sink)
    .Build();

stream.Start();

for (int i = 0; i < 200; i++)
{
    stream.Emit(new MyEvent { Id = i, Value = $"event-{i}" });
}

// Flush any remaining records
sink.Stop();
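
A flush, whether triggered by the batch size or by the timer, conceptually does something like the sketch below (a simplified illustration, assuming the default JSON serializer; the real logic lives inside S3SinkBulkOperator):

using System.Text;
using Amazon.S3.Transfer;

// Rough equivalent of a bulk flush: join the buffered JSON records with
// new-line separators and upload the batch as a single .jsonl object.
async Task FlushBatchAsync(
    TransferUtility transferUtility, IReadOnlyList<string> bufferedJson,
    string bucketName, string folderPath)
{
    if (bufferedJson.Count == 0) return;

    var payload = string.Join("\n", bufferedJson);              // JSON Lines format
    var key = $"{folderPath}/{Guid.NewGuid()}.jsonl";
    using var stream = new MemoryStream(Encoding.UTF8.GetBytes(payload));
    await transferUtility.UploadAsync(stream, bucketName, key);
}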

By default both operators serialize objects to JSON. You can implement ISerializer<T> and pass it to the constructors to support CSV, XML or other formats.
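
For example, a CSV serializer for MyEvent could look like the sketch below. The ISerializer<T> shape shown here (a single Serialize method) and the serializer parameter name are assumptions; check the interface actually defined by Cortex.Streams before using it:

// Hypothetical CSV serializer. The ISerializer<T> member signature is an
// assumption; adapt it to the interface declared by Cortex.Streams.
public class MyEventCsvSerializer : ISerializer<MyEvent>
{
    public string Serialize(MyEvent obj) => $"{obj.Id},{obj.Value}";
}

// Pass the custom serializer to the sink instead of the default JSON one.
var csvSink = new S3SinkOperator<MyEvent>(
    bucketName: "my-bucket",
    folderPath: "events",
    s3Client: new AmazonS3Client(),
    serializer: new MyEventCsvSerializer());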

Failures during upload are caught and printed to the console. Consider using an S3 dead‑letter bucket or a re‑queueing mechanism to handle records that fail to upload.
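
One option is to copy payloads that could not be uploaded into a separate dead‑letter bucket so they can be inspected and replayed later. The helper below is a hypothetical pattern for your own retry path, not part of Cortex.Streams; the bucket name and key layout are placeholders:

using Amazon.S3;
using Amazon.S3.Model;

// Hypothetical dead-letter helper: write the failed payload to a separate
// bucket. "my-dead-letter-bucket" and the key prefix are placeholders.
async Task SendToDeadLetterAsync(IAmazonS3 s3Client, string payload)
{
    var request = new PutObjectRequest
    {
        BucketName = "my-dead-letter-bucket",
        Key = $"failed/{Guid.NewGuid()}.json",
        ContentBody = payload
    };
    await s3Client.PutObjectAsync(request);
}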