Skip to content

Commit

Permalink
Add new type of invalidation task, more statistics for monitoring, As…
Browse files Browse the repository at this point in the history
…yncExecutor to process async tasks and minor improvements
  • Loading branch information
manuelgdlvh committed Dec 27, 2024
1 parent c8b99a8 commit 74e0be2
Show file tree
Hide file tree
Showing 15 changed files with 642 additions and 169 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## 0.3.0 - 2024-12-27

### Added

- Add new invalidation task for replace entry with latest value from remote repository.
- Add more statistics for Cache's and Cache manager.
* Expired tasks
* Pending async tasks
* Expired async tasks
* GET's errors
- Add Async Executor to be able to execute async cache tasks in Cache Manager.
- Add integration tests for the new use cases.

### Fixed


## 0.2.0 - 2024-12-24

### Added
Expand Down
10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "remcached"
version = "0.2.0"
version = "0.3.0"
edition = "2021"
readme = "README.md"
description = "Caching system designed for efficient storage and retrieval of entities from remote repositories (REST APIs, Database, ...etc)"
Expand All @@ -18,11 +18,14 @@ dashmap = "6.1.0"
thiserror = "2.0.9"
anyhow = "1.0.95"
log = "0.4.22"
tokio = { version = "1.42.0", optional = true }


[features]
tokio = ["dep:tokio"]

[dev-dependencies]
sqlx = { version = "0.8.2", features = ["runtime-tokio", "postgres"] }
tokio = { version = "1.42.0", features = ["rt-multi-thread", "macros"] }
awaitility = "0.3.1"
rstest = "0.22.0"
rstest_reuse = "0.7.0"
Expand All @@ -34,4 +37,5 @@ env_logger = "0.11.6"

[[test]]
name = "integration_tests"
path = "tests/integration/mod.rs"
path = "tests/integration/mod.rs"
required-features = ["tokio"]
112 changes: 70 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,82 +10,110 @@ management system is flexible and can be integrated into your application to enh
operations
load and improving response times for frequently accessed data.

Manual Operations Supported:
<b>Manual Operations through Cache Manager:</b>
1. Force Entry Expiration
2. Invalidation
3. Flush All
4. Initialize
5. Stop
2. Invalidation with properties (Replace entry with provided tuple of Key - Value)
3. Invalidation (Replace entry with latest value from remote repository)
4. Flush All
5. Initialize
6. Stop

Note: When the cache is stopped, all requests will be directed to the remote repository. This is particularly useful for flushing all data or performing intensive operations in the cache without blocking.
<b>Note: When the cache is stopped, all requests will be directed to the remote repository. This is particularly useful for flushing all data or performing intensive operations in the cache without blocking.</b>

Automatic Operations:
<b>Automatic Operations:</b>

1. Entry Expiration

RCache and Cache Manager provide read-only structures that expose detailed statistics for monitoring.

## Key Features
## Crate features
1. <b>tokio</b> - Allow us to use TokioAsyncExecutor passing the Handle of your runtime for async cache tasks (Invalidation only at the moment!)

1. Automatic Cache Cleanup: Once an entry expires, it is automatically removed from the cache, maintaining memory
efficiency.
2. Declarative Cache Design: Cache configuration is intuitive and allows developers to easily define expiration policies
for different types of data.
3. Cache Efficiency: Reduces the need to repeatedly query the remote repository by storing frequently accessed data, leading to
faster response times.
## Async Executor

## How to Use
The AsyncExecutor is a way to manage tasks that run asynchronously, separate from the Cache Manager (which does not support async tasks). You can either create your own async executor using the AsyncExecutor and AsyncTask traits or use the one that is already implemented, such as TokioAsyncExecutor (which is the only option available right now).

1. Implement the RCommands Trait
## Cache Manager Cycle. What is?

The first step is to implement the RCommands trait. This trait should define the essential operations for your specific
repository:

PUT: This operation stores a new entity or updates an existing one in the cache and remote repository.
GET: This operation retrieves the entity from the cache, if it exists, otherwise it can fetch the data from the remote repository.

The trait ensures that all operations are standardized and consistent across different repositories.
## Configuration
### Cache Manager
1. <b>max_pending_ms_await</b> - Maximum wait time (in milliseconds) between moving tasks from the input channel to the binary heap to process them.
2. <b>max_pending_first_poll_ms_await</b> - Maximum wait time to poll the first item in the channel.
3. <b>max_pending_bulk_poll_ms_await</b> - Maximum wait time to poll the next items of the channel after first poll.
4. <b>max_task_drain_size</b> - Maximum number of tasks polled from the channel after first poll.

2. Construct a Cache Manager

Once the RCommands trait is implemented, you need to construct a CacheManager struct.
The CacheManager is responsible for managing multiple cache repositories and handling events such as entry expiration,
invalidation, flush all, ...etc.
### Cache
1. <b>cache_id</b> - Unique identifier of cache
2. <b>entry_expires_in</b> - Expiration time of cached entry

3. Build Your Cache Repositories

For each type of entity you want to cache, you need to create a corresponding repository that leverages the cache
system. Each repository should handle the specific operations for the entity, ensuring that put and get operations
interact with both the cache and the underlying remote repository in a consistent manner.

