
dispynode cannot shutdown #181

Open · MorayK opened this issue Mar 20, 2019 · 20 comments

MorayK (Author) commented Mar 20, 2019

Dispynode cannot shut down because of this error:

2019-03-19 21:15:10 pycos - uncaught exception in !_shutdown/140335435848168:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/pycos/__init__.py", line 3671, in _schedule
    retval = task._generator.send(task._value)
  File "/usr/local/bin/dispynode.py", line 2424, in _shutdown
    sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_STREAM),
AttributeError: 'NoneType' object has no attribute 'family'

In function _shutdown() in dispynode.py, at line 2423, addrinfo is taken from self.scheduler['addrinfo'], which is still None from the original initialization. It therefore cannot be used to get the socket family (via addrinfo.family); instead, the default AF_INET should be used whenever addrinfo is None.
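A minimal sketch of the suggested guard (a hypothetical helper, not the actual dispynode.py code):

    import socket

    def socket_family(addrinfo):
        # Fall back to IPv4 when the scheduler's address info was never
        # initialized (addrinfo is None).
        return addrinfo.family if addrinfo is not None else socket.AF_INET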

MorayK (Author) commented Mar 20, 2019

Also in dispynode.py (version 4.10.5): there are other locations where addrinfo from self.scheduler['addrinfo'] is used unsafely, at lines 1034, 1846, 1953, 2423, 2459, and 2529.

MorayK (Author) commented Mar 20, 2019

In dispynode.py (version 4.10.5), self.scheduler['addrinfo'] is assigned at lines 731, 1196, and 1204.

MorayK (Author) commented Mar 20, 2019

I fixed the shutdown by changing dispynode.py, line 730, to:

    self.scheduler = {'ip_addr': dispy._node_ipaddr(scheduler_node) if scheduler_node else None,
                      'port': scheduler_port, 'auth': set(),
                      'addrinfo': dispy.host_addrinfo(host=scheduler_node,
                                                      ipv4_multicast=self.ipv4_udp_multicast)
                                  if scheduler_node else None}

pgiri (Owner) commented Mar 21, 2019

Thanks! I will take a look at it over the weekend and commit your fix.

MorayK (Author) commented Mar 21, 2019 via email

MorayK (Author) commented Mar 22, 2019

In my experiments, I get the following error in my copy of the node code, dispynode2.py (which has the shutdown fix):

2019-03-21 23:14:38 dispynode - New job id 139683873994696 from 10.129.3.64/10.129.3.64
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "dispynode2.py", line 193, in _dispy_job_func
    reply_Q = __dispy_job_globals.pop('reply_Q')
KeyError: 'reply_Q'

I noticed the function _dispy_job_func is unprepared for 'reply_Q' to be absent.
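A minimal sketch of the kind of guard this suggests (a hypothetical helper, not the actual _dispy_job_func code; dict.pop raises KeyError unless a default is supplied):

    def pop_reply_q(job_globals):
        # Make the failure mode explicit instead of an uncaught KeyError.
        reply_Q = job_globals.pop('reply_Q', None)
        if reply_Q is None:
            raise RuntimeError("job globals missing 'reply_Q'; cannot report results")
        return reply_Q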

pgiri (Owner) commented Mar 22, 2019

> Giridhar, I have been code reviewing the dispy package to see if it is suitable for our planned usage, and I am impressed!

Thanks for your comments!

> The package is extraordinary. Did you code it yourself?

Yes.

> I will soon be asking a few questions regarding the best way to discover and map volatile dispynodes to particular dispyschedulers.

Sure.

pgiri (Owner) commented Mar 22, 2019

> I noticed the function _dispy_job_func is unprepared for 'reply_Q' to be absent.

That shouldn't happen (i.e., reply_Q should be in job globals). Can you attach a sample (client) program that fails?

pgiri added a commit that referenced this issue Mar 24, 2019
'shutdown' should use 'addrinfo' attribute only if it is initialized.
Fix for issue #181.
MorayK (Author) commented Mar 25, 2019 via email

pgiri (Owner) commented Mar 25, 2019

There is only one reply_Q at a dispynode (created in the constructor). A copy of it is sent to each job in its globals. The job setup (before the client computation runs) removes it, and after the computation the results are put in that queue. Thus each job needs to have it available in globals (otherwise the results can't be sent back to the dispynode queue server that then processes results). I don't know how killing the job might have affected this. Yesterday I committed improvements to job termination, along with the fix for the addrinfo issue mentioned in the first post of this issue, that may help.
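A minimal sketch of this pattern (illustrative only, not the dispynode code): one multiprocessing.Queue is created in the parent process, a reference is handed to each job process, and each job puts its result on it when done.

    import multiprocessing

    def job(reply_Q, job_id):
        # Stand-in for the client computation; the real dispynode puts the
        # job result on the node's single reply queue when the job is done.
        reply_Q.put((job_id, job_id * job_id))

    if __name__ == '__main__':
        reply_Q = multiprocessing.Queue()  # one queue per node
        procs = [multiprocessing.Process(target=job, args=(reply_Q, i))
                 for i in range(3)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        for _ in range(3):
            print(reply_Q.get())  # the node's queue server processes these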

I am curious about your project. If it is okay, can you give some information? You can email me if you prefer.

MorayK (Author) commented Mar 26, 2019 via email

pgiri (Owner) commented Mar 27, 2019

> We are creating enterprise software for a large cluster where data has to be sharded across a large number of nodes.

Great!

> 1. Continuing the architecture discussion: If multiple computations are running concurrently on one dispynode, and they need to use the global reply_Q simultaneously, how do you coordinate the usage?

If you are asking about corruption due to multiple jobs using the queue simultaneously, then I was not aware that putting elements needs to be protected with a lock (I thought putting elements is thread/process safe). However, I think fixing this with a lock may not be simple, as the job can be terminated while the lock is held. I will address this after releasing 4.10.6 (there are already too many changes in this release), probably tomorrow.

> 2. I was testing the messaging between the dispyscheduler and the dispynode. Workaround: I discovered that sending one compute job from a client through the scheduler clears this signaling fault. Now the scheduler successfully receives the termination signal from the dispynode as it exits.

When a scheduler and a node have merely discovered each other, nodes don't inform scheduler(s) of such events. Only after a scheduler sends a computation is the node reserved for that particular scheduler (so nodes are used efficiently). Sending a job is not required, just the computation (i.e., the client created a JobCluster or SharedJobCluster).

> 3. When I change the cpus on the dispynode, it fails to inform the scheduler.

This may be an issue. I will look into it after the release.

MorayK (Author) commented Mar 27, 2019 via email

pgiri (Owner) commented Mar 27, 2019

> The replies will have to be sent back to the scheduler associated with the job that put them on the reply_Q. Do I have the correct idea in mind? Can a job put more than one packet into the reply_Q?

Yes, _dispy_job_func puts the job result (only) in the queue (after the job is done). If the computation is a program, __job_program also puts the result in the same queue.

> 1. I noticed that the dispynode.py code explicitly acquires and releases the thread_lock. Suggestion: Perhaps it would be better to use the 'with' statement to acquire and automatically release locks: https://docs.python.org/3/library/threading.html#with-locks

I don't know if there is any overhead with context managers. I agree that using 'with' may be cleaner, but if it burns a few more cycles, especially in functions that execute often, it may be okay to forgo. Elsewhere 'with' is used where it is convenient.
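For reference, the two patterns look like this (a generic illustration, not code from dispynode.py):

    import threading

    lock = threading.Lock()
    counter = 0

    def bump_explicit():
        global counter
        lock.acquire()
        try:
            counter += 1  # lock is released even if this raises
        finally:
            lock.release()

    def bump_with():
        global counter
        with lock:  # acquires, and releases on exit or exception
            counter += 1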

> 2. We will need to send large Python scripts instead of the compute function. When I use the path string for the compute parameter, I noticed that the client sends the script to the scheduler, which then sends it to the node. Can I send the script file directly to the node with cluster.send_file(path, node) and then invoke it to run directly on the node? How do I get stack traces or print() output from the running script?

I am a bit confused about why you are using dispyscheduler, which is meant for sharing nodes simultaneously among multiple clients. I wonder if JobCluster (which includes a scheduler) is better suited. And if you don't need to compute at all but only need to send scripts, pycos is appropriate. With the netpycos module you can send files as and when needed.

To answer the question above about sending files with dispyscheduler: the client can't directly send files to nodes, because nodes are managed only by the scheduler. Different parts of dispy and pycos use authentication strings for communication (every message, except for broadcasts and otherwise harmless messages, includes an authentication string that only a valid peer knows). So with dispyscheduler, the client can only talk to dispyscheduler, and only dispyscheduler can talk to nodes. However, if JobCluster is used, there is no external scheduler, so files can be sent directly to nodes.

If you describe what you need in a bit more detail, or attach a simple program that you currently use with dispyscheduler, I can suggest whether the alternatives above are better.
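For comparison, a minimal JobCluster sketch of the direct setup described above (the compute function is hypothetical; submit, job(), and print_status are used the same way as in the e6.py script later in this thread):

    import dispy

    def compute(n):
        return n * n

    if __name__ == '__main__':
        # With JobCluster the scheduler runs inside the client process, so
        # there is no external dispyscheduler between client and nodes.
        cluster = dispy.JobCluster(compute)
        jobs = [cluster.submit(i) for i in range(4)]
        for job in jobs:
            print(job())  # waits for the job to finish and returns its result
        cluster.print_status()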

MorayK (Author) commented Mar 28, 2019 via email

pgiri (Owner) commented Mar 28, 2019

> The compute() function is able to return output data files via dispy_send_file(). Are sent data files routed through the scheduler? I am hoping not.

If the client calls the send_file method on a SharedJobCluster instance, then the file is sent via the scheduler. The scheduler doesn't save the file, but relays it to the node.

> (See attached script, motest.py; the client code is attached file e6.py.)

I don't see files attached in your message.

> Idea: What if I send in a small compute() function that reads a script file and calls exec() on it? Is this a good approach for the dispy system?

You can execute the script from within the job computation, of course.
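A minimal sketch of that idea (illustrative only; it assumes the script was already delivered to the node, e.g. via send_file):

    def compute(script_path):
        # Read the previously transferred script and run it in this job's
        # process. Anything it prints, and any traceback, is captured by
        # dispy and returned with the job (job.stdout, job.stderr,
        # job.exception).
        with open(script_path) as f:
            source = f.read()
        exec(compile(source, script_path, 'exec'), {'__name__': '__main__'})
        return 0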

MorayK (Author) commented Mar 28, 2019

#motest.py:

    # dispy experiment
    import dispy, socket, time
    from dispy.dispynode import dispy_send_file

    host = socket.gethostname()
    ip = socket.gethostbyname(host)

    if __name__ == '__main__':
        print("Running motest.py on %s %s" % (host, ip))
        with open("motest.in", 'r') as f:
            rs = f.read()
        fpath = "motest.out"
        with open(fpath, 'w') as f:
            f.write("motest from: %s %s\nRead: %s\n" % (host, ip, rs))
        #dispy_send_file(fpath)
        print("Done motest.py")

MorayK (Author) commented Mar 28, 2019

#e6.py:

    # OKD experiments
    import dispy, socket, time

    LOG_LEVEL = dispy.logger.DEBUG  # ERROR, WARNING, DEBUG
    SCHEDULER = "cqscheduler"       # service name
    HOST_NAME = socket.gethostname()
    HOST_IP = socket.gethostbyname(HOST_NAME)
    print(" my Host Name: %s" % HOST_NAME)
    print(" my Host IP:   %s" % HOST_IP)
    SCRIPT = "motest.py"

    def get_nodes(cluster):
        status = cluster.status()
        #print(status)
        nodes = []
        for node in status.nodes:
            if node.name is not None:
                nodes.append(node)
                print("IP=%s, name=%s, cpus=%s, avail_cpus=%s" %
                      (node.ip_addr, node.name, node.cpus, node.avail_cpus))
        return nodes

    def show_job(job, rv):
        print("Job %s rv=%s:" % (job.id, rv))
        print(job.stdout.decode())
        print(job.stderr.decode())
        #print(job.exception, job.ip_addr, job.start_time, job.end_time)

    if __name__ == '__main__':
        cluster = dispy.SharedJobCluster(SCRIPT, loglevel=LOG_LEVEL,
                                         scheduler_node=SCHEDULER, ext_ip_addr=HOST_IP)
        print(" Mo cluster initialized")
        time.sleep(.1)
        nodes = get_nodes(cluster)
        print(" Mo now submitting jobs...")
        jobs = []
        id = 0
        for node in nodes:
            cluster.send_file("motest.in", node)
            job = cluster.submit_node(node, 1)
            assert(job)
            job.id = id
            id += 1
            jobs.append(job)
        #cluster.wait()
        for job in jobs:
            rv = job()  # waits for job to finish
            show_job(job, rv)
        cluster.print_status()

MorayK (Author) commented Mar 28, 2019

Sorry, drag-and-drop would not accept the Python files, and when I pasted in the contents, the indentation was lost. Do you have an email address so I can send the files to you directly?

pgiri (Owner) commented Mar 28, 2019

My email is in the __author__ setting in the files.

pgiri added a commit that referenced this issue Mar 31, 2019
When shutting down, node now broadcasts that it has terminated
services, so any schedulers that have discovered it, but not yet
using it, delete it. This addresses one of the issues mentioned
in issue #181.