Skip to content

Commit

Permalink
Create lazy_dyndeps to avoid caffe2 import costs. (pytorch#39488)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: pytorch#39488

Currently caffe2.InitOpLibrary does the dll import uniliaterally. Instead if we make a lazy version and use it, then many pieces of code which do not need the caffe2urrenoperators get a lot faster.

One a real test, the import time went from 140s to 68s. 8s.

This also cleans up the algorithm slightly (although it makes a very minimal
difference), by parsing the list of operators once, rather than every time a
new operator is added, since we defer the RefreshCall until after we've
imported all the operators.

The key way we maintain safety, is that as soon as someone does an operation
which requires a operator (or could), we force importing of all available
operators.

Future work could include trying to identify which code is needed for which
operator and only import the needed ones. There may also be wins available by
playing with dlmopen (which opens within a namespace), or seeing if the dl
flags have an impact (I tried this and didn't see an impact, but dlmopen may
make it better).

Test Plan:
I added a new test a lazy_dyndep_test.py (copied from all_compare_test.py).
I'm a little concerned that I don't see any explicit tests for dyndep, but this
should provide decent coverage.

Differential Revision: D21870844

fbshipit-source-id: 3f65fedb65bb48663670349cee5e1d3e22d560ed
  • Loading branch information
c00w authored and facebook-github-bot committed Jul 9, 2020
1 parent f69d6a7 commit 07fd5f8
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 5 deletions.
20 changes: 19 additions & 1 deletion caffe2/python/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ def _InitDataType():

_InitDataType()

_import_lazy_calls = []

def RegisterLazyImport(lazy):
global _import_lazy_calls
_import_lazy_calls += [lazy]


def _import_lazy():
global _import_lazy_calls
for lazy in _import_lazy_calls:
lazy()


def _GetRegisteredOperators():
return set(workspace.RegisteredOperators())
Expand All @@ -57,7 +69,9 @@ def _GetRegisteredOperators():
_REGISTERED_OPERATORS = _GetRegisteredOperators()


def RefreshRegisteredOperators():
def RefreshRegisteredOperators(trigger_lazy=True):
if trigger_lazy:
_import_lazy()
global _REGISTERED_OPERATORS
_REGISTERED_OPERATORS = _GetRegisteredOperators()

Expand All @@ -66,6 +80,7 @@ def RefreshRegisteredOperators():


def GlobalInit(args):
_import_lazy()
_GLOBAL_INIT_ARGS.extend(args[1:])
C.global_init(args)

Expand All @@ -79,6 +94,7 @@ def IsOperator(op_type):


def IsOperatorWithEngine(op_type, engine):
_import_lazy()
return C.op_registry_key(op_type, engine) in _REGISTERED_OPERATORS


Expand Down Expand Up @@ -278,6 +294,7 @@ def __getattr__(self, op_type):
op_type, *args, **kwargs)

def __dir__(self):
_import_lazy()
additional_methods = [
op
for op in _REGISTERED_OPERATORS
Expand Down Expand Up @@ -2211,6 +2228,7 @@ def __getattr__(self, op_type):
op_type, *args, **kwargs)