Repositories can be easily built using the RCommands trait, and you can define custom expiration policies based on the
entity type (for example, a short-lived session cache or a longer-lived configuration data cache).
## Monitoring
RCache and Cache Manager provide read-only structures that expose detailed statistics for monitoring.
### Cache Manager
1. <b>tasks_total</b> - Total of tasks processed or ready to be processed.
2. <b>merged_tasks_total</b> - Tasks merged to reduce processing of same Operation - Key.
3. <b>expired_tasks_total</b> - Total tasks expired (Currently only can be expired invalidations using expires_in value).
4. <b>pending_tasks_total</b> - Total pending tasks waiting to be processed.
5. <b>pending_async_tasks_total</b> - Total pending async tasks scheduled in AsyncExecutor but not finished.
6. <b>expired_async_tasks_total</b> - Total expired async tasks that were scheduled but not finished on time and aborted.
7. <b>cycles_total</b> - Number of executed cycles.
8. <b>cycles_time_ms</b> - Total amount in milliseconds executing cycles.

4. Start the Cache Manager
### Cache
1. <b>hits_total</b> - Total amount of cache hits.
2. <b>hits_time_ms</b> - Total amount in milliseconds of cache hits.
3. <b>miss_total</b> - Total amount of cache miss.
4. <b>miss_time_ms</b> - Total amount in milliseconds of cache miss.
5. <b>gets_errors_total</b> - Total amount of errors in GET operations.
6. <b>puts_total</b> - Total amount of PUT operations.
7. <b>puts_errors_total</b> - Total amount of errors in PUT operations.
8. <b>puts_time_ms</b> - Total amount in milliseconds of PUT operations.
9. <b>invalidations_processed_total</b> - Total amount of invalidations processed. (With/out properties)
10. <b>invalidations_processed_time_ms</b> - Total amount in milliseconds of invalidations processed. (With/out properties)
11. <b>expirations_processed_total</b> - Total amount of entry expirations processed.
12. <b>expirations_processed_time_ms</b> - Total amount in milliseconds of entry expirations processed.

Once the cache repositories are built, you can start the CacheManager. It will initialize and begin managing cache
operations for all the repositories that have been registered. The CacheManager will automatically handle cache
expiration according to the rules you have set.

## Example of usage

```

## How to Use (Using Database and Tokio runtime)


```
#[tokio::main]
async fn main() {
-------------------- Cache Manager --------------------
let async_executor = TokioAsyncExecutor::new(Handle::current());
let mut cache_manager = CacheManager::new(async_executor, CacheManagerConfig::new(3000, 2000, 20, 2048));
-------------------- Cache's --------------------
let db_pool = get_db_pool().await;
let mut cache_manager = CacheManager::new(CacheManagerConfig::new(MAX_PENDING_MS_AWAIT, 20, 2048));
let r_cache = RCache::build(
&mut cache_manager,
RCacheConfig::new("cache_id", 60000),
GameDbCommands::new(db_pool.clone()),
);
let r_cache_2 = RCache::build(
&mut cache_manager,
RCacheConfig::new("cache_id_2", 60000),
GameDbCommands::new(db_pool),
);
-------------------- Start --------------------
cache_manager.start();
let result = r_cache.get(&1234).await;
let result = r_cache_2.get(&1234).await;
}
// Commands for this entity
-------------------- Commands --------------------
pub struct GameDbCommands {
db_pool: Pool<Postgres>,
}
Expand Down Expand Up @@ -124,7 +152,7 @@ impl RCommands for GameDbCommands {
}
}
// Entity
-------------------- Entity --------------------
#[derive(Clone, Debug, PartialEq)]
pub struct Game {
pub id: u64,
Expand Down
75 changes: 75 additions & 0 deletions src/async_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::future::Future;

use crate::types::GenericError;

pub trait AsyncExecutor: Send + 'static {
type Task: AsyncTask;
fn execute<F>(&self, exp_time: u128, future: F) -> Self::Task
where
F: Future<Output=Result<(), GenericError>> + Send + 'static;
}


pub trait AsyncTask: Send {
fn is_finished(&self) -> bool;
fn is_expired(&self) -> bool;
fn abort(&self);
}


#[cfg(feature = "tokio")]
pub mod tokio {
use std::future::Future;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use crate::async_executor::{AsyncExecutor, AsyncTask};
use crate::types::GenericError;
use crate::utils::is_now_after;

pub struct TokioAsyncExecutor {
handle: Handle,
}

impl TokioAsyncExecutor {
pub fn new(handle: Handle) -> Self {
Self { handle }
}
}

impl AsyncExecutor for TokioAsyncExecutor {
type Task = TokioAsyncTask;
fn execute<F>(&self, exp_time: u128, future: F) -> Self::Task
where
F: Future<Output=Result<(), GenericError>> + Send + 'static,
{
let task = self.handle.spawn(future);
TokioAsyncTask::new(task, exp_time)
}
}

pub struct TokioAsyncTask {
delegate: JoinHandle<Result<(), GenericError>>,
exp_time: u128,
}

impl TokioAsyncTask {
pub fn new(delegate: JoinHandle<Result<(), GenericError>>, exp_time: u128) -> Self {
Self { delegate, exp_time }
}
}

impl AsyncTask for TokioAsyncTask {
fn is_finished(&self) -> bool {
self.delegate.is_finished()
}

fn is_expired(&self) -> bool {
is_now_after(self.exp_time)
}

fn abort(&self) {
self.delegate.abort();
}
}
}

Loading

0 comments on commit 74e0be2

Please sign in to comment.