Skip to content

Commit

Permalink
Changed several API properties to async methods to avoid deadlocks #247
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanayx committed Feb 1, 2024
1 parent d7d32d7 commit 438f0bb
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 128 deletions.
5 changes: 5 additions & 0 deletions global.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"sdk": {
"version": "8.0.101"
}
}
12 changes: 6 additions & 6 deletions src/Pulsar.Client/Api/IConsumer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type IConsumer<'T> =
abstract member BatchReceiveAsync: CancellationToken -> Task<Messages<'T>>
/// Asynchronously acknowledge the consumption of a single message
abstract member AcknowledgeAsync: messageId:MessageId -> Task<unit>
/// Asynchronously acknowledge the consumption of a single message, it will store in pending ack.
/// Asynchronously acknowledge the consumption of a single message, it will store in pending ack.
/// After the transaction commit, the message will actually ack.
/// After the transaction abort, the message will be redelivered.
abstract member AcknowledgeAsync: messageId:MessageId * txn:Transaction -> Task<unit>
Expand Down Expand Up @@ -48,7 +48,7 @@ type IConsumer<'T> =
/// Unsubscribes consumer
abstract member UnsubscribeAsync: unit -> Task<unit>
/// Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
abstract member HasReachedEndOfTopic: bool
abstract member HasReachedEndOfTopic: unit -> Task<bool>
/// Reset the subscription associated with this consumer to a specific message id.
abstract member SeekAsync: messageId:MessageId -> Task<unit>
/// Reset the subscription associated with this consumer to a specific message publish time (unix timestamp).
Expand All @@ -68,14 +68,14 @@ type IConsumer<'T> =
/// Get the consumer name
abstract member Name: string
/// Get statistics for the consumer.
abstract member GetStatsAsync: unit -> Task<ConsumerStats>
abstract member GetStats: unit -> Task<ConsumerStats>
/// ReconsumeLater the consumption of Message
abstract member ReconsumeLaterAsync: message:Message<'T> * deliverAt:TimeStamp -> Task<unit>
/// ReconsumeLater the consumption of Messages
abstract member ReconsumeLaterAsync: messages:Messages<'T> * deliverAt:TimeStamp -> Task<unit>
/// ReconsumeLater the reception of all the messages in the stream up to (and including) the provided message.
abstract member ReconsumeLaterCumulativeAsync: message:Message<'T> * deliverAt:TimeStamp -> Task<unit>
/// The last disconnected timestamp of the consumer abstract member LastDisconnected: DateTime
abstract member LastDisconnectedTimestamp: TimeStamp
/// The last disconnected timestamp of the consumer
abstract member LastDisconnectedTimestamp: unit -> Task<TimeStamp>
/// Return true if the consumer is connected to the broker
abstract member IsConnected: bool
abstract member IsConnected: unit -> Task<bool>
10 changes: 5 additions & 5 deletions src/Pulsar.Client/Api/IProducer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ type IProducer<'T> =
/// Internal client producer id
abstract member ProducerId: ProducerId
/// Get the topic which producer is publishing to
abstract member Topic: string
abstract member Topic: string
/// Get statistics for the producer.
abstract member GetStatsAsync: unit -> Task<ProducerStats>
abstract member GetStats: unit -> Task<ProducerStats>
/// <summary>
/// Constructs <see cref="Pulsar.Client.Common.MessageBuilder" />
/// </summary>
Expand Down Expand Up @@ -68,10 +68,10 @@ type IProducer<'T> =
/// or custom sequence id that was published and acknowledged by the broker.
/// After recreating a producer with the same producer name, this will return the last message that was
/// published in the previous producer session, or -1 if there no message was ever published.
abstract member LastSequenceId : SequenceId
abstract member LastSequenceId: unit -> Task<SequenceId>
/// Get the producer name
abstract member Name: string
/// The last disconnected timestamp of the producer
abstract member LastDisconnectedTimestamp: TimeStamp
abstract member LastDisconnectedTimestamp: unit -> Task<TimeStamp>
/// Return true if the consumer is connected to the broker
abstract member IsConnected: bool
abstract member IsConnected: unit -> Task<bool>
8 changes: 4 additions & 4 deletions src/Pulsar.Client/Api/IReader.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ type IReader<'T> =
abstract member ReadNextAsync: CancellationToken -> Task<Message<'T>>
/// Reset the subscription associated with this consumer to a specific message id.
abstract member SeekAsync : messageId:MessageId -> Task<unit>
/// Reset the subscription associated with this consumer to a specific message publish time (Unix timestamp in ms).
/// Reset the subscription associated with this consumer to a specific message publish time (Unix timestamp in ms).
abstract member SeekAsync : timestamp:TimeStamp -> Task<unit>
/// Reset the subscription associated with this consumer to a specific message id or publish time (unix timestamp), returned by resolver function.
abstract member SeekAsync: resolver: Func<string, SeekType> -> Task<unit>
abstract member SeekAsync: resolver: Func<string, SeekType> -> Task<unit>
/// Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
abstract member HasReachedEndOfTopic: bool
abstract member HasReachedEndOfTopic: unit -> Task<bool>
/// Check if there is any message available to read from the current position.
abstract member HasMessageAvailableAsync: unit -> Task<bool>
/// Get a topic for the reader
abstract member Topic: string
/// Return true if the reader is connected to the broker
abstract member IsConnected: bool
abstract member IsConnected: unit -> Task<bool>
14 changes: 7 additions & 7 deletions src/Pulsar.Client/Internal/ConsumerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
connectionHandler.CheckIfActive() |> throwIfNotNull
postAndAsyncReply mb ConsumerMessage.Unsubscribe

