Skip to content

Commit

Permalink
Cache can be safely disabled temporarily using the DisableCache conte…
Browse files Browse the repository at this point in the history
…xt manager. Basic profiling times in the postgres data source. Added 'needs_commit' parameter to query method to avoid default committing.
  • Loading branch information
Dani Ramirez committed Jan 30, 2019
1 parent 07054fc commit 2dd8546
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 37 deletions.
5 changes: 1 addition & 4 deletions src/core/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,4 @@ def get_config(self, key):
try:
return self._config[key]
except (TypeError, KeyError):
try:
return self._default_config[key]
except KeyError:
return None
return self._default_config[key]
16 changes: 16 additions & 0 deletions src/core/common/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from src.core.data_sources.base import DataSource


class DisabledCache:
data_source = None

def __init__(self, ds):
if ds and not isinstance(ds, DataSource):
raise TypeError('DisabledCache can only be applied to DataSource subclasses.')
self.data_source = ds

def __enter__(self):
self.data_source.disable_cache()

def __exit__(self, *args):
self.data_source.enable_cache()
13 changes: 10 additions & 3 deletions src/core/data_sources/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from time import time
from typing import Type

from ..caches.base import LongitudeCache
Expand Down Expand Up @@ -75,14 +76,15 @@ def enable_cache(self):
def disable_cache(self):
self._use_cache = False

def query(self, statement, params=None, use_cache=True, query_config=None, **opts):
def query(self, statement, params=None, use_cache=True, needs_commit=False, query_config=None, **opts):
"""
This method has to be called to interact with the data source. Each children class will have to implement
its own .execute_query(...) with the specific behavior for each interface.
:param statement: Unformatted SQL query
:param params: Values to be passed to the query when formatting it
:param use_cache: Bool to indicate if this specific query should use cache or not (default: True)
:param use_cache: Boolean to indicate if this specific query should use cache or not (default: True)
:param needs_commit: Boolean to indicate if this specific query needs to commit to db (default: False)
:param query_config: Specific query configuration. If None, the default one will be used.
:param opts:
:return: Result of querying the database
Expand All @@ -97,7 +99,10 @@ def query(self, statement, params=None, use_cache=True, query_config=None, **opt

normalized_response = None
if self._cache and self._use_cache and use_cache:
start = time()
normalized_response = self._cache.get(formatted_query)
if normalized_response:
normalized_response.profiling['cache_time'] = time() - start

if normalized_response:
normalized_response.mark_as_cached()
Expand All @@ -106,6 +111,7 @@ def query(self, statement, params=None, use_cache=True, query_config=None, **opt
for r in range(self.tries):
try:
response = self.execute_query(formatted_query=formatted_query,
needs_commit=needs_commit,
query_config=query_config,
**opts)
normalized_response = self.parse_response(response)
Expand All @@ -117,11 +123,12 @@ def query(self, statement, params=None, use_cache=True, query_config=None, **opt
self.logger.error('Query could not be executed. Retries left: %d' % (self.tries - r))
raise LongitudeRetriesExceeded

def execute_query(self, formatted_query, query_config, **opts):
def execute_query(self, formatted_query, needs_commit, query_config, **opts):
"""
:raise LongitudeQueryCannotBeExecutedException
:param formatted_query:
:param needs_commit:
:param query_config:
:param opts:
:return:
Expand Down
2 changes: 1 addition & 1 deletion src/core/data_sources/carto.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def is_ready(self):
else:
return False

def execute_query(self, formatted_query, query_config, **opts):
def execute_query(self, formatted_query, needs_commit, query_config, **opts):
parse_json = query_config.custom['parse_json']
do_post = query_config.custom['do_post']
format_ = query_config.custom['format']
Expand Down
28 changes: 20 additions & 8 deletions src/core/data_sources/postgres/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import psycopg2.extensions
from ..base import DataSource
from ..base import LongitudeQueryResponse
from time import time


class DefaultPostgresDataSource(DataSource):
Expand Down Expand Up @@ -39,15 +40,26 @@ def setup(self):
def is_ready(self):
return super().is_ready and self._conn and self._cursor

def execute_query(self, formatted_query, query_config, **opts):
def execute_query(self, formatted_query, needs_commit, query_config, **opts):
data = {
'fields': [],
'rows': [],
'profiling': {}
}

