Streaming tweets with Twitter API

From info319
Latest revision as of 17:41, 16 September 2022

== Get Twitter API account ==

This was suggested as preparation before the course started: ''...head over to the Twitter [https://developer.twitter.com/en Developer page] and apply for a [https://developer.twitter.com/en/docs/developer-portal/overview Developer Account]. Get "Essential" access first. Then you can go on to apply for wider "Academic Research" access (but: that is mainly for Master students who have started the thesis work, so I am not sure what Twitter will say...). It is not mandatory to use Twitter data in the course, but it will be a convenient way to get started.''

The simple ''Essential'' account is a fine start and offers a lot of opportunities. When you have defined a more concrete project, you can apply for an ''Academic Research'' account that gives even wider access. Then it is up to Twitter whether they will grant you access.

Usually, your ''Essential'' access will be granted quickly. Twitter offers several ways to authenticate yourself, but the easiest way is to use the ''Bearer Token''. You can find it on your Twitter project page, under ''Keys and tokens''. In your ''info319-exercises'' folder, create a ''keys/bearer_token'' file that only you can read:

(venv) $ mkdir keys
(venv) $ chmod 700 keys
(venv) $ touch keys/bearer_token
(venv) $ chmod 600 keys/bearer_token
(venv) $ echo "...the long alpha-numeric token string goes here..." > keys/bearer_token

If you use git, you can also add the line '''/keys/''' to your ''.gitignore'' file. You do not want your token uploaded to an external repository.

== Saving tweets to file ==

Install the ''tweepy'' library:

(venv) $ pip install tweepy

'''Task 1:''' Run the following minimal boilerplate code '''tweet_harvester.py''' to connect to Twitter's API and start downloading a stream of tweets (the code takes a few seconds to connect):

import json
import time
import tweepy

KEY_FILE = './keys/bearer_token'
DURATION = 10  # how many seconds to run for, including connection time
   
def on_data(json_data):
    """Tweepy calls this when it receives data from Twitter"""
    json_obj = json.loads(json_data.decode())
    print('Received tweet:', json_obj)

# Initiate the tweepy client and start the sampling thread.
bearer_token = open(KEY_FILE).read().strip()
streaming_client = tweepy.StreamingClient(bearer_token)
streaming_client.on_data = on_data
streaming_client.sample(threaded=True)
time.sleep(DURATION)
streaming_client.disconnect()

Note how tweepy calls the '''on_data()''' function every time a new tweet is received, and the function prints the tweet to the screen. This is just a simple example to get started. In Session 1 we will look at many other ways to access the Twitter API v2 through tweepy.
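To see what '''on_data()''' receives, you can experiment with a hand-made payload. The byte string below mimics the default v2 sample payload, where ''data'' carries ''id'' and ''text''; real tweets will of course differ, and the id used here is made up:

```python
import json

# Tweepy passes on_data() raw bytes holding a Twitter API v2 JSON payload.
sample_bytes = b'{"data": {"id": "1570000000000000000", "text": "Hello, stream!"}}'

json_obj = json.loads(sample_bytes.decode())
tweet = json_obj['data']
print(tweet['id'], '-', tweet['text'])
```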

'''Task 2:''' Change the code to save the tweets to a file (for example as JSON or CSV). Create a subfolder for tweet files to keep your ''info319-exercise2'' folder tidy.
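One possible approach for Task 2 is the JSON Lines format: one tweet appended per line. The function name '''save_tweet()''' and the paths are only suggestions; the demo writes to a scratch folder, while in the exercise you would pass a path inside your tweet subfolder:

```python
import json
import os
import tempfile

def save_tweet(json_obj, path):
    """Append one tweet as a single line of JSON (the JSON Lines format)."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'a') as f:
        f.write(json.dumps(json_obj) + '\n')

# Demo: write two tweets to a file in a scratch folder.
tweet_dir = tempfile.mkdtemp()   # in the exercise, use e.g. a 'tweets/' subfolder
tweet_file = os.path.join(tweet_dir, 'tweets.jsonl')
save_tweet({'data': {'id': '1', 'text': 'first'}}, tweet_file)
save_tweet({'data': {'id': '2', 'text': 'second'}}, tweet_file)
print(open(tweet_file).read())
```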

'''Task 3:''' Change the code so you download and save additional information about each tweet, for example the handle (user name) of the tweeter, whether it is a retweet of another tweet, and perhaps the date and time.
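For Task 3, the extra fields must both be requested and extracted. In tweepy you request them with parameters such as '''tweet_fields''' and '''expansions''' to '''sample()''' (check the tweepy documentation for your version). A sketch of the extraction side, assuming the standard v2 field names '''author_id''', '''created_at''' and '''referenced_tweets''' (the sample payload is made up):

```python
def summarize_tweet(json_obj):
    """Pick a few fields out of a parsed Twitter API v2 payload.
    author_id, created_at and referenced_tweets only appear when you
    have requested them, so fall back to None if they are missing."""
    data = json_obj['data']
    retweet_of = None
    for ref in data.get('referenced_tweets', []):
        if ref.get('type') == 'retweeted':
            retweet_of = ref.get('id')
    return {
        'id': data['id'],
        'text': data['text'],
        'author_id': data.get('author_id'),
        'created_at': data.get('created_at'),
        'retweet_of': retweet_of,
    }

sample = {'data': {'id': '3', 'text': 'RT hello',
                   'author_id': '42', 'created_at': '2022-09-16T17:41:00.000Z',
                   'referenced_tweets': [{'type': 'retweeted', 'id': '2'}]}}
print(summarize_tweet(sample))
```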

'''Task 4:''' Refactor the code to prepare for cleaner termination by creating a new '''on_finish()''' function, which disconnects the streaming client, closes open files, and performs any other needed termination tasks. Remove '''time.sleep(DURATION)''', and instead make the '''on_data()''' function call '''on_finish()''' when a given number of tweets (say 100 to start) has been received.
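A possible shape for Task 4. All names here, like '''make_on_data()''' and the '''_Stub''' stand-ins, are illustrative: in the real program ''client'' is the tweepy '''StreamingClient''' and ''sink'' is the open tweet file:

```python
class _Stub:
    """Stand-in for the tweepy StreamingClient / output file in this sketch."""
    def __init__(self):
        self.calls = []
    def disconnect(self):
        self.calls.append('disconnect')
    def write(self, text):
        self.calls.append(text)
    def close(self):
        self.calls.append('close')

def make_on_data(client, sink, max_tweets):
    """Return an on_data callback that counts tweets and triggers
    on_finish() after max_tweets of them."""
    count = 0
    def on_finish():
        client.disconnect()  # stop the tweepy stream
        sink.close()         # flush and close the tweet file
    def on_data(json_data):
        nonlocal count
        sink.write(json_data.decode())
        count += 1
        if count >= max_tweets:
            on_finish()
    return on_data

# Demo: after three tweets, the stream is disconnected and the sink closed.
client, sink = _Stub(), _Stub()
on_data = make_on_data(client, sink, max_tweets=3)
for i in range(3):
    on_data(b'{"data": {"id": "%d"}}' % i)
print(client.calls, sink.calls[-1])
```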

== Streaming tweets to a socket ==

'''Task 5:''' Change the code to write the stream of tweets to a [https://en.wikipedia.org/wiki/Network_socket network socket] instead. Think of a socket as an internal internet connection from your machine back to itself. Different programs on your computer can use the socket to communicate through the regular internet APIs.

Here are some critical pieces of code you need to include in your program (loosely based on [https://realpython.com/python-sockets/ this example]):

import socket

HOST = 'localhost'
PORT = 65000  # for example
NUM_TWEETS = 100

tweet_count = 0

To open a socket for output (the variable is named '''server_socket''' so that it does not shadow the '''socket''' module):

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((HOST, PORT))
server_socket.listen()
connection, address = server_socket.accept()

Hence, on the output side the program is a server that sends tweets through the socket, while on the input side it is a client that receives data from Twitter.

To send data through the socket (note that '''on_data()''' must now also count the tweets, or the termination test will never fire):

def on_data(json_data):
    global tweet_count
    json_obj = json.loads(json_data.decode())
    json_str = json.dumps(json_obj['data']['text']) + '\r\n'
    connection.sendall(json_str.encode())
    tweet_count += 1
    if tweet_count >= NUM_TWEETS:
        ...close the socket etc...

To close the socket on termination:

connection.close()
server_socket.close()
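Pieced together, the server/client pattern can be tried end to end without Twitter or '''nc'''. This self-contained sketch binds to port 0, which asks the OS for any free port (one way to sidestep the stale-port problem noted below); the two "tweets" are obviously fake:

```python
import socket
import threading

HOST = 'localhost'

# Server side: bind to port 0 so the OS picks a free port for us.
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((HOST, 0))
server_socket.listen()
port = server_socket.getsockname()[1]

def serve():
    """Accept one client and send it two fake 'tweets', one per line."""
    connection, address = server_socket.accept()
    for text in ['first tweet', 'second tweet']:
        connection.sendall((text + '\r\n').encode())
    connection.close()
    server_socket.close()

thread = threading.Thread(target=serve)
thread.start()

# Client side (the role played by nc or Spark): connect and read until EOF.
client = socket.create_connection((HOST, port))
data = b''
while True:
    chunk = client.recv(1024)
    if not chunk:
        break
    data += chunk
client.close()
thread.join()

received = data.decode()
print(received)
```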

For testing, you can use the '''nc''' utility to receive data from the socket. From another console window (inside or outside VS Code, a virtual environment is not needed):

$ sudo apt install netcat-openbsd  # or any package that provides nc
$ nc localhost 65000  # you must start the tweet stream before you run this command

'''IMPORTANT:''' When you debug, the socket will often remain open after your program has crashed, or when you have forgotten to close a Jupyter notebook. You may therefore have to change the PORT number frequently, both in the StreamingClient program and in the test command (65001, 65002, ...).

== Receive tweets from a socket with streaming Spark ==

Streaming Spark is the topic of Session 3, but you can run the following script '''spark_receiver.py''' to receive tweets from the socket as a Spark stream (loosely based on [https://sparkbyexamples.com/spark/spark-streaming-from-tcp-socket/ this example]):

import findspark
findspark.init()  # must run before pyspark is imported

from pyspark.sql import SparkSession

HOST = 'localhost'
PORT = 65000  # must match the port the tweet stream writes to

spark = SparkSession.builder.appName('SparkTest').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

# incoming stream
incoming_df = spark.readStream \
                .format('socket') \
                .option('host', HOST) \
                .option('port', PORT) \
                .load()
# outgoing stream: print each incoming batch of tweets to the console
query = incoming_df.select('value').writeStream \
                .format('console') \
                .outputMode('update') \
                .start()
query.awaitTermination()

Start '''tweet_harvester.py''' first, and then '''spark_receiver.py''' in another console window.

Instead of using '''import findspark; findspark.init()''', you can set the '''$SPARK_HOME''' environment variable, for example:

export SPARK_HOME=${PWD}/venv/lib/python3.8/site-packages/pyspark

Add this line to the end of your '''~/.bashrc''' file if you want to make it permanent. It is safer than '''findspark''' when you have several Spark installations on the same machine.