Skip to content

Commit

Permalink
Remove lock in consumer group, clean up context API (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavius authored Feb 18, 2020
1 parent 86cd450 commit 29e890d
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 42 deletions.
15 changes: 7 additions & 8 deletions pkg/dataplane/http/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ func NewClient(newClientInput *NewClientInput) *fasthttp.Client {
}
}

func NewDefaultClient() *fasthttp.Client {
return NewClient(&NewClientInput{})
}

func NewContext(parentLogger logger.Logger, client *fasthttp.Client, newContextInput *v3io.NewContextInput) (v3io.Context, error) {
func NewContext(parentLogger logger.Logger, newContextInput *NewContextInput) (v3io.Context, error) {
requestChanLen := newContextInput.RequestChanLen
if requestChanLen == 0 {
requestChanLen = 1024
Expand All @@ -82,9 +78,14 @@ func NewContext(parentLogger logger.Logger, client *fasthttp.Client, newContextI
numWorkers = 8
}

httpClient := newContextInput.HTTPClient
if httpClient == nil {
httpClient = NewClient(&NewClientInput{})
}

newContext := &context{
logger: parentLogger.GetChild("context.http"),
httpClient: client,
httpClient: httpClient,
requestChan: make(chan *v3io.Request, requestChanLen),
numWorkers: numWorkers,
}
Expand Down Expand Up @@ -687,8 +688,6 @@ func (c *context) PutRecordsSync(putRecordsInput *v3io.PutRecordsInput) (*v3io.R
}

buffer.WriteString(`]}`)
str := buffer.String()
fmt.Println(str)

response, err := c.sendRequest(&putRecordsInput.DataPlaneInput,
http.MethodPost,
Expand Down
9 changes: 9 additions & 0 deletions pkg/dataplane/http/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package v3iohttp

import "github.com/valyala/fasthttp"

type NewContextInput struct {
HTTPClient *fasthttp.Client
NumWorkers int
RequestChanLen int
}
9 changes: 0 additions & 9 deletions pkg/dataplane/streamconsumergroup/sequencenumberhandler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package streamconsumergroup

import (
"sync"
"time"

"github.com/v3io/v3io-go/pkg/common"
Expand All @@ -17,7 +16,6 @@ type sequenceNumberHandler struct {
logger logger.Logger
member *member
markedShardSequenceNumbers []uint64
markedShardSequenceNumbersLock sync.RWMutex
stopMarkedShardSequenceNumberCommitterChan chan struct{}
lastCommittedShardSequenceNumbers []uint64
}
Expand Down Expand Up @@ -54,12 +52,7 @@ func (snh *sequenceNumberHandler) stop() error {
}

func (snh *sequenceNumberHandler) markShardSequenceNumber(shardID int, sequenceNumber uint64) error {

// lock semantics are reverse - it's OK to write in parallel since each write goes
// to a different cell in the array, but once a read is happening we need to stop the world
snh.markedShardSequenceNumbersLock.RLock()
snh.markedShardSequenceNumbers[shardID] = sequenceNumber
snh.markedShardSequenceNumbersLock.RUnlock()

return nil
}
Expand Down Expand Up @@ -88,9 +81,7 @@ func (snh *sequenceNumberHandler) commitMarkedShardSequenceNumbers() error {
var markedShardSequenceNumbersCopy []uint64

// create a copy of the marked shard sequenceNumbers
snh.markedShardSequenceNumbersLock.Lock()
markedShardSequenceNumbersCopy = append(markedShardSequenceNumbersCopy, snh.markedShardSequenceNumbers...)
snh.markedShardSequenceNumbersLock.Unlock()

// if there was no chance since last, do nothing
if common.Uint64SlicesEqual(snh.lastCommittedShardSequenceNumbers, markedShardSequenceNumbersCopy) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/dataplane/test/streamconsumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ type recordData struct {
}

type streamConsumerGroupTestSuite struct {
StreamTestSuite
streamTestSuite
streamPath string
}

func (suite *streamConsumerGroupTestSuite) SetupSuite() {
suite.StreamTestSuite.SetupSuite()
suite.streamTestSuite.SetupSuite()
suite.createContainer()
suite.streamPath = fmt.Sprintf("%s/test-stream-0/", suite.testPath)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/dataplane/test/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,22 +632,22 @@ func (suite *syncContainerKVTestSuite) SetupSuite() {

type syncStreamTestSuite struct {
syncTestSuite
StreamTestSuite StreamTestSuite
streamTestSuite streamTestSuite
}

func (suite *syncStreamTestSuite) SetupTest() {
suite.StreamTestSuite = StreamTestSuite{
suite.streamTestSuite = streamTestSuite{
testSuite: suite.syncTestSuite.testSuite,
}
suite.StreamTestSuite.SetupTest()
suite.streamTestSuite.SetupTest()
}

func (suite *syncStreamTestSuite) TearDownTest() {
suite.StreamTestSuite.TearDownTest()
suite.streamTestSuite.TearDownTest()
}

func (suite *syncStreamTestSuite) TestStream() {
streamPath := fmt.Sprintf("%s/mystream/", suite.StreamTestSuite.testPath)
streamPath := fmt.Sprintf("%s/mystream/", suite.streamTestSuite.testPath)

//
// Create the stream
Expand Down
16 changes: 6 additions & 10 deletions pkg/dataplane/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (suite *testSuite) createContext() {
var err error

// create a context
suite.container, err = v3iohttp.NewContext(suite.logger, v3iohttp.NewDefaultClient(), &v3io.NewContextInput{})
suite.container, err = v3iohttp.NewContext(suite.logger, &v3iohttp.NewContextInput{})
suite.Require().NoError(err)

// populate fields that would have been populated by session/container
Expand All @@ -57,7 +57,7 @@ func (suite *testSuite) createContext() {
func (suite *testSuite) createContainer() {

// create a context
context, err := v3iohttp.NewContext(suite.logger, v3iohttp.NewDefaultClient(), &v3io.NewContextInput{})
context, err := v3iohttp.NewContext(suite.logger, &v3iohttp.NewContextInput{})
suite.Require().NoError(err)

session, err := context.NewSession(&v3io.NewSessionInput{
Expand All @@ -74,17 +74,13 @@ func (suite *testSuite) createContainer() {
suite.Require().NoError(err)
}

type StreamTestSuite struct { // nolint: deadcode
type streamTestSuite struct { // nolint: deadcode
testSuite
testPath string
}

func (suite *StreamTestSuite) SetupSuite() {
suite.testSuite.SetupSuite()
func (suite *streamTestSuite) SetupTest() {
suite.testPath = "/stream-test"
}

func (suite *StreamTestSuite) SetupTest() {
err := suite.deleteAllStreamsInPath(suite.testPath)

// get the underlying root error
Expand All @@ -97,12 +93,12 @@ func (suite *StreamTestSuite) SetupTest() {
}
}

func (suite *StreamTestSuite) TearDownTest() {
func (suite *streamTestSuite) TearDownTest() {
err := suite.deleteAllStreamsInPath(suite.testPath)
suite.Require().NoError(err, "Failed to tear down test suite")
}

func (suite *StreamTestSuite) deleteAllStreamsInPath(path string) error {
func (suite *streamTestSuite) deleteAllStreamsInPath(path string) error {
getContainerContentsInput := v3io.GetContainerContentsInput{
Path: path,
}
Expand Down
8 changes: 0 additions & 8 deletions pkg/dataplane/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,12 @@ import (
"strconv"
"strings"
"time"

"github.com/valyala/fasthttp"
)

//
// Control plane
//

type NewContextInput struct {
Client *fasthttp.Client
NumWorkers int
RequestChanLen int
}

type NewSessionInput struct {
URL string
Username string
Expand Down

0 comments on commit 29e890d

Please sign in to comment.