Twitter scraper script

This commit is contained in:
Jarrian
2023-09-09 12:23:18 +08:00
parent e4e46c9802
commit 783294a1ed
9 changed files with 492 additions and 14 deletions

0
scraper/__init__.py Normal file
View File

52
scraper/__main__.py Normal file
View File

@@ -0,0 +1,52 @@
import os
import sys
import argparse

from twitter_scraper import Twitter_Scraper

# Load credentials from a local .env file before main() reads them from the
# environment. A failure here is fatal: the scraper cannot log in without them.
try:
    from dotenv import load_dotenv

    print("Loading .env file")
    load_dotenv()
    print("Loaded .env file\n")
except Exception as e:
    print(f"Error loading .env file: {e}")
    sys.exit(1)
def main():
    """Entry point: read credentials, parse CLI args, run the scraper.

    Exits with status 1 when credentials are missing or the user interrupts.
    """
    # os.getenv never raises -- a missing variable simply returns None,
    # which is handled explicitly below (the original try/except was dead code).
    USER_UNAME = os.getenv('TWITTER_USERNAME')
    USER_PASSWORD = os.getenv('TWITTER_PASSWORD')

    parser = argparse.ArgumentParser(description='Twitter Scraper')
    parser.add_argument('--tweets', type=int, default=50,
                        help='Number of tweets to scrape (default: 50)')
    args = parser.parse_args()

    # Guard clause instead of nesting the happy path under an if.
    if USER_UNAME is None or USER_PASSWORD is None:
        print("Missing Twitter username or password environment variables. Please check your .env file.")
        sys.exit(1)

    scraper = None
    try:
        scraper = Twitter_Scraper(
            username=USER_UNAME,
            password=USER_PASSWORD,
            max_tweets=args.tweets
        )
        scraper.scrape_tweets()
        # Save before tearing the browser down so a teardown failure cannot
        # lose scraped data (original closed the driver first, then saved).
        scraper.save_to_csv()
    except KeyboardInterrupt:
        print("\nScript Interrupted by user. Exiting...")
        sys.exit(1)
    finally:
        # quit() shuts down the whole browser process; the original used
        # close(), which only closes the window and leaks the driver process.
        if scraper is not None and getattr(scraper, 'driver', None) is not None:
            scraper.driver.quit()


if __name__ == '__main__':
    main()

18
scraper/progress.py Normal file
View File

@@ -0,0 +1,18 @@
import sys
class Progress:
    """Renders a one-line console progress bar of the form
    ``Progress: [====----] 50.00% 5 of 10``, redrawn in place via ``\\r``.
    """

    def __init__(self, current: int, total: int) -> None:
        # current: items completed so far; total: target item count.
        self.current = current
        self.total = total

    def print_progress(self, current: int) -> None:
        """Update the stored position and redraw the bar on stdout.

        Fix vs. original: the bar string used to carry its own ``[``/``]``
        AND be substituted into ``[{:<40}]``, producing doubled brackets and
        overflowing the 40-character field. The bar body is now bare.
        """
        self.current = current
        # Guard the total == 0 case (original raised ZeroDivisionError).
        progress = current / self.total if self.total else 1.0
        bar_length = 40
        filled = int(bar_length * progress)
        bar = "=" * filled + "-" * (bar_length - filled)
        sys.stdout.write(
            "\rProgress: [{:<40}] {:.2%} {} of {}".format(
                bar, progress, current, self.total))
        sys.stdout.flush()

27
scraper/scroller.py Normal file
View File

