Skip to content

Commit

Permalink
fix: first pass at fixing the behaviour of update_category (now updat…
Browse files Browse the repository at this point in the history
…e_categories) (#136)

* fix: first pass at fixing the behaviour of update_category (now update_categories)

* fix: chart test data generation was failing due to exhausting file descriptors on the server

* chore: fmt

* fix: avoiding pulling snap categories if they are already present
  • Loading branch information
sminez authored Oct 17, 2024
1 parent e746e13 commit 9dd3542
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ integration-test: clear-db-data
MOCK_ADMIN_URL='http://127.0.0.1:11111/__admin__/register-snap' \
HOST='0.0.0.0' \
PORT='8080' \
cargo test --test '*'
cargo test --test '*' $(ARGS)

.PHONY: test-all
test-all: test integration-test
Expand Down
107 changes: 86 additions & 21 deletions src/features/user/infrastructure.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Infrastructure for user handling
use serde::{de::DeserializeOwned, Deserialize};
use sqlx::{Acquire, Executor, Row};
use sqlx::{pool::PoolConnection, Postgres, QueryBuilder, Row};
use std::sync::Arc;
use tokio::sync::Notify;
use tracing::error;

use crate::{
Expand Down Expand Up @@ -260,8 +262,15 @@ async fn get_snap_categories(
}
}

/// Update the category (we do this every time we get a vote for the time being)
pub(crate) async fn update_category(app_ctx: &AppContext, snap_id: &str) -> Result<(), UserError> {
/// Update the categories for a given snap.
///
/// In the case where we do not have categories, we need to fetch them and store them in the DB.
/// This is racey without coordination so we check to see if any other tasks are currently attempting
/// this and block on them completing if they are, if not then we set up the Notify and they block on us.
pub(crate) async fn update_categories(
snap_id: &str,
app_ctx: &AppContext,
) -> Result<(), UserError> {
let mut pool = app_ctx
.infrastructure()
.repository()
Expand All @@ -271,30 +280,86 @@ pub(crate) async fn update_category(app_ctx: &AppContext, snap_id: &str) -> Resu
UserError::Unknown
})?;

let (n_rows,): (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM snap_categories WHERE snap_id = $1;")
.bind(snap_id)
.fetch_one(&mut *pool)
.await
.map_err(|error| {
error!("{error:?}");
UserError::FailedToCastVote
})?;

// If we have categories for the requested snap in place already then skip updating.
// Eventually we will need to update and refresh categories over time but the assumption for now is
// that snap categories do not change frequently so we do not need to eagerly update them.
if n_rows > 0 {
return Ok(());
}

let mut guard = app_ctx.infrastructure().category_updates.lock().await;
let (notifier, should_wait) = match guard.get(&snap_id.to_string()) {
Some(notifier) => (notifier.clone(), true),
None => (Arc::new(Notify::new()), false),
};

if should_wait {
// Another task is updating the categories for this snap so wait for it to complete and then
// return: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notified
drop(guard);
notifier.notified().await;
return Ok(());
}

// At this point we can release the mutex for other calls to update_categories to proceed while
// we update the DB state for the snap_id we are interested in. Any calls between now and when
// we complete the update will block on the notifier we insert here.
guard.insert(snap_id.to_string(), notifier.clone());
drop(guard);

// We can't early return while holding the Notifier as that will leave any waiting tasks
// blocked. Rather than attempt to retry at this stage we allow for stale category data
// until a new task attempts to get data for the same snap.
if let Err(e) = update_categories_inner(snap_id, pool, app_ctx).await {
error!(%snap_id, "unable to update snap categories: {e}");
}

// Grab the mutex around the category_updates so any incoming tasks block behind us and then
// notify all blocked tasks before removing the Notify from the map.
let mut guard = app_ctx.infrastructure().category_updates.lock().await;
notifier.notify_waiters();
guard.remove(&snap_id.to_string());

Ok(())
}

async fn update_categories_inner(
snap_id: &str,
mut pool: PoolConnection<Postgres>,
app_ctx: &AppContext,
) -> Result<(), UserError> {
let client = app_ctx.http_client();
let base = &app_ctx.config().snapcraft_io_uri;
let categories = get_snap_categories(snap_id, base, client).await?;

// Do a transaction because bulk querying doesn't seem to work cleanly
let mut tx = pool.begin().await?;
// The trailing space after the query here is important as the builder will append directly to
// the string provided.
let mut query_builder: QueryBuilder<Postgres> =
QueryBuilder::new("INSERT INTO snap_categories(snap_id, category) ");

// Reset the categories since we're refreshing all of them
tx.execute(
sqlx::query("DELETE FROM snap_categories WHERE snap_categories.snap_id = $1;")
.bind(snap_id),
)
.await?;

for category in categories.iter() {
tx.execute(
sqlx::query("INSERT INTO snap_categories (snap_id, category) VALUES ($1, $2); ")
.bind(snap_id)
.bind(category),
)
.await?;
}
query_builder.push_values(categories, |mut b, category| {
b.push_bind(snap_id).push_bind(category);
});

query_builder
.build()
.execute(&mut *pool)
.await
.map_err(|error| {
error!("{error:?}");
UserError::FailedToCastVote
})?;

tx.commit().await?;
Ok(())
}

Expand Down
12 changes: 7 additions & 5 deletions src/features/user/use_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
},
};

