Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: client side metrics handlers #924

Open
wants to merge 11 commits into
base: client_side_metrics_data_model
Choose a base branch
from
10 changes: 10 additions & 0 deletions google/cloud/bigtable/data/_metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.cloud.bigtable.data._metrics.handlers.opentelemetry import (
OpenTelemetryMetricsHandler,
)
from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import (
GoogleCloudMetricsHandler,
)
from google.cloud.bigtable.data._metrics.handlers._stdout import _StdoutMetricsHandler
from google.cloud.bigtable.data._metrics.metrics_controller import (
BigtableClientSideMetricsController,
)
Expand All @@ -20,6 +27,9 @@

# Public surface of the _metrics package. NOTE: _StdoutMetricsHandler is
# exported despite its leading underscore — it is intended for debugging only.
__all__ = (
    "BigtableClientSideMetricsController",
    "OpenTelemetryMetricsHandler",
    "GoogleCloudMetricsHandler",
    "_StdoutMetricsHandler",
    "OperationType",
    "ActiveOperationMetric",
)
48 changes: 48 additions & 0 deletions google/cloud/bigtable/data/_metrics/handlers/_stdout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler
from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric


class _StdoutMetricsHandler(MetricsHandler):
    """
    Prints a table of metric data after each operation, for debugging purposes.
    """

    def __init__(self, **kwargs):
        # maps operation type -> list of CompletedOperationMetric of that type
        self._completed_ops = {}

    def on_operation_complete(self, op: CompletedOperationMetric) -> None:
        """
        After each operation, update the state and print the metrics table.

        Args:
          - op: the metric data for the operation that just completed
        """
        current_list = self._completed_ops.setdefault(op.op_type, [])
        current_list.append(op)
        self.print()

    def print(self):
        """
        Print the current state of the metrics table.
        """
        print("Bigtable Metrics:")
        for ops_type, ops_list in self._completed_ops.items():
            count = len(ops_list)
            # generator expressions avoid building throwaway lists inside sum()
            total_latency = sum(op.duration for op in ops_list)
            total_attempts = sum(len(op.completed_attempts) for op in ops_list)
            avg_latency = total_latency / count
            avg_attempts = total_attempts / count
            print(
                f"{ops_type}: count: {count}, avg latency: {avg_latency:.2f}, avg attempts: {avg_attempts:.1f}"
            )
        print()
268 changes: 268 additions & 0 deletions google/cloud/bigtable/data/_metrics/handlers/gcp_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import time

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics import view
from opentelemetry.sdk.metrics.export import (
HistogramDataPoint,
MetricExporter,
MetricExportResult,
MetricsData,
NumberDataPoint,
PeriodicExportingMetricReader,
)
from google.protobuf.timestamp_pb2 import Timestamp
from google.api.distribution_pb2 import Distribution
from google.api.metric_pb2 import Metric as GMetric
from google.api.monitored_resource_pb2 import MonitoredResource
from google.api.metric_pb2 import MetricDescriptor
from google.api_core import gapic_v1
from google.cloud.monitoring_v3 import (
CreateTimeSeriesRequest,
MetricServiceClient,
Point,
TimeInterval,
TimeSeries,
TypedValue,
)

from google.cloud.bigtable.data._metrics.handlers.opentelemetry import (
OpenTelemetryMetricsHandler,
)
from google.cloud.bigtable.data._metrics.handlers.opentelemetry import (
_OpenTelemetryInstruments,
)


# create OpenTelemetry views for Bigtable metrics
# avoid reformatting into individual lines
# fmt: off
# Explicit histogram bucket boundaries (milliseconds) for latency instruments.
# Upper range extended beyond 100000 to match the Java client's boundaries
# (up to 3.2M ms), so long-running operations are not lumped into one bucket.
MILLIS_AGGREGATION = view.ExplicitBucketHistogramAggregation(
    [
        0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16,
        20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400,
        500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000,
        200000, 400000, 800000, 1600000, 3200000,
    ]
)
# fmt: on
# counter instruments are aggregated as sums
COUNT_AGGREGATION = view.SumAggregation()
# names of all instruments recorded by OpenTelemetryMetricsHandler
INSTRUMENT_NAMES = (
    "operation_latencies",
    "first_response_latencies",
    "attempt_latencies",
    "retry_count",
    "server_latencies",
    "connectivity_error_count",
    "application_latencies",
    "throttling_latencies",
)
# one view per instrument: histogram aggregation for latency instruments,
# sum aggregation for counters
VIEW_LIST = [
    view.View(
        instrument_name=n,
        name=n,
        aggregation=MILLIS_AGGREGATION
        if n.endswith("latencies")
        else COUNT_AGGREGATION,
    )
    for n in INSTRUMENT_NAMES
]


