Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[otlp] Refactor shared protobuf otlp export client code into a base class #6001

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#if NETFRAMEWORK
using System.Net.Http;
#endif
using System.Net.Http.Headers;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

internal abstract class ProtobufOtlpExportClient : IProtobufExportClient
{
private static readonly Version Http2RequestVersion = new(2, 0);

#if NET
private static readonly bool SynchronousSendSupportedByCurrentPlatform;

static ProtobufOtlpExportClient()
{
#if NET
// See: https://github.com/dotnet/runtime/blob/280f2a0c60ce0378b8db49adc0eecc463d00fe5d/src/libraries/System.Net.Http/src/System/Net/Http/HttpClientHandler.AnyMobile.cs#L767
SynchronousSendSupportedByCurrentPlatform = !OperatingSystem.IsAndroid()
&& !OperatingSystem.IsIOS()
&& !OperatingSystem.IsTvOS()
&& !OperatingSystem.IsBrowser();
#endif
}
#endif

protected ProtobufOtlpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNull(httpClient);
Guard.ThrowIfNull(signalPath);

Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
this.HttpClient = httpClient;
}

internal HttpClient HttpClient { get; }

internal Uri Endpoint { get; }

internal IReadOnlyDictionary<string, string> Headers { get; }

internal abstract MediaTypeHeaderValue MediaTypeHeader { get; }

internal virtual bool RequireHttp2 => false;

public abstract ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default);

/// <inheritdoc/>
public bool Shutdown(int timeoutMilliseconds)
{
this.HttpClient.CancelPendingRequests();
return true;
}

protected HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength)
{
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);

if (this.RequireHttp2)
{
request.Version = Http2RequestVersion;

#if NET6_0_OR_GREATER
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
#endif
}

foreach (var header in this.Headers)
{
request.Headers.Add(header.Key, header.Value);
}

// TODO: Support compression.

request.Content = new ByteArrayContent(buffer, 0, contentLength);
request.Content.Headers.ContentType = this.MediaTypeHeader;

return request;
}

protected HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
{
#if NET
// Note: SendAsync must be used with HTTP/2 because synchronous send is
// not supported.
return this.RequireHttp2 || !SynchronousSendSupportedByCurrentPlatform
? this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult()
: this.HttpClient.Send(request, cancellationToken);
#else
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@
#endif
using System.Net.Http.Headers;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient.Grpc;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

/// <summary>Base class for sending OTLP export request over gRPC.</summary>
internal sealed class ProtobufOtlpGrpcExportClient : IProtobufExportClient
internal sealed class ProtobufOtlpGrpcExportClient : ProtobufOtlpExportClient
{
public const string GrpcStatusDetailsHeader = "grpc-status-details-bin";
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/grpc");
private static readonly Version Http2RequestVersion = new(2, 0);

private static readonly ExportClientGrpcResponse DefaultExceptionExportClientGrpcResponse
= new(
Expand All @@ -27,49 +25,34 @@ private static readonly ExportClientGrpcResponse DefaultExceptionExportClientGrp
grpcStatusDetailsHeader: null);

public ProtobufOtlpGrpcExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
: base(options, httpClient, signalPath)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNull(httpClient);
Guard.ThrowIfNull(signalPath);
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds);

Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
this.HttpClient = httpClient;
}

internal HttpClient HttpClient { get; }
internal override MediaTypeHeaderValue MediaTypeHeader => MediaHeaderValue;

internal Uri Endpoint { get; set; }

internal IReadOnlyDictionary<string, string> Headers { get; }

internal int TimeoutMilliseconds { get; }
internal override bool RequireHttp2 => true;

