Extracting Twitter Data using PySpark

 

Twitter is a rich source of data for sentiment analysis, market research, and many other applications. In this blog, we’ll show you how to fetch Twitter data with the tweepy library and process it using PySpark, a big data processing framework.

Setting up the Environment

To extract Twitter data using PySpark, you’ll need to have PySpark installed, along with the tweepy library for accessing the Twitter API. You’ll also need to have a Twitter Developer account and create a Twitter app to obtain your API credentials.

Accessing the Twitter API

Once you have your API credentials, you can use the tweepy library to access the Twitter API and retrieve tweets. Here’s an example of how to retrieve the latest tweets for a specific hashtag:

import tweepy

# Authenticate to Twitter (replace the placeholders with your own credentials)
auth = tweepy.OAuthHandler("consumer_key", "consumer_secret")
auth.set_access_token("access_token", "access_token_secret")

# Create API object
api = tweepy.API(auth)

# Search for tweets containing the hashtag "#PySpark"
# (Tweepy 4.0 renamed API.search to API.search_tweets; on Tweepy 3.x, call api.search instead)
tweets = api.search_tweets(q="#PySpark", lang="en", count=100, tweet_mode="extended")

In this example, we use the OAuthHandler to authenticate to the Twitter API using our API credentials, and the API object to interact with the API and retrieve tweets. The search_tweets method (named search in Tweepy releases before 4.0) searches for tweets containing the hashtag “#PySpark”. The lang parameter is set to “en” to return only English tweets, and the count parameter is set to 100 to request up to 100 tweets. Setting tweet_mode to “extended” makes each tweet carry its untruncated text in the full_text attribute.
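
The credentials in the example above are placeholder strings. One common way to keep real credentials out of source code is to read them from environment variables. The following is a minimal sketch of that idea; the variable names and the load_credentials helper are hypothetical and not part of tweepy.

```python
import os

# Hypothetical variable names; use whatever names your environment defines.
REQUIRED_VARS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET",
)


def load_credentials(env=os.environ):
    """Return the four credentials as a dict, or raise if any is missing."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

The returned values can then be passed to OAuthHandler and set_access_token in place of the string literals.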

Converting Tweets to DataFrames

Once you have retrieved the tweets, you can convert them to a PySpark DataFrame for further processing. Here’s an example of how to convert the tweets to a DataFrame:

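from pyspark.sql import Row, SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("TwitterData").getOrCreate()

# Status objects are not rows, so copy the fields we need into Row objects
rows = [Row(id=t.id, full_text=t.full_text, user=Row(location=t.user.location)) for t in tweets]

# Convert the rows to a DataFrame
df = spark.createDataFrame(rows)

In this example, we create a Spark session, copy the fields we need from each tweet into a Row, and use the createDataFrame method to convert the rows to a DataFrame. Spark cannot infer a schema from tweepy’s Status objects directly, which is why the intermediate step is needed. The location is kept nested under user so that it can be referenced as user.location.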
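
One detail is worth checking before loading tweet text into a DataFrame. According to Tweepy's documentation on extended tweets, the full_text of a retweet can still be truncated, with the complete text available on its retweeted_status attribute. The sketch below shows one way to handle that; tweet_text is a hypothetical helper, and the SimpleNamespace objects are stand-ins that only mimic the two attributes used, not real API output.

```python
from types import SimpleNamespace


def tweet_text(status):
    """Return the untruncated text of a tweet fetched with tweet_mode="extended"."""
    retweeted = getattr(status, "retweeted_status", None)
    if retweeted is not None:
        # For a retweet, the complete text lives on the original tweet.
        return retweeted.full_text
    return status.full_text


# Stand-in objects for illustration only.
original = SimpleNamespace(full_text="PySpark makes big data approachable")
retweet = SimpleNamespace(
    full_text="RT @someone: PySpark makes big...",
    retweeted_status=original,
)
```

Calling tweet_text(t) instead of reading t.full_text directly when building the rows would apply this to every tweet.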

Processing the Data

Once you have the DataFrame, you can perform various operations on the data, such as filtering, aggregating, and transforming it. For example, you can use the filter method to select tweets that contain specific keywords:

# Filter the tweets to only include those that contain the keyword "data"
filtered_df = df.filter(df["full_text"].contains("data"))

You can also use the groupBy method to group the tweets by the user’s location and calculate the number of tweets per location:


# Group the tweets by the user's location and count the number of tweets per location
grouped_df = df.groupBy("user.location").count()

Another example

In this example, a PySpark DataFrame named Hashtags has a value column containing tweets as strings. The goal is a second column, also named Hashtags, that holds an array of the hashtags extracted from each tweet.

The following code is used to extract the hashtags from the tweets:


from pyspark.sql import functions as F

Hashtags = Hashtags.withColumn("Hashtags", F.lower(Hashtags["value"]))
Hashtags = Hashtags.withColumn("Hashtags", F.split(Hashtags["Hashtags"], r"\s"))
Hashtags = Hashtags.withColumn("Hashtags", F.array_remove(Hashtags["Hashtags"], ""))

This code first lowercases the value column, then splits it into an array of words, treating each whitespace character (space, tab, or newline) as a separator. Because consecutive whitespace characters leave empty strings in that array, the final step removes them. Note that array_remove compares elements for equality with the given value, so it is given the empty string rather than a pattern.
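
To see what the split step produces when a tweet contains consecutive whitespace, here is a plain-Python illustration. It uses Python's re.split rather than Spark, on the assumption that both behave the same way for this simple pattern; the sample text is made up.

```python
import re

text = "#Iran  #ABD\t#Biden"  # note the double space

# Splitting on a single whitespace character leaves an empty string
# between the two consecutive spaces.
tokens = re.split(r"\s", text.lower())

# Dropping the empty strings gives the clean list of words.
words = [token for token in tokens if token != ""]
```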

To remove elements in the Hashtags column that don’t start with “#”, a custom UDF (User Defined Function) is used:

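from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def keep_hashtags(array):
    # Keep only the words that start with "#"; treat a null array as empty
    return [x for x in (array or []) if x.startswith("#")]

keep_hashtags_udf = udf(keep_hashtags, ArrayType(StringType()))

Hashtags = Hashtags.withColumn("Hashtags", keep_hashtags_udf(Hashtags["Hashtags"]))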

This code defines a function keep_hashtags that takes an array and returns only the elements that start with “#”. The UDF keep_hashtags_udf is then used in a withColumn transformation to update the Hashtags column.

After these transformations, the dataframe Hashtags will have the desired structure, with only hashtags remaining in the Hashtags column. The dataframe might look like this:

+-----------------+---------------------+
|value            |Hashtags             |
+-----------------+---------------------+
|Instead, it has  |[]                   |
|#iran #abd #biden|[#iran, #abd, #biden]|
+-----------------+---------------------+

Conclusion

In this blog, we showed you how to extract Twitter data using PySpark and perform various operations on the data. Whether you’re interested in sentiment analysis, market research, or just exploring the vast amounts of data on Twitter, the same steps apply: retrieve tweets with tweepy, load them into a DataFrame, and process them with PySpark.