@@ -0,0 +1,27 @@
class Scroller:
    """Tracks and drives vertical scrolling for a Selenium WebDriver.

    Used by the scrape loop to detect when the page has stopped loading
    new content (position no longer changes after scrolling).
    """

    def __init__(self, driver) -> None:
        self.driver = driver
        self.current_position = 0
        # Y offset observed after the previous scroll step.
        self.last_position = driver.execute_script("return window.pageYOffset;")
        self.scrolling = True   # cleared by callers to stop the scrape loop
        self.scroll_count = 0   # consecutive scrolls with no position change

    def reset(self) -> None:
        """Re-arm the scroller after the page has been reloaded."""
        self.current_position = 0
        self.last_position = self.driver.execute_script("return window.pageYOffset;")
        self.scroll_count = 0

    def scroll_to_top(self) -> None:
        """Jump to the top of the page."""
        self.driver.execute_script("window.scrollTo(0, 0);")

    def scroll_to_bottom(self) -> None:
        """Jump to the bottom of the page."""
        self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

    def update_scroll_position(self) -> None:
        """Record the current vertical scroll offset."""
        self.current_position = self.driver.execute_script("return window.pageYOffset;")

87
scraper/tweet.py Normal file
View File

@@ -0,0 +1,87 @@
from selenium.webdriver import Chrome
from selenium.common.exceptions import NoSuchElementException
class Tweet:
    """Parses one tweet <article> WebElement into a flat tuple of fields.

    After construction, callers should check ``is_ad``: when True the card
    was a promoted tweet or could not be parsed, and ``tweet`` is None.
    Otherwise ``tweet`` is
    (user, handle, date_time, verified, content, reply_cnt, retweet_cnt, like_cnt).
    """

    # "Chrome" kept as a forward-reference string: the annotation documents
    # intent without requiring selenium at class-creation time.
    def __init__(self, card: "Chrome") -> None:
        self.card = card
        # Defaults, so the early exits below still leave a consistent object.
        # Original bug: returning before `is_ad` was assigned made callers'
        # `tweet.is_ad` raise AttributeError.
        self.is_ad = False
        self.tweet = None

        self.user = card.find_element(
            'xpath',
            './/div[@data-testid="User-Name"]//span'
        ).text

        try:
            self.handle = card.find_element(
                'xpath',
                './/span[contains(text(), "@")]'
            ).text
        except NoSuchElementException:
            # No handle -> unusable card; flag it so callers skip it.
            self.is_ad = True
            return

        try:
            self.date_time = card.find_element(
                'xpath',
                './/time'
            ).get_attribute('datetime')
            if self.date_time is None:
                # Promoted tweets carry no timestamp.
                self.is_ad = True
                return
        except NoSuchElementException:
            self.is_ad = True
            return

        try:
            card.find_element(
                'xpath',
                './/*[local-name()="svg" and @data-testid="icon-verified"]'
            )
            self.verified = True
        except NoSuchElementException:
            self.verified = False

        # Tweet body is split across <span> (text) and <a> (links) fragments.
        contents = card.find_elements(
            'xpath',
            './/div[@data-testid="tweetText"]/span | .//div[@data-testid="tweetText"]/a'
        )
        self.content = "".join(fragment.text for fragment in contents)

        self.reply_cnt = self._count(card, 'reply')
        self.retweet_cnt = self._count(card, 'retweet')
        self.like_cnt = self._count(card, 'like')

        self.tweet = (
            self.user,
            self.handle,
            self.date_time,
            self.verified,
            self.content,
            self.reply_cnt,
            self.retweet_cnt,
            self.like_cnt
        )

    @staticmethod
    def _count(card, testid):
        """Return the engagement counter text for *testid*, or '0' if absent."""
        try:
            return card.find_element(
                'xpath',
                f'.//div[@data-testid="{testid}"]//span'
            ).text
        except NoSuchElementException:
            return '0'

267
scraper/twitter_scraper.py Normal file
View File

