Skip to content

Ruby on Hadoop: Efficient, effective Hadoop streaming & bulk data processing. Write micro scripts for terabyte-scale data

License

Notifications You must be signed in to change notification settings

mrflip/wukong

 
 

Repository files navigation

Wukong

Wukong is Ruby for Hadoop — it makes Hadoop so easy a chimpanzee can use it.

Treat your dataset like a

  • stream of lines when it’s efficient to process by lines
  • stream of field arrays when it’s efficient to deal directly with fields
  • stream of lightweight objects when it’s efficient to deal with objects

RDocs for wukong available at rdoc.info

Wukong is friends with Hadoop the elephant, Pig the query language, and the cat on your command line.

The main documentation lives on the Wukong Pages. Please feel free to add supplemental information to the wukong wiki.

Help!

Send Wukong questions to the Infinite Monkeywrench mailing list

Install

Get the code

We’re still actively developing wukong. The newest version is available via Git on github:

$ git clone git://github.com/mrflip/wukong

A gem is available from gemcutter:

$ sudo gem install wukong --source=http://gemcutter.org

(don’t use the gems.github.com version — it’s way out of date.)

You can instead download this project in either zip or tar formats.

Dependencies and setup

To finish setting up, see the detailed setup instructions and then read the usage notes

How to write a Wukong script

Here’s a script to count words in a text stream:

    require 'wukong'
    module WordCount
      class Mapper < Wukong::Streamer::LineStreamer
        # Emit each word in the line.
        def process line
          words = line.strip.split(/\W+/).reject(&:blank?)
          words.each{|word| yield [word, 1] }
        end
      end
      
      class Reducer < Wukong::Streamer::ListReducer
        def finalize
          yield [ key, values.map(&:last).map(&:to_i).sum ]
        end
      end
    end
    
    Wukong::Script.new(
      WordCount::Mapper,
      WordCount::Reducer
      ).run # Execute the script

The first class, the Mapper, eats lines and craps [word, count] records: word is the /key/, its count is the /value/.

In the reducer, the values for each key are stacked up into a list; then the record(s) yielded by #finalize are emitted. There are many other ways to write the reducer (most of them are better) — see the examples

Structured data stream

You can also use structs to treat your dataset as a stream of objects:

    require 'wukong'
    require 'my_blog' #defines the blog models
    # structs for our input objects
    Tweet = Struct.new( :id, :created_at, :twitter_user_id,
      :in_reply_to_user_id, :in_reply_to_status_id, :text )
    TwitterUser  = Struct.new( :id, :username, :fullname,
      :homepage, :location, :description )
    module TwitBlog
      class Mapper < Wukong::Streamer::RecordStreamer
        # Watch for tweets by me
        MY_USER_ID = 24601
        #
        # If this is a tweet is by me, convert it to a Post.
        #
        # If it is a tweet not by me, convert it to a Comment that
        # will be paired with the correct Post.
        #
        # If it is a TwitterUser, convert it to a User record and
        # a user_location record
        #
        def process record
          case record
          when TwitterUser
            user     = MyBlog::User.new.merge(record) # grab the fields in common
            user_loc = MyBlog::UserLoc.new(record.id, record.location, nil, nil)
            yield user
            yield user_loc
          when Tweet
            if record.twitter_user_id == MY_USER_ID
              post = MyBlog::Post.new.merge record
              post.link = "http://twitter.com/statuses/show/#{record.id}"
              post.body = record.text
              post.title = record.text[0..65] + "..."
              yield post
            else
              comment = MyBlog::Comment.new.merge record
              comment.body    = record.text
              comment.post_id = record.in_reply_to_status_id
              yield comment
            end
          end
        end
      end
    end
    Wukong::Script.new( TwitBlog::Mapper, nil ).run # identity reducer

Advanced Patterns

Wukong has a good collection of map/reduce patterns. Here’s an AccumulatingReducer that takes a long list of key-value pairs and emits, for each key, all its corresponding values in one line.

    #
    # Roll up all values for each key into a single line
    #
    class GroupByReducer < Wukong::Streamer::AccumulatingReducer
      attr_accessor :values

      # Start with an empty list
      def start! *args
        self.values = []
      end

      # Aggregate each value in turn 
      def accumulate key, value
        self.values << value
      end

      # Emit the key and all values, tab-separated
      def finalize
        yield [key, values].flatten
      end
    end