class GoogleCloudMetricsHandler(OpenTelemetryMetricsHandler):
    """
    Maintains an internal set of OpenTelemetry metrics for the Bigtable client library,
    and periodically exports them to Google Cloud Monitoring.

    The OpenTelemetry metrics that are tracked are as follows:
      - operation_latencies: latency of each client method call, over all of its attempts.
      - first_response_latencies: latency of receiving the first row in a ReadRows operation.
      - attempt_latencies: latency of each client attempt RPC.
      - retry_count: Number of additional RPCs sent after the initial attempt.
      - server_latencies: latency recorded on the server side for each attempt.
      - connectivity_error_count: number of attempts that failed to reach Google's network.
      - application_latencies: the time spent waiting for the application to process the next response.
      - throttling_latencies: latency introduced by waiting when there are too many outstanding requests in a bulk operation.

    Args:
      - project_id: The Google Cloud project ID for the associated Bigtable Table
      - export_interval: The interval (in seconds) at which to export metrics to Cloud Monitoring.
    """

    def __init__(self, *args, project_id: str, export_interval=60, **kwargs):
        # exporter converts OTel metrics into Cloud Monitoring time series
        metrics_exporter = _BigtableMetricsExporter(project_id=project_id)
        # reader drives the exporter on a fixed schedule
        reader = PeriodicExportingMetricReader(
            metrics_exporter, export_interval_millis=export_interval * 1000
        )
        # a private MeterProvider keeps these instruments and views isolated
        # from any application-level OpenTelemetry configuration
        provider = MeterProvider(metric_readers=[reader], views=VIEW_LIST)
        instruments = _OpenTelemetryInstruments(meter_provider=provider)
        super().__init__(*args, instruments=instruments, project_id=project_id, **kwargs)


class _BigtableMetricsExporter(MetricExporter):
    """
    OpenTelemetry Exporter implementation for sending metrics to Google Cloud Monitoring.

    We must use a custom exporter because the public one doesn't support writing to internal
    metrics like `bigtable.googleapis.com/internal/client/`

    Each GoogleCloudMetricsHandler will maintain its own exporter instance associated with the
    project_id it is configured with.

    Args:
      - project_id: GCP project id to associate metrics with
    """

    def __init__(self, project_id: str):
        super().__init__()
        # all exported metric types are published under this internal prefix
        self.prefix = "bigtable.googleapis.com/internal/client"
        self.client = MetricServiceClient()
        self.project_name = self.client.common_project_path(project_id)

def export(
    self, metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs
) -> MetricExportResult:
    """
    Write a set of metrics to Cloud Monitoring.
    This method is called by the OpenTelemetry SDK.

    Args:
      - metrics_data: batch of OpenTelemetry metrics to convert and publish
      - timeout_millis: total time budget for all Cloud Monitoring writes.
          (the millisecond unit is fixed by the MetricExporter base class)
    Returns:
      - MetricExportResult.SUCCESS if all time series were written,
        MetricExportResult.FAILURE otherwise
    """
    deadline = time.time() + (timeout_millis / 1000)
    metric_kind = MetricDescriptor.MetricKind.CUMULATIVE
    all_series: list[TimeSeries] = []
    # process each metric from OTel format into Cloud Monitoring format
    for resource_metric in metrics_data.resource_metrics:
        for scope_metric in resource_metric.scope_metrics:
            for metric in scope_metric.metrics:
                # the comprehension filter guarantees attributes are present,
                # so no second attributes check is needed inside the loop
                for data_point in [
                    pt for pt in metric.data.data_points if pt.attributes
                ]:
                    project_id = data_point.attributes.get("resource_project")
                    if not isinstance(project_id, str):
                        # we expect string for project_id field; skip publishing
                        continue
                    monitored_resource = MonitoredResource(
                        type="bigtable_client_raw",
                        labels={
                            "project_id": project_id,
                            "instance": data_point.attributes[
                                "resource_instance"
                            ],
                            "cluster": data_point.attributes[
                                "resource_cluster"
                            ],
                            "table": data_point.attributes["resource_table"],
                            "zone": data_point.attributes["resource_zone"],
                        },
                    )
                    point = self._to_point(data_point)
                    series = TimeSeries(
                        resource=monitored_resource,
                        metric_kind=metric_kind,
                        points=[point],
                        metric=GMetric(
                            type=f"{self.prefix}/{metric.name}",
                            # resource_* attributes are already captured in the
                            # MonitoredResource labels above
                            labels={
                                k: v
                                for k, v in data_point.attributes.items()
                                if not k.startswith("resource_")
                            },
                        ),
                        unit=metric.unit,
                    )
                    all_series.append(series)
    # send all metrics to Cloud Monitoring
    try:
        self._batch_write(all_series, deadline)
        return MetricExportResult.SUCCESS
    except Exception:
        # create_service_time_series is not retried by default; failed points
        # are dropped and reported as a failed export to the SDK
        return MetricExportResult.FAILURE

