Home > task_tempest

task_tempest

Task_tempest is a project mainly written in Ruby, based on the MIT license.

Framework for creating queue based, threaded asychronous job processors.

= task_tempest

Framework for building threaded asynchronous job processors.

== Description

+task_tempest+ basically lets you build glorified loops, reading messages from a queue and dispatching them to classes to handle. In short, it's just another background job processor for Ruby.

+task_tempest+ is based on the idea of a thread pool; each background job is executed on a thread. This can achieve high throughput, but your code will need to be threadsafe.

+task_tempest+ can also run in fibered mode in conjunction with EventMachine. This can achieve high(er?) throughput, but your code will need to be fiber aware.

+task_tempest+ is queue agnostic; you can use whatever queue you like, as long as it supports two operations: enqueue, dequeue.

+task_tempest+ is used in production at {Onespot}[http://www.onespot.com] and processes over 1 million jobs a day.

== Quickstart

Run the code below and both MathTempest.log and MathTempest.task.log will be created in your current working directory. To stop the tempest, send the process an Interrupt or SIGTERM.

require "task_tempest"

class MemoryQueue def initialize; @array = []; end def enqueue(message); @array.push(message); end def dequeue; @array.shift; end end

class MathTempest < TaskTempest::Engine configure do threads 2 queue MemoryQueue.new end end

class Adder extend TaskTempest::Task def process(a, b) c = a + b task_logger.info "#{a} + #{b} = #{c}" end end

10.times do a, b = rand(10), rand(10) MathTempest.submit(Adder, a, b) end

MathTempest.run

== Queue

The queue object given in the TaskTempest::Engine's +configure+ block must define 2 methods: +enqueue+ and +dequeue+.

+task_tempest+ doesn't care what kind of queue you use or what you actually store in it, as long as +dequeue+ returns a message as described here. You are responsible for any serialization and deserialization that may be required.

A message is an array with the following format:

[task_id, task_class_name, arg1, arg2, ...]

An example message is:

[nil, "Adder", 2, 5]

The queue's +enqueue+ method will receive a message as its first argument, plus any additional arguments given to +submit+.

The queue's +dequeue+ method must return a message.

If the +task_id+ of a message is +nil+, then +task_tempest+ will automatically assign an id.

== Tasks

Messages are dispatched to a task class to handle. A task class must extend TaskTempest::Task and define a +process+ method. The +process+ method will be called with the arguments from the message being dispatched.

class Averager extend TaskTempest::Task configure_task do ... end def process(*args) avg = args.inject(0){ |memo, n| memo += n; memo } / args.length task_logger.info "The average of #{args.inspect} is #{avg}" end end

MathTempest.submit(Averager, 2, 4)

Will produce a message like ["9ac4f", "Averager", 2, 4]

And eventually Averager.process(2, 4) will be called.

MathTempest.submit(Averager, 2, 4, 6)

Will produce a message like ["b733c", "Averager", 2, 4, 6]

And eventually Averager.process(2, 4, 6) will be called.

See TaskTempest::Task::Configuration for what options can be set in a task's +configure+ block.

== Logging

+task_tempest+ logs to a main log and a task log.

The main log shows a high level view of all the tasks that are run (when they started, when the finished, if they failed, if they timed out, etc).

The task log shows detailed information about each task. Each line in the task log is prefixed with a task id. This is so that if you see a task failed in the main log, you can grep for its id in the task log.

Any logging done in a task using +task_logger+ will be written to the task log.

== Callbacks

Each task class can define callbacks for +success+, +failure+, and +timeout+ of a task.

class Averager extend TaskTempest::Task configure_task do timeout 1.5 after_failure proc { |exception| HoptoadNotifier.notify(exception) } after_timeout proc { task_logger.warn "Crap, I took longer than 1.5 seconds" } end ... end

See TaskTempest::Task::Configuration for the names of each callback and arguments yielded.

== EventMachine + Fibers

The +fibers+ option puts +task_tempest+ into fibered mode and also defines the size of the fiber pool.

class FiberedTempest << TaskTempest::Engine configure do fibers 5 ... end ... end

Now when +FiberedTempest.run+ is called, The EventMachine reactor will be started and each task will be dispatched on a fiber.

If you run in fibered mode, you need to install +fiber_storm+.

== Daemonizing

There is no code in +task_tempest+ to run as a daemon, that is left to you. It's easy with the {Daemons}[http://rubygems.org/gems/daemons] gem though.

Assuming your tempest is defined in my_tempest.rb, just put the following code at the bottom of the file.

if $0 == FILE require "daemons" Daemons.run_proc(MyTempest.conf.name, :log_output => true) do MyTempest.run end end

Now you can run it as a daemon from the command line.

ruby my_tempest.rb start ruby my_tempest.rb stop ruby my_tempest.rb run # Run in foreground

See the {Daemons rdoc}[http://daemons.rubyforge.org/] for more info.

== Rails

When +task_tempest+ is used as an asynchronous task processor for a Rails app, you probably want to load the Rails environment so you can have access to models, configuration, etc.

You can load the Rails environment however you like, but if you choose to load it in one of +task_tempest+'s initialization callbacks, then there are some caveats to be aware of.

class MathTempest < TaskTempest::Engine configure do root{ Rails.root } after_initialize proc { require "config/environment" # Load Rails. } end end

That won't work and will result in an exception saying +Rails+ isn't defined, because the +root+ configuration option is used during the initialization process before the +after_initialized+ callback is called. To fix the error, change +after_initialized+ to +before_initialize+.

== Complete examples

See the +examples+ directory.

== Copyright

Copyright (c) 2010 Christopher J. Bottaro. See LICENSE for details.