Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify IO statistics collection for UdpSocketHandler #1018

Merged
merged 1 commit into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 26 additions & 211 deletions src/freenet/io/comm/IOStatisticCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,221 +4,36 @@
package freenet.io.comm;

import java.net.InetAddress;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

import freenet.support.Logger;
import freenet.support.Logger.LogLevel;
import freenet.support.transport.ip.IPUtil;

public class IOStatisticCollector {
public static final int STATISTICS_ENTRIES = 10;
public static final int STATISTICS_DURATION_S = 30;
public static final int STATISTICS_DURATION = 1000*STATISTICS_DURATION_S;
private long lastrotate;

private static boolean logDEBUG;
private long totalbytesin;
private long totalbytesout;
private final LinkedHashMap<String, StatisticEntry> targets;
static boolean ENABLE_PER_ADDRESS_TRACKING = false;

public IOStatisticCollector() {
targets = new LinkedHashMap<String, StatisticEntry>();
// TODO: only for testing!!!!
// This should only happen once
//SNMPAgent.create();
//SNMPStarter.initialize();
logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this);
}

public void addInfo(InetAddress addr, int port, int inbytes, int outbytes, boolean isLocal) {
try {
synchronized (this) {
_addInfo(addr, port, inbytes, outbytes, isLocal);
}
} catch (Throwable t) {
t.printStackTrace();
}
}

private void _addInfo(InetAddress addr, int port, int inbytes, int outbytes, boolean isLocal) {
rotate();
if(ENABLE_PER_ADDRESS_TRACKING) {
String key = addr + ":" + port;
StatisticEntry entry = targets.get(key);
if (entry == null) {
entry = new StatisticEntry();
targets.put(key, entry);
}
entry.addData(Math.max(inbytes, 0), Math.max(outbytes, 0));
}
if(!isLocal) {
synchronized(this) {
totalbytesout += Math.max(outbytes, 0);
totalbytesin += Math.max(inbytes, 0);
if(logDEBUG)
Logger.debug(IOStatisticCollector.class, "Add("+addr+":"+port+ ',' +inbytes+ ',' +outbytes+" -> "+totalbytesin+" : "+totalbytesout);
}
}
}

public void dumpInfo() {
synchronized (this) {
_dumpInfo();
}
}
private long totalBytesIn;
private long totalBytesOut;

public long[] getTotalIO() {
synchronized (this) {
return _getTotalIO();
}
}

private long[] _getTotalIO() {
long ret[] = new long[2];
synchronized(this) {
ret[0] = totalbytesout;
ret[1] = totalbytesin;
}
return ret;
}

public int[][] getTotalStatistics() {
synchronized (this) {
return _getTotalStatistics();
}
}
public void reportReceivedBytes(InetAddress addr, int bytes) {
if (isLocal(addr) || bytes <= 0) {
return;
ArneBab marked this conversation as resolved.
Show resolved Hide resolved
}
synchronized (this) {
totalBytesIn += bytes;
}
}

private int[][] _getTotalStatistics() {
//String[] keys = (String[])targets.keySet().toArray();
int ret[][] = new int[STATISTICS_ENTRIES][2];
for (int i = 0 ; i < STATISTICS_ENTRIES ; i++) {
ret[i][0] = ret[i][1] = 0;
}

for (Map.Entry<String,StatisticEntry> entry : targets.entrySet()) {
int inres[] = entry.getValue().getRecieved();
int outres[] = entry.getValue().getSent();
for (int i = 0 ; i < STATISTICS_ENTRIES ; i++) {
ret[i][1] += inres[i];
ret[i][0] += outres[i];
}
}

return ret;
}

private void _dumpInfo() {
rotate();
//DateFormat df = DateFormat.getDateInstance(DateFormat.LONG, Locale.FRANCE);
//System.err.println(DateFormat.getDateInstance().format(new Date()));
System.err.println(new Date());
final double divby = STATISTICS_DURATION_S*1024;
for (Map.Entry<String,StatisticEntry> entry : targets.entrySet()) {
String key = entry.getKey();
int inres[] = entry.getValue().getRecieved();
int outres[] = entry.getValue().getSent();
System.err.print((key + " ").substring(0,22) + ": ");
int tin = 0;
int tout = 0;

for (int i = 0 ; i < inres.length ; i++) {
// in/out in 102.4 bytes (hecto-bytes)
tin += inres[i];
tout += outres[i];

int in = (int) ((tin*10.0) / (divby*(i+1)));
int out =(int) ((tout*10.0) /(divby*(i+1)));

System.err.print("i:" + (in/10) + '.' + (in%10));
System.err.print(" o:" + (out/10) + '.' + (out%10));
System.err.print(" \t");
}
System.err.println();
}
System.err.println();
}

private void rotate() {
long now = System.currentTimeMillis();
if ((now - lastrotate) >= STATISTICS_DURATION) {
lastrotate = now;
Object[] keys = targets.keySet().toArray();
if(keys == null) return; // Why aren't we iterating there ?
for(int i = 0 ; i < keys.length ; i++) {
Object key = keys[i];
if (targets.get(key).rotate() == false)
targets.remove(key);
}
// FIXME: debugging
//_dumpInfo();
}
}
public void reportSentBytes(InetAddress addr, int bytes) {
if (isLocal(addr) || bytes <= 0) {
return;
}
synchronized (this) {
totalBytesOut += bytes;
}
}


