Skip to content

Commit

Permalink
Merge pull request #5 from nucleo-tidz/dev/ah/commit-offset
Browse files Browse the repository at this point in the history
Dev/ah/commit offset
  • Loading branch information
ahmar-husain authored Apr 29, 2023
2 parents 36f3e81 + 6303c8e commit 059b8d6
Show file tree
Hide file tree
Showing 21 changed files with 529 additions and 139 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ on:
jobs:
build:
env:
solution: '${{ github.workspace }}/Nucleotidz.Kafka.Consumer.sln'
solution: '${{ github.workspace }}/Nucleotidz.Kafka.sln'
feedurl : 'https://nuget.pkg.github.com/nucleo-tidz/index.json'
name: publish
runs-on: ubuntu-latest
Expand All @@ -32,4 +32,4 @@ jobs:
run: dotnet restore $solution

- name: Build
run: dotnet build $solution --no-restore
run: dotnet build $solution --no-restore
2 changes: 1 addition & 1 deletion .github/workflows/pack.nuget.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
needs: semver
if: ${{ needs.semver.steps.checksemver.outputs.IsSemanticVersion }}=='true'
env:
solution: '${{ github.workspace }}/Nucleotidz.Kafka.Consumer.sln'
solution: '${{ github.workspace }}/Nucleotidz.Kafka.sln'
feedurl : 'https://nuget.pkg.github.com/nucleo-tidz/index.json'
name: publish
runs-on: ubuntu-latest
Expand Down
83 changes: 83 additions & 0 deletions Example/Nucleotidz.Consumer.Avro/Customer/Orders/CustomerOrder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// ------------------------------------------------------------------------------
// <auto-generated>
// Generated by avrogen, version 1.10.0.0
// Changes to this file may cause incorrect behavior and will be lost if code
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace Customer.Orders
{
using System;
using System.Collections.Generic;
using System.Text;
using Avro;
using Avro.Specific;

public partial class CustomerOrder : ISpecificRecord
{
public static Schema _SCHEMA = Avro.Schema.Parse(@"{""type"":""record"",""name"":""CustomerOrder"",""namespace"":""Customer.Orders"",""fields"":[{""name"":""orderNumber"",""type"":""string""},{""name"":""status"",""type"":""string""},{""name"":""orderItems"",""type"":{""type"":""array"",""items"":{""type"":""record"",""name"":""Items"",""namespace"":""Customer.Orders"",""fields"":[{""name"":""quantity"",""type"":""int""},{""name"":""Name"",""type"":""string""}]}}}]}");
private string _orderNumber;
private string _status;
private IList<Customer.Orders.Items> _orderItems;
public virtual Schema Schema
{
get
{
return CustomerOrder._SCHEMA;
}
}
public string orderNumber
{
get
{
return this._orderNumber;
}
set
{
this._orderNumber = value;
}
}
public string status
{
get
{
return this._status;
}
set
{
this._status = value;
}
}
public IList<Customer.Orders.Items> orderItems
{
get
{
return this._orderItems;
}
set
{
this._orderItems = value;
}
}
public virtual object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return this.orderNumber;
case 1: return this.status;
case 2: return this.orderItems;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public virtual void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: this.orderNumber = (System.String)fieldValue; break;
case 1: this.status = (System.String)fieldValue; break;
case 2: this.orderItems = (IList<Customer.Orders.Items>)fieldValue; break;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,50 @@
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace com.nucleotidz.employee
namespace Customer.Orders
{
using System;
using System.Collections.Generic;
using System.Text;
using Avro;
using Avro.Specific;

public partial class employeeMessage : ISpecificRecord
public partial class CustomerOrderKey : ISpecificRecord
{
public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"employeeMessage\",\"namespace\":\"com.nucleotidz.employee\",\"" +
"fields\":[{\"name\":\"Name\",\"type\":\"string\"}]}");
private string _Name;
public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"CustomerOrderKey\",\"namespace\":\"Customer.Orders\",\"fields\"" +
":[{\"name\":\"orderNumber\",\"type\":\"string\"}]}");
private string _orderNumber;
public virtual Schema Schema
{
get
{
return employeeMessage._SCHEMA;
return CustomerOrderKey._SCHEMA;
}
}
public string Name
public string orderNumber
{
get
{
return this._Name;
return this._orderNumber;
}
set
{
this._Name = value;
this._orderNumber = value;
}
}
public virtual object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return this.Name;
case 0: return this.orderNumber;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public virtual void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: this.Name = (System.String)fieldValue; break;
case 0: this.orderNumber = (System.String)fieldValue; break;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,64 @@
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace com.nucleotidz.employee.key
namespace Customer.Orders
{
using System;
using System.Collections.Generic;
using System.Text;
using Avro;
using Avro.Specific;

public partial class employeeKey : ISpecificRecord
public partial class Items : ISpecificRecord
{
public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"employeeKey\",\"namespace\":\"com.nucleotidz.employee.key\",\"" +
"fields\":[{\"name\":\"Id\",\"type\":\"string\"}]}");
private string _Id;
public static Schema _SCHEMA = Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"Items\",\"namespace\":\"Customer.Orders\",\"fields\":[{\"name\":\"" +
"quantity\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}");
private int _quantity;
private string _Name;
public virtual Schema Schema
{
get
{
return employeeKey._SCHEMA;
return Items._SCHEMA;
}
}
public string Id
public int quantity
{
get
{
return this._Id;
return this._quantity;
}
set
{
this._Id = value;
this._quantity = value;
}
}
public string Name
{
get
{
return this._Name;
}
set
{
this._Name = value;
}
}
public virtual object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return this.Id;
case 0: return this.quantity;
case 1: return this.Name;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public virtual void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: this.Id = (System.String)fieldValue; break;
case 0: this.quantity = (System.Int32)fieldValue; break;
case 1: this.Name = (System.String)fieldValue; break;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
Expand Down
15 changes: 4 additions & 11 deletions Example/Nucleotidz.Consumer.Avro/ErrorHandler.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
using com.nucleotidz.employee.key;
using com.nucleotidz.employee;
using Newtonsoft.Json.Linq;
using Confluent.Kafka;
using Customer.Orders;
using Nucleotidz.Kafka.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace Nucleotidz.Consumer.Avro
{
public class ErrorHandler : IConsumerErrorHandler<employeeKey, employeeMessage>
public class ErrorHandler : IConsumerErrorHandler<CustomerOrderKey, CustomerOrder>
{
public void Handle(IConsumer<employeeKey, employeeMessage> consumer, Error error)
public void Handle(IConsumer<CustomerOrderKey, CustomerOrder> consumer, Error error)
{
Console.WriteLine(error.ToString());
}
Expand Down
12 changes: 6 additions & 6 deletions Example/Nucleotidz.Consumer.Avro/Program.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
using com.nucleotidz.employee.key;
using com.nucleotidz.employee;

using Newtonsoft.Json.Linq;
using Nucleotidz.Kafka.Abstraction;
using Nucleotidz.Kafka.Consumer;
using Nucleotidz.Consumer.Avro;
using Customer.Orders;

IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddTransient<IConsumerErrorHandler<employeeKey, employeeMessage>, ErrorHandler>();
services.AddTransient<IPartitionsAssignedHandler<employeeKey, employeeMessage>>(_ => default);
services.AddTransient<IHandler<employeeKey, employeeMessage>, Worker>();
services.AddConsumer<employeeKey, employeeMessage>(hostContext.Configuration, "Kafka", "Kafka:SchemaRegistry",SerializationScheme.avro);
services.AddTransient<IConsumerErrorHandler<CustomerOrderKey, CustomerOrder>, ErrorHandler>();
services.AddTransient<IPartitionsAssignedHandler<CustomerOrderKey, CustomerOrder>>(_ => default);
services.AddTransient<IHandler<CustomerOrderKey, CustomerOrder>, Worker>();
services.AddConsumer<CustomerOrderKey, CustomerOrder>(hostContext.Configuration, "Kafka", "Kafka:SchemaRegistry",SerializationScheme.avro);
})
.Build();

Expand Down
8 changes: 4 additions & 4 deletions Example/Nucleotidz.Consumer.Avro/Worker.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
namespace Nucleotidz.Consumer.Avro
{
using com.nucleotidz.employee;
using com.nucleotidz.employee.key;

using Confluent.Kafka;
using Customer.Orders;
using Nucleotidz.Kafka.Abstraction;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Worker : IHandler<employeeKey, employeeMessage>
public class Worker : IHandler<CustomerOrderKey, CustomerOrder>
{
public async Task<IEnumerable<TopicPartitionOffset>> HandleAsync(IEnumerable<ConsumeResult<employeeKey, employeeMessage>> consumeResults, CancellationToken cancellationToken)
public async Task<IEnumerable<TopicPartitionOffset>> HandleAsync(IEnumerable<ConsumeResult<CustomerOrderKey, CustomerOrder>> consumeResults, CancellationToken cancellationToken)
{
await Task.CompletedTask;
Console.WriteLine(consumeResults.Count().ToString());
Expand Down
83 changes: 83 additions & 0 deletions Example/Nucleotidz.Producer.Avro/Customer/Orders/CustomerOrder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// ------------------------------------------------------------------------------
// <auto-generated>
// Generated by avrogen, version 1.10.0.0
// Changes to this file may cause incorrect behavior and will be lost if code
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace Customer.Orders
{
using System;
using System.Collections.Generic;
using System.Text;
using Avro;
using Avro.Specific;

public partial class CustomerOrder : ISpecificRecord
{
public static Schema _SCHEMA = Avro.Schema.Parse(@"{""type"":""record"",""name"":""CustomerOrder"",""namespace"":""Customer.Orders"",""fields"":[{""name"":""orderNumber"",""type"":""string""},{""name"":""status"",""type"":""string""},{""name"":""orderItems"",""type"":{""type"":""array"",""items"":{""type"":""record"",""name"":""Items"",""namespace"":""Customer.Orders"",""fields"":[{""name"":""quantity"",""type"":""int""},{""name"":""Name"",""type"":""string""}]}}}]}");
private string _orderNumber;
private string _status;
private IList<Customer.Orders.Items> _orderItems;
public virtual Schema Schema
{
get
{
return CustomerOrder._SCHEMA;
}
}
public string orderNumber
{
get
{
return this._orderNumber;
}
set
{
this._orderNumber = value;
}
}
public string status
{
get
{
return this._status;
}
set
{
this._status = value;
}
}
public IList<Customer.Orders.Items> orderItems
{
get
{
return this._orderItems;
}
set
{
this._orderItems = value;
}
}
public virtual object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return this.orderNumber;
case 1: return this.status;
case 2: return this.orderItems;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public virtual void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: this.orderNumber = (System.String)fieldValue; break;
case 1: this.status = (System.String)fieldValue; break;
case 2: this.orderItems = (IList<Customer.Orders.Items>)fieldValue; break;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
}
}
Loading

0 comments on commit 059b8d6

Please sign in to comment.