Real-time Twitter Sentiment Analysis
Real-time Sentiment Analysis of Twitter Data
In a previous post, I demonstrated how one can go about building a text classification model for sentiment analysis. In this post, I will be applying the model previously developed to extract real-time sentiments of twitter data based on a search term.
Motivation
Social media has become a valuable tool for anyone hoping to understand public sentiment on any current topic. Social media platforms such as Twitter, Facebook, and Instagram can provide a unique and unfiltered glimpse into what a target demographic has to say about a topic or product. Whether it is a campaign manager, social activist, or company brand manager, being able to monitor public sentiment in real-time means that one can quickly react to events. In this post, I will be applying a text classification model to Twitter data using the streaming API to obtain real-time sentiment on a topic.
import csv
import json
import numpy as np
import pandas as pd
import re #for regex
import pickle
import dill
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.stem import PorterStemmer
from nltk.util import ngrams
import tweepy #https://github.com/tweepy/tweepy
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
import MySQLdb
import time
import datetime
import processtweet
import credentials as credentials
class TweetStreamListener(StreamListener):
    """
    Tweepy stream listener that classifies the sentiment of each incoming
    tweet and persists the results to MySQL.

    Tables (dropped and recreated on every run):
      TwitterData    - one row per classified tweet, bounded to num_tweets rows
      TwitterTrend   - cumulative pos/neg counts, sampled roughly once per second
      TwitterCurrent - pos/neg tweet counts per ~60 second window
    """
    def __init__(self, num_tweets, conn, api=None):
        """
        :param num_tweets: maximum number of rows retained in TwitterData
        :param conn: an open MySQLdb connection (caller owns and closes it)
        :param api: optional tweepy API object, forwarded to the base class
        """
        self.num_tweets = num_tweets
        self.textProcessing = processtweet.text_processing()
        self.conn = conn
        self.cursor = self.conn.cursor()
        # Recreate the working tables so every run starts from a clean slate.
        self.cursor.execute("DROP TABLE IF EXISTS TwitterData CASCADE")
        self.cursor.execute("DROP TABLE IF EXISTS TwitterTrend CASCADE")
        self.cursor.execute("DROP TABLE IF EXISTS TwitterCurrent CASCADE")
        self.cursor.execute("CREATE TABLE TwitterData (id SERIAL PRIMARY KEY, Sentiment INT(1),"
                            "DateTime VARCHAR(20), Tokens TEXT, Tweet VARCHAR(200))")
        self.cursor.execute("CREATE TABLE TwitterTrend (id SERIAL PRIMARY KEY,"
                            "Pos_Sentiment VARCHAR(400), Neg_Sentiment VARCHAR(400), DateTime VARCHAR(40))")
        self.cursor.execute("CREATE TABLE TwitterCurrent (id SERIAL PRIMARY KEY, Pos VARCHAR(40), Neg VARCHAR(40),"
                            "DateTime VARCHAR(20))")
        # Window bookkeeping: start_time drives the ~1 s trend snapshots,
        # start_time_long drives the ~60 s activity flush.
        self.start_time = datetime.datetime.utcnow()
        self.start_time_long = self.start_time
        self.pos_count = 0
        self.neg_count = 0
        self.count = 0
        # BUGFIX: forward the api argument instead of silently dropping it.
        super(TweetStreamListener, self).__init__(api)

    def on_status(self, status):
        """
        Classify one tweet and update the three tables.

        Returns True to keep the stream alive; returns None (implicitly)
        for retweets, which are skipped so they do not skew the counts.
        """
        try:
            tweet = status.text.encode("utf-8")
            # Skip retweets, both native and manual "RT @" ones.
            # BUGFIX: retweeted_status is an attribute of the status object,
            # not of the tweet text (the original checked `tweet`, which can
            # never have that attribute, so native retweets slipped through).
            if hasattr(status, 'retweeted_status') or 'RT @' in tweet:
                return
            # Note: Tweepy returns a naive datetime set to UTC.
            # see: https://timbueno.com/
            time_stamp = status.created_at
            # The classifier expects a list of documents.
            sentiment = self.textProcessing.tweetClassifier([tweet])
            token = self.textProcessing.getTokens(tweet)
            current_time = datetime.datetime.utcnow()
            # Label 4 is treated as positive; everything else as negative.
            if sentiment == 4:
                self.pos_count += 1
            else:
                self.neg_count += 1
            # Every ~60 s, flush the windowed pos/neg activity counts to
            # TwitterCurrent to capture time-dependent activity on the topic.
            if current_time - self.start_time_long > datetime.timedelta(seconds=60):
                self.start_time_long = current_time
                self.cursor.execute("INSERT INTO TwitterCurrent (Pos, Neg, DateTime) "
                                    "VALUES (%s, %s, %s)", (self.pos_count, self.neg_count, current_time,))
                self.pos_count = 0
                self.neg_count = 0
            # Every ~1 s (once at least 4 tweets are stored), snapshot the
            # cumulative sentiment split of TwitterData into TwitterTrend.
            if current_time - self.start_time > datetime.timedelta(seconds=1) and self.count > 4:
                self.start_time = current_time
                self.cursor.execute("SELECT COUNT(Sentiment) FROM TwitterData "
                                    "WHERE Sentiment = 4")
                count_pos = int(self.cursor.fetchone()[0])
                self.cursor.execute("SELECT COUNT(Sentiment) FROM TwitterData "
                                    "WHERE Sentiment = 0")
                # Negatives stored as a negative number for easy plotting.
                count_neg = -int(self.cursor.fetchone()[0])
                self.cursor.execute("INSERT INTO TwitterTrend (Pos_Sentiment, Neg_Sentiment, DateTime) "
                                    "VALUES (%s, %s, %s)", (count_pos, count_neg, current_time,))
                print("%s %s %s" % (self.start_time, count_pos, count_neg))
            # Keep TwitterData bounded at num_tweets rows: once full, delete
            # the oldest row before inserting the new one.
            if self.count < self.num_tweets:
                self.count += 1
            else:
                self.cursor.execute("DELETE FROM TwitterData "
                                    "WHERE id IN "
                                    "(SELECT id FROM "
                                    "(SELECT id, Tweet FROM TwitterData ORDER BY id ASC LIMIT 1) AS t)")
                self.conn.commit()
            self.cursor.execute("INSERT INTO TwitterData (Sentiment, "
                                "DateTime, Tokens, Tweet) VALUES (%s, %s, %s, %s)",
                                (sentiment, time_stamp, str(token), tweet,))
            self.conn.commit()
            return True
        except Exception as e:
            # BUGFIX: the original handler referenced undefined names
            # (`sys`, `write_to_page`), so any error here raised a NameError
            # and killed the stream. Log and keep streaming instead.
            print("Error processing tweet: %s" % e)

    def on_error(self, status):
        """Print stream errors; disconnect on HTTP 420 (rate limiting)."""
        print(status)
        # BUGFIX: the original tested an undefined name `status_code`, so the
        # 420 guard never fired. Returning False tells tweepy to disconnect
        # instead of reconnecting with exponentially increasing backoff.
        if status == 420:
            return False
