Analyze Social Media Data in Real Time


Co-authored by Saeed Aghabozorgi and Polong Lin.

 

This is a brief tutorial on using Spark Streaming to analyze social media data in real time.

Do you want to know what people are tweeting about in different parts of the world, on different continents, or in your country? Do you want to understand the trends or opinions of people in a region? Do you need to detect events? Data science can help you gain this information in real time.

I have created a streaming program that runs constantly, fetching Twitter data in real time and clustering the tweets based on their text and location using the k-means algorithm. My goal is to show you how state-of-the-art approaches like Spark now enable us to quickly and easily write such a program with minimal lines of code. You will practice a wide range of Spark commands here while implementing this application.

This is the flow of data in our Spark Streaming program: Twitter data gets fed into Spark Streaming, which clusters them together, and the tweets are plotted on a map in real-time.

 

First, we need to read Twitter data. However, unlike traditional methods, in this tutorial we do not actually save the tweets to disk or into a database. Instead, we clean, analyze and visualize everything in real time. So in this case, we won’t have to face scalability issues – we can read data for hours and continue to visualize it on a map.

This guide has two parts:

  • Scraper: Reading data from Twitter
  • Analyzer: Cleaning, Analyzing and Visualizing data

In the Scraper, we read data from Twitter and write it into a socket. Why? Because later, in the second part of this tutorial – the Analyzer – we will read from the socket in a streaming manner and analyze the data on the fly. So the basic task of the Scraper is to be a mediator between the Twitter stream (the original data source) and the Analyzer.

To write this code in Python, we have used a convenient platform, Data Scientist Workbench (DSWB), which hosts many of the open-source tools used in data science, such as Jupyter notebooks, RStudio and Zeppelin. It also has Apache Spark pre-installed for each of those tools. Everything is set up for us already, so all we have to do is write our code. You can take a look at this 3-minute video to learn more about it. You can upload the notebooks using one of these two options:

1) From the top menu, go to “Build Analytics” then “IPython/Jupyter Notebooks”. Now, in the search bar in the top-right corner, copy and paste the URL below and press Enter.

Scraper.ipynb:

Analyzer.ipynb:

2) Alternatively, you can download the IPython notebooks here:

Scraper

Analyzer

and drag and drop each file into the sidebar while DSWB is displayed.

It shows how to upload a file into Data Scientist Workbench

 

Let’s look at the Scraper first. It is Python code that acts as a pipe between Twitter and the Analyzer.

Scraper runs three tasks:

  1. Opening a socket, listening for and accepting connection requests from the Analyzer
  2. Reading streaming tweets from the Twitter streaming API
  3. Writing tweets to the socket in JSON format

We create a server socket to write the tweets into, so that the Analyzer can later read streaming data from this socket. To this end, we use an arbitrary non-privileged port, bind the socket to localhost and that port, and start listening. The socket.socket function creates and returns a socket object. We will use port 9999 for this socket:

import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('localhost', 9999))
s.listen(1)

The following block will read data from Twitter and write it to the socket. To read tweets, we need to set variables that contain the user credentials for accessing the Twitter API: the consumer key and consumer secret, and the access token and access token secret. You can find more about this in Connecting to a streaming endpoint.
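These credentials come from your Twitter developer account; as a rough sketch, they might be set up like this (the values below are placeholders, not real keys):

import requests_oauthlib

consumer_key = "YOUR_CONSUMER_KEY"          # placeholder
consumer_secret = "YOUR_CONSUMER_SECRET"    # placeholder
access_token = "YOUR_ACCESS_TOKEN"          # placeholder
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"  # placeholder

Then, we establish a connection to the streaming API: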

auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret, access_token, access_token_secret)

We pull a selection of tweets from the Twitter Streaming API, also known as the fire hose. We will be using a very “long-lived” HTTP request, and parsing the response incrementally. Conceptually, you can think of it as downloading an infinitely long file over HTTP. The stream output is determined by the contents of the following streaming endpoint:

url='https://stream.twitter.com/1.1/statuses/filter.json'

We can also define some parameters to restrict the results, for example to English tweets, confined to a geographic boundary (for instance, the United States), and filtered by some keywords.

data = [('language', 'en'), ('locations', '-130,-20,100,50'), ('track', 'ibm,Cloud,BigData')]
query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in data])
response = requests.get(query_url, auth=auth, stream=True)

Then, we iterate over the streaming tweets and use the socket’s send function to write each tweet to the open socket.
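A minimal sketch of that loop, assuming the Analyzer connects to the server socket created earlier (the variable names here are illustrative):

conn, addr = s.accept()                 # wait for the Analyzer to connect
for line in response.iter_lines():      # iterate over the streaming HTTP response
    if line:                            # skip keep-alive newlines
        conn.send(line + b'\n')         # forward the raw JSON tweet to the socket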

In the second part, the Analyzer, we use Spark’s streaming API to read data from the socket, pre-process the tweets, cluster them as the data arrives in a stream, and push the results out for visualization.

First, we initialize the parameters. The batch interval is simply the interval at which the streaming API will update, the window length is the duration of the window, and the sliding interval is the interval at which the window operation is performed. I have also set the number of clusters for each update, clusterNum, to 15, though this is an arbitrary number representing how many clusters we want.

BATCH_INTERVAL = 10    # How frequently to update (seconds)
WINDOWS_LENGTH = 60    # The duration of the window (seconds)
SLIDING_INTERVAL = 20  # The interval at which the window operation is performed (seconds)
clusterNum = 15        # Number of clusters

As with any other Spark program, we first need a SparkContext object; since I’m set up on my laptop (with 4 workers), I designated the program to use the local Spark instance.
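If you were not on DSWB (which, as noted below, already instantiates a SparkContext for you), a local context with 4 workers could be created roughly like this (the application name is illustrative):

from pyspark import SparkContext

sc = SparkContext("local[4]", "TweetClustering")  # hypothetical app name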

In this program, we use the k-means algorithm to cluster tweets. K-means clustering groups data based on their similarity, using an unsupervised approach. But k-means can work only with numerical vector data. Therefore, we have to convert the tweets into features, i.e. vectors. There are different approaches for converting text to vectors, such as TF-IDF, Word2Vec and CountVectorizer. Spark provides a nice Word2Vec implementation to convert words to vectors. Word2Vec computes distributed vector representations of words; the main advantage of distributed representations is that similar words are close in the vector space. Word2Vec uses a corpus of documents to build these vectors.

Considering that we are working with streaming data, we cannot build a model for each batch: first, it is costly in terms of computation, and second, each batch is not that big. Therefore, we use a pre-trained model (built from the text8 corpus in this example) for vectorization. To load this model cheaply, I built it offline and then, using Spark’s SQLContext, created a broadcast variable that keeps the model on each machine as a cached, read-only variable, rather than shipping a copy of the model with every task. This gives every node a copy of the model efficiently. To create the SQLContext and the StreamingContext, we need a SparkContext, which is the main entry point for Spark functionality. SparkContext is already instantiated in DSWB as “sc”, so we just start using it.

sqlContext = SQLContext(sc)
lookup = sqlContext.read.parquet("word2vecModel/data").alias("lookup")
lookup.printSchema()
lookup_bd = sc.broadcast(lookup.rdd.collectAsMap())

Then, I instantiated a StreamingContext object and established a batch interval. Spark Streaming is an extension of the core Spark API that enables stream processing of live data streams. Because we will use some stateful transformations later, we also enable checkpointing, which saves the generated RDDs to reliable storage.

ssc = StreamingContext(sc, BATCH_INTERVAL)
ssc.checkpoint("checkpoint")

Then we create a DStream that will connect to a hostname and port, like localhost:9999. We use window to apply transformations over a sliding window of data (the window length and sliding interval were initialized earlier as 60 and 20 seconds). This means that, every 20 seconds, we feed dstreamwin with the last 60 seconds of tweets.

dstream = ssc.socketTextStream("localhost", 9999)
dstreamwin = dstream.window(WINDOWS_LENGTH, SLIDING_INTERVAL)

Next, we read the tweets and apply the following operations, which are basic text-mining steps, to the dstreamtweets RDD (a rough sketch of these steps appears below):

  • Finding the coordinates of tweets
  • Tokenizing the text of tweets
  • Removing stop words, punctuation, URLs, etc.
  • Vectorizing using the Word2Vec model and a Doc2Vec function

If you want to know more about text mining and each of these concepts, refer to this course.
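As a rough sketch of the preprocessing and vectorization steps (the stop-word list, the function names and the 100-dimensional vector size are illustrative assumptions; lookup_bd is the broadcast Word2Vec table loaded above):

import re
import numpy as np

STOPWORDS = {"the", "a", "an", "and", "or", "to", "of", "in", "rt"}  # illustrative subset

def tokenize(text):
    # Lowercase, strip URLs, keep alphabetic tokens, drop stop words
    text = re.sub(r"http\S+", "", text.lower())
    return [t for t in re.findall(r"[a-z]+", text) if t not in STOPWORDS]

def doc2vec(tokens):
    # Average the Word2Vec vectors of the tokens found in the broadcast lookup table
    vecs = [np.array(lookup_bd.value[t]) for t in tokens if t in lookup_bd.value]
    return np.mean(vecs, axis=0) if vecs else np.zeros(100)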

Subsequently, we build the training and test RDDs to feed the streaming k-means clustering. This algorithm estimates clusters dynamically as data arrives. Spark MLlib provides support for streaming k-means clustering, with parameters to control the decay (or “forgetfulness”) of the estimates.
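As a hedged sketch of how trainingData and testdata might be derived, assuming dstreamtweets has been mapped to (tweet id, feature vector) pairs whose vectors combine the Doc2Vec representation with the coordinates (matching the 102 dimensions passed to setRandomCenters below; this structure is an assumption):

from pyspark.mllib.linalg import Vectors

# trainOn expects a DStream of vectors; predictOnValues expects (key, vector) pairs
trainingData = dstreamtweets.map(lambda kv: Vectors.dense(kv[1]))
testdata = dstreamtweets.map(lambda kv: (kv[0], Vectors.dense(kv[1])))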

model = StreamingKMeans(k=clusterNum, decayFactor=0.6).setRandomCenters(102, 1.0, 3)
model.trainOn(trainingData)
clust = model.predictOnValues(testdata)

To see how streaming k-means works, check this example of clustering some 2-dimensional points:

 

 

Then, we use foreachRDD to push data out. foreachRDD is an output operator that applies a function to each RDD generated from the stream; here, it sends the collected output to a queue that is used for visualization. No real processing starts until we call start(). We finally call awaitTermination to wait for the computation to terminate.
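The queues q and f are not defined in the snippets shown here; presumably they are ordinary Python queues created earlier, roughly like this (a sketch, assuming Python 2 as in the rest of the code):

from Queue import Queue   # in Python 3 this module is named queue

q = Queue()   # receives the cluster assignments for plotting
f = Queue()   # receives the collected textdata RDDs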

clust.foreachRDD(lambda time, rdd: q.put(rdd.collect()))
textdata.foreachRDD(lambda time, rdd: f.put(rdd.collect()))
ssc.start()
ssc.awaitTermination()

The results are sent to a function that uses Python’s map visualization tool, Basemap, to show the tweets. On this map, each point represents a tweet posted at that location (based on the longitude and latitude of the tweet). The color of a point indicates the cluster the tweet belongs to. Similar tweets fall into the same cluster, where similarity is defined by both the geographic location of the tweets and the similarity of their keywords. So, if a pair of tweets has the same color, they are either similar in terms of text and/or close in terms of location. After finding the clusters, we extract the most frequent keywords in each cluster, and this list of top keywords reveals the event/news/trend in that area. For example, the following plot shows clusters across the world:

World’s tweets clustering
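A minimal sketch of such a plotting function, assuming we have pulled lists of longitudes, latitudes and cluster ids from the queue (the function and argument names are illustrative):

from mpl_toolkits.basemap import Basemap
import matplotlib.pyplot as plt

def plot_tweets(lons, lats, cluster_ids):
    # Draw a world map and color each tweet's point by its cluster id
    m = Basemap(projection='mill')
    m.drawcoastlines()
    x, y = m(lons, lats)
    m.scatter(x, y, c=cluster_ids, s=10, zorder=5)
    plt.show()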

 

Let’s dive deeper into the plot and look at the tweets in a specific area, like the US. We are trying to find some clusters and look at each one to understand what differentiates it from the other clusters.

The following plot was captured a while after starting the process, before all clusters had formed. It seems most of the tweets are about “New, Year, 2016” or “Job, Hiring”; however, you can see some tweets in Cluster 3 that differ from the other groups, and their keywords, about “Game, Bowl”, probably represent a sporting event.

USA Tweets Clusters

 

As the stream continues, the dynamic clusters fill out and new clusters are revealed. We can investigate these clusters further and gain additional insight.

 

 

There are a few advantages that make this kind of visualization very easy to run:

  • It is an endless app. It acts as an evolving system and does not need to re-run k-means from scratch after a while.
  • It does not need large storage. The only storage required is the offline Word2Vec model and some capacity for checkpoints that keep the history for the window length (e.g., 60 seconds).

 

You can tune the system by changing a few parameters:

  • Changing the number of clusters
  • Changing the area from which tweets are collected, by changing the latitude and longitude bounding box
  • Filtering the collected tweets by different keywords (see the example after this list)
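For instance, to restrict the Scraper to tweets from western Europe and track different keywords, one could change the request parameters like this (the bounding-box values are approximate and the keywords are arbitrary):

data = [('language', 'en'),
        ('locations', '-10,35,30,60'),        # rough bounding box around western Europe
        ('track', 'python,spark,streaming')]  # arbitrary example keywords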

 

This app is a simple example of using Spark Streaming to handle stream data. You can improve the program by tweaking the code:

  • Using a better/bigger Word2Vec model as the pre-trained offline model
  • Replacing the offline Word2Vec model with an evolving one (a continuously trained model)
  • Adding weights to the location and Doc2Vec features to derive more accurate clusters
  • Taking advantage of other sources such as news headlines, Facebook posts, etc.
  • Visualizing the top keywords using more interesting plots

 

Hope you enjoyed this tutorial and if you have any questions or comments, please post them below!

You can also download the Python code of the Scraper and Analyzer here.

 

 

 



  • Ashish Dutt

    Good tutorial. Thank you. Though I have a question. You have mentioned in the code snippet
    “s = socket.socket(‘localhost’, 9999)
    s.bind((‘localhost’, 9999)) ….
    My concern is the usage of ‘localhost’. As you have mentioned, you are executing the code on your laptop, which is why you use ‘localhost’; however, real-time analytics usually involves server-to-server communication, so as to reduce the overhead involved. Could you suggest how you would execute the same code on a remote server?
    Thank you

  • ipekkeskin

    Hi Saeed and Polong,

    Thank you very much, this post is very informative and useful. I appreciate your work.
    I am a newbie in Big Data and want to learn the fundamentals. I tried this example in Data Scientist Workbench as you described: I copied those .ipynb files into notebooks and didn’t change the code. However, when I try to run Scraper.ipynb, it gives me the following error:

    ---------------------------------------------------------------------------
    ImportError Traceback (most recent call last)
    in ()
    3 from thread import *
    4 import requests
    ----> 5 from requests_oauthlib import OAuth1
    6 import json
    7 import oauth2 as oauth

    ImportError: No module named requests_oauthlib

    Do you have an idea, what can be the problem?

    Thanks,
    İpek

    • Saeed Aghabozorgi Post author

      Hi ipek,

      Thank you for pointing this out to us.
      You can install the package using this command:

      !pip install requests_oauthlib
      !pip install oauth2

  • Paul R. Pival (@ppival)

    Hi Saeed, I’m in the same boat as ipekkeskin, though I made it a step further, I believe. After replacing all the keys, I get the following error:


    NameError Traceback (most recent call last)
    in ()
    4 consumer_key = "my key is here"
    5 consumer_secret = "my secret is here"
    ----> 6 auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret,access_token, access_token_secret)
    7
    8

    NameError: name 'requests_oauthlib' is not defined

    Did I miss updating something obvious?

    • Saeed Aghabozorgi Post author

      Hi Paul,

      Please make sure that you have installed the packages, and imported them before executing the cells related to credentials, running the following code:
      !pip install requests_oauthlib
      !pip install oauth2
      import requests_oauthlib
      import oauth2

      It should work if the packages are installed properly.

  • Lou Spironello

    Great article! Much needed. Hope you can do some more. Regards, Lou.

  • shermanash

    Hi, love the tutorial, thank you so much. I was able to get the map up and running with the colored dots for a specific requested keyword list, but can’t figure out how to overlay the clusters on the screen as shown in your example. Am I forgetting to uncomment something, or is it maybe another issue?