Co-authored by Saeed Aghabozorgi and Polong Lin.
This is a brief tutorial on using Spark Streaming to analyze social media data in real time.
Do you want to know what people are tweeting about in different parts of the world, on different continents, or in your country? Do you want to understand the trends or opinions of people in a region? Do you need to detect events? Data science can help you gain this information in real time.
I have created a streaming program that runs continuously, fetches Twitter data in real time, and clusters the tweets based on their text and location using the k-means algorithm. My goal is to show how state-of-the-art tools like Spark now enable us to write such a program quickly and easily, in only a few lines of code. Along the way, you will practice a wide range of Spark commands.
First, we need to read Twitter data. Unlike traditional methods, in this tutorial we do not save the tweets to disk or to a database. Instead, we clean, analyze and visualize them in real time. Because nothing is stored, storage does not grow while the stream runs: we can read data for hours and keep visualizing it on a map.
This guide has two parts:
- Scraper: Reading data from Twitter
- Analyzer: Cleaning, Analyzing and Visualizing data
In the Scraper, we read data from Twitter and write it to a socket. Why? Because in the second part of this tutorial, the Analyzer, we will read from that socket in a streaming manner and analyze the data on the fly. So the basic task of the Scraper is to act as a mediator between the Twitter stream (the original data) and the Analyzer.
To write this code in Python, we used a convenient platform, Data Scientist Workbench (DSWB), which hosts many open-source data science tools, such as Jupyter notebooks, RStudio and Zeppelin. It also has Apache Spark pre-installed for each of those tools. Everything is already set up, so all we have to do is write our code. You can take a look at this 3-minute video to learn more about it. You can upload the notebooks using one of these two options:
1) From the top menu, go to “Build Analytics”, then “IPython/Jupyter Notebooks”. In the search bar in the top-right corner, paste the URL below and press Enter.
2) Alternatively, you can download the IPython notebooks here:
and drag and drop each file into the sidebar while DSWB is displayed.
Let's look at the Scraper first. It is Python code that acts as a pipe between Twitter and the Analyzer.
Scraper runs three tasks:
- Opening a socket, listening to and accepting connection requests from the Analyzer
- Reading tweets from the Twitter Streaming API
- Writing the tweets to the socket in JSON format
We create a server socket for the tweets so that the Analyzer can later read streaming data from it. To do this, we pick an arbitrary non-privileged port (above 1023), create a TCP socket with socket.socket, bind it to localhost and that port, and start listening. We will use port 9999:
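s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # IPv4 TCP socket
s.bind(('localhost', 9999)); s.listen(1)                # bind, then wait for the Analyzer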
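For reference, here is a minimal, self-contained sketch of the Scraper's server side, assuming a single client (the Analyzer). The helper name open_server is hypothetical; the tutorial performs the same steps inline. Note that Spark's socketTextStream connects as a client, so the Scraper must be listening before the Analyzer starts.

```python
import socket


def open_server(host="localhost", port=9999):
    """Create a TCP server socket bound to (host, port) and start listening.

    Hypothetical helper: the tutorial's Scraper does these steps inline.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow restarting the Scraper quickly without "Address already in use".
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)  # the Analyzer is the only expected client
    return server
```

In the Scraper you would then call conn, addr = s.accept(), which blocks until the Analyzer's socketTextStream connects, and write the tweets to conn.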
The following block reads data from Twitter and writes it to the socket. To read tweets, we set variables that hold the credentials for the Twitter API: an access token and access token secret, and a consumer key and consumer secret. You can find more about this in Connecting to a streaming endpoint. Then, we establish a connection to the streaming API.
auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret, access_token, access_token_secret)
We pull a selection of tweets from the Twitter Streaming API (the complete stream of all public tweets is known as the “firehose”). We use a very “long-lived” HTTP request and parse the response incrementally. Conceptually, you can think of it as downloading an infinitely long file over HTTP. The stream output is determined by the following streaming endpoint:
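We can also define parameters to restrict the results: for example, to English tweets, to a geographic bounding box (given as south-west longitude,latitude followed by north-east longitude,latitude; the box below spans a wide region, whereas roughly -125,24,-66,50 would cover the contiguous United States), and to tweets matching some keywords.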
data = [('language', 'en'), ('locations', '-130,-20,100,50'), ('track', 'ibm,Cloud,BigData')]  # track takes comma-separated keywords
query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in data])  # key=value pairs
response = requests.get(query_url, auth=auth, stream=True)
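Building the query string by hand is easy to get wrong. As an alternative sketch, Python's standard urllib.parse.urlencode handles the key=value pairing and percent-encoding (passing params=data to requests.get has the same effect). The function name build_query_url is hypothetical, and the URL below is a placeholder, not the real streaming endpoint.

```python
from urllib.parse import urlencode


def build_query_url(url, params):
    """Join (key, value) pairs into url?key=value&... with proper escaping."""
    return url + "?" + urlencode(params)


data = [("language", "en"),
        ("locations", "-130,-20,100,50"),
        ("track", "ibm,Cloud,BigData")]

# "https://example.com/stream" is a placeholder, not the Twitter endpoint.
print(build_query_url("https://example.com/stream", data))
# → https://example.com/stream?language=en&locations=-130%2C-20%2C100%2C50&track=ibm%2CCloud%2CBigData
```

Note that Twitter's documentation for the v1.1 filter endpoint states that track and locations are combined with OR, not AND, so this query returns tweets that match the keywords or fall inside the box.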
Then, we iterate over the streaming tweets and use the send method of the accepted connection to write each one to the socket.
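A minimal sketch of that loop, assuming the Scraper forwards each tweet's full JSON (the original may send only selected fields). The helper name forward_tweets is hypothetical. Because socketTextStream splits its input on newlines, each record is terminated with "\n".

```python
import json


def forward_tweets(lines, conn):
    """Send each tweet from an iterable of raw JSON lines to a socket.

    `lines` would be response.iter_lines() in the Scraper. Blank keep-alive
    lines and non-tweet messages are skipped. Returns the number sent.
    """
    sent = 0
    for line in lines:
        if not line:              # the stream sends blank keep-alive lines
            continue
        tweet = json.loads(line)
        if "text" not in tweet:   # skip non-tweet messages (e.g. limit notices)
            continue
        # One JSON object per line, as socketTextStream expects.
        conn.sendall((json.dumps(tweet) + "\n").encode("utf-8"))
        sent += 1
    return sent
```

In the Scraper this would be called as forward_tweets(response.iter_lines(), conn).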
In the second part, the Analyzer, we use Spark's streaming API to read data from the socket, pre-process the tweets, cluster them as data arrives in the stream, and push the results out for visualization.
First, we initialize the parameters. The batch interval is how often Spark Streaming groups incoming data into a new batch, the window length is the duration of data each window covers, and the sliding interval is how often the window operation is performed; both window durations must be multiples of the batch interval. I also set the number of clusters for each update, clusterNum, to 15, though this is an arbitrary choice of how many clusters we want.
BATCH_INTERVAL = 10    # how frequently to update (seconds)
WINDOWS_LENGTH = 60    # duration of the window (seconds)
SLIDING_INTERVAL = 20  # interval at which the window operation is performed (seconds)
clusterNum = 15        # number of clusters
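To make these three durations concrete, this pure-Python sketch (no Spark involved) lists which 10-second batches fall into each 60-second window recomputed every 20 seconds. Labelling each batch by its end time is an assumption made for illustration; it shows the semantics only, not Spark's internals.

```python
BATCH_INTERVAL = 10    # seconds per micro-batch
WINDOWS_LENGTH = 60    # seconds of data covered by each window
SLIDING_INTERVAL = 20  # seconds between window computations

# Spark Streaming requires both durations to be multiples of the batch interval.
assert WINDOWS_LENGTH % BATCH_INTERVAL == 0
assert SLIDING_INTERVAL % BATCH_INTERVAL == 0


def window_batches(t):
    """Return the batch end times (seconds) included in the window ending at t."""
    first = t - WINDOWS_LENGTH + BATCH_INTERVAL
    return [b for b in range(first, t + 1, BATCH_INTERVAL) if b > 0]


for t in range(SLIDING_INTERVAL, 101, SLIDING_INTERVAL):
    print(t, window_batches(t))
```

Once the stream has run for a full minute, each window holds 6 batches, and consecutive windows share 4 of them, so every tweet is clustered in up to three successive windows.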
As with any other Spark program, we first create a SparkContext object; since I was running on my laptop (with 4 workers), I configured the program to use the local Spark instance. In this program we use the k-means algorithm to cluster tweets. K-means is an unsupervised algorithm that groups data into k clusters by similarity, assigning each point to its nearest cluster center.
However, k-means works only with numerical vectors, so we have to convert the tweets to feature vectors. There are different approaches for converting text to vectors, such as TF-IDF, Word2Vec and CountVectorizer. Spark provides a Word2Vec implementation that computes distributed vector representations of words. The main advantage of distributed representations is that similar words are close in the vector space. Word2Vec learns these vectors from a corpus of documents. Since we work with streaming data, we cannot train a model for each batch: first, it is computationally costly, and second, a batch is too small to train a good model. Therefore, we use a pre-trained model (built from the text8 corpus in this example) for vectorization.
To load this model cheaply, I built it offline, loaded it with Spark's SQLContext, and created a broadcast variable that keeps the model cached and read-only on each machine, rather than shipping a copy of it with every task. This gives every node a copy of the model efficiently. To start an SQLContext and a StreamingContext, we need a SparkContext, the main entry point for Spark functionality. DSWB already provides a SparkContext named “sc”, so we can start using it right away.
lookup = sqlContext.read.parquet("word2vecModel/data").alias("lookup")
lookup_bd = sc.broadcast(lookup.rdd.collectAsMap())
Then, I instantiate a StreamingContext object with the batch interval. Spark Streaming is an extension of the core Spark API that enables processing of live data streams. Because we will use some stateful transformations later, we also enable checkpointing, which periodically saves the generated RDDs to reliable storage.
ssc = StreamingContext(sc, BATCH_INTERVAL)
ssc.checkpoint("checkpoint")  # checkpoint directory (name chosen for illustration)
Then we create a DStream that connects to a hostname and port, here localhost:9999. We use window to apply transformations over a sliding window of data (the window length and sliding interval were set above to 60 and 20 seconds). This means that every 20 seconds, dstreamwin receives the last 60 seconds of tweets.
dstream = ssc.socketTextStream("localhost", 9999)
dstreamwin = dstream.window(WINDOWS_LENGTH, SLIDING_INTERVAL)  # last 60 s, every 20 s
Next, we parse the tweets and apply the following text-mining steps to the dstreamtweets stream:
- Finding the coordinates of the tweets
- Tokenizing the text of the tweets
- Removing stop words, punctuation, URLs, etc.
- Vectorizing with the Word2Vec model and the Doc2Vec function
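The steps above can be sketched in plain Python. This is not the authors' code: the regular expressions, the tiny stop-word list, and the helper names (tweet_coordinates, clean_tokens, doc2vec) are assumptions, and doc2vec here simply averages the Word2Vec vectors of the known words, which is one common way to build a document vector. In the Analyzer, lookup would be the broadcast dictionary lookup_bd.value.

```python
import re

# Tiny illustrative stop-word list; a real one would be much longer.
STOP_WORDS = {"a", "an", "the", "and", "or", "is", "are", "to", "of",
              "in", "on", "for", "rt"}
URL_RE = re.compile(r"https?://\S+")
WORD_RE = re.compile(r"[a-z]+")


def tweet_coordinates(tweet):
    """Return (lon, lat) from a tweet's GeoJSON 'coordinates' field, or None."""
    point = tweet.get("coordinates")
    if point and point.get("type") == "Point":
        lon, lat = point["coordinates"]  # GeoJSON order is [longitude, latitude]
        return lon, lat
    return None


def clean_tokens(text):
    """Lower-case, drop URLs, keep alphabetic tokens, remove stop words."""
    text = URL_RE.sub(" ", text.lower())
    return [w for w in WORD_RE.findall(text) if w not in STOP_WORDS]


def doc2vec(tokens, lookup, dim):
    """Average the vectors of tokens found in `lookup`; zeros if none are known."""
    vecs = [lookup[w] for w in tokens if w in lookup]
    if not vecs:
        return [0.0] * dim
    return [sum(col) / len(vecs) for col in zip(*vecs)]
```

Averaging keeps the document vector in the same space as the word vectors, so tweets that use similar words end up close together. How the authors combine the text and location features is not shown here.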
If you want to know more about text mining and each of these concepts, refer to this course.
Subsequently, we build the training and test streams that feed the streaming k-means clustering. This algorithm estimates clusters dynamically as data arrives. Spark MLlib supports streaming k-means clustering, with a parameter that controls the decay (or “forgetfulness”) of the estimates.
from pyspark.mllib.clustering import StreamingKMeans
model = StreamingKMeans(k=clusterNum, decayFactor=0.6).setRandomCenters(102, 1.0, 3)  # dim, weight, seed
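To show what decayFactor=0.6 does, here is a pure-Python sketch of one mini-batch update. It follows the rule in Spark's streaming k-means documentation, c_new = (c·n·α + x·m) / (n·α + m), where c and n are a cluster's old center and weight, x and m are the mean and count of the new points assigned to it, and α is the decay factor. As I understand MLlib's implementation with the default "batches" time unit, every cluster's weight is multiplied by α even when it receives no new points. The function name is hypothetical, and this is an illustration, not MLlib's code.

```python
def streaming_kmeans_step(centers, weights, batch, alpha):
    """One mini-batch update in the style of Spark MLlib's StreamingKMeans.

    centers: list of center vectors, weights: list of floats, batch: list of
    points, alpha: decay factor (1.0 = remember everything, 0.0 = only use
    the newest batch). Returns the new (centers, weights).
    """
    k, dim = len(centers), len(centers[0])
    sums = [[0.0] * dim for _ in range(k)]
    counts = [0] * k
    for p in batch:
        # Assign each point to its nearest center (squared Euclidean distance).
        j = min(range(k),
                key=lambda i: sum((a - b) ** 2 for a, b in zip(p, centers[i])))
        counts[j] += 1
        sums[j] = [s + x for s, x in zip(sums[j], p)]
    new_centers, new_weights = [], []
    for c, n, s, m in zip(centers, weights, sums, counts):
        n_decayed = alpha * n  # forget part of the history
        if m == 0:
            new_centers.append(list(c))
            new_weights.append(n_decayed)
            continue
        total = n_decayed + m
        # (alpha*n*c + m*mean) / (alpha*n + m), with m*mean = sum of new points
        new_centers.append([(n_decayed * ci + si) / total for ci, si in zip(c, s)])
        new_weights.append(total)
    return new_centers, new_weights
```

With α = 0.6, the previous estimate's weight shrinks by 40% per batch, so the clusters follow recent topics instead of averaging over the whole stream.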
To see how streaming k-means works, check this example of clustering 2-dimensional points:
Then, we use foreachRDD to push the results out. foreachRDD is an output operation that applies a function to each RDD generated from the stream; here the function collects the RDD and puts the result into a queue that is used for visualization. No processing starts until we call start(). Finally, we call awaitTermination() to wait for the computation to terminate.
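clust.foreachRDD(lambda time, rdd: q.put(rdd.collect()))
textdata.foreachRDD(lambda time, rdd: f.put(rdd.collect()))
ssc.start()             # nothing runs until the context is started
ssc.awaitTermination()  # block until the stream is stopped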
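On the consumer side, the plotting code only needs to drain the queue that foreachRDD fills; in PySpark, the function given to foreachRDD runs on the driver, so an ordinary in-process queue works. This minimal sketch assumes q is a queue.Queue (the source does not show how q and f are created), and latest_batch is a hypothetical helper that keeps only the newest result so the map always shows current clusters.

```python
import queue


def latest_batch(q, timeout=1.0):
    """Wait up to `timeout` seconds for a result, then return the newest one.

    Older results still waiting in the queue are discarded; returns None
    if nothing arrived in time.
    """
    try:
        item = q.get(timeout=timeout)
    except queue.Empty:
        return None
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            return item
```

A plotting loop could call latest_batch(q) about every SLIDING_INTERVAL seconds and redraw the map only when it returns something.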
The result is sent to a function that uses Basemap, a Python mapping toolkit, to show the tweets. In this map, each point represents a tweet posted at that location (based on the tweet's longitude and latitude). The color of a point indicates the cluster the tweet belongs to. Similar tweets fall into the same cluster, where similarity is defined by both the geographic location of the tweets and the similarity of their keywords. So two tweets with the same color are similar in text, close in location, or both. After the clusters are found, we extract the most frequent keywords in each cluster; these top keywords often reveal the event, news or trend in that area. For example, the following plot shows clusters around the world:
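Extracting the top keywords per cluster can be sketched with collections.Counter, assuming the Analyzer keeps each tweet's cleaned tokens next to its predicted cluster id. The function name top_keywords and the input layout are assumptions; in Spark the same counting could be done before collecting, but plain Python is enough for the few hundred tweets in one window.

```python
from collections import Counter, defaultdict


def top_keywords(labeled_tokens, n=3):
    """Map each cluster id to its n most frequent tokens.

    labeled_tokens: iterable of (cluster_id, tokens) pairs, one per tweet.
    """
    counts = defaultdict(Counter)
    for cluster, tokens in labeled_tokens:
        counts[cluster].update(tokens)
    return {c: [w for w, _ in cnt.most_common(n)] for c, cnt in counts.items()}
```

For example, top_keywords([(3, ["game", "bowl"]), (3, ["bowl", "tonight"])], n=1) returns {3: ["bowl"]}.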
Let’s zoom into the plot and look at the tweets in a specific area, such as the US. We look for clusters and examine each one to understand what distinguishes it from the others.
The following plot was captured a while after starting the process, before all clusters had formed. Most tweets seem to be about “New, Year, 2016” or “Job, Hiring”; however, the tweets in Cluster 3 differ from the other groups: their keywords, “Game, Bowl”, probably refer to a sporting event.
As the stream continues, the dynamic clusters fill out and new clusters emerge. We can investigate these clusters further to gain more insight.
There are a few advantages that make this kind of visualization very easy to run:
- It runs indefinitely. It acts as an evolving system and does not need to rerun k-means from scratch after a while
- It does not need much storage. The only things stored are the offline Word2Vec model and the checkpoints that keep the history for one window length (e.g., 60 seconds)
You can tune the system by changing a few parameters:
- Changing the number of clusters
- Changing the area where tweets are collected by changing the bounding-box longitudes and latitudes
- Filtering the collected tweets
This app is a simple example of using Spark Streaming to handle stream data. You can improve the program by tweaking the code:
- Using a better/bigger Word2Vec model as pre-trained offline model
- Replacing the offline Word2Vec model with an evolving one (a continuously trained model)
- Weighting the location and Doc2Vec features to derive more accurate clusters
- Taking advantage of other sources such as news headlines, Facebook posts, etc.
- Visualizing the most frequent keywords with more interesting plots
We hope you enjoyed this tutorial! If you have any questions or comments, please post them below.
You can also download the Python code of the Scraper and Analyzer here.