Skip to content

Commit

Permalink
feat: gas market
Browse files Browse the repository at this point in the history
  • Loading branch information
mx819812523 committed Sep 12, 2024
1 parent d34e08a commit f67586c
Show file tree
Hide file tree
Showing 12 changed files with 1,069 additions and 76 deletions.
300 changes: 225 additions & 75 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ members = [
"crates/rooch-open-rpc-macros",
"crates/rooch-open-rpc-spec",
"crates/rooch-open-rpc-spec-builder",
"crates/rooch-oracle",
"crates/rooch-pipeline-processor",
"crates/rooch-proposer",
"crates/rooch-relayer",
Expand All @@ -62,7 +63,8 @@ default-members = [
"moveos/moveos",
"frameworks/framework-release",
"crates/rooch",
"crates/rooch-faucet"
"crates/rooch-faucet",
"crates/rooch-oracle"
]

# All workspace members should inherit these keys
Expand Down Expand Up @@ -169,6 +171,7 @@ ethers = { version = "2.0.7", features = ["legacy"] }
eyre = "0.6.8"
fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "56f6223b84ada922b6cb2c672c69db2ea3dc6a13" }
futures = "0.3.28"
futures-util = "0.3.30"
hex = "0.4.3"
rustc-hex = "2.1"
itertools = "0.13.0"
Expand Down Expand Up @@ -215,10 +218,12 @@ tiny-keccak = { version = "2", features = ["keccak", "sha3"] }
tiny-bip39 = "1.0.0"
tokio = { version = "1.40.0", features = ["full"] }
tokio-util = "0.7.12"
tokio-tungstenite = { version = "0.23.1", features = ["native-tls"] }
tonic = { version = "0.8", features = ["gzip"] }
tracing = "0.1.37"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15" }
tungstenite = "0.24.0"

codespan-reporting = "0.11.1"
codespan = "0.11.1"
Expand Down
23 changes: 23 additions & 0 deletions crates/rooch-oracle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "rooch-oracle"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { workspace = true }
tokio-tungstenite = { workspace = true }
tungstenite = { workspace = true }
serde_json = { workspace = true }
futures-util = { workspace = true }
tracing-subscriber = { workspace = true }
log = { workspace = true }
clap = { workspace = true }
reqwest = { workspace = true }
anyhow = { workspace = true }
bcs = { workspace = true }

rooch-rpc-api = { workspace = true }
move-core-types = { workspace = true }
moveos-types = { workspace = true }
rooch-types = { workspace = true }
rooch-rpc-client = { workspace = true }
108 changes: 108 additions & 0 deletions crates/rooch-oracle/src/binance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
use clap::Parser;
use log::info;
use move_core_types::account_address::AccountAddress;
use move_core_types::identifier::Identifier;
use move_core_types::language_storage::ModuleId;
use moveos_types::move_types::FunctionId;
use moveos_types::transaction::MoveAction;
use rooch_rpc_client::wallet_context::WalletContext;
use serde_json::Value;
use tokio::time::Instant;
use crate::data_process::{execute_transaction, parse_and_convert, subscribe_websocket, State};

#[derive(Parser, Debug, Clone)]
pub struct BinanceConfig {
#[arg(long, default_value = "wss://stream.binance.com:9443/ws/btcusdt@ticker")]
pub binance_url: String,

#[arg(long, env = "ROOCH_BINANCE_WALLET_DIR", default_value = "~/.rooch/rooch_config")]
pub binance_wallet_dir: Option<PathBuf>,

#[arg(long, env = "ROOCH_BINANCE_WALLET_PWD")]
pub binance_wallet_pwd: Option<String>,

#[arg(long, default_value = "10")]
pub binance_submit_interval: u64,

#[arg(long, env = "ROOCH_BINANCE_ORACLE_ID")]
pub binance_oracle_id: String,

#[arg(long, env = "ROOCH_BINANCE_ADMIN_ID")]
pub binance_admin_id: String,
}

pub struct Binance {
pub wallet_state: Arc<RwLock<State>>,
binance_config: BinanceConfig
}

impl Binance {
pub async fn new(
config: BinanceConfig,
) -> Self {
let wallet = WalletContext::new(config.binance_wallet_dir.clone()).unwrap();
let wallet_pwd = config.binance_wallet_pwd.clone();
Self {
wallet_state: Arc::new(RwLock::new(State {
wallet_pwd,
context: wallet,
})),
binance_config: config,
}
}

pub async fn subscribe(
&self,
package_id: &str,
) {
let (tx, mut rx) = mpsc::channel(1);
let url = self.binance_config.binance_url.clone();
let handle = tokio::spawn(async move {
subscribe_websocket(url, tx, None).await;
});
let function_id = FunctionId::new(
ModuleId::new(AccountAddress::from_hex_literal(package_id).unwrap(), Identifier::new("trusted_oracle").unwrap()),
Identifier::new("submit_data").unwrap(),
);
let address_mapping = self.wallet_state.read().await.context.address_mapping.clone();
let oracle_obj = parse_and_convert(format!("object_id:{}", self.binance_config.binance_oracle_id).as_str(), &address_mapping);
let ticker = parse_and_convert("string:BTCUSD", &address_mapping);
let identifier = parse_and_convert("string:Binance", &address_mapping);
let admin_obj = parse_and_convert(format!("object_id:{}", self.binance_config.binance_admin_id).as_str(), &address_mapping);
let mut last_execution = Instant::now() - Duration::from_secs(10); // 初始化为10秒前
while let Some(msg) = rx.recv().await {
let wallet_state = self.wallet_state.write().await;

let msg_value = serde_json::from_str::<Value>(&msg).unwrap();
if msg_value["c"].as_str().is_none() || Instant::now().duration_since(last_execution) < Duration::from_secs(self.binance_config.binance_submit_interval) {
continue;
}
last_execution = Instant::now();
let price = format!("u256:{}", msg_value["c"].as_str().unwrap().parse::<f64>().unwrap() * 10f64.powi(8));
let decimal = "8u8".to_string();
let args = vec![
oracle_obj.clone(),
ticker.clone(),
parse_and_convert(price.as_str(), &address_mapping),
parse_and_convert(decimal.as_str(), &address_mapping),
identifier.clone(),
admin_obj.clone()
];
let move_action = MoveAction::new_function_call(
function_id.clone(),
vec![],
args,
);
let _ = execute_transaction(move_action, wallet_state).await;
info!("Received Binance price: {}", msg_value["c"]);
}
handle.await.expect("The task failed");
}
}
132 changes: 132 additions & 0 deletions crates/rooch-oracle/src/data_process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use std::collections::BTreeMap;
use std::str::FromStr;
use tokio::sync::{mpsc, RwLockWriteGuard};
use std::time::Duration;
use futures_util::{SinkExt, StreamExt};
use log::{error, info, warn};
use serde_json::Value;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
use moveos_types::transaction::MoveAction;
use rooch_types::address::RoochAddress;
use rooch_rpc_api::jsonrpc_types::KeptVMStatusView;
use rooch_rpc_client::wallet_context::WalletContext;
use anyhow::Result;
use move_core_types::account_address::AccountAddress;
use rooch_types::function_arg::FunctionArg;

pub async fn subscribe_websocket(url: String, tx: mpsc::Sender<String>, subscribe_msg: Option<Value>) {
loop {
let (ws_stream, _) = match connect_async(&url).await {
Ok(stream) => stream,
Err(e) => {
warn!("Failed to connect: {} error:{}", url, e);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
};

info!("Connected to {}", url);

let (mut write, mut read) = ws_stream.split();

if subscribe_msg.is_some() {
if let Err(e) = write.send(Message::Text(subscribe_msg.clone().unwrap().to_string())).await {
warn!("Failed to send message: {}", e);
continue;
}
}


while let Some(message) = read.next().await {
match message {
Ok(msg) => {
if let Message::Text(text) = msg {
if let Err(e) = tx.send(text).await {
warn!("Failed to send message through channel: {}", e);
break;
}
}
},
Err(e) => {
warn!("Error: {}", e);
break;
}
}
}

warn!("Connection lost or error occurred, restarting...");
tokio::time::sleep(Duration::from_secs(5)).await;
}
}


pub async fn subscribe_http(url: String, tx: mpsc::Sender<Value>, interval: u64) {
loop {
match reqwest::get(&url).await {
Ok(response) => {
match response.json::<Value>().await {
Ok(value) => {
if let Err(e) = tx.send(value).await {
warn!("Failed to send message through channel: {}", e);
}
}
Err(_) => {}
}
}
Err(e) => {
warn!("Failed to fetch price: {}", e);
}
};

tokio::time::sleep(Duration::from_secs(interval)).await;
}
}

pub struct State {
pub(crate) wallet_pwd: Option<String>,
pub context: WalletContext,
}

pub async fn execute_transaction<'a>(
action: MoveAction,
state: RwLockWriteGuard<'a, State>,
) -> Result<()> {
let sender: RoochAddress = state.context.client_config.active_address.unwrap();
let pwd = state.wallet_pwd.clone();
let result = state.context
.sign_and_execute(sender, action, pwd, None)
.await;
match result {
Ok(tx) => match tx.execution_info.status {
KeptVMStatusView::Executed => {
info!(
"Executed success tx_has: {}",
tx.execution_info.tx_hash
);
}
_ => {
error!("Transfer gases failed {:?}", tx.execution_info.status);
}
},
Err(e) => {
error!("Transfer gases failed {}", e);
}
};
Ok(())
}

pub fn parse_and_convert(arg: &str, address_mapping: &BTreeMap<String, AccountAddress>) -> Vec<u8> {
let mapping = |input: &str| -> Option<AccountAddress> {
address_mapping.get(input).cloned()
};
FunctionArg::from_str(arg)
.unwrap()
.into_bytes(&mapping)
.unwrap()
}


7 changes: 7 additions & 0 deletions crates/rooch-oracle/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

pub mod data_process;
pub mod okx;
pub mod binance;
pub mod pyth;
63 changes: 63 additions & 0 deletions crates/rooch-oracle/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use clap::Parser;
use rooch_oracle::binance::{Binance, BinanceConfig};
use rooch_oracle::okx::{Okx, OkxConfig};
use rooch_oracle::pyth::{Pyth, PythConfig};

#[derive(Parser, Clone)]
#[clap(
name = "Rooch Oracle",
about = "Oracle backend for BTC tokens price on Rooch",
rename_all = "kebab-case"
)]
pub struct Config {
#[clap(flatten)]
pub okx_config: OkxConfig,
#[clap(flatten)]
pub binance_config: BinanceConfig,
#[clap(flatten)]
pub pyth_config: PythConfig,
#[clap(short, long, env = "ROOCH_ORACLE_PACKAGE")]
pub package_id: String,
}

#[tokio::main]
async fn main() {
let _ = tracing_subscriber::fmt::try_init();

let config = Config::parse();
let Config {
okx_config,
binance_config,
pyth_config,
package_id
} = config;
let okx_handle = tokio::spawn({
let package_id = package_id.clone();
async move {
let okx = Okx::new(okx_config).await;
okx.subscribe(package_id.as_str()).await;
}
}
);
let binance_handle = tokio::spawn({
let package_id = package_id.clone();
async move {
let binance = Binance::new(binance_config).await;
binance.subscribe(package_id.as_str()).await;
}
});
let pyth_handle = tokio::spawn({
let package_id = package_id.clone();
async move {
let pyth = Pyth::new(pyth_config).await;
pyth.subscribe(package_id.as_str()).await;
}
});

okx_handle.await.expect("okx error");
binance_handle.await.expect("binance error");
pyth_handle.await.expect("binance error")
}
Loading

0 comments on commit f67586c

Please sign in to comment.