diff --git a/google/cloud/bigtable/data/_metrics/__init__.py b/google/cloud/bigtable/data/_metrics/__init__.py index 43b8b6139..f0f4118d5 100644 --- a/google/cloud/bigtable/data/_metrics/__init__.py +++ b/google/cloud/bigtable/data/_metrics/__init__.py @@ -11,6 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from google.cloud.bigtable.data._metrics.handlers.opentelemetry import ( + OpenTelemetryMetricsHandler, +) +from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + GoogleCloudMetricsHandler, +) +from google.cloud.bigtable.data._metrics.handlers._stdout import _StdoutMetricsHandler from google.cloud.bigtable.data._metrics.metrics_controller import ( BigtableClientSideMetricsController, ) @@ -20,6 +27,9 @@ __all__ = ( "BigtableClientSideMetricsController", + "OpenTelemetryMetricsHandler", + "GoogleCloudMetricsHandler", + "_StdoutMetricsHandler", "OperationType", "ActiveOperationMetric", ) diff --git a/google/cloud/bigtable/data/_metrics/handlers/_stdout.py b/google/cloud/bigtable/data/_metrics/handlers/_stdout.py new file mode 100644 index 000000000..5a2c4a78b --- /dev/null +++ b/google/cloud/bigtable/data/_metrics/handlers/_stdout.py @@ -0,0 +1,48 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler +from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric + + +class _StdoutMetricsHandler(MetricsHandler): + """ + Prints a table of metric data after each operation, for debugging purposes. + """ + + def __init__(self, **kwargs): + self._completed_ops = {} + + def on_operation_complete(self, op: CompletedOperationMetric) -> None: + """ + After each operation, update the state and print the metrics table. + """ + current_list = self._completed_ops.setdefault(op.op_type, []) + current_list.append(op) + self.print() + + def print(self): + """ + Print the current state of the metrics table. + """ + print("Bigtable Metrics:") + for ops_type, ops_list in self._completed_ops.items(): + count = len(ops_list) + total_latency = sum([op.duration for op in ops_list]) + total_attempts = sum([len(op.completed_attempts) for op in ops_list]) + avg_latency = total_latency / count + avg_attempts = total_attempts / count + print( + f"{ops_type}: count: {count}, avg latency: {avg_latency:.2f}, avg attempts: {avg_attempts:.1f}" + ) + print() diff --git a/google/cloud/bigtable/data/_metrics/handlers/gcp_exporter.py b/google/cloud/bigtable/data/_metrics/handlers/gcp_exporter.py new file mode 100644 index 000000000..7f0e0365c --- /dev/null +++ b/google/cloud/bigtable/data/_metrics/handlers/gcp_exporter.py @@ -0,0 +1,268 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import time + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics import view +from opentelemetry.sdk.metrics.export import ( + HistogramDataPoint, + MetricExporter, + MetricExportResult, + MetricsData, + NumberDataPoint, + PeriodicExportingMetricReader, +) +from google.protobuf.timestamp_pb2 import Timestamp +from google.api.distribution_pb2 import Distribution +from google.api.metric_pb2 import Metric as GMetric +from google.api.monitored_resource_pb2 import MonitoredResource +from google.api.metric_pb2 import MetricDescriptor +from google.api_core import gapic_v1 +from google.cloud.monitoring_v3 import ( + CreateTimeSeriesRequest, + MetricServiceClient, + Point, + TimeInterval, + TimeSeries, + TypedValue, +) + +from google.cloud.bigtable.data._metrics.handlers.opentelemetry import ( + OpenTelemetryMetricsHandler, +) +from google.cloud.bigtable.data._metrics.handlers.opentelemetry import ( + _OpenTelemetryInstruments, +) + + +# create OpenTelemetry views for Bigtable metrics +# avoid reformatting into individual lines +# fmt: off +MILLIS_AGGREGATION = view.ExplicitBucketHistogramAggregation( + [ + 0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, + 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, + 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000, + ] +) +# fmt: on +COUNT_AGGREGATION = view.SumAggregation() +INSTRUMENT_NAMES = ( + "operation_latencies", + "first_response_latencies", + "attempt_latencies", + "retry_count", + "server_latencies", + "connectivity_error_count", + "application_latencies", + "throttling_latencies", +) +VIEW_LIST = [ + view.View( + instrument_name=n, + name=n, + aggregation=MILLIS_AGGREGATION + if n.endswith("latencies") + else COUNT_AGGREGATION, + ) + for n in INSTRUMENT_NAMES +] + + +class GoogleCloudMetricsHandler(OpenTelemetryMetricsHandler): + """ + Maintains an internal set of OpenTelemetry metrics for the Bigtable client library, + and periodically exports them to Google Cloud Monitoring. + + The OpenTelemetry metrics that are tracked are as follows: + - operation_latencies: latency of each client method call, over all of it's attempts. + - first_response_latencies: latency of receiving the first row in a ReadRows operation. + - attempt_latencies: latency of each client attempt RPC. + - retry_count: Number of additional RPCs sent after the initial attempt. + - server_latencies: latency recorded on the server side for each attempt. + - connectivity_error_count: number of attempts that failed to reach Google's network. + - application_latencies: the time spent waiting for the application to process the next response. + - throttling_latencies: latency introduced by waiting when there are too many outstanding requests in a bulk operation. + + Args: + - project_id: The Google Cloud project ID for the associated Bigtable Table + - export_interval: The interval (in seconds) at which to export metrics to Cloud Monitoring. 
+ """ + + def __init__(self, *args, project_id: str, export_interval=60, **kwargs): + # internal exporter to write metrics to Cloud Monitoring + exporter = _BigtableMetricsExporter(project_id=project_id) + # periodically executes exporter + gcp_reader = PeriodicExportingMetricReader( + exporter, export_interval_millis=export_interval * 1000 + ) + # use private meter provider to store instruments and views + meter_provider = MeterProvider(metric_readers=[gcp_reader], views=VIEW_LIST) + otel = _OpenTelemetryInstruments(meter_provider=meter_provider) + super().__init__(*args, instruments=otel, project_id=project_id, **kwargs) + + +class _BigtableMetricsExporter(MetricExporter): + """ + OpenTelemetry Exporter implementation for sending metrics to Google Cloud Monitoring. + + We must use a custom exporter because the public one doesn't support writing to internal + metrics like `bigtable.googleapis.com/internal/client/` + + Each GoogleCloudMetricsHandler will maintain its own exporter instance associated with the + project_id it is configured with. + + Args: + - project_id: GCP project id to associate metrics with + """ + + def __init__(self, project_id: str): + super().__init__() + self.client = MetricServiceClient() + self.prefix = "bigtable.googleapis.com/internal/client" + self.project_name = self.client.common_project_path(project_id) + + def export( + self, metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs + ) -> MetricExportResult: + """ + Write a set of metrics to Cloud Monitoring. + This method is called by the OpenTelemetry SDK + """ + deadline = time.time() + (timeout_millis / 1000) + metric_kind = MetricDescriptor.MetricKind.CUMULATIVE + all_series: list[TimeSeries] = [] + # process each metric from OTel format into Cloud Monitoring format + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + for data_point in [ + pt for pt in metric.data.data_points if pt.attributes + ]: + if data_point.attributes: + project_id = data_point.attributes.get("resource_project") + if not isinstance(project_id, str): + # we expect string for project_id field + continue + monitored_resource = MonitoredResource( + type="bigtable_client_raw", + labels={ + "project_id": project_id, + "instance": data_point.attributes[ + "resource_instance" + ], + "cluster": data_point.attributes[ + "resource_cluster" + ], + "table": data_point.attributes["resource_table"], + "zone": data_point.attributes["resource_zone"], + }, + ) + point = self._to_point(data_point) + series = TimeSeries( + resource=monitored_resource, + metric_kind=metric_kind, + points=[point], + metric=GMetric( + type=f"{self.prefix}/{metric.name}", + labels={ + k: v + for k, v in data_point.attributes.items() + if not k.startswith("resource_") + }, + ), + unit=metric.unit, + ) + all_series.append(series) + # send all metrics to Cloud Monitoring + try: + self._batch_write(all_series, deadline) + return MetricExportResult.SUCCESS + except Exception: + return MetricExportResult.FAILURE + + def _batch_write( + self, series: list[TimeSeries], deadline=None, max_batch_size=200 + ) -> None: + """ + Adapted from CloudMonitoringMetricsExporter + https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82 + + Args: + - series: list of TimeSeries to write. 
Will be split into batches if necessary + - deadline: designates the time.time() at which to stop writing. If None, uses API default + - max_batch_size: maximum number of time series to write at once. + Cloud Monitoring allows up to 200 per request + """ + write_ind = 0 + while write_ind < len(series): + # find time left for next batch + timeout = deadline - time.time() if deadline else gapic_v1.method.DEFAULT + # write next batch + self.client.create_service_time_series( + CreateTimeSeriesRequest( + name=self.project_name, + time_series=series[write_ind : write_ind + max_batch_size], + ), + timeout=timeout, + ) + write_ind += max_batch_size + + @staticmethod + def _to_point(data_point: NumberDataPoint | HistogramDataPoint) -> Point: + """ + Adapted from CloudMonitoringMetricsExporter + https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82 + """ + if isinstance(data_point, HistogramDataPoint): + mean = data_point.sum / data_point.count if data_point.count else 0.0 + point_value = TypedValue( + distribution_value=Distribution( + count=data_point.count, + mean=mean, + bucket_counts=data_point.bucket_counts, + bucket_options=Distribution.BucketOptions( + explicit_buckets=Distribution.BucketOptions.Explicit( + bounds=data_point.explicit_bounds, + ) + ), + ) + ) + else: + if isinstance(data_point.value, int): + point_value = TypedValue(int64_value=data_point.value) + else: + point_value = TypedValue(double_value=data_point.value) + start_time = Timestamp() + start_time.FromNanoseconds(data_point.start_time_unix_nano) + end_time = Timestamp() + end_time.FromNanoseconds(data_point.time_unix_nano) + interval = TimeInterval(start_time=start_time, end_time=end_time) + return Point(interval=interval, value=point_value) + + def shutdown(self, timeout_millis: float = 30_000, **kwargs): + """ + Adapted from CloudMonitoringMetricsExporter + https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82 + """ + pass + + def force_flush(self, timeout_millis: float = 10_000): + """ + Adapted from CloudMonitoringMetricsExporter + https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82 + """ + return True diff --git a/google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py b/google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py new file mode 100644 index 000000000..9f4f2caf3 --- /dev/null +++ b/google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py @@ -0,0 +1,233 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
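+#
+# Usage sketch (illustrative only; the handler defined below is normally
+# constructed by the metrics controller rather than by application code, and
+# the identifiers here are placeholder values):
+#
+#   from google.cloud.bigtable.data._metrics import (
+#       BigtableClientSideMetricsController,
+#       OpenTelemetryMetricsHandler,
+#   )
+#
+#   handler = OpenTelemetryMetricsHandler(
+#       project_id="my-project", instance_id="my-instance", table_id="my-table"
+#   )
+#   controller = BigtableClientSideMetricsController(handlers=[handler])
+#   # The controller then invokes handler.on_attempt_complete() after each RPC
+#   # attempt and handler.on_operation_complete() when the operation finishes.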
+from __future__ import annotations + +import os +import socket +import uuid + +from google.cloud.bigtable import __version__ as bigtable_version +from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler +from google.cloud.bigtable.data._metrics.data_model import OperationType +from google.cloud.bigtable.data._metrics.data_model import DEFAULT_CLUSTER_ID +from google.cloud.bigtable.data._metrics.data_model import DEFAULT_ZONE +from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric +from google.cloud.bigtable.data._metrics.data_model import CompletedAttemptMetric +from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric + + +class _OpenTelemetryInstruments: + """ + class that holds OpenTelelmetry instrument objects + """ + + def __init__(self, meter_provider=None): + if meter_provider is None: + # use global meter provider + from opentelemetry import metrics + + meter_provider = metrics + # grab meter for this module + meter = meter_provider.get_meter("bigtable.googleapis.com") + # create instruments + self.operation_latencies = meter.create_histogram( + name="operation_latencies", + description=""" + The total end-to-end latency across all RPC attempts associated with a Bigtable operation. + This metric measures an operation's round trip from the client to Bigtable and back to the client and includes all retries. + + For ReadRows requests, the operation latencies include the application processing time for each returned message. + """, + unit="ms", + ) + self.first_response_latencies = meter.create_histogram( + name="first_response_latencies", + description="Latencies from when a client sends a request and receives the first row of the response.", + unit="ms", + ) + self.attempt_latencies = meter.create_histogram( + name="attempt_latencies", + description=""" + The latencies of a client RPC attempt. + + Under normal circumstances, this value is identical to operation_latencies. + If the client receives transient errors, however, then operation_latencies is the sum of all attempt_latencies and the exponential delays. + """, + unit="ms", + ) + self.retry_count = meter.create_counter( + name="retry_count", + description=""" + A counter that records the number of attempts that an operation required to complete. + Under normal circumstances, this value is empty. + """, + ) + self.server_latencies = meter.create_histogram( + name="server_latencies", + description="Latencies between the time when the Google frontend receives an RPC and when it sends the first byte of the response.", + unit="ms", + ) + self.connectivity_error_count = meter.create_counter( + name="connectivity_error_count", + description=""" + The number of requests that failed to reach Google's network. + In normal cases, this number is 0. When the number is not 0, it can indicate connectivity issues between the application and the Google network. + """, + ) + self.application_latencies = meter.create_histogram( + name="application_latencies", + description=""" + The time from when the client receives the response to a request until the application reads the response. + This metric is most relevant for ReadRows requests. + The start and stop times for this metric depend on the way that you send the read request; see Application blocking latencies timer examples for details. 
+            """,
+            unit="ms",
+        )
+        self.throttling_latencies = meter.create_histogram(
+            name="throttling_latencies",
+            description="Latencies introduced when the client blocks the sending of more requests to the server because of too many pending requests in a bulk operation.",
+            unit="ms",
+        )
+
+
+class OpenTelemetryMetricsHandler(MetricsHandler):
+    """
+    Maintains a set of OpenTelemetry metrics for the Bigtable client library,
+    and updates them with each completed operation and attempt.
+
+    The OpenTelemetry metrics that are tracked are as follows:
+      - operation_latencies: latency of each client method call, over all of its attempts.
+      - first_response_latencies: latency of receiving the first row in a ReadRows operation.
+      - attempt_latencies: latency of each client attempt RPC.
+      - retry_count: Number of additional RPCs sent after the initial attempt.
+      - server_latencies: latency recorded on the server side for each attempt.
+      - connectivity_error_count: number of attempts that failed to reach Google's network.
+      - application_latencies: the time spent waiting for the application to process the next response.
+      - throttling_latencies: latency introduced by waiting when there are too many outstanding requests in a bulk operation.
+    """
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        instance_id: str,
+        table_id: str,
+        app_profile_id: str | None = None,
+        client_uid: str | None = None,
+        instruments: _OpenTelemetryInstruments = _OpenTelemetryInstruments(),
+        **kwargs,
+    ):
+        super().__init__()
+        self.otel = instruments
+        # fixed labels sent with each metric update
+        self.shared_labels = {
+            "client_name": f"python-bigtable/{bigtable_version}",
+            "client_uid": client_uid or self._generate_client_uid(),
+            "resource_project": project_id,
+            "resource_instance": instance_id,
+            "resource_table": table_id,
+            "app_profile": app_profile_id or "default",
+        }
+
+    @staticmethod
+    def _generate_client_uid():
+        """
+        client_uid will take the format `python-<uuid>-<pid>@<hostname>`, where uuid is a
+        random value, pid is the process id, and hostname is the hostname of the machine.
+
+        If the hostname cannot be determined, localhost will be used in its place, and if
+        the pid cannot be determined, it will be left empty.
+ """ + try: + hostname = socket.gethostname() or "localhost" + except Exception: + hostname = "localhost" + try: + pid = os.getpid() or "" + except Exception: + pid = "" + return f"python-{uuid.uuid4()}-{pid}@{hostname}" + + def on_operation_complete(self, op: CompletedOperationMetric) -> None: + """ + Update the metrics associated with a completed operation: + - operation_latencies + - retry_count + """ + labels = { + "method": op.op_type.value, + "status": op.final_status.name, + "resource_zone": op.zone, + "resource_cluster": op.cluster_id, + **self.shared_labels, + } + is_streaming = str(op.is_streaming) + + self.otel.operation_latencies.record( + op.duration_ms, {"streaming": is_streaming, **labels} + ) + # only record completed attempts if there were retries + if op.completed_attempts: + self.otel.retry_count.add(len(op.completed_attempts) - 1, labels) + + def on_attempt_complete( + self, attempt: CompletedAttemptMetric, op: ActiveOperationMetric + ): + """ + Update the metrics associated with a completed attempt: + - attempt_latencies + - first_response_latencies + - server_latencies + - connectivity_error_count + - application_latencies + - throttling_latencies + """ + labels = { + "method": op.op_type.value, + "resource_zone": op.zone or DEFAULT_ZONE, # fallback to default if unset + "resource_cluster": op.cluster_id or DEFAULT_CLUSTER_ID, + **self.shared_labels, + } + status = attempt.end_status.name + is_streaming = str(op.is_streaming) + + self.otel.attempt_latencies.record( + attempt.duration_ms, {"streaming": is_streaming, "status": status, **labels} + ) + combined_throttling = attempt.grpc_throttling_time_ms + if not op.completed_attempts: + # add flow control latency to first attempt's throttling latency + combined_throttling += op.flow_throttling_time_ms + self.otel.throttling_latencies.record(combined_throttling, labels) + self.otel.application_latencies.record( + attempt.application_blocking_time_ms + attempt.backoff_before_attempt_ms, labels + ) + if ( + op.op_type == OperationType.READ_ROWS + and attempt.first_response_latency_ms is not None + ): + self.otel.first_response_latencies.record( + attempt.first_response_latency_ms, {"status": status, **labels} + ) + if attempt.gfe_latency_ms is not None: + self.otel.server_latencies.record( + attempt.gfe_latency_ms, + {"streaming": is_streaming, "status": status, **labels}, + ) + else: + # gfe headers not attached. Record a connectivity error. + # TODO: this should not be recorded as an error when direct path is enabled + self.otel.connectivity_error_count.add( + 1, {"status": status, **labels} + ) diff --git a/google/cloud/bigtable/data/_metrics/metrics_controller.py b/google/cloud/bigtable/data/_metrics/metrics_controller.py index a70caaa76..2995a6400 100644 --- a/google/cloud/bigtable/data/_metrics/metrics_controller.py +++ b/google/cloud/bigtable/data/_metrics/metrics_controller.py @@ -13,11 +13,20 @@ # limitations under the License. 
from __future__ import annotations +import os + from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric +from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + GoogleCloudMetricsHandler, +) +from google.cloud.bigtable.data._metrics.handlers._stdout import _StdoutMetricsHandler from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler from google.cloud.bigtable.data._metrics.data_model import OperationType +PRINT_METRICS = os.getenv("BIGTABLE_PRINT_METRICS", False) + + class BigtableClientSideMetricsController: """ BigtableClientSideMetricsController is responsible for managing the @@ -37,8 +46,13 @@ def __init__(self, handlers: list[MetricsHandler] | None = None, **kwargs): self.handlers: list[MetricsHandler] = handlers or [] if handlers is None: # handlers not given. Use default handlers. - # TODO: add default handlers - pass + if PRINT_METRICS: + self.handlers.append(_StdoutMetricsHandler(**kwargs)) + try: + ot_handler = GoogleCloudMetricsHandler(**kwargs) + self.handlers.append(ot_handler) + except ImportError: + pass def add_handler(self, handler: MetricsHandler) -> None: """ diff --git a/tests/unit/data/_metrics/handlers/test_handler_gcp_exporter.py b/tests/unit/data/_metrics/handlers/test_handler_gcp_exporter.py new file mode 100644 index 000000000..c353cd681 --- /dev/null +++ b/tests/unit/data/_metrics/handlers/test_handler_gcp_exporter.py @@ -0,0 +1,391 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
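+#
+# The handler exercised below wires up its own MeterProvider and
+# PeriodicExportingMetricReader, so constructing one looks roughly like this
+# (illustrative placeholder values; export_interval is in seconds):
+#
+#   from google.cloud.bigtable.data._metrics import GoogleCloudMetricsHandler
+#
+#   handler = GoogleCloudMetricsHandler(
+#       project_id="my-project",
+#       instance_id="my-instance",
+#       table_id="my-table",
+#       export_interval=60,
+#   )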
+ +import pytest +import mock + + +class TestGoogleCloudMetricsHandler: + def _make_one(self, **kwargs): + from google.cloud.bigtable.data._metrics import GoogleCloudMetricsHandler + + if not kwargs: + # create defaults + kwargs = { + "project_id": "p", + "instance_id": "i", + "table_id": "t", + "app_profile_id": "a", + } + return GoogleCloudMetricsHandler(**kwargs) + + @pytest.mark.parametrize( + "metric_name,kind", + [ + ("operation_latencies", "histogram"), + ("first_response_latencies", "histogram"), + ("attempt_latencies", "histogram"), + ("retry_count", "count"), + ("server_latencies", "histogram"), + ("connectivity_error_count", "count"), + ("application_latencies", "histogram"), + ("throttling_latencies", "histogram"), + ], + ) + def test_ctor_creates_metrics(self, metric_name, kind): + """ + Make sure each expected metric is created + """ + from opentelemetry.metrics import Counter + from opentelemetry.metrics import Histogram + + instance = self._make_one() + metric = getattr(instance.otel, metric_name) + if kind == "count": + assert isinstance(metric, Counter) + elif kind == "histogram": + assert isinstance(metric, Histogram) + else: + raise ValueError(f"Unknown metric kind: {kind}") + + def test_ctor_shared_otel_instance(self): + """ + Each instance should use its own Otel instruments + """ + instance_1 = self._make_one() + instance_2 = self._make_one() + assert instance_1.otel is not instance_2.otel + + @mock.patch( + "google.cloud.bigtable.data._metrics.handlers.gcp_exporter._OpenTelemetryInstruments", + autospec=True, + ) + @mock.patch( + "google.cloud.bigtable.data._metrics.handlers.gcp_exporter.MeterProvider", + autospec=True, + ) + def test_uses_custom_meter_provider(self, mock_meter_provider, mock_instruments): + """ + Metrics should be set up using an instance-specific meter provider + """ + from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import VIEW_LIST + from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + _BigtableMetricsExporter, + ) + + project_id = "test-project" + instance = self._make_one(project_id=project_id, instance_id="i", table_id="t") + assert instance.otel is mock_instruments.return_value + otel_kwargs = mock_instruments.call_args[1] + # meter provider was used to instantiate the instruments + assert otel_kwargs["meter_provider"] is mock_meter_provider.return_value + # meter provider was configured with gcp reader and views + mp_kwargs = mock_meter_provider.call_args[1] + assert mp_kwargs["views"] == VIEW_LIST + reader_list = mp_kwargs["metric_readers"] + assert len(reader_list) == 1 + found_reader = reader_list[0] + # ensure exporter was set up + assert isinstance(found_reader._exporter, _BigtableMetricsExporter) + assert found_reader._exporter.project_name == f"projects/{project_id}" + + @mock.patch( + "google.cloud.bigtable.data._metrics.handlers.gcp_exporter.PeriodicExportingMetricReader", + autospec=True, + ) + def test_custom_export_interval(self, mock_reader): + """ + should be able to set a custom export interval + """ + input_interval = 123 + try: + self._make_one( + export_interval=input_interval, + project_id="p", + instance_id="i", + table_id="t", + ) + except Exception: + pass + reader_init_kwargs = mock_reader.call_args[1] + found_interval = reader_init_kwargs["export_interval_millis"] + assert found_interval == input_interval * 1000 # convert to ms + + +class Test_BigtableMetricsExporter: + def _get_class(self): + from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import ( + 
_BigtableMetricsExporter, + ) + + return _BigtableMetricsExporter + + def _make_one(self, project_id="test-project"): + return self._get_class()(project_id) + + def test_ctor(self): + from google.cloud.monitoring_v3 import ( + MetricServiceClient, + ) + + project = "test-project" + instance = self._make_one(project) + assert instance.project_name == f"projects/{project}" + assert isinstance(instance.client, MetricServiceClient) + + def test_export_conversion_number(self): + """ + export function should properly convert between opentelemetry DataPoints + and bigtable TimeSeries + """ + from opentelemetry.sdk.metrics.export import ( + NumberDataPoint, + HistogramDataPoint, + MetricsData, + ResourceMetrics, + ScopeMetrics, + ) + + attributes = { + "resource_project": "project", + "resource_instance": "instance", + "resource_table": "table", + "resource_zone": "zone", + "resource_cluster": "cluster", + "some_other_attr": "value", + } + start_time = 1 + end_time = 2 + # test with different data points + int_pt = NumberDataPoint(attributes, start_time, end_time, 10) + float_pt = NumberDataPoint(attributes, start_time, end_time, 5.5) + histogram_pt = HistogramDataPoint( + attributes, + start_time, + end_time, + count=3, + sum=12, + bucket_counts=[1, 2, 3], + explicit_bounds=[1, 2, 3], + min=1, + max=3, + ) + # wrap up data points in OpenTelemetry objects + metric = mock.Mock() + metric.name = "metric_name" + metric.unit = "metric_unit" + metric.data.data_points = [int_pt, float_pt, histogram_pt] + full_data = MetricsData( + [ + ResourceMetrics( + mock.Mock(), [ScopeMetrics(mock.Mock(), [metric], "")], "" + ) + ] + ) + # run through export to convert to TimeSeries + instance = self._make_one() + with mock.patch.object(instance, "_batch_write") as mock_batch_write: + instance.export(full_data) + resulting_list = mock_batch_write.call_args[0][0] + # examine output + assert len(resulting_list) == len(metric.data.data_points) + found_int_pt, found_float_pt, found_hist_pt = resulting_list + # ensure values were set correctly + assert found_int_pt.points[0].value.int64_value == int_pt.value + assert found_float_pt.points[0].value.double_value == float_pt.value + hist_value = found_hist_pt.points[0].value.distribution_value + assert hist_value.count == histogram_pt.count + assert hist_value.mean == histogram_pt.sum / histogram_pt.count + assert hist_value.bucket_counts == histogram_pt.bucket_counts + assert ( + hist_value.bucket_options.explicit_buckets.bounds + == histogram_pt.explicit_bounds + ) + # check fields that should be common across all TimeSeries + for result_series in resulting_list: + result_series = resulting_list[0] + assert result_series.metric_kind == 3 # CUMULATIVE + assert result_series.unit == metric.unit + assert len(result_series.points) == 1 + assert ( + result_series.points[0].interval.start_time._nanosecond + == start_time + ) + assert result_series.points[0].interval.end_time._nanosecond == end_time + assert ( + result_series.metric.type + == f"bigtable.googleapis.com/internal/client/{metric.name}" + ) + assert ( + result_series.metric.labels["some_other_attr"] + == attributes["some_other_attr"] + ) + assert len(result_series.metric.labels) == 1 + # check the monitored resource + monitored_resource = result_series.resource + assert monitored_resource.type == "bigtable_client_raw" + assert len(monitored_resource.labels) == 5 + assert ( + monitored_resource.labels["project_id"] + == attributes["resource_project"] + ) + assert ( + monitored_resource.labels["instance"] + == 
attributes["resource_instance"] + ) + assert ( + monitored_resource.labels["table"] == attributes["resource_table"] + ) + assert monitored_resource.labels["zone"] == attributes["resource_zone"] + assert ( + monitored_resource.labels["cluster"] + == attributes["resource_cluster"] + ) + + def test_export_timeout(self): + """ + timeout value should be properly passed to _batch_write + """ + timeout_ms = 123_000 + current_timestamp = 5 + instance = self._make_one() + metric_data = mock.Mock() + metric_data.resource_metrics = [] + + with mock.patch("time.time", return_value=current_timestamp): + with mock.patch.object(instance, "_batch_write") as mock_batch_write: + instance.export(metric_data, timeout_millis=timeout_ms) + found_args = mock_batch_write.call_args[0] + found_deadline = found_args[1] + assert found_deadline == current_timestamp + (timeout_ms / 1000) + + @pytest.mark.parametrize("should_fail", [True, False]) + def test_export_return_value(self, should_fail): + """ + should return success or failure based on result of _batch_write + """ + from opentelemetry.sdk.metrics.export import MetricExportResult + + instance = self._make_one() + metric_data = mock.Mock() + metric_data.resource_metrics = [] + + with mock.patch.object(instance, "_batch_write") as mock_batch_write: + if should_fail: + mock_batch_write.side_effect = Exception("test exception") + result = instance.export(metric_data) + if should_fail: + assert result == MetricExportResult.FAILURE + else: + assert result == MetricExportResult.SUCCESS + + @pytest.mark.parametrize( + "num_series,batch_size,expected", + [ + (1, 1, 1), + (1, 2, 1), + (2, 1, 2), + (2, 2, 1), + (3, 2, 2), + (3, 3, 1), + (0, 10, 0), + (201, 200, 2), + (500, None, 3), # default batch size is 200 + ], + ) + @mock.patch( + "google.cloud.bigtable.data._metrics.handlers.gcp_exporter.MetricServiceClient", + autospec=True, + ) + def test__batch_write_batching(self, mock_client, num_series, batch_size, expected): + """ + should properly batch large requests into multiple calls + """ + from google.cloud.monitoring_v3 import TimeSeries + + instance = self._make_one() + instance.project_name = "projects/project" + series = [TimeSeries() for _ in range(num_series)] + kwargs = {"max_batch_size": batch_size} if batch_size is not None else {} + instance._batch_write(series, **kwargs) + # ensure that the right number of batches were used + rpc_count = mock_client.return_value.create_service_time_series.call_count + assert rpc_count == expected + # check actual batch sizes + requests = [ + call[0][0] + for call in mock_client.return_value.create_service_time_series.call_args_list + ] + assert len(requests) == expected + if batch_size is None: + batch_size = 200 + for request in requests[:-1]: + assert len(request.time_series) == batch_size + # last batch may be smaller + if requests: + last_size = len(requests[-1].time_series) + assert last_size == batch_size or last_size == num_series % batch_size + + def test__batch_write_uses_project_name(self): + """ + exporter should use project sent at init time + """ + from google.cloud.monitoring_v3 import TimeSeries + + expected_project = "test-project" + batch = [TimeSeries()] * 5 + instance = self._make_one() + with mock.patch.object( + instance.client, "create_service_time_series" + ) as mock_rpc: + instance._batch_write(batch, max_batch_size=1) + requests = [call[0][0] for call in mock_rpc.call_args_list] + assert len(requests) == 5 + for request in requests: + assert request.name == f"projects/{expected_project}" + + 
@pytest.mark.parametrize( + "start_time,deadline,rpc_time,expected", + [ + (0, 5, 1, (5, 4, 3, 2, 1, 0, -1)), + (1, 10, 2, (9, 7, 5, 3, 1, -1)), + ], + ) + @mock.patch("time.time") + def test__batch_write_deadline( + self, mock_time, start_time, deadline, rpc_time, expected + ): + """ + deadline should be properly calculated and passed to RPC as timeouts + """ + from google.cloud.monitoring_v3 import TimeSeries + + batch = [TimeSeries()] * len(expected) + instance = self._make_one() + + # increment time.time() each time it is called + def increment_time(*args, **kwargs): + mock_time.return_value += rpc_time + + mock_time.return_value = start_time + + with mock.patch.object( + instance.client, "create_service_time_series" + ) as mock_rpc: + mock_rpc.side_effect = increment_time + instance._batch_write(batch, max_batch_size=1, deadline=deadline) + timeouts = [call[1]["timeout"] for call in mock_rpc.call_args_list] + assert len(timeouts) == len(expected) + for timeout, expected_timeout in zip(timeouts, expected): + assert timeout == expected_timeout diff --git a/tests/unit/data/_metrics/handlers/test_handler_opentelemetry.py b/tests/unit/data/_metrics/handlers/test_handler_opentelemetry.py new file mode 100644 index 000000000..94c1acc3b --- /dev/null +++ b/tests/unit/data/_metrics/handlers/test_handler_opentelemetry.py @@ -0,0 +1,432 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
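+#
+# These tests drive the handler the same way the metrics controller would
+# (illustrative call sequence; constructor arguments are placeholders):
+#
+#   handler = OpenTelemetryMetricsHandler(
+#       project_id="p", instance_id="i", table_id="t"
+#   )
+#   handler.on_attempt_complete(completed_attempt, active_operation)  # once per RPC attempt
+#   handler.on_operation_complete(completed_operation)  # once per operation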
+ +import pytest +import mock + +from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric +from google.cloud.bigtable.data._metrics.data_model import CompletedAttemptMetric +from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric + + +class TestOpenTelemetryMetricsHandler: + def _make_one(self, **kwargs): + from google.cloud.bigtable.data._metrics import OpenTelemetryMetricsHandler + + if not kwargs: + # create defaults + kwargs = { + "project_id": "p", + "instance_id": "i", + "table_id": "t", + "app_profile_id": "a", + } + return OpenTelemetryMetricsHandler(**kwargs) + + @pytest.mark.parametrize( + "metric_name,kind", + [ + ("operation_latencies", "histogram"), + ("first_response_latencies", "histogram"), + ("attempt_latencies", "histogram"), + ("retry_count", "count"), + ("server_latencies", "histogram"), + ("connectivity_error_count", "count"), + ("application_latencies", "histogram"), + ("throttling_latencies", "histogram"), + ], + ) + def test_ctor_creates_metrics(self, metric_name, kind): + """ + Make sure each expected metric is created + """ + from opentelemetry.metrics import Counter + from opentelemetry.metrics import Histogram + + instance = self._make_one() + metric = getattr(instance.otel, metric_name) + if kind == "count": + assert isinstance(metric, Counter) + elif kind == "histogram": + assert isinstance(metric, Histogram) + else: + raise ValueError(f"Unknown metric kind: {kind}") + + def test_ctor_labels(self): + """ + should create dicts with with client name and uid, and shared labels + """ + from google.cloud.bigtable import __version__ + + expected_project = "p" + expected_instance = "i" + expected_table = "t" + expected_app_profile = "a" + expected_uid = "uid" + + instance = self._make_one( + project_id=expected_project, + instance_id=expected_instance, + table_id=expected_table, + app_profile_id=expected_app_profile, + client_uid=expected_uid, + ) + assert instance.shared_labels["client_uid"] == expected_uid + assert instance.shared_labels["client_name"] == f"python-bigtable/{__version__}" + assert instance.shared_labels["resource_project"] == expected_project + assert instance.shared_labels["resource_instance"] == expected_instance + assert instance.shared_labels["resource_table"] == expected_table + assert instance.shared_labels["app_profile"] == expected_app_profile + assert len(instance.shared_labels) == 6 + + def test_ctor_shared_otel_instance(self): + """ + Two instances should be writing to the same metrics + """ + instance1 = self._make_one() + instance2 = self._make_one() + assert instance1 is not instance2 + assert instance1.otel is instance2.otel + assert instance1.otel.attempt_latencies is instance2.otel.attempt_latencies + + def test_ctor_defaults(self): + """ + Should work without explicit uid or app_profile_id + """ + instance = self._make_one( + project_id="p", + instance_id="i", + table_id="t", + ) + assert instance.shared_labels["client_uid"] is not None + assert isinstance(instance.shared_labels["client_uid"], str) + assert len(instance.shared_labels["client_uid"]) > 10 # should be decently long + assert instance.shared_labels["resource_project"] == "p" + assert instance.shared_labels["resource_instance"] == "i" + assert instance.shared_labels["resource_table"] == "t" + assert instance.shared_labels["app_profile"] == "default" + assert len(instance.shared_labels) == 6 + + def test__generate_client_uid(self): + """ + Should generate a unique id with format `python-@` + """ + import re + instance = 
self._make_one() + # test with random values + uid = instance._generate_client_uid() + assert re.match(r"python-[a-f0-9-]+-[0-9]+@\w+", uid) + assert isinstance(uid, str) + # test with fixed mocks + with mock.patch("os.getpid", return_value="pid"): + with mock.patch("socket.gethostname", return_value="google.com"): + with mock.patch("uuid.uuid4", return_value="uuid"): + uid = instance._generate_client_uid() + assert uid == "python-uuid-pid@google.com" + # test with exceptions + with mock.patch("os.getpid", side_effect=Exception): + with mock.patch("socket.gethostname", side_effect=Exception): + with mock.patch("uuid.uuid4", return_value="uuid"): + uid = instance._generate_client_uid() + assert uid == "python-uuid-@localhost" + + + @pytest.mark.parametrize( + "metric_name,kind,optional_labels", + [ + ("first_response_latencies", "histogram", ["status"]), + ("attempt_latencies", "histogram", ["status", "streaming"]), + ("server_latencies", "histogram", ["status", "streaming"]), + ("connectivity_error_count", "count", ["status"]), + ("application_latencies", "histogram", []), + ("throttling_latencies", "histogram", []), + ], + ) + def test_attempt_update_labels(self, metric_name, kind, optional_labels): + """ + test that each attempt metric is sending the set of expected labels + + optional_labels: status and streaming aren't used by all metrics. + Mark which ones expect them + """ + from google.cloud.bigtable.data._metrics.data_model import OperationType + from grpc import StatusCode + + expected_op_type = OperationType.READ_ROWS + expected_status = StatusCode.ABORTED + expected_streaming = mock.Mock() + # server_latencies only shows up if gfe_latency_ms is set + gfe_latency_ms = 1 if metric_name == "server_latencies" else None + attempt = CompletedAttemptMetric( + start_time=0, + duration_ms=1, + end_status=expected_status, + gfe_latency_ms=gfe_latency_ms, + first_response_latency_ms=1, + ) + op = ActiveOperationMetric(expected_op_type, is_streaming=expected_streaming) + + instance = self._make_one() + metric = getattr(instance.otel, metric_name) + record_fn = "record" if kind == "histogram" else "add" + with mock.patch.object(metric, record_fn) as record: + instance.on_attempt_complete(attempt, op) + assert record.call_count == 1 + found_labels = record.call_args[0][1] + assert found_labels["method"] == expected_op_type.value + if "status" in optional_labels: + assert found_labels["status"] == expected_status.name + else: + assert "status" not in found_labels + if "streaming" in optional_labels: + assert found_labels["streaming"] == str(expected_streaming) + else: + assert "streaming" not in found_labels + assert len(instance.shared_labels) == 6 + # shared labels should be copied over + for k in instance.shared_labels: + assert k in found_labels + assert found_labels[k] == instance.shared_labels[k] + + @pytest.mark.parametrize( + "metric_name,kind,optional_labels", + [ + ("operation_latencies", "histogram", ["status", "streaming"]), + ("retry_count", "count", ["status"]), + ], + ) + def test_operation_update_labels(self, metric_name, kind, optional_labels): + """ + test that each operation metric is sending the set of expected labels + + optional_labels: status and streaming aren't used by all metrics. 
+ Mark which ones expect them + """ + from google.cloud.bigtable.data._metrics.data_model import OperationType + from grpc import StatusCode + + expected_op_type = OperationType.READ_ROWS + expected_status = StatusCode.RESOURCE_EXHAUSTED + expected_streaming = mock.Mock() + op = CompletedOperationMetric( + op_type=expected_op_type, + start_time=0, + completed_attempts=[object()], + duration_ms=1, + final_status=expected_status, + cluster_id="c", + zone="z", + is_streaming=expected_streaming, + ) + instance = self._make_one() + metric = getattr(instance.otel, metric_name) + record_fn = "record" if kind == "histogram" else "add" + with mock.patch.object(metric, record_fn) as record: + instance.on_operation_complete(op) + assert record.call_count == 1 + found_labels = record.call_args[0][1] + assert found_labels["method"] == expected_op_type.value + if "status" in optional_labels: + assert found_labels["status"] == expected_status.name + else: + assert "status" not in found_labels + if "streaming" in optional_labels: + assert found_labels["streaming"] == str(expected_streaming) + else: + assert "streaming" not in found_labels + assert len(instance.shared_labels) == 6 + # shared labels should be copied over + for k in instance.shared_labels: + assert k in found_labels + assert found_labels[k] == instance.shared_labels[k] + + def test_attempt_update_latency_ms(self): + """ + update attempt_latencies on attempt completion + """ + expected_latency_ms = 123 + attempt = CompletedAttemptMetric( + start_time=0, duration_ms=expected_latency_ms, end_status=mock.Mock() + ) + op = ActiveOperationMetric(mock.Mock()) + + instance = self._make_one() + with mock.patch.object(instance.otel.attempt_latencies, "record") as record: + instance.on_attempt_complete(attempt, op) + assert record.call_count == 1 + assert record.call_args[0][0] == expected_latency_ms + + def test_attempt_update_first_response(self): + """ + update first_response_latency_ms on attempt completion + """ + from google.cloud.bigtable.data._metrics.data_model import OperationType + + expected_first_response_latency_ms = 123 + attempt = CompletedAttemptMetric( + start_time=0, + duration_ms=1, + end_status=mock.Mock(), + first_response_latency_ms=expected_first_response_latency_ms, + ) + op = ActiveOperationMetric(OperationType.READ_ROWS) + + instance = self._make_one() + with mock.patch.object( + instance.otel.first_response_latencies, "record" + ) as record: + instance.on_attempt_complete(attempt, op) + assert record.call_count == 1 + assert record.call_args[0][0] == expected_first_response_latency_ms + + def test_attempt_update_server_latency_ms(self): + """ + update server_latency_ms on attempt completion + """ + expected_latency_ms = 456 + attempt = CompletedAttemptMetric( + start_time=0, + duration_ms=expected_latency_ms, + end_status=mock.Mock(), + gfe_latency_ms=expected_latency_ms, + ) + op = ActiveOperationMetric(mock.Mock()) + + instance = self._make_one() + with mock.patch.object(instance.otel.server_latencies, "record") as record: + instance.on_attempt_complete(attempt, op) + assert record.call_count == 1 + assert record.call_args[0][0] == expected_latency_ms + + def test_attempt_update_connectivity_error_count(self): + """ + update connectivity_error_count on attempt completion + """ + # error connectivity is logged when gfe_latency_ms is None + attempt = CompletedAttemptMetric( + start_time=0, duration_ms=1, end_status=mock.Mock(), gfe_latency_ms=None + ) + op = ActiveOperationMetric(mock.Mock()) + + instance = self._make_one() 
+        with mock.patch.object(instance.otel.connectivity_error_count, "add") as add:
+            instance.on_attempt_complete(attempt, op)
+            assert add.call_count == 1
+            assert add.call_args[0][0] == 1
+
+    @pytest.mark.parametrize("app_blocking,backoff", [(0, 10), (10, 0), (123, 456)])
+    def test_attempt_update_application_latencies(self, app_blocking, backoff):
+        """
+        update application_latencies on attempt completion
+        """
+        expected_total_latency_ms = app_blocking + backoff
+        attempt = CompletedAttemptMetric(
+            start_time=0,
+            duration_ms=1,
+            end_status=mock.Mock(),
+            application_blocking_time_ms=app_blocking,
+            backoff_before_attempt_ms=backoff,
+        )
+        op = ActiveOperationMetric(mock.Mock())
+
+        instance = self._make_one()
+        with mock.patch.object(instance.otel.application_latencies, "record") as record:
+            instance.on_attempt_complete(attempt, op)
+            assert record.call_count == 1
+            assert record.call_args[0][0] == expected_total_latency_ms
+
+    @pytest.mark.parametrize("grpc,flow", [(0, 10), (10, 0), (123, 456)])
+    def test_attempt_update_throttling_latencies(self, grpc, flow):
+        """
+        Update throttling_latencies on attempt completion
+        """
+        expected_total_latency_ms = grpc + flow
+        attempt = CompletedAttemptMetric(
+            start_time=0,
+            duration_ms=1,
+            end_status=mock.Mock(),
+            grpc_throttling_time_ms=grpc,
+        )
+        op = ActiveOperationMetric(mock.Mock(), flow_throttling_time_ms=flow)
+
+        instance = self._make_one()
+        with mock.patch.object(instance.otel.throttling_latencies, "record") as record:
+            instance.on_attempt_complete(attempt, op)
+            assert record.call_count == 1
+            assert record.call_args[0][0] == expected_total_latency_ms
+
+    def test_attempt_empty_cluster_zone(self):
+        """
+        if cluster and zone are None at attempt complete, fall back to default values
+        """
+        op = ActiveOperationMetric(mock.Mock())
+        attempt = CompletedAttemptMetric(
+            start_time=0,
+            duration_ms=1,
+            end_status=mock.Mock(),
+        )
+        op.cluster_id = None
+        op.zone = None
+        instance = self._make_one()
+        with mock.patch.object(instance.otel.throttling_latencies, "record") as record:
+            instance.on_attempt_complete(attempt, op)
+        labels = record.call_args[0][1]
+        assert labels["resource_cluster"] == "unspecified"
+        assert labels["resource_zone"] == "global"
+
+    def test_operation_update_latency_ms(self):
+        """
+        update op_latency_ms on operation completion
+        """
+        expected_latency_ms = 123
+        op = CompletedOperationMetric(
+            op_type=mock.Mock(),
+            start_time=0,
+            completed_attempts=[],
+            duration_ms=expected_latency_ms,
+            final_status=mock.Mock(),
+            cluster_id="c",
+            zone="z",
+            is_streaming=True,
+        )
+
+        instance = self._make_one()
+        with mock.patch.object(instance.otel.operation_latencies, "record") as record:
+            instance.on_operation_complete(op)
+            assert record.call_count == 1
+            assert record.call_args[0][0] == expected_latency_ms
+
+    def test_operation_update_retry_count(self):
+        """
+        update retry_count on operation completion
+        """
+        num_attempts = 9
+        # we don't count the first attempt
+        expected_count = num_attempts - 1
+        op = CompletedOperationMetric(
+            op_type=mock.Mock(),
+            start_time=0,
+            completed_attempts=[object()] * num_attempts,
+            duration_ms=1,
+            final_status=mock.Mock(),
+            cluster_id="c",
+            zone="z",
+            is_streaming=True,
+        )
+
+        instance = self._make_one()
+        with mock.patch.object(instance.otel.retry_count, "add") as add:
+            instance.on_operation_complete(op)
+            assert add.call_count == 1
+            assert add.call_args[0][0] == expected_count
diff --git a/tests/unit/data/_metrics/test_metrics_controller.py
b/tests/unit/data/_metrics/test_metrics_controller.py index 12cd32c92..9d2cfe209 100644 --- a/tests/unit/data/_metrics/test_metrics_controller.py +++ b/tests/unit/data/_metrics/test_metrics_controller.py @@ -27,10 +27,30 @@ def test_ctor_defaults(self): """ should create instance with GCP Exporter handler by default """ + from google.cloud.bigtable.data._metrics import GoogleCloudMetricsHandler + instance = self._make_one( project_id="p", instance_id="i", table_id="t", app_profile_id="a" ) - assert len(instance.handlers) == 0 + assert len(instance.handlers) == 1 + assert isinstance(instance.handlers[0], GoogleCloudMetricsHandler) + + def test_ctor_w_logging(self): + """ + if BIGTABLE_PRINT_METRICS is True, include _StdoutMetricsHandler + """ + from google.cloud.bigtable.data._metrics import GoogleCloudMetricsHandler + from google.cloud.bigtable.data._metrics import _StdoutMetricsHandler + + with mock.patch( + "google.cloud.bigtable.data._metrics.metrics_controller.PRINT_METRICS", True + ): + controller = self._make_one( + project_id="p", instance_id="i", table_id="t", app_profile_id="a" + ) + assert len(controller.handlers) == 2 + assert GoogleCloudMetricsHandler in [type(h) for h in controller.handlers] + assert _StdoutMetricsHandler in [type(h) for h in controller.handlers] def ctor_custom_handlers(self): """
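The pieces above fit together roughly as follows. This sketch is illustrative only: it constructs the controller directly, which applications would not normally do, and the project, instance, and table identifiers are placeholder values.

    from google.cloud.bigtable.data._metrics import (
        BigtableClientSideMetricsController,
        GoogleCloudMetricsHandler,
        _StdoutMetricsHandler,
    )

    # Default behavior: with no handlers supplied, the controller builds a
    # GoogleCloudMetricsHandler from the keyword arguments, and also adds a
    # _StdoutMetricsHandler when the BIGTABLE_PRINT_METRICS environment variable
    # is set (the flag is read once, when metrics_controller is imported).
    controller = BigtableClientSideMetricsController(
        project_id="my-project",
        instance_id="my-instance",
        table_id="my-table",
    )

    # Explicit handlers, e.g. to shorten the Cloud Monitoring export interval
    # and to print a summary table after every operation for local debugging.
    handlers = [
        GoogleCloudMetricsHandler(
            project_id="my-project",
            instance_id="my-instance",
            table_id="my-table",
            export_interval=30,  # seconds
        ),
        _StdoutMetricsHandler(),
    ]
    controller = BigtableClientSideMetricsController(handlers=handlers)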