feat: Add recent performance stability stats (matches/days) to player profile

This commit is contained in:
2026-01-28 14:04:32 +08:00
commit 48f1f71d3a
104 changed files with 17572 additions and 0 deletions

downloader/downloader.py — new file, 416 lines

@@ -0,0 +1,416 @@
import argparse
import asyncio
import json
import os
import sys
import time
import urllib.request
from pathlib import Path
from urllib.parse import urlparse
def build_args():
    """Build the command-line parser for the arena match downloader."""
    cli = argparse.ArgumentParser()
    # Target selection: a single match URL, or a file with one URL per line.
    cli.add_argument(
        "--url",
        default="https://arena.5eplay.com/data/match/g161-20260118222715609322516",
    )
    cli.add_argument("--url-list", default="")
    # Output location and folder naming.
    cli.add_argument("--out", default="output_arena")
    cli.add_argument("--match-name", default="")
    # Browser behaviour and timing knobs (times are in milliseconds).
    cli.add_argument("--headless", default="false")
    cli.add_argument("--timeout-ms", type=int, default=30000)
    cli.add_argument("--capture-ms", type=int, default=5000)
    cli.add_argument("--iframe-capture-ms", type=int, default=8000)
    cli.add_argument("--concurrency", type=int, default=3)
    cli.add_argument("--goto-retries", type=int, default=1)
    cli.add_argument("--fetch-type", default="both", choices=["iframe", "demo", "both"])
    return cli
def ensure_dir(path):
    """Create directory *path* (including parents); no-op if it already exists."""
    target = Path(path)
    target.mkdir(parents=True, exist_ok=True)
def truthy(value):
    """Interpret common "yes" spellings (1/true/yes/y/on, any case) as True."""
    normalized = str(value).lower()
    return normalized in ("1", "true", "yes", "y", "on")
def log(message):
    """Print *message* to stdout prefixed with a HH:MM:SS timestamp."""
    timestamp = time.strftime("%H:%M:%S")
    print("[{}] {}".format(timestamp, message))
def safe_folder(value):
    """Strip *value* down to alphanumerics, '-' and '_'; fall back to "match".

    Used to turn match names/codes into filesystem-safe folder names.
    """
    cleaned = "".join(ch for ch in value if ch.isalnum() or ch in "-_")
    return cleaned or "match"
def extract_match_code(url):
    """Return the first URL path segment shaped like a match code ("g...-...").

    Returns "" when no segment matches.
    """
    candidates = (seg for seg in url.split("/") if seg.startswith("g") and "-" in seg)
    return next(candidates, "")
def read_url_list(path):
    """Read URLs (one per line) from *path*, skipping blanks and '#' comments.

    Returns [] when *path* is empty or does not exist. The utf-8-sig codec
    transparently drops a Windows BOM if present.
    """
    if not path or not os.path.exists(path):
        return []
    with open(path, "r", encoding="utf-8-sig") as handle:
        stripped = (line.strip() for line in handle)
        return [entry for entry in stripped if entry and not entry.startswith("#")]
def collect_demo_urls(value, results):
    """Recursively walk nested dicts/lists, adding every string value stored
    under a "demo_url" key into the *results* set. Other values are ignored.
    """
    if isinstance(value, list):
        for entry in value:
            collect_demo_urls(entry, results)
    elif isinstance(value, dict):
        for key, entry in value.items():
            if key == "demo_url" and isinstance(entry, str):
                results.add(entry)
            # Descend regardless: "demo_url" may also hold nested structures.
            collect_demo_urls(entry, results)
