Skip to content

Commit

Permalink
Merge pull request #208 from hotosm/enhance/queu_name
Browse files Browse the repository at this point in the history
Enhance : Queue naming
  • Loading branch information
kshitijrajsharma authored Feb 14, 2024
2 parents 8517386 + 1a47912 commit 46635f5
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 49 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/Unit-Test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ jobs:
- name: Launch Default Worker
run: |
celery --app API.api_worker worker --loglevel=INFO --queues='raw_default' &
celery --app API.api_worker worker --loglevel=INFO --queues='raw_ondemand' &
- name: Launch Special Worker
run: |
celery --app API.api_worker worker --loglevel=INFO --queues='raw_special' &
celery --app API.api_worker worker --loglevel=INFO --queues='raw_daemon' &
- name: Run Tests
run: |
Expand Down
4 changes: 2 additions & 2 deletions API/custom_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async def process_custom_requests(
]
],
},
"queue": "raw_default",
"queue": "raw_ondemand",
"dataset": {
"dataset_prefix": "hotosm_project_1",
"dataset_folder": "TM",
Expand Down Expand Up @@ -798,7 +798,7 @@ async def process_custom_requests(
dict: Result message.
"""
queue_name = params.queue
if params.queue != "raw_special" and user.role != UserRole.ADMIN.value:
if params.queue != "raw_daemon" and user.role != UserRole.ADMIN.value:
raise HTTPException(
status_code=403,
detail=[{"msg": "Insufficient Permission to choose queue"}],
Expand Down
266 changes: 266 additions & 0 deletions API/data/hdx.sql

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion API/data/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ CREATE TABLE if not exists public.hdx (
cid INT NULL,
hdx_upload BOOLEAN DEFAULT true,
dataset JSONB,
queue VARCHAR DEFAULT 'raw_special',
queue VARCHAR DEFAULT 'raw_daemon',
meta BOOLEAN DEFAULT false,
categories JSONB NULL,
geometry public.geometry(MultiPolygon, 4326) NULL
Expand Down
4 changes: 2 additions & 2 deletions API/raw_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,8 @@ def get_osm_current_snapshot_as_file(
],
)

# queue_name = "raw_special" if not params.uuid else "raw_default"
queue_name = "raw_default" # Everything directs to default now
# queue_name = "raw_daemon" if not params.uuid else "raw_ondemand"
queue_name = "raw_ondemand" # Everything directs to default now
task = process_raw_data.apply_async(
args=(params.model_dump(),),
queue=queue_name,
Expand Down
2 changes: 1 addition & 1 deletion API/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def discard_all_waiting_tasks(user: AuthUser = Depends(admin_required)):
return JSONResponse({"tasks_discarded": purged})


queues = ["raw_default", "raw_special"]
queues = ["raw_ondemand", "raw_daemon"]


@router.get("/queue/")
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,20 @@ uvicorn API.main:app --reload
### Queues

Currently there are two type of queue implemented :
- "raw_special" : Queue for recurring exports which will replace the previous exports if present on the system , can be enabled through uuid:false API Param
- "raw_default" : Queue for default exports which will create each unique id for exports
- "raw_daemon" : Queue for recurring exports which will replace the previous exports if present on the system , can be enabled through uuid:false API Param
- "raw_ondemand" : Queue for default exports which will create each unique id for exports

### Start Celery Worker

You should be able to start [celery](https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html#running-the-celery-worker-server) worker by running following command on different shell

- Start for default queue
```
celery --app API.api_worker worker --loglevel=INFO --queues="raw_default" -n 'default_worker'
celery --app API.api_worker worker --loglevel=INFO --queues="raw_ondemand" -n 'default_worker'
```
- Start for recurring queue
```
celery --app API.api_worker worker --loglevel=INFO --queues="raw_special" -n 'recurring_worker'
celery --app API.api_worker worker --loglevel=INFO --queues="raw_daemon" -n 'recurring_worker'
```

Set no of request that a worker can take at a time by using --concurrency
Expand All @@ -134,7 +134,7 @@ pip install SQLAlchemy==2.0.25
Raw Data API uses flower for monitoring the Celery distributed queue. Run this command on a different shell , if you are running redis on same machine your broker could be `redis://localhost:6379//`.

```
celery --broker=redis://redis:6379// --app API.api_worker flower --port=5000 --queues="raw_special,raw_default"
celery --broker=redis://redis:6379// --app API.api_worker flower --port=5000 --queues="raw_daemon,raw_ondemand"
```

OR Simply use flower from application itself
Expand Down
4 changes: 2 additions & 2 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1960,7 +1960,7 @@ def create_hdx(self, hdx_data):
hdx_data.get("iso3", None),
hdx_data.get("hdx_upload", True),
json.dumps(hdx_data.get("dataset")),
hdx_data.get("queue", "raw_special"),
hdx_data.get("queue", "raw_daemon"),
hdx_data.get("meta", False),
json.dumps(hdx_data.get("categories", {})),
json.dumps(hdx_data.get("geometry")),
Expand Down Expand Up @@ -2090,7 +2090,7 @@ def update_hdx(self, hdx_id: int, hdx_data):
hdx_data.get("iso3", None),
hdx_data.get("hdx_upload", True),
json.dumps(hdx_data.get("dataset")),
hdx_data.get("queue", "raw_special"),
hdx_data.get("queue", "raw_daemon"),
hdx_data.get("meta", False),
json.dumps(hdx_data.get("categories", {})),
json.dumps(hdx_data.get("geometry")),
Expand Down
66 changes: 33 additions & 33 deletions src/validation/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,22 +292,22 @@ class StatsRequestParams(BaseModel, GeometryValidatorMixin):
max_length=3,
example="NPL",
)
geometry: Optional[
Union[Polygon, MultiPolygon, Feature, FeatureCollection]
] = Field(
default=None,
example={
"type": "Polygon",
"coordinates": [
[
[83.96919250488281, 28.194446860487773],
[83.99751663208006, 28.194446860487773],
[83.99751663208006, 28.214869548073377],
[83.96919250488281, 28.214869548073377],
[83.96919250488281, 28.194446860487773],
]
],
},
geometry: Optional[Union[Polygon, MultiPolygon, Feature, FeatureCollection]] = (
Field(
default=None,
example={
"type": "Polygon",
"coordinates": [
[
[83.96919250488281, 28.194446860487773],
[83.99751663208006, 28.194446860487773],
[83.99751663208006, 28.214869548073377],
[83.96919250488281, 28.214869548073377],
[83.96919250488281, 28.194446860487773],
]
],
},
)
)

@validator("geometry", pre=True, always=True)
Expand Down Expand Up @@ -579,7 +579,7 @@ class DynamicCategoriesModel(BaseModel, GeometryValidatorMixin):
default=None, description="Dataset Configurations for HDX Upload"
)
queue: Optional[str] = Field(
default="raw_special",
default="raw_daemon",
description="Lets you decide which queue you wanna place your task, Requires admin access",
)
meta: bool = Field(
Expand All @@ -604,22 +604,22 @@ class DynamicCategoriesModel(BaseModel, GeometryValidatorMixin):
}
],
)
geometry: Optional[
Union[Polygon, MultiPolygon, Feature, FeatureCollection]
] = Field(
default=None,
example={
"type": "Polygon",
"coordinates": [
[
[83.96919250488281, 28.194446860487773],
[83.99751663208006, 28.194446860487773],
[83.99751663208006, 28.214869548073377],
[83.96919250488281, 28.214869548073377],
[83.96919250488281, 28.194446860487773],
]
],
},
geometry: Optional[Union[Polygon, MultiPolygon, Feature, FeatureCollection]] = (
Field(
default=None,
example={
"type": "Polygon",
"coordinates": [
[
[83.96919250488281, 28.194446860487773],
[83.99751663208006, 28.194446860487773],
[83.99751663208006, 28.214869548073377],
[83.96919250488281, 28.214869548073377],
[83.96919250488281, 28.194446860487773],
]
],
},
)
)

@validator("geometry", pre=True, always=True)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_API.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ def test_custom_submit_normal_custom_polygon_TM_project():
]
],
},
"queue": "raw_default",
"queue": "raw_ondemand",
"dataset": {
"dataset_prefix": "hotosm_project_1",
"dataset_folder": "TM",
Expand Down

0 comments on commit 46635f5

Please sign in to comment.