Skip to content

Commit

Permalink
prototype tools for handling prod issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Kuo (Danswer) committed Jan 7, 2025
1 parent 7f81947 commit 5cf1467
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 0 deletions.
83 changes: 83 additions & 0 deletions backend/scripts/celery_purge_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Tool to run operations on Celery/Redis in production.
# This is a work in progress and isn't completely put together yet,
# but it can serve as a stub for future operations.
import argparse
import logging
from logging import getLogger

from redis import Redis

from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.configs.app_configs import REDIS_DB_NUMBER_CELERY
from onyx.redis.redis_pool import RedisPool

# Send INFO-and-above records to the console, each prefixed with a timestamp,
# the logger name and the level.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by everything in this script.
logger = logging.getLogger(__name__)

# Password handed to the Redis connection pool below. Empty as checked in;
# presumably filled in locally before running against production -- confirm.
REDIS_PASSWORD = ""


def celery_purge_queue(queue: str, tenant_id: str) -> None:
    """Connect to the Celery Redis database and log the length of ``queue``.

    The actual purge is not implemented yet: ``tenant_id`` is accepted but not
    used by the live code, and nothing is removed from the queue.

    Background: purging a celery queue is extremely difficult because the
    queue is a list and the only way an item can be removed from a list is by
    VALUE, which is a linear scan. Purging many values from the list is
    therefore roughly n^2.

    Args:
        queue: Name of the Celery queue to inspect.
        tenant_id: Tenant whose tasks should eventually be purged (unused).
    """
    # TLS connection to a Redis endpoint on localhost:6380, selecting the
    # database Celery uses.
    redis_client = Redis(
        connection_pool=RedisPool.create_pool(
            host="127.0.0.1",
            port=6380,
            db=REDIS_DB_NUMBER_CELERY,
            password=REDIS_PASSWORD,
            ssl=True,
            ssl_cert_reqs="optional",
            ssl_ca_certs=None,
        )
    )

    queue_length = celery_get_queue_length(queue, redis_client)
    logger.info(f"queue={queue} length={queue_length}")

    # Unfinished sketch of the per-tenant purge, kept for reference (`r` is
    # the Redis client). NOTE(review): it is incomplete as written -- it has
    # a dangling `if`, compares against the literal "tenant_id" rather than
    # the argument, and never removes anything.
    #
    # processed = 0
    # deleted = 0
    # for i in range(len(OnyxCeleryPriority)):
    #     queue_name = queue
    #     if i > 0:
    #         queue_name += CELERY_SEPARATOR
    #         queue_name += str(i)
    #
    #     length = r.llen(queue_name)
    #     for i in range(length):
    #         task_raw: bytes | None = r.lindex(queue_name, i)
    #         if not task_raw:
    #             break
    #
    #         processed += 1
    #         task_str = task_raw.decode("utf-8")
    #         task = json.loads(task_str)
    #         task_kwargs_str = task["headers"]["kwargsrepr"]
    #         task_kwargs = json.loads(task_kwargs_str)
    #         task_tenant_id = task_kwargs["tenant_id"]
    #         if task_tenant_id and task_tenant_id == "tenant_id":
    #             print("Delete tenant_id={tenant_id}")
    #             if
    #             deleted += 1
    #
    # logger.info(f"processed={processed} deleted={deleted}")


if __name__ == "__main__":
    # Command-line entry point; both options are mandatory.
    cli = argparse.ArgumentParser(description="Purge celery queue by tenant id")
    cli.add_argument("--queue", required=True, type=str, help="Queue to purge")
    cli.add_argument(
        "--tenant", required=True, type=str, help="Tenant ID to purge"
    )

    options = cli.parse_args()
    celery_purge_queue(queue=options.queue, tenant_id=options.tenant)
84 changes: 84 additions & 0 deletions backend/scripts/onyx_redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Tool to run helpful operations on Redis in production
import argparse
import logging
import sys
import time
from logging import getLogger
from typing import cast

from redis import Redis

from onyx.redis.redis_pool import RedisPool

# Send INFO-and-above records to the console, each prefixed with a timestamp,
# the logger name and the level.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by everything in this script.
logger = logging.getLogger(__name__)

# Password handed to the Redis connection pool below. Empty as checked in;
# presumably filled in locally before running against production -- confirm.
REDIS_PASSWORD = ""


def onyx_redis(command: str) -> int:
    """Run one named maintenance operation against Redis.

    Args:
        command: Name of the operation. Only ``"purge_connectorsync"`` is
            recognised.

    Returns:
        The exit code of the operation, or 255 when ``command`` is unknown.
    """
    # The TLS connection to localhost:6380 is set up before the command is
    # checked, exactly as the pool/client were created up front originally.
    redis_client = Redis(
        connection_pool=RedisPool.create_pool(
            host="127.0.0.1",
            port=6380,
            password=REDIS_PASSWORD,
            ssl=True,
            ssl_cert_reqs="optional",
            ssl_ca_certs=None,
        )
    )

    if command != "purge_connectorsync":
        # Unrecognised command.
        return 255

    # Purge connector tasksets. Used when the tasks represented in the
    # tasksets have been purged.
    return purge_by_match_and_type("*connectorsync_taskset*", "set", redis_client)


def _as_text(value: object) -> str:
    """Return a Redis reply as ``str``, decoding UTF-8 when it arrives as bytes."""
    if isinstance(value, bytes):
        return value.decode("utf-8")
    return str(value)


def purge_by_match_and_type(match_pattern: str, match_type: str, r: Redis) -> int:
    """Delete every key that matches a glob pattern and has a given Redis type.

    Works with clients that return raw ``bytes`` as well as clients created
    with ``decode_responses=True`` (which return ``str``); keys and type names
    are normalised to text before they are compared or logged.

    Args:
        match_pattern: glob style expression selecting candidate keys.
        match_type: Redis type name to require, e.g. ``"set"``.
            See https://redis.io/docs/latest/commands/type/
        r: Redis client to scan and delete with.

    Returns:
        Always 0 (used by the caller as the process exit code).
    """
    # NOTE(review): redis-py's scan_iter also accepts a `_type` filter that
    # would do the type check server-side (needs Redis 6.0+ per the SCAN
    # docs); the per-key TYPE call is kept here so older servers still work.
    start = time.monotonic()

    count = 0
    for key in r.scan_iter(match_pattern, count=10000):
        # Compare as text so both bytes and str replies are handled.
        if _as_text(r.type(key)) != match_type:
            continue

        count += 1
        logger.info(f"Deleting item {count}: {_as_text(key)}")
        # Pass the key back exactly as the client returned it.
        r.delete(key)

    logger.info(f"Found {count} matches.")

    elapsed = time.monotonic() - start
    logger.info(f"Time elapsed: {elapsed:.2f}s")
    return 0


if __name__ == "__main__":
    # Command-line entry point: run the requested operation and use its
    # return value as the process exit status.
    cli = argparse.ArgumentParser(description="Onyx Redis Tools")
    cli.add_argument(
        "--command", required=True, type=str, help="Operation to run"
    )

    options = cli.parse_args()
    sys.exit(onyx_redis(command=options.command))

0 comments on commit 5cf1467

Please sign in to comment.