Parallelism
A task scheduler is implemented in OCaml and runs in the qp_run program.
The IRPF90 programs communicate with this scheduler to fetch new tasks to do.
The flexibility of IRPF90 makes it possible to build micro-services that help a
running program. For example, if the computation of the AOs takes too much time,
tasks can be started on multiple compute nodes to accelerate the running program.
The typical scheme is the following:
- The program (Fortran) asks qp_run to create a new queue for a state of the calculation
- The program adds multiple tasks to do to the queue
- The program starts a collector thread that waits for the results computed by the workers
- The program starts multiple worker threads that will fetch tasks to do from the queue, compute the corresponding task, and send the result directly to the collector. Then, the queue is informed that the task has been done
- When the queue is empty and all workers have sent their results, the last worker receives a control integer from qp_run and sends it to the collector thread
- The collector thread checks that the control integer is correct: for instance, the number of AOs actually computed can be compared with the number of AOs to compute
- The parallel section is terminated
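The steps above can be mimicked inside a single process. The following is only an analogy, assuming Python threads and `queue.Queue` objects in place of qp_run and its sockets; the function name `run_parallel_section` and the squared-integer "computation" are made up for illustration.

```python
import queue
import threading


def run_parallel_section(n_tasks, n_workers=4):
    """In-process analogy of the scheme: a task queue, workers and a collector."""
    tasks = queue.Queue()      # stands in for the queue created by qp_run
    results = queue.Queue()    # stands in for the channel to the collector
    for i in range(n_tasks):
        tasks.put(i)

    lock = threading.Lock()
    state = {"control": 0, "active": n_workers, "ok": False}
    collected = []

    def collector():
        # Pull results until the last worker forwards the control integer,
        # then compare it with the number of results actually received.
        while True:
            kind, value = results.get()
            if kind == "control":
                state["ok"] = (value == len(collected))
                return
            collected.append(value)

    def worker():
        while True:
            try:
                task = tasks.get_nowait()
            except queue.Empty:
                break
            results.put(("result", task * task))  # result goes straight to the collector
            with lock:
                state["control"] += 1             # the queue is told the task is done
        with lock:
            state["active"] -= 1
            is_last = state["active"] == 0
            control = state["control"]
        if is_last:
            results.put(("control", control))     # only the last worker sends it

    threads = [threading.Thread(target=collector)]
    threads += [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(collected), state["control"], state["ok"]
```

Because every worker pushes its results before it signs off, the control integer is always the last item the collector receives, which is what makes the final check meaningful.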
The task scheduler (implemented in the TaskServer.ml file) understands text
messages and transforms them into typed messages. The list of understood
messages can be found in the of_string function of the Message.ml file.
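To illustrate the idea of turning text messages into typed messages, here is a minimal Python sketch. It is not a transcription of Message.ml: the class names are hypothetical, and only the message shapes quoted on this page (new_job, connect, get_task, task_done, disconnect) are handled.

```python
from dataclasses import dataclass


@dataclass
class NewJob:
    state: str
    push_address_tcp: str
    push_address_inproc: str


@dataclass
class Connect:
    transport: str


@dataclass
class GetTask:
    state: str
    client_id: int


@dataclass
class TaskDone:
    state: str
    client_id: int
    task_id: int
    control: int


@dataclass
class Disconnect:
    state: str
    client_id: int


def of_string(line):
    """Parse one text message into a typed message, or raise ValueError."""
    words = line.split()
    if not words:
        raise ValueError("empty message")
    name, args = words[0], words[1:]
    try:
        if name == "new_job" and len(args) == 3:
            return NewJob(*args)
        if name == "connect" and len(args) == 1 and args[0] in ("tcp", "inproc"):
            return Connect(args[0])
        if name == "get_task" and len(args) == 2:
            return GetTask(args[0], int(args[1]))
        if name == "task_done" and len(args) == 4:
            return TaskDone(args[0], *map(int, args[1:]))
        if name == "disconnect" and len(args) == 2:
            return Disconnect(args[0], int(args[1]))
    except ValueError as exc:
        # An integer field could not be converted.
        raise ValueError(f"malformed message: {line!r}") from exc
    raise ValueError(f"unknown message: {line!r}")
```

Parsing once at the boundary means the rest of a scheduler can work with typed fields instead of re-splitting strings.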
When qp_run starts, it tries to open a ZeroMQ REP socket on a default port,
and tries again on different ports until a suitable port range is found.
The port is bound using both the TCP protocol and the inproc protocol to
enable both multi-threaded and distributed support.
To initiate a parallel job, a Newjob message has to be sent to the scheduler,
with a job name (state) that will be checked at every connection to the
scheduler:
new_job <state> <push_address_tcp> <push_address_inproc>
The collector thread needs to open a PULL socket bound using both the TCP
and the inproc protocols, and those endpoints need to be given together with
the Newjob message.
Now, a new Queuing_system instance is created. The Queuing_system contains:
- A list of tasks
- A list of connected clients (empty at the initialization)
- The subset of tasks still queued
- The subset of tasks currently running, and on which client they run
The format of tasks is up to the user: it is just a string.
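As a rough picture of the four pieces of state listed above, the sketch below models them with a Python dataclass. The real Queuing_system is written in OCaml; the field and method names here are assumptions chosen for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class QueuingSystem:
    tasks: dict = field(default_factory=dict)    # task_id -> task string
    clients: set = field(default_factory=set)    # connected client ids, empty at start
    queued: list = field(default_factory=list)   # ids of the tasks still queued
    running: dict = field(default_factory=dict)  # task_id -> client_id running it
    last_client_id: int = 0

    def add_task(self, task):
        """Register a task string and put it in the queue."""
        task_id = len(self.tasks) + 1
        self.tasks[task_id] = task
        self.queued.append(task_id)
        return task_id

    def add_client(self):
        """Register a new client and return its id."""
        self.last_client_id += 1
        self.clients.add(self.last_client_id)
        return self.last_client_id

    def pop_task(self, client_id):
        """Move the next queued task to 'running'; return None if the queue is empty."""
        if not self.queued:
            return None
        task_id = self.queued.pop(0)
        self.running[task_id] = client_id
        return task_id, self.tasks[task_id]

    def end_task(self, task_id, client_id):
        """Forget a running task once its client reports it as done."""
        if self.running.get(task_id) != client_id:
            raise KeyError(f"task {task_id} is not running on client {client_id}")
        del self.running[task_id]
```

Keeping the queued and running subsets separate lets the structure answer at any time which tasks are waiting and which client holds each running task.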
To add new tasks to the system, the AddTask message is sent:
- add_task <state> <string>: Adds a unique task to the Queuing_system
- add_task <state> range <i> <j>: Builds a list of tasks (l) where i<l<j.
- add_task <state> triangle <i>: Builds a list of tasks (l,i) where 1<l<i.
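The range and triangle forms can be pictured as simple expansions. The sketch below reads the inequalities above literally as strict bounds, which is an assumption: the actual scheduler may include the end points. The function names and the space-separated task strings are likewise hypothetical.

```python
def expand_range(i, j):
    """Tasks (l) with i < l < j, assuming strict bounds."""
    return [str(l) for l in range(i + 1, j)]


def expand_triangle(i):
    """Tasks (l, i) with 1 < l < i, assuming strict bounds."""
    return [f"{l} {i}" for l in range(2, i)]
```

For example, under this reading `expand_range(1, 5)` yields the three tasks for l = 2, 3, 4.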
When workers connect to the Queuing_system with a Connect message, they
obtain as a reply the address of the PULL socket of the collector,
as well as a new Client ID:
connect (tcp|inproc)
Now the workers connect a PUSH socket to the PULL endpoint of the
collector (TCP or inproc).
Workers are now ready to fetch new tasks using GetTask messages:
get_task <state> <client_id>
The Queuing_system then records which task runs on which client.
The reply is the task, as a string, and the corresponding Task ID.
When the task is done, the worker pushes the results to the collector,
and sends to qp_run a TaskDone message with the corresponding Task ID
and its contribution to the control integer, which will be accumulated in the
Queuing_system instance:
task_done <state> <client_id> <task_id> <control>
If the queue is empty, the reply to the GetTask message is a Terminate message,
which tells the workers to terminate.
When a worker terminates, it sends a Disconnect message to the scheduler:
disconnect <state> <client_id>
If there are remaining running clients, the reply is 0. For the last client,
the reply contains the control integer, which allows the worker to inform the
collector that all tasks are done and all workers are disconnected.
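The worker side of this exchange can be sketched against an in-memory stand-in for the scheduler. Everything below is an assumption made for illustration: the class `SchedulerStub`, the reply strings (`connect_reply`, `terminate`, `ok`), the endpoint name and the upper-casing "computation" are hypothetical, and no ZeroMQ socket is involved. Only the request formats come from the text above.

```python
class SchedulerStub:
    """In-memory stand-in that answers the text requests described above."""

    def __init__(self, state, tasks, pull_address="inproc://collector"):
        self.state = state
        self.pull_address = pull_address               # hypothetical endpoint name
        self.queued = list(enumerate(tasks, start=1))  # (task_id, task) pairs
        self.running = {}                              # task_id -> client_id
        self.clients = set()
        self.last_client = 0
        self.control = 0                               # accumulated control integer

    def request(self, line):
        name, *args = line.split()
        if name == "connect":
            self.last_client += 1
            self.clients.add(self.last_client)
            return f"connect_reply {self.last_client} {self.pull_address}"
        if not args or args[0] != self.state:
            return "error wrong state"                 # the state is checked on every request
        if name == "get_task":
            if not self.queued:
                return "terminate"
            task_id, task = self.queued.pop(0)
            self.running[task_id] = int(args[1])
            return f"{task_id} {task}"
        if name == "task_done":
            client_id, task_id, control = map(int, args[1:4])
            self.running.pop(task_id)
            self.control += control
            return "ok"
        if name == "disconnect":
            self.clients.discard(int(args[1]))
            # 0 while other clients remain, the control integer for the last one
            return "0" if self.clients else str(self.control)
        return "error unknown message"


def connect(sched):
    """Send a connect request and return the new client id."""
    return int(sched.request("connect inproc").split()[1])


def work(sched, state, client_id, results):
    """Fetch tasks until told to terminate, then disconnect."""
    while True:
        reply = sched.request(f"get_task {state} {client_id}")
        if reply == "terminate":
            break
        task_id, task = reply.split(maxsplit=1)
        results.append(task.upper())                   # stands in for pushing to the collector
        sched.request(f"task_done {state} {client_id} {task_id} 1")
    return int(sched.request(f"disconnect {state} {client_id}"))
```

With two connected workers, the first one to disconnect gets 0 and the second gets the accumulated control integer, which it would then forward to the collector.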
Once the collector thread has finished pulling all the data, it can terminate.
Now, the main thread can send an End_job message to the scheduler to inform
it that the parallel task is done.
The ZeroMQ slave can be an MPI program. In that case, MPI is used to broadcast large arrays such as the wave function, as well as the input data, which are read only by the MPI master. The communication between the ZeroMQ slave and the ZeroMQ master is still done via ZeroMQ.