Skip to content

Commit

Permalink
使用 BloomFilter 加速
Browse files Browse the repository at this point in the history
  • Loading branch information
Ghost-chu committed Jan 4, 2025
1 parent 3292c88 commit ef97c0d
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import com.ghostchu.btn.sparkle.module.banhistory.internal.BanHistory;
import com.ghostchu.btn.sparkle.module.banhistory.internal.BanHistoryRepository;
import com.ghostchu.btn.sparkle.module.clientdiscovery.ClientDiscoveryService;
import com.ghostchu.btn.sparkle.module.clientdiscovery.ClientIdentity;
import com.ghostchu.btn.sparkle.util.IPMerger;
import com.ghostchu.btn.sparkle.util.IPUtil;
import com.ghostchu.btn.sparkle.util.PeerUtil;
import com.ghostchu.btn.sparkle.util.TimeUtil;
import com.google.common.hash.BloomFilter;
import inet.ipaddr.IPAddress;
import inet.ipaddr.format.util.DualIPv4v6Tries;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -36,6 +39,7 @@
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -283,17 +287,25 @@ public void cronUpdateTrunkerFile() {
// Trunker, A BitTorrent Tracker, not a typo but a name
var startAt = System.currentTimeMillis();
final var ipTries = new DualIPv4v6Tries();
//Set<ClientIdentity> clientDiscoveries = Collections.synchronizedSet(new HashSet<>());
List<ClientIdentity> clientDiscoveries = new CopyOnWriteArrayList<>();
BloomFilter<String> bloomFilter = BloomFilter.create((from, into) -> into.putString(from, StandardCharsets.ISO_8859_1), 10_000_000, 0.01);
AtomicLong count = new AtomicLong(0);
AtomicLong success = new AtomicLong(0);
try (var service = Executors.newFixedThreadPool(Math.max(1, Runtime.getRuntime().availableProcessors()))) {
scanFile(peerInfo -> {
count.incrementAndGet();
var peerId = new String(peerInfo.getPeerId().toByteArray(), StandardCharsets.ISO_8859_1);
var peerClientName = peerInfo.getUserAgent();
if (peerClientName.length() > 250) {
// get the first 64 chars and append "..." and append last 64 chars
peerClientName = peerClientName.substring(0, 64) + "[...]" + peerClientName.substring(peerClientName.length() - 64);
}
try {
// synchronized (clientDiscoveries) {
// clientDiscoveries.add(new ClientIdentity(PeerUtil.cutPeerId(peerId), PeerUtil.cutClientName("[UA] " + peerInfo.getUserAgent())));
// }
if (bloomFilter.put(peerId + "@@@" + peerClientName)) {
synchronized (clientDiscoveries) {
clientDiscoveries.add(new ClientIdentity(PeerUtil.cutPeerId(peerId), PeerUtil.cutClientName("[UA] " + peerInfo.getUserAgent())));
}
}
if (
((peerInfo.getUserAgent().contains("Transmission") == peerId.startsWith("-TR")))
|| ((peerInfo.getUserAgent().contains("aria2") == peerId.startsWith("A2")))
Expand All @@ -315,12 +327,13 @@ public void cronUpdateTrunkerFile() {
} catch (Exception e) {
log.debug("Unable to handle PeerInfo check: {}, clientIp is {}", peerInfo, Arrays.toString(peerInfo.getIp().getClientIp().toByteArray()), e);
} finally {
// synchronized (clientDiscoveries) {
// if (clientDiscoveries.size() > 5000) {
// clientDiscoveryService.handleIdentities(OffsetDateTime.now(), OffsetDateTime.now(), clientDiscoveries);
// clientDiscoveries.clear();
// }
// }
if (clientDiscoveries.size() > 10000) {
synchronized (clientDiscoveries) {
clientDiscoveryService.handleIdentities(OffsetDateTime.now(), OffsetDateTime.now(), clientDiscoveries);
clientDiscoveries.clear();
}
}

}
}, service);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

import java.io.Serializable;
import java.time.OffsetDateTime;
import java.util.Set;

import java.util.Collection;
@Service
public class ClientDiscoveryService {
private final ClientDiscoveryRepository clientDiscoveryRepository;
Expand All @@ -25,7 +24,7 @@ public ClientDiscoveryService(ClientDiscoveryRepository clientDiscoveryRepositor
this.clientDiscoveryRepository = clientDiscoveryRepository;
}

public void handleIdentities(OffsetDateTime timeForFoundAt, OffsetDateTime timeForLastSeenAt, Set<ClientIdentity> clientIdentities) {
public void handleIdentities(OffsetDateTime timeForFoundAt, OffsetDateTime timeForLastSeenAt, Collection<ClientIdentity> clientIdentities) {
meterRegistry.counter("sparkle_client_discovery_processed").increment();
for (ClientIdentity ci : clientIdentities) {
clientDiscoveryRepository.saveIgnoreConflict(ci.hash(), ByteUtil.filterUTF8(ci.getClientName()), ByteUtil.filterUTF8(ci.getPeerId()), timeForFoundAt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.ghostchu.btn.sparkle.module.userapp.internal.UserApplication;
import com.ghostchu.btn.sparkle.util.*;
import com.ghostchu.btn.sparkle.util.ipdb.GeoIPManager;
import com.google.common.hash.BloomFilter;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.transaction.Transactional;
import lombok.Data;
Expand All @@ -32,6 +33,7 @@
import org.springframework.stereotype.Service;

import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.*;

Expand Down Expand Up @@ -209,6 +211,7 @@ public long handlePeerHistories(InetAddress inetAddress, UserApplication userApp
long processed = 0;
var it = ping.getPeers().iterator();
var submitId = UUID.randomUUID().toString();
BloomFilter<String> bloomFilter = BloomFilter.create((from, into) -> into.putString(from, StandardCharsets.ISO_8859_1), 10_000_000, 0.01);
while (it.hasNext()) {
var peer = it.next();
var peerId = ByteUtil.filterUTF8(PeerUtil.cutPeerId(peer.getPeerId()));
Expand All @@ -235,15 +238,20 @@ public long handlePeerHistories(InetAddress inetAddress, UserApplication userApp
.lastTimeSeen(TimeUtil.toUTC(peer.getLastTimeSeen().getTime()))
.submitterIp(inetAddress)
.build());
identitySet.add(new ClientIdentity(peerId, peerClientName));
if (bloomFilter.put(peerId + "@@@" + peerClientName)) {
identitySet.add(new ClientIdentity(peerId, peerClientName));
}
// 避免爆内存,必须及时清理
it.remove();
if (identitySet.size() >= 5000 || peerHistoryList.size() >= 5000) {
if (peerHistoryList.size() >= 5000) {
peerHistoryService.saveHistories(peerHistoryList);
clientDiscoveryService.handleIdentities(now, now, identitySet);
meterRegistry.counter("sparkle_ping_histories_processed").increment(peerHistoryList.size());
processed += peerHistoryList.size();
peerHistoryList.clear();
}

if (identitySet.size() > 5000) {
clientDiscoveryService.handleIdentities(now, now, identitySet);
identitySet.clear();
}
}
Expand Down

0 comments on commit ef97c0d

Please sign in to comment.