def extract_demo_urls_from_payloads(payloads):
    """Collect the unique demo URLs found anywhere inside *payloads*.

    Walks arbitrarily nested dicts/lists looking for string values stored
    under "demo_url" keys.

    Returns a sorted list so the output order is deterministic — the
    previous ``list(set)`` conversion made ordering depend on hash seeding,
    which made logs and download order vary between runs.
    """
    found = set()

    def _walk(node):
        # Local walker mirrors collect_demo_urls() but closes over `found`.
        if isinstance(node, dict):
            for key, child in node.items():
                if key == "demo_url" and isinstance(child, str):
                    found.add(child)
                _walk(child)
        elif isinstance(node, list):
            for child in node:
                _walk(child)

    for payload in payloads:
        _walk(payload)
    return sorted(found)
def extract_demo_urls_from_network(path):
    """Load the saved iframe network capture at *path* and pull demo URLs.

    Missing, unreadable, or invalid-JSON files yield [] (best effort).
    """
    if not os.path.exists(path):
        return []
    try:
        payload = json.loads(Path(path).read_text(encoding="utf-8"))
    except Exception:
        # Corrupt or unreadable capture: treat as "no URLs found".
        return []
    return extract_demo_urls_from_payloads([payload])
def download_file(url, dest_dir):
    """Download *url* into *dest_dir* and return the local path ("" on failure).

    - Returns "" immediately for an empty URL.
    - Reuses an existing file of the same name (treated as already downloaded).
    - Streams through a ``.part`` temp file and renames atomically, so an
      interrupted transfer never leaves a truncated file under the final name.
    - The ``urlopen`` call now carries a timeout: the original had none, so a
      stalled connection could hang the whole run indefinitely.
    """
    if not url:
        return ""
    Path(dest_dir).mkdir(parents=True, exist_ok=True)
    filename = os.path.basename(urlparse(url).path) or "demo.zip"
    dest_path = os.path.join(dest_dir, filename)
    if os.path.exists(dest_path):
        # Already present: skip re-downloading.
        return dest_path
    temp_path = dest_path + ".part"
    try:
        # 60s socket timeout guards against a hung server stalling forever.
        with urllib.request.urlopen(url, timeout=60) as response, open(temp_path, "wb") as out:
            while True:
                chunk = response.read(1024 * 1024)
                if not chunk:
                    break
                out.write(chunk)
        os.replace(temp_path, dest_path)
        return dest_path
    except Exception:
        # Best-effort cleanup of the partial file; failure is reported as "".
        try:
            if os.path.exists(temp_path):
                os.remove(temp_path)
        except Exception:
            pass
        return ""
def download_demo_from_iframe(out_dir, iframe_payloads=None):
    """Resolve demo URLs and download each into *out_dir*.

    URLs come from *iframe_payloads* when given, otherwise from the
    "iframe_network.json" capture saved in *out_dir*. Returns the list of
    local paths that downloaded successfully (failed downloads are dropped).
    """
    if iframe_payloads is not None:
        demo_urls = extract_demo_urls_from_payloads(iframe_payloads)
    else:
        capture = os.path.join(out_dir, "iframe_network.json")
        demo_urls = extract_demo_urls_from_network(capture)
    attempts = (download_file(url, out_dir) for url in demo_urls)
    return [local for local in attempts if local]
async def safe_goto(page, url, timeout_ms, retries):
    """Navigate *page* to *url*, retrying after failures.

    Attempts up to ``retries + 1`` navigations with a 1s pause between them.
    Returns True on success; logs the last error and returns False otherwise.
    """
    failures = 0
    while True:
        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
            return True
        except Exception as exc:
            failures += 1
            if failures > retries:
                log(f"打开失败 {url} {exc}")
                return False
        # Brief back-off before the next attempt.
        await page.wait_for_timeout(1000)
