Files
selenium-twitter-scraper/main.ipynb
2023-09-08 23:19:17 +08:00

433 lines
13 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 103,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import sys\n",
"import re\n",
"import pandas as pd\n",
"from fake_headers import Headers\n",
"from getpass import getpass\n",
"from time import sleep\n",
"from selenium import webdriver\n",
"from selenium.webdriver.common.keys import Keys\n",
"from selenium.common.exceptions import NoSuchElementException, WebDriverException\n",
"\n",
"from selenium.webdriver.chrome.options import Options as ChromeOptions\n",
"from selenium.webdriver.chrome.service import Service as ChromeService\n",
"\n",
"from webdriver_manager.chrome import ChromeDriverManager\n",
"\n",
"# Credentials are read from the environment so they are never stored in the notebook.\n",
"# If a variable is unset or empty, prompt for it instead of failing with a bare\n",
"# KeyError; getpass is used for the password so it is not echoed while typing.\n",
"USER_UNAME = os.environ.get('TWITTER_USERNAME') or input('Twitter username: ')\n",
"USER_PASSWORD = os.environ.get('TWITTER_PASSWORD') or getpass('Twitter password: ')\n",
"\n",
"# Page that Twitter_Scraper._login opens before typing the credentials.\n",
"TWITTER_LOGIN_URL = \"https://twitter.com/i/flow/login\""
]
},
{
"cell_type": "code",
"execution_count": 104,
"metadata": {},
"outputs": [],
"source": [
"class Progress:\n",
"    \"\"\"Single-line text progress bar written to stdout.\n",
"\n",
"    Attributes:\n",
"        current: Count passed to the most recent ``print_progress`` call\n",
"            (or to the constructor if it has not been called yet).\n",
"        total: Count that corresponds to 100%.\n",
"    \"\"\"\n",
"\n",
"    # Number of characters between the brackets of the bar.\n",
"    BAR_LENGTH = 40\n",
"\n",
"    def __init__(self, current, total) -> None:\n",
"        self.current = current\n",
"        self.total = total\n",
"\n",
"    def print_progress(self, current) -> None:\n",
"        \"\"\"Redraw the bar for ``current`` items done out of ``self.total``.\n",
"\n",
"        The line starts with a carriage return so each call overwrites the\n",
"        previous bar. Extra newlines are printed once ``current`` equals the total.\n",
"        \"\"\"\n",
"        self.current = current\n",
"        # A total of zero or less leaves nothing to wait for: report 100%\n",
"        # instead of raising ZeroDivisionError.\n",
"        progress = current / self.total if self.total > 0 else 1.0\n",
"        # Clamp only the drawn part so the bar always has BAR_LENGTH characters;\n",
"        # the printed percentage still shows the real ratio.\n",
"        filled = int(self.BAR_LENGTH * min(max(progress, 0.0), 1.0))\n",
"        # The brackets come from the format string below. Adding them to the bar\n",
"        # as well is what used to print \"[[====]]\".\n",
"        progress_bar = \"=\" * filled + \"-\" * (self.BAR_LENGTH - filled)\n",
"        sys.stdout.write(\n",
"            \"\\rProgress: [{}] {:.2%} {} of {}\".format(progress_bar, progress, current, self.total))\n",
"        sys.stdout.flush()\n",
"        if current == self.total:\n",
"            print(\"\\n\")\n"
]
},
{
"cell_type": "code",
"execution_count": 105,
"metadata": {},
"outputs": [],
"source": [
"class Scroller:\n",
"    \"\"\"Scroll bookkeeping for one browser driver.\n",
"\n",
"    Attributes:\n",
"        driver: Driver whose page is being scrolled.\n",
"        current_position: Starts at 0; the scraping loop stores the latest\n",
"            ``window.pageYOffset`` here.\n",
"        last_position: ``window.pageYOffset`` read when the object is created.\n",
"        scrolling: Starts ``True``; the scraping loop runs while it stays true.\n",
"        scroll_count: Starts at 0; the scraping loop counts scrolls that did\n",
"            not move the page.\n",
"    \"\"\"\n",
"\n",
"    def __init__(self, driver) -> None:\n",
"        self.driver = driver\n",
"        self.scroll_count = 0\n",
"        self.scrolling = True\n",
"        self.current_position = 0\n",
"        # Ask the page where it currently is so the first comparison has a baseline.\n",
"        self.last_position = self.driver.execute_script(\"return window.pageYOffset;\")"
]
},
{
"cell_type": "code",
"execution_count": 106,
"metadata": {},
"outputs": [],
"source": [
"class Twitter_Scraper():\n",
"    \"\"\"Log in to Twitter through headless Chrome and collect tweet cards.\n",
"\n",
"    Creating an instance starts the browser and runs the login flow right away.\n",
"\n",
"    Attributes:\n",
"        username: Account name typed into the login form.\n",
"        password: Password typed into the login form.\n",
"        data: Starts empty; filled by the caller with scraped tweet tuples.\n",
"        tweet_ids: Starts empty; used by the caller to remember seen cards.\n",
"        max_tweets: Number of tweets the caller wants to collect.\n",
"        tweet_cards: Elements found by the most recent ``get_tweets`` call.\n",
"        driver: Selenium Chrome driver created by ``_get_driver``.\n",
"        scroller: ``Scroller`` bound to ``driver``.\n",
"    \"\"\"\n",
"\n",
"    def __init__(self, username, password, max_tweets=50):\n",
"        self.username = username\n",
"        self.password = password\n",
"        self.data = []\n",
"        self.tweet_ids = set()\n",
"        self.max_tweets = max_tweets\n",
"        self.tweet_cards = []\n",
"        self.driver = self._get_driver()\n",
"        self.scroller = Scroller(self.driver)\n",
"        self._login()\n",
"\n",
"    def _get_driver(self):\n",
"        \"\"\"Create and return a headless Chrome driver with a fake_headers user agent.\"\"\"\n",
"        header = Headers().generate()['User-Agent']\n",
"\n",
"        browser_option = ChromeOptions()\n",
"        browser_option.add_argument('--no-sandbox')\n",
"        browser_option.add_argument(\"--disable-dev-shm-usage\")\n",
"        browser_option.add_argument('--ignore-certificate-errors')\n",
"        browser_option.add_argument('--disable-gpu')\n",
"        browser_option.add_argument('--log-level=3')\n",
"        browser_option.add_argument('--disable-notifications')\n",
"        browser_option.add_argument('--disable-popup-blocking')\n",
"        browser_option.add_argument('--user-agent={}'.format(header))\n",
"\n",
"        # Run Chrome without a visible window.\n",
"        browser_option.add_argument(\"--headless\")\n",
"\n",
"        return webdriver.Chrome(\n",
"            options=browser_option,\n",
"        )\n",
"\n",
"    def _login(self):\n",
"        \"\"\"Open the login page and submit username, optional re-check, and password.\"\"\"\n",
"        self.driver.get(TWITTER_LOGIN_URL)\n",
"        self.driver.maximize_window()\n",
"        sleep(3)\n",
"\n",
"        self._input_username()\n",
"        self._input_unusual_activity()\n",
"        self._input_password()\n",
"\n",
"    def _submit_text(self, xpath, text):\n",
"        \"\"\"Type ``text`` into the element matching ``xpath``, press Return, then pause.\n",
"\n",
"        Raises:\n",
"            NoSuchElementException: If no element matches ``xpath``.\n",
"        \"\"\"\n",
"        field = self.driver.find_element(\"xpath\", xpath)\n",
"        field.send_keys(text)\n",
"        field.send_keys(Keys.RETURN)\n",
"        # Fixed pause; presumably gives the next step of the login flow time to load.\n",
"        sleep(3)\n",
"\n",
"    def _abort(self, message):\n",
"        \"\"\"Print ``message``, close the browser and stop with ``SystemExit``.\"\"\"\n",
"        print(message)\n",
"        self.driver.quit()\n",
"        # sys.exit instead of the interactive-only exit() helper, so execution\n",
"        # reliably stops here rather than continuing with a closed driver.\n",
"        sys.exit(1)\n",
"\n",
"    def _input_username(self):\n",
"        \"\"\"Submit ``self.username``; abort if the username field is missing.\"\"\"\n",
"        try:\n",
"            # Use the value given to the constructor, not the module-level global,\n",
"            # so the ``username`` argument actually takes effect.\n",
"            self._submit_text(\"//input[@autocomplete='username']\", self.username)\n",
"        except NoSuchElementException:\n",
"            self._abort(\"Username field not found\")\n",
"\n",
"    def _input_unusual_activity(self):\n",
"        \"\"\"Re-submit the username if the extra text prompt is shown.\n",
"\n",
"        The prompt (presumably Twitter's \"unusual activity\" check) is optional,\n",
"        so a missing field is not an error.\n",
"        \"\"\"\n",
"        try:\n",
"            self._submit_text(\"//input[@data-testid='ocfEnterTextTextInput']\", self.username)\n",
"        except NoSuchElementException:\n",
"            # Prompt not shown: nothing to do.\n",
"            pass\n",
"\n",
"    def _input_password(self):\n",
"        \"\"\"Submit ``self.password``; abort if the password field is missing.\"\"\"\n",
"        try:\n",
"            self._submit_text(\"//input[@autocomplete='current-password']\", self.password)\n",
"        except NoSuchElementException:\n",
"            self._abort(\"Password field not found\")\n",
"\n",
"    def go_to_home(self):\n",
"        \"\"\"Navigate to the home timeline and wait three seconds.\"\"\"\n",
"        self.driver.get(\"https://twitter.com/home\")\n",
"        sleep(3)\n",
"\n",
"    def get_tweets(self):\n",
"        \"\"\"Store every tweet ``<article>`` currently on the page in ``self.tweet_cards``.\"\"\"\n",
"        self.tweet_cards = self.driver.find_elements(\n",
"            'xpath',\n",
"            '//article[@data-testid=\"tweet\"]'\n",
"        )"
]
},
{
"cell_type": "code",
"execution_count": 107,
"metadata": {},
"outputs": [],
"source": [
"\n",
"class Tweet:\n",
"    \"\"\"Parse one tweet card element into plain Python values.\n",
"\n",
"    Every attribute has a value after construction. ``is_ad`` is ``False`` only\n",
"    when the card was parsed completely; ``tweet`` then holds the tuple\n",
"    ``(user, handle, date_time, verified, content, reply_cnt, retweet_cnt, like_cnt)``.\n",
"    Otherwise ``is_ad`` is ``True`` and ``tweet`` is ``None``: the card has no\n",
"    handle or no timestamp and should be skipped.\n",
"\n",
"    Raises:\n",
"        NoSuchElementException: If the card has no user-name element.\n",
"    \"\"\"\n",
"\n",
"    def __init__(self, card) -> None:\n",
"        self.card = card\n",
"\n",
"        # Defaults first, so an early return below can never leave a caller\n",
"        # with an AttributeError on is_ad or tweet.\n",
"        self.handle = None\n",
"        self.date_time = None\n",
"        self.is_ad = True\n",
"        self.verified = False\n",
"        self.content = \"\"\n",
"        self.reply_cnt = 0\n",
"        self.retweet_cnt = 0\n",
"        self.like_cnt = 0\n",
"        self.tweet = None\n",
"\n",
"        self.user = card.find_element(\n",
"            'xpath',\n",
"            './/div[@data-testid=\"User-Name\"]//span'\n",
"        ).text\n",
"\n",
"        try:\n",
"            self.handle = card.find_element(\n",
"                'xpath',\n",
"                './/span[contains(text(), \"@\")]'\n",
"            ).text\n",
"        except NoSuchElementException:\n",
"            # No handle: incomplete card. is_ad stays True so callers skip it.\n",
"            return\n",
"\n",
"        try:\n",
"            self.date_time = card.find_element(\n",
"                'xpath',\n",
"                './/time'\n",
"            ).get_attribute('datetime')\n",
"        except NoSuchElementException:\n",
"            # Cards without a <time> element are treated as ads.\n",
"            return\n",
"\n",
"        if self.date_time is None:\n",
"            # A <time> element without a datetime value is unusable as well.\n",
"            return\n",
"        self.is_ad = False\n",
"\n",
"        try:\n",
"            card.find_element(\n",
"                'xpath',\n",
"                './/*[local-name()=\"svg\" and @data-testid=\"icon-verified\"]'\n",
"            )\n",
"            self.verified = True\n",
"        except NoSuchElementException:\n",
"            self.verified = False\n",
"\n",
"        # The tweet text is the concatenation of its direct <span> and <a> children.\n",
"        contents = card.find_elements(\n",
"            'xpath',\n",
"            './/div[@data-testid=\"tweetText\"]/span | .//div[@data-testid=\"tweetText\"]/a'\n",
"        )\n",
"        self.content = \"\".join(part.text for part in contents)\n",
"\n",
"        self.reply_cnt = self._text_or(card, './/div[@data-testid=\"reply\"]//span', 0)\n",
"        self.retweet_cnt = self._text_or(card, './/div[@data-testid=\"retweet\"]//span', 0)\n",
"        self.like_cnt = self._text_or(card, './/div[@data-testid=\"like\"]//span', 0)\n",
"\n",
"        self.tweet = (\n",
"            self.user,\n",
"            self.handle,\n",
"            self.date_time,\n",
"            self.verified,\n",
"            self.content,\n",
"            self.reply_cnt,\n",
"            self.retweet_cnt,\n",
"            self.like_cnt\n",
"        )\n",
"\n",
"    @staticmethod\n",
"    def _text_or(card, xpath, default):\n",
"        \"\"\"Return the text of the first element matching ``xpath``, or ``default`` if none.\"\"\"\n",
"        try:\n",
"            return card.find_element('xpath', xpath).text\n",
"        except NoSuchElementException:\n",
"            return default"
]
},
{
"cell_type": "code",
"execution_count": 108,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Progress: [[========================================]] 100.00% 50 of 50\n",
"\n",
"Scraping Complete\n",
"Tweets: 50\n"
]
}
],
"source": [
"scraper = Twitter_Scraper(\n",
"    username=USER_UNAME,\n",
"    password=USER_PASSWORD,\n",
"    max_tweets=50\n",
")\n",
"\n",
"try:\n",
"    scraper.go_to_home()\n",
"    progress = Progress(0, scraper.max_tweets)\n",
"\n",
"    while scraper.scroller.scrolling:\n",
"        scraper.get_tweets()\n",
"\n",
"        # Only the last 15 cards on the page are inspected after each scroll.\n",
"        for card in scraper.tweet_cards[-15:]:\n",
"            tweet_id = str(card)\n",
"            if tweet_id in scraper.tweet_ids:\n",
"                continue\n",
"            scraper.tweet_ids.add(tweet_id)\n",
"\n",
"            tweet = Tweet(card)\n",
"            # A card that could not be parsed completely may lack `tweet` and/or\n",
"            # `is_ad`; read both defensively so such a card is skipped instead of\n",
"            # raising AttributeError.\n",
"            record = getattr(tweet, \"tweet\", None)\n",
"            if record is None or getattr(tweet, \"is_ad\", False):\n",
"                continue\n",
"\n",
"            scraper.data.append(record)\n",
"            progress.print_progress(len(scraper.data))\n",
"\n",
"            if len(scraper.data) >= scraper.max_tweets:\n",
"                scraper.scroller.scrolling = False\n",
"                break\n",
"\n",
"        if len(scraper.data) >= scraper.max_tweets:\n",
"            break\n",
"\n",
"        scraper.scroller.scroll_count = 0\n",
"\n",
"        # Scroll to the bottom; retry while the page offset does not change and\n",
"        # stop scraping after three scrolls in a row that did not move the page.\n",
"        while True:\n",
"            scraper.driver.execute_script(\n",
"                'window.scrollTo(0, document.body.scrollHeight);')\n",
"            sleep(2)\n",
"            scraper.scroller.current_position = scraper.driver.execute_script(\n",
"                \"return window.pageYOffset;\"\n",
"            )\n",
"\n",
"            if scraper.scroller.last_position == scraper.scroller.current_position:\n",
"                scraper.scroller.scroll_count += 1\n",
"\n",
"                if scraper.scroller.scroll_count >= 3:\n",
"                    scraper.scroller.scrolling = False\n",
"                    break\n",
"                else:\n",
"                    sleep(2)\n",
"            else:\n",
"                scraper.scroller.last_position = scraper.scroller.current_position\n",
"                break\n",
"finally:\n",
"    # Always close the browser, even if scraping failed part-way, so repeated\n",
"    # runs of this cell do not leave Chrome processes behind. The collected\n",
"    # tuples in scraper.data stay available to the cells below.\n",
"    scraper.driver.quit()\n",
"\n",
"print(\"Scraping Complete\")\n",
"print(\"Tweets: {}\".format(len(scraper.data)))"
]
},
{
"cell_type": "code",
"execution_count": 109,
"metadata": {},
"outputs": [],
"source": [
"# import tabulate\n",
"\n",
"# # Tabulate\n",
"# print(tabulate.tabulate(\n",
"# scraper.data[:10],\n",
"# headers=[\n",
"# 'Name',\n",
"# 'Handle',\n",
"# 'Date Time',\n",
"# 'Verified',\n",
"# 'Content',\n",
"# 'Reply Count',\n",
"# 'Retweet Count',\n",
"# 'Like Count'\n",
"# ],\n",
"# tablefmt='tsv'\n",
"# ))\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 110,
"metadata": {},
"outputs": [],
"source": [
"# import csv\n",
"# \n",
"# with open('twitter_tweets.csv', 'w', encoding='utf-8', newline='') as f:\n",
"# header = ['Name', 'Handle', 'Timestamp', 'Verified',\n",
"# 'Content', 'Comments', 'Retweets', 'Likes']\n",
"# writer = csv.writer(f)\n",
"# writer.writerow(header)\n",
"# writer.writerows(scraper.data)"
]
},
{
"cell_type": "code",
"execution_count": 111,
"metadata": {},
"outputs": [],
"source": [
"# Build one list per CSV column: position i of every scraped tuple goes under\n",
"# the i-th header below.\n",
"data = {\n",
"    column: [row[position] for row in scraper.data]\n",
"    for position, column in enumerate((\n",
"        'Name',\n",
"        'Handle',\n",
"        'Timestamp',\n",
"        'Verified',\n",
"        'Content',\n",
"        'Comments',\n",
"        'Retweets',\n",
"        'Likes',\n",
"    ))\n",
"}\n",
"\n",
"# Write the table without the DataFrame index column.\n",
"df = pd.DataFrame(data)\n",
"df.to_csv('twitter_tweets.csv', index=False)\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "ml",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}