From ab463116dbd9a36f4ef1557c2e05e915f34f4d93 Mon Sep 17 00:00:00 2001 From: Muhammad Arif <40067075+thearifismail@users.noreply.github.com> Date: Tue, 10 Dec 2024 09:08:27 -0500 Subject: [PATCH] RHINENG-12670: Add script for publishing hosts (#2109) * Add script for publishing hosts rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * responded to Chris H comments rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * add replication slots checking and inv_publish_hosts script to Dockerfile rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Update the name of inv_publish_hosts.py file rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * RHINENG-13638: use env var for number of kafka partitions (#2059) * use env var for number of kafka partitions rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Fix a var and provide missing descriptions rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Fixed the spellings of payload tracker topics rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Update deploy/clowdapp.yml Co-authored-by: Asa Price * Fixed descriptions following Asa's recommendations rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Remove NUMBER_OF_KAFKA_EVENT_TOPIC_PARTITIONS from within the app rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Removed kafka event topic partitions rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Remove kafka partition numbers from all podspecs rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Respond to comments rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Switch back to integer values for number of Kafka partitions rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Resolved conflicts rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Added double brackets to the variables for partition 
numbers rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Update the template apiVersion rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Removed old vars put back in by code rebasing rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Update deploy/clowdapp.yml Co-authored-by: Asa Price * Update deploy/clowdapp.yml Co-authored-by: Asa Price * Update deploy/clowdapp.yml Co-authored-by: Asa Price * use env var for number of kafka partitions rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED * Resolve code conflicts introduced by new merges rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED --------- Co-authored-by: Asa Price --------- Co-authored-by: Asa Price --- Dockerfile | 1 + deploy/clowdapp.yml | 44 +++++++++++++++++++++ inv_publish_hosts.py | 91 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 136 insertions(+) create mode 100755 inv_publish_hosts.py diff --git a/Dockerfile b/Dockerfile index 0e44c0050..7d9943dfd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -28,6 +28,7 @@ COPY gunicorn.conf.py gunicorn.conf.py COPY host_reaper.py host_reaper.py COPY host_synchronizer.py host_synchronizer.py COPY inv_mq_service.py inv_mq_service.py +COPY inv_publish_hosts.py inv_publish_hosts.py COPY inv_export_service.py inv_export_service.py COPY logconfig.yaml logconfig.yaml COPY manage.py manage.py diff --git a/deploy/clowdapp.yml b/deploy/clowdapp.yml index adadd5a18..9ac2b6e5d 100644 --- a/deploy/clowdapp.yml +++ b/deploy/clowdapp.yml @@ -932,6 +932,37 @@ objects: requests: cpu: ${CPU_REQUEST_REAPER} memory: ${MEMORY_REQUEST_REAPER} + - name: syndicator + schedule: ${SYNDICATOR_CRON_SCHEDULE} + concurrencyPolicy: "Forbid" + suspend: ${{SYNDICATOR_SUSPEND}} + restartPolicy: Never + podSpec: + image: ${IMAGE}:${IMAGE_TAG} + args: ["./inv_publish_hosts.py"] + env: + - name: INVENTORY_LOG_LEVEL + value: ${LOG_LEVEL} 
+ - name: INVENTORY_DB_SSL_MODE + value: ${INVENTORY_DB_SSL_MODE} + - name: INVENTORY_DB_SSL_CERT + value: ${INVENTORY_DB_SSL_CERT} + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: CLOWDER_ENABLED + value: "true" + - name: INVENTORY_DB_SCHEMA + value: "${INVENTORY_DB_SCHEMA}" + resources: + limits: + cpu: ${CPU_LIMIT_SYNDICATOR} + memory: ${MEMORY_LIMIT_SYNDICATOR} + requests: + cpu: ${CPU_REQUEST_SYNDICATOR} + memory: ${MEMORY_REQUEST_SYNDICATOR} + - name: sp-validator schedule: '@hourly' suspend: ${{SP_VALIDATOR_SUSPEND}} @@ -1373,6 +1404,15 @@ parameters: - name: MEMORY_LIMIT_SYNCHRONIZER value: 512Mi +- name: CPU_REQUEST_SYNDICATOR + value: 250m +- name: CPU_LIMIT_SYNDICATOR + value: 500m +- name: MEMORY_REQUEST_SYNDICATOR + value: 256Mi +- name: MEMORY_LIMIT_SYNDICATOR + value: 512Mi + - name: CPU_REQUEST_EXPORT_SVC value: 250m - name: CPU_LIMIT_EXPORT_SVC @@ -1513,6 +1553,10 @@ parameters: value: 'true' - name: REAPER_SUSPEND value: 'true' +- name: SYNDICATOR_SUSPEND + value: 'false' +- name: SYNDICATOR_CRON_SCHEDULE + value: '*/5 * * * *' - name: KAFKA_SP_VALIDATOR_MAX_MESSAGES value: '10000' - name: TENANT_TRANSLATOR_HOST diff --git a/inv_publish_hosts.py b/inv_publish_hosts.py new file mode 100755 index 000000000..110d376cc --- /dev/null +++ b/inv_publish_hosts.py @@ -0,0 +1,91 @@ +#!/usr/bin/python +import sys +from functools import partial + +from sqlalchemy import create_engine +from sqlalchemy import text as sa_text +from sqlalchemy.orm import sessionmaker + +from app import create_app +from app.config import Config +from app.environment import RuntimeEnvironment +from app.logging import get_logger +from lib.handlers import ShutdownHandler +from lib.handlers import register_shutdown + +__all__ = ("main", "run") + +LOGGER_NAME = "hosts-syndicator" +RUNTIME_ENVIRONMENT = RuntimeEnvironment.JOB + +PUBLICATION_NAME = "hbi_hosts_pub" +CHECK_PUBLICATION = f"SELECT EXISTS(SELECT * FROM pg_catalog.pg_publication WHERE 
pubname = '{PUBLICATION_NAME}')" +CREATE_PUBLICATION = f"CREATE PUBLICATION {PUBLICATION_NAME} FOR TABLE hbi.hosts \ +WHERE (hbi.hosts.canonical_facts->'insights_id' IS NOT NULL)" +CHECK_REPLICATION_SLOTS = "SELECT slot_name, active FROM pg_replication_slots" + + +def _init_config(): + config = Config(RUNTIME_ENVIRONMENT) + config.log_configuration() + return config + + +def _init_db(config): + engine = create_engine(config.db_uri) + return sessionmaker(bind=engine) + + +def _excepthook(logger, type, value, traceback): + logger.exception("Inventory migration failed", exc_info=value) + + +def run(logger, session, application): + with application.app.app_context(): + logger.info(f"Checking for publication using the following SQL statement:\n\t{CHECK_PUBLICATION}") + result = session.execute(sa_text(CHECK_PUBLICATION)) + found = result.cursor.fetchone()[0] + if found: + logger.info(f'Publication "{PUBLICATION_NAME}" found!') + else: + logger.info(f'Creating publication "{PUBLICATION_NAME}" using \n\t{CREATE_PUBLICATION}') + try: + session.execute(sa_text(CREATE_PUBLICATION)) + + # check for inactive replication_slots + replication_slots = session.execute(sa_text(CHECK_REPLICATION_SLOTS)).all() + inactive = 0 + for slot in replication_slots: + slot_name, active = slot + if not active: + inactive += 1 + logger.error(f"Replication slot named {slot_name} is not active") + if inactive > 0: + exit(1) + except Exception as e: + session.rollback() + logger.error(f'Error encountered when creating the publication "{PUBLICATION_NAME}" \n\t{e}') + raise e + else: + session.commit() + logger.info(f'Publication "{PUBLICATION_NAME}" created!!!') + + +def main(logger): + config = _init_config() + application = create_app(RUNTIME_ENVIRONMENT) + + Session = _init_db(config) + session = Session() + register_shutdown(session.get_bind().dispose, "Closing database") + + shutdown_handler = ShutdownHandler() + shutdown_handler.register() + run(logger, session, application) + + +if __name__ 
== "__main__": + logger = get_logger(LOGGER_NAME) + sys.excepthook = partial(_excepthook, logger) + + main(logger)