diff --git a/cmd/connector/config/config.toml b/cmd/connector/config/config.toml index 70ed958..1531297 100644 --- a/cmd/connector/config/config.toml +++ b/cmd/connector/config/config.toml @@ -12,4 +12,7 @@ with_acknowledge = true # Signals if in case of data payload processing error, we should send the ack signal or not. If you want to block # incoming data in case of a local error, this should be set to true. - blocking_ack_on_error = false + blocking_ack_on_error = true + # This flag specifies if we should drop messages if there is no connection to the host + dropMessagesIfNoConnection = false + Version = 1 diff --git a/config/config.go b/config/config.go index dd7ee03..4df91f7 100644 --- a/config/config.go +++ b/config/config.go @@ -7,10 +7,12 @@ type Config struct { // WebSocketConfig holds web sockets config type WebSocketConfig struct { - Url string `toml:"url"` - MarshallerType string `toml:"marshaller_type"` - Mode string `toml:"mode"` - RetryDuration uint32 `toml:"retry_duration"` - WithAcknowledge bool `toml:"with_acknowledge"` - BlockingAckOnError bool `toml:"blocking_ack_on_error"` + Url string `toml:"url"` + MarshallerType string `toml:"marshaller_type"` + Mode string `toml:"mode"` + RetryDuration uint32 `toml:"retry_duration"` + WithAcknowledge bool `toml:"with_acknowledge"` + BlockingAckOnError bool `toml:"blocking_ack_on_error"` + DropMessagesIfNoConnection bool // Set to `true` to drop messages if there is no active WebSocket connection to send to. + Version uint32 // Defines the payload version. } diff --git a/process/firehoseDataProcessor.go b/process/firehoseDataProcessor.go index df8a694..488d375 100644 --- a/process/firehoseDataProcessor.go +++ b/process/firehoseDataProcessor.go @@ -57,7 +57,7 @@ func NewFirehoseDataProcessor( } // ProcessPayload will process the received payload only for TopicSaveBlock, otherwise ignores it. -func (dp *dataProcessor) ProcessPayload(payload []byte, topic string) error { +func (dp *dataProcessor) ProcessPayload(payload []byte, topic string, _ uint32) error { operationHandler, found := dp.operationHandlers[topic] if !found { return nil diff --git a/process/firehoseDataProcessor_test.go b/process/firehoseDataProcessor_test.go index 04092f4..4046b9f 100644 --- a/process/firehoseDataProcessor_test.go +++ b/process/firehoseDataProcessor_test.go @@ -12,8 +12,9 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" outportcore "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/marshal" - "github.com/multiversx/mx-chain-ws-connector-template-go/testscommon" "github.com/stretchr/testify/require" + + "github.com/multiversx/mx-chain-ws-connector-template-go/testscommon" ) var protoMarshaller = &marshal.GogoProtoMarshalizer{} @@ -86,14 +87,14 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller) - err := fi.ProcessPayload(nil, outportcore.TopicSaveBlock) + err := fi.ProcessPayload(nil, outportcore.TopicSaveBlock, 1) require.Equal(t, errNilOutportBlockData, err) outportBlock := createOutportBlock() outportBlock.BlockData = nil outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock) - err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock) + err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) require.Equal(t, errNilOutportBlockData, err) }) @@ -102,7 +103,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller) - err := fi.ProcessPayload([]byte("invalid payload"), outportcore.TopicSaveBlock) + err := fi.ProcessPayload([]byte("invalid payload"), outportcore.TopicSaveBlock, 1) require.NotNil(t, err) }) @@ -123,7 +124,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), protoMarshaller) outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock) - err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock) + err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) require.NotNil(t, err) require.Equal(t, 0, ioWriterCalledCt) }) @@ -163,7 +164,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { } fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), marshaller) - err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock) + err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) require.Equal(t, errUnmarshal, err) }) @@ -197,13 +198,13 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { outportBlock := createOutportBlock() outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock) - err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock) + err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) require.True(t, strings.Contains(err.Error(), err1.Error())) - err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock) + err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) require.True(t, strings.Contains(err.Error(), err2.Error())) - err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock) + err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) require.Nil(t, err) require.Equal(t, 5, ioWriterCalledCt) @@ -253,7 +254,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), protoMarshaller) - err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock) + err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) require.Nil(t, err) require.Equal(t, 2, ioWriterCalledCt) }) @@ -265,12 +266,12 @@ func TestFirehoseIndexer_NoOperationFunctions(t *testing.T) { fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller) - require.Nil(t, fi.ProcessPayload([]byte("payload"), "random topic")) - require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveRoundsInfo)) - require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsRating)) - require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsPubKeys)) - require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveAccounts)) - require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicFinalizedBlock)) + require.Nil(t, fi.ProcessPayload([]byte("payload"), "random topic", 1)) + require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveRoundsInfo, 1)) + require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsRating, 1)) + require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsPubKeys, 1)) + require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveAccounts, 1)) + require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicFinalizedBlock, 1)) } func TestFirehoseIndexer_Close(t *testing.T) { diff --git a/process/interface.go b/process/interface.go index 4f07660..48d3f15 100644 --- a/process/interface.go +++ b/process/interface.go @@ -14,7 +14,7 @@ type WSConnector interface { // DataProcessor defines a payload processor for incoming ws data type DataProcessor interface { - ProcessPayload(payload []byte, topic string) error + ProcessPayload(payload []byte, topic string, version uint32) error Close() error IsInterfaceNil() bool }