Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
Aetf committed Jan 13, 2020
2 parents bb0cfd5 + 01a36d6 commit 1ff67a1
Show file tree
Hide file tree
Showing 138 changed files with 3,486 additions and 165 deletions.
4 changes: 0 additions & 4 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,6 @@ PenaltyBreakString: 1000
PenaltyExcessCharacter: 1000
PenaltyReturnTypeOnItsOwnLine: 10000
PointerAlignment: Right
RawStringFormats:
- Delimiter: pb
Language: TextProto
BasedOnStyle: google
ReflowComments: true
SortIncludes: true
SortUsingDeclarations: true
Expand Down
2 changes: 2 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ stages:

variables:
DOCKER_HOST: tcp://docker:2375
# This will instruct Docker not to start over TLS.
DOCKER_TLS_CERTDIR: ""
DOCKER_DRIVER: overlay2
IMAGE_TAG: $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG
LATEST_TAG: $CI_REGISTRY_IMAGE:latest
Expand Down
10 changes: 9 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Version 3.8 required from CheckCXXFeature.cmake
# Version 3.10 required from Boost 1.66.0 for imported target
cmake_minimum_required(VERSION 3.10.0)
# Version 3.13 required for target_link_options
cmake_minimum_required(VERSION 3.13.0)

project(executor VERSION 1.0.0 LANGUAGES C CXX)

Expand Down Expand Up @@ -84,6 +85,9 @@ endif(WITH_TCMALLOC)
find_package(nlohmann_json)
set_package_properties(nlohmann_json PROPERTIES TYPE OPTIONAL PURPOSE "For OpTracing logging")

# Ask FindThreads to prefer the compiler/linker `-pthread` flag.
# The variable needs an explicit true value: calling set() with no value
# unsets the variable, which silently disables the preference.
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads)

# Bundled third party library
add_subdirectory(thirdparty)
#---------------------------------------------------------------------------------------
Expand Down Expand Up @@ -144,6 +148,10 @@ if(WITH_TIMEOUT_WARNING)
set(SALUS_ENABLE_TIMEOUT_WARNING 1)
endif(WITH_TIMEOUT_WARNING)

if(USE_TENSORFLOW)
set(SALUS_ENABLE_TENSORFLOW 1)
endif(USE_TENSORFLOW)

configure_file(src/config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h)
include_directories(${CMAKE_CURRENT_BINARY_DIR})

Expand Down
2 changes: 1 addition & 1 deletion bc
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#! /bin/bash
python -m benchmarks.driver "$@"
vex tfbuild python -m benchmarks.driver "$@"
2 changes: 1 addition & 1 deletion benchmarks/driver/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def parse_expname(args):


def main():
# type: (Sequence[str]) -> None
# type: () -> None
# find first argument not starting with dash
exp, argv = parse_expname(sys.argv)

Expand Down
224 changes: 212 additions & 12 deletions benchmarks/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

from .server import SalusServer
from .tfserver import TFDistServer
from .utils import Popen, execute, snake_to_pascal, str2bool
from .utils import Popen, execute, snake_to_pascal, str2bool, remove_suffix
from .utils.compatiblity import pathlib, subprocess as sp

Path = pathlib.Path
Expand All @@ -40,6 +40,11 @@
flags.DEFINE_string('tfbench_base', '../tf_benchmarks', 'Base dir of TFBenchmark based workloads')
flags.DEFINE_string('unit_base', 'tests', 'Base dir of unittest based workloads')
flags.DEFINE_string('fathom_base', '../fathom', 'Base dir of Fathom based workloads')
flags.DEFINE_string('tfweb_base', '../tfweb', 'Base dir of TFWeb based workloads')
flags.DEFINE_string('tfweb_saved_model_dir', '~/../symbiotic/peifeng/tf_cnn_benchmarks_models/saved_models',
'SavedModel dir of TFWeb based workloads')
flags.DEFINE_string('tfweb_request_body_dir', '~/../symbiotic/peifeng/tf_cnn_benchmarks_models/reqeusts',
'Predefined request body dir for TFWeb based workloads')
flags.DEFINE_boolean('no_capture', False, 'Do not capture workload outputs')


Expand Down Expand Up @@ -113,28 +118,61 @@ def __call__(self, executor, output_file):
'--num_batches={}'.format(self.wl.batch_num),
'--batch_size={}'.format(self.wl.batch_size),
]
eval_interval = self.wl.env.pop('SALUS_TFBENCH_EVAL_INTERVAL', '0.1')
eval_rand_factor = self.wl.env.pop('SALUS_TFBENCH_EVAL_RAND_FACTOR', '5')
eval_interval = self.wl.env.pop('SALUS_TFBENCH_EVAL_INTERVAL', None)
eval_rand_factor = self.wl.env.pop('SALUS_TFBENCH_EVAL_RAND_FACTOR', None)
eval_block = self.wl.env.pop('SALUS_TFBENCH_EVAL_BLOCK', 'true')

