From 783294a1edce34d2e2bd77b8f44408fa0375cf82 Mon Sep 17 00:00:00 2001 From: Jarrian Date: Sat, 9 Sep 2023 12:23:18 +0800 Subject: [PATCH] twitter scraper script --- README.md | 24 +++- main.ipynb | 26 ++-- requirements.txt | 5 + scraper/__init__.py | 0 scraper/__main__.py | 52 ++++++++ scraper/progress.py | 18 +++ scraper/scroller.py | 27 ++++ scraper/tweet.py | 87 ++++++++++++ scraper/twitter_scraper.py | 267 +++++++++++++++++++++++++++++++++++++ 9 files changed, 492 insertions(+), 14 deletions(-) create mode 100644 requirements.txt create mode 100644 scraper/__init__.py create mode 100644 scraper/__main__.py create mode 100644 scraper/progress.py create mode 100644 scraper/scroller.py create mode 100644 scraper/tweet.py create mode 100644 scraper/twitter_scraper.py diff --git a/README.md b/README.md index ddfe5f8..5362cbc 100644 --- a/README.md +++ b/README.md @@ -1 +1,23 @@ -# selenium-twitter-scraper \ No newline at end of file +# selenium-twitter-scraper + +## Setup + +```bash +pip install -r requirements.txt +``` + +## Usage + +```bash +python scraper +``` + +### Arguments + +```bash +usage: python scraper [arg] + +Arguments Description +--tweets : No. of tweets. 50 default. + e.g. 
--tweets=500 +``` diff --git a/main.ipynb b/main.ipynb index ddb0118..cbc09a7 100644 --- a/main.ipynb +++ b/main.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 10, + "execution_count": 46, "metadata": {}, "outputs": [], "source": [ @@ -34,7 +34,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 47, "metadata": {}, "outputs": [], "source": [ @@ -57,7 +57,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 48, "metadata": {}, "outputs": [], "source": [ @@ -79,7 +79,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 49, "metadata": {}, "outputs": [], "source": [ @@ -193,7 +193,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 50, "metadata": {}, "outputs": [], "source": [ @@ -286,16 +286,16 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 51, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "Progress: [[=======---------------------------------]] 18.00% 9 of 50\n", - "Scraping Incomplete\n", - "Tweets: 9 out of 50\n" + "Progress: [[========================================]] 100.00% 50 of 50\n", + "Scraping Complete\n", + "Tweets: 50 out of 50\n" ] } ], @@ -371,7 +371,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 52, "metadata": {}, "outputs": [], "source": [ @@ -397,7 +397,7 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": 53, "metadata": {}, "outputs": [], "source": [ @@ -413,7 +413,7 @@ }, { "cell_type": "code", - "execution_count": 18, + "execution_count": 54, "metadata": {}, "outputs": [], "source": [ @@ -430,7 +430,7 @@ "\n", "df = pd.DataFrame(data)\n", "\n", - "current_time = datetime.now().strftime(\"%Y-%m-%d-%H-%M\")\n", + "current_time = datetime.now().strftime(\"%Y-%m-%d_%H-%M-%S\")\n", "\n", "file_path = f'{folder_path}{current_time}_tweets_1-{len(scraper.data)}.csv'\n", "df.to_csv(file_path, index=False)\n" 
# Reconstructed from the patch: four small modules of the scraper package,
# plus the pinned dependencies.
#
# requirements.txt:
#   fake_headers==1.0.2, pandas==2.0.3, python-dotenv==1.0.0,
#   selenium==4.12.0, webdriver_manager==4.0.0
# scraper/__init__.py is empty.

# --- scraper/__main__.py -----------------------------------------------
# CLI entry point: loads Twitter credentials from .env, parses --tweets,
# runs the scraper and saves the collected tweets to CSV.
import os
import sys
import argparse

from twitter_scraper import Twitter_Scraper

try:
    from dotenv import load_dotenv

    print("Loading .env file")
    load_dotenv()
    print("Loaded .env file\n")
except Exception as e:
    # python-dotenv missing or .env unreadable: credentials cannot be loaded.
    print(f"Error loading .env file: {e}")
    sys.exit(1)


def main():
    """Read credentials from the environment, parse CLI args, scrape, save.

    Exits with status 1 when credentials are missing or the user interrupts.
    """
    # os.getenv never raises, so the original try/except around these two
    # lookups was dead code; a plain guard clause below covers the None case.
    USER_UNAME = os.getenv("TWITTER_USERNAME")
    USER_PASSWORD = os.getenv("TWITTER_PASSWORD")

    parser = argparse.ArgumentParser(description="Twitter Scraper")
    parser.add_argument(
        "--tweets",
        type=int,
        default=50,
        help="Number of tweets to scrape (default: 50)",
    )
    args = parser.parse_args()

    if USER_UNAME is None or USER_PASSWORD is None:
        print("Missing Twitter username or password environment variables. Please check your .env file.")
        sys.exit(1)

    try:
        scraper = Twitter_Scraper(
            username=USER_UNAME,
            password=USER_PASSWORD,
            max_tweets=args.tweets,
        )
        scraper.scrape_tweets()
        scraper.driver.close()
        scraper.save_to_csv()
    except KeyboardInterrupt:
        print("\nScript Interrupted by user. Exiting...")
        sys.exit(1)


if __name__ == "__main__":
    main()


# --- scraper/progress.py -----------------------------------------------
class Progress:
    """Renders a single-line, in-place console progress bar."""

    BAR_LENGTH = 40  # characters inside the brackets

    def __init__(self, current, total) -> None:
        self.current = current
        self.total = total

    def print_progress(self, current) -> None:
        """Redraw the bar to reflect *current* of *total* items done."""
        self.current = current
        fraction = current / self.total
        filled = int(self.BAR_LENGTH * fraction)
        bar = "=" * filled + "-" * (self.BAR_LENGTH - filled)
        # BUG FIX: the original wrapped the already-bracketed string in
        # "[{:<40}]", which printed double brackets ("[[====]]") as seen in
        # the notebook output.  Print one pair of brackets only.
        sys.stdout.write(
            "\rProgress: [{}] {:.2%} {} of {}".format(bar, fraction, current, self.total)
        )
        sys.stdout.flush()


# --- scraper/scroller.py -----------------------------------------------
class Scroller:
    """Tracks and drives the window scroll position on infinite-scroll pages."""

    def __init__(self, driver) -> None:
        self.driver = driver
        self.current_position = 0
        self.last_position = driver.execute_script("return window.pageYOffset;")
        self.scrolling = True   # main scrape loop runs while this is True
        self.scroll_count = 0   # consecutive scrolls that did not move the page

    def reset(self) -> None:
        """Forget scroll history (used after re-navigating to the feed)."""
        self.current_position = 0
        self.last_position = self.driver.execute_script("return window.pageYOffset;")
        self.scroll_count = 0

    def scroll_to_top(self) -> None:
        self.driver.execute_script("window.scrollTo(0, 0);")

    def scroll_to_bottom(self) -> None:
        self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

    def update_scroll_position(self) -> None:
        self.current_position = self.driver.execute_script("return window.pageYOffset;")


# --- scraper/tweet.py --------------------------------------------------
from selenium.common.exceptions import NoSuchElementException


class Tweet:
    """Extracts the interesting fields from one tweet <article> card.

    After construction, ``is_ad`` tells the caller whether the card is a
    promoted/unparseable tweet (callers skip those); for ordinary tweets
    ``tweet`` holds the scraped fields as a tuple:
    (user, handle, datetime, verified, content, replies, retweets, likes).
    """

    # NOTE: *card* is a selenium WebElement (the original annotated it as
    # ``Chrome``, which is the driver type, not the element type).
    def __init__(self, card) -> None:
        self.card = card
        # BUG FIX: default these up front.  The original returned early on a
        # missing handle / missing <time> and left ``is_ad`` (and ``tweet``)
        # undefined, so the caller's ``tweet.is_ad`` raised AttributeError.
        # Treating an unparseable card as an ad makes the caller skip it.
        self.is_ad = True
        self.tweet = None

        self.user = card.find_element(
            "xpath", './/div[@data-testid="User-Name"]//span'
        ).text

        try:
            self.handle = card.find_element(
                "xpath", './/span[contains(text(), "@")]'
            ).text
        except NoSuchElementException:
            return

        try:
            # Promoted tweets carry no <time> element.
            self.date_time = card.find_element(
                "xpath", ".//time"
            ).get_attribute("datetime")
            if self.date_time is not None:
                self.is_ad = False
        except NoSuchElementException:
            self.is_ad = True
            return

        try:
            card.find_element(
                "xpath", './/*[local-name()="svg" and @data-testid="icon-verified"]'
            )
            self.verified = True
        except NoSuchElementException:
            self.verified = False

        # Tweet text is split across <span>/<a> children; concatenate them.
        parts = card.find_elements(
            "xpath",
            './/div[@data-testid="tweetText"]/span | .//div[@data-testid="tweetText"]/a',
        )
        self.content = "".join(part.text for part in parts)

        self.reply_cnt = self._counter(card, "reply")
        self.retweet_cnt = self._counter(card, "retweet")
        self.like_cnt = self._counter(card, "like")

        self.tweet = (
            self.user,
            self.handle,
            self.date_time,
            self.verified,
            self.content,
            self.reply_cnt,
            self.retweet_cnt,
            self.like_cnt,
        )

    @staticmethod
    def _counter(card, testid):
        """Return the engagement counter text for *testid*, '0' when absent."""
        try:
            return card.find_element(
                "xpath", f'.//div[@data-testid="{testid}"]//span'
            ).text
        except NoSuchElementException:
            return "0"


# --- scraper/twitter_scraper.py (import header; class follows) ---------
import pandas as pd

from progress import Progress
from scroller import Scroller
from tweet import Tweet

from datetime import datetime
from fake_headers import Headers
from time import sleep
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import NoSuchElementException, StaleElementReferenceException, WebDriverException

from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService

from webdriver_manager.chrome import ChromeDriverManager

TWITTER_LOGIN_URL = "https://twitter.com/i/flow/login"


class Twitter_Scraper:
    """Logs in to Twitter with Selenium and scrapes the home timeline.

    Typical use::

        scraper = Twitter_Scraper(username, password, max_tweets=100)
        scraper.scrape_tweets()
        scraper.save_to_csv()
    """

    def __init__(self, username, password, max_tweets=50):
        print("Initializing Twitter Scraper...")
        self.username = username
        self.password = password
        self.data = []          # scraped tuples (see Tweet.tweet)
        self.tweet_ids = set()  # de-dup keys for cards already processed
        self.max_tweets = max_tweets
        self.progress = Progress(0, max_tweets)
        self.tweet_cards = []
        self.driver = self._get_driver()
        self.scroller = Scroller(self.driver)
        self._login()

    def _get_driver(self):
        """Create a headless Chrome driver with a randomized User-Agent.

        Exits the process with status 1 when the driver cannot be set up.
        """
        print("Setup WebDriver...")
        user_agent = Headers().generate()["User-Agent"]

        options = ChromeOptions()
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--ignore-certificate-errors")
        options.add_argument("--disable-gpu")
        options.add_argument("--log-level=3")
        options.add_argument("--disable-notifications")
        options.add_argument("--disable-popup-blocking")
        options.add_argument("--user-agent={}".format(user_agent))

        # For Hiding Browser
        options.add_argument("--headless")

        try:
            print("Downloading ChromeDriver...")
            chromedriver_path = ChromeDriverManager().install()
            chrome_service = ChromeService(executable_path=chromedriver_path)

            print("Initializing ChromeDriver...")
            return webdriver.Chrome(service=chrome_service, options=options)
        except Exception as e:
            print(f"Error setting up WebDriver: {e}")
            sys.exit(1)

    def _login(self):
        """Walk the three-step Twitter login flow (username, challenge, password)."""
        print("Logging in to Twitter...")
        self.driver.get(TWITTER_LOGIN_URL)
        self.driver.maximize_window()
        sleep(3)

        self._input_username()
        self._input_unusual_activity()
        self._input_password()

    def _input_username(self):
        """Type the username; retry up to 3 times, then abort the script."""
        input_attempt = 0

        while True:
            try:
                username = self.driver.find_element(
                    "xpath", "//input[@autocomplete='username']"
                )
                username.send_keys(self.username)
                username.send_keys(Keys.RETURN)
                sleep(3)
                break
            except NoSuchElementException:
                input_attempt += 1
                if input_attempt >= 3:
                    print()
                    # BUG FIX: the original opened this literal with four
                    # quotes (print(""""), printing a stray leading quote.
                    print("""
There was an error inputting the username.

It may be due to the following:
- Internet connection is unstable
- Username is incorrect
- Twitter is experiencing unusual activity
""")
                    self.driver.quit()
                    # was bare exit(); sys.exit is consistent with _get_driver
                    # and works outside the interactive interpreter.
                    sys.exit(1)
                else:
                    print("Re-attempting to input username...")

    def _input_unusual_activity(self):
        """Handle the optional "unusual activity" interstitial.

        Twitter sometimes asks for the username again here; when the field
        never appears we simply move on after 3 attempts (it is optional).
        """
        input_attempt = 0

        while True:
            try:
                unusual_activity = self.driver.find_element(
                    "xpath", "//input[@data-testid='ocfEnterTextTextInput']"
                )
                unusual_activity.send_keys(self.username)
                unusual_activity.send_keys(Keys.RETURN)
                sleep(3)
                break
            except NoSuchElementException:
                input_attempt += 1
                if input_attempt >= 3:
                    break

    def _input_password(self):
        """Type the password; retry up to 3 times, then abort the script."""
        input_attempt = 0

        while True:
            try:
                password = self.driver.find_element(
                    "xpath", "//input[@autocomplete='current-password']"
                )
                password.send_keys(self.password)
                password.send_keys(Keys.RETURN)
                sleep(3)
                break
            except NoSuchElementException:
                input_attempt += 1
                if input_attempt >= 3:
                    print()
                    print("""
There was an error inputting the password.

It may be due to the following:
- Internet connection is unstable
- Password is incorrect
- Twitter is experiencing unusual activity
""")
                    self.driver.quit()
                    sys.exit(1)  # was bare exit(); see _input_username
                else:
                    print("Re-attempting to input password...")

    def go_to_home(self):
        """Navigate the driver to the home timeline."""
        self.driver.get("https://twitter.com/home")
        sleep(3)

    def get_tweets(self):
        """Refresh self.tweet_cards with the tweet <article> elements on page."""
        self.tweet_cards = self.driver.find_elements(
            "xpath", '//article[@data-testid="tweet"]'
        )

    def scrape_tweets(self, callback=None):
        """Scroll the timeline collecting tweets until max_tweets is reached.

        callback -- navigation function used to (re)load the feed; defaults
        to go_to_home.  It is re-invoked whenever scrolling stalls three
        times in a row, which forces Twitter to serve fresh content.
        """
        if callback is None:
            callback = self.go_to_home
        callback()

        print("Scraping Tweets...")
        print()

        self.progress.print_progress(0)

        try:
            while self.scroller.scrolling:
                self.get_tweets()

                # Only the newest cards matter; earlier ones were handled in
                # a previous pass.
                for card in self.tweet_cards[-15:]:
                    # str(card) embeds the selenium element id -- a cheap
                    # per-session de-dup key.  NOTE(review): ids can be
                    # reused after a page reload; confirm acceptable.
                    tweet_id = str(card)
                    if tweet_id in self.tweet_ids:
                        continue
                    self.tweet_ids.add(tweet_id)

                    tweet = Tweet(card)
                    if tweet and not tweet.is_ad:
                        self.data.append(tweet.tweet)
                        self.progress.print_progress(len(self.data))

                        if len(self.data) >= self.max_tweets:
                            self.scroller.scrolling = False
                            break

                    # Breathe every 50 tweets to reduce rate limiting.
                    if len(self.data) % 50 == 0:
                        sleep(2)

                if len(self.data) >= self.max_tweets:
                    break

                self.scroller.scroll_count = 0
                while True:
                    self.scroller.scroll_to_bottom()
                    sleep(2)
                    self.scroller.update_scroll_position()

                    if self.scroller.last_position == self.scroller.current_position:
                        # Page did not move: after 3 stalls, reload the feed
                        # via callback and reset the scroll bookkeeping.
                        self.scroller.scroll_count += 1
                        if self.scroller.scroll_count >= 3:
                            callback()
                            sleep(2)
                            self.scroller.reset()
                            break
                        else:
                            sleep(2)
                    else:
                        self.scroller.last_position = self.scroller.current_position
                        break

            print("\n")
            print("Scraping Complete")
        except StaleElementReferenceException:
            # The DOM changed under us mid-read; keep whatever was collected.
            print("\n")
            print("Scraping Incomplete")

        print("Tweets: {} out of {}\n".format(len(self.data), self.max_tweets))

    def save_to_csv(self):
        """Write scraped tweets to ./tweets/<timestamp>_tweets_1-<N>.csv."""
        print("Saving Tweets to CSV...")
        folder_path = "./tweets/"

        if not os.path.exists(folder_path):
            os.makedirs(folder_path)
            print("Created Folder: {}".format(folder_path))

        # self.data rows are already in column order (see Tweet.tweet), so
        # build the frame directly instead of eight per-column list builds.
        columns = [
            "Name", "Handle", "Timestamp", "Verified",
            "Content", "Comments", "Retweets", "Likes",
        ]
        df = pd.DataFrame(self.data, columns=columns)

        current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        file_path = f"{folder_path}{current_time}_tweets_1-{len(self.data)}.csv"
        df.to_csv(file_path, index=False)

        print("CSV Saved: {}".format(file_path))