Skip to content

Commit

Permalink
feat: 降冷优化TencentBlueKing#2523
Browse files Browse the repository at this point in the history
* feat: 按月分表查询/upsert接口初始化索引TencentBlueKing#2523

* feat: 任务去重TencentBlueKing#2523

* feat: 代码调整TencentBlueKing#2523

* feat: 按月分表只有save/insert才会创建索引TencentBlueKing#2523

* feat: 按月分表只有save/insert才会创建索引TencentBlueKing#2523
  • Loading branch information
zacYL authored Aug 30, 2024
1 parent 4fdfd89 commit 00f2b0e
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import com.tencent.bkrepo.common.mongo.dao.util.sharding.ShardingUtils
import org.springframework.data.mongodb.core.index.IndexDefinition
import java.util.concurrent.TimeUnit

/**
* 注意:按月分表的索引只有在insert/save的情况下才会创建
*/
abstract class MonthRangeShardingMongoDao<E> : RangeShardingMongoDao<E>() {

private val indexCache = CacheBuilder.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ import java.time.Duration
data class DataSeparationConfig(
// 当前时间到(当前时间-keepDays)为热数据,不允许拆分,避免误操作将所有数据进行降冷
var keepDays: Duration = Duration.ofDays(365),
// 特殊项目仓库配置
var specialRepos: MutableList<String> = mutableListOf(),
// 降冷特殊项目仓库配置
var specialSeparateRepos: MutableList<String> = mutableListOf(),
// 恢复特殊项目仓库配置
var specialRestoreRepos: MutableList<String> = mutableListOf(),
// 允许同时执行将冷任务数
var separateTaskConcurrency: Int = Runtime.getRuntime().availableProcessors() * 2,
// 允许同时执行的恢复任务数
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ package com.tencent.bkrepo.job.separation.dao

import com.tencent.bkrepo.common.mongo.dao.simple.SimpleMongoDao
import com.tencent.bkrepo.job.separation.model.TSeparationTask
import com.tencent.bkrepo.job.separation.pojo.SeparationContent
import com.tencent.bkrepo.job.separation.pojo.task.SeparationCount
import com.tencent.bkrepo.job.separation.pojo.task.SeparationTaskState
import com.tencent.bkrepo.repository.constant.SYSTEM_USER
Expand All @@ -76,10 +77,28 @@ class SeparationTaskDao : SimpleMongoDao<TSeparationTask>() {
return find(Query(criteria))
}

fun exist(projectId: String, repoName: String, state: String): Boolean {
fun exist(
projectId: String, repoName: String, state: String,
content: SeparationContent? = null,
type: String? = null,
separationDate: LocalDateTime? = null,
overwrite: Boolean? = null
): Boolean {
val criteria = Criteria().and(TSeparationTask::projectId.name).isEqualTo(projectId)
.and(TSeparationTask::repoName.name).isEqualTo(repoName)
.and(TSeparationTask::state.name).ne(state)
separationDate?.let {
criteria.and(TSeparationTask::separationDate.name).isEqualTo(separationDate)
}
type?.let {
criteria.and(TSeparationTask::type.name).isEqualTo(type)
}
content?.let {
criteria.and(TSeparationTask::content.name).isEqualTo(content)
}
overwrite?.let {
criteria.and(TSeparationTask::overwrite.name).isEqualTo(overwrite)
}
return exists(Query(criteria))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,13 @@
package com.tencent.bkrepo.job.separation.dao.repo

import com.mongodb.client.result.DeleteResult
import com.mongodb.client.result.UpdateResult
import com.tencent.bkrepo.common.mongo.dao.sharding.MonthRangeShardingMongoDao
import com.tencent.bkrepo.common.mongo.dao.util.Pages
import com.tencent.bkrepo.job.separation.model.repo.TSeparationMavenMetadataRecord
import com.tencent.bkrepo.job.separation.pojo.query.MavenMetadata
import com.tencent.bkrepo.job.separation.util.SeparationUtils
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.Update
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.stereotype.Repository
import java.time.LocalDateTime
Expand All @@ -73,20 +71,36 @@ class SeparationMavenMetadataDao : MonthRangeShardingMongoDao<TSeparationMavenMe

fun upsertMetaData(
mavenMetadata: MavenMetadata, separationDate: LocalDateTime
): UpdateResult {
): TSeparationMavenMetadataRecord {
val (startOfDay, endOfDay) = SeparationUtils.findStartAndEndTimeOfDate(separationDate)
val criteria = Criteria.where(TSeparationMavenMetadataRecord::projectId.name).isEqualTo(mavenMetadata.projectId)
.and(TSeparationMavenMetadataRecord::repoName.name).isEqualTo(mavenMetadata.repoName)
.and(TSeparationMavenMetadataRecord::groupId.name).isEqualTo(mavenMetadata.groupId)
.and(TSeparationMavenMetadataRecord::artifactId.name).isEqualTo(mavenMetadata.artifactId)
.and(TSeparationMavenMetadataRecord::version.name).isEqualTo(mavenMetadata.version)
.and(TSeparationMavenMetadataRecord::classifier.name).isEqualTo(mavenMetadata.classifier)
.and(TSeparationMavenMetadataRecord::extension.name).isEqualTo(mavenMetadata.extension)
.and(TSeparationMavenMetadataRecord::separationDate.name).isEqualTo(separationDate)

val metadataQuery = Query(criteria)
val update = Update().set(TSeparationMavenMetadataRecord::buildNo.name, mavenMetadata.buildNo)
.set(TSeparationMavenMetadataRecord::timestamp.name, mavenMetadata.timestamp)
return this.upsert(metadataQuery, update)
.and(TSeparationMavenMetadataRecord::separationDate.name).gte(startOfDay).lt(endOfDay)
var existedRecord = this.findOne(Query(criteria))
if (existedRecord == null) {
existedRecord = TSeparationMavenMetadataRecord(
id = null,
projectId = mavenMetadata.projectId,
repoName = mavenMetadata.repoName,
groupId = mavenMetadata.groupId,
artifactId = mavenMetadata.artifactId,
version = mavenMetadata.version,
classifier = mavenMetadata.classifier,
extension = mavenMetadata.extension,
separationDate = separationDate,
buildNo = mavenMetadata.buildNo,
timestamp = mavenMetadata.timestamp
)
} else {
existedRecord.buildNo = mavenMetadata.buildNo
existedRecord.timestamp = mavenMetadata.timestamp
}
return this.save(existedRecord)
}

fun search(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,25 @@ class SeparationRecoveryEventConsumer(
return
}
logger.info("current separation recovery message header is ${message.headers}")
doSeparationRecovery(message.payload)
try {
doSeparationRecovery(message.payload)
} catch (e: Exception) {
logger.warn("handle msg error: ${message.payload}")
}

}

private fun doSeparationRecovery(event: ArtifactEvent) {
val recoveryNodeInfo = buildRecoveryNodeInfo(event)
var flag = false
val projectRepoKey = "${recoveryNodeInfo.projectId}/${recoveryNodeInfo.repoName}"
dataSeparationConfig.specialRestoreRepos.forEach {
val regex = Regex(it.replace("*", ".*"))
if (regex.matches(projectRepoKey)) {
flag = true
}
}
if (!flag) return
val task = when (recoveryNodeInfo.repoType) {
RepositoryType.MAVEN.name -> {
val recoveryVersionInfo = RepoSpecialSeparationMappings.getRecoveryPackageVersionData(recoveryNodeInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ class SeparationTaskServiceImpl(
SEPARATE -> {
validateSeparateTaskParams(request)
val task = buildSeparationTask(request)
separationTaskDao.save(task)
createTask(task)
}
RESTORE -> {
restoreTaskCheck(request.projectId, request.repoName)
createRestoreTask(request)
}
else -> {
Expand Down Expand Up @@ -124,6 +125,24 @@ class SeparationTaskServiceImpl(
return exist || failedExist
}

private fun restoreTaskCheck(
projectId: String,
repoName: String,
) {
var flag = false
val projectRepoKey = "${projectId}/${repoName}"
dataSeparationConfig.specialRestoreRepos.forEach {
val regex = Regex(it.replace("*", ".*"))
if (regex.matches(projectRepoKey)) {
flag = true
}
}
if (!flag) throw BadRequestException(
CommonMessageCode.PARAMETER_INVALID,
projectRepoKey
)
}

private fun createRestoreTask(request: SeparationTaskRequest) {
with(request) {
if (separateAt.isNullOrEmpty()) {
Expand All @@ -136,16 +155,33 @@ class SeparationTaskServiceImpl(
}
separatedDates.forEach {
val task = buildSeparationTask(request, it)
separationTaskDao.save(task)
createTask(task)
}
} else {
val date = LocalDateTime.parse(separateAt, DateTimeFormatter.ISO_DATE_TIME)
val task = buildSeparationTask(request, date)
separationTaskDao.save(task)
createTask(task)
}
}
}

private fun createTask(task: TSeparationTask) {
val exist = separationTaskDao.exist(
projectId = task.projectId,
repoName = task.repoName,
state = SeparationTaskState.FINISHED.name,
content = task.content,
type = task.type,
separationDate = task.separationDate,
overwrite = task.overwrite
)
if (exist) {
logger.info("$task is existed, ignore")
return
}
separationTaskDao.save(task)
}

private fun getRepoInfo(projectId: String, repoName: String): RepositoryDetail {
val repo = repositoryClient.getRepoDetail(projectId, repoName).data
?: run {
Expand All @@ -167,6 +203,20 @@ class SeparationTaskServiceImpl(
SeparationTaskRequest::separateAt.name
)
}

var flag = false
val projectRepoKey = "$projectId/$repoName"
dataSeparationConfig.specialSeparateRepos.forEach {
val regex = Regex(it.replace("*", ".*"))
if (regex.matches(projectRepoKey)) {
flag = true
}
}
if (!flag) throw BadRequestException(
CommonMessageCode.PARAMETER_INVALID,
projectRepoKey
)

val separateDate = try {
LocalDateTime.parse(separateAt, DateTimeFormatter.ISO_DATE_TIME)
} catch (e: DateTimeParseException) {
Expand All @@ -179,13 +229,7 @@ class SeparationTaskServiceImpl(
if (LocalDateTime.now().minusDays(dataSeparationConfig.keepDays.toDays()).isAfter(separateDate)) {
return
}
val projectRepoKey = "$projectId/$repoName"
dataSeparationConfig.specialRepos.forEach {
val regex = Regex(it.replace("*", ".*"))
if (regex.matches(projectRepoKey)) {
return
}
}

logger.warn("Separation date [$separateAt] is illegal!")
throw BadRequestException(
CommonMessageCode.PARAMETER_INVALID,
Expand Down

0 comments on commit 00f2b0e

Please sign in to comment.