diff --git a/README.md b/README.md index ba26b47c..f50b5e0b 100644 --- a/README.md +++ b/README.md @@ -268,6 +268,20 @@ TABLES GoodBye! ``` + +# Compressed Data + +Consider using the new "extra_headers" optional parameter to send +"Accept-Encoding: gzip" and have Druid return the results compressed, +increasing the performance of the query especially for large data sets. + +```python +from pydruid.client import PyDruid + +query = PyDruid(druid_url_goes_here, 'druid/v2', extra_headers={"Accept-Encoding": "gzip"}) +``` + + # Contributing Contributions are welcomed of course. We like to use `black` and `flake8`. diff --git a/pydruid/client.py b/pydruid/client.py index 95dca1dd..16e41eb6 100755 --- a/pydruid/client.py +++ b/pydruid/client.py @@ -16,6 +16,7 @@ from __future__ import division from __future__ import absolute_import +import sys import json import re @@ -24,19 +25,35 @@ from pydruid.query import QueryBuilder from base64 import b64encode +if sys.version_info.major == 2 and sys.version_info.minor == 7: + import StringIO + from gzip import GzipFile + + def decompress(data): + infile = StringIO.StringIO() + infile.write(data) + with GzipFile(fileobj=infile, mode="r") as f: + f.rewind() + ud = f.read() + return ud + + +else: + from gzip import decompress # extract error from the
tag inside the HTML response HTML_ERROR = re.compile("\\s*(.*?)\\s*", re.IGNORECASE) class BaseDruidClient(object): - def __init__(self, url, endpoint): + def __init__(self, url, endpoint, extra_headers=None): self.url = url self.endpoint = endpoint self.query_builder = QueryBuilder() self.username = None self.password = None self.proxies = None + self.extra_headers = extra_headers def set_basic_auth_credentials(self, username, password): self.username = username @@ -55,6 +72,8 @@ def _prepare_url_headers_and_body(self, query): else: url = self.url + "/" + self.endpoint headers = {"Content-Type": "application/json"} + if self.extra_headers and isinstance(self.extra_headers, dict): + headers.update(self.extra_headers) if (self.username is not None) and (self.password is not None): authstring = "{}:{}".format(self.username, self.password) b64string = b64encode(authstring.encode()).decode() @@ -542,15 +561,23 @@ class PyDruid(BaseDruidClient): 1 6 2013-10-04T00:00:00.000Z user_2 """ - def __init__(self, url, endpoint): - super(PyDruid, self).__init__(url, endpoint) + def __init__(self, url, endpoint, extra_headers=None): + super(PyDruid, self).__init__(url, endpoint, extra_headers) def _post(self, query): try: headers, querystr, url = self._prepare_url_headers_and_body(query) req = urllib.request.Request(url, querystr, headers) res = urllib.request.urlopen(req) - data = res.read().decode("utf-8") + content_encoding = res.info().get("Content-Encoding") + if content_encoding == "gzip": + data = decompress(res.read()).decode("utf-8") + elif content_encoding: + raise ValueError( + "Invalid content encoding: {}".format(content_encoding) + ) + else: + data = res.read().decode("utf-8") res.close() except urllib.error.HTTPError as e: err = e.read() diff --git a/tests/test_client.py b/tests/test_client.py index 8f2d0a1b..161eeebc 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,6 +1,6 @@ # -*- coding: UTF-8 -*- import textwrap - +import sys import pytest from mock import patch, Mock from six.moves import urllib @@ -11,29 +11,38 @@ from pydruid.utils.aggregators import doublesum from pydruid.utils.filters import Dimension +if sys.version_info.major == 2 and sys.version_info.minor == 7: + from gzip import GzipFile + + def compress(data): + out = StringIO() + with GzipFile(fileobj=out, mode="w") as f: + f.write(data) + return out.getvalue() + + +else: + from gzip import compress -def create_client(): - return PyDruid("http://localhost:8083", "druid/v2/") + +def create_client(headers=None): + return PyDruid("http://localhost:8083", "druid/v2/", headers) def create_blank_query(): - return Query({}, 'none') + return Query({}, "none") -def _http_error(code, msg, data = ''): +def _http_error(code, msg, data=""): # Need a file-like object for the response data fp = StringIO(data) return urllib.error.HTTPError( - url='http://fakeurl:8080/druid/v2/', - hdrs={}, - code=code, - msg=msg, - fp=fp, + url="http://fakeurl:8080/druid/v2/", hdrs={}, code=code, msg=msg, fp=fp ) class TestPyDruid: - @patch('pydruid.client.urllib.request.urlopen') + @patch("pydruid.client.urllib.request.urlopen") def test_druid_returns_error(self, mock_urlopen): # given mock_urlopen.side_effect = _http_error(500, "Druid error") @@ -42,20 +51,22 @@ def test_druid_returns_error(self, mock_urlopen): # when / then with pytest.raises(IOError): client.topn( - datasource="testdatasource", - granularity="all", - intervals="2015-12-29/pt1h", - aggregations={"count": doublesum("count")}, - dimension="user_name", - metric="count", - filter=Dimension("user_lang") == "en", - threshold=1, - context={"timeout": 1000}) - - @patch('pydruid.client.urllib.request.urlopen') + datasource="testdatasource", + granularity="all", + intervals="2015-12-29/pt1h", + aggregations={"count": doublesum("count")}, + dimension="user_name", + metric="count", + filter=Dimension("user_lang") == "en", + threshold=1, + context={"timeout": 1000}, + ) + + @patch("pydruid.client.urllib.request.urlopen") def test_druid_returns_html_error(self, mock_urlopen): # given - message = textwrap.dedent(""" + message = textwrap.dedent( + """ @@ -68,24 +79,29 @@ def test_druid_returns_html_error(self, mock_urlopen):
Powered by Jetty:// 9.3.19.v20170502
- """).strip() - mock_urlopen.side_effect = _http_error(500, 'Internal Server Error', message) + """ + ).strip() + mock_urlopen.side_effect = _http_error(500, "Internal Server Error", message) client = create_client() # when / then with pytest.raises(IOError) as e: client.topn( - datasource="testdatasource", - granularity="all", - intervals="2015-12-29/pt1h", - aggregations={"count": doublesum("count")}, - dimension="user_name", - metric="count", - filter=Dimension("user_lang") == "en", - threshold=1, - context={"timeout": 1000}) - - assert str(e.value) == textwrap.dedent(""" + datasource="testdatasource", + granularity="all", + intervals="2015-12-29/pt1h", + aggregations={"count": doublesum("count")}, + dimension="user_name", + metric="count", + filter=Dimension("user_lang") == "en", + threshold=1, + context={"timeout": 1000}, + ) + + assert ( + str(e.value) + == textwrap.dedent( + """ HTTP Error 500: Internal Server Error Druid Error: javax.servlet.ServletException: java.lang.OutOfMemoryError: GC overhead limit exceeded Query is: { @@ -112,9 +128,11 @@ def test_druid_returns_html_error(self, mock_urlopen): "queryType": "topN", "threshold": 1 } - """).strip() + """ + ).strip() + ) - @patch('pydruid.client.urllib.request.urlopen') + @patch("pydruid.client.urllib.request.urlopen") def test_druid_returns_results(self, mock_urlopen): # given response = Mock() @@ -126,28 +144,32 @@ def test_druid_returns_results(self, mock_urlopen): "metric" : 100 } ] } ] - """.encode("utf-8") + """.encode( + "utf-8" + ) + response.info.return_value = {} mock_urlopen.return_value = response client = create_client() # when top = client.topn( - datasource="testdatasource", - granularity="all", - intervals="2015-12-29/pt1h", - aggregations={"count": doublesum("count")}, - dimension="user_name", - metric="count", - filter=Dimension("user_lang") == "en", - threshold=1, - context={"timeout": 1000}) + datasource="testdatasource", + granularity="all", + intervals="2015-12-29/pt1h", + aggregations={"count": doublesum("count")}, + dimension="user_name", + metric="count", + filter=Dimension("user_lang") == "en", + threshold=1, + context={"timeout": 1000}, + ) # then assert top is not None assert len(top.result) == 1 - assert len(top.result[0]['result']) == 1 + assert len(top.result[0]["result"]) == 1 - @patch('pydruid.client.urllib.request.urlopen') + @patch("pydruid.client.urllib.request.urlopen") def test_client_allows_to_export_last_query(self, mock_urlopen): # given response = Mock() @@ -159,29 +181,79 @@ def test_client_allows_to_export_last_query(self, mock_urlopen): "metric" : 100 } ] } ] - """.encode("utf-8") + """.encode( + "utf-8" + ) + response.info.return_value = {} mock_urlopen.return_value = response client = create_client() client.topn( - datasource="testdatasource", - granularity="all", - intervals="2015-12-29/pt1h", - aggregations={"count": doublesum("count")}, - dimension="user_name", - metric="count", - filter=Dimension("user_lang") == "en", - threshold=1, - context={"timeout": 1000}) + datasource="testdatasource", + granularity="all", + intervals="2015-12-29/pt1h", + aggregations={"count": doublesum("count")}, + dimension="user_name", + metric="count", + filter=Dimension("user_lang") == "en", + threshold=1, + context={"timeout": 1000}, + ) # when / then - # assert that last_query.export_tsv method was called (it should throw an exception, given empty path) + # assert that last_query.export_tsv method was called (it should throw an + # exception, given empty path) with pytest.raises(TypeError): client.export_tsv(None) - @patch('pydruid.client.urllib.request.urlopen') + @patch("pydruid.client.urllib.request.urlopen") def test_client_auth_creds(self, mock_urlopen): client = create_client() query = create_blank_query() - client.set_basic_auth_credentials('myUsername', 'myPassword') + client.set_basic_auth_credentials("myUsername", "myPassword") + headers, _, _ = client._prepare_url_headers_and_body(query) + assert headers["Authorization"] == "Basic bXlVc2VybmFtZTpteVBhc3N3b3Jk" + + def test_client_allows_extra_headers(self): + client = create_client(headers={"Accept-Encoding": "gzip"}) + query = create_blank_query() headers, _, _ = client._prepare_url_headers_and_body(query) - assert headers['Authorization'] == "Basic bXlVc2VybmFtZTpteVBhc3N3b3Jk" + assert headers["Accept-Encoding"] == "gzip" + + @patch("pydruid.client.urllib.request.urlopen") + def test_return_compressed_data(self, mock_urlopen): + # given + response = Mock() + response.read.return_value = compress( + """ + [ { + "timestamp" : "2015-12-30T14:14:49.000Z", + "result" : [ { + "dimension" : "aaaa", + "metric" : 100 + } ] + } ] + """.encode( + "utf-8" + ) + ) + response.info.return_value = {"Content-Encoding": "gzip"} + mock_urlopen.return_value = response + client = create_client(headers={"Accept-Encoding": "gzip"}) + + # when + top = client.topn( + datasource="testdatasource", + granularity="all", + intervals="2015-12-29/pt1h", + aggregations={"count": doublesum("count")}, + dimension="user_name", + metric="count", + filter=Dimension("user_lang") == "en", + threshold=1, + context={"timeout": 1000}, + ) + + # then + assert top is not None + assert len(top.result) == 1 + assert len(top.result[0]["result"]) == 1