async def intercept_json_responses(page, sink, capture_ms):
    """Capture JSON responses seen on *page* for *capture_ms* ms into *sink*.

    Each sink entry is a dict with "url", "status" and the parsed "body".
    Non-JSON bodies and teardown races are silently ignored (best effort).

    Fixes vs. the original:
    - the response handler is detached afterwards, so repeated calls no
      longer stack a listener per call on the page for its whole lifetime;
    - the redundant content-type check is gone ("json" in ct already
      matches "application/json" in ct).
    """
    active = True

    async def handle_response(response):
        try:
            if not active:
                # A capture window that already ended ignores late responses.
                return
            content_type = response.headers.get("content-type", "")
            if "json" in content_type:
                body = await response.json()
                sink.append(
                    {
                        "url": response.url,
                        "status": response.status,
                        "body": body,
                    }
                )
        except Exception:
            # Unparsable body or closed page: skip this response.
            return

    page.on("response", handle_response)
    try:
        await page.wait_for_timeout(capture_ms)
    finally:
        active = False
        page.remove_listener("response", handle_response)
async def open_iframe_page(
    context, iframe_url, out_dir, timeout_ms, capture_ms, goto_retries, write_iframe_network
):
    """Open the stats iframe in its own tab, try to activate the "Swing Score"
    tab, and record the JSON responses the page makes along the way.

    Parameters:
        context: Playwright browser context used to spawn the new page.
        iframe_url: URL taken from the match page's ``<iframe src>``.
        out_dir: directory where "iframe_network.json" is written.
        timeout_ms: per-operation timeout for navigation/selectors/clicks.
        capture_ms: length of each JSON-capture window.
        goto_retries: extra navigation attempts passed to safe_goto().
        write_iframe_network: when truthy, persist captured JSON to disk.

    Returns the list of captured JSON response records (possibly empty).
    """
    iframe_page = await context.new_page()
    json_sink = []
    # Start capturing immediately so JSON requests fired during the initial
    # navigation are not missed.
    response_task = asyncio.create_task(intercept_json_responses(iframe_page, json_sink, capture_ms))
    ok = await safe_goto(iframe_page, iframe_url, timeout_ms, goto_retries)
    if not ok:
        # Navigation failed: let the capture window drain, then bail out
        # with whatever (likely nothing) was collected.
        await response_task
        await iframe_page.close()
        return json_sink
    try:
        await iframe_page.wait_for_load_state("domcontentloaded", timeout=timeout_ms)
    except Exception:
        pass
    clicked = False
    try:
        await iframe_page.wait_for_timeout(1000)
        try:
            await iframe_page.wait_for_selector(".ya-tab", timeout=timeout_ms)
        except Exception:
            pass
        # Candidate labels for the target tab (English and Chinese variants).
        tab_names = ["5E Swing Score", "5E 摆动分", "摆动分", "Swing Score", "Swing", "SS"]
        # Strategy 1: per label, try several locator flavours from most
        # specific (.ya-tab with matching text) to loosest (substring text).
        for name in tab_names:
            locator = iframe_page.locator(".ya-tab", has_text=name)
            if await locator.count() > 0:
                await locator.first.scroll_into_view_if_needed()
                await locator.first.click(timeout=timeout_ms, force=True)
                clicked = True
                break
            locator = iframe_page.get_by_role("tab", name=name)
            if await locator.count() > 0:
                await locator.first.scroll_into_view_if_needed()
                await locator.first.click(timeout=timeout_ms, force=True)
                clicked = True
                break
            locator = iframe_page.get_by_role("button", name=name)
            if await locator.count() > 0:
                await locator.first.scroll_into_view_if_needed()
                await locator.first.click(timeout=timeout_ms, force=True)
                clicked = True
                break
            locator = iframe_page.get_by_text(name, exact=True)
            if await locator.count() > 0:
                await locator.first.scroll_into_view_if_needed()
                await locator.first.click(timeout=timeout_ms, force=True)
                clicked = True
                break
            locator = iframe_page.get_by_text(name, exact=False)
            if await locator.count() > 0:
                await locator.first.scroll_into_view_if_needed()
                await locator.first.click(timeout=timeout_ms, force=True)
                clicked = True
                break
        # Strategy 2: in-page DOM walk (including shadow roots) dispatching
        # synthetic mouse events on the first visible element whose text
        # contains one of the labels.
        if not clicked:
            clicked = await iframe_page.evaluate(
                """() => {
                    const labels = ["5E Swing Score", "5E 摆动分", "摆动分", "Swing Score", "Swing", "SS"];
                    const roots = [document];
                    const elements = [];
                    while (roots.length) {
                        const root = roots.pop();
                        const tree = root.querySelectorAll ? Array.from(root.querySelectorAll("*")) : [];
                        for (const el of tree) {
                            elements.push(el);
                            if (el.shadowRoot) roots.push(el.shadowRoot);
                        }
                    }
                    const target = elements.find(el => {
                        const text = (el.textContent || "").trim();
                        if (!text) return false;
                        if (!labels.some(l => text.includes(l))) return false;
                        const rect = el.getBoundingClientRect();
                        return rect.width > 0 && rect.height > 0;
                    });
                    if (target) {
                        target.scrollIntoView({block: "center", inline: "center"});
                        const rect = target.getBoundingClientRect();
                        const x = rect.left + rect.width / 2;
                        const y = rect.top + rect.height / 2;
                        const events = ["pointerdown", "mousedown", "pointerup", "mouseup", "click"];
                        for (const type of events) {
                            target.dispatchEvent(new MouseEvent(type, {bubbles: true, cancelable: true, clientX: x, clientY: y}));
                        }
                        return true;
                    }
                    return false;
                }"""
            )
        # Strategy 3: dispatch synthetic events on a matching ".ya-tab",
        # falling back to the last tab when no label matches.
        if not clicked:
            clicked = await iframe_page.evaluate(
                """() => {
                    const tabs = Array.from(document.querySelectorAll(".ya-tab"));
                    if (tabs.length === 0) return false;
                    const target = tabs.find(tab => {
                        const text = (tab.textContent || "").replace(/\\s+/g, " ").trim();
                        return text.includes("5E Swing Score") || text.includes("5E 摆动分") || text.includes("摆动分");
                    }) || tabs[tabs.length - 1];
                    if (!target) return false;
                    target.scrollIntoView({block: "center", inline: "center"});
                    const rect = target.getBoundingClientRect();
                    const x = rect.left + rect.width / 2;
                    const y = rect.top + rect.height / 2;
                    const events = ["pointerdown", "mousedown", "pointerup", "mouseup", "click"];
                    for (const type of events) {
                        target.dispatchEvent(new MouseEvent(type, {bubbles: true, cancelable: true, clientX: x, clientY: y}));
                    }
                    return true;
                }"""
            )
        # Strategy 4: a real mouse click at the centre of the last ".ya-tab"
        # bounding box.
        if not clicked:
            tab_locator = iframe_page.locator(".ya-tab")
            if await tab_locator.count() > 0:
                target = tab_locator.nth(await tab_locator.count() - 1)
                box = await target.bounding_box()
                if box:
                    await iframe_page.mouse.click(box["x"] + box["width"] / 2, box["y"] + box["height"] / 2)
                    clicked = True
    except Exception:
        # Tab activation is best-effort; capture continues regardless.
        clicked = False
    if clicked:
        # Let the newly-activated tab fire its requests, then run a second
        # capture window to record them.
        await iframe_page.wait_for_timeout(1500)
        await intercept_json_responses(iframe_page, json_sink, capture_ms)
    try:
        await iframe_page.wait_for_load_state("networkidle", timeout=timeout_ms)
    except Exception:
        pass
    await response_task
    if write_iframe_network:
        with open(os.path.join(out_dir, "iframe_network.json"), "w", encoding="utf-8") as f:
            json.dump(json_sink, f, ensure_ascii=False, indent=2)
    await iframe_page.close()
    return json_sink
