Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests: Refactor transport tests from unittest to pytest #6152

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
467 changes: 176 additions & 291 deletions tests/transports/test_all_plugins.py

Large diffs are not rendered by default.

50 changes: 17 additions & 33 deletions tests/transports/test_local.py
Original file line number Diff line number Diff line change
@@ -7,54 +7,38 @@
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Tests for the `LocalTransport`."""
import unittest
"""Test :mod:`aiida.transports.plugins.local`."""
import getpass

import pytest

from aiida.transports.plugins.local import LocalTransport
from aiida.transports.transport import TransportInternalError

# This will be used by test_all_plugins

plugin_transport = LocalTransport()


class TestGeneric(unittest.TestCase):
"""
Test whoami on localhost.
"""

def test_whoami(self):
"""Test the `whoami` command."""
import getpass
def test_basic():
"""Test constructor."""
with LocalTransport():
pass

with LocalTransport() as transport:
self.assertEqual(transport.whoami(), getpass.getuser())

def test_whoami():
"""Test the `whoami` command."""
with LocalTransport() as transport:
assert transport.whoami() == getpass.getuser()

class TestBasicConnection(unittest.TestCase):
"""
Test basic connections.
"""

def test_closed_connection(self):
"""Test running a command on a closed connection."""

with self.assertRaises(TransportInternalError):
transport = LocalTransport()
transport.listdir()

@staticmethod
def test_basic():
"""Test constructor."""
with LocalTransport():
pass
def test_closed_connection():
"""Test running a command on a closed connection."""
with pytest.raises(TransportInternalError):
transport = LocalTransport()
transport.listdir()


def test_gotocomputer():
"""Test gotocomputer"""
with LocalTransport() as transport:
cmd_str = transport.gotocomputer_command('/remote_dir/')

expected_str = (
"""bash -c "if [ -d '/remote_dir/' ] ;"""
""" then cd '/remote_dir/' ; bash -l ; else echo ' ** The directory' ; """
132 changes: 61 additions & 71 deletions tests/transports/test_ssh.py
Original file line number Diff line number Diff line change
@@ -7,103 +7,93 @@
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Test the `SshTransport` plugin on localhost."""
"""Test :mod:`aiida.transports.plugins.ssh`."""
import logging
import unittest

import paramiko
import pytest

from aiida.transports.plugins.ssh import SshTransport
from aiida.transports.transport import TransportInternalError

# This will be used by test_all_plugins

plugin_transport = SshTransport(machine='localhost', timeout=30, load_system_host_keys=True, key_policy='AutoAddPolicy')
def test_closed_connection_ssh():
"""Test calling command on a closed connection."""
with pytest.raises(TransportInternalError):
transport = SshTransport(machine='localhost')
transport._exec_command_internal('ls') # pylint: disable=protected-access


class TestBasicConnection(unittest.TestCase):
"""
Test basic connections.
"""
def test_closed_connection_sftp():
"""Test calling sftp command on a closed connection."""
with pytest.raises(TransportInternalError):
transport = SshTransport(machine='localhost')
transport.listdir()

def test_closed_connection_ssh(self):
"""Test calling command on a closed connection."""
with self.assertRaises(TransportInternalError):
transport = SshTransport(machine='localhost')
transport._exec_command_internal('ls') # pylint: disable=protected-access

def test_closed_connection_sftp(self):
"""Test calling sftp command on a closed connection."""
with self.assertRaises(TransportInternalError):
transport = SshTransport(machine='localhost')
transport.listdir()
def test_auto_add_policy():
"""Test the auto add policy."""
with SshTransport(machine='localhost', timeout=30, load_system_host_keys=True, key_policy='AutoAddPolicy'):
pass

@staticmethod
def test_auto_add_policy():
"""Test the auto add policy."""
with SshTransport(machine='localhost', timeout=30, load_system_host_keys=True, key_policy='AutoAddPolicy'):
pass

@staticmethod
def test_proxy_jump():
"""Test the connection with a proxy jump or several"""
with SshTransport(
machine='localhost',
proxy_jump='localhost',
timeout=30,
load_system_host_keys=True,
key_policy='AutoAddPolicy'
):
pass
def test_proxy_jump():
"""Test the connection with a proxy jump or several"""
with SshTransport(
machine='localhost', proxy_jump='localhost', timeout=30, load_system_host_keys=True, key_policy='AutoAddPolicy'
):
pass

# kind of pointless, but should work and to check that proxy chaining works
with SshTransport(
machine='localhost',
proxy_jump='localhost,localhost,localhost',
timeout=30,
load_system_host_keys=True,
key_policy='AutoAddPolicy'
):
pass
# kind of pointless, but should work and to check that proxy chaining works
with SshTransport(
machine='localhost',
proxy_jump='localhost,localhost,localhost',
timeout=30,
load_system_host_keys=True,
key_policy='AutoAddPolicy'
):
pass

def test_proxy_jump_invalid(self):
"""Test proper error reporting when invalid host as a proxy"""

# import is also that when Python is running with debug warnings `-Wd`
# no unclosed files are reported.
with self.assertRaises(paramiko.SSHException):
with SshTransport(
machine='localhost',
proxy_jump='localhost,nohost',
timeout=30,
load_system_host_keys=True,
key_policy='AutoAddPolicy'
):
pass

@staticmethod
def test_proxy_command():
"""Test the connection with a proxy command"""

def test_proxy_jump_invalid():
"""Test proper error reporting when invalid host as a proxy"""

# import is also that when Python is running with debug warnings `-Wd`
# no unclosed files are reported.
with pytest.raises(paramiko.SSHException):
with SshTransport(
machine='localhost',
proxy_command='ssh -W localhost:22 localhost',
proxy_jump='localhost,nohost',
timeout=30,
load_system_host_keys=True,
key_policy='AutoAddPolicy'
):
pass

def test_no_host_key(self):
"""Test if there is no host key."""
# Disable logging to avoid output during test
logging.disable(logging.ERROR)

with self.assertRaises(paramiko.SSHException):
with SshTransport(machine='localhost', timeout=30, load_system_host_keys=False):
pass
def test_proxy_command():
"""Test the connection with a proxy command"""
with SshTransport(
machine='localhost',
proxy_command='ssh -W localhost:22 localhost',
timeout=30,
load_system_host_keys=True,
key_policy='AutoAddPolicy'
):
pass


def test_no_host_key():
"""Test if there is no host key."""
# Disable logging to avoid output during test
logging.disable(logging.ERROR)

with pytest.raises(paramiko.SSHException):
with SshTransport(machine='localhost', timeout=30, load_system_host_keys=False):
pass

# Reset logging level
logging.disable(logging.NOTSET)
# Reset logging level
logging.disable(logging.NOTSET)


def test_gotocomputer():

Unchanged files with check annotations Beta

if parent_calc and self.metadata.store_provenance:
if isinstance(parent_calc, orm.CalculationNode):
raise exceptions.InvalidOperation('calling processes from a calculation type process is forbidden.')

Check failure on line 715 in aiida/engine/processes/process.py

GitHub Actions / tests (3.6)

calling processes from a calculation type process is forbidden.

Check failure on line 715 in aiida/engine/processes/process.py

GitHub Actions / tests (3.7)

calling processes from a calculation type process is forbidden.

Check failure on line 715 in aiida/engine/processes/process.py

GitHub Actions / tests (3.8)

calling processes from a calculation type process is forbidden.
if isinstance(self.node, orm.CalculationNode):
self.node.base.links.add_incoming(parent_calc, LinkType.CALL_CALC, self.metadata.call_link_label)
_node_class = WorkflowNode
def run(self):
raise RuntimeError('CRASH')

Check failure on line 70 in tests/utils/processes.py

GitHub Actions / tests (3.6)

CRASH

Check failure on line 70 in tests/utils/processes.py

GitHub Actions / tests (3.7)

CRASH

Check failure on line 70 in tests/utils/processes.py

GitHub Actions / tests (3.8)

CRASH
class WaitProcess(Process):
self.pause()
def check(self):
raise RuntimeError('should have been aborted by now')

Check failure on line 1171 in tests/engine/test_work_chain.py

GitHub Actions / tests (3.6)

should have been aborted by now

Check failure on line 1171 in tests/engine/test_work_chain.py

GitHub Actions / tests (3.7)

should have been aborted by now

Check failure on line 1171 in tests/engine/test_work_chain.py

GitHub Actions / tests (3.8)

should have been aborted by now
def test_simple_run(self):
"""
self.pause()
def check(self):
raise RuntimeError('should have been aborted by now')

