The basis

A MongoDB database is used to communicate between all the machines and for persistence purposes, providing fault tolerance. At a minimum, the tool needs to know:

  • A connection string which indicates the destination of all MongoDB queries.

  • A database name (denoted as dbname) which indicates the database reserved for the MapReduce task. Reusing this database is dangerous because Lua-MapReduce could delete your data.

  • A MapReduce task given in the form of several Lua modules. It is possible to write the whole task in one Lua script, but then you need to provide that same Lua module for every MapReduce function.

  • Optionally the authentication credentials (denoted as auth) if you need to authorize the connection for the given dbname.
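
For illustration, these three settings are just plain values. A minimal sketch, with hypothetical values that will differ in your setup, could be:

-- hypothetical values, for illustration only
local connection_string = "localhost:27017"  -- where MongoDB is listening
local dbname            = "wordcount"        -- database reserved for this task
local auth              = nil                -- or your credentials, if the connection needs authorization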

The connection string, dbname and auth are needed by both programs, the server and the workers. Their purpose is as follows:

  • The server creates the dbname database if needed, as well as all the needed collections. The most important collections are:

    • dbname.task stores information about the current task and its status. At the end of the execution, statistics about the time needed for map and reduce jobs are written to this collection.

    • dbname.map_jobs stores as many elements as map jobs you have defined in your task function. The jobs are taken by the workers as soon as possible, and in a random fashion. It is not possible (currently) to define how jobs will be distributed. Mappers take one job and execute your map function.

    • dbname.red_jobs is the equivalent for the reduce jobs defined by your task. Its size depends on the keys returned by mappers and how these keys are grouped by your partition function. Reducers take one job and execute your reduce function.

    • dbname.fs.files and dbname.fs.chunks are the GridFS collections. The result of your task will be stored there in files named result.P??? where ??? is the number of the partition. So, there will be as many GridFS files as partitions, and all the keys belonging to one partition will be stored together. This data may be removed before the end of the execution, depending on the return value of your final function.

  • The workers read from dbname.map_jobs or dbname.red_jobs and execute the first available job. The mappers' input depends on your task script: they use the (key,value) pairs returned by the task function, and produce several (key',value') pairs. These result pairs are stored, sorted by key, in an intermediate storage. This storage can be the GridFS collection (which could be sharded using MongoShell), a shared file system path, or a local file system path with scp+public_keys for synchronization. It is possible to declare a combiner which reduces the size of the intermediate data. The mappers' results are partitioned following your partition function (see the sketch below). Reducers take the mappers' results as input: at most, a reducer could read its input from as many intermediate files as map jobs are defined in your task. All these input files are merged in order to bring together all the values available for one key. The reducers' results are stored directly in the GridFS collection, in the definitive result files.
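
The following minimal sketch (illustrative only, not the actual Lua-MapReduce internals; the function name group_and_partition is made up) shows the kind of grouping and partitioning applied to the mappers' (key',value') emissions before reducers read them:

-- group (key',value') emissions by key' and assign every key' to a partition
local function group_and_partition(emissions, partitionfn)
  local grouped = {}                -- key' -> list of value'
  for _,kv in ipairs(emissions) do
    local k, v = kv[1], kv[2]
    grouped[k] = grouped[k] or {}
    table.insert(grouped[k], v)
  end
  local partitions = {}             -- partition number -> (key' -> list of value')
  for k,values in pairs(grouped) do
    local p = partitionfn(k)
    partitions[p] = partitions[p] or {}
    partitions[p][k] = values
  end
  return partitions
end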

Architecture

MapReduce tasks are defined by the declaration of several independent parts. Some of them are optional, others are mandatory.

Task function

The task function will be denoted by taskfn. It is responsible for defining the data splits. Formally, it is a function which generates a list of key,value pairs:

taskfn() => list(k1,v1)

The keys must be convertible to strings with the tostring(...) Lua function. The values can be any standard Lua type compatible with JSON (string, number, table), but they are limited in size to a few KB. Larger values can be handled by storing the data in MongoDB or another shared resource, and emitting a value with the location of the data. Every key,value pair is transformed into a map job which will be solved by one mapper host.

function taskfn(emit)
  emit(key1, value1)
  emit(key2, value2)
  ...
  emit(keyN, valueN)
end
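
As a concrete (hypothetical) example, a word-count task could define one map job per input file, using a generated key and the file path as value:

function taskfn(emit)
  -- one map job per input file; the paths are made up for this sketch
  local files = { "data/part1.txt", "data/part2.txt", "data/part3.txt" }
  for i,path in ipairs(files) do
    emit("file" .. i, path)
  end
end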

Map function

A worker takes a map job from MongoDB and calls this function to do the desired computation. The map function receives a key,value pair emitted by the task function, and produces zero or more mapkey,mapvalue pairs in a different domain:

mapfn(k1,v1) => list(k2,v2)

The system groups the map emissions by mapkey, computing a list of mapvalues for every mapkey. This grouping is done independently in every map job. The resulting mapvalue lists can be combined if a combiner function is given. In all cases, the map emissions are stored at the intermediate storage sorted by key and partitioned following the user partition function. A different file is needed for every mapper,partition pair, so after all map jobs have executed at most NxM files will have been created, N being the number of map jobs and M the number of partitions. As for the task function, mapkeys must be convertible to strings with the tostring(...) Lua function, and mapvalues must be a standard Lua type compatible with JSON.

function mapfn(key,value,emit)
  -- process the key,value pair and call
  -- emit(mapkey, mapvalue) zero or more times
end
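
Continuing the hypothetical word-count sketch, where the value is the path of a text file, a map function could emit a (word, 1) pair for every word it finds:

function mapfn(key,value,emit)
  for line in io.lines(value) do     -- in this sketch, value is a file path
    for word in line:gmatch("%S+") do
      emit(word, 1)
    end
  end
end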

Combiner function

The combiner function is optional; when provided, it reduces the storage and bandwidth needed to store and send map results. The combiner function must be commutative, associative and idempotent, because the system may apply it several times or not apply it at all. This function receives a mapkey,values pair, where values is the grouped list of map emissions with the same mapkey, and returns a list of values in the same domain:

combinerfn(k2,list(v2)) => list(v2)

Usually the combiner function returns only one value by doing some kind of reduction. In fact, the combiner function can be the same as the reducer function.

function combinerfn(key,values,emit)
  -- combine the values table into one or a few values
  -- and call emit(c) for every combined value c
end
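
In the word-count sketch, a combiner simply adds up the partial counts of a word produced inside one map job:

function combinerfn(key,values,emit)
  local count = 0
  for _,v in ipairs(values) do count = count + v end
  emit(count)
end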

Partition function

The partition function is needed to distribute mapkeys over reduce jobs. All mapkeys which belong to the same partition will be processed by the same worker (making several calls to the reduce function). It receives a mapkey and returns a positive integer number:

partitionfn(k2) => N+

Partition functions are usually defined as a hash of the mapkey followed by a modulus operation, to ensure that no more than M reducers will be needed.

local M = 10 -- Number of reducers
function partitionfn(key)
  -- any deterministic hash of the key works; a simple string hash is shown here
  local h = 0
  for i=1,#key do h = (h * 31 + key:byte(i)) % 2^31 end
  return h % M
end

Reducer function

The reducer function takes a mapkey,mapvalues pair and produces zero or more emissions related to the given mapkey:

reducefn(k2,list(v2)) => list(v3)

Reducer functions can be associative, commutative and/or idempotent. If your reduce function has these properties, they should be declared as true when configuring the system, which allows it to increase performance. Because mapkeys are sorted before being stored in the corresponding partition file, reducers receive the keys in sorted order.

function reducefn(key,values,emit)
  -- compute one or more reductions from the values table
  -- and call emit(r) for every result r
end
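
In the word-count sketch the reducer performs the same sum as the combiner (as noted above, both functions can be identical):

function reducefn(key,values,emit)
  local count = 0
  for _,v in ipairs(values) do count = count + v end
  emit(count)
end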

Final function

The final function is optional; if it is not given, the task result will simply be stored in the GridFS collection in MongoDB. When given, the final function will be called with one argument. This argument is a Lua iterator, similar to the pairs(...) Lua function. The iterator returns pairs k2,list(v3) as produced by the reduce functions. Partition files are traversed sorted by partition number, and in every partition the key,values pairs are traversed sorted by key. Formally, the final function computes:

finalfn(list(k2,list(v3))) => { true, false or nil, "loop" }

The final function can return one of three values:

  • true indicates that the results stored in MongoDB GridFS will be deleted after the final function execution; so, you are responsible for traversing all the results and processing them in any way you need.

  • false or nil indicates that the results stored in MongoDB GridFS will not be deleted.

  • "loop" indicates to repeat again all the process, allowing to follow the iterative MapReduce framework.

function finalfn(pairs_iterator)
  for key,values in pairs_iterator do
    print(key, table.concat(values, " "))  -- values is a Lua table
  end
  return true
end
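
As a hypothetical example of the "loop" return value, and assuming the final Lua module stays loaded between iterations, an iterative task could keep a counter (or any convergence test) and repeat the whole process until it is satisfied:

local iteration = 0
function finalfn(pairs_iterator)
  iteration = iteration + 1
  for key,values in pairs_iterator do
    -- inspect or store this iteration's results as needed
  end
  if iteration < 10 then return "loop" end  -- repeat the whole process
  return true                               -- last iteration: delete GridFS results
end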