1.0.0 : Web Implemented.

This commit is contained in:
2026-01-26 02:13:06 +08:00
parent 026a8fe65d
commit 8dabf0b097
55 changed files with 4545 additions and 3 deletions

View File

@@ -0,0 +1,40 @@
import subprocess
import os
import sys
from web.config import Config
class EtlService:
    """Runs ETL scripts that live in the project's ETL directory."""

    @staticmethod
    def run_script(script_name, args=None):
        """
        Execute an ETL script located in the ETL directory.

        Args:
            script_name: File name of the script inside <BASE_DIR>/ETL.
            args: Optional list of extra command-line arguments.

        Returns:
            (success, message) tuple: success is a bool, message contains
            the captured stdout/stderr or an error description.
        """
        script_path = os.path.join(Config.BASE_DIR, 'ETL', script_name)
        if not os.path.exists(script_path):
            return False, f"Script not found: {script_path}"
        # Use the same python interpreter that runs the web app so the
        # child process sees the same environment / virtualenv.
        cmd = [sys.executable, script_path]
        if args:
            cmd.extend(args)
        try:
            result = subprocess.run(
                cmd,
                cwd=Config.BASE_DIR,
                capture_output=True,
                text=True,
                timeout=300  # 5 min timeout
            )
        except subprocess.TimeoutExpired:
            # Distinguish a hung script from other failures instead of
            # surfacing the raw exception text.
            return False, f"Timed out after 300 seconds: {script_name}"
        except Exception as e:
            return False, str(e)
        if result.returncode == 0:
            return True, f"Success:\n{result.stdout}"
        return False, f"Failed (Code {result.returncode}):\n{result.stderr}\n{result.stdout}"

View File

@@ -0,0 +1,256 @@
from web.database import query_db
class FeatureService:
    """Player-feature service backed by the L3 mart (dm_player_features),
    enriched with identity and match data from the L2 warehouse."""

    @staticmethod
    def get_player_features(steam_id):
        """Return the L3 feature row for a single player, or None if absent."""
        sql = "SELECT * FROM dm_player_features WHERE steam_id_64 = ?"
        return query_db('l3', sql, [steam_id], one=True)

    @staticmethod
    def get_players_list(page=1, per_page=20, sort_by='rating', search=None):
        """Paginated player listing that merges L3 features with L2 identity.

        Args:
            page, per_page: 1-based pagination parameters.
            sort_by: one of 'rating', 'kd', 'kast', 'matches'; anything
                else falls back to rating.
            search: optional username / steam-id substring filter.

        Returns:
            (players, total): list of merged player dicts plus total count.
        """
        offset = (page - 1) * per_page
        # Whitelist mapping of UI sort keys to L3 columns. This also guards
        # against SQL injection since order_col is interpolated into SQL.
        sort_map = {
            'rating': 'basic_avg_rating',
            'kd': 'basic_avg_kd',
            'kast': 'basic_avg_kast',
            'matches': 'matches_played'
        }
        order_col = sort_map.get(sort_by, 'basic_avg_rating')
        # Local import to avoid a circular dependency at module load time.
        from web.services.stats_service import StatsService

        def attach_match_counts(player_list):
            # Batch-fetch per-player match counts from L2 and attach them
            # in place as 'matches_played'.
            if not player_list:
                return
            ids = [p['steam_id_64'] for p in player_list]
            placeholders = ','.join('?' for _ in ids)
            sql = f"""
                SELECT steam_id_64, COUNT(*) as cnt
                FROM fact_match_players
                WHERE steam_id_64 IN ({placeholders})
                GROUP BY steam_id_64
            """
            counts = query_db('l2', sql, ids)
            cnt_dict = {r['steam_id_64']: r['cnt'] for r in counts}
            for p in player_list:
                p['matches_played'] = cnt_dict.get(p['steam_id_64'], 0)

        if search:
            # Search mode: find matching identities in L2 (capped at 100),
            # merge L3 features, then sort and paginate in memory.
            l2_players, _ = StatsService.get_players(page=1, per_page=100, search=search)
            if not l2_players:
                return [], 0
            steam_ids = [p['steam_id_64'] for p in l2_players]
            placeholders = ','.join('?' for _ in steam_ids)
            sql = f"SELECT * FROM dm_player_features WHERE steam_id_64 IN ({placeholders})"
            features = query_db('l3', sql, steam_ids)
            f_dict = {f['steam_id_64']: f for f in features}
            # Match counts are needed both for display and 'matches' sort.
            count_sql = f"SELECT steam_id_64, COUNT(*) as cnt FROM fact_match_players WHERE steam_id_64 IN ({placeholders}) GROUP BY steam_id_64"
            counts = query_db('l2', count_sql, steam_ids)
            cnt_dict = {r['steam_id_64']: r['cnt'] for r in counts}
            merged = []
            for p in l2_players:
                f = f_dict.get(p['steam_id_64'])
                m = dict(p)
                if f:
                    m.update(dict(f))
                else:
                    # No precomputed L3 row: compute basic stats from L2.
                    stats = StatsService.get_player_basic_stats(p['steam_id_64'])
                    if stats:
                        m['basic_avg_rating'] = stats['rating']
                        m['basic_avg_kd'] = stats['kd']
                        m['basic_avg_kast'] = stats['kast']
                    else:
                        m['basic_avg_rating'] = 0
                        m['basic_avg_kd'] = 0
                        m['basic_avg_kast'] = 0  # Ensure kast exists
                m['matches_played'] = cnt_dict.get(p['steam_id_64'], 0)
                merged.append(m)
            merged.sort(key=lambda x: x.get(order_col, 0) or 0, reverse=True)
            total = len(merged)
            start = (page - 1) * per_page
            end = start + per_page
            return merged[start:end], total
        else:
            # Browse mode.
            l3_count = query_db('l3', "SELECT COUNT(*) as cnt FROM dm_player_features", one=True)['cnt']
            if l3_count == 0 or sort_by == 'matches':
                # 'matches' ordering must come from L2 counts (the L3 table
                # may not carry a matches_played column); an empty L3 also
                # forces the L2 fallback.
                if sort_by == 'matches':
                    # Page through L2 ordered by match count, then hydrate.
                    sql = """
                        SELECT steam_id_64, COUNT(*) as cnt
                        FROM fact_match_players
                        GROUP BY steam_id_64
                        ORDER BY cnt DESC
                        LIMIT ? OFFSET ?
                    """
                    top_ids = query_db('l2', sql, [per_page, offset])
                    if not top_ids:
                        return [], 0
                    total = query_db('l2', "SELECT COUNT(DISTINCT steam_id_64) as cnt FROM fact_match_players", one=True)['cnt']
                    ids = [r['steam_id_64'] for r in top_ids]
                    l2_players = StatsService.get_players_by_ids(ids)
                    merged = []
                    # Pull L3 features for these ids so stats can be shown.
                    p_ph = ','.join('?' for _ in ids)
                    f_sql = f"SELECT * FROM dm_player_features WHERE steam_id_64 IN ({p_ph})"
                    features = query_db('l3', f_sql, ids)
                    f_dict = {f['steam_id_64']: f for f in features}
                    cnt_dict = {r['steam_id_64']: r['cnt'] for r in top_ids}
                    p_dict = {p['steam_id_64']: p for p in l2_players}
                    for r in top_ids:  # top_ids is already sorted by count
                        sid = r['steam_id_64']
                        p = p_dict.get(sid)
                        if not p:
                            continue
                        m = dict(p)
                        f = f_dict.get(sid)
                        if f:
                            m.update(dict(f))
                        else:
                            stats = StatsService.get_player_basic_stats(sid)
                            if stats:
                                m['basic_avg_rating'] = stats['rating']
                                m['basic_avg_kd'] = stats['kd']
                                m['basic_avg_kast'] = stats['kast']
                            else:
                                m['basic_avg_rating'] = 0
                                m['basic_avg_kd'] = 0
                                m['basic_avg_kast'] = 0
                        m['matches_played'] = r['cnt']
                        merged.append(m)
                    return merged, total
                # L3 is empty: list players from L2 and compute stats per
                # player on the fly.
                l2_players, total = StatsService.get_players(page, per_page, sort_by=None)
                merged = []
                attach_match_counts(l2_players)
                for p in l2_players:
                    m = dict(p)
                    stats = StatsService.get_player_basic_stats(p['steam_id_64'])
                    if stats:
                        m['basic_avg_rating'] = stats['rating']
                        m['basic_avg_kd'] = stats['kd']
                        m['basic_avg_kast'] = stats['kast']
                    else:
                        m['basic_avg_rating'] = 0
                        m['basic_avg_kd'] = 0
                        m['basic_avg_kast'] = 0
                    m['matches_played'] = p.get('matches_played', 0)
                    merged.append(m)
                if sort_by != 'rating':
                    merged.sort(key=lambda x: x.get(order_col, 0) or 0, reverse=True)
                return merged, total
            # Normal L3 browse: sort/paginate in SQL, then attach L2 identity.
            sql = f"SELECT * FROM dm_player_features ORDER BY {order_col} DESC LIMIT ? OFFSET ?"
            features = query_db('l3', sql, [per_page, offset])
            total = query_db('l3', "SELECT COUNT(*) as cnt FROM dm_player_features", one=True)['cnt']
            if not features:
                return [], total
            steam_ids = [f['steam_id_64'] for f in features]
            l2_players = StatsService.get_players_by_ids(steam_ids)
            p_dict = {p['steam_id_64']: p for p in l2_players}
            merged = []
            for f in features:
                m = dict(f)
                p = p_dict.get(f['steam_id_64'])
                if p:
                    m.update(dict(p))
                else:
                    # No L2 identity row: show the raw id as the name.
                    m['username'] = f['steam_id_64']
                    m['avatar_url'] = None
                merged.append(m)
            return merged, total

    @staticmethod
    def get_top_players(limit=20, sort_by='basic_avg_rating'):
        """Top players from L3 ordered by a whitelisted feature column.

        Username/avatar enrichment (an L2 join) is left to the caller,
        since query_db connects to one database at a time.
        """
        # Whitelist sort_by to prevent injection via string interpolation.
        allowed_sorts = ['basic_avg_rating', 'basic_avg_kd', 'basic_avg_kast', 'basic_avg_rws']
        if sort_by not in allowed_sorts:
            sort_by = 'basic_avg_rating'
        return query_db('l3', f"SELECT * FROM dm_player_features ORDER BY {sort_by} DESC LIMIT ?", [limit])

    @staticmethod
    def get_player_trend(steam_id, limit=30):
        """Recent-match trend rows (rating/kd/adr) for charting.

        The underlying tables (fact_match_players / fact_matches) live in
        L2, so this delegates to StatsService.get_player_trend which runs
        in the L2 database context.
        """
        from web.services.stats_service import StatsService
        return StatsService.get_player_trend(steam_id, limit)

