Skip to content

Commit

Permalink
Integrated code lifecycle: Fix an issue with concurrent build queue a…
Browse files Browse the repository at this point in the history
…ccess (#9876)
  • Loading branch information
BBesrour authored Nov 27, 2024
1 parent 83ac531 commit 4eaba4e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,9 @@ private BuildAgentInformation getUpdatedLocalBuildAgentInformation(BuildJobQueue
}

private List<BuildJobQueueItem> getProcessingJobsOfNode(String memberAddress) {
return processingJobs.values().stream().filter(job -> Objects.equals(job.buildAgent().memberAddress(), memberAddress)).toList();
// NOTE: we should not use streams with IMap, because it can be unstable, when many items are added at the same time and there is a slow network condition
List<BuildJobQueueItem> processingJobsList = new ArrayList<>(processingJobs.values());
return processingJobsList.stream().filter(job -> Objects.equals(job.buildAgent().memberAddress(), memberAddress)).toList();
}

private void removeOfflineNodes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,36 +108,45 @@ public void pushDockerImageCleanupInfo() {
}
}

/**
* @return a copy of the queued build jobs as ArrayList
*/
public List<BuildJobQueueItem> getQueuedJobs() {
return queue.stream().toList();
// NOTE: we should not use streams with IQueue directly, because it can be unstable, when many items are added at the same time and there is a slow network condition
return new ArrayList<>(queue);
}

/**
* @return a copy of the processing jobs as ArrayList
*/
public List<BuildJobQueueItem> getProcessingJobs() {
return processingJobs.values().stream().toList();
// NOTE: we should not use streams with IMap, because it can be unstable, when many items are added at the same time and there is a slow network condition
return new ArrayList<>(processingJobs.values());
}

public List<BuildJobQueueItem> getQueuedJobsForCourse(long courseId) {
return queue.stream().filter(job -> job.courseId() == courseId).toList();
return getQueuedJobs().stream().filter(job -> job.courseId() == courseId).toList();
}

public List<BuildJobQueueItem> getProcessingJobsForCourse(long courseId) {
return processingJobs.values().stream().filter(job -> job.courseId() == courseId).toList();
return getProcessingJobs().stream().filter(job -> job.courseId() == courseId).toList();
}

public List<BuildJobQueueItem> getQueuedJobsForParticipation(long participationId) {
return queue.stream().filter(job -> job.participationId() == participationId).toList();
return getQueuedJobs().stream().filter(job -> job.participationId() == participationId).toList();
}

public List<BuildJobQueueItem> getProcessingJobsForParticipation(long participationId) {
return processingJobs.values().stream().filter(job -> job.participationId() == participationId).toList();
return getProcessingJobs().stream().filter(job -> job.participationId() == participationId).toList();
}

public List<BuildAgentInformation> getBuildAgentInformation() {
return buildAgentInformation.values().stream().toList();
// NOTE: we should not use streams with IMap, because it can be unstable, when many items are added at the same time and there is a slow network condition
return new ArrayList<>(buildAgentInformation.values());
}

public List<BuildAgentInformation> getBuildAgentInformationWithoutRecentBuildJobs() {
return buildAgentInformation.values().stream().map(agent -> new BuildAgentInformation(agent.buildAgent(), agent.maxNumberOfConcurrentBuildJobs(),
return getBuildAgentInformation().stream().map(agent -> new BuildAgentInformation(agent.buildAgent(), agent.maxNumberOfConcurrentBuildJobs(),
agent.numberOfCurrentBuildJobs(), agent.runningBuildJobs(), agent.status(), null, null)).toList();
}

Expand All @@ -156,9 +165,10 @@ public void resumeBuildAgent(String agent) {
*/
public void cancelBuildJob(String buildJobId) {
// Remove build job if it is queued
if (queue.stream().anyMatch(job -> Objects.equals(job.id(), buildJobId))) {
List<BuildJobQueueItem> queuedJobs = getQueuedJobs();
if (queuedJobs.stream().anyMatch(job -> Objects.equals(job.id(), buildJobId))) {
List<BuildJobQueueItem> toRemove = new ArrayList<>();
for (BuildJobQueueItem job : queue) {
for (BuildJobQueueItem job : queuedJobs) {
if (Objects.equals(job.id(), buildJobId)) {
toRemove.add(job);
}
Expand Down Expand Up @@ -197,7 +207,8 @@ public void cancelAllQueuedBuildJobs() {
* Cancel all running build jobs.
*/
public void cancelAllRunningBuildJobs() {
for (BuildJobQueueItem buildJob : processingJobs.values()) {
List<BuildJobQueueItem> runningJobs = getProcessingJobs();
for (BuildJobQueueItem buildJob : runningJobs) {
cancelBuildJob(buildJob.id());
}
}
Expand All @@ -208,7 +219,7 @@ public void cancelAllRunningBuildJobs() {
* @param agentName name of the agent
*/
public void cancelAllRunningBuildJobsForAgent(String agentName) {
processingJobs.values().stream().filter(job -> Objects.equals(job.buildAgent().name(), agentName)).forEach(job -> cancelBuildJob(job.id()));
getProcessingJobs().stream().filter(job -> Objects.equals(job.buildAgent().name(), agentName)).forEach(job -> cancelBuildJob(job.id()));
}

/**
Expand All @@ -217,8 +228,9 @@ public void cancelAllRunningBuildJobsForAgent(String agentName) {
* @param courseId id of the course
*/
public void cancelAllQueuedBuildJobsForCourse(long courseId) {
List<BuildJobQueueItem> queuedJobs = getQueuedJobs();
List<BuildJobQueueItem> toRemove = new ArrayList<>();
for (BuildJobQueueItem job : queue) {
for (BuildJobQueueItem job : queuedJobs) {
if (job.courseId() == courseId) {
toRemove.add(job);
}
Expand All @@ -232,7 +244,8 @@ public void cancelAllQueuedBuildJobsForCourse(long courseId) {
* @param courseId id of the course
*/
public void cancelAllRunningBuildJobsForCourse(long courseId) {
for (BuildJobQueueItem buildJob : processingJobs.values()) {
List<BuildJobQueueItem> runningJobs = getProcessingJobs();
for (BuildJobQueueItem buildJob : runningJobs) {
if (buildJob.courseId() == courseId) {
cancelBuildJob(buildJob.id());
}
Expand All @@ -246,14 +259,16 @@ public void cancelAllRunningBuildJobsForCourse(long courseId) {
*/
public void cancelAllJobsForParticipation(long participationId) {
List<BuildJobQueueItem> toRemove = new ArrayList<>();
for (BuildJobQueueItem queuedJob : queue) {
List<BuildJobQueueItem> queuedJobs = getQueuedJobs();
for (BuildJobQueueItem queuedJob : queuedJobs) {
if (queuedJob.participationId() == participationId) {
toRemove.add(queuedJob);
}
}
queue.removeAll(toRemove);

for (BuildJobQueueItem runningJob : processingJobs.values()) {
List<BuildJobQueueItem> runningJobs = getProcessingJobs();
for (BuildJobQueueItem runningJob : runningJobs) {
if (runningJob.participationId() == participationId) {
cancelBuildJob(runningJob.id());
}
Expand Down

0 comments on commit 4eaba4e

Please sign in to comment.