Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefix based router support #12

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions nydus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@
.get_distribution('nydus').version
except Exception, e:
VERSION = 'unknown'

#Just make sure we don't clash with the source project
VERSION = '10.0.1'
1 change: 1 addition & 0 deletions nydus/db/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class BaseConnection(object):
def __init__(self, num, **options):
self._connection = None
self.num = num
self.options = options

@property
def identifier(self):
Expand Down
15 changes: 14 additions & 1 deletion nydus/db/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@


class RedisPipeline(BasePipeline):
silent_exceptions = frozenset([RedisError])

def __init__(self, connection):
self.pending = []
self.connection = connection
Expand All @@ -25,7 +27,17 @@ def add(self, command):
getattr(self.pipe, command._attr)(*command._args, **command._kwargs)

def execute(self):
return self.pipe.execute()
fail_silently = self.connection.fail_silently
try:
results = self.pipe.execute()
except tuple(self.silent_exceptions), e:
if fail_silently:
results = []
else:
raise

return results



class Redis(BaseConnection):
Expand All @@ -39,6 +51,7 @@ def __init__(self, host='localhost', port=6379, db=0, timeout=None, password=Non
self.db = db
self.timeout = timeout
self.__password = password
self.fail_silently = options.get('fail_silently', False)
super(Redis, self).__init__(**options)

@property
Expand Down
27 changes: 23 additions & 4 deletions nydus/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from collections import defaultdict
from nydus.db.routers import BaseRouter
from nydus.utils import import_string, ThreadPool

import logging
logger = logging.getLogger(__name__)

def create_cluster(settings):
"""
Expand Down Expand Up @@ -79,19 +80,37 @@ def __iter__(self):

def _execute(self, attr, args, kwargs):
connections = self._connections_for(attr, *args, **kwargs)

results = []
for conn in connections:
fail_silently = conn.fail_silently

for retry in xrange(self.max_connection_retries):
try:
results.append(getattr(conn, attr)(*args, **kwargs))
except tuple(conn.retryable_exceptions), e:
#retry, raise an error or fail silently
error = None
if not self.router.retryable:
raise e
error = e
elif retry == self.max_connection_retries - 1:
raise self.MaxRetriesExceededError(e)
error = self.MaxRetriesExceededError(e)
else:
conn = self._connections_for(attr, retry_for=conn.num, *args, **kwargs)[0]

if error and fail_silently:
#fail silently by returning None, usefull for cache like usage of redis
if self.router.retryable:
logger.error('failing silently after %s retries for conn %s with command %s', self.max_connection_retries, conn, attr)
else:
logger.error('failing silently for conn %s with command %s', conn, attr)
results = [None]
break
else:
raise error

#going for another retry
logger.warn('retrying connection %s with command %s', conn, attr)

else:
break

Expand Down
1 change: 1 addition & 0 deletions nydus/db/routers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@

from .base import BaseRouter, RoundRobinRouter

from .prefix_partition import PrefixPartitionRouter
54 changes: 54 additions & 0 deletions nydus/db/routers/prefix_partition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from nydus.db.routers import BaseRouter

class PrefixPartitionRouter(BaseRouter):
'''
Routes based on the configured prefixes

Example config:

'redis': {
'engine': 'nydus.db.backends.redis.Redis',
'router': 'nydus.db.routers.redis.PrefixPartitionRouter',
'hosts': {
'default': {'db': 0, 'host': 'default.redis.goteam.be', 'port': 6379},
'user:loves:': {'db': 1, 'host': 'default.redis.goteam.be', 'port': 6379},
'loves:': {'db': 2, 'host': 'default.redis.goteam.be', 'port': 6379},
'hash:entity:': {'db': 0, 'host': 'entities.redis.goteam.be', 'port': 6379},
}
}

We route to one and only one redis.
Use a seperate config if you want hashing based partitioning.
'''

def _pre_routing(self, cluster, attr, key, *args, **kwargs):
"""
Requesting a pipeline without a key to partition on is just plain wrong.
We raise a valueError if you try
"""
if not key and attr == 'pipeline':
raise ValueError('Pipelines requires a key for proper routing')
return key

def _route(self, cluster, attr, key, *args, **kwargs):
"""
Perform routing and return db_nums
"""
if 'default' not in cluster.hosts:
error_message = 'The prefix router requires a default host'
raise ValueError(error_message)
hosts = None
if key:
for host in cluster.hosts:
if key.startswith(str(host)):
hosts = [host]
if not hosts:
hosts = ['default']

#sanity check, dont see how this can happen
if not hosts:
error_message = 'The prefix partition router couldnt find a host for command %s and key %s' % (attr, key)
raise ValueError(error_message)

return hosts

4 changes: 2 additions & 2 deletions nydus/db/routers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
:license: Apache License 2.0, see LICENSE for more details.
"""

from nydus.db.routers import RoundRobinRouter
from nydus.db.routers import RoundRobinRouter, PrefixPartitionRouter
from nydus.db.routers.keyvalue import ConsistentHashingRouter, PartitionRouter

__all__ = ('ConsistentHashingRouter', 'PartitionRouter', 'RoundRobinRouter')
__all__ = ('PrefixPartitionRouter', 'ConsistentHashingRouter', 'PartitionRouter', 'RoundRobinRouter')

48 changes: 48 additions & 0 deletions tests/nydus/db/connections/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,54 @@ def get_dbs(self, cluster, attr, key=None, *args, **kwargs):
return [0]



class BrokenRedisTest(BaseTest):
def setUp(self):
from nydus.db import create_cluster
engine = 'nydus.db.backends.redis.Redis'
router = 'nydus.db.routers.redis.PrefixPartitionRouter'
nydus_config = dict(engine=engine, router=router, hosts={
'default': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': False},
'simple_cache': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': True},
'app_critical': {'db': 0, 'host': 'localhost', 'port': 6380, 'fail_silently': False},
})
redis = create_cluster(nydus_config)
self.redis = redis

def test_broken_redis(self):
#test silent failures
key = 'simple_cache:test'
set_result = self.redis.set(key, '1')
assert not set_result
result = self.redis.get(key)
assert not result

#assert by default we fail loudly
from redis.exceptions import ConnectionError
try:
key = 'app_critical:test'
set_result = self.redis.set(key, '1')
result = self.redis.get(key)
except ConnectionError, e:
pass
else:
raise Exception, 'we were hoping for a connection error'

def test_map(self):
keys = ['simple_cache:test', 'simple_cache:test_two', 'app_critical:test']
with self.redis.map() as conn:
results = [conn.get(k) for k in keys]

for result, key in zip(results, keys):
result_object = result._wrapped
if 'app_critical' in key:
assert 'Error' in result_object
else:
assert result_object is None, 'we should get None when failing'




class ClusterTest(BaseTest):
def test_create_cluster(self):
c = create_cluster({
Expand Down
57 changes: 57 additions & 0 deletions tests/nydus/db/routers/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,63 @@
from nydus.db.routers.keyvalue import ConsistentHashingRouter, PartitionRouter


class PrefixPartitionTest(BaseTest):
def setUp(self):
from nydus.db import create_cluster
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency, can you move the imports to the top of the file?

engine = 'nydus.db.backends.redis.Redis'
router = 'nydus.db.routers.redis.PrefixPartitionRouter'
nydus_config = dict(engine=engine, router=router, hosts={
'default': {'db': 0, 'host': 'localhost', 'port': 6379},
'user:loves:': {'db': 1, 'host': 'localhost', 'port': 6379}
})
redis = create_cluster(nydus_config)
self.redis = redis

def test_partitions(self):
'''
Verify if we write ton one and only one redis database
'''
import mock

keys = [
('user:loves:test', 1),
('default_test',0),
('hash:entity:test', 0)
]

for key, redis_db in keys:
with mock.patch('redis.client.StrictRedis.execute_command') as fake_set:
result = self.redis.set(key, '1')
args, kwargs = fake_set.call_args
instance, cmd, key, key_value = args
connection_kwargs = instance.connection_pool.connection_kwargs
db = connection_kwargs['db']
self.assertEqual(db, redis_db)

def test_missing_default(self):
from nydus.db import create_cluster
from functools import partial

engine = 'nydus.db.backends.redis.Redis'
router = 'nydus.db.routers.redis.PrefixPartitionRouter'
nydus_config = dict(engine=engine, router=router, hosts={
'base': {'db': 0, 'host': 'localhost', 'port': 6379},
'user:loves:': {'db': 1, 'host': 'localhost', 'port': 6379}
})
redis = create_cluster(nydus_config)

redis_call = partial(redis.get, 'thiswillbreak')
self.assertRaises(ValueError, redis_call)

def test_pipeline(self):
redis = self.redis
#we prefer map above direct pipeline usage, but if you really need it:
redis.pipeline('default:test')

#this should fail as we require a key
self.assertRaises(ValueError, redis.pipeline)


class DummyConnection(BaseConnection):
def __init__(self, i):
self.host = 'dummyhost'
Expand Down