Skip to content

Commit

Permalink
Bump aiida-pythonjob to 0.1.4 (#373)
Browse files Browse the repository at this point in the history
* Bump aiida-pythonjob to 0.1.4
* Remove some test for pythonjob, because it is already tested in the aiida-pythonjob
  • Loading branch information
superstar54 authored Dec 3, 2024
1 parent 6da7c0a commit 3db9082
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 277 deletions.
2 changes: 1 addition & 1 deletion aiida_workgraph/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def prepare_for_python_task(task: dict, kwargs: dict, var_kwargs: dict) -> dict:
function_outputs.append(output)

inputs = prepare_pythonjob_inputs(
pickled_function=executor,
function_data=executor,
function_inputs=function_inputs,
function_outputs=function_outputs,
code=code,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dependencies = [
"aiida-core>=2.3",
"cloudpickle",
"aiida-shell~=0.8",
"aiida-pythonjob==0.1.3",
"aiida-pythonjob==0.1.4",
"fastapi",
"uvicorn",
"pydantic_settings",
Expand Down
275 changes: 0 additions & 275 deletions tests/test_pythonjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,6 @@ def multiply(x: Any, y: Any) -> Any:
assert wg.tasks["add1"].node.label == "add1"


def test_importable_function(fixture_localhost, python_executable_path):
"""Test importable function."""
from aiida_workgraph.executors.test import add_pythonjob

wg = WorkGraph("test_importable_function")
wg.add_task(
add_pythonjob,
name="add",
x=1,
y=2,
computer="localhost",
command_info={"label": python_executable_path},
)
wg.run()
assert wg.tasks["add"].outputs["result"].value.value == 3


def test_PythonJob_kwargs(fixture_localhost, python_executable_path):
"""Test function with kwargs."""

Expand Down Expand Up @@ -95,113 +78,6 @@ def add(x, y=1, **kwargs):
assert wg.tasks["add"].inputs["kwargs"].value == {"m": 2, "n": 3}


def test_PythonJob_typing():
"""Test function with typing."""
from numpy import array
from ase import Atoms
from aiida_workgraph.orm.function_data import PickledFunction
from typing import List

def generate_structures(
structures: List[Atoms],
strain_lst: list,
data: array,
strain_lst1: list = None,
data1: array = None,
structure1: Atoms = None,
) -> list[Atoms]:
pass

def generate_structures_2(
structure1: Atoms,
strain_lst1: list = None,
data1: str = "",
) -> list[Atoms]:
pass

modules = PickledFunction.get_required_imports(generate_structures)
assert modules == {
"ase.atoms": {"Atoms"},
"typing": {"List"},
"builtins": {"list"},
"numpy": {"array"},
}
modules = PickledFunction.get_required_imports(generate_structures_2)
assert modules == {"ase.atoms": {"Atoms"}, "builtins": {"list", "str"}}


def test_PythonJob_outputs(fixture_localhost, python_executable_path):
"""Test function with multiple outputs."""

@task(
outputs=[
{"identifier": "workgraph.any", "name": "sum"},
{"identifier": "workgraph.any", "name": "diff"},
]
)
def add(x, y):
return {"sum": x + y, "diff": x - y}

wg = WorkGraph("test_PythonJob_outputs")
wg.add_task(
"PythonJob",
function=add,
name="add",
x=1,
y=2,
# code=code,
computer="localhost",
command_info={"label": python_executable_path},
)
wg.run()
assert wg.tasks["add"].outputs["sum"].value.value == 3
assert wg.tasks["add"].outputs["diff"].value.value == -1


@pytest.mark.usefixtures("started_daemon_client")
def test_PythonJob_namespace_output(fixture_localhost, python_executable_path):
"""Test function with namespace output and input."""

# output namespace
@task(
outputs=[
{
"name": "add_multiply",
"identifier": "workgraph.namespace",
},
{
"name": "add_multiply.add",
"identifier": "workgraph.namespace",
},
{"name": "minus"},
]
)
def myfunc(x, y):
add = {"order1": x + y, "order2": x * x + y * y}
return {
"add_multiply": {"add": add, "multiply": x * y},
"minus": x - y,
}

wg = WorkGraph("test_namespace_outputs")
wg.add_task("PythonJob", function=myfunc, name="myfunc")

wg.submit(
wait=True,
inputs={
"myfunc": {
"x": 1.0,
"y": 2.0,
"computer": "localhost",
"command_info": {"label": python_executable_path},
}
},
)
assert wg.tasks["myfunc"].outputs["add_multiply"].value.add.order1.value == 3
assert wg.tasks["myfunc"].outputs["add_multiply"].value.add.order2.value == 5
assert wg.tasks["myfunc"].outputs["add_multiply"].value.multiply.value == 2


def test_PythonJob_namespace_output_input(fixture_localhost, python_executable_path):
"""Test function with namespace output and input."""

Expand Down Expand Up @@ -271,96 +147,6 @@ def myfunc3(x, y):
assert wg.tasks["myfunc3"].outputs["result"].value.value == 7


def test_PythonJob_parent_folder(fixture_localhost, python_executable_path):
"""Test function with parent folder."""

def add(x, y):
z = x + y
with open("result.txt", "w") as f:
f.write(str(z))
return x + y

def multiply(x, y):
with open("parent_folder/result.txt", "r") as f:
z = int(f.read())
return x * y + z

wg = WorkGraph("test_PythonJob_parent_folder")
wg.add_task("PythonJob", function=add, name="add")
wg.add_task(
"PythonJob",
function=multiply,
name="multiply",
parent_folder=wg.tasks["add"].outputs["remote_folder"],
)

# ------------------------- Submit the calculation -------------------
wg.submit(
inputs={
"add": {
"x": 2,
"y": 3,
"computer": "localhost",
"command_info": {"label": python_executable_path},
},
"multiply": {
"x": 3,
"y": 4,
"computer": "localhost",
"command_info": {"label": python_executable_path},
},
},
wait=True,
)
assert wg.tasks["multiply"].outputs["result"].value.value == 17


def test_PythonJob_upload_files(fixture_localhost, python_executable_path):
"""Test function with upload files."""

# create a temporary file "input.txt" in the current directory
with open("input.txt", "w") as f:
f.write("2")

# create a temporary folder "inputs_folder" in the current directory
# and add a file "another_input.txt" in the folder
import os

os.makedirs("inputs_folder", exist_ok=True)
with open("inputs_folder/another_input.txt", "w") as f:
f.write("3")

def add():
with open("input.txt", "r") as f:
a = int(f.read())
with open("inputs_folder/another_input.txt", "r") as f:
b = int(f.read())
return a + b

wg = WorkGraph("test_PythonJob_upload_files")
wg.add_task("PythonJob", function=add, name="add")

# ------------------------- Submit the calculation -------------------
# we need use full path to the file
input_file = os.path.abspath("input.txt")
input_folder = os.path.abspath("inputs_folder")

wg.run(
inputs={
"add": {
"computer": "localhost",
"command_info": {"label": python_executable_path},
"upload_files": {
"input.txt": input_file,
"inputs_folder": input_folder,
},
},
},
)
# wait=True)
assert wg.tasks["add"].outputs["result"].value.value == 5


def test_PythonJob_copy_files(fixture_localhost, python_executable_path):
"""Test function with copy files."""

Expand Down Expand Up @@ -424,67 +210,6 @@ def multiply(x_folder_name, y_folder_name):
assert wg.tasks["multiply"].outputs["result"].value.value == 25


def test_PythonJob_retrieve_files(fixture_localhost, python_executable_path):
"""Test retrieve files."""

def add(x, y):
z = x + y
with open("result.txt", "w") as f:
f.write(str(z))
return x + y

wg = WorkGraph("test_PythonJob_retrieve_files")
wg.add_task("PythonJob", function=add, name="add")
# ------------------------- Submit the calculation -------------------
wg.run(
inputs={
"add": {
"x": 2,
"y": 3,
"computer": "localhost",
"command_info": {"label": python_executable_path},
"metadata": {
"options": {
"additional_retrieve_list": ["result.txt"],
}
},
},
},
)
assert (
"result.txt" in wg.tasks["add"].outputs["retrieved"].value.list_object_names()
)


def test_data_serializer(fixture_localhost, python_executable_path):
from ase import Atoms
from ase.build import bulk

@task()
def make_supercell(atoms: Atoms, dim: int) -> Atoms:
"""Scale the structure by the given scales."""
return atoms * (dim, dim, dim)

atoms = bulk("Si")

wg = WorkGraph("test_PythonJob_retrieve_files")
wg.add_task(
"PythonJob",
function=make_supercell,
atoms=atoms,
dim=2,
name="make_supercell",
computer="localhost",
command_info={"label": python_executable_path},
)
# ------------------------- Submit the calculation -------------------
wg.submit(wait=True)
assert (
wg.tasks["make_supercell"].outputs["result"].value.value.get_chemical_formula()
== "Si16"
)


def test_load_pythonjob(fixture_localhost, python_executable_path):
"""Test function with typing."""

Expand Down

0 comments on commit 3db9082

Please sign in to comment.