/*
* to thead each update.... heavy stuff
private class StatisticUpdater implements Runnable {
private IOStatisticCollector sc;
private String key;
private int inbytes;
private int outbytes;

public StatisticUpdater(IOStatisticCollector sc, String key,
int inbytes, int outbytes) {
this.sc = sc;
this.key = key;
this.inbytes = inbytes;
this.outbytes = outbytes;
new Thread(this, "IOStatisticCollector$StatisticUpdater").run();
}

public void run() {

}
}
*/



private static class StatisticEntry {
private int recieved[];
private int sent[];

public StatisticEntry() {
// Create a new array and clear it
recieved = new int[IOStatisticCollector.STATISTICS_ENTRIES+1];
sent = new int[IOStatisticCollector.STATISTICS_ENTRIES+1];
for (int i = 0 ; i < recieved.length ; i++) {
recieved[i] = sent[i] = 0;
}
}

public void addData(int inbytes, int outbytes) {
recieved[0] += inbytes;
sent[0] += outbytes;
}

public boolean rotate() {
boolean hasdata = false;
for (int i = recieved.length - 1 ; i > 0 ; i--) {
recieved[i] = recieved[i-1];
sent[i] = sent[i-1];
hasdata |= (recieved[i] > 0) || (sent[i] > 0);
}
recieved[0] = sent[0] = 0;
return hasdata;
}

public int[] getRecieved() {
return Arrays.copyOfRange(recieved, 1, 1 + IOStatisticCollector.STATISTICS_ENTRIES);
}

public int[] getSent() {
return Arrays.copyOfRange(sent, 1, 1 + IOStatisticCollector.STATISTICS_ENTRIES);
}

}
public synchronized long[] getTotalIO() {
return new long[]{totalBytesOut, totalBytesIn};
}

private static boolean isLocal(InetAddress address) {
return address.isLinkLocalAddress() || address.isLoopbackAddress() || IPUtil.isSiteLocalAddress(address);
}
}
16 changes: 5 additions & 11 deletions src/freenet/io/comm/UdpSocketHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import freenet.node.PrioRunnable;
import freenet.support.Logger;
import freenet.support.io.NativeThread;
import freenet.support.transport.ip.IPUtil;
import sun.misc.Unsafe;

public class UdpSocketHandler implements PrioRunnable, PacketSocketHandler, PortForwardSensitiveSocketHandler {
Expand All @@ -48,7 +47,7 @@ public class UdpSocketHandler implements PrioRunnable, PacketSocketHandler, Port
private final String title;
private boolean _started;
private long startTime;
private final IOStatisticCollector collector;
private final IOStatisticCollector ioStatistics;

static {
Logger.registerClass(UdpSocketHandler.class);
Expand Down Expand Up @@ -124,9 +123,9 @@ public static boolean setAddressPreference(DatagramChannel channel, SOCKET_ADDR_
}
}

public UdpSocketHandler(int listenPort, InetAddress bindToAddress, Node node, long startupTime, String title, IOStatisticCollector collector) throws IOException {
public UdpSocketHandler(int listenPort, InetAddress bindToAddress, Node node, long startupTime, String title, IOStatisticCollector ioStatistics) throws IOException {
this.node = node;
this.collector = collector;
this.ioStatistics = ioStatistics;
this.title = title;
localAddress = new InetSocketAddress(bindToAddress, listenPort);
datagramChannel = DatagramChannel.open()
Expand Down Expand Up @@ -268,9 +267,8 @@ private InetSocketAddress receive() {
receiveBuffer.clear();
InetSocketAddress remote = (InetSocketAddress) datagramChannel.receive(receiveBuffer);
receiveBuffer.flip();
int port = remote.getPort();
InetAddress address = remote.getAddress();
collector.addInfo(address, port, getHeadersLength(address) + receiveBuffer.limit(), 0, isLocal(address));
ioStatistics.reportReceivedBytes(address, getHeadersLength(address) + receiveBuffer.limit());
return remote;
} catch (SocketTimeoutException e1) {
return null;
Expand Down Expand Up @@ -319,7 +317,7 @@ public void sendPacket(byte[] blockToSend, Peer destination, boolean allowLocalA
try {
datagramChannel.send(packet, new InetSocketAddress(address, port));
tracker.sentPacketTo(destination);
collector.addInfo(address, port, 0, getHeadersLength(address) + blockToSend.length, isLocal(address));
ioStatistics.reportSentBytes(address, getHeadersLength(address) + blockToSend.length);
if (logMINOR) {
Logger.minor(this, "Sent packet length " + blockToSend.length + " to " + address + ':' + port);
}
Expand Down Expand Up @@ -464,8 +462,4 @@ public long getStartTime() {
return startTime;
}

private static boolean isLocal(InetAddress address) {
return address.isLinkLocalAddress() || address.isLoopbackAddress() || IPUtil.isSiteLocalAddress(address);
}

}