diff --git a/caffe2/python/core.py b/caffe2/python/core.py index 8d81d899ea35f..ad610ee91dabf 100644 --- a/caffe2/python/core.py +++ b/caffe2/python/core.py @@ -49,6 +49,18 @@ def _InitDataType(): _InitDataType() +_import_lazy_calls = [] + +def RegisterLazyImport(lazy): + global _import_lazy_calls + _import_lazy_calls += [lazy] + + +def _import_lazy(): + global _import_lazy_calls + for lazy in _import_lazy_calls: + lazy() + def _GetRegisteredOperators(): return set(workspace.RegisteredOperators()) @@ -57,7 +69,9 @@ def _GetRegisteredOperators(): _REGISTERED_OPERATORS = _GetRegisteredOperators() -def RefreshRegisteredOperators(): +def RefreshRegisteredOperators(trigger_lazy=True): + if trigger_lazy: + _import_lazy() global _REGISTERED_OPERATORS _REGISTERED_OPERATORS = _GetRegisteredOperators() @@ -66,6 +80,7 @@ def RefreshRegisteredOperators(): def GlobalInit(args): + _import_lazy() _GLOBAL_INIT_ARGS.extend(args[1:]) C.global_init(args) @@ -79,6 +94,7 @@ def IsOperator(op_type): def IsOperatorWithEngine(op_type, engine): + _import_lazy() return C.op_registry_key(op_type, engine) in _REGISTERED_OPERATORS @@ -278,6 +294,7 @@ def __getattr__(self, op_type): op_type, *args, **kwargs) def __dir__(self): + _import_lazy() additional_methods = [ op for op in _REGISTERED_OPERATORS @@ -2211,6 +2228,7 @@ def __getattr__(self, op_type): op_type, *args, **kwargs) def __dir__(self): + _import_lazy() additional_methods = [ op for op in _REGISTERED_OPERATORS diff --git a/caffe2/python/dyndep.py b/caffe2/python/dyndep.py index af203fa27b129..8bea144238758 100644 --- a/caffe2/python/dyndep.py +++ b/caffe2/python/dyndep.py @@ -11,7 +11,7 @@ from caffe2.python import core, extension_loader -def InitOpsLibrary(name): +def InitOpsLibrary(name, trigger_lazy=True): """Loads a dynamic library that contains custom operators into Caffe2. Since Caffe2 uses static variable registration, you can optionally load a @@ -32,7 +32,7 @@ def InitOpsLibrary(name): # time when an actual call is made. print('Ignoring {} as it is not a valid file.'.format(name)) return - _init_impl(name) + _init_impl(name, trigger_lazy=trigger_lazy) _IMPORTED_DYNDEPS = set() @@ -43,10 +43,10 @@ def GetImportedOpsLibraries(): return _IMPORTED_DYNDEPS -def _init_impl(path): +def _init_impl(path, trigger_lazy=True): with dll_lock: _IMPORTED_DYNDEPS.add(path) with extension_loader.DlopenGuard(): ctypes.CDLL(path) # reinitialize available ops - core.RefreshRegisteredOperators() + core.RefreshRegisteredOperators(trigger_lazy) diff --git a/caffe2/python/lazy_dyndep.py b/caffe2/python/lazy_dyndep.py new file mode 100644 index 0000000000000..baaefae9f5a95 --- /dev/null +++ b/caffe2/python/lazy_dyndep.py @@ -0,0 +1,84 @@ +## @package lazy_dyndep +# Module caffe2.python.lazy_dyndep +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import ctypes +import os +from caffe2.python import core, dyndep + + +def RegisterOpsLibrary(name): + """Registers a dynamic library that contains custom operators into Caffe2. + + Since Caffe2 uses static variable registration, you can optionally load a + separate .so file that contains custom operators and registers that into + the caffe2 core binary. In C++, this is usually done by either declaring + dependency during compilation time, or via dynload. This allows us to do + registration similarly on the Python side. + + Unlike dyndep.InitOpsLibrary, this does not actually parse the c++ file + and refresh operators until caffe2 is called in a fashion which requires + operators. In some large codebases this saves a large amount of time + during import. + + It is safe to use within a program that also uses dyndep.InitOpsLibrary + + Args: + name: a name that ends in .so, such as "my_custom_op.so". Otherwise, + the command will simply be ignored. + Returns: + None + """ + if not os.path.exists(name): + # Note(jiayq): if the name does not exist, instead of immediately + # failing we will simply print a warning, deferring failure to the + # time when an actual call is made. + print('Ignoring {} as it is not a valid file.'.format(name)) + return + global _LAZY_IMPORTED_DYNDEPS + _LAZY_IMPORTED_DYNDEPS.add(name) + + +_LAZY_IMPORTED_DYNDEPS = set() +_error_handler = None + + +def SetErrorHandler(handler): + """Registers an error handler for errors from registering operators + + Since the lazy registration may happen at a much later time, having a dedicated + error handler allows for custom error handling logic. It is highly + recomended to set this to prevent errors from bubbling up in weird parts of the + code. + + Args: + handler: a function that takes an exception as a single handler. + Returns: + None + """ + + global _error_handler + _error_handler = handler + + +def GetImportedOpsLibraries(): + _import_lazy() + return dyndep.GetImportedOpsLibraries() + + +def _import_lazy(): + global _LAZY_IMPORTED_DYNDEPS + if not _LAZY_IMPORTED_DYNDEPS: + return + for name in _LAZY_IMPORTED_DYNDEPS: + try: + dyndep.InitOpLibrary(name, trigger_lazy=False) + except BaseException as e: + if _error_handler: + _error_handler(e) + _LAZY_IMPORTED_DYNDEPS = set() + +core.RegisterLazyImport(_import_lazy) diff --git a/caffe2/python/lazy_dyndep_test.py b/caffe2/python/lazy_dyndep_test.py new file mode 100644 index 0000000000000..1ef43103cb1cc --- /dev/null +++ b/caffe2/python/lazy_dyndep_test.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from hypothesis import given +import hypothesis.strategies as st +from multiprocessing import Process + +import numpy as np +import tempfile +import shutil + +import caffe2.python.hypothesis_test_util as hu +import unittest + +op_engine = 'GLOO' + +class TemporaryDirectory: + def __enter__(self): + self.tmpdir = tempfile.mkdtemp() + return self.tmpdir + + def __exit__(self, type, value, traceback): + shutil.rmtree(self.tmpdir) + + +def allcompare_process(filestore_dir, process_id, data, num_procs): + from caffe2.python import core, data_parallel_model, workspace, lazy_dyndep + from caffe2.python.model_helper import ModelHelper + from caffe2.proto import caffe2_pb2 + lazy_dyndep.RegisterOpsLibrary("@/caffe2/caffe2/distributed:file_store_handler_ops") + + workspace.RunOperatorOnce( + core.CreateOperator( + "FileStoreHandlerCreate", [], ["store_handler"], path=filestore_dir + ) + ) + rendezvous = dict( + kv_handler="store_handler", + shard_id=process_id, + num_shards=num_procs, + engine=op_engine, + exit_nets=None + ) + + model = ModelHelper() + model._rendezvous = rendezvous + + workspace.FeedBlob("test_data", data) + + data_parallel_model._RunComparison( + model, "test_data", core.DeviceOption(caffe2_pb2.CPU, 0) + ) + + +class TestLazyDynDepAllCompare(hu.HypothesisTestCase): + @given( + d=st.integers(1, 5), n=st.integers(2, 11), num_procs=st.integers(1, 8) + ) + def test_allcompare(self, d, n, num_procs): + dims = [] + for _ in range(d): + dims.append(np.random.randint(1, high=n)) + test_data = np.random.ranf(size=tuple(dims)).astype(np.float32) + + with TemporaryDirectory() as tempdir: + processes = [] + for idx in range(num_procs): + process = Process( + target=allcompare_process, + args=(tempdir, idx, test_data, num_procs) + ) + processes.append(process) + process.start() + + while len(processes) > 0: + process = processes.pop() + process.join() + +class TestLazyDynDepError(unittest.TestCase): + def test_errorhandler(self): + from caffe2.python import core, lazy_dyndep + import tempfile + + with tempfile.NamedTemporaryFile() as f: + lazy_dyndep.RegisterOpsLibrary(f.name) + def handler(e): + raise ValueError("test") + lazy_dyndep.SetErrorHandler(handler) + with self.assertRaises(ValueError, msg="test"): + core.RefreshRegisteredOperators() + + +if __name__ == "__main__": + unittest.main()