Skip to content

Commit

Permalink
Retry close until timeout (default 120s) when close returns ErrReplicating
Browse files Browse the repository at this point in the history
  • Loading branch information
tangyoupeng authored Dec 4, 2024
1 parent 450d0de commit 8c11f83
Showing 1 changed file with 36 additions and 2 deletions.
38 changes: 36 additions & 2 deletions pkg/object/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type hdfsclient struct {
c *hdfs.Client
dfsReplication int
umask os.FileMode
closeTimeout time.Duration
closeMaxDelay time.Duration
}

func (h *hdfsclient) String() string {
Expand Down Expand Up @@ -163,8 +165,19 @@ func (h *hdfsclient) Put(key string, in io.Reader, getters ...AttrGetter) (err e
_ = f.Close()
return err
}
err = f.Close()
if err != nil && !IsErrReplicating(err) {
start := time.Now()
sleeptime := 400 * time.Millisecond
for {
err = f.Close()
if IsErrReplicating(err) && start.Add(h.closeTimeout).After(time.Now()) {
time.Sleep(sleeptime)
sleeptime = min(2*sleeptime, h.closeMaxDelay)
continue
} else {
break
}
}
if err != nil {
return err
}
if !PutInplace {
Expand All @@ -173,6 +186,13 @@ func (h *hdfsclient) Put(key string, in io.Reader, getters ...AttrGetter) (err e
return err
}

// min returns the smaller of the two durations a and b.
// When both are equal, that common value is returned.
func min(a, b time.Duration) time.Duration {
	// Only switch to b when it is strictly smaller; otherwise a wins.
	if b < a {
		return b
	}
	return a
}

func IsErrReplicating(err error) bool {
pe, ok := err.(*os.PathError)
return ok && pe.Err == hdfs.ErrReplicating
Expand Down Expand Up @@ -328,13 +348,27 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) {
umask = uint16(x)
}
}
var closeTimeout = 120 * time.Second
if v, found := conf["ipc.client.rpc-timeout.ms"]; found {
if x, err := strconv.Atoi(v); err == nil {
closeTimeout = time.Duration(x) * time.Millisecond
}
}
var closeMaxDelay = 60 * time.Second
if v, found := conf["dfs.client.block.write.locateFollowingBlock.max.delay.ms"]; found {
if x, err := strconv.Atoi(v); err == nil {
closeMaxDelay = time.Duration(x) * time.Millisecond
}
}

return &hdfsclient{
addr: strings.Join(rpcAddr, ","),
basePath: basePath,
c: c,
dfsReplication: replication,
umask: os.FileMode(umask),
closeTimeout: closeTimeout,
closeMaxDelay: closeMaxDelay,
}, nil
}

Expand Down

0 comments on commit 8c11f83

Please sign in to comment.