diff --git a/Cargo.lock b/Cargo.lock index 17ca562be0..9d7a6922bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1800,6 +1800,7 @@ dependencies = [ "clap", "contracts", "cow-amm", + "database", "derive_more 1.0.0", "ethabi", "ethcontract", @@ -1835,6 +1836,7 @@ dependencies = [ "shared", "solver", "solvers-dto", + "sqlx", "tap", "tempfile", "thiserror", diff --git a/crates/database/src/lib.rs b/crates/database/src/lib.rs index ec30cd557d..3df8147cdb 100644 --- a/crates/database/src/lib.rs +++ b/crates/database/src/lib.rs @@ -72,6 +72,7 @@ pub const TABLES: &[&str] = &[ "auction_participants", "app_data", "jit_orders", + "bad_tokens" ]; /// The names of potentially big volume tables we use in the db. diff --git a/crates/driver/Cargo.toml b/crates/driver/Cargo.toml index 35dd748162..0be45d0691 100644 --- a/crates/driver/Cargo.toml +++ b/crates/driver/Cargo.toml @@ -24,6 +24,7 @@ axum = { workspace = true } bigdecimal = { workspace = true } chrono = { workspace = true, features = ["clock"], default-features = false } cow-amm = { path = "../cow-amm" } +database = { path = "../database" } derive_more = { workspace = true } ethabi = "18.0" ethereum-types = { workspace = true } @@ -47,6 +48,7 @@ reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } +sqlx = { workspace = true } tap = "1.0.1" thiserror = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "time"] } diff --git a/crates/driver/src/domain/competition/auction.rs b/crates/driver/src/domain/competition/auction.rs index 59cddff203..6bb2804d13 100644 --- a/crates/driver/src/domain/competition/auction.rs +++ b/crates/driver/src/domain/competition/auction.rs @@ -505,6 +505,10 @@ impl Tokens { pub fn iter(&self) -> impl Iterator { self.0.values() } + + pub fn iter_keys(&self) -> impl Iterator { + self.0.keys() + } } #[derive(Debug, Clone)] diff --git a/crates/driver/src/domain/competition/mod.rs b/crates/driver/src/domain/competition/mod.rs index dd3aa06073..cd7f42a646 100644 --- a/crates/driver/src/domain/competition/mod.rs +++ b/crates/driver/src/domain/competition/mod.rs @@ -7,12 +7,14 @@ use { crate::{ domain::{competition::solution::Settlement, eth}, infra::{ - self, - blockchain::Ethereum, - notify, - observe, - simulator::{RevertError, SimulatorError}, - solver::{self, SolutionMerging, Solver}, + self, + bad_token::BadTokenMonitor, + blockchain::Ethereum, + database::Postgres, + notify, + observe, + simulator::{RevertError, SimulatorError}, + solver::{self, SolutionMerging, Solver}, Simulator, }, util::Bytes, @@ -49,6 +51,7 @@ pub struct Competition { pub eth: Ethereum, pub liquidity: infra::liquidity::Fetcher, pub simulator: Simulator, + pub bad_token_monitor: BadTokenMonitor, pub mempools: Mempools, /// Cached solutions with the most recent solutions at the front. pub settlements: Mutex>, @@ -56,7 +59,7 @@ pub struct Competition { impl Competition { /// Solve an auction as part of this competition. - pub async fn solve(&self, auction: &Auction) -> Result, Error> { + pub async fn solve(&self, auction: &Auction, db: &Postgres) -> Result, Error> { let liquidity = match self.solver.liquidity() { solver::Liquidity::Fetch => { self.liquidity @@ -257,6 +260,8 @@ impl Competition { &infra::simulator::Error::Revert(err), true, ); + + self.bad_token_monitor.consolidate(&db, auction.tokens().iter_keys()); return; } } diff --git a/crates/driver/src/infra/api/mod.rs b/crates/driver/src/infra/api/mod.rs index 9517b858d1..3b26ccc8b0 100644 --- a/crates/driver/src/infra/api/mod.rs +++ b/crates/driver/src/infra/api/mod.rs @@ -1,19 +1,21 @@ use { + super::database::Postgres, crate::{ domain::{self, Mempools}, infra::{ - self, - config::file::OrderPriorityStrategy, - liquidity, - solver::{Solver, Timeouts}, - tokens, - Ethereum, + self, + bad_token::BadTokenMonitor, + config::file::OrderPriorityStrategy, + liquidity, + solver::{Solver, Timeouts}, + tokens, + Ethereum, Simulator, }, - }, - error::Error, - futures::Future, - std::{net::SocketAddr, sync::Arc}, + }, + error::Error, + futures::Future, + std::{net::SocketAddr, sync::Arc}, tokio::sync::oneshot, }; @@ -26,6 +28,7 @@ pub struct Api { pub solvers: Vec, pub liquidity: liquidity::Fetcher, pub simulator: Simulator, + pub db: Postgres, pub eth: Ethereum, pub mempools: Mempools, pub addr: SocketAddr, @@ -57,6 +60,8 @@ impl Api { app = routes::metrics(app); app = routes::healthz(app); + let mut bad_token_monitors = BadTokenMonitor::collect_from_path("./src/infra/bad_token/configs", &solvers); + // Multiplex each solver as part of the API. Multiple solvers are multiplexed // on the same driver so only one liquidity collector collects the liquidity // for all of them. This is important because liquidity collection is @@ -70,15 +75,21 @@ impl Api { let router = routes::reveal(router); let router = routes::settle(router); let router = router.with_state(State(Arc::new(Inner { + db: self.db.clone(), eth: self.eth.clone(), solver: solver.clone(), competition: domain::Competition { - solver, eth: self.eth.clone(), liquidity: self.liquidity.clone(), + bad_token_monitor: { + let mut monitor = bad_token_monitors.remove(&solver.address()).unwrap(); + monitor.initialize(&self.db); + monitor + }, simulator: self.simulator.clone(), mempools: self.mempools.clone(), settlements: Default::default(), + solver, }, liquidity: self.liquidity.clone(), tokens: tokens.clone(), @@ -128,6 +139,10 @@ impl State { &self.0.tokens } + fn database(&self) -> &Postgres { + &self.0.db + } + fn pre_processor(&self) -> &domain::competition::AuctionProcessor { &self.0.pre_processor } @@ -138,6 +153,7 @@ impl State { } struct Inner { + db: Postgres, eth: Ethereum, solver: Solver, competition: domain::Competition, diff --git a/crates/driver/src/infra/api/routes/solve/mod.rs b/crates/driver/src/infra/api/routes/solve/mod.rs index eccafb8ead..cb0a868534 100644 --- a/crates/driver/src/infra/api/routes/solve/mod.rs +++ b/crates/driver/src/infra/api/routes/solve/mod.rs @@ -36,7 +36,7 @@ async fn route( .pre_processor() .prioritize(auction, &competition.solver.account().address()) .await; - let result = competition.solve(&auction).await; + let result = competition.solve(&auction, state.database()).await; observe::solved(state.solver().name(), &result); Ok(axum::Json(dto::Solved::new(result?, &competition.solver))) }; diff --git a/crates/driver/src/infra/bad_token/configs/sample.toml b/crates/driver/src/infra/bad_token/configs/sample.toml new file mode 100644 index 0000000000..c6a8ef84eb --- /dev/null +++ b/crates/driver/src/infra/bad_token/configs/sample.toml @@ -0,0 +1,20 @@ +solver = "0x1234567890abcdef1234567890abcdef12345678" +timespan = 3 + + +[[tokens.supported]] +address = "0xabcdefabcdefabcdefabcdefabcdefabcd" + +[[tokens.supported]] +address = "0x1234123412341234123412341234123412341234" + +[[tokens.unsupported]] +address = "0x5678567856785678567856785678567856785678" + +[[tokens.unsupported]] +address = "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" + +[heuristic.ThresholdBased] +threshold = 10 + +mode = "LogOnly" \ No newline at end of file diff --git a/crates/driver/src/infra/bad_token/mod.rs b/crates/driver/src/infra/bad_token/mod.rs new file mode 100644 index 0000000000..0a96a7886c --- /dev/null +++ b/crates/driver/src/infra/bad_token/mod.rs @@ -0,0 +1,354 @@ +use std::{collections::{BTreeSet, HashMap, HashSet}, default, fs::{self, File}, io::BufReader}; + +use anyhow::{Context, Ok}; +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use sqlx::{prelude::FromRow, PgConnection}; +use toml::Value as TomlValue; + +use crate::domain::eth::{Address, TokenAddress}; + +use super::{database::{bad_tokens::{cleanup, insert, load_deny_list_for_solver, load_token_list_for_solver}, Postgres}, Solver}; + +#[derive(Debug, Deserialize)] +pub struct Config { + solver: Address, + supported_tokens: Vec, + unsupported_tokens: Vec, + timespan: u32, // needs to be discussed + heuristic: Heuristic, + mode: Mode, +} + +impl Config { + pub fn supported_tokens(&self) -> Vec { + self.supported_tokens.clone() + } + + pub fn unsupported_tokens(&self) -> Vec { + self.unsupported_tokens.clone() + } + + pub fn mode(&self) -> Mode { + self.mode.to_owned() + } + + pub fn timespan(&self) -> u32 { + self.timespan + } +} + +pub struct ConfigFile { + solver: Option, + tokens: Option, + timespan: Option, + heuristic: Option, + mode: Option, +} + +impl Default for Config { + fn default() -> Self { + Self { + supported_tokens: Vec::new(), + unsupported_tokens: Vec::new(), + timespan: 1, + ..Default::default() + } + } +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub enum Heuristic { + ThresholdBased(Threshold), +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub enum HeuristicState { + ThresholdBased(ThresholdState) +} + +impl HeuristicState { + pub fn default(heuristic: Heuristic) -> Self { + match heuristic { + Heuristic::ThresholdBased(threshold) => { + Self::ThresholdBased(ThresholdState::default()) + }, + } + } +} + +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct ThresholdState { + count: u32, +} + +impl Default for Heuristic { + fn default() -> Self { + Self::ThresholdBased(Threshold::default()) + } +} + +#[derive(Debug, Deserialize)] +pub struct Threshold { + threshold: u32, +} + +impl From for Threshold { + fn from(value: u32) -> Self { + Self { + threshold: value, + } + } +} + +impl Default for Threshold { + fn default() -> Self { + Self { threshold: 10u32} + } +} + +#[derive(Debug, Deserialize, Clone)] +pub enum Mode { + LogOnly, + Enabled, +} + +impl Default for Mode { + fn default() -> Self { + Self::LogOnly + } +} + +#[derive(Debug, Default, Deserialize)] +pub struct BadTokenMonitor { + config: Config, + deny_list: HashSet, + explicit_deny_list: BTreeSet, + explicit_allow_list: BTreeSet, + heuristic_state: HashMap +} + +impl BadTokenMonitor { + pub fn solver(&self) -> Address { + self.config.solver.clone() + } + + pub fn heuristic(&self) -> Heuristic { + self.config.heuristic.clone() + } + + pub fn timespan(&self) -> u32 { + self.config.timespan() + } + + pub fn deny_list(&self) -> Vec { + self.deny_list.clone().into_iter().collect_vec() + } +} + +impl BadTokenMonitor { + pub fn from_config(config: Config) -> Self { + Self { + config, + deny_list: HashSet::new(), + explicit_deny_list: BTreeSet::from_iter(config.unsupported_tokens().into_iter()), + explicit_allow_list: BTreeSet::from_iter(config.supported_tokens().into_iter()), + heuristic_state: HashMap::new(), + } + } + + pub fn new(solver: Address) -> Self { + Self::default() + } + + pub fn from_file_path(file_path: &str) -> Result { + let file = fs::read_to_string(file_path) + .map_err(|err| format!("Unable to read file {}\n{:?}", file_path, e))?; + + let config_file: ConfigFile = match toml::from_str(&file) { + Ok(s) => s, + Err(e) => { + return Err(format!("Config file malformatted {}", e)); + } + }; + + Self::from_config_file(config_file) + } + + pub fn from_config_file(config_file: ConfigFile) -> Result { + let solver = config_file.solver.map(|address| address.as_bytes().clone().into()) + .ok_or(format!("no solver associated with this config"))?; + let supported_tokens = Vec::new(); + let unsupported_tokens = Vec::new(); + let timespan = config_file.timespan.unwrap_or(1u32); + + if let Some(table) = config_file.tokens { + if let Some(TomlValue::Array(tokens)) = table.unsupported { + for token in tokens { + address = token.address.map(|address| address.as_bytes().clone().into()) + .ok_or(format!("address is not correct \n{}", err))?; + unsupported_tokens.push(address); + } + } + + if let Some(TomlValue::Array(tokens)) = table.supported { + for token in tokens { + address = token.address.map(|address| address.as_bytes().clone().into()) + .ok_or(format!("address {} is not correct \n{}", address, err))?; + supported_tokens.push(address); + } + } + } + + let heuristic = match &config_file.heuristic { + Some(TomlValue::Table(table)) => { + if let Some(params) = table.get("ThresholdBased") { + Heuristic::ThresholdBased(params.threshold.into()) + } else { + Heuristic::default() + } + }, + _ => Heuristic::default(), + }; + + let mode = match &config_file.mode { + Some(mode) if "LogOnly" == mode.as_str() => Mode::LogOnly, + Some(mode) if "Enabled" == mode.as_str() => Mode::Enabled, + _ => return Err(format!("unsupported mode")), + }; + + let config = Config { + solver, + supported_tokens, + unsupported_tokens, + timespan, + heuristic, + mode, + }; + + Ok(Self::from_config(config)) + } + + pub fn collect_from_path(file_path: &str, solvers: &Vec) -> HashMap { + let paths = fs::read_dir(file_path).unwrap(); + + let mut bad_token_monitors = HashMap::new(); + for path in paths { + if let Ok(file) = path { + let monitor = match Self::from_file_path(file.path()) { + Ok(monitor) => monitor, + Err(_) => continue, + }; + + bad_token_monitors.insert(monitor.solver(), monitor); + } + } + + let result = HashMap::new(); + + for solver in solvers { + match bad_token_monitors.remove(&solver.address()) { + Some(monitor) => result.insert(solver.address(), monitor), + None => result.insert(solver.address(), BadTokenMonitor::new(solver.address())), + } + } + + result + } + + pub async fn consolidate(&mut self, db: &Postgres, tokens: impl Iterator) -> Result<(), sqlx::Error> { + if Mode::LogOnly = self.config.mode() { + format!("implement logging"); + return; + } + + let mut tokens_to_insert = Vec::new(); + let default_heuristic_state = HeuristicState::default(self.heuristic()); + for token in tokens { + if self.explicit_allow_list.contains(token) { + continue; + } + + if self.explicit_deny_list.contains(token) { + continue; + } + + let mut heuristic_state = self + .heuristic_state + .get(token) + .map(|state| state.to_owned()) + .unwrap_or(default_heuristic_state); + + match (self.heuristic(), &mut heuristic_state) { + ( + Heuristic::ThresholdBased(threshold), + HeuristicState::ThresholdBased(threshold_state) + ) => { + if threshold.threshold <= threshold_state.count + 1 { + self.deny_list.insert(token.to_owned()) + } + + threshold_state.count += 1; + }, + + (_, _) => format!("unimplimented"), + } + + self.heuristic_state.insert(token.to_owned(), heuristic_state.to_owned()); + tokens_to_insert.push((token.to_owned(), heuristic_state)); + } + + let mut ex = db.pool.begin().await.context("begin")?; + insert(&mut ex, self.solver(), tokens_to_insert).await; + ex.commit().await.context("commit"); + + Ok(()) + } + + pub async fn initialize(&mut self, db: &Postgres) -> Result<(), sqlx::Error> { + if Mode::LogOnly = self.config.mode() { + format!("implement logging"); + return; + } + + let mut ex = db.pool.begin().await.context("begin")?; + let heuristic_states = load_token_list_for_solver(&mut ex, &self.solver()).await; + ex.commit().await.context("commit"); + + let deny_list = HashSet::new(); + for (token, state) in heuristic_states { + match (self.heuristic(), state) { + ( + Heuristic::ThresholdBased(threshold), + HeuristicState::ThresholdBased(threshold_state) + ) => { + if threshold.threshold <= threshold_state.count { + deny_list.insert(token) + } + }, + + (_, _) => format!("unimplimented"), + } + } + + self.deny_list = deny_list; + self.heuristic_state = heuristic_states; + + Ok(()) + } + + async fn periodic_cleanup(&mut self, db: &Postgres) -> Result<(), sqlx::Error> { + let mut ex = db.pool.begin().await.context("begin")?; + let response = cleanup(&mut ex, &self.solver(), self.timespan(), self.deny_list()).await; + ex.commit().await.context("commit"); + + if let Ok(tokens) = response { + for token in tokens { + self.deny_list.remove(&token) + } + } + + // should the heuristic state be reseted?? + Ok(()) + } +} diff --git a/crates/driver/src/infra/cli.rs b/crates/driver/src/infra/cli.rs index 9159f990f7..ced7c6e976 100644 --- a/crates/driver/src/infra/cli.rs +++ b/crates/driver/src/infra/cli.rs @@ -21,6 +21,11 @@ pub struct Args { #[clap(long, env)] pub ethrpc: Url, + /// Url of the Postgres database. By default connects to locally running + /// postgres. + #[clap(long, env, default_value = "postgresql://")] + pub db_url: Url, + /// Path to the driver configuration file. This file should be in TOML /// format. For an example see /// https://github.com/cowprotocol/services/blob/main/crates/driver/example.toml. diff --git a/crates/driver/src/infra/database/bad_tokens.rs b/crates/driver/src/infra/database/bad_tokens.rs new file mode 100644 index 0000000000..e9e82cee76 --- /dev/null +++ b/crates/driver/src/infra/database/bad_tokens.rs @@ -0,0 +1,149 @@ +use std::collections::{BTreeMap, HashMap}; + +use database::bad_tokens::{BadTokenIn, BadTokenOut}; +use ethcontract::H160; +use itertools::Itertools; +use num::range; +use serde::{Deserialize, Serialize}; +use sqlx::{types::Json, PgConnection, QueryBuilder}; + +use crate::{domain::eth::{Address, ContractAddress, TokenAddress}, infra::bad_token::{Heuristic, HeuristicState}}; + +pub async fn insert(ex: &mut PgConnection, solver: Address, tokens: Vec<(TokenAddress, HeuristicState)>) { + let mut bad_tokens_to_insert = Vec::new(); + for token in tokens { + bad_tokens_to_insert.push(BadToken{ + solver: database::Address(solver.0.0), + token: database::Address(token.0.0.0.0), + heuristic_state: Json(token.1), + }); + } + insert_to_db(ex, &bad_tokens_to_insert[..]).await; +} + +pub async fn load(ex: &mut PgConnection) -> sqlx::Result> { + fetch_all(ex).await +} + +pub async fn load_token_list_for_solver(ex: &mut PgConnection, solver: &Address) -> HashMap { + let mut tokens = HashMap::new(); + + if let Ok(token_list) = load_token_list(ex, database::Address(solver.0.0)).await { + for token in token_list { + tokens.insert(TokenAddress(ContractAddress(H160(token.0.0))), token.1); + } + } + + tokens +} + +pub async fn cleanup(ex: &mut PgConnection, solver: &Address, timespan: u32, tokens: Vec) -> sqlx::Result>{ + let response = rejuvenate_bad_tokens( + ex, + database::Address(solver.0.0), + timespan, + tokens + .into_iter() + .map(|x| database::Address(x.0.0)) + .collect_vec() + .as_slice() + ).await? + .into_iter() + .map(|x| TokenAddress(ContractAddress(H160(x.0)))) + .collect_vec(); + + response +} + + +#[allow(dead_code)] +#[derive(sqlx::FromRow, Debug, Serialize, Deserialize)] +pub struct BadToken { + pub solver: database::Address, + pub token: database::Address, + pub heuristic_state: Json +} + + +pub async fn fetch_all(ex: &mut PgConnection) -> Result, sqlx::Error> { + const QUERY: &str = r#" +SELECT solver, token, heuristic +FROM bad_tokens; +"#; + + sqlx::query_as(QUERY).fetch_all(ex).await +} + +pub async fn load_token_list( + ex: &mut PgConnection, + solver: database::Address +) -> sqlx::Result, HeuristicState>> { + const QUERY: &str = r#" +SELECT token, heuristic_state +FROM bad_tokens +where solver = $1; +"#; + + #[derive(sqlx::FromRow, Debug, Serialize, Deserialize)] + struct Row { + token: database::Address, + heuristic_state: Json + } + + let rows: Vec = sqlx::query_as(QUERY).bind(solver).fetch_all(ex).await?; + + let response = BTreeMap::new(); + for row in rows { + response.insert(row.token, row.heuristic_state.0) + } + + response +} + +pub async fn insert_to_db(ex: &mut PgConnection, tokens: &[BadToken]) -> sqlx::Result<()> { + const QUERY: &str = r#" +INSERT INTO bad_tokens (solver, token, heuristic_state, time_stamp) +"#; + let mut query_builder = QueryBuilder::new(QUERY); + query_builder.push_values(tokens, |mut builder, token| { + builder.push_bind(token.solver) + .push_bind(token.token) + .push_bind(token.heuristic_state) + .push("NOW()"); + }); + + query_builder.push(" ON CONFLICT (solver, token) DO UPDATE SET heuristic_state = EXCLUDED.heuristic_state, time_stamp = NOW()"); + + query_builder.build().execute(ex).await?; + + Ok(()) +} + +pub async fn rejuvenate_bad_tokens( + ex: &mut PgConnection, + solver: database::Address, + timespan: u32, + tokens: &[database::Address] +) -> sqlx::Result> { + let placeholder_vector = tokens + .iter() + .enumerate() + .map(|(i, _)| format!("${}", i + 3)) + .collect::>() + .join(", "); + + let query = format!(" +DELETE FROM bad_tokens +where solver = $1 +AND time_stamp < NOW() - INTERVAL '1 day' * $2 +AND token in ({}) +RETURNING token;", placeholders); + + let sql_query = sqlx::query_as(&query).bind(solver).bind(timespan); + + for token in tokens { + sql_query = sql_query.bind(*token) + } + + sql_query.fetch_all(ex).await? +} \ No newline at end of file diff --git a/crates/driver/src/infra/database/mod.rs b/crates/driver/src/infra/database/mod.rs new file mode 100644 index 0000000000..399d8324aa --- /dev/null +++ b/crates/driver/src/infra/database/mod.rs @@ -0,0 +1,16 @@ +use sqlx::PgPool; + +pub mod bad_tokens; + +#[derive(Clone)] +pub struct Postgres { + pub pool: PgPool, +} + +impl Postgres { + pub async fn new(url: &str) -> sqlx::Result { + Ok(Self { + pool: PgPool::connect(url).await?, + }) + } +} diff --git a/crates/driver/src/infra/mod.rs b/crates/driver/src/infra/mod.rs index b33526f545..34a6074f13 100644 --- a/crates/driver/src/infra/mod.rs +++ b/crates/driver/src/infra/mod.rs @@ -11,6 +11,8 @@ pub mod simulator; pub mod solver; pub mod time; pub mod tokens; +pub mod bad_token; +pub mod database; pub use { self::solver::Solver, diff --git a/crates/driver/src/run.rs b/crates/driver/src/run.rs index 68dd25b9d5..ea2821fcd8 100644 --- a/crates/driver/src/run.rs +++ b/crates/driver/src/run.rs @@ -2,13 +2,14 @@ use { crate::{ domain::Mempools, infra::{ - self, - blockchain::{self, Ethereum}, - cli, - config, - liquidity, - simulator::{self, Simulator}, - solver::Solver, + self, + blockchain::{self, Ethereum}, + cli, + config, + database::Postgres, + liquidity, + simulator::{self, Simulator}, + solver::Solver, Api, }, }, @@ -49,6 +50,10 @@ async fn run_with(args: cli::Args, addr_sender: Option