Skip to content

Commit

Permalink
[otlp] Refactor shared protobuf otlp export client code into a base c…
Browse files Browse the repository at this point in the history
…lass (#6001)
  • Loading branch information
CodeBlanch authored Nov 27, 2024
1 parent 88d2ad6 commit 7eeddf5
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 127 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#if NETFRAMEWORK
using System.Net.Http;
#endif
using System.Net.Http.Headers;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

internal abstract class ProtobufOtlpExportClient : IProtobufExportClient
{
private static readonly Version Http2RequestVersion = new(2, 0);

#if NET
private static readonly bool SynchronousSendSupportedByCurrentPlatform;

static ProtobufOtlpExportClient()
{
#if NET
// See: https://github.com/dotnet/runtime/blob/280f2a0c60ce0378b8db49adc0eecc463d00fe5d/src/libraries/System.Net.Http/src/System/Net/Http/HttpClientHandler.AnyMobile.cs#L767
SynchronousSendSupportedByCurrentPlatform = !OperatingSystem.IsAndroid()
&& !OperatingSystem.IsIOS()
&& !OperatingSystem.IsTvOS()
&& !OperatingSystem.IsBrowser();
#endif
}
#endif

protected ProtobufOtlpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNull(httpClient);
Guard.ThrowIfNull(signalPath);

Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
this.HttpClient = httpClient;
}

internal HttpClient HttpClient { get; }

internal Uri Endpoint { get; }

internal IReadOnlyDictionary<string, string> Headers { get; }

internal abstract MediaTypeHeaderValue MediaTypeHeader { get; }

internal virtual bool RequireHttp2 => false;

public abstract ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default);

/// <inheritdoc/>
public bool Shutdown(int timeoutMilliseconds)
{
this.HttpClient.CancelPendingRequests();
return true;
}

protected HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength)
{
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);

if (this.RequireHttp2)
{
request.Version = Http2RequestVersion;

#if NET6_0_OR_GREATER
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
#endif
}

foreach (var header in this.Headers)
{
request.Headers.Add(header.Key, header.Value);
}

// TODO: Support compression.

request.Content = new ByteArrayContent(buffer, 0, contentLength);
request.Content.Headers.ContentType = this.MediaTypeHeader;

return request;
}

protected HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
{
#if NET
// Note: SendAsync must be used with HTTP/2 because synchronous send is
// not supported.
return this.RequireHttp2 || !SynchronousSendSupportedByCurrentPlatform
? this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult()
: this.HttpClient.Send(request, cancellationToken);
#else
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@
#endif
using System.Net.Http.Headers;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient.Grpc;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

/// <summary>Base class for sending OTLP export request over gRPC.</summary>
internal sealed class ProtobufOtlpGrpcExportClient : IProtobufExportClient
internal sealed class ProtobufOtlpGrpcExportClient : ProtobufOtlpExportClient
{
public const string GrpcStatusDetailsHeader = "grpc-status-details-bin";
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/grpc");
private static readonly Version Http2RequestVersion = new(2, 0);

private static readonly ExportClientGrpcResponse DefaultExceptionExportClientGrpcResponse
= new(
Expand All @@ -27,49 +25,34 @@ private static readonly ExportClientGrpcResponse DefaultExceptionExportClientGrp
grpcStatusDetailsHeader: null);

public ProtobufOtlpGrpcExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
: base(options, httpClient, signalPath)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNull(httpClient);
Guard.ThrowIfNull(signalPath);
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds);

Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
this.HttpClient = httpClient;
}

internal HttpClient HttpClient { get; }
internal override MediaTypeHeaderValue MediaTypeHeader => MediaHeaderValue;

internal Uri Endpoint { get; set; }

internal IReadOnlyDictionary<string, string> Headers { get; }

internal int TimeoutMilliseconds { get; }
internal override bool RequireHttp2 => true;

