-
Notifications
You must be signed in to change notification settings - Fork 0
/
calculate_latency.py
158 lines (125 loc) · 4.87 KB
/
calculate_latency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import socket
import os
import sys
import time
from multiprocessing import Process
from RDT import RDTSocket as RDTSocket
from Header import RDTHeader
# Module-level counters initialised to zero.
# NOTE(review): presumably meant to hold measured throughput; neither name is
# read or written anywhere else in the visible part of this file - confirm.
Speed_RDT = 0
Speed_UDP = 0
# Endpoints that RDT_start_test passes to RDT_send / RDT_receive.
# NOTE(review): both of those functions rebind their address parameters to
# their own hard-coded tuples, so these values do not select the ports used.
source_address = ('127.0.0.1', 12334)
target_address = ('127.0.0.1', 12335)
# Uncomment to redirect everything printed by this script into output.txt.
# sys.stdout = open('output.txt', 'w')
def UDP_send(ip, port):
    """
    Send 100 timestamp datagrams followed by an ``end`` marker over UDP.

    Each payload is the sender's ``time.time()`` in milliseconds, rendered as a
    float string and encoded to bytes. Socket errors are printed rather than
    raised, and the socket is closed on every path.
    params:
        ip: Destination host
        port: Destination UDP port
    """
    destination = (ip, port)
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_sock:
        try:
            for _ in range(100):
                stamp_ms = str(float(time.time() * 1000))
                udp_sock.sendto(stamp_ms.encode(), destination)
            # Sentinel telling the receiver that no more samples follow.
            udp_sock.sendto(b'end', destination)
        except IOError as e:
            print(f"An error occurred: {e}")
def UDP_receive(ip, port):
    """
    Receive timestamp datagrams over UDP and report the observed latency.

    Every datagram other than the ``end`` marker is parsed as the sender's
    clock in milliseconds; the difference to the local clock at receive time
    is recorded as one sample. When the marker arrives (or a socket error
    stops the loop) the samples are sorted and two lines are printed: the
    median sample, then ``UDP lantency : <largest sample> ms``.

    UDP gives no delivery guarantee, so the number of samples is not assumed:
    the median index is derived from the actual sample count (index 49 for the
    usual 100 samples) and an empty run is reported instead of raising
    ``IndexError``.
    params:
        ip: Local address to bind
        port: Local UDP port to bind
    """
    time_list = []
    # The context manager closes the socket even when bind() itself fails;
    # a bind failure still propagates to the caller as before.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind((ip, port))
        try:
            while True:
                data, _ = sock.recvfrom(200)
                if data == b'end':
                    break
                # Take the receive timestamp before any parsing work.
                received_ms = float(time.time() * 1000)
                try:
                    sent_ms = float(data.decode())
                except ValueError:
                    # Not a timestamp payload (stray or corrupt datagram); skip it.
                    continue
                time_list.append(received_ms - sent_ms)
        except IOError as e:
            print(f"An error occurred: {e}")

    if not time_list:
        print("UDP lantency : no samples received")
        return

    time_list.sort()
    # Lower median: equals time_list[49] when all 100 samples arrive.
    print(f"{time_list[(len(time_list) - 1) // 2]}")
    # print(f"UDP lantency : {sum(time_list)/ len(time_list)} ms")
    print(f"UDP lantency : {time_list[-1]} ms")
def UDP_start_test(port=12349):
    """
    Run the UDP latency measurement with a sender and a receiver process.

    The receiver is started first and given one second before the sender
    starts. Both processes are then joined, sender first.
    params:
        port: UDP port on localhost used by both processes
    """
    endpoint = ("localhost", port)
    sender_proc = Process(target=UDP_send, args=endpoint)
    receiver_proc = Process(target=UDP_receive, args=endpoint)

    receiver_proc.start()
    time.sleep(1)
    sender_proc.start()

    for proc in (sender_proc, receiver_proc):
        proc.join()
def RDT_start_test():
    """
    Run the RDT latency measurement with a sender and a receiver process.

    The receiver is started first and given one second before the sender
    starts. Both processes are then joined, sender first. The module-level
    ``source_address`` / ``target_address`` are passed through as arguments.
    """
    sender_proc = Process(target=RDT_send, args=(source_address, target_address))
    receiver_proc = Process(target=RDT_receive, args=(target_address,))

    receiver_proc.start()
    time.sleep(1)
    sender_proc.start()

    for proc in (sender_proc, receiver_proc):
        proc.join()
def RDT_send(source_address, target_address):
    """
    Send the system timestamp over an RDT connection so that the receiver can
    compute the latency of the communication. This is reference code that may
    be modified to match the RDT implementation.
    Note that the latency is measured between the time the sender calls send()
    and the time the receiver calls recv().
    params:
        target_address: Target IP address and its port
        source_address: Source IP address and its port
    """
    #############################################################################
    # HINT: The latency test can be run entirely locally, so the
    #       ProxyServerAddress may be set to the true target address directly.
    #       Adapt this code to your implementation.
    #############################################################################
    # NOTE: the address arguments are not used; fixed loopback endpoints are
    # bound and connected instead.
    local_addr = ('127.0.0.1', 22334)
    peer_addr = ('127.0.0.1', 22335)

    rdt_sock = RDTSocket()
    rdt_sock.bind(local_addr)
    rdt_sock.connect(peer_addr)

    time.sleep(1)
    # Millisecond timestamp taken immediately before handing it to send().
    timestamp_ms = str(float(time.time() * 1000))
    rdt_sock.send(data=timestamp_ms)
    time.sleep(1)

    rdt_sock.close()
def RDT_receive(source_address):
    """
    Accept one RDT connection, read the sender's timestamp and print the
    latency of the communication. This is reference code that may be modified
    to match the RDT implementation.
    Note that the latency is measured between the time the sender calls send()
    and the time the receiver calls recv().
    params:
        source_address: Source IP address and its port
    """
    #############################################################################
    # HINT: The latency test can be run entirely locally, so the
    #       ProxyServerAddress may be set to the true target address directly.
    #       Adapt this code to your implementation.
    #############################################################################
    # NOTE: the address argument is not used; a fixed loopback endpoint is
    # bound instead.
    listen_addr = ('127.0.0.1', 22335)

    listener = RDTSocket()
    listener.bind(listen_addr)
    conn = listener.accept()

    payload = conn.recv()
    # Local clock in milliseconds, read right after recv() returns.
    now_ms = float(time.time() * 1000)
    spend_time = now_ms - float(payload)
    print(f"RDT lantency : {spend_time} ms")

    conn.close()
    listener.close()
def test_latency():
    """
    Run the UDP latency test, then the RDT latency test.

    Each test runs in its own try block: an exception raised by one is printed
    and does not prevent the other from running.
    """
    # UDP first, then the RDT implementation under test.
    for run_test in (UDP_start_test, RDT_start_test):
        try:
            run_test()
        except Exception as e:
            print(e)
# Script entry point: run both latency tests only when executed directly.
if __name__ == "__main__":
    test_latency()