Extract storage driver into peer tasks (dragonflyoss#998)
* chore: extract storage driver instead of loading it every time

Signed-off-by: Jim Ma <[email protected]>

* fix: test

Signed-off-by: Jim Ma <[email protected]>

* fix: gofmt

Signed-off-by: Jim Ma <[email protected]>
jim3ma authored Jan 19, 2022
1 parent ac7e8dd commit 5083963
Showing 13 changed files with 104 additions and 71 deletions.
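Before this change, every piece operation routed through the shared `storageManager` and looked the task's storage up each time; after it, the driver is resolved once when the task registers and cached on the conductor. Below is a minimal, self-contained Go sketch of that pattern; the types are simplified stand-ins, not the real dragonfly API.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-in for storage.TaskStorageDriver.
type TaskStorageDriver interface {
	WritePiece(ctx context.Context, data []byte) (int64, error)
}

type fileDriver struct{ taskID string }

func (d *fileDriver) WritePiece(_ context.Context, data []byte) (int64, error) {
	// A real driver would persist the piece; this stub only reports its size.
	return int64(len(data)), nil
}

type storageManager struct{}

// RegisterTask mirrors the new signature: it returns the per-task driver,
// so callers stop looking it up on every piece operation.
func (m *storageManager) RegisterTask(_ context.Context, taskID string) (TaskStorageDriver, error) {
	return &fileDriver{taskID: taskID}, nil
}

type peerTaskConductor struct {
	storageManager *storageManager
	storage        TaskStorageDriver // resolved once in InitStorage, reused afterwards
}

func (pt *peerTaskConductor) InitStorage(ctx context.Context, taskID string) (err error) {
	pt.storage, err = pt.storageManager.RegisterTask(ctx, taskID)
	return err
}

func main() {
	pt := &peerTaskConductor{storageManager: &storageManager{}}
	if err := pt.InitStorage(context.Background(), "task-1"); err != nil {
		panic(err)
	}
	// All later writes go straight through the cached driver.
	n, _ := pt.storage.WritePiece(context.Background(), []byte("piece data"))
	fmt.Println("wrote", n, "bytes")
}
```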
23 changes: 16 additions & 7 deletions client/daemon/peer/peertask_conductor.go
@@ -78,6 +78,8 @@ type peerTaskConductor struct {
 	storageManager  storage.Manager
 	peerTaskManager *peerTaskManager
 
+	storage storage.TaskStorageDriver
+
 	// schedule options
 	schedulerOption config.SchedulerOption
 	schedulerClient schedulerclient.SchedulerClient
@@ -307,6 +309,10 @@ func (pt *peerTaskConductor) GetTaskID() string {
 	return pt.taskID
 }
 
+func (pt *peerTaskConductor) GetStorage() storage.TaskStorageDriver {
+	return pt.storage
+}
+
 func (pt *peerTaskConductor) GetContentLength() int64 {
 	return pt.contentLength.Load()
 }
@@ -401,7 +407,8 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
 	pt.SetContentLength(l)
 	pt.SetTotalPieces(1)
 	ctx := pt.ctx
-	err := pt.peerTaskManager.storageManager.RegisterTask(ctx,
+	var err error
+	pt.storage, err = pt.peerTaskManager.storageManager.RegisterTask(ctx,
 		storage.RegisterTaskRequest{
 			CommonTaskRequest: storage.CommonTaskRequest{
 				PeerID: pt.tinyData.PeerID,
@@ -416,7 +423,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
 		pt.cancel(base.Code_ClientError, err.Error())
 		return
 	}
-	n, err := pt.peerTaskManager.storageManager.WritePiece(ctx,
+	n, err := pt.storage.WritePiece(ctx,
 		&storage.WritePieceRequest{
 			PeerTaskMetadata: storage.PeerTaskMetadata{
 				PeerID: pt.tinyData.PeerID,
@@ -640,6 +647,7 @@ func (pt *peerTaskConductor) pullSinglePiece() {
 	}
 
 	request := &DownloadPieceRequest{
+		storage: pt.storage,
 		piece:   pt.singlePiece.PieceInfo,
 		log:     pt.Log(),
 		TaskID:  pt.GetTaskID(),
@@ -870,6 +878,7 @@ func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadP
 		pt.requestedPieces.Set(piece.PieceNum)
 	}
 	req := &DownloadPieceRequest{
+		storage: pt.storage,
 		piece:   piece,
 		log:     pt.Log(),
 		TaskID:  pt.GetTaskID(),
@@ -1080,9 +1089,9 @@ func (pt *peerTaskConductor) reportFailResult(request *DownloadPieceRequest, res
 	span.End()
 }
 
-func (pt *peerTaskConductor) InitStorage() error {
+func (pt *peerTaskConductor) InitStorage() (err error) {
 	// prepare storage
-	err := pt.storageManager.RegisterTask(pt.ctx,
+	pt.storage, err = pt.storageManager.RegisterTask(pt.ctx,
 		storage.RegisterTaskRequest{
 			CommonTaskRequest: storage.CommonTaskRequest{
 				PeerID: pt.GetPeerID(),
@@ -1100,7 +1109,7 @@ func (pt *peerTaskConductor) InitStorage() error {
 
 func (pt *peerTaskConductor) UpdateStorage() error {
 	// update storage
-	err := pt.storageManager.UpdateTask(pt.ctx,
+	err := pt.storage.UpdateTask(pt.ctx,
 		&storage.UpdateTaskRequest{
 			PeerTaskMetadata: storage.PeerTaskMetadata{
 				PeerID: pt.GetPeerID(),
@@ -1249,7 +1258,7 @@ func (pt *peerTaskConductor) fail() {
 
 // Validate stores metadata and validates digest
 func (pt *peerTaskConductor) Validate() error {
-	err := pt.peerTaskManager.storageManager.Store(pt.ctx,
+	err := pt.storage.Store(pt.ctx,
 		&storage.StoreRequest{
 			CommonTaskRequest: storage.CommonTaskRequest{
 				PeerID: pt.peerID,
@@ -1266,7 +1275,7 @@ func (pt *peerTaskConductor) Validate() error {
 	if !pt.peerTaskManager.calculateDigest {
 		return nil
 	}
-	err = pt.storageManager.ValidateDigest(
+	err = pt.storage.ValidateDigest(
 		&storage.PeerTaskMetadata{
 			PeerID: pt.GetPeerID(),
 			TaskID: pt.GetTaskID(),
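Worth noting: `InitStorage` switches from `error` to a named return `(err error)` so `pt.storage` and `err` can be assigned in a single statement. A tiny sketch of the idiom, using hypothetical names:

```go
package main

import "fmt"

type driver struct{}

func register() (*driver, error) { return &driver{}, nil }

type conductor struct{ storage *driver }

// The named return value removes the need for a separate
// `var err error` before the multi-assignment, as in InitStorage above.
func (c *conductor) initStorage() (err error) {
	c.storage, err = register()
	return err
}

func main() {
	c := &conductor{}
	fmt.Println(c.initStorage(), c.storage != nil)
}
```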
2 changes: 2 additions & 0 deletions client/daemon/peer/peertask_manager.go
@@ -60,6 +60,8 @@ type Task interface {
 	Context() context.Context
 	Log() *logger.SugaredLoggerOnWith
 
+	GetStorage() storage.TaskStorageDriver
+
 	GetPeerID() string
 	GetTaskID() string
 
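Adding `GetStorage()` to the `Task` interface means every implementation, including the gomock test double, must now expose the task's storage driver; that is why `peertask_manager_mock_test.go` is regenerated in this commit.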
15 changes: 15 additions & 0 deletions client/daemon/peer/peertask_manager_mock_test.go

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion client/daemon/peer/peertask_manager_test.go
@@ -250,7 +250,6 @@ func setupMockManager(ctrl *gomock.Controller, ts *testSpec, opt componentsOptio
 		runningPeerTasks: sync.Map{},
 		pieceManager: &pieceManager{
 			calculateDigest: true,
-			storageManager:  storageManager,
 			pieceDownloader: opt.pieceDownloader,
 			computePieceSize: func(contentLength int64) uint32 {
 				return opt.pieceSize
3 changes: 2 additions & 1 deletion client/daemon/peer/peertask_stream.go
@@ -124,7 +124,7 @@ func (s *streamTask) Start(ctx context.Context) (io.ReadCloser, map[string]strin
 }
 
 func (s *streamTask) writeOnePiece(w io.Writer, pieceNum int32) (int64, error) {
-	pr, pc, err := s.peerTaskConductor.pieceManager.ReadPiece(s.ctx, &storage.ReadPieceRequest{
+	pr, pc, err := s.peerTaskConductor.storage.ReadPiece(s.ctx, &storage.ReadPieceRequest{
 		PeerTaskMetadata: storage.PeerTaskMetadata{
 			PeerID: s.peerTaskConductor.peerID,
 			TaskID: s.peerTaskConductor.taskID,
@@ -198,6 +198,7 @@ func (s *streamTask) writeToPipe(firstPiece *pieceInfo, pw *io.PipeWriter) {
 		}
 		return
 	case cur = <-s.pieceCh:
+		// FIXME check missing piece for non-block broker channel
 		continue
 	case <-s.peerTaskConductor.failCh:
 		ptError := fmt.Errorf("context done due to peer task fail: %d/%s",
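With `pieceManager.ReadPiece` removed (see the piece_manager.go diff below), the stream task reads each piece straight from the conductor's cached driver. A self-contained sketch of the read, copy, and close pattern used by `writeOnePiece`, with an in-memory stand-in for the driver:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// readPiece mimics the (io.Reader, io.Closer, error) shape of
// TaskStorageDriver.ReadPiece; the buffer stands in for a real store.
func readPiece(pieceNum int32) (io.Reader, io.Closer, error) {
	buf := bytes.NewBufferString(fmt.Sprintf("piece-%d ", pieceNum))
	return buf, io.NopCloser(buf), nil
}

// writeOnePiece copies one piece into w and always closes the piece reader,
// following the same pattern as streamTask.writeOnePiece.
func writeOnePiece(w io.Writer, pieceNum int32) (int64, error) {
	pr, pc, err := readPiece(pieceNum)
	if err != nil {
		return 0, err
	}
	defer pc.Close()
	return io.Copy(w, pr)
}

func main() {
	var total int64
	for num := int32(0); num < 3; num++ {
		n, err := writeOnePiece(os.Stdout, num)
		if err != nil {
			panic(err)
		}
		total += n
	}
	fmt.Println("\ncopied", total, "bytes")
}
```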
@@ -237,7 +237,6 @@ func TestStreamPeerTask_BackSource_Partial_WithContentLength(t *testing.T) {
 
 	pm := &pieceManager{
 		calculateDigest: true,
-		storageManager:  storageManager,
 		pieceDownloader: downloader,
 		computePieceSize: func(contentLength int64) uint32 {
 			return uint32(pieceSize)
3 changes: 3 additions & 0 deletions client/daemon/peer/piece_downloader.go
@@ -25,6 +25,7 @@ import (
 	"strings"
 	"time"
 
+	"d7y.io/dragonfly/v2/client/daemon/storage"
 	"d7y.io/dragonfly/v2/client/daemon/upload"
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/pkg/rpc/base"
@@ -34,6 +35,7 @@ import (
 type DownloadPieceRequest struct {
 	piece   *base.PieceInfo
 	log     *logger.SugaredLoggerOnWith
+	storage storage.TaskStorageDriver
 	TaskID  string
 	PeerID  string
 	DstPid  string
@@ -50,6 +52,7 @@ type DownloadPieceRequest struct {
 	FinishTime int64
 }
 
+//go:generate mockgen -source piece_downloader.go -destination ../test/mock/peer/piece_downloader.go
 type PieceDownloader interface {
 	DownloadPiece(context.Context, *DownloadPieceRequest) (io.Reader, io.Closer, error)
 }
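The new `//go:generate` directive lets `go generate ./...` refresh the `PieceDownloader` mock. A hypothetical test sketch, assuming the standard mockgen output (`NewMockPieceDownloader`) and an import path derived from the `-destination` flag; neither is confirmed by this diff:

```go
package peer_test

import (
	"context"
	"io"
	"strings"
	"testing"

	"github.com/golang/mock/gomock"

	"d7y.io/dragonfly/v2/client/daemon/peer"
	// Hypothetical import path implied by the -destination flag above.
	mockpeer "d7y.io/dragonfly/v2/client/daemon/test/mock/peer"
)

func TestPieceDownloaderMock(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// NewMockPieceDownloader is the constructor mockgen emits by default.
	pd := mockpeer.NewMockPieceDownloader(ctrl)
	pd.EXPECT().
		DownloadPiece(gomock.Any(), gomock.Any()).
		Return(strings.NewReader("piece"), io.NopCloser(strings.NewReader("")), nil)

	r, c, err := pd.DownloadPiece(context.Background(), &peer.DownloadPieceRequest{})
	if err != nil {
		t.Fatal(err)
	}
	defer c.Close()
	body, _ := io.ReadAll(r)
	if string(body) != "piece" {
		t.Fatalf("unexpected piece body: %q", body)
	}
}
```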
17 changes: 5 additions & 12 deletions client/daemon/peer/piece_manager.go
@@ -41,12 +41,10 @@ import (
 type PieceManager interface {
 	DownloadSource(ctx context.Context, pt Task, request *scheduler.PeerTaskRequest) error
 	DownloadPiece(ctx context.Context, request *DownloadPieceRequest) (*DownloadPieceResult, error)
-	ReadPiece(ctx context.Context, req *storage.ReadPieceRequest) (io.Reader, io.Closer, error)
 }
 
 type pieceManager struct {
 	*rate.Limiter
-	storageManager   storage.TaskStorageDriver
 	pieceDownloader  PieceDownloader
 	computePieceSize func(contentLength int64) uint32
 
@@ -57,7 +55,6 @@ var _ PieceManager = (*pieceManager)(nil)
 
 func NewPieceManager(s storage.TaskStorageDriver, pieceDownloadTimeout time.Duration, opts ...func(*pieceManager)) (PieceManager, error) {
 	pm := &pieceManager{
-		storageManager:   s,
 		computePieceSize: util.ComputePieceSize,
 		calculateDigest:  true,
 	}
@@ -169,7 +166,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec
 		},
 	}
 
-	result.Size, err = pm.storageManager.WritePiece(ctx, writePieceRequest)
+	result.Size, err = request.storage.WritePiece(ctx, writePieceRequest)
 	result.FinishTime = time.Now().UnixNano()
 
 	span.RecordError(err)
@@ -181,10 +178,6 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec
 	return result, nil
 }
 
-func (pm *pieceManager) ReadPiece(ctx context.Context, req *storage.ReadPieceRequest) (io.Reader, io.Closer, error) {
-	return pm.storageManager.ReadPiece(ctx, req)
-}
-
 func (pm *pieceManager) processPieceFromSource(pt Task,
 	reader io.Reader, contentLength int64, pieceNum int32, pieceOffset uint64, pieceSize uint32, isLastPiece func(n int64) (int32, bool)) (
 	result *DownloadPieceResult, md5 string, err error) {
@@ -210,7 +203,7 @@ func (pm *pieceManager) processPieceFromSource(pt Task,
 		reader = digestutils.NewDigestReader(pt.Log(), reader)
 	}
 	var n int64
-	result.Size, err = pm.storageManager.WritePiece(
+	result.Size, err = pt.GetStorage().WritePiece(
 		pt.Context(),
 		&storage.WritePieceRequest{
 			UnknownLength: unknownLength,
@@ -270,7 +263,7 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc
 	if contentLength < 0 {
 		log.Warnf("can not get content length for %s", request.Url)
 	} else {
-		err = pm.storageManager.UpdateTask(ctx,
+		err = pt.GetStorage().UpdateTask(ctx,
 			&storage.UpdateTaskRequest{
 				PeerTaskMetadata: storage.PeerTaskMetadata{
 					PeerID: pt.GetPeerID(),
@@ -356,7 +349,7 @@ func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task,
 
 	if pieceNum == maxPieceNum-1 {
 		// last piece
-		err = pm.storageManager.UpdateTask(ctx,
+		err = pt.GetStorage().UpdateTask(ctx,
 			&storage.UpdateTaskRequest{
 				PeerTaskMetadata: storage.PeerTaskMetadata{
 					PeerID: pt.GetPeerID(),
@@ -430,7 +423,7 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task
 		// last piece, piece size maybe 0
 		contentLength = int64(pieceSize)*int64(pieceNum) + result.Size
 		pt.SetTotalPieces(int32(math.Ceil(float64(contentLength) / float64(pieceSize))))
-		err = pm.storageManager.UpdateTask(ctx,
+		err = pt.GetStorage().UpdateTask(ctx,
 			&storage.UpdateTaskRequest{
 				PeerTaskMetadata: storage.PeerTaskMetadata{
 					PeerID: pt.GetPeerID(),
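Dropping the `storageManager` field makes `pieceManager` task-agnostic: each `DownloadPieceRequest` carries the driver of the task it belongs to, so one manager can serve tasks backed by different storage. A simplified stand-alone sketch of that design, not the real types:

```go
package main

import (
	"context"
	"fmt"
)

// Trimmed stand-in for storage.TaskStorageDriver.
type TaskStorageDriver interface {
	WritePiece(ctx context.Context, data []byte) (int64, error)
}

type memDriver struct{ task string }

func (d *memDriver) WritePiece(_ context.Context, data []byte) (int64, error) {
	fmt.Printf("task %s stored %d bytes\n", d.task, len(data))
	return int64(len(data)), nil
}

type DownloadPieceRequest struct {
	storage TaskStorageDriver // travels with the request, as in this commit
	body    []byte
}

type pieceManager struct{} // note: no storage field anymore

func (pm *pieceManager) DownloadPiece(ctx context.Context, req *DownloadPieceRequest) (int64, error) {
	// ...fetch the piece from a peer, then persist via the request's driver.
	return req.storage.WritePiece(ctx, req.body)
}

func main() {
	pm := &pieceManager{}
	for _, task := range []string{"task-a", "task-b"} {
		req := &DownloadPieceRequest{storage: &memDriver{task: task}, body: []byte("data")}
		if _, err := pm.DownloadPiece(context.Background(), req); err != nil {
			panic(err)
		}
	}
}
```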
11 changes: 9 additions & 2 deletions client/daemon/peer/piece_manager_test.go
@@ -130,7 +130,10 @@ func TestPieceManager_DownloadSource(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			/********** prepare test start **********/
 			mockPeerTask := NewMockTask(ctrl)
-			var totalPieces = &atomic.Int32{}
+			var (
+				totalPieces = &atomic.Int32{}
+				taskStorage storage.TaskStorageDriver
+			)
 			mockPeerTask.EXPECT().SetContentLength(gomock.Any()).AnyTimes().DoAndReturn(
 				func(arg0 int64) error {
 					return nil
@@ -151,6 +154,10 @@ func TestPieceManager_DownloadSource(t *testing.T) {
 				func() string {
 					return taskID
 				})
+			mockPeerTask.EXPECT().GetStorage().AnyTimes().DoAndReturn(
+				func() storage.TaskStorageDriver {
+					return taskStorage
+				})
 			mockPeerTask.EXPECT().AddTraffic(gomock.Any()).AnyTimes().DoAndReturn(func(int642 uint64) {})
 			mockPeerTask.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
 				func(*DownloadPieceRequest, *DownloadPieceResult, error) {
@@ -166,7 +173,7 @@ func TestPieceManager_DownloadSource(t *testing.T) {
 			mockPeerTask.EXPECT().Log().AnyTimes().DoAndReturn(func() *logger.SugaredLoggerOnWith {
 				return logger.With("test case", tc.name)
 			})
-			err = storageManager.RegisterTask(context.Background(),
+			taskStorage, err = storageManager.RegisterTask(context.Background(),
 				storage.RegisterTaskRequest{
 					CommonTaskRequest: storage.CommonTaskRequest{
 						PeerID: mockPeerTask.GetPeerID(),
4 changes: 2 additions & 2 deletions client/daemon/storage/local_storage_test.go
@@ -70,7 +70,7 @@ func TestLocalTaskStore_PutAndGetPiece_Simple(t *testing.T) {
 
 	var s = sm.(*storageManager)
 
-	err = s.CreateTask(
+	_, err = s.CreateTask(
 		RegisterTaskRequest{
 			CommonTaskRequest: CommonTaskRequest{
 				PeerID: peerID,
@@ -243,7 +243,7 @@ func TestLocalTaskStore_PutAndGetPiece_Advance(t *testing.T) {
 
 	var s = sm.(*storageManager)
 
-	err = s.CreateTask(
+	_, err = s.CreateTask(
 		RegisterTaskRequest{
 			CommonTaskRequest: CommonTaskRequest{
 				PeerID: peerID,
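Because `RegisterTask` and `CreateTask` now return the storage driver as their first value, call sites that only need the error discard it with `_`, as the two `CreateTask` calls above do.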