Skip to content

Commit

Permalink
Store fee policy (#2118)
Browse files Browse the repository at this point in the history
# Description
Fixes #2111

Defines the table for storing fee policies per `<auction_id,
order_uid>`.

Currently supports taking fees as:
1. cut from a surplus - the price difference between the executed price
and the limit_price. Optionally, for this type, we also have a CAP on the max
volume (expressed in the same way as (2)).
2. percent of volume (or, more precisely, percent of the `executed_amount`,
so it refers to the sell token for sell orders and the buy token for buy orders).

Will be merged at the very end of feature implementation, since db
migrations are irreversible.

# Changes
<!-- List of detailed changes (how the change is accomplished) -->

- [x] Created table for fee policies
- [x] Implemented basic database function to read/write

## How to test
Roundtrip UT

---------

Co-authored-by: ilya <[email protected]>
Co-authored-by: Martin Beckmann <[email protected]>
  • Loading branch information
3 people authored Jan 12, 2024
1 parent 5122abe commit 398351d
Show file tree
Hide file tree
Showing 16 changed files with 286 additions and 31 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pull-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ jobs:
- uses: Swatinem/rust-cache@v2
# Start the build process in the background. The following cargo test command will automatically
# wait for the build process to be done before proceeding.
- run: cargo build -p orderbook -p database --tests &
- run: cargo build -p orderbook -p database -p autopilot --tests &
- uses: taiki-e/install-action@nextest
- uses: yu-ichiro/spin-up-docker-compose-action@v1
with:
file: docker-compose.yaml
up-opts: -d db migrations
- run: cargo nextest run postgres -p orderbook -p database --test-threads 1 --run-ignored ignored-only
- run: cargo nextest run postgres -p orderbook -p database -p autopilot --test-threads 1 --run-ignored ignored-only

test-local-node:
timeout-minutes: 60
Expand Down
10 changes: 3 additions & 7 deletions crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct Arguments {

/// The number of order events to insert in a single batch.
#[clap(long, env, default_value = "500")]
pub order_events_insert_batch_size: NonZeroUsize,
pub insert_batch_size: NonZeroUsize,

/// Skip syncing past events (useful for local deployments)
#[clap(long, env, action = clap::ArgAction::Set, default_value = "false")]
Expand Down Expand Up @@ -254,7 +254,7 @@ impl std::fmt::Display for Arguments {
order_events_cleanup_interval,
order_events_cleanup_threshold,
db_url,
order_events_insert_batch_size,
insert_batch_size,
native_price_estimation_results_required,
auction_update_interval,
max_settlement_transaction_wait,
Expand Down Expand Up @@ -322,11 +322,7 @@ impl std::fmt::Display for Arguments {
"order_events_cleanup_threshold: {:?}",
order_events_cleanup_threshold
)?;
writeln!(
f,
"order_events_insert_batch_size: {}",
order_events_insert_batch_size
)?;
writeln!(f, "insert_batch_size: {}", insert_batch_size)?;
writeln!(
f,
"native_price_estimation_results_required: {}",
Expand Down
2 changes: 1 addition & 1 deletion crates/autopilot/src/boundary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub use {
competition::Competition,
order_events::{store_order_events, OrderEventLabel},
},
database::orders::Quote as DatabaseQuote,
database,
model::{
app_data::AppDataHash,
interaction::InteractionData,
Expand Down
24 changes: 10 additions & 14 deletions crates/autopilot/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
use {
sqlx::{Executor, PgConnection, PgPool},
std::{num::NonZeroUsize, time::Duration},
tracing::Instrument,
};

mod auction;
pub mod auction_prices;
pub mod auction_transaction;
pub mod competition;
pub mod ethflow_events;
mod events;
pub mod fee_policies;
pub mod on_settlement_event_updater;
pub mod onchain_order_events;
pub mod order_events;
pub mod orders;
mod quotes;
pub mod recent_settlements;

use {
sqlx::{Executor, PgConnection, PgPool},
std::{num::NonZeroUsize, time::Duration},
tracing::Instrument,
};

#[derive(Debug, Clone)]
pub struct Config {
pub order_events_insert_batch_size: NonZeroUsize,
pub insert_batch_size: NonZeroUsize,
}

#[derive(Debug, Clone)]
Expand All @@ -29,15 +30,10 @@ pub struct Postgres {
}

impl Postgres {
pub async fn new(
url: &str,
order_events_insert_batch_size: NonZeroUsize,
) -> sqlx::Result<Self> {
pub async fn new(url: &str, insert_batch_size: NonZeroUsize) -> sqlx::Result<Self> {
Ok(Self {
pool: PgPool::connect(url).await?,
config: Config {
order_events_insert_batch_size,
},
config: Config { insert_batch_size },
})
}

Expand Down
102 changes: 102 additions & 0 deletions crates/autopilot/src/database/fee_policies.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use {
crate::infra::persistence::dto,
sqlx::{PgConnection, QueryBuilder},
};

/// Inserts the given fee policies into the `fee_policies` table with a single
/// multi-row `INSERT` statement.
///
/// Returns `Ok(())` without touching the database when the iterator is empty:
/// `QueryBuilder::push_values` with no tuples would otherwise produce the
/// syntactically invalid statement `INSERT ... VALUES ` and sqlx would return
/// a database error.
pub async fn insert_batch(
    ex: &mut PgConnection,
    fee_policies: impl IntoIterator<Item = dto::FeePolicy>,
) -> Result<(), sqlx::Error> {
    let mut fee_policies = fee_policies.into_iter().peekable();
    // Guard against building an invalid empty VALUES clause.
    if fee_policies.peek().is_none() {
        return Ok(());
    }

    let mut query_builder = QueryBuilder::new(
        "INSERT INTO fee_policies (auction_id, order_uid, kind, surplus_factor, \
         max_volume_factor, volume_factor) ",
    );

    query_builder.push_values(fee_policies, |mut b, fee_policy| {
        b.push_bind(fee_policy.auction_id)
            .push_bind(fee_policy.order_uid)
            .push_bind(fee_policy.kind)
            .push_bind(fee_policy.surplus_factor)
            .push_bind(fee_policy.max_volume_factor)
            .push_bind(fee_policy.volume_factor);
    });

    query_builder.build().execute(ex).await.map(|_| ())
}

/// Fetches all fee policies stored for the given `(auction_id, order_uid)`
/// pair, ordered by `application_order`.
pub async fn fetch(
    ex: &mut PgConnection,
    auction_id: dto::AuctionId,
    order_uid: database::OrderUid,
) -> Result<Vec<dto::FeePolicy>, sqlx::Error> {
    const QUERY: &str = r#"
        SELECT * FROM fee_policies
        WHERE auction_id = $1 AND order_uid = $2
        ORDER BY application_order
    "#;
    // `fetch_all` already yields a `Vec`, so no extra `into_iter().collect()`
    // round trip is needed.
    sqlx::query_as::<_, dto::FeePolicy>(QUERY)
        .bind(auction_id)
        .bind(order_uid)
        .fetch_all(ex)
        .await
}

#[cfg(test)]
mod tests {
    use {super::*, database::byte_array::ByteArray, sqlx::Connection};

    /// Round-trip test: inserts a batch of fee policies sharing one
    /// `(auction_id, order_uid)` key and reads them back via `fetch`,
    /// asserting the read-back order matches the insertion order.
    ///
    /// Requires a live Postgres instance, hence `#[ignore]` — run via the
    /// `cargo nextest run postgres ... --run-ignored ignored-only` CI step.
    #[tokio::test]
    #[ignore]
    async fn postgres_roundtrip() {
        let mut db = PgConnection::connect("postgresql://").await.unwrap();
        // Work inside a transaction that is never committed, so the test
        // leaves no data behind.
        let mut db = db.begin().await.unwrap();
        database::clear_DANGER_(&mut db).await.unwrap();

        // same primary key for all fee policies
        let (auction_id, order_uid) = (1, ByteArray([1; 56]));

        // surplus fee policy without caps
        let fee_policy_1 = dto::FeePolicy {
            auction_id,
            order_uid,
            kind: dto::fee_policy::FeePolicyKind::Surplus,
            surplus_factor: Some(0.1),
            max_volume_factor: Some(1.0),
            volume_factor: None,
        };
        // surplus fee policy with caps
        let fee_policy_2 = dto::FeePolicy {
            auction_id,
            order_uid,
            kind: dto::fee_policy::FeePolicyKind::Surplus,
            surplus_factor: Some(0.2),
            max_volume_factor: Some(0.05),
            volume_factor: None,
        };
        // volume based fee policy
        let fee_policy_3 = dto::FeePolicy {
            auction_id,
            order_uid,
            kind: dto::fee_policy::FeePolicyKind::Volume,
            surplus_factor: None,
            max_volume_factor: None,
            volume_factor: Some(0.06),
        };
        insert_batch(
            &mut db,
            vec![
                fee_policy_1.clone(),
                fee_policy_2.clone(),
                fee_policy_3.clone(),
            ],
        )
        .await
        .unwrap();

        // `fetch` sorts by `application_order`, which here must reproduce the
        // insertion order.
        let output = fetch(&mut db, 1, order_uid).await.unwrap();
        assert_eq!(output, vec![fee_policy_1, fee_policy_2, fee_policy_3]);
    }
}
2 changes: 1 addition & 1 deletion crates/autopilot/src/database/onchain_order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1379,7 +1379,7 @@ mod test {
db: Postgres {
pool: PgPool::connect_lazy("postgresql://").unwrap(),
config: Config {
order_events_insert_batch_size: NonZeroUsize::new(500).unwrap(),
insert_batch_size: NonZeroUsize::new(500).unwrap(),
},
},
web3,
Expand Down
2 changes: 1 addition & 1 deletion crates/autopilot/src/database/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn store_order_events(
timestamp: DateTime<Utc>,
) -> Result<()> {
let mut ex = db.pool.begin().await.context("begin transaction")?;
for chunk in events.chunks(db.config.order_events_insert_batch_size.get()) {
for chunk in events.chunks(db.config.insert_batch_size.get()) {
let batch = chunk.iter().map(|(uid, label)| OrderEvent {
order_uid: ByteArray(uid.0),
timestamp,
Expand Down
62 changes: 62 additions & 0 deletions crates/autopilot/src/infra/persistence/dto/fee_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use crate::{boundary, domain};

/// Row of the `fee_policies` table: one fee policy applied to `order_uid`
/// within auction `auction_id`.
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
pub struct FeePolicy {
    pub auction_id: domain::AuctionId,
    pub order_uid: boundary::database::OrderUid,
    // Determines which of the optional factor columns below are populated.
    pub kind: FeePolicyKind,
    // Set when `kind` is `Surplus`: cut taken from the order's surplus.
    pub surplus_factor: Option<f64>,
    // Set when `kind` is `Surplus`: cap on the fee expressed as a volume factor.
    pub max_volume_factor: Option<f64>,
    // Set when `kind` is `Volume`: cut taken from the executed amount.
    pub volume_factor: Option<f64>,
}

impl FeePolicy {
    /// Converts a domain fee policy into its database row representation for
    /// the given auction and order.
    pub fn from_domain(
        auction_id: domain::AuctionId,
        order_uid: domain::OrderUid,
        policy: domain::fee::Policy,
    ) -> Self {
        // Map the policy variant onto the (kind, factor columns) tuple first,
        // then build the row once.
        let (kind, surplus_factor, max_volume_factor, volume_factor) = match policy {
            domain::fee::Policy::Surplus {
                factor,
                max_volume_factor,
            } => (
                FeePolicyKind::Surplus,
                Some(factor),
                Some(max_volume_factor),
                None,
            ),
            domain::fee::Policy::Volume { factor } => {
                (FeePolicyKind::Volume, None, None, Some(factor))
            }
        };
        Self {
            auction_id,
            order_uid: boundary::database::byte_array::ByteArray(order_uid.0),
            kind,
            surplus_factor,
            max_volume_factor,
            volume_factor,
        }
    }
}

impl From<FeePolicy> for domain::fee::Policy {
    /// Reconstructs the domain policy from a database row.
    ///
    /// Panics if a factor column required by `row.kind` is `NULL`, which
    /// would indicate an inconsistent row.
    fn from(row: FeePolicy) -> domain::fee::Policy {
        match row.kind {
            FeePolicyKind::Surplus => {
                let factor = row.surplus_factor.expect("missing surplus factor");
                let max_volume_factor = row.max_volume_factor.expect("missing max volume factor");
                domain::fee::Policy::Surplus {
                    factor,
                    max_volume_factor,
                }
            }
            FeePolicyKind::Volume => {
                let factor = row.volume_factor.expect("missing volume factor");
                domain::fee::Policy::Volume { factor }
            }
        }
    }
}

/// Discriminant for the fee policy variants, mapped onto the Postgres enum
/// type `PolicyKind` with lowercase variant names.
#[derive(Debug, Clone, PartialEq, sqlx::Type)]
#[sqlx(type_name = "PolicyKind", rename_all = "lowercase")]
pub enum FeePolicyKind {
    /// Fee taken as a cut of the order's surplus (stored as "surplus").
    Surplus,
    /// Fee taken as a cut of the executed volume (stored as "volume").
    Volume,
}
6 changes: 5 additions & 1 deletion crates/autopilot/src/infra/persistence/dto/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
pub mod auction;
pub mod fee_policy;
pub mod order;
pub mod quote;

pub use auction::{Auction, AuctionId, AuctionWithId};
pub use {
auction::{Auction, AuctionId, AuctionWithId},
fee_policy::FeePolicy,
};
4 changes: 3 additions & 1 deletion crates/autopilot/src/infra/persistence/dto/quote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use {
number::conversions::big_decimal_to_u256,
};

pub fn into_domain(quote: boundary::DatabaseQuote) -> Result<domain::Quote, AmountOverflow> {
pub fn into_domain(
quote: boundary::database::orders::Quote,
) -> Result<domain::Quote, AmountOverflow> {
Ok(domain::Quote {
order_uid: domain::OrderUid(quote.order_uid.0),
sell_amount: big_decimal_to_u256(&quote.sell_amount).ok_or(AmountOverflow)?,
Expand Down
27 changes: 27 additions & 0 deletions crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use {
crate::{boundary, database::Postgres, domain},
anyhow::Context,
chrono::Utc,
itertools::Itertools,
std::sync::Arc,
tokio::time::Instant,
tracing::Instrument,
Expand Down Expand Up @@ -96,6 +98,31 @@ impl Persistence {
.instrument(tracing::Span::current()),
);
}

/// Saves the given fee policies to the DB as a single batch.
pub async fn store_fee_policies(
    &self,
    auction_id: domain::AuctionId,
    fee_policies: Vec<(domain::OrderUid, Vec<domain::fee::Policy>)>,
) -> anyhow::Result<()> {
    // Flatten the per-order policy lists into one DB row per policy.
    let mut rows = Vec::new();
    for (order_uid, policies) in fee_policies {
        for policy in policies {
            rows.push(dto::FeePolicy::from_domain(auction_id, order_uid, policy));
        }
    }

    // Insert everything within one transaction, chunked to the configured
    // batch size so individual statements stay bounded.
    let mut ex = self.postgres.pool.begin().await.context("begin")?;
    for chunk in rows.chunks(self.postgres.config.insert_batch_size.get()) {
        crate::database::fee_policies::insert_batch(&mut ex, chunk.iter().cloned())
            .await
            .context("fee_policies::insert_batch")?;
    }

    ex.commit().await.context("commit")
}
}

#[derive(Debug, thiserror::Error)]
Expand Down
2 changes: 1 addition & 1 deletion crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub async fn start(args: impl Iterator<Item = String>) {
pub async fn run(args: Arguments) {
assert!(args.shadow.is_none(), "cannot run in shadow mode");

let db = Postgres::new(args.db_url.as_str(), args.order_events_insert_batch_size)
let db = Postgres::new(args.db_url.as_str(), args.insert_batch_size)
.await
.unwrap();
crate::database::run_database_metrics_work(db.clone());
Expand Down
Loading

0 comments on commit 398351d

Please sign in to comment.