Sarkars/backend create tensor (#370)
sayantan-nervana authored Dec 12, 2019
1 parent 1bb6517 commit 16f8811
Showing 5 changed files with 124 additions and 10 deletions.
44 changes: 39 additions & 5 deletions ngraph_bridge/ngraph_encapsulate_impl.cc
@@ -395,7 +395,6 @@ Status NGraphEncapsulateImpl::AllocateNGOutputTensors(
current_dst_ptr, last_dst_ptr, last_ng_tensor, true, ng_exec,
op_backend, ng_element_type, ng_shape,
m_executable_can_create_tensor ? out_group_from_pipeline[i] : nullptr);

current_ng_tensor->set_stale(true);
output_caches[i] = std::make_pair(current_dst_ptr, current_ng_tensor);
ng_outputs.push_back(current_ng_tensor);
@@ -416,18 +415,21 @@ std::shared_ptr<ng::runtime::Tensor> NGraphEncapsulateImpl::GetCurrentNgTensor(
// NOTE: we assume that TF's pointers WILL change if it actually changes
// values; i.e., it will not reuse the same space if it has rewritten the values
bool tf_tensor_has_changed = current_tf_ptr != last_tf_ptr;
NGRAPH_VLOG(5) << "tf_tensor_has_changed: " << tf_tensor_has_changed;
bool no_ng_tensor_found = last_ng_tensor == nullptr;
bool is_cpu = m_op_backend_name == "CPU";
// m_op_backend_name might be "BE:0", so check whether it starts with "BE"
bool is_cpu_or_nnpi = (m_op_backend_name.find("CPU") == 0) ||
(m_op_backend_name.find("NNPI") == 0);

// We need to check last_ng_tensor != nullptr, since there are cases where
// at the first call to the ng_exec, both current_dst_ptr (when the
// output is a 0-sized tensor) and last_dst_ptr (uninitialized at the
// first call) are nullptr
// A new tensor needs to be created for sure if no_ng_tensor_found
// Additionally for CPU, it needs to be created if tf_tensor_has_changed,
// Additionally for CPU/NNPI, it needs to be created if tf_tensor_has_changed,
// for others, we do not create
bool need_new_tensor_creation;
if (is_cpu) {
if (is_cpu_or_nnpi) {
need_new_tensor_creation = no_ng_tensor_found || tf_tensor_has_changed;
} else {
need_new_tensor_creation = no_ng_tensor_found;
@@ -449,7 +451,9 @@ std::shared_ptr<ng::runtime::Tensor> NGraphEncapsulateImpl::GetCurrentNgTensor(
current_ng_tensor = tensor_from_pipeline;
} else {
if (need_new_tensor_creation) {
if (is_cpu) {
if (is_cpu_or_nnpi) {
NGRAPH_VLOG(5) << "Backend creating tensor with pointer: "
<< current_tf_ptr;
current_ng_tensor = op_backend->create_tensor(ng_element_type, ng_shape,
current_tf_ptr);
} else {
@@ -576,6 +580,36 @@ void NGraphEncapsulateImpl::DumpNgFunction(
StringToFile(file_name, m_serialized_ng_function_map[ng_exec]);
}

Status NGraphEncapsulateImpl::GetPersistentTFOutputTensor(
std::shared_ptr<ngraph::runtime::Executable> exec,
std::vector<tensorflow::PersistentTensor>& tf_output_tensors) {
auto itr = m_out_persistents.find(exec);
if (itr == m_out_persistents.end()) {
return errors::Internal(
"Expected persistent tensor to be present in cache");
} else {
tf_output_tensors = itr->second;
}
return Status::OK();
}

bool NGraphEncapsulateImpl::PersistentOutputsExist(
std::shared_ptr<ngraph::runtime::Executable> exec) {
return m_out_persistents.find(exec) != m_out_persistents.end();
}

Status NGraphEncapsulateImpl::RegisterPersistentOutputTensors(
std::shared_ptr<ngraph::runtime::Executable> exec,
std::vector<tensorflow::PersistentTensor> persistent_tensors) {
auto itr = m_out_persistents.find(exec);
if (itr != m_out_persistents.end()) {
return errors::Internal(
"Found an entry already exists in the cache for persistent tensors");
}
m_out_persistents.emplace(exec, persistent_tensors);
return Status::OK();
}

} // namespace ngraph_bridge

} // namespace tensorflow
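
A minimal sketch of the backend check and tensor-creation decision above; the helper names IsCpuOrNnpi and NeedNewTensorCreation are illustrative, not part of the bridge:

#include <string>

// Backend names may carry a device suffix such as "CPU:0" or "NNPI:0",
// so an exact comparison like (name == "CPU") would miss them; checking
// that the name starts with the backend prefix covers both forms.
bool IsCpuOrNnpi(const std::string& backend_name) {
  return backend_name.find("CPU") == 0 || backend_name.find("NNPI") == 0;
}

// CPU/NNPI tensors are created over the TF buffer itself (see the
// create_tensor call above that passes current_tf_ptr), so a new nGraph
// tensor is needed whenever the TF pointer moves; other backends own
// their buffers and only need a new tensor when none exists yet.
bool NeedNewTensorCreation(bool no_ng_tensor_found, bool tf_tensor_has_changed,
                           bool is_cpu_or_nnpi) {
  return is_cpu_or_nnpi ? (no_ng_tensor_found || tf_tensor_has_changed)
                        : no_ng_tensor_found;
}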
19 changes: 19 additions & 0 deletions ngraph_bridge/ngraph_encapsulate_impl.h
@@ -174,6 +174,18 @@ class NGraphEncapsulateImpl {
m_serialized_ng_function_map.clear();
}

void ClearNgExecPersistentOutputCache() { m_out_persistents.clear(); }

Status GetPersistentTFOutputTensor(
std::shared_ptr<ngraph::runtime::Executable>,
std::vector<tensorflow::PersistentTensor>&);

bool PersistentOutputsExist(std::shared_ptr<ngraph::runtime::Executable>);

Status RegisterPersistentOutputTensors(
std::shared_ptr<ngraph::runtime::Executable>,
std::vector<tensorflow::PersistentTensor>);

NGraphFreshnessTracker* GetNgraphFreshnessTracker() {
return m_freshness_tracker;
}
@@ -249,6 +261,13 @@ class NGraphEncapsulateImpl {
m_executable_pipelined_tensors_map;

int m_depth{2}; // TODO make this settable

// Each executable (which comes from a new input shape) corresponds to a
// vector of output tensors
// TODO: Should the vector store PersistentTensor or PersistentTensor* ?
std::unordered_map<std::shared_ptr<ngraph::runtime::Executable>,
std::vector<tensorflow::PersistentTensor>>
m_out_persistents;
};

} // namespace ngraph_bridge
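A minimal sketch of how m_out_persistents behaves, using stand-in Executable and PersistentTensor types (hypothetical, for illustration; the real types live in nGraph and TensorFlow):

#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

struct Executable {};        // stand-in for ngraph::runtime::Executable
struct PersistentTensor {};  // stand-in for tensorflow::PersistentTensor

// One cached vector of output tensors per compiled executable; a
// shared_ptr key hashes by pointer identity, so each shape
// specialization (and hence each executable) gets its own entry.
using PersistentOutputCache =
    std::unordered_map<std::shared_ptr<Executable>,
                       std::vector<PersistentTensor>>;

bool OutputsExist(const PersistentOutputCache& cache,
                  const std::shared_ptr<Executable>& exec) {
  return cache.find(exec) != cache.end();
}

// Mirrors RegisterPersistentOutputTensors: registering twice for the
// same executable is an error in the bridge, so emplace's success flag
// is what the real code checks via find().
bool Register(PersistentOutputCache& cache,
              const std::shared_ptr<Executable>& exec,
              std::vector<PersistentTensor> tensors) {
  return cache.emplace(exec, std::move(tensors)).second;
}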
60 changes: 55 additions & 5 deletions ngraph_bridge/ngraph_encapsulate_op.cc
@@ -196,13 +196,22 @@ NGraphEncapsulateOp::NGraphEncapsulateOp(OpKernelConstruction* ctx)
BackendManager::SetConfig(ng_encap_impl.GetOpBackend(),
additional_attribute_map);

ng_encap_impl.SetExecCanCreateTensor(
// For NNPI, use the backend to create tensors even though the executable
// can create them.
// Keep the executable_can_create_tensors() check before the
// backend_name != "NNPI" check.
bool executable_create_tensor =
BackendManager::GetBackend(ng_encap_impl.GetOpBackend())
->executable_can_create_tensors());
->executable_can_create_tensors() &&
(backend_name != "NNPI");
ng_encap_impl.SetExecCanCreateTensor(executable_create_tensor);
NGRAPH_VLOG(5) << "Executable can "
<< (ng_encap_impl.GetExecCanCreateTensor() ? "" : "not")
<< " create tensors";

const char* not_persistent_flag = std::getenv("NGRAPH_TF_DISABLE_PERSISTENT");
m_use_persistent = (not_persistent_flag == nullptr);

event.Stop();
ngraph::Event::write_trace(event);
}
@@ -262,6 +271,7 @@ NGraphEncapsulateOp::~NGraphEncapsulateOp() {
ng_encap_impl.ClearNgExecMap();
ng_encap_impl.ClearNgExecPipelinedTensorMap();
ng_encap_impl.ClearNgExecSerializedFunctionCache();
ng_encap_impl.ClearNgExecPersistentOutputCache();

// Release the backend
NGRAPH_VLOG(2) << "~NGraphEncapsulateOp():: ReleaseBackend";
@@ -345,9 +355,20 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {
// Allocate tensors for the output results.
vector<shared_ptr<ng::runtime::Tensor>> ng_outputs;
std::vector<Tensor*> tf_output_tensors;
std::vector<tensorflow::PersistentTensor> cached_persistent_output_tensors(
ng_exec->get_results().size());
bool present_in_cache = false;

{
NG_TRACE("NGTF_Output_Alloc", "");
if (m_use_persistent) {
present_in_cache = ng_encap_impl.PersistentOutputsExist(ng_exec);
if (present_in_cache) {
OP_REQUIRES_OK(ctx, ng_encap_impl.GetPersistentTFOutputTensor(
ng_exec, cached_persistent_output_tensors));
}
}

for (auto i = 0; i < ng_exec->get_results().size(); i++) {
auto ng_element = ng_exec->get_results()[i];
auto ng_shape = ng_element->get_shape();
@@ -360,21 +381,40 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {
}
TensorShape tf_shape(dims);
Tensor* output_tensor = nullptr;
OP_REQUIRES_OK(ctx, ctx->allocate_output(i, tf_shape, &output_tensor));
tf_output_tensors.push_back(output_tensor);

// Make sure the nGraph-inferred element type agrees with what TensorFlow
// expected.
ng::element::Type expected_elem_type;
// TODO: we only need to do these checks once, when the exec is
// created/compiled, not again and again

OP_REQUIRES_OK(
ctx, TFDataTypeToNGraphElementType(ctx->expected_output_dtype(i),
&expected_elem_type));
OP_REQUIRES(
ctx, ng_element_type == expected_elem_type,
errors::Internal("Element type inferred by nGraph does not match "
"the element type expected by TensorFlow"));
}

if (m_use_persistent) {
if (present_in_cache) {
output_tensor = cached_persistent_output_tensors[i].AccessTensor(ctx);
} else {
// create a persistent tensor
OP_REQUIRES_OK(
ctx, ctx->allocate_persistent(
ctx->expected_output_dtype(i), tf_shape,
&cached_persistent_output_tensors[i], &output_tensor));
}
} else {
OP_REQUIRES_OK(ctx, ctx->allocate_output(i, tf_shape, &output_tensor));
}
tf_output_tensors.push_back(output_tensor);
}
if (m_use_persistent && !present_in_cache) {
OP_REQUIRES_OK(ctx, ng_encap_impl.RegisterPersistentOutputTensors(
ng_exec, cached_persistent_output_tensors));
}
OP_REQUIRES_OK(ctx, ng_encap_impl.AllocateNGOutputTensors(
tf_output_tensors, ng_exec, out_group_from_pipeline,
op_backend, ng_outputs));
@@ -611,6 +651,16 @@ void NGraphEncapsulateOp::Compute(OpKernelContext* ctx) {
exp.what(), "\n"));
}
}

if (m_use_persistent) {
for (int out_idx = 0; out_idx < ng_exec->get_results().size(); out_idx++) {
OP_REQUIRES_OK(ctx, ng_encap_impl.GetPersistentTFOutputTensor(
ng_exec, cached_persistent_output_tensors));
auto out_tensor =
cached_persistent_output_tensors[out_idx].AccessTensor(ctx);
ctx->set_output(out_idx, *out_tensor);
}
}
} // end compute

int NGraphEncapsulateImpl::s_instance_count = 0;
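The persistent-output path above is gated by an environment variable; a minimal sketch of the check that m_use_persistent encodes (persistent outputs are on by default, and disabled when NGRAPH_TF_DISABLE_PERSISTENT is set to any value):

#include <cstdlib>

// Persistent output tensors are used unless the variable is present;
// only its existence is checked, its value is irrelevant.
bool UsePersistentOutputs() {
  return std::getenv("NGRAPH_TF_DISABLE_PERSISTENT") == nullptr;
}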
1 change: 1 addition & 0 deletions ngraph_bridge/ngraph_encapsulate_op.h
@@ -43,6 +43,7 @@ class NGraphEncapsulateOp : public OpKernel {
private:
NGraphEncapsulateImpl ng_encap_impl;
std::mutex m_compute_lock;
bool m_use_persistent;
};

} // namespace ngraph_bridge
10 changes: 10 additions & 0 deletions test/tf_exec.cpp
@@ -95,7 +95,16 @@ Status CreateSession(const string& graph_filename, const string& backend_name,
return load_graph_status;
}

// This test might fail when running with persistent output tensors,
// perhaps because once outputs have been computed into the persistent
// tensors, the next thread comes in before Compare runs and changes the
// values. For example, adding a 1-second sleep right after entering
// Compute() makes this test pass, since Compare then has enough time to
// run before the next thread modifies the persistent tensor values.
// TODO: see how persistent tensors might fit in with this kind of
// multithreading (symmetric parallel)
TEST(tf_exec, SingleGraphOn2Threads) {
SetEnvVariable("NGRAPH_TF_DISABLE_PERSISTENT", "1");
string graph_name = "test_axpy.pbtxt";
vector<string> backends{"CPU", "INTERPRETER"};
for (auto be : backends) {
Expand Down Expand Up @@ -136,6 +145,7 @@ TEST(tf_exec, SingleGraphOn2Threads) {
thread0.join();
thread1.join();
}
UnsetEnvVariable("NGRAPH_TF_DISABLE_PERSISTENT");
}

TEST(tf_exec, hello_world) {
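SetEnvVariable/UnsetEnvVariable are the test suite's helpers; on POSIX they presumably reduce to something like the following sketch (an assumption for illustration, not the actual helper implementation):

#include <cstdlib>

// Force the non-persistent path for the duration of a test, then
// restore the environment afterwards.
void SetEnvVariable(const char* name, const char* value) {
  setenv(name, value, /*overwrite=*/1);  // POSIX; use _putenv_s on Windows
}
void UnsetEnvVariable(const char* name) { unsetenv(name); }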
