Skip to content

Commit

Permalink
Feature/add transactions (#14)
Browse files Browse the repository at this point in the history
* fix: add transactions and worker for heartbeat

* fix: generate load in new task

* impl semaphore to limit lease tasks

* fix: clippy fixes

---------

Co-authored-by: Anil Sharma <[email protected]>
  • Loading branch information
anil0906 and anil-kindred authored Apr 29, 2024
1 parent 03aaea2 commit 918ff56
Show file tree
Hide file tree
Showing 24 changed files with 452 additions and 202 deletions.
67 changes: 56 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ test.json:
$(call pp,rust test.json...)
cargo +nightly test --workspace --exclude scylla_pg_js -- -Z unstable-options --format json --report-time > coverage/test-report.json

test.load.add_task:
$(call pp,run app...)
cargo run --release --bin load_add_task

# make withenv RECIPE=test.load.lease_task worker=worker1
test.load.lease_task:
$(call pp,run app...)
cargo run --release --bin load_lease_task -- ${worker}
# PHONY ###########################################################################################

# To force rebuild of not-file-related targets, make the targets "phony".
Expand Down
49 changes: 29 additions & 20 deletions docs/scylla_pg_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,57 +10,65 @@ Initiate instance of Scylla of passing DBConfig. We can use this instance to int

```typescript
type DbConfig = {
pgHost: string
pgPort: number
pgUser: string
pgPassword: string
pgDatabase: string
pgHost: string
pgPort: number
pgUser: string
pgPassword: string
pgDatabase: string
};

let sc = await Scylla.initiate({
pgHost: "127.0.0.1",
pgPort: 5432,
pgUser: "admin",
pgPassword: "admin",
pgDatabase: "scylla",
pgPoolSize: 50
});
pgHost: "127.0.0.1",
pgPort: 5432,
pgUser: "admin",
pgPassword: "admin",
pgDatabase: "scylla",
pgPoolSize: 50
});

```

### Add Tasks

Queue is Logical division of tasks. Workers can choose tasks from certain queue. Highest priority tasks will be leased first.
Queue is Logical division of tasks. Workers can choose tasks from certain queue. Highest priority tasks will be leased
first.

```typescript
let atm = {
rn: "4b8d323c-19ab-470f-b7c8-d0380b91ca3a",
queue: "task_queue",
priority: 0.4,
spec: {a: 1, b: 2}
}
let task_added = await sc.addTask(atm);
}
let task_added = await sc.addTask(atm);

```

### Lease N Tasks

This will lease 3 tasks based on time and priority in descending order. WorkerId will be assigned to it and last argument is taskTimeOutInSecs. Worker needs to send heartbeat before that otherwise it will be picked by monitor and reset to ready state.
This will lease 3 tasks based on time and priority in descending order. WorkerId will be assigned to it and last
argument is taskTimeOutInSecs. Worker needs to send heartbeat before that otherwise it will be picked by monitor and
reset to ready state.
Task timeout is optional. Default value is 10 seconds.

```typescript
let task_added = await sc.leaseNTasks("task_queue", 3, "worker_id", 10);
```

### Sending Heart beat

This process is essential to let others know that task is still being processed and optionally progress can be updated by worker.
This process is essential to let others know that task is still being processed and optionally progress can be updated
by worker.
Again taskTimeOutInSecs is optional, if skipped it will be set to default to 10 seconds.

```typescript
let task = await sc.heartBeatTask("4b8d323c-19ab-470f-b7c8-d0380b91ca3a", 0.2, 20);
let task = await sc.heartBeatTask("4b8d323c-19ab-470f-b7c8-d0380b91ca3a", "worker1", 0.2, 20);
```

### Complete Task

Once task is completed, worker can complete the task. So it can be removed from the queue based on `MONITOR_TASK_RETENTION_PERIOD_IN_SECS` in monitor.
Once task is completed, worker can complete the task. So it can be removed from the queue based
on `MONITOR_TASK_RETENTION_PERIOD_IN_SECS` in monitor.

```typescript
let task = await sc.completeTask("4b8d323c-19ab-470f-b7c8-d0380b91ca3a");
Expand All @@ -74,4 +82,5 @@ In case task is not yet picked up for processing. It can be cancelled. This is a
let task = await sc.cancelTask("4b8d323c-19ab-470f-b7c8-d0380b91ca3a");
```

There are other functions like `yieldTask`, `getTask`, `getTasks`, `leaseTask` and `abortTask`. That has been part of library and documentation for those will be added soon.
There are other functions like `yieldTask`, `getTask`, `getTasks`, `leaseTask` and `abortTask`. That has been part of
library and documentation for those will be added soon.
2 changes: 1 addition & 1 deletion scylla_models/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "scylla_models"
version = "0.1.23"
version = "0.1.24"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
4 changes: 2 additions & 2 deletions scylla_operations/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "scylla_operations"
version = "0.1.1"
version = "0.1.24"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -9,7 +9,7 @@ edition = "2021"
async-trait = "0.1"
thiserror = "1.0"
serde_json = "1.0"
tokio = {version = "1.20", features = ["macros", "rt"] }
tokio = { version = "1.20", features = ["macros", "rt"] }
chrono = { version = "0.4.19", features = ["serde"] }
# Logging
log = { version = "0.4", features = ["kv_unstable", "std", "kv_unstable_serde"] }
Expand Down
16 changes: 12 additions & 4 deletions scylla_operations/src/update_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ fn prepare_status_task(mut task: Task, update_task_model: &UpdateTaskModel) -> T
}
task
}

