Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple queues (one per connection) [unitaryhack] #139

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 35 additions & 17 deletions qunetsim/components/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
import time
from inspect import signature
from queue import Queue
from threading import Thread

import matplotlib.pyplot as plt
import networkx as nx

from qunetsim.backends import EQSNBackend
from qunetsim.objects import Qubit, RoutingPacket, Logger, DaemonThread
from qunetsim.objects.daemon_thread import is_thread_alive
from qunetsim.utils.constants import Constants


Expand Down Expand Up @@ -38,8 +40,7 @@ def __init__(self):
self._quantum_routing_algo = nx.shortest_path
self._classical_routing_algo = nx.shortest_path
self._use_hop_by_hop = True
self._packet_queue = Queue()
self._stop_thread = False
self._packet_queues = {}
self._use_ent_swap = False
self._queue_processor_thread = None
self._delay = 0.1
Expand Down Expand Up @@ -178,6 +179,12 @@ def packet_drop_rate(self, drop_rate):
def arp(self):
return self.ARP

def add_queue(self, host_id):
self._packet_queues[host_id] = Queue()

def remove_queue(self, host_id):
del self._packet_queues[host_id]

@property
def num_hosts(self):
return len(self.arp.keys())
Expand All @@ -191,6 +198,7 @@ def add_host(self, host):
"""

Logger.get_instance().debug('host added: ' + host.host_id)
self.add_queue(host.host_id)
self.ARP[host.host_id] = host
self._update_network_graph(host)

Expand All @@ -214,6 +222,7 @@ def remove_host(self, host):

if host.host_id in self.ARP:
del self.ARP[host.host_id]
self.remove_queue(host.host_id)
if self.quantum_network.has_node(host.host_id):
self.quantum_network.remove_node(host.host_id)
if self.classical_network.has_node(host.host_id):
Expand Down Expand Up @@ -498,19 +507,21 @@ def transfer_qubits(r, s, original_sender=None):
i += 1
return True

def _process_queue(self):
def _process_queues(self):
"""
Runs a thread for processing the packets in the packet queue.
Runs multiple threads for processing the packets in the packet queues.
"""

while True:
def process_queue(packet_queue):
"""
A single thread processes the packet in a single queue.
Each host has it's own queue and thread for processing the queue.
"""
packet = packet_queue.get()

packet = self._packet_queue.get()

if not packet:
# Stop the network
self._stop_thread = True
break
# If None packet is received, then stop thread
if not packet.payload:
return

# Artificially delay the network
if self.delay > 0:
Expand All @@ -522,14 +533,14 @@ def _process_queue(self):
Logger.get_instance().log("PACKET DROPPED")
if packet.payload_type == Constants.QUANTUM:
packet.payload.release()
continue
return

sender, receiver = packet.sender, packet.receiver

if packet.payload_type == Constants.QUANTUM:
if not self._route_quantum_info(sender, receiver,
[packet.payload]):
continue
return

try:
if packet.protocol == Constants.RELAY and not self.use_hop_by_hop:
Expand Down Expand Up @@ -586,15 +597,20 @@ def _process_queue(self):
except Exception as e:
Logger.get_instance().error('Error in network: ' + str(e))

while True:
WingCode marked this conversation as resolved.
Show resolved Hide resolved
for host_name, queue in self._packet_queues.items():
if not queue.empty() and not is_thread_alive(host_name):
thread = Thread(target=process_queue, args=[queue], daemon=False, name=host_name)
thread.start()

def send(self, packet):
"""
Puts the packet to the packet queue of the network.

Args:
packet (Packet): Packet to be sent
"""

self._packet_queue.put(packet)
self._packet_queues.get(packet.sender).put(packet)

def stop(self, stop_hosts=False):
"""
Expand All @@ -606,8 +622,10 @@ def stop(self, stop_hosts=False):
if stop_hosts:
for host in self.ARP:
self.ARP[host].stop(release_qubits=True)
# Send None to queue to stop the queue
self.send(RoutingPacket(sender=host, receiver=None, protocol=None, payload_type=None, payload=None,
ttl=None, route=None))

self.send(None) # Send None to queue to stop the queue
if self._backend is not None:
self._backend.stop()
except Exception as e:
Expand All @@ -624,7 +642,7 @@ def start(self, nodes=None, backend=None):
self._backend = backend
if nodes is not None:
self._backend.start(nodes=nodes)
self._queue_processor_thread = DaemonThread(target=self._process_queue)
self._queue_processor_thread = DaemonThread(target=self._process_queues)

def draw_classical_network(self):
"""
Expand Down
6 changes: 6 additions & 0 deletions qunetsim/objects/daemon_thread.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import threading


def is_thread_alive(thread_name):
for thread in threading.enumerate():
if thread_name == thread:
return thread.isAlive()


class DaemonThread(threading.Thread):
""" A Daemon thread that runs a task until completion and then exits. """

Expand Down
2 changes: 1 addition & 1 deletion qunetsim/objects/packets/routing_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(self, sender, receiver, protocol, payload_type, payload, ttl, route
ttl(int): Time-to-Live parameter
route (List): Route the packet takes to its target host.
"""
if not isinstance(payload, Packet):
if not isinstance(payload, Packet) and payload is not None:
raise ValueError("For the routing packet the payload has to be a packet.")

self._ttl = ttl
Expand Down