Skip to content

Commit

Permalink
Merge branch 'main' into dano/volume-put-concurrent-hash
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnorberg authored Jan 4, 2025
2 parents 844c96f + 7169281 commit c7cfe01
Show file tree
Hide file tree
Showing 17 changed files with 104 additions and 65 deletions.
38 changes: 23 additions & 15 deletions modal/_container_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
instrument_imports(telemetry_socket)

import asyncio
import base64
import concurrent.futures
import inspect
import queue
Expand Down Expand Up @@ -337,14 +336,17 @@ def _cancel_input_signal_handler(signum, stackframe):
signal.signal(signal.SIGUSR1, usr1_handler) # reset signal handler


def get_active_app_fallback(function_def: api_pb2.Function) -> Optional[_App]:
def get_active_app_fallback(function_def: api_pb2.Function) -> _App:
# This branch is reached in the special case that the imported function/class is:
# 1) not serialized, and
# 2) isn't a FunctionHandle - i.e, not decorated at definition time
# Look at all instantiated apps - if there is only one with the indicated name, use that one
app_name: Optional[str] = function_def.app_name or None # coalesce protobuf field to None
matching_apps = _App._all_apps.get(app_name, [])
active_app = None
if len(matching_apps) == 1:
active_app: _App = matching_apps[0]
return active_app

if len(matching_apps) > 1:
if app_name is not None:
warning_sub_message = f"app with the same name ('{app_name}')"
Expand All @@ -354,12 +356,10 @@ def get_active_app_fallback(function_def: api_pb2.Function) -> Optional[_App]:
f"You have more than one {warning_sub_message}. "
"It's recommended to name all your Apps uniquely when using multiple apps"
)
elif len(matching_apps) == 1:
(active_app,) = matching_apps
# there could also technically be zero found apps, but that should probably never be an
# issue since that would mean user won't use is_inside or other function handles anyway

return active_app
# If we don't have an active app, create one on the fly
# The app object is used to carry the app layout etc
return _App()


def call_lifecycle_functions(
Expand Down Expand Up @@ -403,7 +403,7 @@ def main(container_args: api_pb2.ContainerArguments, client: Client):
# This is a bit weird but we need both the blocking and async versions of ContainerIOManager.
# At some point, we should fix that by having built-in support for running "user code"
container_io_manager = ContainerIOManager(container_args, client)
active_app: Optional[_App] = None
active_app: _App
service: Service
function_def = container_args.function_def
is_auto_snapshot: bool = function_def.is_auto_snapshot
Expand Down Expand Up @@ -450,8 +450,9 @@ def main(container_args: api_pb2.ContainerArguments, client: Client):
)

# If the cls/function decorator was applied in local scope, but the app is global, we can look it up
active_app = service.app
if active_app is None:
if service.app is not None:
active_app = service.app
else:
# if the app can't be inferred by the imported function, use name-based fallback
active_app = get_active_app_fallback(function_def)

Expand All @@ -468,9 +469,8 @@ def main(container_args: api_pb2.ContainerArguments, client: Client):

# Initialize objects on the app.
# This is basically only functions and classes - anything else is deprecated and will be unsupported soon
if active_app is not None:
app: App = synchronizer._translate_out(active_app)
app._init_container(client, container_app)
app: App = synchronizer._translate_out(active_app)
app._init_container(client, container_app)

# Hydrate all function dependencies.
# TODO(erikbern): we an remove this once we
Expand Down Expand Up @@ -581,7 +581,15 @@ def breakpoint_wrapper():
logger.debug("Container: starting")

container_args = api_pb2.ContainerArguments()
container_args.ParseFromString(base64.b64decode(sys.argv[1]))

container_arguments_path: Optional[str] = os.environ.get("MODAL_CONTAINER_ARGUMENTS_PATH")
if container_arguments_path is None:
# TODO(erikbern): this fallback is for old workers and we can remove it very soon (days)
import base64

container_args.ParseFromString(base64.b64decode(sys.argv[1]))
else:
container_args.ParseFromString(open(container_arguments_path, "rb").read())

# Note that we're creating the client in a synchronous context, but it will be running in a separate thread.
# This is good because if the function is long running then we the client can still send heartbeats
Expand Down
7 changes: 1 addition & 6 deletions modal/_runtime/container_io_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,12 +457,7 @@ async def get_app_objects(self, app_layout: api_pb2.AppLayout) -> RunningApp:
resp = await retry_transient_errors(self._client.stub.AppGetLayout, req)
app_layout = resp.app_layout

return running_app_from_layout(
self.app_id,
app_layout,
self._client,
environment_name=self._environment_name,
)
return running_app_from_layout(self.app_id, app_layout)

