1
0
mirror of https://github.com/zedeus/nitter.git synced 2025-12-06 03:55:36 -05:00

3 Commits

Author SHA1 Message Date
Zed
0bb0b7e78c Support grok_share card
Fixes #1306
2025-11-17 06:37:24 +01:00
Zed
a666c4867c Include username in session logs if available
Fixes #1310
2025-11-17 05:42:44 +01:00
Zed
778eb35ee3 Add curl-based cookie session script 2025-11-17 03:55:23 +01:00
12 changed files with 387 additions and 18 deletions

View File

@@ -60,11 +60,11 @@ proc getAndValidateSession*(api: Api): Future[Session] {.async.} =
case result.kind
of SessionKind.oauth:
if result.oauthToken.len == 0:
echo "[sessions] Empty oauth token, session: ", result.id
echo "[sessions] Empty oauth token, session: ", result.pretty
raise rateLimitError()
of SessionKind.cookie:
if result.authToken.len == 0 or result.ct0.len == 0:
echo "[sessions] Empty cookie credentials, session: ", result.id
echo "[sessions] Empty cookie credentials, session: ", result.pretty
raise rateLimitError()
template fetchImpl(result, fetchBody) {.dirty.} =
@@ -107,7 +107,7 @@ template fetchImpl(result, fetchBody) {.dirty.} =
setLimited(session, api)
raise rateLimitError()
elif result.startsWith("429 Too Many Requests"):
echo "[sessions] 429 error, API: ", api, ", session: ", session.id
echo "[sessions] 429 error, API: ", api, ", session: ", session.pretty
session.apis[api].remaining = 0
# rate limit hit, resets after the 15 minute window
raise rateLimitError()
@@ -124,8 +124,8 @@ template fetchImpl(result, fetchBody) {.dirty.} =
except OSError as e:
raise e
except Exception as e:
let id = if session.isNil: "null" else: $session.id
echo "error: ", e.name, ", msg: ", e.msg, ", sessionId: ", id, ", url: ", url
let s = session.pretty
echo "error: ", e.name, ", msg: ", e.msg, ", session: ", s, ", url: ", url
raise rateLimitError()
finally:
release(session)

View File

@@ -29,6 +29,20 @@ var
template log(str: varargs[string, `$`]) =
echo "[sessions] ", str.join("")
proc pretty*(session: Session): string =
if session.isNil:
return "<null>"
if session.id > 0 and session.username.len > 0:
result = $session.id & " (" & session.username & ")"
elif session.username.len > 0:
result = session.username
elif session.id > 0:
result = $session.id
else:
result = "<unknown>"
result = $session.kind & " " & result
proc snowflakeToEpoch(flake: int64): int64 =
int64(((flake shr 22) + 1288834974657) div 1000)
@@ -130,7 +144,7 @@ proc isLimited(session: Session; api: Api): bool =
if session.limited and api != Api.userTweets:
if (epochTime().int - session.limitedAt) > hourInSeconds:
session.limited = false
log "resetting limit: ", session.id
log "resetting limit: ", session.pretty
return false
else:
return true
@@ -146,7 +160,7 @@ proc isReady(session: Session; api: Api): bool =
proc invalidate*(session: var Session) =
if session.isNil: return
log "invalidating: ", session.id
log "invalidating: ", session.pretty
# TODO: This isn't sufficient, but it works for now
let idx = sessionPool.find(session)
@@ -171,7 +185,7 @@ proc getSession*(api: Api): Future[Session] {.async.} =
proc setLimited*(session: Session; api: Api) =
session.limited = true
session.limitedAt = epochTime().int
log "rate limited by api: ", api, ", reqs left: ", session.apis[api].remaining, ", id: ", session.id
log "rate limited by api: ", api, ", reqs left: ", session.apis[api].remaining, ", ", session.pretty
proc setRateLimit*(session: Session; api: Api; remaining, reset, limit: int) =
# avoid undefined behavior in race conditions

View File

