Skip to content

Commit

Permalink
Draft payload dispersal
Browse files Browse the repository at this point in the history
Signed-off-by: litt3 <[email protected]>
  • Loading branch information
litt3 committed Jan 24, 2025
1 parent ce2ff6d commit 1ae4c59
Show file tree
Hide file tree
Showing 9 changed files with 419 additions and 56 deletions.
23 changes: 23 additions & 0 deletions api/clients/codecs/blob_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,26 @@ func GenericDecodeBlob(data []byte) ([]byte, error) {

return data, nil
}

// CreateCodec creates a new BlobCodec based on the defined polynomial form of payloads, and the desired
// BlobEncodingVersion
func CreateCodec(payloadPolynomialForm PolynomialForm, version BlobEncodingVersion) (BlobCodec, error) {
lowLevelCodec, err := BlobEncodingVersionToCodec(version)
if err != nil {
return nil, fmt.Errorf("create low level codec: %w", err)
}

switch payloadPolynomialForm {
case PolynomialFormCoeff:
// Data must NOT be IFFTed during blob construction, since the payload is already in PolynomialFormCoeff after
// being encoded.
return NewNoIFFTCodec(lowLevelCodec), nil
case PolynomialFormEval:
// Data MUST be IFFTed during blob construction, since the payload is in PolynomialFormEval after being encoded,
// but must be in PolynomialFormCoeff to produce a valid blob.
return NewIFFTCodec(lowLevelCodec), nil
default:
return nil, fmt.Errorf("unsupported polynomial form: %d", payloadPolynomialForm)
}

}
38 changes: 36 additions & 2 deletions api/clients/v2/mock/blob_verifier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions api/clients/v2/payload_client_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package clients

import (
"time"

"github.com/Layr-Labs/eigenda/api/clients/codecs"
"github.com/Layr-Labs/eigenda/core"
v2 "github.com/Layr-Labs/eigenda/core/v2"
)

// PayloadClientConfig contains configuration values that are needed by both PayloadRetriever and PayloadDisperser
type PayloadClientConfig struct {
// The blob encoding version to use when writing and reading blobs
BlobEncodingVersion codecs.BlobEncodingVersion

// The Ethereum RPC URL to use for querying the Ethereum blockchain.
EthRpcUrl string

// The address of the EigenDABlobVerifier contract
EigenDABlobVerifierAddr string

// PayloadPolynomialForm is the initial form of a Payload after being encoded. The configured form does not imply
// any restrictions on the contents of a payload: it merely dictates how payload data is treated after being
// encoded.
//
// Since blobs sent to the disperser must be in coefficient form, the initial form of the encoded payload dictates
// what data processing must be performed during blob construction.
//
// The chosen form also dictates how the KZG commitment made to the blob can be used. If the encoded payload starts
// in PolynomialFormEval (meaning the data WILL be IFFTed before computing the commitment) then it will be possible
// to open points on the KZG commitment to prove that the field elements correspond to the commitment. If the
// encoded payload starts in PolynomialFormCoeff (meaning the data will NOT be IFFTed before computing the
// commitment) then it will not be possible to create a commitment opening: the blob will need to be supplied in its
// entirety to perform a verification that any part of the data matches the KZG commitment.
PayloadPolynomialForm codecs.PolynomialForm

// The timeout duration for contract calls
ContractCallTimeout time.Duration

BlobVersion v2.BlobVersion

Quorums []core.QuorumID
}
227 changes: 227 additions & 0 deletions api/clients/v2/payload_disperser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package clients

import (
"context"
"fmt"
"time"

"github.com/Layr-Labs/eigenda/api/clients/codecs"
"github.com/Layr-Labs/eigenda/api/clients/v2/verification"
dispgrpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/common/geth"
core "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
)

// PayloadDisperser provides the ability to disperse payloads to EigenDA
//
// This struct is goroutine safe.
type PayloadDisperser struct {
log logging.Logger
config *PayloadDisperserConfig
codec codecs.BlobCodec
disperserClient DisperserClient
blobVerifier verification.IBlobVerifier
}

// BuildPayloadDisperser builds a PayloadDisperser from config structs
func BuildPayloadDisperser(
log logging.Logger,
payloadDisperserConfig *PayloadDisperserConfig,
disperserClientConfig *DisperserClientConfig,
// signer to sign blob dispersal requests
signer core.BlobRequestSigner,
// prover is used to compute commitments to a new blob during the dispersal process
//
// IMPORTANT: it is permissible for the prover parameter to be nil, but operating with this configuration
// puts a trust assumption on the disperser. With a nil prover, the disperser is responsible for computing
// the commitments to a blob, and the PayloadDisperser doesn't have a mechanism to verify these commitments.
//
// TODO: In the future, an optimized method of commitment verification using fiat shamir transformation will
// be implemented. This feature will allow a PayloadDisperser to offload commitment generation onto the
// disperser, but the disperser's commitments will be verifiable without needing a full-fledged prover
prover encoding.Prover,
accountant *Accountant,
ethConfig geth.EthClientConfig,
) (*PayloadDisperser, error) {

codec, err := codecs.CreateCodec(
payloadDisperserConfig.PayloadPolynomialForm,
payloadDisperserConfig.BlobEncodingVersion)
if err != nil {
return nil, fmt.Errorf("create codec: %w", err)
}

disperserClient, err := NewDisperserClient(disperserClientConfig, signer, prover, accountant)
if err != nil {
return nil, fmt.Errorf("unable to create disperser client: %s", err)
}

ethClient, err := geth.NewClient(ethConfig, gethcommon.Address{}, 0, log)
if err != nil {
return nil, fmt.Errorf("new eth client: %w", err)
}

blobVerifier, err := verification.NewBlobVerifier(*ethClient, payloadDisperserConfig.EigenDABlobVerifierAddr)
if err != nil {
return nil, fmt.Errorf("new blob verifier: %w", err)
}

return &PayloadDisperser{
log: log,
config: payloadDisperserConfig,
codec: codec,
disperserClient: disperserClient,
blobVerifier: blobVerifier,
}, nil
}

// SendPayload executes the dispersal of a payload, with these basic steps:
//
// 1. Encode payload into a blob
// 2. Disperse the blob
// 3. Continually poll the disperser with GetBlobStatus, until a terminal status is reached, or the polling timeout
// is reached
// 4. Construct an EigenDACert if dispersal is successful
// 5. Verify the constructed cert with a call to an ethereum contract
// 6. Return the valid cert
func (pd *PayloadDisperser) SendPayload(
ctx context.Context,
payload []byte,
salt uint32,
) (*verification.EigenDACert, error) {

blobBytes, err := pd.codec.EncodeBlob(payload)
if err != nil {
return nil, fmt.Errorf("encode payload to blob: %w", err)
}

timeoutCtx, cancel := context.WithTimeout(ctx, pd.config.DisperseBlobTimeout)
defer cancel()
blobStatus, blobKey, err := pd.disperserClient.DisperseBlob(
timeoutCtx,
blobBytes,
pd.config.BlobVersion,
pd.config.Quorums,
salt)

if err != nil {
return nil, fmt.Errorf("disperse blob: %w", err)
}

timeoutCtx, cancel = context.WithTimeout(ctx, pd.config.BlobCertifiedTimeout)
defer cancel()

blobStatusReply, err := pd.pollBlobStatus(timeoutCtx, blobKey, blobStatus.ToProfobuf())
if err != nil {
return nil, fmt.Errorf("poll blob status: %w", err)
}

timeoutCtx, cancel = context.WithTimeout(ctx, pd.config.ContractCallTimeout)
defer cancel()
nonSignerStakesAndSignature, err := pd.blobVerifier.GetNonSignerStakesAndSignature(
timeoutCtx, blobStatusReply.GetSignedBatch())
if err != nil {
return nil, fmt.Errorf("get non signer stake and signature: %w", err)
}

eigenDACert, err := verification.BuildEigenDACert(
blobStatusReply.GetBlobInclusionInfo(),
blobStatusReply.GetSignedBatch().GetHeader(),
nonSignerStakesAndSignature)

if err != nil {
return nil, fmt.Errorf("build eigen da cert: %w", err)
}

err = pd.verifyCertWithTimeout(ctx, eigenDACert)
if err != nil {
return nil, fmt.Errorf("verify cert with timeout for blobKey %v: %w", blobKey, err)
}

return eigenDACert, nil
}

