diff --git a/api/proto.lock b/api/proto.lock index 2fbdf20..22051c0 100644 --- a/api/proto.lock +++ b/api/proto.lock @@ -5,8 +5,8 @@ sha512 = "d75800df0d4744c6b0f4d9a9952d3bfd0bb6b24a8babd19104cc11b54a525f85551b3c [[schemas]] subject = "nfts" -version = 30 -sha512 = "bee70bd6f0f18a8049f93bceb9c4b500b49352f9d19d55d5a411da92cbd786c88bec47f73e1ef6946ceefc7de8e558f704bf8187be9d9f4e49bd102baec29327" +version = 31 +sha512 = "449574f8551ab8c17824af9e08b1658ad1b26ac80340230ddf02e7a1e0979d8a47025913a6598799cf83dd1a9cda87697ee87a13f404ebb52c95ea0084205767" [[schemas]] subject = "organization" @@ -20,8 +20,8 @@ sha512 = "c5ddf43d2958ec690ee2261d0ff9808b67ce810d2fc4b6077f96f561929a920f03509f [[schemas]] subject = "solana_nfts" -version = 11 -sha512 = "967fefde938a0f6ce05194e4fca15673e681caac54d8aeec114c5d38418632b9696dbaf5362345a15114e5abb49de55d0af8b9edcc0f2c91f9ef1ccc4ff55d68" +version = 12 +sha512 = "4f85496c50a82cb40faa097cf6d0cb23275b3b90cb561d01388f3e5a71282a8b8e1eea617b7d712b0e415d65af483209fac2db1591456fa814a1f41a1c457433" [[schemas]] subject = "timestamp" diff --git a/api/proto.toml b/api/proto.toml index 44a45d7..5bb1a54 100644 --- a/api/proto.toml +++ b/api/proto.toml @@ -3,9 +3,9 @@ endpoint = "https://schemas.holaplex.tools" [schemas] organization = 5 -nfts = 30 +nfts = 31 customer = 2 treasury = 23 -solana_nfts = 11 +solana_nfts = 12 polygon_nfts = 6 timestamp = 1 \ No newline at end of file diff --git a/api/src/dataloaders/collection.rs b/api/src/dataloaders/collection.rs index 59d4df2..b129d54 100644 --- a/api/src/dataloaders/collection.rs +++ b/api/src/dataloaders/collection.rs @@ -2,9 +2,17 @@ use std::collections::HashMap; use async_graphql::{dataloader::Loader as DataLoader, FieldError, Result}; use poem::async_trait; -use sea_orm::prelude::*; +use redis::{AsyncCommands, Client as Redis}; +use sea_orm::{prelude::*, FromQueryResult, QueryFilter, QuerySelect}; -use crate::{db::Connection, entities::collections, objects::Collection}; +use crate::{ + db::Connection, + entities::{ + collection_mints, collections, drops, + sea_orm_active_enums::{CreationStatus, DropType}, + }, + objects::Collection, +}; #[derive(Debug, Clone)] pub struct Loader { @@ -35,3 +43,188 @@ impl DataLoader for Loader { .collect() } } + +#[derive(FromQueryResult, Debug, Clone)] +struct CollectionTotalMintsCount { + id: Uuid, + count: i64, +} + +#[derive(Debug, Clone)] +pub struct TotalMintsLoader { + pub db: Connection, + pub redis: Redis, +} + +impl TotalMintsLoader { + #[must_use] + pub fn new(db: Connection, redis: Redis) -> Self { + Self { db, redis } + } +} + +#[async_trait] +impl DataLoader for TotalMintsLoader { + type Error = FieldError; + type Value = i64; + + async fn load(&self, keys: &[Uuid]) -> Result, Self::Error> { + let mut results: HashMap = HashMap::new(); + let mut missing_keys: Vec = Vec::new(); + + let mut redis_connection = self.redis.get_async_connection().await?; + + for key in keys { + let redis_key = format!("collection:{key}:total_mints"); + match redis_connection.get::<_, i64>(&redis_key).await { + Ok(value) => { + results.insert(*key, value); + }, + Err(_) => { + missing_keys.push(*key); + }, + } + } + + if missing_keys.is_empty() { + return Ok(results); + } + + let conn = self.db.get(); + let count_results = collection_mints::Entity::find() + .select_only() + .column_as(collection_mints::Column::Id.count(), "count") + .column_as(collection_mints::Column::CollectionId, "id") + .filter( + collection_mints::Column::CollectionId + .is_in(missing_keys.iter().map(ToOwned::to_owned)) + .and(collection_mints::Column::CreationStatus.ne(CreationStatus::Queued)), + ) + .group_by(collection_mints::Column::CollectionId) + .into_model::() + .all(conn) + .await?; + let count_results = count_results + .into_iter() + .map(|result| (result.id, result.count)) + .collect::>(); + + for key in missing_keys { + let count = count_results.get(&key).copied().unwrap_or_default(); + let redis_key = format!("collection:{key}:total_mints"); + + redis_connection + .set::<_, i64, Option>(&redis_key, count) + .await?; + + results.insert(key, count); + } + + Ok(results) + } +} + +#[derive(FromQueryResult)] +struct CollectionSupplyCount { + id: Uuid, + count: i64, +} + +#[derive(Debug, Clone)] +pub struct SupplyLoader { + pub db: Connection, + pub redis: Redis, +} + +impl SupplyLoader { + #[must_use] + pub fn new(db: Connection, redis: Redis) -> Self { + Self { db, redis } + } +} + +#[async_trait] +impl DataLoader for SupplyLoader { + type Error = FieldError; + type Value = Option; + + async fn load(&self, keys: &[Uuid]) -> Result, Self::Error> { + let mut results: HashMap = HashMap::new(); + let mut missing_keys: Vec = Vec::new(); + + let mut redis_connection = self.redis.get_async_connection().await?; + + for key in keys { + let redis_key = format!("collection:{key}:supply"); + match redis_connection.get::<_, Option>(&redis_key).await { + Ok(value) => { + results.insert(*key, value); + }, + Err(_) => { + missing_keys.push(*key); + }, + } + } + + if missing_keys.is_empty() { + return Ok(results); + } + + let conn = self.db.get(); + let mut computed_supplies: Vec = Vec::new(); + + let collection_with_drops = collections::Entity::find() + .filter(collections::Column::Id.is_in(missing_keys.iter().map(ToOwned::to_owned))) + .inner_join(drops::Entity) + .select_also(drops::Entity) + .all(conn) + .await?; + + for (collection, drop) in collection_with_drops { + if let Some(drop) = drop { + if drop.drop_type == DropType::Open { + computed_supplies.push(collection.id); + continue; + } + continue; + } + + let redis_key = format!("collection:{}:supply", collection.id); + + let supply = redis_connection + .set::<_, Option, Option>(&redis_key, collection.supply) + .await?; + + results.insert(collection.id, supply); + } + + let count_results = collection_mints::Entity::find() + .select_only() + .column_as(collection_mints::Column::Id.count(), "count") + .column_as(collection_mints::Column::CollectionId, "id") + .filter( + collection_mints::Column::CollectionId + .is_in(computed_supplies.iter().map(ToOwned::to_owned)), + ) + .group_by(collection_mints::Column::CollectionId) + .into_model::() + .all(conn) + .await? + .into_iter() + .map(|result| (result.id, result.count)) + .collect::>(); + + for key in computed_supplies { + let count = count_results.get(&key).copied(); + let redis_key = format!("collection:{key}:supply"); + + redis_connection + .set::<_, Option, Option>(&redis_key, count) + .await?; + + results.insert(key, count); + } + + Ok(results) + } +} diff --git a/api/src/dataloaders/collection_drop.rs b/api/src/dataloaders/collection_drop.rs index 49642a7..4f12237 100644 --- a/api/src/dataloaders/collection_drop.rs +++ b/api/src/dataloaders/collection_drop.rs @@ -2,13 +2,9 @@ use std::collections::HashMap; use async_graphql::{dataloader::Loader as DataLoader, FieldError, Result}; use poem::async_trait; -use sea_orm::{prelude::*, JoinType, QuerySelect}; +use sea_orm::prelude::*; -use crate::{ - db::Connection, - entities::{collections, drops}, - objects::Drop, -}; +use crate::{db::Connection, entities::drops, objects::Drop}; #[derive(Debug, Clone)] pub struct Loader { @@ -29,26 +25,13 @@ impl DataLoader for Loader { async fn load(&self, keys: &[Uuid]) -> Result, Self::Error> { let drops = drops::Entity::find() - .join(JoinType::InnerJoin, drops::Relation::Collections.def()) - .select_also(collections::Entity) .filter(drops::Column::CollectionId.is_in(keys.iter().map(ToOwned::to_owned))) .all(self.db.get()) .await?; - drops + Ok(drops .into_iter() - .map(|(drop, collection)| { - Ok(( - drop.collection_id, - Drop::new( - drop.clone(), - collection.ok_or(FieldError::new(format!( - "no collection for the drop {}", - drop.id - )))?, - ), - )) - }) - .collect::>>() + .map(|drop| (drop.collection_id, drop.into())) + .collect::>()) } } diff --git a/api/src/dataloaders/drop.rs b/api/src/dataloaders/drop.rs index 3cacca8..118ac9d 100644 --- a/api/src/dataloaders/drop.rs +++ b/api/src/dataloaders/drop.rs @@ -1,21 +1,17 @@ use std::collections::HashMap; -use async_graphql::{dataloader::Loader as DataLoader, FieldError, Result}; +use async_graphql::{dataloader::Loader, FieldError, Result}; use poem::async_trait; -use sea_orm::{prelude::*, JoinType, QuerySelect}; +use sea_orm::prelude::*; -use crate::{ - db::Connection, - entities::{collections, drops}, - objects::Drop, -}; +use crate::{db::Connection, entities::drops, objects::Drop}; #[derive(Debug, Clone)] -pub struct Loader { +pub struct DropLoader { pub db: Connection, } -impl Loader { +impl DropLoader { #[must_use] pub fn new(db: Connection) -> Self { Self { db } @@ -23,31 +19,19 @@ impl Loader { } #[async_trait] -impl DataLoader for Loader { +impl Loader for DropLoader { type Error = FieldError; type Value = Drop; async fn load(&self, keys: &[Uuid]) -> Result, Self::Error> { let drops = drops::Entity::find() - .join(JoinType::InnerJoin, drops::Relation::Collections.def()) - .select_also(collections::Entity) .filter(drops::Column::Id.is_in(keys.iter().map(ToOwned::to_owned))) .all(self.db.get()) .await?; - drops + Ok(drops .into_iter() - .map(|(drop, collection)| { - Ok(( - drop.id, - Drop::new( - drop.clone(), - collection.ok_or_else(|| { - FieldError::new(format!("no collection for the drop {}", drop.id)) - })?, - ), - )) - }) - .collect::>>() + .map(|drop| (drop.id, drop.into())) + .collect()) } } diff --git a/api/src/dataloaders/drops.rs b/api/src/dataloaders/drops.rs index 3572f9e..936e56d 100644 --- a/api/src/dataloaders/drops.rs +++ b/api/src/dataloaders/drops.rs @@ -2,13 +2,9 @@ use std::collections::HashMap; use async_graphql::{dataloader::Loader as DataLoader, FieldError, Result}; use poem::async_trait; -use sea_orm::{prelude::*, JoinType, QuerySelect}; +use sea_orm::prelude::*; -use crate::{ - db::Connection, - entities::{collections, drops}, - objects::Drop, -}; +use crate::{db::Connection, entities::drops, objects::Drop}; #[derive(Debug, Clone)] pub struct ProjectLoader { @@ -29,17 +25,13 @@ impl DataLoader for ProjectLoader { async fn load(&self, keys: &[Uuid]) -> Result, Self::Error> { let drops = drops::Entity::find() - .join(JoinType::InnerJoin, drops::Relation::Collections.def()) - .select_also(collections::Entity) .filter(drops::Column::ProjectId.is_in(keys.iter().map(ToOwned::to_owned))) .all(self.db.get()) .await?; Ok(drops .into_iter() - .filter_map(|(drop, collection)| { - collection.map(|collection| (drop.project_id, Drop::new(drop, collection))) - }) + .map(|drop| (drop.project_id, drop.into())) .fold(HashMap::new(), |mut acc, (project, drop)| { acc.entry(project).or_insert_with(Vec::new); diff --git a/api/src/dataloaders/mod.rs b/api/src/dataloaders/mod.rs index f6648c7..c37100d 100644 --- a/api/src/dataloaders/mod.rs +++ b/api/src/dataloaders/mod.rs @@ -14,14 +14,17 @@ mod project_collections; mod switch_collection_histories; mod update_histories; -pub use collection::Loader as CollectionLoader; +pub use collection::{ + Loader as CollectionLoader, SupplyLoader as CollectionSupplyLoader, + TotalMintsLoader as CollectionTotalMintsLoader, +}; pub use collection_drop::Loader as CollectionDropLoader; pub use collection_mints::{ CollectionMintLoader, Loader as CollectionMintsLoader, OwnerLoader as CollectionMintsOwnerLoader, QueuedMintsLoader, }; pub use creators::Loader as CreatorsLoader; -pub use drop::Loader as DropLoader; +pub use drop::DropLoader; pub use drops::ProjectLoader as ProjectDropsLoader; pub use holders::Loader as HoldersLoader; pub use metadata_json::{ diff --git a/api/src/entities/collection_mints.rs b/api/src/entities/collection_mints.rs index 27dd451..0a8ad80 100644 --- a/api/src/entities/collection_mints.rs +++ b/api/src/entities/collection_mints.rs @@ -88,4 +88,12 @@ impl Entity { .select_also(collections::Entity) .filter(Column::Id.eq(id)) } + + pub fn filter_by_collection(id: Uuid) -> Select { + Self::find().filter( + Column::CollectionId + .eq(id) + .and(Column::CreationStatus.ne(CreationStatus::Queued)), + ) + } } diff --git a/api/src/entities/collections.rs b/api/src/entities/collections.rs index 2c87027..f72abc9 100644 --- a/api/src/entities/collections.rs +++ b/api/src/entities/collections.rs @@ -15,7 +15,6 @@ pub struct Model { #[sea_orm(nullable)] pub credits_deduction_id: Option, pub creation_status: CreationStatus, - pub total_mints: i64, #[sea_orm(column_type = "Text", nullable)] pub address: Option, #[sea_orm(nullable)] diff --git a/api/src/events.rs b/api/src/events.rs index 9cacdb5..5e4f71b 100644 --- a/api/src/events.rs +++ b/api/src/events.rs @@ -8,7 +8,6 @@ use hub_core::{ uuid::{self, Uuid}, }; use sea_orm::{ - sea_query::{Expr, SimpleExpr}, ActiveModelTrait, ColumnTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, RelationTrait, Set, TransactionTrait, }; @@ -350,7 +349,6 @@ impl Processor { project_id: Set(Uuid::from_str(&project_id)?), credits_deduction_id: Set(None), creation_status: Set(CreationStatus::Created), - total_mints: Set(0), address: Set(Some(mint_address)), signature: Set(None), seller_fee_basis_points: Set(seller_fee_basis_points @@ -470,18 +468,6 @@ impl Processor { index_attributes(&self.db, json_model.id, attributes).await?; index_files(&self.db, json_model.id, files).await?; - let collection_id = Uuid::from_str(&collection_id)?; - - collections::Entity::update_many() - .col_expr( - collections::Column::TotalMints, - >::into(Expr::col(collections::Column::TotalMints)) - .add(SimpleExpr::Value(1.into())), - ) - .filter(collections::Column::Id.eq(collection_id)) - .exec(self.db.get()) - .await?; - Ok(()) } diff --git a/api/src/handlers.rs b/api/src/handlers.rs index f79bba5..0012726 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -31,7 +31,13 @@ pub async fn graphql_handler( balance: Balance, req: GraphQLRequest, ) -> Result { - let context = AppContext::new(state.connection.clone(), user_id, organization, balance); + let context = AppContext::new( + state.connection.clone(), + state.redis.clone(), + user_id, + organization, + balance, + ); Ok(state .schema diff --git a/api/src/lib.rs b/api/src/lib.rs index 9a3c1ee..a3a4de4 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -25,10 +25,11 @@ use blockchains::{polygon::Polygon, solana::Solana}; use dataloaders::{ CollectionDropLoader, CollectionLoader, CollectionMintHistoriesLoader, CollectionMintLoader, CollectionMintMintHistoryLoader, CollectionMintTransfersLoader, CollectionMintsLoader, - CollectionMintsOwnerLoader, CreatorsLoader, DropLoader, DropMintHistoryLoader, HoldersLoader, - MetadataJsonAttributesLoader, MetadataJsonLoader, MintCreatorsLoader, MinterMintHistoryLoader, - ProjectCollectionLoader, ProjectCollectionsLoader, ProjectDropsLoader, QueuedMintsLoader, - SwitchCollectionHistoryLoader, UpdateMintHistoryLoader, + CollectionMintsOwnerLoader, CollectionSupplyLoader, CollectionTotalMintsLoader, CreatorsLoader, + DropLoader, DropMintHistoryLoader, HoldersLoader, MetadataJsonAttributesLoader, + MetadataJsonLoader, MintCreatorsLoader, MinterMintHistoryLoader, ProjectCollectionLoader, + ProjectCollectionsLoader, ProjectDropsLoader, QueuedMintsLoader, SwitchCollectionHistoryLoader, + UpdateMintHistoryLoader, }; use db::Connection; use hub_core::{ @@ -46,6 +47,7 @@ use metrics::Metrics; use mutations::Mutation; use poem::{async_trait, FromRequest, Request, RequestBody}; use queries::Query; +use redis::Client as Redis; #[allow(clippy::pedantic)] pub mod proto { @@ -239,6 +241,7 @@ pub struct AppState { pub polygon: Polygon, pub asset_proxy: AssetProxy, pub metadata_json_upload_job_queue: JobQueue, + pub redis: Redis, } impl AppState { @@ -253,6 +256,7 @@ impl AppState { polygon: Polygon, asset_proxy: AssetProxy, metadata_json_upload_job_queue: JobQueue, + redis: Redis, ) -> Self { Self { schema, @@ -263,6 +267,7 @@ impl AppState { polygon, asset_proxy, metadata_json_upload_job_queue, + redis, } } } @@ -272,6 +277,7 @@ pub struct AppContext { user_id: UserID, organization_id: OrganizationId, balance: Balance, + redis: Redis, project_drops_loader: DataLoader, project_collections_loader: DataLoader, project_collection_loader: DataLoader, @@ -294,6 +300,8 @@ pub struct AppContext { collection_mint_transfers_loader: DataLoader, switch_collection_history_loader: DataLoader, queued_mints_loader: DataLoader, + collection_total_mints_loader: DataLoader, + collection_supply_loader: DataLoader, } impl AppContext { @@ -301,6 +309,7 @@ impl AppContext { #[allow(clippy::similar_names)] pub fn new( db: Connection, + redis: Redis, user_id: UserID, organization_id: OrganizationId, balance: Balance, @@ -346,12 +355,21 @@ impl AppContext { let switch_collection_history_loader = DataLoader::new(SwitchCollectionHistoryLoader::new(db.clone()), tokio::spawn); let queued_mints_loader = DataLoader::new(QueuedMintsLoader::new(db.clone()), tokio::spawn); + let collection_total_mints_loader = DataLoader::new( + CollectionTotalMintsLoader::new(db.clone(), redis.clone()), + tokio::spawn, + ); + let collection_supply_loader = DataLoader::new( + CollectionSupplyLoader::new(db.clone(), redis.clone()), + tokio::spawn, + ); Self { db, user_id, organization_id, balance, + redis, project_drops_loader, project_collections_loader, project_collection_loader, @@ -374,6 +392,8 @@ impl AppContext { collection_mint_transfers_loader, switch_collection_history_loader, queued_mints_loader, + collection_total_mints_loader, + collection_supply_loader, } } } diff --git a/api/src/main.rs b/api/src/main.rs index 0f8f2a1..e7743ac 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -64,7 +64,7 @@ pub fn main() { let metadata_json_upload_task_context = MetadataJsonUploadContext::new(hub_uploads, solana.clone(), polygon.clone()); - let job_queue = JobQueue::new(redis_client, connection.clone()); + let job_queue = JobQueue::new(redis_client.clone(), connection.clone()); let worker = Worker::::new( job_queue.clone(), connection.clone(), @@ -80,6 +80,7 @@ pub fn main() { polygon.clone(), common.asset_proxy, job_queue.clone(), + redis_client, ); let cons = common.consumer_cfg.build::().await?; diff --git a/api/src/mutations/drop.rs b/api/src/mutations/drop.rs index b782290..507189e 100644 --- a/api/src/mutations/drop.rs +++ b/api/src/mutations/drop.rs @@ -4,7 +4,7 @@ use hub_core::{ credits::{CreditsClient, TransactionId}, producer::Producer, }; -use sea_orm::{prelude::*, JoinType, ModelTrait, QuerySelect, Set, TransactionTrait}; +use sea_orm::{prelude::*, ModelTrait, Set, TransactionTrait}; use serde::{Deserialize, Serialize}; use super::collection::{validate_creators, validate_json, validate_solana_creator_verification}; @@ -67,7 +67,7 @@ impl Mutation { let owner_address = fetch_owner(conn, input.project, input.blockchain).await?; let supply = if input.drop_type == DropType::Open { - Some(0) + None } else { input.supply.map(TryInto::try_into).transpose()? }; @@ -161,7 +161,7 @@ impl Mutation { .await?; Ok(CreateDropPayload { - drop: Drop::new(drop_model, collection), + drop: drop_model.into(), }) } @@ -293,9 +293,7 @@ impl Mutation { drop_am.creation_status = Set(CreationStatus::Pending); let drop = drop_am.update(conn).await?; - Ok(CreateDropPayload { - drop: Drop::new(drop, collection), - }) + Ok(CreateDropPayload { drop: drop.into() }) } /// This mutation allows for the temporary blocking of the minting of editions and can be resumed by calling the resumeDrop mutation. pub async fn pause_drop( @@ -306,26 +304,19 @@ impl Mutation { let AppContext { db, .. } = ctx.data::()?; let conn = db.get(); - let (drop, collection) = Drops::find() - .join(JoinType::InnerJoin, drops::Relation::Collections.def()) - .select_also(Collections) + let drop = Drops::find() .filter(drops::Column::Id.eq(input.drop)) .one(conn) .await? .ok_or(Error::new("drop not found"))?; - let collection_model = collection.ok_or(Error::new(format!( - "no collection found for drop {}", - input.drop - )))?; - let mut drops_active_model: drops::ActiveModel = drop.into(); drops_active_model.paused_at = Set(Some(Utc::now().into())); let drop_model = drops_active_model.update(db.get()).await?; Ok(PauseDropPayload { - drop: Drop::new(drop_model, collection_model), + drop: drop_model.into(), }) } @@ -338,27 +329,20 @@ impl Mutation { let AppContext { db, .. } = ctx.data::()?; let conn = db.get(); - let (drop, collection) = Drops::find() - .join(JoinType::InnerJoin, drops::Relation::Collections.def()) - .select_also(Collections) + let drop = Drops::find() .filter(drops::Column::Id.eq(input.drop)) .one(conn) .await? .ok_or(Error::new("drop not found"))?; - let collection_model = collection.ok_or(Error::new(format!( - "no collection found for drop {}", - input.drop - )))?; - let mut drops_active_model: drops::ActiveModel = drop.into(); drops_active_model.paused_at = Set(None); - let drop_model = drops_active_model.update(db.get()).await?; + let drop_model = drops_active_model.update(conn).await?; Ok(ResumeDropPayload { - drop: Drop::new(drop_model, collection_model), + drop: drop_model.into(), }) } @@ -375,19 +359,12 @@ impl Mutation { let AppContext { db, .. } = ctx.data::()?; let conn = db.get(); - let (drop, collection) = Drops::find() - .join(JoinType::InnerJoin, drops::Relation::Collections.def()) - .select_also(Collections) + let drop = Drops::find() .filter(drops::Column::Id.eq(input.drop)) .one(conn) .await? .ok_or(Error::new("drop not found"))?; - let collection_model = collection.ok_or(Error::new(format!( - "no collection found for drop {}", - input.drop - )))?; - let mut drops_active_model: drops::ActiveModel = drop.into(); drops_active_model.shutdown_at = Set(Some(Utc::now().into())); @@ -395,7 +372,7 @@ impl Mutation { let drop_model = drops_active_model.update(db.get()).await?; Ok(ShutdownDropPayload { - drop: Drop::new(drop_model, collection_model), + drop: drop_model.into(), }) } @@ -598,7 +575,7 @@ impl Mutation { tx.commit().await?; Ok(PatchDropPayload { - drop: Drop::new(drop_model, collection), + drop: drop_model.into(), }) } } diff --git a/api/src/mutations/mint.rs b/api/src/mutations/mint.rs index 2ecbafb..4c372be 100644 --- a/api/src/mutations/mint.rs +++ b/api/src/mutations/mint.rs @@ -6,6 +6,7 @@ use hub_core::{ credits::{CreditsClient, TransactionId}, producer::Producer, }; +use redis::AsyncCommands; use sea_orm::{ prelude::*, sea_query::{Func, SimpleExpr}, @@ -39,7 +40,8 @@ use crate::{ objects::{CollectionMint, Creator, MetadataJsonInput}, proto::{ self, nft_events::Event as NftEvent, CreationStatus as NftCreationStatus, MetaplexMetadata, - MintCollectionCreation, MintCreation, NftEventKey, NftEvents, RetryUpdateSolanaMintPayload, + MintCollectionCreation, MintCreation, MintOpenDropTransaction, NftEventKey, NftEvents, + RetryUpdateSolanaMintPayload, SolanaMintOpenDropBatchedPayload, }, Actions, AppContext, OrganizationId, UserID, }; @@ -62,10 +64,12 @@ impl Mutation { user_id, organization_id, balance, + redis, .. } = ctx.data::()?; let credits = ctx.data::>()?; let conn = db.get(); + let mut redis_conn = redis.get_async_connection().await?; let solana = ctx.data::()?; let polygon = ctx.data::()?; let nfts_producer = ctx.data::>()?; @@ -89,11 +93,17 @@ impl Mutation { // Call check_drop_status to check that drop is currently running check_drop_status(&drop_model)?; - if collection.supply == Some(collection.total_mints) { + let total_mints = collection_mints::Entity::filter_by_collection(collection.id) + .count(conn) + .await?; + + let total_mints = i64::try_from(total_mints)?; + + if collection.supply == Some(total_mints) { return Err(Error::new("Collection is sold out")); } - let edition = collection.total_mints.add(1); + let edition = total_mints.add(1); let owner_address = fetch_owner(conn, collection.project_id, collection.blockchain).await?; @@ -123,10 +133,6 @@ impl Mutation { let collection_mint_model = collection_mint_active_model.insert(conn).await?; - let mut collection_am = collections::ActiveModel::from(collection.clone()); - collection_am.total_mints = Set(edition); - collection_am.update(&tx).await?; - // inserts a mint histories record in the database let mint_history_am = mint_histories::ActiveModel { mint_id: Set(collection_mint_model.id), @@ -220,6 +226,10 @@ impl Mutation { tx.commit().await?; + redis_conn + .del(format!("collection:{}:total_mints", collection.id)) + .await?; + let event_key = NftEventKey { id: collection_mint_model.id.to_string(), user_id: user_id.to_string(), @@ -407,10 +417,12 @@ impl Mutation { user_id, organization_id, balance, + redis, .. } = ctx.data::()?; let credits = ctx.data::>()?; let conn = db.get(); + let mut redis_conn = redis.get_async_connection().await?; let nfts_producer = ctx.data::>()?; let metadata_json_upload_job_queue = ctx.data::()?; @@ -494,10 +506,6 @@ impl Mutation { am.insert(&tx).await?; } - let mut collection_am = collections::ActiveModel::from(collection.clone()); - collection_am.total_mints = Set(collection.total_mints.add(1)); - collection_am.update(&tx).await?; - let mint_history_am = mint_histories::ActiveModel { mint_id: Set(collection_mint_model.id), wallet: Set(input.recipient), @@ -512,6 +520,10 @@ impl Mutation { tx.commit().await?; + redis_conn + .del(format!("collection:{}:total_mints", collection.id)) + .await?; + metadata_json_upload_job_queue .enqueue(MetadataJsonUploadTask { caller: MetadataJsonUploadCaller::MintToCollection( @@ -972,6 +984,7 @@ impl Mutation { user_id, organization_id, balance, + redis, .. } = ctx.data::()?; @@ -980,6 +993,7 @@ impl Mutation { let solana = ctx.data::()?; let conn = db.get(); + let mut redis_conn = redis.get_async_connection().await?; let UserID(id) = user_id; let OrganizationId(org) = organization_id; @@ -1049,12 +1063,6 @@ impl Mutation { let tx = conn.begin().await?; - let mut collection_am = collections::ActiveModel::from(collection.clone()); - - collection_am.total_mints = Set(collection.total_mints.add(1)); - - collection_am.update(&tx).await?; - let mut mint_am: collection_mints::ActiveModel = mint.into(); mint_am.creation_status = Set(CreationStatus::Pending); @@ -1079,6 +1087,10 @@ impl Mutation { tx.commit().await?; + redis_conn + .del(format!("collection:{}:total_mints", collection.id)) + .await?; + match collection.blockchain { BlockchainEnum::Solana => { solana @@ -1138,10 +1150,12 @@ impl Mutation { user_id, organization_id, balance, + redis, .. } = ctx.data::()?; let credits = ctx.data::>()?; let conn = db.get(); + let mut redis_conn = redis.get_async_connection().await?; let solana = ctx.data::()?; let nfts_producer = ctx.data::>()?; @@ -1208,10 +1222,6 @@ impl Mutation { let tx = conn.begin().await?; - let mut collection_am = collections::ActiveModel::from(collection.clone()); - collection_am.total_mints = Set(collection.total_mints.add(1)); - collection_am.update(&tx).await?; - let mut mint_am: collection_mints::ActiveModel = mint.into(); mint_am.creation_status = Set(CreationStatus::Pending); @@ -1236,6 +1246,10 @@ impl Mutation { tx.commit().await?; + redis_conn + .del(format!("collection:{}:total_mints", collection.id)) + .await?; + let event_key = NftEventKey { id: mint.id.to_string(), user_id: user_id.to_string(), @@ -1289,6 +1303,194 @@ impl Mutation { collection_mint: mint.into(), }) } + + async fn mint_random_queued_to_drop_batched( + &self, + ctx: &Context<'_>, + input: MintRandomQueuedBatchedInput, + ) -> Result { + let AppContext { + db, + user_id, + organization_id, + balance, + .. + } = ctx.data::()?; + let credits = ctx.data::>()?; + let conn = db.get(); + let nfts_producer = ctx.data::>()?; + + let UserID(id) = user_id; + let OrganizationId(org) = organization_id; + + let user_id = id.ok_or(Error::new("X-USER-ID header not found"))?; + let org_id = org.ok_or(Error::new("X-ORGANIZATION-ID header not found"))?; + let balance = balance + .0 + .ok_or(Error::new("X-CREDIT-BALANCE header not found"))?; + + let batch_size = input.recipients.len(); + + if batch_size == 0 { + return Err(Error::new("No recipients provided")); + } + + if batch_size > 250 { + return Err(Error::new("Batch size cannot be greater than 250")); + } + + let drop = drops::Entity::find_by_id(input.drop) + .one(conn) + .await? + .ok_or(Error::new("drop not found"))?; + + let result = CollectionMints::find() + .select_also(metadata_jsons::Entity) + .join( + JoinType::InnerJoin, + collection_mints::Entity::belongs_to(metadata_jsons::Entity) + .from(collection_mints::Column::Id) + .to(metadata_jsons::Column::Id) + .into(), + ) + .filter(collection_mints::Column::CollectionId.eq(drop.collection_id)) + .filter(collection_mints::Column::CreationStatus.eq(CreationStatus::Queued)) + .order_by(SimpleExpr::FunctionCall(Func::random()), Order::Asc) + .limit(Some(batch_size.try_into()?)) + .all(conn) + .await?; + + let (mints, _): (Vec<_>, Vec<_>) = result.iter().cloned().unzip(); + + let creators = mints.load_many(mint_creators::Entity, conn).await?; + + if mints.len() != batch_size { + return Err(Error::new("Not enough mints found for the drop")); + } + + let collection = collections::Entity::find_by_id(drop.collection_id) + .one(conn) + .await? + .ok_or(Error::new("collection not found"))?; + + let project_id = collection.project_id; + let blockchain = collection.blockchain; + + if blockchain != BlockchainEnum::Solana { + return Err(Error::new("Only Solana is supported at this time")); + } + + let owner_address = fetch_owner(conn, project_id, blockchain).await?; + + let action = if input.compressed { + Actions::MintCompressed + } else { + Actions::Mint + }; + + let event_key = NftEventKey { + id: collection.id.to_string(), + user_id: user_id.to_string(), + project_id: project_id.to_string(), + }; + + let mut transactions = Vec::new(); + + for (((mint, metadata_json), creators), recipient) in result + .into_iter() + .zip(creators.into_iter()) + .zip(input.recipients.into_iter()) + { + let metadata_json = metadata_json.ok_or(Error::new("No metadata json found"))?; + let metadata_uri = metadata_json + .uri + .ok_or(Error::new("No metadata json uri found"))?; + + let TransactionId(deduction_id) = credits + .submit_pending_deduction( + org_id, + user_id, + action, + collection.blockchain.into(), + balance, + ) + .await?; + + let tx = conn.begin().await?; + + let mut mint_am: collection_mints::ActiveModel = mint.into(); + + mint_am.creation_status = Set(CreationStatus::Pending); + mint_am.credits_deduction_id = Set(Some(deduction_id)); + mint_am.compressed = Set(Some(input.compressed)); + mint_am.owner = Set(Some(recipient.clone())); + mint_am.seller_fee_basis_points = Set(collection.seller_fee_basis_points); + + let mint = mint_am.update(&tx).await?; + + let mint_history_am = mint_histories::ActiveModel { + mint_id: Set(mint.id), + wallet: Set(recipient.clone()), + collection_id: Set(collection.id), + tx_signature: Set(None), + status: Set(CreationStatus::Pending), + created_at: Set(Utc::now().into()), + ..Default::default() + }; + + mint_history_am.insert(&tx).await?; + + tx.commit().await?; + + nfts_producer + .send( + Some(&NftEvents { + event: Some(NftEvent::DropMinted(MintCreation { + drop_id: drop.id.to_string(), + status: NftCreationStatus::InProgress as i32, + })), + }), + Some(&NftEventKey { + id: mint.id.to_string(), + project_id: drop.project_id.to_string(), + user_id: user_id.to_string(), + }), + ) + .await?; + + transactions.push(MintOpenDropTransaction { + recipient_address: recipient, + metadata: Some(MetaplexMetadata { + owner_address: owner_address.clone(), + name: metadata_json.name, + symbol: metadata_json.symbol, + metadata_uri, + seller_fee_basis_points: mint.seller_fee_basis_points.into(), + creators: creators.into_iter().map(Into::into).collect(), + }), + mint_id: mint.id.to_string(), + }); + } + + nfts_producer + .send( + Some(&NftEvents { + event: Some(NftEvent::SolanaMintOpenDropBatched( + SolanaMintOpenDropBatchedPayload { + collection_id: collection.id.to_string(), + compressed: input.compressed, + mint_open_drop_transactions: transactions, + }, + )), + }), + Some(&event_key), + ) + .await?; + + Ok(MintRandomQueuedBatchedPayload { + collection_mints: mints.into_iter().map(Into::into).collect(), + }) + } } fn validate_compress(blockchain: BlockchainEnum, compressed: bool) -> Result<(), Error> { @@ -1473,3 +1675,17 @@ pub struct MintRandomQueuedInput { recipient: String, compressed: bool, } + +/// Represents input data for `mint_random_queued_batched` mutation +#[derive(Debug, Clone, InputObject)] +pub struct MintRandomQueuedBatchedInput { + drop: Uuid, + recipients: Vec, + compressed: bool, +} + +/// Represents payload data for `mint_random_queued_batched` mutation +#[derive(Debug, Clone, SimpleObject)] +pub struct MintRandomQueuedBatchedPayload { + collection_mints: Vec, +} diff --git a/api/src/objects/collection.rs b/api/src/objects/collection.rs index e0d2c3d..9399284 100644 --- a/api/src/objects/collection.rs +++ b/api/src/objects/collection.rs @@ -1,4 +1,4 @@ -use async_graphql::{Context, Object, Result}; +use async_graphql::{Context, Error, Object, Result}; use sea_orm::entity::prelude::*; use super::{metadata_json::MetadataJson, CollectionMint, Drop, Holder}; @@ -21,16 +21,12 @@ pub struct Collection { pub id: Uuid, /// The blockchain of the collection. pub blockchain: Blockchain, - /// The total supply of the collection. Setting to `null` implies unlimited minting. - pub supply: Option, /// The creation status of the collection. When the collection is in a `CREATED` status you can mint NFTs from the collection. pub creation_status: CreationStatus, /// The blockchain address of the collection used to view it in blockchain explorers. /// On Solana this is the mint address. /// On EVM chains it is the concatenation of the contract address and the token id `{contractAddress}:{tokenId}`. pub address: Option, - /// The current number of NFTs minted from the collection. - pub total_mints: i64, /// The transaction signature of the collection. pub signature: Option, /// The royalties assigned to mints belonging to the collection expressed in basis points. @@ -57,8 +53,18 @@ impl Collection { self.blockchain } /// The total supply of the collection. Setting to `null` implies unlimited minting. - async fn supply(&self) -> Option { - self.supply + async fn supply(&self, ctx: &Context<'_>) -> Result> { + let AppContext { + collection_supply_loader, + .. + } = ctx.data::()?; + + let supply = collection_supply_loader + .load_one(self.id) + .await? + .ok_or(Error::new("Unable to find collection supply"))?; + + Ok(supply) } /// The creation status of the collection. When the collection is in a `CREATED` status you can mint NFTs from the collection. @@ -91,11 +97,6 @@ impl Collection { self.address.clone() } - /// The current number of NFTs minted from the collection. - async fn total_mints(&self) -> i64 { - self.total_mints - } - /// The transaction signature of the collection. async fn signature(&self) -> Option { self.signature.clone() @@ -176,6 +177,21 @@ impl Collection { collection_drop_loader.load_one(self.id).await } + + /// The current number of NFTs minted from the collection. + async fn total_mints(&self, ctx: &Context<'_>) -> Result { + let AppContext { + collection_total_mints_loader, + .. + } = ctx.data::()?; + + let total_mints = collection_total_mints_loader + .load_one(self.id) + .await? + .ok_or(Error::new("Unable to find collection total mints"))?; + + Ok(total_mints) + } } impl From for Collection { @@ -183,9 +199,7 @@ impl From for Collection { Model { id, blockchain, - supply, creation_status, - total_mints, signature, seller_fee_basis_points, address, @@ -193,15 +207,14 @@ impl From for Collection { credits_deduction_id, created_at, created_by, + .. }: Model, ) -> Self { Self { id, blockchain, - supply, creation_status, address, - total_mints, signature, seller_fee_basis_points, project_id, diff --git a/api/src/objects/drop.rs b/api/src/objects/drop.rs index 500aed5..d4106fb 100644 --- a/api/src/objects/drop.rs +++ b/api/src/objects/drop.rs @@ -5,102 +5,121 @@ use sea_orm::entity::prelude::*; use super::{Collection, CollectionMint}; use crate::{ entities::{ - collections, drops, mint_histories, + drops, mint_histories, sea_orm_active_enums::{CreationStatus, DropType}, }, AppContext, }; + /// An NFT campaign that controls the minting rules for a collection, such as its start date and end date. #[derive(Clone, Debug)] pub struct Drop { - pub drop: drops::Model, - pub collection: collections::Model, -} - -impl Drop { - #[must_use] - pub fn new(drop: drops::Model, collection: collections::Model) -> Self { - Self { drop, collection } - } + pub id: Uuid, + pub drop_type: DropType, + pub project_id: Uuid, + pub collection_id: Uuid, + pub creation_status: CreationStatus, + pub start_time: Option, + pub end_time: Option, + pub price: i64, + pub created_by: Uuid, + pub created_at: DateTimeWithTimeZone, + pub paused_at: Option, + pub shutdown_at: Option, } #[Object] impl Drop { /// The unique identifier for the drop. async fn id(&self) -> Uuid { - self.drop.id + self.id } // The type of the drop. async fn drop_type(&self) -> DropType { - self.drop.drop_type + self.drop_type } /// The identifier of the project to which the drop is associated. async fn project_id(&self) -> Uuid { - self.drop.project_id + self.project_id } /// The creation status of the drop. async fn creation_status(&self) -> CreationStatus { - self.drop.creation_status + self.creation_status } /// The date and time in UTC when the drop is eligible for minting. A value of `null` means the drop can be minted immediately. async fn start_time(&self) -> Option { - self.drop.start_time + self.start_time } /// The end date and time in UTC for the drop. A value of `null` means the drop does not end until it is fully minted. async fn end_time(&self) -> Option { - self.drop.end_time + self.end_time } /// The cost to mint the drop in US dollars. When purchasing with crypto the user will be charged at the current conversion rate for the blockchain's native coin at the time of minting. async fn price(&self) -> i64 { - self.drop.price + self.price } /// The user id of the person who created the drop. async fn created_by_id(&self) -> Uuid { - self.drop.created_by + self.created_by } /// The date and time in UTC when the drop was created. async fn created_at(&self) -> DateTimeWithTimeZone { - self.drop.created_at + self.created_at } // The paused_at field represents the date and time in UTC when the drop was paused. // If it is null, the drop is currently not paused. async fn paused_at(&self) -> Option { - self.drop.paused_at + self.paused_at } /// The shutdown_at field represents the date and time in UTC when the drop was shutdown /// If it is null, the drop is currently not shutdown async fn shutdown_at(&self) -> Option { - self.drop.shutdown_at + self.shutdown_at } /// The collection for which the drop is managing mints. - async fn collection(&self) -> Result { - Ok(self.collection.clone().into()) + async fn collection(&self, ctx: &Context<'_>) -> Result> { + let AppContext { + collection_loader, .. + } = ctx.data::()?; + + collection_loader.load_one(self.collection_id).await } /// The current status of the drop. - async fn status(&self) -> Result { - let now = Utc::now(); - let scheduled = self.drop.start_time.map(|start_time| now < start_time); - let expired = self.drop.end_time.map(|end_time| now > end_time); - let paused_at = self.drop.paused_at; - let shutdown_at = self.drop.shutdown_at; + async fn status(&self, ctx: &Context<'_>) -> Result { + let AppContext { + collection_total_mints_loader, + collection_supply_loader, + .. + } = ctx.data::()?; - let total_mints = self.collection.total_mints; - let minted = self - .collection - .supply - .map(|supply| supply == total_mints && total_mints > 0); + let now = Utc::now(); + let scheduled = self.start_time.map(|start_time| now < start_time); + let expired = self.end_time.map(|end_time| now > end_time); + let paused_at = self.paused_at; + let shutdown_at = self.shutdown_at; + + let total_mints = collection_total_mints_loader + .load_one(self.collection_id) + .await? + .ok_or(Error::new("Unable to find collection total mints"))?; + let supply = collection_supply_loader + .load_one(self.collection_id) + .await? + .ok_or(Error::new("Unable to find collection supply"))?; + + let minted = supply.map(|supply| supply == total_mints && total_mints > 0); match ( scheduled, @@ -108,7 +127,7 @@ impl Drop { minted, paused_at, shutdown_at, - self.drop.creation_status, + self.creation_status, ) { (_, _, _, Some(_), ..) => Ok(DropStatus::Paused), (_, _, _, _, Some(_), _) => Ok(DropStatus::Shutdown), @@ -130,7 +149,9 @@ impl Drop { (_, _, Some(false), ..) | (_, _, None, _, _, CreationStatus::Created) => { Ok(DropStatus::Minting) }, - (_, _, _, _, _, CreationStatus::Queued) => Err(Error::new("Invalid Drop Status")), + (_, _, _, _, _, CreationStatus::Queued) => { + Err(Error::new("Unable to calculate drop status")) + }, } } @@ -140,7 +161,7 @@ impl Drop { .. } = ctx.data::()?; - queued_mints_loader.load_one(self.drop.id).await + queued_mints_loader.load_one(self.id).await } #[graphql(deprecation = "Use `mint_histories` under `Collection` Object instead.")] @@ -151,7 +172,42 @@ impl Drop { .. } = ctx.data::()?; - drop_mint_history_loader.load_one(self.drop.id).await + drop_mint_history_loader.load_one(self.id).await + } +} + +impl From for Drop { + fn from( + drops::Model { + id, + drop_type, + project_id, + collection_id, + creation_status, + start_time, + end_time, + price, + created_by, + created_at, + paused_at, + shutdown_at, + .. + }: drops::Model, + ) -> Self { + Self { + id, + drop_type, + project_id, + collection_id, + creation_status, + start_time, + end_time, + price, + created_by, + created_at, + paused_at, + shutdown_at, + } } } diff --git a/api/src/objects/project.rs b/api/src/objects/project.rs index c0d3715..8925dc3 100644 --- a/api/src/objects/project.rs +++ b/api/src/objects/project.rs @@ -32,7 +32,7 @@ impl Project { let drop = drop_loader.load_one(id).await?; if let Some(drop) = drop { - if drop.drop.project_id == self.id { + if drop.project_id == self.id { return Ok(Some(drop)); }