If you are beginning your journey with Senzing, please start with Senzing Quick Start guides.
You are in the Senzing Garage where projects are "tinkered" on. Although this GitHub repository may help you understand an approach to using Senzing, it's not considered to be "production ready" and is not considered to be part of the Senzing product. Heck, it may not even be appropriate for your application of Senzing!
Pulls JSON records from a queue and inserts into Senzing Engine.
The stream-loader.py Python script consumes data from various sources (Kafka, RabbitMQ, AWS SQS) and publishes it to Senzing.
The senzing/stream-loader Docker image is a wrapper for use in Docker formations (e.g. docker-compose, Kubernetes).
To see all of the subcommands, run:
$ ./stream-loader.py --help
usage: stream-loader.py [-h]
{kafka,kafka-withinfo,rabbitmq,rabbitmq-withinfo,sleep,sqs,sqs-withinfo,url,version,docker-acceptance-test}
...
Load Senzing from a stream. For more information, see
https://github.com/senzing-garage/stream-loader
positional arguments:
{kafka,kafka-withinfo,rabbitmq,rabbitmq-withinfo,sleep,sqs,sqs-withinfo,url,version,docker-acceptance-test}
Subcommands (SENZING_SUBCOMMAND):
  kafka                   Read JSON Lines from Apache Kafka topic.
  kafka-withinfo          Read JSON Lines from Apache Kafka topic. Return info to a queue.
  rabbitmq                Read JSON Lines from RabbitMQ queue.
  rabbitmq-withinfo       Read JSON Lines from RabbitMQ queue. Return info to a queue.
  sleep                   Do nothing but sleep. For Docker testing.
  sqs                     Read JSON Lines from AWS SQS queue.
  sqs-withinfo            Read JSON Lines from AWS SQS queue. Return info to a queue.
  url                     Read JSON Lines from URL-addressable file.
  version                 Print version of program.
  docker-acceptance-test  For Docker acceptance testing.
optional arguments:
-h, --help show this help message and exit
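Most of the subcommands above read JSON Lines, that is, one JSON object per line. The following is a minimal sketch of parsing that format, not the code used by stream-loader.py. The function name `read_json_lines` is hypothetical, and skipping blank lines and raising on malformed lines are assumptions about reasonable behavior rather than documented behavior of the loader.

```python
import io
import json


def read_json_lines(stream):
    """Yield one dict per non-blank line of a JSON Lines text stream."""
    for line_number, line in enumerate(stream, start=1):
        line = line.strip()
        if not line:
            continue  # Assumption: blank lines are skipped rather than treated as errors.
        try:
            yield json.loads(line)
        except json.JSONDecodeError as err:
            raise ValueError(f"line {line_number} is not valid JSON: {err}") from err


# Two records separated by a blank line, as they might appear in a file or queue.
sample = io.StringIO(
    '{"DATA_SOURCE": "TEST", "RECORD_ID": "1"}\n'
    "\n"
    '{"DATA_SOURCE": "TEST", "RECORD_ID": "2"}\n'
)
records = list(read_json_lines(sample))
print(len(records))
```

Because the function is a generator, a large input is processed one line at a time instead of being loaded into memory at once.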
- Preamble
- Expectations
- Demonstrate using Command Line Interface
- Demonstrate using Docker
- Directives
- Configuration
- License
- References
At Senzing, we strive to create GitHub documentation in a "don't make me think" style. For the most part, instructions are copy and paste. Whenever thinking is needed, it's marked with a "thinking" icon 🤔. Whenever customization is needed, it's marked with a "pencil" icon ✏️. If the instructions are not clear, please let us know by opening a new Documentation issue describing where we can improve. Now on with the show...
- 🤔 - A "thinker" icon means that a little extra thinking may be required. Perhaps there are some choices to be made. Perhaps it's an optional step.
- ✏️ - A "pencil" icon means that the instructions may need modification before performing.
- ⚠️ - A "warning" icon means that something tricky is happening, so pay attention.
- Space: This repository and demonstration require 6 GB free disk space.
- Time: Budget 40 minutes to get the demonstration up-and-running, depending on CPU and network speeds.
- Background knowledge: This repository assumes a working knowledge of:
🤔 The following tasks need to be complete before proceeding. These are "one-time tasks" which may already have been completed.
- Install system dependencies:
  - Use apt based installation for Debian, Ubuntu and others.
    - See apt-packages.txt for list.
  - Use yum based installation for Red Hat, CentOS, openSuse and others.
    - See yum-packages.txt for list.
- Install Python dependencies:
- See requirements.txt for list
- The following software programs need to be installed:
- 🤔 Optional: Some databases need additional support.
For other databases, this step may be skipped.
- Db2: See Support Db2.
- MS SQL: See Support MS SQL.
- Get a local copy of stream-loader.py. Example:
  - ✏️ Specify where to download file. Example:
    export SENZING_DOWNLOAD_FILE=~/stream-loader.py
  - Download file. Example:
    curl -X GET \
      --output ${SENZING_DOWNLOAD_FILE} \
      https://raw.githubusercontent.com/Senzing/stream-loader/main/stream-loader.py
  - Make file executable. Example:
    chmod +x ${SENZING_DOWNLOAD_FILE}
  - 🤔 Alternative: The entire git repository can be downloaded by following instructions at Clone repository.
- ✏️ Identify the Senzing g2 directory. Example:
  export SENZING_G2_DIR=/opt/senzing/g2
  - Here's a simple test to see if SENZING_G2_DIR is correct. The following command should return file contents. Example:
    cat ${SENZING_G2_DIR}/g2BuildVersion.json
- Set common environment variables. Example:
  export PYTHONPATH=${SENZING_G2_DIR}/python
- 🤔 Set operating system specific environment variables. Choose one of the options.
- Run the command. Example:
  ${SENZING_DOWNLOAD_FILE} --help
- For more examples of use, see Examples of CLI.
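The SENZING_G2_DIR check in the steps above can also be done from Python, which may be convenient in a setup script. This is a minimal sketch: the function name `read_build_version` is hypothetical, and the only things assumed about the installation are the file name g2BuildVersion.json and the default path /opt/senzing/g2 used in the example above.

```python
import json
import os
from pathlib import Path


def read_build_version(g2_dir):
    """Return the parsed contents of g2BuildVersion.json found in g2_dir.

    A missing file raises FileNotFoundError, which suggests that
    SENZING_G2_DIR points at the wrong directory.
    """
    version_file = Path(g2_dir) / "g2BuildVersion.json"
    with version_file.open(encoding="utf-8") as handle:
        return json.load(handle)


# Fall back to the example path when SENZING_G2_DIR is not set.
g2_dir = os.environ.get("SENZING_G2_DIR", "/opt/senzing/g2")
try:
    print(read_build_version(g2_dir))
except (OSError, ValueError) as err:
    # OSError covers a missing or unreadable file; ValueError covers invalid JSON.
    print(f"SENZING_G2_DIR looks wrong ({g2_dir}): {err}")
```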
🤔 The following tasks need to be complete before proceeding. These are "one-time tasks" which may already have been completed.
- The following software programs need to be installed:
- Configure Senzing database using Docker
🤔 Optional: Some databases need additional support. For other databases, these steps may be skipped.
- Db2: See Support Db2 instructions to set SENZING_OPT_IBM_DIR_PARAMETER.
- MS SQL: See Support MS SQL instructions to set SENZING_OPT_MICROSOFT_DIR_PARAMETER.
🤔 Optional: Use if storing data in an external database. If not specified, the internal SQLite database will be used.
- ✏️ Specify database. Example:
  export DATABASE_PROTOCOL=postgresql
  export DATABASE_USERNAME=postgres
  export DATABASE_PASSWORD=postgres
  export DATABASE_HOST=senzing-postgresql
  export DATABASE_PORT=5432
  export DATABASE_DATABASE=G2
- Construct Database URL. Example:
  export SENZING_DATABASE_URL="${DATABASE_PROTOCOL}://${DATABASE_USERNAME}:${DATABASE_PASSWORD}@${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DATABASE}"
- Construct parameter for docker run. Example:
  export SENZING_DATABASE_URL_PARAMETER="--env SENZING_DATABASE_URL=${SENZING_DATABASE_URL}"
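The database URL built in the steps above follows the pattern protocol://username:password@host:port/database. The sketch below assembles the same string in Python. The function name `build_database_url` is hypothetical, and percent-encoding the credentials is an assumption added here so that characters such as "@" or ":" in a password do not break the URL structure; confirm against the Senzing documentation how special characters should be handled.

```python
from urllib.parse import quote


def build_database_url(protocol, username, password, host, port, database):
    """Assemble protocol://username:password@host:port/database."""
    # Assumption: credentials are percent-encoded so "@", ":" and "/" stay unambiguous.
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"{protocol}://{user}:{secret}@{host}:{port}/{database}"


# Same values as the shell example above.
url = build_database_url(
    "postgresql", "postgres", "postgres", "senzing-postgresql", 5432, "G2"
)
print(url)
```

With the example values, the result matches what the shell `export SENZING_DATABASE_URL=...` line produces.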
Although the docker run command looks complex, it accounts for all of the optional variations described above.
Unset *_PARAMETER environment variables have no effect on the docker run command and may be removed or remain.
- ✏️ Set environment variables. Example:
  export SENZING_DATA_SOURCE=TEST
  export SENZING_KAFKA_BOOTSTRAP_SERVER=senzing-kafka:9092
  export SENZING_KAFKA_TOPIC=senzing-kafka-topic
  export SENZING_MONITORING_PERIOD=60
  export SENZING_SUBCOMMAND=kafka
- Run Docker container. Example:
  sudo docker run \
    --env SENZING_DATA_SOURCE="${SENZING_DATA_SOURCE}" \
    --env SENZING_KAFKA_BOOTSTRAP_SERVER="${SENZING_KAFKA_BOOTSTRAP_SERVER}" \
    --env SENZING_KAFKA_TOPIC="${SENZING_KAFKA_TOPIC}" \
    --env SENZING_MONITORING_PERIOD="${SENZING_MONITORING_PERIOD}" \
    --env SENZING_SUBCOMMAND="${SENZING_SUBCOMMAND}" \
    --interactive \
    --rm \
    --tty \
    ${SENZING_DATABASE_URL_PARAMETER} \
    ${SENZING_NETWORK_PARAMETER} \
    ${SENZING_OPT_IBM_DIR_PARAMETER} \
    ${SENZING_OPT_MICROSOFT_DIR_PARAMETER} \
    ${SENZING_RUNAS_USER_PARAMETER} \
    senzing/stream-loader
- For more examples of use, see Examples of Docker.
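To illustrate why unset *_PARAMETER variables have no effect, here is a minimal sketch that assembles the same docker run argument list in Python. The function name `build_docker_command` is hypothetical and the sketch only builds the list; it does not start a container. An unset parameter contributes no arguments, which mirrors how the shell drops an unset, unquoted variable.

```python
import shlex

# Variables passed into the container with --env, as in the example above.
ENV_NAMES = [
    "SENZING_DATA_SOURCE",
    "SENZING_KAFKA_BOOTSTRAP_SERVER",
    "SENZING_KAFKA_TOPIC",
    "SENZING_MONITORING_PERIOD",
    "SENZING_SUBCOMMAND",
]

# Optional, pre-built fragments of the docker run command line.
OPTIONAL_PARAMETERS = [
    "SENZING_DATABASE_URL_PARAMETER",
    "SENZING_NETWORK_PARAMETER",
    "SENZING_OPT_IBM_DIR_PARAMETER",
    "SENZING_OPT_MICROSOFT_DIR_PARAMETER",
    "SENZING_RUNAS_USER_PARAMETER",
]


def build_docker_command(env):
    """Return the docker run argument list for the given environment mapping."""
    command = ["docker", "run"]
    for name in ENV_NAMES:
        command += ["--env", f"{name}={env.get(name, '')}"]
    command += ["--interactive", "--rm", "--tty"]
    for name in OPTIONAL_PARAMETERS:
        # shlex.split("") returns [], so an unset parameter adds nothing.
        command += shlex.split(env.get(name, ""))
    command.append("senzing/stream-loader")
    return command


example_env = {
    "SENZING_SUBCOMMAND": "kafka",
    "SENZING_DATABASE_URL_PARAMETER": "--env SENZING_DATABASE_URL=postgresql://u:p@h:5432/G2",
}
print(shlex.join(build_docker_command(example_env)))
```

Passing `os.environ` as `env` would reproduce the shell behavior for the current session.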
The stream loader will inspect each incoming JSON message for a "senzingStreamLoader" JSON property name. The "senzingStreamLoader" property value is used to direct the actions of the stream loader. The "senzingStreamLoader" property will be removed from the JSON message before the message is sent to the Senzing Engine.
- The format of the "senzingStreamLoader" property value is:
  { "action": "<action-identifier>" }
- The supported "action-identifiers" are:
- In a message, it looks like this example:
  {"senzingStreamLoader": {"action": "deleteRecordWithInfo"}, "DATA_SOURCE": "TEST", "RECORD_ID": "242131119", ...}
- If no directive exists, the action taken by the stream-loader will be addRecord or addRecordWithInfo, depending on the stream-loader.py subcommand. For subcommands, see Overview.
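The directive handling described above can be sketched as follows. This is an illustration of the described behavior, not the code in stream-loader.py: the function name `split_directive` and its parameters are hypothetical. It removes the "senzingStreamLoader" property from the message, returns the requested action, and falls back to a default action when no directive is present.

```python
import json


def split_directive(message, default_action, directive_name="senzingStreamLoader"):
    """Return (action, record) for one JSON message.

    The directive property is removed from the record, so the record is
    what would be passed on to the Senzing Engine.
    """
    record = json.loads(message)
    directive = record.pop(directive_name, None) or {}
    action = directive.get("action", default_action)
    return action, record


with_directive = (
    '{"senzingStreamLoader": {"action": "deleteRecordWithInfo"}, '
    '"DATA_SOURCE": "TEST", "RECORD_ID": "242131119"}'
)
without_directive = '{"DATA_SOURCE": "TEST", "RECORD_ID": "242131119"}'

print(split_directive(with_directive, "addRecord"))
print(split_directive(without_directive, "addRecord"))
```

The default would be addRecord or addRecordWithInfo, depending on whether a plain or a -withinfo subcommand is in use.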
Configuration values are specified by environment variable or command line parameter.
- AWS_ACCESS_KEY_ID
- AWS_DEFAULT_REGION
- AWS_SECRET_ACCESS_KEY
- PYTHONPATH
- SENZING_AZURE_QUEUE_CONNECTION_STRING
- SENZING_AZURE_FAILURE_CONNECTION_STRING
- SENZING_AZURE_FAILURE_QUEUE_NAME
- SENZING_AZURE_INFO_CONNECTION_STRING
- SENZING_AZURE_INFO_QUEUE_NAME
- SENZING_AZURE_QUEUE_NAME
- SENZING_CONFIG_PATH
- SENZING_CONFIGURATION_CHECK_FREQUENCY
- SENZING_DATABASE_URL
- SENZING_DEBUG
- SENZING_DELAY_IN_SECONDS
- SENZING_DELAY_RANDOMIZED
- SENZING_ENGINE_CONFIGURATION_JSON
- SENZING_EXIT_ON_EMPTY_QUEUE
- SENZING_EXIT_ON_EXCEPTION
- SENZING_EXPIRATION_WARNING_IN_DAYS
- SENZING_INPUT_URL
- SENZING_KAFKA_BOOTSTRAP_SERVER
- SENZING_KAFKA_CONFIGURATION
- SENZING_KAFKA_FAILURE_BOOTSTRAP_SERVER
- SENZING_KAFKA_FAILURE_CONFIGURATION
- SENZING_KAFKA_FAILURE_TOPIC
- SENZING_KAFKA_GROUP
- SENZING_KAFKA_INFO_BOOTSTRAP_SERVER
- SENZING_KAFKA_INFO_CONFIGURATION
- SENZING_KAFKA_INFO_TOPIC
- SENZING_KAFKA_TOPIC
- SENZING_LICENSE_BASE64_ENCODED
- SENZING_LOG_LEVEL
- SENZING_LOG_LICENSE_PERIOD_IN_SECONDS
- SENZING_MONITORING_CHECK_FREQUENCY_IN_SECONDS
- SENZING_MONITORING_PERIOD_IN_SECONDS
- SENZING_NETWORK
- SENZING_PRIME_ENGINE
- SENZING_PSTACK_PID
- SENZING_QUEUE_MAX
- SENZING_RABBITMQ_EXCHANGE
- SENZING_RABBITMQ_FAILURE_EXCHANGE
- SENZING_RABBITMQ_FAILURE_HOST
- SENZING_RABBITMQ_FAILURE_PASSWORD
- SENZING_RABBITMQ_FAILURE_PORT
- SENZING_RABBITMQ_FAILURE_QUEUE
- SENZING_RABBITMQ_FAILURE_ROUTING_KEY
- SENZING_RABBITMQ_FAILURE_USERNAME
- SENZING_RABBITMQ_FAILURE_VIRTUAL_HOST
- SENZING_RABBITMQ_HEARTBEAT_IN_SECONDS
- SENZING_RABBITMQ_HOST
- SENZING_RABBITMQ_INFO_EXCHANGE
- SENZING_RABBITMQ_INFO_HOST
- SENZING_RABBITMQ_INFO_PASSWORD
- SENZING_RABBITMQ_INFO_PORT
- SENZING_RABBITMQ_INFO_QUEUE
- SENZING_RABBITMQ_INFO_ROUTING_KEY
- SENZING_RABBITMQ_INFO_USERNAME
- SENZING_RABBITMQ_INFO_VIRTUAL_HOST
- SENZING_RABBITMQ_PASSWORD
- SENZING_RABBITMQ_PORT
- SENZING_RABBITMQ_PREFETCH_COUNT
- SENZING_RABBITMQ_QUEUE
- SENZING_RABBITMQ_RECONNECT_DELAY_IN_SECONDS
- SENZING_RABBITMQ_RECONNECT_NUMBER_OF_RETRIES
- SENZING_RABBITMQ_USE_EXISTING_ENTITIES
- SENZING_RABBITMQ_USERNAME
- SENZING_RABBITMQ_VIRTUAL_HOST
- SENZING_RESOURCE_PATH
- SENZING_SKIP_DATABASE_PERFORMANCE_TEST
- SENZING_SKIP_GOVERNOR
- SENZING_SKIP_INFO_FILTER
- SENZING_SLEEP_TIME_IN_SECONDS
- SENZING_SQS_FAILURE_QUEUE_URL
- SENZING_SQS_INFO_QUEUE_DELAY_SECONDS
- SENZING_SQS_INFO_QUEUE_URL
- SENZING_SQS_QUEUE_URL
- SENZING_SQS_WAIT_TIME_SECONDS
- SENZING_STREAM_LOADER_DIRECTIVE_NAME
- SENZING_SUBCOMMAND
- SENZING_SUPPORT_PATH
- SENZING_THREADS_PER_PROCESS
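As a rough illustration of configuring a value by either environment variable or command line parameter, the sketch below resolves two of the settings listed above. Several things here are assumptions, not documented behavior: the flag names `--kafka-topic` and `--data-source`, the default values, and the precedence order (command line first, then environment, then default). Consult the script's --help output for the actual options.

```python
import argparse

# Hypothetical defaults, for illustration only.
DEFAULTS = {
    "SENZING_KAFKA_TOPIC": "senzing-kafka-topic",
    "SENZING_DATA_SOURCE": None,
}


def resolve_config(argv, environ):
    """Resolve each setting from the command line, then environment, then default."""
    parser = argparse.ArgumentParser()
    # Hypothetical flag names; each stores its value under the variable's name.
    parser.add_argument("--kafka-topic", dest="SENZING_KAFKA_TOPIC")
    parser.add_argument("--data-source", dest="SENZING_DATA_SOURCE")
    cli_values = vars(parser.parse_args(argv))

    config = {}
    for name, default in DEFAULTS.items():
        if cli_values.get(name) is not None:
            config[name] = cli_values[name]  # Assumed: command line wins.
        elif name in environ:
            config[name] = environ[name]
        else:
            config[name] = default
    return config


example = resolve_config(
    ["--kafka-topic", "from-cli"],
    {"SENZING_KAFKA_TOPIC": "from-env", "SENZING_DATA_SOURCE": "TEST"},
)
print(example)
```

In real use, `sys.argv[1:]` and `os.environ` would be passed in place of the literal values.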
View license information for the software contained in this Docker image. Note that this license does not permit further distribution.
This Docker image may also contain software from the Senzing GitHub community under the Apache License 2.0.
Further, as with all Docker images, this likely also contains other software which may be under other licenses (such as Bash, etc. from the base distribution, along with any direct or indirect dependencies of the primary software being contained).
As for any pre-built image usage, it is the image user's responsibility to ensure that any use of this image complies with any relevant licenses for all software contained within.
- Development
- Errors
- Examples
- Related artifacts: