{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Twitter Scraper using Selenium\n",
"\n",
"A scraper for tweets built with Selenium. It can scrape tweets from:\n",
"- Home/New Feed\n",
"- User Profile Tweets\n",
"- Query or Search Tweets\n",
"- Hashtag Tweets\n",
"- Advanced Search Tweets"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import sys\n",
"import pandas as pd\n",
"\n",
"from datetime import datetime\n",
"from fake_headers import Headers\n",
"from time import sleep\n",
"from selenium import webdriver\n",
"from selenium.webdriver.common.keys import Keys\n",
"from selenium.webdriver.remote.webelement import WebElement\n",
"from selenium.common.exceptions import (\n",
"    NoSuchElementException,\n",
"    StaleElementReferenceException,\n",
"    WebDriverException,\n",
")\n",
"\n",
"from selenium.webdriver.chrome.options import Options as ChromeOptions\n",
"from selenium.webdriver.chrome.service import Service as ChromeService\n",
"\n",
"from webdriver_manager.chrome import ChromeDriverManager"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Progress Class\n",
"\n",
"Tracks and displays the progress of a scraper run as a console progress bar."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"class Progress:\n",
"    def __init__(self, current, total) -> None:\n",
"        self.current = current\n",
"        self.total = total\n",
"\n",
"    def print_progress(self, current) -> None:\n",
"        self.current = current\n",
"        progress = current / self.total\n",
"        bar_length = 40\n",
"        # Build the bar body only; the format string below supplies the brackets.\n",
"        progress_bar = \"=\" * int(bar_length * progress) + \"-\" * (\n",
"            bar_length - int(bar_length * progress)\n",
"        )\n",
"        sys.stdout.write(\n",
"            \"\\rProgress: [{:<40}] {:.2%} {} of {}\".format(\n",
"                progress_bar, progress, current, self.total\n",
"            )\n",
"        )\n",
"        sys.stdout.flush()\n"
]
},
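{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick standalone check of the progress bar. This is illustrative only; the dummy loop and sleep interval are arbitrary choices, not part of the scraper:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Minimal sketch: drive Progress with a dummy loop to preview the bar.\n",
"demo_progress = Progress(0, 10)\n",
"for i in range(11):\n",
"    demo_progress.print_progress(i)\n",
"    sleep(0.05)\n",
"print()"
]
},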
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Scroller Class\n",
"\n",
"Controls scrolling of the web page and tracks the scroll position."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"class Scroller:\n",
"    def __init__(self, driver) -> None:\n",
"        self.driver = driver\n",
"        self.current_position = 0\n",
"        self.last_position = driver.execute_script(\"return window.pageYOffset;\")\n",
"        self.scrolling = True\n",
"        self.scroll_count = 0\n",
"\n",
"    def reset(self) -> None:\n",
"        self.current_position = 0\n",
"        self.last_position = self.driver.execute_script(\"return window.pageYOffset;\")\n",
"        self.scroll_count = 0\n",
"\n",
"    def scroll_to_top(self) -> None:\n",
"        self.driver.execute_script(\"window.scrollTo(0, 0);\")\n",
"\n",
"    def scroll_to_bottom(self) -> None:\n",
"        self.driver.execute_script(\"window.scrollTo(0, document.body.scrollHeight);\")\n",
"\n",
"    def update_scroll_position(self) -> None:\n",
"        self.current_position = self.driver.execute_script(\"return window.pageYOffset;\")\n"
]
},
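{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"The scrape loop further below uses `Scroller` to detect when the page stops growing. A minimal sketch of that pattern in isolation, assuming `driver` is an already-initialized WebDriver on a scrollable page (the stall threshold and pause are arbitrary):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: scroll until the position stops changing max_stalls times in a row.\n",
"def scroll_until_stable(driver, max_stalls=3, pause=2):\n",
"    scroller = Scroller(driver)\n",
"    stalls = 0\n",
"    while stalls < max_stalls:\n",
"        scroller.scroll_to_bottom()\n",
"        sleep(pause)\n",
"        scroller.update_scroll_position()\n",
"        if scroller.current_position == scroller.last_position:\n",
"            stalls += 1\n",
"        else:\n",
"            scroller.last_position = scroller.current_position\n",
"            stalls = 0"
]
},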
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Tweet Class\n",
"\n",
"Represents a single tweet card and the data extracted from it."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"class Tweet:\n",
"    def __init__(self, card: WebElement) -> None:\n",
"        self.card = card\n",
"\n",
"        try:\n",
"            self.user = card.find_element(\n",
"                \"xpath\", './/div[@data-testid=\"User-Name\"]//span'\n",
"            ).text\n",
"        except NoSuchElementException:\n",
"            return\n",
"\n",
"        try:\n",
"            self.handle = card.find_element(\n",
"                \"xpath\", './/span[contains(text(), \"@\")]'\n",
"            ).text\n",
"        except NoSuchElementException:\n",
"            return\n",
"\n",
"        try:\n",
"            self.date_time = card.find_element(\"xpath\", \".//time\").get_attribute(\n",
"                \"datetime\"\n",
"            )\n",
"\n",
"            # A card without a timestamp is a promoted tweet (ad).\n",
"            self.is_ad = self.date_time is None\n",
"            if self.is_ad:\n",
"                return\n",
"        except NoSuchElementException:\n",
"            self.is_ad = True\n",
"            return\n",
"\n",
"        try:\n",
"            card.find_element(\n",
"                \"xpath\", './/*[local-name()=\"svg\" and @data-testid=\"icon-verified\"]'\n",
"            )\n",
"\n",
"            self.verified = True\n",
"        except NoSuchElementException:\n",
"            self.verified = False\n",
"\n",
"        self.content = \"\"\n",
"        contents = card.find_elements(\n",
"            \"xpath\",\n",
"            '(.//div[@data-testid=\"tweetText\"])[1]/span | (.//div[@data-testid=\"tweetText\"])[1]/a',\n",
"        )\n",
"\n",
"        for content in contents:\n",
"            self.content += content.text\n",
"\n",
"        try:\n",
"            self.reply_cnt = card.find_element(\n",
"                \"xpath\", './/div[@data-testid=\"reply\"]//span'\n",
"            ).text\n",
"        except NoSuchElementException:\n",
"            self.reply_cnt = \"0\"\n",
"\n",
"        try:\n",
"            self.retweet_cnt = card.find_element(\n",
"                \"xpath\", './/div[@data-testid=\"retweet\"]//span'\n",
"            ).text\n",
"        except NoSuchElementException:\n",
"            self.retweet_cnt = \"0\"\n",
"\n",
"        try:\n",
"            self.like_cnt = card.find_element(\n",
"                \"xpath\", './/div[@data-testid=\"like\"]//span'\n",
"            ).text\n",
"        except NoSuchElementException:\n",
"            self.like_cnt = \"0\"\n",
"\n",
"        try:\n",
"            self.analytics_cnt = card.find_element(\n",
"                \"xpath\", './/a[contains(@href, \"/analytics\")]//span'\n",
"            ).text\n",
"        except NoSuchElementException:\n",
"            self.analytics_cnt = \"0\"\n",
"\n",
"        try:\n",
"            self.profile_img = card.find_element(\n",
"                \"xpath\", './/div[@data-testid=\"Tweet-User-Avatar\"]//img'\n",
"            ).get_attribute(\"src\")\n",
"        except NoSuchElementException:\n",
"            self.profile_img = \"\"\n",
"\n",
"        # find_elements returns an empty list when nothing matches,\n",
"        # so no exception handling is needed here.\n",
"        self.tags = [\n",
"            tag.text\n",
"            for tag in card.find_elements(\n",
"                \"xpath\", './/a[contains(@href, \"src=hashtag_click\")]'\n",
"            )\n",
"        ]\n",
"\n",
"        self.tweet = (\n",
"            self.user,\n",
"            self.handle,\n",
"            self.date_time,\n",
"            self.verified,\n",
"            self.content,\n",
"            self.reply_cnt,\n",
"            self.retweet_cnt,\n",
"            self.like_cnt,\n",
"            self.analytics_cnt,\n",
"            self.tags,\n",
"            self.profile_img,\n",
"        )\n"
]
},
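{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Each `Tweet` exposes its fields as a flat tuple in a fixed order. A hypothetical helper (not part of the original scraper) that labels a tuple with the same column names `save_to_csv` uses below:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical helper: map a Tweet.tweet tuple to the CSV column names.\n",
"TWEET_COLUMNS = [\n",
"    \"Name\", \"Handle\", \"Timestamp\", \"Verified\", \"Content\",\n",
"    \"Comments\", \"Retweets\", \"Likes\", \"Analytics\", \"Tags\", \"Profile Image\",\n",
"]\n",
"\n",
"def tweet_to_dict(tweet_tuple):\n",
"    return dict(zip(TWEET_COLUMNS, tweet_tuple))"
]
},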
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Twitter Scraper Class\n",
"\n",
"The main scraper: handles login, routing to the target page, scraping tweet cards, and saving results."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"TWITTER_LOGIN_URL = \"https://twitter.com/i/flow/login\"\n",
"\n",
"class Twitter_Scraper:\n",
"    def __init__(\n",
"        self,\n",
"        username,\n",
"        password,\n",
"        max_tweets=50,\n",
"        scrape_username=None,\n",
"        scrape_hashtag=None,\n",
"        scrape_query=None,\n",
"        scrape_latest=True,\n",
"        scrape_top=False,\n",
"    ):\n",
"        print(\"Initializing Twitter Scraper...\")\n",
"        self.username = username\n",
"        self.password = password\n",
"        self.tweet_ids = set()\n",
"        self.data = []\n",
"        self.tweet_cards = []\n",
"        self.scraper_details = {\n",
"            \"type\": None,\n",
"            \"username\": None,\n",
"            \"hashtag\": None,\n",
"            \"query\": None,\n",
"            \"tab\": None,\n",
"        }\n",
"        self.max_tweets = max_tweets\n",
"        self.progress = Progress(0, max_tweets)\n",
"        self.router = self.go_to_home\n",
"        self.driver = self._get_driver()\n",
"        self.scroller = Scroller(self.driver)\n",
"        self._login()\n",
"        self._config_scraper(\n",
"            max_tweets,\n",
"            scrape_username,\n",
"            scrape_hashtag,\n",
"            scrape_query,\n",
"            scrape_latest,\n",
"            scrape_top,\n",
"        )\n",
"\n",
"    def _config_scraper(\n",
"        self,\n",
"        max_tweets=50,\n",
"        scrape_username=None,\n",
"        scrape_hashtag=None,\n",
"        scrape_query=None,\n",
"        scrape_latest=True,\n",
"        scrape_top=False,\n",
"    ):\n",
"        self.tweet_ids = set()\n",
"        self.data = []\n",
"        self.tweet_cards = []\n",
"        self.max_tweets = max_tweets\n",
"        self.progress = Progress(0, max_tweets)\n",
"        self.scraper_details = {\n",
"            \"type\": None,\n",
"            \"username\": scrape_username,\n",
"            \"hashtag\": str(scrape_hashtag).replace(\"#\", \"\")\n",
"            if scrape_hashtag is not None\n",
"            else None,\n",
"            \"query\": scrape_query,\n",
"            # scrape_latest takes precedence over scrape_top.\n",
"            \"tab\": \"Top\" if scrape_top and not scrape_latest else \"Latest\",\n",
"        }\n",
"        self.router = self.go_to_home\n",
"\n",
"        if scrape_username is not None:\n",
"            self.scraper_details[\"type\"] = \"Username\"\n",
"            self.router = self.go_to_profile\n",
"        elif scrape_hashtag is not None:\n",
"            self.scraper_details[\"type\"] = \"Hashtag\"\n",
"            self.router = self.go_to_hashtag\n",
"        elif scrape_query is not None:\n",
"            self.scraper_details[\"type\"] = \"Query\"\n",
"            self.router = self.go_to_search\n",
"        else:\n",
"            self.scraper_details[\"type\"] = \"Home\"\n",
"            self.router = self.go_to_home\n",
"\n",
"    def _get_driver(self):\n",
"        print(\"Setup WebDriver...\")\n",
"        header = Headers().generate()[\"User-Agent\"]\n",
"\n",
"        browser_option = ChromeOptions()\n",
"        browser_option.add_argument(\"--no-sandbox\")\n",
"        browser_option.add_argument(\"--disable-dev-shm-usage\")\n",
"        browser_option.add_argument(\"--ignore-certificate-errors\")\n",
"        browser_option.add_argument(\"--disable-gpu\")\n",
"        browser_option.add_argument(\"--log-level=3\")\n",
"        browser_option.add_argument(\"--disable-notifications\")\n",
"        browser_option.add_argument(\"--disable-popup-blocking\")\n",
"        browser_option.add_argument(\"--user-agent={}\".format(header))\n",
"\n",
"        # For hiding the browser window\n",
"        browser_option.add_argument(\"--headless\")\n",
"\n",
"        try:\n",
"            print(\"Initializing ChromeDriver...\")\n",
"            driver = webdriver.Chrome(\n",
"                options=browser_option,\n",
"            )\n",
"\n",
"            return driver\n",
"        except WebDriverException:\n",
"            try:\n",
"                print(\"Downloading ChromeDriver...\")\n",
"                chromedriver_path = ChromeDriverManager().install()\n",
"                chrome_service = ChromeService(executable_path=chromedriver_path)\n",
"\n",
"                print(\"Initializing ChromeDriver...\")\n",
"                driver = webdriver.Chrome(\n",
"                    service=chrome_service,\n",
"                    options=browser_option,\n",
"                )\n",
"\n",
"                return driver\n",
"            except Exception as e:\n",
"                print(f\"Error setting up WebDriver: {e}\")\n",
"                sys.exit(1)\n",
"\n",
"    def _login(self):\n",
"        print(\"Logging in to Twitter...\")\n",
"\n",
"        try:\n",
"            self.driver.get(TWITTER_LOGIN_URL)\n",
"            self.driver.maximize_window()\n",
"            sleep(3)\n",
"\n",
"            self._input_username()\n",
"            self._input_unusual_activity()\n",
"            self._input_password()\n",
"\n",
"            cookies = self.driver.get_cookies()\n",
"\n",
"            auth_token = None\n",
"\n",
"            for cookie in cookies:\n",
"                if cookie[\"name\"] == \"auth_token\":\n",
"                    auth_token = cookie[\"value\"]\n",
"                    break\n",
"\n",
"            if auth_token is None:\n",
"                raise ValueError(\n",
"                    \"\"\"This may be due to the following:\n",
"\n",
"- Internet connection is unstable\n",
"- Username is incorrect\n",
"- Password is incorrect\n",
"\"\"\"\n",
"                )\n",
"\n",
"            print()\n",
"            print(\"Login Successful\")\n",
"            print()\n",
"        except Exception as e:\n",
"            print()\n",
"            print(f\"Login Failed: {e}\")\n",
"            sys.exit(1)\n",
"\n",
"    def _input_username(self):\n",
"        input_attempt = 0\n",
"\n",
"        while True:\n",
"            try:\n",
"                username = self.driver.find_element(\n",
"                    \"xpath\", \"//input[@autocomplete='username']\"\n",
"                )\n",
"\n",
"                username.send_keys(self.username)\n",
"                username.send_keys(Keys.RETURN)\n",
"                sleep(3)\n",
"                break\n",
"            except NoSuchElementException:\n",
"                input_attempt += 1\n",
"                if input_attempt >= 3:\n",
"                    print()\n",
"                    print(\n",
"                        \"\"\"There was an error inputting the username.\n",
"\n",
"It may be due to the following:\n",
"- Internet connection is unstable\n",
"- Username is incorrect\n",
"- Twitter is experiencing unusual activity\"\"\"\n",
"                    )\n",
"                    self.driver.quit()\n",
"                    sys.exit(1)\n",
"                else:\n",
"                    print(\"Re-attempting to input username...\")\n",
"                    sleep(2)\n",
"\n",
"    def _input_unusual_activity(self):\n",
"        input_attempt = 0\n",
"\n",
"        while True:\n",
"            try:\n",
"                unusual_activity = self.driver.find_element(\n",
"                    \"xpath\", \"//input[@data-testid='ocfEnterTextTextInput']\"\n",
"                )\n",
"                unusual_activity.send_keys(self.username)\n",
"                unusual_activity.send_keys(Keys.RETURN)\n",
"                sleep(3)\n",
"                break\n",
"            except NoSuchElementException:\n",
"                input_attempt += 1\n",
"                if input_attempt >= 3:\n",
"                    break\n",
"\n",
"    def _input_password(self):\n",
"        input_attempt = 0\n",
"\n",
"        while True:\n",
"            try:\n",
"                password = self.driver.find_element(\n",
"                    \"xpath\", \"//input[@autocomplete='current-password']\"\n",
"                )\n",
"\n",
"                password.send_keys(self.password)\n",
"                password.send_keys(Keys.RETURN)\n",
"                sleep(3)\n",
"                break\n",
"            except NoSuchElementException:\n",
"                input_attempt += 1\n",
"                if input_attempt >= 3:\n",
"                    print()\n",
"                    print(\n",
"                        \"\"\"There was an error inputting the password.\n",
"\n",
"It may be due to the following:\n",
"- Internet connection is unstable\n",
"- Password is incorrect\n",
"- Twitter is experiencing unusual activity\"\"\"\n",
"                    )\n",
"                    self.driver.quit()\n",
"                    sys.exit(1)\n",
"                else:\n",
"                    print(\"Re-attempting to input password...\")\n",
"                    sleep(2)\n",
"\n",
"    def go_to_home(self):\n",
"        self.driver.get(\"https://twitter.com/home\")\n",
"        sleep(3)\n",
"\n",
"    def go_to_profile(self):\n",
"        if (\n",
"            self.scraper_details[\"username\"] is None\n",
"            or self.scraper_details[\"username\"] == \"\"\n",
"        ):\n",
"            print(\"Username is not set.\")\n",
"            sys.exit(1)\n",
"        else:\n",
"            self.driver.get(f\"https://twitter.com/{self.scraper_details['username']}\")\n",
"            sleep(3)\n",
"\n",
"    def go_to_hashtag(self):\n",
"        if (\n",
"            self.scraper_details[\"hashtag\"] is None\n",
"            or self.scraper_details[\"hashtag\"] == \"\"\n",
"        ):\n",
"            print(\"Hashtag is not set.\")\n",
"            sys.exit(1)\n",
"        else:\n",
"            url = f\"https://twitter.com/hashtag/{self.scraper_details['hashtag']}?src=hashtag_click\"\n",
"            if self.scraper_details[\"tab\"] == \"Latest\":\n",
"                url += \"&f=live\"\n",
"\n",
"            self.driver.get(url)\n",
"            sleep(3)\n",
"\n",
"    def go_to_search(self):\n",
"        if self.scraper_details[\"query\"] is None or self.scraper_details[\"query\"] == \"\":\n",
"            print(\"Query is not set.\")\n",
"            sys.exit(1)\n",
"        else:\n",
"            url = f\"https://twitter.com/search?q={self.scraper_details['query']}&src=typed_query\"\n",
"            if self.scraper_details[\"tab\"] == \"Latest\":\n",
"                url += \"&f=live\"\n",
"\n",
"            self.driver.get(url)\n",
"            sleep(3)\n",
"\n",
"    def get_tweet_cards(self):\n",
"        self.tweet_cards = self.driver.find_elements(\n",
"            \"xpath\", '//article[@data-testid=\"tweet\"]'\n",
"        )\n",
"\n",
"    def scrape_tweets(\n",
"        self,\n",
"        max_tweets=50,\n",
"        scrape_username=None,\n",
"        scrape_hashtag=None,\n",
"        scrape_query=None,\n",
"        scrape_latest=True,\n",
"        scrape_top=False,\n",
"        router=None,\n",
"    ):\n",
"        self._config_scraper(\n",
"            max_tweets,\n",
"            scrape_username,\n",
"            scrape_hashtag,\n",
"            scrape_query,\n",
"            scrape_latest,\n",
"            scrape_top,\n",
"        )\n",
"\n",
"        if router is None:\n",
"            router = self.router\n",
"\n",
"        router()\n",
"\n",
"        if self.scraper_details[\"type\"] == \"Username\":\n",
"            print(\n",
"                \"Scraping Tweets from @{}...\".format(self.scraper_details[\"username\"])\n",
"            )\n",
"        elif self.scraper_details[\"type\"] == \"Hashtag\":\n",
"            print(\n",
"                \"Scraping {} Tweets from #{}...\".format(\n",
"                    self.scraper_details[\"tab\"], self.scraper_details[\"hashtag\"]\n",
"                )\n",
"            )\n",
"        elif self.scraper_details[\"type\"] == \"Query\":\n",
"            print(\n",
"                \"Scraping {} Tweets from {} search...\".format(\n",
"                    self.scraper_details[\"tab\"], self.scraper_details[\"query\"]\n",
"                )\n",
"            )\n",
"        elif self.scraper_details[\"type\"] == \"Home\":\n",
"            print(\"Scraping Tweets from Home...\")\n",
"\n",
"        self.progress.print_progress(0)\n",
"\n",
"        refresh_count = 0\n",
"        added_tweets = 0\n",
"\n",
"        while self.scroller.scrolling:\n",
"            try:\n",
"                self.get_tweet_cards()\n",
"                added_tweets = 0\n",
"\n",
"                for card in self.tweet_cards[-15:]:\n",
"                    tweet = Tweet(card)\n",
"\n",
"                    try:\n",
"                        tweet_id = f\"{tweet.user}{tweet.handle}{tweet.date_time}\"\n",
"                    except AttributeError:\n",
"                        # Cards that failed to parse are skipped.\n",
"                        continue\n",
"\n",
"                    if tweet_id not in self.tweet_ids:\n",
"                        self.tweet_ids.add(tweet_id)\n",
"                        if not tweet.is_ad:\n",
"                            self.data.append(tweet.tweet)\n",
"                            added_tweets += 1\n",
"                            self.progress.print_progress(len(self.data))\n",
"\n",
"                            if len(self.data) >= self.max_tweets:\n",
"                                self.scroller.scrolling = False\n",
"                                break\n",
"\n",
"                            if len(self.data) % 50 == 0:\n",
"                                sleep(2)\n",
"\n",
"                if len(self.data) >= self.max_tweets:\n",
"                    break\n",
"\n",
"                if added_tweets == 0:\n",
"                    refresh_count += 1\n",
"                    if refresh_count >= 10:\n",
"                        print()\n",
"                        print(\"No more tweets to scrape\")\n",
"                        break\n",
"                else:\n",
"                    refresh_count = 0\n",
"\n",
"                self.scroller.scroll_count = 0\n",
"\n",
"                while True:\n",
"                    self.scroller.scroll_to_bottom()\n",
"                    sleep(2)\n",
"                    self.scroller.update_scroll_position()\n",
"\n",
"                    if self.scroller.last_position == self.scroller.current_position:\n",
"                        self.scroller.scroll_count += 1\n",
"\n",
"                        if self.scroller.scroll_count >= 3:\n",
"                            router()\n",
"                            sleep(2)\n",
"                            break\n",
"                        else:\n",
"                            sleep(1)\n",
"                    else:\n",
"                        self.scroller.last_position = self.scroller.current_position\n",
"                        break\n",
"            except StaleElementReferenceException:\n",
"                router()\n",
"                sleep(2)\n",
"            except Exception as e:\n",
"                print(\"\\n\")\n",
"                print(f\"Error scraping tweets: {e}\")\n",
"                break\n",
"\n",
"        print(\"\")\n",
"\n",
"        if len(self.data) >= self.max_tweets:\n",
"            print(\"Scraping Complete\")\n",
"        else:\n",
"            print(\"Scraping Incomplete\")\n",
"\n",
"        print(\"Tweets: {} out of {}\\n\".format(len(self.data), self.max_tweets))\n",
"\n",
"    def save_to_csv(self):\n",
"        print(\"Saving Tweets to CSV...\")\n",
"        now = datetime.now()\n",
"        folder_path = \"./tweets/\"\n",
"\n",
"        if not os.path.exists(folder_path):\n",
"            os.makedirs(folder_path)\n",
"            print(\"Created Folder: {}\".format(folder_path))\n",
"\n",
"        data = {\n",
"            \"Name\": [tweet[0] for tweet in self.data],\n",
"            \"Handle\": [tweet[1] for tweet in self.data],\n",
"            \"Timestamp\": [tweet[2] for tweet in self.data],\n",
"            \"Verified\": [tweet[3] for tweet in self.data],\n",
"            \"Content\": [tweet[4] for tweet in self.data],\n",
"            \"Comments\": [tweet[5] for tweet in self.data],\n",
"            \"Retweets\": [tweet[6] for tweet in self.data],\n",
"            \"Likes\": [tweet[7] for tweet in self.data],\n",
"            \"Analytics\": [tweet[8] for tweet in self.data],\n",
"            \"Tags\": [tweet[9] for tweet in self.data],\n",
"            \"Profile Image\": [tweet[10] for tweet in self.data],\n",
"        }\n",
"\n",
"        df = pd.DataFrame(data)\n",
"\n",
"        current_time = now.strftime(\"%Y-%m-%d_%H-%M-%S\")\n",
"        file_path = f\"{folder_path}{current_time}_tweets_1-{len(self.data)}.csv\"\n",
"        df.to_csv(file_path, index=False)\n",
"\n",
"        print(\"CSV Saved: {}\".format(file_path))\n",
"\n",
"    def get_tweets(self):\n",
"        return self.data"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create a new instance of the Twitter Scraper class"
]
},
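{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"The next cell reads credentials from the `TWITTER_USERNAME` and `TWITTER_PASSWORD` environment variables. If they are not exported in your shell, one option is to set them in-process first (placeholder values shown):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Uncomment and fill in to set credentials for this session only.\n",
"# os.environ[\"TWITTER_USERNAME\"] = \"your_username\"\n",
"# os.environ[\"TWITTER_PASSWORD\"] = \"your_password\""
]
},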
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Initializing Twitter Scraper...\n",
"Setup WebDriver...\n",
"Initializing ChromeDriver...\n",
"Logging in to Twitter...\n",
"\n",
"Login Successful\n",
"\n"
]
}
],
"source": [
"USER_UNAME = os.environ['TWITTER_USERNAME']\n",
"USER_PASSWORD = os.environ['TWITTER_PASSWORD']\n",
"\n",
"scraper = Twitter_Scraper(\n",
"    username=USER_UNAME,\n",
"    password=USER_PASSWORD,\n",
"    # max_tweets=10,\n",
"    # scrape_username=\"\",\n",
"    # scrape_hashtag=\"something\",\n",
"    # scrape_query=\"something\",\n",
"    # scrape_latest=True,\n",
"    # scrape_top=False,\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Run Twitter Scraper"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Scraping Tweets from Home...\n",
"Progress: [========================================] 100.00% 50 of 50\n",
"Scraping Complete\n",
"Tweets: 50 out of 50\n",
"\n"
]
}
],
"source": [
"scraper.scrape_tweets(\n",
"    # max_tweets=10,\n",
"    # scrape_username=\"something\",\n",
"    # scrape_hashtag=\"something\",\n",
"    # scrape_query=\"something\",\n",
"    # scrape_latest=True,\n",
"    # scrape_top=False,\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Save Scraped Tweets in a CSV"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Saving Tweets to CSV...\n",
"CSV Saved: ./tweets/2023-09-20_09-26-30_tweets_1-50.csv\n"
]
}
],
"source": [
"scraper.save_to_csv()"
]
},
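{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Alternatively, `get_tweets()` returns the raw list of tuples, which can be loaded straight into a DataFrame. A sketch using the hypothetical `TWEET_COLUMNS` defined earlier:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Build a DataFrame directly from the scraped tuples.\n",
"tweets_df = pd.DataFrame(scraper.get_tweets(), columns=TWEET_COLUMNS)\n",
"tweets_df.head()"
]
},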
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# quit() closes the browser and shuts down the ChromeDriver process.\n",
"scraper.driver.quit()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "ml",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}