diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index 26d2980cc3..2f8e12f63b 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -25,6 +25,8 @@ jobs: tox-env: py310-core - python-version: "3.11" tox-env: py311-core + - python-version: "3.12" + tox-env: py312-core steps: - uses: actions/checkout@v2 @@ -92,6 +94,8 @@ jobs: tox-env: py310-postgres - python-version: "3.11" tox-env: py311-postgres + - python-version: "3.12" + tox-env: py312-postgres steps: - uses: actions/checkout@v2 @@ -145,6 +149,8 @@ jobs: tox-env: py310-aws - python-version: "3.11" tox-env: py311-aws + - python-version: "3.12" + tox-env: py312-aws - python-version: "3.6" tox-env: py36-unixsocket @@ -164,6 +170,9 @@ jobs: - python-version: "3.11" tox-env: py311-unixsocket OVERRIDE_SKIP_CI_TESTS: True + - python-version: "3.12" + tox-env: py312-unixsocket + OVERRIDE_SKIP_CI_TESTS: True - python-version: "3.6" tox-env: py36-apache @@ -177,6 +186,8 @@ jobs: tox-env: py310-apache - python-version: "3.11" tox-env: py311-apache + - python-version: "3.12" + tox-env: py312-apache - python-version: "3.6" tox-env: py36-azureblob @@ -190,6 +201,8 @@ jobs: tox-env: py310-azureblob - python-version: "3.11" tox-env: py311-azureblob + - python-version: "3.12" + tox-env: py312-azureblob - python-version: 3.9 diff --git a/README.rst b/README.rst index 1af2c3a841..deaffe4dc0 100644 --- a/README.rst +++ b/README.rst @@ -18,7 +18,7 @@ :target: https://luigi.readthedocs.io/en/stable/?badge=stable :alt: Documentation Status -Luigi is a Python (3.6, 3.7, 3.8, 3.9, 3.10, 3.11 tested) package that helps you build complex +Luigi is a Python (3.6, 3.7, 3.8, 3.9, 3.10, 3.11, 3.12 tested) package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, handling failures, command line integration, and much more. diff --git a/doc/configuration.rst b/doc/configuration.rst index 2499fb7567..c4b1fdbe0c 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -432,13 +432,13 @@ traceback_max_length Maximum length for traceback included in error email. Default is 5000. -[batch_notifier] +[batch_email] ---------------- Parameters controlling the contents of batch notifications sent from the scheduler -email_interval_minutes +email_interval Number of minutes between e-mail sends. Making this larger results in fewer, bigger e-mails. Defaults to 60. diff --git a/luigi/__meta__.py b/luigi/__meta__.py index 39c6ffc1f7..b9bf1e3ac0 100644 --- a/luigi/__meta__.py +++ b/luigi/__meta__.py @@ -7,5 +7,5 @@ __author__ = 'The Luigi Authors' __contact__ = 'https://github.com/spotify/luigi' __license__ = 'Apache License 2.0' -__version__ = '3.5.0' +__version__ = '3.5.1' __status__ = 'Production' diff --git a/luigi/contrib/azureblob.py b/luigi/contrib/azureblob.py index 20de24224a..38c96b6973 100644 --- a/luigi/contrib/azureblob.py +++ b/luigi/contrib/azureblob.py @@ -20,7 +20,7 @@ import logging import datetime -from azure.storage.blob import blockblobservice +from azure.storage.blob import BlobServiceClient from luigi.format import get_default_format from luigi.target import FileAlreadyExists, FileSystem, AtomicLocalFile, FileSystemTarget @@ -62,60 +62,101 @@ def __init__(self, account_name=None, account_key=None, sas_token=None, **kwargs * `custom_domain` - The custom domain to use. This can be set in the Azure Portal. For example, ‘www.mydomain.com’. * `token_credential` - A token credential used to authenticate HTTPS requests. The token value should be updated before its expiration. """ - self.options = {"account_name": account_name, "account_key": account_key, "sas_token": sas_token} + if kwargs.get("custom_domain"): + account_url = "{protocol}://{custom_domain}/{account_name}".format(protocol=kwargs.get("protocol", "https"), + custom_domain=kwargs.get("custom_domain"), + account_name=account_name) + else: + account_url = "{protocol}://{account_name}.blob.{endpoint_suffix}".format(protocol=kwargs.get("protocol", + "https"), + account_name=account_name, + endpoint_suffix=kwargs.get( + "endpoint_suffix", + "core.windows.net")) + + self.options = { + "account_name": account_name, + "account_key": account_key, + "account_url": account_url, + "sas_token": sas_token} self.kwargs = kwargs @property def connection(self): - return blockblobservice.BlockBlobService(account_name=self.options.get("account_name"), - account_key=self.options.get("account_key"), - sas_token=self.options.get("sas_token"), - protocol=self.kwargs.get("protocol"), - connection_string=self.kwargs.get("connection_string"), - endpoint_suffix=self.kwargs.get("endpoint_suffix"), - custom_domain=self.kwargs.get("custom_domain"), - is_emulated=self.kwargs.get("is_emulated") or False) + if self.kwargs.get("connection_string"): + return BlobServiceClient.from_connection_string(conn_str=self.kwargs.get("connection_string"), + **self.kwargs) + else: + return BlobServiceClient(account_url=self.options.get("account_url"), + credential=self.options.get("account_key") or self.options.get("sas_token"), + **self.kwargs) + + def container_client(self, container_name): + return self.connection.get_container_client(container_name) + + def blob_client(self, container_name, blob_name): + container_client = self.container_client(container_name) + return container_client.get_blob_client(blob_name) def upload(self, tmp_path, container, blob, **kwargs): logging.debug("Uploading file '{tmp_path}' to container '{container}' and blob '{blob}'".format( tmp_path=tmp_path, container=container, blob=blob)) self.create_container(container) - lease_id = self.connection.acquire_blob_lease(container, blob)\ - if self.exists("{container}/{blob}".format(container=container, blob=blob)) else None + lease = None + blob_client = self.blob_client(container, blob) + if blob_client.exists(): + lease = blob_client.acquire_lease() try: - self.connection.create_blob_from_path(container, blob, tmp_path, lease_id=lease_id, progress_callback=kwargs.get("progress_callback")) + with open(tmp_path, 'rb') as data: + blob_client.upload_blob(data, + overwrite=True, + lease=lease, + progress_hook=kwargs.get("progress_callback")) finally: - if lease_id is not None: - self.connection.release_blob_lease(container, blob, lease_id) + if lease is not None: + lease.release() def download_as_bytes(self, container, blob, bytes_to_read=None): - start_range, end_range = (0, bytes_to_read-1) if bytes_to_read is not None else (None, None) logging.debug("Downloading from container '{container}' and blob '{blob}' as bytes".format( container=container, blob=blob)) - return self.connection.get_blob_to_bytes(container, blob, start_range=start_range, end_range=end_range).content + blob_client = self.blob_client(container, blob) + download_stream = blob_client.download_blob(offset=0, length=bytes_to_read) if bytes_to_read \ + else blob_client.download_blob() + return download_stream.readall() def download_as_file(self, container, blob, location): logging.debug("Downloading from container '{container}' and blob '{blob}' to {location}".format( container=container, blob=blob, location=location)) - return self.connection.get_blob_to_path(container, blob, location) + blob_client = self.blob_client(container, blob) + with open(location, 'wb') as file: + download_stream = blob_client.download_blob() + file.write(download_stream.readall()) + return blob_client.get_blob_properties() def create_container(self, container_name): - return self.connection.create_container(container_name) + if not self.exists(container_name): + return self.connection.create_container(container_name) def delete_container(self, container_name): - lease_id = self.connection.acquire_container_lease(container_name) - self.connection.delete_container(container_name, lease_id=lease_id) + container_client = self.container_client(container_name) + lease = container_client.acquire_lease() + container_client.delete_container(lease=lease) def exists(self, path): container, blob = self.splitfilepath(path) - return self.connection.exists(container, blob) + if blob is None: + return self.container_client(container).exists() + else: + return self.blob_client(container, blob).exists() def remove(self, path, recursive=True, skip_trash=True): - container, blob = self.splitfilepath(path) if not self.exists(path): return False - lease_id = self.connection.acquire_blob_lease(container, blob) - self.connection.delete_blob(container, blob, lease_id=lease_id) + + container, blob = self.splitfilepath(path) + blob_client = self.blob_client(container, blob) + lease = blob_client.acquire_lease() + blob_client.delete_blob(lease=lease) return True def mkdir(self, path, parents=True, raise_if_exists=False): @@ -148,16 +189,18 @@ def copy(self, path, dest): source_container=source_container, dest_container=dest_container )) - source_lease_id = self.connection.acquire_blob_lease(source_container, source_blob) - destination_lease_id = self.connection.acquire_blob_lease(dest_container, dest_blob) if self.exists(dest) else None + source_blob_client = self.blob_client(source_container, source_blob) + dest_blob_client = self.blob_client(dest_container, dest_blob) + source_lease = source_blob_client.acquire_lease() + destination_lease = dest_blob_client.acquire_lease() if self.exists(dest) else None try: - return self.connection.copy_blob(source_container, dest_blob, self.connection.make_blob_url( - source_container, source_blob), - destination_lease_id=destination_lease_id, source_lease_id=source_lease_id) + return dest_blob_client.start_copy_from_url(source_url=source_blob_client.url, + source_lease=source_lease, + destination_lease=destination_lease) finally: - self.connection.release_blob_lease(source_container, source_blob, source_lease_id) - if destination_lease_id is not None: - self.connection.release_blob_lease(dest_container, dest_blob, destination_lease_id) + source_lease.release() + if destination_lease is not None: + destination_lease.release() def rename_dont_move(self, path, dest): self.move(path, dest) diff --git a/luigi/contrib/hadoop_jar.py b/luigi/contrib/hadoop_jar.py index 2635aeca5e..74f171ab9a 100644 --- a/luigi/contrib/hadoop_jar.py +++ b/luigi/contrib/hadoop_jar.py @@ -20,8 +20,8 @@ import logging import os -import pipes import random +import shlex import warnings import luigi.contrib.hadoop @@ -110,7 +110,7 @@ def run_job(self, job, tracking_url_callback=None): arglist += ['-o', 'UserKnownHostsFile=/dev/null', '-o', 'StrictHostKeyChecking=no'] arglist.append('{}@{}'.format(username, host)) - hadoop_arglist = [pipes.quote(arg) for arg in hadoop_arglist] + hadoop_arglist = [shlex.quote(arg) for arg in hadoop_arglist] arglist.append(' '.join(hadoop_arglist)) else: if not os.path.exists(job.jar()): diff --git a/luigi/parameter.py b/luigi/parameter.py index b101640e99..f7f137a6d1 100644 --- a/luigi/parameter.py +++ b/luigi/parameter.py @@ -309,7 +309,7 @@ def normalize(self, x): """ return x # default impl - def next_in_enumeration(self, _value): + def next_in_enumeration(self, value): """ If your Parameter type has an enumerable ordering of values. You can choose to override this method. This method is used by the @@ -389,6 +389,9 @@ def _warn_on_wrong_param_type(self, param_name, param_value): OptionalParameterTypeWarning, ) + def next_in_enumeration(self, value): + return None + class OptionalParameter(OptionalParameterMixin, Parameter): """Class to parse optional parameters.""" diff --git a/luigi/py.typed b/luigi/py.typed new file mode 100644 index 0000000000..e69de29bb2 diff --git a/luigi/static/visualiser/js/graph.js b/luigi/static/visualiser/js/graph.js index f996526bd7..449dd1c0b8 100644 --- a/luigi/static/visualiser/js/graph.js +++ b/luigi/static/visualiser/js/graph.js @@ -146,8 +146,8 @@ Graph = (function() { $.each(n.deps, function(i, dep) { if (nodeIndex[dep]) { var next_node = nodes[nodeIndex[dep]] - var depth = (selfDependencies ? depth + 1 : classDepths[next_node.name]) - placeNodes(next_node, depth); + var next_depth = (selfDependencies ? depth + 1 : classDepths[next_node.name]) + placeNodes(next_node, next_depth); } }); } diff --git a/luigi/static/visualiser/js/test/graph_test.js b/luigi/static/visualiser/js/test/graph_test.js index 780f6d3839..1103bb4fe6 100644 --- a/luigi/static/visualiser/js/test/graph_test.js +++ b/luigi/static/visualiser/js/test/graph_test.js @@ -2,18 +2,27 @@ module("graph.js"); test("nodeFromTask", function() { var task = { - deps: ["B","C"], - taskId: "A", - status: "DONE" + deps: ["B1","C1"], + taskId: "A1", + status: "DONE", + name: "A", + params: {}, + priority: 0, }; var expected = { - taskId: "A", + taskId: "A1", status: "DONE", - trackingUrl: "#A", - deps: ["B","C"], - depth: -1 + trackingUrl: "#A1", + deps: ["B1","C1"], + depth: -1, + name: "A", + params: {}, + priority: 0, }; - deepEqual(Graph.testableMethods.nodeFromTask(task), expected); + let graph = { + hashBase: "#" + } + deepEqual(Graph.testableMethods.nodeFromTask.bind(graph)(task), expected); }); test("uniqueIndexByProperty", function() { @@ -70,7 +79,7 @@ test("computeRowsSelfDeps", function () { var rowSizes = Graph.testableMethods.computeRows(nodes, nodeIndex) equal(A1.depth, 0) equal(A2.depth, 1) - equal(rowSizes, [1, 1]) + deepEqual(rowSizes, [1, 1]) }); test("computeRowsGrouped", function() { @@ -83,6 +92,8 @@ test("computeRowsGrouped", function() { var D2 = {name: "D", taskId: "D2", deps: [], depth: -1} var E1 = {name: "E", taskId: "E1", deps: [], depth: -1} var E2 = {name: "E", taskId: "E2", deps: [], depth: -1} + var nodes = [A0, B0, C1, C2, D0, D1, D2, E1, E2] + var nodeIndex = {"A0": 0, "B0": 1, "C1": 2, "C2": 3, "D0": 4, "D1": 5, "D2": 6, "E1": 7, "E2": 8} var rowSizes = Graph.testableMethods.computeRows(nodes, nodeIndex) equal(A0.depth, 0) equal(B0.depth, 1) @@ -93,7 +104,7 @@ test("computeRowsGrouped", function() { equal(D2.depth, 3) equal(E1.depth, 4) equal(E2.depth, 4) - equal(rowSizes, [1, 1, 2, 3, 2]) + deepEqual(rowSizes, [1, 1, 2, 3, 2]) }); test("createGraph", function() { diff --git a/luigi/task.py b/luigi/task.py index f5a108dc23..4d3a312884 100644 --- a/luigi/task.py +++ b/luigi/task.py @@ -264,6 +264,13 @@ def wrapped(callback): return callback return wrapped + @classmethod + def remove_event_handler(cls, event, callback): + """ + Function to remove the event handler registered previously by the cls.event_handler decorator. + """ + cls._event_callbacks[cls][event].remove(callback) + def trigger_event(self, event, *args, **kwargs): """ Trigger that calls all of the specified events associated with this class. diff --git a/luigi/worker.py b/luigi/worker.py index c3ea777b8a..a11f808ae5 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -55,7 +55,7 @@ from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, Scheduler, RetryPolicy from luigi.scheduler import WORKER_STATE_ACTIVE, WORKER_STATE_DISABLED from luigi.target import Target -from luigi.task import Task, Config, DynamicRequirements +from luigi.task import Task, Config, DynamicRequirements, flatten from luigi.task_register import TaskClassException from luigi.task_status import RUNNING from luigi.parameter import BoolParameter, FloatParameter, IntParameter, OptionalParameter, Parameter, TimeDeltaParameter @@ -185,7 +185,7 @@ def run(self): missing = [] for dep in self.task.deps(): if not self.check_complete(dep): - nonexistent_outputs = [output for output in dep.output() if not output.exists()] + nonexistent_outputs = [output for output in flatten(dep.output()) if not output.exists()] if nonexistent_outputs: missing.append(f'{dep.task_id} ({", ".join(map(str, nonexistent_outputs))})') else: diff --git a/scripts/ci/install_start_azurite.sh b/scripts/ci/install_start_azurite.sh index e4a011139b..a73bc8c81b 100755 --- a/scripts/ci/install_start_azurite.sh +++ b/scripts/ci/install_start_azurite.sh @@ -2,7 +2,7 @@ echo "$DOCKERHUB_TOKEN" | docker login -u spotifyci --password-stdin -docker pull arafato/azurite +docker pull mcr.microsoft.com/azure-storage/azurite mkdir -p blob_emulator $1/stop_azurite.sh -docker run -e executable=blob -d -t -p 10000:10000 -v blob_emulator:/opt/azurite/folder arafato/azurite +docker run -p 10000:10000 -v blob_emulator:/data -e AZURITE_ACCOUNTS=devstoreaccount1:YXp1cml0ZQ== -d mcr.microsoft.com/azure-storage/azurite azurite-blob -l /data --blobHost 0.0.0.0 --blobPort 10000 diff --git a/scripts/ci/stop_azurite.sh b/scripts/ci/stop_azurite.sh index 834f5e7bd6..f1b8437e68 100755 --- a/scripts/ci/stop_azurite.sh +++ b/scripts/ci/stop_azurite.sh @@ -1,2 +1,2 @@ #!/usr/bin/env bash -docker stop $(docker ps -q --filter ancestor=arafato/azurite) \ No newline at end of file +docker stop "$(docker ps -q --filter ancestor=mcr.microsoft.com/azure-storage/azurite)" \ No newline at end of file diff --git a/setup.py b/setup.py index c56e54811e..e01d76018a 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ def get_static_files(path): "*.eot", "*.svg", "*.ttf", "*.woff", "*.woff2"]] -luigi_package_data = sum(map(get_static_files, ["luigi/static", "luigi/templates"]), []) +luigi_package_data = sum(map(get_static_files, ["luigi/static", "luigi/templates"]), ["py.typed"]) readme_note = """ .. note:: @@ -118,6 +118,7 @@ def get_static_files(path): 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', 'Topic :: System :: Monitoring', ], ) diff --git a/test/cmdline_test.py b/test/cmdline_test.py index 7db317ded2..f8d2b4ced8 100644 --- a/test/cmdline_test.py +++ b/test/cmdline_test.py @@ -362,7 +362,7 @@ def test_cmd_line_params_are_available_for_execution_summary(self): print(stdout) print(stderr) - self.assertNotEquals(returncode, 1) + self.assertNotEqual(returncode, 1) self.assertFalse(b'required_test_param' in stderr) diff --git a/test/contrib/azureblob_test.py b/test/contrib/azureblob_test.py index d587768c2e..c887000be0 100644 --- a/test/contrib/azureblob_test.py +++ b/test/contrib/azureblob_test.py @@ -26,12 +26,14 @@ import luigi from luigi.contrib.azureblob import AzureBlobClient, AzureBlobTarget +from luigi.target import FileAlreadyExists -account_name = os.environ.get("ACCOUNT_NAME") -account_key = os.environ.get("ACCOUNT_KEY") -sas_token = os.environ.get("SAS_TOKEN") -is_emulated = False if account_name else True -client = AzureBlobClient(account_name, account_key, sas_token, is_emulated=is_emulated) +account_name = os.environ.get("AZURITE_ACCOUNT_NAME") +account_key = os.environ.get("AZURITE_ACCOUNT_KEY") +sas_token = os.environ.get("AZURITE_SAS_TOKEN") +custom_domain = os.environ.get("AZURITE_CUSTOM_DOMAIN") +protocol = os.environ.get("AZURITE_PROTOCOL", "http") +client = AzureBlobClient(account_name, account_key, sas_token, custom_domain=custom_domain, protocol=protocol) @pytest.mark.azureblob @@ -95,8 +97,15 @@ def test_upload_copy_move_remove_blob(self): self.client.upload(f.name, container_name, from_blob_name) self.assertTrue(self.client.exists(from_path)) + # mkdir + self.assertRaises(FileAlreadyExists, self.client.mkdir, from_path, False, True) + + # mkdir does not actually create anything + self.client.mkdir(to_path, True, True) + self.assertFalse(self.client.exists(to_path)) + # copy - self.assertIn(self.client.copy(from_path, to_path).status, ["success", "pending"]) + self.assertIn(self.client.copy(from_path, to_path)["copy_status"], ["success", "pending"]) self.assertTrue(self.client.exists(to_path)) # remove @@ -121,7 +130,7 @@ def output(self): return AzureBlobTarget("luigi-test", "movie-cheesy.txt", client, download_when_reading=False) def run(self): - client.connection.create_container("luigi-test") + client.create_container("luigi-test") with self.output().open("w") as op: op.write("I'm going to make him an offer he can't refuse.\n") op.write("Toto, I've got a feeling we're not in Kansas anymore.\n") diff --git a/test/contrib/external_daily_snapshot_test.py b/test/contrib/external_daily_snapshot_test.py index ff273dd751..53c9d8a0b5 100644 --- a/test/contrib/external_daily_snapshot_test.py +++ b/test/contrib/external_daily_snapshot_test.py @@ -32,18 +32,18 @@ class ExternalDailySnapshotTest(unittest.TestCase): def test_latest(self): MockTarget('data-xyz-zebra-Congo-2012-01-01').open('w').close() d = DataDump.latest(date=datetime.date(2012, 1, 10), param='xyz') - self.assertEquals(d.date, datetime.date(2012, 1, 1)) + self.assertEqual(d.date, datetime.date(2012, 1, 1)) def test_latest_not_exists(self): MockTarget('data-abc-zebra-Congo-2012-01-01').open('w').close() d = DataDump.latest(date=datetime.date(2012, 1, 11), param='abc', lookback=5) - self.assertEquals(d.date, datetime.date(2012, 1, 7)) + self.assertEqual(d.date, datetime.date(2012, 1, 7)) def test_deterministic(self): MockTarget('data-pqr-zebra-Congo-2012-01-01').open('w').close() d = DataDump.latest(date=datetime.date(2012, 1, 10), param='pqr', a='zebra', aa='Congo') - self.assertEquals(d.date, datetime.date(2012, 1, 1)) + self.assertEqual(d.date, datetime.date(2012, 1, 1)) MockTarget('data-pqr-zebra-Congo-2012-01-05').open('w').close() d = DataDump.latest(date=datetime.date(2012, 1, 10), param='pqr', aa='Congo', a='zebra') - self.assertEquals(d.date, datetime.date(2012, 1, 1)) # Should still be the same + self.assertEqual(d.date, datetime.date(2012, 1, 1)) # Should still be the same diff --git a/test/contrib/hadoop_test.py b/test/contrib/hadoop_test.py index b3db99a0bc..8e1c2fe64d 100644 --- a/test/contrib/hadoop_test.py +++ b/test/contrib/hadoop_test.py @@ -200,7 +200,7 @@ def test_run_2(test_case): job = WordFreqJob(use_hdfs=test_case.use_hdfs) luigi.build([job], local_scheduler=True) c = read_wordcount_output(job.output()) - test_case.assertAlmostEquals(float(c['jk']), 6.0 / 33.0) + test_case.assertAlmostEqual(float(c['jk']), 6.0 / 33.0) @staticmethod def test_map_only(test_case): diff --git a/test/contrib/postgres_test.py b/test/contrib/postgres_test.py index f08a330f84..bf9c0b89ac 100644 --- a/test/contrib/postgres_test.py +++ b/test/contrib/postgres_test.py @@ -132,11 +132,11 @@ def test_bulk_complete(self, mock_connect): def test_override_port(self): output = DummyPostgresQueryWithPort(date=datetime.datetime(1991, 3, 24)).output() - self.assertEquals(output.port, 1234) + self.assertEqual(output.port, 1234) def test_port_encoded_in_host(self): output = DummyPostgresQueryWithPortEncodedInHost(date=datetime.datetime(1991, 3, 24)).output() - self.assertEquals(output.port, '1234') + self.assertEqual(output.port, '1234') @pytest.mark.postgres diff --git a/test/contrib/s3_test.py b/test/contrib/s3_test.py index 845806606b..c24e69899a 100644 --- a/test/contrib/s3_test.py +++ b/test/contrib/s3_test.py @@ -20,7 +20,8 @@ import tempfile import boto3 -from boto.s3 import key +if sys.version_info[:2] <= (3, 11): + from boto.s3 import key from botocore.exceptions import ClientError from mock import patch @@ -107,6 +108,7 @@ def test_read_no_file_sse(self): t = self.create_target(encrypt_key=True) self.assertRaises(FileNotFoundException, t.open) + @unittest.skipIf(tuple(sys.version_info) >= (3, 12), "boto is not supported on Python 3.12+") def test_read_iterator_long(self): # write a file that is 5X the boto buffersize # to test line buffering @@ -342,7 +344,7 @@ def test_get(self): s3_client.get('s3://mybucket/putMe', tmp_file_path) with open(tmp_file_path, 'r') as f: content = f.read() - self.assertEquals(content, self.tempFileContents.decode("utf-8")) + self.assertEqual(content, self.tempFileContents.decode("utf-8")) tmp_file.close() def test_get_as_bytes(self): @@ -352,7 +354,7 @@ def test_get_as_bytes(self): contents = s3_client.get_as_bytes('s3://mybucket/putMe') - self.assertEquals(contents, self.tempFileContents) + self.assertEqual(contents, self.tempFileContents) def test_get_as_string(self): create_bucket() @@ -361,7 +363,7 @@ def test_get_as_string(self): contents = s3_client.get_as_string('s3://mybucket/putMe2') - self.assertEquals(contents, self.tempFileContents.decode('utf-8')) + self.assertEqual(contents, self.tempFileContents.decode('utf-8')) def test_get_as_string_latin1(self): create_bucket() @@ -370,7 +372,7 @@ def test_get_as_string_latin1(self): contents = s3_client.get_as_string('s3://mybucket/putMe3', encoding='ISO-8859-1') - self.assertEquals(contents, self.tempFileContents.decode('ISO-8859-1')) + self.assertEqual(contents, self.tempFileContents.decode('ISO-8859-1')) def test_get_key(self): create_bucket() diff --git a/test/event_callbacks_test.py b/test/event_callbacks_test.py index 1d24752214..5382438628 100644 --- a/test/event_callbacks_test.py +++ b/test/event_callbacks_test.py @@ -162,6 +162,21 @@ def test_processing_time_handler_failure(self): t, result = self._run_processing_time_handler(True) self.assertEqual(result, []) + def test_remove_event_handler(self): + run_cnt = 0 + + @EmptyTask.event_handler(luigi.Event.START) + def handler(task): + nonlocal run_cnt + run_cnt += 1 + + task = EmptyTask() + build([task], local_scheduler=True) + assert run_cnt == 1 + EmptyTask.remove_event_handler(luigi.Event.START, handler) + build([task], local_scheduler=True) + assert run_cnt == 1 + # A # / \ diff --git a/test/interface_test.py b/test/interface_test.py index 8418683a34..502a408640 100644 --- a/test/interface_test.py +++ b/test/interface_test.py @@ -204,9 +204,9 @@ class CoreConfigTest(LuigiTestCase): @with_config({}) def test_parallel_scheduling_processes_default(self): - self.assertEquals(0, core().parallel_scheduling_processes) + self.assertEqual(0, core().parallel_scheduling_processes) @with_config({'core': {'parallel-scheduling-processes': '1234'}}) def test_parallel_scheduling_processes(self): from luigi.interface import core - self.assertEquals(1234, core().parallel_scheduling_processes) + self.assertEqual(1234, core().parallel_scheduling_processes) diff --git a/test/list_parameter_test.py b/test/list_parameter_test.py index 4210d2a675..5ebade9024 100644 --- a/test/list_parameter_test.py +++ b/test/list_parameter_test.py @@ -90,7 +90,7 @@ def test_schema(self): # Check that invalid lists raise correct errors invalid_list_type = ["NOT AN INT"] - invalid_list_value = [-999, 999] + invalid_list_value = [-999, 4] with pytest.raises(ValidationError, match="'NOT AN INT' is not of type 'number'"): a.normalize(invalid_list_type) diff --git a/test/optional_parameter_test.py b/test/optional_parameter_test.py index 2bbea15505..97559c2ae6 100644 --- a/test/optional_parameter_test.py +++ b/test/optional_parameter_test.py @@ -21,6 +21,10 @@ def run(self): # Test parsing empty string (should be None) self.assertIsNone(cls(**kwargs).parse('')) + # Test next_in_enumeration always returns None for summary + self.assertIsNone(TestConfig.param.next_in_enumeration(expected_value)) + self.assertIsNone(TestConfig.param.next_in_enumeration(None)) + # Test that warning is raised only with bad type with mock.patch('luigi.parameter.warnings') as warnings: TestConfig() diff --git a/test/task_test.py b/test/task_test.py index e2da34796c..926106ea54 100644 --- a/test/task_test.py +++ b/test/task_test.py @@ -577,4 +577,4 @@ def __init_subclass__(cls, x, **kwargs): class Receiver(ReceivesClassKwargs, x=1): pass - self.assertEquals(Receiver.x, 1) + self.assertEqual(Receiver.x, 1) diff --git a/test/worker_task_test.py b/test/worker_task_test.py index af51defe6c..d58f740ae3 100644 --- a/test/worker_task_test.py +++ b/test/worker_task_test.py @@ -26,6 +26,7 @@ import luigi import luigi.date_interval import luigi.notifications +from luigi.mock import MockTarget from luigi.worker import TaskException, TaskProcess from luigi.scheduler import DONE, FAILED @@ -106,6 +107,42 @@ def complete(self): None )) + def test_fail_on_unfulfilled_dependencies(self): + class NeverCompleteTask(luigi.Task): + def complete(self): + return False + + class A(NeverCompleteTask): + def output(self): + return [] + + class B(NeverCompleteTask): + def output(self): + return MockTarget("foo-B") + + class C(NeverCompleteTask): + def output(self): + return [MockTarget("foo-C1"), MockTarget("foo-C2")] + + class Main(NeverCompleteTask): + def requires(self): + return [A(), B(), C()] + + task = Main() + result_queue = multiprocessing.Queue() + task_process = TaskProcess(task, 1, result_queue, mock.Mock()) + + with mock.patch.object(result_queue, 'put') as mock_put: + task_process.run() + expected_missing = [A().task_id, f"{B().task_id} (foo-B)", f"{C().task_id} (foo-C1, foo-C2)"] + mock_put.assert_called_once_with(( + task.task_id, + FAILED, + StringContaining(f"Unfulfilled dependencies at run time: {', '.join(expected_missing)}"), + expected_missing, + [], + )) + def test_cleanup_children_on_terminate(self): """ Subprocesses spawned by tasks should be terminated on terminate diff --git a/tox.ini b/tox.ini index 4208e98eca..7b0946c965 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py{35,36,37,38,39,310,311}-{cdh,hdp,core,contrib,apache,aws,gcloud,postgres,unixsocket,azureblob,dropbox}, visualiser, docs, flake8 +envlist = py{35,36,37,38,39,310,311,312}-{cdh,hdp,core,contrib,apache,aws,gcloud,postgres,unixsocket,azureblob,dropbox}, visualiser, docs, flake8 skipsdist = True [pytest] @@ -26,7 +26,7 @@ deps = pytest<7.0 pytest-cov>=2.0,<3.0 mock<2.0 - moto>=1.3.10 + moto>=1.3.10,<5.0 HTTPretty==0.8.10 docker>=2.1.0 boto>=2.42,<3.0 @@ -48,7 +48,7 @@ deps = google-compute-engine coverage>=5.0,<6 codecov>=1.4.0 - requests>=2.20.0,<3.0 + requests>=2.20.0,<=2.31.0 unixsocket: requests-unixsocket<1.0 pygments hypothesis>=6.7.0,<7.0.0 @@ -56,9 +56,9 @@ deps = pymongo==3.4.0 toml<2.0.0 responses<1.0.0 - azure-storage<=0.36 + azure-storage-blob<=12.20.0 datadog==0.22.0 - prometheus-client>=0.5.0<0.15 + prometheus-client>=0.5.0,<0.15 dropbox: dropbox>=11.0.0 jsonschema passenv = @@ -75,6 +75,9 @@ setenv = AWS_DEFAULT_REGION=us-east-1 AWS_ACCESS_KEY_ID=accesskey AWS_SECRET_ACCESS_KEY=secretkey + AZURITE_ACCOUNT_NAME=devstoreaccount1 + AZURITE_ACCOUNT_KEY=YXp1cml0ZQ== + AZURITE_CUSTOM_DOMAIN=localhost:10000 commands = cdh,hdp: {toxinidir}/scripts/ci/setup_hadoop_env.sh azureblob: {toxinidir}/scripts/ci/install_start_azurite.sh {toxinidir}/scripts/ci @@ -137,7 +140,7 @@ deps = jinja2==3.0.3 Sphinx>=1.4.4,<1.5 sphinx_rtd_theme - azure-storage<=0.36 + azure-storage-blob<=12.20.0 prometheus-client==0.5.0 alabaster<0.7.13 commands =