Skip to content

Commit

Permalink
Add monitoring capability
Browse files Browse the repository at this point in the history
Allow nDPI to process the entire flows and not only the first N packets.
Usefull when the application is interested in some metadata spanning the
entire life of the session.

As initial step, only STUN flows can be put in monitoring.

See `doc/monitoring.md` for further details.

This feature is disabled by default.

Close #2583
  • Loading branch information
IvanNardi committed Oct 13, 2024
1 parent ad9c574 commit e4390a2
Show file tree
Hide file tree
Showing 28 changed files with 809 additions and 41 deletions.
3 changes: 2 additions & 1 deletion doc/configuration_parameters.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@ TODO
| "openvpn" | "subclassification_by_ip" | enable | NULL | NULL | Enable/disable sub-classification of OpenVPN flows using server IP. Useful to detect the specific VPN application/app. At the moment, this knob allows to identify: Mullvad, NordVPN, ProtonVPN. |
| "wireguard" | "subclassification_by_ip" | enable | NULL | NULL | Enable/disable sub-classification of Wireguard flows using server IP. Useful to detect the specific VPN application/app. At the moment, this knob allows to identify: Mullvad, NordVPN, ProtonVPN. |
| $PROTO_NAME | "log" | disable | NULL | NULL | Enable/disable logging/debug for specific protocol. Use "any" as protocol name if you want to easily enable/disable logging/debug for all protocols |
| $PROTO_NAME | "ip_list.load" | 1 | NULL | NULL | Enable/disable loading of internal list of IP addresses (used for (sub)classification) specific to that protocol. Use "any" as protocol name if you want to easily enable/disable all lists. This knob is valid only for the following protocols: Alibaba, Amazon AWS, Apple, Avast, Bloomberg, Cachefly, Cloudflare, Discord, Disney+, Dropbox, Edgecast, EpicGames, Ethereum, Facebook, Github, Google, Google Cloud, GoTo, Hotspot Shield, Hulu, Line, Microsoft 365, Microsoft Azure, Microsoft One Drive, Microsoft Outlook, Mullvad, Netflix, NordVPN, Nvidia, OpenDNS, ProtonVPN, RiotGames, Roblox, Skype/Teams, Starcraft, Steam, SurfSharkVPN, Teamviewer, Telegram, Tencent, Threema, TOR, Twitch, Twitter, UbuntuONE, VK, Yandex, Yandex Cloud, Webex, Whatsapp, Zoom |
| $PROTO_NAME | "ip_list.load" | enable | NULL | NULL | Enable/disable loading of internal list of IP addresses (used for (sub)classification) specific to that protocol. Use "any" as protocol name if you want to easily enable/disable all lists. This knob is valid only for the following protocols: Alibaba, Amazon AWS, Apple, Avast, Bloomberg, Cachefly, Cloudflare, Discord, Disney+, Dropbox, Edgecast, EpicGames, Ethereum, Facebook, Github, Google, Google Cloud, GoTo, Hotspot Shield, Hulu, Line, Microsoft 365, Microsoft Azure, Microsoft One Drive, Microsoft Outlook, Mullvad, Netflix, NordVPN, Nvidia, OpenDNS, ProtonVPN, RiotGames, Roblox, Skype/Teams, Starcraft, Steam, SurfSharkVPN, Teamviewer, Telegram, Tencent, Threema, TOR, Twitch, Twitter, UbuntuONE, VK, Yandex, Yandex Cloud, Webex, Whatsapp, Zoom |
| $PROTO_NAME | "monitoring" | disable | NULL | NULL | Enable/disable monitoring state for this specific protocol. Use "any" as protocol name if you want to easily enable/disable monitoring feature for all protocols. This knob is valid only for the following protocols: Stun. Monitoring allows nDPI to process the entire flow (i.e. all its packets), without any limits. See doc/monitoring.md for further details |
23 changes: 23 additions & 0 deletions doc/monitoring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