@@ -0,0 +1,267 @@
import os
import sys
import pandas as pd
from progress import Progress
from scroller import Scroller
from tweet import Tweet
from datetime import datetime
from fake_headers import Headers
from time import sleep
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import NoSuchElementException, StaleElementReferenceException, WebDriverException
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
TWITTER_LOGIN_URL = "https://twitter.com/i/flow/login"
class Twitter_Scraper:
    """Logs in to Twitter with a headless Chrome driver and scrapes timeline
    tweets into ``self.data``, with CSV export via :meth:`save_to_csv`.
    """

    def __init__(self, username, password, max_tweets=50):
        print("Initializing Twitter Scraper...")
        self.username = username
        self.password = password
        self.data = []          # scraped tweet tuples (see Tweet.tweet)
        self.tweet_ids = set()  # de-dup keys of cards already processed
        self.max_tweets = max_tweets
        self.progress = Progress(0, max_tweets)
        self.tweet_cards = []
        self.driver = self._get_driver()
        self.scroller = Scroller(self.driver)
        self._login()

    def _get_driver(self):
        """Build a headless Chrome WebDriver with a randomized User-Agent.

        Exits the process (status 1) if the driver cannot be set up.
        """
        print("Setup WebDriver...")
        header = Headers().generate()['User-Agent']

        browser_option = ChromeOptions()
        browser_option.add_argument('--no-sandbox')
        browser_option.add_argument("--disable-dev-shm-usage")
        browser_option.add_argument('--ignore-certificate-errors')
        browser_option.add_argument('--disable-gpu')
        browser_option.add_argument('--log-level=3')
        browser_option.add_argument('--disable-notifications')
        browser_option.add_argument('--disable-popup-blocking')
        browser_option.add_argument('--user-agent={}'.format(header))
        # For Hiding Browser
        browser_option.add_argument("--headless")

        try:
            print("Downloading ChromeDriver...")
            chromedriver_path = ChromeDriverManager().install()
            chrome_service = ChromeService(executable_path=chromedriver_path)
            print("Initializing ChromeDriver...")
            driver = webdriver.Chrome(
                service=chrome_service,
                options=browser_option,
            )
            return driver
        except Exception as e:
            print(f"Error setting up WebDriver: {e}")
            sys.exit(1)

    def _login(self):
        """Drive the interactive login flow on the Twitter login page."""
        print("Logging in to Twitter...")
        self.driver.get(TWITTER_LOGIN_URL)
        self.driver.maximize_window()
        sleep(3)
        self._input_username()
        self._input_unusual_activity()
        self._input_password()

    def _input_username(self):
        """Type the username; retry while the field is absent, abort after 3."""
        input_attempt = 0
        while True:
            try:
                username = self.driver.find_element(
                    "xpath",
                    "//input[@autocomplete='username']"
                )
                username.send_keys(self.username)
                username.send_keys(Keys.RETURN)
                sleep(3)
                break
            except NoSuchElementException:
                input_attempt += 1
                if input_attempt >= 3:
                    print()
                    # Fix vs. original: a stray fourth quote (`""""`) used to
                    # prefix this message with a literal `"` character.
                    print("""
There was an error inputting the username.
It may be due to the following:
- Internet connection is unstable
- Username is incorrect
- Twitter is experiencing unusual activity
""")
                    self.driver.quit()
                    # sys.exit(1): signal failure (bare exit() returned 0).
                    sys.exit(1)
                else:
                    print("Re-attempting to input username...")

    def _input_unusual_activity(self):
        """Answer the optional 'unusual activity' prompt with the username.

        The prompt does not always appear, so giving up after 3 attempts is
        not an error.
        """
        input_attempt = 0
        while True:
            try:
                unusual_activity = self.driver.find_element(
                    "xpath",
                    "//input[@data-testid='ocfEnterTextTextInput']"
                )
                unusual_activity.send_keys(self.username)
                unusual_activity.send_keys(Keys.RETURN)
                sleep(3)
                break
            except NoSuchElementException:
                input_attempt += 1
                if input_attempt >= 3:
                    break

    def _input_password(self):
        """Type the password; retry while the field is absent, abort after 3."""
        input_attempt = 0
        while True:
            try:
                password = self.driver.find_element(
                    "xpath",
                    "//input[@autocomplete='current-password']"
                )
                password.send_keys(self.password)
                password.send_keys(Keys.RETURN)
                sleep(3)
                break
            except NoSuchElementException:
                input_attempt += 1
                if input_attempt >= 3:
                    print()
                    print("""
There was an error inputting the password.
It may be due to the following:
- Internet connection is unstable
- Password is incorrect
- Twitter is experiencing unusual activity
""")
                    self.driver.quit()
                    # sys.exit(1): signal failure (bare exit() returned 0).
                    sys.exit(1)
                else:
                    print("Re-attempting to input password...")

    def go_to_home(self):
        """Navigate to the home timeline."""
        self.driver.get("https://twitter.com/home")
        sleep(3)

    def get_tweets(self):
        """Refresh the cached list of tweet <article> cards on the page."""
        self.tweet_cards = self.driver.find_elements(
            'xpath',
            '//article[@data-testid="tweet"]'
        )

    def scrape_tweets(self, callback=None):
        """Scroll the timeline collecting tweets until ``max_tweets`` is reached.

        callback: navigation callable used to (re)load the page; defaults to
        :meth:`go_to_home`.
        """
        if callback is None:
            callback = self.go_to_home
        callback()

        print("Scraping Tweets...")
        print()
        self.progress.print_progress(0)

        try:
            while self.scroller.scrolling:
                self.get_tweets()
                # Only the newest 15 cards; older ones were already handled.
                for card in self.tweet_cards[-15:]:
                    tweet_id = str(card)  # WebElement repr as a de-dup key
                    if tweet_id in self.tweet_ids:
                        continue
                    self.tweet_ids.add(tweet_id)
                    tweet = Tweet(card)
                    if tweet and not tweet.is_ad:
                        self.data.append(tweet.tweet)
                        self.progress.print_progress(len(self.data))
                        if len(self.data) >= self.max_tweets:
                            self.scroller.scrolling = False
                            break
                        if len(self.data) % 50 == 0:
                            # Brief pause every 50 tweets to ease rate limits.
                            sleep(2)

                if len(self.data) >= self.max_tweets:
                    break

                # Scroll down until the page position changes; after 3
                # stagnant scrolls, reload the page and re-arm the scroller.
                self.scroller.scroll_count = 0
                while True:
                    self.scroller.scroll_to_bottom()
                    sleep(2)
                    self.scroller.update_scroll_position()
                    if self.scroller.last_position == self.scroller.current_position:
                        self.scroller.scroll_count += 1
                        if self.scroller.scroll_count >= 3:
                            callback()
                            sleep(2)
                            self.scroller.reset()
                            break
                        else:
                            sleep(2)
                    else:
                        self.scroller.last_position = self.scroller.current_position
                        break

            print("\n")
            print("Scraping Complete")
        except StaleElementReferenceException:
            # The DOM changed under us; keep whatever was collected so far.
            print("\n")
            print("Scraping Incomplete")

        print("Tweets: {} out of {}\n".format(len(self.data), self.max_tweets))

    def save_to_csv(self):
        """Write collected tweets to ``./tweets/<timestamp>_tweets_1-N.csv``."""
        print("Saving Tweets to CSV...")
        now = datetime.now()

        folder_path = './tweets/'
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)
            print("Created Folder: {}".format(folder_path))

        # Transpose the tweet tuples into per-column lists.
        data = {
            'Name': [tweet[0] for tweet in self.data],
            'Handle': [tweet[1] for tweet in self.data],
            'Timestamp': [tweet[2] for tweet in self.data],
            'Verified': [tweet[3] for tweet in self.data],
            'Content': [tweet[4] for tweet in self.data],
            'Comments': [tweet[5] for tweet in self.data],
            'Retweets': [tweet[6] for tweet in self.data],
            'Likes': [tweet[7] for tweet in self.data]
        }
        df = pd.DataFrame(data)

        current_time = now.strftime("%Y-%m-%d_%H-%M-%S")
        file_path = f'{folder_path}{current_time}_tweets_1-{len(self.data)}.csv'
        df.to_csv(file_path, index=False)
        print("CSV Saved: {}".format(file_path))