From e52c5152787259eb16e567dd78c854733df053ed Mon Sep 17 00:00:00 2001
From: Ghost_chu
Date: Sat, 21 Dec 2024 00:14:22 +0800
Subject: [PATCH] Remove DatabaseCare

---
 .../module/analyse/AnalyseService.java     | 221 ++++++++----------
 .../githubupdate/GithubUpdateService.java  |  28 +--
 .../btn/sparkle/util/DatabaseCare.java     |   7 -
 3 files changed, 114 insertions(+), 142 deletions(-)
 delete mode 100644 src/main/java/com/ghostchu/btn/sparkle/util/DatabaseCare.java

diff --git a/src/main/java/com/ghostchu/btn/sparkle/module/analyse/AnalyseService.java b/src/main/java/com/ghostchu/btn/sparkle/module/analyse/AnalyseService.java
index bee545df..6f46ebed 100644
--- a/src/main/java/com/ghostchu/btn/sparkle/module/analyse/AnalyseService.java
+++ b/src/main/java/com/ghostchu/btn/sparkle/module/analyse/AnalyseService.java
@@ -91,10 +91,8 @@ public class AnalyseService {
     @Lock(LockModeType.READ)
     @Scheduled(fixedRateString = "${analyse.untrustip.interval}")
     public void cronUntrustedIPAddresses() throws InterruptedException {
-        try {
-            DatabaseCare.generateParallel.acquire();
-            var startAt = System.currentTimeMillis();
-            var ipTries = new DualIPv4v6Tries();
+        var startAt = System.currentTimeMillis();
+        var ipTries = new DualIPv4v6Tries();
 /*
 CREATE MATERIALIZED VIEW progress_cheat_blocker_agg_view
 WITH (timescaledb.continuous) AS
@@ -113,22 +111,22 @@ SELECT add_continuous_aggregate_policy('progress_cheat_blocker_agg_view',
   end_offset => INTERVAL '1 minute',
   schedule_interval => INTERVAL '1 hour');
 */
-            var query = entityManager.createNativeQuery("""
-                    SELECT peer_ip, SUM(app_count) AS untrust_count
-                    FROM progress_cheat_blocker_agg_view
-                    WHERE bucket >= ? AND bucket <= ?
-                    GROUP BY peer_ip
-                    HAVING SUM(app_count) > ?
-                    ORDER BY untrust_count DESC;
-                    """);
-            query.setParameter(1, new Timestamp(System.currentTimeMillis() - untrustedIpAddressGenerateOffset));
-            query.setParameter(2, new Timestamp(System.currentTimeMillis()));
-            query.setParameter(3, untrustedIpAddressGenerateThreshold);
-            List queryResult = query.getResultList();
-            for (Object[] arr : queryResult) {
-                var ipAddr = IPUtil.toIPAddress(((InetAddress) arr[0]).getHostAddress());
-                ipTries.add(ipAddr);
-            }
+        var query = entityManager.createNativeQuery("""
+                SELECT peer_ip, SUM(app_count) AS untrust_count
+                FROM progress_cheat_blocker_agg_view
+                WHERE bucket >= ? AND bucket <= ?
+                GROUP BY peer_ip
+                HAVING SUM(app_count) > ?
+                ORDER BY untrust_count DESC;
+                """);
+        query.setParameter(1, new Timestamp(System.currentTimeMillis() - untrustedIpAddressGenerateOffset));
+        query.setParameter(2, new Timestamp(System.currentTimeMillis()));
+        query.setParameter(3, untrustedIpAddressGenerateThreshold);
+        List queryResult = query.getResultList();
+        for (Object[] arr : queryResult) {
+            var ipAddr = IPUtil.toIPAddress(((InetAddress) arr[0]).getHostAddress());
+            ipTries.add(ipAddr);
+        }
 //        banHistoryRepository
 //                .generateUntrustedIPAddresses(
 //                        TimeUtil.toUTC(System.currentTimeMillis() - untrustedIpAddressGenerateOffset),
@@ -136,16 +134,13 @@ HAVING SUM(app_count) > ?
 //                        untrustedIpAddressGenerateThreshold,
 //                        Duration.ofMinutes(30)
 //                )
-            ipMerger.merge(ipTries);
-            List untrustedIps = new ArrayList<>();
-            filterIP(ipTries).forEach(ip -> untrustedIps.add(new AnalysedRule(null, ip.toString(), UNTRUSTED_IP, "Generated at " + MsgUtil.getNowDateTimeString())));
-            analysedRuleRepository.deleteAllByModule(UNTRUSTED_IP);
-            meterRegistry.gauge("sparkle_analyse_untrusted_ip_address", Collections.emptyList(), untrustedIps.size());
-            analysedRuleRepository.saveAll(untrustedIps);
-            log.info("Untrusted IPs: {}, tooked {} ms", untrustedIps.size(), System.currentTimeMillis() - startAt);
-        } finally {
-            DatabaseCare.generateParallel.release();
-        }
+        ipMerger.merge(ipTries);
+        List untrustedIps = new ArrayList<>();
+        filterIP(ipTries).forEach(ip -> untrustedIps.add(new AnalysedRule(null, ip.toString(), UNTRUSTED_IP, "Generated at " + MsgUtil.getNowDateTimeString())));
+        analysedRuleRepository.deleteAllByModule(UNTRUSTED_IP);
+        meterRegistry.gauge("sparkle_analyse_untrusted_ip_address", Collections.emptyList(), untrustedIps.size());
+        analysedRuleRepository.saveAll(untrustedIps);
+        log.info("Untrusted IPs: {}, tooked {} ms", untrustedIps.size(), System.currentTimeMillis() - startAt);
     }
 
     @Transactional
@@ -166,33 +161,28 @@ public List getOverDownloadIPAddresses() {
     @Lock(LockModeType.READ)
     @Scheduled(fixedRateString = "${analyse.highriskips.interval}")
     public void cronHighRiskIps() throws InterruptedException {
-        try {
-            DatabaseCare.generateParallel.acquire();
-            var startAt = System.currentTimeMillis();
-            final var ipTries = new DualIPv4v6Tries();
-            banHistoryRepository.findAllByPaging((Specification) (root, query, criteriaBuilder) -> {
-                if (query != null)
-                    query.distinct(true);
-                return criteriaBuilder.and(criteriaBuilder.between(root.get("insertTime"), pastTimestamp(highRiskIpsOffset), nowTimestamp()),
-                        criteriaBuilder.equal(root.get("module"), PCB_MODULE_NAME),
-                        criteriaBuilder.like(root.get("peerClientName"), "aria2/%"));
-            }, page -> page.forEach(rule -> {
-                try {
-                    ipTries.add(IPUtil.toIPAddress(rule.getPeerIp().getHostAddress()));
-                } catch (Exception e) {
-                    log.warn("Unable to convert IP address: {}", rule.getPeerIp().getHostAddress(), e);
-                }
-            }));
-            var filtered = filterIP(ipTries);
-            var highRiskIps = ipMerger.merge(filtered)
-                    .stream()
-                    .map(ip -> new AnalysedRule(null, ip, HIGH_RISK_IP, "Generated at " + MsgUtil.getNowDateTimeString())).toList();
-            analysedRuleRepository.replaceAll(HIGH_RISK_IP, highRiskIps);
-            meterRegistry.gauge("sparkle_analyse_high_risk_ips", Collections.emptyList(), highRiskIps.size());
-            log.info("High risk IPs: {}, tooked {} ms", highRiskIps.size(), System.currentTimeMillis() - startAt);
-        } finally {
-            DatabaseCare.generateParallel.release();
-        }
+        var startAt = System.currentTimeMillis();
+        final var ipTries = new DualIPv4v6Tries();
+        banHistoryRepository.findAllByPaging((Specification) (root, query, criteriaBuilder) -> {
+            if (query != null)
+                query.distinct(true);
+            return criteriaBuilder.and(criteriaBuilder.between(root.get("insertTime"), pastTimestamp(highRiskIpsOffset), nowTimestamp()),
+                    criteriaBuilder.equal(root.get("module"), PCB_MODULE_NAME),
+                    criteriaBuilder.like(root.get("peerClientName"), "aria2/%"));
+        }, page -> page.forEach(rule -> {
+            try {
+                ipTries.add(IPUtil.toIPAddress(rule.getPeerIp().getHostAddress()));
+            } catch (Exception e) {
+                log.warn("Unable to convert IP address: {}", rule.getPeerIp().getHostAddress(), e);
+            }
+        }));
+        var filtered = filterIP(ipTries);
+        var highRiskIps = 
ipMerger.merge(filtered) + .stream() + .map(ip -> new AnalysedRule(null, ip, HIGH_RISK_IP, "Generated at " + MsgUtil.getNowDateTimeString())).toList(); + analysedRuleRepository.replaceAll(HIGH_RISK_IP, highRiskIps); + meterRegistry.gauge("sparkle_analyse_high_risk_ips", Collections.emptyList(), highRiskIps.size()); + log.info("High risk IPs: {}, tooked {} ms", highRiskIps.size(), System.currentTimeMillis() - startAt); } public List getHighRiskIps() { @@ -313,71 +303,66 @@ public List getTrackerHighRisk() { @Lock(LockModeType.READ) //@Scheduled(fixedRateString = "${analyse.overdownload.interval}") public void cronUpdateOverDownload() throws InterruptedException { - try { - DatabaseCare.generateParallel.acquire(); - var startAt = System.currentTimeMillis(); - var query = entityManager.createNativeQuery(""" - WITH LatestSnapshots AS ( - SELECT DISTINCT ON (s.torrent, s.peer_ip, s.user_application) - s.id, - s.torrent, - s.peer_ip, - s.user_application, - s.to_peer_traffic, - s.last_time_seen - FROM - public.peer_history s - WHERE - s.last_time_seen >= ? AND s.to_peer_traffic > 0 - ORDER BY - s.torrent, s.peer_ip, s.user_application, s.last_time_seen DESC - ), - AggregatedUploads AS ( - SELECT - ls.torrent, - ls.peer_ip, - SUM(ls.to_peer_traffic) AS total_uploaded - FROM - LatestSnapshots ls - GROUP BY - ls.torrent, ls.peer_ip - HAVING - SUM(ls.to_peer_traffic) > 0 - ) - SELECT - au.torrent, - au.peer_ip, - au.total_uploaded, - t.size, - (au.total_uploaded / t.size::float) * 100 AS upload_percentage + var startAt = System.currentTimeMillis(); + var query = entityManager.createNativeQuery(""" + WITH LatestSnapshots AS ( + SELECT DISTINCT ON (s.torrent, s.peer_ip, s.user_application) + s.id, + s.torrent, + s.peer_ip, + s.user_application, + s.to_peer_traffic, + s.last_time_seen FROM - AggregatedUploads au - JOIN - public.torrent t ON au.torrent = t.id + public.peer_history s WHERE - t.size::float > 0 AND au.total_uploaded > t.size::float * ? + s.last_time_seen >= ? AND s.to_peer_traffic > 0 ORDER BY - upload_percentage DESC; - - """); - query.setParameter(1, new Timestamp(System.currentTimeMillis() - overDownloadGenerateOffset)); - query.setParameter(2, overDownloadGenerateThreshold); - List queryResult = query.getResultList(); - var ipTries = new DualIPv4v6Tries(); - for (Object[] arr : queryResult) { - var ipAddr = IPUtil.toIPAddress(((InetAddress) arr[1]).getHostAddress()); - ipTries.add(ipAddr); - } - ipTries = filterIP(ipTries); - List rules = new ArrayList<>(); - ipMerger.merge(ipTries).forEach(i -> rules.add(new AnalysedRule(null, i, OVER_DOWNLOAD, - "Generated at " + MsgUtil.getNowDateTimeString()))); - analysedRuleRepository.replaceAll(OVER_DOWNLOAD, rules); - meterRegistry.gauge("sparkle_analyse_over_download_ips", Collections.emptyList(), rules.size()); - log.info("Over download IPs: {}, tooked {} ms", rules.size(), System.currentTimeMillis() - startAt); - } finally { - DatabaseCare.generateParallel.release(); + s.torrent, s.peer_ip, s.user_application, s.last_time_seen DESC + ), + AggregatedUploads AS ( + SELECT + ls.torrent, + ls.peer_ip, + SUM(ls.to_peer_traffic) AS total_uploaded + FROM + LatestSnapshots ls + GROUP BY + ls.torrent, ls.peer_ip + HAVING + SUM(ls.to_peer_traffic) > 0 + ) + SELECT + au.torrent, + au.peer_ip, + au.total_uploaded, + t.size, + (au.total_uploaded / t.size::float) * 100 AS upload_percentage + FROM + AggregatedUploads au + JOIN + public.torrent t ON au.torrent = t.id + WHERE + t.size::float > 0 AND au.total_uploaded > t.size::float * ? 
+ ORDER BY + upload_percentage DESC; + + """); + query.setParameter(1, new Timestamp(System.currentTimeMillis() - overDownloadGenerateOffset)); + query.setParameter(2, overDownloadGenerateThreshold); + List queryResult = query.getResultList(); + var ipTries = new DualIPv4v6Tries(); + for (Object[] arr : queryResult) { + var ipAddr = IPUtil.toIPAddress(((InetAddress) arr[1]).getHostAddress()); + ipTries.add(ipAddr); } + ipTries = filterIP(ipTries); + List rules = new ArrayList<>(); + ipMerger.merge(ipTries).forEach(i -> rules.add(new AnalysedRule(null, i, OVER_DOWNLOAD, + "Generated at " + MsgUtil.getNowDateTimeString()))); + analysedRuleRepository.replaceAll(OVER_DOWNLOAD, rules); + meterRegistry.gauge("sparkle_analyse_over_download_ips", Collections.emptyList(), rules.size()); + log.info("Over download IPs: {}, tooked {} ms", rules.size(), System.currentTimeMillis() - startAt); } // diff --git a/src/main/java/com/ghostchu/btn/sparkle/module/githubupdate/GithubUpdateService.java b/src/main/java/com/ghostchu/btn/sparkle/module/githubupdate/GithubUpdateService.java index d2011487..bc12d6ab 100644 --- a/src/main/java/com/ghostchu/btn/sparkle/module/githubupdate/GithubUpdateService.java +++ b/src/main/java/com/ghostchu/btn/sparkle/module/githubupdate/GithubUpdateService.java @@ -5,7 +5,6 @@ import com.ghostchu.btn.sparkle.module.banhistory.BanHistoryService; import com.ghostchu.btn.sparkle.module.banhistory.internal.BanHistory; import com.ghostchu.btn.sparkle.module.banhistory.internal.BanHistoryRepository; -import com.ghostchu.btn.sparkle.util.DatabaseCare; import com.ghostchu.btn.sparkle.util.IPUtil; import jakarta.transaction.Transactional; import lombok.Cleanup; @@ -60,22 +59,17 @@ public void githubRuleUpdate() throws IOException, InterruptedException { throw new IllegalArgumentException("Organization " + orgName + " not found"); } var repository = organization.getRepository(repoName); - try { - DatabaseCare.generateParallel.acquire(); - updateFile(repository, "untrusted-ips.txt", () -> generateUntrustedIps().getBytes(StandardCharsets.UTF_8)); - updateFile(repository, "high-risk-ips.txt", () -> generateHighRiskIps().getBytes(StandardCharsets.UTF_8)); - updateFile(repository, "overdownload-ips.txt", () -> generateOverDownloadIps().getBytes(StandardCharsets.UTF_8)); - //updateFile(repository, "strange_ipv6_block.txt", () -> generateStrangeIPV6().getBytes(StandardCharsets.UTF_8)); - updateFile(repository, "random-peerid.txt", () -> generateGopeedDev().getBytes(StandardCharsets.UTF_8)); - updateFile(repository, "hp_torrent.txt", () -> generateHpTorrents().getBytes(StandardCharsets.UTF_8)); - updateFile(repository, "dt_torrent.txt", () -> generateDtTorrents().getBytes(StandardCharsets.UTF_8)); - //updateFile(repository, "go.torrent dev 20181121.txt", () -> generateBaiduNetdisk().getBytes(StandardCharsets.UTF_8)); - updateFile(repository, "0xde-0xad-0xbe-0xef.txt", () -> generateDeadBeef().getBytes(StandardCharsets.UTF_8)); - updateFile(repository, "123pan.txt", () -> generate123pan().getBytes(StandardCharsets.UTF_8)); - updateFile(repository, "tracker-high-risk-ips.txt", () -> generateTrackerHighRiskIps().getBytes(StandardCharsets.UTF_8)); - } finally { - DatabaseCare.generateParallel.release(); - } + updateFile(repository, "untrusted-ips.txt", () -> generateUntrustedIps().getBytes(StandardCharsets.UTF_8)); + updateFile(repository, "high-risk-ips.txt", () -> generateHighRiskIps().getBytes(StandardCharsets.UTF_8)); + updateFile(repository, "overdownload-ips.txt", () -> 
generateOverDownloadIps().getBytes(StandardCharsets.UTF_8));
+        //updateFile(repository, "strange_ipv6_block.txt", () -> generateStrangeIPV6().getBytes(StandardCharsets.UTF_8));
+        updateFile(repository, "random-peerid.txt", () -> generateGopeedDev().getBytes(StandardCharsets.UTF_8));
+        updateFile(repository, "hp_torrent.txt", () -> generateHpTorrents().getBytes(StandardCharsets.UTF_8));
+        updateFile(repository, "dt_torrent.txt", () -> generateDtTorrents().getBytes(StandardCharsets.UTF_8));
+        //updateFile(repository, "go.torrent dev 20181121.txt", () -> generateBaiduNetdisk().getBytes(StandardCharsets.UTF_8));
+        updateFile(repository, "0xde-0xad-0xbe-0xef.txt", () -> generateDeadBeef().getBytes(StandardCharsets.UTF_8));
+        updateFile(repository, "123pan.txt", () -> generate123pan().getBytes(StandardCharsets.UTF_8));
+        updateFile(repository, "tracker-high-risk-ips.txt", () -> generateTrackerHighRiskIps().getBytes(StandardCharsets.UTF_8));
     }
 
     private String generateTrackerHighRiskIps() {
diff --git a/src/main/java/com/ghostchu/btn/sparkle/util/DatabaseCare.java b/src/main/java/com/ghostchu/btn/sparkle/util/DatabaseCare.java
deleted file mode 100644
index 2589b4bd..00000000
--- a/src/main/java/com/ghostchu/btn/sparkle/util/DatabaseCare.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package com.ghostchu.btn.sparkle.util;
-
-import java.util.concurrent.Semaphore;
-
-public class DatabaseCare {
-    public static final Semaphore generateParallel = new Semaphore(3);
-}
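
Note on the removed class: DatabaseCare was a shared throttle. Each heavy generation job acquired one of three Semaphore permits before touching the database and released it in a finally block, which is what the removed try/finally wrappers in AnalyseService and GithubUpdateService did. Below is a minimal, self-contained sketch of that pattern for reference; the runGuarded helper and the main method are illustrative only and are not part of the Sparkle codebase.

    import java.util.concurrent.Semaphore;

    public class DatabaseCareSketch {
        // Mirrors the deleted DatabaseCare class: a global pool of 3 permits.
        public static final Semaphore generateParallel = new Semaphore(3);

        // Each scheduled job wrapped its body like this, matching the removed
        // acquire()/release() call sites.
        static void runGuarded(Runnable body) throws InterruptedException {
            generateParallel.acquire();     // block until one of the 3 permits is free
            try {
                body.run();                 // the expensive query / rule generation
            } finally {
                generateParallel.release(); // always return the permit, even on failure
            }
        }

        public static void main(String[] args) throws InterruptedException {
            runGuarded(() -> System.out.println("generating untrusted IP rules"));
        }
    }

With this patch applied, cronUntrustedIPAddresses, cronHighRiskIps, cronUpdateOverDownload and githubRuleUpdate no longer serialize against each other through this semaphore; any limit on how many of them hit the database at once has to come from elsewhere, such as the scheduling configuration or the database itself.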