Update Key With One Evaluation of the init Future #432

Open
ShiromMakkad opened this issue Jun 27, 2024 · 12 comments

@ShiromMakkad

ShiromMakkad commented Jun 27, 2024

I'm working on a refresh-ahead cache that wraps a moka cache. This cache needs to update its keys every x minutes, usually by calling an API. I don't want multiple concurrent requests to update the same key; instead, I want those requests coalesced into one.

From moka::future::OwnedKeySelector::or_insert_with:

This method guarantees that concurrent calls on the same not-existing entry are coalesced into one evaluation of the init future. Only one of the calls evaluates its future (thus returned entry’s is_fresh method returns true), and other calls wait for that future to resolve (and their is_fresh return false).

However, this only runs when the key doesn't exist. I want to update its value. To do this, I'd use moka::future::OwnedKeySelector::and_try_compute_with. But it says,

This method guarantees that concurrent calls on the same key are executed serially. That is, and_try_compute_with calls on the same key never run concurrently. The calls are serialized by the order of their invocation. It uses a key-level lock to achieve this.

I'm looking for a function that has the same behavior as and_try_compute_with but the same concurrency as or_insert_with. Did I miss something when reading the docs or is there not a way to do this?

Right now, I'm simply removing the key before running the closure, but this resets the TTL on the value. I don't want to do that. Also, one question about this approach: Can multiple calls to remove at the same time return copies of the value, or will only one call get the value and all others get None back?

tatsuya6502 self-assigned this Jun 28, 2024
tatsuya6502 added the question label Jun 28, 2024
@tatsuya6502
Member

tatsuya6502 commented Jun 28, 2024

Hi. Thank you for using moka.

I'm looking for a function that has the same behavior as and_try_compute_with but the same concurrency as or_insert_with.

I believe you can use and_try_compute_with method for your use case.

The documentation may be a bit confusing, but and_try_compute_with and or_insert_with actually have the same concurrency. They both use a key-level lock and never evaluate more than one future at a time for the same key.

or_insert_with:

Only one of the calls evaluates its future (...), and other calls wait for that future to resolve (...).

Here is how or_insert_with works under the hood:

  1. Two async tasks A and B call or_insert_with on the same key at the same time.
  2. Only one of them, let's say A, can acquire an exclusive writer lock on the key-level RwLock.
  3. B should have failed to acquire the writer lock, so it will try to get a shared reader lock instead. Since A still holds the writer lock, B has to wait for A to release the lock.
  4. A's init future is evaluated.
  5. When the future resolves to a value V, A inserts V into the cache and also stores it in the location protected by the writer lock.
  6. A releases the writer lock.
  7. Now B acquires a shared reader lock, and gets V protected by the lock.
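
The steps above can be modeled with nothing but the standard library. This is a minimal sketch under stated assumptions, not moka's implementation: it uses threads instead of async tasks, a single `RwLock<Option<u32>>` stands in for one key's slot, and the names `Slot`, `get_or_init_coalesced`, and `run_callers` are made up for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, TryLockError};
use std::thread;
use std::time::Duration;

/// Toy stand-in for one key's slot, guarded by a key-level lock (hypothetical name).
type Slot = RwLock<Option<u32>>;

/// Returns `(value, is_fresh)`; `is_fresh` is true only for the caller that ran `init`.
fn get_or_init_coalesced(slot: &Slot, init: impl FnOnce() -> u32) -> (u32, bool) {
    let mut init = Some(init);
    loop {
        match slot.try_write() {
            // We hold the writer lock (task A in the steps above).
            Ok(mut guard) => {
                if let Some(v) = *guard {
                    return (v, false); // the slot was already initialized
                }
                let v = (init.take().expect("init runs at most once"))();
                *guard = Some(v);
                return (v, true);
            }
            // Someone else holds the lock (task B): wait for a shared reader lock.
            Err(TryLockError::WouldBlock) => {
                if let Some(v) = *slot.read().unwrap() {
                    return (v, false);
                }
                // The slot is still empty, so try again.
            }
            Err(TryLockError::Poisoned(_)) => panic!("lock poisoned"),
        }
    }
}

/// Runs `n` concurrent callers on one slot.
/// Returns (init evaluations, callers that were fresh, values seen by each caller).
fn run_callers(n: usize) -> (usize, usize, Vec<u32>) {
    let slot: Arc<Slot> = Arc::new(RwLock::new(None));
    let evaluations = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let (slot, evaluations) = (Arc::clone(&slot), Arc::clone(&evaluations));
            thread::spawn(move || {
                get_or_init_coalesced(&slot, || {
                    evaluations.fetch_add(1, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(50)); // pretend upstream call
                    42
                })
            })
        })
        .collect();
    let results: Vec<(u32, bool)> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    let fresh = results.iter().filter(|(_, is_fresh)| *is_fresh).count();
    let values = results.iter().map(|(v, _)| *v).collect();
    (evaluations.load(Ordering::SeqCst), fresh, values)
}
```

Calling `run_callers(8)` should report a single evaluation of the init closure and a single fresh caller, while every caller receives the same value.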

Here is how and_try_compute_with works under the hood:

  1. Two async tasks A and B call and_try_compute_with on the same key at the same time.
  2. Only one of them, let's say A, can acquire an exclusive writer lock on the key-level RwLock.
  3. Since A still holds the writer lock, B has to wait for A to release the lock.
  4. A's f closure is evaluated, and then the returned future is evaluated.
  5. When the future resolves and, let's say, Op::Put(V) is returned, A inserts V into the cache.
  6. A releases the writer lock.
  7. Now B acquires the exclusive writer lock.
  8. B's f closure is evaluated, and then the returned future is evaluated.
  9. When the future resolves and, let's say, Op::Nop (no-operation) is returned, B gets the existing V inserted by A.
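
The difference from or_insert_with is that every caller's closure runs, just never at the same time. A minimal standard-library sketch of that behavior follows. It is an illustration under assumptions, not moka's code: threads replace async tasks, a `Mutex` replaces the key-level lock, and `ModelOp`, `compute_serialized`, and `run_callers` are hypothetical names (`ModelOp` is a local enum, not moka's `Op`).

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Local stand-in for "what to do with the entry"; this is not moka's `Op` type.
enum ModelOp {
    Nop,
    Put(u32),
}

/// Runs `f` while holding the key-level lock, so calls for one key never overlap.
fn compute_serialized(
    slot: &Mutex<Option<u32>>,
    f: impl FnOnce(Option<u32>) -> ModelOp,
) -> Option<u32> {
    let mut guard = slot.lock().unwrap();
    match f(*guard) {
        ModelOp::Put(v) => *guard = Some(v),
        ModelOp::Nop => {}
    }
    *guard
}

/// Returns (closures evaluated, max closures running at once, final value).
fn run_callers(n: usize) -> (usize, usize, Option<u32>) {
    let slot: Arc<Mutex<Option<u32>>> = Arc::new(Mutex::new(None));
    let evaluated = Arc::new(AtomicUsize::new(0));
    let running = Arc::new(AtomicUsize::new(0));
    let max_running = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let slot = Arc::clone(&slot);
            let evaluated = Arc::clone(&evaluated);
            let running = Arc::clone(&running);
            let max_running = Arc::clone(&max_running);
            thread::spawn(move || {
                compute_serialized(&slot, |current| {
                    evaluated.fetch_add(1, Ordering::SeqCst);
                    let now_running = running.fetch_add(1, Ordering::SeqCst) + 1;
                    max_running.fetch_max(now_running, Ordering::SeqCst);
                    let op = match current {
                        // First caller ("A"): do the slow work and store the result.
                        None => {
                            thread::sleep(Duration::from_millis(20));
                            ModelOp::Put(1)
                        }
                        // Later callers ("B"): resolve immediately with a no-op.
                        Some(_) => ModelOp::Nop,
                    };
                    running.fetch_sub(1, Ordering::SeqCst);
                    op
                })
            })
        })
        .collect();
    for handle in handles {
        let _ = handle.join().unwrap();
    }
    let final_value = *slot.lock().unwrap();
    (
        evaluated.load(Ordering::SeqCst),
        max_running.load(Ordering::SeqCst),
        final_value,
    )
}
```

With four callers, all four closures are evaluated, but never more than one at a time, and only the first one pays for the slow path.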

In both cases, B has to wait for A to finish evaluating the future. (The same concurrency.)

In and_try_compute_with's case, B's f closure is evaluated at step 8, but if it returns std::future::ready(Op::Nop), it should be resolved immediately and the stored value V is returned. The time spent on resolving the future will be negligible.

@tatsuya6502
Member

tatsuya6502 commented Jun 28, 2024

Also, one question about this approach: Can multiple calls to remove at the same time return copies of the value, or will only one call get the value and all others get None back?

It should be the latter one; only one call gets the value and all others get None back.
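
The "only one caller gets the value" semantics can be illustrated with a toy model. The sketch below uses a `Mutex<HashMap>` from the standard library rather than moka, so it only demonstrates the expected behavior and says nothing about how moka implements `remove`. The function name is hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Counts how many of `n` concurrent `remove` calls actually receive the value.
fn concurrent_removers_that_got_value(n: usize) -> usize {
    let map = Arc::new(Mutex::new(HashMap::from([("key", "value")])));
    let barrier = Arc::new(Barrier::new(n));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let (map, barrier) = (Arc::clone(&map), Arc::clone(&barrier));
            thread::spawn(move || {
                barrier.wait(); // release all removers at once
                let removed = map.lock().unwrap().remove("key");
                removed
            })
        })
        .collect();
    handles
        .into_iter()
        .filter_map(|handle| handle.join().unwrap())
        .count()
}
```

However many threads race, exactly one of them sees `Some("value")`; the rest see `None`.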

@ShiromMakkad
Author

ShiromMakkad commented Jun 28, 2024

Thanks for your response. So I can either go with my current solution (although it fails on the ttl and I could lose the key if the update fails) or I can have B resolve immediately.

Is there a way that I can say if(this closure had to block and wait for A) return Op::Nop at the top of my closure? Or do I need to track that external to moka? The behavior I'm looking for is similar to or_insert_with where B acquires a shared reader lock rather than evaluating its f closure, but any solution where B doesn't make a network call works.

@tatsuya6502
Member

Is there a way that I can say if(this closure had to block and wait for A) return Op::Nop at the top of my closure?

No. Currently, moka does not provide a way to know if the closure is going to block. (The same applies to or_insert_with.)

I am not sure how your refresh ahead cache would work, but I guess instead of B being blocked from step 3 to step 7 below, you want B to return immediately with the current V in the cache?

  3. Since A still holds the writer lock, B has to wait for A to release the lock.
  4. A's f closure is evaluated, and then the returned future is evaluated.
  5. When the future resolves and, let's say, Op::Put(V) is returned, A inserts V into the cache.
  6. A releases the writer lock.
  7. Now B acquires the exclusive writer lock.
  8. B's f closure is evaluated, and then the returned future is evaluated.
  9. When the future resolves and, let's say, Op::Nop (no-operation) is returned, B gets the existing V inserted by A.

@ShiromMakkad
Author

I don't really mind if 'B' blocks during steps 3-7 but yes, returning immediately would be the ideal behavior.

I've got a separate function for getting data. The update function needs to:

  1. Return whether the update succeeded and, if there was an error, return that error.
  2. Ensure that multiple update operations cannot happen at once. This is to prevent unnecessary load on the upstream service from fetching the exact same data multiple times.
  3. I don't need update to return the value stored in the cache. I've got a separate function for that.

'B' blocking while 'A' is updating isn't ideal, but it's acceptable as long as 'B's function returns early (so it has to know that A just ran).

Considering the behavior of '.remove()', this should be possible as-is, but it would be convenient if there were a ready-made function for this so I can preserve TTLs.

@tatsuya6502
Member

tatsuya6502 commented Jun 29, 2024

Thank you for explaining. I have a better understanding of the issue now.

  2. Ensure that multiple update operations cannot happen at once. This is to prevent unnecessary load to the upstream service by fetching the exact same data multiple times.

This is done by and_try_compute_with. It guarantees only one update operation to happen at a time for the same key.

as long as 'B's function early returns (so it has to know that A just ran).

OK. So this is why you wanted the update operations from A and B to be coalesced.

I think what you really want is to ensure that the update operation for a given key does not happen more than once in a given time frame. (B should return early if its f is called within X seconds after A has updated the value, not only when the two calls happen at the same time.) Coalescing cannot handle this, because B may be called after A has already updated the value. The two calls then happen at separate times, so they cannot be coalesced.

You can achieve this by storing the last update time of the value in the cache and using and_try_compute_with. The following code snippet demonstrates this:

// Cargo.toml
//
// [dependencies]
// futures-util = "0.3.30"
// moka = { version = "0.12.7", features = ["future"] }
// tokio = { version = "1.38.0", features = ["rt-multi-thread", "macros", "sync", "time"] }

use std::{sync::Arc, time::{Duration, Instant}};

use moka::future::Cache;

pub type Key = String;
pub type Value = String;

#[derive(Clone, Debug)]
pub struct ValueAndUpdatedAt<V> {
    /// The value.
    #[allow(unused)]
    value: V,
    /// The time when the value was last updated.
    updated_at: Instant,
}

async fn refresh(
    cache: &Cache<Key, ValueAndUpdatedAt<Value>>,
    key: &Key,
) -> Result<(), Box<String>> {
    use moka::ops::compute::{CompResult, Op};

    let key = key.clone();

    let result = cache
        .entry(key.clone())
        .and_try_compute_with(move |maybe_entry| async {
            if let Some(entry) = maybe_entry {
                // Do early return if the entry was updated less than 8 seconds ago.
                if entry.into_value().updated_at.elapsed() < Duration::from_secs(8) {
                    return Ok(Op::Nop) as Result<_, Box<String>>;
                }
            }

            // If we are here, `maybe_entry` should be `None` or the entry was
            // updated more than or equal to 8 seconds ago. Get the latest value
            // from the upstream service and insert it to this cache.
            //
            // Since `and_try_compute_with` ensures that only one task can enter
            // this async block at a time for the same key, we can be sure that
            // the value in the upstream service is retrieved only once.
            let value = get_latest_from_upstream(key).await?;

            // Update the value in the cache.
            Ok(Op::Put(ValueAndUpdatedAt {
                value,
                updated_at: Instant::now(),
            }))
        })
        .await;

    match result {
        Err(e) => {
            eprintln!("Error: {e:?}");
            return Err(e);
        }
        Ok(CompResult::Unchanged(_entry)) => {
            println!(
                "  Value existed in the cache. Did not update the value \
                because not enough time has passed since the last update."
            );
        }
        Ok(CompResult::ReplacedWith(_entry)) => {
            println!(
                "  Value existed in the cache. \
                Updated the cached value with the latest one from the upstream service."
            );
        }
        Ok(CompResult::Inserted(_entry)) => {
            println!(
                "  Value did not exist in the cache. \
                Inserted the latest value from the upstream service."
            );
        }
        Ok(CompResult::StillNone(_key)) => unreachable!(),
        Ok(CompResult::Removed(_entry)) => unreachable!(),
    }

    Ok(())
}

async fn get_latest_from_upstream(_key: Key) -> Result<Value, Box<String>> {
    // Simulate an upstream service that takes some time to respond
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok("value".to_string())
}

#[tokio::main]
async fn main() -> Result<(), Box<String>> {
    let cache = Cache::builder()
        .max_capacity(100)
        .time_to_live(Duration::from_secs(360))
        .build();

    let key = "key".to_string();

    println!("Refreshing the value in the cache.");
    refresh(&cache, &key).await?;

    println!("\nSleeping for 10 seconds...");
    tokio::time::sleep(Duration::from_secs(10)).await;

    println!("\nRun two async tasks concurrently and try to refresh the value for the same key.");
    let barrier = Arc::new(tokio::sync::Barrier::new(2));

    let handles = (0..2).map(|_| {
        let cache = cache.clone();
        let key = key.clone();
        let barrier = barrier.clone();

        tokio::spawn(async move {
            barrier.wait().await;
            refresh(&cache, &key).await
        })
    });

    // Wait for all tasks to complete.
    futures_util::future::join_all(handles).await;
    println!("\nAll tasks completed.");

    println!("\nSleeping for 5 seconds...");
    tokio::time::sleep(Duration::from_secs(5)).await;

    println!("\nTry to refresh the value for the key.");
    refresh(&cache, &key).await?;

    println!("\nSleeping for 5 seconds...");
    tokio::time::sleep(Duration::from_secs(5)).await;

    println!("\nRefresh the value for the key again.");
    refresh(&cache, &key).await?;

    Ok(())
}
$ cargo run

Refreshing the value in the cache.
  Value did not exist in the cache. Inserted the latest value from the upstream service.

Sleeping for 10 seconds...

Run two async tasks concurrently and try to refresh the value for the same key.
  Value existed in the cache. Updated the cached value with the latest one from the upstream service.
  Value existed in the cache. Did not update the value because not enough time has passed since the last update.

All tasks completed.

Sleeping for 5 seconds...

Try to refresh the value for the key.
  Value existed in the cache. Did not update the value because not enough time has passed since the last update.

Sleeping for 5 seconds...

Refresh the value for the key again.
  Value existed in the cache. Updated the cached value with the latest one from the upstream service.

Well, actually the cache already stores the last update time for each key, but it is not exposed to the user. When I have more time, I will expose it via a new method of Entry. (There is a plan to add a metadata() method to Entry: #312 (comment))
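
The early-return condition in the snippet above can also be pulled out into a small pure function, which makes it testable without sleeping. This is only a sketch: `should_refresh` is a hypothetical helper, not part of moka, and it takes `now` as a parameter instead of calling `elapsed()`.

```rust
use std::time::{Duration, Instant};

/// Decides whether a refresh should go to the upstream service.
/// `updated_at` is `None` when the key is not in the cache yet.
fn should_refresh(updated_at: Option<Instant>, now: Instant, min_interval: Duration) -> bool {
    match updated_at {
        // Nothing cached: always fetch.
        None => true,
        // `saturating_duration_since` yields zero if `now` is earlier than `last`.
        Some(last) => now.saturating_duration_since(last) >= min_interval,
    }
}
```

Inside the closure passed to and_try_compute_with, a `false` result would map to the `Op::Nop` early return and a `true` result to the upstream fetch followed by `Op::Put`.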

@tatsuya6502
Member

Right now, I'm simply removing the key before running the closure, but this resets the TTL on the value. I don't want to do that.

In the above code snippet, and_try_compute_with resets the TTL of the value when it is updated. If you want to keep the TTL of the value, you can use per-entry expiration instead of the global TTL for the cache. (An example and the API)

  • Make the trait method expire_after_create return the new TTL, Some(Duration), for the value when it is newly inserted.
  • Make the trait method expire_after_update return the passed duration_until_expiry, so the TTL of the value is not reset when the value itself is updated.
  • Make the trait method expire_after_read return the passed duration_until_expiry, so the TTL of the value is not reset when the value is read.
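
The effect of the three bullets can be sketched without depending on moka. The trait below is a local, simplified mirror of the three hooks named above, with trimmed parameter lists; with moka you would implement its Expiry trait instead and pass the policy to the cache builder. `ExpiryPolicy`, `KeepOriginalTtl`, and `remaining_after_update` are hypothetical names used only for this illustration.

```rust
use std::time::Duration;

/// Simplified local mirror of the three hooks; not moka's actual trait.
trait ExpiryPolicy {
    fn expire_after_create(&self) -> Option<Duration>;
    fn expire_after_update(&self, duration_until_expiry: Option<Duration>) -> Option<Duration>;
    fn expire_after_read(&self, duration_until_expiry: Option<Duration>) -> Option<Duration>;
}

/// Sets a TTL once, at insert time, and never resets it afterwards.
struct KeepOriginalTtl {
    ttl: Duration,
}

impl ExpiryPolicy for KeepOriginalTtl {
    fn expire_after_create(&self) -> Option<Duration> {
        Some(self.ttl)
    }

    // Returning the remaining duration unchanged means "do not reset the TTL".
    fn expire_after_update(&self, duration_until_expiry: Option<Duration>) -> Option<Duration> {
        duration_until_expiry
    }

    fn expire_after_read(&self, duration_until_expiry: Option<Duration>) -> Option<Duration> {
        duration_until_expiry
    }
}

/// Simulates "insert at t = 0, update after `elapsed`" and returns the remaining TTL.
fn remaining_after_update(policy: &impl ExpiryPolicy, elapsed: Duration) -> Option<Duration> {
    let at_create = policy.expire_after_create();
    let until_expiry = at_create.map(|ttl| ttl.saturating_sub(elapsed));
    policy.expire_after_update(until_expiry)
}
```

With a 360-second TTL and an update 100 seconds after insertion, the entry still expires 260 seconds later rather than getting a fresh 360 seconds.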

@tatsuya6502
Member

tatsuya6502 commented Jun 29, 2024

I don't really mind if 'B' blocks during steps 3-7 but yes, returning immediately would be the ideal behavior.

OK. I do not have enough time to implement this right now, but I hope to in the near future.

EDIT: Created #433. Please subscribe to the issue if you are interested.

@ben-manes

ben-manes commented Jun 29, 2024

This is a bit hard to understand, but please review Caffeine to see if anything it does helps refine your approach.

Its refreshAfterWrite will asynchronously reload an entry if it is accessed after a time threshold, and return the currently cached value in the meantime. This way active content stays fresh, inactive content is allowed to expire, and the reload penalty is hidden from callers. While this is individual per key, a coalescing loader can batch over a time/space window for a more efficient operation. The in-flight reloads are stored in a secondary mapping, invalidated if the mapping was modified (linearizability), and deduplicated.
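
To make the idea concrete, here is a toy, single-threaded model of "serve the stale value and allow only one reload per key". It is a sketch under assumptions and not Caffeine's or moka's implementation: the caller is merely told whether it should start a reload, the invalidation of in-flight reloads on modification is reduced to clearing the marker on `put`, and `RefreshingCache` with its methods is a hypothetical type.

```rust
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};

/// Toy model; a real cache would run the reload on a background task.
struct RefreshingCache {
    refresh_after: Duration,
    entries: HashMap<String, (String, Instant)>, // key -> (value, written_at)
    in_flight: HashSet<String>,                  // keys with a reload already running
}

impl RefreshingCache {
    fn new(refresh_after: Duration) -> Self {
        Self {
            refresh_after,
            entries: HashMap::new(),
            in_flight: HashSet::new(),
        }
    }

    /// Stores a value; a completed write ends any in-flight reload for the key.
    fn put(&mut self, key: &str, value: &str, now: Instant) {
        self.entries.insert(key.to_string(), (value.to_string(), now));
        self.in_flight.remove(key);
    }

    /// Returns the cached (possibly stale) value, plus `true` if this caller
    /// should start a reload. At most one caller per key is told to reload.
    fn get(&mut self, key: &str, now: Instant) -> (Option<String>, bool) {
        let Some((value, written_at)) = self.entries.get(key) else {
            return (None, false);
        };
        let stale = now.saturating_duration_since(*written_at) >= self.refresh_after;
        let value = value.clone();
        // `insert` returns false if the key is already marked as in flight.
        let start_reload = stale && self.in_flight.insert(key.to_string());
        (Some(value), start_reload)
    }
}
```

A caller that receives `true` would fetch the new value and call `put`; every other caller during that window gets the old value back immediately.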

A common confusion comes from users who instead want to periodically reload the entire cache contents. That’s not really a cache but a warm replica, since no eviction should ever take place. It is simply a scheduled task that recreates an immutable map, so it is clearer to write that code directly without needing a library.
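
A warm replica of that kind could look roughly like the following. This is a minimal sketch, assuming the whole data set fits in memory; the timer that calls `reload` periodically is left out, and `WarmReplica` and `Snapshot` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// An immutable map shared between readers (hypothetical alias).
type Snapshot = Arc<HashMap<String, String>>;

/// Holds the current snapshot; a scheduled task replaces it wholesale.
struct WarmReplica {
    current: RwLock<Snapshot>,
}

impl WarmReplica {
    fn new() -> Self {
        Self {
            current: RwLock::new(Arc::new(HashMap::new())),
        }
    }

    /// Readers take a cheap clone of the `Arc` and keep using it without locking.
    fn snapshot(&self) -> Snapshot {
        self.current.read().unwrap().clone()
    }

    /// Builds the new map outside the lock, then swaps it in.
    fn reload(&self, load_all: impl FnOnce() -> HashMap<String, String>) {
        let fresh = Arc::new(load_all());
        *self.current.write().unwrap() = fresh;
    }
}
```

Readers holding an older snapshot keep a consistent view until they ask for a new one, and nothing is ever evicted entry by entry.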

@tatsuya6502
Member

This is a bit hard to understand, but please review Caffeine to see if anything it does helps refine your approach.

Thank you for the info! I will take a look at refreshAfterWrite.

@tatsuya6502
Member

Here is another example. This one only checks whether B was blocked by A. If so, B returns early.

Like the previous example, this stores the last update time of the value in the cache and uses and_try_compute_with. Only the early return condition was changed:

// Cargo.toml
//
// [dependencies]
// futures-util = "0.3.30"
// moka = { version = "0.12.7", features = ["future"] }
// tokio = { version = "1.38.0", features = ["rt-multi-thread", "macros", "sync", "time"] }

use std::{sync::Arc, time::{Duration, Instant}};

use moka::future::Cache;

pub type Key = String;
pub type Value = String;

#[derive(Clone, Debug)]
pub struct ValueAndUpdatedAt<V> {
    /// The value.
    #[allow(unused)]
    value: V,
    /// The time when the value was last updated.
    updated_at: Instant,
}

async fn refresh(
    cache: &Cache<Key, ValueAndUpdatedAt<Value>>,
    key: &Key,
) -> Result<(), Box<String>> {
    use moka::ops::compute::{CompResult, Op};

    let key = key.clone();
    let started_at = Instant::now();

    let result = cache
        .entry(key.clone())
        .and_try_compute_with(move |maybe_entry| async move {
            if let Some(entry) = maybe_entry {
                // Do early return if we were blocked by another async task
                // updating the value for the same key.
                //
                // This can be checked if our started_at time is earlier than or
                // equal to the time that entry was updated by other async task.
                if started_at <= entry.into_value().updated_at {
                    return Ok(Op::Nop) as Result<_, Box<String>>;
                }
            }

            // If we are here, `maybe_entry` should be `None` or the entry was
            // updated _before_ we started the computation.
            //
            // Since `and_try_compute_with` ensures that only one task can enter
            // this block at a time for the same key, we can be sure that the
            // value for the key in the upstream service is updated only once.
            let value = get_latest_from_upstream(key).await?;

            // Update the value in the cache.
            Ok(Op::Put(ValueAndUpdatedAt {
                value,
                updated_at: Instant::now(),
            }))
        })
        .await;

    match result {
        Err(e) => {
            eprintln!("Error: {e:?}");
            return Err(e);
        }
        Ok(CompResult::Unchanged(_entry)) => {
            println!(
                "  Value existed in the cache. Did not update the value \
                because it was already done by another async task running concurrently."
            );
        }
        Ok(CompResult::ReplacedWith(_entry)) => {
            println!(
                "  Value existed in the cache. \
                Updated the cached value with the latest one from the upstream service."
            );
        }
        Ok(CompResult::Inserted(_entry)) => {
            println!(
                "  Value did not exist in the cache. \
                Inserted the latest value from the upstream service."
            );
        }
        Ok(CompResult::StillNone(_key)) => unreachable!(),
        Ok(CompResult::Removed(_entry)) => unreachable!(),
    }

    Ok(())
}

async fn get_latest_from_upstream(_key: Key) -> Result<Value, Box<String>> {
    // Simulate an upstream service that takes some time to respond
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok("value".to_string())
}

#[tokio::main]
async fn main() -> Result<(), Box<String>> {
    let cache = Cache::builder()
        .max_capacity(100)
        .time_to_live(Duration::from_secs(360))
        .build();

    let key = "key".to_string();

    println!("Refreshing the value in the cache.");
    refresh(&cache, &key).await?;

    println!("\nSleeping for 10 seconds...");
    tokio::time::sleep(Duration::from_secs(10)).await;

    println!("\nRun two async tasks concurrently and try to refresh the value for the same key.");
    let barrier = Arc::new(tokio::sync::Barrier::new(2));

    let handles = (0..2).map(|_| {
        let cache = cache.clone();
        let key = key.clone();
        let barrier = barrier.clone();

        tokio::spawn(async move {
            barrier.wait().await;
            refresh(&cache, &key).await
        })
    });

    // Wait for all tasks to complete.
    futures_util::future::join_all(handles).await;
    println!("\nAll tasks completed.");

    println!("\nSleeping for 5 seconds...");
    tokio::time::sleep(Duration::from_secs(5)).await;

    println!("\nRefresh the value for the key.");
    refresh(&cache, &key).await?;

    println!("\nSleeping for 5 seconds...");
    tokio::time::sleep(Duration::from_secs(5)).await;

    println!("\nRefresh the value for the key again.");
    refresh(&cache, &key).await?;

    Ok(())
}
$ cargo run

Refreshing the value in the cache.
  Value did not exist in the cache. Inserted the latest value from the upstream service.

Sleeping for 10 seconds...

Run two async tasks concurrently and try to refresh the value for the same key.
  Value existed in the cache. Updated the cached value with the latest one from the upstream service.
  Value existed in the cache. Did not update the value because it was already done by another async task running concurrently.

All tasks completed.

Sleeping for 5 seconds...

Refresh the value for the key.
  Value existed in the cache. Updated the cached value with the latest one from the upstream service.

Sleeping for 5 seconds...

Refresh the value for the key again.
  Value existed in the cache. Updated the cached value with the latest one from the upstream service.
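
The started_at check can also be exercised without moka or tokio. The following sketch models it with standard-library threads and a `Mutex` standing in for the key-level lock; it illustrates the technique only and makes no claim about moka's internals. `Slot`, `refresh`, and `upstream_calls_for_concurrent_refreshes` are hypothetical names for this model.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Cached value plus the time it was last written, guarded by a key-level lock.
type Slot = Mutex<Option<(String, Instant)>>;

/// Returns `true` if this caller fetched from upstream, `false` if it returned early.
fn refresh(slot: &Slot, started_at: Instant, upstream_calls: &AtomicUsize) -> bool {
    let mut guard = slot.lock().unwrap(); // callers for this key run one at a time
    if let Some((_, updated_at)) = &*guard {
        // The value was written at or after our start time, so another caller
        // updated it while we were waiting for the lock.
        if started_at <= *updated_at {
            return false;
        }
    }
    upstream_calls.fetch_add(1, Ordering::SeqCst);
    thread::sleep(Duration::from_millis(20)); // pretend upstream call
    *guard = Some(("value".to_string(), Instant::now()));
    true
}

/// Starts `n` refreshes at the same time and returns the number of upstream calls.
fn upstream_calls_for_concurrent_refreshes(n: usize) -> usize {
    let slot: Arc<Slot> = Arc::new(Mutex::new(None));
    let calls = Arc::new(AtomicUsize::new(0));
    let barrier = Arc::new(Barrier::new(n));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let slot = Arc::clone(&slot);
            let calls = Arc::clone(&calls);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                let started_at = Instant::now(); // recorded before waiting on the lock
                barrier.wait(); // past this point every caller has its started_at
                refresh(&slot, started_at, &calls)
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    calls.load(Ordering::SeqCst)
}
```

Because every caller records started_at before any of them can write, only the first one through the lock reaches the upstream call; the others see a newer updated_at and return early.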

@ShiromMakkad
Author

Thank you so much. I'll implement this example!
