Getting Started
Prerequisites
Section titled “Prerequisites”- .NET 6.0 SDK or later
- NuGet Package Manager (integrated with Visual Studio or available via CLI)
- Apache Kafka (if using Cortex.Streams.Kafka)
- Apache Pulsar (if using Cortex.Streams.Pulsar)
Installation
Section titled “Installation”Cortex Data Framework is available through the NuGet Package Manager. You can easily add the necessary packages to your .NET project using the following methods:
Using the .NET CLI
Section titled “Using the .NET CLI”Open your terminal or command prompt and navigate to your project directory, then run the following commands to install the desired packages:
# Install Cortex.Streams
dotnet add package Cortex.Streams
# Install Cortex.States
dotnet add package Cortex.States2.1.2. Using the Package Manager Console in Visual Studio
Section titled “2.1.2. Using the Package Manager Console in Visual Studio”- Open your project in Visual Studio.
- Navigate to Tools > NuGet Package Manager > Package Manager Console.
- Run the following commands:
# Install Cortex.Streams
Install-Package Cortex.Streams
# Install Cortex.States
Install-Package Cortex.StatesCortex Data Framework makes it easy to set up and run real-time data processing pipelines. Below are some simple examples to get you started.
1. Creating a Stream
Section titled “1. Creating a Stream”var stream = StreamBuilder<int, int>.CreateNewStream("ExampleStream")
.Stream()
.Map(x => x * 2)
.Filter(x => x > 10)
.Sink(Console.WriteLine)
.Build();
stream.Start();
// emitting data to the stream
stream.Emit(2);2. Using State Stores
Section titled “2. Using State Stores”var stateStore = new RocksDbStateStore<string, int>("ExampleStateStore", "./data");
stateStore.Put("key1", 42);
Console.WriteLine(stateStore.Get("key1"));3. Telemetry Integration
Section titled “3. Telemetry Integration”var telemetryProvider = new OpenTelemetryProvider();
var stream = StreamBuilder<int, int>
.CreateNewStream("TelemetryStream")
.WithTelemetry(telemetryProvider)
.Stream()
.Map(x => x * 2)
.Sink(Console.WriteLine)
.Build();4. Real-Time Click Tracking
Section titled “4. Real-Time Click Tracking”Scenario: Track the number of clicks on different web pages in real-time and display the aggregated counts.
Steps:
1. Define the Event Class
public class ClickEvent
{
public string PageUrl { get; set; }
public DateTime Timestamp { get; set; }
}2. Build the Stream Pipeline
- Stream: Starts with the source operator.
- Filter: Filters events based on certain criteria.
- GroupBy: Groups events by PageUrl.
- Aggregate: Counts the number of clicks per page.
- Sink: Prints the total clicks per page.
static void Main(string[] args)
{
// Build the stream
var stream = StreamBuilder<ClickEvent, ClickEvent>.CreateNewStream("ClickStream")
.Stream()
.Filter(e => !string.IsNullOrEmpty(e.PageUrl))
.GroupBySilently(
e => e.PageUrl, // Key selector: group by PageUrl
stateStoreName: "ClickGroupStore")
.AggregateSilently<string, int>(
e => e.PageUrl, // Key selector for aggregation
(count, e) => count + 1, // Aggregation function: increment count
stateStoreName: "ClickAggregateStore")
.Sink(e =>
{
Console.WriteLine($"Page: {e.PageUrl}");
})
.Build();
// start the stream
stream.Start();Emitting some random events into the stream
// emit some events
var random = new Random();
var pages = new[] { "/home", "/about", "/contact", "/products" };
while (true)
{
var page = pages[random.Next(pages.Length)];
var click = new ClickEvent
{
PageUrl = page,
Timestamp = DateTime.UtcNow
};
stream.Emit(click);
Thread.Sleep(100); // Simulate click rate
}3. Access Aggregated Data
Retrieve and display the click counts from the state store
// Access the aggregate state store data
var aggregateStore = stream.GetStateStoreByName<InMemoryStateStore<string, int>>("ClickAggregateStore");
// Access the groupby state store data
var groupByStore = stream.GetStateStoreByName<InMemoryStateStore<string, List<ClickEvent>>>("ClickGroupStore")
if (aggregateStore != null)
{
Console.WriteLine("\nAggregated Click Counts:");
foreach (var kvp in aggregateStore.GetAll())
{
Console.WriteLine($"Page: {kvp.Key}, Clicks: {kvp.Value}");
}
}
else
{
Console.WriteLine("Aggregate state store not found.");
}