// Close is responsible for calling close on all internal clients. This method will do its best to close all internal
// clients, even if some closes fail.
//
// Any and all errors returned from closing internal clients will be joined and returned.
//
// This method should only be called once.
func (pd *PayloadDisperser) Close() error {
err := pd.disperserClient.Close()
if err != nil {
return fmt.Errorf("close disperser client: %w", err)
}

return nil
}

func (pd *PayloadDisperser) pollBlobStatus(
ctx context.Context,
blobKey core.BlobKey,
initialStatus dispgrpc.BlobStatus,
) (*dispgrpc.BlobStatusReply, error) {

previousStatus := initialStatus

ticker := time.NewTicker(pd.config.BlobStatusPollInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil, fmt.Errorf(
"timed out waiting for %v blob status, final status was %v: %w",
dispgrpc.BlobStatus_CERTIFIED,
previousStatus,
ctx.Err())
case <-ticker.C:
// This call to the disperser doesn't have a dedicated timeout configured.
// If this call fails to return in a timely fashion, the timeout configured for the poll loop will trigger
blobStatusReply, err := pd.disperserClient.GetBlobStatus(ctx, blobKey)

if err != nil {
pd.log.Warn("get blob status", "err", err, "blobKey", blobKey)
continue
}

newStatus := blobStatusReply.Status
if newStatus != previousStatus {
pd.log.Info(
"Blob status changed",
"blob key",
blobKey,
"previous status",
previousStatus,
"new status",
newStatus)
previousStatus = newStatus
}

switch newStatus {
case dispgrpc.BlobStatus_CERTIFIED:
return blobStatusReply, nil
case dispgrpc.BlobStatus_QUEUED, dispgrpc.BlobStatus_ENCODED:
continue
default:
return nil, fmt.Errorf("terminal dispersal failure for blobKey %v. blob status: %v", blobKey, newStatus)
}
}
}
}

// verifyCertWithTimeout verifies an EigenDACert by making a call to VerifyBlobV2.
//
// This method times out after the duration configured in PayloadDisperserConfig.ContractCallTimeout
func (pd *PayloadDisperser) verifyCertWithTimeout(
ctx context.Context,
eigenDACert *verification.EigenDACert,
) error {
timeoutCtx, cancel := context.WithTimeout(ctx, pd.config.ContractCallTimeout)
defer cancel()

return pd.blobVerifier.VerifyBlobV2(timeoutCtx, eigenDACert)
}
21 changes: 21 additions & 0 deletions api/clients/v2/payload_disperser_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package clients

import "time"

// PayloadDisperserConfig contains an embedded PayloadClientConfig, plus all additional configuration values needed
// by a PayloadDisperser
type PayloadDisperserConfig struct {
PayloadClientConfig

// DisperseBlobTimeout is the duration after which the PayloadDisperser will time out, when trying to disperse a
// blob
DisperseBlobTimeout time.Duration

// BlobCertifiedTimeout is the duration after which the PayloadDisperser will time out, while polling
// the disperser for blob status, waiting for BlobStatus_CERTIFIED
BlobCertifiedTimeout time.Duration

// BlobStatusPollInterval is the tick rate for the PayloadDisperser to use, while polling the disperser with
// GetBlobStatus.
BlobStatusPollInterval time.Duration
}
Loading

0 comments on commit 1ae4c59

Please sign in to comment.