Removed some dead code #2870

Open · wants to merge 2 commits into base: main
jobsAdmin/JobsAdmin.go (32 changes: 1 addition & 31 deletions)
@@ -80,8 +80,6 @@ var JobsAdmin interface {
 	/*ScheduleTransfer(jptm IJobPartTransferMgr)*/
 	ResurrectJob(jobId common.JobID, sourceSAS string, destinationSAS string, srcServiceClient *common.ServiceClient, dstServiceClient *common.ServiceClient, srcIsOAuth bool) bool
 
-	ResurrectJobParts()
-
 	// AppPathFolder returns the Azcopy application path folder.
 	// JobPartPlanFile will be created inside this folder.
 	AppPathFolder() string
@@ -401,7 +399,7 @@ func (ja *jobsAdmin) ResurrectJob(jobId common.JobID,
 			ScheduleTransfers: false,
 			CompletionChan:    nil,
 		}
-		jm.AddJobPart2(args)
+		jm.AddJobPart(args)
 	}
 
 	jm, _ := ja.JobMgr(jobId)
@@ -411,34 +409,6 @@ func (ja *jobsAdmin) ResurrectJob(jobId common.JobID,
 	return true
 }
 
-// reconstructTheExistingJobParts reconstructs the in memory JobPartPlanInfo for existing memory map JobFile
-func (ja *jobsAdmin) ResurrectJobParts() {
-	// Get all the Job part plan files in the plan directory
-	files := func(ext string) []os.FileInfo {
-		var files []os.FileInfo
-		_ = filepath.Walk(ja.planDir, func(path string, fileInfo os.FileInfo, _ error) error {
-			if !fileInfo.IsDir() && fileInfo.Size() != 0 && strings.HasSuffix(fileInfo.Name(), ext) {
-				files = append(files, fileInfo)
-			}
-			return nil
-		})
-		return files
-	}(fmt.Sprintf(".steV%d", ste.DataSchemaVersion))
-
-	// TODO : sort the file.
-	for f := 0; f < len(files); f++ {
-		planFile := ste.JobPartPlanFileName(files[f].Name())
-		jobID, partNum, err := planFile.Parse()
-		if err != nil {
-			continue
-		}
-		mmf := planFile.Map()
-		//todo : call the compute transfer function here for each job.
-		jm := ja.JobMgrEnsureExists(jobID, mmf.Plan().LogLevel, "")
-		jm.AddJobPart(partNum, planFile, mmf, EMPTY_SAS_STRING, EMPTY_SAS_STRING, false, nil)
-	}
-}
-
 func (ja *jobsAdmin) ListJobs(givenStatus common.JobStatus) common.ListJobsResponse {
 	ret := common.ListJobsResponse{JobIDDetails: []common.JobIDDetails{}}
 	files := func(ext string) []os.FileInfo {
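With ResurrectJobParts gone from both the interface and the implementation, ResurrectJob is the only resurrection entry point left in this file. A minimal caller-side sketch, assuming the job ID and the two service clients already come from the resume path; resumeSingleJob, its placeholder SAS arguments, and its error message are hypothetical and not code from this PR:

func resumeSingleJob(jobID common.JobID, srcClient, dstClient *common.ServiceClient) error {
	// ResurrectJob keeps the signature shown in the first hunk above; the empty SAS
	// strings and srcIsOAuth=false here are placeholder arguments for this sketch.
	if !JobsAdmin.ResurrectJob(jobID, "", "", srcClient, dstClient, false) {
		return fmt.Errorf("job %s was not found in the plan directory", jobID.String())
	}
	return nil
}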
jobsAdmin/init.go (6 changes: 2 additions & 4 deletions)
@@ -57,8 +57,6 @@ func ToFixed(num float64, precision int) float64 {
 func MainSTE(concurrency ste.ConcurrencySettings, targetRateInMegaBitsPerSec float64, azcopyJobPlanFolder, azcopyLogPathFolder string, providePerfAdvice bool) error {
 	// Initialize the JobsAdmin, resurrect Job plan files
 	initJobsAdmin(steCtx, concurrency, targetRateInMegaBitsPerSec, azcopyJobPlanFolder, azcopyLogPathFolder, providePerfAdvice)
-	// No need to read the existing JobPartPlan files since Azcopy is running in process
-	// JobsAdmin.ResurrectJobParts()
 	// TODO: We may want to list listen first and terminate if there is already an instance listening
 
 	// if we've a custom mime map
@@ -188,10 +186,10 @@ func ExecuteNewCopyJobPartOrder(order common.CopyJobPartOrderRequest) common.Cop
 		ExistingPlanMMF:   nil,
 		SrcClient:         order.SrcServiceClient,
 		DstClient:         order.DstServiceClient,
-		SrcIsOAuth: order.S2SSourceCredentialType.IsAzureOAuth(),
+		SrcIsOAuth:        order.S2SSourceCredentialType.IsAzureOAuth(),
 		ScheduleTransfers: true,
 	}
-	jm.AddJobPart2(args)
+	jm.AddJobPart(args)
 
 	// Update jobPart Status with the status Manager
 	jm.SendJobPartCreatedMsg(ste.JobPartCreatedMsg{TotalTransfers: uint32(len(order.Transfers.List)),
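The call site above already passes its arguments as a named-field struct, which is the shape the PR standardizes on once the positional overload is deleted. A self-contained toy illustration of that trade-off, using stand-in names only (addPartArgs, addJobPart and addJobPartPositional are not the real ste types or functions):

package main

import "fmt"

// addPartArgs stands in for a parameter struct like AddJobPartArgs;
// the field names are illustrative only.
type addPartArgs struct {
	partNum           int
	scheduleTransfers bool
	completionChan    chan struct{}
}

// addJobPartPositional mimics the old style: every caller spells out each
// argument in order, including the zero-valued ones.
func addJobPartPositional(partNum int, scheduleTransfers bool, completionChan chan struct{}) string {
	return fmt.Sprintf("part %d, schedule=%v, chan=%v", partNum, scheduleTransfers, completionChan != nil)
}

// addJobPart mimics the surviving style: one args struct, where omitted fields
// default to their zero values and new fields can be added later without
// breaking existing call sites.
func addJobPart(args *addPartArgs) string {
	return fmt.Sprintf("part %d, schedule=%v, chan=%v", args.partNum, args.scheduleTransfers, args.completionChan != nil)
}

func main() {
	fmt.Println(addJobPartPositional(0, true, nil))
	fmt.Println(addJobPart(&addPartArgs{partNum: 0, scheduleTransfers: true}))
}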
ste/mgr-JobMgr.go (53 changes: 2 additions & 51 deletions)
@@ -53,9 +53,7 @@ type IJobMgr interface {
 	JobPartMgr(partNum PartNumber) (IJobPartMgr, bool)
 	// Throughput() XferThroughput
 	// If existingPlanMMF is nil, a new MMF is opened.
-	AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, existingPlanMMF *JobPartPlanMMF, sourceSAS string,
-		destinationSAS string, scheduleTransfers bool, completionChan chan struct{}) IJobPartMgr
-	AddJobPart2(args *AddJobPartArgs) IJobPartMgr
+	AddJobPart(args *AddJobPartArgs) IJobPartMgr
 
 	SetIncludeExclude(map[string]int, map[string]int)
 	IncludeExclude() (map[string]int, map[string]int)
@@ -435,7 +433,7 @@ type AddJobPartArgs struct {
 }
 
 // initializeJobPartPlanInfo func initializes the JobPartPlanInfo handler for given JobPartOrder
-func (jm *jobMgr) AddJobPart2(args *AddJobPartArgs) IJobPartMgr {
+func (jm *jobMgr) AddJobPart(args *AddJobPartArgs) IJobPartMgr {
 	jpm := &jobPartMgr{
 		jobMgr:   jm,
 		filename: args.PlanFile,
@@ -485,53 +483,6 @@ func (jm *jobMgr) AddJobPart2(args *AddJobPartArgs) IJobPartMgr {
 	return jpm
 }
 
-// initializeJobPartPlanInfo func initializes the JobPartPlanInfo handler for given JobPartOrder
-func (jm *jobMgr) AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, existingPlanMMF *JobPartPlanMMF, sourceSAS string,
-	destinationSAS string, scheduleTransfers bool, completionChan chan struct{}) IJobPartMgr {
-	jpm := &jobPartMgr{jobMgr: jm, filename: planFile, sourceSAS: sourceSAS,
-		destinationSAS: destinationSAS, pacer: jm.pacer,
-		slicePool:         jm.slicePool,
-		cacheLimiter:      jm.cacheLimiter,
-		fileCountLimiter:  jm.fileCountLimiter,
-		closeOnCompletion: completionChan,
-	}
-	// If an existing plan MMF was supplied, re use it. Otherwise, init a new one.
-	if existingPlanMMF == nil {
-		jpm.planMMF = jpm.filename.Map()
-	} else {
-		jpm.planMMF = existingPlanMMF
-	}
-
-	jm.jobPartMgrs.Set(partNum, jpm)
-	jm.setFinalPartOrdered(partNum, jpm.planMMF.Plan().IsFinalPart)
-	jm.setDirection(jpm.Plan().FromTo)
-
-	jm.initMu.Lock()
-	defer jm.initMu.Unlock()
-	if jm.initState == nil {
-		var logger common.ILogger = jm
-		jm.initState = &jobMgrInitState{
-			securityInfoPersistenceManager: newSecurityInfoPersistenceManager(jm.ctx),
-			folderCreationTracker:          NewFolderCreationTracker(jpm.Plan().Fpo, jpm.Plan()),
-			folderDeletionManager:          common.NewFolderDeletionManager(jm.ctx, jpm.Plan().Fpo, logger),
-			exclusiveDestinationMapHolder:  &atomic.Value{},
-		}
-		jm.initState.exclusiveDestinationMapHolder.Store(common.NewExclusiveStringMap(jpm.Plan().FromTo, runtime.GOOS))
-	}
-	jpm.jobMgrInitState = jm.initState // so jpm can use it as much as desired without locking (since the only mutation is the init in jobManager. As far as jobPartManager is concerned, the init state is read-only
-	jpm.exclusiveDestinationMap = jm.getExclusiveDestinationMap(partNum, jpm.Plan().FromTo)
-
-	if scheduleTransfers {
-		// If the schedule transfer is set to true
-		// Instead of the scheduling the Transfer for given JobPart
-		// JobPart is put into the partChannel
-		// from where it is picked up and scheduled
-		// jpm.ScheduleTransfers(jm.ctx, make(map[string]int), make(map[string]int))
-		jm.QueueJobParts(jpm)
-	}
-	return jpm
-}
-
 func (jm *jobMgr) AddJobOrder(order common.CopyJobPartOrderRequest) IJobPartMgr {
 	jppfn := JobPartPlanFileName(fmt.Sprintf(JobPartPlanFileNameFormat, order.JobID.String(), 0, DataSchemaVersion))
 	jppfn.Create(order) // Convert the order to a plan file
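The positional AddJobPart's one caller visible in this diff was the removed ResurrectJobParts. A hedged sketch of how that call would translate to the surviving args-struct form; PartNum, SourceSAS and DestinationSAS are assumed field names, since only PlanFile, ExistingPlanMMF, SrcClient, DstClient, SrcIsOAuth, ScheduleTransfers and CompletionChan appear in the hunks above:

// Old positional call (from the removed ResurrectJobParts):
//   jm.AddJobPart(partNum, planFile, mmf, EMPTY_SAS_STRING, EMPTY_SAS_STRING, false, nil)
// Sketch of the equivalent args-struct call; fields marked "assumed" are guesses.
args := &AddJobPartArgs{
	PartNum:           partNum,          // assumed field name
	PlanFile:          planFile,         // stored as jobPartMgr.filename above
	ExistingPlanMMF:   mmf,              // per the interface comment, nil would open a new MMF
	SourceSAS:         EMPTY_SAS_STRING, // assumed field name
	DestinationSAS:    EMPTY_SAS_STRING, // assumed field name
	ScheduleTransfers: false,
	CompletionChan:    nil,
}
jpm := jm.AddJobPart(args) // returned IJobPartMgr used as before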