From c2ae32daa7bdc51a921d531e83e0ec662f5b6709 Mon Sep 17 00:00:00 2001 From: Thierry Schellenbach Date: Tue, 15 May 2012 14:10:05 +0200 Subject: [PATCH 01/10] added support for a prefix based router, including tests --- nydus/db/routers/__init__.py | 1 + nydus/db/routers/prefix_partition.py | 43 ++++++++++++++++++++++++++++ nydus/db/routers/redis.py | 4 +-- tests/nydus/db/routers/tests.py | 31 ++++++++++++++++++++ 4 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 nydus/db/routers/prefix_partition.py diff --git a/nydus/db/routers/__init__.py b/nydus/db/routers/__init__.py index a33f475..67ee82d 100644 --- a/nydus/db/routers/__init__.py +++ b/nydus/db/routers/__init__.py @@ -8,3 +8,4 @@ from .base import BaseRouter, RoundRobinRouter +from .prefix_partition import PrefixPartitionRouter \ No newline at end of file diff --git a/nydus/db/routers/prefix_partition.py b/nydus/db/routers/prefix_partition.py new file mode 100644 index 0000000..c14ec50 --- /dev/null +++ b/nydus/db/routers/prefix_partition.py @@ -0,0 +1,43 @@ +from nydus.db.routers import BaseRouter + +class PrefixPartitionRouter(BaseRouter): + ''' + Routes based on the configured prefixes + + Example config: + + 'redis': { + 'engine': 'nydus.db.backends.redis.Redis', + 'router': 'django_redis.nydus_router.PrefixPartitionRouter', + 'hosts': { + 0: {'db': 0, 'host': 'default.redis.goteam.be', 'port': 6379}, + 'user:loves:': {'db': 1, 'host': 'default.redis.goteam.be', 'port': 6379}, + 'loves:': {'db': 2, 'host': 'default.redis.goteam.be', 'port': 6379}, + 'hash:entity:': {'db': 0, 'host': 'entities.redis.goteam.be', 'port': 6379}, + } + } + + We route to one and only one redis. + Use a seperate config if you want hashing based partitioning. + ''' + + def _route(self, cluster, attr, key, *args, **kwargs): + """ + Perform routing and return db_nums + """ + assert 'default' in cluster.hosts, 'The prefix router requires a default key to route to' + hosts = None + if key: + for host in cluster.hosts: + if key.startswith(str(host)): + hosts = [host] + if not hosts: + hosts = ['default'] + elif func == 'pipeline': + raise ValueError('Pipelines requires a key for proper routing') + + if not hosts: + raise ValueError, 'I didnt expect this while writing the code so lets fail' + + return hosts + diff --git a/nydus/db/routers/redis.py b/nydus/db/routers/redis.py index 6b44330..41a2855 100644 --- a/nydus/db/routers/redis.py +++ b/nydus/db/routers/redis.py @@ -6,8 +6,8 @@ :license: Apache License 2.0, see LICENSE for more details. """ -from nydus.db.routers import RoundRobinRouter +from nydus.db.routers import RoundRobinRouter, PrefixPartitionRouter from nydus.db.routers.keyvalue import ConsistentHashingRouter, PartitionRouter -__all__ = ('ConsistentHashingRouter', 'PartitionRouter', 'RoundRobinRouter') +__all__ = ('PrefixPartitionRouter', 'ConsistentHashingRouter', 'PartitionRouter', 'RoundRobinRouter') diff --git a/tests/nydus/db/routers/tests.py b/tests/nydus/db/routers/tests.py index 0a822a6..3adfb0c 100644 --- a/tests/nydus/db/routers/tests.py +++ b/tests/nydus/db/routers/tests.py @@ -14,6 +14,37 @@ from nydus.db.routers.keyvalue import ConsistentHashingRouter, PartitionRouter +class PrefixPartitionTest(TestCase): + def test_partitions(self): + ''' + Verify if we write ton one and only one redis database + ''' + from nydus.db import create_cluster + import mock + engine = 'nydus.db.backends.redis.Redis' + router = 'nydus.db.routers.redis.PrefixPartitionRouter' + nydus_config = dict(engine=engine, router=router, hosts={ + 'default': {'db': 0, 'host': 'localhost', 'port': 6379}, + 'user:loves:': {'db': 1, 'host': 'localhost', 'port': 6379} + }) + redis = create_cluster(nydus_config) + + keys = [ + ('user:loves:test', 1), + ('default_test',0), + ('hash:entity:test', 0) + ] + + for key, redis_db in keys: + with mock.patch('redis.client.StrictRedis.execute_command') as fake_set: + result = redis.set(key, '1') + args, kwargs = fake_set.call_args + instance, cmd, key, key_value = args + connection_kwargs = instance.connection_pool.connection_kwargs + db = connection_kwargs['db'] + self.assertEqual(db, redis_db) + + class DummyConnection(BaseConnection): def __init__(self, i): self.host = 'dummyhost' From f98d64b66d94760b466dff416cb65c5dcd3a4236 Mon Sep 17 00:00:00 2001 From: Thierry Schellenbach Date: Tue, 15 May 2012 14:11:48 +0200 Subject: [PATCH 02/10] added support for a prefix based router, including tests --- nydus/db/base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nydus/db/base.py b/nydus/db/base.py index c8fb857..4d60fe0 100644 --- a/nydus/db/base.py +++ b/nydus/db/base.py @@ -82,6 +82,7 @@ def _execute(self, attr, args, kwargs): results = [] for conn in connections: + print conn for retry in xrange(self.max_connection_retries): try: results.append(getattr(conn, attr)(*args, **kwargs)) From eb3da71216b08967f0ad91c9479da1a146373d7e Mon Sep 17 00:00:00 2001 From: Thierry Schellenbach Date: Tue, 15 May 2012 14:41:05 +0200 Subject: [PATCH 03/10] added support for silently failing redis servers, by setting host:dict(fail_silently=True) --- nydus/db/backends/base.py | 1 + nydus/db/base.py | 29 +++++++++++++++++++----- tests/nydus/db/connections/tests.py | 35 +++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 5 deletions(-) diff --git a/nydus/db/backends/base.py b/nydus/db/backends/base.py index 34760ab..79dc858 100644 --- a/nydus/db/backends/base.py +++ b/nydus/db/backends/base.py @@ -44,6 +44,7 @@ class BaseConnection(object): def __init__(self, num, **options): self._connection = None self.num = num + self.options = options @property def identifier(self): diff --git a/nydus/db/base.py b/nydus/db/base.py index 4d60fe0..8c852c3 100644 --- a/nydus/db/base.py +++ b/nydus/db/base.py @@ -9,7 +9,8 @@ from collections import defaultdict from nydus.db.routers import BaseRouter from nydus.utils import import_string, ThreadPool - +import logging +logger = logging.getLogger(__name__) def create_cluster(settings): """ @@ -79,20 +80,38 @@ def __iter__(self): def _execute(self, attr, args, kwargs): connections = self._connections_for(attr, *args, **kwargs) - results = [] for conn in connections: - print conn + conn_options = conn.options + fail_silently = conn_options.get('fail_silently', False) + for retry in xrange(self.max_connection_retries): try: results.append(getattr(conn, attr)(*args, **kwargs)) except tuple(conn.retryable_exceptions), e: + #retry, raise an error or fail silently + error = None if not self.router.retryable: - raise e + error = e elif retry == self.max_connection_retries - 1: - raise self.MaxRetriesExceededError(e) + error = self.MaxRetriesExceededError(e) else: conn = self._connections_for(attr, retry_for=conn.num, *args, **kwargs)[0] + + if error and fail_silently: + #fail silently by returning None, usefull for cache like usage of redis + if self.router.retryable: + logger.error('failing silently after %s retries for conn %s with command %s', self.max_connection_retries, conn, attr) + else: + logger.error('failing silently for conn %s with command %s', conn, attr) + results = [None] + break + else: + raise error + + #going for another retry + logger.warn('retrying connection %s with command %s', conn, attr) + else: break diff --git a/tests/nydus/db/connections/tests.py b/tests/nydus/db/connections/tests.py index 311bcb9..a00a205 100644 --- a/tests/nydus/db/connections/tests.py +++ b/tests/nydus/db/connections/tests.py @@ -32,6 +32,41 @@ def get_dbs(self, cluster, attr, key=None, *args, **kwargs): return [0] +class BrokenRedisTest(BaseTest): + def test_broken_redis(self): + ''' + Verify if we write ton one and only one redis database + ''' + from nydus.db import create_cluster + import mock + + engine = 'nydus.db.backends.redis.Redis' + router = 'nydus.db.routers.redis.PrefixPartitionRouter' + nydus_config = dict(engine=engine, router=router, hosts={ + 'default': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': True}, + 'user': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': False}, + }) + redis = create_cluster(nydus_config) + + #test silent failures + key = 'default_test' + set_result = redis.set(key, '1') + assert not set_result + result = redis.get(key) + assert not result + + #assert by default we fail loudly + from redis.exceptions import ConnectionError + try: + key = 'user:loves:test' + set_result = redis.set(key, '1') + result = redis.get(key) + except ConnectionError, e: + pass + else: + raise Exception, 'we were hoping for a connection error' + + class ClusterTest(BaseTest): def test_create_cluster(self): c = create_cluster({ From 31cd00e08e43e7c8a2e89252b315ba6f94715aff Mon Sep 17 00:00:00 2001 From: Thierry Schellenbach Date: Tue, 15 May 2012 15:21:19 +0200 Subject: [PATCH 04/10] upgraded version number --- nydus/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nydus/__init__.py b/nydus/__init__.py index 3c6deb4..28a5686 100644 --- a/nydus/__init__.py +++ b/nydus/__init__.py @@ -11,3 +11,6 @@ .get_distribution('nydus').version except Exception, e: VERSION = 'unknown' + +#Just make sure we don't clash with the source project +VERSION = '10.0.0' \ No newline at end of file From 40216069f1789f7495a864c7c95ec206093dbd73 Mon Sep 17 00:00:00 2001 From: Thierry Schellenbach Date: Tue, 15 May 2012 15:34:51 +0200 Subject: [PATCH 05/10] slightly udpated docs --- nydus/db/routers/prefix_partition.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nydus/db/routers/prefix_partition.py b/nydus/db/routers/prefix_partition.py index c14ec50..ef065de 100644 --- a/nydus/db/routers/prefix_partition.py +++ b/nydus/db/routers/prefix_partition.py @@ -10,7 +10,7 @@ class PrefixPartitionRouter(BaseRouter): 'engine': 'nydus.db.backends.redis.Redis', 'router': 'django_redis.nydus_router.PrefixPartitionRouter', 'hosts': { - 0: {'db': 0, 'host': 'default.redis.goteam.be', 'port': 6379}, + 'default': {'db': 0, 'host': 'default.redis.goteam.be', 'port': 6379}, 'user:loves:': {'db': 1, 'host': 'default.redis.goteam.be', 'port': 6379}, 'loves:': {'db': 2, 'host': 'default.redis.goteam.be', 'port': 6379}, 'hash:entity:': {'db': 0, 'host': 'entities.redis.goteam.be', 'port': 6379}, From db79807e3def2375f4dc3ab7679a7f66c446e6ef Mon Sep 17 00:00:00 2001 From: Thierry Schellenbach Date: Wed, 16 May 2012 15:46:54 +0200 Subject: [PATCH 06/10] fail silently support for .map() --- nydus/db/backends/redis.py | 15 +++++++++- nydus/db/base.py | 3 +- tests/nydus/db/connections/tests.py | 43 +++++++++++++++++++---------- 3 files changed, 43 insertions(+), 18 deletions(-) diff --git a/nydus/db/backends/redis.py b/nydus/db/backends/redis.py index 98237c1..d0f8fef 100644 --- a/nydus/db/backends/redis.py +++ b/nydus/db/backends/redis.py @@ -15,6 +15,8 @@ class RedisPipeline(BasePipeline): + silent_exceptions = frozenset([RedisError]) + def __init__(self, connection): self.pending = [] self.connection = connection @@ -25,7 +27,17 @@ def add(self, command): getattr(self.pipe, command._attr)(*command._args, **command._kwargs) def execute(self): - return self.pipe.execute() + fail_silently = self.connection.fail_silently + try: + results = self.pipe.execute() + except tuple(self.silent_exceptions), e: + if fail_silently: + results = [] + else: + raise + + return results + class Redis(BaseConnection): @@ -39,6 +51,7 @@ def __init__(self, host='localhost', port=6379, db=0, timeout=None, password=Non self.db = db self.timeout = timeout self.__password = password + self.fail_silently = options.get('fail_silently', False) super(Redis, self).__init__(**options) @property diff --git a/nydus/db/base.py b/nydus/db/base.py index 8c852c3..a37ef1f 100644 --- a/nydus/db/base.py +++ b/nydus/db/base.py @@ -82,8 +82,7 @@ def _execute(self, attr, args, kwargs): connections = self._connections_for(attr, *args, **kwargs) results = [] for conn in connections: - conn_options = conn.options - fail_silently = conn_options.get('fail_silently', False) + fail_silently = conn.fail_silently for retry in xrange(self.max_connection_retries): try: diff --git a/tests/nydus/db/connections/tests.py b/tests/nydus/db/connections/tests.py index a00a205..8bb3266 100644 --- a/tests/nydus/db/connections/tests.py +++ b/tests/nydus/db/connections/tests.py @@ -32,40 +32,53 @@ def get_dbs(self, cluster, attr, key=None, *args, **kwargs): return [0] + class BrokenRedisTest(BaseTest): - def test_broken_redis(self): - ''' - Verify if we write ton one and only one redis database - ''' + def setUp(self): from nydus.db import create_cluster - import mock - engine = 'nydus.db.backends.redis.Redis' router = 'nydus.db.routers.redis.PrefixPartitionRouter' nydus_config = dict(engine=engine, router=router, hosts={ - 'default': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': True}, - 'user': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': False}, + 'default': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': False}, + 'simple_cache': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': True}, + 'app_critical': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': False}, }) redis = create_cluster(nydus_config) - + self.redis = redis + + def test_broken_redis(self): #test silent failures - key = 'default_test' - set_result = redis.set(key, '1') + key = 'simple_cache:test' + set_result = self.redis.set(key, '1') assert not set_result - result = redis.get(key) + result = self.redis.get(key) assert not result #assert by default we fail loudly from redis.exceptions import ConnectionError try: - key = 'user:loves:test' - set_result = redis.set(key, '1') - result = redis.get(key) + key = 'app_critical:test' + set_result = self.redis.set(key, '1') + result = self.redis.get(key) except ConnectionError, e: pass else: raise Exception, 'we were hoping for a connection error' + def test_map(self): + keys = ['simple_cache:test', 'simple_cache:test_two', 'app_critical:test'] + with self.redis.map() as conn: + results = [conn.get(k) for k in keys] + + for result, key in zip(results, keys): + result_object = result._wrapped + if 'app_critical' in key: + assert 'Error' in result_object + else: + assert result_object is None, 'we should get None when failing' + + + class ClusterTest(BaseTest): def test_create_cluster(self): From bc905a72d4d869d0f75da8518c0b16cd8aa460f1 Mon Sep 17 00:00:00 2001 From: Thierry Schellenbach Date: Wed, 16 May 2012 15:48:00 +0200 Subject: [PATCH 07/10] updated router example --- nydus/db/routers/prefix_partition.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nydus/db/routers/prefix_partition.py b/nydus/db/routers/prefix_partition.py index ef065de..623827c 100644 --- a/nydus/db/routers/prefix_partition.py +++ b/nydus/db/routers/prefix_partition.py @@ -8,7 +8,7 @@ class PrefixPartitionRouter(BaseRouter): 'redis': { 'engine': 'nydus.db.backends.redis.Redis', - 'router': 'django_redis.nydus_router.PrefixPartitionRouter', + 'router': 'nydus.db.routers.redis.PrefixPartitionRouter', 'hosts': { 'default': {'db': 0, 'host': 'default.redis.goteam.be', 'port': 6379}, 'user:loves:': {'db': 1, 'host': 'default.redis.goteam.be', 'port': 6379}, From 1bdefc019601503659dfcd211d8e1af9c9e3d8b4 Mon Sep 17 00:00:00 2001 From: Thierry Schellenbach Date: Wed, 16 May 2012 15:51:44 +0200 Subject: [PATCH 08/10] improved error reporting when not able to find hosts --- nydus/db/routers/prefix_partition.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nydus/db/routers/prefix_partition.py b/nydus/db/routers/prefix_partition.py index 623827c..ccd0b31 100644 --- a/nydus/db/routers/prefix_partition.py +++ b/nydus/db/routers/prefix_partition.py @@ -33,11 +33,12 @@ def _route(self, cluster, attr, key, *args, **kwargs): hosts = [host] if not hosts: hosts = ['default'] - elif func == 'pipeline': + elif attr == 'pipeline': raise ValueError('Pipelines requires a key for proper routing') if not hosts: - raise ValueError, 'I didnt expect this while writing the code so lets fail' + error_message = 'The prefix partition router couldnt find a host for command %s and key %s' % (attr, key) + raise ValueError(error_message) return hosts From 965170d6654c5d1dc7509c5341339866119caf60 Mon Sep 17 00:00:00 2001 From: Thierry Schellenbach Date: Wed, 16 May 2012 16:33:25 +0200 Subject: [PATCH 09/10] more testing and better errors for prefix partition routing --- nydus/db/routers/prefix_partition.py | 18 ++++++++++--- tests/nydus/db/routers/tests.py | 40 +++++++++++++++++++++++----- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/nydus/db/routers/prefix_partition.py b/nydus/db/routers/prefix_partition.py index ccd0b31..6a2edb9 100644 --- a/nydus/db/routers/prefix_partition.py +++ b/nydus/db/routers/prefix_partition.py @@ -21,11 +21,22 @@ class PrefixPartitionRouter(BaseRouter): Use a seperate config if you want hashing based partitioning. ''' + def _pre_routing(self, cluster, attr, key, *args, **kwargs): + """ + Requesting a pipeline without a key to partition on is just plain wrong. + We raise a valueError if you try + """ + if not key and attr == 'pipeline': + raise ValueError('Pipelines requires a key for proper routing') + return key + def _route(self, cluster, attr, key, *args, **kwargs): """ Perform routing and return db_nums """ - assert 'default' in cluster.hosts, 'The prefix router requires a default key to route to' + if 'default' not in cluster.hosts: + error_message = 'The prefix router requires a default host' + raise ValueError(error_message) hosts = None if key: for host in cluster.hosts: @@ -33,9 +44,8 @@ def _route(self, cluster, attr, key, *args, **kwargs): hosts = [host] if not hosts: hosts = ['default'] - elif attr == 'pipeline': - raise ValueError('Pipelines requires a key for proper routing') - + + #sanity check, dont see how this can happen if not hosts: error_message = 'The prefix partition router couldnt find a host for command %s and key %s' % (attr, key) raise ValueError(error_message) diff --git a/tests/nydus/db/routers/tests.py b/tests/nydus/db/routers/tests.py index 3adfb0c..52ab8a1 100644 --- a/tests/nydus/db/routers/tests.py +++ b/tests/nydus/db/routers/tests.py @@ -14,13 +14,9 @@ from nydus.db.routers.keyvalue import ConsistentHashingRouter, PartitionRouter -class PrefixPartitionTest(TestCase): - def test_partitions(self): - ''' - Verify if we write ton one and only one redis database - ''' +class PrefixPartitionTest(BaseTest): + def setUp(self): from nydus.db import create_cluster - import mock engine = 'nydus.db.backends.redis.Redis' router = 'nydus.db.routers.redis.PrefixPartitionRouter' nydus_config = dict(engine=engine, router=router, hosts={ @@ -28,6 +24,13 @@ def test_partitions(self): 'user:loves:': {'db': 1, 'host': 'localhost', 'port': 6379} }) redis = create_cluster(nydus_config) + self.redis = redis + + def test_partitions(self): + ''' + Verify if we write ton one and only one redis database + ''' + import mock keys = [ ('user:loves:test', 1), @@ -37,12 +40,35 @@ def test_partitions(self): for key, redis_db in keys: with mock.patch('redis.client.StrictRedis.execute_command') as fake_set: - result = redis.set(key, '1') + result = self.redis.set(key, '1') args, kwargs = fake_set.call_args instance, cmd, key, key_value = args connection_kwargs = instance.connection_pool.connection_kwargs db = connection_kwargs['db'] self.assertEqual(db, redis_db) + + def test_missing_default(self): + from nydus.db import create_cluster + from functools import partial + + engine = 'nydus.db.backends.redis.Redis' + router = 'nydus.db.routers.redis.PrefixPartitionRouter' + nydus_config = dict(engine=engine, router=router, hosts={ + 'base': {'db': 0, 'host': 'localhost', 'port': 6379}, + 'user:loves:': {'db': 1, 'host': 'localhost', 'port': 6379} + }) + redis = create_cluster(nydus_config) + + redis_call = partial(redis.get, 'thiswillbreak') + self.assertRaises(ValueError, redis_call) + + def test_pipeline(self): + redis = self.redis + #we prefer map above direct pipeline usage, but if you really need it: + redis.pipeline('default:test') + + #this should fail as we require a key + self.assertRaises(ValueError, redis.pipeline) class DummyConnection(BaseConnection): From 04f22baad7c532f009748a54a1d07adc389f65de Mon Sep 17 00:00:00 2001 From: Thierry Schellenbach Date: Wed, 16 May 2012 16:33:56 +0200 Subject: [PATCH 10/10] upgraded the version --- nydus/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nydus/__init__.py b/nydus/__init__.py index 28a5686..3eb26d4 100644 --- a/nydus/__init__.py +++ b/nydus/__init__.py @@ -13,4 +13,4 @@ VERSION = 'unknown' #Just make sure we don't clash with the source project -VERSION = '10.0.0' \ No newline at end of file +VERSION = '10.0.1' \ No newline at end of file