eval_model_dir = self.wl.env.pop('SALUS_TFBENCH_EVAL_MODEL_DIR', 'models')
eval_model_dir = str(Path(eval_model_dir).joinpath(remove_suffix(self.wl.name, 'eval')))

eval_saved_model_dir = self.wl.env.pop('SALUS_TFBENCH_EVAL_SAVED_MODEL_DIR', None)
if eval_saved_model_dir is not None:
eval_saved_model_dir = str(Path(eval_saved_model_dir).joinpath(remove_suffix(self.wl.name, 'eval')))

num_seconds = self.wl.env.pop('SALUS_ITER_SECONDS', None)
if num_seconds is not None:
cmd += [
'--num_seconds={}'.format(num_seconds)
]

wait_for_signal = self.wl.env.pop('SALUS_WAIT_FOR_SIGNAL', None)
if wait_for_signal is not None:
cmd += [
'--wait_for_signal={}'.format(wait_for_signal)
]

if self.wl.name.endswith('eval'):
model_name = self.wl.name.rsplit('eval')[0]
model_name = remove_suffix(self.wl.name, 'eval')
cmd += [
'--model_dir=models/{}'.format(model_name),
'--model_dir=' + eval_model_dir,
'--model={}'.format(model_name),
'--eval_interval_secs={}'.format(eval_interval),
'--eval_interval_random_factor={}'.format(eval_rand_factor),
'--eval_block={}'.format(eval_block),
'--eval'
]
if eval_interval is not None:
cmd += [
'--eval_interval_secs={}'.format(eval_interval),
]
if eval_rand_factor is not None:
cmd += [
'--eval_interval_random_factor={}'.format(eval_rand_factor),
]
if eval_saved_model_dir is not None:
cmd += [
'--saved_model_dir=' + eval_saved_model_dir
]
else:
cmd += [
'--model={}'.format(self.wl.name),
]
if str2bool(self.wl.env.pop('SALUS_SAVE_MODEL', '')):
cmd += [
'--model_dir=models/{}'.format(self.wl.name),
'--model_dir=' + eval_model_dir,
]

cmd += self.wl.extra_args
logger.info(f'Starting workload with cmd: {cmd}')

if FLAGS.no_capture:
return execute(cmd, cwd=str(cwd), env=self.env)
else:
Expand All @@ -157,6 +195,7 @@ def __call__(self, executor, output_file):
# type: (Executor, Path) -> Popen
env = self.env.copy()
env['EXEC_ITER_NUMBER'] = str(self.wl.batch_num)
env['SALUS_BATCH_SIZE'] = str(self.wl.batch_size)
if executor == Executor.TFDist:
env['SALUS_TFDIST_ENDPOINT'] = TFDistServer.current_server().endpoint

Expand All @@ -166,12 +205,16 @@ def __call__(self, executor, output_file):
'stdbuf', '-o0', '-e0', '--',
'python', '-m', pkg, method,
]
cmd += self.wl.extra_args

logger.info(f'Starting workload with cmd: {cmd}')
if FLAGS.no_capture:
return execute(cmd, cwd=str(cwd), env=self.env)
else:
output_file.parent.mkdir(exist_ok=True, parents=True)
with output_file.open('w') as f:
return execute(cmd, cwd=str(cwd), env=env, stdout=f, stderr=sp.STDOUT)
# return execute(cmd, cwd=str(cwd), env=env, stdout=f, stderr=sp.STDOUT)
return execute(cmd, cwd=str(cwd), env=env, stdout=f, stderr=None)

def _construct_test_name(self, executor):
# type: (Executor) -> Tuple[str, str]
Expand All @@ -197,6 +240,12 @@ def _construct_test_name(self, executor):
})
}

variable_batch_size_models = {'vae', 'superres', 'seq2seq', 'mnistsf', 'mnistcv', 'mnistlg'}
if remove_suffix(self.wl.name, 'eval') not in variable_batch_size_models:
if self.wl.batch_size not in self.wl.wtl.available_batch_sizes():
raise ValueError(f"Batch size `{self.wl.batch_size}' is not supported for {self.wl.name},"
f" available ones: {self.wl.wtl.available_batch_sizes()}")

