From 6c9301aca2517c6245dbf1a7094b41f79dcf9c0c Mon Sep 17 00:00:00 2001 From: ahmar-husain Date: Sat, 29 Apr 2023 12:04:24 +0530 Subject: [PATCH 1/4] Fix Example --- .../Customer/Orders/CustomerOrder.cs | 83 +++++++++++++++++++ .../Orders/CustomerOrderKey.cs} | 22 ++--- .../Customer/Orders/Items.cs} | 36 +++++--- .../Nucleotidz.Consumer.Avro/ErrorHandler.cs | 15 +--- Example/Nucleotidz.Consumer.Avro/Program.cs | 12 +-- Example/Nucleotidz.Consumer.Avro/Worker.cs | 8 +- .../Customer/Orders/CustomerOrder.cs | 83 +++++++++++++++++++ .../Customer/Orders/CustomerOrderKey.cs} | 22 ++--- .../Customer/Orders/Items.cs | 70 ++++++++++++++++ Example/Nucleotidz.Producer.Avro/Program.cs | 8 +- Example/Nucleotidz.Producer.Avro/Worker.cs | 25 +++--- Example/Nucleotidz.Producer.Json/Animal.cs | 24 ------ .../Customer/Orders/CustomerOrder.cs | 83 +++++++++++++++++++ .../Customer/Orders/CustomerOrderKey.cs} | 22 ++--- .../Customer/Orders/Items.cs | 70 ++++++++++++++++ Example/Nucleotidz.Producer.Json/Program.cs | 5 +- Example/Nucleotidz.Producer.Json/Worker.cs | 22 +++-- .../Nucleotidz.Kafka.Consumer/Consumer.cs | 19 +---- .../ConsumerFactory.cs | 13 +-- 19 files changed, 505 insertions(+), 137 deletions(-) create mode 100644 Example/Nucleotidz.Consumer.Avro/Customer/Orders/CustomerOrder.cs rename Example/Nucleotidz.Consumer.Avro/{com/nucleotidz/employee/employeeMessage.cs => Customer/Orders/CustomerOrderKey.cs} (66%) rename Example/{Nucleotidz.Producer.Avro/com/nucleotidz/employee/key/employeeKey.cs => Nucleotidz.Consumer.Avro/Customer/Orders/Items.cs} (58%) create mode 100644 Example/Nucleotidz.Producer.Avro/Customer/Orders/CustomerOrder.cs rename Example/{Nucleotidz.Consumer.Avro/com/nucleotidz/employee/key/employeeKey.cs => Nucleotidz.Producer.Avro/Customer/Orders/CustomerOrderKey.cs} (66%) create mode 100644 Example/Nucleotidz.Producer.Avro/Customer/Orders/Items.cs delete mode 100644 Example/Nucleotidz.Producer.Json/Animal.cs create mode 100644 Example/Nucleotidz.Producer.Json/Customer/Orders/CustomerOrder.cs rename Example/{Nucleotidz.Producer.Avro/com/nucleotidz/employee/employeeMessage.cs => Nucleotidz.Producer.Json/Customer/Orders/CustomerOrderKey.cs} (66%) create mode 100644 Example/Nucleotidz.Producer.Json/Customer/Orders/Items.cs diff --git a/Example/Nucleotidz.Consumer.Avro/Customer/Orders/CustomerOrder.cs b/Example/Nucleotidz.Consumer.Avro/Customer/Orders/CustomerOrder.cs new file mode 100644 index 0000000..6e5f738 --- /dev/null +++ b/Example/Nucleotidz.Consumer.Avro/Customer/Orders/CustomerOrder.cs @@ -0,0 +1,83 @@ +// ------------------------------------------------------------------------------ +// +// Generated by avrogen, version 1.10.0.0 +// Changes to this file may cause incorrect behavior and will be lost if code +// is regenerated +// +// ------------------------------------------------------------------------------ +namespace Customer.Orders +{ + using System; + using System.Collections.Generic; + using System.Text; + using Avro; + using Avro.Specific; + + public partial class CustomerOrder : ISpecificRecord + { + public static Schema _SCHEMA = Avro.Schema.Parse(@"{""type"":""record"",""name"":""CustomerOrder"",""namespace"":""Customer.Orders"",""fields"":[{""name"":""orderNumber"",""type"":""string""},{""name"":""status"",""type"":""string""},{""name"":""orderItems"",""type"":{""type"":""array"",""items"":{""type"":""record"",""name"":""Items"",""namespace"":""Customer.Orders"",""fields"":[{""name"":""quantity"",""type"":""int""},{""name"":""Name"",""type"":""string""}]}}}]}"); + private string _orderNumber; + private string _status; + private IList _orderItems; + public virtual Schema Schema + { + get + { + return CustomerOrder._SCHEMA; + } + } + public string orderNumber + { + get + { + return this._orderNumber; + } + set + { + this._orderNumber = value; + } + } + public string status + { + get + { + return this._status; + } + set + { + this._status = value; + } + } + public IList orderItems + { + get + { + return this._orderItems; + } + set + { + this._orderItems = value; + } + } + public virtual object Get(int fieldPos) + { + switch (fieldPos) + { + case 0: return this.orderNumber; + case 1: return this.status; + case 2: return this.orderItems; + default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()"); + }; + } + public virtual void Put(int fieldPos, object fieldValue) + { + switch (fieldPos) + { + case 0: this.orderNumber = (System.String)fieldValue; break; + case 1: this.status = (System.String)fieldValue; break; + case 2: this.orderItems = (IList)fieldValue; break; + default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()"); + }; + } + } +} diff --git a/Example/Nucleotidz.Consumer.Avro/com/nucleotidz/employee/employeeMessage.cs b/Example/Nucleotidz.Consumer.Avro/Customer/Orders/CustomerOrderKey.cs similarity index 66% rename from Example/Nucleotidz.Consumer.Avro/com/nucleotidz/employee/employeeMessage.cs rename to Example/Nucleotidz.Consumer.Avro/Customer/Orders/CustomerOrderKey.cs index 7d4716d..4f27fae 100644 --- a/Example/Nucleotidz.Consumer.Avro/com/nucleotidz/employee/employeeMessage.cs +++ b/Example/Nucleotidz.Consumer.Avro/Customer/Orders/CustomerOrderKey.cs @@ -5,7 +5,7 @@ // is regenerated // // ------------------------------------------------------------------------------ -namespace com.nucleotidz.employee +namespace Customer.Orders { using System; using System.Collections.Generic; @@ -13,34 +13,34 @@ namespace com.nucleotidz.employee using Avro; using Avro.Specific; - public partial class employeeMessage : ISpecificRecord + public partial class CustomerOrderKey : ISpecificRecord { - public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"employeeMessage\",\"namespace\":\"com.nucleotidz.employee\",\"" + - "fields\":[{\"name\":\"Name\",\"type\":\"string\"}]}"); - private string _Name; + public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"CustomerOrderKey\",\"namespace\":\"Customer.Orders\",\"fields\"" + + ":[{\"name\":\"orderNumber\",\"type\":\"string\"}]}"); + private string _orderNumber; public virtual Schema Schema { get { - return employeeMessage._SCHEMA; + return CustomerOrderKey._SCHEMA; } } - public string Name + public string orderNumber { get { - return this._Name; + return this._orderNumber; } set { - this._Name = value; + this._orderNumber = value; } } public virtual object Get(int fieldPos) { switch (fieldPos) { - case 0: return this.Name; + case 0: return this.orderNumber; default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()"); }; } @@ -48,7 +48,7 @@ public virtual void Put(int fieldPos, object fieldValue) { switch (fieldPos) { - case 0: this.Name = (System.String)fieldValue; break; + case 0: this.orderNumber = (System.String)fieldValue; break; default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()"); }; } diff --git a/Example/Nucleotidz.Producer.Avro/com/nucleotidz/employee/key/employeeKey.cs b/Example/Nucleotidz.Consumer.Avro/Customer/Orders/Items.cs similarity index 58% rename from Example/Nucleotidz.Producer.Avro/com/nucleotidz/employee/key/employeeKey.cs rename to Example/Nucleotidz.Consumer.Avro/Customer/Orders/Items.cs index 3bfc326..a276d56 100644 --- a/Example/Nucleotidz.Producer.Avro/com/nucleotidz/employee/key/employeeKey.cs +++ b/Example/Nucleotidz.Consumer.Avro/Customer/Orders/Items.cs @@ -5,7 +5,7 @@ // is regenerated // // ------------------------------------------------------------------------------ -namespace com.nucleotidz.employee.key +namespace Customer.Orders { using System; using System.Collections.Generic; @@ -13,34 +13,47 @@ namespace com.nucleotidz.employee.key using Avro; using Avro.Specific; - public partial class employeeKey : ISpecificRecord + public partial class Items : ISpecificRecord { - public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"employeeKey\",\"namespace\":\"com.nucleotidz.employee.key\",\"" + - "fields\":[{\"name\":\"Id\",\"type\":\"string\"}]}"); - private string _Id; + public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"Items\",\"namespace\":\"Customer.Orders\",\"fields\":[{\"name\":\"" + + "quantity\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"); + private int _quantity; + private string _Name; public virtual Schema Schema { get { - return employeeKey._SCHEMA; + return Items._SCHEMA; } } - public string Id + public int quantity { get { - return this._Id; + return this._quantity; } set { - this._Id = value; + this._quantity = value; + } + } + public string Name + { + get + { + return this._Name; + } + set + { + this._Name = value; } } public virtual object Get(int fieldPos) { switch (fieldPos) { - case 0: return this.Id; + case 0: return this.quantity; + case 1: return this.Name; default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()"); }; } @@ -48,7 +61,8 @@ public virtual void Put(int fieldPos, object fieldValue) { switch (fieldPos) { - case 0: this.Id = (System.String)fieldValue; break; + case 0: this.quantity = (System.Int32)fieldValue; break; + case 1: this.Name = (System.String)fieldValue; break; default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()"); }; } diff --git a/Example/Nucleotidz.Consumer.Avro/ErrorHandler.cs b/Example/Nucleotidz.Consumer.Avro/ErrorHandler.cs index 7125f68..b35eeaa 100644 --- a/Example/Nucleotidz.Consumer.Avro/ErrorHandler.cs +++ b/Example/Nucleotidz.Consumer.Avro/ErrorHandler.cs @@ -1,19 +1,12 @@ -using com.nucleotidz.employee.key; -using com.nucleotidz.employee; -using Newtonsoft.Json.Linq; +using Confluent.Kafka; +using Customer.Orders; using Nucleotidz.Kafka.Abstraction; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Confluent.Kafka; namespace Nucleotidz.Consumer.Avro { - public class ErrorHandler : IConsumerErrorHandler + public class ErrorHandler : IConsumerErrorHandler { - public void Handle(IConsumer consumer, Error error) + public void Handle(IConsumer consumer, Error error) { Console.WriteLine(error.ToString()); } diff --git a/Example/Nucleotidz.Consumer.Avro/Program.cs b/Example/Nucleotidz.Consumer.Avro/Program.cs index 988229c..11b9d90 100644 --- a/Example/Nucleotidz.Consumer.Avro/Program.cs +++ b/Example/Nucleotidz.Consumer.Avro/Program.cs @@ -1,17 +1,17 @@ -using com.nucleotidz.employee.key; -using com.nucleotidz.employee; + using Newtonsoft.Json.Linq; using Nucleotidz.Kafka.Abstraction; using Nucleotidz.Kafka.Consumer; using Nucleotidz.Consumer.Avro; +using Customer.Orders; IHost host = Host.CreateDefaultBuilder(args) .ConfigureServices((hostContext, services) => { - services.AddTransient, ErrorHandler>(); - services.AddTransient>(_ => default); - services.AddTransient, Worker>(); - services.AddConsumer(hostContext.Configuration, "Kafka", "Kafka:SchemaRegistry",SerializationScheme.avro); + services.AddTransient, ErrorHandler>(); + services.AddTransient>(_ => default); + services.AddTransient, Worker>(); + services.AddConsumer(hostContext.Configuration, "Kafka", "Kafka:SchemaRegistry",SerializationScheme.avro); }) .Build(); diff --git a/Example/Nucleotidz.Consumer.Avro/Worker.cs b/Example/Nucleotidz.Consumer.Avro/Worker.cs index 5abbc77..26442e6 100644 --- a/Example/Nucleotidz.Consumer.Avro/Worker.cs +++ b/Example/Nucleotidz.Consumer.Avro/Worker.cs @@ -1,16 +1,16 @@ namespace Nucleotidz.Consumer.Avro { - using com.nucleotidz.employee; - using com.nucleotidz.employee.key; + using Confluent.Kafka; + using Customer.Orders; using Nucleotidz.Kafka.Abstraction; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; - public class Worker : IHandler + public class Worker : IHandler { - public async Task> HandleAsync(IEnumerable> consumeResults, CancellationToken cancellationToken) + public async Task> HandleAsync(IEnumerable> consumeResults, CancellationToken cancellationToken) { await Task.CompletedTask; Console.WriteLine(consumeResults.Count().ToString()); diff --git a/Example/Nucleotidz.Producer.Avro/Customer/Orders/CustomerOrder.cs b/Example/Nucleotidz.Producer.Avro/Customer/Orders/CustomerOrder.cs new file mode 100644 index 0000000..6e5f738 --- /dev/null +++ b/Example/Nucleotidz.Producer.Avro/Customer/Orders/CustomerOrder.cs @@ -0,0 +1,83 @@ +// ------------------------------------------------------------------------------ +// +// Generated by avrogen, version 1.10.0.0 +// Changes to this file may cause incorrect behavior and will be lost if code +// is regenerated +// +// ------------------------------------------------------------------------------ +namespace Customer.Orders +{ + using System; + using System.Collections.Generic; + using System.Text; + using Avro; + using Avro.Specific; + + public partial class CustomerOrder : ISpecificRecord + { + public static Schema _SCHEMA = Avro.Schema.Parse(@"{""type"":""record"",""name"":""CustomerOrder"",""namespace"":""Customer.Orders"",""fields"":[{""name"":""orderNumber"",""type"":""string""},{""name"":""status"",""type"":""string""},{""name"":""orderItems"",""type"":{""type"":""array"",""items"":{""type"":""record"",""name"":""Items"",""namespace"":""Customer.Orders"",""fields"":[{""name"":""quantity"",""type"":""int""},{""name"":""Name"",""type"":""string""}]}}}]}"); + private string _orderNumber; + private string _status; + private IList _orderItems; + public virtual Schema Schema + { + get + { + return CustomerOrder._SCHEMA; + } + } + public string orderNumber + { + get + { + return this._orderNumber; + } + set + { + this._orderNumber = value; + } + } + public string status + { + get + { + return this._status; + } + set + { + this._status = value; + } + } + public IList orderItems + { + get + { + return this._orderItems; + } + set + { + this._orderItems = value; + } + } + public virtual object Get(int fieldPos) + { + switch (fieldPos) + { + case 0: return this.orderNumber; + case 1: return this.status; + case 2: return this.orderItems; + default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()"); + }; + } + public virtual void Put(int fieldPos, object fieldValue) + { + switch (fieldPos) + { + case 0: this.orderNumber = (System.String)fieldValue; break; + case 1: this.status = (System.String)fieldValue; break; + case 2: this.orderItems = (IList)fieldValue; break; + default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()"); + }; + } + } +} diff --git a/Example/Nucleotidz.Consumer.Avro/com/nucleotidz/employee/key/employeeKey.cs b/Example/Nucleotidz.Producer.Avro/Customer/Orders/CustomerOrderKey.cs similarity index 66% rename from Example/Nucleotidz.Consumer.Avro/com/nucleotidz/employee/key/employeeKey.cs rename to Example/Nucleotidz.Producer.Avro/Customer/Orders/CustomerOrderKey.cs index 3bfc326..4f27fae 100644 --- a/Example/Nucleotidz.Consumer.Avro/com/nucleotidz/employee/key/employeeKey.cs +++ b/Example/Nucleotidz.Producer.Avro/Customer/Orders/CustomerOrderKey.cs @@ -5,7 +5,7 @@ // is regenerated // // ------------------------------------------------------------------------------ -namespace com.nucleotidz.employee.key +namespace Customer.Orders { using System; using System.Collections.Generic; @@ -13,34 +13,34 @@ namespace com.nucleotidz.employee.key using Avro; using Avro.Specific; - public partial class employeeKey : ISpecificRecord + public partial class CustomerOrderKey : ISpecificRecord { - public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"employeeKey\",\"namespace\":\"com.nucleotidz.employee.key\",\"" + - "fields\":[{\"name\":\"Id\",\"type\":\"string\"}]}"); - private string _Id; + public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"CustomerOrderKey\",\"namespace\":\"Customer.Orders\",\"fields\"" + + ":[{\"name\":\"orderNumber\",\"type\":\"string\"}]}"); + private string _orderNumber; public virtual Schema Schema { get { - return employeeKey._SCHEMA; + return CustomerOrderKey._SCHEMA; } } - public string Id + public string orderNumber { get { - return this._Id; + return this._orderNumber; } set { - this._Id = value; + this._orderNumber = value; } } public virtual object Get(int fieldPos) { switch (fieldPos) { - case 0: return this.Id; + case 0: return this.orderNumber; default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()"); }; } @@ -48,7 +48,7 @@ public virtual void Put(int fieldPos, object fieldValue) { switch (fieldPos) { - case 0: this.Id = (System.String)fieldValue; break; + case 0: this.orderNumber = (System.String)fieldValue; break; default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()"); }; } diff --git a/Example/Nucleotidz.Producer.Avro/Customer/Orders/Items.cs b/Example/Nucleotidz.Producer.Avro/Customer/Orders/Items.cs new file mode 100644 index 0000000..a276d56 --- /dev/null +++ b/Example/Nucleotidz.Producer.Avro/Customer/Orders/Items.cs @@ -0,0 +1,70 @@ +// ------------------------------------------------------------------------------ +// +// Generated by avrogen, version 1.10.0.0 +// Changes to this file may cause incorrect behavior and will be lost if code +// is regenerated +// +// ------------------------------------------------------------------------------ +namespace Customer.Orders +{ + using System; + using System.Collections.Generic; + using System.Text; + using Avro; + using Avro.Specific; + + public partial class Items : ISpecificRecord + { + public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"Items\",\"namespace\":\"Customer.Orders\",\"fields\":[{\"name\":\"" + + "quantity\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"); + private int _quantity; + private string _Name; + public virtual Schema Schema + { + get + { + return Items._SCHEMA; + } + } + public int quantity + { + get + { + return this._quantity; + } + set + { + this._quantity = value; + } + } + public string Name + { + get + { + return this._Name; + } + set + { + this._Name = value; + } + } + public virtual object Get(int fieldPos) + { + switch (fieldPos) + { + case 0: return this.quantity; + case 1: return this.Name; + default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()"); + }; + } + public virtual void Put(int fieldPos, object fieldValue) + { + switch (fieldPos) + { + case 0: this.quantity = (System.Int32)fieldValue; break; + case 1: this.Name = (System.String)fieldValue; break; + default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()"); + }; + } + } +} diff --git a/Example/Nucleotidz.Producer.Avro/Program.cs b/Example/Nucleotidz.Producer.Avro/Program.cs index 842acc9..adac3fd 100644 --- a/Example/Nucleotidz.Producer.Avro/Program.cs +++ b/Example/Nucleotidz.Producer.Avro/Program.cs @@ -1,14 +1,14 @@ -using com.nucleotidz.employee; -using com.nucleotidz.employee.key; + using Nucleotidz.Kafka.Abstraction; using Nucleotidz.Producer.Avro; using Nucleotidz.Kafka.Producer; +using Customer.Orders; IHost host = Host.CreateDefaultBuilder(args) .ConfigureServices((hostContext, services) => { - services.AddTransient>(_ => default); - services.AddProducer(hostContext.Configuration, "Kafka", "Kafka:SchemaRegistry", SerializationScheme.avro); + services.AddTransient>(_ => default); + services.AddProducer(hostContext.Configuration, "Kafka", "Kafka:SchemaRegistry", SerializationScheme.avro); services.AddHostedService(); }) diff --git a/Example/Nucleotidz.Producer.Avro/Worker.cs b/Example/Nucleotidz.Producer.Avro/Worker.cs index 426ae84..70d3188 100644 --- a/Example/Nucleotidz.Producer.Avro/Worker.cs +++ b/Example/Nucleotidz.Producer.Avro/Worker.cs @@ -1,6 +1,6 @@ -using com.nucleotidz.employee; -using com.nucleotidz.employee.key; + using Confluent.Kafka; +using Customer.Orders; using Nucleotidz.Kafka.Abstraction; namespace Nucleotidz.Producer.Avro @@ -8,8 +8,8 @@ namespace Nucleotidz.Producer.Avro public class Worker : BackgroundService { private readonly ILogger _logger; - private readonly IMessageProducer _messageProducer; - public Worker(ILogger logger, IMessageProducer messageProducer) + private readonly IMessageProducer _messageProducer; + public Worker(ILogger logger, IMessageProducer messageProducer) { _messageProducer = messageProducer; _logger = logger; @@ -17,15 +17,18 @@ public Worker(ILogger logger, IMessageProducer message = new Message(); - message.Value = new employeeMessage { Name = "Tom" }; - message.Key = new employeeKey { Id = "TM100" }; - var result = await _messageProducer.Produce(message); - if (result.Status == PersistenceStatus.Persisted || result.Status == PersistenceStatus.PossiblyPersisted) + for (int i = 0; i < 5; i++) { - _logger.LogInformation("Message Prdouced"); + Message message = new Message(); + message.Value = new CustomerOrder { status = "L", orderNumber = $"OD{i.ToString()}", orderItems = new List { new Items { Name = "Car", quantity = 1 } } }; + message.Key = new CustomerOrderKey { orderNumber = $"OD{i.ToString()}" }; + var result = await _messageProducer.Produce(message); + if (result.Status == PersistenceStatus.Persisted || result.Status == PersistenceStatus.PossiblyPersisted) + { + _logger.LogInformation("Message Prdouced"); + } + await Task.CompletedTask; } - await Task.CompletedTask; } } } \ No newline at end of file diff --git a/Example/Nucleotidz.Producer.Json/Animal.cs b/Example/Nucleotidz.Producer.Json/Animal.cs deleted file mode 100644 index 9aebdf3..0000000 --- a/Example/Nucleotidz.Producer.Json/Animal.cs +++ /dev/null @@ -1,24 +0,0 @@ -using Newtonsoft.Json; - -namespace Nucleotidz.Producer.Json -{ - public class Animal - { - [JsonRequired] - [JsonProperty("category")] - public string category { get; set; } - - [JsonRequired] - [JsonProperty("name")] - public string name { get; set; } - } - - public class AnimalKey - { - [JsonRequired] - [JsonProperty("tagid")] - public string tagid { get; set; } - - } - -} \ No newline at end of file diff --git a/Example/Nucleotidz.Producer.Json/Customer/Orders/CustomerOrder.cs b/Example/Nucleotidz.Producer.Json/Customer/Orders/CustomerOrder.cs new file mode 100644 index 0000000..6e5f738 --- /dev/null +++ b/Example/Nucleotidz.Producer.Json/Customer/Orders/CustomerOrder.cs @@ -0,0 +1,83 @@ +// ------------------------------------------------------------------------------ +// +// Generated by avrogen, version 1.10.0.0 +// Changes to this file may cause incorrect behavior and will be lost if code +// is regenerated +// +// ------------------------------------------------------------------------------ +namespace Customer.Orders +{ + using System; + using System.Collections.Generic; + using System.Text; + using Avro; + using Avro.Specific; + + public partial class CustomerOrder : ISpecificRecord + { + public static Schema _SCHEMA = Avro.Schema.Parse(@"{""type"":""record"",""name"":""CustomerOrder"",""namespace"":""Customer.Orders"",""fields"":[{""name"":""orderNumber"",""type"":""string""},{""name"":""status"",""type"":""string""},{""name"":""orderItems"",""type"":{""type"":""array"",""items"":{""type"":""record"",""name"":""Items"",""namespace"":""Customer.Orders"",""fields"":[{""name"":""quantity"",""type"":""int""},{""name"":""Name"",""type"":""string""}]}}}]}"); + private string _orderNumber; + private string _status; + private IList _orderItems; + public virtual Schema Schema + { + get + { + return CustomerOrder._SCHEMA; + } + } + public string orderNumber + { + get + { + return this._orderNumber; + } + set + { + this._orderNumber = value; + } + } + public string status + { + get + { + return this._status; + } + set + { + this._status = value; + } + } + public IList orderItems + { + get + { + return this._orderItems; + } + set + { + this._orderItems = value; + } + } + public virtual object Get(int fieldPos) + { + switch (fieldPos) + { + case 0: return this.orderNumber; + case 1: return this.status; + case 2: return this.orderItems; + default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()"); + }; + } + public virtual void Put(int fieldPos, object fieldValue) + { + switch (fieldPos) + { + case 0: this.orderNumber = (System.String)fieldValue; break; + case 1: this.status = (System.String)fieldValue; break; + case 2: this.orderItems = (IList)fieldValue; break; + default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()"); + }; + } + } +} diff --git a/Example/Nucleotidz.Producer.Avro/com/nucleotidz/employee/employeeMessage.cs b/Example/Nucleotidz.Producer.Json/Customer/Orders/CustomerOrderKey.cs similarity index 66% rename from Example/Nucleotidz.Producer.Avro/com/nucleotidz/employee/employeeMessage.cs rename to Example/Nucleotidz.Producer.Json/Customer/Orders/CustomerOrderKey.cs index 7d4716d..4f27fae 100644 --- a/Example/Nucleotidz.Producer.Avro/com/nucleotidz/employee/employeeMessage.cs +++ b/Example/Nucleotidz.Producer.Json/Customer/Orders/CustomerOrderKey.cs @@ -5,7 +5,7 @@ // is regenerated // // ------------------------------------------------------------------------------ -namespace com.nucleotidz.employee +namespace Customer.Orders { using System; using System.Collections.Generic; @@ -13,34 +13,34 @@ namespace com.nucleotidz.employee using Avro; using Avro.Specific; - public partial class employeeMessage : ISpecificRecord + public partial class CustomerOrderKey : ISpecificRecord { - public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"employeeMessage\",\"namespace\":\"com.nucleotidz.employee\",\"" + - "fields\":[{\"name\":\"Name\",\"type\":\"string\"}]}"); - private string _Name; + public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"CustomerOrderKey\",\"namespace\":\"Customer.Orders\",\"fields\"" + + ":[{\"name\":\"orderNumber\",\"type\":\"string\"}]}"); + private string _orderNumber; public virtual Schema Schema { get { - return employeeMessage._SCHEMA; + return CustomerOrderKey._SCHEMA; } } - public string Name + public string orderNumber { get { - return this._Name; + return this._orderNumber; } set { - this._Name = value; + this._orderNumber = value; } } public virtual object Get(int fieldPos) { switch (fieldPos) { - case 0: return this.Name; + case 0: return this.orderNumber; default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()"); }; } @@ -48,7 +48,7 @@ public virtual void Put(int fieldPos, object fieldValue) { switch (fieldPos) { - case 0: this.Name = (System.String)fieldValue; break; + case 0: this.orderNumber = (System.String)fieldValue; break; default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()"); }; } diff --git a/Example/Nucleotidz.Producer.Json/Customer/Orders/Items.cs b/Example/Nucleotidz.Producer.Json/Customer/Orders/Items.cs new file mode 100644 index 0000000..a276d56 --- /dev/null +++ b/Example/Nucleotidz.Producer.Json/Customer/Orders/Items.cs @@ -0,0 +1,70 @@ +// ------------------------------------------------------------------------------ +// +// Generated by avrogen, version 1.10.0.0 +// Changes to this file may cause incorrect behavior and will be lost if code +// is regenerated +// +// ------------------------------------------------------------------------------ +namespace Customer.Orders +{ + using System; + using System.Collections.Generic; + using System.Text; + using Avro; + using Avro.Specific; + + public partial class Items : ISpecificRecord + { + public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"Items\",\"namespace\":\"Customer.Orders\",\"fields\":[{\"name\":\"" + + "quantity\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"); + private int _quantity; + private string _Name; + public virtual Schema Schema + { + get + { + return Items._SCHEMA; + } + } + public int quantity + { + get + { + return this._quantity; + } + set + { + this._quantity = value; + } + } + public string Name + { + get + { + return this._Name; + } + set + { + this._Name = value; + } + } + public virtual object Get(int fieldPos) + { + switch (fieldPos) + { + case 0: return this.quantity; + case 1: return this.Name; + default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()"); + }; + } + public virtual void Put(int fieldPos, object fieldValue) + { + switch (fieldPos) + { + case 0: this.quantity = (System.Int32)fieldValue; break; + case 1: this.Name = (System.String)fieldValue; break; + default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()"); + }; + } + } +} diff --git a/Example/Nucleotidz.Producer.Json/Program.cs b/Example/Nucleotidz.Producer.Json/Program.cs index 279c128..7fffd78 100644 --- a/Example/Nucleotidz.Producer.Json/Program.cs +++ b/Example/Nucleotidz.Producer.Json/Program.cs @@ -1,12 +1,13 @@ using Nucleotidz.Kafka.Abstraction; using Nucleotidz.Producer.Json; using Nucleotidz.Kafka.Producer; +using Customer.Orders; IHost host = Host.CreateDefaultBuilder(args) .ConfigureServices((hostContext, services) => { - services.AddTransient>(_ => default); - services.AddProducer(hostContext.Configuration, "Kafka", "Kafka:SchemaRegistry", SerializationScheme.json); + services.AddTransient>(_ => default); + services.AddProducer(hostContext.Configuration, "Kafka", "Kafka:SchemaRegistry", SerializationScheme.json); services.AddHostedService(); }) diff --git a/Example/Nucleotidz.Producer.Json/Worker.cs b/Example/Nucleotidz.Producer.Json/Worker.cs index 4c8276d..a12e5b5 100644 --- a/Example/Nucleotidz.Producer.Json/Worker.cs +++ b/Example/Nucleotidz.Producer.Json/Worker.cs @@ -1,4 +1,5 @@ using Confluent.Kafka; +using Customer.Orders; using Nucleotidz.Kafka.Abstraction; namespace Nucleotidz.Producer.Json @@ -6,8 +7,8 @@ namespace Nucleotidz.Producer.Json public class Worker : BackgroundService { private readonly ILogger _logger; - private readonly IMessageProducer _messageProducer; - public Worker(ILogger logger, IMessageProducer messageProducer) + private readonly IMessageProducer _messageProducer; + public Worker(ILogger logger, IMessageProducer messageProducer) { _messageProducer = messageProducer; _logger = logger; @@ -15,15 +16,18 @@ public Worker(ILogger logger, IMessageProducer messag protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - Message message = new Message(); - message.Value = new Animal { category = "Omnivore",name="Human" }; - message.Key = new AnimalKey { tagid = "Ahmar" }; - var result = await _messageProducer.Produce(message); - if (result.Status == PersistenceStatus.Persisted || result.Status == PersistenceStatus.PossiblyPersisted) + for (int i = 0; i < 5; i++) { - _logger.LogInformation("Message Prdouced"); + Message message = new Message(); + message.Value = new CustomerOrder { status = "L", orderNumber = $"OD{i.ToString()}", orderItems = new List { new Items { Name = "Car", quantity = 1 } } }; + message.Key = new CustomerOrderKey { orderNumber = $"OD{i.ToString()}" }; + var result = await _messageProducer.Produce(message); + if (result.Status == PersistenceStatus.Persisted || result.Status == PersistenceStatus.PossiblyPersisted) + { + _logger.LogInformation("Message Prdouced"); + } + await Task.CompletedTask; } - await Task.CompletedTask; } } } \ No newline at end of file diff --git a/src/Consumer/Nucleotidz.Kafka.Consumer/Consumer.cs b/src/Consumer/Nucleotidz.Kafka.Consumer/Consumer.cs index c5b3411..47b5c8a 100644 --- a/src/Consumer/Nucleotidz.Kafka.Consumer/Consumer.cs +++ b/src/Consumer/Nucleotidz.Kafka.Consumer/Consumer.cs @@ -46,23 +46,10 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } var offsets = await _handler.HandleAsync(buffer, stoppingToken); - var topicPartitionGroup = offsets.GroupBy(_ => new + foreach(var offset in offsets) { - _.Topic, - _.Partition.Value - }); - - foreach (var topicPartition in topicPartitionGroup) - { - TopicPartitionOffset? offset = topicPartition.OrderBy(o => o.Offset.Value).LastOrDefault(); - - if (offset is null) - { - continue; - } - TopicPartitionOffset offsetToCommit = new(offset.TopicPartition, offset.Offset + 1); - consumer.Commit(new[] { offsetToCommit }); - } + consumer.StoreOffset(offset); + } buffer.Clear(); lastReset = GetUtcTime(); diff --git a/src/Consumer/Nucleotidz.Kafka.Consumer/ConsumerFactory.cs b/src/Consumer/Nucleotidz.Kafka.Consumer/ConsumerFactory.cs index 64b63b1..1d7f546 100644 --- a/src/Consumer/Nucleotidz.Kafka.Consumer/ConsumerFactory.cs +++ b/src/Consumer/Nucleotidz.Kafka.Consumer/ConsumerFactory.cs @@ -11,7 +11,7 @@ public class ConsumerFactory : IConsumerFactory { private readonly ConsumerConfiguration _consumerConfiguration; private readonly ISerializerFactory _serializerFactory; - private readonly IConsumerErrorHandler _errorHandler; + private readonly IConsumerErrorHandler _errorHandler; private readonly IPartitionsAssignedHandler _partitionsAssignedHandler; private readonly HashSet _defaultDeserializers = new() { @@ -30,13 +30,13 @@ public ConsumerFactory(IOptions _consumerConfigurationOpt _errorHandler = errorHandler; _consumerConfiguration = _consumerConfigurationOption.Value; _serializerFactory = serializerFactory; - _partitionsAssignedHandler= partitionsAssignedHandler; + _partitionsAssignedHandler = partitionsAssignedHandler; } public IConsumer Create() { var consumerBuilder = new ConsumerBuilder(CreateConfiguration()); - if(_errorHandler is not null) + if (_errorHandler is not null) { consumerBuilder.SetErrorHandler((consumer, error) => _errorHandler.Handle(consumer, error)); } @@ -44,7 +44,7 @@ public IConsumer Create() { consumerBuilder.SetPartitionsAssignedHandler((consumer, topicPartitions) => _partitionsAssignedHandler.Handle(consumer, topicPartitions)); } - + ArgumentNullException.ThrowIfNull(_serializerFactory, nameof(_serializerFactory)); if (!_defaultDeserializers.Contains(typeof(TKey))) consumerBuilder.SetKeyDeserializer(_serializerFactory.CreateDeserializer()); @@ -57,7 +57,8 @@ private ConsumerConfig CreateConfiguration() return new ConsumerConfig { BootstrapServers = string.Join(",", _consumerConfiguration.BootstrapServers), - EnableAutoCommit = false, + EnableAutoCommit = true, + EnableAutoOffsetStore = false, GroupId = _consumerConfiguration.GroupName, ClientId = _consumerConfiguration.ClientId, SecurityProtocol = SecurityProtocol.SaslSsl, @@ -65,7 +66,7 @@ private ConsumerConfig CreateConfiguration() SaslUsername = _consumerConfiguration.Username, SaslPassword = _consumerConfiguration.Password, AutoOffsetReset = _consumerConfiguration.AutoOffsetReset, - Debug=_consumerConfiguration.Debug, + Debug = _consumerConfiguration.Debug, }; } From 423b9a11837482107b379992ee1367f75c83d786 Mon Sep 17 00:00:00 2001 From: ahmar-husain Date: Sat, 29 Apr 2023 12:30:16 +0530 Subject: [PATCH 2/4] Refactor --- .../Nucleotidz.Kafka.Consumer/Consumer.cs | 31 ++++++++++++++++--- .../ConsumerFactory.cs | 3 +- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/Consumer/Nucleotidz.Kafka.Consumer/Consumer.cs b/src/Consumer/Nucleotidz.Kafka.Consumer/Consumer.cs index 47b5c8a..2d09eae 100644 --- a/src/Consumer/Nucleotidz.Kafka.Consumer/Consumer.cs +++ b/src/Consumer/Nucleotidz.Kafka.Consumer/Consumer.cs @@ -46,15 +46,36 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } var offsets = await _handler.HandleAsync(buffer, stoppingToken); - foreach(var offset in offsets) - { - consumer.StoreOffset(offset); - } - + var toCommit=PrepareCommit(offsets); + consumer.Commit(toCommit); buffer.Clear(); lastReset = GetUtcTime(); } } + + private IEnumerable PrepareCommit(IEnumerable consumedOffsets) + { + var offsetToCommit = new List(); + var topicPartitionGroup = consumedOffsets.GroupBy(tpo => new + { + tpo.Topic, + tpo.Partition.Value + }); + + foreach (var topicPartition in topicPartitionGroup) + { + var offset = topicPartition.OrderBy(o => o.Offset.Value).LastOrDefault(); + + if (offset is null) + { + continue; + } + + offsetToCommit.Add( new TopicPartitionOffset(offset.TopicPartition, offset.Offset + 1)); + } + return offsetToCommit; + } + private DateTimeOffset GetUtcTime() { return DateTimeOffset.UtcNow; diff --git a/src/Consumer/Nucleotidz.Kafka.Consumer/ConsumerFactory.cs b/src/Consumer/Nucleotidz.Kafka.Consumer/ConsumerFactory.cs index 1d7f546..8d35bf8 100644 --- a/src/Consumer/Nucleotidz.Kafka.Consumer/ConsumerFactory.cs +++ b/src/Consumer/Nucleotidz.Kafka.Consumer/ConsumerFactory.cs @@ -57,8 +57,9 @@ private ConsumerConfig CreateConfiguration() return new ConsumerConfig { BootstrapServers = string.Join(",", _consumerConfiguration.BootstrapServers), - EnableAutoCommit = true, + EnableAutoCommit = false, EnableAutoOffsetStore = false, + AutoCommitIntervalMs = 2000, GroupId = _consumerConfiguration.GroupName, ClientId = _consumerConfiguration.ClientId, SecurityProtocol = SecurityProtocol.SaslSsl, From 4b51082be59299f82ed46f5d9fa94fd82634028d Mon Sep 17 00:00:00 2001 From: Ahmar Date: Sat, 29 Apr 2023 12:32:31 +0530 Subject: [PATCH 3/4] Update build.yml --- .github/workflows/build.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4a30cb0..75e4fef 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -17,7 +17,7 @@ on: jobs: build: env: - solution: '${{ github.workspace }}/Nucleotidz.Kafka.Consumer.sln' + solution: '${{ github.workspace }}/Nucleotidz.Kafka.sln' feedurl : 'https://nuget.pkg.github.com/nucleo-tidz/index.json' name: publish runs-on: ubuntu-latest @@ -32,4 +32,4 @@ jobs: run: dotnet restore $solution - name: Build - run: dotnet build $solution --no-restore \ No newline at end of file + run: dotnet build $solution --no-restore From 6303c8e0c2e536fc5cad2d2951cf697bf85bf605 Mon Sep 17 00:00:00 2001 From: Ahmar Date: Sat, 29 Apr 2023 12:32:59 +0530 Subject: [PATCH 4/4] Update pack.nuget.yml --- .github/workflows/pack.nuget.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pack.nuget.yml b/.github/workflows/pack.nuget.yml index 18ca7bb..52b51fc 100644 --- a/.github/workflows/pack.nuget.yml +++ b/.github/workflows/pack.nuget.yml @@ -38,7 +38,7 @@ jobs: needs: semver if: ${{ needs.semver.steps.checksemver.outputs.IsSemanticVersion }}=='true' env: - solution: '${{ github.workspace }}/Nucleotidz.Kafka.Consumer.sln' + solution: '${{ github.workspace }}/Nucleotidz.Kafka.sln' feedurl : 'https://nuget.pkg.github.com/nucleo-tidz/index.json' name: publish runs-on: ubuntu-latest