From 2553d26590c80f4badd75661c011852c8df5ab64 Mon Sep 17 00:00:00 2001 From: Jarrian Date: Wed, 13 Sep 2023 17:29:45 +0800 Subject: [PATCH] Update Twitter Scraper IPYNB --- main.ipynb | 1017 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 669 insertions(+), 348 deletions(-) diff --git a/main.ipynb b/main.ipynb index cbc09a7..c1cde67 100644 --- a/main.ipynb +++ b/main.ipynb @@ -1,439 +1,760 @@ { "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Twitter Scraper using Selenium\n", + "\n", + "Scraper for Twitter Tweets using selenium. It can scrape tweets from:\n", + "- Home/New Feeds\n", + "- User Profile Tweets\n", + "- Query or Search Tweets\n", + "- Hashtags Tweets\n", + "- Advanced Search Tweets" + ] + }, { "cell_type": "code", - "execution_count": 46, + "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "import os\n", "import sys\n", "import pandas as pd\n", + "\n", "from datetime import datetime\n", "from fake_headers import Headers\n", "from time import sleep\n", "from selenium import webdriver\n", + "from selenium.webdriver import Chrome\n", "from selenium.webdriver.common.keys import Keys\n", - "from selenium.common.exceptions import NoSuchElementException, StaleElementReferenceException\n", + "from selenium.common.exceptions import (\n", + " NoSuchElementException,\n", + " StaleElementReferenceException,\n", + " WebDriverException,\n", + ")\n", "\n", "from selenium.webdriver.chrome.options import Options as ChromeOptions\n", "from selenium.webdriver.chrome.service import Service as ChromeService\n", "\n", - "from webdriver_manager.chrome import ChromeDriverManager\n", + "from webdriver_manager.chrome import ChromeDriverManager" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Progress Class\n", "\n", - "now = datetime.now()\n", - "folder_path = './tweets/'\n", - "\n", - "if not os.path.exists(folder_path):\n", - " os.makedirs(folder_path)\n", - "\n", - "USER_UNAME = os.environ['TWITTER_USERNAME']\n", - "USER_PASSWORD = os.environ['TWITTER_PASSWORD']\n", - "TWITTER_LOGIN_URL = \"https://twitter.com/i/flow/login\"" + "Class for the progress of the scraper instance." ] }, { "cell_type": "code", - "execution_count": 47, + "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "class Progress:\n", - " def __init__(self, current, total) -> None:\n", - " self.current = current\n", - " self.total = total\n", - " pass\n", - " \n", - " def print_progress(self, current) -> None:\n", - " self.current = current\n", - " progress = current / self.total\n", - " bar_length = 40\n", - " progress_bar = \"[\" + \"=\" * int(bar_length * progress) + \\\n", - " \"-\" * (bar_length - int(bar_length * progress)) + \"]\"\n", - " sys.stdout.write(\n", - " \"\\rProgress: [{:<40}] {:.2%} {} of {}\".format(progress_bar, progress, current, self.total))\n", - " sys.stdout.flush()\n" + " def __init__(self, current, total) -> None:\n", + " self.current = current\n", + " self.total = total\n", + " pass\n", + "\n", + " def print_progress(self, current) -> None:\n", + " self.current = current\n", + " progress = current / self.total\n", + " bar_length = 40\n", + " progress_bar = (\n", + " \"[\"\n", + " + \"=\" * int(bar_length * progress)\n", + " + \"-\" * (bar_length - int(bar_length * progress))\n", + " + \"]\"\n", + " )\n", + " sys.stdout.write(\n", + " \"\\rProgress: [{:<40}] {:.2%} {} of {}\".format(\n", + " progress_bar, progress, current, self.total\n", + " )\n", + " )\n", + " sys.stdout.flush()\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Scroller Class\n", + "\n", + "Class for the scrollbar of the web page." ] }, { "cell_type": "code", - "execution_count": 48, + "execution_count": 19, "metadata": {}, "outputs": [], "source": [ - "class Scroller():\n", - " def __init__(self, driver) -> None:\n", - " self.driver = driver\n", - " self.current_position = 0\n", - " self.last_position = driver.execute_script(\"return window.pageYOffset;\")\n", - " self.scrolling = True\n", - " self.scroll_count = 0\n", - " pass\n", - " \n", - " def reset(self) -> None:\n", - " self.current_position = 0\n", - " self.last_position = self.driver.execute_script(\"return window.pageYOffset;\")\n", - " self.scroll_count = 0\n", - " pass" + "class Scroller:\n", + " def __init__(self, driver) -> None:\n", + " self.driver = driver\n", + " self.current_position = 0\n", + " self.last_position = driver.execute_script(\"return window.pageYOffset;\")\n", + " self.scrolling = True\n", + " self.scroll_count = 0\n", + " pass\n", + "\n", + " def reset(self) -> None:\n", + " self.current_position = 0\n", + " self.last_position = self.driver.execute_script(\"return window.pageYOffset;\")\n", + " self.scroll_count = 0\n", + " pass\n", + "\n", + " def scroll_to_top(self) -> None:\n", + " self.driver.execute_script(\"window.scrollTo(0, 0);\")\n", + " pass\n", + "\n", + " def scroll_to_bottom(self) -> None:\n", + " self.driver.execute_script(\"window.scrollTo(0, document.body.scrollHeight);\")\n", + " pass\n", + "\n", + " def update_scroll_position(self) -> None:\n", + " self.current_position = self.driver.execute_script(\"return window.pageYOffset;\")\n", + " pass\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Tweet Class\n", + "\n", + "Object for the tweet. Including its data." ] }, { "cell_type": "code", - "execution_count": 49, + "execution_count": 35, "metadata": {}, "outputs": [], "source": [ - "class Twitter_Scraper():\n", - " def __init__(self, username, password, max_tweets=50):\n", - " self.username = username\n", - " self.password = password\n", - " self.data = []\n", - " self.tweet_ids = set()\n", - " self.max_tweets = max_tweets\n", - " self.tweet_cards = []\n", - " self.driver = self._get_driver()\n", - " self.scroller = Scroller(self.driver)\n", - " self._login()\n", - " \n", - " def _get_driver(self):\n", - " header = Headers().generate()['User-Agent']\n", + "class Tweet:\n", + " def __init__(self, card: Chrome) -> None:\n", + " self.card = card\n", "\n", - " browser_option = ChromeOptions()\n", - " browser_option.add_argument('--no-sandbox')\n", - " browser_option.add_argument(\"--disable-dev-shm-usage\")\n", - " browser_option.add_argument('--ignore-certificate-errors')\n", - " browser_option.add_argument('--disable-gpu')\n", - " browser_option.add_argument('--log-level=3')\n", - " browser_option.add_argument('--disable-notifications')\n", - " browser_option.add_argument('--disable-popup-blocking')\n", - " browser_option.add_argument('--user-agent={}'.format(header))\n", + " try:\n", + " self.user = card.find_element(\n", + " \"xpath\", './/div[@data-testid=\"User-Name\"]//span'\n", + " ).text\n", + " except NoSuchElementException:\n", + " return\n", "\n", - " # For Hiding Browser\n", - " browser_option.add_argument(\"--headless\")\n", - " \n", - " chromedriver_path=ChromeDriverManager().install()\n", - " chrome_service = ChromeService(executable_path=chromedriver_path)\n", + " try:\n", + " self.handle = card.find_element(\n", + " \"xpath\", './/span[contains(text(), \"@\")]'\n", + " ).text\n", + " except NoSuchElementException:\n", + " return\n", "\n", - " driver = webdriver.Chrome(\n", - " service=chrome_service,\n", - " options=browser_option,\n", - " )\n", - " \n", - " return driver\n", - " \n", - " def _login(self):\n", - " self.driver.get(TWITTER_LOGIN_URL)\n", - " self.driver.maximize_window()\n", - " sleep(3)\n", - " \n", - " self._input_username()\n", - " self._input_unusual_activity()\n", - " self._input_password()\n", - " pass\n", + " try:\n", + " self.date_time = card.find_element(\"xpath\", \".//time\").get_attribute(\n", + " \"datetime\"\n", + " )\n", "\n", - " def _input_username(self):\n", - " try:\n", - " username = self.driver.find_element(\n", - " \"xpath\",\n", - " \"//input[@autocomplete='username']\"\n", - " )\n", + " if self.date_time is not None:\n", + " self.is_ad = False\n", + " except NoSuchElementException:\n", + " self.is_ad = True\n", + " return\n", "\n", - " username.send_keys(self.username)\n", - " username.send_keys(Keys.RETURN)\n", - " sleep(3)\n", + " try:\n", + " card.find_element(\n", + " \"xpath\", './/*[local-name()=\"svg\" and @data-testid=\"icon-verified\"]'\n", + " )\n", "\n", - " except NoSuchElementException:\n", - " print(\"Username field not found\")\n", - " self.driver.quit()\n", - " exit()\n", - " pass\n", + " self.verified = True\n", + " except NoSuchElementException:\n", + " self.verified = False\n", "\n", - " def _input_unusual_activity(self):\n", - " try:\n", - " unusual_activity = self.driver.find_element(\n", - " \"xpath\",\n", - " \"//input[@data-testid='ocfEnterTextTextInput']\"\n", - " )\n", - " unusual_activity.send_keys(self.username)\n", - " unusual_activity.send_keys(Keys.RETURN)\n", - " sleep(3)\n", - " except NoSuchElementException:\n", - " pass\n", - " pass\n", + " self.content = \"\"\n", + " contents = card.find_elements(\n", + " \"xpath\",\n", + " '(.//div[@data-testid=\"tweetText\"])[1]/span | (.//div[@data-testid=\"tweetText\"])[1]/a',\n", + " )\n", "\n", - " def _input_password(self):\n", - " try:\n", - " password = self.driver.find_element(\n", - " \"xpath\",\n", - " \"//input[@autocomplete='current-password']\"\n", - " )\n", + " for index, content in enumerate(contents):\n", + " self.content += content.text\n", "\n", - " password.send_keys(self.password)\n", - " password.send_keys(Keys.RETURN)\n", - " sleep(3)\n", + " try:\n", + " self.reply_cnt = card.find_element(\n", + " \"xpath\", './/div[@data-testid=\"reply\"]//span'\n", + " ).text\n", + " except NoSuchElementException:\n", + " self.reply_cnt = \"0\"\n", "\n", - " except NoSuchElementException:\n", - " print(\"Password field not found\")\n", - " self.driver.quit()\n", - " exit()\n", - " pass\n", - " \n", - " def go_to_home(self):\n", - " self.driver.get(\"https://twitter.com/home\")\n", - " sleep(3)\n", - " pass\n", - " \n", - " def get_tweets(self):\n", - " self.tweet_cards = self.driver.find_elements(\n", - " 'xpath',\n", - " '//article[@data-testid=\"tweet\"]'\n", - " )\n", - " pass" + " try:\n", + " self.retweet_cnt = card.find_element(\n", + " \"xpath\", './/div[@data-testid=\"retweet\"]//span'\n", + " ).text\n", + " except NoSuchElementException:\n", + " self.retweet_cnt = \"0\"\n", + "\n", + " try:\n", + " self.like_cnt = card.find_element(\n", + " \"xpath\", './/div[@data-testid=\"like\"]//span'\n", + " ).text\n", + " except NoSuchElementException:\n", + " self.like_cnt = \"0\"\n", + "\n", + " try:\n", + " self.analytics_cnt = card.find_element(\n", + " \"xpath\", './/a[contains(@href, \"/analytics\")]//span'\n", + " ).text\n", + " except NoSuchElementException:\n", + " self.analytics_cnt = \"0\"\n", + "\n", + " try:\n", + " self.profile_img = card.find_element(\n", + " \"xpath\", './/div[@data-testid=\"Tweet-User-Avatar\"]//img'\n", + " ).get_attribute(\"src\")\n", + " except NoSuchElementException:\n", + " self.profile_img = \"\"\n", + "\n", + " try:\n", + " self.tags = card.find_elements(\n", + " \"xpath\",\n", + " './/a[contains(@href, \"src=hashtag_click\")]',\n", + " )\n", + "\n", + " self.tags = [tag.text for tag in self.tags]\n", + " except NoSuchElementException:\n", + " self.tags = []\n", + "\n", + " self.tweet = (\n", + " self.user,\n", + " self.handle,\n", + " self.date_time,\n", + " self.verified,\n", + " self.content,\n", + " self.reply_cnt,\n", + " self.retweet_cnt,\n", + " self.like_cnt,\n", + " self.analytics_cnt,\n", + " self.tags,\n", + " self.profile_img,\n", + " )\n", + "\n", + " pass\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Twitter Scraper Class\n", + "\n", + "Class for the Twitter Scraper." ] }, { "cell_type": "code", - "execution_count": 50, + "execution_count": 36, "metadata": {}, "outputs": [], "source": [ + "TWITTER_LOGIN_URL = \"https://twitter.com/i/flow/login\"\n", "\n", - "class Tweet: \n", - " def __init__(self, card) -> None:\n", - " self.card = card\n", - " \n", - " self.user = card.find_element(\n", - " 'xpath',\n", - " './/div[@data-testid=\"User-Name\"]//span'\n", - " ).text\n", - " \n", - " try:\n", - " self.handle = card.find_element(\n", - " 'xpath',\n", - " './/span[contains(text(), \"@\")]'\n", - " ).text\n", - " except NoSuchElementException:\n", - " return\n", - " \n", - " try:\n", - " self.date_time = card.find_element(\n", - " 'xpath',\n", - " './/time'\n", - " ).get_attribute('datetime')\n", - " \n", - " if self.date_time is not None:\n", - " self.is_ad = False\n", - " except NoSuchElementException:\n", - " self.is_ad = True\n", - " return\n", - " \n", - " try:\n", - " card.find_element(\n", - " 'xpath',\n", - " './/*[local-name()=\"svg\" and @data-testid=\"icon-verified\"]'\n", - " )\n", - " \n", - " self.verified = True\n", - " except NoSuchElementException:\n", - " self.verified = False\n", - " \n", - " self.content = \"\"\n", - " contents = card.find_elements(\n", - " 'xpath',\n", - " './/div[@data-testid=\"tweetText\"]/span | .//div[@data-testid=\"tweetText\"]/a'\n", - " )\n", "\n", - " for index, content in enumerate(contents):\n", - " self.content += content.text\n", - " \n", - " try:\n", - " self.reply_cnt= card.find_element(\n", - " 'xpath',\n", - " './/div[@data-testid=\"reply\"]//span'\n", - " ).text\n", - " except NoSuchElementException:\n", - " self.reply_cnt = '0'\n", - " \n", - " try:\n", - " self.retweet_cnt = card.find_element(\n", - " 'xpath',\n", - " './/div[@data-testid=\"retweet\"]//span'\n", - " ).text\n", - " except NoSuchElementException:\n", - " self.retweet_cnt = '0'\n", - " \n", - " try:\n", - " self.like_cnt = card.find_element(\n", - " 'xpath',\n", - " './/div[@data-testid=\"like\"]//span'\n", - " ).text\n", - " except NoSuchElementException:\n", - " self.like_cnt = '0'\n", - " \n", - " self.tweet = (\n", - " self.user,\n", - " self.handle,\n", - " self.date_time,\n", - " self.verified,\n", - " self.content,\n", - " self.reply_cnt,\n", - " self.retweet_cnt,\n", - " self.like_cnt\n", - " )\n", - " \n", - " pass" + "class Twitter_Scraper:\n", + " def __init__(\n", + " self,\n", + " username,\n", + " password,\n", + " max_tweets=50,\n", + " scrape_username=None,\n", + " scrape_hashtag=None,\n", + " scrape_query=None,\n", + " scrape_latest=True,\n", + " scrape_top=False,\n", + " ):\n", + " print(\"Initializing Twitter Scraper...\")\n", + " self.username = username\n", + " self.password = password\n", + " self.data = []\n", + " self.scraper_details = {\n", + " \"type\": None,\n", + " \"username\": scrape_username,\n", + " \"hashtag\": str(scrape_hashtag).replace(\"#\", \"\")\n", + " if scrape_hashtag is not None\n", + " else None,\n", + " \"query\": scrape_query,\n", + " \"tab\": \"Latest\" if scrape_latest else \"Top\" if scrape_top else \"Latest\",\n", + " }\n", + " self.router = self.go_to_home\n", + " self.tweet_ids = set()\n", + " self.max_tweets = max_tweets\n", + " self.progress = Progress(0, max_tweets)\n", + " self.tweet_cards = []\n", + " self.driver = self._get_driver()\n", + " self.scroller = Scroller(self.driver)\n", + " self._login()\n", + "\n", + " if scrape_username is not None:\n", + " self.scraper_details[\"type\"] = \"Username\"\n", + " self.router = self.go_to_profile\n", + " elif scrape_hashtag is not None:\n", + " self.scraper_details[\"type\"] = \"Hashtag\"\n", + " self.router = self.go_to_hashtag\n", + " elif scrape_query is not None:\n", + " self.scraper_details[\"type\"] = \"Query\"\n", + " self.router = self.go_to_search\n", + " else:\n", + " self.scraper_details[\"type\"] = \"Home\"\n", + " self.router = self.go_to_home\n", + "\n", + " def _get_driver(self):\n", + " print(\"Setup WebDriver...\")\n", + " header = Headers().generate()[\"User-Agent\"]\n", + "\n", + " browser_option = ChromeOptions()\n", + " browser_option.add_argument(\"--no-sandbox\")\n", + " browser_option.add_argument(\"--disable-dev-shm-usage\")\n", + " browser_option.add_argument(\"--ignore-certificate-errors\")\n", + " browser_option.add_argument(\"--disable-gpu\")\n", + " browser_option.add_argument(\"--log-level=3\")\n", + " browser_option.add_argument(\"--disable-notifications\")\n", + " browser_option.add_argument(\"--disable-popup-blocking\")\n", + " browser_option.add_argument(\"--user-agent={}\".format(header))\n", + "\n", + " # For Hiding Browser\n", + " browser_option.add_argument(\"--headless\")\n", + "\n", + " try:\n", + " print(\"Initializing ChromeDriver...\")\n", + " driver = webdriver.Chrome(\n", + " options=browser_option,\n", + " )\n", + "\n", + " return driver\n", + " except WebDriverException:\n", + " try:\n", + " print(\"Downloading ChromeDriver...\")\n", + " chromedriver_path = ChromeDriverManager().install()\n", + " chrome_service = ChromeService(executable_path=chromedriver_path)\n", + "\n", + " print(\"Initializing ChromeDriver...\")\n", + " driver = webdriver.Chrome(\n", + " service=chrome_service,\n", + " options=browser_option,\n", + " )\n", + "\n", + " return driver\n", + " except Exception as e:\n", + " print(f\"Error setting up WebDriver: {e}\")\n", + " sys.exit(1)\n", + "\n", + " def _login(self):\n", + " print(\"Logging in to Twitter...\")\n", + "\n", + " try:\n", + " self.driver.get(TWITTER_LOGIN_URL)\n", + " self.driver.maximize_window()\n", + " sleep(3)\n", + "\n", + " self._input_username()\n", + " self._input_unusual_activity()\n", + " self._input_password()\n", + "\n", + " cookies = self.driver.get_cookies()\n", + "\n", + " auth_token = None\n", + "\n", + " for cookie in cookies:\n", + " if cookie[\"name\"] == \"auth_token\":\n", + " auth_token = cookie[\"value\"]\n", + " break\n", + "\n", + " if auth_token is None:\n", + " raise ValueError(\n", + " \"\"\"This may be due to the following:\n", + "\n", + "- Internet connection is unstable\n", + "- Username is incorrect\n", + "- Password is incorrect\n", + "\"\"\"\n", + " )\n", + "\n", + " print()\n", + " print(\"Login Successful\")\n", + " print()\n", + " except Exception as e:\n", + " print()\n", + " print(f\"Login Failed: {e}\")\n", + " sys.exit(1)\n", + "\n", + " pass\n", + "\n", + " def _input_username(self):\n", + " input_attempt = 0\n", + "\n", + " while True:\n", + " try:\n", + " username = self.driver.find_element(\n", + " \"xpath\", \"//input[@autocomplete='username']\"\n", + " )\n", + "\n", + " username.send_keys(self.username)\n", + " username.send_keys(Keys.RETURN)\n", + " sleep(3)\n", + " break\n", + " except NoSuchElementException:\n", + " input_attempt += 1\n", + " if input_attempt >= 3:\n", + " print()\n", + " print(\n", + " \"\"\"There was an error inputting the username.\n", + "\n", + "It may be due to the following:\n", + "- Internet connection is unstable\n", + "- Username is incorrect\n", + "- Twitter is experiencing unusual activity\"\"\"\n", + " )\n", + " self.driver.quit()\n", + " sys.exit(1)\n", + " else:\n", + " print(\"Re-attempting to input username...\")\n", + " sleep(2)\n", + "\n", + " def _input_unusual_activity(self):\n", + " input_attempt = 0\n", + "\n", + " while True:\n", + " try:\n", + " unusual_activity = self.driver.find_element(\n", + " \"xpath\", \"//input[@data-testid='ocfEnterTextTextInput']\"\n", + " )\n", + " unusual_activity.send_keys(self.username)\n", + " unusual_activity.send_keys(Keys.RETURN)\n", + " sleep(3)\n", + " break\n", + " except NoSuchElementException:\n", + " input_attempt += 1\n", + " if input_attempt >= 3:\n", + " break\n", + "\n", + " def _input_password(self):\n", + " input_attempt = 0\n", + "\n", + " while True:\n", + " try:\n", + " password = self.driver.find_element(\n", + " \"xpath\", \"//input[@autocomplete='current-password']\"\n", + " )\n", + "\n", + " password.send_keys(self.password)\n", + " password.send_keys(Keys.RETURN)\n", + " sleep(3)\n", + " break\n", + " except NoSuchElementException:\n", + " input_attempt += 1\n", + " if input_attempt >= 3:\n", + " print()\n", + " print(\n", + " \"\"\"There was an error inputting the password.\n", + "\n", + "It may be due to the following:\n", + "- Internet connection is unstable\n", + "- Password is incorrect\n", + "- Twitter is experiencing unusual activity\"\"\"\n", + " )\n", + " self.driver.quit()\n", + " sys.exit(1)\n", + " else:\n", + " print(\"Re-attempting to input password...\")\n", + " sleep(2)\n", + "\n", + " def go_to_home(self):\n", + " self.driver.get(\"https://twitter.com/home\")\n", + " sleep(3)\n", + " pass\n", + "\n", + " def go_to_profile(self):\n", + " self.driver.get(f\"https://twitter.com/{self.scraper_details['username']}\")\n", + " sleep(3)\n", + " pass\n", + "\n", + " def go_to_hashtag(self):\n", + " url = f\"https://twitter.com/hashtag/{self.scraper_details['hashtag']}?src=hashtag_click\"\n", + " if self.scraper_details[\"tab\"] == \"Latest\":\n", + " url += \"&f=live\"\n", + "\n", + " self.driver.get(url)\n", + " sleep(3)\n", + " pass\n", + "\n", + " def go_to_search(self):\n", + " url = f\"https://twitter.com/search?q={self.scraper_details['query']}&src=typed_query\"\n", + " if self.scraper_details[\"tab\"] == \"Latest\":\n", + " url += \"&f=live\"\n", + "\n", + " self.driver.get(url)\n", + " sleep(3)\n", + " pass\n", + "\n", + " def get_tweet_cards(self):\n", + " self.tweet_cards = self.driver.find_elements(\n", + " \"xpath\", '//article[@data-testid=\"tweet\"]'\n", + " )\n", + " pass\n", + "\n", + " def scrape_tweets(self, router=None):\n", + " if router is None:\n", + " router = self.router\n", + "\n", + " router()\n", + "\n", + " if self.scraper_details[\"type\"] == \"Username\":\n", + " print(\n", + " \"Scraping Tweets from @{}...\".format(self.scraper_details[\"username\"])\n", + " )\n", + " elif self.scraper_details[\"type\"] == \"Hashtag\":\n", + " print(\n", + " \"Scraping {} Tweets from #{}...\".format(\n", + " self.scraper_details[\"tab\"], self.scraper_details[\"hashtag\"]\n", + " )\n", + " )\n", + " elif self.scraper_details[\"type\"] == \"Query\":\n", + " print(\n", + " \"Scraping {} Tweets from {} search...\".format(\n", + " self.scraper_details[\"tab\"], self.scraper_details[\"query\"]\n", + " )\n", + " )\n", + " elif self.scraper_details[\"type\"] == \"Home\":\n", + " print(\"Scraping Tweets from Home...\")\n", + "\n", + " self.progress.print_progress(0)\n", + "\n", + " refresh_count = 0\n", + " added_tweets = 0\n", + "\n", + " while self.scroller.scrolling:\n", + " try:\n", + " self.get_tweet_cards()\n", + " added_tweets = 0\n", + "\n", + " for card in self.tweet_cards[-15:]:\n", + " tweet = Tweet(card)\n", + "\n", + " try:\n", + " tweet_id = f\"{tweet.user}{tweet.handle}{tweet.date_time}\"\n", + " except Exception as e:\n", + " continue\n", + "\n", + " if tweet_id not in self.tweet_ids:\n", + " self.tweet_ids.add(tweet_id)\n", + " if tweet:\n", + " if not tweet.is_ad:\n", + " self.data.append(tweet.tweet)\n", + " added_tweets += 1\n", + " self.progress.print_progress(len(self.data))\n", + "\n", + " if len(self.data) >= self.max_tweets:\n", + " self.scroller.scrolling = False\n", + " break\n", + "\n", + " if len(self.data) % 50 == 0:\n", + " sleep(2)\n", + "\n", + " if len(self.data) >= self.max_tweets:\n", + " break\n", + "\n", + " if added_tweets == 0:\n", + " refresh_count += 1\n", + " if refresh_count >= 10:\n", + " print()\n", + " print(\"No more tweets to scrape\")\n", + " break\n", + " else:\n", + " refresh_count = 0\n", + "\n", + " self.scroller.scroll_count = 0\n", + "\n", + " while True:\n", + " self.scroller.scroll_to_bottom()\n", + " sleep(2)\n", + " self.scroller.update_scroll_position()\n", + "\n", + " if self.scroller.last_position == self.scroller.current_position:\n", + " self.scroller.scroll_count += 1\n", + "\n", + " if self.scroller.scroll_count >= 3:\n", + " router()\n", + " sleep(2)\n", + " break\n", + " else:\n", + " sleep(1)\n", + " else:\n", + " self.scroller.last_position = self.scroller.current_position\n", + " break\n", + " except StaleElementReferenceException:\n", + " router()\n", + " sleep(2)\n", + " except Exception as e:\n", + " print(\"\\n\")\n", + " print(f\"Error scraping tweets: {e}\")\n", + " break\n", + "\n", + " print(\"\")\n", + "\n", + " if len(self.data) >= self.max_tweets:\n", + " print(\"Scraping Complete\")\n", + " else:\n", + " print(\"Scraping Incomplete\")\n", + "\n", + " print(\"Tweets: {} out of {}\\n\".format(len(self.data), self.max_tweets))\n", + "\n", + " pass\n", + "\n", + " def save_to_csv(self):\n", + " print(\"Saving Tweets to CSV...\")\n", + " now = datetime.now()\n", + " folder_path = \"./tweets/\"\n", + "\n", + " if not os.path.exists(folder_path):\n", + " os.makedirs(folder_path)\n", + " print(\"Created Folder: {}\".format(folder_path))\n", + "\n", + " data = {\n", + " \"Name\": [tweet[0] for tweet in self.data],\n", + " \"Handle\": [tweet[1] for tweet in self.data],\n", + " \"Timestamp\": [tweet[2] for tweet in self.data],\n", + " \"Verified\": [tweet[3] for tweet in self.data],\n", + " \"Content\": [tweet[4] for tweet in self.data],\n", + " \"Comments\": [tweet[5] for tweet in self.data],\n", + " \"Retweets\": [tweet[6] for tweet in self.data],\n", + " \"Likes\": [tweet[7] for tweet in self.data],\n", + " \"Analytics\": [tweet[8] for tweet in self.data],\n", + " \"Tags\": [tweet[9] for tweet in self.data],\n", + " \"Profile Image\": [tweet[10] for tweet in self.data],\n", + " }\n", + "\n", + " df = pd.DataFrame(data)\n", + "\n", + " current_time = now.strftime(\"%Y-%m-%d_%H-%M-%S\")\n", + " file_path = f\"{folder_path}{current_time}_tweets_1-{len(self.data)}.csv\"\n", + " df.to_csv(file_path, index=False)\n", + "\n", + " print(\"CSV Saved: {}\".format(file_path))\n", + "\n", + " pass\n", + "\n", + " def get_tweets(self):\n", + " return self.data\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Create a new instance of the Twitter Scraper class" ] }, { "cell_type": "code", - "execution_count": 51, + "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "Progress: [[========================================]] 100.00% 50 of 50\n", - "Scraping Complete\n", - "Tweets: 50 out of 50\n" + "Initializing Twitter Scraper...\n", + "Setup WebDriver...\n", + "Initializing ChromeDriver...\n", + "Logging in to Twitter...\n", + "\n", + "Login Successful\n", + "\n" ] } ], "source": [ + "USER_UNAME = os.environ['TWITTER_USERNAME']\n", + "USER_PASSWORD = os.environ['TWITTER_PASSWORD']\n", + "\n", "scraper = Twitter_Scraper(\n", - " username=USER_UNAME,\n", - " password=USER_PASSWORD,\n", - " max_tweets=50\n", - ")\n", - "\n", - "scraper.go_to_home()\n", - "progress = Progress(0, scraper.max_tweets)\n", - "progress.print_progress(0)\n", - "\n", - "try:\n", - " while scraper.scroller.scrolling:\n", - " scraper.get_tweets()\n", - "\n", - " for card in scraper.tweet_cards[-15:]:\n", - " tweet_id = str(card)\n", - " if tweet_id not in scraper.tweet_ids:\n", - " scraper.tweet_ids.add(tweet_id)\n", - " tweet = Tweet(card)\n", - " if tweet:\n", - " if not tweet.is_ad:\n", - " scraper.data.append(tweet.tweet)\n", - " progress.print_progress(len(scraper.data))\n", - "\n", - " if len(scraper.data) >= scraper.max_tweets:\n", - " scraper.scroller.scrolling = False\n", - " break\n", - "\n", - " if len(scraper.data) % 50 == 0:\n", - " sleep(2)\n", - "\n", - " if len(scraper.data) >= scraper.max_tweets:\n", - " break\n", - "\n", - " scraper.scroller.scroll_count = 0\n", - "\n", - " while True:\n", - " scraper.driver.execute_script(\n", - " 'window.scrollTo(0, document.body.scrollHeight);'\n", - " )\n", - " sleep(2)\n", - " scraper.scroller.current_position = scraper.driver.execute_script(\n", - " \"return window.pageYOffset;\"\n", - " )\n", - "\n", - " if scraper.scroller.last_position == scraper.scroller.current_position:\n", - " scraper.scroller.scroll_count += 1\n", - "\n", - " if scraper.scroller.scroll_count >= 3:\n", - " scraper.go_to_home()\n", - " sleep(2)\n", - " scraper.scroller.reset()\n", - " break\n", - " else:\n", - " sleep(2)\n", - " else:\n", - " scraper.scroller.last_position = scraper.scroller.current_position\n", - " break\n", - "\n", - " print()\n", - " print(\"Scraping Complete\")\n", - "except StaleElementReferenceException:\n", - " print()\n", - " print(\"Scraping Incomplete\")\n", - "\n", - "scraper.driver.close()\n", - "print(\"Tweets: {} out of {}\".format(len(scraper.data), scraper.max_tweets))" + " username=USER_UNAME,\n", + " password=USER_PASSWORD,\n", + " max_tweets=10,\n", + " # scrape_username=\"something\",\n", + " # scrape_hashtag=\"something\",\n", + " # scrape_query=\"something\",\n", + " # scrape_latest=True,\n", + " # scrape_top=False,\n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Run Twitter Scraper" ] }, { "cell_type": "code", - "execution_count": 52, + "execution_count": 33, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Scraping Tweets from Home...\n", + "Progress: [[========================================]] 100.00% 10 of 10\n", + "Scraping Complete\n", + "Tweets: 10 out of 10\n", + "\n" + ] + } + ], "source": [ - "# import tabulate\n", - "\n", - "# # Tabulate\n", - "# print(tabulate.tabulate(\n", - "# scraper.data[:10],\n", - "# headers=[\n", - "# 'Name',\n", - "# 'Handle',\n", - "# 'Date Time',\n", - "# 'Verified',\n", - "# 'Content',\n", - "# 'Reply Count',\n", - "# 'Retweet Count',\n", - "# 'Like Count'\n", - "# ],\n", - "# tablefmt='tsv'\n", - "# ))\n", - " " + "scraper.scrape_tweets()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Save Scraped Tweets in a CSV" ] }, { "cell_type": "code", - "execution_count": 53, + "execution_count": 34, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Saving Tweets to CSV...\n", + "CSV Saved: ./tweets/2023-09-13_17-14-51_tweets_1-10.csv\n" + ] + } + ], "source": [ - "# import csv\n", - "# \n", - "# with open('twitter_tweets.csv', 'w', encoding='utf-8', newline='') as f:\n", - "# header = ['Name', 'Handle', 'Timestamp', 'Verified',\n", - "# 'Content', 'Comments', 'Retweets', 'Likes']\n", - "# writer = csv.writer(f)\n", - "# writer.writerow(header)\n", - "# writer.writerows(scraper.data)" - ] - }, - { - "cell_type": "code", - "execution_count": 54, - "metadata": {}, - "outputs": [], - "source": [ - "data = {\n", - " 'Name': [tweet[0] for tweet in scraper.data],\n", - " 'Handle': [tweet[1] for tweet in scraper.data],\n", - " 'Timestamp': [tweet[2] for tweet in scraper.data],\n", - " 'Verified': [tweet[3] for tweet in scraper.data],\n", - " 'Content': [tweet[4] for tweet in scraper.data],\n", - " 'Comments': [tweet[5] for tweet in scraper.data],\n", - " 'Retweets': [tweet[6] for tweet in scraper.data],\n", - " 'Likes': [tweet[7] for tweet in scraper.data]\n", - "}\n", - "\n", - "df = pd.DataFrame(data)\n", - "\n", - "current_time = datetime.now().strftime(\"%Y-%m-%d_%H-%M-%S\")\n", - "\n", - "file_path = f'{folder_path}{current_time}_tweets_1-{len(scraper.data)}.csv'\n", - "df.to_csv(file_path, index=False)\n" + "scraper.save_to_csv()\n", + "scraper.driver.close()" ] } ],