So given adjacency pairs for the following directed friend graph:


    @jerry      @elaine
    @elaine     @jerry
    @jerry      @kramer
    @kramer     @jerry
    @kramer     @bobsacamato
    @kramer     @newman
    @jerry      @superman
    @newman     @kramer
    @newman     @elaine
    @newman     @jerry

You’d end up with


    @elaine     @jerry
    @jerry      @elaine      @kramer     @superman
    @kramer     @bobsacamato @jerry      @newman
    @newman     @elaine      @jerry      @kramer   

Gotchas

RecordStreamer dies on blank lines with “wrong number of arguments”

If your lines don’t always have a full complement of fields, and you define #process() to take fixed named arguments, then ruby will complain when some of them don’t show up:

  class MyUnhappyMapper < Wukong::Streamer::RecordStreamer
    # this will fail if the line has more or fewer than 3 fields:
    def process x, y, z
      p [x, y, z]
    end
  end

The cleanest way I know to fix this is with recordize, which you should recall always returns an array of fields:

  class MyHappyMapper < Wukong::Streamer::RecordStreamer
    # extracts three fields always; any missing fields are nil, any extra fields discarded
    # @example
    #   recordize("a")            # ["a", nil, nil]
    #   recordize("a\t\b\tc")     # ["a", "b", "c"]
    #   recordize("a\t\b\tc\td")  # ["a", "b", "c"]
    def recordize raw_record
      x, y, z = super(raw_record)
      [x, y, z]
    end
    
    # Now all lines produce exactly three args
    def process x, y, z
      p [x, y, z]
    end
  end

If you want to preserve any extra fields, use the extra argument to #split():

  class MyMoreThanHappyMapper < Wukong::Streamer::RecordStreamer
    # extracts three fields always; any missing fields are nil, the final field will contain a tab-separated string of all trailing fields
    # @example
    #   recordize("a")            # ["a", nil, nil]
    #   recordize("a\t\b\tc")     # ["a", "b", "c"]
    #   recordize("a\t\b\tc\td")  # ["a", "b", "c\td"]
    def recordize raw_record
      x, y, z = split(raw_record, "\t", 3)
      [x, y, z]
    end
    
    # Now all lines produce exactly three args
    def process x, y, z
      p [x, y, z]
    end
  end

Why is it called Wukong?

Hadoop, as you may know, is named after a stuffed elephant. Since Wukong was started by the infochimps team, we needed a simian analog. A Monkey King who journeyed to the land of the Elephant seems to fit the bill:

Sun Wukong (孙悟空), known in the West as the Monkey King, is the main character in the classical Chinese epic novel Journey to the West. In the novel, he accompanies the monk Xuanzang on the journey to retrieve Buddhist sutras from India.

Sun Wukong possesses incredible strength, being able to lift his 13,500 jīn (8,100 kg) Ruyi Jingu Bang with ease. He also has superb speed, traveling 108,000 li (54,000 kilometers) in one somersault. Sun knows 72 transformations, which allows him to transform into various animals and objects; he is, however, shown with slight problems transforming into other people, since he is unable to complete the transformation of his tail. He is a skilled fighter, capable of holding his own against the best generals of heaven. Each of his hairs possesses magical properties, and is capable of transforming into a clone of the Monkey King himself, or various weapons, animals, and other objects. He also knows various spells in order to command wind, part water, conjure protective circles against demons, freeze humans, demons, and gods alike. — Sun Wukong’s Wikipedia entry

The Jaime Hewlett / Damon Albarn short that the BBC made for their 2008 Olympics coverage gives the general idea.

More info

There are many useful examples in the examples/ directory.

Credits

Monkeyshines was written by Philip (flip) Kromer ([email protected] / @mrflip) for the infochimps project

Patches submitted by:

Thanks to:

Help!

Send monkeyshines questions to the Infinite Monkeywrench mailing list, to [email protected] or call 855-DATA-FUN

Also, you invited to talk with author Philip (flip) Kromer in a private consultation about your big data project.

About

Ruby on Hadoop: Efficient, effective Hadoop streaming & bulk data processing. Write micro scripts for terabyte-scale data

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Ruby 89.0%
  • Shell 5.2%
  • JavaScript 5.0%
  • Perl 0.8%