diff --git a/README.md b/README.md index 3adcfbe..4f770d9 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,8 @@ The following "special" binaries are included in the toolbox: Prepend a RFC3339 timestamp of nanosecond resolution to each line on stdin * **to_csv** Processes a crowsnest log file into a set of "topic-specific" csv files +* **udp_listen** + Eavesdrop onto UDP traffic and output to stdout, currently only supports multicast. ## Recipes @@ -36,7 +38,7 @@ The following are "recipes" for "run" commands that can be used with this image. * Injecting data from "any" source into a mqtt broker using the standard brefv format (examplified by a multicast stream). Every UDP packet gets base64-encoded and packaged into a brefv envelope and then published to the broker: ``` - socat -u UDP4-RECVFROM:60002,reuseaddr,ip-add-membership=239.192.0.2:enp2s0,fork SYSTEM:echo $$(base64 --wrap=0) \ + udp_listen --encode multicast 239.192.0.2 60002 --interface=172.16.6.1 \ | raw_to_brefv \ | mosquitto_pub -l -t '' ``` diff --git a/bin/udp_listen b/bin/udp_listen new file mode 100644 index 0000000..0e9877d --- /dev/null +++ b/bin/udp_listen @@ -0,0 +1,77 @@ +#!/usr/bin/env python3.9 + +""" +Command line utility tool for listening to udp traffic and +outputting to stdout. +""" + +import sys +import socket +import struct +import argparse +from base64 import b64encode + + +def process_multicast(args: argparse.Namespace): + """Listen in on multicast traffic and output to stdout + + Args: + args (argparse.Namespace): Command-line arguments + """ + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, args.TTL) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, int(args.loopback)) + sock.setsockopt( + socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(args.interface) + ) + sock.setsockopt( + socket.IPPROTO_IP, + socket.IP_ADD_MEMBERSHIP, + socket.inet_aton(args.group) + socket.inet_aton(args.interface), + ) + sock.bind((args.group, args.port)) + sock.setblocking(True) + + try: + while True: + data = sock.recvmsg(65535)[0] + sys.stdout.write( + (b64encode(data).decode() + "\n") if args.encode else data.decode() + ) + sys.stdout.flush() + finally: + sock.setsockopt( + socket.IPPROTO_IP, + socket.IP_DROP_MEMBERSHIP, + socket.inet_aton(args.group) + socket.inet_aton(args.interface), + ) + sock.close() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="UDP listener") + parser.add_argument( + "--encode", + action="store_true", + default=False, + help="base64 encode each packet content", + ) + + sub_commands = parser.add_subparsers() + + multicast_parser = sub_commands.add_parser("multicast") + multicast_parser.add_argument("group", type=str) + multicast_parser.add_argument("port", type=int) + multicast_parser.add_argument("--interface", type=str, default="0.0.0.0") + multicast_parser.add_argument("--loopback", type=bool, default=False) + multicast_parser.add_argument( + "--TTL", + type=int, + default=1, + ) + multicast_parser.set_defaults(func=process_multicast) + + args = parser.parse_args() + args.func(args)