Skip to content

Commit

Permalink
Remove DatabaseCare
Browse files Browse the repository at this point in the history
  • Loading branch information
Ghost-chu committed Dec 20, 2024
1 parent 1a2e8a4 commit e52c515
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 142 deletions.
221 changes: 103 additions & 118 deletions src/main/java/com/ghostchu/btn/sparkle/module/analyse/AnalyseService.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,8 @@ public class AnalyseService {
@Lock(LockModeType.READ)
@Scheduled(fixedRateString = "${analyse.untrustip.interval}")
public void cronUntrustedIPAddresses() throws InterruptedException {
try {
DatabaseCare.generateParallel.acquire();
var startAt = System.currentTimeMillis();
var ipTries = new DualIPv4v6Tries();
var startAt = System.currentTimeMillis();
var ipTries = new DualIPv4v6Tries();
/*
CREATE MATERIALIZED VIEW progress_cheat_blocker_agg_view
WITH (timescaledb.continuous) AS
Expand All @@ -113,39 +111,36 @@ SELECT add_continuous_aggregate_policy('progress_cheat_blocker_agg_view',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 hour');
*/
var query = entityManager.createNativeQuery("""
SELECT peer_ip, SUM(app_count) AS untrust_count
FROM progress_cheat_blocker_agg_view
WHERE bucket >= ? AND bucket <= ?
GROUP BY peer_ip
HAVING SUM(app_count) > ?
ORDER BY untrust_count DESC;
""");
query.setParameter(1, new Timestamp(System.currentTimeMillis() - untrustedIpAddressGenerateOffset));
query.setParameter(2, new Timestamp(System.currentTimeMillis()));
query.setParameter(3, untrustedIpAddressGenerateThreshold);
List<Object[]> queryResult = query.getResultList();
for (Object[] arr : queryResult) {
var ipAddr = IPUtil.toIPAddress(((InetAddress) arr[0]).getHostAddress());
ipTries.add(ipAddr);
}
var query = entityManager.createNativeQuery("""
SELECT peer_ip, SUM(app_count) AS untrust_count
FROM progress_cheat_blocker_agg_view
WHERE bucket >= ? AND bucket <= ?
GROUP BY peer_ip
HAVING SUM(app_count) > ?
ORDER BY untrust_count DESC;
""");
query.setParameter(1, new Timestamp(System.currentTimeMillis() - untrustedIpAddressGenerateOffset));
query.setParameter(2, new Timestamp(System.currentTimeMillis()));
query.setParameter(3, untrustedIpAddressGenerateThreshold);
List<Object[]> queryResult = query.getResultList();
for (Object[] arr : queryResult) {
var ipAddr = IPUtil.toIPAddress(((InetAddress) arr[0]).getHostAddress());
ipTries.add(ipAddr);
}
// banHistoryRepository
// .generateUntrustedIPAddresses(
// TimeUtil.toUTC(System.currentTimeMillis() - untrustedIpAddressGenerateOffset),
// OffsetDateTime.now(),
// untrustedIpAddressGenerateThreshold,
// Duration.ofMinutes(30)
// )
ipMerger.merge(ipTries);
List<AnalysedRule> untrustedIps = new ArrayList<>();
filterIP(ipTries).forEach(ip -> untrustedIps.add(new AnalysedRule(null, ip.toString(), UNTRUSTED_IP, "Generated at " + MsgUtil.getNowDateTimeString())));
analysedRuleRepository.deleteAllByModule(UNTRUSTED_IP);
meterRegistry.gauge("sparkle_analyse_untrusted_ip_address", Collections.emptyList(), untrustedIps.size());
analysedRuleRepository.saveAll(untrustedIps);
log.info("Untrusted IPs: {}, tooked {} ms", untrustedIps.size(), System.currentTimeMillis() - startAt);
} finally {
DatabaseCare.generateParallel.release();
}
ipMerger.merge(ipTries);
List<AnalysedRule> untrustedIps = new ArrayList<>();
filterIP(ipTries).forEach(ip -> untrustedIps.add(new AnalysedRule(null, ip.toString(), UNTRUSTED_IP, "Generated at " + MsgUtil.getNowDateTimeString())));
analysedRuleRepository.deleteAllByModule(UNTRUSTED_IP);
meterRegistry.gauge("sparkle_analyse_untrusted_ip_address", Collections.emptyList(), untrustedIps.size());
analysedRuleRepository.saveAll(untrustedIps);
log.info("Untrusted IPs: {}, tooked {} ms", untrustedIps.size(), System.currentTimeMillis() - startAt);
}

@Transactional
Expand All @@ -166,33 +161,28 @@ public List<AnalysedRule> getOverDownloadIPAddresses() {
@Lock(LockModeType.READ)
@Scheduled(fixedRateString = "${analyse.highriskips.interval}")
public void cronHighRiskIps() throws InterruptedException {
try {
DatabaseCare.generateParallel.acquire();
var startAt = System.currentTimeMillis();
final var ipTries = new DualIPv4v6Tries();
banHistoryRepository.findAllByPaging((Specification<BanHistory>) (root, query, criteriaBuilder) -> {
if (query != null)
query.distinct(true);
return criteriaBuilder.and(criteriaBuilder.between(root.get("insertTime"), pastTimestamp(highRiskIpsOffset), nowTimestamp()),
criteriaBuilder.equal(root.get("module"), PCB_MODULE_NAME),
criteriaBuilder.like(root.get("peerClientName"), "aria2/%"));
}, page -> page.forEach(rule -> {
try {
ipTries.add(IPUtil.toIPAddress(rule.getPeerIp().getHostAddress()));
} catch (Exception e) {
log.warn("Unable to convert IP address: {}", rule.getPeerIp().getHostAddress(), e);
}
}));
var filtered = filterIP(ipTries);
var highRiskIps = ipMerger.merge(filtered)
.stream()
.map(ip -> new AnalysedRule(null, ip, HIGH_RISK_IP, "Generated at " + MsgUtil.getNowDateTimeString())).toList();
analysedRuleRepository.replaceAll(HIGH_RISK_IP, highRiskIps);
meterRegistry.gauge("sparkle_analyse_high_risk_ips", Collections.emptyList(), highRiskIps.size());
log.info("High risk IPs: {}, tooked {} ms", highRiskIps.size(), System.currentTimeMillis() - startAt);
} finally {
DatabaseCare.generateParallel.release();
}
var startAt = System.currentTimeMillis();
final var ipTries = new DualIPv4v6Tries();
banHistoryRepository.findAllByPaging((Specification<BanHistory>) (root, query, criteriaBuilder) -> {
if (query != null)
query.distinct(true);
return criteriaBuilder.and(criteriaBuilder.between(root.get("insertTime"), pastTimestamp(highRiskIpsOffset), nowTimestamp()),
criteriaBuilder.equal(root.get("module"), PCB_MODULE_NAME),
criteriaBuilder.like(root.get("peerClientName"), "aria2/%"));
}, page -> page.forEach(rule -> {
try {
ipTries.add(IPUtil.toIPAddress(rule.getPeerIp().getHostAddress()));
} catch (Exception e) {
log.warn("Unable to convert IP address: {}", rule.getPeerIp().getHostAddress(), e);
}
}));
var filtered = filterIP(ipTries);
var highRiskIps = ipMerger.merge(filtered)
.stream()
.map(ip -> new AnalysedRule(null, ip, HIGH_RISK_IP, "Generated at " + MsgUtil.getNowDateTimeString())).toList();
analysedRuleRepository.replaceAll(HIGH_RISK_IP, highRiskIps);
meterRegistry.gauge("sparkle_analyse_high_risk_ips", Collections.emptyList(), highRiskIps.size());
log.info("High risk IPs: {}, tooked {} ms", highRiskIps.size(), System.currentTimeMillis() - startAt);
}

public List<AnalysedRule> getHighRiskIps() {
Expand Down Expand Up @@ -313,71 +303,66 @@ public List<AnalysedRule> getTrackerHighRisk() {
@Lock(LockModeType.READ)
//@Scheduled(fixedRateString = "${analyse.overdownload.interval}")
public void cronUpdateOverDownload() throws InterruptedException {
try {
DatabaseCare.generateParallel.acquire();
var startAt = System.currentTimeMillis();
var query = entityManager.createNativeQuery("""
WITH LatestSnapshots AS (
SELECT DISTINCT ON (s.torrent, s.peer_ip, s.user_application)
s.id,
s.torrent,
s.peer_ip,
s.user_application,
s.to_peer_traffic,
s.last_time_seen
FROM
public.peer_history s
WHERE
s.last_time_seen >= ? AND s.to_peer_traffic > 0
ORDER BY
s.torrent, s.peer_ip, s.user_application, s.last_time_seen DESC
),
AggregatedUploads AS (
SELECT
ls.torrent,
ls.peer_ip,
SUM(ls.to_peer_traffic) AS total_uploaded
FROM
LatestSnapshots ls
GROUP BY
ls.torrent, ls.peer_ip
HAVING
SUM(ls.to_peer_traffic) > 0
)
SELECT
au.torrent,
au.peer_ip,
au.total_uploaded,
t.size,
(au.total_uploaded / t.size::float) * 100 AS upload_percentage
var startAt = System.currentTimeMillis();
var query = entityManager.createNativeQuery("""
WITH LatestSnapshots AS (
SELECT DISTINCT ON (s.torrent, s.peer_ip, s.user_application)
s.id,
s.torrent,
s.peer_ip,
s.user_application,
s.to_peer_traffic,
s.last_time_seen
FROM
AggregatedUploads au
JOIN
public.torrent t ON au.torrent = t.id
public.peer_history s
WHERE
t.size::float > 0 AND au.total_uploaded > t.size::float * ?
s.last_time_seen >= ? AND s.to_peer_traffic > 0
ORDER BY
upload_percentage DESC;
""");
query.setParameter(1, new Timestamp(System.currentTimeMillis() - overDownloadGenerateOffset));
query.setParameter(2, overDownloadGenerateThreshold);
List<Object[]> queryResult = query.getResultList();
var ipTries = new DualIPv4v6Tries();
for (Object[] arr : queryResult) {
var ipAddr = IPUtil.toIPAddress(((InetAddress) arr[1]).getHostAddress());
ipTries.add(ipAddr);
}
ipTries = filterIP(ipTries);
List<AnalysedRule> rules = new ArrayList<>();
ipMerger.merge(ipTries).forEach(i -> rules.add(new AnalysedRule(null, i, OVER_DOWNLOAD,
"Generated at " + MsgUtil.getNowDateTimeString())));
analysedRuleRepository.replaceAll(OVER_DOWNLOAD, rules);
meterRegistry.gauge("sparkle_analyse_over_download_ips", Collections.emptyList(), rules.size());
log.info("Over download IPs: {}, tooked {} ms", rules.size(), System.currentTimeMillis() - startAt);
} finally {
DatabaseCare.generateParallel.release();
s.torrent, s.peer_ip, s.user_application, s.last_time_seen DESC
),
AggregatedUploads AS (
SELECT
ls.torrent,
ls.peer_ip,
SUM(ls.to_peer_traffic) AS total_uploaded
FROM
LatestSnapshots ls
GROUP BY
ls.torrent, ls.peer_ip
HAVING
SUM(ls.to_peer_traffic) > 0
)
SELECT
au.torrent,
au.peer_ip,
au.total_uploaded,
t.size,
(au.total_uploaded / t.size::float) * 100 AS upload_percentage
FROM
AggregatedUploads au
JOIN
public.torrent t ON au.torrent = t.id
WHERE
t.size::float > 0 AND au.total_uploaded > t.size::float * ?
ORDER BY
upload_percentage DESC;
""");
query.setParameter(1, new Timestamp(System.currentTimeMillis() - overDownloadGenerateOffset));
query.setParameter(2, overDownloadGenerateThreshold);
List<Object[]> queryResult = query.getResultList();
var ipTries = new DualIPv4v6Tries();
for (Object[] arr : queryResult) {
var ipAddr = IPUtil.toIPAddress(((InetAddress) arr[1]).getHostAddress());
ipTries.add(ipAddr);
}
ipTries = filterIP(ipTries);
List<AnalysedRule> rules = new ArrayList<>();
ipMerger.merge(ipTries).forEach(i -> rules.add(new AnalysedRule(null, i, OVER_DOWNLOAD,
"Generated at " + MsgUtil.getNowDateTimeString())));
analysedRuleRepository.replaceAll(OVER_DOWNLOAD, rules);
meterRegistry.gauge("sparkle_analyse_over_download_ips", Collections.emptyList(), rules.size());
log.info("Over download IPs: {}, tooked {} ms", rules.size(), System.currentTimeMillis() - startAt);
}

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.ghostchu.btn.sparkle.module.banhistory.BanHistoryService;
import com.ghostchu.btn.sparkle.module.banhistory.internal.BanHistory;
import com.ghostchu.btn.sparkle.module.banhistory.internal.BanHistoryRepository;
import com.ghostchu.btn.sparkle.util.DatabaseCare;
import com.ghostchu.btn.sparkle.util.IPUtil;
import jakarta.transaction.Transactional;
import lombok.Cleanup;
Expand Down Expand Up @@ -60,22 +59,17 @@ public void githubRuleUpdate() throws IOException, InterruptedException {
throw new IllegalArgumentException("Organization " + orgName + " not found");
}
var repository = organization.getRepository(repoName);
try {
DatabaseCare.generateParallel.acquire();
updateFile(repository, "untrusted-ips.txt", () -> generateUntrustedIps().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "high-risk-ips.txt", () -> generateHighRiskIps().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "overdownload-ips.txt", () -> generateOverDownloadIps().getBytes(StandardCharsets.UTF_8));
//updateFile(repository, "strange_ipv6_block.txt", () -> generateStrangeIPV6().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "random-peerid.txt", () -> generateGopeedDev().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "hp_torrent.txt", () -> generateHpTorrents().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "dt_torrent.txt", () -> generateDtTorrents().getBytes(StandardCharsets.UTF_8));
//updateFile(repository, "go.torrent dev 20181121.txt", () -> generateBaiduNetdisk().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "0xde-0xad-0xbe-0xef.txt", () -> generateDeadBeef().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "123pan.txt", () -> generate123pan().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "tracker-high-risk-ips.txt", () -> generateTrackerHighRiskIps().getBytes(StandardCharsets.UTF_8));
} finally {
DatabaseCare.generateParallel.release();
}
updateFile(repository, "untrusted-ips.txt", () -> generateUntrustedIps().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "high-risk-ips.txt", () -> generateHighRiskIps().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "overdownload-ips.txt", () -> generateOverDownloadIps().getBytes(StandardCharsets.UTF_8));
//updateFile(repository, "strange_ipv6_block.txt", () -> generateStrangeIPV6().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "random-peerid.txt", () -> generateGopeedDev().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "hp_torrent.txt", () -> generateHpTorrents().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "dt_torrent.txt", () -> generateDtTorrents().getBytes(StandardCharsets.UTF_8));
//updateFile(repository, "go.torrent dev 20181121.txt", () -> generateBaiduNetdisk().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "0xde-0xad-0xbe-0xef.txt", () -> generateDeadBeef().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "123pan.txt", () -> generate123pan().getBytes(StandardCharsets.UTF_8));
updateFile(repository, "tracker-high-risk-ips.txt", () -> generateTrackerHighRiskIps().getBytes(StandardCharsets.UTF_8));
}

private String generateTrackerHighRiskIps() {
Expand Down
7 changes: 0 additions & 7 deletions src/main/java/com/ghostchu/btn/sparkle/util/DatabaseCare.java

This file was deleted.

0 comments on commit e52c515

Please sign in to comment.