Skip to content

Commit

Permalink
Merge pull request #5 from GeographicaGS/pcaro/29/aena-last-data
Browse files Browse the repository at this point in the history
Redis backend
  • Loading branch information
ManuelLR authored Jul 25, 2019
2 parents 1a59610 + 1683566 commit 0f0cecd
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 8 deletions.
8 changes: 8 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ GLUTEMULO_CARTO_USER=pcaro
GLUTEMULO_CARTO_API_KEY=AADDAADDSS
GLUTEMULO_CARTO_ORG=

# when your backend in redis
GLUTEMULO_REDIS_HOST=redis
#GLUTEMULO_REDIS_PASSWORD=redis_password
GLUTEMULO_REDIS_PORT=6379
#GLUTEMULO_REDIS_DB=0
#GLUTEMULO_REDIS_KEY_PREFIX="gluto:"
#GLUTEMULO_REDIS_EXPIRE_SECONDS=900

GLUTEMULO_INGESTOR_TOPIC=simple-topic
GLUTEMULO_INGESTOR_BOOTSTRAP_SERVERS=192.168.240.41:9092
GLUTEMULO_INGESTOR_GROUP_ID=A group
Expand Down
16 changes: 16 additions & 0 deletions glutemulo/backend/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import redis
import json
import time


class RedisBackend:
def __init__(self, expires_seconds, key_prefix="gluto:", **redis_conf):
self.expires_seconds = expires_seconds
self.key_prefix = key_prefix
# self.redis = redis.StrictRedis(host='localhost', port=6379, db=0)
self.redis = redis.StrictRedis(**redis_conf)

def consume(self, messages):
for msg in messages:
key = self.key_prefix + str(time.time())
self.redis.set(key, json.dumps(msg), ex=self.expires_seconds)
16 changes: 15 additions & 1 deletion glutemulo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
),
}

ingestor_enabled = config['ingestor_enabled'] = env.bool("INGESTOR_ENABLED", False)
ingestor_enabled = config["ingestor_enabled"] = env.bool("INGESTOR_ENABLED", False)
with env.prefixed("INGESTOR_"):
if ingestor_enabled:
config.update(
Expand Down Expand Up @@ -75,3 +75,17 @@
config[
"postgres_uri"
] = f'host={env("HOST")} port={env("PORT")} dbname={env("DBNAME")} user={env("USER")} password={env("PASSWORD")}'
elif backend == "redis" and ingestor_enabled:
with env.prefixed("REDIS_"):
config.update(
{
"redis_expire_seconds": env.int("EXPIRE_SECONDS", 15 * 60),
"redis_key_prefix": env("KEY_PREFIX", "gluto:"),
"redis_connection": {
"host": env.str("HOST", "redis"),
"port": env.int("PORT", 6379),
"password": env.str("PASSWORD", None),
"db": env.int("DB", 0),
},
}
)
26 changes: 19 additions & 7 deletions glutemulo/gluto.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
from glutemulo.backend.postgres import PostgresBackend as Backend

log.debug("Using POSTGRES backend")
elif config["backend"] == "redis":
from glutemulo.backend.redis import RedisBackend

log.debug("Using REDIS backend")
else:
from glutemulo.backend.logger import LoggerBackend as Backend

Expand All @@ -33,15 +37,23 @@
group_id=config["ingestor_group_id"],
auto_offset_reset=config["ingestor_auto_offset_reset"],
max_poll_records=config["ingestor_max_poll_records"],
fetch_min_bytes=config["ingestor_fetch_min_bytes"]
fetch_min_bytes=config["ingestor_fetch_min_bytes"],
)

backend = Backend(
config["ingestor_dataset"],
config["ingestor_dataset_columns"],
config["ingestor_dataset_ddl"],
config["ingestor_dataset_autocreate"],
)
if config["backend"] == "redis":
backend = RedisBackend(
config["redis_expire_seconds"],
config["redis_key_prefix"],
**config["redis_connection"],
)
else:

backend = Backend(
config["ingestor_dataset"],
config["ingestor_dataset_columns"],
config["ingestor_dataset_ddl"],
config["ingestor_dataset_autocreate"],
)
while True:
for messages in consumer.consume(config["ingestor_wait_interval"]):
if messages:
Expand Down

0 comments on commit 0f0cecd

Please sign in to comment.