diff --git a/.gitignore b/.gitignore index fe97dd9..dd4d528 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ __pycache__ env* venv Pipfile* +.venv diff --git a/pydruid/druidapi/__init__.py b/pydruid/druidapi/__init__.py new file mode 100644 index 0000000..ee702ab --- /dev/null +++ b/pydruid/druidapi/__init__.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .druid import DruidClient + +def jupyter_client(endpoint, auth=None) -> DruidClient: + ''' + Create a Druid client configured to display results as HTML withing a Jupyter notebook. + Waits for the cluster to become ready to avoid intermitent problems when using Druid. + ''' + from .html_display import HtmlDisplayClient + druid = DruidClient(endpoint, HtmlDisplayClient(), auth=auth) + druid.status.wait_until_ready() + return druid \ No newline at end of file diff --git a/pydruid/druidapi/base_table.py b/pydruid/druidapi/base_table.py new file mode 100644 index 0000000..d980781 --- /dev/null +++ b/pydruid/druidapi/base_table.py @@ -0,0 +1,143 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ALIGN_LEFT = 0 +ALIGN_CENTER = 1 +ALIGN_RIGHT = 2 + +def padded(array, width, fill): + if array and len(array) >= width: + return array + if not array: + result = [] + else: + result = array.copy() + return pad(result, width, fill) + +def pad(array, width, fill): + for _ in range(len(array), width): + array.append(fill) + return array + +def infer_keys(data): + if type(data) is list: + data = data[0] + keys = {} + for key in data.keys(): + keys[key] = key + return keys + +class BaseTable: + + def __init__(self): + self._headers = None + self._align = None + self._col_fmt = None + self.sample_size = 10 + self._rows = None + + def headers(self, headers): + self._headers = headers + + def rows(self, rows): + self._rows = rows + + def alignments(self, align): + self._align = align + + def col_format(self, col_fmt): + self._col_fmt = col_fmt + + def row_width(self, rows): + max_width = 0 + min_width = None + if self._headers: + max_width = len(self._headers) + min_width = max_width + for row in rows: + max_width = max(max_width, len(row)) + min_width = max_width if min_width is None else min(min_width, max_width) + min_width = max_width if min_width is None else min_width + return (min_width, max_width) + + def find_alignments(self, rows, width): + align = padded(self._align, width, None) + unknown_count = 0 + for v in align: + if v is None: + unknown_count += 1 + if unknown_count == 0: + return align + for row in rows: + for i in range(len(row)): + if align[i] is not None: + continue + v = row[i] + if v is None: + continue + if type(v) is str: + align[i] = ALIGN_LEFT + else: + align[i] = ALIGN_RIGHT + unknown_count -= 1 + if unknown_count == 0: + return align + for i in range(width): + if align[i] is None: + align[i] = ALIGN_LEFT + return align + + def pad_rows(self, rows, width): + new_rows = [] + for row in rows: + new_rows.append(padded(row, width, None)) + return new_rows + + def pad_headers(self, width): + if not self._headers: + return None + if len(self._headers) == 0: + return None + has_none = False + for i in range(len(self._headers)): + if not self._headers[i]: + has_none = True + break + if len(self._headers) >= width and not has_none: + return self._headers + headers = self._headers.copy() + if has_none: + for i in range(len(headers)): + if not headers[i]: + headers[i] = '' + return pad(headers, width, '') + + def from_object_list(self, objects, cols=None): + cols = infer_keys(objects) if not cols else cols + self._rows = [] + for obj in objects: + row = [] + for key in cols.keys(): + row.append(obj.get(key)) + self._rows.append(row) + self.headers([head for head in cols.values()]) + self.alignments(self.find_alignments(self._rows, len(self._rows))) + + def from_object(self, obj, labels=None): + labels = infer_keys(obj) if not labels else labels + self._rows = [] + for key, head in labels.items(): + self._rows.append([head, obj.get(key)]) + self.headers(['Key', 'Value']) diff --git a/pydruid/druidapi/basic_auth.py b/pydruid/druidapi/basic_auth.py new file mode 100644 index 0000000..894a137 --- /dev/null +++ b/pydruid/druidapi/basic_auth.py @@ -0,0 +1,238 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +BASIC_AUTH_BASE = '/druid-ext/basic-security' + +AUTHENTICATION_BASE = BASIC_AUTH_BASE + '/authentication' +REQ_AUTHENTICATION_LOAD_STATUS = AUTHENTICATION_BASE + '/loadStatus' +REQ_AUTHENTICATION_REFRESH_ALL = AUTHENTICATION_BASE + '/refreshAll' +AUTHENTICATOR_BASE = AUTHENTICATION_BASE + '/db/{}' +REQ_AUTHENTICATION_USERS = AUTHENTICATOR_BASE + '/users' +REQ_AUTHENTICATION_USER = REQ_AUTHENTICATION_USERS + '/{}' +REQ_AUTHENTICATION_CREDENTIALS = REQ_AUTHENTICATION_USER + '/credentials' + +AUTHORIZATION_BASE = BASIC_AUTH_BASE + '/authorization' +REQ_AUTHORIZATION_LOAD_STATUS = AUTHORIZATION_BASE + '/loadStatus' +REQ_AUTHORIZATION_REFRESH_ALL = AUTHORIZATION_BASE + '/refreshAll' +AUTHORIZATION_BASE = AUTHORIZATION_BASE + '/db/{}' +REQ_AUTHORIZATION_USERS = AUTHORIZATION_BASE + '/users' +REQ_AUTHORIZATION_USER = REQ_AUTHORIZATION_USERS + '/{}' +REQ_AUTHORIZATION_USER_ROLES = REQ_AUTHORIZATION_USER + '/roles' +REQ_AUTHORIZATION_USER_ROLE = REQ_AUTHORIZATION_USER_ROLES + '/{}' +REQ_AUTHORIZATION_GROUP_MAPPINGS = AUTHORIZATION_BASE + '/groupMappings' +REQ_AUTHORIZATION_GROUP_MAPPING = AUTHORIZATION_BASE + '/groupMappings/{}' +REQ_AUTHORIZATION_GROUP_ROLES = REQ_AUTHORIZATION_GROUP_MAPPING + '/roles' +REQ_AUTHORIZATION_GROUP_ROLE = REQ_AUTHORIZATION_GROUP_ROLES + '/{}' +REQ_AUTHORIZATION_ROLES = AUTHORIZATION_BASE + '/roles' +REQ_AUTHORIZATION_ROLE = REQ_AUTHORIZATION_ROLES + '/{}' +REQ_AUTHORIZATION_ROLE_PERMISSIONS = REQ_AUTHORIZATION_ROLE + '/permissions' +REQ_USER_MAP = AUTHORIZATION_BASE + '/cachedSerializedUserMap' + +class BasicAuthClient: + ''' + Manage basic security. The Druid session must be logged in with the super + user, or some other user who has permission to modify user credentials. + + Each client works with one authorizer/authenticator pair. Create multiple clients if you have to + work with multiple authenticators on a single server. + + The basic pattern to add users and permissions is: + + ``` + # Create a client for your coordinator (Basic auth is not proxied through the router) + coord = druidapi.jupyter_client('http://localhost:8081', auth=('admin', 'password')) + + # Get a client for your authenticator and authorizer: + ac = coord.basic_security('yourAuthorizer', 'yourAuthenticator') + + # Create a user in both the authenticator and authorizer + ac.add_user('bob', 'secret') + + # Define a role + ac.add_role('myRole') + + # Assign the role to the user + ac.assign_role_to_user('myRole', 'bob') + + # Give the role some permissions + ac.grant_permissions('myRole', [[consts.DATASOURCE_RESOURCE, 'foo', consts.READ_ACTION]]) + ``` + + Then use the various other methods to list users, roles, and permissions to verify the + setup. You can then create a second Druid client that acts as the new user: + + ``` + bob_client = druidapi.jupyter_client('http://localhost:8888', auth=('bob', 'secret')) + ``` + + See https://druid.apache.org/docs/latest/operations/security-overview.html#enable-authorizers + ''' + + def __init__(self, rest_client, authenticator, authorizer=None): + self.rest_client = rest_client + self.authenticator = authenticator + self.authorizer = authorizer if authorizer else authenticator + + # Authentication + + def authentication_status(self) -> dict: + return self.rest_client.get_json(REQ_AUTHENTICATION_LOAD_STATUS) + + def authentication_refresh(self) -> None: + self.rest_client.get(REQ_AUTHENTICATION_REFRESH_ALL) + + def create_authentication_user(self, user) -> None: + self.rest_client.post(REQ_AUTHENTICATION_USER, None, args=[self.authenticator, user]) + + def set_password(self, user, password) -> None: + self.rest_client.post_only_json(REQ_AUTHENTICATION_CREDENTIALS, {'password': password}, args=[self.authenticator, user]) + + def drop_authentication_user(self, user) -> None: + self.rest_client.delete(REQ_AUTHENTICATION_USER, args=[self.authenticator, user]) + + def authentication_user(self, user) -> dict: + return self.rest_client.get_json(REQ_AUTHENTICATION_USER, args=[self.authenticator, user]) + + def authentication_users(self) -> list: + return self.rest_client.get_json(REQ_AUTHENTICATION_USERS, args=[self.authenticator]) + + # Authorization + # Groups are not documented. Use at your own risk. + + def authorization_status(self) -> dict: + return self.rest_client.get_json(REQ_AUTHORIZATION_LOAD_STATUS) + + def authorization_refresh(self) -> None: + self.rest_client.get(REQ_AUTHORIZATION_REFRESH_ALL) + + def create_authorization_user(self, user) -> None: + self.rest_client.post(REQ_AUTHORIZATION_USER, None, args=[self.authorizer, user]) + + def drop_authorization_user(self, user) -> None: + self.rest_client.delete(REQ_AUTHORIZATION_USER, args=[self.authenticator, user]) + + def authorization_user(self, user) -> dict: + return self.rest_client.get_json(REQ_AUTHORIZATION_USER, args=[self.authorizer, user]) + + def authorization_users(self) -> list: + return self.rest_client.get_json(REQ_AUTHORIZATION_USERS, args=[self.authorizer]) + + def create_group(self, group, payload): + self.rest_client.post_json(REQ_AUTHORIZATION_GROUP_MAPPING, payload, args=[self.authorizer, group]) + + def drop_group(self, group): + self.rest_client.delete(REQ_AUTHORIZATION_GROUP_MAPPING, args=[self.authorizer, group]) + + def groups(self) -> dict: + return self.rest_client.get_json(REQ_AUTHORIZATION_GROUP_MAPPINGS, args=[self.authorizer]) + + def group(self, group) -> dict: + return self.rest_client.get_json(REQ_AUTHORIZATION_GROUP_MAPPING, args=[self.authorizer, group]) + + def roles(self): + return self.rest_client.get_json(REQ_AUTHORIZATION_ROLES, args=[self.authenticator]) + + def add_role(self, role): + self.rest_client.post(REQ_AUTHORIZATION_ROLE, None, args=[self.authenticator, role]) + + def drop_role(self, role): + self.rest_client.delete(REQ_AUTHORIZATION_ROLE, args=[self.authorizer, role]) + + def set_role_permissions(self, role, permissions): + self.rest_client.post_only_json(REQ_AUTHORIZATION_ROLE_PERMISSIONS, permissions, args=[self.authenticator, role]) + + def role_permissions(self, role): + return self.rest_client.get_json(REQ_AUTHORIZATION_ROLE_PERMISSIONS, args=[self.authenticator, role]) + + def assign_role_to_user(self, role, user): + self.rest_client.post(REQ_AUTHORIZATION_USER_ROLE, None, args=[self.authenticator, user, role]) + + def revoke_role_from_user(self, role, user): + self.rest_client.delete(REQ_AUTHORIZATION_USER_ROLE, args=[self.authenticator, user, role]) + + def assign_role_to_group(self, group, role): + self.rest_client.post(REQ_AUTHORIZATION_GROUP_ROLE, None, args=[self.authenticator, group, role]) + + def revoke_role_from_group(self, group, role): + self.rest_client.delete(REQ_AUTHORIZATION_GROUP_ROLE, args=[self.authenticator, group, role]) + + def user_map(self): + # Result uses Smile encoding, not JSON. This is really just for sanity + # checks: a Python client can't make use of the info. + # To decode, see newsmile: https://pypi.org/project/newsmile/ + # However, the format Druid returns is not quite compatible with newsmile + return self.rest_client.get(REQ_USER_MAP, args=[self.authenticator]) + + # Convenience methods + + def add_user(self, user, password): + ''' + Adds a user to both the authenticator and authorizer. + ''' + self.create_authentication_user(user) + self.set_password(user, password) + self.create_authorization_user(user) + + def drop_user(self, user): + ''' + Drops a user from both the authenticator and authorizer. + ''' + self.drop_authorization_user(user) + self.drop_authentication_user(user) + + def users(self): + ''' + Returns the list of authenticator and authorizer users. + ''' + return { + "authenticator": self.authentication_users(), + "authorizer": self.authorization_users() + } + + def status(self): + ''' + Returns both the authenticator and authorizer status. + ''' + return { + "authenticator": self.authentication_status(), + "authorizer": self.authorization_status() + } + + def resource(self, type, name): + return { + 'type': type, + 'name': name + } + + def action(self, resource, action): + return { + 'resource': resource, + 'action': action + } + + def resource_action(self, type, name, action): + return self.action(self.resource(type, name), action) + + def grant_permissions(self, role, triples): + ''' + Set the permissions for a role given an array of triples of the form + `[[type, name, action], ...]`. + + Overwrites any existing permissions. + ''' + perms = [] + for triple in triples: + perms.append(self.resource_action(triple[0], triple[1], triple[2])) + self.set_role_permissions(role, perms) diff --git a/pydruid/druidapi/catalog.py b/pydruid/druidapi/catalog.py new file mode 100644 index 0000000..c9252f6 --- /dev/null +++ b/pydruid/druidapi/catalog.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import requests +from .consts import COORD_BASE +from .rest import check_error + +# Catalog (new feature in Druid 26) +CATALOG_BASE = COORD_BASE + '/catalog' +REQ_CAT_SCHEMAS = CATALOG_BASE + '/schemas' +REQ_CAT_SCHEMA = REQ_CAT_SCHEMAS + '/{}' +REQ_CAT_SCHEMA_TABLES = REQ_CAT_SCHEMA + '/tables' +REQ_CAT_SCHEMA_TABLE = REQ_CAT_SCHEMA_TABLES + '/{}' +REQ_CAT_SCHEMA_TABLE_EDIT = REQ_CAT_SCHEMA_TABLE + '/edit' + +class CatalogClient: + ''' + Client for the Druid catalog feature that provides metadata for tables, + including both datasources and external tables. + ''' + + def __init__(self, rest_client): + self.client = rest_client + + def post_table(self, schema, table_name, table_spec, version=None, overwrite=None): + params = {} + if version: + params['version'] = version + if overwrite is not None: + params['overwrite'] = overwrite + return self.client.post_json(REQ_CAT_SCHEMA_TABLE, table_spec, args=[schema, table_name], params=params) + + def create(self, schema, table_name, table_spec): + self.post_table(schema, table_name, table_spec) + + def table(self, schema, table_name): + return self.client.get_json(REQ_CAT_SCHEMA_TABLE, args=[schema, table_name]) + + def drop_table(self, schema, table_name, if_exists=False): + r = self.client.delete(REQ_CAT_SCHEMA_TABLE, args=[schema, table_name]) + if if_exists and r.status_code == requests.codes.not_found: + return + check_error(r) + + def edit_table(self, schema, table_name, action): + return self.client.post_json(REQ_CAT_SCHEMA_TABLE_EDIT, action, args=[schema, table_name]) + + def schema_names(self): + return self.client.get_json(REQ_CAT_SCHEMAS) + + def tables_in_schema(self, schema, list_format='name'): + return self.client.get_json(REQ_CAT_SCHEMA_TABLES, args=[schema], params={'format': list_format}) diff --git a/pydruid/druidapi/client.py b/pydruid/druidapi/client.py new file mode 100644 index 0000000..1ead9d6 --- /dev/null +++ b/pydruid/druidapi/client.py @@ -0,0 +1,28 @@ +from .druid import DruidClient + +class Client: + def __init__(self, druid=None) -> DruidClient: + # If the client is None, it must be backfilled by the caller. + # This case occurs only when creating the DruidClient to avoid + # a circular dependency. + self._druid = druid + + + def client(endpoint, auth=None) : + ''' + Create a Druid client for use in Python scripts that uses a text-based format for + displaying results. Does not wait for the cluster to be ready: clients should call + `status().wait_until_ready()` before making other Druid calls if there is a chance + that the cluster has not yet fully started. + ''' + return DruidClient(endpoint, auth=auth) + + def jupyter_client(endpoint, auth=None) -> DruidClient: + ''' + Create a Druid client configured to display results as HTML within a Jupyter notebook. + Waits for the cluster to become ready to avoid intermittent problems when using Druid. + ''' + from .html_display import HtmlDisplayClient + druid = DruidClient(endpoint, HtmlDisplayClient(), auth=auth) + druid.status.wait_until_ready() + return druid \ No newline at end of file diff --git a/pydruid/druidapi/consts.py b/pydruid/druidapi/consts.py new file mode 100644 index 0000000..79d1991 --- /dev/null +++ b/pydruid/druidapi/consts.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +COORD_BASE = '/druid/coordinator/v1' +ROUTER_BASE = '/druid/v2' +OVERLORD_BASE = '/druid/indexer/v1' + +# System schemas and table names. Note: case must match in Druid, though +# SQL itself is supposed to be case-insensitive. +SYS_SCHEMA = 'sys' +INFORMATION_SCHEMA = 'INFORMATION_SCHEMA' +DRUID_SCHEMA = 'druid' +EXT_SCHEMA = 'ext' + +# Information Schema tables +SCHEMA_TABLE = INFORMATION_SCHEMA + '.SCHEMATA' +TABLES_TABLE = INFORMATION_SCHEMA + '.TABLES' +COLUMNS_TABLE = INFORMATION_SCHEMA + '.COLUMNS' + +# SQL request formats +SQL_OBJECT = 'object' +SQL_ARRAY = 'array' +SQL_ARRAY_WITH_TRAILER = 'arrayWithTrailer' +SQL_CSV = 'csv' + +# Type names as known to Druid and mentioned in documentation. +DRUID_STRING_TYPE = 'string' +DRUID_LONG_TYPE = 'long' +DRUID_FLOAT_TYPE = 'float' +DRUID_DOUBLE_TYPE = 'double' +DRUID_TIMESTAMP_TYPE = 'timestamp' + +# SQL type names as returned from the INFORMATION_SCHEMA +SQL_VARCHAR_TYPE = 'VARCHAR' +SQL_BIGINT_TYPE = 'BIGINT' +SQL_FLOAT_TYPE = 'FLOAT' +SQL_DOUBLE_TYPE = 'DOUBLE' +SQL_TIMESTAMP_TYPE = 'TIMESTAMP' +SQL_ARRAY_TYPE = 'ARRAY' + +# Task status code +RUNNING_STATE = 'RUNNING' +SUCCESS_STATE = 'SUCCESS' +FAILED_STATE = 'FAILED' + +# Resource constants +DATASOURCE_RESOURCE = 'DATASOURCE' +STATE_RESOURCE = 'STATE' +CONFIG_RESOURCE = 'CONFIG' +EXTERNAL_RESOURCE = 'EXTERNAL' +READ_ACTION = 'READ' +WRITE_ACTION = 'WRITE' diff --git a/pydruid/druidapi/datasource.py b/pydruid/druidapi/datasource.py new file mode 100644 index 0000000..3db25cf --- /dev/null +++ b/pydruid/druidapi/datasource.py @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import requests, time +from .consts import COORD_BASE +from .rest import check_error +from .util import dict_get +from .error import DruidError, ClientError +import json + +REQ_DATASOURCES = COORD_BASE + '/datasources' +REQ_DATASOURCE = REQ_DATASOURCES + '/{}' + +# Segment load status +REQ_DATASOURCES = COORD_BASE + '/datasources' +REQ_DS_LOAD_STATUS = REQ_DATASOURCES + '/{}/loadstatus' + +class DatasourceClient: + ''' + Client for datasource APIs. Prefer to use SQL to query the + INFORMATION_SCHEMA to obtain information. + + See https://druid.apache.org/docs/latest/api-reference/api-reference.html#datasources + ''' + + def __init__(self, rest_client): + self.rest_client = rest_client + + def drop(self, ds_name, if_exists=False): + ''' + Drops a data source. + + Marks as unused all segments belonging to a datasource. + + Marking all segments as unused is equivalent to dropping the table. + + Parameters + ---------- + ds_name: str + The name of the datasource to query + + Returns + ------- + Returns a map of the form + {"numChangedSegments": } with the number of segments in the database whose + state has been changed (that is, the segments were marked as unused) as the result + of this API call. + + Reference + --------- + `DELETE /druid/coordinator/v1/datasources/{dataSourceName}` + ''' + r = self.rest_client.delete(REQ_DATASOURCE, args=[ds_name]) + if if_exists and r.status_code == requests.codes.not_found: + return + check_error(r) + + def load_status_req(self, ds_name, params=None): + response = self.rest_client.get(REQ_DS_LOAD_STATUS, args=[ds_name], params=params) + if len(response.text)==0: + raise ClientError(f'Table "{ds_name}" not found.') + return json.loads(response.text) + + def load_status(self, ds_name): + return self.load_status_req(ds_name, { + 'forceMetadataRefresh': 'true', + 'interval': '1970-01-01/2999-01-01'}) + + def wait_until_ready(self, ds_name): + while True: + resp = self.load_status(ds_name) + if dict_get(resp, ds_name) == 100.0: + return + time.sleep(0.5) diff --git a/pydruid/druidapi/display.py b/pydruid/druidapi/display.py new file mode 100644 index 0000000..43625ff --- /dev/null +++ b/pydruid/druidapi/display.py @@ -0,0 +1,178 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import consts +import time + +class DisplayClient: + ''' + Abstract base class to display various kinds of results. + ''' + + def __init__(self, druid=None): + # If the client is None, it must be backfilled by the caller. + # This case occurs only when creating the DruidClient to avoid + # a circular depencency. + self._druid = druid + + # Basic display operations + + def text(self, msg): + raise NotImplementedError() + + def alert(self, msg): + raise NotImplementedError() + + def error(self, msg): + raise NotImplementedError() + + # Tabular formatting + + def new_table(self): + raise NotImplementedError() + + def show_table(self, table): + raise NotImplementedError() + + def data_table(self, rows, cols=None): + ''' + Display a table of data with the optional column headings. + + Parameters + ---------- + objects: list[list] + The data to display as a list of lists, where each inner list represents one + row of data. Rows should be of the same width: ragged rows will display blank + cells. Data can be of any scalar type and is formatted correctly for that type. + + cols: list[str] + Optional list of column headings. + ''' + table = self.new_table() + table.rows(rows) + table.headers(cols) + self.show_table(table) + + def object_list(self, objects, cols=None): + ''' + Display a list of objects represented as dictionaries with optional headings. + + Parameters + ---------- + objects: list[dict] + List of dictionaries: one dictionary for each row. + + cols: dict, Default = None + A list of column headings in the form `{'key': 'label'}` + ''' + table = self.new_table() + table.from_object_list(objects, cols) + self.show_table(table) + + def object(self, obj, labels=None): + ''' + Display a single object represented as a dictionary with optional headings. + The object is displayed in two columns: keys and values. + + Parameters + ---------- + objects: list[dict] + List of dictionaries: one dictionary for each row. + + labels: list, Default = None + A list of column headings in the form `['key', 'value']`. Default headings + are used if the lables are not provided. + ''' + table = self.new_table() + table.from_object(obj, labels) + self.show_table(table) + + # SQL formatting + + def sql(self, sql): + ''' + Run a query and display the result as a table. + + Parameters + ---------- + query + The query as either a string or a SqlRequest object. + ''' + self._druid.sql.sql_query(sql).show(display=self) + + def table(self, table_name): + ''' + Describe a table by returning the list of columns in the table. + + Parameters + ---------- + table_name str + The name of the table as either "table" or "schema.table". + If the form is "table", then the 'druid' schema is assumed. + ''' + self._druid.sql._schema_query(table_name).show(display=self) + + def function(self, table_name): + ''' + Retrieve the list of parameters for a partial external table defined in + the Druid catalog. + + Parameters + ---------- + table_name str + The name of the table as either "table" or "schema.table". + If the form is "table", then the 'ext' schema is assumed. + ''' + return self._druid.sql._function_args_query(table_name).show(display=self) + + def schemas(self): + ''' + Display the list of schemas available in Druid. + ''' + self._druid.sql._schemas_query().show() + + def tables(self, schema=consts.DRUID_SCHEMA): + self._druid.sql._tables_query(schema).show(display=self) + + def run_task(self, query): + ''' + Run an MSQ task while displaying progress in the cell output. + :param query: INSERT/REPLACE statement to run + :return: None + ''' + from tqdm import tqdm + + task = self._druid.sql.task(query) + with tqdm(total=100.0) as pbar: + previous_progress = 0.0 + current_progress = 0.0 + while True: + reports=task.reports_no_wait() + # check if progress metric is available and display it + try: + current_progress = reports['multiStageQuery']['payload']['counters']['0']['0']['sortProgress']['progressDigest']*100.0 + except Exception: + pass + pbar.update( current_progress - previous_progress ) # update requires a relative value + previous_progress = current_progress + # present status if available + try: + pbar.set_description(f"Loading data, status:[{reports['multiStageQuery']['payload']['status']['status']}]") + # stop when job is done + if reports['multiStageQuery']['payload']['status']['status'] in ['SUCCESS', 'FAILED']: + break; + except Exception: + pbar.set_description('Initializing...') + time.sleep(1) diff --git a/pydruid/druidapi/druid.py b/pydruid/druidapi/druid.py new file mode 100644 index 0000000..ebf8cb1 --- /dev/null +++ b/pydruid/druidapi/druid.py @@ -0,0 +1,160 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .rest import DruidRestClient +from .status import StatusClient +from .catalog import CatalogClient +from .sql import QueryClient +from .tasks import TaskClient +from .datasource import DatasourceClient +from .basic_auth import BasicAuthClient + +class DruidClient: + ''' + Client for a Druid cluster. Functionality is split into a number of + specialized "clients" that group many of Druid's REST API calls. + ''' + + def __init__(self, router_endpoint, display_client=None, auth=None): + self.rest_client = DruidRestClient(router_endpoint, auth=auth) + self.status_client = None + self.catalog_client = None + self.sql_client = None + self.tasks_client = None + self.datasource_client = None + if display_client: + self.display_client = display_client + else: + from .text_display import TextDisplayClient + self.display_client = TextDisplayClient() + self.display_client._druid = self + + @property + def rest(self): + ''' + Returns the low-level REST client. Useful for debugging and to access REST API + calls not yet wrapped by the various function-specific clients. + + If you find you need to use this, consider creating a wrapper function in Python + and contributing it to Druid via a pull request. + ''' + return self.rest_client + + def trace(self, enable=True): + ''' + Enable or disable tracing. When enabled, the Druid client prints the + URL and payload for each REST API call. Useful for debugging, or if you want + to learn what the code does so you can replicate it in your own client. + ''' + self.rest_client.enable_trace(enable) + + @property + def status(self) -> StatusClient: + ''' + Returns the status client for the Router service. + ''' + if not self.status_client: + self.status_client = StatusClient(self.rest_client) + return self.status_client + + def status_for(self, endpoint) -> StatusClient: + ''' + Returns the status client for a Druid service. + + Parameters + ---------- + endpoint: str + The URL for a Druid service. + ''' + return StatusClient(DruidRestClient(endpoint), True) + + @property + def catalog(self) -> CatalogClient: + ''' + Returns the catalog client to interact with the Druid catalog. + ''' + if not self.catalog_client: + self.catalog_client = CatalogClient(self.rest_client) + return self.catalog_client + + @property + def sql(self) -> QueryClient: + ''' + Returns the SQL query client to submit interactive or MSQ queries. + ''' + if not self.sql_client: + self.sql_client = QueryClient(self) + return self.sql_client + + @property + def tasks(self) -> TaskClient: + ''' + Returns the Overlord tasks client to submit and track tasks. + ''' + if not self.tasks_client: + self.tasks_client = TaskClient(self.rest_client) + return self.tasks_client + + @property + def datasources(self) -> DatasourceClient: + ''' + Returns the Coordinator datasources client to manipulate datasources. + Prefer to use the SQL client to query the INFORMATION_SCHEMA to obtain + information about datasources. + ''' + if not self.datasource_client: + self.datasource_client = DatasourceClient(self.rest_client) + return self.datasource_client + + def basic_security(self, authenticator, authorizer=None): + ''' + Returns a client to work with a basic authorization authenticator/authorizer pair. + This client assumes the typical case of one authenticator and one authorizer. If + you have more than one, create multiple clients. + + The basic security API is not proxied through the Router: it must work directly with + the Coordinator. Create an ad hoc Druid client for your Coordinator. Because you have + basic security enabled, you must specify the admin user and password: + + ``` + coord = druidapi.jupyter_client('http://localhost:8081', auth=('admin', 'admin-pwd')) + ac = coord.basic_security('yourAuthenticator', 'yourAuthorizer') + ``` + + Parameters + ---------- + authenticator: str + Authenticator name as set in the `druid.auth.authenticatorChain` + runtime property. + + authorizer: str, default = same as authenticator + Authorizer name as set in the `druid.auth.authorizers` runtime property. + Defaults to the same name as the `authenticator` parameter for simple cases. + ''' + return BasicAuthClient(self.rest_client, authenticator, authorizer) + + @property + def display(self): + return self.display_client + + def close(self): + self.rest_client.close() + self.rest_client = None + self.catalog_client = None + self.tasks_client = None + self.datasource_client = None + self.sql_client = None + + diff --git a/pydruid/druidapi/error.py b/pydruid/druidapi/error.py new file mode 100644 index 0000000..8e1af52 --- /dev/null +++ b/pydruid/druidapi/error.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class ClientError(Exception): + ''' + Indicates an error with usage of the Python API. + ''' + + def __init__(self, msg): + self.message = msg + +class DruidError(Exception): + ''' + Indicates that something went wrong on Druid, typically as the result of a + request that this client sent. + ''' + + def __init__(self, msg): + self.message = msg diff --git a/pydruid/druidapi/html_display.py b/pydruid/druidapi/html_display.py new file mode 100644 index 0000000..e61a6c9 --- /dev/null +++ b/pydruid/druidapi/html_display.py @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from IPython.display import display, HTML +from html import escape +from .display import DisplayClient +from .base_table import BaseTable + +class HtmlDisplayClient(DisplayClient): + + def __init__(self): + DisplayClient.__init__(self) + global initialized + if not initialized: + display(HTML(STYLES)) + initialized = True + + def text(self, msg): + html('
' + escape_for_html(msg) + '
') + + def alert(self, msg): + html('
' + escape_for_html(msg.replace('\n', '
')) + '
') + + def error(self, msg): + html('
ERROR: ' + escape_for_html(msg.replace('\n', '
')) + '
') + + def new_table(self): + return HtmlTable() + + def show_table(self, table): + self.text(table.format()) + +STYLES = ''' + +''' + +def escape_for_html(s): + # Annoying: IPython treats $ as the start of Latex, which is cool, + # but not wanted here. + return s.replace('$', '\\$') + +def html(s): + display(HTML(s)) + +initialized = False + +alignments = ['druid-left', 'druid-center', 'druid-right'] + +def start_tag(tag, align): + s = '<' + tag + if align: + s += ' class="{}"'.format(alignments[align]) + return s + '>' + +class HtmlTable(BaseTable): + + def __init__(self): + BaseTable.__init__(self) + + def widths(self, widths): + self._widths = widths + + def format(self) -> str: + if not self._rows and not self._headers: + return '' + _, width = self.row_width(self._rows) + headers = self.pad_headers(width) + rows = self.pad_rows(self._rows, width) + s = '\n' + s += self.gen_header(headers) + s += self.gen_rows(rows) + return s + '\n
' + + def gen_header(self, headers): + if not headers: + return '' + s = '' + for i in range(len(headers)): + s += start_tag('th', self.col_align(i)) + escape(headers[i]) + '' + return s + '\n' + + def gen_rows(self, rows): + html_rows = [] + for row in rows: + r = '' + for i in range(len(row)): + r += start_tag('td', self.col_align(i)) + cell = row[i] + value = '' if cell is None else escape(str(cell)) + r += value + '' + html_rows.append(r + '') + return '\n'.join(html_rows) + + def col_align(self, col): + if not self._align: + return None + if col >= len(self._align): + return None + return self._align[col] diff --git a/pydruid/druidapi/rest.py b/pydruid/druidapi/rest.py new file mode 100644 index 0000000..6f5bf5c --- /dev/null +++ b/pydruid/druidapi/rest.py @@ -0,0 +1,291 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import requests +from .util import dict_get +from urllib.parse import quote +from .error import ClientError + +def check_error(response): + ''' + Raises an HttpError from the requests library if the response code is neither + OK (200) nor Accepted (202). + + Druid's REST API is inconsistent with how it reports errors. Some APIs return + an error as a JSON object. Others return a text message. Still others return + nothing at all. With the JSON format, sometimes the error returns an + 'errorMessage' field, other times only a generic 'error' field. + + This method attempts to parse these variations. If the error response JSON + matches one of the known error formats, then raises a `ClientError` with the error + message. Otherise, raises a Requests library `HTTPError` for a generic error. + If the response includes a JSON payload, then the it is returned in the json field + of the `HTTPError` object so that the client can perhaps decode it. + ''' + code = response.status_code + if code == requests.codes.ok or code == requests.codes.accepted: + return + json = None + try: + json = response.json() + except Exception: + # If we can't get the JSON, raise a Requests error + response.raise_for_status() + + # Druid JSON payload. Try to make sense of the error + msg = dict_get(json, 'errorMessage') + if not msg: + msg = dict_get(json, 'error') + if msg: + # We have an explanation from Druid. Raise a Client exception + raise ClientError(msg) + + # Don't know what the Druid JSON is. Raise a Requests exception, but + # add on the JSON in the hopes that the caller can make use of it. + try: + response.raise_for_status() + except Exception as e: + e.json = json + raise e + +def build_url(endpoint, req, args=None) -> str: + ''' + Returns the full URL for a REST call given the relative request API and + optional parameters to fill placeholders within the request URL. + + Parameters + ---------- + endpoint: str + The base URL for the service. + + req: str + Relative URL, with optional {} placeholders + + args: list + Optional list of values to match {} placeholders in the URL. + ''' + url = endpoint + req + if args: + quoted = [quote(arg) for arg in args] + url = url.format(*quoted) + return url + +class DruidRestClient: + ''' + Wrapper around the basic Druid REST API operations using the + requests Python package. Handles the grunt work of building up + URLs, working with JSON, etc. + + The REST client accepts an endpoint that represents a Druid service, typically + the Router. All requests are made to this service, which means using the service + URL as the base. That is, if the service is http://localhost:8888, then + a request for status is just '/status': the methods here build up the URL by + concatenating the service endpoint with the request URL. + ''' + + def __init__(self, endpoint, auth=None): + ''' + Creates a Druid rest client endpoint using the given endpoint URI and + optional authentication. + + Parameters + ---------- + endpoint: str + The Druid router endpoint of the form `'server:port'`. Use + `'localhost:8888'` for a Druid instance running locally. + + auth: str, default = None + Optional authorization credentials in the format described + by the Requests library. For Basic auth use + `auth=('user', 'password')` + ''' + self.endpoint = endpoint + self.trace = False + self.session = requests.Session() + if auth: + self.session.auth = auth + + def enable_trace(self, flag=True): + self.trace = flag + + def build_url(self, req, args=None) -> str: + ''' + Returns the full URL for a REST call given the relative request API and + optional parameters to fill placeholders within the request URL. + + Parameters + ---------- + req: str + Relative URL, with optional {} placeholders + + args: list + Optional list of values to match {} placeholders in the URL. + ''' + return build_url(self.endpoint, req, args) + + def get(self, req, args=None, params=None, require_ok=True) -> requests.Request: + ''' + Generic GET request to this service. + + Parameters + ---------- + req: str + The request URL without host, port or query string. + Example: `/status` + + args: [str], default = None + Optional parameters to fill in to the URL. + Example: `/customer/{}` + + params: dict, default = None + Optional map of query variables to send in + the URL. Query parameters are the name/value pairs + that appear after the `?` marker. + + require_ok: bool, default = True + Whether to require an OK (200) response. If `True`, and + the request returns a different response code, then raises + a `RestError` exception. + + Returns + ------- + The `requests` `Request` object. + ''' + url = self.build_url(req, args) + if self.trace: + print('GET:', url) + r = self.session.get(url, params=params) + if require_ok: + check_error(r) + return r + + def get_json(self, url_tail, args=None, params=None): + ''' + Generic GET request which expects a JSON response. + ''' + r = self.get(url_tail, args, params) + try: + result=r.json() + except Exception as e: + result='{"message":"ERROR: Unable to parse into JSON.", "raw_result":"' + r.text + '"}' + return result + + def post(self, req, body, args=None, headers=None, require_ok=True) -> requests.Response: + ''' + Issues a POST request for the given URL on this + node, with the given payload and optional URL query + parameters. + ''' + url = self.build_url(req, args) + if self.trace: + print('POST:', url) + print('body:', body) + r = self.session.post(url, data=body, headers=headers) + if require_ok: + check_error(r) + return r + + def post_json(self, req, body, args=None, headers=None, params=None) -> requests.Response: + ''' + Issues a POST request for the given URL on this node, with a JSON request. Returns + the JSON response. + + Parameters + ---------- + req: str + URL relative to the service base URL. + + body: any + JSON-encodable Python object to send in the request body. + + args: array[str], default = None + Arguments to include in the relative URL to replace {} markers. + + headers: dict, default = None + Additional HTTP header fields to send in the request. + + params: dict, default = None + Parameters to inlude in the URL as the `?name=value` query string. + + Returns + ------- + The JSON response as a Python object. + + See + --- + `post_only_json()` for the form that returns the response object, not JSON. + ''' + r = self.post_only_json(req, body, args, headers, params) + check_error(r) + return r.json() + + def post_only_json(self, req, body, args=None, headers=None, params=None, require_ok=True) -> requests.Request: + ''' + Issues a POST request for the given URL on this node, with a JSON request, returning + the Requests library `Response` object. + + Parameters + ---------- + req: str + URL relative to the service base URL. + + body: any + JSON-encodable Python object to send in the request body. + + args: array[str], default = None + Arguments to include in the relative URL to replace {} markers. + + headers: dict, default = None + Additional HTTP header fields to send in the request. + + params: dict, default = None + Parameters to inlude in the URL as the `?name=value` query string. + + Returns + ------- + The JSON response as a Python object. + + See + --- + `post_json()` for the form that returns the response JSON. + ''' + url = self.build_url(req, args) + if self.trace: + print('POST:', url) + print('body:', body) + r = self.session.post(url, json=body, headers=headers, params=params) + if require_ok: + check_error(r) + return r + + def delete(self, req, args=None, params=None, headers=None, require_ok=True): + url = self.build_url(req, args) + if self.trace: + print('DELETE:', url) + r = self.session.delete(url, params=params, headers=headers) + if require_ok: + check_error(r) + return r + + def delete_json(self, req, args=None, params=None, headers=None): + return self.delete(req, args=args, params=params, headers=headers).json() + + def close(self): + ''' + Close the session. Use in scripts and tests when the system will otherwise complain + about open sockets. + ''' + self.session.close() + self.session = None diff --git a/pydruid/druidapi/sql.py b/pydruid/druidapi/sql.py new file mode 100644 index 0000000..90650e0 --- /dev/null +++ b/pydruid/druidapi/sql.py @@ -0,0 +1,1128 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time, requests +from . import consts +from .util import dict_get, split_table_name +from .error import DruidError, ClientError + +REQ_SQL = consts.ROUTER_BASE + '/sql' +REQ_SQL_TASK = REQ_SQL + '/task' +REQ_SQL_ASYNC = REQ_SQL + '/statements' + +class SqlRequest: + + def __init__(self, query_client, sql): + self.query_client = query_client + self.sql = sql + self.context = None + self.params = None + self.header = False + self.format = consts.SQL_OBJECT + self.headers = None + self.types = None + self.sql_types = None + + def with_format(self, result_format): + self.format = result_format + return self + + def with_headers(self, sql_types=False, druidTypes=False): + self.headers = True + self.types = druidTypes + self.sql_types = sql_types + return self + + def with_context(self, context): + if not self.context: + self.context = context + else: + self.context.update(context) + return self + + def add_context(self, key, value): + return self.with_context({key: value}) + + def with_parameters(self, params): + ''' + Set the array of parameters. Parameters must each be a map of 'type'/'value' pairs: + {'type': the_type, 'value': the_value}. The type must be a valid SQL type + (in upper case). See the consts module for a list. + ''' + for param in params: + self.add_parameters(param) + return self + + def add_parameter(self, value): + ''' + Add one parameter value. Infers the type of the parameter from the Python type. + ''' + if value is None: + raise ClientError('Druid does not support null parameter values') + data_type = None + value_type = type(value) + if value_type is str: + data_type = consts.SQL_VARCHAR_TYPE + elif value_type is int: + data_type = consts.SQL_BIGINT_TYPE + elif value_type is float: + data_type = consts.SQL_DOUBLE_TYPE + elif value_type is list: + data_type = consts.SQL_ARRAY_TYPE + else: + raise ClientError('Unsupported value type') + if not self.params: + self.params = [] + self.params.append({'type': data_type, 'value': value}) + + def response_header(self): + self.header = True + return self + + def request_headers(self, headers): + self.headers = headers + return self + + def to_common_format(self): + self.header = False + self.sql_types = False + self.types = False + self.format = consts.SQL_OBJECT + return self + + def to_request(self): + query_obj = {'query': self.sql} + if self.context: + query_obj['context'] = self.context + if self.params: + query_obj['parameters'] = self.params + if self.header: + query_obj['header'] = True + if self.format: + query_obj['resultFormat'] = self.format + if self.sql_types is not None: # Note: boolean variable + query_obj['sqlTypesHeader'] = self.sql_types + if self.types is not None: # Note: boolean variable + query_obj['typesHeader'] = self.types + return query_obj + + def result_format(self): + return self.format.lower() + + def run(self): + return self.query_client.sql_query(self) + +def request_from_sql_query(query_client, sql_query): + try: + req = SqlRequest(query_client, sql_query['query']) + except KeyError: + raise ClientError('A SqlRequest dictionary must have \'query\' set') + req.context = sql_query.get('context') + req.params = sql_query.get('parameters') + req.header = sql_query.get('header') + req.format = sql_query.get('resultFormat') + req.format = consts.SQL_OBJECT if req.format is None else req.format + req.sql_types = sql_query.get('sqlTypesHeader') + req.types = sql_query.get('typesHeader') + return req + +def parse_rows(fmt, context, results): + if fmt == consts.SQL_ARRAY_WITH_TRAILER: + rows = results['results'] + elif fmt == consts.SQL_ARRAY: + rows = results + else: + return results + if not context.get('headers', False): + return rows + header_size = 1 + if context.get('sqlTypesHeader', False): + header_size += 1 + if context.get('typesHeader', False): + header_size += 1 + return rows[header_size:] + +def label_non_null_cols(results): + if not results: + return [] + is_null = {} + for key in results[0].keys(): + is_null[key] = True + for row in results: + for key, value in row.items(): + # The following is hack to check for null values, empty strings and numeric 0s. + is_null[key] = not not value + return is_null + +def filter_null_cols(results): + ''' + Filter columns from a Druid result set by removing all null-like + columns. A column is considered null if all values for that column + are null. A value is null if it is either a JSON null, an empty + string, or a numeric 0. All rows are preserved, as is the order + of the remaining columns. + ''' + if not results: + return results + is_null = label_non_null_cols(results) + revised = [] + for row in results: + new_row = {} + for key, value in row.items(): + if is_null[key]: + continue + new_row[key] = value + revised.append(new_row) + return revised + +def parse_object_schema(results): + schema = [] + if len(results) == 0: + return schema + row = results[0] + for k, v in row.items(): + druid_type = None + sql_type = None + if type(v) is str: + druid_type = consts.DRUID_STRING_TYPE + sql_type = consts.SQL_VARCHAR_TYPE + elif type(v) is int or type(v) is float: + druid_type = consts.DRUID_LONG_TYPE + sql_type = consts.SQL_BIGINT_TYPE + schema.append(ColumnSchema(k, sql_type, druid_type)) + return schema + +def parse_array_schema(context, results): + schema = [] + if len(results) == 0: + return schema + has_headers = context.get(consts.HEADERS_KEY, False) + if not has_headers: + return schema + has_sql_types = context.get(consts.SQL_TYPES_HEADERS_KEY, False) + has_druid_types = context.get(consts.DRUID_TYPE_HEADERS_KEY, False) + size = len(results[0]) + for i in range(size): + druid_type = None + if has_druid_types: + druid_type = results[1][i] + sql_type = None + if has_sql_types: + sql_type = results[2][i] + schema.append(ColumnSchema(results[0][i], sql_type, druid_type)) + return schema + +def parse_schema(fmt, context, results): + if fmt == consts.SQL_OBJECT: + return parse_object_schema(results) + elif fmt == consts.SQL_ARRAY or fmt == consts.SQL_ARRAY_WITH_TRAILER: + return parse_array_schema(context, results) + else: + return [] + +def is_response_ok(http_response): + code = http_response.status_code + return code == requests.codes.ok or code == requests.codes.accepted + +class ColumnSchema: + + def __init__(self, name, sql_type, druid_type): + self.name = name + self.sql_type = sql_type + self.druid_type = druid_type + + def __str__(self): + return '{{name={}, SQL type={}, Druid type={}}}'.format(self.name, self.sql_type, self.druid_type) + +class SqlQueryResult: + ''' + Response from a classic request/response query. + ''' + + def __init__(self, request, response): + self.http_response = response + self._json = None + self._rows = None + self._schema = None + self.request = request + self._error = None + self._id = None + if not is_response_ok(response): + try: + self._error = response.json() + except Exception: + self._error = response.text + if not self._error: + self._error = 'Failed with HTTP status {}'.format(response.status_code) + try: + self._id = self.http_response.headers['X-Druid-SQL-Query-Id'] + except KeyError: + self._error = 'Query returned no query ID' + + @property + def _druid(self): + return self.request.query_client.druid_client + + @property + def result_format(self): + return self.request.result_format() + + @property + def ok(self): + ''' + Reports if the query succeeded. + + The query rows and schema are available only if ok is True. + ''' + return is_response_ok(self.http_response) + + @property + def error(self): + ''' + If the query fails, returns the error, if any provided by Druid. + ''' + if self.ok: + return None + if self._error: + return self._error + if not self.http_response: + return { 'error': 'unknown'} + if is_response_ok(self.http_response): + return None + return {'error': 'HTTP {}'.format(self.http_response.status_code)} + + @property + def error_message(self): + if self.ok: + return None + err = self.error + if not err: + return 'unknown' + if type(err) is str: + return err + msg = err.get('error') + text = err.get('errorMessage') + if not msg and not text: + return 'unknown' + if not msg: + return text + if not text: + return msg + return msg + ': ' + text + + @property + def id(self): + ''' + Returns the unique identifier for the query. + ''' + return self._id + + @property + def non_null(self): + if not self.ok: + return None + if self.result_format != consts.SQL_OBJECT: + return None + return filter_null_cols(self.rows) + + @property + def as_array(self): + if self.result_format == consts.SQL_OBJECT: + rows = [] + for obj in self.rows: + if hasattr(obj, "values"): + rows.append([v for v in obj.values()]) + return rows + else: + return self.rows + + @property + def json(self): + if not self.ok: + return None + if not self._json: + self._json = self.http_response.json() + return self._json + + @property + def rows(self): + ''' + Returns the rows of data for the query. + + Druid supports many data formats. The method makes its best + attempt to map the format into an array of rows of some sort. + ''' + if not self._rows: + json = self.json + if not json: + return self.http_response.text + self._rows = parse_rows(self.result_format, self.request.context, json) + return self._rows + + @property + def schema(self): + ''' + Returns the data schema as a list of ColumnSchema objects. + + Druid supports many data formats; not all of which provide + schema information. This method makes a best effort to + extract the schema from the query results. + ''' + if not self._schema: + self._schema = parse_schema(self.result_format, self.request.context, self.json) + return self._schema + + def _display(self, display): + return self._druid.display if not display else display + + def show(self, non_null=False, display=None): + display = self._display(display) + if not self.ok: + display.error(self.error_message) + return + data = None + if non_null: + data = self.non_null + if not data: + data = self.as_array + if not data: + display.alert('Query returned no results') + return + display.data_table(data, [c.name for c in self.schema]) + + def show_schema(self, display=None): + display = self._display(display) + if not self.ok: + display.error(self.error_message) + return + data = [] + for c in self.schema: + data.append([c.name, c.sql_type, c.druid_type]) + if not data: + display.alert('Query returned no schema') + return + display.data_table(data, ['Name', 'SQL Type', 'Druid Type']) + +class QueryTaskResult: + ''' + Response from an asynchronous MSQ query, which may be an ingestion or a retrieval + query. Can monitor task progress and wait for the task to complete. For a SELECT query, + obtains the rows from the task reports. There are no results for an ingestion query, + just a success/failure status. + + Note that SELECT query support is preliminary. The result structure is subject to + change. Use a version of the library that matches your version of Druid for best + results with MSQ SELECT queries. + ''' + + def __init__(self, request, response): + self._request = request + self.http_response = response + self._status = None + self._results = None + self._details = None + self._schema = None + self._rows = None + self._reports = None + self._results = None + self._error = None + self._id = None + if not is_response_ok(response): + self._state = consts.FAILED_STATE + try: + self._error = response.json() + except Exception: + self._error = response.text + if not self._error: + self._error = 'Failed with HTTP status {}'.format(response.status_code) + return + + # Typical response: + # {'taskId': '6f7b514a446d4edc9d26a24d4bd03ade_fd8e242b-7d93-431d-b65b-2a512116924c_bjdlojgj', + # 'state': 'RUNNING'} + self.response_obj = response.json() + self._id = self.response_obj['taskId'] + self._state = self.response_obj['state'] + + @property + def ok(self): + ''' + Reports if the query completed successfully or is still running. + Use succeeded() to check if the task is done and successful. + ''' + return not self._error + + @property + def id(self): + return self._id + + def _druid(self): + return self._request.query_client.druid_client + + def _tasks(self): + return self._druid().tasks + + @property + def status(self): + ''' + Polls Druid for an update on the query run status. + ''' + self.check_valid() + # Example: + # {'task': 'talaria-sql-w000-b373b68d-2675-4035-b4d2-7a9228edead6', + # 'status': { + # 'id': 'talaria-sql-w000-b373b68d-2675-4035-b4d2-7a9228edead6', + # 'groupId': 'talaria-sql-w000-b373b68d-2675-4035-b4d2-7a9228edead6', + # 'type': 'talaria0', 'createdTime': '2022-04-28T23:19:50.331Z', + # 'queueInsertionTime': '1970-01-01T00:00:00.000Z', + # 'statusCode': 'RUNNING', 'status': 'RUNNING', 'runnerStatusCode': 'PENDING', + # 'duration': -1, 'location': {'host': None, 'port': -1, 'tlsPort': -1}, + # 'dataSource': 'w000', 'errorMsg': None}} + self._status = self._tasks().task_status(self._id) + self._state = self._status['status']['status'] + if self._state == consts.FAILED_STATE: + self._error = self._status['status']['errorMsg'] + return self._status + + @property + def done(self): + ''' + Reports whether the query is done. The query is done when the Overlord task + that runs the query completes. A completed task is one with a status of either + SUCCESS or FAILED. + ''' + return self._state == consts.FAILED_STATE or self._state == consts.SUCCESS_STATE + + @property + def succeeded(self): + ''' + Reports if the query succeeded. + ''' + return self._state == consts.SUCCESS_STATE + + @property + def state(self): + ''' + Reports the task state from the Overlord task. + + Updated after each call to status(). + ''' + return self._state + + @property + def error(self): + return self._error + + @property + def error_message(self): + err = self.error() + if not err: + return 'unknown' + if type(err) is str: + return err + msg = dict_get(err, 'error') + text = dict_get(err, 'errorMessage') + if not msg and not text: + return 'unknown' + if text: + text = text.replace('\\n', '\n') + if not msg: + return text + if not text: + return msg + return msg + ': ' + text + + def join(self): + ''' + Wait for the task to complete, if still running. Returns at task + completion: success or failure. + + Returns True for success, False for failure. + ''' + if not self.done: + self.status + while not self.done: + time.sleep(0.5) + self.status + return self.succeeded + + def check_valid(self): + if not self._id: + raise ClientError('Operation is invalid on a failed query') + + def wait_until_done(self): + ''' + Wait for the task to complete. Raises an error if the task fails. + A caller can proceed to do something with the successful result + once this method returns without raising an error. + ''' + if not self.join(): + raise DruidError('Query failed: ' + self.error_message) + + def wait(self): + ''' + Wait for a SELECT query to finish running, then returns the rows from the query. + ''' + self.wait_until_done() + return self.rows + + @property + def reports(self) -> dict: + self.check_valid() + if not self._reports: + self.join() + self._reports = self._tasks().task_reports(self._id) + return self._reports + + def reports_no_wait(self) -> dict: + return self._tasks().task_reports(self._id, require_ok=False) + + @property + def results(self): + if not self._results: + rpts = self.reports() + self._results = rpts['multiStageQuery']['payload']['results'] + return self._results + + @property + def schema(self): + if not self._schema: + results = self.results + sig = results['signature'] + sql_types = results['sqlTypeNames'] + size = len(sig) + self._schema = [] + for i in range(size): + self._schema.append(ColumnSchema(sig[i]['name'], sql_types[i], sig[i]['type'])) + return self._schema + + @property + def rows(self): + if not self._rows: + results = self.results + self._rows = results['results'] + return self._rows + + def _display(self, display): + return self._druid().display if not display else display + + def show(self, non_null=False, display=None): + display = self._display(display) + if not self.done: + display.alert('Task has not finished running') + return + if not self.succeeded: + display.error(self.error_message) + return + data = self.rows + if non_null: + data = filter_null_cols(data) + if not data: + display.alert('Query returned no {}rows'.format("visible " if non_null else '')) + return + display.data_table(data, [c.name for c in self.schema]) + +class AsynchQueryResult: + ''' + Response from an asynchronous MSQ SELECT query from Deep Storage. + ''' + + def __init__(self, request, response): + self._request = request + self.http_response = response + self._state = None # state :init and status + self._durationMs = None # durationMs :init and status + self._status_detail = None # result structure :status + self._schema = None # schema :init + self._pages = None # pages :status + self._rows = None # response is json array of objects : statemets//results + self._error = None # status ?? property unknown + self._id = None # queryId + if not is_response_ok(response): + self._state = consts.FAILED_STATE + try: + self._error = response.json() + except Exception: + self._error = response.text + if not self._error: + self._error = 'Failed with HTTP status {}'.format(response.status_code) + return + + # Typical response: + # {'taskId': '6f7b514a446d4edc9d26a24d4bd03ade_fd8e242b-7d93-431d-b65b-2a512116924c_bjdlojgj', + # 'state': 'RUNNING'} + self.response_obj = response.json() + self._id = self.response_obj['queryId'] + self._state = self.response_obj['state'] + + @property + def ok(self): + ''' + Reports if the query completed successfully or is still running. + Use succeeded() to check if the task is done and successful. + ''' + return not self._error + + @property + def id(self): + return self._id + + def _druid(self): + return self._request.query_client.druid_client + + def _query_client(self): + return self._request.query_client + # def _tasks(self): + # return self._druid().tasks + + @property + def status(self): + ''' + Polls Druid for an update on the query run status. + ''' + self.check_valid() + status = self._query_client().statement_status(self._id) + self._state = dict_get( status, 'state') + self._schema = dict_get( status, 'schema') + self._durationMs = dict_get( status, 'durationMs', 0) + if 'result' in status.keys(): + self._pages = dict_get( status['result'], 'pages') + self._results = status['result'] + if self._state == consts.FAILED_STATE: + self._error = dict_get( dict_get(status,'errorDetails'), 'errorMessage') + return self._state + + + @property + def done(self): + ''' + Reports whether the query is done. The query is done when the Overlord task + that runs the query completes. A completed task is one with a status of either + SUCCESS or FAILED. + ''' + return self._state == consts.FAILED_STATE or self._state == consts.SUCCESS_STATE + + @property + def succeeded(self): + ''' + Reports if the query succeeded. + ''' + return self._state == consts.SUCCESS_STATE + + @property + def state(self): + ''' + Reports the task state from the Overlord task. + + Updated after each call to status(). + ''' + return self._state + + @property + def error(self): + return self._error + + @property + def error_message(self): + err = self.error + if not err: + return 'unknown' + if type(err) is str: + return err + msg = dict_get(err, 'error') + text = dict_get(err, 'errorMessage') + if not msg and not text: + return 'unknown' + if text: + text = text.replace('\\n', '\n') + if not msg: + return text + if not text: + return msg + return msg + ': ' + text + + def join(self): + ''' + Wait for the task to complete, if still running. Returns at task + completion: success or failure. + + Returns True for success, False for failure. + ''' + if not self.done: + self.status + while not self.done: + time.sleep(0.5) + self.status + return self.succeeded + + def check_valid(self): + if not self._id: + raise ClientError('Operation is invalid on a failed query') + + def wait_until_done(self): + ''' + Wait for the task to complete. Raises an error if the task fails. + A caller can proceed to do something with the successful result + once this method returns without raising an error. + ''' + if not self.join(): + raise DruidError('Query failed: ' + self.error_message) + + def wait(self): + ''' + Wait for a SELECT query to finish running, then returns the rows from the query. + ''' + self.wait_until_done() + return self.rows + + @property + def schema(self): + return self._schema + + @property + def rows(self): + import json + if not self._rows: + if self.succeeded: + page = 0 + self._rows=[] + while page < len(self._pages): + results = self._query_client().statement_results(self._id, page) + for obj in results.text.splitlines(): + try: + if len(obj) > 0 : + self._rows.append( json.loads(obj)) + except Exception as ex: + raise ClientError(f"Could not parse JSON from row [{obj}]") + page+=1 + return self._rows + + def paged_rows(self, pageNum): + import json + if self.succeeded: + self._rows=[] + if pageNum < len(self._pages): + results = self._query_client().statement_results(self._id, pageNum) + try: + self._rows = json.loads(results.text) + except Exception as ex: + raise ClientError(f"Could not parse JSON from result [{results}]") + return self._rows + + + def _display(self, display): + return self._druid().display if not display else display + + def show(self, non_null=False, display=None): + display = self._display(display) + if not self.done: + display.alert('Task has not finished running') + return + if not self.succeeded: + display.error(self.error_message) + return + data = self.rows + if non_null: + data = filter_null_cols(data) + if not data: + display.alert('Query returned no {}rows'.format("visible " if non_null else '')) + return + display.data_table(data, [c.name for c in self.schema]) + +class QueryClient: + + def __init__(self, druid, rest_client=None): + self.druid_client = druid + self._rest_client = druid.rest_client if not rest_client else rest_client + + @property + def rest_client(self): + return self._rest_client + + def _prepare_query(self, request, asynch=False, rowsPerPage= None ): + if not request: + raise ClientError('No query provided.') + # If the request is a dictionary, assume it is already in SqlQuery form. + query_obj = None + if type(request) == dict: + query_obj = request + request = request_from_sql_query(self, request) + elif type(request) == str: + request = self.sql_request(request) + if not request.sql: + raise ClientError('No query provided.') + if self.rest_client.trace: + print(request.sql) + if asynch: + request.add_context( 'executionMode', 'ASYNC') + if rowsPerPage: + request.add_context( 'rowsPerPage', rowsPerPage) + if not query_obj: + query_obj = request.to_request() + return (request, query_obj) + + def sql_query(self, request) -> SqlQueryResult: + ''' + Submits a SQL query with control over the context, parameters and other + options. Returns a response with either a detailed error message, or + the rows and query ID. + + Parameters + ---------- + request: str | SqlRequest | dict + If a string, then gives the SQL query to execute. + + Can also be a `SqlRequest`, obtained from the + 'sql_request()` method, with optional query context, query parameters or + other options. + + Can also be a dictionary that represents a `SqlQuery` object. The + `SqlRequest` is a convenient wrapper to generate a `SqlQuery`. + + Note that some of the Druid SqlQuery options will return data in a format + that this library cannot parse. In that case, obtain the raw payload from + the response and avoid using the rows() and schema() methods. + + Returns + ------- + A SqlQueryResult object that provides either the error message for a failed query, + or the results of a successul query. The object provides access to the schema and + rows if data is requested in a supported format. The default request object sets the + options to return data in the required format. + ''' + request, query_obj = self._prepare_query(request) + r = self.rest_client.post_only_json(REQ_SQL, query_obj, headers=request.headers) + return SqlQueryResult(request, r) + + def sql(self, sql, *args) -> list: + ''' + Run a SQL query and return the results. Typically used to receive data as part + of another operation, rathre than to display results to the user. + + Parameters + ---------- + sql: str + The SQL statement with optional Python `{}` parameters. + + args: list[str], Default = None + Array of values to insert into the parameters. + ''' + if len(args) > 0: + sql = sql.format(*args) + resp = self.sql_query(sql) + if resp.ok: + return resp.rows + raise ClientError(resp.error_message) + + def explain_sql(self, query): + ''' + Runs an EXPLAIN PLAN FOR query for the given query. + + Returns + ------- + An object with the plan JSON parsed into Python objects: + plan: the query plan + columns: column schema + tables: dictionary of name/type pairs + ''' + if not query: + raise ClientError('No query provided.') + results = self.sql('EXPLAIN PLAN FOR ' + query) + return results[0] + + def sql_request(self, sql) -> SqlRequest: + ''' + Creates a SqlRequest object for the given SQL query text. + ''' + return SqlRequest(self, sql) + + def task(self, query) -> QueryTaskResult: + ''' + Submits an MSQ query. Returns a QueryTaskResult to track the task. + + Parameters + ---------- + query + The query as either a string or a SqlRequest object. + ''' + request, query_obj = self._prepare_query(query) + r = self.rest_client.post_only_json(REQ_SQL_TASK, query_obj, headers=request.headers) + return QueryTaskResult(request, r) + + def statement(self, query, rowsPerPage=None) -> AsynchQueryResult: + ''' + Submits an MSQ asynch query. Returns a AsynchQueryResult to track the task. + + Parameters + ---------- + query + The query as either a string or a SqlRequest object. + ''' + request, query_obj = self._prepare_query(query, asynch=True, rowsPerPage=rowsPerPage) + r = self.rest_client.post_only_json(REQ_SQL_ASYNC, query_obj, headers=request.headers) + return AsynchQueryResult(request, r) + + def statement_status(self, id) : + ''' + + :param id: + :return: + ''' + ''' + Submits an MSQ asynch query status request. + :param id: id of the query to retrieve status from + :return: json response object. + ''' + response = self.rest_client.get_json(REQ_SQL_ASYNC+f'/{id}', "") + return response + + def statement_results(self, id, page=0 ): + ''' + :param id: queryId to retrieve results from + :param page: the page of rows to retrieve + :return: json array with rows for the page of results + ''' + response = self.rest_client.get(REQ_SQL_ASYNC+f'/{id}/results', params={"page":page} ) + return response + + def run_task(self, query): + ''' + Submits an MSQ query and wait for completion. Returns a QueryTaskResult to track the task. + + Parameters + ---------- + query + The query as either a string or a SqlRequest object. + ''' + resp = self.task(query) + if not resp.ok: + raise ClientError(resp.error_message) + resp.wait_until_done() + + def async_sql(self, query, rowsPerPage=None): + ''' + Submits an MSQ asynchronous query request using the sql/statements API Returns a + :param query: The SQL query statement + :return: rows + ''' + resp = self.statement(query, rowsPerPage=rowsPerPage) + if not resp.ok: + raise ClientError(resp.error_message) + return resp + + def _tables_query(self, schema): + return self.sql_query(''' + SELECT TABLE_NAME AS TableName + FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_SCHEMA = '{}' + ORDER BY TABLE_NAME + '''.format(schema)) + + def tables(self, schema=consts.DRUID_SCHEMA): + ''' + Returns a list of tables in the given schema. + + Parameters + ---------- + schema + The schema to query, `druid` by default. + ''' + return self._tables_query(schema).rows + + def _schemas_query(self): + return self.sql_query(''' + SELECT SCHEMA_NAME AS SchemaName + FROM INFORMATION_SCHEMA.SCHEMATA + ORDER BY SCHEMA_NAME + ''') + + def schemas(self): + return self._schemas_query().rows + + def _schema_query(self, table_name): + parts = split_table_name(table_name, consts.DRUID_SCHEMA) + return self.sql_query(''' + SELECT + ORDINAL_POSITION AS "Position", + COLUMN_NAME AS "Name", + DATA_TYPE AS "Type" + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = '{}' + AND TABLE_NAME = '{}' + ORDER BY ORDINAL_POSITION + '''.format(parts[0], parts[1])) + + def table_schema(self, table_name): + ''' + Returns the schema of a table as an array of dictionaries of the + form {"Position": "", "Name": "", "Type": ""} + + Parameters + ---------- + table_name: str + The name of the table as either "table" or "schema.table". + If the form is "table", then the 'druid' schema is assumed. + ''' + return self._schema_query(table_name).rows + + def _function_args_query(self, table_name): + parts = split_table_name(table_name, consts.EXT_SCHEMA) + return self.sql_query(''' + SELECT + ORDINAL_POSITION AS "Position", + PARAMETER_NAME AS "Parameter", + DATA_TYPE AS "Type", + IS_OPTIONAL AS "Optional" + FROM INFORMATION_SCHEMA.PARAMETERS + WHERE SCHEMA_NAME = '{}' + AND FUNCTION_NAME = '{}' + ORDER BY ORDINAL_POSITION + '''.format(parts[0], parts[1])) + + def function_parameters(self, table_name): + ''' + Retruns the list of parameters for a partial external table defined in + the Druid catalog. Returns the parameters as an array of objects in the + form {"Position": , "Parameter": "", "Type": "", + "Optional": True|False} + + Parameters + ---------- + table_name str + The name of the table as either "table" or "schema.table". + If the form is "table", then the 'ext' schema is assumed. + ''' + return self._function_args_query(table_name).rows + + def wait_until_ready(self, table_name, verify_load_status=True): + ''' + Waits for a datasource to be loaded in the cluster, and to become available to SQL. + + Parameters + ---------- + table_name str + The name of a datasource in the 'druid' schema. + verify_load_status + If true, checks whether all published segments are loaded before testing query. + If false, tries the test query before checking whether all published segments are loaded. + ''' + if verify_load_status: + self.druid_client.datasources.wait_until_ready(table_name) + while True: + try: + self.sql('SELECT 1 FROM "{}" LIMIT 1'.format(table_name)); + return + except Exception: + time.sleep(0.5) diff --git a/pydruid/druidapi/status.py b/pydruid/druidapi/status.py new file mode 100644 index 0000000..89141d2 --- /dev/null +++ b/pydruid/druidapi/status.py @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +STATUS_BASE = '/status' +REQ_STATUS = STATUS_BASE +REQ_HEALTH = STATUS_BASE + '/health' +REQ_PROPERTIES = STATUS_BASE + '/properties' +REQ_IN_CLUSTER = STATUS_BASE + '/selfDiscovered/status' + +ROUTER_BASE = '/druid/router/v1' +REQ_BROKERS = ROUTER_BASE + '/brokers' + +class StatusClient: + ''' + Client for status APIs. These APIs are available on all nodes. + If used with the Router, they report the status of just the Router. + To check the status of other nodes, first create a REST endpoint for that + node: + + status_client = StatusClient(DruidRestClient("")) + + You can find the service endpoints by querying the sys.servers table using SQL. + + See https://druid.apache.org/docs/latest/api-reference/api-reference.html#process-information + ''' + + def __init__(self, rest_client, owns_client=False): + self.rest_client = rest_client + self.owns_client = owns_client + + def close(self): + if self.owns_client: + self.rest_client.close() + self.rest_client = None + + #-------- Common -------- + + @property + def status(self): + ''' + Returns the Druid version, loaded extensions, memory used, total memory + and other useful information about the Druid service. + + GET `/status` + ''' + return self.rest_client.get_json(REQ_STATUS) + + @property + def is_healthy(self) -> bool: + ''' + Returns `True` if the node is healthy, `False` otherwise. Check service health + before using other Druid API methods to ensure the server is ready. + + See also `wait_until_ready()`. + + GET `/status/health` + ''' + try: + return self.rest_client.get_json(REQ_HEALTH) + except Exception: + return False + + def wait_until_ready(self): + ''' + Sleeps until the node reports itself as healthy. Will run forever if the node + is down or never becomes healthy. + ''' + while not self.is_healthy: + time.sleep(0.5) + + @property + def properties(self) -> map: + ''' + Returns the effective set of Java properties used by the service, including + system properties and properties from the `common_runtime.propeties` and + `runtime.properties` files. + + GET `/status/properties` + ''' + return self.rest_client.get_json(REQ_PROPERTIES) + + @property + def in_cluster(self): + ''' + Returns `True` if the node is visible within the cluster, `False` if not. + That is, returns the value of the `{"selfDiscovered": true/false}` + field in the response. + + GET `/status/selfDiscovered/status` + ''' + try: + result = self.rest_client.get_json(REQ_IN_CLUSTER) + return result.get('selfDiscovered', False) + except ConnectionError: + return False + + @property + def version(self): + ''' + Returns the version of the Druid server. If the server is running in an IDE, the + version will be empty. + ''' + return self.status.get('version') + + @property + def brokers(self): + ''' + Returns the list of broker nodes known to this node. Must be called on the Router. + ''' + return self.rest_client.get_json(REQ_BROKERS) diff --git a/pydruid/druidapi/tasks.py b/pydruid/druidapi/tasks.py new file mode 100644 index 0000000..8a32c86 --- /dev/null +++ b/pydruid/druidapi/tasks.py @@ -0,0 +1,204 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .consts import OVERLORD_BASE +import requests + +REQ_TASKS = OVERLORD_BASE + '/tasks' +REQ_POST_TASK = OVERLORD_BASE + '/task' +REQ_GET_TASK = REQ_POST_TASK + '/{}' +REQ_TASK_STATUS = REQ_GET_TASK + '/status' +REQ_TASK_REPORTS = REQ_GET_TASK + '/reports' +REQ_END_TASK = REQ_GET_TASK + '/shutdown' +REQ_END_DS_TASKS = OVERLORD_BASE + '/datasources/{}/shutdownAllTasks' + +class TaskClient: + ''' + Client for Overlord task-related APIs. + + See https://druid.apache.org/docs/latest/api-reference/api-reference.html#tasks + ''' + + def __init__(self, rest_client): + self.client = rest_client + + def tasks(self, state=None, table=None, task_type=None, max=None, created_time_interval=None): + ''' + Retrieves the list of tasks. + + Parameters + ---------- + state: str, default = None + Filter list of tasks by task state. Valid options are "running", + "complete", "waiting", and "pending". Constants are defined for + each of these in the `consts` file. + + table: str, default = None + Return tasks for only for one Druid table (datasource). + + created_time_interval: str, Default = None + Return tasks created within the specified interval. + + max: int, default = None + Maximum number of "complete" tasks to return. Only applies when state is set to "complete". + + task_type: str, default = None + Filter tasks by task type. + + Reference + --------- + `GET /druid/indexer/v1/tasks` + ''' + params = {} + if state: + params['state'] = state + if table: + params['datasource'] = table + if task_type: + params['type'] = task_type + if max is not None: + params['max'] = max + if created_time_interval: + params['createdTimeInterval'] = created_time_interval + return self.client.get_json(REQ_TASKS, params=params) + + def task(self, task_id) -> dict: + ''' + Retrieves the "payload" of a task. + + Parameters + ---------- + task_id: str + The ID of the task to retrieve. + + Returns + ------- + The task payload as a Python dictionary. + + Reference + --------- + `GET /druid/indexer/v1/task/{taskId}` + ''' + return self.client.get_json(REQ_GET_TASK, args=[task_id]) + + def task_status(self, task_id) -> dict: + ''' + Retrieves the status of a task. + + Parameters + ---------- + task_id: str + The ID of the task to retrieve. + + Returns + ------- + The task status as a Python dictionary. See the `consts` module for a list + of status codes. + + Reference + --------- + `GET /druid/indexer/v1/task/{taskId}/status` + ''' + return self.client.get_json(REQ_TASK_STATUS, args=[task_id]) + + def task_reports(self, task_id, require_ok = True) -> dict: + ''' + Retrieves the completion report for a completed task. + + Parameters + ---------- + task_id: str + The ID of the task to retrieve. + + Returns + ------- + The task reports as a Python dictionary. + + Reference + --------- + `GET /druid/indexer/v1/task/{taskId}/reports` + ''' + if require_ok: + return self.client.get_json(REQ_TASK_REPORTS, args=[task_id]) + else: + resp = self.client.get(REQ_TASK_REPORTS, args=[task_id], require_ok=require_ok) + if resp.status_code == requests.codes.ok: + try: + result = resp.json() + except Exception as ex: + result = {"message":"Payload could not be converted to json.", "payload":f"{resp.content}", "exception":f"{ex}"} + return result + else: + return {"message":f"Request return code:{resp.status_code}"} + + + def submit_task(self, payload): + ''' + Submits a task to the Overlord. + + Returns the `taskId` of the submitted task. + + Parameters + ---------- + payload: object + The task object represented as a Python dictionary. + + Returns + ------- + The REST response. + + Reference + --------- + `POST /druid/indexer/v1/task` + ''' + return self.client.post_json(REQ_POST_TASK, payload) + + def shut_down_task(self, task_id): + ''' + Shuts down a task. + + Parameters + ---------- + task_id: str + The ID of the task to shut down. + + Returns + ------- + The REST response. + + Reference + --------- + `POST /druid/indexer/v1/task/{taskId}/shutdown` + ''' + return self.client.post_json(REQ_END_TASK, body='', args=[task_id]) + + def shut_down_tasks_for(self, table): + ''' + Shuts down all tasks for a table (datasource). + + Parameters + ---------- + table: str + The name of the table (datasource). + + Returns + ------- + The REST response. + + Reference + --------- + `POST /druid/indexer/v1/datasources/{dataSource}/shutdownAllTasks` + ''' + return self.client.post_json(REQ_END_DS_TASKS, body='', args=[table]) diff --git a/pydruid/druidapi/text_display.py b/pydruid/druidapi/text_display.py new file mode 100644 index 0000000..d991e97 --- /dev/null +++ b/pydruid/druidapi/text_display.py @@ -0,0 +1,181 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .display import DisplayClient +from .base_table import pad, BaseTable + +alignments = ['', '^', '>'] + +def simple_table(table_def): + table = [] + if table_def.headers: + table.append(' '.join(table_def.format_row(table_def.headers))) + for row in table_def.rows: + table.append(' '.join(table_def.format_row(row))) + return table + +def border_table(table_def): + fmt = ' | '.join(table_def.formats) + table = [] + if table_def.headers: + table.append(fmt.format(*table_def.headers)) + bar = '' + for i in range(table_def.width): + width = table_def.widths[i] + if i > 0: + bar += '+' + if table_def.width == 1: + pass + elif i == 0: + width += 1 + elif i == table_def.width - 1: + width += 1 + else: + width += 2 + bar += '-' * width + table.append(bar) + for row in table_def.rows: + table.append(fmt.format(*row)) + return table + +class TableDef: + + def __init__(self): + self.width = None + self.headers = None + self.align = None + self.formats = None + self.rows = None + self.widths = None + + def find_widths(self): + self.widths = [0 for i in range(self.width)] + if self.headers: + for i in range(len(self.headers)): + self.widths[i] = len(self.headers[i]) + for row in self.rows: + for i in range(len(row)): + if row[i] is not None: + self.widths[i] = max(self.widths[i], len(row[i])) + + def apply_widths(self, widths): + if not widths: + return + for i in range(min(len(self.widths), len(widths))): + if widths[i] is not None: + self.widths[i] = widths[i] + + def define_row_formats(self): + self.formats = [] + for i in range(self.width): + f = '{{:{}{}.{}}}'.format( + alignments[self.align[i]], + self.widths[i], self.widths[i]) + self.formats.append(f) + + def format_header(self): + if not self.headers: + return None + return self.format_row(self.headers) + + def format_row(self, data_row): + row = [] + for i in range(self.width): + value = data_row[i] + if not value: + row.append(' ' * self.widths[i]) + else: + row.append(self.formats[i].format(value)) + return row + +class TextTable(BaseTable): + + def __init__(self): + BaseTable.__init__(self) + self.formatter = simple_table + self._widths = None + + def with_border(self): + self.formatter = border_table + + def widths(self, widths): + self._widths = widths + + def compute_def(self, rows): + table_def = TableDef() + min_width, max_width = self.row_width(rows) + table_def.width = max_width + table_def.headers = self.pad_headers(max_width) + table_def.rows = self.format_rows(rows, min_width, max_width) + table_def.find_widths() + table_def.apply_widths(self._widths) + table_def.align = self.find_alignments(rows, max_width) + table_def.define_row_formats() + return table_def + + def format(self): + if not self._rows: + self._rows = [] + table_rows = self.formatter(self.compute_def(self._rows)) + return '\n'.join(table_rows) + + def format_rows(self, rows, min_width, max_width): + if not self._col_fmt: + return self.default_row_format(rows, min_width, max_width) + else: + return self.apply_row_formats(rows, max_width) + + def default_row_format(self, rows, min_width, max_width): + new_rows = [] + if min_width <= max_width: + rows = self.pad_rows(rows, max_width) + for row in rows: + new_row = ['' if v is None else str(v) for v in row] + new_rows.append(pad(new_row, max_width, None)) + return new_rows + + def apply_row_formats(self, rows, max_width): + new_rows = [] + fmts = self._col_fmt + if len(fmts) < max_width: + fmts = fmts.copy() + for i in range(len(fmts), max_width): + fmts.append(lambda v: v) + for row in rows: + new_row = [] + for i in range(len(row)): + new_row.append(fmts[i](row[i])) + new_rows.append(pad(new_row, max_width, None)) + return new_rows + +class TextDisplayClient(DisplayClient): + + def __init__(self): + DisplayClient.__init__(self) + + def text(self, msg): + print(msg) + + def alert(self, msg): + print("Alert:", msg) + + def error(self, msg): + print("ERROR:", msg) + + def new_table(self): + return TextTable() + + def show_table(self, table): + print(table.format()) diff --git a/pydruid/druidapi/util.py b/pydruid/druidapi/util.py new file mode 100644 index 0000000..a2ad9f5 --- /dev/null +++ b/pydruid/druidapi/util.py @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .error import ClientError + +def dict_get(dict, key, default=None): + ''' + Returns the value of key in the given dict, or the default value if + the key is not found. + ''' + if not dict: + return default + return dict.get(key, default) + +def split_table_name(table_name, default_schema): + if not table_name: + raise ClientError('Table name is required') + parts = table_name.split('.') + if len(parts) > 2: + raise ClientError('Druid supports one or two-part table names') + if len(parts) == 2: + return parts + return [default_schema, parts[0]] diff --git a/requirements-dev.in b/requirements-dev.in index 83a794b..5320fa1 100644 --- a/requirements-dev.in +++ b/requirements-dev.in @@ -8,3 +8,4 @@ pre-commit pycurl pytest tox +requests \ No newline at end of file