Skip to content

Commit

Permalink
feat: added pg_notify
Browse files Browse the repository at this point in the history
Added an action to send notifications via postgres LISTEN/NOTIFY
  • Loading branch information
mkanoor committed Nov 9, 2023
1 parent f12f82c commit e652a64
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 7 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ RUN pip install -U pip \
aiokafka \
watchdog \
azure-servicebus \
psycopg \
&& ansible-galaxy collection install ansible.eda

RUN bash -c "if [ $DEVEL_COLLECTION_LIBRARY -ne 0 ]; then \
ansible-galaxy collection install git+https://github.com/ansible/event-driven-ansible.git --force; fi"
ansible-galaxy collection install git+https://github.com/mkanoor/event-driven-ansible.git,pg_listener --force; fi"

COPY . $WORKDIR
RUN chown -R $USER_ID ./
Expand Down
102 changes: 102 additions & 0 deletions ansible_rulebook/action/pg_notify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright 2023 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import uuid

import psycopg
import xxhash
from psycopg import OperationalError

from .control import Control
from .helper import Helper
from .metadata import Metadata

logger = logging.getLogger(__name__)
MAX_MESSAGE_LENGTH = 7 * 1024


class PGNotify:
"""The PGNotify action sends an event to a PG Pub Sub Channel
Needs
dbname
host
user
password
event
"""

def __init__(self, metadata: Metadata, control: Control, **action_args):
self.helper = Helper(metadata, control, "pg_notify")
self.action_args = action_args

async def __call__(self):
try:
conn = psycopg.connect(
host=self.action_args["host"],
dbname=self.action_args["dbname"],
user=self.action_args["user"],
password=self.action_args["password"],
autocommit=True,
)

cursor = conn.cursor()
if self.action_args.get("remove_meta", False):
event = self.action_args["event"].copy()
if "meta" in event:
event.pop("meta")
else:
event = self.action_args["event"]

payload = json.dumps(event)
message_length = len(payload)
if message_length >= MAX_MESSAGE_LENGTH:
xx_hash = xxhash.xxh32(payload.encode("utf-8")).hexdigest()
logger.info("Message length exceeds, will chunk")
message_uuid = str(uuid.uuid4())
number_of_chunks = int(message_length / MAX_MESSAGE_LENGTH) + 1
chunked = {
"chunked_msg_uuid": message_uuid,
"number_of_chunks": number_of_chunks,
"message_length": message_length,
"xx_hash": xx_hash,
}
logger.info(f"Chunk info {message_uuid}")
logger.info(f"Number of chunks {number_of_chunks}")
logger.info(f"Total data size {message_length}")
logger.info(f"XX Hash {xx_hash}")

sequence = 1
for i in range(0, message_length, MAX_MESSAGE_LENGTH):
chunked["chunk"] = payload[i : i + MAX_MESSAGE_LENGTH]
chunked["sequence"] = sequence
sequence += 1
cursor.execute(
f"NOTIFY {self.action_args['channel']}, "
f"'{json.dumps(chunked)}';"
)
else:
cursor.execute(
f"NOTIFY {self.action_args['channel']}, '{payload}';"
)
except OperationalError as e:
logger.error(f"PG Notify operational error {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()

await self.helper.send_default_status()
2 changes: 2 additions & 0 deletions ansible_rulebook/rule_set_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from ansible_rulebook.action.debug import Debug
from ansible_rulebook.action.metadata import Metadata
from ansible_rulebook.action.noop import Noop
from ansible_rulebook.action.pg_notify import PGNotify
from ansible_rulebook.action.post_event import PostEvent
from ansible_rulebook.action.print_event import PrintEvent
from ansible_rulebook.action.retract_fact import RetractFact
Expand Down Expand Up @@ -74,6 +75,7 @@
"run_module": RunModule,
"run_job_template": RunJobTemplate,
"run_workflow_template": RunWorkflowTemplate,
"pg_notify": PGNotify,
}


Expand Down
47 changes: 47 additions & 0 deletions ansible_rulebook/schema/ruleset_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@
},
{
"$ref": "#/$defs/shutdown-action"
},
{
"$ref": "#/$defs/pg-notify-action"
}
]
}
Expand Down Expand Up @@ -241,6 +244,9 @@
},
{
"$ref": "#/$defs/shutdown-action"
},
{
"$ref": "#/$defs/pg-notify-action"
}
]
}
Expand Down Expand Up @@ -507,6 +513,47 @@
],
"additionalProperties": false
},
"pg-notify-action": {
"type": "object",
"properties": {
"pg_notify": {
"type": "object",
"properties": {
"dbname": {
"type": "string"
},
"host": {
"type": "string"
},
"user": {
"type": "string"
},
"password": {
"type": "string"
},
"channel": {
"type": "string"
},
"event": {
"type": "string"
}
},
"required": [
"dbname",
"host",
"user",
"password",
"channel",
"event"
],
"additionalProperties": false
}
},
"required": [
"pg_notify"
],
"additionalProperties": false
},
"post-event-action": {
"type": "object",
"properties": {
Expand Down
22 changes: 17 additions & 5 deletions ansible_rulebook/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
from typing import Any, Dict, List, Optional, Tuple, Union

import ansible_runner
import jinja2
from jinja2.nativetypes import NativeTemplate
from jinja2.nativetypes import NativeEnvironment
from packaging import version
from packaging.version import InvalidVersion

Expand All @@ -43,6 +42,17 @@
EDA_BUILTIN_FILTER_PREFIX = "eda.builtin."


def j2_getenv(name, default=None):
"""
Returns the value of a given environment variable name.
Usage: "{{ getenv(name) }}"
Output: the value of the environment variable
"""
return os.environ[name]


def get_horizontal_rule(character):
try:
return character * int(os.get_terminal_size()[0])
Expand All @@ -52,9 +62,11 @@ def get_horizontal_rule(character):

def render_string(value: str, context: Dict) -> str:
if "{{" in value and "}}" in value:
return NativeTemplate(value, undefined=jinja2.StrictUndefined).render(
context
)
env = NativeEnvironment()
env.filters["getenv"] = j2_getenv

tmpl = env.from_string(value)
return tmpl.render(context)

return value

Expand Down
4 changes: 3 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ install_requires =
ansible-runner
websockets
drools_jpy == 0.3.8
watchdog
watchdog
psycopg
xxhash

[options.packages.find]
include =
Expand Down

0 comments on commit e652a64

Please sign in to comment.