Skip to content

Commit

Permalink
feat: try redis...
Browse files Browse the repository at this point in the history
  • Loading branch information
zumuta committed Jan 14, 2024
1 parent 3d72af9 commit c2cc2cc
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 62 deletions.
13 changes: 6 additions & 7 deletions backend/src/kwai/api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from faststream.rabbit import RabbitBroker
from faststream.redis import RedisBroker
from faststream.security import SASLPlaintext
from jwt import ExpiredSignatureError

from kwai.core.db.database import Database
Expand Down Expand Up @@ -54,13 +55,11 @@ async def get_publisher(
settings=Depends(get_settings),
) -> AsyncGenerator[Publisher, None]:
"""Get the publisher dependency."""
broker = RabbitBroker(
host=settings.rabbitmq.host,
port=settings.rabbitmq.port,
login=settings.rabbitmq.user,
password=settings.rabbitmq.password,
virtualhost=settings.rabbitmq.vhost,
security = SASLPlaintext(
username="",
password="wazari",
)
broker = RedisBroker(url="redis://api.kwai.com:6379", security=security)
await broker.connect()
try:
yield FaststreamPublisher(broker)
Expand Down
8 changes: 3 additions & 5 deletions backend/src/kwai/core/events/faststream_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
See: https://faststream.airt.ai/latest/
"""
from faststream.rabbit import ExchangeType, RabbitBroker, RabbitExchange
from faststream.redis import RedisBroker
from loguru import logger

from kwai.core.events.event import Event
Expand All @@ -16,16 +16,14 @@ class FaststreamPublisher(Publisher):
while the event name is the routing key.
"""

def __init__(self, broker: RabbitBroker):
def __init__(self, broker: RedisBroker):
self._broker = broker

async def publish(self, event: Event):
logger.info(
f"Publishing event {event.meta.name} to exchange {event.meta.module}"
)
exchange = RabbitExchange(name=event.meta.module, type=ExchangeType.TOPIC)
await self._broker.publish(
event.data,
routing_key=event.meta.name,
exchange=exchange,
stream=f"{event.meta.version}.{event.meta.module}.{event.meta.name}",
)
4 changes: 2 additions & 2 deletions backend/src/kwai/events/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
A version is used to handle changes of the event data. When a structure of an event
changes, a new version should be created when there are still old events to process.
"""
from faststream.rabbit import RabbitRouter
from faststream.redis import RedisRouter

from kwai.events.v1.identity import router as identity_router

router = RabbitRouter(prefix="v1.")
router = RedisRouter(prefix="v1.")
router.include_router(identity_router)
4 changes: 2 additions & 2 deletions backend/src/kwai/events/v1/identity/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Module for defining the event router for the identity module."""
from faststream.rabbit import RabbitRouter
from faststream.redis import RedisRouter

from kwai.events.v1.identity.user_invitation_tasks import (
router as user_invitation_router,
)
from kwai.events.v1.identity.user_recovery_tasks import router as user_recovery_router

router = RabbitRouter(prefix="identity.")
router = RedisRouter(prefix="identity.")
router.include_router(user_invitation_router)
router.include_router(user_recovery_router)
16 changes: 4 additions & 12 deletions backend/src/kwai/events/v1/identity/user_invitation_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any

from fast_depends import Depends
from faststream.rabbit import ExchangeType, RabbitExchange, RabbitQueue, RabbitRouter
from faststream.redis import RedisRouter
from loguru import logger

from kwai.core.domain.exceptions import UnprocessableException
Expand All @@ -29,19 +29,10 @@
UserInvitationNotFoundException,
)

router = RabbitRouter()
exchange = RabbitExchange(
name=UserInvitationCreatedEvent.meta.module, type=ExchangeType.TOPIC
)
router = RedisRouter()


@router.subscriber(
RabbitQueue(
UserInvitationCreatedEvent.meta.name,
routing_key=UserInvitationCreatedEvent.meta.name,
),
exchange,
)
@router.subscriber(stream=UserInvitationCreatedEvent.meta.name)
async def email_user_invitation_task(
event: dict[str, Any],
settings=Depends(get_settings),
Expand All @@ -50,6 +41,7 @@ async def email_user_invitation_task(
template_engine=Depends(create_template_engine),
):
"""Task for sending the user invitation email."""
print("First task!")
command = MailUserInvitationCommand(uuid=event["data"]["uuid"])

try:
Expand Down
15 changes: 3 additions & 12 deletions backend/src/kwai/events/v1/identity/user_recovery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any

from fast_depends import Depends
from faststream.rabbit import ExchangeType, RabbitExchange, RabbitQueue, RabbitRouter
from faststream.redis import RedisRouter
from loguru import logger

from kwai.api.dependencies import create_database
Expand All @@ -26,19 +26,10 @@
UserRecoveryNotFoundException,
)

