Skip to content

Commit

Permalink
commit
Browse files Browse the repository at this point in the history
  • Loading branch information
HadhemiDD committed Dec 19, 2024
1 parent f6fc127 commit b3e0374
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 59 deletions.
49 changes: 28 additions & 21 deletions duckdb/datadog_checks/duckdb/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from contextlib import closing, contextmanager
from copy import deepcopy
import json
import os
import re
from typing import Any, AnyStr, Iterable, Iterator, Sequence # noqa: F401

Expand Down Expand Up @@ -64,14 +65,20 @@ def _execute_query_raw(self, query):

query = query.format(self.db_name)
curs = cursor.execute(query)
if len(cursor.execute(query).fetchall()) < 1: # this was returning a -1 with rowcount
if len(curs.fetchall()) < 1: # this was returning a -1 with rowcount
self._query_errors += 1
self.log.warning('Failed to fetch records from query: `%s`.', query)
return None
for row in cursor.execute(query).fetchall():
# To find the field name from the query
pattern = r"(?i)\bname\s*=\s*'([^']+)'"
query_name = re.search(pattern, query).group(1)
query_version = None
pattern_version = r"\bversion\b"
query_version = re.search(pattern_version, query)
if query_version:
query_name = 'version'
else:
# Try to find the field name from the query
pattern = r"(?i)\bname\s*=\s*'([^']+)'"
query_name = re.search(pattern, query).group(1)
try:
yield self._queries_processor(row, query_name)
except Exception as e:
Expand All @@ -81,7 +88,6 @@ def _execute_query_raw(self, query):
def _queries_processor(self, row, query_name):
# type: (Sequence, AnyStr) -> Sequence
unprocessed_row = row

# Return database version
if query_name == 'version':
self.submit_version(row)
Expand All @@ -93,20 +99,22 @@ def _queries_processor(self, row, query_name):
@contextmanager
def connect(self):
    """Yield a read-only DuckDB connection to ``self.db_name``, closing it on exit.

    Raises:
        FileNotFoundError: if the database file does not exist. Raising
            explicitly matters here: a ``@contextmanager`` generator that
            returns without yielding makes the ``with`` statement fail with
            an opaque ``RuntimeError("generator didn't yield")`` instead.
        Exception: any connection error from duckdb is logged and re-raised.
    """
    conn = None
    # Only attempt connection if the file exists.
    if not os.path.exists(self.db_name):
        self.log.error('DuckDB database file `%s` does not exist.', self.db_name)
        raise FileNotFoundError(self.db_name)
    try:
        # read_only avoids taking DuckDB's exclusive write lock on the file.
        conn = duckdb.connect(self.db_name, read_only=True)
        self.log.info('Connected to DuckDB database.')
        yield conn
    except Exception as e:
        if 'Conflicting lock' in str(e):
            self.log.error('Lock conflict detected')
        else:
            self.log.error('Unable to connect to DuckDB database. %s.', e)
        # Bare raise preserves the original traceback.
        raise
    finally:
        if conn:
            conn.close()

def initialize_config(self):
self._connect_params = json.dumps(
Expand Down Expand Up @@ -136,7 +144,7 @@ def submit_version(self, row):
try:
duckdb_version_row = row[0]
duckdb_version = duckdb_version_row[1:]
version_split = duckdb_version(".")
version_split = duckdb_version.split('.')

if len(version_split) >= 3:
major = version_split[0]
Expand All @@ -159,6 +167,5 @@ def submit_version(self, row):
def _executor_error_handler(self, error):
# type: (AnyStr) -> AnyStr
self.log.debug('Error from query "%s"', error)

self._query_errors += 1
return error
44 changes: 32 additions & 12 deletions duckdb/datadog_checks/duckdb/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,16 @@
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

# TABLE_STATS = {
# 'name': 'tables_number',
# 'query': "SELECT table_catalog, COUNT(*) AS num_tables FROM information_schema.tables "
# "GROUP BY table_catalog WHERE table_catalog='{}';",
# 'columns': [
# {'name': 'table_catalog', 'type': 'tag'},
# {'name': 'num_tables', 'type': 'gauge'},
# ],
# }

# Reports the DuckDB server version string. The 'source' column type routes
# the value to version-metadata handling rather than submitting a metric.
# NOTE: the stale duplicate 'name' entry ('duckdb_version') left over from the
# diff rendering is removed; the effective name is 'version'.
DUCKDB_VERSION = {
    'name': 'version',
    'query': "SELECT version();",
    'columns': [{'name': 'version', 'type': 'source'}],
}

DUCKDDB_WAL = {
'name': 'wal_autocheckpoint',
'query': " SELECT CAST(SUBSTR(value, 1, LENGTH(value) - 3) AS INTEGER) * "
'query': " SELECT CAST(SUBSTR(value, 1, LENGTH(value) - 3) AS BIGINT) * "
"CASE "
" WHEN RIGHT(value, 3) = 'KiB' THEN 1024 "
" WHEN RIGHT(value, 3) = 'MiB' THEN 1024 * 1024 "
Expand All @@ -38,4 +29,33 @@
'columns': [{'name': 'worker_threads', 'type': 'gauge'}],
}

DEFAULT_QUERIES = [DUCKDDB_THREADS, DUCKDDB_WAL, DUCKDB_VERSION]

# Converts DuckDB's human-readable memory_limit setting (e.g. '1.8 GiB') into
# bytes by stripping the 3-character unit suffix and multiplying by the
# matching binary scale factor; unrecognized suffixes fall through to 1.
DUCKDB_MEMORY_LIMIT = {
    'name': 'memory_limit',
    'query': (
        " SELECT CAST(SUBSTR(value, 1, LENGTH(value) - 3) AS BIGINT) * "
        "CASE "
        " WHEN RIGHT(value, 3) = 'KiB' THEN 1024 "
        " WHEN RIGHT(value, 3) = 'MiB' THEN 1024 * 1024 "
        " WHEN RIGHT(value, 3) = 'GiB' THEN 1024 * 1024 * 1024 "
        " WHEN RIGHT(value, 3) = 'TiB' THEN 1024 * 1024 * 1024 * 1024 "
        " ELSE 1 "
        " END AS value_in_bytes FROM duckdb_settings() WHERE name = 'memory_limit';"
    ),
    'columns': [{'name': 'memory_limit', 'type': 'gauge'}],
}


# Gauge for the partitioned_write_flush_threshold setting, cast to an integer.
DUCKDB_PART_WRITE_FLUSH_THRESHOLD = {
    'name': 'partitioned_write_flush_threshold',
    'query': (
        " SELECT CAST(value AS INTEGER) AS value_as_integer "
        " FROM duckdb_settings() WHERE name = 'partitioned_write_flush_threshold';"
    ),
    'columns': [{'name': 'partitioned_write_flush_threshold', 'type': 'gauge'}],
}

# Gauge for the partitioned_write_max_open_files setting, cast to an integer.
DUCKDB_PART_WRITE_MAX_OPEN_FILES = {
    'name': 'partitioned_write_max_open_files',
    'query': (
        " SELECT CAST(value AS INTEGER) AS value_as_integer "
        " FROM duckdb_settings() WHERE name = 'partitioned_write_max_open_files';"
    ),
    'columns': [{'name': 'partitioned_write_max_open_files', 'type': 'gauge'}],
}

DEFAULT_QUERIES = [DUCKDB_VERSION,DUCKDDB_THREADS, DUCKDDB_WAL, DUCKDB_MEMORY_LIMIT, DUCKDB_PART_WRITE_FLUSH_THRESHOLD , DUCKDB_PART_WRITE_MAX_OPEN_FILES ]
5 changes: 4 additions & 1 deletion duckdb/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datadog_checks.dev import get_here

HERE = get_here()
# Relative path of the sample database shipped with the tests.
# (Removed the stale duplicate assignment to 'data/sample_1.db' — that file
# was deleted in this commit; 'data/sample.db' is the current fixture.)
DB_NAME = 'data/sample.db'

# Absolute path to the sample database, used by the check instance.
DB = os.path.join(HERE, DB_NAME)

Expand All @@ -15,4 +15,7 @@
# Metric names the check is expected to emit; iterated by the unit and
# integration tests to assert each one was reported.
METRICS_MAP = [
    'duckdb.worker_threads',
    'duckdb.wal_autocheckpoint',
    'duckdb.memory_limit',
    'duckdb.partitioned_write_flush_threshold',
    'duckdb.partitioned_write_max_open_files',
]
7 changes: 0 additions & 7 deletions duckdb/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@
from . import common


@pytest.fixture(scope='session')
def connection_db():
db_file_path = os.path.join(common.HERE, common.DB_NAME)
connection = duckdb.connect(db_file_path)
return connection


@pytest.fixture(scope='session')
def dd_environment():
yield common.DEFAULT_INSTANCE
Expand Down
Binary file removed duckdb/tests/data/sample_1.db
Binary file not shown.
17 changes: 0 additions & 17 deletions duckdb/tests/test_e2e.py

This file was deleted.

37 changes: 37 additions & 0 deletions duckdb/tests/test_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# (C) Datadog, Inc. 2024-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import pytest

from datadog_checks.base.constants import ServiceCheck
from datadog_checks.dev.utils import assert_service_checks
from datadog_checks.dev.utils import get_metadata_metrics
from datadog_checks.duckdb import DuckdbCheck

from . import common


def test_check(dd_run_check, aggregator, instance):
    """Run the check against the sample DB and assert every expected metric is reported."""
    # NOTE: the injected `instance` fixture is deliberately replaced with the
    # shared default instance, matching the original behavior.
    instance = common.DEFAULT_INSTANCE
    check = DuckdbCheck('duckdb', {}, [instance])
    dd_run_check(check)

    for expected_metric in common.METRICS_MAP:
        aggregator.assert_metric(expected_metric)

def test_version(dd_run_check, aggregator, instance, datadog_agent):
    """Verify the check submits semver version metadata parsed from the raw version."""
    # NOTE: the injected `instance` fixture is deliberately replaced with the
    # shared default instance, matching the original behavior.
    instance = common.DEFAULT_INSTANCE
    check = DuckdbCheck('duckdb', {}, [instance])
    check.check_id = 'test:123'

    raw_version = '1.1.1'
    version_metadata = {'version.scheme': 'semver', 'version.raw': raw_version}
    for part_name, part in zip(('major', 'minor', 'patch'), raw_version.split('.')):
        version_metadata['version.' + part_name] = part

    dd_run_check(check)

    datadog_agent.assert_metadata('test:123', version_metadata)
3 changes: 2 additions & 1 deletion duckdb/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ def test_check(dd_run_check, aggregator, instance):
dd_run_check(check)

aggregator.assert_service_check('duckdb.can_connect', DuckdbCheck.OK)
#aggregator.assert_service_check('duckdb.can_query', DuckdbCheck.OK)

for metric in common.METRICS_MAP:
aggregator.assert_metric(metric)
aggregator.assert_metrics_using_metadata(get_metadata_metrics())

def test_version(dd_run_check, aggregator, instance):
# type: (Callable[[AgentCheck, bool], None], AggregatorStub, Dict[str, Any]) -> None
Expand Down

0 comments on commit b3e0374

Please sign in to comment.