h1. SwineHerd

Flexible data workflow glue. Swineherd is written mainly in Ruby (with some R) and is released under the Apache 2.0 license.
Swineherd is for running scripts and workflows on filesystems.
h3. Outline
A @workflow@ is built from @script@ objects and run on a @filesystem@.
h4. Script
A script has, at minimum: the path to its source file (eg. a Pig, Wukong, or R script), an @options@ (or, for an @RScript@, @attributes@) hash that is turned into arguments for the underlying interpreter, @input@ and @output@ path arrays, and a @run@ method that takes the mode to run in (eg. @:hadoop@ or @:local@).
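For example, here's a minimal sketch (the script path and option names are hypothetical; the calls mirror the pagerank example below):

<pre><code>
require 'swineherd' ; include Swineherd
require 'swineherd/script' ; include Swineherd::Script

wordcount = PigScript.new('scripts/wordcount.pig') # hypothetical pig script
wordcount.options = { :input => '/tmp/corpus.tsv', :output => '/tmp/wordcounts' }
wordcount.run(:hadoop)                             # run with the hadoop interpreter
</code></pre>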
h4. Workflow
A workflow is built using rake @task@ objects that do nothing more than run scripts. A workflow names its tasks, wires up their dependencies, and manages intermediate outputs under its working directory with the @next_output@ and @latest_output@ helpers.
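A minimal sketch (the task names and scripts are hypothetical; @next_output@ and @latest_output@ are the same helpers used in the working example below):

<pre><code>
flow = Workflow.new('my_flow_id') do
  task :count do
    wordcount.options[:output] = next_output(:count)
    wordcount.run(:hadoop)
  end
  task :summarize => [:count] do # depends on :count
    summarizer.options[:input]  = latest_output(:count)
    summarizer.options[:output] = next_output(:summarize)
    summarizer.run(:hadoop)
  end
end
flow.workdir = '/tmp/my_flow'
flow.run(:summarize)             # runs :count first
</code></pre>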
h4. FileSystem
Workflows are intended to run on filesystems. At the moment, the implemented filesystems are:

* @:file@ (the local filesystem)
* @:hdfs@ (the Hadoop distributed filesystem)
* @:s3@ (Amazon S3)
Using the filesystem:
Paths should be absolute.
<pre><code>
# get a new instance of the local filesystem and write to it
localfs = FileSystem.get(:file)
localfs.open("mylocalfile", 'w') do |f|
  f.write("Writing a string to a local file")
end

# get a new instance of the hadoop filesystem and write to it
hadoopfs = FileSystem.get(:hdfs)
hadoopfs.open("myhadoopfile", 'w') do |f|
  f.write("Writing a string to an hdfs file")
end

# get a new instance of the s3 filesystem and write to it
access_key_id     = '1234abcd'
secret_access_key = 'foobar1234'
s3fs = FileSystem.get(:s3, access_key_id, secret_access_key)
s3fs.mkpath 'mys3bucket' # bucket must exist
s3fs.open("mys3bucket/mys3file", 'w') do |f|
  f.write("Writing a string to an s3 file")
end
</code></pre>
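Since every filesystem implements the same interface, the same code can run against any of them. A small sketch (the @write_unless_exists@ helper is made up; @exists?@ and @open@ appear above and in the example below):

<pre><code>
# hypothetical helper: write a file on any swineherd filesystem, idempotently
def write_unless_exists(fs, path, contents)
  return if fs.exists? path
  fs.open(path, 'w') { |f| f.write(contents) }
end

write_unless_exists(FileSystem.get(:file), '/tmp/marker', 'done')
write_unless_exists(FileSystem.get(:hdfs), '/tmp/marker', 'done')
</code></pre>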
h3. Working Example
For the most up-to-date working example, see the examples directory. Here's a simple example for running pagerank:
<pre><code>
#!/usr/bin/env ruby

$LOAD_PATH << '../../lib'
require 'swineherd' ; include Swineherd
require 'swineherd/script' ; include Swineherd::Script
require 'swineherd/filesystem'

Settings.define :flow_id,     :required => true,                     :description => "Flow id required to make run of workflow unique"
Settings.define :iterations,  :type => Integer, :default => 10,      :description => "Number of pagerank iterations to run"
Settings.define :hadoop_home, :default => '/usr/local/share/hadoop', :description => "Path to hadoop config"
Settings.resolve!

flow = Workflow.new(Settings.flow_id) do

  # The filesystems we're going to be working with
  hdfs    = Swineherd::FileSystem.get(:hdfs)
  localfs = Swineherd::FileSystem.get(:file)

  # The scripts we're going to use
  initializer = PigScript.new('scripts/pagerank_initialize.pig')
  iterator    = PigScript.new('scripts/pagerank.pig')
  finisher    = WukongScript.new('scripts/cut_off_list.rb')
  plotter     = RScript.new('scripts/histogram.R')

  #
  # Runs a simple pig script to initialize pagerank. We must specify the input
  # here as this is the first step in the workflow. The output attribute is
  # there to ensure idempotency, and the options attribute is the hash that
  # will be converted into command-line args for the pig interpreter.
  #
  task :pagerank_initialize do
    initializer.options = {:adjlist => "/tmp/pagerank_example/seinfeld_network.tsv", :initgrph => next_output(:pagerank_initialize)}
    initializer.run(:hadoop) unless hdfs.exists? latest_output(:pagerank_initialize)
  end

  #
  # Runs multiple iterations of pagerank with another pig script and manages
  # all the intermediate outputs.
  #
  task :pagerank_iterate => [:pagerank_initialize] do
    iterator.options[:damp]           = '0.85f'
    iterator.options[:curr_iter_file] = latest_output(:pagerank_initialize)
    Settings.iterations.times do
      iterator.options[:next_iter_file] = next_output(:pagerank_iterate)
      iterator.run(:hadoop) unless hdfs.exists? latest_output(:pagerank_iterate)
      iterator.refresh!
      iterator.options[:curr_iter_file] = latest_output(:pagerank_iterate)
    end
  end

  #
  # Here we use a wukong script to cut off the last field (a big pig bag of
  # links). Notice how every wukong script MUST have an input while pig
  # scripts need not.
  #
  task :cut_off_adjacency_list => [:pagerank_iterate] do
    finisher.input  << latest_output(:pagerank_iterate)
    finisher.output << next_output(:cut_off_adjacency_list)
    finisher.run :hadoop unless hdfs.exists? latest_output(:cut_off_adjacency_list)
  end

  #
  # We want to pull down one result file, so merge the part-000.. files into
  # one file.
  #
  task :merge_results => [:cut_off_adjacency_list] do
    merged_results = next_output(:merge_results)
    hdfs.merge(latest_output(:cut_off_adjacency_list), merged_results) unless hdfs.exists? merged_results
  end

  #
  # Cat results into a local directory with the same structure,
  # eg. #{work_dir}/#{flow_id}/pull_down_results-0.
  #
  # FIXME: Bridging filesystems is cludgey.
  #
  task :pull_down_results => [:merge_results] do
    local_results = next_output(:pull_down_results)
    hdfs.copy_to_local(latest_output(:merge_results), local_results) unless localfs.exists? local_results
  end

  #
  # Plot the 2nd column of the result as a histogram (requires R and
  # ggplot2). Note that the output here is a png file but doesn't have that
  # extension. Ensmarten me as to the right way to handle that?
  #
  task :plot_results => [:pull_down_results] do
    plotter.attributes = {
      :pagerank_data => latest_output(:pull_down_results),
      :plot_file     => next_output(:plot_results), # <-- this will be a png...
      :raw_rank      => "aes(x=d$V2)"
    }
    plotter.run(:local) unless localfs.exists? latest_output(:plot_results)
  end

end

flow.workdir = "/tmp/pagerank_example"
flow.describe
flow.run(:plot_results)
</code></pre>
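Because every task guards its @run@ call with an @exists?@ check, re-running the flow skips work whose output is already present. And since tasks are plain rake tasks, you can (presumably) target any intermediate step directly rather than the whole flow:

<pre><code>
flow.run(:merge_results) # runs only :pagerank_initialize through :merge_results
</code></pre>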
h3. Utils
There's a fun little program called @hdp-tree@ that emphasizes the ease of using the filesystem abstraction:
<pre><code>
$: bin/hdp-tree /tmp/my_hdfs_directory
---
/tmp/my_hdfs_directory:
- my_hdfs_directory:
  - sub_dir_a: leaf_file_1
  - sub_dir_a: leaf_file_2
  - sub_dir_a: leaf_file_3
- my_hdfs_directory:
  - sub_dir_b: leaf_file_1
  - sub_dir_b: leaf_file_2
  - sub_dir_b: leaf_file_3
- my_hdfs_directory:
  - sub_dir_c: leaf_file_1
  - sub_dir_c: leaf_file_2
  - sub_dir_c: leaf_file_3
  - sub_dir_c:
    - sub_sub_dir_a: yet_another_leaf_file
  - sub_dir_c: sub_sub_dir_b
  - sub_dir_c: sub_sub_dir_c
</code></pre>
I know, it's not as pretty as unix tree, but this IS github...
h3. TODO