Check failure on line 1253 in tests/engine/test_work_chain.py

GitHub Actions / tests (3.6)

should have been aborted by now

Check failure on line 1253 in tests/engine/test_work_chain.py

GitHub Actions / tests (3.7)

should have been aborted by now

Check failure on line 1253 in tests/engine/test_work_chain.py

GitHub Actions / tests (3.8)

should have been aborted by now
class MainWorkChain(WorkChain):
return ToContext(child=self.submit(TestWorkChainAbortChildren.SubWorkChain, kill=self.inputs.kill))
def check(self):
raise RuntimeError('should have been aborted by now')

Check failure on line 1267 in tests/engine/test_work_chain.py

GitHub Actions / tests (3.6)

should have been aborted by now

Check failure on line 1267 in tests/engine/test_work_chain.py

GitHub Actions / tests (3.7)

should have been aborted by now

Check failure on line 1267 in tests/engine/test_work_chain.py

GitHub Actions / tests (3.8)

should have been aborted by now
def test_simple_run(self):
"""
# Submitting from within another process requires `self.submit` unless it is a work function, in which case the
# current process in the scope should be an instance of `FunctionProcess`
if is_process_scoped() and not isinstance(Process.current(), FunctionProcess):
raise InvalidOperation('Cannot use top-level `submit` from within another process, use `self.submit` instead')

Check failure on line 98 in aiida/engine/launch.py

GitHub Actions / tests (3.6)

Cannot use top-level `submit` from within another process, use `self.submit` instead

Check failure on line 98 in aiida/engine/launch.py

GitHub Actions / tests (3.7)

Cannot use top-level `submit` from within another process, use `self.submit` instead

Check failure on line 98 in aiida/engine/launch.py

GitHub Actions / tests (3.8)

Cannot use top-level `submit` from within another process, use `self.submit` instead
runner = manager.get_manager().get_runner()
assert runner.persister is not None, 'runner does not have a persister'