diff --git a/.env.example b/.env.example index 3782d8a..1132dc6 100644 --- a/.env.example +++ b/.env.example @@ -15,6 +15,14 @@ GLUTEMULO_CARTO_USER=pcaro GLUTEMULO_CARTO_API_KEY=AADDAADDSS GLUTEMULO_CARTO_ORG= +# when your backend in redis +GLUTEMULO_REDIS_HOST=redis +#GLUTEMULO_REDIS_PASSWORD=redis_password +GLUTEMULO_REDIS_PORT=6379 +#GLUTEMULO_REDIS_DB=0 +#GLUTEMULO_REDIS_KEY_PREFIX="gluto:" +#GLUTEMULO_REDIS_EXPIRE_SECONDS=900 + GLUTEMULO_INGESTOR_TOPIC=simple-topic GLUTEMULO_INGESTOR_BOOTSTRAP_SERVERS=192.168.240.41:9092 GLUTEMULO_INGESTOR_GROUP_ID=A group diff --git a/glutemulo/backend/redis.py b/glutemulo/backend/redis.py new file mode 100644 index 0000000..5a4284e --- /dev/null +++ b/glutemulo/backend/redis.py @@ -0,0 +1,16 @@ +import redis +import json +import time + + +class RedisBackend: + def __init__(self, expires_seconds, key_prefix="gluto:", **redis_conf): + self.expires_seconds = expires_seconds + self.key_prefix = key_prefix + # self.redis = redis.StrictRedis(host='localhost', port=6379, db=0) + self.redis = redis.StrictRedis(**redis_conf) + + def consume(self, messages): + for msg in messages: + key = self.key_prefix + str(time.time()) + self.redis.set(key, json.dumps(msg), ex=self.expires_seconds) diff --git a/glutemulo/config.py b/glutemulo/config.py index dc57c52..65df2d5 100644 --- a/glutemulo/config.py +++ b/glutemulo/config.py @@ -21,7 +21,7 @@ ), } - ingestor_enabled = config['ingestor_enabled'] = env.bool("INGESTOR_ENABLED", False) + ingestor_enabled = config["ingestor_enabled"] = env.bool("INGESTOR_ENABLED", False) with env.prefixed("INGESTOR_"): if ingestor_enabled: config.update( @@ -75,3 +75,17 @@ config[ "postgres_uri" ] = f'host={env("HOST")} port={env("PORT")} dbname={env("DBNAME")} user={env("USER")} password={env("PASSWORD")}' + elif backend == "redis" and ingestor_enabled: + with env.prefixed("REDIS_"): + config.update( + { + "redis_expire_seconds": env.int("EXPIRE_SECONDS", 15 * 60), + "redis_key_prefix": env("KEY_PREFIX", "gluto:"), + "redis_connection": { + "host": env.str("HOST", "redis"), + "port": env.int("PORT", 6379), + "password": env.str("PASSWORD", None), + "db": env.int("DB", 0), + }, + } + ) diff --git a/glutemulo/gluto.py b/glutemulo/gluto.py index b766eb8..4a29417 100644 --- a/glutemulo/gluto.py +++ b/glutemulo/gluto.py @@ -13,6 +13,10 @@ from glutemulo.backend.postgres import PostgresBackend as Backend log.debug("Using POSTGRES backend") +elif config["backend"] == "redis": + from glutemulo.backend.redis import RedisBackend + + log.debug("Using REDIS backend") else: from glutemulo.backend.logger import LoggerBackend as Backend @@ -33,15 +37,23 @@ group_id=config["ingestor_group_id"], auto_offset_reset=config["ingestor_auto_offset_reset"], max_poll_records=config["ingestor_max_poll_records"], - fetch_min_bytes=config["ingestor_fetch_min_bytes"] + fetch_min_bytes=config["ingestor_fetch_min_bytes"], ) - backend = Backend( - config["ingestor_dataset"], - config["ingestor_dataset_columns"], - config["ingestor_dataset_ddl"], - config["ingestor_dataset_autocreate"], - ) + if config["backend"] == "redis": + backend = RedisBackend( + config["redis_expire_seconds"], + config["redis_key_prefix"], + **config["redis_connection"], + ) + else: + + backend = Backend( + config["ingestor_dataset"], + config["ingestor_dataset_columns"], + config["ingestor_dataset_ddl"], + config["ingestor_dataset_autocreate"], + ) while True: for messages in consumer.consume(config["ingestor_wait_interval"]): if messages: