Skip to content

Commit

Permalink
update dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
AdoAdoAdo committed Feb 5, 2024
1 parent c126a75 commit c6df1e4
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 25 deletions.
5 changes: 4 additions & 1 deletion cmd/connector/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@
with_acknowledge = true
# Signals if in case of data payload processing error, we should send the ack signal or not. If you want to block
# incoming data in case of a local error, this should be set to true.
blocking_ack_on_error = false
blocking_ack_on_error = true
# This flag specifies if we should drop messages if there is no connection to the host
dropMessagesIfNoConnection = false
Version = 1
14 changes: 8 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ type Config struct {

// WebSocketConfig holds web sockets config
type WebSocketConfig struct {
Url string `toml:"url"`
MarshallerType string `toml:"marshaller_type"`
Mode string `toml:"mode"`
RetryDuration uint32 `toml:"retry_duration"`
WithAcknowledge bool `toml:"with_acknowledge"`
BlockingAckOnError bool `toml:"blocking_ack_on_error"`
Url string `toml:"url"`
MarshallerType string `toml:"marshaller_type"`
Mode string `toml:"mode"`
RetryDuration uint32 `toml:"retry_duration"`
WithAcknowledge bool `toml:"with_acknowledge"`
BlockingAckOnError bool `toml:"blocking_ack_on_error"`
DropMessagesIfNoConnection bool // Set to `true` to drop messages if there is no active WebSocket connection to send to.
Version uint32 // Defines the payload version.
}
2 changes: 1 addition & 1 deletion process/firehoseDataProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewFirehoseDataProcessor(
}

// ProcessPayload will process the received payload only for TopicSaveBlock, otherwise ignores it.
func (dp *dataProcessor) ProcessPayload(payload []byte, topic string) error {
func (dp *dataProcessor) ProcessPayload(payload []byte, topic string, _ uint32) error {
operationHandler, found := dp.operationHandlers[topic]
if !found {
return nil
Expand Down
33 changes: 17 additions & 16 deletions process/firehoseDataProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
"github.com/multiversx/mx-chain-core-go/data/block"
outportcore "github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-ws-connector-template-go/testscommon"
"github.com/stretchr/testify/require"

"github.com/multiversx/mx-chain-ws-connector-template-go/testscommon"
)

var protoMarshaller = &marshal.GogoProtoMarshalizer{}
Expand Down Expand Up @@ -86,14 +87,14 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {

fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller)

err := fi.ProcessPayload(nil, outportcore.TopicSaveBlock)
err := fi.ProcessPayload(nil, outportcore.TopicSaveBlock, 1)
require.Equal(t, errNilOutportBlockData, err)

outportBlock := createOutportBlock()
outportBlock.BlockData = nil
outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock)

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Equal(t, errNilOutportBlockData, err)
})

Expand All @@ -102,7 +103,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {

fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller)

err := fi.ProcessPayload([]byte("invalid payload"), outportcore.TopicSaveBlock)
err := fi.ProcessPayload([]byte("invalid payload"), outportcore.TopicSaveBlock, 1)
require.NotNil(t, err)
})

Expand All @@ -123,7 +124,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), protoMarshaller)

outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.NotNil(t, err)
require.Equal(t, 0, ioWriterCalledCt)
})
Expand Down Expand Up @@ -163,7 +164,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
}

fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), marshaller)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Equal(t, errUnmarshal, err)
})

Expand Down Expand Up @@ -197,13 +198,13 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
outportBlock := createOutportBlock()
outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock)

err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.True(t, strings.Contains(err.Error(), err1.Error()))

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.True(t, strings.Contains(err.Error(), err2.Error()))

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Nil(t, err)

require.Equal(t, 5, ioWriterCalledCt)
Expand Down Expand Up @@ -253,7 +254,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {

fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), protoMarshaller)

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Nil(t, err)
require.Equal(t, 2, ioWriterCalledCt)
})
Expand All @@ -265,12 +266,12 @@ func TestFirehoseIndexer_NoOperationFunctions(t *testing.T) {

fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller)

require.Nil(t, fi.ProcessPayload([]byte("payload"), "random topic"))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveRoundsInfo))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsRating))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsPubKeys))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveAccounts))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicFinalizedBlock))
require.Nil(t, fi.ProcessPayload([]byte("payload"), "random topic", 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveRoundsInfo, 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsRating, 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsPubKeys, 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveAccounts, 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicFinalizedBlock, 1))
}

func TestFirehoseIndexer_Close(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion process/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type WSConnector interface {

// DataProcessor defines a payload processor for incoming ws data
type DataProcessor interface {
ProcessPayload(payload []byte, topic string) error
ProcessPayload(payload []byte, topic string, version uint32) error
Close() error
IsInterfaceNil() bool
}
Expand Down

0 comments on commit c6df1e4

Please sign in to comment.