router = RabbitRouter()
exchange = RabbitExchange(
name=UserRecoveryCreatedEvent.meta.module, type=ExchangeType.TOPIC
)
router = RedisRouter()


@router.subscriber(
RabbitQueue(
UserRecoveryCreatedEvent.meta.name,
routing_key=UserRecoveryCreatedEvent.meta.name,
),
exchange,
)
@router.subscriber(stream=UserRecoveryCreatedEvent.meta.name)
async def email_user_recovery_task(
event: dict[str, Any],
settings=Depends(get_settings),
Expand Down
15 changes: 8 additions & 7 deletions backend/src/kwai/kwai_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import sys

from faststream import BaseMiddleware, FastStream
from faststream.rabbit import RabbitBroker
from faststream.redis import RedisBroker
from faststream.security import SASLPlaintext
from faststream.types import DecodedMessage
from loguru import logger

Expand Down Expand Up @@ -60,13 +61,13 @@ def log_format(record):
middlewares = []


broker = RabbitBroker(
host=settings.rabbitmq.host,
port=settings.rabbitmq.port,
login=settings.rabbitmq.user,
password=settings.rabbitmq.password,
virtualhost=settings.rabbitmq.vhost,
broker = RedisBroker(
url="redis://api.kwai.com:6379",
middlewares=middlewares,
security=SASLPlaintext(
username="",
password="wazari",
),
)
broker.include_router(router)
app = FastStream(broker)
44 changes: 32 additions & 12 deletions backend/src/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/src/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ cryptography = "^41.0.3"
markdown = "3.3.7"
pydantic= "2.5.2"
svcs = "^23.21.0"
faststream = {extras = ["rabbit"], version = "^0.3.12"}
faststream = {extras = ["rabbit", "redis"], version = "^0.3.13"}

[tool.poetry.group.dev]
optional = true
Expand Down
18 changes: 18 additions & 0 deletions backend/vagrant/development/Vagrant.provision.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ KWAI_DATABASE_PASSWORD=$3
KWAI_RABBITMQ_VHOST=$4
KWAI_RABBITMQ_USER=$5
KWAI_RABBITMQ_PASSWORD=$6
KWAI_REDIS_PASSWORD=$7

apt-get update

Expand Down Expand Up @@ -66,3 +67,20 @@ rabbitmqctl add_user "$KWAI_RABBITMQ_USER" "$KWAI_RABBITMQ_PASSWORD"
rabbitmqctl add_vhost "$KWAI_RABBITMQ_VHOST"
rabbitmqctl set_user_tags "$KWAI_RABBITMQ_USER" administrator
rabbitmqctl set_permissions -p "$KWAI_RABBITMQ_VHOST" "$KWAI_RABBITMQ_USER" ".*" ".*" ".*"

# Install Redis
apt-get install redis-server -y
REDIS_CONFIG=$(cat <<EOF
port 6379
daemonize yes
save 60 1
bind 0.0.0.0
tcp-keepalive 300
dbfilename dump.rdb
dir /var/lib/redis
rdbcompression yes
requirepass ${KWAI_REDIS_PASSWORD}
EOF
)
echo "${REDIS_CONFIG}" > /etc/redis/redis.conf
sudo /etc/init.d/redis-server restart
3 changes: 2 additions & 1 deletion backend/vagrant/development/Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ Vagrant.configure("2") do |config|
kwai_config['database']['password'],
kwai_config['rabbitmq']['vhost'],
kwai_config['rabbitmq']['user'],
kwai_config['rabbitmq']['password']
kwai_config['rabbitmq']['password'],
kwai_config['redis']['password']
]

# Mailcatcher
Expand Down
1 change: 1 addition & 0 deletions backend/vagrant/test/Vagrant.provision.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ KWAI_EMAIL_PORT=$8
KWAI_EMAIL_USER=$9
KWAI_EMAIL_PASSWORD=${10}
KWAI_JWT_SECRET=${11}
KWAI_REDIS_PASSWORD=${12}

KWAI_TOML_FILE=$VAGRANT_HOME/.kwai.toml
PROFILE=$(cat <<EOF
Expand Down
3 changes: 2 additions & 1 deletion backend/vagrant/test/Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Vagrant.configure("2") do |config|
kwai_config['mail']['port'],
kwai_config['mail']['user'],
kwai_config['mail']['password'],
kwai_config['jwt']['secret']
kwai_config['jwt']['secret'],
kwai_config['redis']['password']
]
end

0 comments on commit c2cc2cc

Please sign in to comment.