/// <inheritdoc/>
public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
public override ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
{
try
{
using var httpRequest = this.CreateHttpRequest(buffer, contentLength);
using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken);

try
{
httpResponse.EnsureSuccessStatusCode();
}
catch (HttpRequestException)
{
throw;
rajkumar-rangaraj marked this conversation as resolved.
Show resolved Hide resolved
}
httpResponse.EnsureSuccessStatusCode();

var trailingHeaders = httpResponse.TrailingHeaders();
Status status = GrpcProtocolHelpers.GetResponseStatus(httpResponse, trailingHeaders);

if (status.Detail.Equals(Status.NoReplyDetailMessage))
{
#if NET
using var responseStream = httpResponse.Content.ReadAsStream(cancellationToken);
#else
using var responseStream = httpResponse.Content.ReadAsStreamAsync().GetAwaiter().GetResult();
#endif
int firstByte = responseStream.ReadByte();

if (firstByte == -1)
Expand Down Expand Up @@ -170,45 +153,11 @@ public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength,
}
}

public HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength)
{
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);
request.Version = Http2RequestVersion;

#if NET6_0_OR_GREATER
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
#endif

foreach (var header in this.Headers)
{
request.Headers.Add(header.Key, header.Value);
}

// TODO: Support compression.

request.Content = new ByteArrayContent(buffer, 0, contentLength);
request.Content.Headers.ContentType = MediaHeaderValue;

return request;
}

public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
{
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
}

/// <inheritdoc/>
public bool Shutdown(int timeoutMilliseconds)
{
this.HttpClient.CancelPendingRequests();
return true;
}

private static bool IsTransientNetworkError(HttpRequestException ex)
{
return ex.InnerException is System.Net.Sockets.SocketException socketEx &&
(socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut ||
socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset ||
socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable);
return ex.InnerException is System.Net.Sockets.SocketException socketEx
&& (socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut
|| socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset
|| socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,24 @@
using System.Net.Http;
#endif
using System.Net.Http.Headers;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

/// <summary>Class for sending OTLP trace export request over HTTP.</summary>
internal sealed class ProtobufOtlpHttpExportClient : IProtobufExportClient
internal sealed class ProtobufOtlpHttpExportClient : ProtobufOtlpExportClient
{
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/x-protobuf");
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);
#if NET
private readonly bool synchronousSendSupportedByCurrentPlatform;
#endif

internal ProtobufOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
: base(options, httpClient, signalPath)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNull(httpClient);
Guard.ThrowIfNull(signalPath);
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds);

Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
this.HttpClient = httpClient;

#if NET
// See: https://github.com/dotnet/runtime/blob/280f2a0c60ce0378b8db49adc0eecc463d00fe5d/src/libraries/System.Net.Http/src/System/Net/Http/HttpClientHandler.AnyMobile.cs#L767
this.synchronousSendSupportedByCurrentPlatform = !OperatingSystem.IsAndroid()
&& !OperatingSystem.IsIOS()
&& !OperatingSystem.IsTvOS()
&& !OperatingSystem.IsBrowser();
#endif
}

internal HttpClient HttpClient { get; }

internal Uri Endpoint { get; set; }

internal IReadOnlyDictionary<string, string> Headers { get; }
internal override MediaTypeHeaderValue MediaTypeHeader => MediaHeaderValue;

/// <inheritdoc/>
public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
public override ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
{
try
{
Expand All @@ -71,38 +47,4 @@ public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength,
return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: null, exception: ex);
}
}

/// <inheritdoc/>
public bool Shutdown(int timeoutMilliseconds)
{
this.HttpClient.CancelPendingRequests();
return true;
}

public HttpRequestMessage CreateHttpRequest(byte[] exportRequest, int contentLength)
{
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);

foreach (var header in this.Headers)
{
request.Headers.Add(header.Key, header.Value);
}

var content = new ByteArrayContent(exportRequest, 0, contentLength);
content.Headers.ContentType = MediaHeaderValue;
request.Content = content;

return request;
}

public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
{
#if NET
return this.synchronousSendSupportedByCurrentPlatform
? this.HttpClient.Send(request, cancellationToken)
: this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
#else
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
#endif
}
}
Loading