Skip to content

Commit

Permalink
refactor: payment enum for auth token or wallet
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Jan 31, 2024
1 parent c1c86d4 commit e73989d
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 75 deletions.
4 changes: 2 additions & 2 deletions file-exchange/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ pub struct DownloaderArgs {
env = "MNEMONIC",
help = "Mnemonic for payment wallet"
)]
pub mnemonic: String,
pub mnemonic: Option<String>,
#[clap(
long,
value_name = "provider_url",
env = "PROVIDER",
help = "Blockchain provider endpoint"
)]
pub provider: String,
pub provider: Option<String>,
#[clap(
long,
value_name = "verifier",
Expand Down
129 changes: 65 additions & 64 deletions file-exchange/src/download_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{

use crate::util::build_wallet;

use self::signer::{ReceiptSigner, TapReceipt};
use self::signer::ReceiptSigner;

pub mod signer;

Expand All @@ -42,15 +42,19 @@ pub struct Downloader {
_gateway_url: Option<String>,
static_endpoints: Vec<String>,
output_dir: String,
free_query_auth_token: Option<String>,
indexer_urls: Arc<StdMutex<Vec<IndexerEndpoint>>>,
indexer_blocklist: Arc<StdMutex<HashSet<String>>>,
// key is the file manifest identifier (IPFS hash) and value is a HashSet of downloaded chunk indices
target_chunks: Arc<StdMutex<HashMap<String, HashSet<u64>>>>,
chunk_max_retry: u64,
bundle_finder: Finder,
#[allow(dead_code)]
receipt_signer: ReceiptSigner,
payment: PaymentMethod,
}

/// A downloader can either provide a free query auth token or receipt signer
pub enum PaymentMethod {
FreeQuery(String),
TAPSigner(ReceiptSigner),
}

impl Downloader {
Expand All @@ -63,25 +67,35 @@ impl Downloader {
.await
.expect("Read bundle");

let wallet = build_wallet(&args.mnemonic).expect("Mnemonic build wallet");
let signing_key = wallet.signer().to_bytes();
let secp256k1_private_key =
SecretKey::from_slice(&signing_key).expect("Private key from wallet");
let provider = Provider::<Http>::try_from(&args.provider).expect("Connect to the provider");
//TODO: migrate ethers type to alloy
let chain_id = U256::from(
provider
.get_chainid()
.await
.expect("Get chain id from provider")
.as_u128(),
);
let receipt_signer = ReceiptSigner::new(
secp256k1_private_key,
chain_id,
Address::from_str(&args.verifier).expect("Parse verifier"),
)
.await;
let payment = if let Some(mnemonic) = &args.mnemonic {
let wallet = build_wallet(mnemonic).expect("Mnemonic build wallet");
let signing_key = wallet.signer().to_bytes();
let secp256k1_private_key =
SecretKey::from_slice(&signing_key).expect("Private key from wallet");
let provider =
Provider::<Http>::try_from(&args.provider.expect("Provider required to connect"))
.expect("Connect to the provider");
//TODO: migrate ethers type to alloy
let chain_id = U256::from(
provider
.get_chainid()
.await
.expect("Get chain id from provider")
.as_u128(),
);
PaymentMethod::TAPSigner(
ReceiptSigner::new(
secp256k1_private_key,
chain_id,
Address::from_str(&args.verifier).expect("Parse verifier"),
)
.await,
)
} else if let Some(token) = &args.free_query_auth_token {
PaymentMethod::FreeQuery(token.clone())
} else {
panic!("No payment wallet nor free query token provided");
};

Downloader {
http_client: reqwest::Client::new(),
Expand All @@ -90,13 +104,12 @@ impl Downloader {
_gateway_url: args.gateway_url,
static_endpoints: args.indexer_endpoints,
output_dir: args.output_dir,
free_query_auth_token: args.free_query_auth_token,
indexer_urls: Arc::new(StdMutex::new(Vec::new())),
indexer_blocklist: Arc::new(StdMutex::new(HashSet::new())),
target_chunks: Arc::new(StdMutex::new(HashMap::new())),
chunk_max_retry: args.max_retry,
bundle_finder: Finder::new(ipfs_client),
receipt_signer,
payment,
}
}

Expand Down Expand Up @@ -194,22 +207,15 @@ impl Downloader {
let client = self.http_client.clone();
//TODO: can utilize operator address for on-chain checks
let request = self.download_range_request(&meta, i, file.clone())?;
//TODO: create specialized receipts with indexer allocation and cost models
let receipt = self
.receipt_signer
.create_receipt(
graphql::allocation_id(&request.receiver),
&discover::Finder::fees(),
)
.await;
let payment = self.payment_header(&request.receiver).await?;
let block_list = self.indexer_blocklist.clone();
let target_chunks: Arc<StdMutex<HashMap<String, HashSet<u64>>>> =
self.target_chunks.clone();
let url = request.query_endpoint.clone();
// Spawn a new asynchronous task for each range request
let handle: tokio::task::JoinHandle<Result<Arc<Mutex<File>>, Error>> =
tokio::spawn(async move {
match download_chunk_and_write_to_file(&client, request, receipt).await {
match download_chunk_and_write_to_file(&client, request, payment).await {
Ok(r) => {
// Update downloaded status
target_chunks
Expand Down Expand Up @@ -258,6 +264,22 @@ impl Downloader {
Ok(())
}

/// Make a header for chunk request authorization either free or paid
async fn payment_header(&self, receiver: &str) -> Result<(HeaderName, String), Error> {
match &self.payment {
PaymentMethod::FreeQuery(token) => Ok((AUTHORIZATION, token.to_string())),
PaymentMethod::TAPSigner(signer) => {
let receipt = signer
.create_receipt(graphql::allocation_id(receiver), &discover::Finder::fees())
.await?;
Ok((
HeaderName::from_str("Scalar-Receipt").unwrap(),
receipt.serialize(),
))
}
}
}

/// Generate a request to download a chunk
fn download_range_request(
&self,
Expand Down Expand Up @@ -301,7 +323,6 @@ impl Downloader {
chunk_hash,
file,
max_retry: self.chunk_max_retry,
auth_token: self.free_query_auth_token.clone(),
})
}

Expand Down Expand Up @@ -358,7 +379,6 @@ impl Downloader {
pub struct DownloadRangeRequest {
receiver: String,
query_endpoint: String,
auth_token: Option<String>,
file_hash: String,
start: u64,
end: u64,
Expand All @@ -371,7 +391,7 @@ pub struct DownloadRangeRequest {
async fn download_chunk_and_write_to_file(
http_client: &Client,
request: DownloadRangeRequest,
receipt: Option<TapReceipt>,
auth_header: (HeaderName, String),
) -> Result<Arc<Mutex<File>>, Error> {
let mut attempts = 0;

Expand All @@ -384,8 +404,7 @@ async fn download_chunk_and_write_to_file(
match request_chunk(
http_client,
&request.query_endpoint,
request.auth_token.clone(),
receipt.clone(),
auth_header.clone(),
&request.file_hash,
request.start,
request.end,
Expand Down Expand Up @@ -430,40 +449,22 @@ async fn download_chunk_and_write_to_file(
async fn request_chunk(
http_client: &Client,
query_endpoint: &str,
auth_token: Option<String>,
receipt: Option<TapReceipt>,
auth_header: (HeaderName, String),
file_hash: &str,
start: u64,
end: u64,
) -> Result<Bytes, Error> {
let range = format!("bytes={}-{}", start, end);
if auth_token.is_none() && receipt.is_none() {
let e = "No auth token provided and no receipt constructed".to_string();
tracing::error!(e);
return Err(Error::InvalidConfig(e));
};

tracing::debug!(query_endpoint, range, "Make range request");
let request = http_client
let response = http_client
.get(query_endpoint)
.header("file_hash", file_hash)
.header(CONTENT_RANGE, range);

let request = if let Some(auth) = auth_token {
request.header(AUTHORIZATION, auth)
} else {
request
};
let request = if let Some(receipt) = receipt {
request.header(
HeaderName::from_str("Scalar-Receipt").unwrap(),
&receipt.serialize(),
)
} else {
request
};

let response = request.send().await.map_err(Error::Request)?;
.header(CONTENT_RANGE, range)
.header(auth_header.0, auth_header.1)
.send()
.await
.map_err(Error::Request)?;

// Check if the server supports range requests
if response.status().is_success() && response.headers().contains_key(CONTENT_RANGE) {
Expand Down
15 changes: 9 additions & 6 deletions file-exchange/src/download_client/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use secp256k1::SecretKey;
use tap_core::eip_712_signed_message::EIP712SignedMessage;
use tap_core::tap_receipt::Receipt;

use crate::errors::Error;
use crate::util::GRT;

pub struct ReceiptSigner {
Expand Down Expand Up @@ -46,7 +47,11 @@ impl ReceiptSigner {
}
}

pub async fn create_receipt(&self, allocation_id: Address, fee: &GRT) -> Option<TapReceipt> {
pub async fn create_receipt(
&self,
allocation_id: Address,
fee: &GRT,
) -> Result<TapReceipt, Error> {
let nonce = rand::thread_rng().next_u64();
let timestamp_ns = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
Expand All @@ -60,11 +65,9 @@ impl ReceiptSigner {
nonce,
value: fee.0.as_u128().unwrap_or(0),
};
let wallet =
Wallet::from_bytes(self.signer.as_ref()).expect("failed to prepare receipt wallet");
let signed = EIP712SignedMessage::new(&self.domain, receipt, &wallet)
let wallet = Wallet::from_bytes(self.signer.as_ref()).map_err(Error::WalletError)?;
EIP712SignedMessage::new(&self.domain, receipt, &wallet)
.await
.expect("failed to sign receipt");
Some(signed)
.map_err(|e| Error::ContractError(e.to_string()))
}
}
8 changes: 5 additions & 3 deletions file-exchange/tests/file_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@ mod tests {
]
.to_vec(),
verifier: String::from("0xfC24cE7a4428A6B89B52645243662A02BA734ECF"),
mnemonic: String::from(
mnemonic: Some(String::from(
"sheriff obscure trick beauty army fat wink legal flee leader section suit",
),
)),
free_query_auth_token: Some("Bearer free-token".to_string()),
provider: String::from("https://arbitrum-sepolia.infura.io/v3/aaaaaaaaaaaaaaaaaaaa"),
provider: Some(String::from(
"https://arbitrum-sepolia.infura.io/v3/aaaaaaaaaaaaaaaaaaaa",
)),
..Default::default()
};

Expand Down

0 comments on commit e73989d

Please sign in to comment.