Skip to content

Commit

Permalink
feat(eigen-client-m0-implementation): grafana metrics (#334)
Browse files Browse the repository at this point in the history
* initial commit

* initial commit

* fix ambiguous job name & add docker restart command

* fix integration test command

* update readme, remove fetching lambda repo

* reorganize readme instructions

* Fix concurrent dispatcher (#338)

* Add shutdown to dispatch batches

* Add JoinSet

* Format code

* Fix unbounded channel breaking authenticated dispersal

* Fix pr comments

* feat(eigen-client-m0-implementation): optimize concurrent dispatcher (#345)

* initial commit

* optimize dispatch_batches fn

* remove commented code

* remove needless variables

* optimize inclusion_poller fn

* break loop if dispatch fail

* remove client_lock variable

* switch to retriable err

* replace arbitrary value with config

---------

Co-authored-by: Gianbelinche <[email protected]>
  • Loading branch information
juan518munoz and gianbelinche authored Nov 19, 2024
1 parent 64d0a38 commit 4c5af89
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 209 deletions.
4 changes: 2 additions & 2 deletions core/node/da_clients/src/eigen/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use zksync_da_client::{
};

use super::{blob_info::BlobInfo, memstore::MemStore, sdk::RawEigenClient, Disperser};
use crate::utils::to_non_retriable_da_error;
use crate::utils::{to_non_retriable_da_error, to_retriable_da_error};

/// EigenClient is a client for the Eigen DA service.
/// It can be configured to use one of two dispersal methods:
Expand Down Expand Up @@ -58,7 +58,7 @@ impl DataAvailabilityClient for EigenClient {
Disperser::Remote(remote_disperser) => remote_disperser
.dispatch_blob(data)
.await
.map_err(to_non_retriable_da_error)?,
.map_err(to_retriable_da_error)?,
Disperser::Memory(memstore) => memstore
.clone()
.put_blob(data)
Expand Down
63 changes: 59 additions & 4 deletions core/node/da_clients/src/eigen/eigenda-integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,27 @@ cargo install --path zkstack_cli/crates/zkstack --force --locked
zkstack containers --observability true
```

3. Create `eigen_da` chain
3. Temporary metrics setup (until `era-observability` changes are also merged)

   a. Set up the observability container at least once so the `era-observability` directory is cloned.

```bash
zkstack containers --observability true
```

b. Add `lambda` remote to the `era-observability` project:

```bash
cd era-observability && git remote add lambda https://github.com/lambdaclass/era-observability.git
```

c. Fetch and checkout the `eigenda` branch:

```bash
git fetch lambda && git checkout eigenda
```

4. Create `eigen_da` chain

```bash
zkstack chain create \
Expand All @@ -91,7 +111,7 @@ zkstack chain create \
--set-as-default false
```

4. Initialize created ecosystem
5. Initialize created ecosystem

```bash
zkstack ecosystem init \
Expand All @@ -107,7 +127,42 @@ zkstack ecosystem init \

You may enable observability here if you want to.

5. Start the server
6. Set up the Grafana dashboard for Data Availability

a. Get the running port of the eigen_da chain in the `chains/eigen_da/configs/general.yaml` file:

```yaml
prometheus:
listener_port: 3414 # <- this is the port
```

(around line 108)

Then modify the `era-observability/etc/prometheus/prometheus.yml` with the retrieved port:

```yaml
- job_name: 'zksync'
scrape_interval: 5s
honor_labels: true
static_configs:
      - targets: ['host.docker.internal:3312'] # <- replace 3312 with the port retrieved above
```

b. Enable the Data Availability Grafana dashboard

```bash
mv era-observability/additional_dashboards/EigenDA.json era-observability/dashboards/EigenDA.json
```

c. Restart the era-observability container

```bash
docker ps --filter "label=com.docker.compose.project=era-observability" -q | xargs docker restart
```

(this can also be done through the docker dashboard)

7. Start the server

```bash
zkstack server --chain eigen_da
Expand All @@ -125,7 +180,7 @@ And with the server running on one terminal, you can run the server integration
following command:

```bash
zkstack dev test --chain eigen_da
zkstack dev test integration --chain eigen_da
```

## Mainnet/Testnet setup
Expand Down
79 changes: 44 additions & 35 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::{str::FromStr, time::Duration};
use std::{str::FromStr, sync::Arc, time::Duration};

use secp256k1::{ecdsa::RecoverableSignature, SecretKey};
use tokio::{sync::mpsc, time::Instant};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio::{
sync::{mpsc, Mutex},
time::Instant,
};
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
use tonic::{
transport::{Channel, ClientTlsConfig, Endpoint},
Streaming,
Expand All @@ -28,7 +31,7 @@ use crate::eigen::{

#[derive(Debug, Clone)]
pub(crate) struct RawEigenClient {
client: DisperserClient<Channel>,
client: Arc<Mutex<DisperserClient<Channel>>>,
private_key: SecretKey,
pub config: DisperserConfig,
verifier: Verifier,
Expand All @@ -37,14 +40,14 @@ pub(crate) struct RawEigenClient {
pub(crate) const DATA_CHUNK_SIZE: usize = 32;

impl RawEigenClient {
pub(crate) const BUFFER_SIZE: usize = 1000;

pub async fn new(private_key: SecretKey, config: DisperserConfig) -> anyhow::Result<Self> {
let endpoint =
Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?;
let client = DisperserClient::connect(endpoint)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?;
let client = Arc::new(Mutex::new(
DisperserClient::connect(endpoint)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?,
));

let verifier_config = VerifierConfig {
verify_certs: true,
Expand Down Expand Up @@ -72,13 +75,16 @@ impl RawEigenClient {
account_id: String::default(), // Account Id is not used in non-authenticated mode
};

let mut client_clone = self.client.clone();
let disperse_reply = client_clone.disperse_blob(request).await?.into_inner();
let disperse_reply = self
.client
.lock()
.await
.disperse_blob(request)
.await?
.into_inner();

let disperse_time = Instant::now();
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await?;
let blob_info = self.await_for_inclusion(disperse_reply).await?;
let disperse_elapsed = Instant::now() - disperse_time;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
Expand Down Expand Up @@ -118,25 +124,29 @@ impl RawEigenClient {
}

async fn dispatch_blob_authenticated(&self, data: Vec<u8>) -> anyhow::Result<String> {
let mut client_clone = self.client.clone();
let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE);
let (tx, rx) = mpsc::unbounded_channel();

let disperse_time = Instant::now();
let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx));
let padded_data = convert_by_padding_empty_byte(&data);

// 1. send DisperseBlobRequest
self.disperse_data(padded_data, &tx).await?;
let padded_data = convert_by_padding_empty_byte(&data);
self.disperse_data(padded_data, &tx)?;

// this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest`
let mut response_stream = response_stream.await?.into_inner();
let mut response_stream = self
.client
.clone()
.lock()
.await
.disperse_blob_authenticated(UnboundedReceiverStream::new(rx))
.await?;
let response_stream = response_stream.get_mut();

// 2. receive BlobAuthHeader
let blob_auth_header = self.receive_blob_auth_header(&mut response_stream).await?;
let blob_auth_header = self.receive_blob_auth_header(response_stream).await?;

// 3. sign and send BlobAuthHeader
self.submit_authentication_data(blob_auth_header.clone(), &tx)
.await?;
self.submit_authentication_data(blob_auth_header.clone(), &tx)?;

// 4. receive DisperseBlobReply
let reply = response_stream
Expand All @@ -152,9 +162,7 @@ impl RawEigenClient {
};

// 5. poll for blob status until it reaches the Confirmed state
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await?;
let blob_info = self.await_for_inclusion(disperse_reply).await?;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;
Expand Down Expand Up @@ -183,10 +191,10 @@ impl RawEigenClient {
}
}

async fn disperse_data(
fn disperse_data(
&self,
data: Vec<u8>,
tx: &mpsc::Sender<disperser::AuthenticatedRequest>,
tx: &mpsc::UnboundedSender<disperser::AuthenticatedRequest>,
) -> anyhow::Result<()> {
let req = disperser::AuthenticatedRequest {
payload: Some(DisperseRequest(disperser::DisperseBlobRequest {
Expand All @@ -197,14 +205,13 @@ impl RawEigenClient {
};

tx.send(req)
.await
.map_err(|e| anyhow::anyhow!("Failed to send DisperseBlobRequest: {}", e))
}

async fn submit_authentication_data(
fn submit_authentication_data(
&self,
blob_auth_header: BlobAuthHeader,
tx: &mpsc::Sender<disperser::AuthenticatedRequest>,
tx: &mpsc::UnboundedSender<disperser::AuthenticatedRequest>,
) -> anyhow::Result<()> {
// TODO: replace challenge_parameter with actual auth header when it is available
let digest = zksync_basic_types::web3::keccak256(
Expand All @@ -228,7 +235,6 @@ impl RawEigenClient {
};

tx.send(req)
.await
.map_err(|e| anyhow::anyhow!("Failed to send AuthenticationData: {}", e))
}

Expand Down Expand Up @@ -258,7 +264,6 @@ impl RawEigenClient {

async fn await_for_inclusion(
&self,
mut client: DisperserClient<Channel>,
disperse_blob_reply: DisperseBlobReply,
) -> anyhow::Result<DisperserBlobInfo> {
let polling_request = disperser::BlobStatusRequest {
Expand All @@ -269,7 +274,10 @@ impl RawEigenClient {
while Instant::now() - start_time < Duration::from_millis(self.config.status_query_timeout)
{
tokio::time::sleep(Duration::from_millis(self.config.status_query_interval)).await;
let resp = client
let resp = self
.client
.lock()
.await
.get_blob_status(polling_request.clone())
.await?
.into_inner();
Expand Down Expand Up @@ -326,7 +334,8 @@ impl RawEigenClient {
.batch_header_hash;
let get_response = self
.client
.clone()
.lock()
.await
.retrieve_blob(disperser::RetrieveBlobRequest {
batch_header_hash,
blob_index,
Expand Down
Loading

0 comments on commit 4c5af89

Please sign in to comment.