"""Scrape 5eplay arena match pages with Playwright and download demo archives.

For each match URL the script opens the page, locates the embedded stats
iframe, clicks the "Swing Score" tab so the iframe issues its JSON requests,
records those JSON responses, and downloads any ``demo_url`` links found in
the captured payloads.
"""

import argparse
import asyncio
import json
import os
import sys
import time
import urllib.request
from pathlib import Path
from urllib.parse import urlparse


def build_args():
    """Build and return the command-line argument parser (not yet parsed)."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--url",
        default="https://arena.5eplay.com/data/match/g161-20260118222715609322516",
    )
    parser.add_argument("--url-list", default="")
    parser.add_argument("--out", default="output_arena")
    parser.add_argument("--match-name", default="")
    parser.add_argument("--headless", default="false")
    parser.add_argument("--timeout-ms", type=int, default=30000)
    parser.add_argument("--capture-ms", type=int, default=5000)
    parser.add_argument("--iframe-capture-ms", type=int, default=8000)
    parser.add_argument("--concurrency", type=int, default=3)
    parser.add_argument("--goto-retries", type=int, default=1)
    parser.add_argument("--fetch-type", default="both", choices=["iframe", "demo", "both"])
    return parser


def ensure_dir(path):
    """Create *path* (with parents) if it does not already exist."""
    Path(path).mkdir(parents=True, exist_ok=True)


def truthy(value):
    """Interpret common truthy strings ("1", "true", "yes", ...), case-insensitively."""
    return str(value).lower() in {"1", "true", "yes", "y", "on"}


def log(message):
    """Print *message* prefixed with a HH:MM:SS timestamp."""
    stamp = time.strftime("%H:%M:%S")
    print(f"[{stamp}] {message}")


def safe_folder(value):
    """Reduce *value* to filesystem-safe chars [A-Za-z0-9_-]; fall back to "match"."""
    keep = [ch for ch in value if ch.isalnum() or ch in {"-", "_"}]
    return "".join(keep) or "match"


def extract_match_code(url):
    """Return the first URL path segment shaped like a match code (``g...-...``), or ""."""
    for part in url.split("/"):
        if part.startswith("g") and "-" in part:
            return part
    return ""


def read_url_list(path):
    """Read URLs (one per line) from *path*.

    Blank lines and ``#`` comments are skipped. Returns [] when *path* is
    empty or missing. ``utf-8-sig`` tolerates a Windows BOM.
    """
    if not path or not os.path.exists(path):
        return []
    urls = []
    with open(path, "r", encoding="utf-8-sig") as f:
        for line in f:
            value = line.strip()
            if value and not value.startswith("#"):
                urls.append(value)
    return urls


def collect_demo_urls(value, results):
    """Recursively add every string ``demo_url`` value in nested JSON data to *results*."""
    if isinstance(value, dict):
        for key, item in value.items():
            if key == "demo_url" and isinstance(item, str):
                results.add(item)
            collect_demo_urls(item, results)
    elif isinstance(value, list):
        for item in value:
            collect_demo_urls(item, results)


def extract_demo_urls_from_payloads(payloads):
    """Return the unique demo URLs found across *payloads* (order unspecified)."""
    results = set()
    for payload in payloads:
        collect_demo_urls(payload, results)
    return list(results)


def extract_demo_urls_from_network(path):
    """Load a recorded network-capture JSON file and extract its demo URLs.

    Best effort: a missing or unreadable file simply yields [].
    """
    if not os.path.exists(path):
        return []
    try:
        with open(path, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except Exception:
        return []
    return extract_demo_urls_from_payloads([payload])


def download_file(url, dest_dir):
    """Download *url* into *dest_dir*; return the local path, or "" on failure.

    Streams to a ``.part`` temp file first so an interrupted transfer never
    leaves a truncated file at the final name; existing files are reused.
    """
    if not url:
        return ""
    ensure_dir(dest_dir)
    filename = os.path.basename(urlparse(url).path) or "demo.zip"
    dest_path = os.path.join(dest_dir, filename)
    if os.path.exists(dest_path):
        return dest_path
    temp_path = dest_path + ".part"
    try:
        # FIX: timeout guards against a stalled connection hanging the whole
        # run forever (failures already fall through to the "" return).
        with urllib.request.urlopen(url, timeout=60) as response, open(temp_path, "wb") as f:
            while True:
                chunk = response.read(1024 * 1024)
                if not chunk:
                    break
                f.write(chunk)
        os.replace(temp_path, dest_path)
        return dest_path
    except Exception:
        try:
            if os.path.exists(temp_path):
                os.remove(temp_path)
        except Exception:
            pass
        return ""


def download_demo_from_iframe(out_dir, iframe_payloads=None):
    """Download every demo URL found in *iframe_payloads* (or, when None, in the
    previously written ``iframe_network.json``); return the list of local paths."""
    if iframe_payloads is None:
        network_path = os.path.join(out_dir, "iframe_network.json")
        demo_urls = extract_demo_urls_from_network(network_path)
    else:
        demo_urls = extract_demo_urls_from_payloads(iframe_payloads)
    downloaded = []
    for url in demo_urls:
        path = download_file(url, out_dir)
        if path:
            downloaded.append(path)
    return downloaded


async def safe_goto(page, url, timeout_ms, retries):
    """Navigate *page* to *url*, retrying up to *retries* times; True on success."""
    attempt = 0
    while True:
        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
            return True
        except Exception as exc:
            attempt += 1
            if attempt > retries:
                log(f"打开失败 {url} {exc}")
                return False
            await page.wait_for_timeout(1000)


async def intercept_json_responses(page, sink, capture_ms):
    """Record JSON responses on *page* into *sink* for *capture_ms* milliseconds.

    FIX: the response listener is now detached when the capture window ends.
    Previously it stayed registered forever, so the two captures that
    open_iframe_page performs on the same page stacked handlers and could
    record the same response more than once.
    """
    active = True

    async def handle_response(response):
        try:
            if not active:
                return
            content_type = response.headers.get("content-type", "")
            # "json" also matches "application/json", which the original
            # checked separately; behavior is unchanged.
            if "json" in content_type:
                body = await response.json()
                sink.append(
                    {
                        "url": response.url,
                        "status": response.status,
                        "body": body,
                    }
                )
        except Exception:
            # Torn-down responses / unparsable bodies are silently skipped.
            return

    page.on("response", handle_response)
    try:
        await page.wait_for_timeout(capture_ms)
    finally:
        active = False
        try:
            page.remove_listener("response", handle_response)
        except Exception:
            pass


# Labels the "Swing Score" tab may carry in either language.
_TAB_NAMES = ["5E Swing Score", "5E 摆动分", "摆动分", "Swing Score", "Swing", "SS"]

# Fallback 1: walk the DOM (including shadow roots) for a visible element whose
# text contains one of the labels, and synthesize a full click event sequence.
_JS_CLICK_BY_LABEL = """() => {
    const labels = ["5E Swing Score", "5E 摆动分", "摆动分", "Swing Score", "Swing", "SS"];
    const roots = [document];
    const elements = [];
    while (roots.length) {
        const root = roots.pop();
        const tree = root.querySelectorAll ? Array.from(root.querySelectorAll("*")) : [];
        for (const el of tree) {
            elements.push(el);
            if (el.shadowRoot) roots.push(el.shadowRoot);
        }
    }
    const target = elements.find(el => {
        const text = (el.textContent || "").trim();
        if (!text) return false;
        if (!labels.some(l => text.includes(l))) return false;
        const rect = el.getBoundingClientRect();
        return rect.width > 0 && rect.height > 0;
    });
    if (target) {
        target.scrollIntoView({block: "center", inline: "center"});
        const rect = target.getBoundingClientRect();
        const x = rect.left + rect.width / 2;
        const y = rect.top + rect.height / 2;
        const events = ["pointerdown", "mousedown", "pointerup", "mouseup", "click"];
        for (const type of events) {
            target.dispatchEvent(new MouseEvent(type, {bubbles: true, cancelable: true, clientX: x, clientY: y}));
        }
        return true;
    }
    return false;
}"""

# Fallback 2: click the matching .ya-tab (or, failing that, the last tab).
_JS_CLICK_LAST_TAB = """() => {
    const tabs = Array.from(document.querySelectorAll(".ya-tab"));
    if (tabs.length === 0) return false;
    const target = tabs.find(tab => {
        const text = (tab.textContent || "").replace(/\\s+/g, " ").trim();
        return text.includes("5E Swing Score") || text.includes("5E 摆动分") || text.includes("摆动分");
    }) || tabs[tabs.length - 1];
    if (!target) return false;
    target.scrollIntoView({block: "center", inline: "center"});
    const rect = target.getBoundingClientRect();
    const x = rect.left + rect.width / 2;
    const y = rect.top + rect.height / 2;
    const events = ["pointerdown", "mousedown", "pointerup", "mouseup", "click"];
    for (const type of events) {
        target.dispatchEvent(new MouseEvent(type, {bubbles: true, cancelable: true, clientX: x, clientY: y}));
    }
    return true;
}"""


async def _click_tab_by_name(iframe_page, timeout_ms):
    """Try several Playwright selector strategies for each known tab label.

    Returns True as soon as one candidate locator matches and is clicked.
    """
    for name in _TAB_NAMES:
        candidates = [
            iframe_page.locator(".ya-tab", has_text=name),
            iframe_page.get_by_role("tab", name=name),
            iframe_page.get_by_role("button", name=name),
            iframe_page.get_by_text(name, exact=True),
            iframe_page.get_by_text(name, exact=False),
        ]
        for locator in candidates:
            if await locator.count() > 0:
                await locator.first.scroll_into_view_if_needed()
                await locator.first.click(timeout=timeout_ms, force=True)
                return True
    return False


async def open_iframe_page(
    context, iframe_url, out_dir, timeout_ms, capture_ms, goto_retries, write_iframe_network
):
    """Open the stats iframe in its own page, click the Swing Score tab, and
    capture the resulting JSON traffic.

    When *write_iframe_network* is truthy the capture is also persisted to
    ``<out_dir>/iframe_network.json``. Returns the list of captured
    ``{url, status, body}`` records.
    """
    iframe_page = await context.new_page()
    json_sink = []
    # First capture window runs concurrently with the initial page load.
    response_task = asyncio.create_task(
        intercept_json_responses(iframe_page, json_sink, capture_ms)
    )
    ok = await safe_goto(iframe_page, iframe_url, timeout_ms, goto_retries)
    if not ok:
        await response_task
        await iframe_page.close()
        return json_sink
    try:
        await iframe_page.wait_for_load_state("domcontentloaded", timeout=timeout_ms)
    except Exception:
        pass
    clicked = False
    try:
        await iframe_page.wait_for_timeout(1000)
        try:
            await iframe_page.wait_for_selector(".ya-tab", timeout=timeout_ms)
        except Exception:
            pass
        # Strategy cascade: locator-based clicks, then two JS fallbacks, then
        # a raw mouse click on the last tab's bounding box.
        clicked = await _click_tab_by_name(iframe_page, timeout_ms)
        if not clicked:
            clicked = await iframe_page.evaluate(_JS_CLICK_BY_LABEL)
        if not clicked:
            clicked = await iframe_page.evaluate(_JS_CLICK_LAST_TAB)
        if not clicked:
            tab_locator = iframe_page.locator(".ya-tab")
            if await tab_locator.count() > 0:
                target = tab_locator.nth(await tab_locator.count() - 1)
                box = await target.bounding_box()
                if box:
                    await iframe_page.mouse.click(
                        box["x"] + box["width"] / 2, box["y"] + box["height"] / 2
                    )
                    clicked = True
    except Exception:
        clicked = False
    if clicked:
        await iframe_page.wait_for_timeout(1500)
        # Second capture window: the tab click triggers fresh JSON requests.
        await intercept_json_responses(iframe_page, json_sink, capture_ms)
    try:
        await iframe_page.wait_for_load_state("networkidle", timeout=timeout_ms)
    except Exception:
        pass
    await response_task
    if write_iframe_network:
        with open(os.path.join(out_dir, "iframe_network.json"), "w", encoding="utf-8") as f:
            json.dump(json_sink, f, ensure_ascii=False, indent=2)
    await iframe_page.close()
    return json_sink


async def run_match(pw, args, url, index, total):
    """Process one match URL end-to-end: open the page, harvest iframe JSON,
    and optionally download demos into a per-match folder under ``args.out``."""
    base_out = os.path.abspath(args.out)
    ensure_dir(base_out)
    match_code = extract_match_code(url)
    base_name = args.match_name.strip() or match_code or "match"
    if total > 1:
        # Keep per-match folder names unique when scraping several URLs.
        suffix = match_code or str(index + 1)
        name = base_name if base_name == suffix else f"{base_name}-{suffix}"
    else:
        name = base_name
    out_dir = os.path.join(base_out, safe_folder(name))
    ensure_dir(out_dir)
    headless = truthy(args.headless)
    timeout_ms = args.timeout_ms
    iframe_capture_ms = args.iframe_capture_ms
    goto_retries = args.goto_retries
    fetch_type = str(args.fetch_type or "both").lower()
    want_iframe = fetch_type in {"iframe", "both"}
    want_demo = fetch_type in {"demo", "both"}
    browser = await pw.chromium.launch(headless=headless, slow_mo=50)
    # FIX: the original leaked the browser process whenever an exception
    # escaped after launch (the caller swallows exceptions); finally
    # guarantees cleanup on every path, including the early return.
    try:
        context = await browser.new_context(accept_downloads=True)
        page = await context.new_page()
        log(f"打开比赛页 {index + 1}/{total}")
        ok = await safe_goto(page, url, timeout_ms, goto_retries)
        if not ok:
            return
        try:
            await page.wait_for_load_state("networkidle", timeout=timeout_ms)
        except Exception:
            pass
        iframe_url = await page.evaluate(
            """() => {
                const iframe = document.querySelector('iframe')
                return iframe ? iframe.getAttribute('src') : null
            }"""
        )
        iframe_sink = []
        if iframe_url and (want_iframe or want_demo):
            log(f"进入内嵌页面 {iframe_url}")
            iframe_sink = await open_iframe_page(
                context,
                iframe_url,
                out_dir,
                timeout_ms,
                iframe_capture_ms,
                goto_retries,
                want_iframe,
            )
        if want_demo:
            downloaded = download_demo_from_iframe(out_dir, iframe_sink if iframe_sink else None)
            if downloaded:
                log(f"已下载 demo: {len(downloaded)}")
    finally:
        await browser.close()


async def run_match_with_semaphore(semaphore, pw, args, url, index, total):
    """Run one match under the concurrency semaphore; failures are logged, not raised."""
    async with semaphore:
        try:
            await run_match(pw, args, url, index, total)
        except Exception as exc:
            log(f"任务失败 {url} {exc}")


async def run():
    """Entry coroutine: parse args, start Playwright, and fan out over all URLs."""
    args = build_args().parse_args()
    try:
        # Imported lazily so the helper functions work without Playwright installed.
        from playwright.async_api import async_playwright
    except Exception:
        print("Playwright 未安装,请先安装: python -m pip install playwright && python -m playwright install")
        sys.exit(1)
    urls = read_url_list(args.url_list) or [args.url]
    async with async_playwright() as pw:
        concurrency = max(1, int(args.concurrency or 1))
        semaphore = asyncio.Semaphore(concurrency)
        tasks = [
            asyncio.create_task(
                run_match_with_semaphore(semaphore, pw, args, url, index, len(urls))
            )
            for index, url in enumerate(urls)
        ]
        if tasks:
            await asyncio.gather(*tasks)
    log("完成")


def main():
    asyncio.run(run())


if __name__ == "__main__":
    main()