Files
selenium-twitter-scraper/scraper/twitter_scraper.py

382 lines
12 KiB
Python

import os
import sys
import pandas as pd
from progress import Progress
from scroller import Scroller
from tweet import Tweet
from datetime import datetime
from fake_headers import Headers
from time import sleep
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import (
NoSuchElementException,
StaleElementReferenceException,
WebDriverException,
)
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
TWITTER_LOGIN_URL = "https://twitter.com/i/flow/login"
class Twitter_Scraper:
def __init__(
self,
username,
password,
max_tweets=50,
scrape_username=None,
scrape_hashtag=None,
scrape_query=None,
scrape_latest=True,
scrape_top=False,
):
print("Initializing Twitter Scraper...")
self.username = username
self.password = password
self.data = []
self.scraper_details = {
"type": None,
"username": scrape_username,
"hashtag": str(scrape_hashtag).replace("#", "")
if scrape_hashtag is not None
else None,
"query": scrape_query,
"tab": "Latest" if scrape_latest else "Top" if scrape_top else "Latest",
}
self.router = self.go_to_home
self.tweet_ids = set()
self.max_tweets = max_tweets
self.progress = Progress(0, max_tweets)
self.tweet_cards = []
self.driver = self._get_driver()
self.scroller = Scroller(self.driver)
self._login()
if scrape_username is not None:
self.scraper_details["type"] = "Username"
self.router = self.go_to_profile
elif scrape_hashtag is not None:
self.scraper_details["type"] = "Hashtag"
self.router = self.go_to_hashtag
elif scrape_query is not None:
self.scraper_details["type"] = "Query"
self.router = self.go_to_search
else:
self.scraper_details["type"] = "Home"
self.router = self.go_to_home
def _get_driver(self):
print("Setup WebDriver...")
header = Headers().generate()["User-Agent"]
browser_option = ChromeOptions()
browser_option.add_argument("--no-sandbox")
browser_option.add_argument("--disable-dev-shm-usage")
browser_option.add_argument("--ignore-certificate-errors")
browser_option.add_argument("--disable-gpu")
browser_option.add_argument("--log-level=3")
browser_option.add_argument("--disable-notifications")
browser_option.add_argument("--disable-popup-blocking")
browser_option.add_argument("--user-agent={}".format(header))
# For Hiding Browser
browser_option.add_argument("--headless")
try:
print("Initializing ChromeDriver...")
driver = webdriver.Chrome(
options=browser_option,
)
return driver
except WebDriverException:
try:
print("Downloading ChromeDriver...")
chromedriver_path = ChromeDriverManager().install()
chrome_service = ChromeService(executable_path=chromedriver_path)
print("Initializing ChromeDriver...")
driver = webdriver.Chrome(
service=chrome_service,
options=browser_option,
)
return driver
except Exception as e:
print(f"Error setting up WebDriver: {e}")
sys.exit(1)
def _login(self):
print("Logging in to Twitter...")
self.driver.get(TWITTER_LOGIN_URL)
self.driver.maximize_window()
sleep(3)
self._input_username()
self._input_unusual_activity()
self._input_password()
print("Login Successful")
print()
pass
def _input_username(self):
input_attempt = 0
while True:
try:
username = self.driver.find_element(
"xpath", "//input[@autocomplete='username']"
)
username.send_keys(self.username)
username.send_keys(Keys.RETURN)
sleep(3)
break
except NoSuchElementException:
input_attempt += 1
if input_attempt >= 3:
print()
print(
"""There was an error inputting the username.
It may be due to the following:
- Internet connection is unstable
- Username is incorrect
- Twitter is experiencing unusual activity"""
)
self.driver.quit()
sys.exit(1)
else:
print("Re-attempting to input username...")
sleep(2)
def _input_unusual_activity(self):
input_attempt = 0
while True:
try:
unusual_activity = self.driver.find_element(
"xpath", "//input[@data-testid='ocfEnterTextTextInput']"
)
unusual_activity.send_keys(self.username)
unusual_activity.send_keys(Keys.RETURN)
sleep(3)
break
except NoSuchElementException:
input_attempt += 1
if input_attempt >= 3:
break
def _input_password(self):
input_attempt = 0
while True:
try:
password = self.driver.find_element(
"xpath", "//input[@autocomplete='current-password']"
)
password.send_keys(self.password)
password.send_keys(Keys.RETURN)
sleep(3)
break
except NoSuchElementException:
input_attempt += 1
if input_attempt >= 3:
print()
print(
"""There was an error inputting the password.
It may be due to the following:
- Internet connection is unstable
- Password is incorrect
- Twitter is experiencing unusual activity"""
)
self.driver.quit()
sys.exit(1)
else:
print("Re-attempting to input password...")
sleep(2)
def go_to_home(self):
self.driver.get("https://twitter.com/home")
sleep(3)
pass
def go_to_profile(self):
self.driver.get(f"https://twitter.com/{self.scraper_details['username']}")
sleep(3)
pass
def go_to_hashtag(self):
url = f"https://twitter.com/hashtag/{self.scraper_details['hashtag']}?src=hashtag_click"
if self.scraper_details["tab"] == "Latest":
url += "&f=live"
self.driver.get(url)
sleep(3)
pass
def go_to_search(self):
url = f"https://twitter.com/search?q={self.scraper_details['query']}&src=typed_query"
if self.scraper_details["tab"] == "Latest":
url += "&f=live"
self.driver.get(url)
sleep(3)
pass
def get_tweets(self):
self.tweet_cards = self.driver.find_elements(
"xpath", '//article[@data-testid="tweet"]'
)
pass
def scrape_tweets(self, router=None):
if router is None:
router = self.router
router()
if self.scraper_details["type"] == "Username":
print(
"Scraping Tweets from @{}...".format(self.scraper_details["username"])
)
elif self.scraper_details["type"] == "Hashtag":
print(
"Scraping {} Tweets from #{}...".format(
self.scraper_details["tab"], self.scraper_details["hashtag"]
)
)
elif self.scraper_details["type"] == "Query":
print(
"Scraping {} Tweets from {} search...".format(
self.scraper_details["tab"], self.scraper_details["query"]
)
)
elif self.scraper_details["type"] == "Home":
print("Scraping Tweets from Home...")
self.progress.print_progress(0)
refresh_count = 0
added_tweets = 0
while self.scroller.scrolling:
try:
self.get_tweets()
added_tweets = 0
for card in self.tweet_cards[-15:]:
tweet = Tweet(card)
try:
tweet_id = f"{tweet.user}{tweet.handle}{tweet.date_time}"
except Exception as e:
continue
if tweet_id not in self.tweet_ids:
self.tweet_ids.add(tweet_id)
if tweet:
if not tweet.is_ad:
self.data.append(tweet.tweet)
added_tweets += 1
self.progress.print_progress(len(self.data))
if len(self.data) >= self.max_tweets:
self.scroller.scrolling = False
break
if len(self.data) % 50 == 0:
sleep(2)
if len(self.data) >= self.max_tweets:
break
if added_tweets == 0:
refresh_count += 1
if refresh_count >= 10:
print()
print("No more tweets to scrape")
break
else:
refresh_count = 0
self.scroller.scroll_count = 0
while True:
self.scroller.scroll_to_bottom()
sleep(2)
self.scroller.update_scroll_position()
if self.scroller.last_position == self.scroller.current_position:
self.scroller.scroll_count += 1
if self.scroller.scroll_count >= 3:
router()
sleep(2)
break
else:
sleep(1)
else:
self.scroller.last_position = self.scroller.current_position
break
except StaleElementReferenceException:
router()
sleep(2)
except Exception as e:
print("\n")
print(f"Error scraping tweets: {e}")
break
print("")
if len(self.data) >= self.max_tweets:
print("Scraping Complete")
else:
print("Scraping Incomplete")
print("Tweets: {} out of {}\n".format(len(self.data), self.max_tweets))
pass
def save_to_csv(self):
print("Saving Tweets to CSV...")
now = datetime.now()
folder_path = "./tweets/"
if not os.path.exists(folder_path):
os.makedirs(folder_path)
print("Created Folder: {}".format(folder_path))
data = {
"Name": [tweet[0] for tweet in self.data],
"Handle": [tweet[1] for tweet in self.data],
"Timestamp": [tweet[2] for tweet in self.data],
"Verified": [tweet[3] for tweet in self.data],
"Content": [tweet[4] for tweet in self.data],
"Comments": [tweet[5] for tweet in self.data],
"Retweets": [tweet[6] for tweet in self.data],
"Likes": [tweet[7] for tweet in self.data],
"Analytics": [tweet[8] for tweet in self.data],
"Tags": [tweet[9] for tweet in self.data],
"Profile Image": [tweet[10] for tweet in self.data],
}
df = pd.DataFrame(data)
current_time = now.strftime("%Y-%m-%d_%H-%M-%S")
file_path = f"{folder_path}{current_time}_tweets_1-{len(self.data)}.csv"
df.to_csv(file_path, index=False)
print("CSV Saved: {}".format(file_path))
pass