Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to keep a long live connection for listening? #218

Open
rushwing opened this issue Oct 9, 2024 · 0 comments
Open

How to keep a long live connection for listening? #218

rushwing opened this issue Oct 9, 2024 · 0 comments

Comments

@rushwing
Copy link

rushwing commented Oct 9, 2024

version: v0.0.7
Python: v3.12.4

I tried to use the following code to listen for database change and do some audit log.
It should be always listening unless user quit the job.

How should I modify the code?

`
import asyncio
import asyncpg
import asyncpg_listen
from asyncpg_listen import ConnectFunc, NotificationOrTimeout, Timeout, NO_TIMEOUT
import logging
import json

class DbLogger:
    def __init__(self, db_config):
        self.conn = None
        self.db_config = db_config
        self.logger = None
        self.setup_logging()

    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[
                logging.FileHandler("db_logger.log"),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    async def connect(self):
        if self.conn is None:
            self.conn = await asyncpg.connect(**self.db_config)
        return self.conn

    async def listen_notifications(self):
        conn_func: ConnectFunc = asyncpg_listen.connect_func(**self.db_config)
        listener = asyncpg_listen.NotificationListener(conn_func)
        listener_task = asyncio.create_task(
            listener.run(
                {
                    "STATION_CREATED": self.on_station_created,
                    "STATION_UPDATED": self.on_station_updated,
                    "STATION_DELETED": self.on_station_deleted,
                },
                policy=asyncpg_listen.ListenPolicy.ALL,
                notification_timeout=NO_TIMEOUT
            )
        )

        self.logger.info("Listening for notifications...")
        return listener_task

    async def on_station_created(self, notification: NotificationOrTimeout) -> None:
        if isinstance(notification, Timeout):
            self.logger.info(f"Received Station Created timeout: {notification}")
            return
        data = json.loads(notification.payload)
        self.logger.info(f"Station Created: {data}")

    async def on_station_updated(self, notification: NotificationOrTimeout) -> None:
        if isinstance(notification, Timeout):
            self.logger.info(f"Received Station Updated timeout: {notification}")
            return
        data = json.loads(notification.payload)
        self.logger.info(f"Station Updated: {data}")

    async def on_station_deleted(self, notification: NotificationOrTimeout) -> None:
        if isinstance(notification, Timeout):
            self.logger.info(f"Received Station Deleted timeout: {notification}")
            return
        data = json.loads(notification.payload)
        self.logger.info(f"Station Deleted: {data}")


async def main():
    db_config = {
        "database": "test_stations",
        "user": "tsd",
        "password": "tsd",
        "host": "localhost"
    }

    logger = DbLogger(db_config)
    listener_task = logger.listen_notifications()
    await listener_task


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Program terminated by user.")

`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant