Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: gas market #2621

Merged
merged 5 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 225 additions & 75 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ members = [
"crates/rooch-open-rpc-macros",
"crates/rooch-open-rpc-spec",
"crates/rooch-open-rpc-spec-builder",
"crates/rooch-oracle",
"crates/rooch-pipeline-processor",
"crates/rooch-proposer",
"crates/rooch-relayer",
Expand All @@ -62,7 +63,8 @@ default-members = [
"moveos/moveos",
"frameworks/framework-release",
"crates/rooch",
"crates/rooch-faucet"
"crates/rooch-faucet",
"crates/rooch-oracle"
]

# All workspace members should inherit these keys
Expand Down Expand Up @@ -169,6 +171,7 @@ ethers = { version = "2.0.7", features = ["legacy"] }
eyre = "0.6.8"
fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "56f6223b84ada922b6cb2c672c69db2ea3dc6a13" }
futures = "0.3.28"
futures-util = "0.3.30"
hex = "0.4.3"
rustc-hex = "2.1"
itertools = "0.13.0"
Expand Down Expand Up @@ -215,10 +218,12 @@ tiny-keccak = { version = "2", features = ["keccak", "sha3"] }
tiny-bip39 = "1.0.0"
tokio = { version = "1.40.0", features = ["full"] }
tokio-util = "0.7.12"
tokio-tungstenite = { version = "0.23.1", features = ["native-tls"] }
tonic = { version = "0.8", features = ["gzip"] }
tracing = "0.1.37"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15" }
tungstenite = "0.24.0"

codespan-reporting = "0.11.1"
codespan = "0.11.1"
Expand Down
23 changes: 23 additions & 0 deletions crates/rooch-oracle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "rooch-oracle"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { workspace = true }
tokio-tungstenite = { workspace = true }
tungstenite = { workspace = true }
serde_json = { workspace = true }
futures-util = { workspace = true }
tracing-subscriber = { workspace = true }
log = { workspace = true }
clap = { workspace = true }
reqwest = { workspace = true }
anyhow = { workspace = true }
bcs = { workspace = true }

rooch-rpc-api = { workspace = true }
move-core-types = { workspace = true }
moveos-types = { workspace = true }
rooch-types = { workspace = true }
rooch-rpc-client = { workspace = true }
127 changes: 127 additions & 0 deletions crates/rooch-oracle/src/binance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::data_process::{execute_transaction, parse_and_convert, subscribe_websocket, State};
use clap::Parser;
use log::info;
use move_core_types::account_address::AccountAddress;
use move_core_types::identifier::Identifier;
use move_core_types::language_storage::ModuleId;
use moveos_types::move_types::FunctionId;
use moveos_types::transaction::MoveAction;
use rooch_rpc_client::wallet_context::WalletContext;
use serde_json::Value;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
use tokio::time::Instant;

#[derive(Parser, Debug, Clone)]
pub struct BinanceConfig {
#[arg(
long,
default_value = "wss://stream.binance.com:9443/ws/btcusdt@ticker"
)]
pub binance_url: String,

#[arg(
long,
env = "ROOCH_BINANCE_WALLET_DIR",
default_value = "~/.rooch/rooch_config"
)]
pub binance_wallet_dir: Option<PathBuf>,

#[arg(long, env = "ROOCH_BINANCE_WALLET_PWD")]
pub binance_wallet_pwd: Option<String>,

#[arg(long, default_value = "10")]
pub binance_submit_interval: u64,

#[arg(long, env = "ROOCH_BINANCE_ORACLE_ID")]
pub binance_oracle_id: String,

#[arg(long, env = "ROOCH_BINANCE_ADMIN_ID")]
pub binance_admin_id: String,
}

pub struct Binance {
pub wallet_state: Arc<RwLock<State>>,
binance_config: BinanceConfig,
}

impl Binance {
pub async fn new(config: BinanceConfig) -> Self {
let wallet = WalletContext::new(config.binance_wallet_dir.clone()).unwrap();
let wallet_pwd = config.binance_wallet_pwd.clone();
Self {
wallet_state: Arc::new(RwLock::new(State {
wallet_pwd,
context: wallet,
})),
binance_config: config,
}
}

pub async fn subscribe(&self, package_id: &str) {
let (tx, mut rx) = mpsc::channel(1);
let url = self.binance_config.binance_url.clone();
let handle = tokio::spawn(async move {
subscribe_websocket(url, tx, None).await;
});
let function_id = FunctionId::new(
ModuleId::new(
AccountAddress::from_hex_literal(package_id).unwrap(),
Identifier::new("trusted_oracle").unwrap(),
),
Identifier::new("submit_data").unwrap(),
);
let address_mapping = self
.wallet_state
.read()
.await
.context
.address_mapping
.clone();
let oracle_obj = parse_and_convert(
format!("object_id:{}", self.binance_config.binance_oracle_id).as_str(),
&address_mapping,
);
let ticker = parse_and_convert("string:BTCUSD", &address_mapping);
let identifier = parse_and_convert("string:Binance", &address_mapping);
let admin_obj = parse_and_convert(
format!("object_id:{}", self.binance_config.binance_admin_id).as_str(),
&address_mapping,
);
let mut last_execution = Instant::now() - Duration::from_secs(10); // 初始化为10秒前
while let Some(msg) = rx.recv().await {
let wallet_state = self.wallet_state.write().await;

let msg_value = serde_json::from_str::<Value>(&msg).unwrap();
if msg_value["c"].as_str().is_none()
|| Instant::now().duration_since(last_execution)
< Duration::from_secs(self.binance_config.binance_submit_interval)
{
continue;
}
last_execution = Instant::now();
let price = format!(
"u256:{}",
msg_value["c"].as_str().unwrap().parse::<f64>().unwrap() * 10f64.powi(8)
);
let decimal = "8u8".to_string();
let args = vec![
oracle_obj.clone(),
ticker.clone(),
parse_and_convert(price.as_str(), &address_mapping),
parse_and_convert(decimal.as_str(), &address_mapping),
identifier.clone(),
admin_obj.clone(),
];
let move_action = MoveAction::new_function_call(function_id.clone(), vec![], args);
let _ = execute_transaction(move_action, wallet_state).await;
info!("Received Binance price: {}", msg_value["c"]);
}
handle.await.expect("The task failed");
}
}
129 changes: 129 additions & 0 deletions crates/rooch-oracle/src/data_process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use log::{error, info, warn};
use move_core_types::account_address::AccountAddress;
use moveos_types::transaction::MoveAction;
use rooch_rpc_api::jsonrpc_types::KeptVMStatusView;
use rooch_rpc_client::wallet_context::WalletContext;
use rooch_types::address::RoochAddress;
use rooch_types::function_arg::FunctionArg;
use serde_json::Value;
use std::collections::BTreeMap;
use std::str::FromStr;
use std::time::Duration;
use tokio::sync::{mpsc, RwLockWriteGuard};
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;

