3.0.0 : Reconstructed Database System.

This commit is contained in:
2026-01-29 02:21:44 +08:00
parent 1642adb00e
commit 04ee957af6
69 changed files with 10258 additions and 6546 deletions

View File

@@ -0,0 +1,63 @@
import sqlite3
from pathlib import Path
def _connect(db_path: Path) -> sqlite3.Connection:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
return conn
def _list_tables(conn: sqlite3.Connection) -> list[str]:
cur = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
)
return [r["name"] for r in cur.fetchall()]
def _table_columns(conn: sqlite3.Connection, table: str) -> list[tuple[int, str, str, int, str, int]]:
cur = conn.execute(f"PRAGMA table_info({table})")
rows = cur.fetchall()
return [(r[0], r[1], r[2], r[3], r[4], r[5]) for r in rows]
def inspect(db_path: Path, tables: list[str] | None = None) -> None:
print(f"\n=== {db_path} ===")
if not db_path.exists():
print("NOT FOUND")
return
conn = _connect(db_path)
try:
all_tables = _list_tables(conn)
print(f"tables={len(all_tables)}")
if tables is None:
tables = all_tables
for t in tables:
if t not in all_tables:
print(f"\n-- {t} (missing)")
continue
cols = _table_columns(conn, t)
print(f"\n-- {t} cols={len(cols)}")
for cid, name, ctype, notnull, dflt, pk in cols:
print(f"{cid:>3} {name:<40} {ctype:<12} notnull={notnull} pk={pk} dflt={dflt}")
finally:
conn.close()
if __name__ == "__main__":
base_dir = Path(__file__).resolve().parents[1]
l2 = base_dir / "database" / "L2" / "L2.db"
l3 = base_dir / "database" / "L3" / "L3.db"
web = base_dir / "database" / "Web" / "Web_App.sqlite"
inspect(
l3,
tables=[
"dm_player_features",
"dm_player_match_history",
"dm_player_map_stats",
"dm_player_weapon_stats",
],
)
inspect(web)
inspect(l2, tables=["dim_players", "fact_matches", "fact_match_players", "fact_match_rounds"])

66
tools/smoke_test_teams.py Normal file
View File

@@ -0,0 +1,66 @@
import requests
import sys
BASE_URL = "http://127.0.0.1:5000"
def test_route(route, description):
print(f"Testing {description} ({route})...", end=" ")
try:
response = requests.get(f"{BASE_URL}{route}")
if response.status_code == 200:
print("OK")
return True
else:
print(f"FAILED (Status: {response.status_code})")
# Print first 500 chars of response if error
print(response.text[:500])
return False
except requests.exceptions.ConnectionError:
print("FAILED (Connection Error - Is server running?)")
return False
except Exception as e:
print(f"FAILED ({e})")
return False
def main():
print("--- Smoke Test: Team Routes ---")
# 1. Clubhouse
if not test_route("/teams/", "Clubhouse Page"):
sys.exit(1)
# 2. Roster API
print("Testing Roster API...", end=" ")
try:
response = requests.get(f"{BASE_URL}/teams/api/roster")
if response.status_code == 200:
data = response.json()
if data.get('status') == 'success':
print(f"OK (Team: {data.get('team', {}).get('name')})")
# Check if roster has stats
roster = data.get('roster', [])
if roster:
p = roster[0]
# Check for L3 keys
if 'stats' in p and 'core_avg_rating' in p['stats']:
print(f" - Verified L3 Stats Key 'core_avg_rating' present: {p['stats']['core_avg_rating']}")
else:
print(f" - WARNING: L3 Stats Key 'core_avg_rating' MISSING in {p.get('stats', {}).keys()}")
else:
print(" - Roster is empty (Warning only)")
# Get Lineup ID for Detail Page Test
lineup_id = data.get('team', {}).get('id')
if lineup_id:
test_route(f"/teams/{lineup_id}", f"Team Detail Page (ID: {lineup_id})")
else:
print("FAILED (API returned error status)")
else:
print(f"FAILED (Status: {response.status_code})")
except Exception as e:
print(f"FAILED ({e})")
sys.exit(1)
if __name__ == "__main__":
main()

50
tools/smoke_test_web.py Normal file
View File

@@ -0,0 +1,50 @@
import json
import sqlite3
from pathlib import Path
from urllib.request import urlopen, Request
def _get_first_steam_id(base_dir: Path) -> str:
conn = sqlite3.connect(str(base_dir / "database" / "L2" / "L2.db"))
try:
cur = conn.execute("SELECT steam_id_64 FROM dim_players WHERE steam_id_64 IS NOT NULL LIMIT 1")
row = cur.fetchone()
return str(row[0]) if row else ""
finally:
conn.close()
def _get(url: str) -> tuple[int, str]:
req = Request(url, headers={"User-Agent": "yrtv-smoke"})
with urlopen(req, timeout=10) as resp:
status = getattr(resp, "status", 200)
body = resp.read().decode("utf-8", errors="replace")
return status, body
if __name__ == "__main__":
base_dir = Path(__file__).resolve().parents[1]
steam_id = _get_first_steam_id(base_dir)
if not steam_id:
raise SystemExit("no steam_id in L2.dim_players")
urls = [
"http://127.0.0.1:5000/",
"http://127.0.0.1:5000/players/",
f"http://127.0.0.1:5000/players/{steam_id}",
f"http://127.0.0.1:5000/players/{steam_id}/charts_data",
"http://127.0.0.1:5000/matches/",
"http://127.0.0.1:5000/teams/",
"http://127.0.0.1:5000/teams/api/roster",
"http://127.0.0.1:5000/tactics/",
"http://127.0.0.1:5000/opponents/",
"http://127.0.0.1:5000/wiki/",
]
for u in urls:
status, body = _get(u)
print(f"{status} {u} len={len(body)}")
if u.endswith("/charts_data"):
obj = json.loads(body)
for k in ["trend", "radar", "radar_dist"]:
print(f" {k}: {'ok' if k in obj else 'missing'}")