/// <inheritdoc/>
public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
public override ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
{
try
{
using var httpRequest = this.CreateHttpRequest(buffer, contentLength);
using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken);

try
{
httpResponse.EnsureSuccessStatusCode();
}
catch (HttpRequestException)
{
throw;
}
httpResponse.EnsureSuccessStatusCode();

var trailingHeaders = httpResponse.TrailingHeaders();
Status status = GrpcProtocolHelpers.GetResponseStatus(httpResponse, trailingHeaders);

if (status.Detail.Equals(Status.NoReplyDetailMessage))
{
#if NET
using var responseStream = httpResponse.Content.ReadAsStream(cancellationToken);
#else
using var responseStream = httpResponse.Content.ReadAsStreamAsync().GetAwaiter().GetResult();
#endif
int firstByte = responseStream.ReadByte();

if (firstByte == -1)
Expand Down Expand Up @@ -170,45 +153,11 @@ public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength,
}
}

public HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength)
{
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);
request.Version = Http2RequestVersion;

#if NET6_0_OR_GREATER
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
#endif

foreach (var header in this.Headers)
{
request.Headers.Add(header.Key, header.Value);
}

// TODO: Support compression.

request.Content = new ByteArrayContent(buffer, 0, contentLength);
request.Content.Headers.ContentType = MediaHeaderValue;

return request;
}

public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
{
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
}

/// <inheritdoc/>
public bool Shutdown(int timeoutMilliseconds)
{
this.HttpClient.CancelPendingRequests();
return true;
}

private static bool IsTransientNetworkError(HttpRequestException ex)
{
return ex.InnerException is System.Net.Sockets.SocketException socketEx &&
(socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut ||
socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset ||
socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable);
return ex.InnerException is System.Net.Sockets.SocketException socketEx
&& (socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut
|| socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset
|| socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,24 @@
using System.Net.Http;
#endif
using System.Net.Http.Headers;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

/// <summary>Class for sending OTLP trace export request over HTTP.</summary>
internal sealed class ProtobufOtlpHttpExportClient : IProtobufExportClient
internal sealed class ProtobufOtlpHttpExportClient : ProtobufOtlpExportClient
{
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/x-protobuf");
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);
#if NET
private readonly bool synchronousSendSupportedByCurrentPlatform;
#endif

internal ProtobufOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
: base(options, httpClient, signalPath)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNull(httpClient);
Guard.ThrowIfNull(signalPath);
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds);

Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
this.HttpClient = httpClient;

#if NET
// See: https://github.com/dotnet/runtime/blob/280f2a0c60ce0378b8db49adc0eecc463d00fe5d/src/libraries/System.Net.Http/src/System/Net/Http/HttpClientHandler.AnyMobile.cs#L767
this.synchronousSendSupportedByCurrentPlatform = !OperatingSystem.IsAndroid()
&& !OperatingSystem.IsIOS()
&& !OperatingSystem.IsTvOS()
&& !OperatingSystem.IsBrowser();
#endif
}

internal HttpClient HttpClient { get; }

internal Uri Endpoint { get; set; }

internal IReadOnlyDictionary<string, string> Headers { get; }
internal override MediaTypeHeaderValue MediaTypeHeader => MediaHeaderValue;

/// <inheritdoc/>
public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
public override ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
{
try
{
Expand All @@ -71,38 +47,4 @@ public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength,
return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: null, exception: ex);
}
}

/// <inheritdoc/>
public bool Shutdown(int timeoutMilliseconds)
{
this.HttpClient.CancelPendingRequests();
return true;
}

public HttpRequestMessage CreateHttpRequest(byte[] exportRequest, int contentLength)
{
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);

foreach (var header in this.Headers)
{
request.Headers.Add(header.Key, header.Value);
}

var content = new ByteArrayContent(exportRequest, 0, contentLength);
content.Headers.ContentType = MediaHeaderValue;
request.Content = content;

return request;
}

public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
{
#if NET
return this.synchronousSendSupportedByCurrentPlatform
? this.HttpClient.Send(request, cancellationToken)
: this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
#else
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
#endif
}
}

0 comments on commit 7eeddf5

Please sign in to comment.