-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: client side metrics handlers #924
base: client_side_metrics_data_model
Are you sure you want to change the base?
Changes from 10 commits
2c57f22
22ed8f8
a381d9b
f62dfe9
f0499ca
5f07f01
4a3f7cb
66212e1
a15a1bc
b38b1b5
987e3df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# Copyright 2023 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler | ||
from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric | ||
|
||
|
||
class _StdoutMetricsHandler(MetricsHandler): | ||
""" | ||
Prints a table of metric data after each operation, for debugging purposes. | ||
""" | ||
|
||
def __init__(self, **kwargs): | ||
self._completed_ops = {} | ||
|
||
def on_operation_complete(self, op: CompletedOperationMetric) -> None: | ||
""" | ||
After each operation, update the state and print the metrics table. | ||
""" | ||
current_list = self._completed_ops.setdefault(op.op_type, []) | ||
current_list.append(op) | ||
self.print() | ||
|
||
def print(self): | ||
""" | ||
Print the current state of the metrics table. | ||
""" | ||
print("Bigtable Metrics:") | ||
for ops_type, ops_list in self._completed_ops.items(): | ||
count = len(ops_list) | ||
total_latency = sum([op.duration for op in ops_list]) | ||
total_attempts = sum([len(op.completed_attempts) for op in ops_list]) | ||
avg_latency = total_latency / count | ||
avg_attempts = total_attempts / count | ||
print( | ||
f"{ops_type}: count: {count}, avg latency: {avg_latency:.2f}, avg attempts: {avg_attempts:.1f}" | ||
) | ||
print() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,268 @@ | ||
# Copyright 2023 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
from __future__ import annotations | ||
|
||
import time | ||
|
||
from opentelemetry.sdk.metrics import MeterProvider | ||
from opentelemetry.sdk.metrics import view | ||
from opentelemetry.sdk.metrics.export import ( | ||
HistogramDataPoint, | ||
MetricExporter, | ||
MetricExportResult, | ||
MetricsData, | ||
NumberDataPoint, | ||
PeriodicExportingMetricReader, | ||
) | ||
from google.protobuf.timestamp_pb2 import Timestamp | ||
from google.api.distribution_pb2 import Distribution | ||
from google.api.metric_pb2 import Metric as GMetric | ||
from google.api.monitored_resource_pb2 import MonitoredResource | ||
from google.api.metric_pb2 import MetricDescriptor | ||
from google.api_core import gapic_v1 | ||
from google.cloud.monitoring_v3 import ( | ||
CreateTimeSeriesRequest, | ||
MetricServiceClient, | ||
Point, | ||
TimeInterval, | ||
TimeSeries, | ||
TypedValue, | ||
) | ||
|
||
from google.cloud.bigtable.data._metrics.handlers.opentelemetry import ( | ||
OpenTelemetryMetricsHandler, | ||
) | ||
from google.cloud.bigtable.data._metrics.handlers.opentelemetry import ( | ||
_OpenTelemetryInstruments, | ||
) | ||
|
||
|
||
# create OpenTelemetry views for Bigtable metrics | ||
# avoid reformatting into individual lines | ||
# fmt: off | ||
MILLIS_AGGREGATION = view.ExplicitBucketHistogramAggregation( | ||
[ | ||
0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, | ||
20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, | ||
500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000, | ||
] | ||
) | ||
# fmt: on | ||
COUNT_AGGREGATION = view.SumAggregation() | ||
INSTRUMENT_NAMES = ( | ||
"operation_latencies", | ||
"first_response_latencies", | ||
"attempt_latencies", | ||
"retry_count", | ||
"server_latencies", | ||
"connectivity_error_count", | ||
"application_latencies", | ||
"throttling_latencies", | ||
) | ||
VIEW_LIST = [ | ||
view.View( | ||
instrument_name=n, | ||
name=n, | ||
aggregation=MILLIS_AGGREGATION | ||
if n.endswith("latencies") | ||
else COUNT_AGGREGATION, | ||
) | ||
for n in INSTRUMENT_NAMES | ||
] | ||
|
||
|
||
class GoogleCloudMetricsHandler(OpenTelemetryMetricsHandler): | ||
""" | ||
Maintains an internal set of OpenTelemetry metrics for the Bigtable client library, | ||
and periodically exports them to Google Cloud Monitoring. | ||
|
||
The OpenTelemetry metrics that are tracked are as follows: | ||
- operation_latencies: latency of each client method call, over all of it's attempts. | ||
- first_response_latencies: latency of receiving the first row in a ReadRows operation. | ||
- attempt_latencies: latency of each client attempt RPC. | ||
- retry_count: Number of additional RPCs sent after the initial attempt. | ||
- server_latencies: latency recorded on the server side for each attempt. | ||
- connectivity_error_count: number of attempts that failed to reach Google's network. | ||
- application_latencies: the time spent waiting for the application to process the next response. | ||
- throttling_latencies: latency introduced by waiting when there are too many outstanding requests in a bulk operation. | ||
|
||
Args: | ||
- project_id: The Google Cloud project ID for the associated Bigtable Table | ||
- export_interval: The interval (in seconds) at which to export metrics to Cloud Monitoring. | ||
""" | ||
|
||
def __init__(self, *args, project_id: str, export_interval=60, **kwargs): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the minimum interval we can publish metrics to cloud monitoring is 60 seconds. And I don't think we want to update with larger intervals. So let's remove this option? |
||
# internal exporter to write metrics to Cloud Monitoring | ||
exporter = _BigtableMetricsExporter(project_id=project_id) | ||
# periodically executes exporter | ||
gcp_reader = PeriodicExportingMetricReader( | ||
exporter, export_interval_millis=export_interval * 1000 | ||
) | ||
# use private meter provider to store instruments and views | ||
meter_provider = MeterProvider(metric_readers=[gcp_reader], views=VIEW_LIST) | ||
otel = _OpenTelemetryInstruments(meter_provider=meter_provider) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how would a customer provide their own otel instrumentation? (this could be in a follow up PR) In java we let customers override it in the settings and pass the otel instance down. see the description in googleapis/java-bigtable#1796 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The client holds a controller that manages a set of handlers. Users can add their own OpenTelemetryHandler to send metrics to a different MeterProvider if they want I'll have to write up some documentation for this at some point |
||
super().__init__(*args, instruments=otel, project_id=project_id, **kwargs) | ||
|
||
|
||
class _BigtableMetricsExporter(MetricExporter): | ||
""" | ||
OpenTelemetry Exporter implementation for sending metrics to Google Cloud Monitoring. | ||
|
||
We must use a custom exporter because the public one doesn't support writing to internal | ||
metrics like `bigtable.googleapis.com/internal/client/` | ||
|
||
Each GoogleCloudMetricsHandler will maintain its own exporter instance associated with the | ||
project_id it is configured with. | ||
|
||
Args: | ||
- project_id: GCP project id to associate metrics with | ||
""" | ||
|
||
def __init__(self, project_id: str): | ||
super().__init__() | ||
self.client = MetricServiceClient() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to configure retry attempt number for create_service_time_series method? what's the default? In java we are not retrying the method, I think because republishing could lead to errors. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I just looked at the gapic code, and it looks like this rpc defaults to no retries. We can add them if we want, but if we export every 60 seconds, maybe we should add back to the queue for the next batch? (This may actually be the default for OpenTelemetry too, I'd have to look into it. Right now we just return a Would republishing lead to errors here too? |
||
self.prefix = "bigtable.googleapis.com/internal/client" | ||
self.project_name = self.client.common_project_path(project_id) | ||
|
||
def export( | ||
self, metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not just pass in second? :) so we don't need to convert it back to seconds on line 145 later :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is part of the superclass method definition unfortunately. We inherit from |
||
) -> MetricExportResult: | ||
""" | ||
Write a set of metrics to Cloud Monitoring. | ||
This method is called by the OpenTelemetry SDK | ||
""" | ||
deadline = time.time() + (timeout_millis / 1000) | ||
metric_kind = MetricDescriptor.MetricKind.CUMULATIVE | ||
all_series: list[TimeSeries] = [] | ||
# process each metric from OTel format into Cloud Monitoring format | ||
for resource_metric in metrics_data.resource_metrics: | ||
for scope_metric in resource_metric.scope_metrics: | ||
for metric in scope_metric.metrics: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a way to filter only bigtable related metrics? To avoid people from publishing irrelevant metrics. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This exporter is attached to a private MeterProvider, so there shouldn't be any other metrics showing up here |
||
for data_point in [ | ||
pt for pt in metric.data.data_points if pt.attributes | ||
]: | ||
if data_point.attributes: | ||
project_id = data_point.attributes.get("resource_project") | ||
if not isinstance(project_id, str): | ||
# we expect string for project_id field | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe log a warning? Maybe something like: "malformatted resource_project x. Skip publishing" |
||
continue | ||
monitored_resource = MonitoredResource( | ||
type="bigtable_client_raw", | ||
labels={ | ||
"project_id": project_id, | ||
"instance": data_point.attributes[ | ||
"resource_instance" | ||
], | ||
"cluster": data_point.attributes[ | ||
"resource_cluster" | ||
], | ||
"table": data_point.attributes["resource_table"], | ||
"zone": data_point.attributes["resource_zone"], | ||
}, | ||
) | ||
point = self._to_point(data_point) | ||
series = TimeSeries( | ||
resource=monitored_resource, | ||
metric_kind=metric_kind, | ||
points=[point], | ||
metric=GMetric( | ||
type=f"{self.prefix}/{metric.name}", | ||
labels={ | ||
k: v | ||
for k, v in data_point.attributes.items() | ||
if not k.startswith("resource_") | ||
}, | ||
), | ||
unit=metric.unit, | ||
) | ||
all_series.append(series) | ||
# send all metrics to Cloud Monitoring | ||
try: | ||
self._batch_write(all_series, deadline) | ||
return MetricExportResult.SUCCESS | ||
except Exception: | ||
return MetricExportResult.FAILURE | ||
|
||
def _batch_write( | ||
self, series: list[TimeSeries], deadline=None, max_batch_size=200 | ||
) -> None: | ||
""" | ||
Adapted from CloudMonitoringMetricsExporter | ||
https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82 | ||
|
||
Args: | ||
- series: list of TimeSeries to write. Will be split into batches if necessary | ||
- deadline: designates the time.time() at which to stop writing. If None, uses API default | ||
- max_batch_size: maximum number of time series to write at once. | ||
Cloud Monitoring allows up to 200 per request | ||
""" | ||
write_ind = 0 | ||
while write_ind < len(series): | ||
# find time left for next batch | ||
timeout = deadline - time.time() if deadline else gapic_v1.method.DEFAULT | ||
# write next batch | ||
self.client.create_service_time_series( | ||
CreateTimeSeriesRequest( | ||
name=self.project_name, | ||
time_series=series[write_ind : write_ind + max_batch_size], | ||
), | ||
timeout=timeout, | ||
) | ||
write_ind += max_batch_size | ||
|
||
@staticmethod | ||
def _to_point(data_point: NumberDataPoint | HistogramDataPoint) -> Point: | ||
""" | ||
Adapted from CloudMonitoringMetricsExporter | ||
https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82 | ||
""" | ||
if isinstance(data_point, HistogramDataPoint): | ||
mean = data_point.sum / data_point.count if data_point.count else 0.0 | ||
point_value = TypedValue( | ||
distribution_value=Distribution( | ||
count=data_point.count, | ||
mean=mean, | ||
bucket_counts=data_point.bucket_counts, | ||
bucket_options=Distribution.BucketOptions( | ||
explicit_buckets=Distribution.BucketOptions.Explicit( | ||
bounds=data_point.explicit_bounds, | ||
) | ||
), | ||
) | ||
) | ||
else: | ||
if isinstance(data_point.value, int): | ||
point_value = TypedValue(int64_value=data_point.value) | ||
else: | ||
point_value = TypedValue(double_value=data_point.value) | ||
start_time = Timestamp() | ||
start_time.FromNanoseconds(data_point.start_time_unix_nano) | ||
end_time = Timestamp() | ||
end_time.FromNanoseconds(data_point.time_unix_nano) | ||
interval = TimeInterval(start_time=start_time, end_time=end_time) | ||
return Point(interval=interval, value=point_value) | ||
|
||
def shutdown(self, timeout_millis: float = 30_000, **kwargs): | ||
""" | ||
Adapted from CloudMonitoringMetricsExporter | ||
https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82 | ||
""" | ||
pass | ||
|
||
def force_flush(self, timeout_millis: float = 10_000): | ||
""" | ||
Adapted from CloudMonitoringMetricsExporter | ||
https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82 | ||
""" | ||
return True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We updated the buckets in java:
I think 100k is too small