diff --git a/src/main/java/com/ghostchu/btn/sparkle/module/analyse/AnalyseService.java b/src/main/java/com/ghostchu/btn/sparkle/module/analyse/AnalyseService.java index fdbdb58..44df772 100644 --- a/src/main/java/com/ghostchu/btn/sparkle/module/analyse/AnalyseService.java +++ b/src/main/java/com/ghostchu/btn/sparkle/module/analyse/AnalyseService.java @@ -6,9 +6,12 @@ import com.ghostchu.btn.sparkle.module.banhistory.internal.BanHistory; import com.ghostchu.btn.sparkle.module.banhistory.internal.BanHistoryRepository; import com.ghostchu.btn.sparkle.module.clientdiscovery.ClientDiscoveryService; +import com.ghostchu.btn.sparkle.module.clientdiscovery.ClientIdentity; import com.ghostchu.btn.sparkle.util.IPMerger; import com.ghostchu.btn.sparkle.util.IPUtil; +import com.ghostchu.btn.sparkle.util.PeerUtil; import com.ghostchu.btn.sparkle.util.TimeUtil; +import com.google.common.hash.BloomFilter; import inet.ipaddr.IPAddress; import inet.ipaddr.format.util.DualIPv4v6Tries; import io.micrometer.core.instrument.MeterRegistry; @@ -36,6 +39,7 @@ import java.time.OffsetDateTime; import java.time.temporal.ChronoUnit; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; @@ -283,17 +287,25 @@ public void cronUpdateTrunkerFile() { // Trunker, A BitTorrent Tracker, not a typo but a name var startAt = System.currentTimeMillis(); final var ipTries = new DualIPv4v6Tries(); - //Set clientDiscoveries = Collections.synchronizedSet(new HashSet<>()); + List clientDiscoveries = new CopyOnWriteArrayList<>(); + BloomFilter bloomFilter = BloomFilter.create((from, into) -> into.putString(from, StandardCharsets.ISO_8859_1), 10_000_000, 0.01); AtomicLong count = new AtomicLong(0); AtomicLong success = new AtomicLong(0); try (var service = Executors.newFixedThreadPool(Math.max(1, Runtime.getRuntime().availableProcessors()))) { scanFile(peerInfo -> { count.incrementAndGet(); var peerId = new String(peerInfo.getPeerId().toByteArray(), StandardCharsets.ISO_8859_1); + var peerClientName = peerInfo.getUserAgent(); + if (peerClientName.length() > 250) { + // get the first 64 chars and append "..." and append last 64 chars + peerClientName = peerClientName.substring(0, 64) + "[...]" + peerClientName.substring(peerClientName.length() - 64); + } try { -// synchronized (clientDiscoveries) { -// clientDiscoveries.add(new ClientIdentity(PeerUtil.cutPeerId(peerId), PeerUtil.cutClientName("[UA] " + peerInfo.getUserAgent()))); -// } + if (bloomFilter.put(peerId + "@@@" + peerClientName)) { + synchronized (clientDiscoveries) { + clientDiscoveries.add(new ClientIdentity(PeerUtil.cutPeerId(peerId), PeerUtil.cutClientName("[UA] " + peerInfo.getUserAgent()))); + } + } if ( ((peerInfo.getUserAgent().contains("Transmission") == peerId.startsWith("-TR"))) || ((peerInfo.getUserAgent().contains("aria2") == peerId.startsWith("A2"))) @@ -315,12 +327,13 @@ public void cronUpdateTrunkerFile() { } catch (Exception e) { log.debug("Unable to handle PeerInfo check: {}, clientIp is {}", peerInfo, Arrays.toString(peerInfo.getIp().getClientIp().toByteArray()), e); } finally { -// synchronized (clientDiscoveries) { -// if (clientDiscoveries.size() > 5000) { -// clientDiscoveryService.handleIdentities(OffsetDateTime.now(), OffsetDateTime.now(), clientDiscoveries); -// clientDiscoveries.clear(); -// } -// } + if (clientDiscoveries.size() > 10000) { + synchronized (clientDiscoveries) { + clientDiscoveryService.handleIdentities(OffsetDateTime.now(), OffsetDateTime.now(), clientDiscoveries); + clientDiscoveries.clear(); + } + } + } }, service); } diff --git a/src/main/java/com/ghostchu/btn/sparkle/module/clientdiscovery/ClientDiscoveryService.java b/src/main/java/com/ghostchu/btn/sparkle/module/clientdiscovery/ClientDiscoveryService.java index c534552..97d8b0f 100644 --- a/src/main/java/com/ghostchu/btn/sparkle/module/clientdiscovery/ClientDiscoveryService.java +++ b/src/main/java/com/ghostchu/btn/sparkle/module/clientdiscovery/ClientDiscoveryService.java @@ -13,8 +13,7 @@ import java.io.Serializable; import java.time.OffsetDateTime; -import java.util.Set; - +import java.util.Collection; @Service public class ClientDiscoveryService { private final ClientDiscoveryRepository clientDiscoveryRepository; @@ -25,7 +24,7 @@ public ClientDiscoveryService(ClientDiscoveryRepository clientDiscoveryRepositor this.clientDiscoveryRepository = clientDiscoveryRepository; } - public void handleIdentities(OffsetDateTime timeForFoundAt, OffsetDateTime timeForLastSeenAt, Set clientIdentities) { + public void handleIdentities(OffsetDateTime timeForFoundAt, OffsetDateTime timeForLastSeenAt, Collection clientIdentities) { meterRegistry.counter("sparkle_client_discovery_processed").increment(); for (ClientIdentity ci : clientIdentities) { clientDiscoveryRepository.saveIgnoreConflict(ci.hash(), ByteUtil.filterUTF8(ci.getClientName()), ByteUtil.filterUTF8(ci.getPeerId()), timeForFoundAt); diff --git a/src/main/java/com/ghostchu/btn/sparkle/module/ping/PingService.java b/src/main/java/com/ghostchu/btn/sparkle/module/ping/PingService.java index 6f728e5..96838b0 100644 --- a/src/main/java/com/ghostchu/btn/sparkle/module/ping/PingService.java +++ b/src/main/java/com/ghostchu/btn/sparkle/module/ping/PingService.java @@ -20,6 +20,7 @@ import com.ghostchu.btn.sparkle.module.userapp.internal.UserApplication; import com.ghostchu.btn.sparkle.util.*; import com.ghostchu.btn.sparkle.util.ipdb.GeoIPManager; +import com.google.common.hash.BloomFilter; import io.micrometer.core.instrument.MeterRegistry; import jakarta.transaction.Transactional; import lombok.Data; @@ -32,6 +33,7 @@ import org.springframework.stereotype.Service; import java.net.InetAddress; +import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; import java.util.*; @@ -209,6 +211,7 @@ public long handlePeerHistories(InetAddress inetAddress, UserApplication userApp long processed = 0; var it = ping.getPeers().iterator(); var submitId = UUID.randomUUID().toString(); + BloomFilter bloomFilter = BloomFilter.create((from, into) -> into.putString(from, StandardCharsets.ISO_8859_1), 10_000_000, 0.01); while (it.hasNext()) { var peer = it.next(); var peerId = ByteUtil.filterUTF8(PeerUtil.cutPeerId(peer.getPeerId())); @@ -235,15 +238,20 @@ public long handlePeerHistories(InetAddress inetAddress, UserApplication userApp .lastTimeSeen(TimeUtil.toUTC(peer.getLastTimeSeen().getTime())) .submitterIp(inetAddress) .build()); - identitySet.add(new ClientIdentity(peerId, peerClientName)); + if (bloomFilter.put(peerId + "@@@" + peerClientName)) { + identitySet.add(new ClientIdentity(peerId, peerClientName)); + } // 避免爆内存,必须及时清理 it.remove(); - if (identitySet.size() >= 5000 || peerHistoryList.size() >= 5000) { + if (peerHistoryList.size() >= 5000) { peerHistoryService.saveHistories(peerHistoryList); - clientDiscoveryService.handleIdentities(now, now, identitySet); meterRegistry.counter("sparkle_ping_histories_processed").increment(peerHistoryList.size()); processed += peerHistoryList.size(); peerHistoryList.clear(); + } + + if (identitySet.size() > 5000) { + clientDiscoveryService.handleIdentities(now, now, identitySet); identitySet.clear(); } }