Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check and clean nodes which were created by the server, but client is not aware of it #3

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
79 changes: 76 additions & 3 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ type Conn struct {
debugCloseRecvLoop bool
debugReauthDone chan struct{}

cleanupChan chan zkNode

logger Logger
logInfo bool // true if information messages are logged; false if only errors are logged

Expand Down Expand Up @@ -206,6 +208,7 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti
logger: DefaultLogger,
logInfo: true, // default is true for backwards compatability
buf: make([]byte, bufferSize),
cleanupChan: make(chan string),
}

// Set provided options.
Expand All @@ -225,6 +228,12 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti
conn.invalidateWatches(ErrClosing)
close(conn.eventChan)
}()

go func() {
conn.cleanLoop()
close(conn.cleanupChan)
}()

return conn, ec, nil
}

Expand Down Expand Up @@ -1102,7 +1111,7 @@ func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl
case ErrConnectionClosed:
children, _, err := c.Children(rootPath)
if err != nil {
return "", err
return protectedPath, err
}
for _, p := range children {
parts := strings.Split(p, "/")
Expand All @@ -1115,10 +1124,10 @@ func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl
case nil:
return newPath, nil
default:
return "", err
return protectedPath, err
}
}
return "", err
return protectedPath, err
}

func (c *Conn) Delete(path string, version int32) error {
Expand Down Expand Up @@ -1276,3 +1285,67 @@ func (c *Conn) Server() string {
defer c.serverMu.Unlock()
return c.server
}

// zkNode represent zookeeper node with a session identifier within which the node was created
type zkNode struct {
path string
sessionID int64
}

// cleanLoop cleans obsolete nodes, which were created, but after some erroneous behaviour (e.g. ErrConnectionClosed)
// are left bound to the session, but the client is not aware of it anymore, since the error has been propagated up the call stack
func (c *Conn) cleanLoop() {
for {
select {
case l := <-c.cleanupChan:
c.clean(l)
case <-c.shouldQuit:
return
}
}
}

// clean deletes the node if it still exists,
// the input nodePath might not have sequence number at the end, so we have to check whether the prefix,
// protected by guid, matches one of the children of the root path
func (c *Conn) clean(node zkNode) {
// if it is a new session established, all ephemeral nodes from the previous will be cleared by zkServer
if c.sessionID != node.sessionID {
return
}

parts := strings.Split(node.path, "/")
rootPath := strings.Join(parts[:len(parts)-1], "/")
if rootPath == "" {
rootPath = "/"
}
nodeName := parts[len(parts)-1]

for {
children, _, err := c.Children(rootPath)
if err != nil {
if err == ErrNoNode {
break
}
c.logger.Printf("cannot get children of the node %s, err %v", rootPath, err)
continue
}
exist := false
for _, p := range children {
if strings.HasPrefix(p, nodeName) {
exist = true
break
}
}

if exist {
if err := c.Delete(node.path, -1); err != nil {
if err != ErrNoNode {
c.logger.Printf("cannot clean the node %s, err %v", node.path, err)
continue
}
}
}
break
}
}
45 changes: 45 additions & 0 deletions zk/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,48 @@ func TestRecurringReAuthHang(t *testing.T) {

<-conn.debugReauthDone
}

func TestClean(t *testing.T) {
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()

zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()

acls := WorldACL(PermAll)
_, err = zk.Create("/cleanup", []byte{}, 0, acls)
if err != nil {
t.Fatalf("Create returned error: %v", err)
}

path, err := zk.CreateProtectedEphemeralSequential("/cleanup/lock-", []byte{}, acls)
if err != nil {
t.Fatalf("Create returned error: %v", err)
}

zk.cleanupChan <- zkNode{
path: path,
sessionID: zk.sessionID,
}

exists, _, evCh, err := zk.ExistsW(path)
if exists {
select {
case ev := <-evCh:
if ev.Err != nil {
t.Fatalf("ExistW event returned with error %v", err)
}
if ev.Type != EventNodeDeleted {
t.Fatal("Wrong event received")
}
case <-time.After(1 * time.Second):
t.Fatal("Node is not cleared")
}
}
}
118 changes: 40 additions & 78 deletions zk/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ var (

// Lock is a mutual exclusion lock.
type Lock struct {
c *Conn
path string
acl []ACL
lockPath string
seq int
attemptedLockPath string
c *Conn
path string
acl []ACL
lockPath string
seq int
}

// NewLock creates a new lock instance using the provided connection, path, and acl.
Expand All @@ -44,33 +43,31 @@ func parseSeq(path string) (int, error) {
// is acquired or an error occurs. If this instance already has the lock
// then ErrDeadlock is returned.
func (l *Lock) Lock() error {
if l.lockPath != "" {
return ErrDeadlock
path, err := l.lock()
if err != nil {
if err == ErrConnectionClosed && path != "" {
l.c.cleanupChan <- zkNode{
path: path,
sessionID: l.c.sessionID,
}
}
return err
}
return nil
}

if l.attemptedLockPath != "" {
// Check whether lock has been acquired previously and it still exists
if lockExists(l.c, l.path, l.attemptedLockPath) {
l.lockPath = l.attemptedLockPath
return nil
}
func (l *Lock) lock() (string, error) {
path := ""
if l.lockPath != "" {
return path, ErrDeadlock
}

prefix := fmt.Sprintf("%s/lock-", l.path)

path := ""
var err error
tryLock:
for i := 0; i < 3; i++ {
path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl)

if path != "" {
// Store the path of newly created sequential ephemeral znode
l.attemptedLockPath = path
}

switch err {
case ErrNoNode:
if err == ErrNoNode {
// Create parent node.
parts := strings.Split(l.path, "/")
pth := ""
Expand All @@ -79,35 +76,35 @@ tryLock:
pth += "/" + p
exists, _, err = l.c.Exists(pth)
if err != nil {
return err
return path, err
}
if exists == true {
continue
}
_, err = l.c.Create(pth, []byte{}, 0, l.acl)
if err != nil && err != ErrNodeExists {
return err
return path, err
}
}
case nil:
break tryLock
default:
return err
} else if err == nil {
break
} else {
return path, err
}
}
if err != nil {
return err
return path, err
}

seq, err := parseSeq(path)
if err != nil {
return err
return path, err
}

for {
children, _, err := l.c.Children(l.path)
if err != nil {
return err
return path, err
}

lowestSeq := seq
Expand All @@ -116,7 +113,7 @@ tryLock:
for _, p := range children {
s, err := parseSeq(p)
if err != nil {
return err
return path, err
}
if s < lowestSeq {
lowestSeq = s
Expand All @@ -135,21 +132,21 @@ tryLock:
// Wait on the node next in line for the lock
_, _, ch, err := l.c.GetW(l.path + "/" + prevSeqPath)
if err != nil && err != ErrNoNode {
return err
return path, err
} else if err != nil && err == ErrNoNode {
// try again
continue
}

ev := <-ch
if ev.Err != nil {
return ev.Err
return path, ev.Err
}
}

l.seq = seq
l.lockPath = path
return nil
return path, nil
}

// Unlock releases an acquired lock. If the lock is not currently acquired by
Expand All @@ -159,50 +156,15 @@ func (l *Lock) Unlock() error {
return ErrNotLocked
}
if err := l.c.Delete(l.lockPath, -1); err != nil {
if err == ErrConnectionClosed {
l.c.cleanupChan <- zkNode{
path: l.lockPath,
sessionID: l.c.sessionID,
}
}
return err
}
// Perform clean up
l.lockPath = ""
l.seq = 0
l.attemptedLockPath = ""

return nil
}

//Check whether lock got created and response was lost because of network partition failure.
//It queries zookeeper and scans existing sequential ephemeral znodes under the parent path
//It finds out that previously requested sequence number corresponds to child having lowest sequence number
func lockExists(c *Conn, rootPath string, znodePath string) bool {
seq, err := parseSeq(znodePath)
if err != nil {
return false
}

//Scan the existing znodes if there are any
children, _, err := c.Children(rootPath)
if err != nil {
return false
}

lowestSeq := seq
prevSeq := -1
for _, p := range children {
s, err := parseSeq(p)
if err != nil {
return false
}
if s < lowestSeq {
lowestSeq = s
}
if s < seq && s > prevSeq {
prevSeq = s
}
}

if seq == lowestSeq {
// Acquired the lock
return true
}

return false
}
Loading