feat: optionally scrape followers and following

Author: Jarrian
Date: 2023-09-25 08:27:08 +08:00
parent ed0be321bb
commit 069b0cc24a
4 changed files with 217 additions and 76 deletions

File 1 of 4: the Jupyter notebook (an inline copy of the Twitter_Scraper class plus example run output).

@@ -17,7 +17,7 @@
},
{
"cell_type": "code",
"execution_count": 113,
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
@@ -57,7 +57,7 @@
},
{
"cell_type": "code",
"execution_count": 114,
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
@@ -97,7 +97,7 @@
},
{
"cell_type": "code",
"execution_count": 115,
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
@@ -141,7 +141,7 @@
},
{
"cell_type": "code",
"execution_count": 116,
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
@@ -387,13 +387,12 @@
},
{
"cell_type": "code",
"execution_count": 117,
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"TWITTER_LOGIN_URL = \"https://twitter.com/i/flow/login\"\n",
"\n",
"\n",
"class Twitter_Scraper:\n",
" def __init__(\n",
" self,\n",
@@ -410,6 +409,7 @@
" print(\"Initializing Twitter Scraper...\")\n",
" self.username = username\n",
" self.password = password\n",
" self.interrupted = False\n",
" self.tweet_ids = set()\n",
" self.data = []\n",
" self.tweet_cards = []\n",
@@ -829,6 +829,7 @@
" except KeyboardInterrupt:\n",
" print(\"\\n\")\n",
" print(\"Keyboard Interrupt\")\n",
" self.interrupted = True\n",
" break\n",
" except Exception as e:\n",
" print(\"\\n\")\n",
@@ -899,7 +900,7 @@
},
{
"cell_type": "code",
"execution_count": 118,
"execution_count": 6,
"metadata": {},
"outputs": [
{
@@ -932,7 +933,7 @@
},
{
"cell_type": "code",
"execution_count": 119,
"execution_count": 7,
"metadata": {},
"outputs": [
{
@@ -961,7 +962,7 @@
},
{
"cell_type": "code",
"execution_count": 120,
"execution_count": 8,
"metadata": {},
"outputs": [
{
@@ -998,7 +999,7 @@
},
{
"cell_type": "code",
"execution_count": 121,
"execution_count": 9,
"metadata": {},
"outputs": [
{
@@ -1006,7 +1007,7 @@
"output_type": "stream",
"text": [
"Saving Tweets to CSV...\n",
"CSV Saved: ./tweets/2023-09-24_23-57-11_tweets_1-50.csv\n"
"CSV Saved: ./tweets/2023-09-25_08-20-51_tweets_1-50.csv\n"
]
}
],
@@ -1016,7 +1017,7 @@
},
{
"cell_type": "code",
"execution_count": 122,
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [

File 2 of 4: the command-line entry point (the argparse-based main()).

@@ -73,6 +73,14 @@ def main():
help="Twitter query or search. Scrape tweets from a query or search.",
)
parser.add_argument(
"-a",
"--add",
type=str,
default="",
help="Additional data to scrape and save in the .csv file.",
)
parser.add_argument(
"--latest",
action="store_true",
@@ -107,6 +115,8 @@ def main():
if args.query is not None:
tweet_type_args.append(args.query)
additional_data: list = args.add.split(",")
if len(tweet_type_args) > 1:
print("Please specify only one of --username, --hashtag, or --query.")
sys.exit(1)
@@ -119,14 +129,8 @@ def main():
scraper = Twitter_Scraper(
username=USER_UNAME,
password=USER_PASSWORD,
max_tweets=args.tweets,
scrape_username=args.username,
scrape_hashtag=args.hashtag,
scrape_query=args.query,
scrape_latest=args.latest,
scrape_top=args.top,
)
scraper.login()
scraper.scrape_tweets(
max_tweets=args.tweets,
scrape_username=args.username,
@@ -134,9 +138,11 @@ def main():
scrape_query=args.query,
scrape_latest=args.latest,
scrape_top=args.top,
scrape_poster_details="pd" in additional_data,
)
scraper.save_to_csv()
scraper.driver.close()
if not scraper.interrupted:
scraper.driver.close()
else:
print(
"Missing Twitter username or password environment variables. Please check your .env file."
@@ -145,6 +151,10 @@ def main():
except KeyboardInterrupt:
print("\nScript Interrupted by user. Exiting...")
sys.exit(1)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
sys.exit(1)
if __name__ == "__main__":
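The new --add option takes a comma-separated list of extra fields; the only token this diff wires up is pd, which switches on poster-detail scraping. A minimal sketch of that parsing (the flag value below is invented for illustration):

# Hypothetical flag value; only the "pd" token is recognized by the diff above.
args_add = "pd,extra"

additional_data = args_add.split(",")             # -> ["pd", "extra"]
scrape_poster_details = "pd" in additional_data   # -> True: scrape followers/following
print(additional_data, scrape_poster_details)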

File 3 of 4: the tweet module (the Tweet class that parses a single tweet card).

@@ -1,24 +1,39 @@
from selenium.webdriver import Chrome
from selenium.common.exceptions import NoSuchElementException
from time import sleep
from selenium.common.exceptions import (
NoSuchElementException,
StaleElementReferenceException,
)
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.common.action_chains import ActionChains
class Tweet:
def __init__(self, card: Chrome) -> None:
def __init__(
self,
card: WebDriver,
driver: WebDriver,
actions: ActionChains,
scrape_poster_details=False,
) -> None:
self.card = card
self.error = False
self.tweet = None
try:
self.user = card.find_element(
"xpath", './/div[@data-testid="User-Name"]//span'
).text
except NoSuchElementException:
return
self.error = True
self.user = "skip"
try:
self.handle = card.find_element(
"xpath", './/span[contains(text(), "@")]'
).text
except NoSuchElementException:
return
self.error = True
self.handle = "skip"
try:
self.date_time = card.find_element("xpath", ".//time").get_attribute(
@@ -29,6 +44,10 @@ class Tweet:
self.is_ad = False
except NoSuchElementException:
self.is_ad = True
self.error = True
self.date_time = "skip"
if self.error:
return
try:
@@ -129,6 +148,75 @@ class Tweet:
except NoSuchElementException:
self.profile_img = ""
self.following_cnt = "0"
self.followers_cnt = "0"
if scrape_poster_details:
el_name = card.find_element(
"xpath", './/div[@data-testid="User-Name"]//span'
)
ext_hover_card = False
ext_following = False
ext_followers = False
hover_attempt = 0
while not ext_hover_card or not ext_following or not ext_followers:
try:
actions.move_to_element(el_name).perform()
hover_card = driver.find_element(
"xpath", '//div[@data-testid="hoverCardParent"]'
)
ext_hover_card = True
while not ext_following:
try:
self.following_cnt = hover_card.find_element(
"xpath", './/a[contains(@href, "/following")]//span'
).text
if self.following_cnt == "":
self.following_cnt = "0"
ext_following = True
except NoSuchElementException:
continue
except StaleElementReferenceException:
self.error = True
return
while not ext_followers:
try:
self.followers_cnt = hover_card.find_element(
"xpath",
'.//a[contains(@href, "/verified_followers")]//span',
).text
if self.followers_cnt == "":
self.followers_cnt = "0"
ext_followers = True
except NoSuchElementException:
continue
except StaleElementReferenceException:
self.error = True
return
except NoSuchElementException:
if hover_attempt == 3:
self.error = True
return
hover_attempt += 1
sleep(0.5)
continue
except StaleElementReferenceException:
self.error = True
return
if ext_hover_card and ext_following and ext_followers:
actions.reset_actions()
self.tweet = (
self.user,
self.handle,
@@ -143,6 +231,8 @@ class Tweet:
self.mentions,
self.emojis,
self.profile_img,
self.following_cnt,
self.followers_cnt,
)
pass
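The bulk of the Tweet changes is the hover-card lookup: hover over the poster's name, wait for the profile hover card, and read the following and followers counts, retrying on missing or stale elements. A condensed sketch of that lookup without the retry loops (the XPaths come from the diff; the driver setup, the already-authenticated session, and the fixed sleeps are assumptions):

from time import sleep

from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains

driver = webdriver.Chrome()             # assumption: plain local ChromeDriver
driver.get("https://twitter.com/home")  # assumption: the session is already authenticated
actions = ActionChains(driver)
sleep(3)

# Same XPaths as in the diff above, without the retry loops.
card = driver.find_element("xpath", '//article[@data-testid="tweet"]')
name_el = card.find_element("xpath", './/div[@data-testid="User-Name"]//span')

actions.move_to_element(name_el).perform()  # hovering the name triggers the profile card
sleep(1)                                    # assumption: fixed wait instead of retrying

hover_card = driver.find_element("xpath", '//div[@data-testid="hoverCardParent"]')
following_cnt = hover_card.find_element(
    "xpath", './/a[contains(@href, "/following")]//span'
).text or "0"
followers_cnt = hover_card.find_element(
    "xpath", './/a[contains(@href, "/verified_followers")]//span'
).text or "0"

actions.reset_actions()                     # drop the pending hover state
print(following_cnt, followers_cnt)

In the class itself the same lookup sits inside retry loops with StaleElementReferenceException guards, since the hover card can re-render while its counts load.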

File 4 of 4: the Twitter_Scraper class module.

@@ -8,6 +8,7 @@ from tweet import Tweet
from datetime import datetime
from fake_headers import Headers
from time import sleep
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import (
@@ -15,7 +16,7 @@ from selenium.common.exceptions import (
StaleElementReferenceException,
WebDriverException,
)
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
@@ -33,12 +34,14 @@ class Twitter_Scraper:
scrape_username=None,
scrape_hashtag=None,
scrape_query=None,
scrape_poster_details=False,
scrape_latest=True,
scrape_top=False,
):
print("Initializing Twitter Scraper...")
self.username = username
self.password = password
self.interrupted = False
self.tweet_ids = set()
self.data = []
self.tweet_cards = []
@@ -48,13 +51,14 @@ class Twitter_Scraper:
"hashtag": None,
"query": None,
"tab": None,
"poster_details": False,
}
self.max_tweets = max_tweets
self.progress = Progress(0, max_tweets)
self.router = self.go_to_home
self.driver = self._get_driver()
self.actions = ActionChains(self.driver)
self.scroller = Scroller(self.driver)
self._login()
self._config_scraper(
max_tweets,
scrape_username,
@@ -62,6 +66,7 @@ class Twitter_Scraper:
scrape_query,
scrape_latest,
scrape_top,
scrape_poster_details,
)
def _config_scraper(
@@ -72,6 +77,7 @@ class Twitter_Scraper:
scrape_query=None,
scrape_latest=True,
scrape_top=False,
scrape_poster_details=False,
):
self.tweet_ids = set()
self.data = []
@@ -86,6 +92,7 @@ class Twitter_Scraper:
else None,
"query": scrape_query,
"tab": "Latest" if scrape_latest else "Top" if scrape_top else "Latest",
"poster_details": scrape_poster_details,
}
self.router = self.go_to_home
self.scroller = Scroller(self.driver)
@@ -127,6 +134,7 @@ class Twitter_Scraper:
options=browser_option,
)
print("WebDriver Setup Complete")
return driver
except WebDriverException:
try:
@@ -140,17 +148,20 @@ class Twitter_Scraper:
options=browser_option,
)
print("WebDriver Setup Complete")
return driver
except Exception as e:
print(f"Error setting up WebDriver: {e}")
sys.exit(1)
pass
def _login(self):
def login(self):
print()
print("Logging in to Twitter...")
try:
self.driver.get(TWITTER_LOGIN_URL)
self.driver.maximize_window()
self.driver.get(TWITTER_LOGIN_URL)
sleep(3)
self._input_username()
@@ -313,10 +324,24 @@ It may be due to the following:
def get_tweet_cards(self):
self.tweet_cards = self.driver.find_elements(
"xpath", '//article[@data-testid="tweet"]'
"xpath", '//article[@data-testid="tweet" and not(@disabled)]'
)
pass
def remove_hidden_cards(self):
try:
hidden_cards = self.driver.find_elements(
"xpath", '//article[@data-testid="tweet" and @disabled]'
)
for card in hidden_cards[1:-2]:
self.driver.execute_script(
"arguments[0].parentNode.parentNode.parentNode.remove();", card
)
except Exception as e:
return
pass
def scrape_tweets(
self,
max_tweets=50,
@@ -325,6 +350,7 @@ It may be due to the following:
scrape_query=None,
scrape_latest=True,
scrape_top=False,
scrape_poster_details=False,
router=None,
):
self._config_scraper(
@@ -334,6 +360,7 @@ It may be due to the following:
scrape_query,
scrape_latest,
scrape_top,
scrape_poster_details,
)
if router is None:
@@ -364,6 +391,7 @@ It may be due to the following:
refresh_count = 0
added_tweets = 0
empty_count = 0
while self.scroller.scrolling:
try:
@@ -371,62 +399,70 @@ It may be due to the following:
added_tweets = 0
for card in self.tweet_cards[-15:]:
tweet = Tweet(card)
try:
tweet_id = f"{tweet.user}{tweet.handle}{tweet.date_time}"
except Exception as e:
tweet_id = str(card)
if tweet_id not in self.tweet_ids:
self.tweet_ids.add(tweet_id)
if not self.scraper_details["poster_details"]:
self.driver.execute_script(
"arguments[0].scrollIntoView();", card
)
tweet = Tweet(
card=card,
driver=self.driver,
actions=self.actions,
scrape_poster_details=self.scraper_details[
"poster_details"
],
)
if tweet:
if not tweet.error and tweet.tweet is not None:
if not tweet.is_ad:
self.data.append(tweet.tweet)
added_tweets += 1
self.progress.print_progress(len(self.data))
if len(self.data) >= self.max_tweets:
self.scroller.scrolling = False
break
else:
continue
else:
continue
else:
continue
else:
continue
except NoSuchElementException:
continue
if tweet_id not in self.tweet_ids:
self.tweet_ids.add(tweet_id)
if tweet:
if not tweet.is_ad:
self.data.append(tweet.tweet)
added_tweets += 1
self.progress.print_progress(len(self.data))
if len(self.data) >= self.max_tweets:
self.scroller.scrolling = False
break
if len(self.data) % 50 == 0:
sleep(2)
if len(self.data) >= self.max_tweets:
break
if added_tweets == 0:
refresh_count += 1
if refresh_count >= 10:
print()
print("No more tweets to scrape")
break
else:
refresh_count = 0
self.scroller.scroll_count = 0
while True:
self.scroller.scroll_to_bottom()
sleep(2)
self.scroller.update_scroll_position()
if self.scroller.last_position == self.scroller.current_position:
self.scroller.scroll_count += 1
if self.scroller.scroll_count >= 3:
router()
sleep(2)
if empty_count >= 5:
if refresh_count >= 3:
print()
print("No more tweets to scrape")
break
else:
sleep(1)
else:
self.scroller.last_position = self.scroller.current_position
break
refresh_count += 1
empty_count += 1
sleep(1)
else:
empty_count = 0
refresh_count = 0
except StaleElementReferenceException:
router()
sleep(2)
continue
except KeyboardInterrupt:
print("\n")
print("Keyboard Interrupt")
self.interrupted = True
break
except Exception as e:
print("\n")
print(f"Error scraping tweets: {e}")
@@ -468,6 +504,10 @@ It may be due to the following:
"Profile Image": [tweet[12] for tweet in self.data],
}
if self.scraper_details["poster_details"]:
data["Following"] = [tweet[13] for tweet in self.data]
data["Followers"] = [tweet[14] for tweet in self.data]
df = pd.DataFrame(data)
current_time = now.strftime("%Y-%m-%d_%H-%M-%S")
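When poster details are scraped, the export picks up two extra columns taken from positions 13 and 14 of each tweet tuple. A small sketch of the branch shown in the last hunk (the sample row is invented; the column names and indices come from the diff):

import pandas as pd

# Invented sample row; in the scraper, indices 13 and 14 of each tweet tuple
# hold the counts appended by the Tweet class above.
rows = [
    ("user", "@handle", "2023-09-25T00:00:00Z", "0", "0", "0", "0",
     "text", "[]", "[]", "[]", "[]", "img_url", "120", "4,560"),
]

data = {"Profile Image": [row[12] for row in rows]}
scrape_poster_details = True
if scrape_poster_details:
    data["Following"] = [row[13] for row in rows]
    data["Followers"] = [row[14] for row in rows]

print(pd.DataFrame(data))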