View File

@@ -0,0 +1,245 @@
from web.database import query_db
class StatsService:
    """Read-side queries against the L2 warehouse (matches, rounds,
    players, per-match player stats)."""

    @staticmethod
    def get_recent_matches(limit=5):
        """Return the most recent matches with the MVP's display name."""
        sql = """
            SELECT m.match_id, m.start_time, m.map_name, m.score_team1, m.score_team2, m.winner_team,
                   p.username as mvp_name
            FROM fact_matches m
            LEFT JOIN dim_players p ON m.mvp_uid = p.uid
            ORDER BY m.start_time DESC
            LIMIT ?
        """
        return query_db('l2', sql, [limit])

    @staticmethod
    def get_matches(page=1, per_page=20, map_name=None, date_from=None, date_to=None):
        """Paginated match listing with optional map/date filters.

        Returns:
            (matches, total) — page of rows plus total count for pagination.
        """
        offset = (page - 1) * per_page
        args = []
        where_clauses = ["1=1"]
        if map_name:
            where_clauses.append("map_name = ?")
            args.append(map_name)
        if date_from:
            where_clauses.append("start_time >= ?")
            args.append(date_from)
        if date_to:
            where_clauses.append("start_time <= ?")
            args.append(date_to)
        where_str = " AND ".join(where_clauses)
        sql = f"""
            SELECT m.match_id, m.start_time, m.map_name, m.score_team1, m.score_team2, m.winner_team, m.duration
            FROM fact_matches m
            WHERE {where_str}
            ORDER BY m.start_time DESC
            LIMIT ? OFFSET ?
        """
        args.extend([per_page, offset])
        matches = query_db('l2', sql, args)
        # Count total with the same filters (drop the LIMIT/OFFSET args).
        count_sql = f"SELECT COUNT(*) as cnt FROM fact_matches WHERE {where_str}"
        total = query_db('l2', count_sql, args[:-2], one=True)['cnt']
        return matches, total

    @staticmethod
    def get_match_detail(match_id):
        """Return the full fact_matches row for one match, or None."""
        sql = "SELECT * FROM fact_matches WHERE match_id = ?"
        return query_db('l2', sql, [match_id], one=True)

    @staticmethod
    def get_match_players(match_id):
        """Per-player stats for one match, joined with identity info."""
        sql = """
            SELECT mp.*, p.username, p.avatar_url
            FROM fact_match_players mp
            LEFT JOIN dim_players p ON mp.steam_id_64 = p.steam_id_64
            WHERE mp.match_id = ?
            ORDER BY mp.team_id, mp.rating DESC
        """
        return query_db('l2', sql, [match_id])

    @staticmethod
    def get_match_rounds(match_id):
        """Round-by-round rows for one match, in round order."""
        sql = "SELECT * FROM fact_rounds WHERE match_id = ? ORDER BY round_num"
        return query_db('l2', sql, [match_id])

    @staticmethod
    def get_players(page=1, per_page=20, search=None, sort_by='rating_desc'):
        """Paginated identity listing from dim_players.

        Note: sort_by is currently ignored — dim_players stores only static
        identity info, not aggregated stats; stat-sorted listings are the
        responsibility of FeatureService (L3).

        Returns:
            (players, total).
        """
        offset = (page - 1) * per_page
        args = []
        where_clauses = ["1=1"]
        if search:
            # Case-insensitive match on username, plus raw id substring.
            where_clauses.append("(LOWER(username) LIKE LOWER(?) OR steam_id_64 LIKE ?)")
            args.append(f"%{search}%")
            args.append(f"%{search}%")
        where_str = " AND ".join(where_clauses)
        sql = f"""
            SELECT * FROM dim_players
            WHERE {where_str}
            LIMIT ? OFFSET ?
        """
        args.extend([per_page, offset])
        players = query_db('l2', sql, args)
        total = query_db('l2', f"SELECT COUNT(*) as cnt FROM dim_players WHERE {where_str}", args[:-2], one=True)['cnt']
        return players, total

    @staticmethod
    def get_player_info(steam_id):
        """Return the dim_players identity row for one player, or None."""
        sql = "SELECT * FROM dim_players WHERE steam_id_64 = ?"
        return query_db('l2', sql, [steam_id], one=True)

    @staticmethod
    def get_daily_match_counts(days=365):
        """Daily match counts for the last `days` days.

        Returns rows shaped like {day: 'YYYY-MM-DD', count: N}.
        """
        sql = """
            SELECT date(start_time, 'unixepoch') as day, COUNT(*) as count
            FROM fact_matches
            WHERE start_time > strftime('%s', 'now', ?)
            GROUP BY day
            ORDER BY day
        """
        # SQLite date modifier for 'now' takes the form '-365 days'.
        modifier = f'-{days} days'
        rows = query_db('l2', sql, [modifier])
        return rows

    @staticmethod
    def get_players_by_ids(steam_ids):
        """Batch-fetch dim_players rows for a list of steam ids."""
        if not steam_ids:
            return []
        placeholders = ','.join('?' for _ in steam_ids)
        sql = f"SELECT * FROM dim_players WHERE steam_id_64 IN ({placeholders})"
        return query_db('l2', sql, steam_ids)

    @staticmethod
    def get_player_basic_stats(steam_id):
        """Aggregate a player's career stats from fact_match_players.

        K/D is computed from summed kills/deaths for accuracy (rather than
        averaging per-match ratios); ADR is averaged since per-match damage
        totals may be missing in some sources.

        Returns:
            dict of stats, or None when the player has no matches.
        """
        sql = """
            SELECT
                AVG(rating) as rating,
                SUM(kills) as total_kills,
                SUM(deaths) as total_deaths,
                AVG(kd_ratio) as avg_kd,
                AVG(kast) as kast,
                AVG(adr) as adr,
                COUNT(*) as matches_played
            FROM fact_match_players
            WHERE steam_id_64 = ?
        """
        row = query_db('l2', sql, [steam_id], one=True)
        if row and row['matches_played'] > 0:
            res = dict(row)
            kills = res.get('total_kills') or 0
            deaths = res.get('total_deaths') or 0
            if deaths > 0:
                res['kd'] = kills / deaths
            else:
                # With zero deaths K/D is undefined; show kills rather
                # than infinity for display purposes.
                res['kd'] = kills
            # Fall back to the per-match average when the sums yield 0
            # but an average K/D exists.
            if res['kd'] == 0 and res['avg_kd'] and res['avg_kd'] > 0:
                res['kd'] = res['avg_kd']
            if res['adr'] is None:
                res['adr'] = 0.0
            return res
        return None

    @staticmethod
    def get_shared_matches(steam_ids):
        """Matches in which ALL of the given players participated.

        Implemented as a GROUP BY over match_id with a HAVING count equal
        to the number of distinct requested ids. Returns up to 20 rows,
        most recent first; requires at least two ids.
        """
        if not steam_ids or len(steam_ids) < 2:
            return []
        placeholders = ','.join('?' for _ in steam_ids)
        count = len(steam_ids)
        sql = f"""
            SELECT m.match_id, m.start_time, m.map_name, m.score_team1, m.score_team2, m.winner_team
            FROM fact_matches m
            JOIN fact_match_players mp ON m.match_id = mp.match_id
            WHERE mp.steam_id_64 IN ({placeholders})
            GROUP BY m.match_id
            HAVING COUNT(DISTINCT mp.steam_id_64) = ?
            ORDER BY m.start_time DESC
            LIMIT 20
        """
        args = list(steam_ids)
        args.append(count)
        return query_db('l2', sql, args)

    @staticmethod
    def get_player_trend(steam_id, limit=20):
        """Last `limit` matches for a player, in chronological order.

        The inner query selects the most recent matches (DESC LIMIT),
        and the outer query re-sorts ascending so the chart reads
        left-to-right in time.
        """
        sql = """
            SELECT * FROM (
                SELECT m.start_time, mp.rating, mp.kd_ratio, mp.adr, m.match_id, m.map_name, mp.is_win
                FROM fact_match_players mp
                JOIN fact_matches m ON mp.match_id = m.match_id
                WHERE mp.steam_id_64 = ?
                ORDER BY m.start_time DESC
                LIMIT ?
            ) ORDER BY start_time ASC
        """
        return query_db('l2', sql, [steam_id, limit])

    @staticmethod
    def get_live_matches():
        """Matches started within the last 2 hours that have no winner yet.

        On a static (non-live) dataset this is expected to return nothing.
        """
        sql = """
            SELECT m.match_id, m.map_name, m.score_team1, m.score_team2, m.start_time
            FROM fact_matches m
            WHERE m.winner_team IS NULL
              AND m.start_time > strftime('%s', 'now', '-2 hours')
        """
        return query_db('l2', sql)