@@ -13,6 +13,7 @@ proc parseSession*(raw: string): Session =
result = Session(
kind: SessionKind.oauth,
id: parseBiggestInt(id),
username: session.username,
oauthToken: session.oauthToken,
oauthSecret: session.oauthTokenSecret
)
@@ -21,6 +22,7 @@ proc parseSession*(raw: string): Session =
result = Session(
kind: SessionKind.cookie,
id: id,
username: session.username,
authToken: session.authToken,
ct0: session.ct0
)

View File

@@ -1,6 +1,7 @@
import std/[options, tables, strutils, strformat, sugar]
import jsony
import user, ../types/unifiedcard
import ../../formatters
from ../../types import Card, CardKind, Video
from ../../utils import twimg, https
@@ -77,6 +78,18 @@ proc parseMedia(component: Component; card: UnifiedCard; result: var Card) =
of model3d:
result.title = "Unsupported 3D model ad"
proc parseGrokShare(data: ComponentData; card: UnifiedCard; result: var Card) =
result.kind = summaryLarge
data.destination.parseDestination(card, result)
result.dest = "Answer by Grok"
for msg in data.conversationPreview:
if msg.sender == "USER":
result.title = msg.message.shorten(70)
elif msg.sender == "AGENT":
result.text = msg.message.shorten(500)
proc parseUnifiedCard*(json: string): Card =
let card = json.fromJson(UnifiedCard)
@@ -92,6 +105,8 @@ proc parseUnifiedCard*(json: string): Card =
component.parseMedia(card, result)
of buttonGroup:
discard
of grokShare:
component.data.parseGrokShare(card, result)
of ComponentType.jobDetails:
component.data.parseJobDetails(card, result)
of ComponentType.hidden:

View File

@@ -1,8 +1,8 @@
type
RawSession* = object
kind*: string
username*: string
id*: string
username*: string
oauthToken*: string
oauthTokenSecret*: string
authToken*: string

View File

@@ -22,6 +22,7 @@ type
communityDetails
mediaWithDetailsHorizontal
hidden
grokShare
unknown
Component* = object
@@ -42,6 +43,7 @@ type
topicDetail*: tuple[title: Text]
profileUser*: User
shortDescriptionText*: string
conversationPreview*: seq[GrokConversation]
MediaItem* = object
id*: string
@@ -76,6 +78,10 @@ type
title*: Text
category*: Text
GrokConversation* = object
message*: string
sender*: string
TypeField = Component | Destination | MediaEntity | AppStoreData
converter fromText*(text: Text): string = string(text)
@@ -96,6 +102,7 @@ proc enumHook*(s: string; v: var ComponentType) =
of "community_details": communityDetails
of "media_with_details_horizontal": mediaWithDetailsHorizontal
of "commerce_drop_details": hidden
of "grok_share": grokShare
else: echo "ERROR: Unknown enum value (ComponentType): ", s; unknown
proc enumHook*(s: string; v: var AppType) =

View File

@@ -33,10 +33,13 @@ proc getUrlPrefix*(cfg: Config): string =
if cfg.useHttps: https & cfg.hostname
else: "http://" & cfg.hostname
proc shortLink*(text: string; length=28): string =
result = text.replace(wwwRegex, "")
proc shorten*(text: string; length=28): string =
result = text
if result.len > length:
result = result[0 ..< length] & ""
proc shortLink*(text: string; length=28): string =
result = text.replace(wwwRegex, "").shorten(length)
proc stripHtml*(text: string; shorten=false): string =
var html = parseHtml(text)

View File