async def run_match(pw, args, url, index, total):
    """Process one match URL end to end in its own browser instance.

    Opens the match page, locates its stats ``<iframe>``, captures the
    iframe's JSON traffic, and optionally downloads referenced demo files —
    all into a per-match output folder under ``args.out``.

    Parameters:
        pw: the started Playwright driver.
        args: parsed CLI namespace from build_args().
        url: match page URL.
        index: zero-based position of this URL in the batch.
        total: batch size (used for folder-name disambiguation and logging).
    """
    base_out = os.path.abspath(args.out)
    ensure_dir(base_out)
    match_code = extract_match_code(url)
    base_name = args.match_name.strip() or match_code or "match"
    # In a batch, suffix the folder with the match code (or 1-based index)
    # so concurrent matches never share an output directory.
    if total > 1:
        suffix = match_code or str(index + 1)
        if base_name != suffix:
            name = f"{base_name}-{suffix}"
        else:
            name = base_name
    else:
        name = base_name
    out_dir = os.path.join(base_out, safe_folder(name))
    ensure_dir(out_dir)
    headless = truthy(args.headless)
    timeout_ms = args.timeout_ms
    capture_ms = args.capture_ms
    iframe_capture_ms = args.iframe_capture_ms
    goto_retries = args.goto_retries
    fetch_type = str(args.fetch_type or "both").lower()
    want_iframe = fetch_type in {"iframe", "both"}
    want_demo = fetch_type in {"demo", "both"}
    # One browser per match keeps jobs isolated when run concurrently.
    browser = await pw.chromium.launch(headless=headless, slow_mo=50)
    context = await browser.new_context(accept_downloads=True)
    page = await context.new_page()
    log(f"打开比赛页 {index + 1}/{total}")
    ok = await safe_goto(page, url, timeout_ms, goto_retries)
    if not ok:
        await browser.close()
        return
    try:
        await page.wait_for_load_state("networkidle", timeout=timeout_ms)
    except Exception:
        pass
    # Pull the first iframe's src attribute straight out of the DOM.
    iframe_url = await page.evaluate(
        """() => {
            const iframe = document.querySelector('iframe')
            return iframe ? iframe.getAttribute('src') : null
        }"""
    )
    iframe_sink = []
    if iframe_url and (want_iframe or want_demo):
        log(f"进入内嵌页面 {iframe_url}")
        iframe_sink = await open_iframe_page(
            context, iframe_url, out_dir, timeout_ms, iframe_capture_ms, goto_retries, want_iframe
        )
    if want_demo:
        # Prefer in-memory payloads; fall back to the capture file on disk.
        downloaded = download_demo_from_iframe(out_dir, iframe_sink if iframe_sink else None)
        if downloaded:
            log(f"已下载 demo: {len(downloaded)}")
    await browser.close()
