Merge pull request #4 from pakozm/devel
Devel
pakozm committed May 17, 2014
2 parents b4a0dd7 + 082b140 commit f679fca
Showing 21 changed files with 713 additions and 316 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Francisco Zamora-Martinez (2014-)
Lua-MapReduce, Copyright (c) 2014, Francisco Zamora-Martinez

GNU GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
190 changes: 3 additions & 187 deletions README.md
@@ -26,194 +26,10 @@ add the active directory by writing in the terminal:
$ export LUA_PATH='?.lua;?/init.lua'
```

Usage
-----
Documentation
-------------

Two Lua scripts are provided for fast running of the software.

- `execute_server.lua` runs the master server for your map-reduce operation.
  Only **one instance** of this script is needed. Note that this software
  receives the **map-reduce task** split into several Lua modules. These
  modules must be visible in the `LUA_PATH` of the server and of all the
  workers that you execute. This script receives seven mandatory arguments:

1. The connection string, normally `localhost` or `localhost:21707`.
2. The name of the database where the work will be done.
3. A Lua module which contains the **task** function data.
4. A Lua module which contains the **map** function data.
5. A Lua module which contains the **partition** function data.
6. A Lua module which contains the **reduce** function data.
7. A Lua module which contains the **final** function data.

- `execute_worker.lua` runs the worker, which is configured by default to
  execute one map-reduce task and then finish. One task is not the same as
  one job: a **map-reduce task** is performed as several individual
  **map/reduce jobs**. A worker waits until all pending map or reduce jobs
  are completed before considering a task finished. This script receives two
  arguments:

1. The connection string, as above.
2. The name of the database where the work will be done, as above.

A simple word-count example is available in the repository. There are two
shell scripts, `execute_example_server.sh` and `execute_example_worker.sh`,
which are ready to run the word-count example on a single machine, with one or
more worker instances. The execution of the example looks like this:

**SERVER**
```
$ ./execute_example_server.sh > output
# Preparing MAP
# MAP execution
100.0 %
# Preparing REDUCE
# MERGE AND PARTITIONING
100.0 %
# CREATING JOBS
# STARTING REDUCE
# REDUCE execution
100.0 %
# FINAL execution
```

**WORKER**
```
$ ./execute_example_worker.sh
# NEW TASK READY
# EXECUTING MAP JOB _id: "1"
# FINISHED
# EXECUTING MAP JOB _id: "2"
# FINISHED
# EXECUTING MAP JOB _id: "3"
# FINISHED
# EXECUTING MAP JOB _id: "4"
# FINISHED
# EXECUTING REDUCE JOB _id: "121"
# FINISHED
# EXECUTING REDUCE JOB _id: "37"
# FINISHED
...
```

Map-reduce task example: word-count
-----------------------------------

The example is composed of one Lua module for each of the map-reduce functions,
available in the directory `examples/WordCount/`. All the modules have the
same structure: they return a Lua table with two fields:

- An **init** function, which receives a table of arguments and lets you
  configure your module options, in case you need any.

- A function which implements the necessary Lua code for the operation. The name
of the function is different for each operation.
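Putting the two fields together, every module follows the same minimal skeleton. This is only a sketch: the operation-function name `mapfn` below is just one of the possible names, and a real module file would end with `return M`.

```Lua
-- Minimal sketch of the structure shared by all the modules: a table
-- with an init function plus one operation function whose name depends
-- on the module (taskfn, mapfn, partitionfn, reducefn or finalfn).
local options = {}

local init = function(arg)
  -- keep any configuration passed by the server
  options = arg or {}
end

local M = {
  init = init,
  mapfn = function(key,value,emit) -- the name varies per module
    emit(key,value)
  end,
}
-- a real module file would end with: return M
```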

A map-reduce task is divided, at least, into the following modules:

- **taskfn.lua** is the script which defines how the data is divided in order
  to create **map jobs**. The **func** field is executed as a Lua *coroutine*,
  so every map job will be created by calling `coroutine.yield(key,value)`.

```Lua
-- arg is for configuration purposes, it is allowed in any of the scripts
local init = function(arg)
  -- do whatever you need for initialization parametrized by arg table
end
return {
  init = init,
  taskfn = function()
    coroutine.yield(1,"mapreduce/server.lua")
    coroutine.yield(2,"mapreduce/worker.lua")
    coroutine.yield(3,"mapreduce/test.lua")
    coroutine.yield(4,"mapreduce/utils.lua")
  end
}
```

- **mapfn.lua** is the script where the map function is implemented. The
  **func** field is executed as a standard Lua function, and receives three
  arguments `(key,value,emit)`. The first two are generated by one of the
  yields in your `taskfn` script. The third argument is a function: map
  results are produced by calling `emit(key,value)`.

```Lua
return {
  init = function() end,
  mapfn = function(key,value,emit)
    for line in io.lines(value) do
      for w in line:gmatch("[^%s]+") do
        emit(w,1)
      end
    end
  end
}
```

- **partitionfn.lua** is the script which describes how the map results are
  grouped and partitioned in order to create **reduce jobs**. The **func**
  field is a hash function which receives an emitted key and returns an
  integer. Depending on your hash function, more or fewer reducers will be
  needed.

```Lua
-- string hash function: http://isthe.com/chongo/tech/comp/fnv/
local NUM_REDUCERS = 10
local FNV_prime = 16777619
local offset_basis = 2166136261
local MAX = 2^32
return {
  init = function() end,
  partitionfn = function(key)
    -- compute hash
    local h = offset_basis
    for i=1,#key do
      h = (h * FNV_prime) % MAX
      h = bit32.bxor(h, key:byte(i))
    end
    return h % NUM_REDUCERS
  end
}
```
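As a quick sanity check of the partition function (a self-contained sketch; it assumes Lua 5.2's `bit32` library, as the example above does), the hash is deterministic and always falls in the range `0 .. NUM_REDUCERS-1`, so all the values emitted for a given key reach the same reducer:

```Lua
local NUM_REDUCERS = 10
local FNV_prime = 16777619
local offset_basis = 2166136261
local MAX = 2^32

local function partition(key)
  local h = offset_basis
  for i=1,#key do
    h = (h * FNV_prime) % MAX
    h = bit32.bxor(h, key:byte(i))
  end
  return h % NUM_REDUCERS
end

-- the same key always lands in the same partition
local p1 = partition("hello")
local p2 = partition("hello")
```

Raising `NUM_REDUCERS` spreads the keys over more reduce jobs; lowering it concentrates them into fewer, larger jobs.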

- **reducefn.lua** is the script which implements the reduce function. The
  **func** field is a function which receives a pair `(key,values)`, where
  `key` is one of the emitted keys and `values` is a Lua array (a table with
  sequential integer keys starting at 1) holding all the available map values
  for the given key. The system may run the reduce function several times over
  the same data, so it must be idempotent. The reduce results will be grouped
  following the partition function. For each possible partition, a GridFS file
  will be created in a collection called `dbname_fs`, where `dbname` is the
  database name defined above.

```Lua
return {
  init = function() end,
  reducefn = function(key,values)
    local count=0
    for _,v in ipairs(values) do count = count + v end
    return count
  end
}
```
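The idempotency requirement can be illustrated directly with this reducer: feeding it its own output produces the same value, so the system may safely re-run it (a small self-contained sketch of the reduce body above):

```Lua
local function reducefn(key,values)
  local count = 0
  for _,v in ipairs(values) do count = count + v end
  return count
end

local once  = reducefn("the", {1,1,1})   -- sum of the raw map values
local twice = reducefn("the", { once })  -- reducing the reduced value again
-- once == twice, so re-applying the reducer does not change the result
```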

- **finalfn.lua** is the script which defines how to consume the results
  produced by the system. The **func** field is a function which receives a
  Lua pairs iterator, and returns a boolean indicating whether to destroy the
  GridFS collection data. If the returned value is `true`, the results will be
  removed. If it is `false` or `nil`, the results will remain available after
  the execution of your map-reduce task.

```Lua
return {
  init = function() end,
  finalfn = function(it)
    for key,value in it do
      print(value,key)
    end
    return true -- indicates to remove mongo gridfs result files
  end
}
```
Available at [wiki pages](https://github.com/pakozm/lua-mapreduce/wiki).

Performance notes
-----------------
2 changes: 1 addition & 1 deletion examples/WordCount/finalfn.lua
@@ -2,7 +2,7 @@ return {
init = function() end,
finalfn = function(pairs_iterator)
for key,value in pairs_iterator do
print(value,key)
print(value[1],key)
end
return true -- indicates to remove mongo gridfs result files
end
98 changes: 57 additions & 41 deletions examples/WordCount/init.lua
@@ -3,46 +3,62 @@ local NUM_REDUCERS = 10
local FNV_prime = 16777619
local offset_basis = 2166136261
local MAX = 2^32
return {
-- arg is for configuration purposes, it will be executed with init_args given
-- to the server
init = function(arg) end,

taskfn = function()
coroutine.yield(1,"mapreduce/server.lua")
coroutine.yield(2,"mapreduce/worker.lua")
coroutine.yield(3,"mapreduce/test.lua")
coroutine.yield(4,"mapreduce/utils.lua")
end,

mapfn = function(key,value,emit)
for line in io.lines(value) do
for w in line:gmatch("[^%s]+") do
emit(w,1)
end
end
end,

partitionfn = function(key)
-- compute hash
local h = offset_basis
for i=1,#key do
h = (h * FNV_prime) % MAX
h = bit32.bxor(h, key:byte(i))
end
return h % NUM_REDUCERS
end,

reducefn = function(key,values)
local count=0
for _,v in ipairs(values) do count = count + v end
return count
end,

finalfn = function(pairs_iterator)
for key,value in pairs_iterator do
print(value,key)

-- arg is for configuration purposes, it will be executed with init_args given
-- to the server
local init = function(arg) end

local taskfn = function(emit)
emit(1,"mapreduce/server.lua")
emit(2,"mapreduce/worker.lua")
emit(3,"mapreduce/test.lua")
emit(4,"mapreduce/utils.lua")
end

local mapfn = function(key,value,emit)
for line in io.lines(value) do
for w in line:gmatch("[^%s]+") do
emit(w,1)
end
return true -- indicates to remove mongo gridfs result files
end,
end
end

local partitionfn = function(key)
-- compute hash
local h = offset_basis
for i=1,#key do
h = (h * FNV_prime) % MAX
h = bit32.bxor(h, key:byte(i))
end
return h % NUM_REDUCERS
end

local reducefn = function(key,values,emit)
local count=0
for _,v in ipairs(values) do count = count + v end
emit(count)
end

local combinerfn = reducefn

local finalfn = function(pairs_iterator)
for key,value in pairs_iterator do
print(value[1],key)
end
return true -- indicates to remove mongo gridfs result files
end

return {
init = init,
taskfn = taskfn,
mapfn = mapfn,
partitionfn = partitionfn,
reducefn = reducefn,
combinerfn = combinerfn,
finalfn = finalfn,
-- These three properties hold for this reduce function.
-- Combiner functions must always fulfill these properties.
associative_reducer = true,
commutative_reducer = true,
idempotent_reducer = true,
}
17 changes: 12 additions & 5 deletions examples/WordCount/reducefn.lua
@@ -1,8 +1,15 @@
local reducefn = function(key,values,emit)
local count=0
for _,v in ipairs(values) do count = count + v end
emit(count)
end
return {
init = function() end,
reducefn = function(key,values)
local count=0
for _,v in ipairs(values) do count = count + v end
return count
end
reducefn = reducefn,
combinerfn = reducefn,
-- These three properties hold for this reduce function.
-- Combiner functions must always fulfill these properties.
associative_reducer = true,
commutative_reducer = true,
idempotent_reducer = true,
}
10 changes: 10 additions & 0 deletions examples/WordCount/reducefn2.lua
@@ -0,0 +1,10 @@
local reducefn = function(key,values,emit)
local count=0
for _,v in ipairs(values) do count = count + v end
emit(count)
end
return {
init = function() end,
reducefn = reducefn,
combinerfn = reducefn,
}
10 changes: 5 additions & 5 deletions examples/WordCount/taskfn.lua
@@ -4,10 +4,10 @@ local init = function(arg)
end
return {
init = init,
taskfn = function()
coroutine.yield(1,"mapreduce/server.lua")
coroutine.yield(2,"mapreduce/worker.lua")
coroutine.yield(3,"mapreduce/test.lua")
coroutine.yield(4,"mapreduce/utils.lua")
taskfn = function(emit)
emit(1,"mapreduce/server.lua")
emit(2,"mapreduce/worker.lua")
emit(3,"mapreduce/test.lua")
emit(4,"mapreduce/utils.lua")
end
}
4 changes: 2 additions & 2 deletions examples/WordCountBig/taskfn.lua
@@ -2,12 +2,12 @@ return {
-- init is for configuration purposes, it is allowed in any of the scripts
init = function(arg)
end,
taskfn = function()
taskfn = function(emit)
local f = io.popen("ls /home/experimentos/CORPORA/EUROPARL/en-splits/*","r")
local i=0
for filename in f:lines() do
i=i+1
coroutine.yield(i,filename)
emit(i,filename)
end
f:close()
end
3 changes: 2 additions & 1 deletion execute_example_server.sh
@@ -4,4 +4,5 @@ lua execute_server.lua localhost wordcount \
examples.WordCount.mapfn \
examples.WordCount.partitionfn \
examples.WordCount.reducefn \
examples.WordCount.finalfn
examples.WordCount.finalfn \
examples.WordCount.reducefn $@
