Skip to content

Commit

Permalink
GetShard: support multiple chunks in the same item
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Bondar authored and assaf758 committed Apr 4, 2021
1 parent e03da7d commit 30fbdd8
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 38 deletions.
34 changes: 20 additions & 14 deletions pkg/dataplane/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,55 +85,61 @@ func (i Item) GetFieldUint64(name string) (uint64, error) {
}

// For internal use only - DO NOT USE!
func (i Item) GetShard() (int, []*ItemChunkData, *ItemChunkMetadata, *ItemCurrentChunkMetadata, error) {
func (i Item) GetShard() (map[int]*ItemChunk, *ItemCurrentChunkMetadata, error) {
const streamDataPrefix = "__data_stream["
const streamMetadataPrefix = "__data_stream_metadata["
const offsetPrefix = "__data_stream[0000]["

var chunkDataArray []*ItemChunkData
chunkMetaData := ItemChunkMetadata{}
currentChunkMetadata := ItemCurrentChunkMetadata{}
var chunkID int
chunkMap := make(map[int]*ItemChunk)

for k, v := range i {
if strings.HasPrefix(k, streamDataPrefix) {
chunkID, _ = strconv.Atoi(k[len(streamDataPrefix):][:4])
chunkID, _ := strconv.Atoi(k[len(streamDataPrefix):][:4])
offset, _ := strconv.ParseUint(k[len(offsetPrefix):][:16], 16, 64)
data, ok := v.([]byte)
if !ok {
return 0, nil, nil, nil, v3ioerrors.ErrInvalidTypeConversion
return nil, nil, v3ioerrors.ErrInvalidTypeConversion
}
streamData := ItemChunkData{Offset: offset, Data: &data}
chunkDataArray = append(chunkDataArray, &streamData)
if _, ok := chunkMap[chunkID]; !ok {
chunkMap[chunkID] = &ItemChunk{}
}
chunkMap[chunkID].Data = append(chunkMap[chunkID].Data, &streamData)
}

if strings.HasPrefix(k, streamMetadataPrefix) {
chunkID, _ = strconv.Atoi(k[len(streamMetadataPrefix):][:4])
chunkID, _ := strconv.Atoi(k[len(streamMetadataPrefix):][:4])
if _, ok := chunkMap[chunkID]; !ok {
chunkMap[chunkID] = &ItemChunk{}
}

metadata, ok := v.([]byte)
if !ok {
return 0, nil, nil, nil, v3ioerrors.ErrInvalidTypeConversion
return nil, nil, v3ioerrors.ErrInvalidTypeConversion
}

buf := bytes.NewBuffer(metadata[8:64])
chunkMetaData := ItemChunkMetadata{}
err := binary.Read(buf, binary.LittleEndian, &chunkMetaData)
if err != nil {
return 0, nil, nil, nil, err
return nil, nil, err
}
chunkMap[chunkID].Metadata = &chunkMetaData

buf = bytes.NewBuffer(metadata[0:1])
var isCurrent bool
err = binary.Read(buf, binary.LittleEndian, &isCurrent)
if err != nil {
return 0, nil, nil, nil, err
return nil, nil, err
}
if isCurrent {
buf = bytes.NewBuffer(metadata[64:110])
err = binary.Read(buf, binary.LittleEndian, &currentChunkMetadata)
if err != nil {
return 0, nil, nil, nil, err
return nil, nil, err
}
}
}
}
return chunkID, chunkDataArray, &chunkMetaData, &currentChunkMetadata, nil
return chunkMap, &currentChunkMetadata, nil
}
48 changes: 24 additions & 24 deletions pkg/dataplane/test/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,21 +1258,21 @@ func (suite *syncStreamTestSuite) TestStream() {
for _, cursorItem := range cursorItems {
shardName, err := cursorItem.GetFieldString("__name")
suite.Require().NoError(err, "Failed to get item name")
chunkId, streamData, chunkMetadata, _, err := cursorItem.GetShard()
chunkMap , _, err := cursorItem.GetShard()
suite.Require().NoError(err, "Failed to get stream")

suite.Require().Equal(0, chunkId, "chunk indexes doesn't match")
suite.Require().True(chunkMetadata.ChunkSeqNumber == 0)
suite.Require().True(chunkMetadata.FirstRecordSeqNumber == 1)
suite.Require().True(chunkMetadata.LengthInBytes == 0)
suite.Require().Contains(chunkMap, 0, "chunk indexes doesn't match")
suite.Require().True(chunkMap[0].Metadata.ChunkSeqNumber == 0)
suite.Require().True(chunkMap[0].Metadata.FirstRecordSeqNumber == 1)
suite.Require().True(chunkMap[0].Metadata.LengthInBytes == 0)
switch shardName {
case "0":
suite.Require().True(strings.Contains(string(*streamData[0].Data), string(records[4].Data)))
suite.Require().True(strings.Contains(string(*chunkMap[0].Data[0].Data), string(records[4].Data)))
case "1":
suite.Require().True(strings.Contains(string(*streamData[0].Data), string(records[0].Data)))
suite.Require().True(strings.Contains(string(*streamData[0].Data), string(records[1].Data)))
suite.Require().True(strings.Contains(string(*chunkMap[0].Data[0].Data), string(records[0].Data)))
suite.Require().True(strings.Contains(string(*chunkMap[0].Data[0].Data), string(records[1].Data)))
case "2":
suite.Require().True(strings.Contains(string(*streamData[0].Data), string(records[3].Data)))
suite.Require().True(strings.Contains(string(*chunkMap[0].Data[0].Data), string(records[3].Data)))
}
}

Expand Down Expand Up @@ -1355,34 +1355,34 @@ func (suite *syncStreamBackupRestoreTestSuite) TestStream() {
cursorItems, err := cursor.AllSync()
suite.Require().NoError(err)

type Chunk struct {
Data []*v3io.ItemChunkData
ChunkMetadata *v3io.ItemChunkMetadata
}
type Shard struct {
Chunks map[int]*Chunk
Chunks map[int]*v3io.ItemChunk
CurrentChunk *v3io.ItemCurrentChunkMetadata
}
streamBackup := map[string]*Shard{}
for _, cursorItem := range cursorItems {
shardName, err := cursorItem.GetFieldString("__name")
suite.Require().NoError(err, "Failed to get item name")
chunkId, chunkDataArray, chunkMetadata, currentChunkMetadata, err := cursorItem.GetShard()
chunkMap, currentChunkMetadata, err := cursorItem.GetShard()
suite.Require().NoError(err, "Failed to get stream")

if _, ok := streamBackup[shardName]; !ok {
streamBackup[shardName] = &Shard{Chunks: map[int]*Chunk{}}
streamBackup[shardName] = &Shard{Chunks: map[int]*v3io.ItemChunk{}}
}

if _, ok := streamBackup[shardName].Chunks[chunkId]; !ok {
streamBackup[shardName].Chunks[chunkId] = &Chunk{}
}
if chunkMetadata != nil {
(*streamBackup[shardName].Chunks[chunkId]).ChunkMetadata = chunkMetadata
}
if len(chunkDataArray) != 0 {
(*streamBackup[shardName].Chunks[chunkId]).Data = chunkDataArray
for chunkId := range chunkMap {
if _, ok := streamBackup[shardName].Chunks[chunkId]; !ok {
streamBackup[shardName].Chunks[chunkId] = &v3io.ItemChunk{}
}

if chunkMap[chunkId].Metadata != nil {
(*streamBackup[shardName].Chunks[chunkId]).Metadata = chunkMap[chunkId].Metadata
}
if len(chunkMap[chunkId].Data) != 0 {
(*streamBackup[shardName].Chunks[chunkId]).Data = chunkMap[chunkId].Data
}
}

if currentChunkMetadata != nil {
streamBackup[shardName].CurrentChunk = currentChunkMetadata
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/dataplane/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,3 +469,8 @@ type ItemChunkData struct {
Offset uint64
Data *[]byte
}

type ItemChunk struct {
Metadata *ItemChunkMetadata
Data []*ItemChunkData
}

0 comments on commit 30fbdd8

Please sign in to comment.