member this.HasReachedEndOfTopic = hasReachedEndOfTopic
member this.HasReachedEndOfTopic() = hasReachedEndOfTopic |> Task.FromResult

member this.NegativeAcknowledge msgId =
connectionHandler.CheckIfActive() |> throwIfNotNull
Expand All @@ -1603,7 +1603,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien

member this.Name = consumerName

member this.GetStatsAsync() =
member this.GetStats() =
backgroundTask {
return!
match connectionHandler.ConnectionState with
Expand Down Expand Up @@ -1650,13 +1650,13 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
}


member this.LastDisconnectedTimestamp =
connectionHandler.LastDisconnectedTimestamp
member this.LastDisconnectedTimestamp() =
connectionHandler.LastDisconnectedTimestamp |> Task.FromResult

member this.IsConnected =
member this.IsConnected() =
match connectionHandler.ConnectionState with
| Ready _ -> true
| _ -> false
| Ready _ -> trueTask
| _ -> falseTask


interface IAsyncDisposable with
Expand Down
27 changes: 14 additions & 13 deletions src/Pulsar.Client/Internal/MultiTopicsConsumerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration
let consumerImp = consumer :> IConsumer<'T>
fun () ->
backgroundTask {
if consumerImp.HasReachedEndOfTopic then
let! hasReachedEndOfTopic = consumerImp.HasReachedEndOfTopic()
if hasReachedEndOfTopic then
Log.Logger.LogWarning("{0} topic was terminated", topic)
do! Task.Delay(Timeout.Infinite) // infinite delay for terminated topic
let! message = consumer.ReceiveWrappedAsync(CancellationToken.None)
Expand Down Expand Up @@ -797,22 +798,22 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration

Log.Logger.LogDebug("{0} HasReachedEndOfTheTopic", prefix)
consumers
|> Seq.forall (fun (KeyValue(_, (consumer, _))) -> consumer.HasReachedEndOfTopic)
|> Seq.forall (fun (KeyValue(_, (consumer, _))) -> consumer.HasReachedEndOfTopic().Result)
|> channel.SetResult

| LastDisconnectedTimestamp channel ->

Log.Logger.LogDebug("{0} LastDisconnectedTimestamp", prefix)
consumers
|> Seq.map (fun (KeyValue(_, (consumer, _))) -> consumer.LastDisconnectedTimestamp)
|> Seq.map (fun (KeyValue(_, (consumer, _))) -> consumer.LastDisconnectedTimestamp().Result)
|> Seq.max
|> channel.SetResult

| IsConnected channel ->

Log.Logger.LogDebug("{0} IsConnected", prefix)
consumers
|> Seq.forall (fun (KeyValue(_, (consumer, _))) -> consumer.IsConnected)
|> Seq.forall (fun (KeyValue(_, (consumer, _))) -> consumer.IsConnected().Result)
|> channel.SetResult

| Seek (seekData, channel) ->
Expand Down Expand Up @@ -892,7 +893,7 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration
try
let! statsTask =
consumers
|> Seq.map (fun (KeyValue(_, (consumer, _))) -> consumer.GetStatsAsync())
|> Seq.map (fun (KeyValue(_, (consumer, _))) -> consumer.GetStats())
|> Task.WhenAll
channel.SetResult(statsTask)
with Flatten ex ->
Expand Down Expand Up @@ -1119,8 +1120,8 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration
member this.UnsubscribeAsync() =
postAndAsyncReply mb Unsubscribe

member this.HasReachedEndOfTopic =
(postAndAsyncReply mb HasReachedEndOfTheTopic).Result
member this.HasReachedEndOfTopic() =
postAndAsyncReply mb HasReachedEndOfTheTopic

member this.NegativeAcknowledge msgId =
postAndAsyncReply mb (fun channel -> NegativeAcknowledge(channel, msgId))
Expand All @@ -1137,7 +1138,7 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration

member this.Name = consumerName

member this.GetStatsAsync() =
member this.GetStats() =
backgroundTask {
let! allStats = postAndAsyncReply mb GetStats
return allStats |> statsReduce
Expand All @@ -1161,10 +1162,10 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration
do! postAndAsyncReply mb (fun channel -> ReconsumeLater(msg, deliverAt, channel))
}

member this.LastDisconnectedTimestamp =
(postAndAsyncReply mb LastDisconnectedTimestamp).Result
member this.IsConnected =
(postAndAsyncReply mb IsConnected).Result
member this.LastDisconnectedTimestamp() =
postAndAsyncReply mb LastDisconnectedTimestamp
member this.IsConnected() =
postAndAsyncReply mb IsConnected

interface IAsyncDisposable with

Expand Down
10 changes: 5 additions & 5 deletions src/Pulsar.Client/Internal/MultiTopicsReaderImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ type internal MultiTopicsReaderImpl<'T> private (readerConfig: ReaderConfigurati
member this.SeekAsync (resolver: Func<string, SeekType>) : Task<Unit> =
castedConsumer.SeekAsync(resolver)

member this.HasReachedEndOfTopic with get() =
castedConsumer.HasReachedEndOfTopic
member this.HasReachedEndOfTopic() =
castedConsumer.HasReachedEndOfTopic()

member this.HasMessageAvailableAsync() =
consumer.HasMessageAvailableAsync()

member this.Topic with get() =
member this.Topic =
castedConsumer.Topic

member this.IsConnected with get() =
castedConsumer.IsConnected
member this.IsConnected() =
castedConsumer.IsConnected()

interface IAsyncDisposable with

Expand Down
20 changes: 10 additions & 10 deletions src/Pulsar.Client/Internal/PartitionedProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -186,25 +186,25 @@ type internal PartitionedProducerImpl<'T> private (producerConfig: ProducerConfi

Log.Logger.LogDebug("{0} LastSequenceId", prefix)
producers
|> Seq.map (fun producer -> producer.LastSequenceId)
|> Seq.map (fun producer -> producer.LastSequenceId().Result)
|> Seq.max
|> channel.SetResult

| LastDisconnectedTimestamp channel ->

Log.Logger.LogDebug("{0} LastDisconnectedTimestamp", prefix)
producers
|> Seq.map (fun producer -> producer.LastDisconnectedTimestamp)
|> Seq.map (fun producer -> producer.LastDisconnectedTimestamp().Result)
|> Seq.max
|> channel.SetResult

| IsConnected channel ->

Log.Logger.LogDebug("{0} IsConnected", prefix)
producers
|> Seq.forall (fun producer -> producer.IsConnected)
|> Seq.forall (fun producer -> producer.IsConnected().Result)
|> channel.SetResult

| Close channel ->

match this.ConnectionState with
Expand Down Expand Up @@ -280,7 +280,7 @@ type internal PartitionedProducerImpl<'T> private (producerConfig: ProducerConfi

let! stats =
producers
|> Seq.map(fun p -> p.GetStatsAsync())
|> Seq.map(fun p -> p.GetStats())
|> Task.WhenAll
channel.SetResult(statsReduce stats)
}:> Task).ContinueWith(fun t ->
Expand Down Expand Up @@ -375,15 +375,15 @@ type internal PartitionedProducerImpl<'T> private (producerConfig: ProducerConfi

member this.Topic = %producerConfig.Topic.CompleteTopicName

member this.LastSequenceId = (postAndAsyncReply mb LastSequenceId).Result
member this.LastSequenceId() = postAndAsyncReply mb LastSequenceId

member this.Name = producerConfig.ProducerName

member this.GetStatsAsync() = postAndAsyncReply mb GetStats
member this.GetStats() = postAndAsyncReply mb GetStats

member this.LastDisconnectedTimestamp() = postAndAsyncReply mb LastDisconnectedTimestamp

member this.LastDisconnectedTimestamp = (postAndAsyncReply mb LastDisconnectedTimestamp).Result

member this.IsConnected = (postAndAsyncReply mb IsConnected).Result
member this.IsConnected() = postAndAsyncReply mb IsConnected


interface IAsyncDisposable with
Expand Down
14 changes: 7 additions & 7 deletions src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -918,19 +918,19 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c

member this.Topic = %producerConfig.Topic.CompleteTopicName

member this.LastSequenceId = %Interlocked.Read(&lastSequenceIdPublished)
member this.LastSequenceId() = %Interlocked.Read(&lastSequenceIdPublished) |> Task.FromResult

member this.Name = producerName

member this.GetStatsAsync() =
member this.GetStats() =
postAndAsyncReply mb ProducerMessage.GetStats

member this.LastDisconnectedTimestamp =
connectionHandler.LastDisconnectedTimestamp
member this.IsConnected =
member this.LastDisconnectedTimestamp() =
connectionHandler.LastDisconnectedTimestamp |> Task.FromResult
member this.IsConnected() =
match connectionHandler.ConnectionState with
| Ready _ -> true
| _ -> false
| Ready _ -> trueTask
| _ -> falseTask

interface IAsyncDisposable with

Expand Down
10 changes: 5 additions & 5 deletions src/Pulsar.Client/Internal/ReaderImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ type internal ReaderImpl<'T> private (readerConfig: ReaderConfiguration, clientC
member this.SeekAsync (resolver: Func<string, SeekType>) : Task<Unit> =
castedConsumer.SeekAsync(resolver)

member this.HasReachedEndOfTopic with get() =
castedConsumer.HasReachedEndOfTopic
member this.HasReachedEndOfTopic() =
castedConsumer.HasReachedEndOfTopic()

member this.HasMessageAvailableAsync() =
consumer.HasMessageAvailableAsync()

member this.Topic with get() =
member this.Topic =
castedConsumer.Topic

member this.IsConnected with get() =
castedConsumer.IsConnected
member this.IsConnected() =
castedConsumer.IsConnected()

interface IAsyncDisposable with

Expand Down
6 changes: 3 additions & 3 deletions src/Pulsar.Client/Pulsar.Client.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
<Title>Pulsar.Client</Title>
<RootNamespace>Pulsar.Client</RootNamespace>
<AssemblyName>Pulsar.Client</AssemblyName>
<Version>3.1.0</Version>
<Version>3.2.0</Version>
<Company>F# community</Company>
<Description>.NET client library for Apache Pulsar</Description>
<RepositoryUrl>https://github.com/fsprojects/pulsar-client-dotnet</RepositoryUrl>
<PackageReleaseNotes>Support for IsConnected property</PackageReleaseNotes>
<PackageReleaseNotes>Changed several API properties to async methods to avoid deadlocks</PackageReleaseNotes>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageProjectUrl>https://github.com/fsprojects/pulsar-client-dotnet</PackageProjectUrl>
<RepositoryType>git</RepositoryType>
<PackageTags>pulsar</PackageTags>
<Authors>F# community</Authors>
<PackageVersion>3.1.0</PackageVersion>
<PackageVersion>3.2.0</PackageVersion>
<DebugType>portable</DebugType>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageReadmeFile>README.md</PackageReadmeFile>
Expand Down
Loading

0 comments on commit 438f0bb

Please sign in to comment.