@@ -42,6 +42,7 @@
.card-description {
margin: 0.3em 0;
white-space: pre-wrap;
}
.card-destination {

View File

@@ -38,6 +38,7 @@ type
Session* = ref object
id*: int64
username*: string
pending*: int
limited*: bool
limitedAt*: int

View File

@@ -1,23 +1,20 @@
#!/usr/bin/env python3
"""
Authenticates with X.com/Twitter and extracts session cookies for use with Nitter.
Handles 2FA, extracts user info, and outputs clean JSON for sessions.jsonl.
Requirements:
pip install -r tools/requirements.txt
Usage:
python3 tools/get_web_session.py <username> <password> [totp_seed] [--append sessions.jsonl] [--headless]
python3 tools/create_session_browser.py <username> <password> [totp_seed] [--append sessions.jsonl] [--headless]
Examples:
# Output to terminal
python3 tools/get_web_session.py myusername mypassword TOTP_BASE32_SECRET
python3 tools/create_session_browser.py myusername mypassword TOTP_SECRET
# Append to sessions.jsonl
python3 tools/get_web_session.py myusername mypassword TOTP_SECRET --append sessions.jsonl
python3 tools/create_session_browser.py myusername mypassword TOTP_SECRET --append sessions.jsonl
# Headless mode (may increase detection risk)
python3 tools/get_web_session.py myusername mypassword TOTP_SECRET --headless
python3 tools/create_session_browser.py myusername mypassword TOTP_SECRET --headless
Output:
{"kind": "cookie", "username": "...", "id": "...", "auth_token": "...", "ct0": "..."}

View File

@@ -0,0 +1,328 @@
#!/usr/bin/env python3
"""
Requirements:
pip install curl_cffi pyotp
Usage:
python3 tools/create_session_curl.py <username> <password> [totp_seed] [--append sessions.jsonl]
Examples:
# Output to terminal
python3 tools/create_session_curl.py myusername mypassword TOTP_SECRET
# Append to sessions.jsonl
python3 tools/create_session_curl.py myusername mypassword TOTP_SECRET --append sessions.jsonl
Output:
{"kind": "cookie", "username": "...", "id": "...", "auth_token": "...", "ct0": "..."}
"""
import sys
import json
import pyotp
from curl_cffi import requests
BEARER_TOKEN = "AAAAAAAAAAAAAAAAAAAAAFQODgEAAAAAVHTp76lzh3rFzcHbmHVvQxYYpTw%3DckAlMINMjmCwxUcaXbAN4XqJVdgMJaHqNOFgPMK0zN1qLqLQCF"
BASE_URL = "https://api.x.com/1.1/onboarding/task.json"
GUEST_ACTIVATE_URL = "https://api.x.com/1.1/guest/activate.json"
# Subtask versions required by API
SUBTASK_VERSIONS = {
"action_list": 2, "alert_dialog": 1, "app_download_cta": 1,
"check_logged_in_account": 2, "choice_selection": 3,
"contacts_live_sync_permission_prompt": 0, "cta": 7, "email_verification": 2,
"end_flow": 1, "enter_date": 1, "enter_email": 2, "enter_password": 5,
"enter_phone": 2, "enter_recaptcha": 1, "enter_text": 5, "generic_urt": 3,
"in_app_notification": 1, "interest_picker": 3, "js_instrumentation": 1,
"menu_dialog": 1, "notifications_permission_prompt": 2, "open_account": 2,
"open_home_timeline": 1, "open_link": 1, "phone_verification": 4,
"privacy_options": 1, "security_key": 3, "select_avatar": 4,
"select_banner": 2, "settings_list": 7, "show_code": 1, "sign_up": 2,
"sign_up_review": 4, "tweet_selection_urt": 1, "update_users": 1,
"upload_media": 1, "user_recommendations_list": 4,
"user_recommendations_urt": 1, "wait_spinner": 3, "web_modal": 1
}
def get_base_headers(guest_token=None):
"""Build base headers for API requests."""
headers = {
"Authorization": f"Bearer {BEARER_TOKEN}",
"Content-Type": "application/json",
"Accept": "*/*",
"Accept-Language": "en-US",
"X-Twitter-Client-Language": "en-US",
"Origin": "https://x.com",
"Referer": "https://x.com/",
}
if guest_token:
headers["X-Guest-Token"] = guest_token
return headers
def get_cookies_dict(session):
"""Extract cookies from session."""
return session.cookies.get_dict() if hasattr(session.cookies, 'get_dict') else dict(session.cookies)
def make_request(session, headers, flow_token, subtask_data, print_msg):
"""Generic request handler for flow steps."""
print(f"[*] {print_msg}...", file=sys.stderr)
payload = {
"flow_token": flow_token,
"subtask_inputs": [subtask_data] if isinstance(subtask_data, dict) else subtask_data
}
response = session.post(BASE_URL, json=payload, headers=headers)
response.raise_for_status()
data = response.json()
new_flow_token = data.get('flow_token')
if not new_flow_token:
raise Exception(f"Failed to get flow token: {print_msg}")
return new_flow_token, data
def get_guest_token(session):
"""Get guest token for unauthenticated requests."""
print("[*] Getting guest token...", file=sys.stderr)
response = session.post(GUEST_ACTIVATE_URL, headers={"Authorization": f"Bearer {BEARER_TOKEN}"})
response.raise_for_status()
guest_token = response.json().get('guest_token')
if not guest_token:
raise Exception("Failed to obtain guest token")
print(f"[*] Got guest token: {guest_token}", file=sys.stderr)
return guest_token
def init_flow(session, guest_token):
"""Initialize the login flow."""
print("[*] Initializing login flow...", file=sys.stderr)
headers = get_base_headers(guest_token)
payload = {
"input_flow_data": {
"flow_context": {
"debug_overrides": {},
"start_location": {"location": "manual_link"}
},
"subtask_versions": SUBTASK_VERSIONS
}
}
response = session.post(f"{BASE_URL}?flow_name=login", json=payload, headers=headers)
response.raise_for_status()
flow_token = response.json().get('flow_token')
if not flow_token:
raise Exception("Failed to get initial flow token")
print("[*] Got initial flow token", file=sys.stderr)
return flow_token, headers
def submit_username(session, flow_token, headers, guest_token, username):
"""Submit username."""
headers = headers.copy()
headers["X-Guest-Token"] = guest_token
subtask = {
"subtask_id": "LoginEnterUserIdentifierSSO",
"settings_list": {
"setting_responses": [{
"key": "user_identifier",
"response_data": {"text_data": {"result": username}}
}],
"link": "next_link"
}
}
flow_token, data = make_request(session, headers, flow_token, subtask, "Submitting username")
# Check for denial (suspicious activity)
if data.get('subtasks') and 'cta' in data['subtasks'][0]:
error_msg = data['subtasks'][0]['cta'].get('primary_text', {}).get('text')
if error_msg:
raise Exception(f"Login denied: {error_msg}")
return flow_token
def submit_password(session, flow_token, headers, guest_token, password):
"""Submit password and detect if 2FA is needed."""
headers = headers.copy()
headers["X-Guest-Token"] = guest_token
subtask = {
"subtask_id": "LoginEnterPassword",
"enter_password": {"password": password, "link": "next_link"}
}
flow_token, data = make_request(session, headers, flow_token, subtask, "Submitting password")
needs_2fa = any(s.get('subtask_id') == 'LoginTwoFactorAuthChallenge' for s in data.get('subtasks', []))
if needs_2fa:
print("[*] 2FA required", file=sys.stderr)
return flow_token, needs_2fa
def submit_2fa(session, flow_token, headers, guest_token, totp_seed):
"""Submit 2FA code."""
if not totp_seed:
raise Exception("2FA required but no TOTP seed provided")
code = pyotp.TOTP(totp_seed).now()
print("[*] Generating 2FA code...", file=sys.stderr)
headers = headers.copy()
headers["X-Guest-Token"] = guest_token
subtask = {
"subtask_id": "LoginTwoFactorAuthChallenge",
"enter_text": {"text": code, "link": "next_link"}
}
flow_token, _ = make_request(session, headers, flow_token, subtask, "Submitting 2FA code")
return flow_token
def submit_js_instrumentation(session, flow_token, headers, guest_token):
"""Submit JS instrumentation response."""
headers = headers.copy()
headers["X-Guest-Token"] = guest_token
subtask = {
"subtask_id": "LoginJsInstrumentationSubtask",
"js_instrumentation": {
"response": '{"rf":{"a4fc506d24bb4843c48a1966940c2796bf4fb7617a2d515ad3297b7df6b459b6":121,"bff66e16f1d7ea28c04653dc32479cf416a9c8b67c80cb8ad533b2a44fee82a3":-1,"ac4008077a7e6ca03210159dbe2134dea72a616f03832178314bb9931645e4f7":-22,"c3a8a81a9b2706c6fec42c771da65a9597c537b8e4d9b39e8e58de9fe31ff239":-12},"s":"ZHYaDA9iXRxOl2J3AZ9cc23iJx-Fg5E82KIBA_fgeZFugZGYzRtf8Bl3EUeeYgsK30gLFD2jTQx9fAMsnYCw0j8ahEy4Pb5siM5zD6n7YgOeWmFFaXoTwaGY4H0o-jQnZi5yWZRAnFi4lVuCVouNz_xd2BO2sobCO7QuyOsOxQn2CWx7bjD8vPAzT5BS1mICqUWyjZDjLnRZJU6cSQG5YFIHEPBa8Kj-v1JFgkdAfAMIdVvP7C80HWoOqYivQR7IBuOAI4xCeLQEdxlGeT-JYStlP9dcU5St7jI6ExyMeQnRicOcxXLXsan8i5Joautk2M8dAJFByzBaG4wtrPhQ3QAAAZEi-_t7"}',
"link": "next_link"
}
}
flow_token, _ = make_request(session, headers, flow_token, subtask, "Submitting JS instrumentation")
return flow_token
def complete_flow(session, flow_token, headers):
"""Complete the login flow."""
cookies = get_cookies_dict(session)
headers = headers.copy()
headers["X-Twitter-Auth-Type"] = "OAuth2Session"
if cookies.get('ct0'):
headers["X-Csrf-Token"] = cookies['ct0']
subtask = {
"subtask_id": "AccountDuplicationCheck",
"check_logged_in_account": {"link": "AccountDuplicationCheck_false"}
}
make_request(session, headers, flow_token, subtask, "Completing login flow")
def extract_user_id(cookies_dict):
"""Extract user ID from twid cookie."""
twid = cookies_dict.get('twid', '').strip('"')
for prefix in ['u=', 'u%3D']:
if prefix in twid:
return twid.split(prefix)[1].split('&')[0].strip('"')
return None
def login_and_get_cookies(username, password, totp_seed=None):
"""Authenticate with X.com and extract session cookies."""
session = requests.Session(impersonate="chrome")
try:
guest_token = get_guest_token(session)
flow_token, headers = init_flow(session, guest_token)
flow_token = submit_js_instrumentation(session, flow_token, headers, guest_token)
flow_token = submit_username(session, flow_token, headers, guest_token, username)
flow_token, needs_2fa = submit_password(session, flow_token, headers, guest_token, password)
if needs_2fa:
flow_token = submit_2fa(session, flow_token, headers, guest_token, totp_seed)
complete_flow(session, flow_token, headers)
cookies_dict = get_cookies_dict(session)
cookies_dict['username'] = username
user_id = extract_user_id(cookies_dict)
if user_id:
cookies_dict['id'] = user_id
print("[*] Successfully authenticated", file=sys.stderr)
return cookies_dict
finally:
session.close()
def main():
if len(sys.argv) < 3:
print('Usage: python3 create_session_curl.py username password [totp_seed] [--append sessions.jsonl]', file=sys.stderr)
sys.exit(1)
username = sys.argv[1]
password = sys.argv[2]
totp_seed = None
append_file = None
# Parse optional arguments
i = 3
while i < len(sys.argv):
arg = sys.argv[i]
if arg == '--append':
if i + 1 < len(sys.argv):
append_file = sys.argv[i + 1]
i += 2
else:
print('[!] Error: --append requires a filename', file=sys.stderr)
sys.exit(1)
elif not arg.startswith('--'):
if totp_seed is None:
totp_seed = arg
i += 1
else:
print(f'[!] Warning: Unknown argument: {arg}', file=sys.stderr)
i += 1
try:
cookies = login_and_get_cookies(username, password, totp_seed)
session = {
'kind': 'cookie',
'username': cookies['username'],
'id': cookies.get('id'),
'auth_token': cookies['auth_token'],
'ct0': cookies['ct0']
}
output = json.dumps(session)
if append_file:
with open(append_file, 'a') as f:
f.write(output + '\n')
print(f'✓ Session appended to {append_file}', file=sys.stderr)
else:
print(output)
sys.exit(0)
except Exception as error:
print(f'[!] Error: {error}', file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -1,2 +1,3 @@
nodriver>=0.48.0
pyotp
curl_cffi