Skip to content

Commit

Permalink
change function name for processing
Browse files Browse the repository at this point in the history
  • Loading branch information
kshitijrajsharma committed Jan 1, 2024
1 parent c62c75e commit cb8d9bf
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 8 deletions.
4 changes: 2 additions & 2 deletions API/api_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ def process_raw_data(self, params):
raise ex


@celery.task(bind=True, name="process_hdx_request")
def process_hdx_request(self, params):
@celery.task(bind=True, name="process_custom_request")
def process_custom_request(self, params):
params = DynamicCategoriesModel(**params)

if not params.dataset:
Expand Down
4 changes: 2 additions & 2 deletions API/hdx.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from src.config import RATE_LIMIT_PER_MIN
from src.validation.models import DynamicCategoriesModel

from .api_worker import process_hdx_request
from .api_worker import process_custom_request
from .auth import AuthUser, UserRole, staff_required

router = APIRouter(prefix="/custom", tags=["Custom Exports"])
Expand Down Expand Up @@ -794,7 +794,7 @@ async def process_custom_requests(
raise HTTPException(
status_code=400, detail=[{"msg": "Categories can't be empty"}]
)
task = process_hdx_request.apply_async(
task = process_custom_request.apply_async(
args=(params.model_dump(),), queue=queue_name, track_started=True
)
return JSONResponse({"task_id": task.id, "track_link": f"/tasks/status/{task.id}/"})
6 changes: 2 additions & 4 deletions API/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
router = APIRouter(prefix="/tasks", tags=["Tasks"])


# Connect to the Redis server using the URL
redis_client = redis.StrictRedis.from_url(CELERY_BROKER_URL)


@router.get("/status/{task_id}/", response_model=SnapshotTaskResponse)
@version(1)
def get_task_status(
Expand Down Expand Up @@ -166,6 +162,7 @@ def discard_all_waiting_tasks(user: AuthUser = Depends(admin_required)):
@version(1)
def get_queue_info():
queue_info = {}
redis_client = redis.StrictRedis.from_url(CELERY_BROKER_URL)

for queue_name in queues:
# Get queue length
Expand All @@ -183,6 +180,7 @@ def get_queue_info():
async def get_list_details(queue_name: str):
if queue_name not in queues:
raise HTTPException(status_code=404, detail=f"Queue '{queue_name}' not found")
redis_client = redis.StrictRedis.from_url(CELERY_BROKER_URL)

list_items = redis_client.lrange(queue_name, 0, -1)

Expand Down

0 comments on commit cb8d9bf

Please sign in to comment.