async def run_match_with_semaphore(semaphore, pw, args, url, index, total):
    """Run one match job while holding a concurrency slot; never raises.

    Failures are logged so one bad URL cannot abort the rest of the batch.
    """
    await semaphore.acquire()
    try:
        await run_match(pw, args, url, index, total)
    except Exception as exc:
        log(f"任务失败 {url} {exc}")
    finally:
        semaphore.release()
async def run():
    """Parse CLI arguments and process every requested match URL concurrently."""
    args = build_args().parse_args()
    # Import lazily so the script can print install guidance instead of a
    # raw ImportError traceback when Playwright is missing.
    try:
        from playwright.async_api import async_playwright
    except Exception:
        print("Playwright 未安装,请先安装: python -m pip install playwright && python -m playwright install")
        sys.exit(1)
    urls = read_url_list(args.url_list) or [args.url]
    async with async_playwright() as pw:
        concurrency = max(1, int(args.concurrency or 1))
        gate = asyncio.Semaphore(concurrency)
        jobs = [
            asyncio.create_task(run_match_with_semaphore(gate, pw, args, url, position, len(urls)))
            for position, url in enumerate(urls)
        ]
        if jobs:
            await asyncio.gather(*jobs)
    log("完成")
def main():
    """Synchronous entry point: drive the async runner to completion."""
    asyncio.run(run())


if __name__ == "__main__":
    main()