Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added optional extra_headers parameter #173

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,20 @@ TABLES
GoodBye!
```


# Compressed Data

Consider using the new optional "extra_headers" parameter to send
"Accept-Encoding: gzip" so that Druid returns compressed results,
which improves query performance, especially for large data sets.

```python
from pydruid.client import PyDruid

query = PyDruid(druid_url_goes_here, 'druid/v2', extra_headers={"Accept-Encoding": "gzip"})
```


# Contributing

Contributions are welcome, of course. We like to use `black` and `flake8`.
Expand Down
35 changes: 31 additions & 4 deletions pydruid/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from __future__ import division
from __future__ import absolute_import

import sys
import json
import re

Expand All @@ -24,19 +25,35 @@
from pydruid.query import QueryBuilder
from base64 import b64encode

if sys.version_info < (3, 2):
    # gzip.decompress() only exists on Python 3.2+, so older interpreters
    # (notably 2.7) get an equivalent built on GzipFile.
    from gzip import GzipFile
    from io import BytesIO

    def decompress(data):
        """Return the decompressed contents of a gzip-encoded byte string.

        :param data: a complete gzip stream (e.g. an HTTP response body)
        :returns: the decompressed bytes
        """
        # Seeding BytesIO with the payload leaves the stream positioned at
        # the start, so no write-then-rewind step is needed, and a bytes
        # buffer (rather than a text-oriented StringIO) is used for binary data.
        with GzipFile(fileobj=BytesIO(data), mode="rb") as f:
            return f.read()


else:
    from gzip import decompress

# extract error from the <PRE> tag inside the HTML response
HTML_ERROR = re.compile("<pre>\\s*(.*?)\\s*</pre>", re.IGNORECASE)


class BaseDruidClient(object):
def __init__(self, url, endpoint):
def __init__(self, url, endpoint, extra_headers=None):
self.url = url
self.endpoint = endpoint
self.query_builder = QueryBuilder()
self.username = None
self.password = None
self.proxies = None
self.extra_headers = extra_headers

def set_basic_auth_credentials(self, username, password):
self.username = username
Expand All @@ -55,6 +72,8 @@ def _prepare_url_headers_and_body(self, query):
else:
url = self.url + "/" + self.endpoint
headers = {"Content-Type": "application/json"}
if self.extra_headers and isinstance(self.extra_headers, dict):
headers.update(self.extra_headers)
if (self.username is not None) and (self.password is not None):
authstring = "{}:{}".format(self.username, self.password)
b64string = b64encode(authstring.encode()).decode()
Expand Down Expand Up @@ -542,15 +561,23 @@ class PyDruid(BaseDruidClient):
1 6 2013-10-04T00:00:00.000Z user_2
"""

def __init__(self, url, endpoint):
super(PyDruid, self).__init__(url, endpoint)
def __init__(self, url, endpoint, extra_headers=None):
super(PyDruid, self).__init__(url, endpoint, extra_headers)

def _post(self, query):
try:
headers, querystr, url = self._prepare_url_headers_and_body(query)
req = urllib.request.Request(url, querystr, headers)
res = urllib.request.urlopen(req)
data = res.read().decode("utf-8")
content_encoding = res.info().get("Content-Encoding")
if content_encoding == "gzip":
data = decompress(res.read()).decode("utf-8")
elif content_encoding:
raise ValueError(
"Invalid content encoding: {}".format(content_encoding)
)
else:
data = res.read().decode("utf-8")
res.close()
except urllib.error.HTTPError as e:
err = e.read()
Expand Down
200 changes: 136 additions & 64 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: UTF-8 -*-
import textwrap

import sys
import pytest
from mock import patch, Mock
from six.moves import urllib
Expand All @@ -11,29 +11,38 @@
from pydruid.utils.aggregators import doublesum
from pydruid.utils.filters import Dimension

if sys.version_info < (3, 2):
    # gzip.compress() only exists on Python 3.2+, so older interpreters
    # (notably 2.7) get an equivalent built on GzipFile.
    from gzip import GzipFile
    from io import BytesIO

    def compress(data):
        """Return ``data`` (bytes) as a complete gzip stream.

        Test helper used to fake a gzip-encoded HTTP response body.
        """
        out = BytesIO()
        with GzipFile(fileobj=out, mode="wb") as f:
            f.write(data)
        # Read the buffer only after the with-block has closed the GzipFile,
        # so everything written on close is included in the returned bytes.
        return out.getvalue()


else:
    from gzip import compress

def create_client():
return PyDruid("http://localhost:8083", "druid/v2/")

def create_client(headers=None):
return PyDruid("http://localhost:8083", "druid/v2/", headers)


def create_blank_query():
return Query({}, 'none')
return Query({}, "none")


def _http_error(code, msg, data = ''):
def _http_error(code, msg, data=""):
# Need a file-like object for the response data
fp = StringIO(data)
return urllib.error.HTTPError(
url='http://fakeurl:8080/druid/v2/',
hdrs={},
code=code,
msg=msg,
fp=fp,
url="http://fakeurl:8080/druid/v2/", hdrs={}, code=code, msg=msg, fp=fp
)


class TestPyDruid:
@patch('pydruid.client.urllib.request.urlopen')
@patch("pydruid.client.urllib.request.urlopen")
def test_druid_returns_error(self, mock_urlopen):
# given
mock_urlopen.side_effect = _http_error(500, "Druid error")
Expand All @@ -42,20 +51,22 @@ def test_druid_returns_error(self, mock_urlopen):
# when / then
with pytest.raises(IOError):
client.topn(
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000})

@patch('pydruid.client.urllib.request.urlopen')
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000},
)

@patch("pydruid.client.urllib.request.urlopen")
def test_druid_returns_html_error(self, mock_urlopen):
# given
message = textwrap.dedent("""
message = textwrap.dedent(
"""
<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>
Expand All @@ -68,24 +79,29 @@ def test_druid_returns_html_error(self, mock_urlopen):
<hr /><a href="http://eclipse.org/jetty">Powered by Jetty:// 9.3.19.v20170502</a><hr/>
</body>
</html>
""").strip()
mock_urlopen.side_effect = _http_error(500, 'Internal Server Error', message)
"""
).strip()
mock_urlopen.side_effect = _http_error(500, "Internal Server Error", message)
client = create_client()

# when / then
with pytest.raises(IOError) as e:
client.topn(
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000})

assert str(e.value) == textwrap.dedent("""
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000},
)

assert (
str(e.value)
== textwrap.dedent(
"""
HTTP Error 500: Internal Server Error
Druid Error: javax.servlet.ServletException: java.lang.OutOfMemoryError: GC overhead limit exceeded
Query is: {
Expand All @@ -112,9 +128,11 @@ def test_druid_returns_html_error(self, mock_urlopen):
"queryType": "topN",
"threshold": 1
}
""").strip()
"""
).strip()
)

@patch('pydruid.client.urllib.request.urlopen')
@patch("pydruid.client.urllib.request.urlopen")
def test_druid_returns_results(self, mock_urlopen):
# given
response = Mock()
Expand All @@ -126,28 +144,32 @@ def test_druid_returns_results(self, mock_urlopen):
"metric" : 100
} ]
} ]
""".encode("utf-8")
""".encode(
"utf-8"
)
response.info.return_value = {}
mock_urlopen.return_value = response
client = create_client()

# when
top = client.topn(
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000})
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000},
)

# then
assert top is not None
assert len(top.result) == 1
assert len(top.result[0]['result']) == 1
assert len(top.result[0]["result"]) == 1

@patch('pydruid.client.urllib.request.urlopen')
@patch("pydruid.client.urllib.request.urlopen")
def test_client_allows_to_export_last_query(self, mock_urlopen):
# given
response = Mock()
Expand All @@ -159,29 +181,79 @@ def test_client_allows_to_export_last_query(self, mock_urlopen):
"metric" : 100
} ]
} ]
""".encode("utf-8")
""".encode(
"utf-8"
)
response.info.return_value = {}
mock_urlopen.return_value = response
client = create_client()
client.topn(
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000})
datasource="testdatasource",
granularity="all",
intervals="2015-12-29/pt1h",
aggregations={"count": doublesum("count")},
dimension="user_name",
metric="count",
filter=Dimension("user_lang") == "en",
threshold=1,
context={"timeout": 1000},
)

# when / then
# assert that last_query.export_tsv method was called (it should throw an exception, given empty path)
# assert that last_query.export_tsv method was called (it should throw an
# exception, given empty path)
with pytest.raises(TypeError):
client.export_tsv(None)

@patch('pydruid.client.urllib.request.urlopen')
@patch("pydruid.client.urllib.request.urlopen")
def test_client_auth_creds(self, mock_urlopen):
client = create_client()
query = create_blank_query()
client.set_basic_auth_credentials('myUsername', 'myPassword')
client.set_basic_auth_credentials("myUsername", "myPassword")
headers, _, _ = client._prepare_url_headers_and_body(query)
assert headers["Authorization"] == "Basic bXlVc2VybmFtZTpteVBhc3N3b3Jk"

def test_client_allows_extra_headers(self):
client = create_client(headers={"Accept-Encoding": "gzip"})
query = create_blank_query()
headers, _, _ = client._prepare_url_headers_and_body(query)
assert headers['Authorization'] == "Basic bXlVc2VybmFtZTpteVBhc3N3b3Jk"
assert headers["Accept-Encoding"] == "gzip"

@patch("pydruid.client.urllib.request.urlopen")
def test_return_compressed_data(self, mock_urlopen):
    """A gzip-encoded response body is decompressed before being parsed."""
    # given: a response whose body is gzip-compressed and labelled as such
    payload = """
        [ {
          "timestamp" : "2015-12-30T14:14:49.000Z",
          "result" : [ {
            "dimension" : "aaaa",
            "metric" : 100
          } ]
        } ]
    """.encode(
        "utf-8"
    )
    response = Mock()
    response.read.return_value = compress(payload)
    response.info.return_value = {"Content-Encoding": "gzip"}
    mock_urlopen.return_value = response
    client = create_client(headers={"Accept-Encoding": "gzip"})

    # when
    top = client.topn(
        datasource="testdatasource",
        granularity="all",
        intervals="2015-12-29/pt1h",
        aggregations={"count": doublesum("count")},
        dimension="user_name",
        metric="count",
        filter=Dimension("user_lang") == "en",
        threshold=1,
        context={"timeout": 1000},
    )

    # then
    assert top is not None
    assert len(top.result) == 1
    rows = top.result[0]["result"]
    assert len(rows) == 1
    # Length checks alone would pass for any single-row payload; also pin the
    # decoded values so the test proves the compressed body round-tripped.
    assert rows[0]["dimension"] == "aaaa"
    assert rows[0]["metric"] == 100