+19
-11
@@ -130,7 +130,7 @@ template fetchImpl(result, fetchBody) {.dirty.} =
|
||||
raise newException(BadClientError, "Bad client")
|
||||
|
||||
if resp.status == $Http404 and result.len == 0:
|
||||
echo "[sessions] transient 404 (empty body), retrying: ", url.path
|
||||
echo "[sessions] transient 404 (empty body), retrying: ", url.path, ", session: ", session.pretty
|
||||
raise rateLimitError()
|
||||
|
||||
if resp.headers.hasKey(rlRemaining):
|
||||
@@ -147,7 +147,7 @@ template fetchImpl(result, fetchBody) {.dirty.} =
|
||||
if result.startsWith("{\"errors"):
|
||||
let errors = result.fromJson(Errors)
|
||||
if errors notin errorsToSkip:
|
||||
echo "Fetch error, API: ", url.path, ", errors: ", errors
|
||||
echo "Fetch error, API: ", url.path, ", errors: ", errors, ", session: ", session.pretty
|
||||
if errors in {expiredToken, badToken, locked}:
|
||||
invalidate(session)
|
||||
raise rateLimitError()
|
||||
@@ -162,7 +162,7 @@ template fetchImpl(result, fetchBody) {.dirty.} =
|
||||
fetchBody
|
||||
|
||||
if resp.status == $Http400:
|
||||
echo "ERROR 400, ", url.path, ": ", result
|
||||
echo "ERROR 400, ", url.path, ": ", result, ", session: ", session.pretty
|
||||
raise newException(InternalError, $url)
|
||||
except InternalError as e:
|
||||
raise e
|
||||
@@ -177,21 +177,29 @@ template fetchImpl(result, fetchBody) {.dirty.} =
|
||||
finally:
|
||||
release(session)
|
||||
|
||||
template retry(bod) =
|
||||
template retry(bod) {.dirty.} =
|
||||
var session: Session
|
||||
for i in 0 ..< maxRetries:
|
||||
try:
|
||||
session = nil
|
||||
bod
|
||||
break
|
||||
except RateLimitError:
|
||||
echo "[sessions] Rate limited, retrying ", req.cookie.endpoint,
|
||||
let api = if session.isNil: req.cookie.endpoint
|
||||
else: req.endpoint(session)
|
||||
if session.isNil:
|
||||
echo "[sessions] Rate limited, retrying ", api,
|
||||
" request (", i, "/", maxRetries, ")..."
|
||||
else:
|
||||
echo "[sessions] Rate limited, retrying ", api,
|
||||
" request (", i, "/", maxRetries, ")..., session: ", session.pretty
|
||||
session = nil
|
||||
if retryDelayMs > 0:
|
||||
await sleepAsync(retryDelayMs)
|
||||
|
||||
proc fetch*(req: ApiReq): Future[JsonNode] {.async.} =
|
||||
retry:
|
||||
var
|
||||
body: string
|
||||
var body: string
|
||||
session = await getAndValidateSession(req)
|
||||
|
||||
let url = req.toUrl(session.kind)
|
||||
@@ -200,22 +208,22 @@ proc fetch*(req: ApiReq): Future[JsonNode] {.async.} =
|
||||
if body.startsWith('{') or body.startsWith('['):
|
||||
result = parseJson(body)
|
||||
else:
|
||||
echo resp.status, ": ", body, " --- url: ", url
|
||||
echo resp.status, ": ", body, " --- url: ", url, ", session: ", session.pretty
|
||||
result = newJNull()
|
||||
|
||||
let error = result.getError
|
||||
if error != null and error notin errorsToSkip:
|
||||
echo "Fetch error, API: ", url.path, ", error: ", error
|
||||
echo "Fetch error, API: ", url.path, ", error: ", error, ", session: ", session.pretty
|
||||
if error in {expiredToken, badToken, locked}:
|
||||
invalidate(session)
|
||||
raise rateLimitError()
|
||||
|
||||
proc fetchRaw*(req: ApiReq): Future[string] {.async.} =
|
||||
retry:
|
||||
var session = await getAndValidateSession(req)
|
||||
session = await getAndValidateSession(req)
|
||||
let url = req.toUrl(session.kind)
|
||||
|
||||
fetchImpl result:
|
||||
if not (result.startsWith('{') or result.startsWith('[')):
|
||||
echo resp.status, ": ", result, " --- url: ", url
|
||||
echo resp.status, ": ", result, " --- url: ", url, ", session: ", session.pretty
|
||||
result.setLen(0)
|
||||
|
||||
+16
-1
@@ -18,7 +18,7 @@ proc setMaxConcurrentReqs*(reqs: int) =
|
||||
template log(str: varargs[string, `$`]) =
|
||||
echo "[sessions] ", str.join("")
|
||||
|
||||
proc endpoint(req: ApiReq; session: Session): string =
|
||||
proc endpoint*(req: ApiReq; session: Session): string =
|
||||
case session.kind
|
||||
of oauth: req.oauth.endpoint
|
||||
of cookie: req.cookie.endpoint
|
||||
@@ -50,6 +50,8 @@ proc getSessionPoolHealth*(): JsonNode =
|
||||
oldest = now.int64
|
||||
newest = 0'i64
|
||||
average = 0'i64
|
||||
oauthTotal, cookieTotal = 0
|
||||
oauthLimited, cookieLimited = 0
|
||||
|
||||
for session in sessionPool:
|
||||
let created = snowflakeToEpoch(session.id)
|
||||
@@ -59,8 +61,15 @@ proc getSessionPoolHealth*(): JsonNode =
|
||||
oldest = created
|
||||
average += created
|
||||
|
||||
case session.kind
|
||||
of oauth: inc oauthTotal
|
||||
of cookie: inc cookieTotal
|
||||
|
||||
if session.limited:
|
||||
limited.incl session.id
|
||||
case session.kind
|
||||
of oauth: inc oauthLimited
|
||||
of cookie: inc cookieLimited
|
||||
|
||||
for api in session.apis.keys:
|
||||
let
|
||||
@@ -84,6 +93,8 @@ proc getSessionPoolHealth*(): JsonNode =
|
||||
"sessions": %*{
|
||||
"total": sessionPool.len,
|
||||
"limited": limited.card,
|
||||
"oauth": %*{"total": oauthTotal, "limited": oauthLimited},
|
||||
"cookie": %*{"total": cookieTotal, "limited": cookieLimited},
|
||||
"oldest": $fromUnix(oldest),
|
||||
"newest": $fromUnix(newest),
|
||||
"average": $fromUnix(average)
|
||||
@@ -100,6 +111,7 @@ proc getSessionPoolDebug*(): JsonNode =
|
||||
|
||||
for session in sessionPool:
|
||||
let sessionJson = %*{
|
||||
"kind": $session.kind,
|
||||
"apis": newJObject(),
|
||||
"pending": session.pending,
|
||||
}
|
||||
@@ -173,7 +185,10 @@ proc getSession*(req: ApiReq): Future[Session] {.async.} =
|
||||
if not result.isNil and result.isReady(req):
|
||||
inc result.pending
|
||||
else:
|
||||
if result.isNil:
|
||||
log "no sessions available for API: ", req.cookie.endpoint
|
||||
else:
|
||||
log "no sessions available for API: ", req.endpoint(result), ", last tried: ", result.pretty
|
||||
raise noSessionsError()
|
||||
|
||||
proc setLimited*(session: Session; req: ApiReq) =
|
||||
|
||||
Reference in New Issue
Block a user