diff --git a/.env.example b/.env.example
index 4f36ace..1a6b9d5 100644
--- a/.env.example
+++ b/.env.example
@@ -1,2 +1,2 @@
-TWITTER_EMAIL=# Your Twitter Email
+TWITTER_USERNAME=# Your Twitter USERNAME
 TWITTER_PASSWORD=# Your Twitter Password
diff --git a/.gitignore b/.gitignore
index 68bc17f..0ee78ae 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@
 __pycache__/
 *.py[cod]
 *$py.class
+*.csv
 
 # C extensions
 *.so
diff --git a/main.ipynb b/main.ipynb
index 1bfbac0..fc5052f 100644
--- a/main.ipynb
+++ b/main.ipynb
@@ -2,13 +2,14 @@
  "cells": [
   {
    "cell_type": "code",
-   "execution_count": 36,
+   "execution_count": 103,
    "metadata": {},
    "outputs": [],
    "source": [
     "import os\n",
+    "import sys\n",
     "import re\n",
-    "import csv\n",
+    "import pandas as pd\n",
     "from fake_headers import Headers\n",
     "from getpass import getpass\n",
     "from time import sleep\n",
@@ -19,49 +20,391 @@
     "from selenium.webdriver.chrome.options import Options as ChromeOptions\n",
     "from selenium.webdriver.chrome.service import Service as ChromeService\n",
     "\n",
-    "USER_EMAIL = os.environ['TWITTER_EMAIL']\n",
+    "from webdriver_manager.chrome import ChromeDriverManager\n",
+    "\n",
+    "USER_UNAME = os.environ['TWITTER_USERNAME']\n",
     "USER_PASSWORD = os.environ['TWITTER_PASSWORD']\n",
     "TWITTER_LOGIN_URL = \"https://twitter.com/i/flow/login\""
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": 43,
+   "execution_count": 104,
    "metadata": {},
    "outputs": [],
    "source": [
-    "header = Headers().generate()['User-Agent']\n",
-    "browser_option = ChromeOptions()\n",
-    "browser_option.add_argument('--no-sandbox')\n",
-    "browser_option.add_argument(\"--disable-dev-shm-usage\")\n",
-    "browser_option.add_argument('--ignore-certificate-errors')\n",
-    "browser_option.add_argument('--disable-gpu')\n",
-    "browser_option.add_argument('--log-level=3')\n",
-    "browser_option.add_argument('--disable-notifications')\n",
-    "browser_option.add_argument('--disable-popup-blocking')\n",
-    "browser_option.add_argument('--user-agent={}'.format(header))\n",
-    "\n",
-    "# For Hiding Browser\n",
-    "# browser_option.add_argument(\"--headless\")\n",
-    "\n",
-    "driver = webdriver.Chrome(\n",
-    "    options=browser_option\n",
-    ")"
+    "class Progress:\n",
+    "    def __init__(self, current, total) -> None:\n",
+    "        self.current = current\n",
+    "        self.total = total\n",
+    "        pass\n",
+    "    \n",
+    "    def print_progress(self, current) -> None:\n",
+    "        self.current = current\n",
+    "        progress = current / self.total\n",
+    "        bar_length = 40\n",
+    "        progress_bar = \"[\" + \"=\" * int(bar_length * progress) + \\\n",
+    "            \"-\" * (bar_length - int(bar_length * progress)) + \"]\"\n",
+    "        sys.stdout.write(\n",
+    "            \"\\rProgress: [{:<40}] {:.2%} {} of {}\".format(progress_bar, progress, current, self.total))\n",
+    "        sys.stdout.flush()\n",
+    "        if current == self.total:\n",
+    "            print(\"\\n\")\n"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 105,
    "metadata": {},
    "outputs": [],
-   "source": []
+   "source": [
+    "class Scroller():\n",
+    "    def __init__(self, driver) -> None:\n",
+    "        self.driver = driver\n",
+    "        self.current_position = 0\n",
+    "        self.last_position = driver.execute_script(\"return window.pageYOffset;\")\n",
+    "        self.scrolling = True\n",
+    "        self.scroll_count = 0\n",
+    "        pass"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 106,
    "metadata": {},
    "outputs": [],
-   "source": []
+   "source": [
+    "class Twitter_Scraper():\n",
+    "    def __init__(self, username, password, max_tweets=50):\n",
+    "        self.username = username\n",
+    "        self.password = password\n",
+    "        self.data = []\n",
+    "        self.tweet_ids = set()\n",
+    "        self.max_tweets = max_tweets\n",
+    "        self.tweet_cards = []\n",
+    "        self.driver = self._get_driver()\n",
+    "        self.scroller = Scroller(self.driver)\n",
+    "        self._login()\n",
+    "    \n",
+    "    def _get_driver(self):\n",
+    "        header = Headers().generate()['User-Agent']\n",
+    "\n",
+    "        browser_option = ChromeOptions()\n",
+    "        browser_option.add_argument('--no-sandbox')\n",
+    "        browser_option.add_argument(\"--disable-dev-shm-usage\")\n",
+    "        browser_option.add_argument('--ignore-certificate-errors')\n",
+    "        browser_option.add_argument('--disable-gpu')\n",
+    "        browser_option.add_argument('--log-level=3')\n",
+    "        browser_option.add_argument('--disable-notifications')\n",
+    "        browser_option.add_argument('--disable-popup-blocking')\n",
+    "        browser_option.add_argument('--user-agent={}'.format(header))\n",
+    "\n",
+    "        # For Hiding Browser\n",
+    "        browser_option.add_argument(\"--headless\")\n",
+    "\n",
+    "        driver = webdriver.Chrome(\n",
+    "            options=browser_option,\n",
+    "        )\n",
+    "        \n",
+    "        return driver\n",
+    "    \n",
+    "    def _login(self):\n",
+    "        self.driver.get(TWITTER_LOGIN_URL)\n",
+    "        self.driver.maximize_window()\n",
+    "        sleep(3)\n",
+    "        \n",
+    "        self._input_username()\n",
+    "        self._input_unusual_activity()\n",
+    "        self._input_password()\n",
+    "        pass\n",
+    "\n",
+    "    def _input_username(self):\n",
+    "        try:\n",
+    "            username = self.driver.find_element(\n",
+    "                \"xpath\",\n",
+    "                \"//input[@autocomplete='username']\"\n",
+    "            )\n",
+    "\n",
+    "            username.send_keys(USER_UNAME)\n",
+    "            username.send_keys(Keys.RETURN)\n",
+    "            sleep(3)\n",
+    "\n",
+    "        except NoSuchElementException:\n",
+    "            print(\"Username field not found\")\n",
+    "            self.driver.quit()\n",
+    "            exit()\n",
+    "        pass\n",
+    "\n",
+    "    def _input_unusual_activity(self):\n",
+    "        try:\n",
+    "            unusual_activity = self.driver.find_element(\n",
+    "                \"xpath\",\n",
+    "                \"//input[@data-testid='ocfEnterTextTextInput']\"\n",
+    "            )\n",
+    "            unusual_activity.send_keys(USER_UNAME)\n",
+    "            unusual_activity.send_keys(Keys.RETURN)\n",
+    "            sleep(3)\n",
+    "        except NoSuchElementException:\n",
+    "            pass\n",
+    "        pass\n",
+    "\n",
+    "    def _input_password(self):\n",
+    "        try:\n",
+    "            password = self.driver.find_element(\n",
+    "                \"xpath\",\n",
+    "                \"//input[@autocomplete='current-password']\"\n",
+    "            )\n",
+    "\n",
+    "            password.send_keys(USER_PASSWORD)\n",
+    "            password.send_keys(Keys.RETURN)\n",
+    "            sleep(3)\n",
+    "\n",
+    "        except NoSuchElementException:\n",
+    "            print(\"Password field not found\")\n",
+    "            self.driver.quit()\n",
+    "            exit()\n",
+    "        pass\n",
+    "    \n",
+    "    def go_to_home(self):\n",
+    "        self.driver.get(\"https://twitter.com/home\")\n",
+    "        sleep(3)\n",
+    "        pass\n",
+    "    \n",
+    "    def get_tweets(self):\n",
+    "        self.tweet_cards = self.driver.find_elements(\n",
+    "            'xpath',\n",
+    "            '//article[@data-testid=\"tweet\"]'\n",
+    "        )\n",
+    "        pass"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": 107,
    "metadata": {},
    "outputs": [],
    "source": [
+    "\n",
+    "class Tweet: \n",
+    "    def __init__(self, card) -> None:\n",
+    "        self.card = card\n",
+    "        \n",
+    "        self.user = card.find_element(\n",
+    "            'xpath',\n",
+    "            './/div[@data-testid=\"User-Name\"]//span'\n",
+    "        ).text\n",
+    "        \n",
+    "        try:\n",
+    "            self.handle = card.find_element(\n",
+    "                'xpath',\n",
+    "                './/span[contains(text(), \"@\")]'\n",
+    "            ).text\n",
+    "        except NoSuchElementException:\n",
+    "            return\n",
+    "        \n",
+    "        try:\n",
+    "            self.date_time = card.find_element(\n",
+    "                'xpath',\n",
+    "                './/time'\n",
+    "            ).get_attribute('datetime')\n",
+    "            \n",
+    "            if self.date_time is not None:\n",
+    "                self.is_ad = False\n",
+    "        except NoSuchElementException:\n",
+    "            self.is_ad = True\n",
+    "            return\n",
+    "        \n",
+    "        try:\n",
+    "            card.find_element(\n",
+    "                'xpath',\n",
+    "                './/*[local-name()=\"svg\" and @data-testid=\"icon-verified\"]'\n",
+    "            )\n",
+    "            \n",
+    "            self.verified = True\n",
+    "        except NoSuchElementException:\n",
+    "            self.verified = False\n",
+    "        \n",
+    "        self.content = \"\"\n",
+    "        contents = card.find_elements(\n",
+    "            'xpath',\n",
+    "            './/div[@data-testid=\"tweetText\"]/span | .//div[@data-testid=\"tweetText\"]/a'\n",
+    "        )\n",
+    "\n",
+    "        for index, content in enumerate(contents):\n",
+    "            self.content += content.text\n",
+    "        \n",
+    "        try:\n",
+    "            self.reply_cnt= card.find_element(\n",
+    "                'xpath',\n",
+    "                './/div[@data-testid=\"reply\"]//span'\n",
+    "            ).text\n",
+    "        except NoSuchElementException:\n",
+    "            self.reply_cnt = 0\n",
+    "        \n",
+    "        try:\n",
+    "            self.retweet_cnt = card.find_element(\n",
+    "                'xpath',\n",
+    "                './/div[@data-testid=\"retweet\"]//span'\n",
+    "            ).text\n",
+    "        except NoSuchElementException:\n",
+    "            self.retweet_cnt = 0\n",
+    "        \n",
+    "        try:\n",
+    "            self.like_cnt = card.find_element(\n",
+    "                'xpath',\n",
+    "                './/div[@data-testid=\"like\"]//span'\n",
+    "            ).text\n",
+    "        except NoSuchElementException:\n",
+    "            self.like_cnt = 0\n",
+    "        \n",
+    "        self.tweet = (\n",
+    "            self.user,\n",
+    "            self.handle,\n",
+    "            self.date_time,\n",
+    "            self.verified,\n",
+    "            self.content,\n",
+    "            self.reply_cnt,\n",
+    "            self.retweet_cnt,\n",
+    "            self.like_cnt\n",
+    "        )\n",
+    "        \n",
+    "        pass"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": 108,
    "metadata": {},
    "outputs": [
     {
      "name": "stdout",
      "output_type": "stream",
      "text": [
       "Progress: [[========================================]] 100.00% 50 of 50\n",
       "\n",
       "Scraping Complete\n",
       "Tweets: 50\n"
      ]
     }
    ],
    "source": [
+    "scraper = Twitter_Scraper(\n",
+    "    username=USER_UNAME,\n",
+    "    password=USER_PASSWORD,\n",
+    "    max_tweets=50\n",
+    ")\n",
+    "\n",
+    "scraper.go_to_home()\n",
+    "progress = Progress(0, scraper.max_tweets)\n",
+    "\n",
+    "while scraper.scroller.scrolling:\n",
+    "    scraper.get_tweets()\n",
+    "\n",
+    "    for card in scraper.tweet_cards[-15:]:\n",
+    "        tweet_id = str(card)\n",
+    "        if tweet_id not in scraper.tweet_ids:\n",
+    "            scraper.tweet_ids.add(tweet_id)\n",
+    "            tweet = Tweet(card)\n",
+    "            if tweet:\n",
+    "                if not tweet.is_ad:\n",
+    "                    scraper.data.append(tweet.tweet)\n",
+    "                    progress.print_progress(len(scraper.data))\n",
+    "\n",
+    "                    if len(scraper.data) >= scraper.max_tweets:\n",
+    "                        scraper.scroller.scrolling = False\n",
+    "                        break\n",
+    "\n",
+    "    if len(scraper.data) >= scraper.max_tweets:\n",
+    "        break\n",
+    "    \n",
+    "    scraper.scroller.scroll_count = 0\n",
+    "    \n",
+    "    while True:\n",
+    "        scraper.driver.execute_script(\n",
+    "            'window.scrollTo(0, document.body.scrollHeight);')\n",
+    "        sleep(2)\n",
+    "        scraper.scroller.current_position = scraper.driver.execute_script(\n",
+    "            \"return window.pageYOffset;\"\n",
+    "        )\n",
+    "        \n",
+    "        if scraper.scroller.last_position == scraper.scroller.current_position:\n",
+    "            scraper.scroller.scroll_count += 1\n",
+    "            \n",
+    "            if scraper.scroller.scroll_count >= 3:\n",
+    "                scraper.scroller.scrolling = False\n",
+    "                break\n",
+    "            else:\n",
+    "                sleep(2)\n",
+    "        else:\n",
+    "            scraper.scroller.last_position = scraper.scroller.current_position\n",
+    "            break\n",
+    "\n",
+    "print(\"Scraping Complete\")\n",
+    "print(\"Tweets: {}\".format(len(scraper.data)))"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": 109,
    "metadata": {},
    "outputs": [],
    "source": [
+    "# import tabulate\n",
+    "\n",
+    "# # Tabulate\n",
+    "# print(tabulate.tabulate(\n",
+    "#     scraper.data[:10],\n",
+    "#     headers=[\n",
+    "#         'Name',\n",
+    "#         'Handle',\n",
+    "#         'Date Time',\n",
+    "#         'Verified',\n",
+    "#         'Content',\n",
+    "#         'Reply Count',\n",
+    "#         'Retweet Count',\n",
+    "#         'Like Count'\n",
+    "#     ],\n",
+    "#     tablefmt='tsv'\n",
+    "# ))\n",
+    "    "
    ]
   },
   {
    "cell_type": "code",
    "execution_count": 110,
    "metadata": {},
    "outputs": [],
    "source": [
+    "# import csv\n",
+    "# \n",
+    "# with open('twitter_tweets.csv', 'w', encoding='utf-8', newline='') as f:\n",
+    "#     header = ['Name', 'Handle', 'Timestamp', 'Verified',\n",
+    "#               'Content', 'Comments', 'Retweets', 'Likes']\n",
+    "#     writer = csv.writer(f)\n",
+    "#     writer.writerow(header)\n",
+    "#     writer.writerows(scraper.data)"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": 111,
    "metadata": {},
    "outputs": [],
    "source": [
+    "data = {\n",
+    "    'Name': [tweet[0] for tweet in scraper.data],\n",
+    "    'Handle': [tweet[1] for tweet in scraper.data],\n",
+    "    'Timestamp': [tweet[2] for tweet in scraper.data],\n",
+    "    'Verified': [tweet[3] for tweet in scraper.data],\n",
+    "    'Content': [tweet[4] for tweet in scraper.data],\n",
+    "    'Comments': [tweet[5] for tweet in scraper.data],\n",
+    "    'Retweets': [tweet[6] for tweet in scraper.data],\n",
+    "    'Likes': [tweet[7] for tweet in scraper.data]\n",
+    "}\n",
+    "\n",
+    "df = pd.DataFrame(data)\n",
+    "df.to_csv('twitter_tweets.csv', index=False)\n"
    ]
   }
  ],
 "metadata": {