diff --git a/examples/CsharpExamples/CsharpExamples.csproj b/examples/CsharpExamples/CsharpExamples.csproj index 9a69413d..7eb03f4d 100755 --- a/examples/CsharpExamples/CsharpExamples.csproj +++ b/examples/CsharpExamples/CsharpExamples.csproj @@ -8,7 +8,7 @@ - + \ No newline at end of file diff --git a/examples/FsharpExamples/FsharpExamples.fsproj b/examples/FsharpExamples/FsharpExamples.fsproj index f81ebcf6..70d2b71c 100755 --- a/examples/FsharpExamples/FsharpExamples.fsproj +++ b/examples/FsharpExamples/FsharpExamples.fsproj @@ -19,7 +19,7 @@ - + diff --git a/src/Pulsar.Client/Common/MessageId.fs b/src/Pulsar.Client/Common/MessageId.fs index d20305c2..60526d4f 100644 --- a/src/Pulsar.Client/Common/MessageId.fs +++ b/src/Pulsar.Client/Common/MessageId.fs @@ -66,7 +66,7 @@ type MessageId = data.FirstChunkMessageId <- chunkMsgIds.[0].GetMessageIdData() | _ -> () - use stream = MemoryStreamManager.GetStream() + use stream = MemoryStreamManager.GetStream("ToByteArray") Serializer.Serialize(stream, data) stream.ToArray() static member FromByteArray (data: byte[]) = diff --git a/src/Pulsar.Client/Internal/BatchMessageContainer.fs b/src/Pulsar.Client/Internal/BatchMessageContainer.fs index 2a00179a..74487e8f 100644 --- a/src/Pulsar.Client/Internal/BatchMessageContainer.fs +++ b/src/Pulsar.Client/Internal/BatchMessageContainer.fs @@ -121,7 +121,7 @@ type internal DefaultBatchMessageContainer<'T>(prefix: string, config: ProducerC this.IsBatchFull() override this.CreateOpSendMsg () = let lowestSequenceId = batchItems[0].SequenceId - let stream = MemoryStreamManager.GetStream() + let stream = MemoryStreamManager.GetStream("DefaultBatcher") let highestSequenceId = batchItems[batchItems.Count - 1].SequenceId { OpSendMsg = makeBatch stream batchItems @@ -184,7 +184,7 @@ type internal KeyBasedBatchMessageContainer<'T>(prefix: string, config: Producer override this.CreateOpSendMsgs () = keyBatchItems |> Seq.map (fun (KeyValue(_, batchItems)) -> - let stream = MemoryStreamManager.GetStream() + let stream = MemoryStreamManager.GetStream("KeyBasedBatcher") let lowestSequenceId = batchItems[0].SequenceId let highestSequenceId = batchItems[batchItems.Count - 1].SequenceId { diff --git a/src/Pulsar.Client/Internal/Compression.fs b/src/Pulsar.Client/Internal/Compression.fs index 7ead82bf..489f9652 100644 --- a/src/Pulsar.Client/Internal/Compression.fs +++ b/src/Pulsar.Client/Internal/Compression.fs @@ -19,7 +19,7 @@ module internal CompressionCodec = type ZLibCompression() = let zlibEncode (payload: MemoryStream) = - let ms = MemoryStreamManager.GetStream() + let ms = MemoryStreamManager.GetStream("zlibEncode") let zlib = new ZOutputStream(ms, zlibConst.Z_DEFAULT_COMPRESSION) zlib.FlushMode <- zlibConst.Z_SYNC_FLUSH zlib.Write(payload.ToArray(), 0, int payload.Length) @@ -27,7 +27,7 @@ module internal CompressionCodec = ms let zlibDecode (uncompressedSize: int) (payload: MemoryStream) = - let ms = MemoryStreamManager.GetStream(null, uncompressedSize) + let ms = MemoryStreamManager.GetStream("zlibDecode", uncompressedSize) let zlib = new ZOutputStream(ms) zlib.FlushMode <- zlibConst.Z_SYNC_FLUSH zlib.Write(payload.ToArray(), 0, int payload.Length) @@ -35,7 +35,7 @@ module internal CompressionCodec = ms let zlibDecodeBytes (uncompressedSize: int) (bytes : byte[]) payloadLength = - use ms = MemoryStreamManager.GetStream(null, uncompressedSize) + use ms = MemoryStreamManager.GetStream("zlibDecodeBytes", uncompressedSize) use zlib = new ZOutputStream(ms) zlib.FlushMode <- zlibConst.Z_SYNC_FLUSH zlib.Write(bytes, 0, payloadLength) @@ -54,7 +54,7 @@ module internal CompressionCodec = let sourceSpan = payload.ToArray().AsSpan() let targetSpan = target.AsSpan() let count = LZ4Codec.Encode(sourceSpan, targetSpan) - let ms = MemoryStreamManager.GetStream() + let ms = MemoryStreamManager.GetStream("lz4Encode") ms.Write(targetSpan.Slice(0, count)) ms finally @@ -64,7 +64,7 @@ module internal CompressionCodec = let target: byte[] = uncompressedSize |> ArrayPool.Shared.Rent try LZ4Codec.Decode(payload.ToArray(), 0, int payload.Length, target, 0, uncompressedSize) |> ignore - let ms = MemoryStreamManager.GetStream(null, uncompressedSize) + let ms = MemoryStreamManager.GetStream("lz4Decode", uncompressedSize) ms.Write(target, 0, uncompressedSize) ms finally @@ -84,7 +84,7 @@ module internal CompressionCodec = let sourceArray = payload.ToArray() let targetSpan = target.AsSpan() let count = Snappy.Compress(sourceArray, targetSpan) - let ms = MemoryStreamManager.GetStream() + let ms = MemoryStreamManager.GetStream("SnappyEncode") ms.Write(targetSpan.Slice(0, count)) ms finally @@ -94,7 +94,7 @@ module internal CompressionCodec = let target: byte[] = uncompressedSize |> ArrayPool.Shared.Rent try Snappy.Decompress(payload.ToArray(), target) |> ignore - let ms = MemoryStreamManager.GetStream(null, uncompressedSize) + let ms = MemoryStreamManager.GetStream("SnappyDecode", uncompressedSize) ms.Write(target, 0, uncompressedSize) ms finally @@ -113,7 +113,7 @@ module internal CompressionCodec = try let sourceSpan = payload.ToArray().AsSpan() let count = zstdCompressor.Wrap(sourceSpan, target) - let ms = MemoryStreamManager.GetStream() + let ms = MemoryStreamManager.GetStream("ZstdDecode") ms.Write(target.AsSpan(0, count)) ms finally @@ -126,7 +126,7 @@ module internal CompressionCodec = try let sourceSpan = payload.ToArray().AsSpan() zstdDecompressor.Unwrap(sourceSpan, target, false) |> ignore - let ms = MemoryStreamManager.GetStream(null, uncompressedSize) + let ms = MemoryStreamManager.GetStream("ZstdDecode", uncompressedSize) ms.Write(target, 0, uncompressedSize) ms finally diff --git a/src/Pulsar.Client/Internal/ProducerImpl.fs b/src/Pulsar.Client/Internal/ProducerImpl.fs index 21571d86..cf8ad250 100644 --- a/src/Pulsar.Client/Internal/ProducerImpl.fs +++ b/src/Pulsar.Client/Internal/ProducerImpl.fs @@ -222,7 +222,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c let verifyIfLocalBufferIsCorrupted (msg: PendingMessage<'T>) = backgroundTask { - use stream = MemoryStreamManager.GetStream() + use stream = MemoryStreamManager.GetStream("VerifyBuffer") use reader = new BinaryReader(stream) let struct(send, _) = msg.SendTask let writer = PipeWriter.Create(stream, StreamPipeWriterOptions(leaveOpen = true)) @@ -590,7 +590,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c let nonRetriableError = ex |> PulsarClientException.isRetriableError |> not let timeout = Stopwatch.GetElapsedTime(createProducerStartTime) > clientConfig.OperationTimeout if ((nonRetriableError || timeout) && producerCreatedTsc.TrySetException(ex)) then - Log.Logger.LogInformation("{0} creation failed {1}", prefix, + Log.Logger.LogInformation("{0} connection failed {1}", prefix, if nonRetriableError then "with unretriableError" else "after timeout") connectionHandler.Failed() stopProducer() diff --git a/src/Pulsar.Client/Internal/TransactionMetaStoreHandler.fs b/src/Pulsar.Client/Internal/TransactionMetaStoreHandler.fs index 5f7a9d93..b5a88c5a 100644 --- a/src/Pulsar.Client/Internal/TransactionMetaStoreHandler.fs +++ b/src/Pulsar.Client/Internal/TransactionMetaStoreHandler.fs @@ -55,6 +55,7 @@ type internal TransactionMetaStoreHandler(clientConfig: PulsarClientConfiguratio let pendingRequests = Dictionary>() let blockedRequests = Queue() let timeoutQueue = Queue() + let createTmsHandlerStartTime = Stopwatch.GetTimestamp() let connectionHandler = ConnectionHandler(prefix, @@ -143,9 +144,14 @@ type internal TransactionMetaStoreHandler(clientConfig: PulsarClientConfiguratio | TransactionMetaStoreMessage.ConnectionFailed ex -> - Log.Logger.LogError(ex, "{0} connection failed.", prefix) - connectionHandler.Failed() - transactionCoordinatorCreatedTsc.TrySetException(ex) |> ignore + Log.Logger.LogDebug("{0} ConnectionFailed", prefix) + let nonRetriableError = ex |> PulsarClientException.isRetriableError |> not + let timeout = Stopwatch.GetElapsedTime(createTmsHandlerStartTime) > clientConfig.OperationTimeout + if ((nonRetriableError || timeout) && transactionCoordinatorCreatedTsc.TrySetException(ex)) then + Log.Logger.LogInformation("{0} connection failed {1}", prefix, + if nonRetriableError then "with unretriableError" else "after timeout") + connectionHandler.Failed() + continueLoop <- false | TransactionMetaStoreMessage.ConnectionClosed clientCnx -> diff --git a/src/Pulsar.Client/Pulsar.Client.fsproj b/src/Pulsar.Client/Pulsar.Client.fsproj index 806395fd..d14a17a5 100644 --- a/src/Pulsar.Client/Pulsar.Client.fsproj +++ b/src/Pulsar.Client/Pulsar.Client.fsproj @@ -7,17 +7,17 @@ Pulsar.Client Pulsar.Client Pulsar.Client - 3.6.1 + 3.6.2 F# community .NET client library for Apache Pulsar https://github.com/fsprojects/pulsar-client-dotnet - Fix for resending pending producer messages + Fix for reconnect of TransactionMetaStoreHandler MIT https://github.com/fsprojects/pulsar-client-dotnet git pulsar F# community - 3.6.1 + 3.6.2 portable true README.md diff --git a/src/Pulsar.Client/Schema/AvroSchema.fs b/src/Pulsar.Client/Schema/AvroSchema.fs index 0f23dd2f..682cd562 100644 --- a/src/Pulsar.Client/Schema/AvroSchema.fs +++ b/src/Pulsar.Client/Schema/AvroSchema.fs @@ -43,7 +43,7 @@ type internal AvroSchema<'T> private (schema: Schema, avroReader: DatumReader<'T override this.Encode value = if parameterIsClass && (isNull <| box value) then raise <| SchemaSerializationException "Need Non-Null content value" - use stream = MemoryStreamManager.GetStream() + use stream = MemoryStreamManager.GetStream("AvroEncode") avroWriter.Write(value, BinaryEncoder(stream)) stream.ToArray() override this.Decode bytes = diff --git a/src/Pulsar.Client/Schema/ProtobufNativeSchema.fs b/src/Pulsar.Client/Schema/ProtobufNativeSchema.fs index 1c4645c1..cc20dc3d 100644 --- a/src/Pulsar.Client/Schema/ProtobufNativeSchema.fs +++ b/src/Pulsar.Client/Schema/ProtobufNativeSchema.fs @@ -64,7 +64,7 @@ type internal ProtoBufNativeSchema<'T > () = set.Add protoFileName |> ignore set.Process() - use stream = MemoryStreamManager.GetStream() + use stream = MemoryStreamManager.GetStream("ProtobufGetDescriptor") Serializer.Serialize(stream, set) ProtobufNativeSchemaData (stream.ToArray (), userClassNamespace + "." + userClassName, protoFileName) @@ -82,7 +82,7 @@ type internal ProtoBufNativeSchema<'T > () = override this.Encode value = if parameterIsClass && (isNull <| box value) then raise <| SchemaSerializationException "Need Non-Null content value" - use stream = MemoryStreamManager.GetStream() + use stream = MemoryStreamManager.GetStream("ProtobufEncode") Serializer.Serialize(stream, value) stream.ToArray()