/// # Errors
/// Returns `ScyllaOperationsError`
fn validate_yield_operation(task: &Task) -> Result<(), ScyllaOperationsError> {
Expand All @@ -67,17 +68,20 @@ fn prepare_yield_task(mut task: Task) -> Task {
task.history.push(task_yield_history);
task
}

/// # Errors
/// Returns `ScyllaOperationsError`
fn validate_heart_beat_operation(task: &Task) -> Result<(), ScyllaOperationsError> {
if task.status == TaskStatus::Running {
fn validate_heart_beat_operation(task: &Task, utm: &UpdateTaskModel) -> Result<(), ScyllaOperationsError> {
if task.status == TaskStatus::Running && task.owner == utm.worker {
Ok(())
} else {
} else if task.status != TaskStatus::Running {
Err(ScyllaOperationsError::InvalidOperation(
UpdateOperation::HeartBeat,
TaskStatus::Running,
task.status.clone(),
))
} else {
Err(ScyllaOperationsError::ValidationFailed("Only owner can extend the heartbeat.".to_string()))
}
}

Expand All @@ -89,6 +93,7 @@ fn prepare_heart_beat_task(mut task: Task, update_task_model: &UpdateTaskModel)
}
task
}

/// # Errors
/// Returns `ScyllaOperationsError`
fn validate_lease_operation(task: &Task, update_task_model: &UpdateTaskModel) -> Result<(), ScyllaOperationsError> {
Expand Down Expand Up @@ -120,6 +125,7 @@ fn prepare_lease_task(mut task: Task, update_task_model: &UpdateTaskModel) -> Ta
task.history.push(task_assignment_history);
task
}

/// # Errors
/// Returns `ScyllaOperationsError`
fn validate_reset_operation(task: &Task) -> Result<(), ScyllaOperationsError> {
Expand Down Expand Up @@ -163,6 +169,7 @@ fn prepare_reset_task(mut task: Task) -> Task {
}
task
}

/// # Arguments
/// public function to update task
/// # Example
Expand Down Expand Up @@ -195,7 +202,7 @@ pub fn request_handler(task: Task, update_task_model: &UpdateTaskModel) -> Resul
Ok(prepare_status_task(task, update_task_model))
}
UpdateOperation::HeartBeat => {
validate_heart_beat_operation(&task)?;
validate_heart_beat_operation(&task, update_task_model)?;
Ok(prepare_heart_beat_task(task, update_task_model))
}
UpdateOperation::Yield => {
Expand All @@ -212,5 +219,6 @@ pub fn request_handler(task: Task, update_task_model: &UpdateTaskModel) -> Resul
}
}
}

#[cfg(test)]
mod tests;
Loading

0 comments on commit 918ff56

Please sign in to comment.