120
web/services/web_service.py Normal file
View File

@@ -0,0 +1,120 @@
from web.database import query_db, execute_db
import json
from datetime import datetime
class WebService:
    """CRUD helpers for the web-layer database: comments, wiki pages,
    team lineups, users, player metadata and strategy boards."""

    # --- Comments ---
    @staticmethod
    def get_comments(target_type, target_id):
        """Visible (non-hidden) comments for a target, newest first."""
        sql = "SELECT * FROM comments WHERE target_type = ? AND target_id = ? AND is_hidden = 0 ORDER BY created_at DESC"
        return query_db('web', sql, [target_type, target_id])

    @staticmethod
    def add_comment(user_id, username, target_type, target_id, content):
        """Insert a comment; returns the result of execute_db."""
        sql = """
            INSERT INTO comments (user_id, username, target_type, target_id, content)
            VALUES (?, ?, ?, ?, ?)
        """
        return execute_db('web', sql, [user_id, username, target_type, target_id, content])

    @staticmethod
    def like_comment(comment_id):
        """Increment a comment's like counter."""
        sql = "UPDATE comments SET likes = likes + 1 WHERE id = ?"
        return execute_db('web', sql, [comment_id])

    # --- Wiki ---
    @staticmethod
    def get_wiki_page(path):
        """Return one wiki page row by its path, or None."""
        sql = "SELECT * FROM wiki_pages WHERE path = ?"
        return query_db('web', sql, [path], one=True)

    @staticmethod
    def get_all_wiki_pages():
        """Return (path, title) for all wiki pages, ordered by path."""
        sql = "SELECT path, title FROM wiki_pages ORDER BY path"
        return query_db('web', sql)

    @staticmethod
    def save_wiki_page(path, title, content, updated_by):
        """Create or update a wiki page (check-then-write upsert).

        NOTE(review): the SELECT-then-INSERT is not atomic; concurrent
        saves of a brand-new path could race. Acceptable for a
        low-traffic internal tool.
        """
        check = query_db('web', "SELECT id FROM wiki_pages WHERE path = ?", [path], one=True)
        if check:
            sql = "UPDATE wiki_pages SET title=?, content=?, updated_by=?, updated_at=CURRENT_TIMESTAMP WHERE path=?"
            execute_db('web', sql, [title, content, updated_by, path])
        else:
            sql = "INSERT INTO wiki_pages (path, title, content, updated_by) VALUES (?, ?, ?, ?)"
            execute_db('web', sql, [path, title, content, updated_by])

    # --- Team Lineups ---
    @staticmethod
    def save_lineup(name, description, player_ids, lineup_id=None):
        """Create (lineup_id=None) or update a team lineup.

        player_ids is a list; it is stored as JSON text.
        """
        ids_json = json.dumps(player_ids)
        if lineup_id:
            sql = "UPDATE team_lineups SET name=?, description=?, player_ids_json=? WHERE id=?"
            return execute_db('web', sql, [name, description, ids_json, lineup_id])
        else:
            sql = "INSERT INTO team_lineups (name, description, player_ids_json) VALUES (?, ?, ?)"
            return execute_db('web', sql, [name, description, ids_json])

    @staticmethod
    def get_lineups():
        """All lineups, newest first."""
        return query_db('web', "SELECT * FROM team_lineups ORDER BY created_at DESC")

    @staticmethod
    def get_lineup(lineup_id):
        """Return one lineup row by id, or None."""
        return query_db('web', "SELECT * FROM team_lineups WHERE id = ?", [lineup_id], one=True)

    # --- Users / Auth ---
    @staticmethod
    def get_user_by_token(token):
        """Look up a user by auth token, or None."""
        sql = "SELECT * FROM users WHERE token = ?"
        return query_db('web', sql, [token], one=True)

    # --- Player Metadata ---
    @staticmethod
    def get_player_metadata(steam_id):
        """Return notes/tags metadata for a player.

        Always returns a dict; when no row exists a default with empty
        notes and tags is returned. Tags are stored as JSON text and
        decoded here; malformed JSON degrades to an empty list.
        """
        sql = "SELECT * FROM player_metadata WHERE steam_id_64 = ?"
        row = query_db('web', sql, [steam_id], one=True)
        if row:
            res = dict(row)
            try:
                res['tags'] = json.loads(res['tags']) if res['tags'] else []
            except (ValueError, TypeError):
                # json.JSONDecodeError is a ValueError; TypeError covers
                # non-string values. Never let bad data break the page.
                res['tags'] = []
            return res
        return {'steam_id_64': steam_id, 'notes': '', 'tags': []}

    @staticmethod
    def update_player_metadata(steam_id, notes=None, tags=None):
        """Upsert player notes/tags; only non-None fields are written."""
        check = query_db('web', "SELECT steam_id_64 FROM player_metadata WHERE steam_id_64 = ?", [steam_id], one=True)
        tags_json = json.dumps(tags) if tags is not None else None
        if check:
            # Build a partial UPDATE from whichever fields were provided.
            clauses = []
            args = []
            if notes is not None:
                clauses.append("notes = ?")
                args.append(notes)
            if tags is not None:
                clauses.append("tags = ?")
                args.append(tags_json)
            if clauses:
                clauses.append("updated_at = CURRENT_TIMESTAMP")
                sql = f"UPDATE player_metadata SET {', '.join(clauses)} WHERE steam_id_64 = ?"
                args.append(steam_id)
                execute_db('web', sql, args)
        else:
            sql = "INSERT INTO player_metadata (steam_id_64, notes, tags) VALUES (?, ?, ?)"
            execute_db('web', sql, [steam_id, notes or '', tags_json or '[]'])

    # --- Strategy Board ---
    @staticmethod
    def save_strategy_board(title, map_name, data_json, created_by):
        """Insert a strategy board record (data_json is pre-serialized)."""
        sql = "INSERT INTO strategy_boards (title, map_name, data_json, created_by) VALUES (?, ?, ?, ?)"
        return execute_db('web', sql, [title, map_name, data_json, created_by])