def _batch_write(
    self, series: list[TimeSeries], deadline=None, max_batch_size=200
) -> None:
    """
    Write a list of TimeSeries to Cloud Monitoring in batches.

    Adapted from CloudMonitoringMetricsExporter
    https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82

    Args:
      - series: list of TimeSeries to write. Will be split into batches if necessary
      - deadline: designates the time.time() at which to stop writing. If None, uses API default
      - max_batch_size: maximum number of time series to write at once.
          Cloud Monitoring allows up to 200 per request
    """
    for batch_start in range(0, len(series), max_batch_size):
        # budget the remaining time for each batch; fall back to the API
        # default timeout when no deadline was given
        timeout = deadline - time.time() if deadline else gapic_v1.method.DEFAULT
        self.client.create_service_time_series(
            CreateTimeSeriesRequest(
                name=self.project_name,
                time_series=series[batch_start : batch_start + max_batch_size],
            ),
            timeout=timeout,
        )

@staticmethod
def _to_point(data_point: NumberDataPoint | HistogramDataPoint) -> Point:
    """
    Convert an OpenTelemetry data point into a Cloud Monitoring Point.

    Adapted from CloudMonitoringMetricsExporter
    https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82
    """
    if isinstance(data_point, HistogramDataPoint):
        # histogram points map to Distribution values; guard against an
        # empty histogram when computing the mean
        avg = data_point.sum / data_point.count if data_point.count else 0.0
        bucket_options = Distribution.BucketOptions(
            explicit_buckets=Distribution.BucketOptions.Explicit(
                bounds=data_point.explicit_bounds,
            )
        )
        point_value = TypedValue(
            distribution_value=Distribution(
                count=data_point.count,
                mean=avg,
                bucket_counts=data_point.bucket_counts,
                bucket_options=bucket_options,
            )
        )
    elif isinstance(data_point.value, int):
        point_value = TypedValue(int64_value=data_point.value)
    else:
        point_value = TypedValue(double_value=data_point.value)
    # convert the OTel nanosecond timestamps into a Cloud Monitoring interval
    start_time = Timestamp()
    start_time.FromNanoseconds(data_point.start_time_unix_nano)
    end_time = Timestamp()
    end_time.FromNanoseconds(data_point.time_unix_nano)
    return Point(
        interval=TimeInterval(start_time=start_time, end_time=end_time),
        value=point_value,
    )

def shutdown(self, timeout_millis: float = 30_000, **kwargs):
    """
    Called by the OpenTelemetry SDK when the exporter is torn down.
    This exporter holds no resources that require cleanup, so this is a no-op.

    Adapted from CloudMonitoringMetricsExporter
    https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82
    """
    pass

def force_flush(self, timeout_millis: float = 10_000):
    """
    Called by the OpenTelemetry SDK to flush any buffered metrics.
    This exporter writes synchronously in export(), so there is nothing to
    flush; always reports success.

    Adapted from CloudMonitoringMetricsExporter
    https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82
    """
    return True
Loading
Loading