Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
- Modular Architecture: Comprises distinct, interchangeable modules for streaming, state management, and connectors, allowing developers to choose components that best fit their requirements.
- Extensive Streaming Support: Natively integrates with popular streaming platforms like Kafka and Pulsar, ensuring reliable and efficient data ingestion and distribution.
- Flexible State Management: Offers in-memory and persistent state storage options (e.g., RocksDB) to maintain stateful computations and enable advanced analytics.
- Developer-Friendly APIs: Provides intuitive and expressive APIs for building, configuring, and managing data streams and state stores, reducing development overhead.
- Thread-Safe Operations: Ensures data integrity and consistency through built-in thread safety mechanisms, suitable for concurrent processing environments.
- Telemetry and Monitoring: Integrates with telemetry tools to monitor performance metrics, aiding in proactive maintenance and optimization.
- Real-time analytics and monitoring
- Event-driven architectures
- Stateful stream processing (e.g., aggregations, joins)
- Data enrichment and transformation pipelines
- Scalable data ingestion and distribution systems
- Cortex.Streams: Core streaming capabilities for building data pipelines.
- Cortex.Streams.Kafka: Integration with Apache Kafka for robust data streaming.
- Cortex.Streams.Pulsar: Integration with Apache Pulsar for versatile messaging needs.
- Cortex.Streams.RabbitMQ: Integration with RabbitMQ for reliable message queuing.
- Cortex.Streams.AWSSQS: Integration with Amazon SQS for messaging in the cloud.
- Cortex.Streams.AzureServiceBus: Integration with Azure Service Bus for messaging in the cloud.
- Cortex.Streams.AzureBlobStorage: Integration with Azure Blob Storage for sinking messages.
- Cortex.Streams.S3: Integration with AWS S3 for sinking messages.
- Cortex.Streams.Files: File source and sink operators.
- Cortex.States.RocksDb: Persistent state storage using RocksDB.
- Cortex.Telemetry: Core library adding support for tracing and metrics.
- Cortex.Telemetry.OpenTelemetry: Adds support for OpenTelemetry.
- .NET 6.0 SDK or later
- NuGet Package Manager (integrated with Visual Studio or available via CLI)
- Apache Kafka (if using Cortex.Streams.Kafka)
- Apache Pulsar (if using Cortex.Streams.Pulsar)
Cortex Data Framework is available through the NuGet Package Manager. You can easily add the necessary packages to your .NET project using the following methods:
Open your terminal or command prompt and navigate to your project directory, then run the following commands to install the desired packages:
```bash
# Install Cortex.Streams
dotnet add package Cortex.Streams

# Install Cortex.States
dotnet add package Cortex.States
```
- Open your project in Visual Studio.
- Navigate to Tools > NuGet Package Manager > Package Manager Console.
- Run the following commands:
```powershell
# Install Cortex.Streams
Install-Package Cortex.Streams

# Install Cortex.States
Install-Package Cortex.States
```
Cortex Data Framework makes it easy to set up and run real-time data processing pipelines. Below are some simple examples to get you started.
```csharp
var stream = StreamBuilder<int, int>.CreateNewStream("ExampleStream")
    .Map(x => x * 2)
    .Filter(x => x > 10)
    .Sink(Console.WriteLine)
    .Build();

stream.Start();

// Emit data into the stream
stream.Emit(2);
```
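Because the map runs before the filter, a value reaches the sink only if doubling it produces a result greater than 10. A quick sketch of what different emits produce, using the same `stream` as above:

```csharp
stream.Emit(2);  // 2 * 2 = 4  -> filtered out (not > 10), nothing printed
stream.Emit(7);  // 7 * 2 = 14 -> passes the filter and is printed
```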
```csharp
var stateStore = new RocksDbStateStore<string, int>("ExampleStateStore", "./data");
stateStore.Put("key1", 42);
Console.WriteLine(stateStore.Get("key1")); // prints 42
```
```csharp
var telemetryProvider = new OpenTelemetryProvider();

var stream = StreamBuilder<int, int>
    .CreateNewStream("TelemetryStream")
    .WithTelemetry(telemetryProvider)
    .Map(x => x * 2)
    .Sink(Console.WriteLine)
    .Build();
```
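A telemetry-enabled stream is used like any other: once started, emitted values flow through the pipeline while the configured provider records metrics and traces. A minimal continuation of the example above:

```csharp
stream.Start();
stream.Emit(5); // doubled to 10 and printed; telemetry is recorded by the provider
```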
Scenario: Track the number of clicks on different web pages in real-time and display the aggregated counts.
Steps:
1. Define the Event Class
public class ClickEvent
{
public string PageUrl { get; set; }
public DateTime Timestamp { get; set; }
}
2. Build the Stream Pipeline
- Stream: Starts with the source operator.
- Filter: Filters events based on certain criteria.
- GroupBy: Groups events by PageUrl.
- Aggregate: Counts the number of clicks per page.
- Sink: Prints the total clicks per page.
```csharp
static void Main(string[] args)
{
    // Build the stream
    var stream = StreamBuilder<ClickEvent, ClickEvent>.CreateNewStream("ClickStream")
        .Stream()
        .Filter(e => !string.IsNullOrEmpty(e.PageUrl))
        .GroupBySilently(
            e => e.PageUrl,              // Key selector: group by PageUrl
            stateStoreName: "ClickGroupStore")
        .AggregateSilently<string, int>(
            e => e.PageUrl,              // Key selector for aggregation
            (count, e) => count + 1,     // Aggregation function: increment count
            stateStoreName: "ClickAggregateStore")
        .Sink(e =>
        {
            Console.WriteLine($"Page: {e.PageUrl}");
        })
        .Build();

    // Start the stream
    stream.Start();
```
Emit some random events into the stream:
```csharp
    // Emit some events
    var random = new Random();
    var pages = new[] { "/home", "/about", "/contact", "/products" };

    while (true)
    {
        var page = pages[random.Next(pages.Length)];
        var click = new ClickEvent
        {
            PageUrl = page,
            Timestamp = DateTime.UtcNow
        };

        stream.Emit(click);
        Thread.Sleep(100); // Simulate click rate
    }
}
```
3. Access Aggregated Data
Retrieve and display the click counts from the state store:
```csharp
// Access the aggregate state store data
var aggregateStore = stream.GetStateStoreByName<InMemoryStateStore<string, int>>("ClickAggregateStore");

// Access the group-by state store data
var groupByStore = stream.GetStateStoreByName<InMemoryStateStore<string, List<ClickEvent>>>("ClickGroupStore");

if (aggregateStore != null)
{
    Console.WriteLine("\nAggregated Click Counts:");
    foreach (var kvp in aggregateStore.GetAll())
    {
        Console.WriteLine($"Page: {kvp.Key}, Clicks: {kvp.Value}");
    }
}
else
{
    Console.WriteLine("Aggregate state store not found.");
}
```
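The group-by store retrieved above holds the raw events collected for each page and can be inspected the same way. A short sketch, assuming it exposes the same `GetAll()` API as the aggregate store:

```csharp
if (groupByStore != null)
{
    Console.WriteLine("\nGrouped Events:");
    foreach (var kvp in groupByStore.GetAll())
    {
        // kvp.Value is the list of ClickEvent objects recorded for this page
        Console.WriteLine($"Page: {kvp.Key}, Events: {kvp.Value.Count}");
    }
}
```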
We welcome contributions from the community! Whether it's reporting bugs, suggesting features, or submitting pull requests, your involvement helps improve Cortex for everyone.
- Fork the Repository
- Create a Feature Branch: `git checkout -b feature/YourFeature`
- Commit Your Changes: `git commit -m "Add your feature"`
- Push to Your Fork: `git push origin feature/YourFeature`
- Open a Pull Request: describe your changes and submit the pull request for review.
This project is licensed under the MIT License.
Cortex is an open-source project maintained by BuilderSoft. Your support helps us continue developing and improving Cortex. Consider sponsoring us to contribute to the future of resilient streaming platforms.
- Financial Contributions: Support us through GitHub Sponsors or other preferred platforms.
- Corporate Sponsorship: If your organization is interested in sponsoring Cortex, please contact us directly.
Contact Us: [email protected]
We'd love to hear from you! Whether you have questions, feedback, or need support, feel free to reach out.
- Email: [email protected]
- Website: https://buildersoft.io
- GitHub Issues: Cortex Data Framework Issues
- Join our Discord Community:
Thank you for using Cortex Data Framework! We hope it empowers you to build scalable and efficient data processing pipelines effortlessly.
Built with ❤️ by the Buildersoft team.