Skip to content

Commit

Permalink
Added scattering flag to cursor (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
FelGel authored Jan 21, 2021
1 parent 6c806b8 commit 513ec31
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/dataplane/itemscursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type ItemsCursor struct {
items []Item
getItemsInput *GetItemsInput
container Container
scattered bool
}

func NewItemsCursor(container Container, getItemsInput *GetItemsInput) (*ItemsCursor, error) {
Expand Down Expand Up @@ -125,6 +126,10 @@ func (ic *ItemsCursor) GetItem() Item {
return ic.currentItem
}

func (ic *ItemsCursor) Scattered() bool {
return ic.scattered
}

func (ic *ItemsCursor) setResponse(response *Response) {
ic.currentResponse = response

Expand All @@ -134,4 +139,5 @@ func (ic *ItemsCursor) setResponse(response *Response) {
ic.nextMarker = getItemsOutput.NextMarker
ic.items = getItemsOutput.Items
ic.itemIndex = 0
ic.scattered = getItemsOutput.Scattered
}
125 changes: 125 additions & 0 deletions pkg/dataplane/test/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package test

import (
"fmt"
"math/rand"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -897,6 +898,117 @@ func (suite *syncKVTestSuite) TestPutItems() {
suite.deleteItems(items)
}

func (suite *syncKVTestSuite) TestScatteredCursor() {
path := "/emd0"

scatteredItemKeys := []string{"louise", "karen"}
items := map[string]map[string]interface{}{
"bob": {"age": 42, "feature": "mustache"},
"linda": {"age": 41, "feature": "singing"},
"natan": {"age": 35, "feature": "one leg"},
"donald": {"age": 20, "feature": "teeth"},
scatteredItemKeys[0]: {"timestamp": time.Now().UnixNano(), "blob0": randomString(60000)},
scatteredItemKeys[1]: {"timestamp": time.Now().UnixNano(), "blob0": randomString(60000)},
}

putItemsInput := &v3io.PutItemsInput{
Path: path,
Items: items,
}

// Store initial items
suite.populateDataPlaneInput(&putItemsInput.DataPlaneInput)
response, err := suite.container.PutItemsSync(putItemsInput)
suite.Require().NoError(err, "Failed to put items")
putItemsOutput := response.Output.(*v3io.PutItemsOutput)
suite.Require().True(putItemsOutput.Success)
response.Release()

// update `scatteredItemKeys` items with big KV entries to force them to scatter
for _, key := range scatteredItemKeys {
updateItemInput := v3io.UpdateItemInput{
Path: fmt.Sprintf("%s/%s", path, key),
}

// because of request size limit we will have to update items in parts
for i := 0; i < 4; i++ {
attributes := map[string]interface{}{}
for j := 0; j < 30; j++ {
attributes[fmt.Sprintf("%s_%s_%d_%d", "blob", key, i, j)] = randomString(60000)
}
updateItemInput.Attributes = attributes
suite.populateDataPlaneInput(&updateItemInput.DataPlaneInput)
response, err := suite.container.UpdateItemSync(&updateItemInput)
suite.Require().NoError(err, "Failed to update item")
response.Release()
}
}

// Get cursor
getItemsInput := v3io.GetItemsInput{
Path: path + "/",
AttributeNames: []string{"**"},
AllowObjectScatter: "true",
}
suite.populateDataPlaneInput(&getItemsInput.DataPlaneInput)
cursor, err := v3io.NewItemsCursor(suite.container, &getItemsInput)
suite.Require().NoError(err, "Failed to get cursor")

// extract and combine scattered items
scatteredItems := map[string]map[string]interface{}{}
var retrievedItems []map[string]interface{}
for cursor.NextSync() {
item := cursor.GetItem()
inode := item["__inode_number"]
ctimeSec := item["__ctime_secs"]
ctimeNSec := item["__ctime_nsecs"]
ctime := int64(ctimeSec.(int))*1e9 + int64(ctimeNSec.(int))
objectID := fmt.Sprintf("%d.%d", inode.(int), ctime)

scatteredItem, scatteredItemFound := scatteredItems[objectID]
if scatteredItemFound {
for key, value := range item {
scatteredItem[key] = value
}
item = scatteredItem
}
if cursor.Scattered() || scatteredItemFound {
scatteredItems[objectID] = item
} else {
retrievedItems = append(retrievedItems, item)
}

if !cursor.Scattered() {
for _, scatteredItem := range scatteredItems {
retrievedItems = append(retrievedItems, scatteredItem)
}
scatteredItems = map[string]map[string]interface{}{}
}
}
cursor.Release()

// validate results
suite.Assert().Equal(len(items), len(retrievedItems))
for _, retrievedItem := range retrievedItems {
suite.Assert().Contains(items, retrievedItem["__name"])
for _, scatteredItemKey := range scatteredItemKeys {
if retrievedItem["__name"] == scatteredItemKey {

// count number of blob keys
blobCounter := 0
for key, _ := range retrievedItem {
if strings.HasPrefix(key, "blob_") {
blobCounter++
}
}
suite.Assert().Equal(4*30, blobCounter)
}
}
}

suite.deleteItems(items)
}

func (suite *syncKVTestSuite) TestPutItemsWithError() {
items := map[string]map[string]interface{}{
"bob": {"age": 42, "feature": "mustache"},
Expand Down Expand Up @@ -1497,3 +1609,16 @@ func validateCommonPrefix(suite *syncContainerTestSuite, prefix *v3io.CommonPref
suite.Require().Nil(prefix.InodeNumber)
}
}

func randomString(len int) string {
bytes := make([]byte, len)
for i := 0; i < len; i++ {
bytes[i] = byte(randInt(97, 122))
}

return string(bytes)
}

func randInt(min int, max int) int {
return min + rand.Intn(max-min)
}

0 comments on commit 513ec31

Please sign in to comment.