Skip to content

Commit

Permalink
Merge pull request #198 from Senzing/issue-195.dockter.1
Browse files Browse the repository at this point in the history
#195 Massaged code
  • Loading branch information
docktermj authored Feb 18, 2021
2 parents b5f24c0 + cca2b75 commit a629506
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 27 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
[markdownlint](https://dlaa.me/markdownlint/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.7.1] - 2021-02-18

### Added in 1.7.1

- Added `endpoint_url` in AWS SQS configuration.

## [1.7.0] - 2021-01-19

### Added in 1.7.0
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ENV REFRESHED_AT=2020-09-24

LABEL Name="senzing/stream-loader" \
Maintainer="[email protected]" \
Version="1.6.3"
Version="1.7.1"

HEALTHCHECK CMD ["/app/healthcheck.sh"]

Expand Down
56 changes: 30 additions & 26 deletions stream-loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import configparser
import confluent_kafka
import datetime
import importlib
import json
import linecache
import logging
Expand All @@ -27,7 +28,6 @@
import sys
import threading
import time
import importlib

# Import Senzing libraries.

Expand All @@ -42,9 +42,9 @@
pass

__all__ = []
__version__ = "1.6.5" # See https://www.python.org/dev/peps/pep-0396/
__version__ = "1.7.1" # See https://www.python.org/dev/peps/pep-0396/
__date__ = '2018-10-29'
__updated__ = '2020-11-05'
__updated__ = '2021-02-18'

SENZING_PRODUCT_ID = "5001" # See https://github.com/Senzing/knowledge-base/blob/master/lists/senzing-product-ids.md
log_format = '%(asctime)s %(message)s'
Expand Down Expand Up @@ -830,6 +830,7 @@ def get_parser():
"728": "Could not do performance test. G2 generic exception. Error: {0}",
"729": "Could not do performance test. Error: {0}",
"730": "There are not enough safe characters to do the translation. Unsafe Characters: {0}; Safe Characters: {1}",
"750": "Invalid SQS URL config for {0}",
"880": "Unspecific error when {1}. Error: {0}",
"881": "Could not G2Engine.primeEngine with '{0}'. Error: {1}",
"885": "License has expired.",
Expand Down Expand Up @@ -1911,7 +1912,6 @@ def add_to_info_queue(self, jsonline):
time.sleep(retry_delay)
self.info_channel = self.connect(self.info_credentials, self.rabbitmq_info_host, self.rabbitmq_info_port, self.rabbitmq_info_queue, self.rabbitmq_heartbeat, self.rabbitmq_info_exchange, self.rabbitmq_info_routing_key)


def callback(self, channel, method, header, body):
logging.debug(message_debug(903, threading.current_thread().name, body))

Expand Down Expand Up @@ -2015,7 +2015,7 @@ def run(self):
except Exception as err:
exit_error(880, err, "channel.start_consuming()")

def connect(self, credentials, host_name, port, queue_name, heartbeat, exchange = None, routing_key = None):
def connect(self, credentials, host_name, port, queue_name, heartbeat, exchange=None, routing_key=None):
rabbitmq_passive_declare = self.config.get("rabbitmq_use_existing_entities")

try:
Expand All @@ -2037,34 +2037,35 @@ def connect(self, credentials, host_name, port, queue_name, heartbeat, exchange

return channel


# -----------------------------------------------------------------------------
# Class: ReadSqsWriteG2Thread
# -----------------------------------------------------------------------------


class ReadSqsWriteG2Thread(WriteG2Thread):

PATTERN = "^([^/]+://[^/]+)/"

def __init__(self, config, g2_engine, g2_configuration_manager, governor):
super().__init__(config, g2_engine, g2_configuration_manager, governor)
self.data_source = self.config.get('data_source')
self.entity_type = self.config.get('entity_type')
self.exit_on_empty_queue = self.config.get('exit_on_empty_queue')
self.failure_queue_url = config.get("sqs_failure_queue_url")
self.queue_url = config.get("sqs_queue_url")

pat = re.compile(self.PATTERN)
m = pat.match(self.queue_url)
if m is None:
raise RuntimeError("Invalid SQS URL config for {}".format(self.queue_url))
self.endpoint = m.group(1)
self.sqs = boto3.client("sqs", endpoint_url = self.endpoint)

self.sqs_dead_letter_queue_enabled = config.get('sqs_dead_letter_queue_enabled')
self.sqs_wait_time_seconds = config.get('sqs_wait_time_seconds')

# Create sqs object.
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

regular_expression = "^([^/]+://[^/]+)/"
regex = re.compile(regular_expression)
match = regex.match(self.queue_url)
if not match:
exit_error(750, self.queue_url)
endpoint_url = match.group(1)
self.sqs = boto3.client("sqs", endpoint_url=endpoint_url)

def add_to_failure_queue(self, jsonline):
'''
Overwrite superclass method.
Expand Down Expand Up @@ -2189,7 +2190,6 @@ def run(self):


class ReadSqsWriteG2WithInfoThread(WriteG2Thread):
PATTERN = "^([^/]+://[^/]+)/"

def __init__(self, config, g2_engine, g2_configuration_manager, governor):
super().__init__(config, g2_engine, g2_configuration_manager, governor)
Expand All @@ -2199,17 +2199,21 @@ def __init__(self, config, g2_engine, g2_configuration_manager, governor):
self.failure_queue_url = config.get("sqs_failure_queue_url")
self.info_queue_url = config.get("sqs_info_queue_url")
self.queue_url = config.get("sqs_queue_url")

pat = re.compile(self.PATTERN)
m = pat.match(self.queue_url)
if m is None:
raise RuntimeError("Invalid SQS URL config for {}".format(self.queue_url))
self.endpoint = m.group(1)
self.sqs = boto3.client("sqs", endpoint_url = self.endpoint)

self.sqs_dead_letter_queue_enabled = config.get('sqs_dead_letter_queue_enabled')
self.sqs_wait_time_seconds = config.get('sqs_wait_time_seconds')

# Create sqs object.
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

regular_expression = "^([^/]+://[^/]+)/"
regex = re.compile(regular_expression)
match = regex.match(self.queue_url)
if not match:
exit_error(750, self.queue_url)
endpoint_url = match.group(1)
self.sqs = boto3.client("sqs", endpoint_url=endpoint_url)

def add_to_failure_queue(self, jsonline):
'''
Overwrite superclass method.
Expand Down Expand Up @@ -2773,7 +2777,7 @@ def import_plugins(config):
for database_url in database_urls:
if database_url.startswith("postgresql://"):
message_error(567, database_urls)
exit_error(567, database_urls)
exit_error(567, database_urls)
pass

try:
Expand Down

0 comments on commit a629506

Please sign in to comment.