
mincemeatpy


Fork of mincemeat.py, a lightweight Python Map-Reduce framework.

mincemeat.py: MapReduce on Python

mincemeat.py is licensed under the MIT license. See LICENSE for more details.

REQUIREMENTS

Python 2.5 (2.6+ is best). 2.4 doesn't have a compatible hmac/hashlib/sha1

DESCRIPTION

The Map/Reduce Transaction applies 2 functions over a datasource of the form:

    {name1: corpus1, ..., nameN: corpusN}

roughly corresponding to:

    results = reduce( map( datasource ))

ultimately resulting in results of the form:

    {key1: value, key2: value, ..., keyN: value}

First, the Server distributes each (name: corpus) across all available Clients to run the Map function. Then, once all Map results are collected and merged by key, the Server distributes each (key, [value, ...]) to all available Clients to run the Reduce function. Finally, all the Reduce results are collected and merged by key, to produce the final results.

The Map Phase (distributed across all clients) takes the data and produces:

    (name1, corpus1)  ==> (key1, [value, ...]), ..., (keyN, [value, ...])
...
    (nameN, corpusN)  ==> (key1, [value, ...]), ..., (keyN, [value, ...])

(Server collects all key1 values, ..., keyN values)

The Reduce Phase (distributed across all clients) takes the combined data from all of the Map phase results, and sends the data for one key to each Client:

    (key1, [value, ...]) ==> (key1, result)
...
    (keyN, [value, ...]) ==> (keyN, result)

The Server collects all keys and produces the results dictionary:

    {key1: result, ..., keyN: result}
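
To make the data flow concrete, here is a minimal, framework-free word-counting sketch (plain Python; no mincemeat involved, and the variable names are illustrative only):

    datasource = { 'a.txt': "the cat sat", 'b.txt': "the dog sat" }

    def mapfn( name, corpus ):          # Map: one (name, corpus) yields (key, value) pairs
        for word in corpus.split():
            yield word, 1

    def reducefn( key, values ):        # Reduce: one (key, [value, ...]) returns a value
        return sum( values )

    # Conceptually, the Server merges the Map output by key, then Reduces each key:
    merged = {}
    for name, corpus in datasource.items():
        for key, value in mapfn( name, corpus ):
            merged.setdefault( key, [] ).append( value )

    results = dict( ( key, reducefn( key, values ) )
                    for key, values in merged.items() )
    # results == {'the': 2, 'cat': 1, 'sat': 2, 'dog': 1}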

All communication between Server and Clients occurs asynchronously using the standard Python asyncore/asynchat library, and may be interspersed with user-defined communication in either direction, either from within the application's asynchronous communication thread, or from another thread. Tools are provided to implement arbitrary synchronous events such as heartbeats, health checking, and soft or hard communications shutdown.

API compatibility with Michael Fairley's original mincemeat.py has been largely maintained:

    http://remembersaurus.com/mincemeatpy/

EXAMPLES

Run the basic example with:

python example.py

and a worker with:

python mincemeat.py -p changeme [server address (default "localhost")]

The example-sf-... examples require the Gutenberg SF CD, or some other set of big *.txt files, in:

 ../Gutenberg SF CD/Gutenberg SF/*.txt

Download the Gutenberg SF CD using Bit Torrent, from:

http://www.gutenberg.org/cdproject/pgsfcd-032007.zip.torrent

Unzip the archive, mount the ISO, and copy it (or create a symbolic link, on POSIX platforms) into the directory above where you've cloned the mincemeatpy repository; name it "Gutenberg SF CD".

  • Skipping the Reduce Phase

The first Big Corpus example illustrates replacing a trivial Reduce phase with a "finish" function executed in the server node. Start a server (binds to all available interfaces):

python example-sf-maponly.py

and worker(s) with:

python mincemeat.py -p changeme [server address (default "localhost")]
  • Dynamically Electing a Server

The next examples illustrate various means of dynamically spawning a server, if necessary. They all bind to "localhost" by default (change addr_info in the example source to specify another server host interface name). Each instance either elects itself to be the server, if necessary, or becomes a client. Start multiple instances on the same host:

python example-sf-election.py

and worker(s) with:

python example-sf-election.py

or:

python mincemeat.py -v -p changeme

  • Multithreaded Clients/Servers

The example-sf-masterless.py example runs both a client thread and a server thread (if necessary) in the same instance, to illustrate how one might build a dynamic, robust Map-Reduce grid when we don't have control over the lifespan of the instances (e.g. a web server farm, ...).

python example-sf-masterless.py

This will run a client and a server! To speed it up, create more clients by running the same command. Another implementation with similar functionality, but using a better threading-based daemon implementation for the Client and Server, is:

python example-sf-daemon.py
  • Client-driven REPL for submitting Map/Reduce tasks

A much more complex example allows multiple Clients to be established, each of which may independently submit Map/Reduce Transactions to the (automatically created) Server. A prompt is printed on the console, allowing a file name pattern (including "glob" wildcards) to be specified, and the matching files are word-counted.

python example-sf-repr.py

USAGE

Create an instance of mincemeat.Server in your application, set some attributes specifying the desired Map and Reduce functions and a datasource, and call Server.run_server(). Start Client node(s); unless your application specifies custom Client.mapfn implementations, a generic client may be started on any node, by running:

python mincemeat.py -p "changeme" [hostname]

where the password "changeme" corresponds to the one specified in your Server implementation, and hostname is the network interface your Server binds to (default is localhost, so by default all Clients would need to run on the same host). They will each connect to the Server, receive their Map/Reduce function configurations, and begin receiving Map and then Reduce tasks. Finally, the Server will print the results. See example*.py for details.
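
For example, a minimal word-counting Server might look roughly like this (a sketch following the original mincemeat.py API, which this fork largely maintains; the datasource contents and password are placeholders; see example.py for a complete, working script):

import mincemeat

def mapfn( k, v ):
    for w in v.split():
        yield w.lower(), 1

def reducefn( k, vs ):
    return sum( vs )

s = mincemeat.Server()
s.datasource = { 'hello': "hello world", 'goodbye': "goodbye cruel world" }
s.mapfn      = mapfn
s.reducefn   = reducefn

results = s.run_server( password="changeme" )
print results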

  • datasource

    Present your input data in a dict-like form, where one or more keys each index a corpus of data. You may use a dict, or any other data structure which implements the iterator protocol (iter() and next()) for returning all keys, and indexing (getitem()) to return the data for each key. Each key must be unique, as it is used in mincemeat.py for housekeeping. A minimal file-backed example is sketched below.
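
    For instance, here is a sketch of a dict-like datasource that lazily reads *.txt files instead of holding all corpora in memory (the class name is illustrative, not part of mincemeat.py):

    import glob

    class file_contents( object ):
        """Dict-like: iterates file names as keys, reads a file's text on demand."""
        def __init__( self, pattern ):
            self.filenames = glob.glob( pattern )
        def __iter__( self ):
            return iter( self.filenames )
        def __getitem__( self, key ):
            f = open( key )
            try:
                return f.read()
            finally:
                f.close()

    # s.datasource = file_contents( "../Gutenberg SF CD/Gutenberg SF/*.txt" )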

  • Map Phase: mapfn, collectfn

    The Map function must accept 2 parameters, representing a name and the corpus of data. It must return an iterator (or a generator) which yields (key, value) tuples from the data, which will be returned (as a dictionary) as the results of the Map phase.

    The Map function is unique amongst the functions that the user must provide; it is the only one that must be implemented as an iterator returning (key, value) pairs. For example, here's a mapfn that implements word counting (k is a filename, v is a corpus of text):

    def get_lower_simple( k, v ):
        for w in v.split():
            yield w.lower(), 1

    The Map phase always returns a dictionary where each key indexes a list [value, value, ...] of all values that were produced with that key. So, at the end of the Map phase, we'd see:

    {'a': [1, 1, ..., 1], 'and': [1, 1, ..., 1], ...}

returned to the server. This works, but is sub-optimal for very large jobs; we might want to post-process the Map data in the client. This is where the collectfn is used.

** Specialized Map/Reduce Function Configuration

Normally, you will specify 'mapfn' as a plain function in your Server code (not a class or instance method), and you will either specify it as a Server attribute:

def mapfn(k, v):
   ...
s = mincemeat.Server()
s.mapfn = mapfn
...
s.conn(...)

or, you may supply it in the credentials passed to Server.conn():

s = mincemeat.Server()
credentials = {
    'password': "changeme",
    'mapfn':    mapfn,
    ...
}
s.conn(**credentials)

To avoid performing the Map or the Reduce phase altogether, you may specify None for mapfn or reducefn (but NOT both!):

s = mincemeat.Server()
s.mapfn = None      # Don't perform Map; skip, and send data to Reduce

or:

s.reducefn = None   # Don't perform Reduce; send Map output to results

This will result in no mapfn (or reducefn) being transmitted to the Client, and no Map (or Reduce) tasks being generated; the output of the datasource will be immediately sent to the (remaining) Map or Reduce phase.

Sometimes, however, your Map and/or Reduce functions are simply too complex to serialize and transmit from the Server to the (generic) Clients in the standard way. If so, you may specify a fully custom mapfn directly as a method of your derived mincemeat.Client class implementation (of course, you won't be able to run default "python mincemeat.py ..." clients!). In your Server code:

s = mincemeat.Server()
s.mapfn = False     # Do Map, but don't transmit a 'mapfn' to the Client!
s.conn(...)

and in your custom Client implementation:

class my_client(mincemeat.Client):
    def mapfn(self, k, v):
        ...

Now, you may access any custom data or methods in your Client mapfn() required for processing Map tasks.
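
For example, a (hypothetical) derived Client might carry its own configuration, such as a stop-word list, which its mapfn consults while processing Map tasks; a sketch:

import mincemeat

class my_client( mincemeat.Client ):
    """Hypothetical Client whose mapfn uses per-instance data (a stop-word set)."""
    def __init__( self ):
        mincemeat.Client.__init__( self )
        self.stopwords = set( [ "the", "a", "and", "of" ] )

    def mapfn( self, k, v ):
        for w in v.split():
            w = w.lower()
            if w not in self.stopwords:
                yield w, 1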

** collectfn (optional)

If the (optional) collectfn is provided, it is applied (in the client) after the mapfn is done producing the raw Map (key, value) data. The same function signatures are allowed as those used in the Reduce phase; a simple function taking key, [value, ...], and returning a single value (just like reducefn and finishfn):

def sum_values( k, vs ):
    return sum( vs )

Or, it may be implemented as an iterator/generator taking (key, [value, ...]) and yielding (key, value):

def sum_values_generator( kvi ):
    for k, vs in kvi:
        yield k, sum( vs )

And for consistency, we allow collectfn to also return (key, [value, ...]) pairs:

def sum_values_long( kvi ):
    for k, vs in kvi:
        yield k, vs if len(vs) < 10 else [sum(vs)]

The result of the Map phase is always (key, [value, ...]); the collectfn is allowed to return single values or lists; we will always wrap any non-list collectfn result value as [value].

  • Reduce Phase: reducefn, finishfn

    In the Reduce phase, the Server sends each key, [value, ...] to a Client, which reduces it to a key, value result and sends it back. If this is trivial, you may decide to skip the Reduce phase altogether; just leave reducefn set to None. We allow any function taking key, [value, ...] and returning value:

    function(k, vs) ==> v

or taking an iterator yielding key, [value, ...], and itself returning an iterable yielding key, value:

    function(iter([(k, vs), ...])) ==> [(k, v), ...]
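
For word counting, either form works; a minimal sketch of both (the names are illustrative):

def reducefn( k, vs ):                  # simple form: one key at a time
    return sum( vs )

def reducefn_iter( kvi ):               # iterator form: all keys at once
    for k, vs in kvi:
        yield k, sum( vs )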

** finishfn (optional)

After the Reduce phase is done, the Server runs the (optional) finishfn. This function is presented with all of the Reduce phase data, for any finishing required. In fact, if the Reduce function is trivial, you may choose to run it entirely on the Server (instead of expensively sending the Reduce phase data to the Clients), by assigning it to finishfn instead of reducefn.
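
For example, a trivial word-count reduction could be performed entirely on the Server, which is roughly what example-sf-maponly.py illustrates. A sketch, assuming finishfn accepts the same key, [value, ...] signature as reducefn:

import mincemeat

def sum_values( k, vs ):
    return sum( vs )

s = mincemeat.Server()
# ... set s.datasource and s.mapfn as usual ...
s.reducefn = None           # skip the distributed Reduce phase entirely
s.finishfn = sum_values     # perform the trivial reduction on the Server instead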

  • Result Phase: resultfn (optional)

    Fired by the Server as soon as results are available. If not specified, the Server default resultfn just collects up all results for later access. A function taking a transaction ID and results dictionary is required:

    def resultfn( txn, results ):
        print "# Transaction ", txn
        for k, v in results.iteritems():
            print k, ": ", v

TOPOLOGY

In its most simple form, a Map-Reduce cluster has a Server and several (perhaps transient) Clients. The Server configures each Client by serializing and sending the appropriate Map and Reduce functions, sends it one or more Map and/or Reduce requests, and collects the results. If a Client fails without returning a result, the Server will (eventually) send the same Map/Reduce requests to another Client. When complete, the Server disconnects from all Clients and exits. In all cases, the Server is in control of the lifespan of the Clients, and also schedules all of the Map/Reduce Tasks, and all constituent Map/Reduce Transactions.

           +-------(C)
          /
         v
        (S)<--------(C)
         ^
          \
           +-----(C)

In other typical uses of Map-Reduce, the Clients might be in control. For example, you might have a web server farm with many clients created spontaneously by the web server infrastructure, where each process will be a Map-Reduce Client, and any Client may need to schedule some Map/Reduce task to be processed across all other Clients. In this case, a Server needs to be elected from among the Clients (if none exists yet); another Server may need to be re-elected, should one disappear before (or during) processing of some Client's task. Since the Server binds atomically and unilaterally to a port, any Client could spawn a thread to implement a Server when it fails to connect to an existing one. Here is an example of a Client connecting to a Server hosted as a thread within the same process:

           +-------(C)
          /
         v
    (C->(S))<--------(C)
         ^
          \
           +-----(C)

After that server node fails, the Clients connect to a new one; the first one to detect the failure attempts to reconnect, fails, and spawns a Server thread:

                   (C)-+
                        \
                         v
    (XXXXX)         (C->(S))
                         ^
                        /
                 (C)---+
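
The "bind atomically, or become a client" decision described above can be sketched with plain sockets. This is NOT the mincemeatpy implementation (which uses asyncore; see example-sf-election.py and example-sf-masterless.py for the real thing), just an illustration of the pattern; the host, port, and function names are placeholders:

import socket
import threading

HOST, PORT = "localhost", 11235         # illustrative address/port

def serve_forever( listener ):
    # Placeholder for real Server work (configure Clients, schedule Map/Reduce).
    while True:
        conn, addr = listener.accept()
        conn.close()

def elect():
    """Try to become the Server; fall back to being a Client if the port is taken."""
    listener = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
    try:
        listener.bind( ( HOST, PORT ) ) # atomic: exactly one process wins the bind
        listener.listen( 5 )
    except socket.error:
        listener.close()
        return "client"                 # someone else is already the Server; connect to it
    threading.Thread( target=serve_forever, args=( listener, ) ).start()
    return "server"                     # we won; run a Server thread (and a Client, too)

print elect()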