{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import sys\n",
    "import pandas as pd\n",
    "from datetime import datetime\n",
    "from fake_headers import Headers\n",
    "from time import sleep\n",
    "from selenium import webdriver\n",
    "from selenium.webdriver.common.keys import Keys\n",
    "from selenium.common.exceptions import NoSuchElementException, StaleElementReferenceException\n",
    "\n",
    "from selenium.webdriver.chrome.options import Options as ChromeOptions\n",
    "from selenium.webdriver.chrome.service import Service as ChromeService\n",
    "\n",
    "from webdriver_manager.chrome import ChromeDriverManager\n",
    "\n",
    "now = datetime.now()\n",
    "folder_path = './tweets/'\n",
    "\n",
    "if not os.path.exists(folder_path):\n",
    "    os.makedirs(folder_path)\n",
    "\n",
    "USER_UNAME = os.environ['TWITTER_USERNAME']\n",
    "USER_PASSWORD = os.environ['TWITTER_PASSWORD']\n",
    "TWITTER_LOGIN_URL = \"https://twitter.com/i/flow/login\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "class Progress:\n",
    "    def __init__(self, current, total) -> None:\n",
    "        self.current = current\n",
    "        self.total = total\n",
    "\n",
    "    def print_progress(self, current) -> None:\n",
    "        self.current = current\n",
    "        progress = current / self.total\n",
    "        bar_length = 40\n",
    "        # Build the bar without brackets; the format string below adds a single pair\n",
    "        progress_bar = \"=\" * int(bar_length * progress) + \\\n",
    "            \"-\" * (bar_length - int(bar_length * progress))\n",
    "        sys.stdout.write(\n",
    "            \"\\rProgress: [{:<40}] {:.2%} {} of {}\".format(progress_bar, progress, current, self.total))\n",
    "        sys.stdout.flush()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "class Scroller():\n",
    "    def __init__(self, driver) -> None:\n",
    "        self.driver = driver\n",
    "        self.current_position = 0\n",
    "        self.last_position = driver.execute_script(\"return window.pageYOffset;\")\n",
    "        self.scrolling = True\n",
    "        self.scroll_count = 0\n",
    "\n",
    "    def reset(self) -> None:\n",
    "        self.current_position = 0\n",
    "        self.last_position = self.driver.execute_script(\"return window.pageYOffset;\")\n",
    "        self.scroll_count = 0"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "class Twitter_Scraper():\n",
    "    def __init__(self, username, password, max_tweets=50):\n",
    "        self.username = username\n",
    "        self.password = password\n",
    "        self.data = []\n",
    "        self.tweet_ids = set()\n",
    "        self.max_tweets = max_tweets\n",
    "        self.tweet_cards = []\n",
    "        self.driver = self._get_driver()\n",
    "        self.scroller = Scroller(self.driver)\n",
    "        self._login()\n",
    "\n",
    "    def _get_driver(self):\n",
    "        header = Headers().generate()['User-Agent']\n",
    "\n",
    "        browser_option = ChromeOptions()\n",
    "        browser_option.add_argument('--no-sandbox')\n",
    "        browser_option.add_argument(\"--disable-dev-shm-usage\")\n",
    "        browser_option.add_argument('--ignore-certificate-errors')\n",
    "        browser_option.add_argument('--disable-gpu')\n",
    "        browser_option.add_argument('--log-level=3')\n",
    "        browser_option.add_argument('--disable-notifications')\n",
    "        browser_option.add_argument('--disable-popup-blocking')\n",
    "        browser_option.add_argument('--user-agent={}'.format(header))\n",
    "\n",
    "        # For Hiding Browser\n",
    "        browser_option.add_argument(\"--headless\")\n",
    "\n",
    "        chromedriver_path = ChromeDriverManager().install()\n",
    "        chrome_service = ChromeService(executable_path=chromedriver_path)\n",
    "\n",
    "        driver = webdriver.Chrome(\n",
    "            service=chrome_service,\n",
    "            options=browser_option,\n",
    "        )\n",
    "\n",
    "        return driver\n",
    "\n",
    "    def _login(self):\n",
    "        self.driver.get(TWITTER_LOGIN_URL)\n",
    "        self.driver.maximize_window()\n",
    "        sleep(3)\n",
    "\n",
    "        self._input_username()\n",
    "        self._input_unusual_activity()\n",
    "        self._input_password()\n",
    "\n",
    "    def _input_username(self):\n",
    "        try:\n",
    "            username = self.driver.find_element(\n",
    "                \"xpath\",\n",
    "                \"//input[@autocomplete='username']\"\n",
    "            )\n",
    "\n",
    "            username.send_keys(self.username)\n",
    "            username.send_keys(Keys.RETURN)\n",
    "            sleep(3)\n",
    "\n",
    "        except NoSuchElementException:\n",
    "            print(\"Username field not found\")\n",
    "            self.driver.quit()\n",
    "            exit()\n",
    "\n",
    "    def _input_unusual_activity(self):\n",
    "        try:\n",
    "            unusual_activity = self.driver.find_element(\n",
    "                \"xpath\",\n",
    "                \"//input[@data-testid='ocfEnterTextTextInput']\"\n",
    "            )\n",
    "            unusual_activity.send_keys(self.username)\n",
    "            unusual_activity.send_keys(Keys.RETURN)\n",
    "            sleep(3)\n",
    "        except NoSuchElementException:\n",
    "            pass\n",
    "\n",
    "    def _input_password(self):\n",
    "        try:\n",
    "            password = self.driver.find_element(\n",
    "                \"xpath\",\n",
    "                \"//input[@autocomplete='current-password']\"\n",
    "            )\n",
    "\n",
    "            password.send_keys(self.password)\n",
    "            password.send_keys(Keys.RETURN)\n",
    "            sleep(3)\n",
    "\n",
    "        except NoSuchElementException:\n",
    "            print(\"Password field not found\")\n",
    "            self.driver.quit()\n",
    "            exit()\n",
    "\n",
    "    def go_to_home(self):\n",
    "        self.driver.get(\"https://twitter.com/home\")\n",
    "        sleep(3)\n",
    "\n",
    "    def get_tweets(self):\n",
    "        self.tweet_cards = self.driver.find_elements(\n",
    "            'xpath',\n",
    "            '//article[@data-testid=\"tweet\"]'\n",
    "        )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "class Tweet:\n",
    "    def __init__(self, card) -> None:\n",
    "        self.card = card\n",
    "\n",
    "        self.user = card.find_element(\n",
    "            'xpath',\n",
    "            './/div[@data-testid=\"User-Name\"]//span'\n",
    "        ).text\n",
    "\n",
    "        try:\n",
    "            self.handle = card.find_element(\n",
    "                'xpath',\n",
    "                './/span[contains(text(), \"@\")]'\n",
    "            ).text\n",
    "        except NoSuchElementException:\n",
    "            # No handle means the card cannot be parsed; mark it so the caller skips it\n",
    "            self.is_ad = True\n",
    "            return\n",
    "\n",
    "        try:\n",
    "            self.date_time = card.find_element(\n",
    "                'xpath',\n",
    "                './/time'\n",
    "            ).get_attribute('datetime')\n",
    "\n",
    "            if self.date_time is not None:\n",
    "                self.is_ad = False\n",
    "            else:\n",
    "                self.is_ad = True\n",
    "                return\n",
    "        except NoSuchElementException:\n",
    "            self.is_ad = True\n",
    "            return\n",
    "\n",
    "        try:\n",
    "            card.find_element(\n",
    "                'xpath',\n",
    "                './/*[local-name()=\"svg\" and @data-testid=\"icon-verified\"]'\n",
    "            )\n",
    "\n",
    "            self.verified = True\n",
    "        except NoSuchElementException:\n",
    "            self.verified = False\n",
    "\n",
    "        self.content = \"\"\n",
    "        contents = card.find_elements(\n",
    "            'xpath',\n",
    "            './/div[@data-testid=\"tweetText\"]/span | .//div[@data-testid=\"tweetText\"]/a'\n",
    "        )\n",
    "\n",
    "        for content in contents:\n",
    "            self.content += content.text\n",
    "\n",
    "        try:\n",
    "            self.reply_cnt = card.find_element(\n",
    "                'xpath',\n",
    "                './/div[@data-testid=\"reply\"]//span'\n",
    "            ).text\n",
    "        except NoSuchElementException:\n",
    "            self.reply_cnt = '0'\n",
    "\n",
    "        try:\n",
    "            self.retweet_cnt = card.find_element(\n",
    "                'xpath',\n",
    "                './/div[@data-testid=\"retweet\"]//span'\n",
    "            ).text\n",
    "        except NoSuchElementException:\n",
    "            self.retweet_cnt = '0'\n",
    "\n",
    "        try:\n",
    "            self.like_cnt = card.find_element(\n",
    "                'xpath',\n",
    "                './/div[@data-testid=\"like\"]//span'\n",
    "            ).text\n",
    "        except NoSuchElementException:\n",
    "            self.like_cnt = '0'\n",
    "\n",
    "        self.tweet = (\n",
    "            self.user,\n",
    "            self.handle,\n",
    "            self.date_time,\n",
    "            self.verified,\n",
    "            self.content,\n",
    "            self.reply_cnt,\n",
    "            self.retweet_cnt,\n",
    "            self.like_cnt\n",
    "        )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Progress: [[=======---------------------------------]] 18.00% 9 of 50\n",
      "Scraping Incomplete\n",
      "Tweets: 9 out of 50\n"
     ]
    }
   ],
   "source": [
    "scraper = Twitter_Scraper(\n",
    "    username=USER_UNAME,\n",
    "    password=USER_PASSWORD,\n",
    "    max_tweets=50\n",
    ")\n",
    "\n",
    "scraper.go_to_home()\n",
    "progress = Progress(0, scraper.max_tweets)\n",
    "progress.print_progress(0)\n",
    "\n",
    "try:\n",
    "    while scraper.scroller.scrolling:\n",
    "        scraper.get_tweets()\n",
    "\n",
    "        for card in scraper.tweet_cards[-15:]:\n",
    "            tweet_id = str(card)\n",
    "            if tweet_id not in scraper.tweet_ids:\n",
    "                scraper.tweet_ids.add(tweet_id)\n",
    "                tweet = Tweet(card)\n",
    "                if tweet:\n",
    "                    if not tweet.is_ad:\n",
    "                        scraper.data.append(tweet.tweet)\n",
    "                        progress.print_progress(len(scraper.data))\n",
    "\n",
    "                        if len(scraper.data) >= scraper.max_tweets:\n",
    "                            scraper.scroller.scrolling = False\n",
    "                            break\n",
    "\n",
    "                        if len(scraper.data) % 50 == 0:\n",
    "                            sleep(2)\n",
    "\n",
    "        if len(scraper.data) >= scraper.max_tweets:\n",
    "            break\n",
    "\n",
    "        scraper.scroller.scroll_count = 0\n",
    "\n",
    "        while True:\n",
    "            scraper.driver.execute_script(\n",
    "                'window.scrollTo(0, document.body.scrollHeight);'\n",
    "            )\n",
    "            sleep(2)\n",
    "            scraper.scroller.current_position = scraper.driver.execute_script(\n",
    "                \"return window.pageYOffset;\"\n",
    "            )\n",
    "\n",
    "            if scraper.scroller.last_position == scraper.scroller.current_position:\n",
    "                scraper.scroller.scroll_count += 1\n",
    "\n",
    "                if scraper.scroller.scroll_count >= 3:\n",
    "                    scraper.go_to_home()\n",
    "                    sleep(2)\n",
    "                    scraper.scroller.reset()\n",
    "                    break\n",
    "                else:\n",
    "                    sleep(2)\n",
    "            else:\n",
    "                scraper.scroller.last_position = scraper.scroller.current_position\n",
    "                break\n",
    "\n",
    "    print()\n",
    "    print(\"Scraping Complete\")\n",
    "except StaleElementReferenceException:\n",
    "    print()\n",
    "    print(\"Scraping Incomplete\")\n",
    "\n",
    "scraper.driver.close()\n",
    "print(\"Tweets: {} out of {}\".format(len(scraper.data), scraper.max_tweets))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "# import tabulate\n",
    "\n",
    "# # Tabulate\n",
    "# print(tabulate.tabulate(\n",
    "#     scraper.data[:10],\n",
    "#     headers=[\n",
    "#         'Name',\n",
    "#         'Handle',\n",
    "#         'Date Time',\n",
    "#         'Verified',\n",
    "#         'Content',\n",
    "#         'Reply Count',\n",
    "#         'Retweet Count',\n",
    "#         'Like Count'\n",
    "#     ],\n",
    "#     tablefmt='tsv'\n",
    "# ))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "# import csv\n",
    "# \n",
    "# with open('twitter_tweets.csv', 'w', encoding='utf-8', newline='') as f:\n",
    "#     header = ['Name', 'Handle', 'Timestamp', 'Verified',\n",
    "#               'Content', 'Comments', 'Retweets', 'Likes']\n",
    "#     writer = csv.writer(f)\n",
    "#     writer.writerow(header)\n",
    "#     writer.writerows(scraper.data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "data = {\n",
    "    'Name': [tweet[0] for tweet in scraper.data],\n",
    "    'Handle': [tweet[1] for tweet in scraper.data],\n",
    "    'Timestamp': [tweet[2] for tweet in scraper.data],\n",
    "    'Verified': [tweet[3] for tweet in scraper.data],\n",
    "    'Content': [tweet[4] for tweet in scraper.data],\n",
    "    'Comments': [tweet[5] for tweet in scraper.data],\n",
    "    'Retweets': [tweet[6] for tweet in scraper.data],\n",
    "    'Likes': [tweet[7] for tweet in scraper.data]\n",
    "}\n",
    "\n",
    "df = pd.DataFrame(data)\n",
    "\n",
    "current_time = datetime.now().strftime(\"%Y-%m-%d-%H-%M\")\n",
    "\n",
    "file_path = f'{folder_path}{current_time}_tweets_1-{len(scraper.data)}.csv'\n",
    "df.to_csv(file_path, index=False)\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "ml",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}