Support peer task data multiplex (dragonflyoss#347)
Signed-off-by: Jim Ma <[email protected]>
jim3ma authored Jun 18, 2021
1 parent 41599d6 commit c5acdbf
Showing 31 changed files with 577 additions and 197 deletions.
2 changes: 2 additions & 0 deletions client/config/constants_otel.go
@@ -24,6 +24,7 @@ const (
AttributeTaskContentLength = attribute.Key("d7y.peer.task.content_length")
AttributePeerID = attribute.Key("d7y.peer.id")
AttributeTargetPeerID = attribute.Key("d7y.peer.target.id")
AttributeReusePeerID = attribute.Key("d7y.peer.reuse.id")
AttributeTargetPeerAddr = attribute.Key("d7y.peer.target.addr")
AttributeMainPeer = attribute.Key("d7y.peer.task.main_peer")
AttributePeerPacketCode = attribute.Key("d7y.peer.packet.code")
@@ -42,6 +43,7 @@ const (

SpanFilePeerTask = "file-peer-task"
SpanStreamPeerTask = "stream-peer-task"
SpanReusePeerTask = "reuse-peer-task"
SpanRegisterTask = "register"
SpanFirstSchedule = "schedule-#1"
SpanGetPieceTasks = "get-piece-tasks"
6 changes: 4 additions & 2 deletions client/config/peerhost.go
@@ -381,8 +381,10 @@ type StorageOption struct {
// after this period, the cache file will be gc'd
TaskExpireTime clientutil.Duration `mapstructure:"task_expire_time" yaml:"task_expire_time"`
// DiskGCThreshold indicates the threshold to gc the oldest tasks
DiskGCThreshold unit.Bytes `mapstructure:"disk_gc_threshold" yaml:"disk_gc_threshold"`
StoreStrategy StoreStrategy `mapstructure:"strategy" yaml:"strategy"`
DiskGCThreshold unit.Bytes `mapstructure:"disk_gc_threshold" yaml:"disk_gc_threshold"`
// Multiplex indicates reusing underlying storage for same task id
Multiplex bool `mapstructure:"multiplex" yaml:"multiplex"`
StoreStrategy StoreStrategy `mapstructure:"strategy" yaml:"strategy"`
}

type StoreStrategy string
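
For illustration, a minimal sketch of enabling the new Multiplex switch programmatically. The StorageOption field names come from the struct above; the clientutil import path and the default constants are assumptions based on the darwin/linux defaults further down:

package main

import (
	"fmt"

	"d7y.io/dragonfly/v2/client/clientutil"
	"d7y.io/dragonfly/v2/client/config"
)

func main() {
	// Multiplex is the new switch added in this commit: reuse completed
	// task data for identical task IDs instead of downloading again.
	opt := config.StorageOption{
		TaskExpireTime: clientutil.Duration{Duration: config.DefaultTaskExpireTime},
		StoreStrategy:  config.AdvanceLocalTaskStoreStrategy,
		Multiplex:      true,
	}
	fmt.Printf("multiplex enabled: %v\n", opt.Multiplex)
}
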
1 change: 1 addition & 0 deletions client/config/peerhost_darwin.go
@@ -114,5 +114,6 @@ var peerHostConfig = PeerHostOption{
Duration: DefaultTaskExpireTime,
},
StoreStrategy: AdvanceLocalTaskStoreStrategy,
Multiplex: false,
},
}
1 change: 1 addition & 0 deletions client/config/peerhost_linux.go
@@ -113,5 +113,6 @@ var peerHostConfig = PeerHostOption{
Duration: DefaultTaskExpireTime,
},
StoreStrategy: AdvanceLocalTaskStoreStrategy,
Multiplex: false,
},
}
6 changes: 3 additions & 3 deletions client/daemon/peer/peertask_file.go
@@ -83,7 +83,7 @@ func newFilePeerTask(ctx context.Context,
span.SetAttributes(config.AttributePeerID.String(request.PeerId))
span.SetAttributes(semconv.HTTPURLKey.String(request.Url))

logger.Infof("request overview, url: %s, filter: %s, meta: %s, biz: %s, peer: %s", request.Url, request.Filter, request.UrlMata, request.BizId, request.PeerId)
logger.Infof("request overview, url: %s, filter: %s, meta: %s, biz: %s, peer: %s", request.Url, request.Filter, request.UrlMeta, request.BizId, request.PeerId)
// trace register
_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(ctx, request)
@@ -330,7 +330,7 @@ func (pt *filePeerTask) finish() error {
pt.Warnf("wait progress stopped failed, context done, but progress not stopped")
}
}
pt.Debugf("finished: close done channel")
pt.Debugf("finished: close channel")
close(pt.done)
pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
pt.span.End()
@@ -391,7 +391,7 @@ func (pt *filePeerTask) cleanUnfinished() {
pt.Errorf("peer task fail callback failed: %s", err)
}

pt.Debugf("clean unfinished: close done channel")
pt.Debugf("clean unfinished: close channel")
close(pt.done)
pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
pt.span.SetAttributes(config.AttributePeerTaskCode.Int(int(pt.failedCode)))
2 changes: 0 additions & 2 deletions client/daemon/peer/peertask_file_test.go
@@ -103,7 +103,6 @@ func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) {
Url: "http://localhost/test/data",
Filter: "",
BizId: "d7y-test",
UrlMata: nil,
PeerId: peerID,
PeerHost: &scheduler.PeerHost{},
},
@@ -214,7 +213,6 @@ func TestFilePeerTask_BackSource_WithoutContentLength(t *testing.T) {
Url: "http://localhost/test/data",
Filter: "",
BizId: "d7y-test",
UrlMata: nil,
PeerId: peerID,
PeerHost: &scheduler.PeerHost{},
},
48 changes: 33 additions & 15 deletions client/daemon/peer/peertask_manager.go
@@ -21,6 +21,7 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
"time"
@@ -48,7 +49,7 @@ type TaskManager interface {
// StartStreamPeerTask starts a peer task with stream io
// tiny stands for: the task file is tiny and the task is already done
StartStreamPeerTask(ctx context.Context, req *scheduler.PeerTaskRequest) (
reader io.Reader, attribute map[string]string, err error)
readCloser io.ReadCloser, attribute map[string]string, err error)

IsPeerTaskRunning(pid string) bool

@@ -105,6 +106,11 @@ type peerTaskManager struct {
runningPeerTasks sync.Map

perPeerRateLimit rate.Limit

// enableMultiplex indicates reusing completed peer task storage
// currently, completed peer tasks are only checked when a new peer task starts, before registering to the scheduler
// TODO multiplex the running peer task
enableMultiplex bool
}

func NewPeerTaskManager(
@@ -113,7 +119,8 @@
storageManager storage.Manager,
schedulerClient schedulerclient.SchedulerClient,
schedulerOption config.SchedulerOption,
perPeerRateLimit rate.Limit) (TaskManager, error) {
perPeerRateLimit rate.Limit,
multiplex bool) (TaskManager, error) {

ptm := &peerTaskManager{
host: host,
@@ -123,11 +130,18 @@
schedulerClient: schedulerClient,
schedulerOption: schedulerOption,
perPeerRateLimit: perPeerRateLimit,
enableMultiplex: multiplex,
}
return ptm, nil
}

func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeerTaskRequest) (chan *FilePeerTaskProgress, *TinyData, error) {
if ptm.enableMultiplex {
progress, ok := ptm.tryReuseFilePeerTask(ctx, req)
if ok {
return progress, nil, nil
}
}
// TODO ensure scheduler is ok first
start := time.Now()
ctx, pt, tiny, err := newFilePeerTask(ctx, ptm.host, ptm.pieceManager,
@@ -173,13 +187,19 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer

ptm.runningPeerTasks.Store(req.PeerId, pt)

// FIXME 1. merge same task id
// FIXME 2. when failed due to schedulerClient error, relocate schedulerClient and retry
// FIXME when failed due to schedulerClient error, relocate schedulerClient and retry
progress, err := pt.Start(ctx)
return progress, nil, err
}

func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *scheduler.PeerTaskRequest) (reader io.Reader, attribute map[string]string, err error) {
func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *scheduler.PeerTaskRequest) (io.ReadCloser, map[string]string, error) {
if ptm.enableMultiplex {
r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, req)
if ok {
return r, attr, nil
}
}

start := time.Now()
ctx, pt, tiny, err := newStreamPeerTask(ctx, ptm.host, ptm.pieceManager,
req, ptm.schedulerClient, ptm.schedulerOption, ptm.perPeerRateLimit)
@@ -190,7 +210,7 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu
if tiny != nil {
logger.Infof("copied tasks data %d bytes to buffer", len(tiny.Content))
tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
return bytes.NewBuffer(tiny.Content), map[string]string{
return ioutil.NopCloser(bytes.NewBuffer(tiny.Content)), map[string]string{
headers.ContentLength: fmt.Sprintf("%d", len(tiny.Content)),
}, nil
}
@@ -204,23 +224,21 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu

ptm.runningPeerTasks.Store(req.PeerId, pt)

// FIXME 1. merge same task id
// FIXME 2. when failed due to schedulerClient error, relocate schedulerClient and retry

reader, attribute, err = pt.Start(ctx)
return reader, attribute, err
// FIXME when failed due to schedulerClient error, relocate schedulerClient and retry
reader, attribute, err := pt.Start(ctx)
return ioutil.NopCloser(reader), attribute, err
}

func (ptm *peerTaskManager) Stop(ctx context.Context) error {
// TODO
return nil
}

func (ptm *peerTaskManager) PeerTaskDone(pid string) {
ptm.runningPeerTasks.Delete(pid)
func (ptm *peerTaskManager) PeerTaskDone(peerID string) {
ptm.runningPeerTasks.Delete(peerID)
}

func (ptm *peerTaskManager) IsPeerTaskRunning(pid string) bool {
_, ok := ptm.runningPeerTasks.Load(pid)
func (ptm *peerTaskManager) IsPeerTaskRunning(peerID string) bool {
_, ok := ptm.runningPeerTasks.Load(peerID)
return ok
}
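
With these changes, StartStreamPeerTask hands callers an io.ReadCloser instead of a bare io.Reader, so closing it becomes the caller's responsibility. Below is a minimal sketch of the calling side; the request fields mirror the tests that follow, while the wiring of ptm and the import paths are assumptions:

package main

import (
	"context"
	"io"
	"log"
	"os"

	"d7y.io/dragonfly/v2/client/daemon/peer"
	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
)

// readTask sketches a caller after this commit. ptm is assumed to be built
// via NewPeerTaskManager with multiplex enabled; the request fields mirror
// the tests below.
func readTask(ctx context.Context, ptm peer.TaskManager, peerID string) error {
	rc, attr, err := ptm.StartStreamPeerTask(ctx, &scheduler.PeerTaskRequest{
		Url:      "http://localhost/test/data",
		BizId:    "d7y-test",
		PeerId:   peerID,
		PeerHost: &scheduler.PeerHost{},
	})
	if err != nil {
		return err
	}
	// New in this commit: the returned reader must be closed by the caller.
	defer rc.Close()

	n, copyErr := io.Copy(os.Stdout, rc)
	log.Printf("streamed %d bytes, attributes: %v", n, attr)
	return copyErr
}

func main() {
	// Wiring of host, storage manager, and scheduler client is omitted; see
	// NewPeerTaskManager above for the full parameter list.
	_ = readTask
}
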
2 changes: 0 additions & 2 deletions client/daemon/peer/peertask_manager_test.go
@@ -206,7 +206,6 @@ func TestPeerTaskManager_StartFilePeerTask(t *testing.T) {
Url: "http://localhost/test/data",
Filter: "",
BizId: "d7y-test",
UrlMata: nil,
PeerId: peerID,
PeerHost: &scheduler.PeerHost{},
},
@@ -281,7 +280,6 @@ func TestPeerTaskManager_StartStreamPeerTask(t *testing.T) {
Url: "http://localhost/test/data",
Filter: "",
BizId: "d7y-test",
UrlMata: nil,
PeerId: peerID,
PeerHost: &scheduler.PeerHost{},
})
138 changes: 138 additions & 0 deletions client/daemon/peer/peertask_reuse.go
@@ -0,0 +1,138 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package peer

import (
"context"
"fmt"
"io"
"time"

"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/pkg/dfcodes"
logger "d7y.io/dragonfly/v2/pkg/dflog"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
"github.com/go-http-utils/headers"
"go.opentelemetry.io/otel/semconv"
"go.opentelemetry.io/otel/trace"
)

func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
request *FilePeerTaskRequest) (chan *FilePeerTaskProgress, bool) {
taskID := idgen.TaskID(request.Url, request.Filter, request.UrlMeta, request.BizId)
reuse := ptm.storageManager.FindCompletedTask(taskID)
if reuse == nil {
return nil, false
}

log := logger.With("peer", request.PeerId, "task", taskID, "component", "reuseFilePeerTask")

_, span := tracer.Start(ctx, config.SpanReusePeerTask, trace.WithSpanKind(trace.SpanKindClient))
span.SetAttributes(config.AttributePeerHost.String(ptm.host.Uuid))
span.SetAttributes(semconv.NetHostIPKey.String(ptm.host.Ip))
span.SetAttributes(config.AttributeTaskID.String(taskID))
span.SetAttributes(config.AttributePeerID.String(request.PeerId))
span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
span.SetAttributes(semconv.HTTPURLKey.String(request.Url))
defer span.End()

log.Infof("reuse from peer task: %s, size: %d", reuse.PeerID, reuse.ContentLength)
span.AddEvent("reuse peer task", trace.WithAttributes(config.AttributePeerID.String(reuse.PeerID)))

start := time.Now()
err := ptm.storageManager.Store(
context.Background(),
&storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: reuse.PeerID,
TaskID: taskID,
Destination: request.Output,
},
MetadataOnly: false,
StoreOnly: true,
TotalPieces: reuse.TotalPieces,
})
if err != nil {
log.Errorf("store error when reuse peer task: %s", err)
span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
span.RecordError(err)
return nil, false
}
var cost = time.Since(start).Milliseconds()
log.Infof("reuse file peer task done, cost: %dms", cost)

pg := &FilePeerTaskProgress{
State: &ProgressState{
Success: true,
Code: dfcodes.Success,
Msg: "Success",
},
TaskID: taskID,
PeerID: request.PeerId,
ContentLength: reuse.ContentLength,
CompletedLength: reuse.ContentLength,
PeerTaskDone: true,
DoneCallback: func() {},
}

// make a new buffered channel, since the reuse path does not go through newFilePeerTask
progressCh := make(chan *FilePeerTaskProgress, 1)
progressCh <- pg

span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
return progressCh, true
}

func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context,
request *scheduler.PeerTaskRequest) (io.ReadCloser, map[string]string, bool) {
taskID := idgen.TaskID(request.Url, request.Filter, request.UrlMeta, request.BizId)
reuse := ptm.storageManager.FindCompletedTask(taskID)
if reuse == nil {
return nil, nil, false
}

log := logger.With("peer", request.PeerId, "task", taskID, "component", "reuseStreamPeerTask")
log.Infof("reuse from peer task: %s, size: %d", reuse.PeerID, reuse.ContentLength)

ctx, span := tracer.Start(ctx, config.SpanStreamPeerTask, trace.WithSpanKind(trace.SpanKindClient))
span.SetAttributes(config.AttributePeerHost.String(ptm.host.Uuid))
span.SetAttributes(semconv.NetHostIPKey.String(ptm.host.Ip))
span.SetAttributes(config.AttributeTaskID.String(taskID))
span.SetAttributes(config.AttributePeerID.String(request.PeerId))
span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
span.SetAttributes(semconv.HTTPURLKey.String(request.Url))
defer span.End()

rc, err := ptm.storageManager.ReadAllPieces(ctx, &reuse.PeerTaskMetaData)
if err != nil {
log.Errorf("read all pieces error when reuse peer task: %s", err)
span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
span.RecordError(err)
return nil, nil, false
}

attr := map[string]string{}
attr[headers.ContentLength] = fmt.Sprintf("%d", reuse.ContentLength)
attr[config.HeaderDragonflyTask] = taskID
attr[config.HeaderDragonflyPeer] = request.PeerId

// TODO record time when the file is closed; need to add a type that implements Close and WriteTo
span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
return rc, attr, true
}
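
For reference, a minimal sketch of consuming the buffered channel built above: the reuse fast path publishes a single, already-completed progress message, so one receive observes the final state. Field names follow the FilePeerTaskProgress literal in tryReuseFilePeerTask; the surrounding wiring is assumed:

package main

import (
	"fmt"

	"d7y.io/dragonfly/v2/client/daemon/peer"
)

// drainReuseProgress sketches how a StartFilePeerTask caller sees the reuse
// fast path: one already-completed progress message sits in the buffered
// channel, so a single receive is enough.
func drainReuseProgress(progressCh chan *peer.FilePeerTaskProgress) {
	p := <-progressCh
	if p.PeerTaskDone && p.State.Success {
		fmt.Printf("task %s reused for peer %s: %d/%d bytes\n",
			p.TaskID, p.PeerID, p.CompletedLength, p.ContentLength)
	}
}

func main() {
	// In the daemon, StartFilePeerTask returns this channel; construction
	// is omitted here.
	_ = drainReuseProgress
}
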
6 changes: 1 addition & 5 deletions client/daemon/peer/peertask_stream.go
@@ -66,7 +66,7 @@ func newStreamPeerTask(ctx context.Context,
span.SetAttributes(config.AttributePeerID.String(request.PeerId))
span.SetAttributes(semconv.HTTPURLKey.String(request.Url))

logger.Debugf("request overview, url: %s, filter: %s, meta: %s, biz: %s", request.Url, request.Filter, request.UrlMata, request.BizId)
logger.Debugf("request overview, url: %s, filter: %s, meta: %s, biz: %s", request.Url, request.Filter, request.UrlMeta, request.BizId)
// trace register
_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(ctx, request)
@@ -348,10 +348,6 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin
}
case cur = <-s.successPieceCh:
continue
//if !ok {
// s.Warnf("successPieceCh closed")
// continue
//}
}
}
}(firstPiece)