Skip to content

Commit

Permalink
Get JIT orders by owner (#2955)
Browse files Browse the repository at this point in the history
# Description
Similar to #2941, implements
[https://api.cow.fi/mainnet/api/v1/account/{owner}/orders](https://api.cow.fi/docs/#/default/get_api_v1_account__owner__orders)

# Changes
<!-- List of detailed changes (how the change is accomplished) -->

- [ ] Rework `JIT_ORDERS_SELECT` to both fetch AND convert data
received from the `jit_orders` table into `orders::FullOrder`. This is needed
so that a UNION can be used in `pub fn user_orders`, since that function
does offsetting, limiting and ordering as part of the SQL query.
- [ ] Add the UNION mentioned above so that JIT orders are also returned.

## How to test
Expanded the existing e2e test.
  • Loading branch information
sunce86 authored Sep 8, 2024
1 parent 8f61a82 commit 067f45f
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 332 deletions.
129 changes: 37 additions & 92 deletions crates/database/src/jit_orders.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use {
crate::{
byte_array::ByteArray,
orders::{self, BuyTokenDestination, OrderKind, SellTokenSource, SigningScheme},
Address,
AppId,
Expand All @@ -17,32 +16,39 @@ use {
},
};

/// Column list for querying JIT orders. The columns are shaped so that result
/// rows deserialize directly into `orders::FullOrder`, which lets this SELECT
/// be UNIONed with the equivalent SELECT in the `orders` module: fields that
/// do not exist for JIT orders are filled in with constants (class is always
/// 'liquidity', there are no interactions, no presignature/invalidation state,
/// and the settlement contract address is hard-coded).
//
// NOTE: this span previously contained interleaved pre- and post-change lines
// of the same constant (old `JIT_ORDERS_SELECT` plus the new `SELECT`); the
// duplicates are removed and only the new definition is kept.
pub const SELECT: &str = r#"
o.uid, o.owner, o.creation_timestamp, o.sell_token, o.buy_token, o.sell_amount, o.buy_amount,
o.valid_to, o.app_data, o.fee_amount, o.fee_amount AS full_fee_amount, o.kind, o.partially_fillable, o.signature,
o.receiver, o.signing_scheme, '\x9008d19f58aabd9ed0d60971565aa8510560ab41'::bytea AS settlement_contract, o.sell_token_balance, o.buy_token_balance,
'liquidity'::OrderClass AS class,
(SELECT COALESCE(SUM(t.buy_amount), 0) FROM trades t WHERE t.order_uid = o.uid) AS sum_buy,
(SELECT COALESCE(SUM(t.sell_amount), 0) FROM trades t WHERE t.order_uid = o.uid) AS sum_sell,
(SELECT COALESCE(SUM(t.fee_amount), 0) FROM trades t WHERE t.order_uid = o.uid) AS sum_fee,
FALSE AS invalidated,
FALSE AS presignature_pending,
ARRAY[]::record[] AS pre_interactions,
ARRAY[]::record[] AS post_interactions,
NULL AS ethflow_data,
NULL AS onchain_user,
NULL AS onchain_placement_error,
COALESCE((SELECT SUM(surplus_fee) FROM order_execution oe WHERE oe.order_uid = o.uid), 0) as executed_surplus_fee,
NULL AS full_app_data
"#;

/// Table (aliased as `o`) that the columns in `SELECT` refer to.
pub const FROM: &str = "jit_orders o";

/// Fetches a single JIT order by its uid, converted to `orders::FullOrder`.
///
/// Returns `Ok(None)` when no JIT order with that uid exists.
//
// NOTE: this span previously contained interleaved pre- and post-change lines
// (both the `JIT_ORDERS_SELECT`-based query and the new `SELECT`/`FROM` one,
// plus two variants of the `sqlx::query_as` call); only the new version is
// kept, which relies on `SELECT` aliasing its columns to `FullOrder` fields
// so no manual conversion step is needed.
pub async fn get_by_id(
    ex: &mut PgConnection,
    uid: &OrderUid,
) -> Result<Option<orders::FullOrder>, sqlx::Error> {
    #[rustfmt::skip]
    const QUERY: &str = const_format::concatcp!(
        "SELECT ",
        SELECT,
        " FROM ", FROM,
        " WHERE o.uid = $1 ",
    );
    sqlx::query_as(QUERY).bind(uid).fetch_optional(ex).await
}

pub async fn get_by_tx(
Expand All @@ -52,9 +58,10 @@ pub async fn get_by_tx(
const QUERY: &str = const_format::concatcp!(
orders::SETTLEMENT_LOG_INDICES,
"SELECT ",
JIT_ORDERS_SELECT,
" FROM jit_orders o
JOIN trades t ON t.order_uid = o.uid",
SELECT,
" FROM ",
FROM,
" JOIN trades t ON t.order_uid = o.uid",
" WHERE
t.block_number = (SELECT block_number FROM settlement) AND
-- BETWEEN is inclusive
Expand All @@ -65,11 +72,7 @@ pub async fn get_by_tx(
WHERE ord.uid = o.uid)
",
);
sqlx::query_as::<_, JitOrderWithExecutions>(QUERY)
.bind(tx_hash)
.fetch_all(ex)
.await
.map(|r| r.into_iter().map(Into::into).collect())
sqlx::query_as(QUERY).bind(tx_hash).fetch_all(ex).await
}

/// 1:1 mapping to the `jit_orders` table, used to store orders.
Expand Down Expand Up @@ -161,78 +164,6 @@ pub async fn insert(ex: &mut PgConnection, jit_orders: &[JitOrder]) -> Result<()
Ok(())
}

/// Jit order combined with trades table and order_execution table, suitable for
/// API responses.
// NOTE(review): the surrounding diff suggests this type (and its `From` impl)
// is being removed in this commit in favor of deserializing `FullOrder` rows
// directly from SQL — confirm it is still referenced before keeping it.
#[derive(Debug, Clone, Default, PartialEq, sqlx::FromRow)]
struct JitOrderWithExecutions {
    pub uid: OrderUid,
    pub owner: Address,
    pub creation_timestamp: DateTime<Utc>,
    pub sell_token: Address,
    pub buy_token: Address,
    pub sell_amount: BigDecimal,
    pub buy_amount: BigDecimal,
    pub valid_to: i64,
    pub app_data: AppId,
    pub fee_amount: BigDecimal,
    pub kind: OrderKind,
    pub partially_fillable: bool,
    pub signature: Vec<u8>,
    // Aggregates over all trades of this order (SUM subqueries in the SELECT).
    pub sum_sell: BigDecimal,
    pub sum_buy: BigDecimal,
    pub sum_fee: BigDecimal,
    pub receiver: Address,
    pub signing_scheme: SigningScheme,
    pub sell_token_balance: SellTokenSource,
    pub buy_token_balance: BuyTokenDestination,
    // Total surplus fee summed from the order_execution table.
    pub executed_surplus_fee: BigDecimal,
}

/// Converts a queried JIT order row into the generic `orders::FullOrder`
/// representation, filling in the fields that JIT orders do not track.
impl From<JitOrderWithExecutions> for orders::FullOrder {
    fn from(jit_order: JitOrderWithExecutions) -> Self {
        orders::FullOrder {
            uid: jit_order.uid,
            owner: jit_order.owner,
            creation_timestamp: jit_order.creation_timestamp,
            sell_token: jit_order.sell_token,
            buy_token: jit_order.buy_token,
            sell_amount: jit_order.sell_amount,
            buy_amount: jit_order.buy_amount,
            valid_to: jit_order.valid_to,
            app_data: jit_order.app_data,
            fee_amount: jit_order.fee_amount.clone(),
            // JIT orders track no separate full fee; reuse fee_amount.
            full_fee_amount: jit_order.fee_amount,
            kind: jit_order.kind,
            // JIT orders are always classified as liquidity orders.
            class: orders::OrderClass::Liquidity,
            partially_fillable: jit_order.partially_fillable,
            signature: jit_order.signature,
            sum_sell: jit_order.sum_sell,
            sum_buy: jit_order.sum_buy,
            sum_fee: jit_order.sum_fee,
            invalidated: false,
            receiver: Some(jit_order.receiver),
            signing_scheme: jit_order.signing_scheme,
            // Hard-coded settlement contract address; matches the constant
            // embedded in the SQL SELECT above.
            settlement_contract: ByteArray(
                hex::decode("9008d19f58aabd9ed0d60971565aa8510560ab41")
                    .unwrap()
                    .as_slice()
                    .try_into()
                    .unwrap(),
            ),
            sell_token_balance: jit_order.sell_token_balance,
            buy_token_balance: jit_order.buy_token_balance,
            // JIT orders have no presignature/onchain-placement lifecycle.
            presignature_pending: false,
            pre_interactions: Vec::new(),
            post_interactions: Vec::new(),
            ethflow_data: None,
            onchain_user: None,
            onchain_placement_error: None,
            executed_surplus_fee: jit_order.executed_surplus_fee,
            full_app_data: None,
        }
    }
}

#[cfg(test)]
mod tests {
pub async fn read_order(
Expand Down Expand Up @@ -283,4 +214,18 @@ mod tests {
let read_jit_order = read_order(&mut db, &ByteArray([1u8; 56])).await.unwrap();
assert!(read_jit_order.is_none());
}

    // Round-trip smoke test: inserting a JIT order and reading it back via
    // `get_by_id` must succeed, which exercises the shared `SELECT` column
    // list against the real schema. Requires a running Postgres instance,
    // hence #[ignore].
    #[tokio::test]
    #[ignore]
    async fn postgres_get_by_id() {
        let mut db = PgConnection::connect("postgresql://").await.unwrap();
        // All work happens inside a transaction that is never committed, so
        // the database is left unchanged.
        let mut db = db.begin().await.unwrap();
        crate::clear_DANGER_(&mut db).await.unwrap();

        let jit_order = JitOrder::default();

        // insert a jit order and make sure "SELECT" query works properly
        insert(&mut db, &[jit_order.clone()]).await.unwrap();
        get_by_id(&mut db, &jit_order.uid).await.unwrap().unwrap();
    }
}
1 change: 1 addition & 0 deletions crates/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod onchain_broadcasted_orders;
pub mod onchain_invalidations;
pub mod order_events;
pub mod order_execution;
pub mod order_history;
pub mod orders;
pub mod quotes;
pub mod settlement_call_data;
Expand Down
177 changes: 177 additions & 0 deletions crates/database/src/order_history.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
use {
crate::{jit_orders, orders, Address},
futures::stream::BoxStream,
sqlx::PgConnection,
};

/// Streams all orders related to `owner`, newest first, paginated with
/// `offset`/`limit` over the combined result set. Three sources are UNIONed:
/// regular orders owned by `owner`, orders placed onchain with `owner` as
/// sender, and JIT orders owned by `owner`.
/// A `limit` of `None` binds SQL NULL, and `LIMIT NULL` means "no limit" in
/// Postgres, so all rows from `offset` onward are returned.
pub fn user_orders<'a>(
    ex: &'a mut PgConnection,
    owner: &'a Address,
    offset: i64,
    limit: Option<i64>,
) -> BoxStream<'a, Result<orders::FullOrder, sqlx::Error>> {
    // As a future consideration for this query we could move from offset to an
    // approach called keyset pagination where the offset is identified by "key"
    // of the previous query. In our case that would be the lowest
    // creation_timestamp. This way the database can start immediately at the
    // offset through the index without enumerating the first N elements
    // before as is the case with OFFSET.
    // On the other hand that approach is less flexible so we will consider if we
    // see that these queries are taking too long in practice.
    #[rustfmt::skip]
    const QUERY: &str = const_format::concatcp!(
        // Branch 1: orders created by `owner`. Each branch is pre-limited to
        // `limit + offset` rows ($2 + $3) so no branch ever has to produce
        // more rows than the outer ORDER BY/LIMIT/OFFSET can consume.
        "(SELECT ", orders::SELECT,
        " FROM ", orders::FROM,
        " LEFT OUTER JOIN onchain_placed_orders onchain_o on onchain_o.uid = o.uid",
        " WHERE o.owner = $1",
        " ORDER BY creation_timestamp DESC LIMIT $2 + $3 ) ",
        // UNION (not UNION ALL) deduplicates, so an order matched by several
        // branches (e.g. owner == onchain sender) appears only once.
        " UNION ",
        // Branch 2: orders placed onchain on behalf of `owner` (sender).
        " (SELECT ", orders::SELECT,
        " FROM ", orders::FROM,
        " LEFT OUTER JOIN onchain_placed_orders onchain_o on onchain_o.uid = o.uid",
        " WHERE onchain_o.sender = $1 ",
        " ORDER BY creation_timestamp DESC LIMIT $2 + $3 ) ",
        " UNION ",
        // Branch 3: JIT orders owned by `owner`; uids that also exist in the
        // regular orders table are skipped to avoid duplicates (those rows are
        // already covered by branch 1 or 2).
        " (SELECT ", jit_orders::SELECT,
        " FROM ", jit_orders::FROM,
        " WHERE o.owner = $1 AND NOT EXISTS (SELECT 1 FROM orders ord WHERE o.uid = ord.uid)",
        " ORDER BY creation_timestamp DESC LIMIT $2 + $3 ) ",
        // Final ordering and pagination across the merged branches.
        " ORDER BY creation_timestamp DESC ",
        " LIMIT $2 ",
        " OFFSET $3 ",
    );
    sqlx::query_as(QUERY)
        .bind(owner)  // $1
        .bind(limit)  // $2
        .bind(offset) // $3
        .fetch(ex)
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::{
            byte_array::ByteArray,
            events::EventIndex,
            onchain_broadcasted_orders::{insert_onchain_order, OnchainOrderPlacement},
        },
        chrono::{DateTime, Utc},
        futures::StreamExt,
        sqlx::Connection,
    };

    /// The subset of `FullOrder` fields these tests compare.
    type Data = ([u8; 56], Address, DateTime<Utc>);

    /// Drains the `user_orders` stream into a `Vec` of comparable tuples,
    /// panicking on the first query error.
    async fn user_orders(
        ex: &mut PgConnection,
        owner: &Address,
        offset: i64,
        limit: Option<i64>,
    ) -> Vec<Data> {
        super::user_orders(ex, owner, offset, limit)
            .map(|o| {
                let o = o.unwrap();
                (o.uid.0, o.owner, o.creation_timestamp)
            })
            .collect::<Vec<_>>()
            .await
    }

    /// Deterministic 20-byte owner address derived from `i`
    /// (native-endian bytes of `i` followed by zero padding).
    fn owner_bytes(i: u32) -> [u8; 20] {
        let mut bytes = i.to_ne_bytes().to_vec();
        bytes.append(&mut vec![0; 20 - bytes.len()]);
        bytes.try_into().unwrap()
    }

    /// Deterministic 56-byte order uid derived from `(i, j)`:
    /// bytes of `i`, zero padding, then bytes of `j`.
    fn uid_bytes(i: u32, j: u32) -> [u8; 56] {
        let mut bytes = i.to_ne_bytes().to_vec();
        let mut j_as_bytes = j.to_ne_bytes().to_vec();
        let mut padding = vec![0; 56 - bytes.len() - j_as_bytes.len()];
        padding.append(&mut j_as_bytes);
        bytes.append(&mut padding);
        bytes.try_into().unwrap()
    }

    #[tokio::test]
    #[ignore]
    async fn postgres_user_orders_performance_many_users_with_some_orders() {
        // Can be used as a performance test: with the loop bounds increased to
        // i = 100 and j = 1000 the query should still finish within ~10 ms.
        let mut db = PgConnection::connect("postgresql://").await.unwrap();
        let mut db = db.begin().await.unwrap();
        crate::clear_DANGER_(&mut db).await.unwrap();

        for i in 0..1u32 {
            let owner = ByteArray(owner_bytes(i));
            for j in 0..10u32 {
                let uid = ByteArray(uid_bytes(i, j));
                let order = orders::Order {
                    owner,
                    uid,
                    creation_timestamp: Utc::now(),
                    ..Default::default()
                };
                orders::insert_order(&mut db, &order).await.unwrap();
                // Give every tenth order an onchain placement so the
                // `onchain_o.sender` branch of the query is exercised too.
                if j % 10 == 0 {
                    let onchain_order = OnchainOrderPlacement {
                        order_uid: uid,
                        sender: owner,
                        placement_error: None,
                    };
                    let event_index = EventIndex::default();
                    insert_onchain_order(&mut db, &event_index, &onchain_order)
                        .await
                        .unwrap();
                }
            }
        }

        let now = std::time::Instant::now();
        let number_of_query_executions = 100;
        for _ in 0..number_of_query_executions {
            let _result = user_orders(&mut db, &ByteArray([2u8; 20]), 10, Some(10)).await;
        }
        let elapsed = now.elapsed();
        println!(
            "Time per execution {:?}",
            elapsed / number_of_query_executions
        );
        assert!(elapsed / number_of_query_executions < std::time::Duration::from_secs(1));
    }

    #[tokio::test]
    #[ignore]
    async fn postgres_user_orders_performance_user_with_many_orders() {
        // Can be used as a performance test close to the prod environment:
        // with the inner loop bound increased to j = 100_000 the query should
        // still finish below ~200 ms.
        let mut db = PgConnection::connect("postgresql://").await.unwrap();
        let mut db = db.begin().await.unwrap();
        crate::clear_DANGER_(&mut db).await.unwrap();

        for i in 0..1u32 {
            let owner = ByteArray(owner_bytes(i));
            for j in 0..10u32 {
                let order = orders::Order {
                    owner,
                    uid: ByteArray(uid_bytes(i, j)),
                    creation_timestamp: Utc::now(),
                    ..Default::default()
                };
                orders::insert_order(&mut db, &order).await.unwrap();
            }
        }

        let now = std::time::Instant::now();
        let number_of_query_executions = 100;
        for _ in 0..number_of_query_executions {
            let _result = user_orders(&mut db, &ByteArray([0u8; 20]), 10, Some(10)).await;
        }
        let elapsed = now.elapsed();
        println!(
            "Time per execution {:?}",
            elapsed / number_of_query_executions
        );
        assert!(elapsed / number_of_query_executions < std::time::Duration::from_secs(1));
    }
}
Loading

0 comments on commit 067f45f

Please sign in to comment.