use super::infrastructure::update_category;
use super::infrastructure::update_categories;

/// Create a [`User`] entry, or note that the user has recently been seen, within the current
/// [`AppContext`].
Expand All @@ -36,11 +36,12 @@ pub async fn delete_user(app_ctx: &AppContext, client_hash: &str) -> Result<(),
#[allow(unused_must_use)]
pub async fn vote(app_ctx: &AppContext, vote: Vote) -> Result<(), UserError> {
// Ignore but log warning, it's not fatal
update_category(app_ctx, &vote.snap_id)
update_categories(&vote.snap_id, app_ctx)
.await
.inspect_err(|e| warn!("{}", e));
let result = save_vote_to_db(app_ctx, vote).await;
result?;

save_vote_to_db(app_ctx, vote).await?;

Ok(())
}

Expand All @@ -54,9 +55,10 @@ pub async fn get_snap_votes(
client_hash: String,
) -> Result<Vec<Vote>, UserError> {
// Ignore but log warning, it's not fatal
update_category(app_ctx, &snap_id)
update_categories(&snap_id, app_ctx)
.await
.inspect_err(|e| warn!("{}", e));

get_snap_votes_by_client_hash(app_ctx, snap_id, client_hash).await
}

Expand Down
7 changes: 6 additions & 1 deletion src/utils/infrastructure.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Utilities and structs for creating server infrastructure (database, etc).
use std::{
collections::HashMap,
error::Error,
fmt::{Debug, Formatter},
sync::Arc,
};

use sqlx::{pool::PoolConnection, postgres::PgPoolOptions, PgPool, Postgres};
use tokio::sync::OnceCell;
use tokio::sync::{Mutex, Notify, OnceCell};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{reload::Handle, Registry};

Expand All @@ -26,6 +27,9 @@ pub struct Infrastructure {
pub log_reload_handle: &'static Handle<LevelFilter, Registry>,
/// The utility which lets us encode user tokens with our JWT credentials
pub jwt_encoder: Arc<JwtEncoder>,
/// In progress category updates that we need to block on
/// FIXME: The logic for this should really live here but it's all DB related.
pub category_updates: Arc<Mutex<HashMap<String, Arc<Notify>>>>,
}

impl Infrastructure {
Expand All @@ -48,6 +52,7 @@ impl Infrastructure {
postgres,
jwt_encoder,
log_reload_handle: reload_handle,
category_updates: Default::default(),
})
}

Expand Down
15 changes: 5 additions & 10 deletions tests/chart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
pub mod common;

use common::{Category, TestHelper};
use futures::future::join_all;
use rand::{thread_rng, Rng};

// !! This test expects to be the only one making use of the "Development" category
Expand All @@ -14,21 +13,17 @@ async fn category_chart_returns_expected_top_snap() -> anyhow::Result<()> {
let t = TestHelper::new();

// Generate a random set of snaps within the given category
let mut tasks = Vec::with_capacity(25);
for _ in 0..25 {
let client = t.clone();
tasks.push(tokio::spawn(async move {
let (upvotes, downvotes) = random_votes(50, 100, 25, 75);
client
.test_snap_with_initial_votes(1, upvotes, downvotes, &[Category::Development])
.await
}));
let (upvotes, downvotes) = random_votes(25, 50, 15, 35);
client
.test_snap_with_initial_votes(1, upvotes, downvotes, &[Category::Development])
.await?;
}
join_all(tasks).await;

// A snap that should be returned as the top snap for the category
let snap_id = t
.test_snap_with_initial_votes(1, 100, 0, &[Category::Development])
.test_snap_with_initial_votes(1, 50, 0, &[Category::Development])
.await?;

let user_token = t.authenticate(t.random_sha_256()).await?;
Expand Down
2 changes: 0 additions & 2 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,7 @@ impl TestHelper {
vote_up: bool,
) -> anyhow::Result<()> {
let id: String = self.random_sha_256();
// The first call registers and the second authenticates
let token = self.authenticate(id.clone()).await?;
self.authenticate(id).await?;
self.vote(snap_id, snap_revision, vote_up, &token).await?;

Ok(())
Expand Down

0 comments on commit 9dd3542

Please sign in to comment.