# Monitoring

nDPI usually needs only a few packets per flow to get full classification and to get all the required metadata/flow_risks. After that point, nDPI stops processing the flow.
However, in some use cases, it might be useful to allow nDPI to process the *entire* flow (i.e. *all* its packets, without any limits). Some examples:
* to extract all the Stun metadata from a Stun flow
* to extract all the request/replay pairs from a DNS flow
In essence, monitoring allows the application to get the same metadata, multiple times, throughout the entire life of the session.

If monitoring is enable in a flow:
* structures `ndpi_flow->protos`, `ndpi_flow->http`, `ndpi_flow->stun`,... are populated as usual, usually with the *first* instance of the specific metadata. Nothing changed.
* packet by packet, the new structure `ndpi_flow->monitor` is populated with the metadata of the *current* packet. This information is lost when starting processing the next packet in the same flow; it is the responsibility of the application to get it.

In other words:
* "flow metadata" is saved in `ndpi_flow->protos`, `ndpi_flow->http`, `ndpi_flow->stun`, regardless of the monitoring feature being enabled or not
* "(curent) packet metadata" is saved in `ndpi_flow->monitor`, only if monitor is enabled

Monitoring must be explicit enabled with something like: `--cfg=stun,monitoring,1`

## Implementation notes

* Flows move to monitoring state only after extra-dissections end
* The classification doesn't change for flows in monitoring state
48 changes: 34 additions & 14 deletions example/ndpiReader.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ static int enable_malloc_bins = 0;
static int max_malloc_bins = 14;
int malloc_size_stats = 0;

int monitoring_enabled;

