diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 926ae8247fbf..3fe456ceb364 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -23,8 +23,7 @@
/corehq/sql_db/ @snopoke @calellowitz
/corehq/sql_proxy_accessors/ @snopoke
/corehq/sql_proxy_standby_accessors/ @snopoke
-/corehq/tests/nose.py @millerdev
-/corehq/tests/noseplugins/ @millerdev
+/corehq/tests/ @millerdev
/corehq/util/couch.py @esoergel
/corehq/util/couch_helpers.py @millerdev
/corehq/util/datadog/lockmeter.py @millerdev
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 796f48577ff7..658db16b1652 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -19,9 +19,9 @@ jobs:
fail-fast: false
matrix:
include:
- - {TEST: python, NOSE_DIVIDED_WE_RUN: '05'}
- - {TEST: python, NOSE_DIVIDED_WE_RUN: '6a'}
- - {TEST: python, NOSE_DIVIDED_WE_RUN: 'bf'}
+ - {TEST: python, DIVIDED_WE_RUN: '05'}
+ - {TEST: python, DIVIDED_WE_RUN: '6a'}
+ - {TEST: python, DIVIDED_WE_RUN: 'bf'}
- {TEST: python-sharded-and-javascript}
env:
DATADOG_API_KEY: ${{ secrets.DATADOG_API_KEY }}
@@ -46,11 +46,11 @@ jobs:
- name: Run tests
env:
TEST: ${{ matrix.TEST }}
- NOSE_DIVIDED_WE_RUN: ${{ matrix.NOSE_DIVIDED_WE_RUN }}
+ DIVIDED_WE_RUN: ${{ matrix.DIVIDED_WE_RUN }}
JS_SETUP: yes
KAFKA_HOSTNAME: kafka
STRIPE_PRIVATE_KEY: ${{ secrets.STRIPE_PRIVATE_KEY }}
- run: scripts/docker test --noinput --stop -v --divided-we-run=${{ matrix.NOSE_DIVIDED_WE_RUN }} --divide-depth=1 --with-timing --with-flaky --threshold=10 --max-test-time=29
+ run: scripts/docker test --exitfirst -vv --reusedb=1 --divided-we-run=${{ matrix.DIVIDED_WE_RUN }} --showlocals --max-test-time=29 -p no:cacheprovider
- name: "Codecov upload"
env:
TOKEN: ${{ secrets.CODECOV_TOKEN }}
diff --git a/.pytest.ini b/.pytest.ini
new file mode 100644
index 000000000000..41809a8e5125
--- /dev/null
+++ b/.pytest.ini
@@ -0,0 +1,49 @@
+[pytest]
+minversion = 8.1
+
+addopts =
+ --strict-markers
+ -pcorehq.tests.pytest_hooks
+ # HQ has its own (incompatible) warnings system
+ -pno:warnings
+markers =
+ es_test: marker for elasticsearch tests
+ sharded: tests to be run against shard databases
+ skip_setup_users: skip user setup in importer tests
+ slow: marks tests as slow (deselect with -m 'not slow')
+
+empty_parameter_set_mark = fail_at_collect
+xfail_strict = true
+
+norecursedirs =
+ .*
+ *.egg
+ artifacts
+ docker
+ git-hooks
+ locale
+ node_modules
+ requirements
+ scripts
+ sharedfiles
+ staticfiles
+
+python_files =
+ test_*.py
+ tests.py
+ */tests/*.py
+
+pythonpath =
+ .
+ corehq/ex-submodules
+ # 'submodules' is for langcodes
+ submodules
+ submodules/commcare-translations
+ submodules/couchdbkit-aggregate
+ submodules/django-digest-src
+ submodules/django-no-exceptions
+ submodules/python-digest
+ submodules/xml2json
+
+required_plugins = pytest-django
+DJANGO_SETTINGS_MODULE = testsettings
diff --git a/DEV_SETUP.md b/DEV_SETUP.md
index 5bd76c3b3538..b016440cd439 100644
--- a/DEV_SETUP.md
+++ b/DEV_SETUP.md
@@ -1038,8 +1038,8 @@ Or, to drop the current test DB and create a fresh one
./manage.py test corehq.apps.app_manager --reusedb=reset
```
-See `corehq.tests.nose.HqdbContext` ([source](corehq/tests/nose.py)) for full
-description of `REUSE_DB` and `--reusedb`.
+See `corehq.tests.pytest_plugins.reusedb` ([source](corehq/tests/pytest_plugins/reusedb.py))
+for full description of `REUSE_DB` and `--reusedb`.
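+
+For example, once the test databases exist, a typical fast iteration loop
+looks like this (using the `REUSE_DB` behavior described above):
+
+```sh
+REUSE_DB=1 ./manage.py test corehq.apps.app_manager
+```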
### Accessing the test shell and database
@@ -1102,21 +1102,21 @@ ignore:unclosed:ResourceWarning'
Personal whitelist items may also be added in localsettings.py.
-### Running tests by tag
+### Running tests by marker
-You can run all tests with a certain tag as follows:
+You can run all tests with a certain marker as follows:
```sh
-./manage.py test --attr=tag
+pytest -m MARKER
```
-Available tags:
+Available markers:
- slow: especially slow tests
- sharded: tests that should get run on the sharded test runner
- es_test: Elasticsearch tests
-See http://nose.readthedocs.io/en/latest/plugins/attrib.html for more details.
+See https://docs.pytest.org/en/stable/example/markers.html for more details.
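+
+For example, to deselect especially slow tests:
+
+```sh
+pytest -m 'not slow'
+```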
### Running on DB tests or Non-DB tests
@@ -1131,7 +1131,7 @@ See http://nose.readthedocs.io/en/latest/plugins/attrib.html for more details.
### Running only failed tests
-See https://github.com/nose-devs/nose/blob/master/nose/plugins/testid.py
+See https://docs.pytest.org/en/stable/how-to/cache.html
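+
+For example, pytest's cache plugin can rerun recent failures:
+
+```sh
+pytest --last-failed   # only tests that failed on the previous run
+pytest --failed-first  # previous failures first, then everything else
+```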
## Javascript tests
diff --git a/corehq/README.rst b/corehq/README.rst
index 4b0a92354f2c..f73042cbf19c 100644
--- a/corehq/README.rst
+++ b/corehq/README.rst
@@ -48,7 +48,7 @@ tabs
Menu structure for CommCare HQ.
tests
Contains a few tests for high-level functionality like locks, as well as tooling to run tests with
- `nose <http://nose.readthedocs.io/>`_, an extension of ``unittest``.
+ `pytest <https://docs.pytest.org/>`_.
toggles.py
Toggles allow limiting functionality based on user or domain. Also see ``ex-submodules/toggle`` and ``corehq.apps.toggle_ui``.
util
diff --git a/corehq/apps/change_feed/tests/test_data_sources.py b/corehq/apps/change_feed/tests/test_data_sources.py
index 2bb249452147..3a5b8000720d 100644
--- a/corehq/apps/change_feed/tests/test_data_sources.py
+++ b/corehq/apps/change_feed/tests/test_data_sources.py
@@ -201,7 +201,17 @@ def sms_data():
@sharded
class DocumentStoreDbTests(TestCase):
- pass
+
+ def test_couch_document_store(self):
+ # this one is not included with generate_cases below because
+ # get_db() should not be called during test collection
+ _test_document_store(
+ self,
+ CouchDocumentStore,
+ (get_db(), 'domain', 'doc_type'),
+ couch_data,
+ '_id',
+ )
def _test_document_store(self, doc_store_cls, doc_store_args, data_context, id_field):
@@ -216,7 +226,6 @@ def _test_document_store(self, doc_store_cls, doc_store_args, data_context, id_f
@generate_cases([
- (CouchDocumentStore, (get_db(), 'domain', 'doc_type'), couch_data, '_id'),
(CaseDocumentStore, ('domain',), case_data, '_id'),
(FormDocumentStore, ('domain',), form_data, '_id'),
(LocationDocumentStore, ('domain',), location_data, 'location_id'),
@@ -224,7 +233,7 @@ def _test_document_store(self, doc_store_cls, doc_store_args, data_context, id_f
(SyncLogDocumentStore, (), synclog_data, '_id'),
(SMSDocumentStore, (), sms_data, '_id'),
], DocumentStoreDbTests)
-def test_documet_store(*args):
+def test_document_store(*args):
_test_document_store(*args)
diff --git a/corehq/apps/es/tests/utils.py b/corehq/apps/es/tests/utils.py
index 15a227cb3d13..e092224802bf 100644
--- a/corehq/apps/es/tests/utils.py
+++ b/corehq/apps/es/tests/utils.py
@@ -98,7 +98,7 @@ def checkQuery(self, query, expected_json, is_raw_query=False, validate_query=Tr
@nottest
def es_test(test=None, requires=None, setup_class=False):
"""Decorator for Elasticsearch tests.
- The decorator sets the ``es_test`` nose attribute and optionally performs
+ The decorator sets the ``es_test`` pytest marker and optionally performs
index setup/cleanup before and after the test(s).
:param test: A test class, method, or function -- only used via the
diff --git a/corehq/apps/es/transient_util.py b/corehq/apps/es/transient_util.py
index 35e2cc16ee73..75dd2b4fce27 100644
--- a/corehq/apps/es/transient_util.py
+++ b/corehq/apps/es/transient_util.py
@@ -88,7 +88,6 @@ def populate_doc_adapter_map():
from pillowtop.tests.utils import TEST_ES_TYPE, TEST_ES_MAPPING, TEST_ES_INDEX
add_dynamic_adapter("PillowTop", TEST_ES_INDEX, TEST_ES_TYPE, TEST_ES_MAPPING)
- import corehq.tests.pytest_compat # noqa: F401 - to be removed after switch to pytest
from corehq.apps.es.tests.utils import TEST_ES_INFO, TEST_ES_MAPPING
add_dynamic_adapter("UtilES", TEST_ES_INFO.alias, TEST_ES_INFO.type,
TEST_ES_MAPPING)
diff --git a/corehq/ex-submodules/casexml/apps/phone/tests/test_restore_user.py b/corehq/ex-submodules/casexml/apps/phone/tests/test_restore_user.py
index 45362bb8c7a5..26ec2bd0f586 100644
--- a/corehq/ex-submodules/casexml/apps/phone/tests/test_restore_user.py
+++ b/corehq/ex-submodules/casexml/apps/phone/tests/test_restore_user.py
@@ -1,6 +1,8 @@
from unittest.mock import Mock, patch
+import pytest
from nose.tools import assert_equal
+from unmagic import use
from corehq.apps.users.models import CommCareUser, DomainMembership, WebUser
from corehq.apps.users.tests.util import patch_user_data_db_layer
@@ -24,11 +26,12 @@ def test_get_commtrack_location_id():
assert_equal(loc_id, '1')
-@patch_user_data_db_layer
-def test_user_types():
- for user, expected_type in [
- (WebUser(), 'web'),
- (CommCareUser(domain=DOMAIN), 'commcare'),
- ]:
+@pytest.mark.parametrize("user, expected_type", [
+ (WebUser(), 'web'),
+ (CommCareUser(domain=DOMAIN), 'commcare'),
+])
+@use("db")
+def test_user_types(user, expected_type):
+ with patch_user_data_db_layer():
user_type = user.to_ota_restore_user(DOMAIN).user_session_data['commcare_user_type']
- yield assert_equal, user_type, expected_type
+ assert_equal(user_type, expected_type)
diff --git a/corehq/ex-submodules/dimagi/utils/tests/test_decorators.py b/corehq/ex-submodules/dimagi/utils/tests/test_decorators.py
index 17e458afac7f..de53b9d7e730 100644
--- a/corehq/ex-submodules/dimagi/utils/tests/test_decorators.py
+++ b/corehq/ex-submodules/dimagi/utils/tests/test_decorators.py
@@ -1,4 +1,4 @@
-import sys
+import inspect
from io import StringIO
from unittest.mock import patch
@@ -15,7 +15,8 @@ def test_profile_decorator():
def func(arg):
args.append(arg)
- with patch.object(sys.stderr, "write", output.write):
+ sys_stderr = inspect.signature(profile).parameters["stream"].default
+ with patch.object(sys_stderr, "write", output.write):
func(1)
eq(args, [1])
eq(output.getvalue(), Regex(r"test_decorators.py:\d+\(func\)"))
diff --git a/corehq/tests/nose.py b/corehq/tests/nose.py
deleted file mode 100644
index fa386433205e..000000000000
--- a/corehq/tests/nose.py
+++ /dev/null
@@ -1,381 +0,0 @@
-"""
-Utilities and plugins for running tests with nose
-
-Django-nose database context to run tests in two phases:
-
- - Stage 1 runs all test that don't require DB access (test that don't inherit
- from TransactionTestCase)
- - Stage 2 runs all DB tests (test that do inherit from TransactionTestCase)
-
-Adapted from testrunner.TwoStageTestRunner
-Based on http://www.caktusgroup.com/blog/2013/10/02/skipping-test-db-creation/
-"""
-import logging
-import os
-import sys
-import threading
-from fnmatch import fnmatch
-
-from django.conf import settings
-from django.core import cache
-from django.core.management import call_command
-from django.db.backends.base.creation import TEST_DATABASE_PREFIX
-from django.db.utils import OperationalError
-from django.test.utils import get_unique_databases_and_mirrors
-
-from couchdbkit import ResourceNotFound
-from couchdbkit.ext.django import loading
-from django_nose.plugin import DatabaseContext
-from unittest.mock import Mock, patch
-from nose.plugins import Plugin
-from nose.tools import nottest
-from requests.exceptions import HTTPError
-
-from dimagi.utils.parsing import string_to_boolean
-
-from corehq.apps.es.client import manager as elastic_manager
-from corehq.tests.noseplugins.cmdline_params import CmdLineParametersPlugin
-from corehq.util.couchdb_management import couch_config
-from corehq.util.test_utils import timelimit, unit_testing_only
-
-log = logging.getLogger(__name__)
-
-
-class HqTestFinderPlugin(Plugin):
- """Find tests in all modules within "tests" packages"""
-
- enabled = True
-
- INCLUDE_DIRS = [
- "corehq/ex-submodules/*",
- "submodules/dimagi-utils-src",
- "submodules/django-digest-src",
- "submodules/toggle",
- "extensions/*/*",
- "custom",
- ]
-
- def options(self, parser, env):
- """Avoid adding a ``--with`` option for this plugin."""
-
- def configure(self, options, conf):
- # do not call super (always enabled)
-
- import corehq
- abspath = os.path.abspath
- dirname = os.path.dirname
- self.hq_root = dirname(dirname(abspath(corehq.__file__)))
-
- @staticmethod
- def pathmatch(path, pattern):
- """Test if globbing pattern matches path
-
- >>> join = os.path.join
- >>> match = HqTestFinderPlugin.pathmatch
- >>> match(join('a', 'b', 'c'), 'a/b/c')
- True
- >>> match(join('a', 'b', 'c'), 'a/b/*')
- True
- >>> match(join('a', 'b', 'c'), 'a/*/c')
- True
- >>> match(join('a'), 'a/*')
- True
- >>> match(join('a', 'b', 'c'), 'a/b')
- >>> match(join('a', 'b', 'c'), 'a/*')
- >>> match(join('a', 'b', 'c'), 'a/*/x')
- False
- >>> match(join('a', 'b', 'x'), 'a/b/c')
- False
- >>> match(join('a', 'x', 'c'), 'a/b')
- False
-
- :returns: `True` if the pattern matches. `False` if it does not
- match. `None` if the match pattern could match, but
- has less elements than the path.
- """
- parts = path.split(os.path.sep)
- patterns = pattern.split("/")
- result = all(fnmatch(part, pat) for part, pat in zip(parts, patterns))
- if len(patterns) >= len(parts):
- return result
- return None if result else False
-
- def wantDirectory(self, directory):
- root = self.hq_root + os.path.sep
- if directory.startswith(root):
- relname = directory[len(root):]
- results = [self.pathmatch(relname, p) for p in self.INCLUDE_DIRS]
- log.debug("want directory? %s -> %s", relname, results)
- if any(results):
- return True
- else:
- log.debug("ignored directory: %s", directory)
- return None
-
- def wantFile(self, path):
- """Want all .py files in .../tests dir (and all sub-packages)"""
- pysrc = os.path.splitext(path)[-1] == ".py"
- if pysrc:
- parent, base = os.path.split(path)
- while base and len(parent) > len(self.hq_root):
- if base == "tests":
- return True
- parent, base = os.path.split(parent)
-
- def wantModule(self, module):
- """Want all modules in "tests" package"""
- return "tests" in module.__name__.split(".")
-
-
-class ErrorOnDbAccessContext(object):
- """Ensure that touching a database raises an error."""
-
- def __init__(self, tests, runner):
- pass
-
- def setup(self):
- """Disable database access"""
- self.original_db_enabled = settings.DB_ENABLED
- settings.DB_ENABLED = False
-
- self.db_patch = patch('django.db.backends.utils.CursorWrapper')
- db_mock = self.db_patch.start()
- error = RuntimeError(
- "Attempt to access database in a 'no database' test suite. "
- "It could be that you don't have 'BASE_ADDRESS' set in your "
- "localsettings.py. If your test really needs database access "
- "it should subclass 'django.test.testcases.TestCase' or a "
- "similar test base class.")
- db_mock.side_effect = error
-
- class CouchSpec(object):
- dbname = None
- view = Mock(return_value=[])
-
- def mock_couch(app):
- dbname = dbs.get(app, main_db_url).rsplit("/", 1)[1]
- return Mock(name=dbname, dbname=dbname, spec_set=CouchSpec)
-
- # register our dbs with the extension document classes
- main_db_url = settings.COUCH_DATABASE
- dbs = dict(settings.COUCHDB_DATABASES)
- self.db_classes = db_classes = []
- for app, value in loading.couchdbkit_handler.app_schema.items():
- for cls in value.values():
- db_classes.append(cls)
- cls.set_db(mock_couch(app))
-
- def teardown(self):
- """Enable database access"""
- settings.DB_ENABLED = self.original_db_enabled
- for cls in self.db_classes:
- del cls._db
- self.db_patch.stop()
-
-
-class HqdbContext(DatabaseContext):
- """Database setup/teardown
-
- In addition to the normal django database setup/teardown, also
- setup/teardown couch databases. Database setup/teardown may be
- skipped, depending on the presence and value of an environment
- variable (`REUSE_DB`). Typical usage is `REUSE_DB=1` which means
- skip database setup and migrations if possible and do not teardown
- databases after running tests. If connection fails for any test
- database in `settings.DATABASES` all databases will be re-created
- and migrated.
-
- When using REUSE_DB=1, you may also want to provide a value for the
- --reusedb option, either reset, flush, bootstrap, migrate, or teardown.
- ./manage.py test --help will give you a description of these.
- """
-
- def __init__(self, tests, runner):
- reuse_db = (CmdLineParametersPlugin.get('reusedb')
- or string_to_boolean(os.environ.get("REUSE_DB") or "0"))
- self.reuse_db = reuse_db
- self.skip_setup_for_reuse_db = reuse_db and reuse_db != "reset"
- self.skip_teardown_for_reuse_db = reuse_db and reuse_db != "teardown"
- super(HqdbContext, self).__init__(tests, runner)
-
- def should_skip_test_setup(self):
- return CmdLineParametersPlugin.get('collect_only')
-
- @timelimit(480)
- def setup(self):
- if self.should_skip_test_setup():
- return
-
- from corehq.blobs.tests.util import TemporaryFilesystemBlobDB
- self.blob_db = TemporaryFilesystemBlobDB()
- self.old_names = self._get_databases()
-
- if self.skip_setup_for_reuse_db and self._databases_ok():
- if self.reuse_db == "migrate":
- call_command('migrate_multi', interactive=False)
- if self.reuse_db == "flush":
- flush_databases()
- if self.reuse_db == "bootstrap":
- bootstrap_migrated_db_state()
- return # skip remaining setup
-
- if self.reuse_db == "reset":
- self.reset_databases()
-
- print("", file=sys.__stdout__) # newline for creating database message
- if self.reuse_db:
- print("REUSE_DB={} ".format(self.reuse_db), file=sys.__stdout__, end="")
- if self.skip_setup_for_reuse_db:
- # pass this on to the Django runner to avoid creating databases
- # that already exist
- self.runner.keepdb = True
- super(HqdbContext, self).setup()
-
- def reset_databases(self):
- self.delete_couch_databases()
- self.delete_elastic_indexes()
- self.clear_redis()
- # tear down all databases together to avoid dependency issues
- teardown = []
- for connection, db_name, is_first in self.old_names:
- try:
- connection.ensure_connection()
- teardown.append((connection, db_name, is_first))
- except OperationalError:
- pass # ignore databases that don't exist
- self.runner.teardown_databases(reversed(teardown))
-
- def _databases_ok(self):
- for connection, db_name, _ in self.old_names:
- db = connection.settings_dict
- assert db["NAME"].startswith(TEST_DATABASE_PREFIX), db["NAME"]
- try:
- connection.ensure_connection()
- except OperationalError as e:
- print(str(e), file=sys.__stderr__)
- return False
- return True
-
- def _get_databases(self):
- from django.db import connections
- old_names = []
- test_databases, mirrored_aliases = get_unique_databases_and_mirrors()
- assert not mirrored_aliases, "DB mirrors not supported"
- for signature, (db_name, aliases) in test_databases.items():
- alias = list(aliases)[0]
- connection = connections[alias]
- old_names.append((connection, db_name, True))
- return old_names
-
- def delete_couch_databases(self):
- for db in get_all_test_dbs():
- try:
- db.server.delete_db(db.dbname)
- log.info("deleted database %s", db.dbname)
- except ResourceNotFound:
- log.info("database %s not found! it was probably already deleted.",
- db.dbname)
-
- def delete_elastic_indexes(self):
- # corehq.apps.es.client.create_document_adapter uses
- # TEST_DATABASE_PREFIX when constructing test index names
- for index_name in elastic_manager.get_indices():
- if index_name.startswith(TEST_DATABASE_PREFIX):
- elastic_manager.index_delete(index_name)
-
- def clear_redis(self):
- config = settings.CACHES.get("redis", {})
- loc = config.get("TEST_LOCATION")
- if loc:
- redis = cache.caches['redis']
- assert redis.client._server == [loc], (redis.client._server, config)
- redis.clear()
-
- def teardown(self):
- if self.should_skip_test_setup():
- return
-
- self.blob_db.close()
-
- self.delete_elastic_indexes()
-
- if self.skip_teardown_for_reuse_db:
- return
-
- self.delete_couch_databases()
- self.clear_redis()
-
- # HACK clean up leaked database connections
- from corehq.sql_db.connections import connection_manager
- connection_manager.dispose_all()
-
- # in case this was set before we want to remove it now
- self.runner.keepdb = False
-
- # tear down in reverse order
- self.old_names = reversed(self.old_names)
- super(HqdbContext, self).teardown()
-
-
-def print_imports_until_thread_change():
- """Print imports until the current thread changes
-
- This is useful for troubleshooting premature test runner exit
- (often caused by an import when running tests --with-doctest).
- """
- main = threading.current_thread()
- print("setting up import hook on %s" % main, file=sys.__stdout__)
-
- class InfoImporter(object):
-
- def find_module(self, name, path=None):
- thread = threading.current_thread()
- # add code here to check for other things happening on import
- #if name == 'gevent':
- # sys.exit()
- print("%s %s" % (thread, name), file=sys.__stdout__)
- if thread is not main:
- sys.exit()
- return None
-
- # Register the import hook. See https://www.python.org/dev/peps/pep-0302/
- sys.meta_path.append(InfoImporter())
-
-
-@nottest
-@unit_testing_only
-def get_all_test_dbs():
- all_dbs = list(couch_config.all_dbs_by_db_name.values())
- for db in all_dbs:
- if '/test_' not in db.uri:
- raise ValueError("not a test db url: db=%s url=%r" % (db.dbname, db.uri))
- return all_dbs
-
-
-@unit_testing_only
-def flush_databases():
- """
- Best effort at emptying all documents from all databases.
- Useful when you break a test and it doesn't clean up properly. This took
- about 5 seconds to run when trying it out.
- """
- print("Flushing test databases, check yourself before you wreck yourself!", file=sys.__stdout__)
- for db in get_all_test_dbs():
- try:
- db.flush()
- except (ResourceNotFound, HTTPError):
- pass
- call_command('flush', interactive=False)
- bootstrap_migrated_db_state()
-
-
-@unit_testing_only
-def bootstrap_migrated_db_state():
- from corehq.apps.accounting.tests.generator import bootstrap_accounting
- from corehq.apps.smsbillables.tests.utils import bootstrap_smsbillables
- bootstrap_accounting()
- bootstrap_smsbillables()
-
-
-if os.environ.get("HQ_TESTS_PRINT_IMPORTS"):
- print_imports_until_thread_change()
diff --git a/corehq/tests/nosecompat.py b/corehq/tests/nosecompat.py
new file mode 100644
index 000000000000..552b31647e0e
--- /dev/null
+++ b/corehq/tests/nosecompat.py
@@ -0,0 +1,34 @@
+import sys
+
+from testil import assert_raises as _assert_raises
+
+from .tools import nottest as nottest_tool
+
+
+def create_nose_virtual_package():
+ sys.modules['nose.tools'] = VirtualNose.tools
+
+
+class VirtualNose:
+ """Legacy namespace for tests written before pytest"""
+ class tools:
+ nottest = nottest_tool
+ assert_raises = _assert_raises
+
+ def assert_equal(actual, expected):
+ assert actual == expected
+
+ def assert_false(value):
+ assert not value
+
+ def assert_true(value):
+ assert value
+
+ def assert_in(needle, haystack):
+ assert needle in haystack
+
+ def assert_list_equal(actual, expected):
+ assert actual == expected
+
+ def assert_is_none(value):
+ assert value is None
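+
+
+# Illustrative usage: once create_nose_virtual_package() has run (it is
+# invoked during pytest startup by corehq.tests.pytest_hooks), legacy
+# imports continue to work:
+#
+#     from nose.tools import assert_equal
+#     assert_equal(1 + 1, 2)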
diff --git a/corehq/tests/noseplugins/__init__.py b/corehq/tests/noseplugins/__init__.py
deleted file mode 100644
index 26089201516b..000000000000
--- a/corehq/tests/noseplugins/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-import corehq.tests.pytest_compat # noqa: F401
diff --git a/corehq/tests/noseplugins/classcleanup.py b/corehq/tests/noseplugins/classcleanup.py
deleted file mode 100644
index 9363efd02bed..000000000000
--- a/corehq/tests/noseplugins/classcleanup.py
+++ /dev/null
@@ -1,57 +0,0 @@
-"""Call TestCase.doClassCleanups after each test"""
-import sys
-from traceback import print_exception
-
-from nose.plugins import Plugin
-
-
-class ClassCleanupPlugin(Plugin):
- """Call TestCase.doClassCleanups after running tests on a test class
-
- Nose overrides the part of the default Python test suite runner
- that normally calls TestCase.doClassCleanups(). This plugin ensures
- that it gets called.
- """
-
- name = "classcleanup"
- enabled = True
-
- def options(self, parser, env):
- """Do not call super (always enabled)"""
-
- def begin(self):
- self.patch_django_test_case()
-
- def handleError(self, test, err):
- if getattr(test, "error_context", None) in {"setup", "teardown"}:
- self._do_class_cleanups(test.context)
-
- def stopContext(self, context):
- self._do_class_cleanups(context)
-
- def _do_class_cleanups(self, context):
- cleanup = getattr(context, "doClassCleanups", None)
- if cleanup is not None:
- cleanup()
- errors = getattr(context, "tearDown_exceptions", None)
- if errors:
- if len(errors) > 1:
- num = len(errors)
- for n, (exc_type, exc, tb) in enumerate(errors[:-1], start=1):
- print(f"\nclass cleanup error ({n} of {num}):", file=sys.stderr)
- print_exception(exc_type, exc, tb)
- raise errors[-1][1]
-
- def patch_django_test_case(self):
- """Do class cleanups before TestCase transaction rollback"""
- from django.test import TestCase
-
- @classmethod
- def tearDownClass(cls):
- try:
- self._do_class_cleanups(cls)
- finally:
- real_tearDownClass(cls)
-
- real_tearDownClass = TestCase.tearDownClass.__func__
- TestCase.tearDownClass = tearDownClass
diff --git a/corehq/tests/noseplugins/cmdline_params.py b/corehq/tests/noseplugins/cmdline_params.py
deleted file mode 100644
index 20673f7fa272..000000000000
--- a/corehq/tests/noseplugins/cmdline_params.py
+++ /dev/null
@@ -1,43 +0,0 @@
-"""
-A plugin to accept parameters used for various test runner operations.
-"""
-from nose.plugins import Plugin
-
-REUSE_DB_HELP = """
-To be used in conjunction with the environment variable REUSE_DB=1.
-reset: Drop existing test dbs, then create and migrate new ones, but do not
- teardown after running tests. This is convenient when the existing databases
- are outdated and need to be rebuilt.
-flush: Flush all objects from the old test databases before running tests.
- Much faster than `reset`. Also runs `bootstrap` (see below).
-bootstrap: Restore database state such as software plan versions and currencies
- initialized by database migrations. Sometimes when running tests with
- REUSE_DB=1 this state is lost, causing tests that depend on it to fail.
-migrate: Migrate the test databases before running tests.
-teardown: Skip database setup; do normal teardown after running tests.
-"""
-
-
-class CmdLineParametersPlugin(Plugin):
- """Accept and store misc test runner flags for later reference
- """
- name = "cmdline-parameters"
- enabled = True
- parameters = {}
-
- def options(self, parser, env):
- parser.add_option(
- '--reusedb',
- default=None,
- choices=['reset', 'flush', 'bootstrap', 'migrate', 'teardown'],
- help=REUSE_DB_HELP,
- )
- # --collect-only is a built-in option, adding it here causes a warning
-
- def configure(self, options, conf):
- for option in ['reusedb', 'collect_only']:
- type(self).parameters[option] = getattr(options, option)
-
- @classmethod
- def get(cls, parameter):
- return cls.parameters[parameter]
diff --git a/corehq/tests/noseplugins/dbtransaction.py b/corehq/tests/noseplugins/dbtransaction.py
deleted file mode 100644
index e645e107d3ff..000000000000
--- a/corehq/tests/noseplugins/dbtransaction.py
+++ /dev/null
@@ -1,39 +0,0 @@
-from nose.plugins import Plugin
-
-from django.db import connection
-
-
-class DatabaseTransactionPlugin(Plugin):
- """Verify database transaction not in progress before/after test context
-
- A "test context" is a package, module, or test class.
- """
-
- name = "dbtransaction"
- enabled = True
-
- def options(self, parser, env):
- """Do not call super (always enabled)"""
-
- def startContext(self, context):
- check_for_transaction(context)
-
- def stopContext(self, context):
- check_for_transaction(context)
-
- def handleError(self, test, err):
- if getattr(test, "error_context", None) in {"setup", "teardown"}:
- check_for_transaction(err[1])
-
-
-def check_for_transaction(context):
- if connection.in_atomic_block:
- raise UnexpectedTransaction(
- "Was an exception raised in setUpClass() after super().setUpClass() "
- "or in tearDownClass() before super().tearDownClass()? "
- f"Context: {context}"
- )
-
-
-class UnexpectedTransaction(Exception):
- pass
diff --git a/corehq/tests/noseplugins/debug.py b/corehq/tests/noseplugins/debug.py
deleted file mode 100644
index 1cc03b7abad8..000000000000
--- a/corehq/tests/noseplugins/debug.py
+++ /dev/null
@@ -1,57 +0,0 @@
-"""A temporary plugin for debugging tests
-
-This is useful for finding tests that do not cleanup after themselves.
-
-Usage:
-
-- Uncomment 'corehq.tests.noseplugins.debug.DebugPlugin' in testsettings.py
-- Customize DebugPlugin below to inspect state.
-
-Tips:
-
-- Write to `sys.__stdout__` to bypass stdout and logging collector.
-- `afterContext` is run at test collection time, not after teardown.
-- Plugin interface:
- https://nose.readthedocs.org/en/latest/plugins/interface.html
-"""
-import sys
-from nose.plugins import Plugin
-
-
-class DebugPlugin(Plugin):
- """Temporary debugging plugin"""
-
- name = "debug"
- enabled = True
-
- def options(self, parser, env):
- """Avoid adding a ``--with`` option for this plugin."""
-
- def configure(self, options, conf):
- """Do not call super (always enabled)"""
-
-# def prepareTestCase(self, case):
-# from custom.ewsghana.models import FacilityInCharge
-# def audit(result):
-# try:
-# case.test(result)
-# finally:
-# sys.__stdout__.write("{}: {}\n".format(
-# case.test,
-# [f.id for f in FacilityInCharge.objects.all()],
-# ))
-# return audit
-
- def stopContext(self, context):
- from django.contrib.auth.models import User
- num = User.objects.filter(username='user1').count()
- if num:
- sys.__stdout__.write("\n{} {}\n".format(num, context))
-
-# def wantFunction(self, func):
-# """Do not want 'test' functions with required args"""
-# import inspect
-# if "test" in func.__name__ and getattr(func, '__test__', True):
-# spec = inspect.getargspec(func)
-# return len(spec.args) <= len(spec.defaults or [])
-# return None
diff --git a/corehq/tests/noseplugins/dividedwerun.py b/corehq/tests/noseplugins/dividedwerun.py
deleted file mode 100644
index f0d368875366..000000000000
--- a/corehq/tests/noseplugins/dividedwerun.py
+++ /dev/null
@@ -1,172 +0,0 @@
-import logging
-import types
-from hashlib import md5
-from unittest import SkipTest
-
-from nose.case import Test, FunctionTestCase
-from nose.failure import Failure
-from nose.plugins import Plugin
-from nose.suite import ContextSuite
-from nose.tools import nottest
-
-
-log = logging.getLogger(__name__)
-
-
-class DividedWeRunPlugin(Plugin):
- """Run a predictably random subset of tests
-
- Warning: this plugin is not compatible with other plugins that return
- something other than a `ContextSuite` from `prepareTest`.
- """
-
- name = "divided-we-run"
-
- def options(self, parser, env):
- # Do not call super to avoid adding a ``--with`` option for this plugin
- parser.add_option('--divided-we-run',
- default=env.get('NOSE_DIVIDED_WE_RUN'),
- help="Run a predictably random subset of tests based "
- "on test name. The value of this option should "
- "be one or two hexadecimal digits denoting the "
- "first and last bucket to include, where each "
- "bucket is a predictably random hex digit based "
- "on the test (module) path. "
- "[NOSE_DIVIDED_WE_RUN]")
- parser.add_option('--divide-depth',
- default=env.get('NOSE_DIVIDE_DEPTH', '0'),
- help="Number of suite contexts to descend into when "
- "dividing tests. [NOSE_DIVIDE_DEPTH]")
-
- def configure(self, options, conf):
- if options.divided_we_run:
- self.enabled = True
- if len(options.divided_we_run) not in [1, 2]:
- raise ValueError("invalid divided-we-run value: "
- "expected 1 or 2 hexadecimal digits")
- self.divided_we_run = options.divided_we_run
- self.first_bucket = options.divided_we_run[0]
- self.last_bucket = options.divided_we_run[-1]
- self.divide_depth = int(options.divide_depth)
- if int(self.first_bucket, 16) > int(self.last_bucket, 16):
- raise ValueError(
- "divided-we-run range start is after range end")
-
- def prepareTest(self, test):
- return self.skip_out_of_range_tests(test)
-
- def skip_out_of_range_tests(self, test, depth=0):
- if isinstance(test, ContextSuite):
- if depth >= self.divide_depth:
- return self.maybe_skip(test)
- depth += 1 if test.implementsAnyFixture(test.context, None) else 0
- test._tests = [self.skip_out_of_range_tests(case, depth)
- for case in test]
- else:
- test = self.maybe_skip(test)
- return test
-
- def maybe_skip(self, test):
- bucket = get_score(test)
- log.debug("%s divided-we-run=%s bucket=%s",
- name_of(test), self.divided_we_run, bucket)
- if bucket < self.first_bucket or bucket > self.last_bucket:
- def skip():
- raise SkipTest("divided-we-run: {} not in range {}".format(
- bucket, self.divided_we_run))
- if isinstance(test, ContextSuite):
- desc = get_descriptor(test)
- elif isinstance(test.test, Failure):
- return get_descriptive_failing_test(test.test)
- elif test.test.descriptor is None:
- desc = get_descriptor(test)
- else:
- desc = test.test.descriptor
- return Test(
- FunctionTestCase(skip, descriptor=desc),
- getattr(test, 'config', None),
- getattr(test, 'resultProxy', None),
- )
- return test
-
-
-def name_of(test):
- """Returns the full name of the test as a string."""
- if not isinstance(test, ContextSuite):
- return str(test)
- context = test.context
- if isinstance(context, type) or isinstance(context, types.FunctionType):
- return context.__module__ + ":" + context.__name__
- if isinstance(context, types.ModuleType):
- return context.__name__
- return str(context)
-
-
-def get_score(test):
- """Returns the score for a test, which is derived from the MD5 hex digest
- of the test's (possibly truncated) name.
-
- Calls ``name_of(test)`` to acquire the "full name", then truncates that
- value at the first occurrence of an open-parenthesis character (or the
- entire name if none exist) before generating the MD5 digest.
-
- Example:
-
- .. code-block:: python
-
- >>> name_of(test_this)
- 'module.test_func()'
- >>> name_of(test_other)
- 'module.test_func()'
- >>> md5(name_of(test_this)).hexdigest()
- '45fd9a647841b1e65633f332ee5f759b'
- >>> md5(name_of(test_other)).hexdigest()
- 'acf7e690fb7d940bfefec1d06392ee17'
- >>> get_score(test_this)
- 'c'
- >>> get_score(test_other)
- 'c'
- """
- runtime_safe = name_of(test).split("(", 1)[0]
- return md5(runtime_safe.encode('utf-8')).hexdigest()[0]
-
-
-def get_descriptor(test):
- def descriptor():
- raise Exception("unexpected call")
- if hasattr(test.context, "__module__"):
- return test.context
- name = test.context.__name__
- if "." in name:
- name, descriptor.__name__ = name.rsplit(".", 1)
- else:
- descriptor.__name__ = "*"
- descriptor.__module__ = name
- return descriptor
-
-
-@nottest
-def get_descriptive_failing_test(failure_obj):
- """Get descriptive test from failure object
-
- Useful for extracting descriptive details from a test failure that
- occurs during test collection. This can happen when test generator
- function raises an exception such as SkipTest, for example.
- """
- def fail():
- raise failure_obj.exc_val
- tb = failure_obj.tb
- while tb.tb_next is not None:
- tb = tb.tb_next
- frame = tb.tb_frame
- try:
- descriptor = frame.f_globals[frame.f_code.co_name]
- except KeyError:
- def descriptor():
- raise Exception("unexpected call")
- descriptor.__name__ = str(frame)
- return Test(
- FunctionTestCase(fail, descriptor=descriptor),
- getattr(failure_obj, 'config', None),
- getattr(failure_obj, 'resultProxy', None),
- )
diff --git a/corehq/tests/noseplugins/djangomigrations.py b/corehq/tests/noseplugins/djangomigrations.py
deleted file mode 100644
index cec384402e6e..000000000000
--- a/corehq/tests/noseplugins/djangomigrations.py
+++ /dev/null
@@ -1,37 +0,0 @@
-"""A plugin to disable django database migrations (saves a lot of time)
-
-Use --no-migrations to disable django database migrations.
-"""
-from nose.plugins import Plugin
-
-
-class DjangoMigrationsPlugin(Plugin):
- """Run tests without Django database migrations."""
-
- # Inspired by https://gist.github.com/NotSqrt/5f3c76cd15e40ef62d09
- # See also https://github.com/henriquebastos/django-test-without-migrations
-
- name = 'django-migrations'
- enabled = True
-
- def options(self, parser, env):
- # Do not call super to avoid adding a ``--with`` option for this plugin
- parser.add_option('--no-migrations', action='store_true',
- dest='no_migrations',
- default=env.get('NOSE_DISABLE_DJANGO_MIGRATIONS'),
- help='Disable Django database migrations to save a '
- 'lot of time. [NOSE_DISABLE_DJANGO_MIGRATIONS]')
-
- def configure(self, options, conf):
- if options.no_migrations:
- from django.conf import settings
- settings.MIGRATION_MODULES = DisableMigrations()
-
-
-class DisableMigrations(object):
-
- def __contains__(self, item):
- return True
-
- def __getitem__(self, item):
- return "notmigrations"
diff --git a/corehq/tests/noseplugins/elasticsnitch.py b/corehq/tests/noseplugins/elasticsnitch.py
deleted file mode 100644
index c38d70c9f17e..000000000000
--- a/corehq/tests/noseplugins/elasticsnitch.py
+++ /dev/null
@@ -1,69 +0,0 @@
-"""A test timing plugin for nose
-
-Usage: ./manage.py test --with-elasticsnitch --snitch-out=/path/to/elasticsnitch.txt
-"""
-import sys
-
-from nose.plugins import Plugin
-from corehq.tests.noseplugins.uniformresult import uniform_description
-from corehq.apps.es.client import manager
-
-
-class ElasticSnitchPlugin(Plugin):
- """A plugin to snitch on tests that change (add or delete) Elasticsearch
- indexes without cleaning up after themselves (putting the index "state"
- back how they found it).
- """
-
- name = "elasticsnitch"
-
- def options(self, parser, env):
- """Register commandline options."""
- super().options(parser, env)
- parser.add_option("--snitch-out", action="store", metavar="FILE",
- help="Snitch output file (default: STDOUT)")
-
- def configure(self, options, conf):
- """Configure plugin."""
- super().configure(options, conf)
- self.conf = conf
- self.snitch_out = options.snitch_out
- self.prefix = "" if self.snitch_out else f"{self.name}: "
-
- def begin(self):
- self.output = (open(self.snitch_out, "w", encoding="utf-8")
- if self.snitch_out else sys.__stdout__)
-
- def finalize(self, result):
- if self.output is not None and self.output is not sys.__stdout__:
- self.output.close()
-
- def startTest(self, case):
- """Make a record of existing index names.
-
- Called prior to test run:
- - after ``case.setUpClass()``
- - before ``case.setUp()``
- """
- self.start_indexes = get_all_index_names()
-
- def stopTest(self, case):
- """Compare existing index names against pre-test ones. If there are
- differences, write the delta.
-
- Called on test completion:
- - after: case.tearDown()
- - before: case.tearDownClass()
- """
- name = uniform_description(case.test)
- end_indexes = get_all_index_names()
- if not end_indexes ^ self.start_indexes: # XOR
- return
- added = end_indexes - self.start_indexes
- removed = self.start_indexes - end_indexes
- self.output.write(f"{self.prefix}{name}: +{sorted(added)}, -{sorted(removed)}\n")
-
-
-def get_all_index_names():
- """Returns a set of all existing Elasticsearch index names."""
- return set(manager.get_indices())
diff --git a/corehq/tests/noseplugins/logfile.py b/corehq/tests/noseplugins/logfile.py
deleted file mode 100644
index 5c072e801455..000000000000
--- a/corehq/tests/noseplugins/logfile.py
+++ /dev/null
@@ -1,73 +0,0 @@
-"""A plugin to log test failures to a file
-
-This is useful to preserve error output from a test run in a file in
-additon to displaying the output in a terminal. It is also possible to
-to view errors (in the log file) as soon as they occur while running
-very large test suites.
-
-The log file will not be overwritten if the test run completes with no
-errors or failures.
-
-Usage:
-
- ./manage.py test --log-file=test-failures.log
-"""
-import datetime
-import os
-import sys
-try:
- from shlex import quote # py3
-except ImportError:
- from pipes import quote # py2
-from unittest.runner import TextTestResult, _WritelnDecorator
-from nose.plugins import Plugin
-
-
-class LogFilePlugin(Plugin):
- """Log test failures to file"""
-
- name = "log-file"
-
- def options(self, parser, env):
- # Do not call super to avoid adding a ``--with`` option for this plugin
- parser.add_option('--log-file',
- default=env.get('NOSE_LOG_FILE'),
- help="File in which to log test failures. "
- "[NOSE_LOG_FILE]")
-
- def configure(self, options, conf):
- if options.log_file:
- self.enabled = True
- self.log_path = os.path.expanduser(options.log_file)
- self.log_file = None
- self.argv = sys.argv
- self.start = datetime.datetime.now()
-
- def setup_log(self):
- self.log_file = _WritelnDecorator(open(self.log_path, "w"))
- self.log_file.writeln(" ".join(quote(a) for a in self.argv))
- self.log_file.writeln(str(self.start))
- self.result = TextTestResult(self.log_file, True, 0)
-
- def log(self, label, test, err):
- if self.log_file is None:
- self.setup_log()
- if isinstance(err[1], str):
- # Turn value back into an Exception (required in Python 3.x).
- # https://github.com/nose-devs/nose/blob/7c26ad1e6b/nose/proxy.py#L90-L95
- value = type(err[0].__name__, (Exception,), {})(err[1])
- err = (err[0], value, err[2])
- err_string = self.result._exc_info_to_string(err, test)
- self.result.printErrorList(label, [(test, err_string)])
- self.log_file.flush()
-
- def addError(self, test, err):
- self.log("ERROR", test, err)
-
- def addFailure(self, test, err):
- self.log("FAIL", test, err)
-
- def finalize(self, result):
- if self.log_file is not None:
- self.log_file.writeln(str(datetime.datetime.now()))
- self.log_file.close()
diff --git a/corehq/tests/noseplugins/output.py b/corehq/tests/noseplugins/output.py
deleted file mode 100644
index f27ca9435f22..000000000000
--- a/corehq/tests/noseplugins/output.py
+++ /dev/null
@@ -1,44 +0,0 @@
-"""Print collected test output for passing tests
-
-Usage: ./manage.py test --with-output
-"""
-from nose.plugins import Plugin
-from nose.plugins.capture import Capture
-from nose.plugins.logcapture import LogCapture
-
-from corehq.tests.noseplugins.uniformresult import uniform_description
-
-
-class OutputPlugin(Plugin):
- """Print collected test output for passing tests"""
-
- name = "output"
-
- def configure(self, options, conf):
- super(OutputPlugin, self).configure(options, conf)
- if self.enabled:
- self.output = []
- # monkey-patch plugins to grab captured output
- Capture.addSuccess = addSuccess
- LogCapture.addSuccess = addSuccess
-
- def startTest(self, case):
- case.__output = []
-
- def stopTest(self, case):
- if case.__output:
- name = uniform_description(case.test)
- self.output.extend(["=" * 70, "PASS: " + name])
- self.output.extend(case.__output)
-
- def report(self, stream):
- for line in self.output:
- stream.writeln(line)
-
-
-def addSuccess(self, test):
- err = (None, None, None)
- output = self.formatError(test, err)
- if output is not err:
- output = output[1].split('\n', 1)[1]
- test._OutputPlugin__output.append(output)
diff --git a/corehq/tests/noseplugins/patches.py b/corehq/tests/noseplugins/patches.py
deleted file mode 100644
index 480c2cc9f50c..000000000000
--- a/corehq/tests/noseplugins/patches.py
+++ /dev/null
@@ -1,38 +0,0 @@
-from nose.plugins import Plugin
-
-from corehq.apps.domain.tests.test_utils import patch_domain_deletion
-from corehq.form_processor.tests.utils import patch_testcase_databases
-from corehq.util.es.testing import patch_es_user_signals
-from corehq.util.test_utils import patch_foreign_value_caches
-
-
-class PatchesPlugin(Plugin):
- """Patches various things before tests are run"""
- name = "patches"
- enabled = True
-
- def options(self, parser, env):
- """Do not call super (always enabled)"""
-
- def begin(self):
- patch_assertItemsEqual()
- patch_testcase_databases()
- extend_freezegun_ignore_list()
- patch_es_user_signals()
- patch_foreign_value_caches()
- patch_domain_deletion()
-
-
-def patch_assertItemsEqual():
- import unittest
- unittest.TestCase.assertItemsEqual = unittest.TestCase.assertCountEqual
-
-
-GLOBAL_FREEZEGUN_IGNORE_LIST = ["kafka."]
-
-
-def extend_freezegun_ignore_list():
- """Extend the freezegun ignore list"""
- import freezegun
-
- freezegun.configure(extend_ignore_list=GLOBAL_FREEZEGUN_IGNORE_LIST)
diff --git a/corehq/tests/noseplugins/timing.py b/corehq/tests/noseplugins/timing.py
deleted file mode 100644
index 50ba978cd500..000000000000
--- a/corehq/tests/noseplugins/timing.py
+++ /dev/null
@@ -1,195 +0,0 @@
-"""A test timing plugin for nose
-
-Usage: ./manage.py test --with-timing --timing-file=/path/to/timing.csv
-"""
-import csv
-import sys
-import time
-from functools import partial
-from unittest.mock import patch
-
-from nose.plugins import Plugin
-from nose.tools import nottest
-from corehq.tests.noseplugins.uniformresult import uniform_description
-
-
-class TimingPlugin(Plugin):
- """A plugin to measure times of testing events
-
- Measure elapsed time before setup, during setup, during test, and
- during teardown events. Outputs the results as CSV.
- """
- name = "timing"
-
- def options(self, parser, env):
- """Register commandline options.
- """
- super(TimingPlugin, self).options(parser, env)
- parser.add_option('--timing-file', action='store',
- dest='timing_file',
- metavar="FILE",
- default=env.get('NOSE_TIMING_FILE'),
- help='Timing output file (CSV); default is STDOUT')
- parser.add_option('--pretty-timing', action='store_true',
- dest='pretty_output',
- default=env.get('NOSE_PRETTY_TIMING'),
- help='Print timing info in a format that is better '
- 'for reviewing in text mode (not CSV).')
- parser.add_option('--threshold', type=int,
- default=env.get('NOSE_TIMING_THRESHOLD'),
- help='Only print timing info for events above this threshold (seconds).')
- parser.add_option('--max-test-time', type=float, dest="max_test_time",
- default=env.get('NOSE_TIMING_MAX_TEST_TIME'),
- help='Fail test if it runs for longer than this limit (seconds). '
- 'Use `corehq.util.test_utils.timelimit` to '
- 'override the time limit for individual tests.')
-
- def configure(self, options, conf):
- """Configure plugin.
- """
- super(TimingPlugin, self).configure(options, conf)
- self.conf = conf
- self.timing_file = options.timing_file
- self.pretty_output = options.pretty_output
- self.threshold = options.threshold
- self.max_test_time = options.max_test_time
- if self.max_test_time is not None:
- assert self.max_test_time > (self.threshold or 0), \
- "--max-test-time must be greater than --threshold"
- print(f"max test time: {self.max_test_time}")
-
- def begin(self):
- self.output = (open(self.timing_file, "w", encoding='utf-8')
- if self.timing_file else sys.__stdout__)
- if not self.pretty_output:
- self.csv = csv.writer(self.output)
- self.csv.writerow(["event", "name", "elapsed time", "start time"])
- self.event_start = time.time()
- global PLUGIN_INSTANCE
- self.old_plugin_instance = PLUGIN_INSTANCE
- PLUGIN_INSTANCE = self
-
- def finalize(self, result):
- if self.output is not None and self.output is not sys.__stdout__:
- self.output.close()
- global PLUGIN_INSTANCE
- PLUGIN_INSTANCE = self.old_plugin_instance
-
- def end_event(self, event, context):
- now = time.time()
- name = uniform_description(context)
- duration = now - self.event_start
- if self.threshold and duration < self.threshold:
- self.event_start = now
- return
-
- if self.pretty_output:
- self.output.write("{time:>-6,.2f} {event} {name}\n".format(
- event=event,
- name=name,
- time=duration,
- ))
- else:
- self.csv.writerow([
- event,
- name,
- duration,
- self.event_start,
- ])
- self.event_start = now
-
- if self.max_test_time is not None:
- limit = max(self.__dict__.pop("time_limit", 0), self.max_test_time)
- if duration > limit:
- raise AssertionError(f"{event} time limit ({limit}) exceeded: {duration}")
-
- def startContext(self, context):
- # called before context setup
- self.end_event("before", context)
-
- def startTest(self, case):
- # called before test is started
- self.end_event("setup", case.test)
-
- def stopTest(self, case):
- # called on test completion
- self.end_event("run", case.test)
-
- def stopContext(self, context):
- # called after context teardown
- self.end_event("teardown", context)
-
-
-class FakePlugin:
- """Allow (no-op) plugin manipulation while plugin is inactive"""
- enabled = False
- max_test_time = None
-
- def end_event(name, context):
- pass
-
-
-PLUGIN_INSTANCE = FakePlugin
-
-
-def end_event(name, context):
- """Signal the end of a custom timing event
-
- Use to add arbitrary "events" anywhere in the code to isolate
- sources of slowness during profiling. This function terminates the
- given event name and immediately begins the next (as yet unnamed)
- event. Requires the `TimingPlugin` must to be enabled.
- """
- PLUGIN_INSTANCE.end_event(name, context)
-
-
-def add_time_limit(limit):
- """Add time limit on current test event
-
- Extend the existing limit if a limit is already set for the current phase.
-
- :param limit: Number of seconds.
- """
- plugin = PLUGIN_INSTANCE
- if plugin.enabled and plugin.max_test_time is not None:
- plugin.time_limit = getattr(plugin, "time_limit", 0) + limit
-
-
-@nottest
-def patch_max_test_time(limit):
- """Temporarily override test time limit (--max-test-time)
-
- Note: this is only useful when spanning multiple test events because
- the limit must be present at the _end_ of a test event to take
- effect. Therefore it will do nothing if used within the context of a
- single test. It also does not affect the time limit on the final
- teardown fixture (in which the patch is removed).
- """
- patch_obj = patch(f"{__name__}.PLUGIN_INSTANCE.max_test_time", limit)
- patch_obj.decorate_class = partial(apply_fixture_patch, patch_obj)
- return patch_obj
-
-
-def apply_fixture_patch(patch_obj, cls):
- """Apply patch on setup class and remove on teardown class
-
- A `patch` is normally applied to a class by decorating each of the
- class's methods, which means that the patch is not in place between
- method calls. This applies the patch on `setUpClass` and removes it
- on `tearDownClass`.
- """
- def setUpClass():
- patch_obj.start()
- real_setup()
-
- def tearDownClass():
- try:
- real_teardown()
- finally:
- patch_obj.stop()
-
- real_setup = cls.setUpClass
- real_teardown = cls.tearDownClass
- cls.setUpClass = setUpClass
- cls.tearDownClass = tearDownClass
- return cls
diff --git a/corehq/tests/noseplugins/uniformresult.py b/corehq/tests/noseplugins/uniformresult.py
deleted file mode 100644
index 55361357c530..000000000000
--- a/corehq/tests/noseplugins/uniformresult.py
+++ /dev/null
@@ -1,56 +0,0 @@
-r"""A plugin to format test names uniformly for easy comparison
-
-Usage:
-
- # collect django tests
- COLLECT_ONLY=1 ./manage.py test -v2 --settings=settings 2> tests-django.txt
-
- # collect nose tests
- ./manage.py test -v2 --collect-only 2> tests-nose.txt
-
- # clean up django test output: s/skipped\ \'.*\'$/ok/
- # sort each output file
- # diff tests-django.txt tests-nose.txt
-"""
-from inspect import isfunction
-from types import ModuleType
-
-from nose.case import FunctionTestCase
-from nose.plugins import Plugin
-
-
-def uniform_description(test):
- if type(test).__name__ == "DocTestCase":
- return test._dt_test.name
- if isinstance(test, ModuleType):
- return test.__name__
- if isinstance(test, type) or isfunction(test):
- return "%s:%s" % (test.__module__, test.__name__)
- if isinstance(test, FunctionTestCase):
- descriptor = test.descriptor or test.test
- return "%s:%s %s" % (
- descriptor.__module__,
- descriptor.__name__,
- test.arg,
- )
- name = "%s:%s.%s" % (
- test.__module__,
- type(test).__name__,
- test._testMethodName
- )
- return name
- #return sys.modules[test.__module__].__file__
-
-
-class UniformTestResultPlugin(Plugin):
- """Format test descriptions for easy comparison
- """
-
- name = "uniform-results"
- enabled = True
-
- def configure(self, options, conf):
- """Do not call super (always enabled)"""
-
- def describeTest(self, test):
- return uniform_description(test.test)
diff --git a/corehq/tests/pytest_compat.py b/corehq/tests/pytest_compat.py
deleted file mode 100644
index 94ed0ae88625..000000000000
--- a/corehq/tests/pytest_compat.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import sys
-from functools import wraps
-
-
-def _install_pytest_compat():
- assert 'pytest' not in sys.modules, "Already installed or a real pytest is present"
- sys.modules['pytest'] = sys.modules[__name__]
-
-
-class Marker:
-
- def __getattr__(self, name):
- def set_attribute(test_obj):
- setattr(test_obj, name, True)
- return test_obj
- return set_attribute
-
- def parametrize(self, arg_names, args_tuples):
- def parametrize(func):
- @wraps(func)
- def test():
- test_tuple = (func,)
- test = (func,)
- for args in args_tuples:
- if "," not in arg_names and (
- not isinstance(args, tuple)
- or len(args) != 1
- ):
- args = (args,)
- elif isinstance(args, list) and len(args) == arg_names.count(",") + 1:
- args = tuple(args)
- try:
- test_tuple = test + args
- except TypeError:
- raise TypeError(f"could not construct test tuple for {func} with args {args!r}")
- yield test_tuple
- return test
- return parametrize
-
-
-mark = Marker()
-
-_install_pytest_compat()
diff --git a/corehq/tests/pytest_hooks.py b/corehq/tests/pytest_hooks.py
new file mode 100644
index 000000000000..24c3b87529f1
--- /dev/null
+++ b/corehq/tests/pytest_hooks.py
@@ -0,0 +1,86 @@
+import os
+from pathlib import Path
+
+import pytest
+from unmagic import fence
+
+from ddtrace.internal.module import ModuleWatchdog
+if ModuleWatchdog.is_installed():
+ # Remove ddtrace cruft from tracebacks. ModuleWatchdog is installed
+ # unconditionally when ddtrace is imported, which happens early
+ # in pytest startup because of ddtrace's pytest11 entry point(s).
+ ModuleWatchdog.uninstall()
+
+pytest_plugins = [
+ 'unmagic',
+ 'corehq.tests.pytest_plugins.dividedwerun',
+ 'corehq.tests.pytest_plugins.timelimit',
+ 'corehq.tests.pytest_plugins.patches',
+ 'corehq.tests.pytest_plugins.redislocks',
+ 'corehq.tests.pytest_plugins.reusedb',
+]
+
+
+@pytest.hookimpl(tryfirst=True)
+def pytest_load_initial_conftests():
+ assert not hasattr(pytest_load_initial_conftests, 'loaded'), "Already loaded"
+ pytest_load_initial_conftests.loaded = True
+ os.environ.setdefault('CCHQ_TESTING', '1')
+
+ from manage import init_hq_python_path, run_patches
+ init_hq_python_path()
+ run_patches()
+
+ from corehq.warnings import configure_warnings
+ configure_warnings(is_testing=True)
+
+ from .nosecompat import create_nose_virtual_package
+ create_nose_virtual_package()
+
+
+def pytest_pycollect_makeitem(collector, name, obj):
+ """Fail on common mistake that results in masked tests"""
+ if (
+ "Test" in name
+ and not isinstance(obj, type)
+ and isinstance(wrapped := _get_wrapped(obj), type)
+ and any(n.startswith("test_") for n in dir(wrapped))
+ ):
+ return pytest.fail(
+ f"{obj.__module__}.{name} appears to be a test class that has "
+ "been wrapped with a decorator that masks its tests."
+ )
+ return None
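+
+# Illustrative example of the mistake guarded against above (names are
+# hypothetical): a decorator that replaces a test class with a wrapper
+# function, hiding its test_* methods from collection:
+#
+#     @some_decorator  # returns a function whose __wrapped__ is TestThings
+#     class TestThings(SimpleTestCase):
+#         def test_feature(self):
+#             ...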
+
+
+def pytest_report_teststatus(report, config):
+ """Show skipped reason when verbosity >= 2 (-vv)"""
+ if report.when in ("setup", "teardown") and report.skipped:
+ if config.get_verbosity() >= 2:
+ reason = report.longrepr[-1].removeprefix("Skipped: ")
+ return "skipped", "s", f"SKIPPED {reason}"
+ return None
+
+
+def _get_wrapped(obj):
+ while hasattr(obj, "__wrapped__"):
+ obj = obj.__wrapped__
+ return obj
+
+
+def _dirset(path):
+ return {p.name for p in path.iterdir() if p.is_dir()}
+
+
+_ROOT = Path(__file__).parent.parent.parent
+fence.install({
+ "corehq",
+ "couchdbkit_aggregate",
+ "django_digest",
+ "langcodes",
+ "no_exceptions",
+ "python_digest",
+ "test",
+ "testapps",
+ "xml2json",
+} | _dirset(_ROOT / "custom") | _dirset(_ROOT / "corehq/ex-submodules"))
diff --git a/corehq/tests/pytest_plugins/__init__.py b/corehq/tests/pytest_plugins/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/corehq/tests/pytest_plugins/dividedwerun.py b/corehq/tests/pytest_plugins/dividedwerun.py
new file mode 100644
index 000000000000..9d13a8f2eaa4
--- /dev/null
+++ b/corehq/tests/pytest_plugins/dividedwerun.py
@@ -0,0 +1,95 @@
+import os
+from hashlib import md5
+from unittest import TestCase
+
+import pytest
+from django.test import SimpleTestCase
+
+
+def pytest_addoption(parser):
+ parser.addoption('--divided-we-run',
+ default=os.environ.get('DIVIDED_WE_RUN'),
+ help="Run a predictably random subset of tests based "
+ "on test name. The value of this option should "
+ "be one or two hexadecimal digits denoting the "
+ "first and last bucket to include, where each "
+ "bucket is a predictably random hex digit based "
+ "on the fully qualified test name. "
+ "[DIVIDED_WE_RUN]")
+
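+# Example invocations (mirroring the CI matrix in .github/workflows/tests.yml):
+#
+#     pytest --divided-we-run=05   # buckets 0 through 5
+#     pytest --divided-we-run=6a   # buckets 6 through a
+#     pytest --divided-we-run=bf   # buckets b through f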
+
+def pytest_configure(config):
+ config.reuse_db = reusedb = config.getoption("--reusedb")
+ config.skip_setup_for_reuse_db = reusedb and reusedb != "reset"
+ test_range = config.getoption("--divided-we-run")
+ if test_range:
+ if len(test_range) not in [1, 2]:
+ raise ValueError("invalid divided-we-run value: "
+ "expected 1 or 2 hexadecimal digits")
+ config.divided_we_run = test_range
+ first, last = test_range[0], test_range[-1]
+ if int(first, 16) > int(last, 16):
+ raise ValueError("divided-we-run range start is after range end")
+ config.divided_we_run_range = first, last
+
+
+def pytest_collection_modifyitems(config, items):
+ if not hasattr(config, "divided_we_run"):
+ return
+ rng = config.divided_we_run
+ skip = {bucket: pytest.mark.skip(
+ reason=f"divided-we-run: {bucket!r} not in range {rng!r}"
+ ) for bucket in "0123456789abcdef"}
+ first, last = config.divided_we_run_range
+ for item in items:
+ bucket = get_score(item)
+ if bucket < first or bucket > last:
+ item.add_marker(skip[bucket])
+
+
+def name_of(test):
+ if hasattr(test.module, "setup_module"):
+ # group all tests with module-level setup
+ return test.module.__name__
+ if hasattr(test.cls, 'setUpClass'):
+ # group all tests with non-simple setUpClass
+ setupclass = get_setupclass(test.cls)
+ if not (
+ setupclass is get_setupclass(TestCase)
+ or setupclass is get_setupclass(SimpleTestCase)
+ ):
+ return "::".join([test.module.__name__, test.cls.__name__])
+ return test.nodeid
+
+
+def get_setupclass(cls):
+ return cls.setUpClass.__func__
+
+
+def get_score(test):
+ """Returns the score for a test, which is derived from the MD5 hex digest
+ of the test's (possibly truncated) name.
+
+ Calls ``name_of(test)`` to acquire the "full name", then truncates that
+    value at the first occurrence of an open-bracket character ``[`` (or
+    uses the entire name if none exists) before generating the MD5 digest.
+
+ Example:
+
+ .. code-block:: python
+
+        >>> name_of(test_this)
+        'module.test_func[this]'
+        >>> name_of(test_that)
+        'module.test_func[that]'
+ >>> md5(name_of(test_this)).hexdigest()
+ '45fd9a647841b1e65633f332ee5f759b'
+ >>> md5(name_of(test_that)).hexdigest()
+ 'acf7e690fb7d940bfefec1d06392ee17'
+ >>> get_score(test_this)
+ 'c'
+ >>> get_score(test_that)
+ 'c'
+ """
+ runtime_safe = name_of(test).split("[", 1)[0]
+ return md5(runtime_safe.encode('utf-8')).hexdigest()[0]
diff --git a/corehq/tests/pytest_plugins/patches.py b/corehq/tests/pytest_plugins/patches.py
new file mode 100644
index 000000000000..f312f538d7de
--- /dev/null
+++ b/corehq/tests/pytest_plugins/patches.py
@@ -0,0 +1,75 @@
+import pytest
+
+
+@pytest.hookimpl
+def pytest_sessionstart():
+ from corehq.apps.domain.tests.test_utils import patch_domain_deletion
+ from corehq.form_processor.tests.utils import patch_testcase_databases
+ from corehq.util.es.testing import patch_es_user_signals
+ from corehq.util.test_utils import patch_foreign_value_caches
+
+ patch_unittest_TestCase_doClassCleanup()
+ patch_django_test_case()
+ patch_assertItemsEqual()
+ patch_testcase_databases()
+ extend_freezegun_ignore_list()
+ patch_es_user_signals()
+ patch_foreign_value_caches()
+ patch_domain_deletion()
+
+
+def patch_unittest_TestCase_doClassCleanup():
+ """Raise/print errors caught during class cleanup
+
+ pytest ignores `TestCase.tearDown_exceptions`, which causes them to
+ pass silently. Can be removed once on a version of pytest that
+ includes https://github.com/pytest-dev/pytest/pull/12250
+ """
+
+ @classmethod
+ def doClassCleanupAndRaiseLastError(cls):
+ doClassCleanups()
+ errors = cls.tearDown_exceptions
+ if errors:
+ if len(errors) > 1:
+ num = len(errors)
+ for n, (exc_type, exc, tb) in enumerate(errors[:-1], start=1):
+ print(f"\nclass cleanup error ({n} of {num}):", file=sys.stderr)
+ print_exception(exc_type, exc, tb)
+ raise errors[-1][1]
+
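+    # imported below so they are closure variables of
+    # doClassCleanupAndRaiseLastError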
+ import sys
+ from traceback import print_exception
+ from unittest.case import TestCase
+ doClassCleanups = TestCase.doClassCleanups
+ TestCase.doClassCleanups = doClassCleanupAndRaiseLastError
+
+
+def patch_django_test_case():
+ """Do class cleanups before TestCase transaction rollback"""
+ from django.test import TestCase
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ cls.doClassCleanups()
+ finally:
+ real_tearDownClass(cls)
+
+ real_tearDownClass = TestCase.tearDownClass.__func__
+ TestCase.tearDownClass = tearDownClass
+
+
+def patch_assertItemsEqual():
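+    # assertItemsEqual was the Python 2 name of assertCountEqual;
+    # restore it as an alias for legacy tests that still use it.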
+ import unittest
+ unittest.TestCase.assertItemsEqual = unittest.TestCase.assertCountEqual
+
+
+GLOBAL_FREEZEGUN_IGNORE_LIST = ["kafka."]
+
+
+def extend_freezegun_ignore_list():
+ """Extend the freezegun ignore list"""
+ import freezegun
+
+ freezegun.configure(extend_ignore_list=GLOBAL_FREEZEGUN_IGNORE_LIST)
diff --git a/corehq/tests/noseplugins/redislocks.py b/corehq/tests/pytest_plugins/redislocks.py
similarity index 52%
rename from corehq/tests/noseplugins/redislocks.py
rename to corehq/tests/pytest_plugins/redislocks.py
index 8b3cbcce891f..f614368ed512 100644
--- a/corehq/tests/noseplugins/redislocks.py
+++ b/corehq/tests/pytest_plugins/redislocks.py
@@ -1,48 +1,47 @@
+"""A plugin that causes blocking redis locks to error on lock timeout"""
import logging
from datetime import datetime
import attr
-from nose.tools import nottest
-from nose.plugins import Plugin
-
-import dimagi.utils.couch
-from dimagi.utils.couch.cache.cache_core import get_redis_client
+import pytest
from ..locks import TestRedisClient
+from ..tools import nottest
log = logging.getLogger(__name__)
-class RedisLockTimeoutPlugin(Plugin):
- """A plugin that causes blocking redis locks to error on lock timeout"""
- name = "test-redis-locks"
- enabled = True
+@pytest.hookimpl
+def pytest_sessionstart():
+ import dimagi.utils.couch
- def configure(self, options, conf):
- """Do not call super (always enabled)"""
- self.get_client = TestRedisClient(get_test_lock)
+ global get_client
+ get_client = TestRedisClient(get_test_lock)
- def begin(self):
- """Patch redis client used for locks before any tests are run
+ # Patch redis client used for locks before any tests are run.
+ # The patch will remain in effect for the duration of the test
+ # process. Tests (e.g., using `reentrant_redis_locks`) may
+ # override this patch temporarily on an as-needed basis.
+ dimagi.utils.couch.get_redis_client = get_client
- The patch will remain in effect for the duration of the test
- process. Tests (e.g., using `reentrant_redis_locks`) may
- override this patch temporarily on an as-needed basis.
- """
- dimagi.utils.couch.get_redis_client = self.get_client
- def stopTest(self, case):
- get = dimagi.utils.couch.get_redis_client
- assert get == self.get_client, f"redis client patch broke ({get})"
+@pytest.hookimpl(wrapper=True)
+def pytest_runtest_teardown():
+ result = yield
+ import dimagi.utils.couch
+ get = dimagi.utils.couch.get_redis_client
+ assert get == get_client, f"redis client patch broke ({get})"
+ return result
-@nottest
def get_test_lock(key, **kw):
+ from dimagi.utils.couch.cache.cache_core import get_redis_client
timeout = kw["timeout"]
lock = get_redis_client().lock(key, **kw)
return TestLock(key, lock, timeout)
+@nottest
@attr.s
class TestLock:
name = attr.ib()
diff --git a/corehq/tests/pytest_plugins/reusedb.py b/corehq/tests/pytest_plugins/reusedb.py
new file mode 100644
index 000000000000..bcc632b91fd7
--- /dev/null
+++ b/corehq/tests/pytest_plugins/reusedb.py
@@ -0,0 +1,446 @@
+"""Reuse databases between test runs to save setup/teardown time.
+
+In addition to the normal django database setup/teardown, also
+setup/teardown Couch and Elasticsearch databases. Database setup/
+teardown may be skipped, depending on the presence and value of an
+environment variable (`REUSE_DB`) or option (`--reusedb`). Typical
+usage is `REUSE_DB=1` which means skip database setup and migrations
+if possible and do not tear down databases after running tests. If
+connection fails for any test database in `settings.DATABASES`, all
+databases will be re-created and migrated.
+
+The `REUSE_DB` environment variable may be overridden with
+`--reusedb` option passed on the command line.
+"""
+import logging
+import os
+import sys
+from contextlib import ExitStack, contextmanager, nullcontext
+from functools import partial
+from unittest.mock import Mock, patch
+
+import pytest
+from pytest_django import fixtures as django_fixtures
+from pytest_django import plugin as django_plugin
+from pytest_django.plugin import DjangoDbBlocker, blocking_manager_key
+from unmagic import get_request, use
+
+from django.conf import settings
+from django.core import cache
+from django.core.management import call_command
+from django.db.backends.base.creation import TEST_DATABASE_PREFIX
+from django.db.utils import OperationalError
+from django.test import utils as djutils
+from django.test.utils import get_unique_databases_and_mirrors
+
+from couchdbkit import ResourceNotFound
+from requests.exceptions import HTTPError
+
+from corehq.util.test_utils import timelimit, unit_testing_only
+
+from .util import override_fixture
+from ..tools import nottest
+
+log = logging.getLogger(__name__)
+
+REUSE_DB_HELP = """
+To be used in conjunction with the environment variable REUSE_DB=1.
+reset: Drop existing test dbs, then create and migrate new ones, but do not
+    tear down after running tests. This is convenient when the existing databases
+ are outdated and need to be rebuilt.
+flush: Flush all objects from the old test databases before running tests.
+ Much faster than `reset`. Also runs `bootstrap` (see below).
+bootstrap: Restore database state such as software plan versions and currencies
+ initialized by database migrations. Sometimes when running tests with
+ REUSE_DB=1 this state is lost, causing tests that depend on it to fail.
+migrate: Migrate the test databases before running tests.
+teardown: Skip database setup; do normal teardown after running tests.
+"""
+
+
+@pytest.hookimpl
+def pytest_addoption(parser):
+ parser.addoption(
+ "--reusedb",
+ default=os.environ.get("REUSE_DB"),
+ help=REUSE_DB_HELP,
+ )
+ parser.addoption(
+ "--db",
+ default="both",
+        choices=["both", "skip", "only"],
+ help="Skip or only run database tests."
+ )
+
+
+@pytest.hookimpl
+def pytest_configure(config):
+    config.reuse_db = reusedb = config.getoption("--reusedb")
+    config.skip_setup_for_reuse_db = reusedb and reusedb != "reset"
+ config.should_teardown = not reusedb or reusedb == "teardown"
+ db_opt = config.getoption("--db")
+ assert db_opt in ["both", "only", "skip"], db_opt
+ config.should_run_database_tests = db_opt
+
+ if settings.configured:
+        # This blocker will be activated by pytest-django's
+ # pytest_configure hook, which uses trylast=True.
+ config.stash[blocking_manager_key] = HqDbBlocker(config, _ispytest=True)
+
+
+@pytest.hookimpl(wrapper=True)
+def pytest_collection_modifyitems(session, items):
+ """Sort and filter tests, inject database setup"""
+ import pytest_django.plugin as mod
+ django_key = None
+
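+    # Duck-typed stand-in for the items list: capture the sort key that
+    # pytest-django passes to items.sort(), then apply HQ's own
+    # filtering and ordering instead.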
+ class items_for_django:
+ def sort(key):
+ nonlocal django_key
+ django_key = key
+ filter_and_sort(items, key, session)
+
+ def skip_django_modifyitems():
+ called.append(1)
+ return False
+
+ mod.pytest_collection_modifyitems(items_for_django)
+ called = []
+    # use patch to skip pytest-django's pytest_collection_modifyitems
+ with patch.object(mod, "django_settings_is_configured", skip_django_modifyitems):
+ yield
+ assert called, "django_settings_is_configured patch was ineffective. " \
+ "HQ-speicific test filtering and sorting may not have happened."
+ assert is_still_sorted(items, django_key), "Test order changed. Database " \
+ "setup may not be done at the correct point in the test run."
+
+
+def filter_and_sort(items, key, session):
+ def is_db_test(item):
+ return bool(new_key(item))
+
+ new_key = reorder(key)
+ if session.config.should_run_database_tests == "only":
+ should_run = is_db_test
+ elif session.config.should_run_database_tests == "skip":
+ def should_run(item):
+ return not is_db_test(item)
+ else:
+ def should_run(item):
+ return True
+
+ items[:] = sorted((t for t in items if should_run(t)), key=new_key)
+
+
+def reorder(key):
+ """Translate django-pytest's test sorting key
+
+ - 2 -> 0: non-db tests first (pytest-django normally runs them last)
+ - 0 -> 1: tests using the 'db' fixture (TestCase)
+ - 1 -> 2: tests using the 'transactional_db' (TransactionTestCase) last
+ """
+ def new_key(item):
+ fixtures = {f._id for f in getattr(item.obj, "unmagic_fixtures", [])}
+ if "transactional_db" in fixtures:
+ return 2
+ if "db" in fixtures:
+ return 1
+ return new_order[key(item)]
+
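+    # pytest-django's sort key -> HQ's ordering (see docstring above)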
+ new_order = {2: 0, 0: 1, 1: 2}
+ return new_key
+
+
+def is_still_sorted(items, key):
+ if not items:
+ return True
+ new_key = reorder(key)
+ it = iter(items)
+ next(it)
+ return all(new_key(a) <= new_key(b) for a, b in zip(items, it))
+
+
+@override_fixture(django_fixtures.django_db_setup)
+@use("django_db_modify_db_settings")
+def django_db_setup():
+ """Override pytest-django's django_db_setup fixture
+
+ Replace pytest-django's database setup/teardown with
+ DeferredDatabaseContext, which handles other databases
+ including Couch, Elasticsearch, BlobDB, and Redis.
+ """
+ # HqDbBlocker.unblock() calls DeferredDatabaseContext.setup_databases()
+ try:
+ yield
+ finally:
+ _db_context.teardown_databases()
+
+
+@override_fixture(django_plugin._django_setup_unittest)
+def _django_setup_unittest():
+ """Do not unblock db for SimpleTestCase tests
+
+ Why is this not the default behavior of pytest-django?
+ """
+ from django.test import TransactionTestCase
+ request = get_request()
+ test_class = getattr(request, "cls", None)
+ if test_class and not issubclass(test_class, TransactionTestCase):
+ class db_blocker:
+ unblock = nullcontext
+ else:
+ db_blocker = request.getfixturevalue("django_db_blocker")
+ yield from _django_setup_unittest.super(request, db_blocker)
+
+
+class DeferredDatabaseContext:
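+    """Deferred setup/teardown of Couch, SQL, Elasticsearch, and BlobDB
+
+    Setup runs on the first database unblock (see HqDbBlocker.unblock)
+    rather than at session start, so runs that never touch a database
+    skip it entirely.
+    """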
+
+ @timelimit(480)
+ def setup_databases(self, db_blocker):
+ """Setup databases for tests"""
+ from corehq.blobs.tests.util import TemporaryFilesystemBlobDB
+
+ def setup(enter, cleanup):
+ # NOTE teardowns will be called in reverse order
+ enter(TemporaryFilesystemBlobDB())
+ cleanup(delete_elastic_indexes)
+ enter(couch_sql_context(session.config))
+ if session.config.should_teardown:
+ cleanup(close_leaked_sql_connections)
+ cleanup(clear_redis)
+ cleanup(delete_couch_databases)
+
+ def teardown(do_teardown):
+ with db_blocker.unblock():
+ do_teardown()
+
+ assert "teardown_databases" not in self.__dict__, "already set up"
+ self.setup_databases = lambda b: None # do not set up more than once
+ session = get_request().session
+ with ExitStack() as stack:
+ try:
+ setup(stack.enter_context, stack.callback)
+ except BaseException:
+ session.shouldfail = "Abort: database setup failed"
+ raise
+ self.teardown_databases = partial(teardown, stack.pop_all().close)
+
+ def teardown_databases(self):
+ """No-op to be replaced with ExitStack.close by setup_databases"""
+
+
+@unit_testing_only
+@contextmanager
+def couch_sql_context(config):
+ if config.skip_setup_for_reuse_db and sql_databases_ok():
+ if config.reuse_db == "migrate":
+ call_command('migrate_multi', interactive=False)
+ if config.reuse_db == "flush":
+ flush_databases()
+ if config.reuse_db == "bootstrap":
+ bootstrap_migrated_db_state()
+ if config.should_teardown:
+ dbs = get_sql_databases()
+ else:
+ if config.reuse_db == "reset":
+ reset_databases(config.option.verbose)
+ dbs = djutils.setup_databases(
+ interactive=False,
+ verbosity=config.option.verbose,
+ # avoid re-creating databases that already exist
+ keepdb=config.skip_setup_for_reuse_db,
+ )
+
+ try:
+ yield
+ finally:
+ if config.should_teardown:
+ djutils.teardown_databases(
+ reversed(dbs), # tear down in reverse setup order
+ verbosity=config.option.verbose,
+ )
+
+
+def sql_databases_ok():
+ for connection, db_name, _ in get_sql_databases():
+ try:
+ connection.ensure_connection()
+ except OperationalError as e:
+ print(str(e), file=sys.__stderr__)
+ return False
+ return True
+
+
+def get_sql_databases(*, _cache=[]):
+ if not _cache:
+ from django.db import connections
+ test_databases, mirrored_aliases = get_unique_databases_and_mirrors()
+ assert not mirrored_aliases, "DB mirrors not supported"
+ for signature, (db_name, aliases) in test_databases.items():
+ alias = list(aliases)[0]
+ connection = connections[alias]
+ db = connection.settings_dict
+ assert db["NAME"].startswith(TEST_DATABASE_PREFIX), db["NAME"]
+ _cache.append((connection, db_name, True))
+ return _cache
+
+
+@unit_testing_only
+def reset_databases(verbosity):
+ delete_couch_databases()
+ delete_elastic_indexes()
+ clear_redis()
+ # tear down all databases together to avoid dependency issues
+ teardown = []
+ for connection, db_name, is_first in get_sql_databases():
+ try:
+ connection.ensure_connection()
+ teardown.append((connection, db_name, is_first))
+ except OperationalError:
+ pass # ignore missing database
+ djutils.teardown_databases(reversed(teardown), verbosity=verbosity)
+
+
+@unit_testing_only
+def delete_couch_databases():
+ for db in get_all_couch_dbs():
+ try:
+ db.server.delete_db(db.dbname)
+ log.info("deleted database %s", db.dbname)
+ except ResourceNotFound:
+ log.info("database %s not found! it was probably already deleted.", db.dbname)
+
+
+@unit_testing_only
+def delete_elastic_indexes():
+ from corehq.apps.es.client import manager as elastic_manager
+ # corehq.apps.es.client.create_document_adapter uses
+ # TEST_DATABASE_PREFIX when constructing test index names
+ for index_name in elastic_manager.get_indices():
+ if index_name.startswith(TEST_DATABASE_PREFIX):
+ elastic_manager.index_delete(index_name)
+
+
+@unit_testing_only
+def clear_redis():
+ config = settings.CACHES.get("redis", {})
+ loc = config.get("TEST_LOCATION")
+ if loc:
+ redis = cache.caches['redis']
+ assert redis.client._server == [loc], (redis.client._server, config)
+ redis.clear()
+
+
+@nottest
+@unit_testing_only
+def get_all_couch_dbs():
+ from corehq.util.couchdb_management import couch_config
+ all_dbs = list(couch_config.all_dbs_by_db_name.values())
+ for db in all_dbs:
+ if '/test_' not in db.uri:
+ raise ValueError("not a test db url: db=%s url=%r" % (db.dbname, db.uri))
+ return all_dbs
+
+
+def close_leaked_sql_connections():
+ from corehq.sql_db.connections import connection_manager
+ connection_manager.dispose_all()
+
+
+@unit_testing_only
+def flush_databases():
+ """
+ Best effort at emptying all documents from all databases.
+ Useful when you break a test and it doesn't clean up properly. This took
+ about 5 seconds to run when trying it out.
+ """
+ print("Flushing test databases, check yourself before you wreck yourself!", file=sys.__stdout__)
+ for db in get_all_couch_dbs():
+ try:
+ db.flush()
+ except (ResourceNotFound, HTTPError):
+ pass
+ call_command('flush', interactive=False)
+ bootstrap_migrated_db_state()
+
+
+@unit_testing_only
+def bootstrap_migrated_db_state():
+ from corehq.apps.accounting.tests.generator import bootstrap_accounting
+ from corehq.apps.smsbillables.tests.utils import bootstrap_smsbillables
+ bootstrap_accounting()
+ bootstrap_smsbillables()
+
+
+class HqDbBlocker(DjangoDbBlocker):
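+    """Blocker that also toggles settings.DB_ENABLED and Couch access"""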
+
+ def __init__(self, config, **kw):
+ super().__init__(**kw)
+ self._callbacks = []
+ self.block_couch, self.unblock_couch = _setup_couch_blocker()
+ self.original_db_enabled = settings.DB_ENABLED
+
+ # HACK get the real ensure_connection
+ old = config.stash.get(blocking_manager_key, None)
+ if old and old._real_ensure_connection:
+ self._real_ensure_connection = old._real_ensure_connection
+
+ def _block(self):
+ settings.DB_ENABLED = False
+ self.block_couch()
+
+ def _unblock(self):
+ settings.DB_ENABLED = self.original_db_enabled
+ self.unblock_couch()
+
+ def block(self):
+ """Disable database access"""
+ self._callbacks.append(self._unblock)
+ self._block()
+ return super().block()
+
+ def unblock(self):
+ """Enable database access"""
+ self._callbacks.append(self._block)
+ self._unblock()
+ blocker = super().unblock()
+ _db_context.setup_databases(self)
+ return blocker
+
+ def restore(self):
+ self._callbacks.pop()()
+ super().restore()
+
+
+def _setup_couch_blocker():
+ from couchdbkit.ext.django import loading
+
+ class CouchSpec(object):
+ dbname = None
+ view = Mock(return_value=[])
+
+ def mock_couch(app):
+ dbname = dbs.get(app, main_db_url).rsplit("/", 1)[1]
+ return Mock(name=dbname, dbname=dbname, spec_set=CouchSpec)
+
+ # register our dbs with the extension document classes
+ main_db_url = settings.COUCH_DATABASE
+ dbs = dict(settings.COUCHDB_DATABASES)
+ patches = []
+ for app, value in loading.couchdbkit_handler.app_schema.items():
+ for cls in value.values():
+ patches.append(patch.object(cls, "_db", mock_couch(app)))
+
+ def block():
+ for pch in patches:
+ pch.start()
+
+ def unblock():
+ for pch in patches:
+ pch.stop()
+
+ return block, unblock
+
+
+_db_context = DeferredDatabaseContext()
diff --git a/corehq/tests/pytest_plugins/timelimit.py b/corehq/tests/pytest_plugins/timelimit.py
new file mode 100644
index 000000000000..c56d1a4be925
--- /dev/null
+++ b/corehq/tests/pytest_plugins/timelimit.py
@@ -0,0 +1,83 @@
+"""Enforce a maximum time for various test events, fail if the limit is exceeded
+"""
+import os
+import time
+import warnings
+
+import pytest
+from unmagic.scope import get_active
+
+
+def pytest_addoption(parser):
+ parser.addoption(
+ '--max-test-time', type=float, dest="max_test_time",
+ default=get_float(os.environ.get('CCHQ_MAX_TEST_TIME'), 0),
+ help='Fail test if it runs for longer than this limit (seconds). '
+ 'Use `corehq.tests.util.timelimit.timelimit` to '
+ 'override the time limit for individual tests or '
+ 'functions called by tests.'
+ )
+
+
+def pytest_configure(config):
+ config.max_test_time = config.getoption("--max-test-time")
+ if config.max_test_time:
+ config.pluginmanager.register(MaxTestTimePlugin(), "timelimit")
+
+
+class MaxTestTimePlugin:
+
+ def __init__(self):
+ self.limits = None
+ self.time = time.time # evade freezegun
+
+ @pytest.hookimpl(wrapper=True)
+ def pytest_runtest_setup(self, item):
+ yield from self.enforce_limit(item, "setup")
+
+ @pytest.hookimpl(wrapper=True)
+ def pytest_runtest_call(self, item):
+ yield from self.enforce_limit(item, "test")
+
+ @pytest.hookimpl(wrapper=True)
+ def pytest_runtest_teardown(self, item):
+ yield from self.enforce_limit(item, "teardown")
+
+ def enforce_limit(self, item, event):
+ limits = self.limits = []
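+        # timelimit-decorated code registers extra allowance here via
+        # increase_max_test_time(); the effective limit for this phase
+        # is the larger of that sum and --max-test-time.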
+ start = self.time()
+ yield
+ duration = self.time() - start
+ limit = max(sum(limits), item.config.max_test_time)
+ if duration > limit:
+ raise AssertionError(f"{event} time limit ({limit}) exceeded: {duration}")
+ self.limits = None
+
+
+def get_float(value, default):
+ try:
+ return float(value)
+ except (TypeError, ValueError):
+ return default
+
+
+def increase_max_test_time(value):
+ """Increase the maximum amount of time allowed for the active test phase
+
+    The sum of values passed to this function during the current test
+    phase is used in place of --max-test-time when that sum is greater.
+ """
+ try:
+ plugin = get_active().session.config.pluginmanager.get_plugin("timelimit")
+ except (ValueError, AttributeError):
+ warnings.warn("timelimit used outside of test?")
+ return
+ if plugin is not None:
+ if plugin.limits is None:
+ warnings.warn("timelimit used outside of runtest lifecycle")
+ else:
+ plugin.limits.append(value)
+
+
+def get_max_test_time(obj):
+ return getattr(obj, "max_test_time", 0)
diff --git a/corehq/tests/pytest_plugins/util.py b/corehq/tests/pytest_plugins/util.py
new file mode 100644
index 000000000000..a901a2e76345
--- /dev/null
+++ b/corehq/tests/pytest_plugins/util.py
@@ -0,0 +1,17 @@
+from functools import wraps
+
+
+def override_fixture(old_fixture):
+ """Override a pytest magic fixture with an unmagic fixture
+
+ The overriding fixture function will be assigned a 'super'
+ attribute that references the overridden fixture function.
+ """
+ def apply(new_fixture):
+ @wraps(new_fixture)
+ def fixture(*a, **k):
+ yield from new_fixture(*a, **k)
+ new_fixture.super = old_fixture.__pytest_wrapped__.obj
+ old_fixture.__pytest_wrapped__.obj = fixture
+ return new_fixture
+ return apply
diff --git a/corehq/tests/test_classcleanup.py b/corehq/tests/test_classcleanup.py
index 4fa58fc55199..5d65912d8f24 100644
--- a/corehq/tests/test_classcleanup.py
+++ b/corehq/tests/test_classcleanup.py
@@ -1,64 +1,89 @@
-from functools import wraps
-from unittest import TestCase
+import inspect
-from nose.plugins import PluginTester
from testil import eq
-
-from .noseplugins import classcleanup as mod
-
-
-class TestClassCleanupPlugin(PluginTester, TestCase):
- activate = '' # Activate option not needed. Plugin is always enabled.
- plugins = [mod.ClassCleanupPlugin()]
-
- def setUp(self):
- pass # super().setUp() is called by self.run_with_errors(...)
-
- def makeSuite(self):
-
- def log_call_and_maybe_error(func):
- @wraps(func)
- def wrapper(self_):
- func(self_)
- self.call_log.append(func.__name__)
- if func.__name__ in self.errors:
- self.call_log[-1] += f" {self.error_class.__name__}"
- raise self.error_class
- return wrapper
-
- class Test(TestCase):
- @classmethod
- @log_call_and_maybe_error
- def setUpClass(cls):
- cls.addClassCleanup(self.call_log.append, "classCleanup")
-
- @log_call_and_maybe_error
- def setUp(self):
- pass
-
- @log_call_and_maybe_error
- def runTest(self):
- pass
-
- @log_call_and_maybe_error
- def tearDown(self):
- pass
-
- @classmethod
- @log_call_and_maybe_error
- def tearDownClass(cls):
- pass
-
- return [Test()]
-
- def run_with_errors(self, *errors, error_class=Exception):
- self.call_log = []
+from unmagic import fixture
+
+pytest_plugins = ["pytester"]
+log = []
+
+
+class TestClassCleanupPlugin:
+
+ def run_suite(self, **kwargs):
+ @get_source
+ def test_py():
+ from functools import wraps
+ from unittest import TestCase
+
+ import corehq.tests.test_classcleanup as mod
+
+ def log(value):
+ mod.log.append(value)
+
+ def log_call_and_maybe_error(func):
+ @wraps(func)
+ def wrapper(self_):
+ func(self_)
+ if func.__name__ in '__ERRORS__':
+ log(func.__name__ + " '__ERROR_CLASS_NAME__'")
+ raise '__ERROR_CLASS_NAME__'
+ log(func.__name__)
+ return wrapper
+
+ class Test(TestCase):
+ @classmethod
+ @log_call_and_maybe_error
+ def setUpClass(cls):
+ cls.addClassCleanup(cls.classCleanup)
+
+ @classmethod
+ @log_call_and_maybe_error
+ def classCleanup(cls):
+ pass
+
+ @log_call_and_maybe_error
+ def setUp(self):
+ pass
+
+ @log_call_and_maybe_error
+ def runTest(self):
+ pass
+
+ @log_call_and_maybe_error
+ def tearDown(self):
+ pass
+
+ @classmethod
+ @log_call_and_maybe_error
+ def tearDownClass(cls):
+ pass
+
+ pytester = fixture("pytester")()
+ pytester.makepyfile(
+ test_py
+ .replace("'__ERRORS__'", repr(self.errors))
+ .replace("'__ERROR_CLASS_NAME__'", self.error_class.__name__)
+ )
+
+ assert not log
+ # fragile! other pytest plugins could break this
+ result = pytester.runpytest('-qs', '-pno:django', '-pno:corehq', '-pno:warnings')
+ result.assert_outcomes(**kwargs)
+        # sharing via mod.log works because runpytest runs in the same process
+ self.call_log = log[:]
+ del log[:]
+
+ def run_with_errors(self, *errors, error_class=Exception, **kwargs):
+ if not kwargs:
+ kwargs = {"failed": 1}
self.errors = errors
self.error_class = error_class
- super().setUp()
+ self.run_suite(**kwargs)
def test_cleanup_in_happy_path(self):
- self.run_with_errors()
+ self.run_with_errors(passed=1)
eq(self.call_log, [
"setUpClass",
"setUp",
@@ -69,7 +94,7 @@ def test_cleanup_in_happy_path(self):
])
def test_cleanup_on_error_in_set_up_class(self):
- self.run_with_errors("setUpClass")
+ self.run_with_errors("setUpClass", errors=1)
eq(self.call_log, [
"setUpClass Exception",
"classCleanup"
@@ -118,7 +143,7 @@ def test_cleanup_on_error_in_tearDown(self):
])
def test_cleanup_on_error_in_tearDownClass(self):
- self.run_with_errors("tearDownClass")
+ self.run_with_errors("tearDownClass", passed=1, errors=1)
eq(self.call_log, [
"setUpClass",
"setUp",
@@ -129,7 +154,7 @@ def test_cleanup_on_error_in_tearDownClass(self):
])
def test_cleanup_on_error_in_tearDown_and_tearDownClass(self):
- self.run_with_errors("tearDown", "tearDownClass")
+ self.run_with_errors("tearDown", "tearDownClass", failed=1, errors=1)
eq(self.call_log, [
"setUpClass",
"setUp",
@@ -138,3 +163,23 @@ def test_cleanup_on_error_in_tearDown_and_tearDownClass(self):
"tearDownClass Exception",
"classCleanup",
])
+
+ def test_error_in_classCleanup(self):
+ self.run_with_errors("classCleanup", passed=1, errors=1)
+ eq(self.call_log, [
+ "setUpClass",
+ "setUp",
+ "runTest",
+ "tearDown",
+ "tearDownClass",
+ "classCleanup Exception",
+ ])
+
+
+def get_source(func):
+ src = inspect.getsource(func)
+ while True:
+ firstline, src = src.split("\n", 1)
+ if f'def {func.__name__}(' in firstline:
+ return src
+ assert src
diff --git a/corehq/tests/test_dividedwerun.py b/corehq/tests/test_dividedwerun.py
new file mode 100644
index 000000000000..8afef7f6d977
--- /dev/null
+++ b/corehq/tests/test_dividedwerun.py
@@ -0,0 +1,118 @@
+from unittest import TestCase
+
+import pytest
+from django.test import SimpleTestCase, TestCase as DatabaseTest
+from unmagic import get_request
+
+from .pytest_plugins.dividedwerun import name_of
+
+
+def test_name_of_function_test():
+ def do():
+ pass
+ func = make_function(do)
+
+ assert name_of(func) == "corehq/tests/test_dividedwerun.py::do"
+
+
+def test_name_of_with_setup_module():
+ global setup_module
+
+ def do():
+ pass
+
+ def da():
+ pass
+
+ do_func = make_function(do)
+ da_func = make_function(da)
+
+ assert name_of(do_func) != name_of(da_func)
+
+ def setup_module():
+ pass
+
+ assert name_of(do_func) == name_of(da_func)
+ del setup_module
+
+
+class Test:
+
+ def test(self):
+ func = make_function(self.test)
+ assert name_of(func) == "corehq/tests/test_dividedwerun.py::Test::test"
+
+
+class TestSubclass(TestCase):
+
+ def test(self):
+ func = make_function(self.test)
+ assert name_of(func) == "corehq/tests/test_dividedwerun.py::TestSubclass::test"
+
+
+class TestSimpleSubclass(SimpleTestCase):
+
+ def test(self):
+ func = make_function(self.test)
+ assert name_of(func) == "corehq/tests/test_dividedwerun.py::TestSimpleSubclass::test"
+
+
+class TestCaseClassSetup(TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ """Potentially expensive"""
+ super().setUpClass()
+
+ def test(self):
+ func = make_function(self.test)
+ other = make_function(self.other)
+ assert name_of(func) == name_of(other)
+
+ def other(self):
+ pass
+
+
+class TestSimpleClassSetup(SimpleTestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ """Potentially expensive"""
+ super().setUpClass()
+
+ def test(self):
+ func = make_function(self.test)
+ other = make_function(self.other)
+ assert name_of(func) == name_of(other)
+
+ def other(self):
+ pass
+
+
+class TestDbClassSetup(DatabaseTest):
+
+ @classmethod
+ def setUpClass(cls):
+ """Potentially expensive"""
+ # do not call super to skip db setup
+
+ @classmethod
+ def tearDownClass(cls):
+ """Potentially expensive"""
+ # do not call super to skip db teardown
+
+ def test(self):
+ func = make_function(self.test)
+ other = make_function(self.other)
+ assert name_of(func) == name_of(other)
+
+ def other(self):
+ pass
+
+
+def make_function(func):
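+    # Build a collected pytest Function item for `func`, parented to the
+    # current test's module so name_of() sees realistic attributes.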
+ return pytest.Function.from_parent(
+ get_request().node.parent,
+ name=func.__name__,
+ callobj=func,
+ )
diff --git a/corehq/tests/test_locks.py b/corehq/tests/test_locks.py
index 5d9774a30ce9..23ac3d942832 100644
--- a/corehq/tests/test_locks.py
+++ b/corehq/tests/test_locks.py
@@ -1,3 +1,4 @@
+import re
from redis.exceptions import LockError
from testil import assert_raises, eq
@@ -6,10 +7,10 @@
from corehq.util.test_utils import timelimit
from .locks import ReentrantTestLock, reentrant_redis_locks
-from .noseplugins.redislocks import TestLock, TimeoutError
+from .pytest_plugins.redislocks import TestLock, TimeoutError
-def test_redislocks_nose_plugin():
+def test_redislocks_pytest_plugin():
lock1 = get_redis_lock(__name__, timeout=0.2, name="test")
assert isinstance(lock1.lock, TestLock), lock1.lock
assert lock1.acquire(blocking_timeout=1)
@@ -51,7 +52,7 @@ def simulate_reentrant_lock():
@timelimit(0.1)
def test_unreleased_lock():
msg = "unreleased dict_values([ReentrantTestLock(name='unreleased', level=1)])"
- with assert_raises(AssertionError, msg=msg):
+ with assert_raises(AssertionError, msg=re.compile("^" + re.escape(msg))):
with reentrant_redis_locks():
lock = get_redis_lock("unreleased", timeout=0.5, name="test")
assert lock.acquire()
diff --git a/corehq/tests/test_nose_timing.py b/corehq/tests/test_nose_timing.py
deleted file mode 100644
index b8e7730ef055..000000000000
--- a/corehq/tests/test_nose_timing.py
+++ /dev/null
@@ -1,238 +0,0 @@
-import gc
-import time
-from tempfile import NamedTemporaryFile
-from unittest import TestCase
-
-from nose.plugins import PluginTester
-from testil import Regex, eq
-
-from corehq.util.test_utils import timelimit
-
-from .noseplugins import timing as mod
-
-
-class TimingPluginTesterBase(PluginTester, TestCase):
- activate = "--with-timing"
- plugins = [mod.TimingPlugin()]
-
- def setUp(self):
- with NamedTemporaryFile("w+", encoding="utf-8") as fh:
- self.args = [
- "--timing-file", fh.name,
- "--pretty-timing",
- "--max-test-time=0.05",
- "-v",
- ]
- gc.disable()
- try:
- # time-critical tests are run in here
- super().setUp()
- finally:
- gc.enable()
- fh.seek(0)
- print("---- begin test output ----")
- print(self.output)
- print("---- end test output ----")
- print("---- begin timing output ----")
- print(fh.read())
- print("---- end timing output ----")
-
- def print(self, *args):
- print(*args, file=mod.PLUGIN_INSTANCE.output)
-
- @property
- def ChattyTestBase(self):
- class ChattyTestBase(TestCase):
- @classmethod
- def setUpClass(cls):
- print("setUpClass")
- super().setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- print("tearDownClass")
- super().tearDownClass()
-
- def setUp(self):
- print("setUp")
- super().setUp()
-
- def tearDown(self):
- print("tearDown")
- super().tearDown()
-
- def runTest(self):
- print("runTest")
-
- print = self.print
- ChattyTestBase.print = self.print
- return ChattyTestBase
-
-
-class TestDefaultMaxSetupTime(TimingPluginTesterBase):
-
- def makeSuite(self):
- class Test(self.ChattyTestBase):
- @classmethod
- def setUpClass(cls):
- super().setUpClass()
- time.sleep(0.051)
-
- def setUp(self):
- # NOTE setUp() is part of "run" phase, not "setup"
- assert False, "should not get here"
-
- return [Test()]
-
- def test_time_limit(self):
- output = str(self.output)
- eq(output, Regex(r"setup time limit \(0\.05\) exceeded: 0\.0[5-9]"))
-
-
-class TestDefaultMaxTeardownTime(TimingPluginTesterBase):
-
- def makeSuite(self):
- class Test(self.ChattyTestBase):
- # NOTE tearDown() is part of "run" phase, not "teardown"
- @classmethod
- def tearDownClass(cls):
- super().tearDownClass()
- time.sleep(0.051)
- return [Test()]
-
- def test_time_limit(self):
- output = str(self.output)
- eq(output, Regex(r"teardown time limit \(0\.05\) exceeded: 0\.0[5-9]"))
-
-
-class TestDefaultMaxTestTime(TimingPluginTesterBase):
-
- def makeSuite(self):
- class Test(self.ChattyTestBase):
- def runTest(self):
- super().runTest()
- time.sleep(0.051)
- return [Test()]
-
- def test_time_limit(self):
- output = str(self.output)
- eq(output, Regex(r"run time limit \(0\.05\) exceeded: 0\.0[5-9]"))
-
-
-class TestSetupExceedsMaxWithTestLimit(TimingPluginTesterBase):
-
- def makeSuite(self):
- class Test(self.ChattyTestBase):
- def setUp(self):
- super().setUp()
- time.sleep(0.051)
-
- @timelimit(0.001)
- def runTest(self):
- super().runTest()
-
- return [Test()]
-
- def test_time_limit(self):
- output = str(self.output)
- eq(output, Regex(r"run time limit \(0\.05\) exceeded: 0\.0[5-9]"))
-
-
-class TestTeardownExceedsSumOfOtherLimits(TimingPluginTesterBase):
-
- def makeSuite(self):
- class Test(self.ChattyTestBase):
- @timelimit(0.001)
- def setUp(self):
- super().setUp()
-
- @timelimit(0.051)
- def runTest1(self):
- super().runTest()
-
- @timelimit(0.06)
- def runTest2(self):
- super().runTest()
-
- def tearDown(self):
- super().tearDown()
- time.sleep(0.0611)
-
- return [Test("runTest1"), Test("runTest2")]
-
- def test_time_limit1(self):
- output = str(self.output)
- eq(output, Regex(r"run time limit \(0\.052\) exceeded: 0\.0[6-9]"))
-
- def test_time_limit2(self):
- output = str(self.output)
- eq(output, Regex(r"run time limit \(0\.061\) exceeded: 0\.0[6-9]"))
-
-
-class TestTimeLimitReset(TimingPluginTesterBase):
-
- def makeSuite(self):
- class Test(self.ChattyTestBase):
- @timelimit(0.051)
- def runTest1(self):
- super().runTest()
- time.sleep(0.052)
-
- def runTest2(self):
- super().runTest()
- time.sleep(0.052)
-
- return [Test("runTest1"), Test("runTest2")]
-
- def test_time_limit1(self):
- output = str(self.output)
- eq(output, Regex(r"run time limit \(0\.051\) exceeded: 0\.0[5-9]"))
-
- def test_time_limit2(self):
- output = str(self.output)
- eq(output, Regex(r"run time limit \(0\.05\) exceeded: 0\.0[5-9]"))
-
-
-class TestExtendedTimeLimit(TimingPluginTesterBase):
-
- def makeSuite(self):
- class Test(self.ChattyTestBase):
- @timelimit(0.06)
- def runTest(self):
- super().runTest()
- time.sleep(0.061)
- return [Test()]
-
- def test_time_limit(self):
- output = str(self.output)
- eq(output, Regex(r"runTest took too long: 0:00:00.0[6-9]"))
-
-
-class TestPatchMaxTestTime(TimingPluginTesterBase):
-
- def makeSuite(self):
- @mod.patch_max_test_time(0.051)
- class Test(self.ChattyTestBase):
- @timelimit(0.001)
- def test1(self):
- super().runTest()
- time.sleep(0.01)
-
- def test2(self):
- super().runTest()
- time.sleep(0.052)
-
- @classmethod
- def tearDownClass(cls):
- super().tearDownClass()
- time.sleep(0.052)
-
- return [Test("test1"), Test("test2")]
-
- def test_time_limit_errors(self):
- output = str(self.output)
- eq(output, Regex(r"test1 took too long: 0:00:00\.0[1-9]"))
- eq(output, Regex(r"run time limit \(0\.051\) exceeded: 0\.0[5-9]"))
-
- # NOTE tearDownClass is not limited by class patch
- eq(output, Regex(r"teardown time limit \(0\.05\) exceeded: 0\.0[5-9]"))
diff --git a/corehq/tests/test_reusedb.py b/corehq/tests/test_reusedb.py
new file mode 100644
index 000000000000..bd63060c7845
--- /dev/null
+++ b/corehq/tests/test_reusedb.py
@@ -0,0 +1,34 @@
+import re
+
+import pytest
+from unmagic import use
+
+from django.conf import settings
+
+from corehq.apps.domain.models import Domain as CouchModel # arbitrary couch model
+from corehq.apps.users.models import User as Model # arbitrary SQL model
+from corehq.blobs.models import BlobMeta as ShardedModel # arbitrary SQL model
+
+
+def test_database_blocker():
+ assert not settings.DB_ENABLED
+
+ with pytest.raises(AttributeError, match="Mock object has no attribute 'info'"):
+ CouchModel.get_db().info
+
+ with pytest.raises(RuntimeError, match=re.compile("^Database access not allowed")):
+ Model.objects.all().explain()
+
+ with pytest.raises(RuntimeError, match=re.compile("^Database access not allowed")):
+ ShardedModel.objects.using("p1").all().explain()
+
+
+@use("db")
+def test_unblocked_database_blocker():
+ assert settings.DB_ENABLED
+
+ assert CouchModel.get_db().info()["db_name"].startswith("test_")
+
+ # these should not raise
+ Model.objects.all().explain()
+ ShardedModel.objects.using("p1").all().explain()
diff --git a/corehq/tests/test_timelimit.py b/corehq/tests/test_timelimit.py
new file mode 100644
index 000000000000..3e185f8e092d
--- /dev/null
+++ b/corehq/tests/test_timelimit.py
@@ -0,0 +1,200 @@
+import gc
+import inspect
+
+from testil import Regex
+from unmagic import fixture
+
+pytest_plugins = ["pytester"]
+
+
+class TimeLimitTestCase:
+
+ def setup_method(self):
+ pytester = fixture("pytester")()
+ self.create_pytester_files(pytester)
+ plugin_opts = [
+ "-pno:django",
+ "-pno:corehq",
+ "-pcorehq.tests.pytest_plugins.timelimit"
+ ]
+ gc.disable()
+ try:
+ # time-critical tests are run in here
+ result = pytester.runpytest(*plugin_opts, "-lv", "--max-test-time=0.05")
+ finally:
+ gc.enable()
+ self.output = str(result.stdout)
+
+ def create_pytester_files(self, pytester):
+ pytester.makepyfile(get_source(self.pyfile_code))
+
+
+class TestDefaultMaxSetupTime(TimeLimitTestCase):
+
+ def pyfile_code(self):
+ import time
+ from unittest import TestCase
+
+ class Test(TestCase):
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ time.sleep(0.051)
+
+ def setUp(self):
+ # NOTE setUp() is part of "runtest", not "setup" event
+ assert False, "should not get here"
+
+ test = setUp
+
+ @classmethod
+ def tearDownClass(cls):
+ assert 0, 'tearDownClass was called'
+
+ def test_time_limit(self):
+ assert self.output == Regex(r"setup time limit \(0\.05\) exceeded: 0\.0[5-9]")
+
+ def test_teardownclass_called_if_setupclass_limit_exceeded(self):
+ assert "tearDownClass was called" in self.output
+
+ def test_setUp_not_called(self):
+ assert "should not get here" not in self.output
+
+
+class TestDefaultMaxTeardownTime(TimeLimitTestCase):
+
+ def pyfile_code(self):
+ import time
+ from unittest import TestCase
+
+ class Test(TestCase):
+ # NOTE tearDown() is part of "runtest", not "teardown" event
+ @classmethod
+ def tearDownClass(cls):
+ time.sleep(0.051)
+
+ def test(self):
+ pass
+
+ def test_time_limit(self):
+ assert self.output == Regex(r"teardown time limit \(0\.05\) exceeded: 0\.0[5-9]")
+
+
+class TestDefaultMaxTestTime(TimeLimitTestCase):
+
+ def pyfile_code(self):
+ import time
+ from unittest import TestCase
+
+ class Test(TestCase):
+ def runTest(self):
+ time.sleep(0.051)
+
+ def test_time_limit(self):
+ assert self.output == Regex(r"test time limit \(0\.05\) exceeded: 0\.0[5-9]")
+
+
+class TestSetupExceedsMaxTestTimeLimit(TimeLimitTestCase):
+
+ def pyfile_code(self):
+ import time
+ from unittest import TestCase
+ from corehq.util.test_utils import timelimit
+
+ class Test(TestCase):
+ def setUp(self):
+ time.sleep(0.051)
+
+ @timelimit(0.001)
+ def test(self):
+ pass
+
+ def test_time_limit(self):
+ assert self.output == Regex(r"test time limit \(0\.05\) exceeded: 0\.0[5-9]")
+
+
+class TestTeardownExceedsMaxTestTimeLimit(TimeLimitTestCase):
+
+ def pyfile_code(self):
+ import time
+ from unittest import TestCase
+ from corehq.util.test_utils import timelimit
+
+ class Test(TestCase):
+ @timelimit(0.021)
+ def setUp(self):
+ pass
+
+ @timelimit(0.041)
+ def test(self):
+ pass
+
+ def tearDown(self):
+ time.sleep(0.063)
+
+ def test_time_limit(self):
+ assert self.output == Regex(r"test time limit \(0\.062\) exceeded: 0\.0[6-9]")
+
+
+class TestTimeLimitReset(TimeLimitTestCase):
+
+ def pyfile_code(self):
+ import time
+ from unittest import TestCase
+ from corehq.util.test_utils import timelimit
+
+ class Test(TestCase):
+ @timelimit(0.051)
+ def test_1(self):
+ time.sleep(0.052)
+
+ def test_2(self):
+ time.sleep(0.052)
+
+ def test_time_limit1(self):
+ assert self.output == Regex(r"test_1 time limit \(0\.051\) exceeded: 0\.0[5-9]")
+
+ def test_time_limit2(self):
+ assert self.output == Regex(r"test time limit \(0\.05\) exceeded: 0\.0[5-9]")
+
+
+class TestOverrideTimeLimit(TimeLimitTestCase):
+
+ def pyfile_code(self):
+ import time
+ from unittest import TestCase
+ from corehq.util.test_utils import timelimit
+
+ class Test(TestCase):
+ @timelimit(0.06)
+ def test(self):
+ time.sleep(0.051)
+
+ def test_time_limit(self):
+ assert "exceeded" not in self.output
+
+
+class TestNestedTimeLimits(TimeLimitTestCase):
+
+ def pyfile_code(self):
+ from time import sleep
+ from corehq.tests.util.timelimit import timelimit
+
+ @timelimit(0.05)
+ def slowdown():
+ sleep(0.02)
+
+ @timelimit(0.05)
+ def test_nested_timelimit():
+ slowdown()
+ sleep(0.04)
+
+ def test_time_limit(self):
+ assert "exceeded" not in self.output
+
+
+def get_source(method):
+ src = inspect.getsource(method)
+ firstline, body = src.split("\n", 1)
+ assert f'def {method.__name__}(' in firstline, src
+ return body
diff --git a/corehq/tests/util/tests/test_timelimit.py b/corehq/tests/util/tests/test_timelimit.py
new file mode 100644
index 000000000000..360c94f44ded
--- /dev/null
+++ b/corehq/tests/util/tests/test_timelimit.py
@@ -0,0 +1,78 @@
+import re
+import time
+
+from testil import assert_raises, eq
+
+from ..timelimit import timelimit
+
+
+def test_timelimit_pass():
+ @timelimit(0.001)
+ def addone(x):
+ return x + 1
+ eq(addone(x=1), 2)
+
+
+def test_timelimit_fail():
+ @timelimit(0.0001)
+ def sleeper():
+ time.sleep(0.001)
+ with assert_raises(AssertionError, msg=re.compile("sleeper time limit .+ exceeded")):
+ sleeper()
+
+
+def test_timelimit_default():
+ @timelimit
+ def double(x):
+ return x * 2
+ eq(double(2), 4)
+
+
+def test_nested_timelimits():
+ @timelimit(0.01)
+ def sleeper():
+ time.sleep(0.002)
+
+ @timelimit(0.001)
+ def addone(x):
+ sleeper()
+ return x + 1
+
+ eq(addone(x=1), 2)
+
+
+def test_nested_timelimit_failure():
+ @timelimit(0.001)
+ def inner():
+ time.sleep(0.002)
+
+ @timelimit(0.01)
+ def outer():
+ inner()
+
+ with assert_raises(AssertionError, msg=re.compile("inner time limit .+ exceeded")):
+ outer()
+
+
+def test_nested_timelimits_with_error():
+ @timelimit
+ def raiser():
+ raise ValueError("boom")
+
+ with assert_raises(ValueError, msg="boom"):
+ raiser()
+
+ # time limit of raiser should not transfer to this timelimit
+ @timelimit(0.001)
+ def too_slow():
+ time.sleep(0.0011)
+
+ with assert_raises(AssertionError, msg=re.compile("too_slow time limit .+ exceeded")):
+ too_slow()
+
+
+def test_cannot_limit_generator():
+ with assert_raises(ValueError, msg=re.compile("'timelimit' on generator")):
+ @timelimit
+ def gen():
+ yield
diff --git a/corehq/tests/util/timelimit.py b/corehq/tests/util/timelimit.py
new file mode 100644
index 000000000000..4d0e6b9afc8b
--- /dev/null
+++ b/corehq/tests/util/timelimit.py
@@ -0,0 +1,61 @@
+from functools import wraps
+from inspect import isgeneratorfunction
+from time import time
+
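+# Stack of limits for @timelimit functions currently executing; lets
+# nested limits extend the caller's allowance. Not thread-safe.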
+_context = []
+
+
+def timelimit(limit):
+ """Create a decorator that asserts a run time limit
+
+ An assertion error is raised if the decorated function returns
+ without raising an error and the elapsed run time is longer than
+ the allowed time limit.
+
+ Can be used to override the limit imposed by --max-test-time.
+ Note: this decorator is not thread-safe.
+
+ Usage:
+
+ @timelimit
+ def lt_one_second():
+ ...
+
+ @timelimit(0.5)
+ def lt_half_second():
+ ...
+
+ :param limit: number of seconds or a callable to decorate. If
+ callable, the time limit defaults to one second.
+ """
+ # Import here to avoid breaking the docs build on github actions.
+ # Error: Handler for event 'config-inited'
+ # threw an exception (exception: No module named 'pytest')
+ from ..pytest_plugins.timelimit import increase_max_test_time
+
+ if callable(limit):
+ return timelimit((limit, 1))
+ if not isinstance(limit, tuple):
+ return lambda func: timelimit((func, limit))
+ func, seconds = limit
+
+ if isgeneratorfunction(func):
+ raise ValueError(f"cannot use 'timelimit' on generator function: {func}")
+
+ @wraps(func)
+ def time_limit(*args, **kw):
+ level = len(_context)
+ try:
+ _context.append(seconds)
+ increase_max_test_time(seconds)
+ start = time()
+ rval = func(*args, **kw)
+ elapsed = time() - start
+ limit = sum(_context[level:])
+ finally:
+ if level == 0:
+ _context.clear()
+ assert elapsed < limit, f"{func.__name__} time limit ({limit}) exceeded: {elapsed}"
+ return rval
+ time_limit.max_test_time = seconds
+ return time_limit
diff --git a/corehq/util/decorators.py b/corehq/util/decorators.py
index a61895f26805..94bdea6f36a6 100644
--- a/corehq/util/decorators.py
+++ b/corehq/util/decorators.py
@@ -12,7 +12,6 @@
from corehq.apps.celery import task
from corehq.util.global_request import get_request
-from corehq.util.metrics import metrics_counter
def handle_uncaught_exceptions(mail_admins=True):
@@ -42,6 +41,8 @@ def silence_and_report_error(message, metric_name):
Instead, report the issue to sentry and track the overall count as a metric
"""
+ # indirectly causes Django settings access on import
+ from corehq.util.metrics import metrics_counter
try:
yield
@@ -63,7 +64,7 @@ def inner(*args, **kwargs):
return outer
-enterprise_skip = run_only_when(not settings.ENTERPRISE_MODE)
+enterprise_skip = run_only_when(lambda: not settings.ENTERPRISE_MODE)
class change_log_level(ContextDecorator):
diff --git a/corehq/util/metrics/const.py b/corehq/util/metrics/const.py
index 2d239bfae46d..636e865da512 100644
--- a/corehq/util/metrics/const.py
+++ b/corehq/util/metrics/const.py
@@ -1,4 +1,4 @@
-import settings
+from django.conf import settings
ALERT_ERROR = 'error'
ALERT_WARNING = 'warning'
diff --git a/corehq/util/test_utils.py b/corehq/util/test_utils.py
index 290245028e8e..470b658190d8 100644
--- a/corehq/util/test_utils.py
+++ b/corehq/util/test_utils.py
@@ -13,7 +13,6 @@
import uuid
from collections import namedtuple
from contextlib import ExitStack, closing, contextmanager
-from datetime import datetime, timedelta
from functools import wraps
from io import StringIO, open
from textwrap import indent, wrap
@@ -29,6 +28,7 @@
from django.test import TransactionTestCase
from django.test.utils import CaptureQueriesContext
+from corehq.tests.util.timelimit import timelimit # noqa: F401
from corehq.util.context_managers import drop_connected_signals
from corehq.util.decorators import ContextDecorator
@@ -371,8 +371,8 @@ def test_foo(self, foo, bar)
their parameterized names are not valid function names. This was a
tradeoff with making parameterized tests identifiable on failure.
- Another alternative is nose test generators.
- https://nose.readthedocs.io/en/latest/writing_tests.html#test-generators
+ Another alternative is pytest.mark.parametrize.
+    https://docs.pytest.org/en/stable/example/parametrize.html
:param argsets: A sequence of argument tuples or dicts, one for each
test case to be generated.
@@ -434,87 +434,6 @@ def test(self, args=args):
return Test
-def timelimit(limit):
- """Create a decorator that asserts a run time limit
-
- An assertion error is raised if the decorated function returns
- without raising an error and the elapsed run time is longer than
- the allowed time limit.
-
- This decorator can be used to extend the time limit imposed by
- --max-test-time when `corehq.tests.noseplugins.timing.TimingPlugin`
- is enabled.
-
- Usage:
-
- @timelimit
- def lt_one_second():
- ...
-
- @timelimit(0.5)
- def lt_half_second():
- ...
-
- See also: `patch_max_test_time` for overriding time limits for an
- entire test group (module, test class, etc.)
-
- :param limit: number of seconds or a callable to decorate. If
- callable, the time limit defaults to one second.
- """
- if callable(limit):
- return timelimit((limit, timedelta(seconds=1)))
- if not isinstance(limit, tuple):
- limit = timedelta(seconds=limit)
- return lambda func: timelimit((func, limit))
- func, limit = limit
-
- @wraps(func)
- def time_limit(*args, **kw):
- from corehq.tests.noseplugins.timing import add_time_limit
- add_time_limit(limit.total_seconds())
- start = datetime.utcnow()
- rval = func(*args, **kw)
- elapsed = datetime.utcnow() - start
- assert elapsed < limit, f"{func.__name__} took too long: {elapsed}"
- return rval
- return time_limit
-
-
-def patch_max_test_time(limit):
- """Temporarily override test time limit (--max-test-time)
-
- Note: this is only useful when spanning multiple test events because
- the limit must be present at the _end_ of a test event to take
- effect. Therefore it will do nothing if used within the context of a
- single test (use `timelimit` for that). It also does not affect the
- time limit on the final teardown fixture (in which the patch is
- removed).
-
- :param limit: New time limit (seconds).
-
- Usage at module level:
-
- TIME_LIMIT = patch_max_test_time(9)
-
- def setup_module():
- TIME_LIMIT.start()
-
- def teardown_module():
- TIME_LIMIT.stop()
-
- Usage as class decorator:
-
- @patch_max_test_time(9)
- class TestSomething(TestCase):
- ...
- """
- from corehq.tests.noseplugins.timing import patch_max_test_time
- return patch_max_test_time(limit)
-
-
-patch_max_test_time.__test__ = False
-
-
def patch_foreign_value_caches():
"""Patch django.test to clear ForeignValue LRU caches
@@ -693,10 +612,7 @@ def create_test_case(domain, case_type, case_name, case_properties=None, drop_si
def teardown(do_teardown):
- """A decorator that adds teardown logic to a test function/method
-
- NOTE this will not work for nose test generator functions.
- """
+ """A decorator that adds teardown logic to a test function/method"""
def decorate(func):
@wraps(func)
def wrapper(*args, **kw):
@@ -868,19 +784,6 @@ def new_db_connection(alias=DEFAULT_DB_ALIAS):
yield cn
-def require_db_context(fn):
- """
- Only run 'fn' in DB tests
- :param fn: a setUpModule or tearDownModule function
- """
- @wraps(fn)
- def inner(*args, **kwargs):
- from corehq.apps.domain.models import Domain
- if not isinstance(Domain.get_db(), mock.Mock):
- return fn(*args, **kwargs)
- return inner
-
-
def disable_quickcache(test_case=None):
"""A patch/decorator that disables quickcache
diff --git a/corehq/util/tests/test_utils.py b/corehq/util/tests/test_utils.py
index 176b4af9d331..66810ec510ab 100644
--- a/corehq/util/tests/test_utils.py
+++ b/corehq/util/tests/test_utils.py
@@ -1,33 +1,8 @@
-import re
-import time
-
from unittest import TestCase
-from testil import assert_raises, eq
-
-from ..test_utils import disable_quickcache, generate_cases, timelimit
-
-
-def test_timelimit_pass():
- @timelimit(0.001)
- def addone(x):
- return x + 1
- eq(addone(x=1), 2)
-
-
-def test_timelimit_fail():
- @timelimit(0.0001)
- def sleeper():
- time.sleep(0.001)
- with assert_raises(AssertionError, msg=re.compile("sleeper took too long")):
- sleeper()
-
+from testil import eq
-def test_timelimit_default():
- @timelimit
- def double(x):
- return x * 2
- eq(double(2), 4)
+from ..test_utils import disable_quickcache, generate_cases
def test_disable_quickcache():
diff --git a/corehq/warnings.py b/corehq/warnings.py
index f797bed2ebd2..5ae25fbfb4ce 100644
--- a/corehq/warnings.py
+++ b/corehq/warnings.py
@@ -21,13 +21,9 @@
# warnings that may be resolved with a library upgrade
("bs4.builder", "option of HTMLParser() has never done anything"),
("couchdbkit.schema.properties", "'collections.abc'"),
- ("ddtrace.internal.module", "the imp module is deprecated"),
+ ("ddtrace.internal.module", "pkg_resources is deprecated as an API"),
("eulxml", "pkg_resources is deprecated as an API"),
- ("nose.importer", "the imp module is deprecated"),
- ("nose.util", "inspect.getargspec() is deprecated"),
("pkg_resources", "pkg_resources.declare_namespace"),
- ("nose.suite", "'collections.abc'"),
- ("nose.plugins.collect", "'collections.abc'"),
("", "", RemovedInDjango50Warning),
("", "", RemovedInDjango51Warning),
@@ -54,8 +50,8 @@
"elasticsearch6.connection.http_urllib3",
"HTTPResponse.getheaders() is deprecated and will be removed in urllib3 v2.1.0."
),
- # Should be removed when Nose is updated
- ("nose.plugins.manager", "pkg_resources is deprecated as an API."),
+ # Open files are leaked all over the place, it will probably take a long time to fix all of them
+ ("", "unclosed file", ResourceWarning),
# other, resolution not obvious
("IPython.core.interactiveshell", "install IPython inside the virtualenv.", UserWarning),
@@ -72,7 +68,7 @@
]
-def configure_warnings(is_testing):
+def configure_warnings(is_testing=False):
strict = is_testing or os.environ.get("CCHQ_STRICT_WARNINGS")
if strict:
augment_warning_messages()
diff --git a/custom/inddex/example_data/data.py b/custom/inddex/example_data/data.py
index d072bb6bb00b..f206ad01c9f4 100644
--- a/custom/inddex/example_data/data.py
+++ b/custom/inddex/example_data/data.py
@@ -30,13 +30,13 @@ def populate_inddex_domain(domain):
user = _get_or_create_user(domain)
_import_cases(domain, user)
_import_fixtures(domain)
- _rebuild_datasource(domain)
+ return _rebuild_datasource(domain)
-def _get_or_create_user(domain):
+def _get_or_create_user(domain, create=True):
username = format_username('nick', domain)
user = CommCareUser.get_by_username(username, strict=True)
- if not user:
+ if not user and create:
user = CommCareUser.create(domain, username, 'secret', None, None)
return user
@@ -142,3 +142,4 @@ def _rebuild_datasource(domain):
config_id = StaticDataSourceConfiguration.get_doc_id(
domain, 'food_consumption_indicators')
rebuild_indicators(config_id, source='populate_inddex_test_domain')
+ return config_id
diff --git a/custom/inddex/tests/test_master_report.py b/custom/inddex/tests/test_master_report.py
index 59afcee5d91c..bff108e389d3 100644
--- a/custom/inddex/tests/test_master_report.py
+++ b/custom/inddex/tests/test_master_report.py
@@ -1,6 +1,7 @@
import csv
import doctest
import os
+from contextlib import ExitStack
from datetime import date
from django.test import SimpleTestCase, TestCase
@@ -8,19 +9,21 @@
from memoized import memoized
from unittest.mock import patch
+from unmagic import fixture
from dimagi.utils.dates import DateSpan
import custom.inddex.reports.r4_nutrient_stats
-from corehq.apps.domain.models import Domain
from corehq.apps.domain.shortcuts import create_domain
from corehq.apps.fixtures.models import LookupTable
+from corehq.apps.userreports.util import get_indicator_adapter, get_ucr_datasource_config_by_id
from corehq.form_processor.models import CommCareCase
-from corehq.util.test_utils import require_db_context
+from corehq.sql_db.util import get_db_aliases_for_partitioned_query
from ..example_data.data import (
FOOD_CASE_TYPE,
FOODRECALL_CASE_TYPE,
+ _get_or_create_user,
populate_inddex_domain,
)
from ..fixtures import FixtureAccessor
@@ -64,22 +67,36 @@ def _overwrite_report(filename, actual_report):
writer.writerows(rows)
-@require_db_context
-def setUpModule():
- create_domain(name=DOMAIN)
- try:
- with patch('corehq.apps.callcenter.data_source.get_call_center_domains', lambda: []):
- populate_inddex_domain(DOMAIN)
- except Exception:
- tearDownModule()
- raise
+@fixture(scope='module', autouse=__file__)
+def inddex_domain():
+ def on_exit(callback):
+ cleanup.callback(with_db, callback)
+ def with_db(func):
+ with db_blocker.unblock():
+ func()
-@require_db_context
-def tearDownModule():
- Domain.get_by_name(DOMAIN).delete()
- get_food_data.reset_cache()
- _get_case_ids_by_external_id.reset_cache()
+ db_blocker = fixture("django_db_blocker")()
+ with ExitStack() as cleanup:
+ cleanup.callback(_get_case_ids_by_external_id.reset_cache)
+ cleanup.callback(get_food_data.reset_cache)
+
+ with db_blocker.unblock():
+ domain = create_domain(name=DOMAIN)
+ on_exit(domain.delete)
+
+ with patch('corehq.apps.callcenter.data_source.get_call_center_domains', lambda: []):
+ datasource_config_id = populate_inddex_domain(DOMAIN)
+ config = get_ucr_datasource_config_by_id(datasource_config_id)
+
+ on_exit(lambda: _get_or_create_user(domain.name, create=False).delete(None, None))
+ on_exit(LookupTable.objects.filter(domain=domain.name).delete)
+ on_exit(get_indicator_adapter(config).drop_table)
+
+ for dbname in get_db_aliases_for_partitioned_query():
+ on_exit(CommCareCase.objects.using(dbname).filter(domain=domain.name).delete)
+
+ yield
@memoized
diff --git a/custom/up_nrhm/tests/__init__.py b/custom/up_nrhm/tests/__init__.py
index 3d1db2a18e72..bfb43a47980e 100644
--- a/custom/up_nrhm/tests/__init__.py
+++ b/custom/up_nrhm/tests/__init__.py
@@ -1,63 +1,59 @@
import os
+from contextlib import ExitStack, contextmanager
+from unittest import mock
from django.test.utils import override_settings
-from unittest import mock
import postgres_copy
import sqlalchemy
+from unmagic import fixture
-from corehq.apps.domain.models import Domain
from corehq.apps.domain.shortcuts import create_domain
from corehq.apps.userreports.models import StaticDataSourceConfiguration
from corehq.apps.userreports.util import get_indicator_adapter, get_table_name
from corehq.sql_db.connections import UCR_ENGINE_ID, connection_manager
-from corehq.util.test_utils import require_db_context
from custom.up_nrhm.sql_data import DOMAIN
-@require_db_context
-def setUpModule():
- _call_center_domain_mock = mock.patch(
- 'corehq.apps.callcenter.data_source.call_center_data_source_configuration_provider'
- )
- _call_center_domain_mock.start()
-
- domain = create_domain(DOMAIN)
-
- with override_settings(SERVER_ENVIRONMENT='production'):
-
- configs = StaticDataSourceConfiguration.by_domain(domain.name)
- adapters = [get_indicator_adapter(config) for config in configs]
-
- for adapter in adapters:
- adapter.build_table()
-
- engine = connection_manager.get_engine(UCR_ENGINE_ID)
- metadata = sqlalchemy.MetaData(bind=engine)
- metadata.reflect(bind=engine, extend_existing=True)
- path = os.path.join(os.path.dirname(__file__), 'fixtures')
- for file_name in os.listdir(path):
- with open(os.path.join(path, file_name), encoding='utf-8') as f:
- table_name = get_table_name(domain.name, file_name[:-4])
- table = metadata.tables[table_name]
- postgres_copy.copy_from(f, table, engine, format='csv', null='', header=True)
- _call_center_domain_mock.stop()
-
-
-@require_db_context
-def tearDownModule():
- _call_center_domain_mock = mock.patch(
- 'corehq.apps.callcenter.data_source.call_center_data_source_configuration_provider'
- )
- domain = Domain.get_by_name(DOMAIN)
- engine = connection_manager.get_engine(UCR_ENGINE_ID)
- metadata = sqlalchemy.MetaData(bind=engine)
- metadata.reflect(bind=engine, extend_existing=True)
- path = os.path.join(os.path.dirname(__file__), 'fixtures')
- for file_name in os.listdir(path):
- table_name = get_table_name(domain.name, file_name[:-4])
- table = metadata.tables[table_name]
- table.drop()
- _call_center_domain_mock.start()
- domain.delete()
- _call_center_domain_mock.stop()
+@fixture(scope="package", autouse=__file__)
+def up_nrhm_context():
+ def with_db(func):
+ with db_blocker.unblock():
+ func()
+
+ @contextmanager
+ def up_nrhm_domain():
+ domain = create_domain(DOMAIN)
+ yield domain
+ with call_center_domain_mock, db_blocker.unblock():
+ domain.delete()
+
+ call_center_domain_mock = mock.patch(
+ 'corehq.apps.callcenter.data_source.call_center_data_source_configuration_provider')
+ db_blocker = fixture("django_db_blocker")()
+ with ExitStack() as on_exit:
+ with (
+ call_center_domain_mock,
+ override_settings(SERVER_ENVIRONMENT='production'),
+ db_blocker.unblock(),
+ ):
+ domain = on_exit.enter_context(up_nrhm_domain())
+
+ configs = StaticDataSourceConfiguration.by_domain(domain.name)
+ adapters = [get_indicator_adapter(config) for config in configs]
+ for adapter in adapters:
+ adapter.build_table()
+ on_exit.callback(with_db, adapter.drop_table)
+
+ engine = connection_manager.get_engine(UCR_ENGINE_ID)
+ metadata = sqlalchemy.MetaData(bind=engine)
+ metadata.reflect(bind=engine, extend_existing=True)
+ path = os.path.join(os.path.dirname(__file__), 'fixtures')
+ for file_name in os.listdir(path):
+ with open(os.path.join(path, file_name), encoding='utf-8') as f:
+ table_name = get_table_name(domain.name, file_name[:-4])
+ table = metadata.tables[table_name]
+ postgres_copy.copy_from(f, table, engine, format='csv', null='', header=True)
+ on_exit.callback(with_db, table.drop)
+
+ yield # NOTE outside call_center_domain_mock / override_settings / db-unblock context
diff --git a/docker/README.rst b/docker/README.rst
index 479de8685557..f78f8992c13b 100644
--- a/docker/README.rst
+++ b/docker/README.rst
@@ -197,8 +197,8 @@ TEST=[ javascript | **python** | python-sharded | python-sharded-and-javascript
``javascript``. Also sends static analysis to Datadog if a job is a
Travis "cron" event.
-NOSE_DIVIDED_WE_RUN
- Only runs a subset of tests. See ``.travis.yml`` for exact options.
+DIVIDED_WE_RUN
+  Only runs a subset of tests. See the `pytest dividedwerun plugin`_ for exact options.
REUSE_DB
Same as normal ``REUSE_DB``
@@ -225,6 +225,7 @@ DOCKER_HQ_OVERLAYFS_METACOPY=[ on | **off** ]
See ``.travis.yml`` for environment variable options used on Travis.
+.. _pytest dividedwerun plugin: https://github.com/dimagi/commcare-hq/blob/master/corehq/tests/pytest_plugins/dividedwerun.py
Run containers with Podman instead of Docker
============================================
diff --git a/docker/hq-compose.yml b/docker/hq-compose.yml
index 5ef92d2c1529..346a1d7d6926 100644
--- a/docker/hq-compose.yml
+++ b/docker/hq-compose.yml
@@ -16,7 +16,7 @@ services:
GITHUB_EVENT_NAME: "${GITHUB_EVENT_NAME}"
JS_SETUP: "${JS_SETUP}"
JS_TEST_EXTENSIONS: "${JS_TEST_EXTENSIONS}"
- NOSE_DIVIDED_WE_RUN: "${NOSE_DIVIDED_WE_RUN}"
+ DIVIDED_WE_RUN: "${DIVIDED_WE_RUN}"
REUSE_DB: "${REUSE_DB}"
STRIPE_PRIVATE_KEY: "${STRIPE_PRIVATE_KEY}"
TRAVIS: "${TRAVIS}"
diff --git a/docker/run.sh b/docker/run.sh
index 85bf0d6724fd..91d55007049b 100755
--- a/docker/run.sh
+++ b/docker/run.sh
@@ -176,29 +176,29 @@ function _run_tests {
python-sharded*)
export USE_PARTITIONED_DATABASE=yes
# TODO make it possible to run a subset of python-sharded tests
- py_test_args+=("--attr=sharded")
+ py_test_args+=("-msharded")
;;
python-elasticsearch-v5)
export ELASTICSEARCH_HOST='elasticsearch5'
export ELASTICSEARCH_PORT=9205
export ELASTICSEARCH_MAJOR_VERSION=5
- py_test_args+=("--attr=es_test")
+ py_test_args+=("-mes_test")
;;
esac
function _test_python {
./manage.py create_kafka_topics
if [ -n "$CI" ]; then
- logmsg INFO "coverage run manage.py test ${py_test_args[*]}"
+ logmsg INFO "coverage run $(which pytest) ${py_test_args[*]}"
# `coverage` generates a file that's then sent to codecov
- coverage run manage.py test "${py_test_args[@]}"
+ coverage run $(which pytest) "${py_test_args[@]}"
coverage xml
if [ -n "$TRAVIS" ]; then
bash <(curl -s https://codecov.io/bash)
fi
else
- logmsg INFO "./manage.py test ${py_test_args[*]}"
- ./manage.py test "${py_test_args[@]}"
+ logmsg INFO "pytest ${py_test_args[*]}"
+ pytest "${py_test_args[@]}"
fi
}
diff --git a/docs/testing.rst b/docs/testing.rst
index c3b666ec5a3f..fc7ddc4a7353 100644
--- a/docs/testing.rst
+++ b/docs/testing.rst
@@ -2,24 +2,17 @@
Testing infrastructure
======================
-Tests are run with `nose `_.
-Unlike many projects that use nose, tests cannot normally be invoked with the
-``nosetests`` command because it does not perform necessary Django setup.
-Instead, tests are invoked using the standard Django convention:
-``./manage.py test``.
+Tests are run with `pytest <https://docs.pytest.org/>`_.
-Nose plugins
-============
+Pytest plugins
+==============
-Nose plugins are used for various purposes, some of which are optional and can
+Pytest plugins are used for various purposes, some of which are optional and can
be enabled with command line parameters or environment variables. Others are
-required by the test environment and are always enabled. Custom plugins are
-registered with `django-nose <https://github.com/dimagi/django-nose>`_ via the
-``NOSE_PLUGINS`` setting in
-`testsettings <https://github.com/dimagi/commcare-hq/blob/master/testsettings.py>`_.
+required by the test environment and are always enabled.
One very important always-enabled plugin applies
-`patches <https://github.com/dimagi/commcare-hq/blob/master/corehq/tests/noseplugins/patches.py>`_
+`patches <https://github.com/dimagi/commcare-hq/blob/master/corehq/tests/pytest_plugins/patches.py>`_
before tests are run. The patches remain in effect for the duration of the test
run unless utilities are provided to temporarily disable them. For example,
`sync_users_to_es `_
@@ -36,9 +29,10 @@ Testing best practices
Test set up
===========
-Doing a lot of work in the ``setUp`` call of a test class means that it will be run on every test. This
-quickly adds a lot of run time to the tests. Some things that can be easily moved to ``setUpClass`` are domain
-creation, user creation, or any other static models needed for the test.
+The ``setUp`` method runs before each test, so doing a lot of work there can add a lot of run time to the
+tests. If possible, move common setup of things that are not mutated by tests into ``setUpClass``; good
+candidates are domain creation, user creation, and any other static models needed for the tests.
Sometimes classes share the same base class and inherit the ``setUpClass`` function. Below is an example:
@@ -83,23 +77,49 @@ In the above example the ``setUpClass`` is run twice, once for ``MyTestClass`` a
...
However this can lead to giant Test classes. If you find that all the tests in a package or module are sharing
-the same set up, you can write a setup method for the entire package or module. More information on that can be found `here `_.
+the same setup, you can use a 'module' or 'package' `scoped fixture <https://docs.pytest.org/en/stable/how-to/fixtures.html>`_.
+
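+For example, a module-scoped fixture can be written with the ``pytest-unmagic`` decorator used elsewhere in
+this codebase (a minimal sketch; the domain name is illustrative, and ``django_db_blocker`` is needed because
+pytest-django blocks database access outside of test-requested fixtures):
+
+.. code:: python
+
+    from unmagic import fixture
+
+    from corehq.apps.domain.shortcuts import create_domain
+
+    @fixture(scope="module", autouse=__file__)
+    def module_domain():
+        # one-time setup before the first test in this module
+        db_blocker = fixture("django_db_blocker")()
+        with db_blocker.unblock():
+            domain = create_domain("module-scope-example")
+        yield domain
+        # one-time teardown after the last test in this module
+        with db_blocker.unblock():
+            domain.delete()
+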
Test tear down
-==================
+==============
-It is important to ensure that all objects you have created in the test database are deleted when the test
-class finishes running. This often happens in the ``tearDown`` method or the ``tearDownClass`` method.
-However, unneccessary cleanup "just to be safe" can add a large amount of time onto your tests.
+It is important to ensure that all objects you have created in databases are deleted when the test class
+finishes running. The best practice is to use pytest `fixtures <https://docs.pytest.org/en/stable/how-to/fixtures.html>`_
+or ``addCleanup``/``addClassCleanup`` to call cleanup routines. Cleanup can also be done in the ``tearDown``
+method or the ``tearDownClass`` method, although pytest fixtures and ``add[Class]Cleanup`` are preferred
+because, unlike ``tearDown[Class]``, they are run even if ``setUp[Class]`` raises an exception.
+
+SQL data in non-shard databases is automatically cleaned up on transaction rollback in test classes that
+extend ``django.test.TestCase`` and in function tests that use pytest-django's ``db`` fixture, so in most
+cases it is not necessary to delete SQL data explicitly. One exception is the shard databases, since most
+usages of shard databases do not support transactions.
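+
+For example, ``addClassCleanup`` registers the cleanup as soon as the object is created, so it runs even if a
+later step in ``setUpClass`` fails (a minimal sketch; the domain name is illustrative):
+
+.. code:: python
+
+    from django.test import TestCase
+
+    from corehq.apps.domain.shortcuts import create_domain
+
+    class DomainCleanupTests(TestCase):
+
+        @classmethod
+        def setUpClass(cls):
+            super().setUpClass()
+            cls.domain = create_domain("cleanup-example")
+            # registered immediately: runs even if later setup raises
+            cls.addClassCleanup(cls.domain.delete)
+
+        def test_domain_name(self):
+            self.assertEqual(self.domain.name, "cleanup-example")
+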
-Using SimpleTestCase
-====================
+Also beware that unnecessary cleanup "just to be safe" can add a large amount of time to your tests and
+should be avoided.
+
+Functional database tests
+=========================
+
+Function tests may access databases by using pytest-django's ``db`` or ``transactional_db`` fixture. This is
+similar to extending ``django.test.TestCase`` or ``django.test.TransactionTestCase`` respectively.
+
+.. code:: python
+
+ from unmagic import use
+
+ @use("db")
+ def test_database():
+ ...
+
+
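+A function test that needs real transaction behavior can use ``transactional_db`` in the same way, which is
+analogous to extending ``django.test.TransactionTestCase``:
+
+.. code:: python
+
+    @use("transactional_db")
+    def test_with_real_transactions():
+        ...
+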
+Using ``SimpleTestCase`` and function tests
+===========================================
-The SimpleTestCase runs tests without a database. Many times this can be achieved through the use of the `mock
-library `_. A good rule of thumb is to have 80% of your tests be unit
-tests that utilize ``SimpleTestCase``, and then 20% of your tests be integration tests that utilize the
-database and ``TestCase``.
+``SimpleTestCase`` and function tests that do not use the ``db`` or ``transactional_db`` fixture run without
+a database and are often MUCH faster than database tests. Many times the database can be avoided entirely
+with the `mock library <https://docs.python.org/3/library/unittest.mock.html>`_. A good rule of thumb is to
+have 80% of your tests be unit tests that do not touch the database, and 20% of your tests be integration
+tests that use the database.
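+
+For example, a unit test can patch out data access so no database is needed (a minimal sketch; the module and
+function names are hypothetical):
+
+.. code:: python
+
+    from unittest.mock import patch
+
+    from django.test import SimpleTestCase
+
+    from myapp.greetings import make_greeting  # hypothetical module under test
+
+    class GreetingTests(SimpleTestCase):
+
+        def test_greeting(self):
+            # patch the (hypothetical) database accessor used by make_greeting
+            with patch("myapp.greetings.get_user_name", return_value="Mary"):
+                self.assertEqual(make_greeting(), "Hello, Mary!")
+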
CommCare HQ also has some custom mocking tools.
diff --git a/manage.py b/manage.py
index dd7e7e67bdd4..e7e10dd121e2 100755
--- a/manage.py
+++ b/manage.py
@@ -8,6 +8,9 @@
def main():
+ if len(sys.argv) > 1 and sys.argv[1] == 'test':
+ sys.exit("pytest is used to run HQ tests. See 'pytest --help' for options")
+
# important to apply gevent monkey patches before running any other code
# applying this later can lead to inconsistencies and threading issues
# but compressor doesn't like it
@@ -40,10 +43,9 @@ def main():
run_patches()
from corehq.warnings import configure_warnings
- configure_warnings(is_testing(sys.argv))
+ configure_warnings()
set_default_settings_path(sys.argv)
- set_nosetests_verbosity(sys.argv)
from django.core.management import execute_from_command_line
execute_from_command_line(sys.argv)
@@ -159,51 +161,12 @@ def unpatch_sys_modules():
def set_default_settings_path(argv):
- if is_testing(argv):
- os.environ.setdefault('CCHQ_TESTING', '1')
+ if os.environ.get('CCHQ_TESTING') == '1':
module = 'testsettings'
else:
module = 'settings'
os.environ.setdefault("DJANGO_SETTINGS_MODULE", module)
-def is_testing(argv):
- return len(argv) > 1 and argv[1] == 'test' or os.environ.get('CCHQ_TESTING') == '1'
-
-
-def set_nosetests_verbosity(argv):
- """Increase nose output verbosity with -v... argument
-
- -v: print test names
- -vv: do not capture stdout
- -vvv: do not capture logging
- -vvvv: enable nose internal logging
- """
- import logging
-
- def set_verbosity(arg, i):
- args = []
- verbosity = sum(1 for c in arg if c == "v") + 1
- if len(arg) > verbosity:
- # preserve other single-letter arguments (ex: -xv)
- args.append("".join(c for c in arg if c != "v"))
- if verbosity > 2:
- args.append("--nocapture")
- if verbosity > 3:
- verbosity -= 1
- args.append("--nologcapture")
- logging.basicConfig(level=logging.NOTSET)
- logging.getLogger().info(
- "Adjust logging with testsettings._set_logging_levels")
- args.append("--nose-verbosity=%s" % verbosity)
- argv[i:i + 1] = args
-
- if len(argv) > 1 and argv[1] == 'test':
- for i, arg in reversed(list(enumerate(argv))):
- if arg[:1] == "-" and arg[1] != "-" and any(c == 'v' for c in arg):
- set_verbosity(arg, i)
- break
-
-
if __name__ == "__main__":
main()
diff --git a/requirements/dev-requirements.txt b/requirements/dev-requirements.txt
index c6b14e101dda..f3e420ae1b30 100644
--- a/requirements/dev-requirements.txt
+++ b/requirements/dev-requirements.txt
@@ -203,8 +203,6 @@ django-formtools==2.3
# via
# -r base-requirements.in
# django-two-factor-auth
-django-nose @ https://github.com/dimagi/django-nose/raw/fast-first-1.4.6.1/releases/django_nose-1.4.6.1-py2.py3-none-any.whl
- # via -r test-requirements.in
django-oauth-toolkit==2.3.0
# via -r base-requirements.in
django-otp==1.1.3
@@ -255,7 +253,9 @@ ethiopian-date-converter==0.1.5
eulxml==1.1.3
# via -r base-requirements.in
exceptiongroup==1.1.1
- # via cattrs
+ # via
+ # cattrs
+ # pytest
executing==2.0.1
# via stack-data
fakecouch==0.0.15
@@ -268,7 +268,7 @@ fixture==1.5.11
# via -r dev-requirements.in
flake8==6.1.0
# via -r dev-requirements.in
-flaky==3.7.0
+flaky==3.8.1
# via -r test-requirements.in
freezegun==1.1.0
# via -r test-requirements.in
@@ -366,6 +366,8 @@ importlib-metadata==6.0.0
# markdown
# opentelemetry-api
# sphinx
+iniconfig==2.0.0
+ # via pytest
ipython==8.10.0
# via -r dev-requirements.in
iso8601==2.0.0
@@ -449,13 +451,6 @@ multidict==6.0.5
# yarl
myst-parser==2.0.0
# via -r docs-requirements.in
-nose==1.3.7
- # via
- # -r test-requirements.in
- # django-nose
- # nose-exclude
-nose-exclude==0.5.0
- # via -r test-requirements.in
oauthlib==3.1.0
# via
# django-oauth-toolkit
@@ -473,6 +468,7 @@ packaging==23.0
# -r base-requirements.in
# build
# gunicorn
+ # pytest
# sphinx
parsimonious==0.10.0
# via pyseeyou
@@ -494,6 +490,8 @@ pillow==10.3.0
# reportlab
pip-tools==7.3.0
# via -r test-requirements.in
+pluggy==1.5.0
+ # via pytest
ply==3.11
# via
# eulxml
@@ -582,6 +580,15 @@ pyrsistent==0.17.3
# via jsonschema
pyseeyou==1.0.2
# via transifex-python
+pytest @ git+https://github.com/pytest-dev/pytest.git@85760bff2776989b365167c7aeb35c86308ab76b
+ # via
+ # -r test-requirements.in
+ # pytest-django
+ # pytest-unmagic
+pytest-django==4.8.0
+ # via -r test-requirements.in
+pytest-unmagic==1.0.0
+ # via -r test-requirements.in
python-dateutil==2.8.2
# via
# -r base-requirements.in
@@ -772,6 +779,7 @@ tomli==2.0.1
# via
# build
# pip-tools
+ # pytest
toolz==0.12.1
# via pyseeyou
toposort==1.7
diff --git a/requirements/test-requirements.in b/requirements/test-requirements.in
index 30ee65fea10e..aa43e2e81d9f 100644
--- a/requirements/test-requirements.in
+++ b/requirements/test-requirements.in
@@ -1,16 +1,18 @@
-r requirements.in
-django-nose @ https://github.com/dimagi/django-nose/raw/fast-first-1.4.6.1/releases/django_nose-1.4.6.1-py2.py3-none-any.whl
fakecouch
-nose
-nose-exclude
pip-tools>=6.8.0
testil
requests-mock
sqlalchemy-postgres-copy
Faker
-flaky
+flaky>=3.8 # required for pytest 8.2 (and possibly earlier versions)
freezegun
radon>=5.0 # v5 no longer depends on flake8-polyfill
+# pytest is pinned to a git commit until 8.4 is released.
+# `minversion = 8.4` should also be set in .pytest.ini at that time.
+pytest @ git+https://github.com/pytest-dev/pytest.git@85760bff2776989b365167c7aeb35c86308ab76b
+pytest-django
+pytest-unmagic
coverage
uWSGI
diff --git a/requirements/test-requirements.txt b/requirements/test-requirements.txt
index 6cc74b6f4039..ec6573006e59 100644
--- a/requirements/test-requirements.txt
+++ b/requirements/test-requirements.txt
@@ -182,8 +182,6 @@ django-formtools==2.3
# via
# -r base-requirements.in
# django-two-factor-auth
-django-nose @ https://github.com/dimagi/django-nose/raw/fast-first-1.4.6.1/releases/django_nose-1.4.6.1-py2.py3-none-any.whl
- # via -r test-requirements.in
django-oauth-toolkit==2.3.0
# via -r base-requirements.in
django-otp==1.1.3
@@ -229,14 +227,16 @@ ethiopian-date-converter==0.1.5
eulxml==1.1.3
# via -r base-requirements.in
exceptiongroup==1.1.1
- # via cattrs
+ # via
+ # cattrs
+ # pytest
fakecouch==0.0.15
# via -r test-requirements.in
faker==26.0.0
# via -r test-requirements.in
firebase-admin==6.1.0
# via -r base-requirements.in
-flaky==3.7.0
+flaky==3.8.1
# via -r test-requirements.in
freezegun==1.1.0
# via -r test-requirements.in
@@ -326,6 +326,8 @@ importlib-metadata==6.0.0
# via
# markdown
# opentelemetry-api
+iniconfig==2.0.0
+ # via pytest
iso8601==2.0.0
# via -r base-requirements.in
isodate==0.6.1
@@ -385,13 +387,6 @@ multidict==6.0.5
# via
# aiohttp
# yarl
-nose==1.3.7
- # via
- # -r test-requirements.in
- # django-nose
- # nose-exclude
-nose-exclude==0.5.0
- # via -r test-requirements.in
oauthlib==3.1.0
# via
# django-oauth-toolkit
@@ -409,6 +404,7 @@ packaging==23.0
# -r base-requirements.in
# build
# gunicorn
+ # pytest
parsimonious==0.10.0
# via pyseeyou
pep517==0.10.0
@@ -423,6 +419,8 @@ pillow==10.3.0
# reportlab
pip-tools==7.3.0
# via -r test-requirements.in
+pluggy==1.5.0
+ # via pytest
ply==3.11
# via
# eulxml
@@ -491,6 +489,15 @@ pyrsistent==0.17.3
# via jsonschema
pyseeyou==1.0.2
# via transifex-python
+pytest @ git+https://github.com/pytest-dev/pytest.git@85760bff2776989b365167c7aeb35c86308ab76b
+ # via
+ # -r test-requirements.in
+ # pytest-django
+ # pytest-unmagic
+pytest-django==4.8.0
+ # via -r test-requirements.in
+pytest-unmagic==1.0.0
+ # via -r test-requirements.in
python-dateutil==2.8.2
# via
# -r base-requirements.in
@@ -642,6 +649,7 @@ tomli==2.0.1
# via
# build
# pip-tools
+ # pytest
toolz==0.12.1
# via pyseeyou
toposort==1.7
diff --git a/scripts/datadog-utils.sh b/scripts/datadog-utils.sh
index 1fd7a6b4aa00..5d133d717208 100644
--- a/scripts/datadog-utils.sh
+++ b/scripts/datadog-utils.sh
@@ -40,7 +40,7 @@ function send_metric_to_datadog() {
\"tags\":[
$EXTRA_TAG
\"environment:$CI_ENV\",
- \"partition:$NOSE_DIVIDED_WE_RUN\"
+ \"partition:$DIVIDED_WE_RUN\"
]}
]
}" \
diff --git a/scripts/docker b/scripts/docker
index 728bc77895c1..adf26f4286ed 100755
--- a/scripts/docker
+++ b/scripts/docker
@@ -237,7 +237,7 @@ export GITHUB_ACTIONS="$GITHUB_ACTIONS"
export GITHUB_EVENT_NAME="$GITHUB_EVENT_NAME"
export JS_SETUP="${JS_SETUP:-no}"
export JS_TEST_EXTENSIONS="$JS_TEST_EXTENSIONS"
-export NOSE_DIVIDED_WE_RUN="$NOSE_DIVIDED_WE_RUN"
+export DIVIDED_WE_RUN="$DIVIDED_WE_RUN"
export REUSE_DB="$REUSE_DB"
export STRIPE_PRIVATE_KEY="$STRIPE_PRIVATE_KEY"
export TRAVIS="$TRAVIS"
diff --git a/testsettings.py b/testsettings.py
index 2246e17dc5e2..2a8beed9ba0c 100644
--- a/testsettings.py
+++ b/testsettings.py
@@ -2,7 +2,8 @@
from warnings import filterwarnings
import settingshelper as helper
-from settings import * # noqa: F403
+assert helper.is_testing(), 'test mode is required before importing settings'
+from settings import * # noqa: E402, F403
# Commenting out temporarily for tests
# if os.environ.get('ELASTICSEARCH_MAJOR_VERSION'):
@@ -53,45 +54,12 @@
# it can be reverted whenever that's figured out.
# https://github.com/dimagi/commcare-hq/pull/10034#issuecomment-174868270
INSTALLED_APPS = (
- 'django_nose',
'testapps.test_elasticsearch',
'testapps.test_pillowtop',
) + tuple(INSTALLED_APPS) # noqa: F405
-TEST_RUNNER = 'django_nose.BasicNoseRunner'
-NOSE_ARGS = [
- #'--no-migrations' # trim ~120s from test run with db tests
- #'--with-fixture-bundling',
-]
-NOSE_PLUGINS = [
- 'corehq.tests.nose.HqTestFinderPlugin',
- 'corehq.tests.noseplugins.classcleanup.ClassCleanupPlugin',
- 'corehq.tests.noseplugins.dbtransaction.DatabaseTransactionPlugin',
- 'corehq.tests.noseplugins.dividedwerun.DividedWeRunPlugin',
- 'corehq.tests.noseplugins.djangomigrations.DjangoMigrationsPlugin',
- 'corehq.tests.noseplugins.cmdline_params.CmdLineParametersPlugin',
- 'corehq.tests.noseplugins.patches.PatchesPlugin',
- 'corehq.tests.noseplugins.redislocks.RedisLockTimeoutPlugin',
- 'corehq.tests.noseplugins.uniformresult.UniformTestResultPlugin',
-
- # The following are not enabled by default
- 'corehq.tests.noseplugins.logfile.LogFilePlugin',
- 'corehq.tests.noseplugins.timing.TimingPlugin',
- 'corehq.tests.noseplugins.output.OutputPlugin',
- 'corehq.tests.noseplugins.elasticsnitch.ElasticSnitchPlugin',
-
- # Uncomment to debug tests. Plugins have nice hooks for inspecting state
- # before/after each test or context setup/teardown, etc.
- #'corehq.tests.noseplugins.debug.DebugPlugin',
-]
-
# these settings can be overridden with environment variables
for key, value in {
- 'NOSE_DB_TEST_CONTEXT': 'corehq.tests.nose.HqdbContext',
- 'NOSE_NON_DB_TEST_CONTEXT': 'corehq.tests.nose.ErrorOnDbAccessContext',
- 'NOSE_IGNORE_FILES': '^localsettings',
- 'NOSE_EXCLUDE_DIRS': 'scripts',
-
'DD_DOGSTATSD_DISABLE': 'true',
'DD_TRACE_ENABLED': 'false',
}.items():
@@ -146,7 +114,6 @@ def _set_logging_levels(levels):
'urllib3': 'WARNING',
})
-# use empty LOGGING dict with --debug=nose,nose.plugins to debug test discovery
# TODO empty logging config (and fix revealed deprecation warnings)
LOGGING = {
'disable_existing_loggers': False,