Practical session: Spark Streaming for Twitter data analysis
Register a Twitter app
Create an account at http://twitter.com if you do not have one, and log in.
Go to https://apps.twitter.com/. Make sure you are still logged in: you should see a drop-down menu in the upper right-hand corner of the page.
Click Create New App and fill in as many details as you can (you can change most of them later). Click Create your Twitter application.
Go to Keys and Access Tokens. Click Create my access token. You will need the following four key strings later (keep them secret to protect your Twitter account; one way to avoid pasting them into the shell is sketched after the list):
- Consumer Key (API Key)
- Consumer Secret (API Secret)
- Access Token
- Access Token Secret
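The steps later in this guide paste these strings directly into the shell. If you prefer to keep them out of your shell history, a minimal sketch (assuming a local file named twitter.properties with one twitter4j.oauth.* entry per line; the file name is only an example) is to load them as system properties:

import java.io.FileInputStream
import java.util.Properties
import scala.collection.JavaConverters._

// load twitter4j.oauth.* entries from a local file (hypothetical name)
// and install them as system properties for twitter4j to pick up
val props = new Properties()
props.load(new FileInputStream("twitter.properties"))
props.stringPropertyNames.asScala.foreach { name =>
  System.setProperty(name, props.getProperty(name))
}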
Stream Twitter messages into Spark
1) You can use the following links to access the tutorial:
- See the practical session folder in Canvas
- Installation guide for Spark Streaming for Scala
- Installation guide for Spark Streaming for Python
2) Or you can follow the steps below for Scala:
To test that login works, open a spark-shell (remember the --jars SPARK_JARS option if you defined such an environment variable earlier), and import these APIs:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
Set the logging level (to avoid warnings when running Spark standalone) and create a new Spark streaming context (ssc) with a 5-second batch interval:
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(5))
Set system properties for each of your keys and access tokens provided by Twitter earlier:
System.setProperty("twitter4j.oauth.consumerKey", "...copy this string from the Twitter app page...")
System.setProperty("twitter4j.oauth.consumerSecret", "...copy this string from the Twitter app page...")
System.setProperty("twitter4j.oauth.accessToken", "...copy this string from the Twitter app page...")
System.setProperty("twitter4j.oauth.accessTokenSecret", "...copy this string from the Twitter app page...")
Create a Spark stream with messages from Twitter:
val stream = TwitterUtils.createStream(ssc, None)
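The second argument (None) means the twitter4j.oauth.* system properties set above are used for authentication. createStream also accepts an optional list of filter keywords as a third argument, so you can restrict the stream to matching tweets; the keywords below are only examples:

// alternative to the line above: only receive tweets matching example keywords
val stream = TwitterUtils.createStream(ssc, None, Seq("#spark", "#bigdata"))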
Because Spark evaluates lazily, nothing happens until you have defined some transformations on the stream and started it. The next two lines take each incoming message (a twitter4j Status object), split its text into words, keep only the words that start with a #, and print them to the console:
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
hashTags.foreachRDD(rdd => rdd.foreach(str => println(":: " + str)))
Starting and stopping streams
You are now ready to start the stream:
ssc.start
This should write out current hashtags to the console. After a while, stop the stream (the first argument keeps the underlying SparkContext alive, the second requests a graceful stop) with:
ssc.stop(false, true)
Here is the documentation page for Spark's StreamingContext and other classes.
When a streaming context has been stopped, it cannot be restarted, but it can be recreated as follows:
val ssc = new StreamingContext(sc, Seconds(5))
val stream = TwitterUtils.createStream(ssc, None)
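Note that the transformations were defined on the old stream, so they have to be defined again on the recreated one before it is started (these are the same lines as above):

val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
hashTags.foreachRDD(rdd => rdd.foreach(str => println(":: " + str)))
ssc.start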
You can create an initialisation file, for example init-twitter.scala, as follows:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._

// set Twitter oauth properties
System.setProperty("twitter4j.oauth.consumerKey", "your key")
System.setProperty("twitter4j.oauth.consumerSecret", "your key")
System.setProperty("twitter4j.oauth.accessToken", "your key")
System.setProperty("twitter4j.oauth.accessTokenSecret", "your key")

// create stream
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(5))
val stream = TwitterUtils.createStream(ssc, None)

// define transformations
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
hashTags.foreachRDD(rdd => rdd.foreach(str => println(":: " + str)))

// start
ssc.start

// to stop, use: ssc.stop(false, true)
Start spark-shell with an initialisation file as follows:
spark-shell -i init-twitter.scala
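If you defined the SPARK_JARS environment variable mentioned earlier, the --jars option can be combined with -i, for example:

spark-shell --jars $SPARK_JARS -i init-twitter.scala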
Todo:
- partition by hashtag, save message text, save bags of words [1] (a sketch follows below)
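A minimal sketch of one way to approach these items (the output paths and value names are only examples, not part of the assignment; like the hashtag example, define these before calling ssc.start):

// pair each hashtag with the full message text
val taggedTexts = stream.flatMap { status =>
  val text = status.getText
  text.split(" ").filter(_.startsWith("#")).map(tag => (tag, text))
}

// save the (hashtag, text) pairs for each batch; grouping by key
// partitions the saved texts by hashtag
taggedTexts.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) rdd.groupByKey().saveAsTextFile(s"out/messages-${time.milliseconds}")
}

// bags of words: per-message lowercase word counts
val bags = stream.map { status =>
  status.getText.split("\\s+").map(_.toLowerCase).groupBy(identity).mapValues(_.length)
}
bags.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) rdd.saveAsTextFile(s"out/bags-${time.milliseconds}")
}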