Skip to content

Commit

Permalink
feat: allow a streamed proposal channel on top of existing one
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Nov 21, 2024
1 parent 2ff3249 commit cf16406
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 20 deletions.
3 changes: 2 additions & 1 deletion crates/papyrus_protobuf/src/converters/test_instances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use papyrus_test_utils::{auto_impl_get_test_instance, get_number_of_variants, Ge
use rand::Rng;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
use starknet_api::transaction::{Transaction, TransactionHash};

use crate::consensus::{
ConsensusMessage,
Expand Down Expand Up @@ -52,6 +52,7 @@ auto_impl_get_test_instance! {
}
pub struct TransactionBatch {
pub transactions: Vec<Transaction>,
pub tx_hashes: Vec<TransactionHash>,
}
pub enum ProposalPart {
Init(ProposalInit) = 0,
Expand Down
3 changes: 3 additions & 0 deletions crates/papyrus_protobuf/src/proto/p2p/proto/consensus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";
import "p2p/proto/transaction.proto";
import "p2p/proto/common.proto";

// To be deprecated
message Proposal {
uint64 height = 1;
uint32 round = 2;
Expand Down Expand Up @@ -54,6 +55,8 @@ message ProposalInit {

message TransactionBatch {
repeated Transaction transactions = 1;
// TODO(guyn): remove this once we know how to calculate hashes
repeated Felt252 tx_hashes = 2;
}

message ProposalFin {
Expand Down
41 changes: 39 additions & 2 deletions crates/sequencing/papyrus_consensus/src/stream_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ type StreamKey = (PeerId, StreamId);
const CHANNEL_BUFFER_LENGTH: usize = 100;

#[derive(Debug, Clone)]
struct StreamData<T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> {
struct StreamData<
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + 'static,
> {
next_message_id: MessageId,
// Last message ID. If None, it means we have not yet gotten to it.
fin_message_id: Option<MessageId>,
Expand All @@ -56,7 +58,7 @@ impl<T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError
/// - Buffering inbound messages and reporting them to the application in order.
/// - Sending outbound messages to the network, wrapped in StreamMessage.
pub struct StreamHandler<
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + 'static,
> {
// For each stream ID from the network, send the application a Receiver
// that will receive the messages in order. This allows sending such Receivers.
Expand Down Expand Up @@ -100,6 +102,41 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi
}
}

/// Create a new StreamHandler and start it running in a new task.
/// Gets network input/output channels and returns application input/output channels.
pub fn get_channels(
inbound_network_receiver: BroadcastTopicServer<StreamMessage<T>>,
outbound_network_sender: BroadcastTopicClient<StreamMessage<T>>,
) -> (mpsc::Sender<(StreamId, mpsc::Receiver<T>)>, mpsc::Receiver<mpsc::Receiver<T>>) {
// The inbound messages come into StreamHandler via inbound_network_receiver,
// and are forwarded to the consensus via inbound_internal_receiver
// (the StreamHandler keeps the inbound_internal_sender to pass messsage).
let (inbound_internal_sender, inbound_internal_receiver): (
mpsc::Sender<mpsc::Receiver<T>>,
mpsc::Receiver<mpsc::Receiver<T>>,
) = mpsc::channel(CHANNEL_BUFFER_LENGTH);
// The outbound messages that an application would like to send are:
// 1. Sent into outbound_internal_sender as tuples of (StreamId, Receiver)
// 2. Ingested by StreamHandler by its outbound_internal_receiver.
// 3. Broadcast by the StreamHandler using its outbound_network_sender.
let (outbound_internal_sender, outbound_internal_receiver): (
mpsc::Sender<(StreamId, mpsc::Receiver<T>)>,
mpsc::Receiver<(StreamId, mpsc::Receiver<T>)>,
) = mpsc::channel(CHANNEL_BUFFER_LENGTH);

let mut stream_handler = StreamHandler::<T>::new(
inbound_internal_sender, // Sender<Receiver<T>>,
inbound_network_receiver, // BroadcastTopicServer<StreamMessage<T>>,
outbound_internal_receiver, // Receiver<(StreamId, Receiver<T>)>,
outbound_network_sender, // BroadcastTopicClient<StreamMessage<T>>
);
tokio::spawn(async move {
stream_handler.run().await;
});

(outbound_internal_sender, inbound_internal_receiver)
}

/// Listen for messages coming from the network and from the application.
/// - Outbound messages are wrapped as StreamMessage and sent to the network directly.
/// - Inbound messages are stripped of StreamMessage and buffered until they can be sent in the
Expand Down
18 changes: 17 additions & 1 deletion crates/starknet_api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ use std::fs::read_to_string;
use std::path::{Path, PathBuf};

use infra_utils::path::cargo_manifest_dir;
use serde::{Deserialize, Serialize};
use starknet_types_core::felt::Felt;

use crate::core::{ContractAddress, Nonce};
use crate::block::BlockNumber;
use crate::core::{ChainId, ContractAddress, Nonce};
use crate::transaction::{Transaction, TransactionHash};

pub mod declare;
pub mod deploy_account;
Expand All @@ -27,6 +30,19 @@ pub fn read_json_file<P: AsRef<Path>>(path_in_resource_dir: P) -> serde_json::Va
serde_json::from_str(&json_str).unwrap()
}

#[derive(Deserialize, Serialize, Debug)]
/// A struct used for reading the transaction test data (e.g., for transaction hash tests).
pub struct TransactionTestData {
/// The actual transaction.
pub transaction: Transaction,
/// The expected transaction hash.
pub transaction_hash: TransactionHash,
/// An optional transaction hash to query.
pub only_query_transaction_hash: Option<TransactionHash>,
pub chain_id: ChainId,
pub block_number: BlockNumber,
}

#[derive(Debug, Default)]
pub struct NonceManager {
next_nonce: HashMap<ContractAddress, Felt>,
Expand Down
39 changes: 39 additions & 0 deletions crates/starknet_api/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,31 @@ impl From<crate::executable_transaction::Transaction> for Transaction {
}
}

impl From<(Transaction, TransactionHash)> for crate::executable_transaction::Transaction {
fn from(tup: (Transaction, TransactionHash)) -> Self {
let (tx, tx_hash) = tup;
match tx {
Transaction::Declare(_tx) => {
unimplemented!("Declare transactions are not supported yet.")
}
Transaction::Deploy(_tx) => {
unimplemented!("Deploy transactions are not supported yet.")
}
Transaction::DeployAccount(_tx) => {
unimplemented!("DeployAccount transactions are not supported yet.")
}
Transaction::Invoke(tx) => crate::executable_transaction::Transaction::Account(
crate::executable_transaction::AccountTransaction::Invoke(
crate::executable_transaction::InvokeTransaction { tx, tx_hash },
),
),
Transaction::L1Handler(_) => {
unimplemented!("L1Handler transactions are not supported yet.")
}
}
}
}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
pub struct TransactionOptions {
/// Transaction that shouldn't be broadcasted to StarkNet. For example, users that want to
Expand Down Expand Up @@ -775,6 +800,20 @@ impl std::fmt::Display for TransactionHash {
}
}

// TODO(guyn): this is only used for conversion of transactions->executable transactions
// It should be removed once we integrate a proper way to calculate executable transaction hashes
impl From<TransactionHash> for Vec<u8> {
fn from(tx_hash: TransactionHash) -> Vec<u8> {
tx_hash.0.to_bytes_be().to_vec()
}
}
impl From<Vec<u8>> for TransactionHash {
fn from(bytes: Vec<u8>) -> TransactionHash {
let array: [u8; 32] = bytes.try_into().expect("Expected a Vec of length 32");
TransactionHash(StarkHash::from_bytes_be(&array))
}
}

/// A transaction version.
#[derive(
Debug,
Expand Down
20 changes: 4 additions & 16 deletions crates/starknet_api/src/transaction_hash_test.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use pretty_assertions::assert_eq;
use serde::{Deserialize, Serialize};
use sha3::{Digest, Keccak256};
use starknet_types_core::felt::Felt;

use super::{get_transaction_hash, validate_transaction_hash, CONSTRUCTOR_ENTRY_POINT_SELECTOR};
use crate::block::BlockNumber;
use crate::core::ChainId;
use crate::test_utils::read_json_file;
use crate::transaction::{Transaction, TransactionHash, TransactionOptions};
use crate::test_utils::{read_json_file, TransactionTestData};
use crate::transaction::{Transaction, TransactionOptions};

#[test]
fn test_constructor_selector() {
Expand All @@ -19,18 +16,9 @@ fn test_constructor_selector() {
assert_eq!(constructor_felt, CONSTRUCTOR_ENTRY_POINT_SELECTOR);
}

#[derive(Deserialize, Serialize)]
struct TransactionTestData {
transaction: Transaction,
transaction_hash: TransactionHash,
only_query_transaction_hash: Option<TransactionHash>,
chain_id: ChainId,
block_number: BlockNumber,
}

#[test]
fn test_transaction_hash() {
// The details were taken from Starknet Mainnet. You can found the transactions by hash in:
// The details were taken from Starknet Mainnet. You can find the transactions by hash in:
// https://alpha-mainnet.starknet.io/feeder_gateway/get_transaction?transactionHash=<transaction_hash>
let transactions_test_data_vec: Vec<TransactionTestData> =
serde_json::from_value(read_json_file("transaction_hash.json")).unwrap();
Expand Down Expand Up @@ -64,7 +52,7 @@ fn test_transaction_hash() {

#[test]
fn test_deprecated_transaction_hash() {
// The details were taken from Starknet Mainnet. You can found the transactions by hash in:
// The details were taken from Starknet Mainnet. You can find the transactions by hash in:
// https://alpha-mainnet.starknet.io/feeder_gateway/get_transaction?transactionHash=<transaction_hash>
let transaction_test_data_vec: Vec<TransactionTestData> =
serde_json::from_value(read_json_file("deprecated_transaction_hash.json")).unwrap();
Expand Down
28 changes: 28 additions & 0 deletions crates/starknet_api/src/transaction_test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use super::Transaction;
use crate::block::NonzeroGasPrice;
use crate::executable_transaction::Transaction as ExecutableTransaction;
use crate::execution_resources::GasAmount;
use crate::test_utils::{read_json_file, TransactionTestData};
use crate::transaction::Fee;

#[test]
Expand All @@ -25,3 +28,28 @@ fn test_fee_div_ceil() {
Fee(28).checked_div_ceil(NonzeroGasPrice::try_from(3_u8).unwrap()).unwrap()
);
}

#[test]
fn convert_executable_transaction_and_back() {
// The details were taken from Starknet Mainnet. You can find the transactions by hash in:
// https://alpha-mainnet.starknet.io/feeder_gateway/get_transaction?transactionHash=<transaction_hash>
let mut transactions_test_data_vec: Vec<TransactionTestData> =
serde_json::from_value(read_json_file("transaction_hash.json")).unwrap();

let (tx, tx_hash) = loop {
match transactions_test_data_vec.pop() {
Some(data) => {
if let Transaction::Invoke(tx) = data.transaction {
// Do something with the data
break (Transaction::Invoke(tx), data.transaction_hash);
}
}
None => {
panic!("Could not find a single Invoke transaction in the test data");
}
}
};
let executable_tx: ExecutableTransaction = (tx.clone(), tx_hash.clone()).into();
let tx_back = Transaction::from(executable_tx);
assert_eq!(tx, tx_back);
}

0 comments on commit cf16406

Please sign in to comment.