From b43fb72dbdc268bd53441c7aac675c6231f49c7e Mon Sep 17 00:00:00 2001 From: Jarrian Date: Sat, 9 Sep 2023 14:19:13 +0800 Subject: [PATCH] add error handling and script runs until unexpected error or max reached --- scraper/__main__.py | 59 ++++++++++++++--------- scraper/progress.py | 13 ++++-- scraper/scroller.py | 53 +++++++++++---------- scraper/tweet.py | 49 ++++++++------------ scraper/twitter_scraper.py | 95 ++++++++++++++++++++------------------ 5 files changed, 142 insertions(+), 127 deletions(-) diff --git a/scraper/__main__.py b/scraper/__main__.py index 6dc9934..f82b40d 100644 --- a/scraper/__main__.py +++ b/scraper/__main__.py @@ -1,6 +1,7 @@ import os import sys import argparse +import getpass from twitter_scraper import Twitter_Scraper try: @@ -16,37 +17,49 @@ except Exception as e: def main(): try: - USER_UNAME = os.getenv('TWITTER_USERNAME') - USER_PASSWORD = os.getenv('TWITTER_PASSWORD') - except Exception as e: - print(f"Error retrieving environment variables: {e}") - USER_UNAME = None - USER_PASSWORD = None - sys.exit(1) - - parser = argparse.ArgumentParser(description='Twitter Scraper') - parser.add_argument('--tweets', type=int, default=50, - help='Number of tweets to scrape (default: 50)') - args = parser.parse_args() - - if USER_UNAME is not None and USER_PASSWORD is not None: try: + USER_UNAME = os.getenv("TWITTER_USERNAME") + USER_PASSWORD = os.getenv("TWITTER_PASSWORD") + except Exception as e: + print(f"Error retrieving environment variables: {e}") + USER_UNAME = None + USER_PASSWORD = None + sys.exit(1) + + if USER_UNAME is None: + USER_UNAME = input("Twitter Username: ") + + if USER_PASSWORD is None: + USER_PASSWORD = getpass.getpass("Enter Password: ") + + print() + + parser = argparse.ArgumentParser(description="Twitter Scraper") + parser.add_argument( + "--tweets", + type=int, + default=50, + help="Number of tweets to scrape (default: 50)", + ) + args = parser.parse_args() + + if USER_UNAME is not None and USER_PASSWORD is not None: scraper = Twitter_Scraper( - username=USER_UNAME, - password=USER_PASSWORD, - max_tweets=args.tweets + username=USER_UNAME, password=USER_PASSWORD, max_tweets=args.tweets ) scraper.scrape_tweets() - scraper.driver.close() scraper.save_to_csv() - except KeyboardInterrupt: - print("\nScript Interrupted by user. Exiting...") + scraper.driver.close() + else: + print( + "Missing Twitter username or password environment variables. Please check your .env file." + ) sys.exit(1) - else: - print("Missing Twitter username or password environment variables. Please check your .env file.") + except KeyboardInterrupt: + print("\nScript Interrupted by user. Exiting...") sys.exit(1) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/scraper/progress.py b/scraper/progress.py index 5501d62..9af9bd4 100644 --- a/scraper/progress.py +++ b/scraper/progress.py @@ -11,8 +11,15 @@ class Progress: self.current = current progress = current / self.total bar_length = 40 - progress_bar = "[" + "=" * int(bar_length * progress) + \ - "-" * (bar_length - int(bar_length * progress)) + "]" + progress_bar = ( + "[" + + "=" * int(bar_length * progress) + + "-" * (bar_length - int(bar_length * progress)) + + "]" + ) sys.stdout.write( - "\rProgress: [{:<40}] {:.2%} {} of {}".format(progress_bar, progress, current, self.total)) + "\rProgress: [{:<40}] {:.2%} {} of {}".format( + progress_bar, progress, current, self.total + ) + ) sys.stdout.flush() diff --git a/scraper/scroller.py b/scraper/scroller.py index a8cd845..24ea636 100644 --- a/scraper/scroller.py +++ b/scraper/scroller.py @@ -1,27 +1,26 @@ -class Scroller(): - def __init__(self, driver) -> None: - self.driver = driver - self.current_position = 0 - self.last_position = driver.execute_script("return window.pageYOffset;") - self.scrolling = True - self.scroll_count = 0 - pass - - def reset(self) -> None: - self.current_position = 0 - self.last_position = self.driver.execute_script("return window.pageYOffset;") - self.scroll_count = 0 - pass - - def scroll_to_top(self) -> None: - self.driver.execute_script("window.scrollTo(0, 0);") - pass - - def scroll_to_bottom(self) -> None: - self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") - pass - - def update_scroll_position(self) -> None: - self.current_position = self.driver.execute_script("return window.pageYOffset;") - pass - \ No newline at end of file +class Scroller: + def __init__(self, driver) -> None: + self.driver = driver + self.current_position = 0 + self.last_position = driver.execute_script("return window.pageYOffset;") + self.scrolling = True + self.scroll_count = 0 + pass + + def reset(self) -> None: + self.current_position = 0 + self.last_position = self.driver.execute_script("return window.pageYOffset;") + self.scroll_count = 0 + pass + + def scroll_to_top(self) -> None: + self.driver.execute_script("window.scrollTo(0, 0);") + pass + + def scroll_to_bottom(self) -> None: + self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") + pass + + def update_scroll_position(self) -> None: + self.current_position = self.driver.execute_script("return window.pageYOffset;") + pass diff --git a/scraper/tweet.py b/scraper/tweet.py index fe0bcfa..3cb1c44 100644 --- a/scraper/tweet.py +++ b/scraper/tweet.py @@ -7,23 +7,20 @@ class Tweet: self.card = card self.user = card.find_element( - 'xpath', - './/div[@data-testid="User-Name"]//span' + "xpath", './/div[@data-testid="User-Name"]//span' ).text try: self.handle = card.find_element( - 'xpath', - './/span[contains(text(), "@")]' + "xpath", './/span[contains(text(), "@")]' ).text except NoSuchElementException: return try: - self.date_time = card.find_element( - 'xpath', - './/time' - ).get_attribute('datetime') + self.date_time = card.find_element("xpath", ".//time").get_attribute( + "datetime" + ) if self.date_time is not None: self.is_ad = False @@ -33,8 +30,7 @@ class Tweet: try: card.find_element( - 'xpath', - './/*[local-name()="svg" and @data-testid="icon-verified"]' + "xpath", './/*[local-name()="svg" and @data-testid="icon-verified"]' ) self.verified = True @@ -43,8 +39,8 @@ class Tweet: self.content = "" contents = card.find_elements( - 'xpath', - './/div[@data-testid="tweetText"]/span | .//div[@data-testid="tweetText"]/a' + "xpath", + './/div[@data-testid="tweetText"]/span | .//div[@data-testid="tweetText"]/a', ) for index, content in enumerate(contents): @@ -52,43 +48,38 @@ class Tweet: try: self.reply_cnt = card.find_element( - 'xpath', - './/div[@data-testid="reply"]//span' + "xpath", './/div[@data-testid="reply"]//span' ).text except NoSuchElementException: - self.reply_cnt = '0' + self.reply_cnt = "0" try: self.retweet_cnt = card.find_element( - 'xpath', - './/div[@data-testid="retweet"]//span' + "xpath", './/div[@data-testid="retweet"]//span' ).text except NoSuchElementException: - self.retweet_cnt = '0' + self.retweet_cnt = "0" try: self.like_cnt = card.find_element( - 'xpath', - './/div[@data-testid="like"]//span' + "xpath", './/div[@data-testid="like"]//span' ).text except NoSuchElementException: - self.like_cnt = '0' + self.like_cnt = "0" try: self.analytics_cnt = card.find_element( - 'xpath', - './/a[contains(@href, "/analytics")]//span' + "xpath", './/a[contains(@href, "/analytics")]//span' ).text except NoSuchElementException: - self.analytics_cnt = '0' + self.analytics_cnt = "0" try: self.profile_img = card.find_element( - 'xpath', - './/div[@data-testid="Tweet-User-Avatar"]//img' - ).get_attribute('src') + "xpath", './/div[@data-testid="Tweet-User-Avatar"]//img' + ).get_attribute("src") except NoSuchElementException: - self.profile_img = '' + self.profile_img = "" self.tweet = ( self.user, @@ -100,7 +91,7 @@ class Tweet: self.retweet_cnt, self.like_cnt, self.analytics_cnt, - self.profile_img + self.profile_img, ) pass diff --git a/scraper/twitter_scraper.py b/scraper/twitter_scraper.py index b3df4c9..3543da6 100644 --- a/scraper/twitter_scraper.py +++ b/scraper/twitter_scraper.py @@ -10,7 +10,11 @@ from fake_headers import Headers from time import sleep from selenium import webdriver from selenium.webdriver.common.keys import Keys -from selenium.common.exceptions import NoSuchElementException, StaleElementReferenceException, WebDriverException +from selenium.common.exceptions import ( + NoSuchElementException, + StaleElementReferenceException, + WebDriverException, +) from selenium.webdriver.chrome.options import Options as ChromeOptions from selenium.webdriver.chrome.service import Service as ChromeService @@ -20,7 +24,7 @@ from webdriver_manager.chrome import ChromeDriverManager TWITTER_LOGIN_URL = "https://twitter.com/i/flow/login" -class Twitter_Scraper(): +class Twitter_Scraper: def __init__(self, username, password, max_tweets=50): print("Initializing Twitter Scraper...") self.username = username @@ -36,17 +40,17 @@ class Twitter_Scraper(): def _get_driver(self): print("Setup WebDriver...") - header = Headers().generate()['User-Agent'] + header = Headers().generate()["User-Agent"] browser_option = ChromeOptions() - browser_option.add_argument('--no-sandbox') + browser_option.add_argument("--no-sandbox") browser_option.add_argument("--disable-dev-shm-usage") - browser_option.add_argument('--ignore-certificate-errors') - browser_option.add_argument('--disable-gpu') - browser_option.add_argument('--log-level=3') - browser_option.add_argument('--disable-notifications') - browser_option.add_argument('--disable-popup-blocking') - browser_option.add_argument('--user-agent={}'.format(header)) + browser_option.add_argument("--ignore-certificate-errors") + browser_option.add_argument("--disable-gpu") + browser_option.add_argument("--log-level=3") + browser_option.add_argument("--disable-notifications") + browser_option.add_argument("--disable-popup-blocking") + browser_option.add_argument("--user-agent={}".format(header)) # For Hiding Browser browser_option.add_argument("--headless") @@ -62,8 +66,7 @@ class Twitter_Scraper(): try: print("Downloading ChromeDriver...") chromedriver_path = ChromeDriverManager().install() - chrome_service = ChromeService( - executable_path=chromedriver_path) + chrome_service = ChromeService(executable_path=chromedriver_path) print("Initializing ChromeDriver...") driver = webdriver.Chrome( @@ -96,8 +99,7 @@ class Twitter_Scraper(): while True: try: username = self.driver.find_element( - "xpath", - "//input[@autocomplete='username']" + "xpath", "//input[@autocomplete='username']" ) username.send_keys(self.username) @@ -108,18 +110,19 @@ class Twitter_Scraper(): input_attempt += 1 if input_attempt >= 3: print() - print(""" -There was an error inputting the username. + print( + """There was an error inputting the username. It may be due to the following: - Internet connection is unstable - Username is incorrect -- Twitter is experiencing unusual activity - """) +- Twitter is experiencing unusual activity""" + ) self.driver.quit() sys.exit(1) else: print("Re-attempting to input username...") + sleep(2) def _input_unusual_activity(self): input_attempt = 0 @@ -127,8 +130,7 @@ It may be due to the following: while True: try: unusual_activity = self.driver.find_element( - "xpath", - "//input[@data-testid='ocfEnterTextTextInput']" + "xpath", "//input[@data-testid='ocfEnterTextTextInput']" ) unusual_activity.send_keys(self.username) unusual_activity.send_keys(Keys.RETURN) @@ -145,8 +147,7 @@ It may be due to the following: while True: try: password = self.driver.find_element( - "xpath", - "//input[@autocomplete='current-password']" + "xpath", "//input[@autocomplete='current-password']" ) password.send_keys(self.password) @@ -157,18 +158,19 @@ It may be due to the following: input_attempt += 1 if input_attempt >= 3: print() - print(""" -There was an error inputting the password. + print( + """There was an error inputting the password. It may be due to the following: - Internet connection is unstable - Password is incorrect -- Twitter is experiencing unusual activity - """) +- Twitter is experiencing unusual activity""" + ) self.driver.quit() sys.exit(1) else: print("Re-attempting to input password...") + sleep(2) def go_to_home(self): self.driver.get("https://twitter.com/home") @@ -177,8 +179,7 @@ It may be due to the following: def get_tweets(self): self.tweet_cards = self.driver.find_elements( - 'xpath', - '//article[@data-testid="tweet"]' + "xpath", '//article[@data-testid="tweet"]' ) pass @@ -191,8 +192,8 @@ It may be due to the following: print("Scraping Tweets...") self.progress.print_progress(0) - try: - while self.scroller.scrolling: + while self.scroller.scrolling: + try: self.get_tweets() for card in self.tweet_cards[-15:]: @@ -235,11 +236,15 @@ It may be due to the following: else: self.scroller.last_position = self.scroller.current_position break + except StaleElementReferenceException: + callback() + sleep(2) - print("\n") + print("\n") + + if len(self.data) >= self.max_tweets: print("Scraping Complete") - except StaleElementReferenceException: - print("\n") + else: print("Scraping Incomplete") print("Tweets: {} out of {}\n".format(len(self.data), self.max_tweets)) @@ -249,29 +254,29 @@ It may be due to the following: def save_to_csv(self): print("Saving Tweets to CSV...") now = datetime.now() - folder_path = './tweets/' + folder_path = "./tweets/" if not os.path.exists(folder_path): os.makedirs(folder_path) print("Created Folder: {}".format(folder_path)) data = { - 'Name': [tweet[0] for tweet in self.data], - 'Handle': [tweet[1] for tweet in self.data], - 'Timestamp': [tweet[2] for tweet in self.data], - 'Verified': [tweet[3] for tweet in self.data], - 'Content': [tweet[4] for tweet in self.data], - 'Comments': [tweet[5] for tweet in self.data], - 'Retweets': [tweet[6] for tweet in self.data], - 'Likes': [tweet[7] for tweet in self.data], - 'Analytics': [tweet[8] for tweet in self.data], - 'Profile Image': [tweet[9] for tweet in self.data], + "Name": [tweet[0] for tweet in self.data], + "Handle": [tweet[1] for tweet in self.data], + "Timestamp": [tweet[2] for tweet in self.data], + "Verified": [tweet[3] for tweet in self.data], + "Content": [tweet[4] for tweet in self.data], + "Comments": [tweet[5] for tweet in self.data], + "Retweets": [tweet[6] for tweet in self.data], + "Likes": [tweet[7] for tweet in self.data], + "Analytics": [tweet[8] for tweet in self.data], + "Profile Image": [tweet[9] for tweet in self.data], } df = pd.DataFrame(data) current_time = now.strftime("%Y-%m-%d_%H-%M-%S") - file_path = f'{folder_path}{current_time}_tweets_1-{len(self.data)}.csv' + file_path = f"{folder_path}{current_time}_tweets_1-{len(self.data)}.csv" df.to_csv(file_path, index=False) print("CSV Saved: {}".format(file_path))