def __dir__(self):
_import_lazy()
additional_methods = [
op
for op in _REGISTERED_OPERATORS
Expand Down
8 changes: 4 additions & 4 deletions caffe2/python/dyndep.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from caffe2.python import core, extension_loader


def InitOpsLibrary(name):
def InitOpsLibrary(name, trigger_lazy=True):
"""Loads a dynamic library that contains custom operators into Caffe2.
Since Caffe2 uses static variable registration, you can optionally load a
Expand All @@ -32,7 +32,7 @@ def InitOpsLibrary(name):
# time when an actual call is made.
print('Ignoring {} as it is not a valid file.'.format(name))
return
_init_impl(name)
_init_impl(name, trigger_lazy=trigger_lazy)


_IMPORTED_DYNDEPS = set()
Expand All @@ -43,10 +43,10 @@ def GetImportedOpsLibraries():
return _IMPORTED_DYNDEPS


def _init_impl(path):
def _init_impl(path, trigger_lazy=True):
with dll_lock:
_IMPORTED_DYNDEPS.add(path)
with extension_loader.DlopenGuard():
ctypes.CDLL(path)
# reinitialize available ops
core.RefreshRegisteredOperators()
core.RefreshRegisteredOperators(trigger_lazy)
84 changes: 84 additions & 0 deletions caffe2/python/lazy_dyndep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
## @package lazy_dyndep
# Module caffe2.python.lazy_dyndep
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import ctypes
import os
from caffe2.python import core, dyndep


def RegisterOpsLibrary(name):
"""Registers a dynamic library that contains custom operators into Caffe2.
Since Caffe2 uses static variable registration, you can optionally load a
separate .so file that contains custom operators and registers that into
the caffe2 core binary. In C++, this is usually done by either declaring
dependency during compilation time, or via dynload. This allows us to do
registration similarly on the Python side.
Unlike dyndep.InitOpsLibrary, this does not actually parse the c++ file
and refresh operators until caffe2 is called in a fashion which requires
operators. In some large codebases this saves a large amount of time
during import.
It is safe to use within a program that also uses dyndep.InitOpsLibrary
Args:
name: a name that ends in .so, such as "my_custom_op.so". Otherwise,
the command will simply be ignored.
Returns:
None
"""
if not os.path.exists(name):
# Note(jiayq): if the name does not exist, instead of immediately
# failing we will simply print a warning, deferring failure to the
# time when an actual call is made.
print('Ignoring {} as it is not a valid file.'.format(name))
return
global _LAZY_IMPORTED_DYNDEPS
_LAZY_IMPORTED_DYNDEPS.add(name)


_LAZY_IMPORTED_DYNDEPS = set()
_error_handler = None


def SetErrorHandler(handler):
"""Registers an error handler for errors from registering operators
Since the lazy registration may happen at a much later time, having a dedicated
error handler allows for custom error handling logic. It is highly
recomended to set this to prevent errors from bubbling up in weird parts of the
code.
Args:
handler: a function that takes an exception as a single handler.
Returns:
None
"""

global _error_handler
_error_handler = handler


def GetImportedOpsLibraries():
_import_lazy()
return dyndep.GetImportedOpsLibraries()


def _import_lazy():
global _LAZY_IMPORTED_DYNDEPS
if not _LAZY_IMPORTED_DYNDEPS:
return
for name in _LAZY_IMPORTED_DYNDEPS:
try:
dyndep.InitOpLibrary(name, trigger_lazy=False)
except BaseException as e:
if _error_handler:
_error_handler(e)
_LAZY_IMPORTED_DYNDEPS = set()

core.RegisterLazyImport(_import_lazy)
98 changes: 98 additions & 0 deletions caffe2/python/lazy_dyndep_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/usr/bin/env python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from hypothesis import given
import hypothesis.strategies as st
from multiprocessing import Process

import numpy as np
import tempfile
import shutil

import caffe2.python.hypothesis_test_util as hu
import unittest

op_engine = 'GLOO'

class TemporaryDirectory:
def __enter__(self):
self.tmpdir = tempfile.mkdtemp()
return self.tmpdir

def __exit__(self, type, value, traceback):
shutil.rmtree(self.tmpdir)


def allcompare_process(filestore_dir, process_id, data, num_procs):
from caffe2.python import core, data_parallel_model, workspace, lazy_dyndep
from caffe2.python.model_helper import ModelHelper
from caffe2.proto import caffe2_pb2
lazy_dyndep.RegisterOpsLibrary("@/caffe2/caffe2/distributed:file_store_handler_ops")

workspace.RunOperatorOnce(
core.CreateOperator(
"FileStoreHandlerCreate", [], ["store_handler"], path=filestore_dir
)
)
rendezvous = dict(
kv_handler="store_handler",
shard_id=process_id,
num_shards=num_procs,
engine=op_engine,
exit_nets=None
)

model = ModelHelper()
model._rendezvous = rendezvous

workspace.FeedBlob("test_data", data)

data_parallel_model._RunComparison(
model, "test_data", core.DeviceOption(caffe2_pb2.CPU, 0)
)


class TestLazyDynDepAllCompare(hu.HypothesisTestCase):
@given(
d=st.integers(1, 5), n=st.integers(2, 11), num_procs=st.integers(1, 8)
)
def test_allcompare(self, d, n, num_procs):
dims = []
for _ in range(d):
dims.append(np.random.randint(1, high=n))
test_data = np.random.ranf(size=tuple(dims)).astype(np.float32)

with TemporaryDirectory() as tempdir:
processes = []
for idx in range(num_procs):
process = Process(
target=allcompare_process,
args=(tempdir, idx, test_data, num_procs)
)
processes.append(process)
process.start()

while len(processes) > 0:
process = processes.pop()
process.join()

class TestLazyDynDepError(unittest.TestCase):
def test_errorhandler(self):
from caffe2.python import core, lazy_dyndep
import tempfile

with tempfile.NamedTemporaryFile() as f:
lazy_dyndep.RegisterOpsLibrary(f.name)
def handler(e):
raise ValueError("test")
lazy_dyndep.SetErrorHandler(handler)
with self.assertRaises(ValueError, msg="test"):
core.RefreshRegisteredOperators()


if __name__ == "__main__":
unittest.main()

0 comments on commit 07fd5f8

Please sign in to comment.