import os
import sys

import pandas as pd
from progress import Progress
from scroller import Scroller
from tweet import Tweet
from datetime import datetime
from fake_headers import Headers
from time import sleep
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import (
    NoSuchElementException,
    StaleElementReferenceException,
    WebDriverException,
)
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.firefox.service import Service as FirefoxService
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import GeckoDriverManager

TWITTER_LOGIN_URL = "https://twitter.com/i/flow/login"


class Twitter_Scraper:
    """Scrapes tweets from Twitter/X via a Selenium-driven Firefox browser."""

    def __init__(
        self,
        mail,
        username,
        password,
        headlessState,
        max_tweets=50,
        scrape_username=None,
        scrape_hashtag=None,
        scrape_query=None,
        scrape_bookmarks=False,
        scrape_poster_details=False,
        scrape_latest=True,
        scrape_top=False,
        proxy=None,
        # Appended with a default so existing positional callers keep working.
        scrape_list=None,
    ):
        """Set up credentials, the WebDriver, and the initial scrape target.

        Args:
            mail: Account e-mail (kept for the unusual-activity prompt).
            username: Twitter handle used to log in.
            password: Account password.
            headlessState: 'yes' to run the browser headless.
            max_tweets: Stop after this many tweets (unless unlimited).
            scrape_username / scrape_hashtag / scrape_query / scrape_bookmarks /
            scrape_list: Mutually preferred scrape targets (first set wins).
            scrape_poster_details: Also collect poster ID/following/followers.
            scrape_latest / scrape_top: Which search tab to scrape.
            proxy: Optional "host:port" proxy for the browser.
        """
        print("Initializing Twitter Scraper...")
        self.mail = mail
        self.username = username
        self.password = password
        self.headlessState = headlessState
        self.interrupted = False
        self.tweet_ids = set()
        self.data = []
        self.tweet_cards = []
        self.scraper_details = {
            "type": None,
            "username": None,
            "hashtag": None,
            "bookmarks": False,
            "query": None,
            "list": None,
            "tab": None,
            "poster_details": False,
        }
        self.max_tweets = max_tweets
        self.progress = Progress(0, max_tweets)
        self.router = self.go_to_home
        self.driver = self._get_driver(proxy)
        self.actions = ActionChains(self.driver)
        self.scroller = Scroller(self.driver)
        # BUG FIX: the original passed these positionally, but _config_scraper
        # has a `scrape_list` parameter between `scrape_query` and
        # `scrape_latest`, so scrape_latest/scrape_top/scrape_poster_details
        # each landed one slot early. Keyword arguments make the binding exact.
        self._config_scraper(
            max_tweets=max_tweets,
            scrape_username=scrape_username,
            scrape_hashtag=scrape_hashtag,
            scrape_bookmarks=scrape_bookmarks,
            scrape_query=scrape_query,
            scrape_list=scrape_list,
            scrape_latest=scrape_latest,
            scrape_top=scrape_top,
            scrape_poster_details=scrape_poster_details,
        )
