Skip to content

Commit

Permalink
Merge pull request #11 from pakozm/devel
Browse files Browse the repository at this point in the history
Devel
  • Loading branch information
pakozm committed Jun 23, 2014
2 parents b0dde09 + 5b0d136 commit 30d62cf
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 17 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ $ ./execute_BIG_server.sh > output

**Note 1:** using only one worker takes: 146 seconds

**Note 2:** using 30 mappers and 10 reducers (30 workers) takes: 35 seconds
**Note 2:** using 30 mappers and 15 reducers (30 workers) takes: 32 seconds

A naive word-count version implemented with pipes and shell scripts takes:

Expand Down
2 changes: 1 addition & 1 deletion mapreduce/examples/WordCount/init.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- string hash function: http://isthe.com/chongo/tech/comp/fnv/
local NUM_REDUCERS = 10
local NUM_REDUCERS = 15
local FNV_prime = 16777619
local offset_basis = 2166136261
local MAX = 2^32
Expand Down
2 changes: 1 addition & 1 deletion mapreduce/examples/WordCount/partitionfn.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- string hash function: http://isthe.com/chongo/tech/comp/fnv/
local NUM_REDUCERS = 10
local NUM_REDUCERS = 15
local FNV_prime = 16777619
local offset_basis = 2166136261
local MAX = 2^32
Expand Down
3 changes: 2 additions & 1 deletion mapreduce/fs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ local fs = {
local utils = require "mapreduce.utils"

-- Converts a MongoDB match table of the form
-- { filename = { ["$regex"] = "^job\\..*$" } } into a shell-like wildcard
-- string. Three rewrites, in order:
--   1. "\."  -> "."   (un-escape regex-escaped literal dots)
--   2. ".*"  -> "*"   (regex any-sequence becomes glob star)
--   3. strip the "^" and "$" anchors, which have no wildcard equivalent.
-- @param match_tbl table with a filename["$regex"] string field
-- @return the wildcard string
local make_wildcard_from_mongo_match = function(match_tbl)
  return match_tbl.filename["$regex"]:
    gsub("\\%.","."):gsub("%.%*","*"):gsub("[$^]","")
end

------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion mapreduce/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ local utils = require "mapreduce.utils"
local persistent_table = require "mapreduce.persistent_table"

local mapreduce = {
_VERSION = "0.3.2",
_VERSION = "0.3.4",
_NAME = "mapreduce",
worker = worker,
server = server,
Expand Down
2 changes: 1 addition & 1 deletion mapreduce/job.lua
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ function job_prepare_reduce(self, g, storage, path)
local fs,make_builder,make_lines_iterator = fs.router(self.cnn,mappers,
storage,path)
local filenames = {}
local match_str = string.format("^%s.*", job_file)
local match_str = string.format("^%s\\..*", job_file)
local list = fs:list({ filename = { ["$regex"] = match_str } })
for v in list:results() do
table.insert(filenames, v.filename)
Expand Down
10 changes: 8 additions & 2 deletions mapreduce/server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ local get_storage_from = utils.get_storage_from

-- PRIVATE FUNCTIONS AND METHODS

-- Returns an array containing every key of the given table.
-- Key order is unspecified because pairs() iteration order is undefined.
-- @param t any table
-- @return array (sequence) of the keys of t
local function keys_of(t)
  local keys = {}
  local n = 0
  for key in pairs(t) do
    n = n + 1
    keys[n] = key
  end
  return keys
end

local function count_digits(n)
-- sanity check
assert(n >= 0, "Only valid for positive integers")
Expand Down Expand Up @@ -293,15 +299,15 @@ local function server_prepare_reduce(self)
max_part_key = math.max(max_part_key, part_key)
-- annotate the mapper
mappers_by_part_key[part_key] = mappers_by_part_key[part_key] or {}
table.insert(mappers_by_part_key[part_key], map_hostnames[mapper_key])
mappers_by_part_key[part_key][ map_hostnames[mapper_key] ] = true
end
local part_key_digits = count_digits(max_part_key)
local result_str_format = "%s.P%0" .. tostring(part_key_digits) .. "d"
local count=0
for part_key,_ in pairs(part_keys) do
count = count + 1
local value = {
mappers = mappers_by_part_key[part_key],
mappers = keys_of(mappers_by_part_key[part_key]),
file = string.format("%s/%s.P%d", path, map_results_ns, part_key),
result = string.format(result_str_format, self.result_ns, part_key),
}
Expand Down
12 changes: 6 additions & 6 deletions mapreduce/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ local function gridfs_lines_iterator(gridfs, filename)
local gridfs = gridfs
local gridfile = gridfile
if current_chunk < num_chunks then
chunk = chunk or gridfile:chunk(current_chunk)
chunk = chunk or assert( gridfile:chunk(current_chunk) )
if current_pos <= chunk:len() then
local first_chunk = current_chunk
local last_chunk = current_chunk
Expand All @@ -149,15 +149,15 @@ local function gridfs_lines_iterator(gridfs, filename)
for k,v in ipairs(tbl) do tbl[k] = nil end
local found_line = false
repeat
chunk = chunk or gridfile:chunk(current_chunk)
chunk = chunk or assert( gridfile:chunk(current_chunk) )
data = data or chunk:data()
local chunk_len = chunk:len()
local match = data:match("^([^\n]*)\n", current_pos)
if match then
tbl[ #tbl+1 ] = match
current_pos = #match + current_pos + 1 -- +1 because of the \n
abs_pos = #match + abs_pos + 1
found_line = true
found_line = true
else -- if match ... then
-- inserts the whole chunk substring, no \n match found
tbl[ #tbl+1 ] = data:sub(current_pos, chunk_len)
Expand Down Expand Up @@ -250,8 +250,8 @@ local function merge_iterator(fs, filenames, make_lines_iterator)
local data = data
local take_next = take_next
local queue = queue
-- merge all the files until finished (empty queue)
while not queue:empty() do
-- merge all the files until empty queue (finished)
if not queue:empty() then
counter = counter + 1
--
local key,result = merge_min_keys()
Expand All @@ -260,7 +260,7 @@ local function merge_iterator(fs, filenames, make_lines_iterator)
collectgarbage("collect")
end
return key,result
end -- while not finished()
end -- if not finished
end -- return function
end

Expand Down
6 changes: 3 additions & 3 deletions mapreduce/worker.lua
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ function worker_methods:execute()
num_failed_jobs = num_failed_jobs + 1
end
failed_jobs[id] = true
end
end -- if self.current_job then
self.cnn:flush_pending_inserts(0)
self.cnn:insert_error(utils.get_hostname(), msg)
print(string.format("Error executing a job: %s",msg))
io.stderr:write(string.format("Error executing a job: %s\n",msg))
utils.sleep(utils.DEFAULT_SLEEP*4)
end
end -- if not ok then
until ok or num_failed_jobs >= utils.MAX_WORKER_RETRIES
print(string.format("# Worker retries: %d",num_failed_jobs))
if num_failed_jobs >= utils.MAX_WORKER_RETRIES then
Expand Down

0 comments on commit 30d62cf

Please sign in to comment.