Skip to content

Commit

Permalink
zkplus: Add option to check for root node existance (#217)
Browse files Browse the repository at this point in the history
Instead of always trying to create it if it doesn't exist.

This will allow zkplus to work in environments where certain
root nodes are read-only.
  • Loading branch information
benkeith-splunk authored Apr 8, 2021
1 parent c5ab602 commit a2eaf9d
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 3 deletions.
11 changes: 11 additions & 0 deletions zkplus/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ type Builder struct {
pathPrefix string
zkConnector ZkConnector
logger log.Logger
createRoot bool
}

// NewBuilder creates a new builder for making ZkPlus connections
func NewBuilder() *Builder {
return &Builder{
pathPrefix: "",
logger: DefaultLogger,
createRoot: true,
}
}

Expand Down Expand Up @@ -58,6 +60,14 @@ func (b *Builder) Logger(logger log.Logger) *Builder {
return b
}

// CreateRootNode determines whether the root zk node is created if it doesn't
// exist already in ZK. If this is false, but the root node does not exist in
// ZK, the connection will error out.
func (b *Builder) CreateRootNode(createRoot bool) *Builder {
b.createRoot = createRoot
return b
}

// ZkPlus copies the config from another connection
func (b *Builder) ZkPlus(zkPlus *ZkPlus) *Builder {
b.pathPrefix = zkPlus.pathPrefix
Expand Down Expand Up @@ -95,6 +105,7 @@ func (b *Builder) Build() (*ZkPlus, error) {
ret := &ZkPlus{
pathPrefix: prefix,
logger: b.logger,
createRoot: b.createRoot,

zkConnector: b.zkConnector,
exposedChan: make(chan zk.Event),
Expand Down
1 change: 1 addition & 0 deletions zkplus/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestInnerBuilder(t *testing.T) {

q := NewBuilder().ZkPlus(zkp)
assert.Equal(t, "/a/b", q.pathPrefix)
assert.Equal(t, true, q.createRoot)

zkp2, _, err := q.BuildDirect()
assert.NoError(t, err)
Expand Down
19 changes: 16 additions & 3 deletions zkplus/zkplus.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ZkPlus struct {
pathPrefix string
zkConnector ZkConnector
logger log.Logger
createRoot bool

connectedConn zktest.ZkConnSupported
connectedChan <-chan zk.Event
Expand Down Expand Up @@ -98,9 +99,21 @@ func (z *ZkPlus) ensureRootPath(conn zktest.ZkConnSupported) error {
continue
}
totalPath = totalPath + "/" + p
_, err := conn.Create(totalPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
if err != nil && err != zk.ErrNodeExists {
return errors.Annotatef(err, "cannot create path %s", totalPath)

if z.createRoot {
_, err := conn.Create(totalPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
if err != nil && err != zk.ErrNodeExists {
return errors.Annotatef(err, "cannot create path %s", totalPath)
}
} else {
exists, _, err := conn.Exists(totalPath)
if err != nil {
return errors.Annotatef(err, "cannot verify that node %q exists", totalPath)
}

if !exists {
return fmt.Errorf("root node %q does not exist", totalPath)
}
}
}
return nil
Expand Down
39 changes: 39 additions & 0 deletions zkplus/zkplus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zkplus

import (
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -61,6 +62,7 @@ func testPrefix(t *testing.T, zkp zktest.ZkConnSupported) {
func TestErrorEnsureRoot(t *testing.T) {
zkp := &ZkPlus{
pathPrefix: "/a/b/c",
createRoot: true,
}
z, ch, _ := zktest.New().Connect()
createError := make(chan struct{}, 3)
Expand All @@ -81,6 +83,43 @@ func TestErrorEnsureRoot(t *testing.T) {
zkp.Close()
}

func TestErrorEnsureRootNoCreate(t *testing.T) {
zkp := &ZkPlus{
pathPrefix: "/a/b/c",
createRoot: false,
}
z, ch, _ := zktest.New().Connect()
existsError := make(chan struct{}, 3)
z.SetErrorCheck(func(s string) error {
fmt.Printf("func(%s)\n", s)
if s == "exists" {
existsError <- struct{}{}
return errors.New("i don't allow exists")
}
return nil
})
assert.Error(t, zkp.ensureRootPath(z))
<-existsError

z.SetErrorCheck(func(s string) error {
return nil
})
assert.Error(t, zkp.ensureRootPath(z))

_, err := z.Create("/a", []byte(""), 0, zk.WorldACL(zk.PermAll))
assert.NoError(t, err)
_, err = z.Create("/a/b", []byte(""), 0, zk.WorldACL(zk.PermAll))
assert.NoError(t, err)
_, err = z.Create("/a/b/c", []byte(""), 0, zk.WorldACL(zk.PermAll))
assert.NoError(t, err)

assert.NoError(t, zkp.ensureRootPath(z))

zkp, err = NewBuilder().PathPrefix("/test").CreateRootNode(false).Connector(&StaticConnector{C: z, Ch: ch}).Build()
assert.NoError(t, err)
zkp.Close()
}

func TestWatches(t *testing.T) {
z, ch, _ := zktest.New().Connect()
zkp, err := NewBuilder().PathPrefix("/test").Connector(&StaticConnector{C: z, Ch: ch}).Build()
Expand Down

0 comments on commit a2eaf9d

Please sign in to comment.