diff --git a/cmd/hermes/cmd_eth_chains.go b/cmd/hermes/cmd_eth_chains.go index 026e8e4..dda4b56 100644 --- a/cmd/hermes/cmd_eth_chains.go +++ b/cmd/hermes/cmd_eth_chains.go @@ -22,7 +22,6 @@ func cmdEthChainsAction(c *cli.Context) error { chains := []string{ params.MainnetName, params.SepoliaName, - params.PraterName, params.HoleskyName, } diff --git a/eth/genesis.go b/eth/genesis.go index 80fe581..9cc4ed0 100644 --- a/eth/genesis.go +++ b/eth/genesis.go @@ -54,10 +54,6 @@ var GenesisConfigs = map[string]*GenesisConfig{ GenesisValidatorRoot: hexToBytes("d8ea171f3c94aea21ebc42a1ed61052acf3f9209c00e4efbaaddac09ed9b8078"), GenesisTime: time.Unix(1655733600, 0), }, - params.PraterName: { - GenesisValidatorRoot: hexToBytes("043db0d9a83813551ee2f33450d23797757d430911a9320530ad8a0eabc43efb"), - GenesisTime: time.Unix(1616508000, 0), // https://github.com/eth-clients/goerli - }, params.HoleskyName: { GenesisValidatorRoot: hexToBytes("9143aa7c615a7f7115e2b6aac319c03529df8242ae705fba9df39b79c59fa8b1"), GenesisTime: time.Unix(1695902400, 0), diff --git a/eth/network_config.go b/eth/network_config.go index 3844137..154289a 100644 --- a/eth/network_config.go +++ b/eth/network_config.go @@ -35,12 +35,6 @@ func DeriveKnownNetworkConfig(ctx context.Context, network string) (*NetworkConf Beacon: params.SepoliaConfig(), Network: defaultBeaconNetworkConfig, }, nil - case params.PraterName: - return &NetworkConfig{ - Genesis: GenesisConfigs[network], - Beacon: params.PraterConfig(), - Network: defaultBeaconNetworkConfig, - }, nil case params.HoleskyName: return &NetworkConfig{ Genesis: GenesisConfigs[network], diff --git a/eth/output_full.go b/eth/output_full.go new file mode 100644 index 0000000..8f5e528 --- /dev/null +++ b/eth/output_full.go @@ -0,0 +1,374 @@ +package eth + +import ( + "encoding/hex" + "fmt" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/probe-lab/hermes/host" + ssz "github.com/prysmaticlabs/fastssz" + ethtypes "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" +) + +type TraceEventPhase0Block struct { + host.TraceEventPayloadMetaData + Block *ethtypes.SignedBeaconBlock +} + +type TraceEventAltairBlock struct { + host.TraceEventPayloadMetaData + Block *ethtypes.SignedBeaconBlockAltair +} + +type TraceEventBellatrixBlock struct { + host.TraceEventPayloadMetaData + Block *ethtypes.SignedBeaconBlockBellatrix +} + +type TraceEventCapellaBlock struct { + host.TraceEventPayloadMetaData + Block *ethtypes.SignedBeaconBlockCapella +} + +type TraceEventDenebBlock struct { + host.TraceEventPayloadMetaData + Block *ethtypes.SignedBeaconBlockDeneb +} + +type TraceEventAttestation struct { + host.TraceEventPayloadMetaData + Attestation *ethtypes.Attestation +} + +type TraceEventSignedAggregateAttestationAndProof struct { + host.TraceEventPayloadMetaData + SignedAggregateAttestationAndProof *ethtypes.SignedAggregateAttestationAndProof +} + +type TraceEventSignedContributionAndProof struct { + host.TraceEventPayloadMetaData + SignedContributionAndProof *ethtypes.SignedContributionAndProof +} + +type TraceEventVoluntaryExit struct { + host.TraceEventPayloadMetaData + VoluntaryExit *ethtypes.VoluntaryExit +} + +type TraceEventSyncCommitteeMessage struct { + host.TraceEventPayloadMetaData + SyncCommitteeMessage *ethtypes.SyncCommitteeMessage +} + +type TraceEventBLSToExecutionChange struct { + host.TraceEventPayloadMetaData + BLSToExecutionChange *ethtypes.BLSToExecutionChange +} + +type TraceEventBlobSidecar struct { + host.TraceEventPayloadMetaData + BlobSidecar *ethtypes.BlobSidecar +} + +type TraceEventProposerSlashing struct { + host.TraceEventPayloadMetaData + ProposerSlashing *ethtypes.ProposerSlashing +} + +type TraceEventAttesterSlashing struct { + host.TraceEventPayloadMetaData + AttesterSlashing *ethtypes.AttesterSlashing +} + +// FullOutput is a renderer for full output. +type FullOutput struct { + cfg *PubSubConfig +} + +var _ host.DataStreamRenderer = (*FullOutput)(nil) + +// NewFullOutput creates a new instance of FullOutput. +func NewFullOutput(cfg *PubSubConfig) host.DataStreamRenderer { + return &FullOutput{cfg: cfg} +} + +// RenderPayload renders message into the destination. +func (t *FullOutput) RenderPayload(evt *host.TraceEvent, msg *pubsub.Message, dst ssz.Unmarshaler) (*host.TraceEvent, error) { + if t.cfg.Encoder == nil { + return nil, fmt.Errorf("no network encoding provided to raw output renderer") + } + + if err := t.cfg.Encoder.DecodeGossip(msg.Data, dst); err != nil { + return nil, fmt.Errorf("decode gossip message: %w", err) + } + + var ( + err error + payload any + ) + + switch d := dst.(type) { + case *ethtypes.SignedBeaconBlock: + payload, err = t.renderPhase0Block(msg, d) + case *ethtypes.SignedBeaconBlockAltair: + payload, err = t.renderAltairBlock(msg, d) + case *ethtypes.SignedBeaconBlockBellatrix: + payload, err = t.renderBellatrixBlock(msg, d) + case *ethtypes.SignedBeaconBlockCapella: + payload, err = t.renderCapellaBlock(msg, d) + case *ethtypes.SignedBeaconBlockDeneb: + payload, err = t.renderDenebBlock(msg, d) + case *ethtypes.Attestation: + payload, err = t.renderAttestation(msg, d) + case *ethtypes.SignedAggregateAttestationAndProof: + payload, err = t.renderAggregateAttestationAndProof(msg, d) + case *ethtypes.SignedContributionAndProof: + payload, err = t.renderContributionAndProof(msg, d) + case *ethtypes.VoluntaryExit: + payload, err = t.renderVoluntaryExit(msg, d) + case *ethtypes.SyncCommitteeMessage: + payload, err = t.renderSyncCommitteeMessage(msg, d) + case *ethtypes.BLSToExecutionChange: + payload, err = t.renderBLSToExecutionChange(msg, d) + case *ethtypes.BlobSidecar: + payload, err = t.renderBlobSidecar(msg, d) + case *ethtypes.ProposerSlashing: + payload, err = t.renderProposerSlashing(msg, d) + case *ethtypes.AttesterSlashing: + payload, err = t.renderAttesterSlashing(msg, d) + default: + return nil, fmt.Errorf("unsupported message type: %T", dst) + } + + if err != nil { + return nil, err + } + + evt.Payload = payload + + return evt, nil +} + +func (t *FullOutput) renderPhase0Block( + msg *pubsub.Message, + block *ethtypes.SignedBeaconBlock, +) (*TraceEventPhase0Block, error) { + return &TraceEventPhase0Block{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + Block: block, + }, nil +} + +func (t *FullOutput) renderAltairBlock( + msg *pubsub.Message, + block *ethtypes.SignedBeaconBlockAltair, +) (*TraceEventAltairBlock, error) { + return &TraceEventAltairBlock{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + Block: block, + }, nil +} + +func (t *FullOutput) renderBellatrixBlock( + msg *pubsub.Message, + block *ethtypes.SignedBeaconBlockBellatrix, +) (*TraceEventBellatrixBlock, error) { + return &TraceEventBellatrixBlock{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + Block: block, + }, nil +} + +func (t *FullOutput) renderCapellaBlock( + msg *pubsub.Message, + block *ethtypes.SignedBeaconBlockCapella, +) (*TraceEventCapellaBlock, error) { + return &TraceEventCapellaBlock{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + Block: block, + }, nil +} + +func (t *FullOutput) renderDenebBlock( + msg *pubsub.Message, + block *ethtypes.SignedBeaconBlockDeneb, +) (*TraceEventDenebBlock, error) { + return &TraceEventDenebBlock{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + Block: block, + }, nil +} + +func (t *FullOutput) renderAttestation( + msg *pubsub.Message, + attestation *ethtypes.Attestation, +) (*TraceEventAttestation, error) { + return &TraceEventAttestation{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + Attestation: attestation, + }, nil +} + +func (t *FullOutput) renderAggregateAttestationAndProof( + msg *pubsub.Message, + agg *ethtypes.SignedAggregateAttestationAndProof, +) (*TraceEventSignedAggregateAttestationAndProof, error) { + return &TraceEventSignedAggregateAttestationAndProof{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + SignedAggregateAttestationAndProof: agg, + }, nil +} + +func (t *FullOutput) renderContributionAndProof( + msg *pubsub.Message, + cp *ethtypes.SignedContributionAndProof, +) (*TraceEventSignedContributionAndProof, error) { + return &TraceEventSignedContributionAndProof{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + SignedContributionAndProof: cp, + }, nil +} + +func (t *FullOutput) renderVoluntaryExit( + msg *pubsub.Message, + ve *ethtypes.VoluntaryExit, +) (*TraceEventVoluntaryExit, error) { + return &TraceEventVoluntaryExit{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + VoluntaryExit: ve, + }, nil +} + +func (t *FullOutput) renderSyncCommitteeMessage( + msg *pubsub.Message, + sc *ethtypes.SyncCommitteeMessage, +) (*TraceEventSyncCommitteeMessage, error) { + return &TraceEventSyncCommitteeMessage{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + SyncCommitteeMessage: sc, + }, nil +} + +func (t *FullOutput) renderBLSToExecutionChange( + msg *pubsub.Message, + blsec *ethtypes.BLSToExecutionChange, +) (*TraceEventBLSToExecutionChange, error) { + return &TraceEventBLSToExecutionChange{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + BLSToExecutionChange: blsec, + }, nil +} + +func (t *FullOutput) renderBlobSidecar( + msg *pubsub.Message, + blob *ethtypes.BlobSidecar, +) (*TraceEventBlobSidecar, error) { + return &TraceEventBlobSidecar{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + BlobSidecar: blob, + }, nil +} + +func (t *FullOutput) renderProposerSlashing( + msg *pubsub.Message, + ps *ethtypes.ProposerSlashing, +) (*TraceEventProposerSlashing, error) { + return &TraceEventProposerSlashing{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + ProposerSlashing: ps, + }, nil +} + +func (t *FullOutput) renderAttesterSlashing( + msg *pubsub.Message, + as *ethtypes.AttesterSlashing, +) (*TraceEventAttesterSlashing, error) { + return &TraceEventAttesterSlashing{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + AttesterSlashing: as, + }, nil +} diff --git a/eth/output_full_test.go b/eth/output_full_test.go new file mode 100644 index 0000000..429aa9b --- /dev/null +++ b/eth/output_full_test.go @@ -0,0 +1,233 @@ +package eth + +import ( + "encoding/hex" + "testing" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p/core/peer" + ethtypes "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/stretchr/testify/require" +) + +func TestFullOutputRenderMethods(t *testing.T) { + var ( + renderer = &FullOutput{} + topic = "test-topic" + baseMsg = &pubsub.Message{ + Message: &pb.Message{ + From: []byte("peer-id2"), + Data: []byte{0xAA, 0xBB, 0xCC}, + Seqno: []byte{0x01, 0x02}, + Topic: &topic, + }, + ID: "msg-id", + ReceivedFrom: peer.ID("peer-id1"), + } + ) + + cases := []struct { + name string + payload any + expectedType any + render func(msg *pubsub.Message, payload any) (any, error) + }{ + { + name: "renderPhase0Block", + payload: ðtypes.SignedBeaconBlock{}, + expectedType: &TraceEventPhase0Block{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderPhase0Block(msg, payload.(*ethtypes.SignedBeaconBlock)) + }, + }, + { + name: "renderAltairBlock", + payload: ðtypes.SignedBeaconBlockAltair{}, + expectedType: &TraceEventAltairBlock{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderAltairBlock(msg, payload.(*ethtypes.SignedBeaconBlockAltair)) + }, + }, + { + name: "renderCapellaBlock", + payload: ðtypes.SignedBeaconBlockCapella{}, + expectedType: &TraceEventCapellaBlock{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderCapellaBlock(msg, payload.(*ethtypes.SignedBeaconBlockCapella)) + }, + }, + { + name: "renderDenebBlock", + payload: ðtypes.SignedBeaconBlockDeneb{}, + expectedType: &TraceEventDenebBlock{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderDenebBlock(msg, payload.(*ethtypes.SignedBeaconBlockDeneb)) + }, + }, + { + name: "renderAttestation", + payload: ðtypes.Attestation{}, + expectedType: &TraceEventAttestation{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderAttestation(msg, payload.(*ethtypes.Attestation)) + }, + }, + { + name: "renderAggregateAttestationAndProof", + payload: ðtypes.SignedAggregateAttestationAndProof{}, + expectedType: &TraceEventSignedAggregateAttestationAndProof{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderAggregateAttestationAndProof(msg, payload.(*ethtypes.SignedAggregateAttestationAndProof)) + }, + }, + { + name: "renderContributionAndProof", + payload: ðtypes.SignedContributionAndProof{}, + expectedType: &TraceEventSignedContributionAndProof{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderContributionAndProof(msg, payload.(*ethtypes.SignedContributionAndProof)) + }, + }, + { + name: "renderVoluntaryExit", + payload: ðtypes.VoluntaryExit{}, + expectedType: &TraceEventVoluntaryExit{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderVoluntaryExit(msg, payload.(*ethtypes.VoluntaryExit)) + }, + }, + { + name: "renderSyncCommitteeMessage", + payload: ðtypes.SyncCommitteeMessage{}, + expectedType: &TraceEventSyncCommitteeMessage{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderSyncCommitteeMessage(msg, payload.(*ethtypes.SyncCommitteeMessage)) + }, + }, + { + name: "renderBLSToExecutionChange", + payload: ðtypes.BLSToExecutionChange{}, + expectedType: &TraceEventBLSToExecutionChange{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderBLSToExecutionChange(msg, payload.(*ethtypes.BLSToExecutionChange)) + }, + }, + { + name: "renderBlobSidecar", + payload: ðtypes.BlobSidecar{}, + expectedType: &TraceEventBlobSidecar{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderBlobSidecar(msg, payload.(*ethtypes.BlobSidecar)) + }, + }, + { + name: "renderProposerSlashing", + payload: ðtypes.ProposerSlashing{}, + expectedType: &TraceEventProposerSlashing{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderProposerSlashing(msg, payload.(*ethtypes.ProposerSlashing)) + }, + }, + { + name: "renderAttesterSlashing", + payload: ðtypes.AttesterSlashing{}, + expectedType: &TraceEventAttesterSlashing{}, + render: func(msg *pubsub.Message, payload any) (any, error) { + return renderer.renderAttesterSlashing(msg, payload.(*ethtypes.AttesterSlashing)) + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + result, err := tc.render(baseMsg, tc.payload) + require.NoError(t, err) + require.NotNil(t, result) + require.IsType(t, tc.expectedType, result) + + switch typedResult := result.(type) { + case *TraceEventPhase0Block: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.SignedBeaconBlock), typedResult.Block) + case *TraceEventAltairBlock: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.SignedBeaconBlockAltair), typedResult.Block) + case *TraceEventCapellaBlock: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.SignedBeaconBlockCapella), typedResult.Block) + case *TraceEventDenebBlock: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.SignedBeaconBlockDeneb), typedResult.Block) + case *TraceEventAttestation: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.Attestation), typedResult.Attestation) + case *TraceEventSignedAggregateAttestationAndProof: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.SignedAggregateAttestationAndProof), typedResult.SignedAggregateAttestationAndProof) + case *TraceEventSignedContributionAndProof: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.SignedContributionAndProof), typedResult.SignedContributionAndProof) + case *TraceEventVoluntaryExit: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.VoluntaryExit), typedResult.VoluntaryExit) + case *TraceEventSyncCommitteeMessage: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.SyncCommitteeMessage), typedResult.SyncCommitteeMessage) + case *TraceEventBLSToExecutionChange: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.BLSToExecutionChange), typedResult.BLSToExecutionChange) + case *TraceEventBlobSidecar: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.BlobSidecar), typedResult.BlobSidecar) + case *TraceEventProposerSlashing: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.ProposerSlashing), typedResult.ProposerSlashing) + case *TraceEventAttesterSlashing: + require.Equal(t, baseMsg.ReceivedFrom.String(), typedResult.PeerID) + require.Equal(t, baseMsg.GetTopic(), typedResult.Topic) + require.Equal(t, hex.EncodeToString(baseMsg.GetSeqno()), hex.EncodeToString(typedResult.Seq)) + require.Equal(t, len(baseMsg.GetData()), typedResult.MsgSize) + require.Equal(t, tc.payload.(*ethtypes.AttesterSlashing), typedResult.AttesterSlashing) + default: + t.Fatalf("unexpected result type: %T", result) + } + }) + } +} diff --git a/eth/output_kinesis.go b/eth/output_kinesis.go new file mode 100644 index 0000000..e1e6a35 --- /dev/null +++ b/eth/output_kinesis.go @@ -0,0 +1,352 @@ +package eth + +import ( + "encoding/hex" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/probe-lab/hermes/host" + ssz "github.com/prysmaticlabs/fastssz" + ethtypes "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" +) + +// KinesisOutput is a renderer for Kinesis output. +type KinesisOutput struct { + cfg *PubSubConfig +} + +var _ host.DataStreamRenderer = (*KinesisOutput)(nil) + +// NewKinesisOutput creates a new instance of KinesisOutput. +func NewKinesisOutput(cfg *PubSubConfig) host.DataStreamRenderer { + return &KinesisOutput{cfg: cfg} +} + +// RenderPayload renders message into the destination. +func (k *KinesisOutput) RenderPayload(evt *host.TraceEvent, msg *pubsub.Message, dst ssz.Unmarshaler) (*host.TraceEvent, error) { + if k.cfg.Encoder == nil { + return nil, fmt.Errorf("no network encoding provided to kinenis output renderer") + } + + if err := k.cfg.Encoder.DecodeGossip(msg.Data, dst); err != nil { + return nil, fmt.Errorf("decode gossip message: %w", err) + } + + var ( + err error + payload map[string]any + ) + + switch d := dst.(type) { + case *ethtypes.SignedBeaconBlock: + payload, err = k.renderPhase0Block(msg, d) + case *ethtypes.SignedBeaconBlockAltair: + payload, err = k.renderAltairBlock(msg, d) + case *ethtypes.SignedBeaconBlockBellatrix: + payload, err = k.renderBellatrixBlock(msg, d) + case *ethtypes.SignedBeaconBlockCapella: + payload, err = k.renderCapellaBlock(msg, d) + case *ethtypes.SignedBeaconBlockDeneb: + payload, err = k.renderDenebBlock(msg, d) + case *ethtypes.Attestation: + payload, err = k.renderAttestation(msg, d) + case *ethtypes.SignedAggregateAttestationAndProof: + payload, err = k.renderAggregateAttestationAndProof(msg, d) + case *ethtypes.SignedContributionAndProof: + payload, err = k.renderContributionAndProof(msg, d) + case *ethtypes.VoluntaryExit: + payload, err = k.renderVoluntaryExit(msg, d) + case *ethtypes.SyncCommitteeMessage: + payload, err = k.renderSyncCommitteeMessage(msg, d) + case *ethtypes.BLSToExecutionChange: + payload, err = k.renderBLSToExecutionChange(msg, d) + case *ethtypes.BlobSidecar: + payload, err = k.renderBlobSidecar(msg, d) + case *ethtypes.ProposerSlashing: + payload, err = k.renderProposerSlashing(msg, d) + case *ethtypes.AttesterSlashing: + payload, err = k.renderAttesterSlashing(msg, d) + default: + return nil, fmt.Errorf("unsupported message type: %T", dst) + } + + if err != nil { + return nil, err + } + + evt.Payload = payload + + return evt, nil +} + +func (k *KinesisOutput) renderPhase0Block( + msg *pubsub.Message, + block *ethtypes.SignedBeaconBlock, +) (map[string]any, error) { + root, err := block.GetBlock().HashTreeRoot() + if err != nil { + return nil, fmt.Errorf("failed to determine block hash tree root: %w", err) + } + + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Slot": block.GetBlock().GetSlot(), + "Root": root, + "ValIdx": block.GetBlock().GetProposerIndex(), + "TimeInSlot": k.cfg.GenesisTime.Add(time.Duration(block.GetBlock().GetSlot()) * k.cfg.SecondsPerSlot), + }, nil +} + +func (k *KinesisOutput) renderAltairBlock( + msg *pubsub.Message, + block *ethtypes.SignedBeaconBlockAltair, +) (map[string]any, error) { + root, err := block.GetBlock().HashTreeRoot() + if err != nil { + return nil, fmt.Errorf("failed to determine block hash tree root: %w", err) + } + + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Slot": block.GetBlock().GetSlot(), + "Root": root, + "ValIdx": block.GetBlock().GetProposerIndex(), + "TimeInSlot": k.cfg.GenesisTime.Add(time.Duration(block.GetBlock().GetSlot()) * k.cfg.SecondsPerSlot), + }, nil +} + +func (k *KinesisOutput) renderBellatrixBlock( + msg *pubsub.Message, + block *ethtypes.SignedBeaconBlockBellatrix, +) (map[string]any, error) { + root, err := block.GetBlock().HashTreeRoot() + if err != nil { + return nil, fmt.Errorf("failed to determine block hash tree root: %w", err) + } + + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Slot": block.GetBlock().GetSlot(), + "Root": root, + "ValIdx": block.GetBlock().GetProposerIndex(), + "TimeInSlot": k.cfg.GenesisTime.Add(time.Duration(block.GetBlock().GetSlot()) * k.cfg.SecondsPerSlot), + }, nil +} + +func (k *KinesisOutput) renderCapellaBlock( + msg *pubsub.Message, + block *ethtypes.SignedBeaconBlockCapella, +) (map[string]any, error) { + root, err := block.GetBlock().HashTreeRoot() + if err != nil { + return nil, fmt.Errorf("failed to determine block hash tree root: %w", err) + } + + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Slot": block.GetBlock().GetSlot(), + "Root": root, + "ValIdx": block.GetBlock().GetProposerIndex(), + "TimeInSlot": k.cfg.GenesisTime.Add(time.Duration(block.GetBlock().GetSlot()) * k.cfg.SecondsPerSlot), + }, nil +} + +func (k *KinesisOutput) renderDenebBlock( + msg *pubsub.Message, + block *ethtypes.SignedBeaconBlockDeneb, +) (map[string]any, error) { + root, err := block.GetBlock().HashTreeRoot() + if err != nil { + return nil, fmt.Errorf("failed to determine block hash tree root: %w", err) + } + + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Slot": block.GetBlock().GetSlot(), + "Root": root, + "ValIdx": block.GetBlock().GetProposerIndex(), + "TimeInSlot": k.cfg.GenesisTime.Add(time.Duration(block.GetBlock().GetSlot()) * k.cfg.SecondsPerSlot), + }, nil +} + +func (k *KinesisOutput) renderAttestation( + msg *pubsub.Message, + attestation *ethtypes.Attestation, +) (map[string]any, error) { + payload := map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "CommIdx": attestation.GetData().GetCommitteeIndex(), + "Slot": attestation.GetData().GetSlot(), + "BeaconBlockRoot": attestation.GetData().GetBeaconBlockRoot(), + "Source": attestation.GetData().GetSource(), + "Target": attestation.GetData().GetTarget(), + } + + // If the attestation only has one aggregation bit set, we can add a field to the payload that denotes _which_ + // aggregation bit is set. This is required to determine which validator created the attestation. In the + // pursuit of reducing the amount of data stored in the data stream we omit this field if the attestation is + // aggregated. + if attestation.GetAggregationBits().Count() == 1 { + payload["AggregatePos"] = attestation.AggregationBits.BitIndices()[0] + } + + return payload, nil +} + +func (k *KinesisOutput) renderAggregateAttestationAndProof( + msg *pubsub.Message, + agg *ethtypes.SignedAggregateAttestationAndProof, +) (map[string]any, error) { + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Sig": hexutil.Encode(agg.GetSignature()), + "AggIdx": agg.GetMessage().GetAggregatorIndex(), + "SelectionProof": hexutil.Encode(agg.GetMessage().GetSelectionProof()), + }, nil +} + +func (k *KinesisOutput) renderContributionAndProof( + msg *pubsub.Message, + cp *ethtypes.SignedContributionAndProof, +) (map[string]any, error) { + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Sig": hexutil.Encode(cp.GetSignature()), + "AggIdx": cp.GetMessage().GetAggregatorIndex(), + "Contrib_Slot": cp.GetMessage().GetContribution().GetSlot(), + "Contrib_SubCommitteeIdx": cp.GetMessage().GetContribution().GetSubcommitteeIndex(), + "Contrib_BlockRoot": cp.GetMessage().GetContribution().GetBlockRoot(), + }, nil +} + +func (k *KinesisOutput) renderVoluntaryExit( + msg *pubsub.Message, + ve *ethtypes.VoluntaryExit, +) (map[string]any, error) { + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Epoch": ve.GetEpoch(), + "ValIdx": ve.GetValidatorIndex(), + }, nil +} + +func (k *KinesisOutput) renderSyncCommitteeMessage( + msg *pubsub.Message, + sc *ethtypes.SyncCommitteeMessage, +) (map[string]any, error) { + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Slot": sc.GetSlot(), + "ValIdx": sc.GetValidatorIndex(), + "BlockRoot": hexutil.Encode(sc.GetBlockRoot()), + "Signature": hexutil.Encode(sc.GetSignature()), + }, nil +} + +func (k *KinesisOutput) renderBLSToExecutionChange( + msg *pubsub.Message, + blsec *ethtypes.BLSToExecutionChange, +) (map[string]any, error) { + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "ValIdx": blsec.GetValidatorIndex(), + "FromBlsPubkey": hexutil.Encode(blsec.GetFromBlsPubkey()), + "ToExecutionAddress": hexutil.Encode(blsec.GetToExecutionAddress()), + }, nil +} + +func (k *KinesisOutput) renderBlobSidecar(msg *pubsub.Message, blob *ethtypes.BlobSidecar) (map[string]any, error) { + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Slot": blob.GetSignedBlockHeader().GetHeader().GetSlot(), + "ValIdx": blob.GetSignedBlockHeader().GetHeader().GetProposerIndex(), + "index": blob.GetIndex(), + "StateRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetStateRoot()), + "BodyRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetBodyRoot()), + "ParentRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetParentRoot()), + }, nil +} + +func (k *KinesisOutput) renderProposerSlashing( + msg *pubsub.Message, + ps *ethtypes.ProposerSlashing, +) (map[string]any, error) { + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Header1_Slot": ps.GetHeader_1().GetHeader().GetSlot(), + "Header1_ProposerIndex": ps.GetHeader_1().GetHeader().GetProposerIndex(), + "Header1_StateRoot": hexutil.Encode(ps.GetHeader_1().GetHeader().GetStateRoot()), + "Header2_Slot": ps.GetHeader_2().GetHeader().GetSlot(), + "Header2_ProposerIndex": ps.GetHeader_2().GetHeader().GetProposerIndex(), + "Header2_StateRoot": hexutil.Encode(ps.GetHeader_2().GetHeader().GetStateRoot()), + }, nil +} + +func (k *KinesisOutput) renderAttesterSlashing( + msg *pubsub.Message, + as *ethtypes.AttesterSlashing, +) (map[string]any, error) { + return map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Att1_indices": as.GetAttestation_1().GetAttestingIndices(), + "Att2_indices": as.GetAttestation_2().GetAttestingIndices(), + }, nil +} diff --git a/eth/output_kinesis_test.go b/eth/output_kinesis_test.go new file mode 100644 index 0000000..9773ea9 --- /dev/null +++ b/eth/output_kinesis_test.go @@ -0,0 +1,492 @@ +package eth + +import ( + "crypto/rand" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + pubsub "github.com/libp2p/go-libp2p-pubsub" + pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p/core/peer" + v1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" + ethtypes "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/stretchr/testify/require" +) + +func TestKinesisOutputRenderMethods(t *testing.T) { + var ( + topic = "test-topic" + renderer = &KinesisOutput{ + cfg: &PubSubConfig{ + GenesisTime: time.Unix(0, 0), + SecondsPerSlot: time.Duration(12) * time.Second, + }, + } + baseMsg = &pubsub.Message{ + Message: &pb.Message{ + From: []byte("peer-id2"), + Data: []byte{0xAA, 0xBB, 0xCC}, + Seqno: []byte{0x01, 0x02}, + Topic: &topic, + }, + ID: "msg-id", + ReceivedFrom: peer.ID("peer-id1"), + } + commonExpected = map[string]any{ + "PeerID": peer.ID("peer-id1").String(), + "Topic": topic, + "MsgID": "msg-id", + "MsgSize": 3, + } + ) + + cases := []struct { + name string + payload any + expected map[string]any + render func(msg *pubsub.Message, payload any) (map[string]any, error) + }{ + { + name: "renderPhase0Block", + payload: ðtypes.SignedBeaconBlock{ + Block: mockBeaconBlock(), + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderPhase0Block(msg, payload.(*ethtypes.SignedBeaconBlock)) + }, + }, + { + name: "renderAltairBlock", + payload: ðtypes.SignedBeaconBlockAltair{ + Block: mockBeaconBlockAltair(), + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderAltairBlock(msg, payload.(*ethtypes.SignedBeaconBlockAltair)) + }, + }, + { + name: "renderBellatrixBlock", + payload: ðtypes.SignedBeaconBlockBellatrix{ + Block: mockBeaconBlockBellatrix(), + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderBellatrixBlock(msg, payload.(*ethtypes.SignedBeaconBlockBellatrix)) + }, + }, + { + name: "renderCapellaBlock", + payload: ðtypes.SignedBeaconBlockCapella{ + Block: mockBeaconBlockCapella(), + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderCapellaBlock(msg, payload.(*ethtypes.SignedBeaconBlockCapella)) + }, + }, + { + name: "renderDenebBlock", + payload: ðtypes.SignedBeaconBlockDeneb{ + Block: mockBeaconBlockDeneb(), + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderDenebBlock(msg, payload.(*ethtypes.SignedBeaconBlockDeneb)) + }, + }, + { + name: "renderAttestation", + payload: ðtypes.Attestation{ + Data: ðtypes.AttestationData{ + Slot: 1, + CommitteeIndex: 2, + BeaconBlockRoot: genMockBytes(32), + Source: ðtypes.Checkpoint{}, + Target: ðtypes.Checkpoint{}, + }, + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderAttestation(msg, payload.(*ethtypes.Attestation)) + }, + }, + { + name: "renderAggregateAttestationAndProof", + payload: ðtypes.SignedAggregateAttestationAndProof{ + Signature: genMockBytes(96), + Message: ðtypes.AggregateAttestationAndProof{ + AggregatorIndex: 1, + SelectionProof: genMockBytes(96), + }, + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderAggregateAttestationAndProof(msg, payload.(*ethtypes.SignedAggregateAttestationAndProof)) + }, + }, + { + name: "renderContributionAndProof", + payload: ðtypes.SignedContributionAndProof{ + Signature: genMockBytes(96), + Message: ðtypes.ContributionAndProof{ + AggregatorIndex: 1, + SelectionProof: genMockBytes(96), + Contribution: ðtypes.SyncCommitteeContribution{ + Slot: 1, + SubcommitteeIndex: 2, + BlockRoot: genMockBytes(32), + }, + }, + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderContributionAndProof(msg, payload.(*ethtypes.SignedContributionAndProof)) + }, + }, + { + name: "renderVoluntaryExit", + payload: ðtypes.VoluntaryExit{ + Epoch: 1, + ValidatorIndex: 2, + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderVoluntaryExit(msg, payload.(*ethtypes.VoluntaryExit)) + }, + }, + { + name: "renderSyncCommitteeMessage", + payload: ðtypes.SyncCommitteeMessage{ + Slot: 1, + ValidatorIndex: 2, + BlockRoot: genMockBytes(32), + Signature: genMockBytes(96), + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderSyncCommitteeMessage(msg, payload.(*ethtypes.SyncCommitteeMessage)) + }, + }, + { + name: "renderBLSToExecutionChange", + payload: ðtypes.BLSToExecutionChange{ + ValidatorIndex: 2, + FromBlsPubkey: genMockBytes(48), + ToExecutionAddress: genMockBytes(20), + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderBLSToExecutionChange(msg, payload.(*ethtypes.BLSToExecutionChange)) + }, + }, + { + name: "renderBlobSidecar", + payload: ðtypes.BlobSidecar{ + SignedBlockHeader: ðtypes.SignedBeaconBlockHeader{ + Header: ðtypes.BeaconBlockHeader{ + Slot: 1, + ProposerIndex: 2, + StateRoot: genMockBytes(32), + BodyRoot: genMockBytes(32), + ParentRoot: genMockBytes(32), + }, + }, + Index: 1, + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderBlobSidecar(msg, payload.(*ethtypes.BlobSidecar)) + }, + }, + { + name: "renderProposerSlashing", + payload: ðtypes.ProposerSlashing{ + Header_1: ðtypes.SignedBeaconBlockHeader{ + Header: ðtypes.BeaconBlockHeader{ + Slot: 1, + ProposerIndex: 2, + StateRoot: genMockBytes(32), + }, + }, + Header_2: ðtypes.SignedBeaconBlockHeader{ + Header: ðtypes.BeaconBlockHeader{ + Slot: 1, + ProposerIndex: 2, + StateRoot: genMockBytes(32), + }, + }, + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderProposerSlashing(msg, payload.(*ethtypes.ProposerSlashing)) + }, + }, + { + + name: "renderProposerSlashing", + payload: ðtypes.AttesterSlashing{ + Attestation_1: ðtypes.IndexedAttestation{ + AttestingIndices: []uint64{1, 2, 3}, + }, + Attestation_2: ðtypes.IndexedAttestation{ + AttestingIndices: []uint64{4, 5, 6}, + }, + }, + expected: commonExpected, + render: func(msg *pubsub.Message, payload any) (map[string]any, error) { + return renderer.renderAttesterSlashing(msg, payload.(*ethtypes.AttesterSlashing)) + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + result, err := tc.render(baseMsg, tc.payload) + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, tc.expected["PeerID"], result["PeerID"]) + require.Equal(t, tc.expected["Topic"], result["Topic"]) + require.Equal(t, tc.expected["MsgSize"], result["MsgSize"]) + + switch typedResult := tc.payload.(type) { + case *ethtypes.SignedBeaconBlock: + root, err := typedResult.GetBlock().HashTreeRoot() + if err != nil { + t.Fatalf("failed to determine block hash tree root: %v", err) + } + + require.Equal(t, typedResult.GetBlock().GetSlot(), result["Slot"]) + require.Equal(t, typedResult.GetBlock().GetProposerIndex(), result["ValIdx"]) + require.Equal(t, root, result["Root"]) + require.Equal(t, renderer.cfg.GenesisTime.Add(time.Duration(typedResult.GetBlock().GetSlot())*renderer.cfg.SecondsPerSlot), result["TimeInSlot"]) + case *ethtypes.SignedBeaconBlockAltair: + root, err := typedResult.GetBlock().HashTreeRoot() + if err != nil { + t.Fatalf("failed to determine block hash tree root: %v", err) + } + + require.Equal(t, typedResult.GetBlock().GetSlot(), result["Slot"]) + require.Equal(t, typedResult.GetBlock().GetProposerIndex(), result["ValIdx"]) + require.Equal(t, root, result["Root"]) + require.Equal(t, renderer.cfg.GenesisTime.Add(time.Duration(typedResult.GetBlock().GetSlot())*renderer.cfg.SecondsPerSlot), result["TimeInSlot"]) + case *ethtypes.SignedBeaconBlockBellatrix: + root, err := typedResult.GetBlock().HashTreeRoot() + if err != nil { + t.Fatalf("failed to determine block hash tree root: %v", err) + } + + require.Equal(t, typedResult.GetBlock().GetSlot(), result["Slot"]) + require.Equal(t, typedResult.GetBlock().GetProposerIndex(), result["ValIdx"]) + require.Equal(t, root, result["Root"]) + require.Equal(t, renderer.cfg.GenesisTime.Add(time.Duration(typedResult.GetBlock().GetSlot())*renderer.cfg.SecondsPerSlot), result["TimeInSlot"]) + case *ethtypes.SignedBeaconBlockCapella: + root, err := typedResult.GetBlock().HashTreeRoot() + if err != nil { + t.Fatalf("failed to determine block hash tree root: %v", err) + } + + require.Equal(t, typedResult.GetBlock().GetSlot(), result["Slot"]) + require.Equal(t, typedResult.GetBlock().GetProposerIndex(), result["ValIdx"]) + require.Equal(t, root, result["Root"]) + require.Equal(t, renderer.cfg.GenesisTime.Add(time.Duration(typedResult.GetBlock().GetSlot())*renderer.cfg.SecondsPerSlot), result["TimeInSlot"]) + case *ethtypes.SignedBeaconBlockDeneb: + root, err := typedResult.GetBlock().HashTreeRoot() + if err != nil { + t.Fatalf("failed to determine block hash tree root: %v", err) + } + + require.Equal(t, typedResult.GetBlock().GetSlot(), result["Slot"]) + require.Equal(t, typedResult.GetBlock().GetProposerIndex(), result["ValIdx"]) + require.Equal(t, root, result["Root"]) + require.Equal(t, renderer.cfg.GenesisTime.Add(time.Duration(typedResult.GetBlock().GetSlot())*renderer.cfg.SecondsPerSlot), result["TimeInSlot"]) + case *ethtypes.Attestation: + require.Equal(t, typedResult.GetData().GetSlot(), result["Slot"]) + require.Equal(t, typedResult.GetData().GetCommitteeIndex(), result["CommIdx"]) + require.Equal(t, typedResult.GetData().GetBeaconBlockRoot(), result["BeaconBlockRoot"]) + require.Equal(t, typedResult.GetData().GetSource(), result["Source"]) + require.Equal(t, typedResult.GetData().GetTarget(), result["Target"]) + case *ethtypes.SignedAggregateAttestationAndProof: + require.Equal(t, typedResult.GetMessage().GetAggregatorIndex(), result["AggIdx"]) + require.Equal(t, hexutil.Encode(typedResult.GetMessage().GetSelectionProof()), result["SelectionProof"]) + require.Equal(t, hexutil.Encode(typedResult.GetSignature()), result["Sig"]) + case *ethtypes.SignedContributionAndProof: + require.Equal(t, typedResult.GetMessage().GetAggregatorIndex(), result["AggIdx"]) + require.Equal(t, hexutil.Encode(typedResult.GetSignature()), result["Sig"]) + require.Equal(t, typedResult.GetMessage().GetContribution().GetSlot(), result["Contrib_Slot"]) + require.Equal(t, typedResult.GetMessage().GetContribution().GetSubcommitteeIndex(), result["Contrib_SubCommitteeIdx"]) + require.Equal(t, typedResult.GetMessage().GetContribution().GetBlockRoot(), result["Contrib_BlockRoot"]) + case *ethtypes.VoluntaryExit: + require.Equal(t, typedResult.GetEpoch(), result["Epoch"]) + require.Equal(t, typedResult.GetValidatorIndex(), result["ValIdx"]) + case *ethtypes.SyncCommitteeMessage: + require.Equal(t, typedResult.GetSlot(), result["Slot"]) + require.Equal(t, typedResult.GetValidatorIndex(), result["ValIdx"]) + require.Equal(t, hexutil.Encode(typedResult.GetBlockRoot()), result["BlockRoot"]) + require.Equal(t, hexutil.Encode(typedResult.GetSignature()), result["Signature"]) + case *ethtypes.BLSToExecutionChange: + require.Equal(t, typedResult.GetValidatorIndex(), result["ValIdx"]) + require.Equal(t, hexutil.Encode(typedResult.GetFromBlsPubkey()), result["FromBlsPubkey"]) + require.Equal(t, hexutil.Encode(typedResult.GetToExecutionAddress()), result["ToExecutionAddress"]) + case *ethtypes.BlobSidecar: + require.Equal(t, typedResult.GetSignedBlockHeader().GetHeader().GetSlot(), result["Slot"]) + require.Equal(t, typedResult.GetSignedBlockHeader().GetHeader().GetProposerIndex(), result["ValIdx"]) + require.Equal(t, typedResult.GetIndex(), result["index"]) + require.Equal(t, hexutil.Encode(typedResult.GetSignedBlockHeader().GetHeader().GetStateRoot()), result["StateRoot"]) + require.Equal(t, hexutil.Encode(typedResult.GetSignedBlockHeader().GetHeader().GetBodyRoot()), result["BodyRoot"]) + require.Equal(t, hexutil.Encode(typedResult.GetSignedBlockHeader().GetHeader().GetParentRoot()), result["ParentRoot"]) + case *ethtypes.ProposerSlashing: + require.Equal(t, typedResult.GetHeader_1().GetHeader().GetSlot(), result["Header1_Slot"]) + require.Equal(t, typedResult.GetHeader_1().GetHeader().GetProposerIndex(), result["Header1_ProposerIndex"]) + require.Equal(t, hexutil.Encode(typedResult.GetHeader_1().GetHeader().GetStateRoot()), result["Header1_StateRoot"]) + require.Equal(t, typedResult.GetHeader_2().GetHeader().GetSlot(), result["Header2_Slot"]) + require.Equal(t, typedResult.GetHeader_2().GetHeader().GetProposerIndex(), result["Header2_ProposerIndex"]) + require.Equal(t, hexutil.Encode(typedResult.GetHeader_2().GetHeader().GetStateRoot()), result["Header2_StateRoot"]) + case *ethtypes.AttesterSlashing: + require.Equal(t, typedResult.GetAttestation_1().GetAttestingIndices(), result["Att1_indices"]) + require.Equal(t, typedResult.GetAttestation_2().GetAttestingIndices(), result["Att2_indices"]) + default: + t.Fatalf("unexpected result type: %T", result) + } + }) + } +} + +func mockBeaconBlock() *ethtypes.BeaconBlock { + return ðtypes.BeaconBlock{ + Slot: 1, + ProposerIndex: 2, + ParentRoot: genMockBytes(32), + StateRoot: genMockBytes(32), + Body: ðtypes.BeaconBlockBody{ + RandaoReveal: genMockBytes(96), + Eth1Data: ðtypes.Eth1Data{ + DepositRoot: genMockBytes(32), + BlockHash: genMockBytes(32), + }, + Graffiti: genMockBytes(32), + }, + } +} + +func mockBeaconBlockAltair() *ethtypes.BeaconBlockAltair { + return ðtypes.BeaconBlockAltair{ + Slot: 1, + ProposerIndex: 2, + ParentRoot: genMockBytes(32), + StateRoot: genMockBytes(32), + Body: ðtypes.BeaconBlockBodyAltair{ + RandaoReveal: genMockBytes(96), + Eth1Data: ðtypes.Eth1Data{ + DepositRoot: genMockBytes(32), + BlockHash: genMockBytes(32), + }, + Graffiti: genMockBytes(32), + SyncAggregate: ðtypes.SyncAggregate{ + SyncCommitteeBits: genMockBytes(64), + SyncCommitteeSignature: genMockBytes(96), + }, + }, + } +} + +func mockBeaconBlockBellatrix() *ethtypes.BeaconBlockBellatrix { + return ðtypes.BeaconBlockBellatrix{ + Slot: 1, + ProposerIndex: 2, + ParentRoot: genMockBytes(32), + StateRoot: genMockBytes(32), + Body: ðtypes.BeaconBlockBodyBellatrix{ + RandaoReveal: genMockBytes(96), + Eth1Data: ðtypes.Eth1Data{ + DepositRoot: genMockBytes(32), + BlockHash: genMockBytes(32), + }, + Graffiti: genMockBytes(32), + SyncAggregate: ðtypes.SyncAggregate{ + SyncCommitteeBits: genMockBytes(64), + SyncCommitteeSignature: genMockBytes(96), + }, + ExecutionPayload: &v1.ExecutionPayload{ + ParentHash: genMockBytes(32), + BlockHash: genMockBytes(32), + FeeRecipient: genMockBytes(20), + StateRoot: genMockBytes(32), + ReceiptsRoot: genMockBytes(32), + LogsBloom: genMockBytes(256), + PrevRandao: genMockBytes(32), + BaseFeePerGas: genMockBytes(32), + }, + }, + } +} + +func mockBeaconBlockCapella() *ethtypes.BeaconBlockCapella { + return ðtypes.BeaconBlockCapella{ + Slot: 1, + ProposerIndex: 2, + ParentRoot: genMockBytes(32), + StateRoot: genMockBytes(32), + Body: ðtypes.BeaconBlockBodyCapella{ + RandaoReveal: genMockBytes(96), + Eth1Data: ðtypes.Eth1Data{ + DepositRoot: genMockBytes(32), + BlockHash: genMockBytes(32), + }, + Graffiti: genMockBytes(32), + SyncAggregate: ðtypes.SyncAggregate{ + SyncCommitteeBits: genMockBytes(64), + SyncCommitteeSignature: genMockBytes(96), + }, + ExecutionPayload: &v1.ExecutionPayloadCapella{ + ParentHash: genMockBytes(32), + BlockHash: genMockBytes(32), + FeeRecipient: genMockBytes(20), + StateRoot: genMockBytes(32), + ReceiptsRoot: genMockBytes(32), + LogsBloom: genMockBytes(256), + PrevRandao: genMockBytes(32), + BaseFeePerGas: genMockBytes(32), + }, + }, + } +} + +func mockBeaconBlockDeneb() *ethtypes.BeaconBlockDeneb { + return ðtypes.BeaconBlockDeneb{ + Slot: 1, + ProposerIndex: 2, + ParentRoot: genMockBytes(32), + StateRoot: genMockBytes(32), + Body: ðtypes.BeaconBlockBodyDeneb{ + RandaoReveal: genMockBytes(96), + Eth1Data: ðtypes.Eth1Data{ + DepositRoot: genMockBytes(32), + BlockHash: genMockBytes(32), + }, + Graffiti: genMockBytes(32), + SyncAggregate: ðtypes.SyncAggregate{ + SyncCommitteeBits: genMockBytes(64), + SyncCommitteeSignature: genMockBytes(96), + }, + ExecutionPayload: &v1.ExecutionPayloadDeneb{ + ParentHash: genMockBytes(32), + BlockHash: genMockBytes(32), + FeeRecipient: genMockBytes(20), + StateRoot: genMockBytes(32), + ReceiptsRoot: genMockBytes(32), + LogsBloom: genMockBytes(256), + PrevRandao: genMockBytes(32), + BaseFeePerGas: genMockBytes(32), + }, + }, + } +} + +func genMockBytes(size int) []byte { + root := make([]byte, size) + _, _ = rand.Read(root) + return root +} diff --git a/eth/prysm.go b/eth/prysm.go index 88b0659..addd8d6 100644 --- a/eth/prysm.go +++ b/eth/prysm.go @@ -44,7 +44,8 @@ type PrysmClient struct { func NewPrysmClient(host string, portHTTP int, portGRPC int, timeout time.Duration, genesis *GenesisConfig) (*PrysmClient, error) { tracer := otel.GetTracerProvider().Tracer("prysm_client") - conn, err := grpc.Dial(fmt.Sprintf("%s:%d", host, portGRPC), + conn, err := grpc.NewClient( + fmt.Sprintf("%s:%d", host, portGRPC), grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { diff --git a/eth/pubsub.go b/eth/pubsub.go index dcf9ccc..6b52bd8 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -2,19 +2,17 @@ package eth import ( "context" - "encoding/hex" "fmt" "log/slog" "strings" "time" - "github.com/ethereum/go-ethereum/common/hexutil" pubsub "github.com/libp2p/go-libp2p-pubsub" pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p/core/peer" + ssz "github.com/prysmaticlabs/fastssz" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder" - "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" ethtypes "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/thejerf/suture/v4" @@ -57,6 +55,7 @@ type PubSub struct { host *host.Host cfg *PubSubConfig gs *pubsub.PubSub + dsr host.DataStreamRenderer } func NewPubSub(h *host.Host, cfg *PubSubConfig) (*PubSub, error) { @@ -64,12 +63,20 @@ func NewPubSub(h *host.Host, cfg *PubSubConfig) (*PubSub, error) { return nil, fmt.Errorf("validate configuration: %w", err) } - ps := &PubSub{ - host: h, - cfg: cfg, + var dsr host.DataStreamRenderer + + switch cfg.DataStream.OutputType() { + case host.DataStreamOutputTypeFull: + dsr = NewFullOutput(cfg) + default: + dsr = NewKinesisOutput(cfg) } - return ps, nil + return &PubSub{ + host: h, + cfg: cfg, + dsr: dsr, + }, nil } func (p *PubSub) Serve(ctx context.Context) error { @@ -122,7 +129,7 @@ func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler { case strings.Contains(topic, p2p.GossipProposerSlashingMessage): return p.handleProposerSlashingMessage case strings.Contains(topic, p2p.GossipContributionAndProofMessage): - return p.handleContributtionAndProofMessage + return p.handleContributionAndProofMessage case strings.Contains(topic, p2p.GossipSyncCommitteeMessage): return p.handleSyncCommitteeMessage case strings.Contains(topic, p2p.GossipBlsToExecutionChangeMessage): @@ -152,120 +159,110 @@ func (n *Node) FilterIncomingSubscriptions(id peer.ID, subs []*pubsubpb.RPC_SubO } func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) error { - now := time.Now() + var ( + err error + evt = &host.TraceEvent{ + Type: eventTypeHandleMessage, + Topic: msg.GetTopic(), + PeerID: p.host.ID(), + Timestamp: time.Now(), + } + block ssz.Unmarshaler + ) - genericBlock, root, err := p.getSignedBeaconBlockForForkDigest(msg.Data) - if err != nil { - return err + switch p.cfg.ForkVersion { + case Phase0ForkVersion: + block = ðtypes.SignedBeaconBlock{} + case AltairForkVersion: + block = ðtypes.SignedBeaconBlockAltair{} + case BellatrixForkVersion: + block = ðtypes.SignedBeaconBlockBellatrix{} + case CapellaForkVersion: + block = ðtypes.SignedBeaconBlockCapella{} + case DenebForkVersion: + block = ðtypes.SignedBeaconBlockDeneb{} + default: + return fmt.Errorf("handleBeaconBlock(): unrecognized fork-version: %s", p.cfg.ForkVersion.String()) } - slot := genericBlock.GetSlot() - ProposerIndex := genericBlock.GetProposerIndex() - - slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot) + evt, err = p.dsr.RenderPayload(evt, msg, block) + if err != nil { + slog.Warn( + "failed rendering topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) - evt := &host.TraceEvent{ - Type: eventTypeHandleMessage, - PeerID: p.host.ID(), - Timestamp: now, - Payload: map[string]any{ - "PeerID": msg.ReceivedFrom.String(), - "MsgID": hex.EncodeToString([]byte(msg.ID)), - "MsgSize": len(msg.Data), - "Topic": msg.GetTopic(), - "Seq": msg.GetSeqno(), - "ValIdx": ProposerIndex, - "Slot": slot, - "Root": root, - "TimeInSlot": now.Sub(slotStart).Seconds(), - }, + return nil } if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { - slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) + slog.Warn( + "failed putting topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) } return nil } func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) error { - now := time.Now() - if msg == nil || msg.Topic == nil || *msg.Topic == "" { - return fmt.Errorf("nil message or topic") - } - - attestation := ethtypes.Attestation{} - err := p.cfg.Encoder.DecodeGossip(msg.Data, &attestation) - if err != nil { - return fmt.Errorf("decode attestation gossip message: %w", err) + return fmt.Errorf("handleAttestation(): nil message or topic") } - payload := map[string]any{ - "PeerID": msg.ReceivedFrom.String(), - "MsgID": hex.EncodeToString([]byte(msg.ID)), - "MsgSize": len(msg.Data), - "Topic": msg.GetTopic(), - "Seq": msg.GetSeqno(), - "CommIdx": attestation.GetData().GetCommitteeIndex(), - "Slot": attestation.GetData().GetSlot(), - "BeaconBlockRoot": attestation.GetData().GetBeaconBlockRoot(), - "Source": attestation.GetData().GetSource(), - "Target": attestation.GetData().GetTarget(), - } + var ( + err error + evt = &host.TraceEvent{ + Type: eventTypeHandleMessage, + Topic: msg.GetTopic(), + PeerID: p.host.ID(), + Timestamp: time.Now(), + } + ) - // If the attestation only has one aggregation bit set, we can add an additional field to the payload - // that denotes _which_ aggregation bit is set. This is required to determine which validator created the attestation. - // In the pursuit of reducing the amount of data stored in the data stream we omit this field if the attestation is - // aggregated. - if attestation.GetAggregationBits().Count() == 1 { - payload["AggregatePos"] = attestation.AggregationBits.BitIndices()[0] - } + evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.Attestation{}) + if err != nil { + slog.Warn( + "failed rendering topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) - evt := &host.TraceEvent{ - Type: eventTypeHandleMessage, - PeerID: p.host.ID(), - Timestamp: now, - Payload: payload, + return nil } if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { - slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) + slog.Warn( + "failed putting topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) } return nil } func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Message) error { - now := time.Now() - - ap := ðtypes.SignedAggregateAttestationAndProof{} - if err := p.cfg.Encoder.DecodeGossip(msg.Data, ap); err != nil { - return fmt.Errorf("decode aggregate and proof message: %w", err) - } - - evt := &host.TraceEvent{ - Type: eventTypeHandleMessage, - PeerID: p.host.ID(), - Timestamp: now, - Payload: map[string]any{ - "PeerID": msg.ReceivedFrom.String(), - "MsgID": hex.EncodeToString([]byte(msg.ID)), - "MsgSize": len(msg.Data), - "Topic": msg.GetTopic(), - "Seq": msg.GetSeqno(), - "Sig": hexutil.Encode(ap.GetSignature()), - "AggIdx": ap.GetMessage().GetAggregatorIndex(), - "SelectionProof": hexutil.Encode(ap.GetMessage().GetSelectionProof()), - // There are other details in the SignedAggregateAttestationAndProof message, add them when needed. - }, + if msg == nil || msg.Topic == nil || *msg.Topic == "" { + return fmt.Errorf("handleAggregateAndProof(): nil message or topic") + } + + var ( + err error + evt = &host.TraceEvent{ + Type: eventTypeHandleMessage, + Topic: msg.GetTopic(), + PeerID: p.host.ID(), + Timestamp: time.Now(), + } + ) + + evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.SignedAggregateAttestationAndProof{}) + if err != nil { + slog.Warn( + "failed rendering topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) + + return nil } if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn( - "failed putting topic handler event", - "topic", msg.GetTopic(), - "err", tele.LogAttrError(err), + "failed putting topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), ) } @@ -273,233 +270,235 @@ func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Messag } func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) error { - now := time.Now() - - ve := ðtypes.VoluntaryExit{} - if err := p.cfg.Encoder.DecodeGossip(msg.Data, ve); err != nil { - return fmt.Errorf("decode voluntary exit message: %w", err) + if msg == nil || msg.Topic == nil || *msg.Topic == "" { + return fmt.Errorf("handleExitMessage(): nil message or topic") } - evt := &host.TraceEvent{ - Type: eventTypeHandleMessage, - PeerID: p.host.ID(), - Timestamp: now, - Payload: map[string]any{ - "PeerID": msg.ReceivedFrom.String(), - "MsgID": hex.EncodeToString([]byte(msg.ID)), - "MsgSize": len(msg.Data), - "Topic": msg.GetTopic(), - "Seq": msg.GetSeqno(), - "Epoch": ve.GetEpoch(), - "ValIdx": ve.GetValidatorIndex(), - }, + var ( + err error + evt = &host.TraceEvent{ + Type: eventTypeHandleMessage, + Topic: msg.GetTopic(), + PeerID: p.host.ID(), + Timestamp: time.Now(), + } + ) + + evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.VoluntaryExit{}) + if err != nil { + slog.Warn( + "failed rendering topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) + + return nil } if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { - slog.Warn("failed putting voluntary exit event", tele.LogAttrError(err)) + slog.Warn( + "failed putting topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) } return nil } func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub.Message) error { - now := time.Now() - - as := ðtypes.AttesterSlashing{} - if err := p.cfg.Encoder.DecodeGossip(msg.Data, as); err != nil { - return fmt.Errorf("decode attester slashing message: %w", err) + if msg == nil || msg.Topic == nil || *msg.Topic == "" { + return fmt.Errorf("handleAttesterSlashingMessage(): nil message or topic") } - evt := &host.TraceEvent{ - Type: eventTypeHandleMessage, - PeerID: p.host.ID(), - Timestamp: now, - Payload: map[string]any{ - "PeerID": msg.ReceivedFrom.String(), - "MsgID": hex.EncodeToString([]byte(msg.ID)), - "MsgSize": len(msg.Data), - "Topic": msg.GetTopic(), - "Seq": msg.GetSeqno(), - "Att1_indices": as.GetAttestation_1().GetAttestingIndices(), - "Att2_indices": as.GetAttestation_2().GetAttestingIndices(), - }, + var ( + err error + evt = &host.TraceEvent{ + Type: eventTypeHandleMessage, + Topic: msg.GetTopic(), + PeerID: p.host.ID(), + Timestamp: time.Now(), + } + ) + + evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.AttesterSlashing{}) + if err != nil { + slog.Warn( + "failed rendering topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) + + return nil } if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { - slog.Warn("failed putting attester slashing event", tele.LogAttrError(err)) + slog.Warn( + "failed putting topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) } return nil } func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub.Message) error { - now := time.Now() - - ps := ðtypes.ProposerSlashing{} - if err := p.cfg.Encoder.DecodeGossip(msg.Data, ps); err != nil { - return fmt.Errorf("decode proposer slashing message: %w", err) - } - - evt := &host.TraceEvent{ - Type: eventTypeHandleMessage, - PeerID: p.host.ID(), - Timestamp: now, - Payload: map[string]any{ - "PeerID": msg.ReceivedFrom.String(), - "MsgID": hex.EncodeToString([]byte(msg.ID)), - "MsgSize": len(msg.Data), - "Topic": msg.GetTopic(), - "Seq": msg.GetSeqno(), - "Header1_Slot": ps.GetHeader_1().GetHeader().GetSlot(), - "Header1_ProposerIndex": ps.GetHeader_1().GetHeader().GetProposerIndex(), - "Header1_StateRoot": hexutil.Encode(ps.GetHeader_1().GetHeader().GetStateRoot()), - "Header2_Slot": ps.GetHeader_2().GetHeader().GetSlot(), - "Header2_ProposerIndex": ps.GetHeader_2().GetHeader().GetProposerIndex(), - "Header2_StateRoot": hexutil.Encode(ps.GetHeader_2().GetHeader().GetStateRoot()), - }, + if msg == nil || msg.Topic == nil || *msg.Topic == "" { + return fmt.Errorf("handleProposerSlashingMessage(): nil message or topic") + } + + var ( + err error + evt = &host.TraceEvent{ + Type: eventTypeHandleMessage, + Topic: msg.GetTopic(), + PeerID: p.host.ID(), + Timestamp: time.Now(), + } + ) + + evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.ProposerSlashing{}) + if err != nil { + slog.Warn( + "failed rendering topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) + + return nil } if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { - slog.Warn("failed putting proposer slashing event", tele.LogAttrError(err)) + slog.Warn( + "failed putting topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) } return nil } -func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pubsub.Message) error { - now := time.Now() - - cp := ðtypes.SignedContributionAndProof{} - if err := p.cfg.Encoder.DecodeGossip(msg.Data, cp); err != nil { - return fmt.Errorf("decode contribution and proof message: %w", err) +func (p *PubSub) handleContributionAndProofMessage(ctx context.Context, msg *pubsub.Message) error { + if msg == nil || msg.Topic == nil || *msg.Topic == "" { + return fmt.Errorf("handleContributionAndProofMessage(): nil message or topic") } - evt := &host.TraceEvent{ - Type: eventTypeHandleMessage, - PeerID: p.host.ID(), - Timestamp: now, - Payload: map[string]any{ - "PeerID": msg.ReceivedFrom.String(), - "MsgID": hex.EncodeToString([]byte(msg.ID)), - "MsgSize": len(msg.Data), - "Topic": msg.GetTopic(), - "Seq": msg.GetSeqno(), - "Sig": hexutil.Encode(cp.GetSignature()), - "AggIdx": cp.GetMessage().GetAggregatorIndex(), - "Contrib_Slot": cp.GetMessage().GetContribution().GetSlot(), - "Contrib_SubCommitteeIdx": cp.GetMessage().GetContribution().GetSubcommitteeIndex(), - "Contrib_BlockRoot": cp.GetMessage().GetContribution().GetBlockRoot(), - }, + var ( + err error + evt = &host.TraceEvent{ + Type: eventTypeHandleMessage, + Topic: msg.GetTopic(), + PeerID: p.host.ID(), + Timestamp: time.Now(), + } + ) + + evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.SignedContributionAndProof{}) + if err != nil { + slog.Warn( + "failed rendering topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) + + return nil } if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { - slog.Warn("failed putting contribution and proof event", tele.LogAttrError(err)) + slog.Warn( + "failed putting topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) } return nil } func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Message) error { - now := time.Now() - - sc := ðtypes.SyncCommitteeMessage{} - if err := p.cfg.Encoder.DecodeGossip(msg.Data, sc); err != nil { - return fmt.Errorf("decode sync committee message: %w", err) - } - - evt := &host.TraceEvent{ - Type: eventTypeHandleMessage, - PeerID: p.host.ID(), - Timestamp: now, - Payload: map[string]any{ - "PeerID": msg.ReceivedFrom.String(), - "MsgID": hex.EncodeToString([]byte(msg.ID)), - "MsgSize": len(msg.Data), - "Topic": msg.GetTopic(), - "Seq": msg.GetSeqno(), - "Slot": sc.GetSlot(), - "ValIdx": sc.GetValidatorIndex(), - "BlockRoot": hexutil.Encode(sc.GetBlockRoot()), - "Signature": hexutil.Encode(sc.GetSignature()), - }, + if msg == nil || msg.Topic == nil || *msg.Topic == "" { + return fmt.Errorf("handleSyncCommitteeMessage(): nil message or topic") + } + + var ( + err error + evt = &host.TraceEvent{ + Type: eventTypeHandleMessage, + Topic: msg.GetTopic(), + PeerID: p.host.ID(), + Timestamp: time.Now(), + } + ) + + evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.SyncCommitteeMessage{}) + if err != nil { + slog.Warn( + "failed rendering topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) + + return nil } if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { - slog.Warn("failed putting sync committee event", tele.LogAttrError(err)) + slog.Warn( + "failed putting topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) } return nil } func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pubsub.Message) error { - now := time.Now() - - pb := ðtypes.BLSToExecutionChange{} - if err := p.cfg.Encoder.DecodeGossip(msg.Data, pb); err != nil { - return fmt.Errorf("decode bls to execution change message: %w", err) + if msg == nil || msg.Topic == nil || *msg.Topic == "" { + return fmt.Errorf("handleBlsToExecutionChangeMessage(): nil message or topic") } - evt := &host.TraceEvent{ - Type: eventTypeHandleMessage, - PeerID: p.host.ID(), - Timestamp: now, - Payload: map[string]any{ - "PeerID": msg.ReceivedFrom.String(), - "MsgID": hex.EncodeToString([]byte(msg.ID)), - "MsgSize": len(msg.Data), - "Topic": msg.GetTopic(), - "Seq": msg.GetSeqno(), - "ValIdx": pb.GetValidatorIndex(), - "FromBlsPubkey": hexutil.Encode(pb.GetFromBlsPubkey()), - "ToExecutionAddress": hexutil.Encode(pb.GetToExecutionAddress()), - }, + var ( + err error + evt = &host.TraceEvent{ + Type: eventTypeHandleMessage, + Topic: msg.GetTopic(), + PeerID: p.host.ID(), + Timestamp: time.Now(), + } + ) + + evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.BLSToExecutionChange{}) + if err != nil { + slog.Warn( + "failed rendering topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) + + return nil } if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { - slog.Warn("failed putting bls to execution change event", tele.LogAttrError(err)) + slog.Warn( + "failed putting topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) } return nil } func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) error { - now := time.Now() + if msg == nil || msg.Topic == nil || *msg.Topic == "" { + return fmt.Errorf("handleBlobSidecar(): nil message or topic") + } + + var ( + err error + evt = &host.TraceEvent{ + Type: eventTypeHandleMessage, + Topic: msg.GetTopic(), + PeerID: p.host.ID(), + Timestamp: time.Now(), + } + ) switch p.cfg.ForkVersion { case DenebForkVersion: - var blob ethtypes.BlobSidecar - err := p.cfg.Encoder.DecodeGossip(msg.Data, &blob) - if err != nil { - slog.Error("decode blob sidecar gossip message", tele.LogAttrError(err)) - return err - } + blob := ethtypes.BlobSidecar{} - slot := blob.GetSignedBlockHeader().GetHeader().GetSlot() - slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot) - proposerIndex := blob.GetSignedBlockHeader().GetHeader().GetProposerIndex() + evt, err = p.dsr.RenderPayload(evt, msg, &blob) + if err != nil { + slog.Warn( + "failed rendering topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) - evt := &host.TraceEvent{ - Type: "HANDLE_MESSAGE", - PeerID: p.host.ID(), - Timestamp: now, - Payload: map[string]any{ - "PeerID": msg.ReceivedFrom.String(), - "MsgID": hex.EncodeToString([]byte(msg.ID)), - "MsgSize": len(msg.Data), - "Topic": msg.GetTopic(), - "Seq": msg.GetSeqno(), - "Slot": slot, - "ValIdx": proposerIndex, - "index": blob.GetIndex(), - "TimeInSlot": now.Sub(slotStart).Seconds(), - "StateRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetStateRoot()), - "BodyRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetBodyRoot()), - "ParentRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetParentRoot()), - }, + return nil } if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { - slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) + slog.Warn( + "failed putting topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) } default: return fmt.Errorf("non recognized fork-version: %d", p.cfg.ForkVersion[:]) @@ -507,86 +506,3 @@ func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) err return nil } - -type GenericSignedBeaconBlock interface { - GetBlock() GenericBeaconBlock -} - -type GenericBeaconBlock interface { - GetSlot() primitives.Slot - GetProposerIndex() primitives.ValidatorIndex -} - -func (p *PubSub) getSignedBeaconBlockForForkDigest(msgData []byte) (genericSbb GenericBeaconBlock, root [32]byte, err error) { - // get the correct fork - - switch p.cfg.ForkVersion { - case Phase0ForkVersion: - phase0Sbb := ethtypes.SignedBeaconBlock{} - err = p.cfg.Encoder.DecodeGossip(msgData, &phase0Sbb) - if err != nil { - return genericSbb, [32]byte{}, fmt.Errorf("error decoding phase0 beacon block gossip message: %w", err) - } - genericSbb = phase0Sbb.GetBlock() - root, err = phase0Sbb.Block.HashTreeRoot() - if err != nil { - return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) - } - return genericSbb, root, nil - - case AltairForkVersion: - altairSbb := ethtypes.SignedBeaconBlockAltair{} - err = p.cfg.Encoder.DecodeGossip(msgData, &altairSbb) - if err != nil { - return genericSbb, [32]byte{}, fmt.Errorf("error decoding altair beacon block gossip message: %w", err) - } - genericSbb = altairSbb.GetBlock() - root, err = altairSbb.Block.HashTreeRoot() - if err != nil { - return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) - } - return genericSbb, root, nil - - case BellatrixForkVersion: - BellatrixSbb := ethtypes.SignedBeaconBlockBellatrix{} - err = p.cfg.Encoder.DecodeGossip(msgData, &BellatrixSbb) - if err != nil { - return genericSbb, [32]byte{}, fmt.Errorf("error decoding bellatrix beacon block gossip message: %w", err) - } - genericSbb = BellatrixSbb.GetBlock() - root, err = BellatrixSbb.Block.HashTreeRoot() - if err != nil { - return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) - } - return genericSbb, root, nil - - case CapellaForkVersion: - capellaSbb := ethtypes.SignedBeaconBlockCapella{} - err = p.cfg.Encoder.DecodeGossip(msgData, &capellaSbb) - if err != nil { - return genericSbb, [32]byte{}, fmt.Errorf("error decoding capella beacon block gossip message: %w", err) - } - genericSbb = capellaSbb.GetBlock() - root, err = capellaSbb.Block.HashTreeRoot() - if err != nil { - return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) - } - return genericSbb, root, nil - - case DenebForkVersion: - denebSbb := ethtypes.SignedBeaconBlockDeneb{} - err = p.cfg.Encoder.DecodeGossip(msgData, &denebSbb) - if err != nil { - return genericSbb, [32]byte{}, fmt.Errorf("error decoding deneb beacon block gossip message: %w", err) - } - genericSbb = denebSbb.GetBlock() - root, err = denebSbb.Block.HashTreeRoot() - if err != nil { - return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) - } - return genericSbb, root, nil - - default: - return genericSbb, [32]byte{}, fmt.Errorf("non recognized fork-version: %d", p.cfg.ForkVersion[:]) - } -} diff --git a/eth/reqresp.go b/eth/reqresp.go index b52422c..f3d7838 100644 --- a/eth/reqresp.go +++ b/eth/reqresp.go @@ -970,7 +970,7 @@ func (r *ReqResp) readFirstChunkedBlock(stream core.Stream, encoding encoder.Net return nil, err } if code != 0 { - return nil, fmt.Errorf(errMsg) + return nil, errors.New(errMsg) } // set deadline for reading from stream if err = stream.SetWriteDeadline(time.Now().Add(r.cfg.WriteTimeout)); err != nil { @@ -999,7 +999,7 @@ func (r *ReqResp) readResponseChunk(stream core.Stream, encoding encoder.Network return nil, err } if code != 0 { - return nil, fmt.Errorf(errMsg) + return nil, errors.New(errMsg) } // No-op for now with the rpc context. forkD, err := r.readForkDigestFromStream(stream) diff --git a/go.mod b/go.mod index 6449739..1443ec6 100644 --- a/go.mod +++ b/go.mod @@ -19,20 +19,21 @@ require ( github.com/lmittmann/tint v1.0.4 github.com/multiformats/go-multiaddr v0.13.0 github.com/prometheus/client_golang v1.20.0 - github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 - github.com/prysmaticlabs/prysm/v5 v5.0.2 + github.com/prysmaticlabs/fastssz v0.0.0-20240620202422-a981b8ef89d3 + github.com/prysmaticlabs/go-bitfield v0.0.0-20240328144219-a1caa50c3a1e + github.com/prysmaticlabs/prysm/v5 v5.1.0 github.com/stretchr/testify v1.9.0 github.com/thejerf/suture/v4 v4.0.5 github.com/urfave/cli/v2 v2.27.1 - go.opentelemetry.io/otel v1.24.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 + go.opentelemetry.io/otel v1.29.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0 go.opentelemetry.io/otel/exporters/prometheus v0.46.0 - go.opentelemetry.io/otel/metric v1.24.0 - go.opentelemetry.io/otel/sdk v1.24.0 - go.opentelemetry.io/otel/sdk/metric v1.24.0 - go.opentelemetry.io/otel/trace v1.24.0 + go.opentelemetry.io/otel/metric v1.29.0 + go.opentelemetry.io/otel/sdk v1.29.0 + go.opentelemetry.io/otel/sdk/metric v1.29.0 + go.opentelemetry.io/otel/trace v1.29.0 golang.org/x/time v0.5.0 - google.golang.org/grpc v1.62.1 + google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 ) @@ -58,7 +59,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.11.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect - github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/consensys/bavard v0.1.13 // indirect @@ -86,15 +87,14 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.1 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 // indirect - github.com/gorilla/mux v1.8.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/herumi/bls-eth-go-binary v0.0.0-20210917013441-d37c07cfda4e // indirect github.com/holiman/uint256 v1.2.4 // indirect @@ -173,16 +173,14 @@ require ( github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/prometheus/prom2json v1.3.0 // indirect - github.com/prysmaticlabs/fastssz v0.0.0-20221107182844-78142813af44 // indirect - github.com/prysmaticlabs/gohashtree v0.0.4-beta // indirect + github.com/prysmaticlabs/gohashtree v0.0.4-beta.0.20240624100937-73632381301b // indirect github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c // indirect github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/quic-go v0.46.0 // indirect github.com/quic-go/webtransport-go v0.8.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect - github.com/rogpeppe/go-internal v1.11.0 // indirect - github.com/rs/cors v1.7.0 // indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/schollz/progressbar/v3 v3.3.4 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect @@ -202,8 +200,8 @@ require ( github.com/yusufpapurcu/wmi v1.2.3 // indirect go.etcd.io/bbolt v1.3.6 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect - go.opentelemetry.io/proto/otlp v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0 // indirect + go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/dig v1.18.0 // indirect go.uber.org/fx v1.22.2 // indirect go.uber.org/mock v0.4.0 // indirect @@ -213,16 +211,15 @@ require ( golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa // indirect golang.org/x/mod v0.20.0 // indirect golang.org/x/net v0.28.0 // indirect - golang.org/x/oauth2 v0.21.0 // indirect + golang.org/x/oauth2 v0.22.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.24.0 // indirect golang.org/x/term v0.23.0 // indirect golang.org/x/text v0.17.0 // indirect golang.org/x/tools v0.24.0 // indirect google.golang.org/api v0.44.0 // indirect - google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect k8s.io/apimachinery v0.20.0 // indirect diff --git a/go.sum b/go.sum index 7c0f626..55dfd9d 100644 --- a/go.sum +++ b/go.sum @@ -150,8 +150,8 @@ github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtyd github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/cp v1.1.1 h1:nCb6ZLdB7NRaqsm91JtQTAme2SKJzXVsdPIPkyJr1MU= github.com/cespare/cp v1.1.1/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= @@ -363,8 +363,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -428,8 +428,6 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= -github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= -github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -438,8 +436,8 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -834,18 +832,18 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/prometheus/prom2json v1.3.0 h1:BlqrtbT9lLH3ZsOVhXPsHzFrApCTKRifB7gjJuypu6Y= github.com/prometheus/prom2json v1.3.0/go.mod h1:rMN7m0ApCowcoDlypBHlkNbp5eJQf/+1isKykIP5ZnM= -github.com/prysmaticlabs/fastssz v0.0.0-20221107182844-78142813af44 h1:c3p3UzV4vFA7xaCDphnDWOjpxcadrQ26l5b+ypsvyxo= -github.com/prysmaticlabs/fastssz v0.0.0-20221107182844-78142813af44/go.mod h1:MA5zShstUwCQaE9faGHgCGvEWUbG87p4SAXINhmCkvg= -github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 h1:0tVE4tdWQK9ZpYygoV7+vS6QkDvQVySboMVEIxBJmXw= -github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7/go.mod h1:wmuf/mdK4VMD+jA9ThwcUKjg3a2XWM9cVfFYjDyY4j4= -github.com/prysmaticlabs/gohashtree v0.0.4-beta h1:H/EbCuXPeTV3lpKeXGPpEV9gsUpkqOOVnWapUyeWro4= -github.com/prysmaticlabs/gohashtree v0.0.4-beta/go.mod h1:BFdtALS+Ffhg3lGQIHv9HDWuHS8cTvHZzrHWxwOtGOs= +github.com/prysmaticlabs/fastssz v0.0.0-20240620202422-a981b8ef89d3 h1:0LZAwwHnsZFfXm4IK4rzFV4N5IVSKZKLmuBMA4kAlFk= +github.com/prysmaticlabs/fastssz v0.0.0-20240620202422-a981b8ef89d3/go.mod h1:h2OlIZD/M6wFvV3YMZbW16lFgh3Rsye00G44J2cwLyU= +github.com/prysmaticlabs/go-bitfield v0.0.0-20240328144219-a1caa50c3a1e h1:ATgOe+abbzfx9kCPeXIW4fiWyDdxlwHw07j8UGhdTd4= +github.com/prysmaticlabs/go-bitfield v0.0.0-20240328144219-a1caa50c3a1e/go.mod h1:wmuf/mdK4VMD+jA9ThwcUKjg3a2XWM9cVfFYjDyY4j4= +github.com/prysmaticlabs/gohashtree v0.0.4-beta.0.20240624100937-73632381301b h1:VK7thFOnhxAZ/5aolr5Os4beiubuD08WiuiHyRqgwks= +github.com/prysmaticlabs/gohashtree v0.0.4-beta.0.20240624100937-73632381301b/go.mod h1:HRuvtXLZ4WkaB1MItToVH2e8ZwKwZPY5/Rcby+CvvLY= github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c h1:9PHRCuO/VN0s9k+RmLykho7AjDxblNYI5bYKed16NPU= github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c/go.mod h1:ZRws458tYHS/Zs936OQ6oCrL+Ict5O4Xpwve1UQ6C9M= github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20230228205207-28762a7b9294 h1:q9wE0ZZRdTUAAeyFP/w0SwBEnCqlVy2+on6X2/e+eAU= github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20230228205207-28762a7b9294/go.mod h1:ZVEbRdnMkGhp/pu35zq4SXxtvUwWK0J1MATtekZpH2Y= -github.com/prysmaticlabs/prysm/v5 v5.0.2 h1:xcSUvrCVfOGslKYUb5Hpyz98N9I8fC2p7DMAZfiqEIA= -github.com/prysmaticlabs/prysm/v5 v5.0.2/go.mod h1:XG4nOU925zemOimoexcrFP4oA57f+RTQbp7V/TH9UOM= +github.com/prysmaticlabs/prysm/v5 v5.1.0 h1:TY9A6tm0v7bI1z9YH+xkDh7XH7qm4ZK8sTeyckxbj4A= +github.com/prysmaticlabs/prysm/v5 v5.1.0/go.mod h1:SWb5kE/FhtQrLS2yt+IDj+leB7IhXrcOv6lhDnU1nBY= github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A= github.com/quic-go/quic-go v0.46.0 h1:uuwLClEEyk1DNvchH8uCByQVjo3yKL9opKulExNDs7Y= @@ -862,8 +860,8 @@ github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUc github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -1018,24 +1016,24 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= -go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 h1:t6wl9SPayj+c7lEIFgm4ooDBZVb01IhLB4InpomhRw8= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0/go.mod h1:iSDOcsnSA5INXzZtwaBPrKp/lWu/V14Dd+llD0oI2EA= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 h1:Mw5xcxMwlqoJd97vwPxA8isEaIoxsta9/Q51+TTJLGE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0/go.mod h1:CQNu9bj7o7mC6U7+CA/schKEYakYXWr79ucDHTMGhCM= +go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= +go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0 h1:dIIDULZJpgdiHz5tXrTgKIMLkus6jEFa7x5SOKcyR7E= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0/go.mod h1:jlRVBe7+Z1wyxFSUs48L6OBQZ5JwH2Hg/Vbl+t9rAgI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0 h1:nSiV3s7wiCam610XcLbYOmMfJxB9gO4uK3Xgv5gmTgg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0/go.mod h1:hKn/e/Nmd19/x1gvIHwtOwVWM+VhuITSWip3JUDghj0= go.opentelemetry.io/otel/exporters/prometheus v0.46.0 h1:I8WIFXR351FoLJYuloU4EgXbtNX2URfU/85pUPheIEQ= go.opentelemetry.io/otel/exporters/prometheus v0.46.0/go.mod h1:ztwVUHe5DTR/1v7PeuGRnU5Bbd4QKYwApWmuutKsJSs= -go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= -go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= -go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= -go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= -go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8= -go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0= -go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= -go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= -go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI= -go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY= +go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= +go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= +go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= +go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok= +go.opentelemetry.io/otel/sdk/metric v1.29.0 h1:K2CfmJohnRgvZ9UAj2/FhIf/okdWcNdBwe1m8xFXiSY= +go.opentelemetry.io/otel/sdk/metric v1.29.0/go.mod h1:6zZLdCl2fkauYoZIOn/soQIDSWFmNSRcICarHfuhNJQ= +go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= +go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -1199,8 +1197,8 @@ golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= -golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA= +golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/perf v0.0.0-20180704124530-6e6d33e29852/go.mod h1:JLpeXjPJfIyPr5TlbXLkXWLhP8nz10XfvxElABhCtcw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1489,12 +1487,10 @@ google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210310155132-4ce2db91004e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= -google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 h1:KAeGQVN3M9nD0/bQXnr/ClcEMJ968gUXJQ9pwfSynuQ= -google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro= -google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 h1:Lj5rbfG876hIAYFjqiJnPHfhXbv+nzTWfm04Fg/XSVU= -google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= +google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd h1:BBOTEWLuuEGQy9n1y9MhVJ9Qt0BDu21X8qZs71/uPZo= +google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:fO8wJzT2zbQbAjbIoos1285VfEIYKDDY+Dt+WpTkh6g= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd h1:6TEm2ZxXoQmFWFlt1vNxvVOa1Q0dXFQD1m/rYjXmS0E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= @@ -1520,8 +1516,8 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= -google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= -google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/host/callback.go b/host/callback.go index c2df4db..6067551 100644 --- a/host/callback.go +++ b/host/callback.go @@ -28,6 +28,11 @@ func (c *CallbackDataStream) Type() DataStreamType { return DataStreamTypeCallback } +// OutputType returns the output type to be used by this data stream. +func (c *CallbackDataStream) OutputType() DataStreamOutputType { + return DataStreamOutputTypeFull +} + // OnEvent sets the callback function that will be called when an event is received. func (c *CallbackDataStream) OnEvent(onRecord func(ctx context.Context, event *TraceEvent)) { c.cb = onRecord diff --git a/host/flush_tracer.go b/host/flush_tracer.go index de87c2b..c83b202 100644 --- a/host/flush_tracer.go +++ b/host/flush_tracer.go @@ -18,11 +18,20 @@ import ( type TraceEvent struct { Type string + Topic string PeerID peer.ID Timestamp time.Time Payload any `json:"Data"` // cannot use field "Data" because of gk.Record method } +type TraceEventPayloadMetaData struct { + PeerID string `json:"PeerID"` + Topic string `json:"Topic"` + Seq []byte `json:"Seq"` + MsgID string `json:"MsgID"` + MsgSize int `json:"MsgSize"` +} + func (t *TraceEvent) PartitionKey() string { u, err := uuid.NewUUID() if err != nil { diff --git a/host/kinesis.go b/host/kinesis.go index dad62f6..41aebeb 100644 --- a/host/kinesis.go +++ b/host/kinesis.go @@ -31,6 +31,11 @@ func (k *KinesisDataStream) Type() DataStreamType { return DataStreamTypeKinesis } +// OutputType returns the output type to be used by this data stream. +func (k *KinesisDataStream) OutputType() DataStreamOutputType { + return DataStreamOutputTypeKinesis +} + // Start begins the data stream's operation. func (k *KinesisDataStream) Start(ctx context.Context) error { dsCtx, dsCancel := context.WithCancel(ctx) diff --git a/host/producer.go b/host/producer.go index 5919641..818aa8e 100644 --- a/host/producer.go +++ b/host/producer.go @@ -2,6 +2,9 @@ package host import ( "context" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + ssz "github.com/prysmaticlabs/fastssz" ) type DataStream interface { @@ -9,6 +12,12 @@ type DataStream interface { Stop(ctx context.Context) error PutRecord(ctx context.Context, event *TraceEvent) error Type() DataStreamType + OutputType() DataStreamOutputType +} + +// DataStreamRenderer is an interface to support rendering a data-stream message into a destination. +type DataStreamRenderer interface { + RenderPayload(evt *TraceEvent, msg *pubsub.Message, dst ssz.Unmarshaler) (*TraceEvent, error) } type DataStreamType int @@ -44,3 +53,13 @@ func DataStreamtypeFromStr(str string) DataStreamType { return DataStreamTypeLogger } } + +// DataStreamOutputType is the output type of the data stream. +type DataStreamOutputType int + +const ( + // DataStreamOutputTypeKinesis outputs the data stream decorated with metadata and in a format ingested by Kinesis. + DataStreamOutputTypeKinesis DataStreamOutputType = iota + // DataStreamOutputTypeFull outputs the data stream decorated with metadata and containing the raw/full event data. + DataStreamOutputTypeFull +) diff --git a/host/trace_logger.go b/host/trace_logger.go index 516568b..2c36c54 100644 --- a/host/trace_logger.go +++ b/host/trace_logger.go @@ -31,3 +31,8 @@ func (t *TraceLogger) PutRecord(ctx context.Context, event *TraceEvent) error { func (t *TraceLogger) Type() DataStreamType { return DataStreamTypeLogger } + +// OutputType returns the output type to be used by this data stream. +func (t *TraceLogger) OutputType() DataStreamOutputType { + return DataStreamOutputTypeKinesis +} diff --git a/tele/tele.go b/tele/tele.go index ad5ad76..b6d4c68 100644 --- a/tele/tele.go +++ b/tele/tele.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "net" "net/http" "net/http/pprof" "time" @@ -141,9 +142,17 @@ func OtelCollectorTraceProvider(ctx context.Context, host string, port int) (*sd defer cancel() slog.Debug("Connecting to trace collector", "addr", addr) - conn, err := grpc.DialContext(ctx, addr, + + // Set up a custom context-aware dialer. + dialer := func(ctx context.Context, addr string) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, "tcp", addr) + } + + conn, err := grpc.NewClient( + addr, grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), + grpc.WithContextDialer(dialer), ) if err != nil { return nil, fmt.Errorf("failed to create gRPC connection to otel collector: %w", err)