From cb8d9bf8750353f06b6dda92d5c2aa1119c0ab1e Mon Sep 17 00:00:00 2001 From: kshitijrajsharma Date: Mon, 1 Jan 2024 19:51:08 +0545 Subject: [PATCH] change function name for processing --- API/api_worker.py | 4 ++-- API/hdx.py | 4 ++-- API/tasks.py | 6 ++---- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/API/api_worker.py b/API/api_worker.py index 458a0d79..95d55c6e 100644 --- a/API/api_worker.py +++ b/API/api_worker.py @@ -195,8 +195,8 @@ def process_raw_data(self, params): raise ex -@celery.task(bind=True, name="process_hdx_request") -def process_hdx_request(self, params): +@celery.task(bind=True, name="process_custom_request") +def process_custom_request(self, params): params = DynamicCategoriesModel(**params) if not params.dataset: diff --git a/API/hdx.py b/API/hdx.py index d5fdd486..b0bf2dd6 100644 --- a/API/hdx.py +++ b/API/hdx.py @@ -6,7 +6,7 @@ from src.config import RATE_LIMIT_PER_MIN from src.validation.models import DynamicCategoriesModel -from .api_worker import process_hdx_request +from .api_worker import process_custom_request from .auth import AuthUser, UserRole, staff_required router = APIRouter(prefix="/custom", tags=["Custom Exports"]) @@ -794,7 +794,7 @@ async def process_custom_requests( raise HTTPException( status_code=400, detail=[{"msg": "Categories can't be empty"}] ) - task = process_hdx_request.apply_async( + task = process_custom_request.apply_async( args=(params.model_dump(),), queue=queue_name, track_started=True ) return JSONResponse({"task_id": task.id, "track_link": f"/tasks/status/{task.id}/"}) diff --git a/API/tasks.py b/API/tasks.py index 27a538c3..ff1b4ab2 100644 --- a/API/tasks.py +++ b/API/tasks.py @@ -16,10 +16,6 @@ router = APIRouter(prefix="/tasks", tags=["Tasks"]) -# Connect to the Redis server using the URL -redis_client = redis.StrictRedis.from_url(CELERY_BROKER_URL) - - @router.get("/status/{task_id}/", response_model=SnapshotTaskResponse) @version(1) def get_task_status( @@ -166,6 +162,7 @@ def discard_all_waiting_tasks(user: AuthUser = Depends(admin_required)): @version(1) def get_queue_info(): queue_info = {} + redis_client = redis.StrictRedis.from_url(CELERY_BROKER_URL) for queue_name in queues: # Get queue length @@ -183,6 +180,7 @@ def get_queue_info(): async def get_list_details(queue_name: str): if queue_name not in queues: raise HTTPException(status_code=404, detail=f"Queue '{queue_name}' not found") + redis_client = redis.StrictRedis.from_url(CELERY_BROKER_URL) list_items = redis_client.lrange(queue_name, 0, -1)