class connect_API():
    """Authenticate with the Twitter API and launch a filtered stream."""

    def __init__(self, num_tweets):
        """
        Build an authorized OAuth handler from the stored credentials.

        :param num_tweets: cap on rows kept in the TwitterData table,
            forwarded to the stream listener.
        """
        login = credentials.login
        self.auth = OAuthHandler(login['consumer_key'], login['consumer_secret'])
        self.auth.set_access_token(login['access_key'], login['access_secret'])
        self.num_tweets = num_tweets

    def stream_data(self, search_item, conn):
        """
        Open the streaming connection and filter on the search term.

        :param search_item: keyword to track via the streaming API
        :param conn: open MySQLdb connection handed to the listener
        """
        listener = TweetStreamListener(num_tweets=self.num_tweets, conn=conn)
        twitterStream = tweepy.Stream(self.auth, listener=listener)
        # Blocks while the stream is running.
        twitterStream.filter(track=[search_item])
def main():
    """
    Connect to MySQL and stream tweets matching a search term, writing
    classified sentiments to the database until interrupted or an
    unrecoverable stream error occurs.
    """
    # db connection credentials
    host = credentials.database['host']
    user = credentials.database['user']
    passwd = credentials.database['passwd']
    unix_socket = credentials.database['unix_socket']
    db = credentials.database['db']
    # num_tweets caps the rows retained in TwitterData; search_item is the
    # keyword sent to the streaming API.
    num_tweets = 2000
    search_item = "obama"
    # BUGFIX: pre-initialise conn so the finally block cannot raise a
    # NameError when MySQLdb.connect itself fails.
    conn = None
    try:
        conn = MySQLdb.connect(host=host, user=user, passwd=passwd,
                               unix_socket=unix_socket, db=db)
        run_streaming = connect_API(num_tweets)
        # Blocks here until the stream disconnects.
        run_streaming.stream_data(search_item, conn)
    except (KeyboardInterrupt, SystemExit):
        raise
    except Exception as e:
        print(e.__doc__)
    finally:
        if conn is not None:
            conn.close()


if __name__ == "__main__":
    main()