def _config_scraper( self, max_tweets=50, scrape_username=None, scrape_hashtag=None, scrape_bookmarks=False, scrape_query=None, scrape_list=None, scrape_latest=True, scrape_top=False, scrape_poster_details=False, ): self.tweet_ids = set() self.data = [] self.tweet_cards = [] self.max_tweets = max_tweets self.progress = Progress(0, max_tweets) self.scraper_details = { "type": None, "username": scrape_username, "hashtag": str(scrape_hashtag).replace("#", "") if scrape_hashtag is not None else None, "bookmarks": scrape_bookmarks, "query": scrape_query, "list": scrape_list, "tab": "Latest" if scrape_latest else "Top" if scrape_top else "Latest", "poster_details": scrape_poster_details, } self.router = self.go_to_home self.scroller = Scroller(self.driver) if scrape_username is not None: self.scraper_details["type"] = "Username" self.router = self.go_to_profile elif scrape_hashtag is not None: self.scraper_details["type"] = "Hashtag" self.router = self.go_to_hashtag elif scrape_bookmarks is not False: self.scraper_details["type"] = "Bookmarks" self.router = self.go_to_bookmarks elif scrape_query is not None: self.scraper_details["type"] = "Query" self.router = self.go_to_search elif scrape_list is not None: self.scraper_details["type"] = "List" self.router = self.go_to_list else: self.scraper_details["type"] = "Home" self.router = self.go_to_home pass def _get_driver( self, proxy=None, ): print("Setup WebDriver...") # header = Headers().generate()["User-Agent"] # User agent of a andoird smartphone device header="Mozilla/5.0 (Linux; Android 11; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5414.87 Mobile Safari/537.36" # browser_option = ChromeOptions() browser_option = FirefoxOptions() browser_option.add_argument("--no-sandbox") browser_option.add_argument("--disable-dev-shm-usage") browser_option.add_argument("--ignore-certificate-errors") browser_option.add_argument("--disable-gpu") browser_option.add_argument("--log-level=3") 
browser_option.add_argument("--disable-notifications") browser_option.add_argument("--disable-popup-blocking") browser_option.add_argument("--user-agent={}".format(header)) if proxy is not None: browser_option.add_argument("--proxy-server=%s" % proxy) # Option to hide browser or not # If not yes then skips the headless if self.headlessState == 'yes': # For Hiding Browser browser_option.add_argument("--headless") try: # print("Initializing ChromeDriver...") # driver = webdriver.Chrome( # options=browser_option, # ) print("Initializing FirefoxDriver...") driver = webdriver.Firefox( options=browser_option, ) print("WebDriver Setup Complete") return driver except WebDriverException: try: # print("Downloading ChromeDriver...") # chromedriver_path = ChromeDriverManager().install() # chrome_service = ChromeService(executable_path=chromedriver_path) print("Downloading FirefoxDriver...") firefoxdriver_path = GeckoDriverManager().install() firefox_service = FirefoxService(executable_path=firefoxdriver_path) # print("Initializing ChromeDriver...") # driver = webdriver.Chrome( # service=chrome_service, # options=browser_option, # ) print("Initializing FirefoxDriver...") driver = webdriver.Firefox( service=firefox_service, options=browser_option, ) print("WebDriver Setup Complete") return driver except Exception as e: print(f"Error setting up WebDriver: {e}") sys.exit(1) pass def login(self): print() print("Logging in to Twitter...") try: self.driver.maximize_window() self.driver.execute_script("document.body.style.zoom='150%'") #set zoom to 150% self.driver.get(TWITTER_LOGIN_URL) sleep(3) self._input_username() self._input_unusual_activity() self._input_password() cookies = self.driver.get_cookies() auth_token = None for cookie in cookies: if cookie["name"] == "auth_token": auth_token = cookie["value"] break if auth_token is None: raise ValueError( """This may be due to the following: - Internet connection is unstable - Username is incorrect - Password is incorrect """ ) 
print() print("Login Successful") print() except Exception as e: print() print(f"Login Failed: {e}") sys.exit(1) pass def _input_username(self): input_attempt = 0 while True: try: username = self.driver.find_element( "xpath", "//input[@autocomplete='username']" ) username.send_keys(self.username) username.send_keys(Keys.RETURN) sleep(3) break except NoSuchElementException: input_attempt += 1 if input_attempt >= 3: print() print( """There was an error inputting the username. It may be due to the following: - Internet connection is unstable - Username is incorrect - Twitter is experiencing unusual activity""" ) self.driver.quit() sys.exit(1) else: print("Re-attempting to input username...") sleep(2) def _input_unusual_activity(self): input_attempt = 0 while True: try: unusual_activity = self.driver.find_element( "xpath", "//input[@data-testid='ocfEnterTextTextInput']" ) unusual_activity.send_keys(self.username) unusual_activity.send_keys(Keys.RETURN) sleep(3) break except NoSuchElementException: input_attempt += 1 if input_attempt >= 3: break def _input_password(self): input_attempt = 0 while True: try: password = self.driver.find_element( "xpath", "//input[@autocomplete='current-password']" ) password.send_keys(self.password) password.send_keys(Keys.RETURN) sleep(3) break except NoSuchElementException: input_attempt += 1 if input_attempt >= 3: print() print( """There was an error inputting the password. 
It may be due to the following: - Internet connection is unstable - Password is incorrect - Twitter is experiencing unusual activity""" ) self.driver.quit() sys.exit(1) else: print("Re-attempting to input password...") sleep(2) def go_to_home(self): self.driver.get("https://twitter.com/home") sleep(3) pass def go_to_profile(self): if ( self.scraper_details["username"] is None or self.scraper_details["username"] == "" ): print("Username is not set.") sys.exit(1) else: self.driver.get(f"https://twitter.com/{self.scraper_details['username']}") sleep(3) pass def go_to_hashtag(self): if ( self.scraper_details["hashtag"] is None or self.scraper_details["hashtag"] == "" ): print("Hashtag is not set.") sys.exit(1) else: url = f"https://twitter.com/hashtag/{self.scraper_details['hashtag']}?src=hashtag_click" if self.scraper_details["tab"] == "Latest": url += "&f=live" self.driver.get(url) sleep(3) pass def go_to_bookmarks(self): if ( self.scraper_details["bookmarks"] is False or self.scraper_details["bookmarks"] == "" ): print("Bookmarks is not set.") sys.exit(1) else: url = f"https://twitter..com/i/bookmarks" self.driver.get(url) sleep(3) pass def go_to_search(self): if self.scraper_details["query"] is None or self.scraper_details["query"] == "": print("Query is not set.") sys.exit(1) else: url = f"https://twitter.com/search?q={self.scraper_details['query']}&src=typed_query" if self.scraper_details["tab"] == "Latest": url += "&f=live" self.driver.get(url) sleep(3) pass def go_to_list(self): if self.scraper_details["list"] is None or self.scraper_details["list"] == "": print("List is not set.") sys.exit(1) else: url = f"https://x.com/i/lists/{self.scraper_details['list']}" self.driver.get(url) sleep(3) pass def get_tweet_cards(self): self.tweet_cards = self.driver.find_elements( "xpath", '//article[@data-testid="tweet" and not(@disabled)]' ) pass def remove_hidden_cards(self): try: hidden_cards = self.driver.find_elements( "xpath", '//article[@data-testid="tweet" and 
@disabled]' ) for card in hidden_cards[1:-2]: self.driver.execute_script( "arguments[0].parentNode.parentNode.parentNode.remove();", card ) except Exception as e: return pass def scrape_tweets( self, max_tweets=50, no_tweets_limit=False, scrape_username=None, scrape_hashtag=None, scrape_bookmarks=False, scrape_query=None, scrape_list=None, scrape_latest=True, scrape_top=False, scrape_poster_details=False, router=None, ): self._config_scraper( max_tweets, scrape_username, scrape_hashtag, scrape_bookmarks, scrape_query, scrape_list, scrape_latest, scrape_top, scrape_poster_details, ) if router is None: router = self.router router() if self.scraper_details["type"] == "Username": print( "Scraping Tweets from @{}...".format(self.scraper_details["username"]) ) elif self.scraper_details["type"] == "Hashtag": print( "Scraping {} Tweets from #{}...".format( self.scraper_details["tab"], self.scraper_details["hashtag"] ) ) elif self.scraper_details["type"] == "Bookmarks": print( "Scraping Tweets from bookmarks...".format(self.scraper_details["username"])) elif self.scraper_details["type"] == "Query": print( "Scraping {} Tweets from {} search...".format( self.scraper_details["tab"], self.scraper_details["query"] ) ) elif self.scraper_details["type"] == "Home": print("Scraping Tweets from Home...") # Accept cookies to make the banner disappear try: accept_cookies_btn = self.driver.find_element( "xpath", "//span[text()='Refuse non-essential cookies']/../../..") accept_cookies_btn.click() except NoSuchElementException: pass self.progress.print_progress(0, False, 0, no_tweets_limit) refresh_count = 0 added_tweets = 0 empty_count = 0 retry_cnt = 0 while self.scroller.scrolling: try: self.get_tweet_cards() added_tweets = 0 for card in self.tweet_cards[-15:]: try: tweet_id = str(card) if tweet_id not in self.tweet_ids: self.tweet_ids.add(tweet_id) if not self.scraper_details["poster_details"]: self.driver.execute_script( "arguments[0].scrollIntoView();", card ) tweet = Tweet( 
card=card, driver=self.driver, actions=self.actions, scrape_poster_details=self.scraper_details[ "poster_details" ], ) if tweet: if not tweet.error and tweet.tweet is not None: if not tweet.is_ad: self.data.append(tweet.tweet) added_tweets += 1 self.progress.print_progress(len(self.data), False, 0, no_tweets_limit) if len(self.data) >= self.max_tweets and not no_tweets_limit: self.scroller.scrolling = False break else: continue else: continue else: continue else: continue except NoSuchElementException: continue if len(self.data) >= self.max_tweets and not no_tweets_limit: break if added_tweets == 0: # Check if there is a button "Retry" and click on it with a regular basis until a certain amount of tries try: while retry_cnt < 15: retry_button = self.driver.find_element( "xpath", "//span[text()='Retry']/../../..") self.progress.print_progress(len(self.data), True, retry_cnt, no_tweets_limit) sleep(600) retry_button.click() retry_cnt += 1 sleep(2) # There is no Retry button so the counter is reseted except NoSuchElementException: retry_cnt = 0 self.progress.print_progress(len(self.data), False, 0, no_tweets_limit) if empty_count >= 5: if refresh_count >= 3: print() print("No more tweets to scrape") break refresh_count += 1 empty_count += 1 sleep(1) else: empty_count = 0 refresh_count = 0 except StaleElementReferenceException: sleep(2) continue except KeyboardInterrupt: print("\n") print("Keyboard Interrupt") self.interrupted = True break except Exception as e: print("\n") print(f"Error scraping tweets: {e}") break print("") if len(self.data) >= self.max_tweets or no_tweets_limit: print("Scraping Complete") else: print("Scraping Incomplete") if not no_tweets_limit: print("Tweets: {} out of {}\n".format(len(self.data), self.max_tweets)) pass def save_to_csv(self): print("Saving Tweets to CSV...") now = datetime.now() folder_path = "./tweets/" if not os.path.exists(folder_path): os.makedirs(folder_path) print("Created Folder: {}".format(folder_path)) data = { "Name": 
[tweet[0] for tweet in self.data], "Handle": [tweet[1] for tweet in self.data], "Timestamp": [tweet[2] for tweet in self.data], "Verified": [tweet[3] for tweet in self.data], "Content": [tweet[4] for tweet in self.data], "Comments": [tweet[5] for tweet in self.data], "Retweets": [tweet[6] for tweet in self.data], "Likes": [tweet[7] for tweet in self.data], "Analytics": [tweet[8] for tweet in self.data], "Tags": [tweet[9] for tweet in self.data], "Mentions": [tweet[10] for tweet in self.data], "Emojis": [tweet[11] for tweet in self.data], "Profile Image": [tweet[12] for tweet in self.data], "Tweet Link": [tweet[13] for tweet in self.data], "Tweet ID": [f"tweet_id:{tweet[14]}" for tweet in self.data], } if self.scraper_details["poster_details"]: data["Tweeter ID"] = [f"user_id:{tweet[15]}" for tweet in self.data] data["Following"] = [tweet[16] for tweet in self.data] data["Followers"] = [tweet[17] for tweet in self.data] df = pd.DataFrame(data) current_time = now.strftime("%Y-%m-%d_%H-%M-%S") file_path = f"{folder_path}{current_time}_tweets_1-{len(self.data)}.csv" pd.set_option("display.max_colwidth", None) df.to_csv(file_path, index=False, encoding="utf-8") print("CSV Saved: {}".format(file_path)) pass def get_tweets(self): return self.data