Skip to content

Commit

Permalink
Fix for reconnect of TransactionMetaStoreHandler #285
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanayx committed Dec 9, 2024
1 parent 18c68d4 commit d5edb19
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 25 deletions.
2 changes: 1 addition & 1 deletion examples/CsharpExamples/CsharpExamples.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.7.0" />
<PackageReference Include="OpenTelemetry" Version="1.7.0" />
<PackageReference Include="Pulsar.Client" Version="3.1.0" />
<PackageReference Include="Pulsar.Client" Version="3.6.1" />
<PackageReference Include="Pulsar.Client.Otel" Version="0.1.2" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion examples/FsharpExamples/FsharpExamples.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.7.0" />
<PackageReference Include="OpenTelemetry" Version="1.7.0" />
<PackageReference Include="Pulsar.Client" Version="3.1.0" />
<PackageReference Include="Pulsar.Client" Version="3.6.1" />
<PackageReference Include="Pulsar.Client.Otel" Version="0.1.2" />
</ItemGroup>
<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Pulsar.Client/Common/MessageId.fs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type MessageId =
data.FirstChunkMessageId <- chunkMsgIds.[0].GetMessageIdData()
| _ ->
()
use stream = MemoryStreamManager.GetStream()
use stream = MemoryStreamManager.GetStream("ToByteArray")
Serializer.Serialize(stream, data)
stream.ToArray()
static member FromByteArray (data: byte[]) =
Expand Down
4 changes: 2 additions & 2 deletions src/Pulsar.Client/Internal/BatchMessageContainer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ type internal DefaultBatchMessageContainer<'T>(prefix: string, config: ProducerC
this.IsBatchFull()
override this.CreateOpSendMsg () =
let lowestSequenceId = batchItems[0].SequenceId
let stream = MemoryStreamManager.GetStream()
let stream = MemoryStreamManager.GetStream("DefaultBatcher")
let highestSequenceId = batchItems[batchItems.Count - 1].SequenceId
{
OpSendMsg = makeBatch stream batchItems
Expand Down Expand Up @@ -184,7 +184,7 @@ type internal KeyBasedBatchMessageContainer<'T>(prefix: string, config: Producer
override this.CreateOpSendMsgs () =
keyBatchItems
|> Seq.map (fun (KeyValue(_, batchItems)) ->
let stream = MemoryStreamManager.GetStream()
let stream = MemoryStreamManager.GetStream("KeyBasedBatcher")
let lowestSequenceId = batchItems[0].SequenceId
let highestSequenceId = batchItems[batchItems.Count - 1].SequenceId
{
Expand Down
18 changes: 9 additions & 9 deletions src/Pulsar.Client/Internal/Compression.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@ module internal CompressionCodec =

type ZLibCompression() =
let zlibEncode (payload: MemoryStream) =
let ms = MemoryStreamManager.GetStream()
let ms = MemoryStreamManager.GetStream("zlibEncode")
let zlib = new ZOutputStream(ms, zlibConst.Z_DEFAULT_COMPRESSION)
zlib.FlushMode <- zlibConst.Z_SYNC_FLUSH
zlib.Write(payload.ToArray(), 0, int payload.Length)
payload.Dispose()
ms

let zlibDecode (uncompressedSize: int) (payload: MemoryStream) =
let ms = MemoryStreamManager.GetStream(null, uncompressedSize)
let ms = MemoryStreamManager.GetStream("zlibDecode", uncompressedSize)
let zlib = new ZOutputStream(ms)
zlib.FlushMode <- zlibConst.Z_SYNC_FLUSH
zlib.Write(payload.ToArray(), 0, int payload.Length)
payload.Dispose()
ms

let zlibDecodeBytes (uncompressedSize: int) (bytes : byte[]) payloadLength =
use ms = MemoryStreamManager.GetStream(null, uncompressedSize)
use ms = MemoryStreamManager.GetStream("zlibDecodeBytes", uncompressedSize)
use zlib = new ZOutputStream(ms)
zlib.FlushMode <- zlibConst.Z_SYNC_FLUSH
zlib.Write(bytes, 0, payloadLength)
Expand All @@ -54,7 +54,7 @@ module internal CompressionCodec =
let sourceSpan = payload.ToArray().AsSpan()
let targetSpan = target.AsSpan()
let count = LZ4Codec.Encode(sourceSpan, targetSpan)
let ms = MemoryStreamManager.GetStream()
let ms = MemoryStreamManager.GetStream("lz4Encode")
ms.Write(targetSpan.Slice(0, count))
ms
finally
Expand All @@ -64,7 +64,7 @@ module internal CompressionCodec =
let target: byte[] = uncompressedSize |> ArrayPool.Shared.Rent
try
LZ4Codec.Decode(payload.ToArray(), 0, int payload.Length, target, 0, uncompressedSize) |> ignore
let ms = MemoryStreamManager.GetStream(null, uncompressedSize)
let ms = MemoryStreamManager.GetStream("lz4Decode", uncompressedSize)
ms.Write(target, 0, uncompressedSize)
ms
finally
Expand All @@ -84,7 +84,7 @@ module internal CompressionCodec =
let sourceArray = payload.ToArray()
let targetSpan = target.AsSpan()
let count = Snappy.Compress(sourceArray, targetSpan)
let ms = MemoryStreamManager.GetStream()
let ms = MemoryStreamManager.GetStream("SnappyEncode")
ms.Write(targetSpan.Slice(0, count))
ms
finally
Expand All @@ -94,7 +94,7 @@ module internal CompressionCodec =
let target: byte[] = uncompressedSize |> ArrayPool.Shared.Rent
try
Snappy.Decompress(payload.ToArray(), target) |> ignore
let ms = MemoryStreamManager.GetStream(null, uncompressedSize)
let ms = MemoryStreamManager.GetStream("SnappyDecode", uncompressedSize)
ms.Write(target, 0, uncompressedSize)
ms
finally
Expand All @@ -113,7 +113,7 @@ module internal CompressionCodec =
try
let sourceSpan = payload.ToArray().AsSpan()
let count = zstdCompressor.Wrap(sourceSpan, target)
let ms = MemoryStreamManager.GetStream()
let ms = MemoryStreamManager.GetStream("ZstdDecode")
ms.Write(target.AsSpan(0, count))
ms
finally
Expand All @@ -126,7 +126,7 @@ module internal CompressionCodec =
try
let sourceSpan = payload.ToArray().AsSpan()
zstdDecompressor.Unwrap(sourceSpan, target, false) |> ignore
let ms = MemoryStreamManager.GetStream(null, uncompressedSize)
let ms = MemoryStreamManager.GetStream("ZstdDecode", uncompressedSize)
ms.Write(target, 0, uncompressedSize)
ms
finally
Expand Down
4 changes: 2 additions & 2 deletions src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c

let verifyIfLocalBufferIsCorrupted (msg: PendingMessage<'T>) =
backgroundTask {
use stream = MemoryStreamManager.GetStream()
use stream = MemoryStreamManager.GetStream("VerifyBuffer")
use reader = new BinaryReader(stream)
let struct(send, _) = msg.SendTask
let writer = PipeWriter.Create(stream, StreamPipeWriterOptions(leaveOpen = true))
Expand Down Expand Up @@ -590,7 +590,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
let nonRetriableError = ex |> PulsarClientException.isRetriableError |> not
let timeout = Stopwatch.GetElapsedTime(createProducerStartTime) > clientConfig.OperationTimeout
if ((nonRetriableError || timeout) && producerCreatedTsc.TrySetException(ex)) then
Log.Logger.LogInformation("{0} creation failed {1}", prefix,
Log.Logger.LogInformation("{0} connection failed {1}", prefix,
if nonRetriableError then "with unretriableError" else "after timeout")
connectionHandler.Failed()
stopProducer()
Expand Down
12 changes: 9 additions & 3 deletions src/Pulsar.Client/Internal/TransactionMetaStoreHandler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type internal TransactionMetaStoreHandler(clientConfig: PulsarClientConfiguratio
let pendingRequests = Dictionary<RequestId, TaskCompletionSource<TxnRequest>>()
let blockedRequests = Queue<TransactionMetaStoreMessage>()
let timeoutQueue = Queue<TransRequestTime>()
let createTmsHandlerStartTime = Stopwatch.GetTimestamp()

let connectionHandler =
ConnectionHandler(prefix,
Expand Down Expand Up @@ -143,9 +144,14 @@ type internal TransactionMetaStoreHandler(clientConfig: PulsarClientConfiguratio

| TransactionMetaStoreMessage.ConnectionFailed ex ->

Log.Logger.LogError(ex, "{0} connection failed.", prefix)
connectionHandler.Failed()
transactionCoordinatorCreatedTsc.TrySetException(ex) |> ignore
Log.Logger.LogDebug("{0} ConnectionFailed", prefix)
let nonRetriableError = ex |> PulsarClientException.isRetriableError |> not
let timeout = Stopwatch.GetElapsedTime(createTmsHandlerStartTime) > clientConfig.OperationTimeout
if ((nonRetriableError || timeout) && transactionCoordinatorCreatedTsc.TrySetException(ex)) then
Log.Logger.LogInformation("{0} connection failed {1}", prefix,
if nonRetriableError then "with unretriableError" else "after timeout")
connectionHandler.Failed()
continueLoop <- false

| TransactionMetaStoreMessage.ConnectionClosed clientCnx ->

Expand Down
6 changes: 3 additions & 3 deletions src/Pulsar.Client/Pulsar.Client.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
<Title>Pulsar.Client</Title>
<RootNamespace>Pulsar.Client</RootNamespace>
<AssemblyName>Pulsar.Client</AssemblyName>
<Version>3.6.1</Version>
<Version>3.6.2</Version>
<Company>F# community</Company>
<Description>.NET client library for Apache Pulsar</Description>
<RepositoryUrl>https://github.com/fsprojects/pulsar-client-dotnet</RepositoryUrl>
<PackageReleaseNotes>Fix for resending pending producer messages</PackageReleaseNotes>
<PackageReleaseNotes>Fix for reconnect of TransactionMetaStoreHandler</PackageReleaseNotes>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageProjectUrl>https://github.com/fsprojects/pulsar-client-dotnet</PackageProjectUrl>
<RepositoryType>git</RepositoryType>
<PackageTags>pulsar</PackageTags>
<Authors>F# community</Authors>
<PackageVersion>3.6.1</PackageVersion>
<PackageVersion>3.6.2</PackageVersion>
<DebugType>portable</DebugType>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageReadmeFile>README.md</PackageReadmeFile>
Expand Down
2 changes: 1 addition & 1 deletion src/Pulsar.Client/Schema/AvroSchema.fs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type internal AvroSchema<'T> private (schema: Schema, avroReader: DatumReader<'T
override this.Encode value =
if parameterIsClass && (isNull <| box value) then
raise <| SchemaSerializationException "Need Non-Null content value"
use stream = MemoryStreamManager.GetStream()
use stream = MemoryStreamManager.GetStream("AvroEncode")
avroWriter.Write(value, BinaryEncoder(stream))
stream.ToArray()
override this.Decode bytes =
Expand Down
4 changes: 2 additions & 2 deletions src/Pulsar.Client/Schema/ProtobufNativeSchema.fs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type internal ProtoBufNativeSchema<'T > () =
set.Add protoFileName |> ignore
set.Process()

use stream = MemoryStreamManager.GetStream()
use stream = MemoryStreamManager.GetStream("ProtobufGetDescriptor")
Serializer.Serialize(stream, set)

ProtobufNativeSchemaData (stream.ToArray (), userClassNamespace + "." + userClassName, protoFileName)
Expand All @@ -82,7 +82,7 @@ type internal ProtoBufNativeSchema<'T > () =
override this.Encode value =
if parameterIsClass && (isNull <| box value) then
raise <| SchemaSerializationException "Need Non-Null content value"
use stream = MemoryStreamManager.GetStream()
use stream = MemoryStreamManager.GetStream("ProtobufEncode")
Serializer.Serialize(stream, value)
stream.ToArray()

Expand Down

0 comments on commit d5edb19

Please sign in to comment.