Files
selenium-twitter-scraper/main.ipynb
2023-09-25 00:04:36 +08:00

1050 lines
38 KiB
Jupyter Notebook (JSON)

{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Twitter Scraper using Selenium\n",
"\n",
"Scraper for Twitter Tweets using selenium. It can scrape tweets from:\n",
"- Home/New Feeds\n",
"- User Profile Tweets\n",
"- Query or Search Tweets\n",
"- Hashtags Tweets\n",
"- Advanced Search Tweets"
]
},
{
"cell_type": "code",
"execution_count": 113,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import sys\n",
"import pandas as pd\n",
"\n",
"from datetime import datetime\n",
"from fake_headers import Headers\n",
"from time import sleep\n",
"from selenium import webdriver\n",
"from selenium.webdriver import Chrome\n",
"from selenium.webdriver.common.keys import Keys\n",
"from selenium.common.exceptions import (\n",
" NoSuchElementException,\n",
" StaleElementReferenceException,\n",
" WebDriverException,\n",
")\n",
"from selenium.webdriver.common.action_chains import ActionChains\n",
"\n",
"from selenium.webdriver.chrome.webdriver import WebDriver\n",
"from selenium.webdriver.chrome.options import Options as ChromeOptions\n",
"from selenium.webdriver.chrome.service import Service as ChromeService\n",
"\n",
"from webdriver_manager.chrome import ChromeDriverManager"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Progress Class\n",
"\n",
"Class for the progress of the scraper instance."
]
},
{
"cell_type": "code",
"execution_count": 114,
"metadata": {},
"outputs": [],
"source": [
"class Progress:\n",
" def __init__(self, current, total) -> None:\n",
" self.current = current\n",
" self.total = total\n",
" pass\n",
"\n",
" def print_progress(self, current) -> None:\n",
" self.current = current\n",
" progress = current / self.total\n",
" bar_length = 40\n",
" progress_bar = (\n",
" \"[\"\n",
" + \"=\" * int(bar_length * progress)\n",
" + \"-\" * (bar_length - int(bar_length * progress))\n",
" + \"]\"\n",
" )\n",
" sys.stdout.write(\n",
" \"\\rProgress: [{:<40}] {:.2%} {} of {}\".format(\n",
" progress_bar, progress, current, self.total\n",
" )\n",
" )\n",
" sys.stdout.flush()\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Scroller Class\n",
"\n",
"Class that controls scrolling of the web page."
]
},
{
"cell_type": "code",
"execution_count": 115,
"metadata": {},
"outputs": [],
"source": [
"class Scroller:\n",
" def __init__(self, driver) -> None:\n",
" self.driver = driver\n",
" self.current_position = 0\n",
" self.last_position = driver.execute_script(\"return window.pageYOffset;\")\n",
" self.scrolling = True\n",
" self.scroll_count = 0\n",
" pass\n",
"\n",
" def reset(self) -> None:\n",
" self.current_position = 0\n",
" self.last_position = self.driver.execute_script(\"return window.pageYOffset;\")\n",
" self.scroll_count = 0\n",
" pass\n",
"\n",
" def scroll_to_top(self) -> None:\n",
" self.driver.execute_script(\"window.scrollTo(0, 0);\")\n",
" pass\n",
"\n",
" def scroll_to_bottom(self) -> None:\n",
" self.driver.execute_script(\"window.scrollTo(0, document.body.scrollHeight);\")\n",
" pass\n",
"\n",
" def update_scroll_position(self) -> None:\n",
" self.current_position = self.driver.execute_script(\"return window.pageYOffset;\")\n",
" pass\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Tweet Class\n",
"\n",
"Object representing a single tweet, including its scraped data."
]
},
{
"cell_type": "code",
"execution_count": 116,
"metadata": {},
"outputs": [],
"source": [
"class Tweet:\n",
"    \"\"\"Scrapes every displayed field of a single tweet card element.\n",
"\n",
"    On success `self.tweet` holds a 15-tuple of the scraped values; on any\n",
"    extraction failure `self.error` is True and `self.tweet` stays None.\n",
"    `self.is_ad` marks promoted tweets (cards without a <time> element).\n",
"    \"\"\"\n",
"\n",
"    def __init__(\n",
"        self,\n",
"        card: WebDriver,\n",
"        driver: WebDriver,\n",
"        actions: ActionChains,\n",
"        scrape_poster_details=False\n",
"    ) -> None:\n",
"        self.card = card\n",
"        self.error = False\n",
"        self.tweet = None\n",
"\n",
"        # Poster display name.\n",
"        try:\n",
"            self.user = card.find_element(\n",
"                \"xpath\", './/div[@data-testid=\"User-Name\"]//span'\n",
"            ).text\n",
"        except NoSuchElementException:\n",
"            self.error = True\n",
"            self.user = \"skip\"\n",
"\n",
"        # Poster @handle.\n",
"        try:\n",
"            self.handle = card.find_element(\n",
"                \"xpath\", './/span[contains(text(), \"@\")]'\n",
"            ).text\n",
"        except NoSuchElementException:\n",
"            self.error = True\n",
"            self.handle = \"skip\"\n",
"\n",
"        # Timestamp; promoted tweets (ads) carry no <time> element.\n",
"        try:\n",
"            self.date_time = card.find_element(\"xpath\", \".//time\").get_attribute(\n",
"                \"datetime\"\n",
"            )\n",
"\n",
"            if self.date_time is not None:\n",
"                self.is_ad = False\n",
"        except NoSuchElementException:\n",
"            self.is_ad = True\n",
"            self.error = True\n",
"            self.date_time = \"skip\"\n",
"\n",
"        # Bail out early when a mandatory field is missing.\n",
"        if self.error:\n",
"            return\n",
"\n",
"        # Blue-check verification badge (presence check only).\n",
"        try:\n",
"            card.find_element(\n",
"                \"xpath\", './/*[local-name()=\"svg\" and @data-testid=\"icon-verified\"]'\n",
"            )\n",
"\n",
"            self.verified = True\n",
"        except NoSuchElementException:\n",
"            self.verified = False\n",
"\n",
"        # Tweet body: concatenate text spans and inline links in order.\n",
"        self.content = \"\"\n",
"        contents = card.find_elements(\n",
"            \"xpath\",\n",
"            '(.//div[@data-testid=\"tweetText\"])[1]/span | (.//div[@data-testid=\"tweetText\"])[1]/a',\n",
"        )\n",
"\n",
"        for content in contents:\n",
"            self.content += content.text\n",
"\n",
"        # Engagement counters: an empty label means zero.\n",
"        try:\n",
"            self.reply_cnt = card.find_element(\n",
"                \"xpath\", './/div[@data-testid=\"reply\"]//span'\n",
"            ).text\n",
"\n",
"            if self.reply_cnt == \"\":\n",
"                self.reply_cnt = \"0\"\n",
"        except NoSuchElementException:\n",
"            self.reply_cnt = \"0\"\n",
"\n",
"        try:\n",
"            self.retweet_cnt = card.find_element(\n",
"                \"xpath\", './/div[@data-testid=\"retweet\"]//span'\n",
"            ).text\n",
"\n",
"            if self.retweet_cnt == \"\":\n",
"                self.retweet_cnt = \"0\"\n",
"        except NoSuchElementException:\n",
"            self.retweet_cnt = \"0\"\n",
"\n",
"        try:\n",
"            self.like_cnt = card.find_element(\n",
"                \"xpath\", './/div[@data-testid=\"like\"]//span'\n",
"            ).text\n",
"\n",
"            if self.like_cnt == \"\":\n",
"                self.like_cnt = \"0\"\n",
"        except NoSuchElementException:\n",
"            self.like_cnt = \"0\"\n",
"\n",
"        # View/analytics count, read from the tweet's /analytics link.\n",
"        try:\n",
"            self.analytics_cnt = card.find_element(\n",
"                \"xpath\", './/a[contains(@href, \"/analytics\")]//span'\n",
"            ).text\n",
"\n",
"            if self.analytics_cnt == \"\":\n",
"                self.analytics_cnt = \"0\"\n",
"        except NoSuchElementException:\n",
"            self.analytics_cnt = \"0\"\n",
"\n",
"        # Hashtags used in the tweet.\n",
"        try:\n",
"            self.tags = card.find_elements(\n",
"                \"xpath\",\n",
"                './/a[contains(@href, \"src=hashtag_click\")]',\n",
"            )\n",
"\n",
"            self.tags = [tag.text for tag in self.tags]\n",
"        except NoSuchElementException:\n",
"            self.tags = []\n",
"\n",
"        # @mentions inside the tweet body.\n",
"        try:\n",
"            self.mentions = card.find_elements(\n",
"                \"xpath\",\n",
"                '(.//div[@data-testid=\"tweetText\"])[1]//a[contains(text(), \"@\")]',\n",
"            )\n",
"\n",
"            self.mentions = [mention.text for mention in self.mentions]\n",
"        except NoSuchElementException:\n",
"            self.mentions = []\n",
"\n",
"        # Emojis, re-encoded as ASCII unicode-escape sequences.\n",
"        try:\n",
"            raw_emojis = card.find_elements(\n",
"                \"xpath\",\n",
"                '(.//div[@data-testid=\"tweetText\"])[1]/img[contains(@src, \"emoji\")]',\n",
"            )\n",
"\n",
"            self.emojis = [\n",
"                emoji.get_attribute(\"alt\").encode(\"unicode-escape\").decode(\"ASCII\")\n",
"                for emoji in raw_emojis\n",
"            ]\n",
"        except NoSuchElementException:\n",
"            self.emojis = []\n",
"\n",
"        # Poster avatar URL.\n",
"        try:\n",
"            self.profile_img = card.find_element(\n",
"                \"xpath\", './/div[@data-testid=\"Tweet-User-Avatar\"]//img'\n",
"            ).get_attribute(\"src\")\n",
"        except NoSuchElementException:\n",
"            self.profile_img = \"\"\n",
"\n",
"        self.following_cnt = \"0\"\n",
"        self.followers_cnt = \"0\"\n",
"\n",
"        if scrape_poster_details:\n",
"            # Hover over the poster's name so Twitter shows the profile\n",
"            # hover-card, then read following/followers counts from it.\n",
"            el_name = card.find_element(\n",
"                \"xpath\", './/div[@data-testid=\"User-Name\"]//span'\n",
"            )\n",
"\n",
"            ext_hover_card = False\n",
"            ext_following = False\n",
"            ext_followers = False\n",
"            hover_attempt = 0\n",
"\n",
"            while not ext_hover_card or not ext_following or not ext_followers:\n",
"                try:\n",
"                    actions.move_to_element(el_name).perform()\n",
"\n",
"                    hover_card = driver.find_element(\n",
"                        \"xpath\",\n",
"                        '//div[@data-testid=\"hoverCardParent\"]'\n",
"                    )\n",
"\n",
"                    ext_hover_card = True\n",
"\n",
"                    while not ext_following:\n",
"                        try:\n",
"                            self.following_cnt = hover_card.find_element(\n",
"                                \"xpath\",\n",
"                                './/a[contains(@href, \"/following\")]//span'\n",
"                            ).text\n",
"\n",
"                            if self.following_cnt == \"\":\n",
"                                self.following_cnt = \"0\"\n",
"\n",
"                            ext_following = True\n",
"                        except NoSuchElementException:\n",
"                            continue\n",
"                        except StaleElementReferenceException:\n",
"                            self.error = True\n",
"                            return\n",
"\n",
"                    while not ext_followers:\n",
"                        try:\n",
"                            self.followers_cnt = hover_card.find_element(\n",
"                                \"xpath\",\n",
"                                './/a[contains(@href, \"/verified_followers\")]//span'\n",
"                            ).text\n",
"\n",
"                            if self.followers_cnt == \"\":\n",
"                                self.followers_cnt = \"0\"\n",
"\n",
"                            ext_followers = True\n",
"                        except NoSuchElementException:\n",
"                            continue\n",
"                        except StaleElementReferenceException:\n",
"                            self.error = True\n",
"                            return\n",
"                except NoSuchElementException:\n",
"                    if hover_attempt == 3:\n",
"                        # Bug fix: was a bare `self.error` expression (a\n",
"                        # no-op), so the failure was never flagged before\n",
"                        # returning.\n",
"                        self.error = True\n",
"                        return\n",
"                    hover_attempt += 1\n",
"                    sleep(0.5)\n",
"                    continue\n",
"                except StaleElementReferenceException:\n",
"                    self.error = True\n",
"                    return\n",
"\n",
"            if ext_hover_card and ext_following and ext_followers:\n",
"                actions.reset_actions()\n",
"\n",
"        # Assemble the final record only when every field was extracted.\n",
"        self.tweet = (\n",
"            self.user,\n",
"            self.handle,\n",
"            self.date_time,\n",
"            self.verified,\n",
"            self.content,\n",
"            self.reply_cnt,\n",
"            self.retweet_cnt,\n",
"            self.like_cnt,\n",
"            self.analytics_cnt,\n",
"            self.tags,\n",
"            self.mentions,\n",
"            self.emojis,\n",
"            self.profile_img,\n",
"            self.following_cnt,\n",
"            self.followers_cnt,\n",
"        )\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Twitter Scraper Class\n",
"\n",
"Class for the Twitter Scraper."
]
},
{
"cell_type": "code",
"execution_count": 117,
"metadata": {},
"outputs": [],
"source": [
"TWITTER_LOGIN_URL = \"https://twitter.com/i/flow/login\"\n",
"\n",
"\n",
"class Twitter_Scraper:\n",
" def __init__(\n",
" self,\n",
" username,\n",
" password,\n",
" max_tweets=50,\n",
" scrape_username=None,\n",
" scrape_hashtag=None,\n",
" scrape_query=None,\n",
" scrape_poster_details=False,\n",
" scrape_latest=True,\n",
" scrape_top=False,\n",
" ):\n",
" print(\"Initializing Twitter Scraper...\")\n",
" self.username = username\n",
" self.password = password\n",
" self.tweet_ids = set()\n",
" self.data = []\n",
" self.tweet_cards = []\n",
" self.scraper_details = {\n",
" \"type\": None,\n",
" \"username\": None,\n",
" \"hashtag\": None,\n",
" \"query\": None,\n",
" \"tab\": None,\n",
" \"poster_details\": False,\n",
" }\n",
" self.max_tweets = max_tweets\n",
" self.progress = Progress(0, max_tweets)\n",
" self.router = self.go_to_home\n",
" self.driver = self._get_driver()\n",
" self.actions = ActionChains(self.driver)\n",
" self.scroller = Scroller(self.driver)\n",
" self._config_scraper(\n",
" max_tweets,\n",
" scrape_username,\n",
" scrape_hashtag,\n",
" scrape_query,\n",
" scrape_latest,\n",
" scrape_top,\n",
" scrape_poster_details,\n",
" )\n",
"\n",
" def _config_scraper(\n",
" self,\n",
" max_tweets=50,\n",
" scrape_username=None,\n",
" scrape_hashtag=None,\n",
" scrape_query=None,\n",
" scrape_latest=True,\n",
" scrape_top=False,\n",
" scrape_poster_details=False,\n",
" ):\n",
" self.tweet_ids = set()\n",
" self.data = []\n",
" self.tweet_cards = []\n",
" self.max_tweets = max_tweets\n",
" self.progress = Progress(0, max_tweets)\n",
" self.scraper_details = {\n",
" \"type\": None,\n",
" \"username\": scrape_username,\n",
" \"hashtag\": str(scrape_hashtag).replace(\"#\", \"\")\n",
" if scrape_hashtag is not None\n",
" else None,\n",
" \"query\": scrape_query,\n",
" \"tab\": \"Latest\" if scrape_latest else \"Top\" if scrape_top else \"Latest\",\n",
" \"poster_details\": scrape_poster_details,\n",
" }\n",
" self.router = self.go_to_home\n",
" self.scroller = Scroller(self.driver)\n",
"\n",
" if scrape_username is not None:\n",
" self.scraper_details[\"type\"] = \"Username\"\n",
" self.router = self.go_to_profile\n",
" elif scrape_hashtag is not None:\n",
" self.scraper_details[\"type\"] = \"Hashtag\"\n",
" self.router = self.go_to_hashtag\n",
" elif scrape_query is not None:\n",
" self.scraper_details[\"type\"] = \"Query\"\n",
" self.router = self.go_to_search\n",
" else:\n",
" self.scraper_details[\"type\"] = \"Home\"\n",
" self.router = self.go_to_home\n",
" pass\n",
"\n",
" def _get_driver(self):\n",
" print(\"Setup WebDriver...\")\n",
" header = Headers().generate()[\"User-Agent\"]\n",
"\n",
" browser_option = ChromeOptions()\n",
" browser_option.add_argument(\"--no-sandbox\")\n",
" browser_option.add_argument(\"--disable-dev-shm-usage\")\n",
" browser_option.add_argument(\"--ignore-certificate-errors\")\n",
" browser_option.add_argument(\"--disable-gpu\")\n",
" browser_option.add_argument(\"--log-level=3\")\n",
" browser_option.add_argument(\"--disable-notifications\")\n",
" browser_option.add_argument(\"--disable-popup-blocking\")\n",
" browser_option.add_argument(\"--user-agent={}\".format(header))\n",
"\n",
" # For Hiding Browser\n",
" browser_option.add_argument(\"--headless\")\n",
"\n",
" try:\n",
" print(\"Initializing ChromeDriver...\")\n",
" driver = webdriver.Chrome(\n",
" options=browser_option,\n",
" )\n",
"\n",
" print(\"WebDriver Setup Complete\")\n",
" return driver\n",
" except WebDriverException:\n",
" try:\n",
" print(\"Downloading ChromeDriver...\")\n",
" chromedriver_path = ChromeDriverManager().install()\n",
" chrome_service = ChromeService(executable_path=chromedriver_path)\n",
"\n",
" print(\"Initializing ChromeDriver...\")\n",
" driver = webdriver.Chrome(\n",
" service=chrome_service,\n",
" options=browser_option,\n",
" )\n",
"\n",
" print(\"WebDriver Setup Complete\")\n",
" return driver\n",
" except Exception as e:\n",
" print(f\"Error setting up WebDriver: {e}\")\n",
" sys.exit(1)\n",
" pass\n",
"\n",
" def login(self):\n",
" print()\n",
" print(\"Logging in to Twitter...\")\n",
"\n",
" try:\n",
" self.driver.maximize_window()\n",
" self.driver.get(TWITTER_LOGIN_URL)\n",
" sleep(3)\n",
"\n",
" self._input_username()\n",
" self._input_unusual_activity()\n",
" self._input_password()\n",
"\n",
" cookies = self.driver.get_cookies()\n",
"\n",
" auth_token = None\n",
"\n",
" for cookie in cookies:\n",
" if cookie[\"name\"] == \"auth_token\":\n",
" auth_token = cookie[\"value\"]\n",
" break\n",
"\n",
" if auth_token is None:\n",
" raise ValueError(\n",
" \"\"\"This may be due to the following:\n",
"\n",
"- Internet connection is unstable\n",
"- Username is incorrect\n",
"- Password is incorrect\n",
"\"\"\"\n",
" )\n",
"\n",
" print()\n",
" print(\"Login Successful\")\n",
" print()\n",
" except Exception as e:\n",
" print()\n",
" print(f\"Login Failed: {e}\")\n",
" sys.exit(1)\n",
"\n",
" pass\n",
"\n",
" def _input_username(self):\n",
" input_attempt = 0\n",
"\n",
" while True:\n",
" try:\n",
" username = self.driver.find_element(\n",
" \"xpath\", \"//input[@autocomplete='username']\"\n",
" )\n",
"\n",
" username.send_keys(self.username)\n",
" username.send_keys(Keys.RETURN)\n",
" sleep(3)\n",
" break\n",
" except NoSuchElementException:\n",
" input_attempt += 1\n",
" if input_attempt >= 3:\n",
" print()\n",
" print(\n",
" \"\"\"There was an error inputting the username.\n",
"\n",
"It may be due to the following:\n",
"- Internet connection is unstable\n",
"- Username is incorrect\n",
"- Twitter is experiencing unusual activity\"\"\"\n",
" )\n",
" self.driver.quit()\n",
" sys.exit(1)\n",
" else:\n",
" print(\"Re-attempting to input username...\")\n",
" sleep(2)\n",
"\n",
" def _input_unusual_activity(self):\n",
" input_attempt = 0\n",
"\n",
" while True:\n",
" try:\n",
" unusual_activity = self.driver.find_element(\n",
" \"xpath\", \"//input[@data-testid='ocfEnterTextTextInput']\"\n",
" )\n",
" unusual_activity.send_keys(self.username)\n",
" unusual_activity.send_keys(Keys.RETURN)\n",
" sleep(3)\n",
" break\n",
" except NoSuchElementException:\n",
" input_attempt += 1\n",
" if input_attempt >= 3:\n",
" break\n",
"\n",
" def _input_password(self):\n",
" input_attempt = 0\n",
"\n",
" while True:\n",
" try:\n",
" password = self.driver.find_element(\n",
" \"xpath\", \"//input[@autocomplete='current-password']\"\n",
" )\n",
"\n",
" password.send_keys(self.password)\n",
" password.send_keys(Keys.RETURN)\n",
" sleep(3)\n",
" break\n",
" except NoSuchElementException:\n",
" input_attempt += 1\n",
" if input_attempt >= 3:\n",
" print()\n",
" print(\n",
" \"\"\"There was an error inputting the password.\n",
"\n",
"It may be due to the following:\n",
"- Internet connection is unstable\n",
"- Password is incorrect\n",
"- Twitter is experiencing unusual activity\"\"\"\n",
" )\n",
" self.driver.quit()\n",
" sys.exit(1)\n",
" else:\n",
" print(\"Re-attempting to input password...\")\n",
" sleep(2)\n",
"\n",
" def go_to_home(self):\n",
" self.driver.get(\"https://twitter.com/home\")\n",
" sleep(3)\n",
" pass\n",
"\n",
" def go_to_profile(self):\n",
" if (\n",
" self.scraper_details[\"username\"] is None\n",
" or self.scraper_details[\"username\"] == \"\"\n",
" ):\n",
" print(\"Username is not set.\")\n",
" sys.exit(1)\n",
" else:\n",
" self.driver.get(f\"https://twitter.com/{self.scraper_details['username']}\")\n",
" sleep(3)\n",
" pass\n",
"\n",
" def go_to_hashtag(self):\n",
" if (\n",
" self.scraper_details[\"hashtag\"] is None\n",
" or self.scraper_details[\"hashtag\"] == \"\"\n",
" ):\n",
" print(\"Hashtag is not set.\")\n",
" sys.exit(1)\n",
" else:\n",
" url = f\"https://twitter.com/hashtag/{self.scraper_details['hashtag']}?src=hashtag_click\"\n",
" if self.scraper_details[\"tab\"] == \"Latest\":\n",
" url += \"&f=live\"\n",
"\n",
" self.driver.get(url)\n",
" sleep(3)\n",
" pass\n",
"\n",
" def go_to_search(self):\n",
" if self.scraper_details[\"query\"] is None or self.scraper_details[\"query\"] == \"\":\n",
" print(\"Query is not set.\")\n",
" sys.exit(1)\n",
" else:\n",
" url = f\"https://twitter.com/search?q={self.scraper_details['query']}&src=typed_query\"\n",
" if self.scraper_details[\"tab\"] == \"Latest\":\n",
" url += \"&f=live\"\n",
"\n",
" self.driver.get(url)\n",
" sleep(3)\n",
" pass\n",
"\n",
" def get_tweet_cards(self):\n",
" self.tweet_cards = self.driver.find_elements(\n",
" \"xpath\", '//article[@data-testid=\"tweet\" and not(@disabled)]'\n",
" )\n",
" pass\n",
"\n",
" def remove_hidden_cards(self):\n",
" try:\n",
" hidden_cards = self.driver.find_elements(\n",
" \"xpath\", '//article[@data-testid=\"tweet\" and @disabled]'\n",
" )\n",
"\n",
" for card in hidden_cards[1:-2]:\n",
" self.driver.execute_script(\n",
" \"arguments[0].parentNode.parentNode.parentNode.remove();\", card\n",
" )\n",
" except Exception as e:\n",
" return\n",
" pass\n",
"\n",
" def scrape_tweets(\n",
" self,\n",
" max_tweets=50,\n",
" scrape_username=None,\n",
" scrape_hashtag=None,\n",
" scrape_query=None,\n",
" scrape_latest=True,\n",
" scrape_top=False,\n",
" scrape_poster_details=False,\n",
" router=None,\n",
" ):\n",
" self._config_scraper(\n",
" max_tweets,\n",
" scrape_username,\n",
" scrape_hashtag,\n",
" scrape_query,\n",
" scrape_latest,\n",
" scrape_top,\n",
" scrape_poster_details,\n",
" )\n",
"\n",
" if router is None:\n",
" router = self.router\n",
"\n",
" router()\n",
"\n",
" if self.scraper_details[\"type\"] == \"Username\":\n",
" print(\n",
" \"Scraping Tweets from @{}...\".format(self.scraper_details[\"username\"])\n",
" )\n",
" elif self.scraper_details[\"type\"] == \"Hashtag\":\n",
" print(\n",
" \"Scraping {} Tweets from #{}...\".format(\n",
" self.scraper_details[\"tab\"], self.scraper_details[\"hashtag\"]\n",
" )\n",
" )\n",
" elif self.scraper_details[\"type\"] == \"Query\":\n",
" print(\n",
" \"Scraping {} Tweets from {} search...\".format(\n",
" self.scraper_details[\"tab\"], self.scraper_details[\"query\"]\n",
" )\n",
" )\n",
" elif self.scraper_details[\"type\"] == \"Home\":\n",
" print(\"Scraping Tweets from Home...\")\n",
"\n",
" self.progress.print_progress(0)\n",
"\n",
" refresh_count = 0\n",
" added_tweets = 0\n",
" empty_count = 0\n",
"\n",
" while self.scroller.scrolling:\n",
" try:\n",
" self.get_tweet_cards()\n",
" added_tweets = 0\n",
"\n",
" for card in self.tweet_cards[-15:]:\n",
" try:\n",
" tweet_id = str(card)\n",
"\n",
" if tweet_id not in self.tweet_ids:\n",
" self.tweet_ids.add(tweet_id)\n",
"\n",
" if not self.scraper_details[\"poster_details\"]:\n",
" self.driver.execute_script(\n",
" \"arguments[0].scrollIntoView();\", card\n",
" )\n",
"\n",
" tweet = Tweet(\n",
" card=card,\n",
" driver=self.driver,\n",
" actions=self.actions,\n",
" scrape_poster_details=self.scraper_details[\n",
" \"poster_details\"\n",
" ],\n",
" )\n",
"\n",
" if tweet:\n",
" if not tweet.error and tweet.tweet is not None:\n",
" if not tweet.is_ad:\n",
" self.data.append(tweet.tweet)\n",
" added_tweets += 1\n",
" self.progress.print_progress(len(self.data))\n",
"\n",
" if len(self.data) >= self.max_tweets:\n",
" self.scroller.scrolling = False\n",
" break\n",
" else:\n",
" continue\n",
" else:\n",
" continue\n",
" else:\n",
" continue\n",
" else:\n",
" continue\n",
" except NoSuchElementException:\n",
" continue\n",
"\n",
" if len(self.data) >= self.max_tweets:\n",
" break\n",
"\n",
" if added_tweets == 0:\n",
" if empty_count >= 5:\n",
" if refresh_count >= 3:\n",
" print()\n",
" print(\"No more tweets to scrape\")\n",
" break\n",
" refresh_count += 1\n",
" empty_count += 1\n",
" sleep(1)\n",
" else:\n",
" empty_count = 0\n",
" refresh_count = 0\n",
" except StaleElementReferenceException:\n",
" sleep(2)\n",
" continue\n",
" except KeyboardInterrupt:\n",
" print(\"\\n\")\n",
" print(\"Keyboard Interrupt\")\n",
" break\n",
" except Exception as e:\n",
" print(\"\\n\")\n",
" print(f\"Error scraping tweets: {e}\")\n",
" break\n",
"\n",
" print(\"\")\n",
"\n",
" if len(self.data) >= self.max_tweets:\n",
" print(\"Scraping Complete\")\n",
" else:\n",
" print(\"Scraping Incomplete\")\n",
"\n",
" print(\"Tweets: {} out of {}\\n\".format(len(self.data), self.max_tweets))\n",
"\n",
" pass\n",
"\n",
" def save_to_csv(self):\n",
" print(\"Saving Tweets to CSV...\")\n",
" now = datetime.now()\n",
" folder_path = \"./tweets/\"\n",
"\n",
" if not os.path.exists(folder_path):\n",
" os.makedirs(folder_path)\n",
" print(\"Created Folder: {}\".format(folder_path))\n",
"\n",
" data = {\n",
" \"Name\": [tweet[0] for tweet in self.data],\n",
" \"Handle\": [tweet[1] for tweet in self.data],\n",
" \"Timestamp\": [tweet[2] for tweet in self.data],\n",
" \"Verified\": [tweet[3] for tweet in self.data],\n",
" \"Content\": [tweet[4] for tweet in self.data],\n",
" \"Comments\": [tweet[5] for tweet in self.data],\n",
" \"Retweets\": [tweet[6] for tweet in self.data],\n",
" \"Likes\": [tweet[7] for tweet in self.data],\n",
" \"Analytics\": [tweet[8] for tweet in self.data],\n",
" \"Tags\": [tweet[9] for tweet in self.data],\n",
" \"Mentions\": [tweet[10] for tweet in self.data],\n",
" \"Emojis\": [tweet[11] for tweet in self.data],\n",
" \"Profile Image\": [tweet[12] for tweet in self.data],\n",
" }\n",
"\n",
" if self.scraper_details[\"poster_details\"]:\n",
" data[\"Following\"] = [tweet[13] for tweet in self.data]\n",
" data[\"Followers\"] = [tweet[14] for tweet in self.data]\n",
"\n",
" df = pd.DataFrame(data)\n",
"\n",
" current_time = now.strftime(\"%Y-%m-%d_%H-%M-%S\")\n",
" file_path = f\"{folder_path}{current_time}_tweets_1-{len(self.data)}.csv\"\n",
" df.to_csv(file_path, index=False, encoding=\"utf-8\")\n",
"\n",
" print(\"CSV Saved: {}\".format(file_path))\n",
"\n",
" pass\n",
"\n",
" def get_tweets(self):\n",
" return self.data"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create a new instance of the Twitter Scraper class"
]
},
{
"cell_type": "code",
"execution_count": 118,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Initializing Twitter Scraper...\n",
"Setup WebDriver...\n",
"Initializing ChromeDriver...\n",
"WebDriver Setup Complete\n"
]
}
],
"source": [
"USER_UNAME = os.environ['TWITTER_USERNAME']\n",
"USER_PASSWORD = os.environ['TWITTER_PASSWORD']\n",
"\n",
"scraper = Twitter_Scraper(\n",
" username=USER_UNAME,\n",
" password=USER_PASSWORD,\n",
" # max_tweets=10,\n",
" # scrape_username=\"something\",\n",
" # scrape_hashtag=\"something\",\n",
" # scrape_query=\"something\",\n",
" # scrape_latest=False,\n",
" # scrape_top=True,\n",
" # scrape_poster_details=True\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 119,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Logging in to Twitter...\n",
"\n",
"Login Successful\n",
"\n"
]
}
],
"source": [
"scraper.login()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Run Twitter Scraper"
]
},
{
"cell_type": "code",
"execution_count": 120,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Scraping Tweets from Home...\n",
"Progress: [[========================================]] 100.00% 50 of 50\n",
"Scraping Complete\n",
"Tweets: 50 out of 50\n",
"\n"
]
}
],
"source": [
"scraper.scrape_tweets(\n",
" # max_tweets=100,\n",
" # scrape_username=\"something\",\n",
" # scrape_hashtag=\"something\",\n",
" # scrape_query=\"something\",\n",
" # scrape_latest=False,\n",
" # scrape_top=True,\n",
" # scrape_poster_details=True,\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Save Scraped Tweets in a CSV"
]
},
{
"cell_type": "code",
"execution_count": 121,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Saving Tweets to CSV...\n",
"CSV Saved: ./tweets/2023-09-24_23-57-11_tweets_1-50.csv\n"
]
}
],
"source": [
"scraper.save_to_csv()"
]
},
{
"cell_type": "code",
"execution_count": 122,
"metadata": {},
"outputs": [],
"source": [
"# Use quit() rather than close(): close() only closes the current window\n",
"# and leaves the ChromeDriver process running; quit() ends the whole\n",
"# WebDriver session and releases its resources.\n",
"scraper.driver.quit()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "ml",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}