-
Notifications
You must be signed in to change notification settings - Fork 0
/
mllp_spammer.py
133 lines (118 loc) · 6.45 KB
/
mllp_spammer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# version 0.0.2 - try is now catching sockets connect error | added -mode flag for "spam" or "once" sending
# version 0.0.1 - first working version, little error handling
# ----
# requirements
# pip install APScheduler
import argparse, textwrap
import socket
from datetime import datetime
import uuid # generate an id for each send
# need to pip install apscheduler, use Blocking so we can cancel their running from command line
from apscheduler.schedulers.blocking import BlockingScheduler
# sample test message
sample_hl7 = """\x0bMSH|^~\&|Primary|PIEDMONT ATLANTA HOSPITAL|CL|PDMT|20200319170944|ADTPA|ADT^A01|203478||2.8.1|||||||||||
EVN|A01|20200319170944||ADT_EVENT|^ADT^PATIENT^ACCESS^^^^^PHC^^^^^PAH|20200319170941|
PID|1||E3866033^^^EPIC^MRN~900091282^^^EPI^MR||TEST^PAHSEVENACARDTELEM||19770919|F||White|8 TEST ST^^ATHENS^GA^30605^USA^P^^CLARKE|CLARKE|(999)999-9999^P^PH^^^999^9999999||ENG|MARRIED|UNKNOWN|1000046070|000-00-0000|||NOT HISPANIC||N||||||N||
PD1|||PIEDMONT ATLANTA HOSPITAL^^10500|1750426920^SMITH III^GEORGE^^^^^^NPI^^^^NPI~81923^SMITH III^GEORGE^^^^^^STARPC^^^^STARID~SMIGE^SMITH III^GEORGE^^^^^^MT^^^^MTID||||||||||||||
NK1|1|TEST^SPOUSE^^|Spouse||(888)888-8888^^PH^^^888^8888888||Emergency Contact 1|||||||||||||||||||||||||||
NK1|2|||^^^^^USA|||Employer||||||CDC||||||||||||||||||||7403|Full
PV1|1|INPATIENT|TA7B^0107028^0107028^PIEDMONT ATLANTA HOSPITAL^^^^^^^DEPID|UR|||1093782799^SMITH^ANITA^^^^^^NPI^^^^NPI~52228^SMITH^ANITA^^^^^^STARPC^^^^STARID|||General Med||||Phys/Clinic|||1093782799^SMITH^ANITA^^^^^^NPI^^^^NPI~52228^SMITH^ANITA^^^^^^STARPC^^^^STARID||2017417374|UHC||||||||||||||||||||||||20200319170941||||||||||
PV2||Priv||||||20200319|||||||||||||n|N||||||||||N|||||||||||||||||
OBX|1|NM|11156-7^LEUKOCYTES^LN||||||||I|
OBX|2|NM|11273-0^ERYTHROCYTES^LN||4.06|tera.l-1||N|||P|||201410060627
OBX|3|NM|20509-6^HEMOGLOBIN^LN||||||||I|
OBX|4|NM|20570-8^HEMATOCRIT^LN||40.1|%||N|||P|||201410060830
OBX|5|NM|11125-2^PLATELETS^LN||221|giga.l-1||N|||F|||201410060830
AL1|1|DA|32264^NO KNOWN ALLERGIES^||AAA|201410060830
DG1|1||^injury|injury||^10151;EPT||||||||||||||||||||
PR1|2234|M11|111^CODE151|COMMON PROCEDURES|198809081123
GT1|1|90389|TEST^PAHSEVENACARDTELEM^^||8 TEST ST^^ATHENS^GA^30605^USA^^^CLARKE|(999)999-9999^^^^^999^9999999||19770919|F|P/F|SLF|000-00-0000||||CDC|^^^^^USA|||Full|||||||||||||||||||||||||||||
IN1|1|1070006^UHC PPO|10700|UHC|^^ATLANTA^GA^^|||||||20200219||||TEST^PAHSEVENACARDTELEM^^|Self|19770919|8 TEST ST^^ATHENS^GA^30605^USA^^^CLARKE|||1|||||||||||||13603|23423||||||Full|F|^^^^^USA|||BOTH||
IN2||000-00-0000|||Payor Plan||||||||||||||||||||||||||||||||||||||||||||||||||||||||23423||(999)999-9999^^^^^999^9999999|||||||CDC\x1c\x0d"""
# function for it
def mllp_transmit(host,port,message,add_input_padding='false',remove_output_padding='true'):
# try grab the reply
try:
# add padding as needed
message_temp = "\x0b" + message + "\x1c\x0d" if add_input_padding=='true' else message
message_bytes = bytes(message_temp,'utf-8')
# init socket
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# set 5 second timeout
s.settimeout(5.0)
s.connect((host,port))
# create random uuid
id = str(uuid.uuid4())
# print(f'id = {id}')
# send message
s.sendall(message_bytes)
now_send = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
# print(f'send datetime = {now_send}')
# the reply in bytes
reply_bytes = s.recv(1024)
now_rec = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
# print(f'recv datetime = {now_rec}')
reply = reply_bytes.decode('utf-8')
if(remove_output_padding=='true'):
reply = reply.replace('\x0b','').replace('\x1c\x0d','')
# print(f'reply = {reply}')
# reply will now be csv style
# with columns id, sendtime, recvtime, reply msg
print(f'{id},{now_send},{now_rec},{reply}')
return reply
except socket.error as socketerror:
print(f'Socket Error - {socketerror} - perhaps start checking: network connectivity and your hl7 message')
finally:
s.close()
# this function runs until explicitly shut down, the scheduler doesn't stop
# assume we want a minimum send rate of 1 per second
def mllp_spammer(sends_per_sec,host,port,message,add_input_padding='false',remove_output_padding='true',mode='spam'):
if(mode=='spam'):
sec_interval = 1 / sends_per_sec
sched = BlockingScheduler()
# run the mllp_transmit function on this interval
sched.add_job(mllp_transmit,
args=[host,port,message,add_input_padding,remove_output_padding],
trigger='interval',
seconds=sec_interval)
sched.start()
else:
mllp_transmit(host,port,message,add_input_padding,remove_output_padding)
# Initialize parser
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
# Adding optional argument
parser.add_argument("-sps", "--SendsPerSecond", help = textwrap.dedent('''Frequency of spamming MLLP transmit function,
e.g. 1 = 1 per second, 2 = 2 per second, etc
'''))
parser.add_argument("-host", "--Host", required=True, help = textwrap.dedent('''MLLP host, can be example.com or ip address 1.1.1.1
'''))
parser.add_argument("-p", "--Port", required=True, help = textwrap.dedent('''MLLP port number, e.g. 5000
'''))
parser.add_argument("-m", "--Message", required=False, help = textwrap.dedent('''The MLLP message, look up HL7 docs for examples
'''))
parser.add_argument("-aip", "--AddInputPadding", required=False, help = textwrap.dedent('''If your message has no leading or trailing characters that HL7 needs, this adds them in \\x0b + input_message + \\x1c\\x0d
'''))
parser.add_argument("-rop", "--RemoveOutputPadding", required=False, help = textwrap.dedent('''View the output of the message without the output padding, \\x0b + input_message + \\x1c
'''))
parser.add_argument("-mode", "--Mode", required=False, help = textwrap.dedent('''Choose spam or once, spam mode will send messages every second and once mode will just send once
'''))
# Read arguments from command line
# all args come through as strings
args = parser.parse_args()
sps = int(args.SendsPerSecond)
host = args.Host
port = int(args.Port)
message = args.Message
mode = args.Mode
# if args.Output:
# print("Displaying Output as: % s" % args.Output)
# if args.SendsPerSecond:
# print("Displaying SendsPerSecond as: % s" % args.SendsPerSecond)
# print(type(args.SendsPerSecond))
mllp_spammer(sends_per_sec=sps,
host=host,
port=port,
message=sample_hl7,
mode=mode
)