pub async fn subscribe_websocket(
url: String,
tx: mpsc::Sender<String>,
subscribe_msg: Option<Value>,
) {
loop {
let (ws_stream, _) = match connect_async(&url).await {
Ok(stream) => stream,
Err(e) => {
warn!("Failed to connect: {} error:{}", url, e);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
};

info!("Connected to {}", url);

let (mut write, mut read) = ws_stream.split();

if subscribe_msg.is_some() {
if let Err(e) = write
.send(Message::Text(subscribe_msg.clone().unwrap().to_string()))
.await
{
warn!("Failed to send message: {}", e);
continue;
}
}

while let Some(message) = read.next().await {
match message {
Ok(msg) => {
if let Message::Text(text) = msg {
if let Err(e) = tx.send(text).await {
warn!("Failed to send message through channel: {}", e);
break;
}
}
}
Err(e) => {
warn!("Error: {}", e);
break;
}
}
}

warn!("Connection lost or error occurred, restarting...");
tokio::time::sleep(Duration::from_secs(5)).await;
}
}

pub async fn subscribe_http(url: String, tx: mpsc::Sender<Value>, interval: u64) {
loop {
match reqwest::get(&url).await {
Ok(response) => {
if let Ok(value) = response.json::<Value>().await {
if let Err(e) = tx.send(value).await {
warn!("Failed to send message through channel: {}", e);
}
}
}
Err(e) => {
warn!("Failed to fetch price: {}", e);
}
};

tokio::time::sleep(Duration::from_secs(interval)).await;
}
}

pub struct State {
pub(crate) wallet_pwd: Option<String>,
pub context: WalletContext,
}

#[allow(clippy::needless_lifetimes)]
pub async fn execute_transaction<'a>(
action: MoveAction,
state: RwLockWriteGuard<'a, State>,
) -> Result<()> {
let sender: RoochAddress = state.context.client_config.active_address.unwrap();
let pwd = state.wallet_pwd.clone();
let result = state
.context
.sign_and_execute(sender, action, pwd, None)
.await;
match result {
Ok(tx) => match tx.execution_info.status {
KeptVMStatusView::Executed => {
info!("Executed success tx_has: {}", tx.execution_info.tx_hash);
}
_ => {
error!("Transfer gases failed {:?}", tx.execution_info.status);
}
},
Err(e) => {
error!("Transfer gases failed {}", e);
}
};
Ok(())
}

pub fn parse_and_convert(arg: &str, address_mapping: &BTreeMap<String, AccountAddress>) -> Vec<u8> {
let mapping = |input: &str| -> Option<AccountAddress> { address_mapping.get(input).cloned() };
FunctionArg::from_str(arg)
.unwrap()
.into_bytes(&mapping)
.unwrap()
}
7 changes: 7 additions & 0 deletions crates/rooch-oracle/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

pub mod binance;
pub mod data_process;
pub mod okx;
pub mod pyth;
62 changes: 62 additions & 0 deletions crates/rooch-oracle/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use clap::Parser;
use rooch_oracle::binance::{Binance, BinanceConfig};
use rooch_oracle::okx::{Okx, OkxConfig};
use rooch_oracle::pyth::{Pyth, PythConfig};

#[derive(Parser, Clone)]
#[clap(
name = "Rooch Oracle",
about = "Oracle backend for BTC tokens price on Rooch",
rename_all = "kebab-case"
)]
pub struct Config {
#[clap(flatten)]
pub okx_config: OkxConfig,
#[clap(flatten)]
pub binance_config: BinanceConfig,
#[clap(flatten)]
pub pyth_config: PythConfig,
#[clap(short, long, env = "ROOCH_ORACLE_PACKAGE")]
pub package_id: String,
}

#[tokio::main]
async fn main() {
let _ = tracing_subscriber::fmt::try_init();

let config = Config::parse();
let Config {
okx_config,
binance_config,
pyth_config,
package_id,
} = config;
let okx_handle = tokio::spawn({
let package_id = package_id.clone();
async move {
let okx = Okx::new(okx_config).await;
okx.subscribe(package_id.as_str()).await;
}
});
let binance_handle = tokio::spawn({
let package_id = package_id.clone();
async move {
let binance = Binance::new(binance_config).await;
binance.subscribe(package_id.as_str()).await;
}
});
let pyth_handle = tokio::spawn({
let package_id = package_id.clone();
async move {
let pyth = Pyth::new(pyth_config).await;
pyth.subscribe(package_id.as_str()).await;
}
});

okx_handle.await.expect("okx error");
binance_handle.await.expect("binance error");
pyth_handle.await.expect("binance error")
}
Loading
Loading