Skip to content

Commit

Permalink
feat: 增加配置,当远程项目和仓库不存在时是否自动创建;一次性任务支持分发仓库 #1105
Browse files Browse the repository at this point in the history
  • Loading branch information
zacYL committed Aug 29, 2023
1 parent 2cfe4c4 commit d357d02
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

package com.tencent.bkrepo.replication.pojo.remote.request

import com.tencent.bkrepo.replication.pojo.request.ReplicaObjectType
import com.tencent.bkrepo.replication.pojo.request.ReplicaType
import com.tencent.bkrepo.replication.pojo.task.objects.PackageConstraint
import com.tencent.bkrepo.replication.pojo.task.objects.PathConstraint
Expand Down Expand Up @@ -61,6 +62,10 @@ data class RemoteConfigCreateRequest(
val remoteProjectId: String? = null,
@ApiModelProperty("远程仓库")
val remoteRepoName: String? = null,
@ApiModelProperty("当远程项目或者仓库不存在时是否进行创建, 只支持clusterId存在的情况下", required = false)
val createProjectOrRepo: Boolean = false,
@ApiModelProperty("同步对象类型, 只支持clusterId存在的情况", required = true)
val replicaObjectType: ReplicaObjectType? = null,
@ApiModelProperty("包限制条件", required = false)
val packageConstraints: List<PackageConstraint>? = null,
@ApiModelProperty("路径限制条件", required = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

package com.tencent.bkrepo.replication.pojo.remote.request

import com.tencent.bkrepo.replication.pojo.request.ReplicaObjectType
import com.tencent.bkrepo.replication.pojo.request.ReplicaType
import com.tencent.bkrepo.replication.pojo.task.objects.PackageConstraint
import com.tencent.bkrepo.replication.pojo.task.objects.PathConstraint
Expand Down Expand Up @@ -59,6 +60,10 @@ data class RemoteConfigUpdateRequest(
val remoteProjectId: String? = null,
@ApiModelProperty("远程仓库")
val remoteRepoName: String? = null,
@ApiModelProperty("当远程项目或者仓库不存在时是否进行创建, 只支持clusterId存在的情况下", required = false)
val createProjectOrRepo: Boolean = false,
@ApiModelProperty("同步对象类型, 只支持clusterId存在的情况", required = true)
val replicaObjectType: ReplicaObjectType? = null,
@ApiModelProperty("包限制条件", required = false)
val packageConstraints: List<PackageConstraint>? = null,
@ApiModelProperty("路径限制条件", required = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

package com.tencent.bkrepo.replication.pojo.remote.request

import com.tencent.bkrepo.replication.pojo.request.ReplicaObjectType
import com.tencent.bkrepo.replication.pojo.request.ReplicaType
import com.tencent.bkrepo.replication.pojo.task.setting.ReplicaSetting
import io.swagger.annotations.ApiModel
Expand Down Expand Up @@ -57,6 +58,10 @@ data class RemoteRunOnceTaskCreateRequest(
val remoteProjectId: String? = null,
@ApiModelProperty("远程仓库")
val remoteRepoName: String? = null,
@ApiModelProperty("当远程项目或者仓库不存在时是否进行创建, 只支持clusterId存在的情况", required = false)
val createProjectOrRepo: Boolean = false,
@ApiModelProperty("同步对象类型, 只支持clusterId存在的情况", required = true)
val replicaObjectType: ReplicaObjectType? = null,
@ApiModelProperty("包名", required = false)
val packageName: String? = null,
@ApiModelProperty("包版本", required = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,9 @@ data class ReplicaSetting(
/**
* 是否校验文件存储一致性
*/
val storageConsistencyCheck: Boolean = false
val storageConsistencyCheck: Boolean = false,
/**
* 当远程项目或者仓库不存在时是否自动创建
*/
val automaticCreateRemoteRepo: Boolean = true
)
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,53 @@ abstract class AbstractReplicaService(
private val localDataManager: LocalDataManager
) : ReplicaService {

/**
* 同步 task Object
*/
protected fun replicaTaskObjects(replicaContext: ReplicaContext) {
with(replicaContext) {
// 检查版本
replicator.checkVersion(this)
if (task.setting.automaticCreateRemoteRepo) {
// 同步项目信息
replicator.replicaProject(replicaContext)
// 同步仓库信息
replicator.replicaRepo(replicaContext)
}
// 按仓库同步
if (includeAllData(this)) {
replicaByRepo(this)
return
}
replicaTaskObjectConstraints(this)
}
}


/**
* 判断是否包含所有仓库数据,进行仓库同步
*/
open fun includeAllData(context: ReplicaContext): Boolean {
return false
}

/**
* 同步task object 中的包列表或者paths
*/
open fun replicaTaskObjectConstraints(replicaContext: ReplicaContext) {
with(replicaContext) {
// 按包同步
taskObject.packageConstraints.orEmpty().forEach {
replicaByPackageConstraint(this, it)
}
// 按路径同步
taskObject.pathConstraints.orEmpty().forEach {
replicaByPathConstraint(this, it)
}
}
}


/**
* 同步整个仓库数据
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ class EventBasedReplicaService(
with(context) {
// 同步仓库
retry(times = RETRY_COUNT, delayInSeconds = DELAY_IN_SECONDS) {
replicator.replicaRepo(this)
if (task.setting.automaticCreateRemoteRepo) {
replicator.replicaRepo(this)
}
}
when (event.type) {
EventType.NODE_CREATED -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ package com.tencent.bkrepo.replication.replica.type.manual
import com.tencent.bkrepo.common.service.otel.util.AsyncUtils.trace
import com.tencent.bkrepo.replication.config.ReplicationProperties
import com.tencent.bkrepo.replication.manager.LocalDataManager
import com.tencent.bkrepo.replication.pojo.request.ReplicaObjectType
import com.tencent.bkrepo.replication.pojo.task.objects.PackageConstraint
import com.tencent.bkrepo.replication.pojo.task.objects.PathConstraint
import com.tencent.bkrepo.replication.replica.type.AbstractReplicaService
import com.tencent.bkrepo.replication.replica.context.ReplicaContext
import com.tencent.bkrepo.replication.replica.executor.ManualThreadPoolExecutor
import com.tencent.bkrepo.replication.replica.type.AbstractReplicaService
import com.tencent.bkrepo.replication.service.ReplicaRecordService
import org.springframework.stereotype.Component
import java.util.concurrent.Callable
Expand All @@ -50,40 +53,71 @@ class ManualBasedReplicaService(
) : AbstractReplicaService(replicaRecordService, localDataManager) {
private val executor = ManualThreadPoolExecutor.instance
override fun replica(context: ReplicaContext) {
val semaphore = Semaphore(replicationProperties.manualConcurrencyNum)
with(context) {
replicaTaskObjects(context)
}

/**
* 是否包含所有仓库数据
*/
override fun includeAllData(context: ReplicaContext): Boolean {
return context.taskObject.packageConstraints.isNullOrEmpty() &&
context.taskObject.pathConstraints.isNullOrEmpty() &&
context.task.replicaObjectType == ReplicaObjectType.REPOSITORY
}

/**
* 同步task object 中的包列表或者paths
*/
override fun replicaTaskObjectConstraints(replicaContext: ReplicaContext) {
with(replicaContext) {
val semaphore = Semaphore(replicationProperties.manualConcurrencyNum)

// 按包同步
val futureList = mutableListOf<Future<*>>()
taskObject.packageConstraints.orEmpty().forEach {
semaphore.acquire()
futureList.add(
executor.submit(
Callable{
try {
replicaByPackageConstraint(this, it)
} finally {
semaphore.release()
}
}.trace()
)
)
}
// 按包同步
mapEachTaskObject(semaphore, futureList, taskObject.packageConstraints.orEmpty(), replicaContext)
// 按路径同步
taskObject.pathConstraints.orEmpty().forEach {
semaphore.acquire()
futureList.add(
executor.submit(
Callable {
try {
replicaByPathConstraint(this, it)
} finally {
semaphore.release()
}
}.trace()
)
)
}
mapEachTaskObject(semaphore, futureList, taskObject.pathConstraints.orEmpty(), replicaContext)

futureList.forEach { it.get() }
}
}


/**
* 遍历执行所有分发任务
*/
private fun mapEachTaskObject(
semaphore: Semaphore, futureList: MutableList<Future<*>>,
taskObjects: List<Any>, context: ReplicaContext
) {
for (taskObject in taskObjects) {
semaphore.acquire()
futureList.add(
executor.submit(
Callable{
try {
replicaTaskObject(context, taskObject)
} finally {
semaphore.release()
}
}.trace()
)
)
}
}

/**
* 分发具体内容
*/
private fun replicaTaskObject(replicaContext: ReplicaContext, constraint: Any) {
when(constraint) {
constraint is PathConstraint ->
replicaByPathConstraint(replicaContext, constraint as PathConstraint)
constraint is PackageConstraint ->
replicaByPackageConstraint(replicaContext, constraint as PackageConstraint)
else -> return
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,33 +43,13 @@ class ScheduledReplicaService(
) : AbstractReplicaService(replicaRecordService, localDataManager) {

override fun replica(context: ReplicaContext) {
with(context) {
// 检查版本
replicator.checkVersion(this)
// 同步项目
replicator.replicaProject(this)
// 同步仓库
replicator.replicaRepo(this)
// 按仓库同步
if (includeAllData(this)) {
replicaByRepo(this)
return
}
// 按包同步
taskObject.packageConstraints.orEmpty().forEach {
replicaByPackageConstraint(this, it)
}
// 按路径同步
taskObject.pathConstraints.orEmpty().forEach {
replicaByPathConstraint(this, it)
}
}
replicaTaskObjects(context)
}

/**
* 是否包含所有仓库数据
*/
private fun includeAllData(context: ReplicaContext): Boolean {
override fun includeAllData(context: ReplicaContext): Boolean {
return context.taskObject.packageConstraints.isNullOrEmpty() &&
context.taskObject.pathConstraints.isNullOrEmpty()
}
Expand Down
Loading

0 comments on commit d357d02

Please sign in to comment.