struct flow_info {
struct ndpi_flow_info *flow;
u_int16_t thread_id;
Expand Down Expand Up @@ -1605,18 +1607,30 @@ void print_bin(FILE *fout, const char *label, struct ndpi_bin *b) {

/* ********************************** */

static void print_ndpi_address_port_file(FILE *out, const char *label, ndpi_address_port *ap) {
if(ap->port != 0) {
char buf[INET6_ADDRSTRLEN];
static void print_ndpi_address_port_list_file(FILE *out, const char *label, ndpi_address_port_list *list) {
unsigned int i;
ndpi_address_port *ap;

if(ap->is_ipv6) {
inet_ntop(AF_INET6, &ap->address, buf, sizeof(buf));
fprintf(out, "[%s: [%s]:%u]", label, buf, ap->port);
} else {
inet_ntop(AF_INET, &ap->address, buf, sizeof(buf));
fprintf(out, "[%s: %s:%u]", label, buf, ap->port);
if(list->num_aps == 0)
return;
fprintf(out, "[%s: ", label);
for(i = 0; i < list->num_aps; i++) {
ap = &list->aps[i];
if(ap->port != 0) {
char buf[INET6_ADDRSTRLEN];

if(ap->is_ipv6) {
inet_ntop(AF_INET6, &ap->address, buf, sizeof(buf));
fprintf(out, "[%s]:%u", buf, ap->port);
} else {
inet_ntop(AF_INET, &ap->address, buf, sizeof(buf));
fprintf(out, "%s:%u", buf, ap->port);
}
if(i != list->num_aps - 1)
fprintf(out, ", ");
}
}
fprintf(out, "]");
}

/* ********************************** */
Expand Down Expand Up @@ -1989,11 +2003,11 @@ static void printFlow(u_int32_t id, struct ndpi_flow_info *flow, u_int16_t threa
}
}

print_ndpi_address_port_file(out, "Mapped IP/Port", &flow->stun.mapped_address);
print_ndpi_address_port_file(out, "Peer IP/Port", &flow->stun.peer_address);
print_ndpi_address_port_file(out, "Relayed IP/Port", &flow->stun.relayed_address);
print_ndpi_address_port_file(out, "Rsp Origin IP/Port", &flow->stun.response_origin);
print_ndpi_address_port_file(out, "Other IP/Port", &flow->stun.other_address);
print_ndpi_address_port_list_file(out, "Mapped IP/Port", &flow->stun.mapped_address);
print_ndpi_address_port_list_file(out, "Peer IP/Port", &flow->stun.peer_address);
print_ndpi_address_port_list_file(out, "Relayed IP/Port", &flow->stun.relayed_address);
print_ndpi_address_port_list_file(out, "Rsp Origin IP/Port", &flow->stun.response_origin);
print_ndpi_address_port_list_file(out, "Other IP/Port", &flow->stun.other_address);

if(flow->http.url[0] != '\0') {
ndpi_risk_enum risk = ndpi_validate_url(flow->http.url);
Expand Down Expand Up @@ -2986,6 +3000,12 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle,
fprintf(stderr, "Error ndpi_finalize_initialization: %d\n", ret);
exit(-1);
}

char buf[16];
if(ndpi_get_config(ndpi_thread_info[thread_id].workflow->ndpi_struct, "stun", "monitoring", buf, sizeof(buf)) != NULL) {
if(atoi(buf))
monitoring_enabled = 1;
}
}

/* *********************************************** */
Expand Down
177 changes: 172 additions & 5 deletions example/reader_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ extern char *addr_dump_path;
u_int8_t enable_doh_dot_detection = 0;
extern bool do_load_lists;
extern int malloc_size_stats;
extern int monitoring_enabled;

/* ****************************************************** */

Expand Down Expand Up @@ -554,6 +555,27 @@ static void ndpi_free_flow_tls_data(struct ndpi_flow_info *flow) {
ndpi_free(flow->ssh_tls.ja4_client_raw);
flow->ssh_tls.ja4_client_raw = NULL;
}

if(flow->stun.mapped_address.aps) {
ndpi_free(flow->stun.mapped_address.aps);
flow->stun.mapped_address.aps = NULL;
}
if(flow->stun.other_address.aps) {
ndpi_free(flow->stun.other_address.aps);
flow->stun.other_address.aps = NULL;
}
if(flow->stun.peer_address.aps) {
ndpi_free(flow->stun.peer_address.aps);
flow->stun.peer_address.aps = NULL;
}
if(flow->stun.relayed_address.aps) {
ndpi_free(flow->stun.relayed_address.aps);
flow->stun.relayed_address.aps = NULL;
}
if(flow->stun.response_origin.aps) {
ndpi_free(flow->stun.response_origin.aps);
flow->stun.response_origin.aps = NULL;
}
}

/* ***************************************************** */
Expand Down Expand Up @@ -1117,6 +1139,143 @@ static void dump_flow_fingerprint(struct ndpi_workflow * workflow,
ndpi_term_serializer(&serializer);
}


static void add_to_address_port_list(ndpi_address_port_list *list, ndpi_address_port *ap)
{
int new_num;
void *new_buf;
unsigned int i;

if(ap->port == 0)
return;

/* Avoid saving duplicates */
for(i = 0; i < list->num_aps; i++)
if(memcmp(&list->aps[i], ap, sizeof(*ap)) == 0)
return;

if(list->num_aps == list->num_aps_allocated) {
new_num = 1 + list->num_aps_allocated * 2;
new_buf = ndpi_realloc(list->aps, list->num_aps_allocated * sizeof(ndpi_address_port),
new_num * sizeof(ndpi_address_port));
if(!new_buf)
return;
list->aps = new_buf;
list->num_aps_allocated = new_num;
}
memcpy(&list->aps[list->num_aps++], ap, sizeof(ndpi_address_port));
}

/* ****************************************************** */

static void process_ndpi_monitoring_info(struct ndpi_flow_info *flow) {
if(!flow->ndpi_flow || !flow->ndpi_flow->monit)
return;

/* In theory, we should check only for STUN.
However since we sometimes might not have STUN in protocol classification
(because we have only two protocols in flow->ndpi_flow->detected_protocol_stack[])
we need to check also for the other "master" protocols set by STUN dissector
See at the beginning of the STUN c file for further details
*/
if(flow->detected_protocol.proto.app_protocol == NDPI_PROTOCOL_STUN ||
flow->detected_protocol.proto.master_protocol == NDPI_PROTOCOL_STUN ||
flow->detected_protocol.proto.app_protocol == NDPI_PROTOCOL_DTLS ||
flow->detected_protocol.proto.master_protocol == NDPI_PROTOCOL_DTLS ||
flow->detected_protocol.proto.app_protocol == NDPI_PROTOCOL_SRTP ||
flow->detected_protocol.proto.master_protocol == NDPI_PROTOCOL_SRTP) {

add_to_address_port_list(&flow->stun.mapped_address, &flow->ndpi_flow->monit->protos.dtls_stun_rtp.mapped_address);
add_to_address_port_list(&flow->stun.other_address, &flow->ndpi_flow->monit->protos.dtls_stun_rtp.other_address);
add_to_address_port_list(&flow->stun.peer_address, &flow->ndpi_flow->monit->protos.dtls_stun_rtp.peer_address);
add_to_address_port_list(&flow->stun.relayed_address, &flow->ndpi_flow->monit->protos.dtls_stun_rtp.relayed_address);
add_to_address_port_list(&flow->stun.response_origin, &flow->ndpi_flow->monit->protos.dtls_stun_rtp.response_origin);
}

}

/* ****************************************************** */

static void serialize_monitoring_metadata(struct ndpi_flow_info *flow)
{
unsigned int i;
char buf[64];

if(!flow->ndpi_flow->monit)
return;

ndpi_serialize_start_of_block(&flow->ndpi_flow_serializer, "monitoring");

switch(flow->detected_protocol.proto.master_protocol ? flow->detected_protocol.proto.master_protocol : flow->detected_protocol.proto.app_protocol) {
case NDPI_PROTOCOL_STUN:
case NDPI_PROTOCOL_DTLS:
case NDPI_PROTOCOL_SRTP:

ndpi_serialize_start_of_block(&flow->ndpi_flow_serializer, "stun");

if(flow->stun.mapped_address.num_aps > 0) {
ndpi_serialize_start_of_list(&flow->ndpi_flow_serializer, "mapped_address");
for(i = 0; i < flow->stun.mapped_address.num_aps; i++) {
if(flow->stun.mapped_address.aps[i].port > 0) {
ndpi_serialize_string_string(&flow->ndpi_flow_serializer, "mapped_address",
print_ndpi_address_port(&flow->stun.mapped_address.aps[i], buf, sizeof(buf)));
}
}
ndpi_serialize_end_of_list(&flow->ndpi_flow_serializer);
}

if(flow->stun.other_address.num_aps > 0) {
ndpi_serialize_start_of_list(&flow->ndpi_flow_serializer, "other_address");
for(i = 0; i < flow->stun.other_address.num_aps; i++) {
if(flow->stun.other_address.aps[i].port > 0) {
ndpi_serialize_string_string(&flow->ndpi_flow_serializer, "other_address",
print_ndpi_address_port(&flow->stun.other_address.aps[i], buf, sizeof(buf)));
}
}
ndpi_serialize_end_of_list(&flow->ndpi_flow_serializer);
}

if(flow->stun.peer_address.num_aps > 0) {
ndpi_serialize_start_of_list(&flow->ndpi_flow_serializer, "peer_address");
for(i = 0; i < flow->stun.peer_address.num_aps; i++) {
if(flow->stun.peer_address.aps[i].port > 0) {
ndpi_serialize_string_string(&flow->ndpi_flow_serializer, "peer_address",
print_ndpi_address_port(&flow->stun.peer_address.aps[i], buf, sizeof(buf)));
}
}
ndpi_serialize_end_of_list(&flow->ndpi_flow_serializer);
}

if(flow->stun.relayed_address.num_aps > 0) {
ndpi_serialize_start_of_list(&flow->ndpi_flow_serializer, "relayed_address");
for(i = 0; i < flow->stun.relayed_address.num_aps; i++) {
if(flow->stun.relayed_address.aps[i].port > 0) {
ndpi_serialize_string_string(&flow->ndpi_flow_serializer, "relayed_address",
print_ndpi_address_port(&flow->stun.relayed_address.aps[i], buf, sizeof(buf)));
}
}
ndpi_serialize_end_of_list(&flow->ndpi_flow_serializer);
}

if(flow->stun.response_origin.num_aps > 0) {
ndpi_serialize_start_of_list(&flow->ndpi_flow_serializer, "response_origin");
for(i = 0; i < flow->stun.response_origin.num_aps; i++) {
if(flow->stun.response_origin.aps[i].port > 0) {
ndpi_serialize_string_string(&flow->ndpi_flow_serializer, "response_origin",
print_ndpi_address_port(&flow->stun.response_origin.aps[i], buf, sizeof(buf)));
}
}
ndpi_serialize_end_of_list(&flow->ndpi_flow_serializer);
}

ndpi_serialize_end_of_block(&flow->ndpi_flow_serializer); /* stun */

break;
}

ndpi_serialize_end_of_block(&flow->ndpi_flow_serializer);
}

/* ****************************************************** */

void process_ndpi_collected_info(struct ndpi_workflow * workflow, struct ndpi_flow_info *flow) {
Expand Down Expand Up @@ -1416,11 +1575,13 @@ void process_ndpi_collected_info(struct ndpi_workflow * workflow, struct ndpi_fl
}
}

memcpy(&flow->stun.mapped_address, &flow->ndpi_flow->stun.mapped_address, sizeof(ndpi_address_port));
memcpy(&flow->stun.peer_address, &flow->ndpi_flow->stun.peer_address, sizeof(ndpi_address_port));
memcpy(&flow->stun.relayed_address, &flow->ndpi_flow->stun.relayed_address, sizeof(ndpi_address_port));
memcpy(&flow->stun.response_origin, &flow->ndpi_flow->stun.response_origin, sizeof(ndpi_address_port));
memcpy(&flow->stun.other_address, &flow->ndpi_flow->stun.other_address, sizeof(ndpi_address_port));
if(!monitoring_enabled) {
add_to_address_port_list(&flow->stun.mapped_address, &flow->ndpi_flow->stun.mapped_address);
add_to_address_port_list(&flow->stun.peer_address, &flow->ndpi_flow->stun.peer_address);
add_to_address_port_list(&flow->stun.relayed_address, &flow->ndpi_flow->stun.relayed_address);
add_to_address_port_list(&flow->stun.response_origin, &flow->ndpi_flow->stun.response_origin);
add_to_address_port_list(&flow->stun.other_address, &flow->ndpi_flow->stun.other_address);
}

flow->multimedia_flow_type = flow->ndpi_flow->flow_multimedia_type;

Expand Down Expand Up @@ -1478,6 +1639,10 @@ void process_ndpi_collected_info(struct ndpi_workflow * workflow, struct ndpi_fl
ndpi_serialize_string_uint32(&flow->ndpi_flow_serializer, "detection_completed", flow->detection_completed);
ndpi_serialize_string_uint32(&flow->ndpi_flow_serializer, "check_extra_packets", flow->check_extra_packets);

if(flow->ndpi_flow->monitoring) {
serialize_monitoring_metadata(flow);
}

if(flow->server_hostname)
ndpi_serialize_string_string(&flow->ndpi_flow_serializer, "server_hostname", flow->server_hostname);
}
Expand Down Expand Up @@ -1792,6 +1957,8 @@ static struct ndpi_proto packet_processing(struct ndpi_workflow * workflow,
flow->detected_protocol = ndpi_detection_process_packet(workflow->ndpi_struct, ndpi_flow,
iph ? (uint8_t *)iph : (uint8_t *)iph6,
ipsize, time_ms, &input_info);
if(monitoring_enabled)
process_ndpi_monitoring_info(flow);
enough_packets |= ndpi_flow->fail_with_unknown;
if(enough_packets || (flow->detected_protocol.proto.app_protocol != NDPI_PROTOCOL_UNKNOWN)) {
if((!enough_packets)
Expand Down
8 changes: 7 additions & 1 deletion example/reader_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ enum info_type {
INFO_NATPMP,
};

typedef struct {
ndpi_address_port *aps;
unsigned int num_aps;
unsigned int num_aps_allocated;
} ndpi_address_port_list;

// flow tracking
typedef struct ndpi_flow_info {
u_int32_t flow_id;
Expand Down Expand Up @@ -303,7 +309,7 @@ typedef struct ndpi_flow_info {
} http;

struct {
ndpi_address_port mapped_address, peer_address,
ndpi_address_port_list mapped_address, peer_address,
relayed_address, response_origin, other_address;
} stun;

Expand Down
7 changes: 7 additions & 0 deletions fuzz/fuzz_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,13 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
ndpi_set_config(ndpi_info_mod, cfg_proto, "ip_list.load", cfg_value);
ndpi_get_config(ndpi_info_mod, cfg_proto, "ip_list.load", cfg_value, sizeof(cfg_value));
}
if(fuzzed_data.ConsumeBool()) {
pid = fuzzed_data.ConsumeIntegralInRange<u_int16_t>(0, NDPI_MAX_SUPPORTED_PROTOCOLS + 1); /* + 1 to trigger invalid pid */
value = fuzzed_data.ConsumeIntegralInRange(0, 1 + 1);
snprintf(cfg_value, sizeof(cfg_value), "%d", value);
snprintf(cfg_proto, sizeof(cfg_proto), "%d", pid);
ndpi_set_config(ndpi_info_mod, cfg_proto, "monitoring", cfg_value);
}
if(fuzzed_data.ConsumeBool()) {
value = fuzzed_data.ConsumeIntegralInRange(0, 255 + 1);
snprintf(cfg_value, sizeof(cfg_value), "%d", value);
Expand Down
2 changes: 2 additions & 0 deletions fuzz/fuzz_ndpi_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ int malloc_size_stats = 0;
FILE *fingerprint_fp = NULL;
bool do_load_lists = false;
char *addr_dump_path = NULL;
int monitoring_enabled = 0;

extern void ndpi_report_payload_stats(FILE *out);

Expand Down Expand Up @@ -95,6 +96,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *Data, size_t Size) {
ndpi_set_config(workflow->ndpi_struct, "openvpn", "dpi.heuristics.num_messages", "255");
ndpi_set_config(workflow->ndpi_struct, "tls", "dpi.heuristics", "0x07");
ndpi_set_config(workflow->ndpi_struct, "tls", "dpi.heuristics.max_packets_extra_dissection", "255");
ndpi_set_config(workflow->ndpi_struct, "stun", "monitoring", "1");

ndpi_finalize_initialization(workflow->ndpi_struct);

Expand Down
1 change: 1 addition & 0 deletions fuzz/fuzz_readerutils_parseprotolist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ int malloc_size_stats = 0;
FILE *fingerprint_fp = NULL;
bool do_load_lists = false;
char *addr_dump_path = NULL;
int monitoring_enabled = 0;

extern "C" int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
FuzzedDataProvider fuzzed_data(data, size);
Expand Down
1 change: 1 addition & 0 deletions fuzz/fuzz_readerutils_workflow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ int malloc_size_stats = 0;
FILE *fingerprint_fp = NULL;
bool do_load_lists = false;
char *addr_dump_path = NULL;
int monitoring_enabled = 0;

extern "C" int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
FuzzedDataProvider fuzzed_data(data, size);
Expand Down
Loading

0 comments on commit e4390a2

Please sign in to comment.