diff --git a/twscrape/xclid.py b/twscrape/xclid.py index 32f1eb70..64379e87 100644 --- a/twscrape/xclid.py +++ b/twscrape/xclid.py @@ -1,10 +1,11 @@ +import asyncio import base64 import hashlib import math import random import re import time -from typing import Iterator +from urllib.parse import urljoin import bs4 @@ -45,16 +46,28 @@ def script_url(k: str, v: str): return f"https://abs.twimg.com/responsive-web/client-web/{k}.{v}.js" -def get_scripts_list(text: str) -> Iterator[str]: +# Current X web build (Vite): script bundles are linked directly in the page +# HTML under https://abs.twimg.com/x-web/.../*.js (modulepreload links + entry). +ASSET_URL_RE = re.compile(r"https://[\w.-]+/x-web/[\w./-]+\.js") + + +def get_scripts_list(text: str) -> list[str]: """ Extract chunk script URLs from the X homepage HTML. - As of 2026-05, X embeds two separate maps directly in the page HTML: + Current build (x-web / Vite): scripts are linked directly in the page HTML, + so we just collect those URLs. If none are found we fall back to the legacy + webpack build, which embeds two maps in the page and requires URL + reconstruction: - Hash map {chunk_id: "7hexchars"} values are exactly 7 lowercase hex digits - Name map {chunk_id: "human_readable_name"} values contain non-hex characters - - URL format: https://abs.twimg.com/responsive-web/client-web/{name}.{hash}a.js + URL format: https://abs.twimg.com/responsive-web/client-web/{name}.{hash}a.js """ + urls = list(dict.fromkeys(ASSET_URL_RE.findall(text))) + if urls: + return urls + + # Legacy webpack build fallback. # Hash map: values are exactly 7 lowercase hex digits (distinguishes them from name-map values) hash_map = {m.group(1): m.group(2) for m in re.finditer(r'(\d+):"([0-9a-f]{7})"', text)} @@ -68,9 +81,10 @@ def get_scripts_list(text: str) -> Iterator[str]: if not re.fullmatch(r"[0-9a-f]{7}", val): name_map[m.group(1)] = val - for chunk_id, hash_val in hash_map.items(): - name = name_map.get(chunk_id, chunk_id) - yield script_url(name, hash_val + "a") + return [ + script_url(name_map.get(chunk_id, chunk_id), hash_val + "a") + for chunk_id, hash_val in hash_map.items() + ] # MARK: XClientTxId parsing @@ -216,13 +230,50 @@ def parse_vk_bytes(soup: bs4.BeautifulSoup) -> list[int]: return list(base64.b64decode(bytes(el, "utf-8"))) +# File holding the animation indices: legacy build linked `ondemand.s.*.js`, +# current x-web build dynamically imports `sign.o-*.js` from a bundle chunk. +# \b guards against substring hits like `design.o-*.js`. +INDICES_FILE_RE = re.compile(r"(?:\.{0,2}/)?[\w./-]*?\b(?:ondemand\.s|sign\.o)[\w.-]*\.js") + + +async def _find_indices_url(scripts: list[str], clt: HttpClient) -> str: + # The indices file (sign.o-*.js) is not linked in the page directly — it is + # dynamically imported from one of the bundle chunks. Scan chunks + # concurrently and resolve the first reference we find, then stop. + sem = asyncio.Semaphore(16) + + async def fetch(url: str) -> tuple[str, str]: + async with sem: + try: + return url, (await clt.get(url)).text + except Exception: + return url, "" + + tasks = [asyncio.create_task(fetch(u)) for u in scripts] + try: + for fut in asyncio.as_completed(tasks): + url, body = await fut + m = INDICES_FILE_RE.search(body) + if m: + return urljoin(url, m.group(0)) + finally: + for t in tasks: + t.cancel() + + raise Exception("Couldn't get XClientTxId indices script") + + async def parse_anim_idx(text: str, clt: HttpClient) -> list[int]: scripts = list(get_scripts_list(text)) - scripts = [x for x in scripts if "/ondemand.s." in x] if not scripts: raise Exception("Couldn't get XClientTxId scripts") - text = await get_tw_page_text(scripts[0], clt) + # Legacy build links the indices file directly; the current x-web build + # hides it behind a dynamic import inside a bundle chunk. + direct = [x for x in scripts if INDICES_FILE_RE.search(x)] + url = direct[0] if direct else await _find_indices_url(scripts, clt) + + text = await get_tw_page_text(url, clt) items = [int(x.group(2)) for x in INDICES_REGEX.finditer(text)] if not items: @@ -313,6 +364,4 @@ async def main(): if __name__ == "__main__": - import asyncio - asyncio.run(main())