diff --git a/CHANGELOG.md b/CHANGELOG.md index a368bb3..d54b892 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), [markdownlint](https://dlaa.me/markdownlint/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.7.1] - 2021-02-18 + +### Added in 1.7.1 + +- Added `endpoint_url` in AWS SQS configuration. + ## [1.7.0] - 2021-01-19 ### Added in 1.7.0 diff --git a/Dockerfile b/Dockerfile index fb5cdf3..3b8167a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,7 +5,7 @@ ENV REFRESHED_AT=2020-09-24 LABEL Name="senzing/stream-loader" \ Maintainer="support@senzing.com" \ - Version="1.6.3" + Version="1.7.1" HEALTHCHECK CMD ["/app/healthcheck.sh"] diff --git a/stream-loader.py b/stream-loader.py index 288de6b..e69a9ab 100755 --- a/stream-loader.py +++ b/stream-loader.py @@ -11,6 +11,7 @@ import configparser import confluent_kafka import datetime +import importlib import json import linecache import logging @@ -27,7 +28,6 @@ import sys import threading import time -import importlib # Import Senzing libraries. @@ -42,9 +42,9 @@ pass __all__ = [] -__version__ = "1.6.5" # See https://www.python.org/dev/peps/pep-0396/ +__version__ = "1.7.1" # See https://www.python.org/dev/peps/pep-0396/ __date__ = '2018-10-29' -__updated__ = '2020-11-05' +__updated__ = '2021-02-18' SENZING_PRODUCT_ID = "5001" # See https://github.com/Senzing/knowledge-base/blob/master/lists/senzing-product-ids.md log_format = '%(asctime)s %(message)s' @@ -830,6 +830,7 @@ def get_parser(): "728": "Could not do performance test. G2 generic exception. Error: {0}", "729": "Could not do performance test. Error: {0}", "730": "There are not enough safe characters to do the translation. Unsafe Characters: {0}; Safe Characters: {1}", + "750": "Invalid SQS URL config for {0}", "880": "Unspecific error when {1}. Error: {0}", "881": "Could not G2Engine.primeEngine with '{0}'. Error: {1}", "885": "License has expired.", @@ -1911,7 +1912,6 @@ def add_to_info_queue(self, jsonline): time.sleep(retry_delay) self.info_channel = self.connect(self.info_credentials, self.rabbitmq_info_host, self.rabbitmq_info_port, self.rabbitmq_info_queue, self.rabbitmq_heartbeat, self.rabbitmq_info_exchange, self.rabbitmq_info_routing_key) - def callback(self, channel, method, header, body): logging.debug(message_debug(903, threading.current_thread().name, body)) @@ -2015,7 +2015,7 @@ def run(self): except Exception as err: exit_error(880, err, "channel.start_consuming()") - def connect(self, credentials, host_name, port, queue_name, heartbeat, exchange = None, routing_key = None): + def connect(self, credentials, host_name, port, queue_name, heartbeat, exchange=None, routing_key=None): rabbitmq_passive_declare = self.config.get("rabbitmq_use_existing_entities") try: @@ -2037,7 +2037,6 @@ def connect(self, credentials, host_name, port, queue_name, heartbeat, exchange return channel - # ----------------------------------------------------------------------------- # Class: ReadSqsWriteG2Thread # ----------------------------------------------------------------------------- @@ -2045,8 +2044,6 @@ def connect(self, credentials, host_name, port, queue_name, heartbeat, exchange class ReadSqsWriteG2Thread(WriteG2Thread): - PATTERN = "^([^/]+://[^/]+)/" - def __init__(self, config, g2_engine, g2_configuration_manager, governor): super().__init__(config, g2_engine, g2_configuration_manager, governor) self.data_source = self.config.get('data_source') @@ -2054,17 +2051,21 @@ def __init__(self, config, g2_engine, g2_configuration_manager, governor): self.exit_on_empty_queue = self.config.get('exit_on_empty_queue') self.failure_queue_url = config.get("sqs_failure_queue_url") self.queue_url = config.get("sqs_queue_url") - - pat = re.compile(self.PATTERN) - m = pat.match(self.queue_url) - if m is None: - raise RuntimeError("Invalid SQS URL config for {}".format(self.queue_url)) - self.endpoint = m.group(1) - self.sqs = boto3.client("sqs", endpoint_url = self.endpoint) - self.sqs_dead_letter_queue_enabled = config.get('sqs_dead_letter_queue_enabled') self.sqs_wait_time_seconds = config.get('sqs_wait_time_seconds') + # Create sqs object. + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html + + regular_expression = "^([^/]+://[^/]+)/" + regex = re.compile(regular_expression) + match = regex.match(self.queue_url) + if not match: + exit_error(750, self.queue_url) + endpoint_url = match.group(1) + self.sqs = boto3.client("sqs", endpoint_url=endpoint_url) + def add_to_failure_queue(self, jsonline): ''' Overwrite superclass method. @@ -2189,7 +2190,6 @@ def run(self): class ReadSqsWriteG2WithInfoThread(WriteG2Thread): - PATTERN = "^([^/]+://[^/]+)/" def __init__(self, config, g2_engine, g2_configuration_manager, governor): super().__init__(config, g2_engine, g2_configuration_manager, governor) @@ -2199,17 +2199,21 @@ def __init__(self, config, g2_engine, g2_configuration_manager, governor): self.failure_queue_url = config.get("sqs_failure_queue_url") self.info_queue_url = config.get("sqs_info_queue_url") self.queue_url = config.get("sqs_queue_url") - - pat = re.compile(self.PATTERN) - m = pat.match(self.queue_url) - if m is None: - raise RuntimeError("Invalid SQS URL config for {}".format(self.queue_url)) - self.endpoint = m.group(1) - self.sqs = boto3.client("sqs", endpoint_url = self.endpoint) - self.sqs_dead_letter_queue_enabled = config.get('sqs_dead_letter_queue_enabled') self.sqs_wait_time_seconds = config.get('sqs_wait_time_seconds') + # Create sqs object. + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html + + regular_expression = "^([^/]+://[^/]+)/" + regex = re.compile(regular_expression) + match = regex.match(self.queue_url) + if not match: + exit_error(750, self.queue_url) + endpoint_url = match.group(1) + self.sqs = boto3.client("sqs", endpoint_url=endpoint_url) + def add_to_failure_queue(self, jsonline): ''' Overwrite superclass method. @@ -2773,7 +2777,7 @@ def import_plugins(config): for database_url in database_urls: if database_url.startswith("postgresql://"): message_error(567, database_urls) - exit_error(567, database_urls) + exit_error(567, database_urls) pass try: