Skip to content

Commit

Permalink
Way to set query params at connection level (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
shounakmk219 authored Apr 26, 2024
1 parent 9bb2908 commit bfdd768
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 4 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ places = Table('places', MetaData(bind=engine), autoload=True)
print(select([func.count('*')], from_obj=places).scalar())
```

To configure query parameters (such as `timeoutMs=10000`) at the engine level
you may pass them while creating the engine. For example:

```python
engine = create_engine(
"pinot://localhost:8000/query/sql?controller=http://localhost:9000/",
connect_args={"query_options": "useMultistageEngine=true;timeoutMs=10000"})
```

#### Pass the Pinot database context

> [!IMPORTANT]
Expand Down
3 changes: 2 additions & 1 deletion examples/pinot_live.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def run_pinot_live_example() -> None:

# Query pinot.live with sqlalchemy
engine = create_engine(
"pinot+https://pinot-broker.pinot.live:443/query/sql?controller=https://pinot-controller.pinot.live/"
"pinot+https://pinot-broker.pinot.live:443/query/sql?controller=https://pinot-controller.pinot.live/",
connect_args={"query_options": "timeoutMs=10000"}
) # uses HTTP by default :(

airlineStats = Table("airlineStats", MetaData(bind=engine), autoload=True, schema="default")
Expand Down
3 changes: 2 additions & 1 deletion examples/pinot_quickstart_hybrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def run_pinot_quickstart_hybrid_sqlalchemy_example() -> None:
registry.register("pinot", "pinotdb.sqlalchemy", "PinotDialect")

engine = create_engine(
"pinot://localhost:8000/query/sql?controller=http://localhost:9000/"
"pinot://localhost:8000/query/sql?controller=http://localhost:9000/",
connect_args={"query_options": "timeoutMs=10000"}
) # uses HTTP by default :(
# engine = create_engine('pinot+http://localhost:8000/query/sql?controller=http://localhost:9000/')
# engine = create_engine('pinot+https://localhost:8000/query/sql?controller=http://localhost:9000/')
Expand Down
17 changes: 15 additions & 2 deletions pinotdb/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def __init__(self, *args, **kwargs):
self._kwargs = kwargs
self.closed = False
self.use_multistage_engine = kwargs.get('use_multistage_engine', False)
self.query_options = kwargs.get('query_options', None)
self.cursors = []
self.session = kwargs.get('session')
self.is_session_external = False
Expand Down Expand Up @@ -191,7 +192,7 @@ def cursor(self):
@check_closed
def execute(self, operation, parameters=None):
cursor = self.cursor()
return cursor.execute(operation, parameters)
return cursor.execute(operation, parameters, self.query_options)

def __enter__(self):
return self.cursor()
Expand Down Expand Up @@ -239,7 +240,7 @@ async def close(self):
@check_closed
async def execute(self, operation, parameters=None):
cursor = self.cursor()
return await cursor.execute(operation, parameters)
return await cursor.execute(operation, parameters, self.query_options)

async def __aenter__(self):
return self.cursor()
Expand Down Expand Up @@ -294,6 +295,7 @@ def __init__(
# interface (e.g. new minor version).
session=None,
use_multistage_engine=False,
query_options=None,
**kwargs
):
self.url = parse.urlunparse(
Expand All @@ -316,6 +318,7 @@ def __init__(
self._debug = debug
self._preserve_types = preserve_types
self._use_multistage_engine = use_multistage_engine
self._query_options = query_options
self.acceptable_respond_fraction = acceptable_respond_fraction
if ignore_exception_error_codes:
self._ignore_exception_error_codes = set(
Expand Down Expand Up @@ -477,6 +480,11 @@ def normalize_query_response(self, input_query, query_response):
# to follow the same camel casing convention, but rather should stick
# to PEP-8 instead.
def execute(self, operation, parameters=None, queryOptions=None, **kwargs):
if not queryOptions:
queryOptions = ""
if self._query_options:
queryOptions = queryOptions + ";" + self._query_options

query = self.finalize_query_payload(
operation, parameters, queryOptions)

Expand Down Expand Up @@ -573,6 +581,11 @@ class AsyncCursor(Cursor):
async def execute(
self, operation, parameters=None, queryOptions=None, **kwargs
):
if not queryOptions:
queryOptions = ""
if self._query_options:
queryOptions = queryOptions + ";" + self._query_options

query = self.finalize_query_payload(
operation, parameters, queryOptions)

Expand Down

0 comments on commit bfdd768

Please sign in to comment.