Skip to content

Commit

Permalink
Implemented KafkaProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
eneshoxha committed Sep 8, 2021
1 parent 3dfdc4c commit 571a1d6
Showing 1 changed file with 27 additions and 3 deletions.
30 changes: 27 additions & 3 deletions src/Buildersoft.Messaging/App/Kafka/KafkaProducer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Buildersoft.Messaging.Abstraction;
using Buildersoft.Messaging.Configuration;
using Confluent.Kafka;
using System;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -9,14 +11,36 @@ public class KafkaProducer<T> : IMessagingProducer<T>
{
public event IMessagingProducer<T>.StatusChangedHandler StatusChanged;

private readonly MessagingProducerConfiguration<T> _messagingProducerConfiguration;
private readonly ClientConfig _clientConfig;
private IProducer<Null, String> producer;

public KafkaProducer(IMessagingClient messagingClient, MessagingProducerConfiguration<T> messagingProducerConfiguration)
{
_messagingProducerConfiguration = messagingProducerConfiguration;

_clientConfig = messagingClient.GetClient() as ClientConfig;
producer = new ProducerBuilder<Null, String>(_clientConfig)
.Build();
}

public CancellationTokenSource GetCancellationTokenSource()
{
throw new NotImplementedException();
return null;
}

public Task<Guid> SendAsync(T tEntity, string key = "")
public async Task<Guid> SendAsync(T tEntity, string key = "")
{
throw new NotImplementedException();
try
{
var dr = await producer.ProduceAsync(_messagingProducerConfiguration.Topic, new Message<Null, String> { Value = tEntity.ToJson() });
// return new MessageId(dr.Key, dr.TopicPartition.Partition.Value);
return Guid.NewGuid();
}
catch (ProduceException<Null, String>)
{
return Guid.Empty;
}
}
}
}

0 comments on commit 571a1d6

Please sign in to comment.