Merge pull request #6 from pakozm/devel
Devel
pakozm committed May 19, 2014
2 parents c292ec9 + a31a23f commit 93861b5
Showing 4 changed files with 54 additions and 22 deletions.
23 changes: 15 additions & 8 deletions mapreduce/examples/April-ANN/common.lua
@@ -97,13 +97,17 @@ local mapfn = function(key, value, emit)
   local train_func = deserialize_from_gridfs(gridfs, assert(conf.train_func))
   local trainer = train_func:get_state_table().last
   conf:read_only(true)
-  local weight_grads,loss_matrix = compute_gradients_and_loss(trainer,
-                                                               key, value,
-                                                               conf)
+  local weight_grads,loss_matrix,bunch_size =
+    compute_gradients_and_loss(trainer, key, value, conf)
   conf:read_only(false)
+  assert(weight_grads and loss_matrix and bunch_size,
+         "compute_gradients_and_loss had to return gradients, loss_matrix and bunch_size")
   for name,grads in pairs(weight_grads) do
     serialize_and_map_emit(name,
-                           { grads, trainer:weights(name):get_shared_count() },
+                           {
+                             grads,
+                             trainer:weights(name):get_shared_count()*bunch_size
+                           },
                            emit)
   end
   serialize_and_map_emit(TR_LOSS_KEY, loss_matrix, emit)
@@ -129,7 +133,7 @@ local reducefn = function(key, values, emit)
     end
     serialize_and_red_emit({ loss:get_accum_loss() }, emit)
   else
-    -- accumulate here the shared count
+    -- accumulate gradients and shared count
     local t = deserialize_emitted_value(values[1])
     local gradient = t[1]
     local counts = t[2]
@@ -165,10 +169,12 @@ local finalfn = function(pairs_iterator)
       tr_loss_mean = value[1]
       tr_loss_var = value[2]
     else
+      local N = value[2] if not N or N==0 then N=1 end
+      if params.smooth_gradients then
+        -- gradients smoothing
+        value[1]:scal( 1.0/math.sqrt(N) )
+      end
       weight_grads[key] = value[1]
-      local w = trainer:weights(key)
-      w:reset_shared_count()
-      w:add_to_shared_count(value[2])
     end
   end
   assert(tr_loss_mean)
@@ -214,6 +220,7 @@ local make_map_reduce_task_table = function(t)
       user_taskfn = { mandatory = true, type_match="function" },
       user_finalfn = { mandatory = true, type_match="function" },
       generate_new_trainer_and_train_func = { mandatory = true, type_match="function" },
+      smooth_gradients = { mandatory = false, default = true },
     }, t)
   --
   dbname = params.dbname
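
Taken together, the common.lua changes alter how gradients are normalized across map jobs: each map emission now carries the gradient together with shared_count*bunch_size, the reducer accumulates both, and the final step scales the summed gradient by 1/sqrt(N) when smooth_gradients is enabled. A minimal sketch of that accounting, using plain Lua tables instead of the April-ANN matrices and trainer objects (which this sketch does not reproduce):

-- Sketch only: gradients as plain Lua arrays instead of April-ANN matrices.
-- Each emitted value is { gradient, shared_count * bunch_size }.
local function reduce_gradients(values)
  local acc_grad, acc_count = nil, 0
  for _, v in ipairs(values) do
    local grad, count = v[1], v[2]
    if not acc_grad then
      acc_grad = grad
    else
      for i = 1, #grad do acc_grad[i] = acc_grad[i] + grad[i] end
    end
    acc_count = acc_count + count
  end
  return { acc_grad, acc_count }
end

-- Final step: smooth the accumulated gradient by 1/sqrt(N), guarding N == 0.
local function smooth_gradient(grad, N, smooth_gradients)
  if not N or N == 0 then N = 1 end
  if smooth_gradients then
    local s = 1.0 / math.sqrt(N)
    for i = 1, #grad do grad[i] = grad[i] * s end
  end
  return grad
end

-- Example: two map emissions for the same weight matrix, bunch_size = 128.
local reduced = reduce_gradients{ { {0.5, -0.2}, 128 }, { {0.1, 0.3}, 128 } }
local grad = smooth_gradient(reduced[1], reduced[2], true)
print(grad[1], grad[2], reduced[2]) --> 0.0375  0.00625  256
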
28 changes: 16 additions & 12 deletions mapreduce/examples/April-ANN/init.lua
@@ -7,13 +7,13 @@ local NUM_REDUCERS = 10
 local EXP_DBHOST = "localhost"
 local EXP_DBNAME = "exp_digits"
 
-local bunch_size = 32
+local bunch_size = 128
 local weights_random = random(1234)
-local description = "256 inputs 256 tanh 128 tanh 10 log_softmax"
+local description = "256 inputs 128 tanh 10 log_softmax"
 local inf = -1
 local sup = 1
 local shuffle_random = random() -- TOTALLY RANDOM FOR EACH WORKER
-local learning_rate = 0.005
+local learning_rate = 0.01
 local momentum = 0.02
 local weight_decay = 1e-04
 local max_epochs = 40
@@ -77,7 +77,7 @@ local make_load_matrix = function(value)
   end
 end
 
-local make_load_dataset = function(mat)
+local make_load_dataset = function(mat,m2)
   return function()
     local train_input = dataset.matrix(mat,
                                        {
@@ -122,10 +122,11 @@ local make_load_dataset = function(mat)
   end
 end
 
--- receives the persistent table in read-only mode as last argument
+-- receives a trainer, key,value pair and the persistent table in read-only mode
+-- as last argument; returns gradients, loss_matrix and bunch_size
 local compute_gradients_and_loss = function(trainer, key, value, conf)
-  local mat = cached(value, make_load_matrix(value), mat_cache)
-  local ds_tbl = cached(value, make_load_dataset(mat), ds_cache)
+  local mat = cached(value, make_load_matrix(value), mat_cache)
+  local ds_tbl = cached(value, make_load_dataset(mat,m2), ds_cache)
   local in_ds = ds_tbl.train_input
   local out_ds = ds_tbl.train_output
   local bunch_tbl = {}
@@ -136,15 +137,16 @@ local compute_gradients_and_loss = function(trainer, key, value, conf)
   local target = out_ds:getPatternBunch(bunch_tbl)
   local grads,tr_loss,tr_loss_matrix = trainer:compute_gradients_step(input,
                                                                       target)
-  return grads,tr_loss_matrix
+  return grads,tr_loss_matrix,bunch_size
 end
 
--- receives the persistent table in read-only mode as last argument
+-- receives a trainer and the persistent table in read-only mode as last
+-- argument; returns the validation loss mean and variance
 local compute_validation_loss = function(trainer, conf)
   util.omp_set_num_threads(4)
   local value = "misc/digits.png"
-  local mat = cached(value, make_load_matrix(value), mat_cache)
-  local ds_tbl = cached(value, make_load_dataset(mat), ds_cache)
+  local mat = cached(value, make_load_matrix(value), mat_cache)
+  local ds_tbl = cached(value, make_load_dataset(mat,m2), ds_cache)
   local in_ds = ds_tbl.val_input
   local out_ds = ds_tbl.val_output
   local va_loss_mean,va_loss_var = trainer:validate_dataset{
@@ -155,7 +157,8 @@ local compute_validation_loss = function(trainer, conf)
   return va_loss_mean,va_loss_var
 end
 
--- the last argument is the persistent table (allows read/write operations)
+-- receives a train_func instance and the persistent table (allows read/write
+-- operations)
 local user_finalfn = function(train_func, conf)
   print(train_func:get_state_string())
   train_func:save("best_func.lua")
@@ -173,4 +176,5 @@ return common.make_map_reduce_task_table {
   generate_new_trainer_and_train_func = generate_new_trainer_and_train_func,
   compute_gradients_and_loss = compute_gradients_and_loss,
   compute_validation_loss = compute_validation_loss,
+  -- smooth_gradients = true, -- by default it is true
 }
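
Both compute_gradients_and_loss and compute_validation_loss go through cached(value, loader, cache_table), so each worker parses the digits matrix and builds its datasets only once per process. The cached helper lives in common.lua and is not part of this diff; a hypothetical sketch of its assumed behaviour, as a plain memoization table:

-- Hypothetical sketch of cached(key, loader, cache): assumed behaviour only,
-- the real helper is defined in common.lua and not shown in this commit.
-- Call loader() at most once per key and remember its result.
local function cached(key, loader, cache)
  local value = cache[key]
  if value == nil then
    value = loader()
    cache[key] = value
  end
  return value
end

-- usage mirroring the example code, with a dummy loader
local mat_cache = {}
local load_count = 0
local loader = function() load_count = load_count + 1 return { 1, 2, 3 } end
cached("misc/digits.png", loader, mat_cache)
cached("misc/digits.png", loader, mat_cache)
print(load_count) --> 1
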
22 changes: 21 additions & 1 deletion mapreduce/task.lua
@@ -243,7 +243,14 @@ function task:get_partition_args()
   return self.tbl.init_args
 end
 
--- JOB INTERFACE
+-- TASK INTERFACE
+
+local cache_map_ids = {}
+local cache_inv_map_ids = {}
+function task.reset_cache()
+  cache_map_ids = {}
+  cache_inv_map_ids = {}
+end
 
 -- workers use this method to load a new job in the caller object
 function task:take_next_job(tmpname)
@@ -265,6 +272,12 @@ function task:take_next_job(tmpname)
       { status = STATUS.BROKEN, },
     },
   }
+  -- after first iteration, map jobs done previously will be taken if possible,
+  -- reducing the overhead for loading data
+  if self:get_iteration() > 1 and task_status == TASK_STATUS.MAP then
+    query._id = { ["$in"] = cache_map_ids }
+    if db:count(jobs_ns, query) == 0 then query._id = nil end
+  end
   local set_query = {
     worker = utils.get_hostname(),
     tmpname = tmpname_summary(tmpname),
@@ -282,6 +295,13 @@ function task:take_next_job(tmpname)
   -- updated its data
   local job_tbl = db:find_one(jobs_ns, set_query)
   if job_tbl then
+    if task_status == TASK_STATUS.MAP then
+      local _id = job_tbl._id
+      if not cache_inv_map_ids[_id] then
+        cache_inv_map_ids[_id] = true
+        table.insert(cache_map_ids, _id)
+      end
+    end
     local storage,path = self:get_storage()
     return task_status,job(self.cnn, job_tbl, task_status,
                            self:get_fname(), self:get_args(),
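
The take_next_job change gives workers an affinity for map jobs they have already processed: every map job _id is remembered in cache_map_ids, and from the second iteration on the job query is restricted to that set (falling back to any pending job when none of the cached ones is available), so locally cached training data can be reused. A rough sketch of the bookkeeping, with the MongoDB call replaced by a stand-in function:

-- Sketch of the map-job affinity cache; db_count stands in for the real
-- db:count(jobs_ns, query) call used in task.lua.
local cache_map_ids, cache_inv_map_ids = {}, {}

local function remember_map_job(_id)
  if not cache_inv_map_ids[_id] then
    cache_inv_map_ids[_id] = true
    table.insert(cache_map_ids, _id)
  end
end

local function restrict_to_cached_jobs(query, iteration, db_count)
  if iteration > 1 and #cache_map_ids > 0 then
    query._id = { ["$in"] = cache_map_ids }
    -- if none of the cached jobs is pending, fall back to any available job
    if db_count(query) == 0 then query._id = nil end
  end
  return query
end

-- usage with a fake counter that reports one pending cached job
remember_map_job("job-1")
local q = restrict_to_cached_jobs({ status = "waiting" }, 2, function() return 1 end)
print(q._id and q._id["$in"][1]) --> job-1
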
3 changes: 2 additions & 1 deletion mapreduce/worker.lua
@@ -36,7 +36,7 @@ local utils = require "mapreduce.utils"
 local task = require "mapreduce.task"
 local cnn = require "mapreduce.cnn"
 
--- PRIVATE FUNCTIONS
+-- PRIVATE FUNCTIONS AND PROPERTIES
 
 -- executes the worker main loop; it runs querying the task object for new jobs
 local worker_execute = function(self)
@@ -92,6 +92,7 @@ local worker_execute = function(self)
       ntasks = ntasks + 1
       job_done = false
       job.reset_cache()
+      task.reset_cache()
     end
     if ntasks < MAX_TASKS then
       print(string.format("# WAITING...\tntasks: %d/%d\tit: %d/%d\tsleep: %.1f",