start = time()
self._cursor.execute(formatted_query)
data = None
data['profiling']['execute_time'] = time() - start

if self._cursor.description:
data = {
'fields': self._cursor.description,
'rows': self._cursor.fetchall()
}
self._conn.commit()
data['fields'] = self._cursor.description
data['rows'] = self._cursor.fetchall()

if needs_commit:
start = time()
self._conn.commit()
data['profiling']['commit_time'] = time() - start

return data

@staticmethod
Expand All @@ -57,5 +69,5 @@ def _type_as_string(type_id):
def parse_response(self, response):
if response:
fields_names = {n.name: self._type_as_string(n.type_code).name for n in response['fields']}
return LongitudeQueryResponse(rows=response['rows'], fields=fields_names)
return LongitudeQueryResponse(rows=response['rows'], fields=fields_names, profiling=response['profiling'])
return None
3 changes: 0 additions & 3 deletions src/core/tests/test_data_source_base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
from unittest import TestCase, mock

from src.core.common.exceptions import LongitudeConfigError
from ..caches.base import LongitudeCache
from ..data_sources.base import DataSource, DataSourceQueryConfig, LongitudeQueryResponse

Expand Down Expand Up @@ -75,8 +74,6 @@ def test_cache_miss(self, execute_query_mock, parse_response_mock):
self.assertEqual('normalized response from data source', ds.query('some_query_not_in_cache'))
parse_response_mock.assert_called_once_with('some response from the server')



def test_abstract_methods_are_not_implemented(self):
ds = DataSource({})

Expand Down
3 changes: 2 additions & 1 deletion src/samples/carto_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
if ds.is_ready:
try:
data = ds.query('select * from %s limit 30' % CARTO_TABLE_NAME)
print(data)
[print(r) for r in data.rows]
print(data.profiling)
except LongitudeRetriesExceeded:
print("Too many retries and no success...")
else:
Expand Down
19 changes: 8 additions & 11 deletions src/samples/carto_sample_with_redis_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import sys

sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from src.core.common.helpers import DisabledCache
from src.core.caches.redis import RedisCache
from src.core.data_sources.base import LongitudeRetriesExceeded
from src.core.common.exceptions import LongitudeRetriesExceeded
from src.core.data_sources.carto import CartoDataSource
from src.samples.carto_sample_config import CARTO_API_KEY, CARTO_USER, CARTO_TABLE_NAME

Expand Down Expand Up @@ -56,17 +57,13 @@
print("It took %s with cache" % elapsed_with_cache)
print('Uses cache? ' + str(cached_data.comes_from_cache))

# Data is the same...
assert str(data) == str(cached_data)

# You can also disable the cache for a while (nothing gets read or written)
ds.disable_cache()
start = time.time()
data = ds.query(REPEATED_QUERY)
elapsed = time.time() - start
print('It took %s with disabled cache' % str(elapsed))
print('Uses cache? ' + str(data.comes_from_cache))
ds.enable_cache()
with DisabledCache(ds):
start = time.time()
data = ds.query(REPEATED_QUERY)
elapsed = time.time() - start
print('It took %s with disabled cache' % str(elapsed))
print('Uses cache? ' + str(data.comes_from_cache))

# Or disable specific queries via query_config (nothing gets read or written)
query_config = ds.copy_default_query_config()
Expand Down
18 changes: 12 additions & 6 deletions src/samples/postgres_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,18 @@
ds.setup()
if ds.is_ready:
try:
ds.query("drop table if exists users")
ds.query(
'create table users(id serial PRIMARY KEY, name varchar(50) UNIQUE NOT NULL, password varchar(50))')
ds.query("insert into users(name, password) values('longitude', 'password')")
data = ds.query('select * from users')
print(data)
r0 = ds.query("drop table if exists users")
r1 = ds.query(
'create table users(id serial PRIMARY KEY, name varchar(50) UNIQUE NOT NULL, password varchar(50))',
needs_commit=True)
print(r1.profiling)

r2 = ds.query("insert into users(name, password) values('longitude', 'password')", needs_commit=True)
print(r2.profiling)

r3 = ds.query('select * from users')
print(r3.rows)
print(r3.profiling)

except LongitudeRetriesExceeded:
print("Too many retries and no success...")
Expand Down

0 comments on commit 2dd8546

Please sign in to comment.