Error when running single test using gen_cluster with check_new_threads=True #2487

Closed
lesteve opened this issue Jan 28, 2019 · 6 comments

lesteve (Member) commented Jan 28, 2019

I bumped into this when working on #2485. The only similar thing I found when googling is xpdAcq/rapidz#22.

To reproduce:

# Run this from your "distributed" checkout
conda create -n test -y dask pytest

conda activate test

pip install -e .
pytest distributed/tests/test_client.py::test_close

Note I am using test_close (in distributed/tests/test_client.py), but this happens for any test using gen_cluster with check_new_threads=True. Not sure why this is not happening on Travis ...
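For context, such a test looks roughly like this (a minimal sketch assuming the coroutine-style gen_cluster API of the time; test_example and inc are made-up names, not tests from the repository):

# Minimal sketch of the kind of test affected here; after the decorated body
# finishes, utils_test checks that no new threads were left behind.
from distributed.utils_test import gen_cluster

def inc(x):
    return x + 1

@gen_cluster(client=True)
def test_example(c, s, a, b):
    # c: Client, s: Scheduler, a and b: the two Workers started by gen_cluster
    future = c.submit(inc, 1)
    result = yield future
    assert result == 2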

The relevant part of the error:

       if PY3 and not WINDOWS and check_new_threads:
            start = time()
            while True:
                bad = [t for t, v in threading._active.items()
                       if t not in active_threads_start and
                      "Threaded" not in v.name and
                      "watch message" not in v.name]
                if not bad:
                    break
                else:
                    sleep(0.01)
                if time() > start + 5:
                    from distributed import profile
                    tid = bad[0]
                    thread = threading._active[tid]
                    call_stacks = profile.call_stack(sys._current_frames()[tid])
>                   assert False, (thread, call_stacks)
E                   AssertionError: (<Thread(ThreadPoolExecutor-0_0, started daemon 140614192363264)>, ['  File "/home/local/lesteve/miniconda3/envs/test/...vs/test/lib/python3.7/concurrent/futures/thread.py", line 78, in _worker
E                     \twork_item = work_queue.get(block=True)
E                     '])
E                   assert False

distributed/utils_test.py:944: AssertionError
Full output:
======================================================== test session starts ========================================================
platform linux -- Python 3.7.2, pytest-4.1.1, py-1.7.0, pluggy-0.8.1 -- /home/local/lesteve/miniconda3/envs/test/bin/python
cachedir: .pytest_cache
rootdir: /home/local/lesteve/dev/distributed, inifile: setup.cfg
collected 1 item                                                                                                                    

distributed/tests/test_client.py::test_close FAILED                                                                           [100%]

============================================================= FAILURES ==============================================================
____________________________________________________________ test_close _____________________________________________________________

    def test_func():
        del _global_workers[:]
        _global_clients.clear()
        active_threads_start = set(threading._active)
    
        reset_config()
    
        dask.config.set({'distributed.comm.timeouts.connect': '5s'})
        # Restore default logging levels
        # XXX use pytest hooks/fixtures instead?
        for name, level in logging_levels.items():
            logging.getLogger(name).setLevel(level)
    
        result = None
        workers = []
    
        with pristine_loop() as loop:
            with check_active_rpc(loop, active_rpc_timeout):
                @gen.coroutine
                def coro():
                    with dask.config.set(config):
                        s = False
                        for i in range(5):
                            try:
                                s, ws = yield start_cluster(
                                    ncores, scheduler, loop, security=security,
                                    Worker=Worker, scheduler_kwargs=scheduler_kwargs,
                                    worker_kwargs=worker_kwargs)
                            except Exception as e:
                                logger.error("Failed to start gen_cluster, retryng", exc_info=True)
                            else:
                                workers[:] = ws
                                args = [s] + workers
                                break
                        if s is False:
                            raise Exception("Could not start cluster")
                        if client:
                            c = yield Client(s.address, loop=loop, security=security,
                                             asynchronous=True, **client_kwargs)
                            args = [c] + args
                        try:
                            future = func(*args)
                            if timeout:
                                future = gen.with_timeout(timedelta(seconds=timeout),
                                                          future)
                            result = yield future
                            if s.validate:
                                s.validate_state()
                        finally:
                            if client:
                                yield c._close(fast=s.status == 'closed')
                            yield end_cluster(s, workers)
                            yield gen.with_timeout(timedelta(seconds=1),
                                                   cleanup_global_workers())
    
                        try:
                            c = yield default_client()
                        except ValueError:
                            pass
                        else:
                            yield c._close(fast=True)
    
                        raise gen.Return(result)
    
                result = loop.run_sync(coro, timeout=timeout * 2 if timeout else timeout)
    
            for w in workers:
                if getattr(w, 'data', None):
                    try:
                        w.data.clear()
                    except EnvironmentError:
                        # zict backends can fail if their storage directory
                        # was already removed
                        pass
                    del w.data
            DequeHandler.clear_all_instances()
            for w in _global_workers:
                w = w()
                w._close(report=False, executor_wait=False)
                if w.status == 'running':
                    w.close()
            del _global_workers[:]
    
        if PY3 and not WINDOWS and check_new_threads:
            start = time()
            while True:
                bad = [t for t, v in threading._active.items()
                       if t not in active_threads_start and
                      "Threaded" not in v.name and
                      "watch message" not in v.name]
                if not bad:
                    break
                else:
                    sleep(0.01)
                if time() > start + 5:
                    from distributed import profile
                    tid = bad[0]
                    thread = threading._active[tid]
                    call_stacks = profile.call_stack(sys._current_frames()[tid])
>                   assert False, (thread, call_stacks)
E                   AssertionError: (<Thread(ThreadPoolExecutor-0_0, started daemon 140068793722624)>, ['  File "/home/local/lesteve/miniconda3/envs/test/...vs/test/lib/python3.7/concurrent/futures/thread.py", line 78, in _worker
E                     \twork_item = work_queue.get(block=True)
E                     '])
E                   assert False

distributed/utils_test.py:944: AssertionError
------------------------------------------------------- Captured stderr call --------------------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:42177
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:40861
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:40861
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:42177
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   16.74 GB
distributed.worker - INFO -       Local Directory: /home/local/lesteve/dev/distributed/dask-worker-space/worker-3vcl3pzq
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:35517
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:35517
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:42177
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                   16.74 GB
distributed.worker - INFO -       Local Directory: /home/local/lesteve/dev/distributed/dask-worker-space/worker-jwhl26kd
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register tcp://127.0.0.1:40861
distributed.scheduler - INFO - Register tcp://127.0.0.1:35517
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:40861
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:35517
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:42177
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:42177
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-4adcabba-231f-11e9-bc9e-d89ef3096d47
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-4adcabba-231f-11e9-bc9e-d89ef3096d47
distributed.scheduler - INFO - Remove client Client-4adcabba-231f-11e9-bc9e-d89ef3096d47
distributed.scheduler - INFO - Close client connection: Client-4adcabba-231f-11e9-bc9e-d89ef3096d47
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:40861
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:35517
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
distributed.worker - INFO - -------------------------------------------------
========================================================= warnings summary ==========================================================
distributed/utils_test.py:745
  /home/local/lesteve/dev/distributed/distributed/utils_test.py:745: PytestDeprecationWarning: the `pytest.config` global is deprecated.  Please use `request.config` or `pytest_configure` (if you're a pytest plugin) instead.
    if not pytest.config.getoption("--runslow"):
  /home/local/lesteve/dev/distributed/distributed/utils_test.py:745: PytestDeprecationWarning: the `pytest.config` global is deprecated.  Please use `request.config` or `pytest_configure` (if you're a pytest plugin) instead.
    if not pytest.config.getoption("--runslow"):
  /home/local/lesteve/dev/distributed/distributed/utils_test.py:745: PytestDeprecationWarning: the `pytest.config` global is deprecated.  Please use `request.config` or `pytest_configure` (if you're a pytest plugin) instead.
    if not pytest.config.getoption("--runslow"):
  /home/local/lesteve/dev/distributed/distributed/utils_test.py:745: PytestDeprecationWarning: the `pytest.config` global is deprecated.  Please use `request.config` or `pytest_configure` (if you're a pytest plugin) instead.
    if not pytest.config.getoption("--runslow"):
  /home/local/lesteve/dev/distributed/distributed/utils_test.py:745: PytestDeprecationWarning: the `pytest.config` global is deprecated.  Please use `request.config` or `pytest_configure` (if you're a pytest plugin) instead.
    if not pytest.config.getoption("--runslow"):
  /home/local/lesteve/dev/distributed/distributed/utils_test.py:745: PytestDeprecationWarning: the `pytest.config` global is deprecated.  Please use `request.config` or `pytest_configure` (if you're a pytest plugin) instead.
    if not pytest.config.getoption("--runslow"):
  /home/local/lesteve/dev/distributed/distributed/utils_test.py:745: PytestDeprecationWarning: the `pytest.config` global is deprecated.  Please use `request.config` or `pytest_configure` (if you're a pytest plugin) instead.
    if not pytest.config.getoption("--runslow"):
  /home/local/lesteve/dev/distributed/distributed/utils_test.py:745: PytestDeprecationWarning: the `pytest.config` global is deprecated.  Please use `request.config` or `pytest_configure` (if you're a pytest plugin) instead.
    if not pytest.config.getoption("--runslow"):
  /home/local/lesteve/dev/distributed/distributed/utils_test.py:745: PytestDeprecationWarning: the `pytest.config` global is deprecated.  Please use `request.config` or `pytest_configure` (if you're a pytest plugin) instead.
    if not pytest.config.getoption("--runslow"):
  /home/local/lesteve/dev/distributed/distributed/utils_test.py:745: PytestDeprecationWarning: the `pytest.config` global is deprecated.  Please use `request.config` or `pytest_configure` (if you're a pytest plugin) instead.
    if not pytest.config.getoption("--runslow"):
  /home/local/lesteve/dev/distributed/distributed/utils_test.py:745: PytestDeprecationWarning: the `pytest.config` global is deprecated.  Please use `request.config` or `pytest_configure` (if you're a pytest plugin) instead.
    if not pytest.config.getoption("--runslow"):
  /home/local/lesteve/dev/distributed/distributed/utils_test.py:745: PytestDeprecationWarning: the `pytest.config` global is deprecated.  Please use `request.config` or `pytest_configure` (if you're a pytest plugin) instead.
    if not pytest.config.getoption("--runslow"):

distributed/tests/test_client.py::test_close
  /home/local/lesteve/miniconda3/envs/test/lib/python3.7/site-packages/zict/zip.py:3: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
    from collections import MutableMapping
  /home/local/lesteve/miniconda3/envs/test/lib/python3.7/site-packages/zict/common.py:3: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
    from collections import Mapping, MutableMapping

-- Docs: https://docs.pytest.org/en/latest/warnings.html
===================================================== slowest 10 test durations =====================================================
5.07s call     distributed/tests/test_client.py::test_close

(0.00 durations hidden.  Use -vv to show these durations.)
=============================================== 1 failed, 14 warnings in 6.10 seconds ===============================================

mrocklin (Member) commented Feb 8, 2019

I believe that this has been resolved.

lesteve (Member, Author) commented Feb 11, 2019

Hmmm, I am still getting the error on master (0a5b8da).

mrocklin (Member) commented:
I've run the lines that you ran above from master and am unable to reproduce this. I'm also decently confident that this was resolved in #2510.

lesteve (Member, Author) commented Feb 21, 2019

OK, this is weird: I am still seeing a very similar issue with my original reproducer snippet. I get the problem on two different computers, but I cannot reproduce it on Travis. Maybe you can have a go, @jcrist, since you bumped into the same issue in #2511?

Something I noticed is that the problematic thread is named differently: Profile, whereas it was ThreadPoolExecutor-<something> in the OP. See the stack trace below for more details.

Would it make sense to ignore "Profile" threads, as we do for "ThreadExecutor" and "watch message" threads?
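Concretely, the whitelist could look something like this (a rough sketch only; IGNORED_THREAD_NAMES and leaked_threads are illustrative names, not the actual code in utils_test):

import threading

# Sketch: same filter as in the traceback below, with "Profile" added to the
# ignored thread names (the names here are made up for illustration).
IGNORED_THREAD_NAMES = ("Threaded", "watch message", "TCP-Executor", "Profile")

def leaked_threads(active_threads_start):
    # Threads that appeared since the test started and are not whitelisted.
    return [t for t, v in threading._active.items()
            if t not in active_threads_start
            and not any(name in v.name for name in IGNORED_THREAD_NAMES)]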

Relevant part of the stack trace (using the snippet from the top post):

        if PY3 and not WINDOWS and check_new_threads:
            start = time()
            while True:
                bad = [t for t, v in threading._active.items()
                       if t not in active_threads_start and
                      "Threaded" not in v.name and
                      "watch message" not in v.name and
                      "TCP-Executor" not in v.name]
                if not bad:
                    break
                else:
                    sleep(0.01)
                if time() > start + 5:
                    from distributed import profile
                    tid = bad[0]
                    thread = threading._active[tid]
                    call_stacks = profile.call_stack(sys._current_frames()[tid])
>                   assert False, (thread, call_stacks)
E                   AssertionError: (<Thread(Profile, started daemon 140360104298240)>, ['  File "/home/local/lesteve/miniconda3/envs/test/lib/python3.7/t...)
E                     ', '  File "/home/local/lesteve/dev/distributed/distributed/profile.py", line 251, in _watch
E                     \tsleep(interval)
E                     '])
E                   assert False

distributed/utils_test.py:945: AssertionError

mrocklin (Member) commented:
Hrm, so the relevant thread is started here:

if hasattr(self.io_loop, 'closing'):
    def stop():
        loop = ref()
        return loop is None or loop.closing
else:
    def stop():
        loop = ref()
        return loop is None or loop._closing

self.io_loop.profile = profile.watch(
    omit=('profile.py', 'selectors.py'),
    interval=dask.config.get('distributed.worker.profile.interval'),
    cycle=dask.config.get('distributed.worker.profile.cycle'),
    stop=stop,
)

And the stopping condition is checked here:

while not stop():

That logic looks OK to me. One possibility is that you have a very long profiling interval set in a config file somewhere (though I don't know why that would be the case).
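The effective values can be double-checked like this (a quick sketch, assuming the distributed.worker.profile config keys; the defaults should be on the order of 10ms and 1000ms):

# Print the profiling settings the process actually picked up.
import dask
import distributed  # importing distributed registers its config defaults

print(dask.config.get('distributed.worker.profile.interval'))
print(dask.config.get('distributed.worker.profile.cycle'))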

Would it make sense to ignore "Profile" threads, as we do for "ThreadExecutor" and "watch message" threads?

No, I think that we genuinely need to test for these. We start them every time we create a Server around a new event loop. We need to make sure that we clean them up.

lesteve (Member, Author) commented Feb 22, 2019

Thanks a lot for your help!

Debugging a bit further, I noticed I had an old-style config in ~/.dask/config.yaml. Removing ~/.dask/config.yaml makes the problem go away.

This does not make immediate sense, since the values in the old config seem to match the defaults:

profile-interval: 10  # milliseconds between statistical profiling queries
profile-cycle-interval: 1000  # milliseconds between starting new profile
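For comparison, a rough sketch of the same settings expressed against the new-style config keys (set programmatically here; assuming distributed.worker.profile.interval and distributed.worker.profile.cycle are the relevant keys):

# Sketch: new-style equivalents of the old profile-interval /
# profile-cycle-interval keys, expressed as timedelta strings.
import dask

dask.config.set({
    'distributed.worker.profile.interval': '10ms',
    'distributed.worker.profile.cycle': '1000ms',
})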

I have to admit I am happy to be back in a sane state, even without fully understanding why ...

lesteve closed this as completed on Feb 22, 2019.