Consuming Twitter Streaming API

Requirements

Using the Twitter Streaming API, create a website that lists Tweets near a geographic location specified by the user.

-Create a rake task that consumes the Twitter Streaming API
-Insert Tweets into a capped MongoDB tweets collection (http://www.mongodb.org/display/DOCS/Capped+Collections) that is limited to 100000 entries
-Create geospatial indexes to ensure fast queries
-Create a page where the user can enter longitude and latitude coordinates to see the latest 50 Tweets near that location
-Use Fibers and EventMachine where applicable

Requirements:
-Mongoid
-EventMachine
-Fibers
-Unit tests using RSpec

Bonus:
-Use Google Maps to visualize location of Tweets

Instructions for the Solution

In constants.rb, replace the user name and password with those of a valid Twitter account. The MongoDB host and database name are also set there, so replace localhost and bonjo with the host and database where the streamer should save the tweets. From the bonjo directory, run ruby twitter_stream.rb; it continuously consumes the Twitter stream for the San Francisco area.

The Rails project, streamer, is in the stream directory. Run bundle install, start the server, and go to localhost:3000. Enter -122 for the longitude and 30 for the latitude, then click Search to see the 50 Tweets closest to that location.

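You can create the capped collection and the geospatial index in the mongo shell. Select your database (for example, use bonjo) and run the commands below. In createCollection, size is a limit in bytes, not documents, so max is what caps the collection at 100000 tweets; size (here 50 MB) must still be large enough to hold them. The last line is a sample $near query.

db.createCollection("tweets", {capped: true, size: 52428800, max: 100000})
db.tweets.ensureIndex({location: "2d"})
db.tweets.find({location: {$near: [-120, 30]}})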
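The size option of createCollection is a byte limit and max is a document limit. The plain-Ruby sketch below models the eviction rule a capped collection follows, without touching MongoDB: documents stay in insertion order, and the oldest are dropped once either limit would be exceeded. The CappedBuffer class is hypothetical, and JSON byte length is only a rough stand-in for the BSON size MongoDB actually measures.

```ruby
require 'json'

# Hypothetical in-memory model of a capped collection's eviction rule.
# MongoDB measures BSON size; JSON bytesize is only an approximation here.
class CappedBuffer
  attr_reader :docs

  def initialize(size_bytes:, max_docs: nil)
    @size_bytes = size_bytes # like the `size` option: a limit in bytes
    @max_docs = max_docs     # like the `max` option: a limit in documents
    @docs = []               # insertion order, oldest first
    @bytes = 0
  end

  def insert(doc)
    doc_bytes = JSON.generate(doc).bytesize
    raise ArgumentError, 'document larger than the collection' if doc_bytes > @size_bytes

    @docs << doc
    @bytes += doc_bytes
    # Drop the oldest documents until both limits hold again.
    while @bytes > @size_bytes || (@max_docs && @docs.length > @max_docs)
      @bytes -= JSON.generate(@docs.shift).bytesize
    end
    self
  end
end

tweet = { 'tweet' => 'x' * 100, 'location' => [-122.75, 36.8] }

# size alone: 100000 bytes holds far fewer than 100000 of these documents.
by_size = CappedBuffer.new(size_bytes: 100_000)
1_000.times { by_size.insert(tweet) }
puts by_size.docs.length # prints 724 (each document is 138 bytes as JSON)

# max caps the count even when plenty of bytes remain.
by_count = CappedBuffer.new(size_bytes: 1_000_000, max_docs: 3)
5.times { |i| by_count.insert({ 'tweet' => "t#{i}", 'location' => [0, 0] }) }
p by_count.docs.map { |d| d['tweet'] } # => ["t2", "t3", "t4"]
```

This is why a bare size of 100000 keeps only a few hundred tweets, while max: 100000 with a generous size keeps the intended count.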

The tweets controller:

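class TweetsController < ApplicationController
  # GET /tweets/index?longitude=...&latitude=...
  def index
    # to_f keeps the decimal part (to_i would turn -122.75 into -122)
    longitude = params[:longitude].to_f
    latitude = params[:latitude].to_f
    @messages = Tweet.near(longitude, latitude)
  end
end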
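String#to_i truncates "-122.75" to -122, which is why to_f is the right conversion for coordinates. If you also want to reject empty or non-numeric input before querying, a helper along these lines could be used. parse_coordinate is hypothetical, not part of Rails; it relies on Kernel#Float, which raises ArgumentError for non-numeric strings and TypeError for nil.

```ruby
# Hypothetical helper: convert a request parameter to a Float coordinate,
# returning nil when it is missing, non-numeric, or outside [min, max].
def parse_coordinate(value, min, max)
  number = Float(value)
  number.between?(min, max) ? number : nil
rescue ArgumentError, TypeError
  nil
end

# Longitude must lie in [-180, 180] and latitude in [-90, 90].
longitude = parse_coordinate("-122.75", -180, 180)
latitude  = parse_coordinate("36.8", -90, 90)
p [longitude, latitude]            # => [-122.75, 36.8]

p parse_coordinate("abc", -90, 90) # => nil
p parse_coordinate(nil, -90, 90)   # => nil
p parse_coordinate("200", -90, 90) # => nil
p "-122.75".to_i                   # => -122 (to_i drops the fraction)
```

In the controller, a nil result could be turned into an error message instead of running the query.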

The Tweet model is stored in MongoDB through MongoMapper:

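class Tweet
  include MongoMapper::Document

  key :tweet, String     # tweet text
  key :location, Array   # [longitude, latitude]

  # 2d geospatial index used by $near queries
  ensure_index [[:location, '2d']]

  # The 50 tweets closest to the given point, nearest first
  def self.near(longitude, latitude)
    where(:location => {'$near' => [longitude, latitude]}).limit(50).all
  end
end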
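Conceptually, a $near query against a 2d index orders documents by flat (Euclidean) distance from the query point, treating [longitude, latitude] as plain x/y values, and Tweet.near then keeps the first 50. The sketch below mimics that ordering on an in-memory array. It only illustrates the semantics, not how MongoDB's index works, and the nearest helper and sample documents are made up.

```ruby
# Illustration only: order documents by planar distance from a point,
# as $near on a 2d index does, and keep the closest `limit` of them.
def nearest(docs, longitude, latitude, limit = 50)
  docs.min_by(limit) do |doc|
    lon, lat = doc['location']
    Math.hypot(lon - longitude, lat - latitude)
  end
end

docs = [
  { 'tweet' => 'far',    'location' => [-100.0, 40.0] },
  { 'tweet' => 'close',  'location' => [-122.4, 37.7] },
  { 'tweet' => 'closer', 'location' => [-122.7, 36.9] }
]

p nearest(docs, -122.75, 36.8, 2).map { |d| d['tweet'] } # => ["closer", "close"]
```

Note that the results are ordered by distance, not by how recently the tweets arrived.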

Given a longitude and latitude, Tweet.near returns the 50 tweets closest to that point, nearest first. The home index page looks like this:

<%= form_tag tweets_index_path, :method => "get" do %>
<p>
    Longitude:
    <%= text_field_tag :longitude, params[:longitude] %><br/>
    Latitude:
    <%= text_field_tag :latitude, params[:latitude] %><br/>
    <%= submit_tag "Search" %><br/>
</p>
<% end %>

The tweets index page is as shown below.

<% if @messages.empty? %>

  <h1>No Results Found</h1>

<% else %>
  <h1>Latest Tweets</h1>

  <table>
    <tr>
      <th>Tweet</th>
    </tr>

    <% @messages.each do |message| %>
      <tr>
        <td><%= message.tweet %></td>
      </tr>
    <% end %>
  </table>

<% end %>

<%= link_to "Home", root_path %>

The config/mongo.yml is as shown below:

defaults: &defaults
  host: 127.0.0.1
  port: 27017

development:
  <<: *defaults
  database: bonjo

test:
  <<: *defaults
  database: streamer_test

# set these environment variables on your prod server
production:
  <<: *defaults
  database: streamer
  username: <%= ENV['MONGO_USERNAME'] %>
  password: <%= ENV['MONGO_PASSWORD'] %>
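The <%= ENV[...] %> tags work only because the file is run through ERB before it is parsed as YAML. The sketch below shows that two-step load with Ruby's standard erb and yaml libraries. It illustrates the mechanism and is not MongoMapper's own loader; load_mongo_config and the sample credentials are hypothetical.

```ruby
require 'erb'
require 'yaml'

# Hypothetical loader: render ERB, parse YAML, then pick one environment.
def load_mongo_config(text, environment)
  rendered = ERB.new(text).result
  # aliases: true is required for the &defaults / *defaults anchors.
  YAML.safe_load(rendered, aliases: true).fetch(environment)
end

MONGO_YML = <<~'YAML'
  defaults: &defaults
    host: 127.0.0.1
    port: 27017

  development:
    <<: *defaults
    database: bonjo

  production:
    <<: *defaults
    database: streamer
    username: <%= ENV['MONGO_USERNAME'] %>
    password: <%= ENV['MONGO_PASSWORD'] %>
YAML

# Sample values for this demo only; on a real server they come from the environment.
ENV['MONGO_USERNAME'] = 'example_user'
ENV['MONGO_PASSWORD'] = 'example_pass'

p load_mongo_config(MONGO_YML, 'development')
p load_mongo_config(MONGO_YML, 'production')['username'] # => "example_user"
```

The <<: *defaults lines merge the shared host and port into each environment, so only the differing keys need to be listed.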

The routes.rb is as shown below.

  get "tweets/index"
  get "home/show"

  root :to => "home#index"

The request spec is as follows:

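require "spec_helper"

describe "Twitter stream" do

  it "should display the stream for a given location" do
    # Seed one tweet so the search finds something in the empty test database
    Tweet.create(:tweet => "Hello from SF", :location => [-122.4, 37.7])

    visit "/"

    fill_in "longitude", :with => "-121"
    fill_in "latitude",  :with => "30"

    click_button "Search"

    page.should have_content("Latest Tweets")
    page.should have_content("Hello from SF")
  end
end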

The constants.rb:

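module Constants
  # Twitter account used for basic authentication
  USERNAME = 'matz'
  PASSWORD = 'topsycret'
  # South-west corner of the tracked area; the box extends 1 degree east and north
  LONGITUDE = -122.75
  LATITUDE = 36.8
  # Streaming API locations format: sw_lon,sw_lat,ne_lon,ne_lat
  LOCATION = "#{LONGITUDE},#{LATITUDE},#{LONGITUDE+1},#{LATITUDE+1}"
  SSL_PORT = 443
  TWITTER_URL = "https://stream.twitter.com/1/statuses/filter.json"
  # MongoDB host and database the streamer writes to
  DB_HOST = 'localhost'
  DB_NAME = 'bonjo'
end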
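LOCATION follows the format of the Streaming API's locations parameter: a south-west longitude,latitude pair followed by a north-east pair. Here that is a 1 degree by 1 degree box whose south-west corner is (LONGITUDE, LATITUDE), which covers San Francisco (about -122.42, 37.77). The sketch below builds the same string from the same values and adds a hypothetical in_box? check; SW, NE and in_box? are not part of the original code.

```ruby
LONGITUDE = -122.75
LATITUDE = 36.8

# locations parameter order: sw_lon,sw_lat,ne_lon,ne_lat
SW = [LONGITUDE, LATITUDE]
NE = [LONGITUDE + 1, LATITUDE + 1]
LOCATION = (SW + NE).join(',')

# Hypothetical helper: is the point [longitude, latitude] inside the box?
def in_box?(longitude, latitude, sw = SW, ne = NE)
  longitude.between?(sw[0], ne[0]) && latitude.between?(sw[1], ne[1])
end

puts LOCATION                          # prints -122.75,36.8,-121.75,37.8
p in_box?(-122.42, 37.77)              # => true (San Francisco)
p in_box?(-118.35365916, 34.13639291)  # => false (the sample tweet in parser_spec.rb)
```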

The parser.rb:

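require 'json'

class Parser
  # Removes complete lines from the front of buffer and returns the text of
  # the first tweet found (nil for messages without text, or if no complete
  # line is available). Partial lines stay in buffer for the next chunk.
  def self.parse_tweet(buffer)
    while line = buffer.slice!(/.+\r?\n/)
      # Skip the short keep-alive newlines the stream sends
      if line.length > 5
        tweet = JSON.parse(line)

        return tweet['text']
      end
    end
  end
end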
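Parser.parse_tweet returns as soon as it finds one tweet, so when a single chunk carries several tweets the rest wait in the buffer until the next chunk arrives. A variant that yields every complete line, and leaves a trailing partial tweet in the buffer, might look like this. StreamParser.each_tweet_text is a hypothetical name; like the original, it skips keep-alive newlines and messages without a text field (such as delete notices).

```ruby
require 'json'

class StreamParser
  # Yields the text of every complete tweet in buffer, removing consumed
  # lines from it. A partial line at the end stays for the next chunk.
  def self.each_tweet_text(buffer)
    while (line = buffer.slice!(/.+\r?\n/))
      next if line.strip.empty?   # keep-alive newlines carry no data
      message = JSON.parse(line)
      text = message['text']
      yield text if text          # e.g. delete notices have no text
    end
  end
end

# Two chunks whose boundaries do not line up with tweet boundaries.
buffer = +""
texts = []
chunks = [
  "{\"text\":\"first\"}\r\n{\"te",
  "xt\":\"second\"}\r\n\r\n{\"delete\":{}}\r\n{\"text\":\"thi"
]
chunks.each do |chunk|
  buffer << chunk
  StreamParser.each_tweet_text(buffer) { |text| texts << text }
end

p texts  # => ["first", "second"]
p buffer # => "{\"text\":\"thi"
```

Inside http.stream, the block would call Persistable.save once per yielded text instead of once per chunk.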

The parser_spec.rb:

require 'json'
require_relative 'parser'

describe Parser do
  it "should parse the tweet from the chunk" do
    chunk = "{\"text\":\"Half a chicken for lunch. Think the chicken must have been Jurassic given the size #onlyinamerica http:\\/\\/t.co\\/HbN7ZoHl\",\"in_reply_to_status_id_str\":null,\"entities\":{\"media\":[{\"type\":\"photo\",\"indices\":[98,118],\"media_url_https\":\"https:\\/\\/p.twimg.com\\/AnLtApOCAAI-UXo.jpg\",\"sizes\":{\"small\":{\"resize\":\"fit\",\"h\":453,\"w\":340},\"thumb\":{\"resize\":\"crop\",\"h\":150,\"w\":150},\"large\":{\"resize\":\"fit\",\"h\":1024,\"w\":768},\"medium\":{\"resize\":\"fit\",\"h\":800,\"w\":600}},\"display_url\":\"pic.twitter.com\\/HbN7ZoHl\",\"url\":\"http:\\/\\/t.co\\/HbN7ZoHl\",\"media_url\":\"http:\\/\\/p.twimg.com\\/AnLtApOCAAI-UXo.jpg\",\"id\":176463930741358594,\"id_str\":\"176463930741358594\",\"expanded_url\":\"http:\\/\\/twitter.com\\/MervDM\\/status\\/176463930732969987\\/photo\\/1\"}],\"user_mentions\":[],\"urls\":[],\"hashtags\":[{\"text\":\"onlyinamerica\",\"indices\":[83,97]}]},\"coordinates\":{\"type\":\"Point\",\"coordinates\":[-118.35365916,34.13639291]},\"in_reply_to_status_id\":null,\"place\":{\"bounding_box\":{\"type\":\"Polygon\",\"coordinates\":[[[-124.482003,32.528832],[-114.131211,32.528832],[-114.131211,42.009517],[-124.482003,42.009517]]]},\"country\":\"United States\",\"url\":\"http:\\/\\/api.twitter.com\\/1\\/geo\\/id\\/fbd6d2f5a4e4a15e.json\",\"country_code\":\"US\",\"attributes\":{},\"full_name\":\"California, US\",\"name\":\"California\",\"id\":\"fbd6d2f5a4e4a15e\",\"place_type\":\"admin\"},\"in_reply_to_user_id_str\":null,\"favorited\":false,\"truncated\":false,\"geo\":{\"type\":\"Point\",\"coordinates\":[34.13639291,-118.35365916]},\"in_reply_to_screen_name\":null,\"source\":\"\\u003Ca href=\\\"http:\\/\\/twitter.com\\/#!\\/download\\/iphone\\\" rel=\\\"nofollow\\\"\\u003ETwitter for iPhone\\u003C\\/a\\u003E\",\"created_at\":\"Mon Mar 05 00:27:35 +0000 2012\",\"possibly_sensitive\":false,\"contributors\":null,\"in_reply_to_user_id\":null,\"possibly_sensitive_editable\":true,\"id_str\":\"176463930732969987\",\"user\":{\"listed_count\":5,\"geo_enabled\":true,\"is_translator\":false,\"lang\":\"en\",\"profile_sidebar_border_color\":\"181A1E\",\"default_profile\":false,\"follow_request_sent\":null,\"profile_use_background_image\":true,\"description\":\"Lives and works in Central London; interested in many things\",\"following\":null,\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/images\\/themes\\/theme9\\/bg.gif\",\"profile_text_color\":\"666666\",\"contributors_enabled\":false,\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/images\\/themes\\/theme9\\/bg.gif\",\"created_at\":\"Mon Feb 09 18:49:34 +0000 2009\",\"profile_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_images\\/1445760103\\/image_normal.jpg\",\"default_profile_image\":false,\"verified\":false,\"profile_link_color\":\"2FC2EF\",\"url\":null,\"favourites_count\":66,\"protected\":false,\"screen_name\":\"MervDM\",\"show_all_inline_media\":false,\"statuses_count\":19090,\"friends_count\":242,\"profile_background_color\":\"1A1B1F\",\"followers_count\":392,\"profile_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_images\\/1445760103\\/image_normal.jpg\",\"name\":\"Merv Metcalf\",\"id_str\":\"20456048\",\"notifications\":null,\"profile_background_tile\":false,\"location\":\"iPhone: 51.514614,-0.133169\",\"id\":20456048,\"time_zone\":\"London\",\"utc_offset\":0,\"profile_sidebar_fill_color\":\"252429\"},\"retweeted\":false,\"id\":176463930732969987,\"retweet_count\":0}\r\n"
    expected_tweet = "Half a chicken for lunch. Think the chicken must have been Jurassic given the size #onlyinamerica http://t.co/HbN7ZoHl"

    actual_tweet = Parser.parse_tweet(chunk)
    actual_tweet.should == expected_tweet
  end
end

The persistable.rb:

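class Persistable
  # Stores the tweet text with its [longitude, latitude] location.
  # A nil tweet (no complete message, or one without text) is ignored.
  def self.save(tweetdb, tweet, longitude, latitude)
    unless tweet.nil?
      tweetdb.insert({'tweet' => tweet, 'location' => [longitude, latitude]})
    end
  end
end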
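Persistable.save stores whatever longitude and latitude it receives, and twitter_stream.rb passes the fixed LONGITUDE and LATITUDE constants, so every tweet lands on the same point. Each tweet's own position is in its JSON: the sample in parser_spec.rb has a coordinates field holding a GeoJSON point in [longitude, latitude] order, and a geo field holding the same point as [latitude, longitude]. A hypothetical helper that reads coordinates (tweet_location is not part of the original code) could look like this.

```ruby
require 'json'

# Hypothetical helper: return [longitude, latitude] for a parsed tweet,
# or nil when the tweet carries no exact point.
def tweet_location(tweet)
  point = tweet['coordinates']
  return nil unless point.is_a?(Hash) && point['type'] == 'Point'

  longitude, latitude = point['coordinates'] # GeoJSON order: [lon, lat]
  [longitude, latitude]
end

# Trimmed-down version of the sample tweet from parser_spec.rb.
line = '{"text":"Half a chicken for lunch.",' \
       '"coordinates":{"type":"Point","coordinates":[-118.35365916,34.13639291]},' \
       '"geo":{"type":"Point","coordinates":[34.13639291,-118.35365916]}}'

p tweet_location(JSON.parse(line))                               # => [-118.35365916, 34.13639291]
p tweet_location(JSON.parse('{"text":"no point","coordinates":null}')) # => nil
```

Reading coordinates rather than geo means the pair can go straight into the [longitude, latitude] location array that the 2d index expects.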

The persistable_spec.rb:

require_relative 'persistable'

describe Persistable do
  it "should not insert a tweet that is nil" do
    tweetdb = mock("tweetdb")
    tweetdb.should_not_receive(:insert)

    Persistable.save(tweetdb, nil, "does-not-matter", "does-not-matter")
  end

  it "should insert a tweet that is not nil" do
    tweetdb = mock("tweetdb")
    tweetdb.should_receive(:insert)

    Persistable.save(tweetdb, "Wonderful", "does-not-matter", "does-not-matter")
  end

  it "should save the tweet, longitude and latitude" do
    tweetdb = mock("tweetdb")
    tweetdb.should_receive(:insert).with({'tweet' => "Wonderful", 'location' => [120, 100]})

    Persistable.save(tweetdb, "Wonderful", 120, 100)
  end
end

The twitter_stream.rb:

require "em-mongo"
require 'eventmachine'
require 'em-http'
require 'json'

require_relative 'persistable'
require_relative 'parser'
require_relative 'constants'

include Constants

EM.run do
  db = EM::Mongo::Connection.new(DB_HOST).db(DB_NAME)
  tweetdb = db.collection('tweets')

  # Long-lived POST to the filter endpoint, restricted to the LOCATION bounding box
  http = EM::HttpRequest.new(TWITTER_URL, { :port => SSL_PORT }).post(
                    :head => { 'Authorization' => [USERNAME, PASSWORD] },
                    :body => { "locations" => LOCATION },
                    :keepalive => true,
                    :timeout => -1)

  # Chunks do not line up with tweet boundaries, so accumulate them in a buffer
  buffer = ""
  http.stream do |chunk|
    buffer << chunk
    tweet = Parser.parse_tweet(buffer)
    # Every tweet is saved at the box's south-west corner, not at its own coordinates
    Persistable.save(tweetdb, tweet, LONGITUDE, LATITUDE)
  end

  http.errback do
    puts "#{Time.now} Error: #{http.error}"
    EM.stop_event_loop
  end
end
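The requirements also ask for Fibers, while twitter_stream.rb relies on EventMachine callbacks alone. The usual pattern, and the one libraries such as em-synchrony are built on, is to pause a Fiber while an asynchronous call is in flight and resume it from the callback, so the calling code reads sequentially. The sketch below shows that pattern with core Ruby only: fake_async_insert is a hypothetical stand-in for a callback-based API such as an em-mongo insert, and a plain array of pending callbacks plays the role of the EventMachine reactor.

```ruby
require 'fiber'

# Hypothetical callback-style API: queues the work and calls back later,
# the way an EventMachine-based driver would from the reactor loop.
PENDING = []

def fake_async_insert(doc, &callback)
  PENDING << -> { callback.call("saved #{doc['tweet']}") }
end

# Fiber-aware wrapper: suspend the current fiber until the callback fires,
# then return the callback's result as if the call were synchronous.
def synchronous_insert(doc)
  fiber = Fiber.current
  fake_async_insert(doc) { |result| fiber.resume(result) }
  Fiber.yield
end

results = []
worker = Fiber.new do
  %w[first second].each do |text|
    results << synchronous_insert({ 'tweet' => text })
  end
end

worker.resume                           # runs until the first Fiber.yield
PENDING.shift.call until PENDING.empty? # toy "reactor" draining callbacks

p results # => ["saved first", "saved second"]
```

With EventMachine, the body of http.stream would run inside such a Fiber and the reactor would deliver the callbacks; the sketch only shows the control flow.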

Because the application keeps its data in MongoDB rather than in a relational database, you can stub out the rake db:test:prepare task as follows:

namespace :db do
  namespace :test do
    task :prepare do
      # Stub out for MongoDB
    end
  end
end

In this article, you learned how to consume the Twitter Streaming API, store the tweets in MongoDB, and find tweets near a location with MongoDB's geospatial queries.

