Java implementation of the Bully Algorithm for electing a coordinator in a distributed network.
All configuration is defined in the "Consts.java" file.
Feel free to play around with the parameters.
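For orientation, here is a minimal sketch of the kind of constants such a file could contain. The names and values below are illustrative assumptions, not the repository's actual contents:

```java
// Hypothetical contents; the real Consts.java in this repository may differ.
public final class Consts {
    private Consts() {}                                 // constants only, no instances

    public static final int START_PORT = 5000;          // first port in the node range
    public static final int MAX_NODES  = 10;            // size of the port range to scan

    public static final int ALIVE_INTERVAL_MS  = 1000;  // coordinator heartbeat period
    public static final int ALIVE_TIMEOUT_MS   = 3000;  // assume coordinator dead after this
    public static final int ANSWER_TIMEOUT_MS  = 2000;  // wait for an ANSWER before claiming victory
    public static final int VICTORY_TIMEOUT_MS = 4000;  // wait for a VICTORY before re-electing

    public static final int TASK_ARRAY_SIZE = 10_000;   // N random ints generated for the task
}
```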
- There is a finite number N of nodes (where N can be arbitrarily large).
- Each node knows the range of ports the nodes occupy (defined by a start-port variable and a max-number-of-nodes variable).
- If a node crashes before sending the coordinator the answer to its chunk, the coordinator announces the minimum of all the responses it received (from the other working nodes).
- Any node can crash at any time, including the coordinator. The operation should be robust enough to continue normally, even if it takes a few message exchanges to reach steady state.
- When a new node joins the network, it starts a new election (even though in my code it already knows whether or not it has the highest PID), since this was demanded explicitly in your email regarding the task.
- If a node crashes, we don't start an election process unless it was the coordinator node.
- A new task is only run when a node becomes the new coordinator. The task is not repeated afterwards, as this was not demanded in the task.
- Architecture is decentralized (P2P), nodes directly communicate with each other, and any node listening on any port can become the coordinator.
- There is no fixed port for the coordinator.
- All communication is done through sockets using TCP (to ensure no loss of data).
- Any sending I/O work or CPU work is delegated to a dedicated thread, while the main thread does all the receiving. (Threads communicate within the same process and aren't used for any inter-process communication.) This avoids blocking operations that would sacrifice performance.
- For broadcast and task messages, I chose to allocate a thread to each node rather than sending sequentially, to avoid cascading delays from timeouts when nodes crash (see the sketch after this list). This may be suboptimal if the max number of nodes is large and the rate of failure is low; it is a tradeoff to consider depending on the context.
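To illustrate that per-node threading tradeoff, here is a minimal sketch of a broadcast that spawns one sender thread per peer. The class and the localhost/TCP details are assumptions for illustration, not the repository's actual code:

```java
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;

// One sender thread per peer: a crashed peer's connect timeout delays only
// its own thread, never the sends to the remaining peers.
public final class Broadcast {
    public static void send(List<Integer> peerPorts, byte[] payload, int timeoutMs) {
        for (int port : peerPorts) {
            new Thread(() -> {
                try (Socket socket = new Socket()) {
                    socket.connect(new InetSocketAddress("localhost", port), timeoutMs);
                    socket.getOutputStream().write(payload);
                } catch (Exception e) {
                    // Refused or timed-out connection: this peer is down; carry on.
                    System.out.println("Peer on port " + port + " appears down");
                }
            }).start();
        }
    }
}
```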
- The coordinator periodically sends ALIVE messages to all other nodes.
- Each node has a timer; if the coordinator fails to send an ALIVE message within the specified time, the node assumes the coordinator is dead.
- If the node has the highest PID, it becomes the coordinator and sends a VICTORY message to all the other nodes.
- Otherwise, it starts a new election by sending an ELECTION message to all the nodes that have a higher PID than itself.
- When a node receives an ELECTION message, it replies with an ANSWER only if the sender has a lower PID than itself, and then starts an election process of its own.
- When a node receives an ANSWER message from a process with a higher PID, it stops sending ELECTION messages (stops electing itself) and waits for a VICTORY message. If no VICTORY message arrives, it restarts the election process.
- If a node sends an ELECTION message and receives no ANSWER messages, it becomes the new coordinator and broadcasts a VICTORY message to all nodes (using one sender thread per node; see the election sketch below).
- Any node that receives a VICTORY message treats the sender as the coordinator.
- Once a new coordinator is announced, it generates N random numbers and splits them evenly into chunks, sending one chunk to each process in a TASK message.
- The processes compute the minimum number in their chunks by spawning a separate thread for this task, and send the result to the coordinator in a TASK_REPLY message (sketched below).
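A minimal sketch of that node-side chunk handling, assuming a hypothetical sendTaskReply helper in place of the real socket send:

```java
import java.util.Arrays;

// The minimum is computed on a worker thread so the receiving thread stays free.
public final class ChunkWorker {
    public static void handleTask(int[] chunk, int coordinatorPort) {
        new Thread(() -> {
            int min = Arrays.stream(chunk).min().orElse(Integer.MAX_VALUE);
            sendTaskReply(coordinatorPort, min);   // placeholder for the real TASK_REPLY send
        }).start();
    }

    private static void sendTaskReply(int coordinatorPort, int min) {
        System.out.println("TASK_REPLY -> port " + coordinatorPort + ": min=" + min);
    }
}
```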
- At any time: the threads responsible for sending the ALIVE messages inform the coordinator when a node is down. The other nodes aren't informed of this failure, since it doesn't affect our simulation; only the coordinator needs this information when sending the task.
- A node crashing after receiving a chunk: the coordinator starts a timer after sending the TASK messages. If the timer expires, it prints the minimum response it received while alerting the user that some nodes did not reply.
- Coordinator crashing: all nodes have a timer; if they don't receive an ALIVE message during this time, they assume the coordinator is dead, and a new election is started (by many nodes simultaneously).
- New coordinator crashing before sending VICTORY: each node starts a timer when it receives an ANSWER from a node with a higher PID and is thus awaiting a victory. If no VICTORY is received, it starts a new election.
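The following sketch condenses the election rules above: message every higher PID, claim victory if no ANSWER arrives in time, and otherwise wait for a VICTORY, restarting on its own timeout. The send helpers are placeholders, not the repository's actual methods:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of the election kick-off; sendElection/broadcastVictory stand in
// for the real TCP sends.
public final class Election {
    public static void start(int myPid, List<Integer> higherPids, long answerTimeoutMs)
            throws InterruptedException {
        if (higherPids.isEmpty()) {
            broadcastVictory(myPid);            // highest PID: win immediately
            return;
        }
        // In the real code the receiving thread would share this latch and
        // call countDown() when any ANSWER arrives from a higher PID.
        CountDownLatch answered = new CountDownLatch(1);
        for (int pid : higherPids) sendElection(myPid, pid);
        if (!answered.await(answerTimeoutMs, TimeUnit.MILLISECONDS)) {
            broadcastVictory(myPid);            // silence from higher PIDs: we win
        }
        // Otherwise the node waits for a VICTORY message and restarts the
        // election if that wait times out (not shown here).
    }

    private static void sendElection(int fromPid, int toPid) { /* TCP send */ }
    private static void broadcastVictory(int pid)            { /* TCP broadcast */ }
}
```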
- A new node process is started.
- The node scans our entire port range, sends a GREETING message to functioning nodes, and ignores unoccupied ports (a discovery sketch follows this list).
- Now all nodes know about the new node, and the new node knows the entire network.
- The coordinator deliberately does not send ALIVE messages to any new node, in order to satisfy the task's requirement that new nodes start a new election.
- The node with the highest PID wins and becomes the new coordinator.
- The new coordinator starts sending ALIVE messages periodically to all nodes.
- It also broadcasts a VICTORY message to all nodes.
- It generates an array of random ints and splits it into chunks, sending one chunk to each other node.
- The coordinator receives the minimum of each chunk, calculates the global minimum of the array, and displays it.
- Steady state is reached.
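As referenced in the joining flow above, here is a minimal sketch of the port-range scan and GREETING step. The constants, the raw-string greeting, and the localhost assumption are illustrative:

```java
import java.net.InetSocketAddress;
import java.net.Socket;

// Probe every port in the configured range and greet whoever answers;
// ports with no listener throw on connect and are simply skipped.
public final class Discovery {
    static final int START_PORT = 5000, MAX_NODES = 10, CONNECT_TIMEOUT_MS = 200;

    public static void greetAll(int myPort) {
        for (int port = START_PORT; port < START_PORT + MAX_NODES; port++) {
            if (port == myPort) continue;                 // don't greet ourselves
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress("localhost", port), CONNECT_TIMEOUT_MS);
                socket.getOutputStream().write("GREETING".getBytes());  // stand-in for the real message
                System.out.println("Discovered peer on port " + port);
            } catch (Exception e) {
                // Unoccupied port: ignore and keep scanning.
            }
        }
    }
}
```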
- Tested with several numbers of nodes (up to 10).
- Tested with relatively large arrays (up to 10K); beyond that, the terminal gets flooded and timeouts occur because the receiver thread takes too long to process the message content.
- Tested a new node entering the network with the highest PID.
- Tested any node failing at any time.
- Tested coordinator failing.
- Tested starting a new node at several stages of our flow.
Run the bully-algorithm-Osama-Nabih.jar file in any command-line terminal using:
java -jar bully-algorithm-Osama-Nabih.jar
You can run as many instances as you'd like.
We can simply terminate any process, including the coordinator, by pressing Ctrl + C in the terminal or by closing the terminal window.
Each message is logged in the following format:
Node <process_pid> received at <timestamp> Message [type=<msg_type>, senderPort=<msg_sender_port>, senderPid=<msg_sender_pid>, content=<msg_content>]
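For reference, a message shape like the following would produce that log line. The field and enum names are inferred from the format above, not taken from the repository:

```java
import java.io.Serializable;

public class Message implements Serializable {
    enum Type { GREETING, ELECTION, ANSWER, VICTORY, ALIVE, TASK, TASK_REPLY }

    Type type;
    int senderPort;
    int senderPid;
    Object content;

    Message(Type type, int senderPort, int senderPid, Object content) {
        this.type = type; this.senderPort = senderPort;
        this.senderPid = senderPid; this.content = content;
    }

    @Override
    public String toString() {
        // Matches the logged format shown above.
        return "Message [type=" + type + ", senderPort=" + senderPort
             + ", senderPid=" + senderPid + ", content=" + content + "]";
    }
}
```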
Important information is also logged, such as:
- Any timeout.
- Any failing process.
- Starting a new election.
- Peers' discovery.
- Winning the election.
- Becoming the coordinator.
- Victory broadcast.
- Starting the task.
I made a decent refactoring effort on the code by creating a State abstract class and having Init, Running, Electing, PendingVictory, and Coordinating inherit from it, thereby splitting the logic of each state entirely (a sketch of the idea follows below).
This resulted in much cleaner code; unfortunately, I didn't have the time to test it.
You can find that code in the refactoring branch.
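For a rough picture of that hierarchy, here is a minimal sketch; the method names and per-state responsibilities are assumptions, and the real code lives in the refactoring branch:

```java
// Each state owns its message handling; a Node would delegate to its current state.
abstract class State {
    abstract void onMessage(Object msg);   // state-specific message handling
    void onTimeout() {}                    // optional timer hook, overridden as needed
}

class Init           extends State { void onMessage(Object msg) { /* discover peers via GREETINGs */ } }
class Running        extends State { void onMessage(Object msg) { /* normal operation, watch for ALIVEs */ } }
class Electing       extends State { void onMessage(Object msg) { /* collect ANSWERs from higher PIDs */ } }
class PendingVictory extends State { void onMessage(Object msg) { /* wait for a VICTORY message */ } }
class Coordinating   extends State { void onMessage(Object msg) { /* send ALIVEs, gather TASK_REPLYs */ } }
```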