phoebus

Phoebus is a project mainly written in ..., based on the Apache-2.0 license.

Phoebus is a distributed framework for large scale graph processing written in Erlang.

PHOEBUS

Introduction

Phoebus is a system written in Erlang for distributed processing of very large graphs that span billions of vertices and edges. It is essentially an implementation of the model described in Google's Pregel paper [1]. It supports a distributed model of computation similar to MapReduce [2], but tuned more closely to graph processing.

Computational Model

  • A Graph is partitioned into groups of Records.
  • A Record consists of a Vertex and its outgoing Edges (An Edge is a Tuple consisting of the edge weight and the target vertex name).
  • A User specifies a 'Compute' function that is applied to each Record.
  • Computation on the graph happens in a sequence of incremental Super Steps.
  • At each Super step, the Compute function is applied to all 'active' vertices of the graph.
  • Vertices communicate with each other via Message Passing.
  • The Compute function is provided with the Vertex record and all Messages sent to the Vertex in the previous SuperStep.
  • A Compute function can:
    • Mutate the value associated to a vertex
    • Add/Remove outgoing edges.
    • Mutate Edge weight
    • Send a Message to any other vertex in the graph.
    • Change state of the vertex from 'active' to 'hold'.
  • At the beginning of each SuperStep, if there are no more active vertices and no messages waiting to be delivered to any vertex, the algorithm terminates.
  • A User may additionally specify 'MaxSteps' to stop the algorithm after a fixed number of super steps.
  • A User may additionally specify a 'Combine' function that is applied to all the Messages targeted at a Vertex before the Compute function is applied to it.
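
The model above can be sketched in a few lines. The following is a minimal single-process illustration in Python, not Phoebus code and not its Erlang API: the names run_supersteps and make_sssp, the dictionary layout of a record, and the rule that an incoming message wakes a vertex that is on 'hold' (borrowed from the Pregel paper) are all assumptions made for the example.

```python
from collections import defaultdict

INF = float("inf")

def run_supersteps(records, compute, combine=None, max_steps=None):
    """Run compute over records until no vertex is active and no message is pending.

    records: {name: {"value": v, "edges": [(weight, target), ...], "active": bool}}
    compute: fn(name, record, messages) -> list of (target, message)
    """
    inbox = defaultdict(list)  # messages sent in the previous superstep
    step = 0
    while True:
        any_active = any(rec["active"] for rec in records.values())
        if not any_active and not inbox:
            break  # nothing left to do
        if max_steps is not None and step >= max_steps:
            break  # user-specified cap on the number of supersteps
        outbox = defaultdict(list)
        for name, rec in records.items():
            msgs = inbox.get(name, [])
            if not rec["active"] and not msgs:
                continue  # vertex is on hold and received nothing
            rec["active"] = True  # assumption: a message reactivates a vertex
            if combine is not None and msgs:
                msgs = [combine(msgs)]  # collapse all messages into one
            for target, msg in compute(name, rec, msgs):
                outbox[target].append(msg)
        inbox = outbox
        step += 1
    return step

def make_sssp(source):
    """Hypothetical Compute function: shortest distance from source."""
    def compute(name, rec, msgs):
        candidate = min([0 if name == source else INF] + msgs)
        out = []
        if candidate < rec["value"]:
            rec["value"] = candidate  # mutate the vertex value
            out = [(target, candidate + weight) for weight, target in rec["edges"]]
        rec["active"] = False  # switch from 'active' to 'hold'
        return out
    return compute

# A 7-vertex binary tree rooted at "1", every edge with weight 1.
records = {
    str(i): {
        "value": INF,
        "edges": [(1, str(2 * i)), (1, str(2 * i + 1))] if 2 * i + 1 <= 7 else [],
        "active": True,
    }
    for i in range(1, 8)
}
steps = run_supersteps(records, make_sssp("1"), combine=min, max_steps=100)
```

Here min serves as the Combine function: only the smallest proposed distance matters to a vertex, so the other messages can be dropped before Compute runs.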

Distributed Processing

  • The Computational model allows the algorithm to be performed in parallel by a cluster of Phoebus nodes.
  • A 'Job' submitted to a Phoebus cluster is managed by a 'Master' process running on the node that receives the Job.
  • The Master partitions the input graph and spawns a 'Worker' for each partition on one of the nodes of the cluster.
  • The Master then asks each Worker to perform a Super step on its partition and awaits notification from the Worker of Step completion.
  • The Step number is incremented until all Workers report that they have no more 'active' Vertices and no more outstanding messages to be delivered.
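
The coordination between Master and Workers can be sketched as follows. This is an in-process Python illustration under stated assumptions, not how Phoebus is implemented: the Worker class, run_master, the CRC32-based partition_of function, and routing messages through the master loop are all hypothetical simplifications (real workers would be separate processes, possibly on other nodes).

```python
import zlib
from collections import defaultdict

def partition_of(name, n_workers):
    # Assumption: vertices are assigned to partitions by a stable hash of the name.
    return zlib.crc32(name.encode()) % n_workers

class Worker:
    def __init__(self, records, compute):
        self.records = records
        self.compute = compute
        self.inbox = defaultdict(list)

    def deliver(self, target, msg):
        self.inbox[target].append(msg)

    def superstep(self):
        """Run one step on this partition; report (has_active, outgoing messages)."""
        inbox, self.inbox = self.inbox, defaultdict(list)
        outgoing = []
        for name, rec in self.records.items():
            msgs = inbox.get(name, [])
            if rec["active"] or msgs:
                rec["active"] = True
                outgoing.extend(self.compute(name, rec, msgs))
        has_active = any(rec["active"] for rec in self.records.values())
        return has_active, outgoing

def run_master(records, compute, n_workers, max_steps):
    parts = [dict() for _ in range(n_workers)]
    for name, rec in records.items():
        parts[partition_of(name, n_workers)][name] = rec
    workers = [Worker(part, compute) for part in parts]
    step = 0
    while step < max_steps:
        reports = [w.superstep() for w in workers]  # wait for every worker
        step += 1
        pending = False
        for _, outgoing in reports:
            for target, msg in outgoing:
                workers[partition_of(target, n_workers)].deliver(target, msg)
                pending = True
        if not pending and not any(active for active, _ in reports):
            break  # no active vertices and no outstanding messages
    return step

def propagate_max(name, rec, msgs):
    """Hypothetical Compute function: every vertex ends up with the largest value."""
    best = max([rec["value"]] + msgs)
    out = []
    if best > rec["value"] or not rec.get("started"):
        rec["value"] = best
        rec["started"] = True
        out = [(target, best) for _weight, target in rec["edges"]]
    rec["active"] = False  # go on hold until a message arrives
    return out

# A ring a -> b -> c -> d -> a, split across two workers.
ring = {
    "a": {"value": 3, "edges": [(1, "b")], "active": True},
    "b": {"value": 6, "edges": [(1, "c")], "active": True},
    "c": {"value": 2, "edges": [(1, "d")], "active": True},
    "d": {"value": 1, "edges": [(1, "a")], "active": True},
}
ring_steps = run_master(ring, propagate_max, n_workers=2, max_steps=20)
```

The list comprehension over workers stands in for the barrier: the step counter only advances once every Worker has reported completion.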

Getting it to work (Tested on Mac OS X Snow Leopard)

Requirements:

  • rebar (Download from http://hg.basho.com/rebar/downloads/rebar)
  • git
  • erlang (tested on R14B)

1) Clone the repository from GitHub

$ git clone git://github.com/xslogic/phoebus.git
$ cd phoebus

2) Compile and create a release

$ rebar compile ; rm -rf rel/phoebus ; rebar generate ; chmod +x ./rel/phoebus/bin/phoebus ; ./rel/phoebus/bin/phoebus
==> rel (compile)
==> phoebus (compile)
....
==> rel (generate)
Usage: phoebus {start|stop|restart|reboot|ping|console|attach}

3) Create a sample output directory

$ mkdir /tmp/output

4) Start a two-node Phoebus cluster

Terminal 1:
$ env A_FILE=$PWD/vm1.args ./rel/phoebus/bin/phoebus console
.....
Erlang R13B04 (erts-5.7.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:5] [hipe] [kernel-poll:true]

Eshell V5.7.5 (abort with ^G)
(phoebusr1@my-machine)1>

Terminal 2:
$ env A_FILE=$PWD/vm2.args ./rel/phoebus/bin/phoebus console
.....
Erlang R13B04 (erts-5.7.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:5] [hipe] [kernel-poll:true]

Eshell V5.7.5 (abort with ^G)
(phoebusr2@my-machine)1>

5) Creating input data set: Currently, Phoebus requires input records to be line-delimited, one record per line. Each record must be of the form " ... ". The module "algos" that comes with Phoebus has a utility function that can generate a binary tree as a sample input data set.

(phoebusr1@my-machine)1> algos:create_binary_tree("/tmp/input", 4, 1000).
ok

The create_binary_tree function has created an input data set in the directory "/tmp/input": a 1000-node binary tree rooted at vertex "1", split across 4 files.

$ head -5 infile1
1 1 1 2 1 3
2 2 1 4 1 5
3 3 1 6 1 7
4 4 1 8 1 9
5 5 1 10 1 11
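
Reading such a record can be sketched as below. The field layout is an inference from the sample lines and from the definition of a Record given earlier (vertex name, vertex value, then weight/target pairs); it is not a documented specification. The function names parse_record and format_record are hypothetical, and the real files may use a different delimiter (for example tabs) than the spaces shown here.

```python
def parse_record(line):
    """Split one input line into (vertex_name, vertex_value, edges).

    Assumed layout: name, value, then zero or more (weight, target) pairs,
    all separated by whitespace. Fields are kept as strings.
    """
    fields = line.split()
    if len(fields) < 2:
        raise ValueError("record needs at least a vertex name and a value")
    name, value, rest = fields[0], fields[1], fields[2:]
    if len(rest) % 2 != 0:
        raise ValueError("edges must come in (weight, target) pairs")
    edges = [(rest[i], rest[i + 1]) for i in range(0, len(rest), 2)]
    return name, value, edges

def format_record(name, value, edges, sep=" "):
    """Inverse of parse_record: build one line from its parts."""
    parts = [name, value]
    for weight, target in edges:
        parts.extend([weight, target])
    return sep.join(parts)

sample = "5 5 1 10 1 11"
parsed = parse_record(sample)
```

For the sample line, vertex "5" carries the value "5" and has two outgoing edges of weight "1", to vertices "10" and "11".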

6) Running a sample algo: The module "algos" has a sample Compute function that calculates the shortest path to a node.

(phoebusr1@my-machine)1> AFun = fun algos:shortest_path/2.

#Fun<algos.shortest_path.2>

(phoebusr1@my-machine)1> phoebus_master:start_link([{name, "first_ever"}, {max_steps, 100}, {algo_fun, AFun}, {input_dir, "file:///tmp/input/"}, {output_dir, "file:///tmp/output/"}]).
ok

Since the input has 4 files, Phoebus spawns 4 workers, 2 on each node.

7) Wait for the algorithm to finish. Once it does, the output is written to "/tmp/output". For example, to list all vertices whose names start with "20":

$ cat /tmp/output/* | grep '^20'
200 100:50:25:12:6:3:1 1 400 1 401
204 102:51:25:12:6:3:1 1 408 1 409
20 10:5:2:1 1 40 1 41
201 100:50:25:12:6:3:1 1 402 1 403
203 101:50:25:12:6:3:1 1 406 1 407

The second column (the value of the vertex) gives the shortest path to the vertex from the root of the binary tree.
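
These values can be cross-checked by hand. In the sample tree the children of vertex n are 2n and 2n+1 (visible in the input lines), so the parent of n is n divided by 2, rounded down, and the listed path runs from the parent back up to the root. The short Python sketch below recomputes the value that way; expected_path is a hypothetical helper written for this check, not part of Phoebus.

```python
def expected_path(vertex):
    """Ancestors of a vertex in the sample binary tree, parent first, joined by ':'."""
    path = []
    while vertex > 1:
        vertex //= 2  # parent of n is floor(n / 2)
        path.append(str(vertex))
    return ":".join(path)

# Recompute the second column for the vertices shown in the output above.
checked = {v: expected_path(v) for v in (20, 200, 201, 203, 204)}
```

Each computed string matches the corresponding line of the grep output, which suggests the job produced the expected paths for these vertices.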

Next Steps

  • Phoebus currently supports reading from and writing to the local filesystem only. It needs to be extended to read from and write to a distributed filesystem such as HDFS[3] or DDFS[4].
  • Fault tolerance and error handling need to be fixed. If a Worker dies, the Master must ask another Worker on another node to take up its work.
  • The Pregel paper describes an 'Aggregate' function, which still needs to be implemented.
  • Support Jobs written in Python.

References

  1. Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, James C. Dehnert, Ilan Horn, Naty Leiser, and Grzegorz Czajkowski, Pregel: A System for Large-Scale Graph Processing (http://portal.acm.org/citation.cfm?id=1582716.1582723)
  2. MapReduce (http://en.wikipedia.org/wiki/MapReduce)
  3. Hadoop Distributed File system (http://hadoop.apache.org/hdfs/)
  4. Disco Distributed File System (http://discoproject.org/doc/start/ddfs.html)