if executor == Executor.Salus:
prefix = 'test_rpc_'
elif executor == Executor.TF:
Expand All @@ -209,19 +258,26 @@ def _construct_test_name(self, executor):
if self.wl.name.endswith('eval'):
prefix += 'eval_'

model_name = self.wl.name.rsplit('eval')[0]
model_name = remove_suffix(self.wl.name, 'eval')

if model_name in supported_model:
pkg, cls, names = supported_model[model_name]
else:
# fallback to guessing
pkg = f'test_tf.test_{model_name}'
cls = f'Test{snake_to_pascal(model_name)}'

# get method name
names = {
s: str(idx)
for idx, s in enumerate(self.wl.wtl.available_batch_sizes())
}
method = f'{cls}.{prefix}{names[self.wl.batch_size]}'

postfix = names.get(self.wl.batch_size, '0')
if model_name == 'seq2seq' and postfix == '0':
postfix = '2_large'

method = f'{cls}.{prefix}{postfix}'
return pkg, method


Expand All @@ -240,7 +296,7 @@ def __call__(self, executor, output_file):
cmd = [
'stdbuf', '-o0', '-e0', '--',
'python', '-m', 'fathom.cli',
'--workload', self.wl.name.rsplit('eval')[0],
'--workload', remove_suffix(self.wl.name, 'eval'),
'--action', 'test' if self.wl.name.endswith('eval') else 'train',
'--num_iters', str(self.wl.batch_num),
'--batch_size', str(self.wl.batch_size),
Expand All @@ -262,9 +318,153 @@ def __call__(self, executor, output_file):
else:
raise ValueError(f'Unknown executor: {executor}')

cmd += self.wl.extra_args
logger.info(f'Starting workload with cmd: {cmd}')

if FLAGS.no_capture:
return execute(cmd, cwd=str(cwd), env=self.env)
else:
output_file.parent.mkdir(exist_ok=True, parents=True)
with output_file.open('w') as f:
return execute(cmd, cwd=str(cwd), env=self.env, stdout=f, stderr=sp.STDOUT)


class TFWebDirectRunner(Runner):
    """Launch TFWeb's `examples/direct/client` program for a workload.

    The client is started from the TFWeb base directory and is pointed at the
    SavedModel directory named after the workload (with any `eval` suffix
    removed).
    """

    def __init__(self, wl, base_dir=None):
        super().__init__(wl)
        # Without an explicit directory, use the --tfweb_base flag.
        self.base_dir = FLAGS.tfweb_base if base_dir is None else base_dir

    def __call__(self, executor, output_file):
        """Build the client command line and start it.

        `executor` selects the value passed as `--sess_target`; an unknown
        executor raises ValueError. Unless `--no_capture` is set, stdout and
        stderr of the child are written to `output_file`.
        """
        network = remove_suffix(self.wl.name, 'eval')
        workdir = self.base_dir
        saved_model = Path(FLAGS.tfweb_saved_model_dir).joinpath(network)
        cmd = [
            'stdbuf', '-o0', '-e0', '--',
            'examples/direct/client',
            f'--model="{saved_model}"',
            f'--batch_size={self.wl.batch_size}',
            f'--batch_num={self.wl.batch_num}',
        ]

        # Resolve the session target lazily so that only the server that is
        # actually needed gets queried.
        if executor == Executor.Salus:
            sess_target = SalusServer.current_server().endpoint
        elif executor == Executor.TF:
            # Plain TF gets a literal pair of double quotes as its target.
            sess_target = '""'
        elif executor == Executor.TFDist:
            sess_target = TFDistServer.current_server().endpoint
        else:
            raise ValueError(f'Unknown executor: {executor}')
        cmd.extend(['--sess_target', sess_target])

        cmd.extend(self.wl.extra_args)
        logger.info(f'Starting workload with cmd: {cmd}')

        if FLAGS.no_capture:
            return execute(cmd, cwd=str(workdir), env=self.env)

        # Capture mode: make sure the log directory exists, then redirect
        # both output streams into the log file.
        output_file.parent.mkdir(exist_ok=True, parents=True)
        with output_file.open('w') as log:
            return execute(cmd, cwd=str(workdir), env=self.env, stdout=log, stderr=sp.STDOUT)


class TFWebRunner(Runner):
    """Start a TFWeb inference cluster for a workload.

    Runs `examples/cluster/start_cluster` from the TFWeb base directory with
    `--model`, `--sess_target` and `--num_replicas` arguments.
    NOTE(review): per the original author's notes the script starts several
    tfweb servers plus a balancer on the same node -- not visible from here.
    """

    def __init__(self, wl, base_dir=None):
        super().__init__(wl)
        # Without an explicit directory, use the --tfweb_base flag.
        self.base_dir = FLAGS.tfweb_base if base_dir is None else base_dir

    def __call__(self, executor, output_file):
        # type: (Executor, Path) -> Popen
        """Build the cluster command line and start it.

        The replica count is taken (and removed) from the workload's
        `SALUS_TFWEB_REPLICAS` env entry, defaulting to '1'. An unknown
        executor raises ValueError.
        """
        network = remove_suffix(self.wl.name, 'web')
        workdir = self.base_dir
        saved_model = Path(FLAGS.tfweb_saved_model_dir).joinpath(network)
        cmd = [
            'stdbuf', '-o0', '-e0', '--',
            'examples/cluster/start_cluster',
            f'--model="{saved_model}"',
        ]

        # Resolve the session target lazily so that only the server that is
        # actually needed gets queried.
        if executor == Executor.Salus:
            sess_target = SalusServer.current_server().endpoint
        elif executor == Executor.TF:
            # Plain TF gets a literal pair of double quotes as its target.
            sess_target = '""'
        elif executor == Executor.TFDist:
            sess_target = TFDistServer.current_server().endpoint
        else:
            raise ValueError(f'Unknown executor: {executor}')
        cmd.extend(['--sess_target', sess_target])

        replicas = self.wl.env.pop('SALUS_TFWEB_REPLICAS', '1')
        cmd.extend(['--num_replicas', replicas])

        cmd.extend(self.wl.extra_args)
        logger.info(f'Starting workload with cmd: {cmd}')

        if FLAGS.no_capture:
            return execute(cmd, cwd=str(workdir), env=self.env)

        # Capture mode: make sure the log directory exists, then redirect
        # both output streams into the log file.
        output_file.parent.mkdir(exist_ok=True, parents=True)
        with output_file.open('w') as log:
            return execute(cmd, cwd=str(workdir), env=self.env, stdout=log, stderr=sp.STDOUT)


class TFWebClientRunner(Runner):
    """Run the tfweb client load generator.

    Command: examples/tfweb-client -output OUTPUT TARGET REQ_BODY -
    The trailing `-` is followed by the workload's plan being written to the
    child's stdin.
    """

    def __init__(self, wl, base_dir=None):
        super().__init__(wl)
        # Without an explicit directory, use the --tfweb_base flag.
        self.base_dir = FLAGS.tfweb_base if base_dir is None else base_dir

    def __call__(self, executor, output_file):
        # type: (Executor, Path) -> Popen
        """Start the client, feed it the plan on stdin and return the process.

        `executor` is not consulted here; the request target comes from
        `self.wl.target`.
        """
        network = remove_suffix(self.wl.name, 'client')
        workdir = self.base_dir

        cmd = [
            'stdbuf', '-o0', '-e0', '--',
            'examples/tfweb-client',
            '-output', str(output_file),
            self.wl.target,
        ]
        # Request body file: <tfweb_request_body_dir>/<network>.txt
        body_file = Path(FLAGS.tfweb_request_body_dir).joinpath(network).with_suffix('.txt')
        # The final '-' goes together with the plan written to stdin below.
        cmd += [str(body_file), '-']
        cmd += self.wl.extra_args
        logger.info(f'Starting workload with cmd: {cmd}')

        client = execute(cmd, cwd=str(workdir), env=self.env, stdin=sp.PIPE)
        client.stdin.write(self._plan_to_bytes())
        client.stdin.close()
        return client

    def _plan_to_bytes(self):
        """Join the workload's plan entries with spaces and encode as UTF-8."""
        plan_text = ' '.join(self.wl.plan)
        return plan_text.encode('utf-8')

2 changes: 2 additions & 0 deletions benchmarks/driver/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ def _find_executable(self):
"""Find the absolute path to server executable, according to 'config.build_type'"""
candidates = [
self.config.build_dir / self.config.build_type / 'src' / 'executor',
self.config.build_dir / self.config.build_type / 'src' / 'salus-server',
self.config.build_dir / self.config.build_type / 'bin' / 'executor',
self.config.build_dir / self.config.build_type / 'bin' / 'salus-server',
self.config.build_dir / self.config.build_type.lower() / 'src' / 'executor',
self.config.build_dir / self.config.build_type.lower() / 'src' / 'salus-server',
self.config.build_dir / self.config.build_type.lower() / 'bin' / 'executor',
self.config.build_dir / self.config.build_type.lower() / 'bin' / 'salus-server',
]
Expand Down
Loading

0 comments on commit 1ff67a1

Please sign in to comment.