async def get_serialized_function(self) -> tuple[Optional[Any], Optional[Callable[..., Any]]]:
# Fetch the serialized function definition
Expand Down
18 changes: 10 additions & 8 deletions modal/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def foo():
"""

_all_apps: ClassVar[dict[Optional[str], list["_App"]]] = {}
_container_app: ClassVar[Optional[RunningApp]] = None
_container_app: ClassVar[Optional["_App"]] = None

_name: Optional[str]
_description: Optional[str]
Expand Down Expand Up @@ -294,12 +294,7 @@ async def lookup(
app = _App(name)
app._app_id = response.app_id
app._client = client
app._running_app = RunningApp(
response.app_id,
client=client,
environment_name=environment_name,
interactive=False,
)
app._running_app = RunningApp(response.app_id, interactive=False)
return app

def set_description(self, description: str):
Expand Down Expand Up @@ -488,7 +483,7 @@ def _init_container(self, client: _Client, running_app: RunningApp):
self._running_app = running_app
self._client = client

_App._container_app = running_app
_App._container_app = self

# Hydrate function objects
for tag, object_id in running_app.function_ids.items():
Expand Down Expand Up @@ -1047,6 +1042,13 @@ async def _logs(self, client: Optional[_Client] = None) -> AsyncGenerator[str, N
if log.data:
yield log.data

@classmethod
def _get_container_app(cls) -> Optional["_App"]:
"""Returns the `App` running inside a container.
This will return `None` outside of a Modal container."""
return cls._container_app

@classmethod
def _reset_container_app(cls):
"""Only used for tests."""
Expand Down
3 changes: 3 additions & 0 deletions modal/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ def clustered(size: int, broadcast: bool = True):

assert broadcast, "broadcast=False has not been implemented yet!"

if size <= 0:
raise ValueError("cluster size must be greater than 0")

def wrapper(raw_f: Callable[..., Any]) -> _PartialFunction:
if isinstance(raw_f, _Function):
raw_f = raw_f.get_raw_f()
Expand Down
3 changes: 0 additions & 3 deletions modal/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ async def _init_local_app_existing(client: _Client, existing_app_id: str, enviro
return running_app_from_layout(
existing_app_id,
obj_resp.app_layout,
client,
app_page_url=app_page_url,
)

Expand All @@ -89,10 +88,8 @@ async def _init_local_app_new(
logger.debug(f"Created new app with id {app_resp.app_id}")
return RunningApp(
app_resp.app_id,
client=client,
app_page_url=app_resp.app_page_url,
app_logs_url=app_resp.app_logs_url,
environment_name=environment_name,
interactive=interactive,
)

Expand Down
8 changes: 0 additions & 8 deletions modal/running_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,10 @@
from modal._utils.grpc_utils import get_proto_oneof
from modal_proto import api_pb2

from .client import _Client


@dataclass
class RunningApp:
app_id: str
client: _Client
environment_name: Optional[str] = None
app_page_url: Optional[str] = None
app_logs_url: Optional[str] = None
function_ids: dict[str, str] = field(default_factory=dict)
Expand All @@ -26,8 +22,6 @@ class RunningApp:
def running_app_from_layout(
app_id: str,
app_layout: api_pb2.AppLayout,
client: _Client,
environment_name: Optional[str] = None,
app_page_url: Optional[str] = None,
) -> RunningApp:
object_handle_metadata = {}
Expand All @@ -37,8 +31,6 @@ def running_app_from_layout(

return RunningApp(
app_id,
client,
environment_name=environment_name,
function_ids=dict(app_layout.function_ids),
class_ids=dict(app_layout.class_ids),
object_handle_metadata=object_handle_metadata,
Expand Down
6 changes: 3 additions & 3 deletions modal/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ async def create(

app_id = app.app_id
app_client = app._client
elif _App._container_app is not None:
app_id = _App._container_app.app_id
app_client = _App._container_app.client
elif (container_app := _App._get_container_app()) is not None:
app_id = container_app.app_id
app_client = container_app._client
else:
arglist = ", ".join(repr(s) for s in entrypoint_args)
deprecation_error(
Expand Down
3 changes: 3 additions & 0 deletions modal_proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1479,6 +1479,8 @@ message FunctionGetOutputsRequest {
string last_entry_id = 6;
bool clear_on_success = 7; // expires *any* remaining outputs soon after this call, not just the returned ones
double requested_at = 8; // Used for waypoints.
// The jwts the client expects the server to be processing. This is optional and used for sync inputs only.
repeated string input_jwts = 9;
}

message FunctionGetOutputsResponse {
Expand Down Expand Up @@ -1678,6 +1680,7 @@ message GenericResult { // Used for both tasks and function outputs
// Used when the user's function fails to initialize (ex. S3 mount failed due to invalid credentials).
// Terminates the function and all remaining inputs.
GENERIC_STATUS_INIT_FAILURE = 5;
GENERIC_STATUS_INTERNAL_FAILURE = 6;
}

GenericStatus status = 1; // Status of the task or function output.
Expand Down
2 changes: 1 addition & 1 deletion modal_version/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
major_number = 0

# Bump this manually on breaking changes, then reset the number in _version_generated.py
minor_number = 70
minor_number = 71

# Right now, automatically increment the patch number in CI
__version__ = f"{major_number}.{minor_number}.{max(build_number, 0)}"
4 changes: 2 additions & 2 deletions modal_version/_version_generated.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright Modal Labs 2024
# Copyright Modal Labs 2025

# Note: Reset this value to -1 whenever you make a minor `0.X` release of the client.
build_number = 3 # git: c164251
build_number = 0 # git: b06252b
2 changes: 1 addition & 1 deletion test/cls_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ def test_rehydrate(client, servicer, reset_container_app):
app_id = deploy_app(app, "my-cls-app", client=client).app_id

# Initialize a container
container_app = RunningApp(app_id, client)
container_app = RunningApp(app_id)

# Associate app with app
app._init_container(client, container_app)
Expand Down
4 changes: 1 addition & 3 deletions test/container_app_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ async def test_container_function_lazily_imported(container_client):
object_handle_metadata: dict[str, Message] = {
"fu-123": api_pb2.FunctionHandleMetadata(),
}
container_app = RunningApp(
"ap-123", container_client, function_ids=function_ids, object_handle_metadata=object_handle_metadata
)
container_app = RunningApp("ap-123", function_ids=function_ids, object_handle_metadata=object_handle_metadata)
app = App()

# This is normally done in _container_entrypoint
Expand Down
Loading

0 comments on commit c7cfe01

Please sign in to comment.