# yrtv/web/services/stats_service.py — L2/L3 statistics query service
from web.database import query_db, execute_db
from flask import current_app, url_for
import os
class StatsService:
@staticmethod
def resolve_avatar_url(steam_id, avatar_url):
    """
    Resolve a player's avatar URL.

    Priority:
        1. Local override file (web/static/avatars/{steam_id}.jpg/.png/.jpeg)
        2. Non-blank DB value (avatar_url)
        3. None
    """
    try:
        avatar_dir = os.path.join(current_app.root_path, 'static', 'avatars')
        # A local file takes precedence over whatever the DB stores
        # (user request: "directly associate if exists").
        for ext in ('.jpg', '.png', '.jpeg'):
            candidate = f"{steam_id}{ext}"
            if os.path.exists(os.path.join(avatar_dir, candidate)):
                return url_for('static', filename=f'avatars/{candidate}')
        # No local override: fall back to a non-blank DB value.
        if avatar_url and str(avatar_url).strip():
            return avatar_url
        return None
    except Exception:
        # Best effort: e.g. outside an app context just echo the DB value.
        return avatar_url
@staticmethod
def get_team_stats_summary():
    """
    Aggregate win-rate statistics for matches where at least 2 active
    roster members played together.

    Returns:
        {
            'map_stats':      [{'label', 'count', 'wins', 'win_rate'}],
            'elo_stats':      [...],
            'duration_stats': [...],
            'round_stats':    [...]
        }
        or {} when there is no active roster / no qualifying matches.
    """
    from web.services.web_service import WebService
    import json

    # 1. Get the active roster (steam ids normalized to strings).
    lineups = WebService.get_lineups()
    active_roster_ids = []
    if lineups:
        try:
            raw_ids = json.loads(lineups[0]['player_ids_json'])
            active_roster_ids = [str(uid) for uid in raw_ids]
        except Exception:
            # Malformed lineup JSON -> behave as if no roster is set.
            pass
    if not active_roster_ids:
        return {}

    # 2. Candidate matches with >= 2 roster members, remembering the team id
    #    our players were on (MAX is arbitrary; fine when they share a team).
    placeholders = ','.join('?' for _ in active_roster_ids)
    candidate_sql = f"""
    SELECT mp.match_id, MAX(mp.team_id) as our_team_id
    FROM fact_match_players mp
    WHERE CAST(mp.steam_id_64 AS TEXT) IN ({placeholders})
    GROUP BY mp.match_id
    HAVING COUNT(DISTINCT mp.steam_id_64) >= 2
    """
    candidate_rows = query_db('l2', candidate_sql, active_roster_ids)
    if not candidate_rows:
        return {}
    candidate_map = {row['match_id']: row['our_team_id'] for row in candidate_rows}
    match_ids = list(candidate_map.keys())
    match_placeholders = ','.join('?' for _ in match_ids)

    # Match details incl. average lobby ELO (only positive values count).
    match_sql = f"""
    SELECT m.match_id, m.map_name, m.score_team1, m.score_team2, m.winner_team, m.duration,
    AVG(fmt.group_origin_elo) as avg_elo
    FROM fact_matches m
    LEFT JOIN fact_match_teams fmt ON m.match_id = fmt.match_id AND fmt.group_origin_elo > 0
    WHERE m.match_id IN ({match_placeholders})
    GROUP BY m.match_id
    """
    match_rows = query_db('l2', match_sql, match_ids)

    # 3. Bucket the matches.
    map_stats = {}
    elo_ranges = ['<1000', '1000-1200', '1200-1400', '1400-1600', '1600-1800', '1800-2000', '2000+']
    elo_stats = {r: {'wins': 0, 'total': 0} for r in elo_ranges}
    dur_ranges = ['<30m', '30-45m', '45m+']
    dur_stats = {r: {'wins': 0, 'total': 0} for r in dur_ranges}
    round_types = ['Stomp (<15)', 'Normal', 'Close (>23)', 'Choke (24)']
    round_stats = {r: {'wins': 0, 'total': 0} for r in round_types}

    def _bump(bucket, won):
        """Count one match (and its win, if any) into a bucket."""
        bucket['total'] += 1
        if won:
            bucket['wins'] += 1

    for m in match_rows:
        mid = m['match_id']
        # Win detection: compare our team id with winner_team.  The stored
        # types vary (int or str) across ingest paths, so try int first.
        # NOTE(review): this trusts team_id matching; the per-match UID
        # logic in get_matches is more precise but too heavy here.
        our_tid = candidate_map[mid]
        winner_tid = m['winner_team']
        try:
            is_win = (int(our_tid) == int(winner_tid)) if (our_tid and winner_tid) else False
        except (TypeError, ValueError):
            is_win = (str(our_tid) == str(winner_tid)) if (our_tid and winner_tid) else False

        # Map buckets are created lazily.
        map_name = m['map_name'] or 'Unknown'
        _bump(map_stats.setdefault(map_name, {'wins': 0, 'total': 0}), is_win)

        # ELO bucket (skip matches without a usable average).
        elo = m['avg_elo']
        if elo:
            if elo < 1000: e_key = '<1000'
            elif elo < 1200: e_key = '1000-1200'
            elif elo < 1400: e_key = '1200-1400'
            elif elo < 1600: e_key = '1400-1600'
            elif elo < 1800: e_key = '1600-1800'
            elif elo < 2000: e_key = '1800-2000'
            else: e_key = '2000+'
            _bump(elo_stats[e_key], is_win)

        # Duration bucket (duration is stored in seconds).
        dur = m['duration']
        if dur:
            dur_min = dur / 60
            if dur_min < 30: d_key = '<30m'
            elif dur_min < 45: d_key = '30-45m'
            else: d_key = '45m+'
            _bump(dur_stats[d_key], is_win)

        # Round-count buckets.  'Choke (24)' and 'Close (>23)' deliberately
        # overlap: per requirement, Close counts EVERY match above 23 rounds.
        s1 = m['score_team1'] or 0
        s2 = m['score_team2'] or 0
        total_rounds = s1 + s2
        if total_rounds == 24:
            _bump(round_stats['Choke (24)'], is_win)
        if total_rounds > 23:
            _bump(round_stats['Close (>23)'], is_win)
        if total_rounds < 15:
            _bump(round_stats['Stomp (<15)'], is_win)
        elif total_rounds <= 23:
            # Normal = neither Stomp nor Close (15..23 rounds total).
            _bump(round_stats['Normal'], is_win)

    # 4. Format buckets into result lists.
    def fmt(stats_dict):
        """Flatten a bucket dict into [{'label','count','wins','win_rate'}]."""
        res = []
        for k, v in stats_dict.items():
            rate = (v['wins'] / v['total'] * 100) if v['total'] > 0 else 0
            res.append({'label': k, 'count': v['total'], 'wins': v['wins'], 'win_rate': rate})
        return res

    map_res = fmt(map_stats)
    map_res.sort(key=lambda x: x['count'], reverse=True)  # most-played first
    return {
        'map_stats': map_res,
        'elo_stats': fmt(elo_stats),        # keep bucket order
        'duration_stats': fmt(dur_stats),   # keep bucket order
        'round_stats': fmt(round_stats)     # keep bucket order
    }
@staticmethod
def get_recent_matches(limit=5):
    """Return the `limit` most recent matches with the MVP's username."""
    recent_sql = """
    SELECT m.match_id, m.start_time, m.map_name, m.score_team1, m.score_team2, m.winner_team,
    p.username as mvp_name
    FROM fact_matches m
    LEFT JOIN dim_players p ON m.mvp_uid = p.uid
    ORDER BY m.start_time DESC
    LIMIT ?
    """
    return query_db('l2', recent_sql, [limit])
@staticmethod
def get_matches(page=1, per_page=20, map_name=None, date_from=None, date_to=None):
    """
    Paginated match listing with optional map / date-range filters.

    Each returned match dict is enriched with:
        avg_elo    - mean group_origin_elo across both teams (0 if unknown)
        max_party  - largest premade party size in the match (>= 1)
        our_result - 'win' / 'loss' / 'mixed' / None, relative to the
                     active roster (None when undeterminable)

    Returns (matches, total) where total is the unpaginated count.
    """
    offset = (page - 1) * per_page
    filter_args = []
    where_clauses = ["1=1"]
    if map_name:
        where_clauses.append("map_name = ?")
        filter_args.append(map_name)
    if date_from:
        where_clauses.append("start_time >= ?")
        filter_args.append(date_from)
    if date_to:
        where_clauses.append("start_time <= ?")
        filter_args.append(date_to)
    where_str = " AND ".join(where_clauses)

    sql = f"""
    SELECT m.match_id, m.start_time, m.map_name, m.score_team1, m.score_team2, m.winner_team, m.duration
    FROM fact_matches m
    WHERE {where_str}
    ORDER BY m.start_time DESC
    LIMIT ? OFFSET ?
    """
    matches = query_db('l2', sql, filter_args + [per_page, offset])

    if matches:
        match_ids = [m['match_id'] for m in matches]
        placeholders = ','.join('?' for _ in match_ids)

        # Average lobby ELO per match (only positive values).
        elo_sql = f"""
        SELECT match_id, AVG(group_origin_elo) as avg_elo
        FROM fact_match_teams
        WHERE match_id IN ({placeholders}) AND group_origin_elo > 0
        GROUP BY match_id
        """
        elo_map = {row['match_id']: row['avg_elo']
                   for row in query_db('l2', elo_sql, match_ids)}

        # Largest premade party per match.
        party_sql = f"""
        SELECT match_id, MAX(cnt) as max_party
        FROM (
        SELECT match_id, match_team_id, COUNT(*) as cnt
        FROM fact_match_players
        WHERE match_id IN ({placeholders}) AND match_team_id > 0
        GROUP BY match_id, match_team_id
        )
        GROUP BY match_id
        """
        party_map = {row['match_id']: row['max_party']
                     for row in query_db('l2', party_sql, match_ids)}

        # --- "Our Team" result: did the active roster win these matches? ---
        from web.services.web_service import WebService
        import json
        lineups = WebService.get_lineups()
        active_roster_ids = []
        if lineups:
            try:
                # Normalize ids to strings for DB comparison consistency.
                raw_ids = json.loads(lineups[0]['player_ids_json'])
                active_roster_ids = [str(uid) for uid in raw_ids]
            except Exception:
                # Malformed lineup JSON -> no roster, no result info.
                pass

        result_map = {}
        if active_roster_ids:
            # 1. Map roster steam ids to the in-match UIDs actually used
            #    (queried from fact_match_players for these matches only).
            roster_placeholders = ','.join('?' for _ in active_roster_ids)
            uid_sql = f"""
            SELECT DISTINCT steam_id_64, uid
            FROM fact_match_players
            WHERE match_id IN ({placeholders})
            AND CAST(steam_id_64 AS TEXT) IN ({roster_placeholders})
            """
            uid_rows = query_db('l2', uid_sql, match_ids + active_roster_ids)
            our_uids = {str(r['uid']) for r in uid_rows if r['uid']}

            # 2. Group membership + winner per match from fact_match_teams.
            teams_sql = f"""
            SELECT fmt.match_id, fmt.group_id, fmt.group_uids, m.winner_team
            FROM fact_match_teams fmt
            JOIN fact_matches m ON fmt.match_id = m.match_id
            WHERE fmt.match_id IN ({placeholders})
            """
            teams_rows = query_db('l2', teams_sql, match_ids)

            # 3. Decide win/loss/mixed per match.
            match_groups = {}  # match_id -> {'groups': {gid: set(uids)}, 'winner': gid}
            for r in teams_rows:
                mid = r['match_id']
                uids_str = r['group_uids'] or ""
                uids = set(str(u).strip() for u in uids_str.split(',') if u.strip())
                if mid not in match_groups:
                    match_groups[mid] = {'groups': {}, 'winner': r['winner_team']}
                match_groups[mid]['groups'][r['group_id']] = uids

            for mid, data in match_groups.items():
                winner_gid = data['winner']
                our_in_winner = False
                our_in_loser = False
                for gid, uids in data['groups'].items():
                    # Any roster UID in this group?
                    if our_uids.intersection(uids):
                        if gid == winner_gid:
                            our_in_winner = True
                        else:
                            our_in_loser = True
                if our_in_winner and not our_in_loser:
                    result_map[mid] = 'win'
                elif our_in_loser and not our_in_winner:
                    result_map[mid] = 'loss'
                elif our_in_winner and our_in_loser:
                    result_map[mid] = 'mixed'
                # else: UID matching failed -> leave undetermined (None).

        # Convert rows to dicts once and attach the enrichment data.
        # (The original performed this pass twice back-to-back; the
        # duplicate was pure redundant work and has been removed.)
        matches = [dict(m) for m in matches]
        for m in matches:
            m['avg_elo'] = elo_map.get(m['match_id'], 0)
            m['max_party'] = party_map.get(m['match_id'], 1)
            m['our_result'] = result_map.get(m['match_id'])

    # Unpaginated total for the pager (same filters, no LIMIT/OFFSET).
    count_sql = f"SELECT COUNT(*) as cnt FROM fact_matches WHERE {where_str}"
    total = query_db('l2', count_sql, filter_args, one=True)['cnt']
    return matches, total
@staticmethod
def get_match_detail(match_id):
    """Fetch a single fact_matches row by id (None when absent)."""
    return query_db(
        'l2',
        "SELECT * FROM fact_matches WHERE match_id = ?",
        [match_id],
        one=True,
    )
@staticmethod
def get_match_players(match_id):
    """Players of one match joined with dim_players, avatar URLs resolved."""
    players_sql = """
    SELECT mp.*, p.username, p.avatar_url
    FROM fact_match_players mp
    LEFT JOIN dim_players p ON mp.steam_id_64 = p.steam_id_64
    WHERE mp.match_id = ?
    ORDER BY mp.team_id, mp.rating DESC
    """

    def _with_avatar(row):
        """Copy the row to a dict and resolve its avatar URL."""
        player = dict(row)
        player['avatar_url'] = StatsService.resolve_avatar_url(
            player.get('steam_id_64'), player.get('avatar_url'))
        return player

    rows = query_db('l2', players_sql, [match_id])
    return [_with_avatar(r) for r in rows or []]
@staticmethod
def get_match_rounds(match_id):
    """All fact_rounds rows for a match, ordered by round number."""
    return query_db(
        'l2',
        "SELECT * FROM fact_rounds WHERE match_id = ? ORDER BY round_num",
        [match_id],
    )
@staticmethod
def get_players(page=1, per_page=20, search=None, sort_by='rating_desc'):
    """
    Paginated player listing from dim_players with avatar resolution.

    `search` matches username (case-insensitive) or steam id substring.

    NOTE: `sort_by` is accepted but currently NOT applied — dim_players
    only holds static info, so there is no aggregate rating to sort by;
    career stats live in L3 (FeatureService).  The original computed an
    unused `order_clause` local for this; it has been removed.
    TODO: wire up ordering once aggregated stats are joined in.

    Returns (players, total).
    """
    offset = (page - 1) * per_page
    where_clauses = ["1=1"]
    filter_args = []
    if search:
        # Force case-insensitive username search; raw LIKE for steam id.
        where_clauses.append("(LOWER(username) LIKE LOWER(?) OR steam_id_64 LIKE ?)")
        pattern = f"%{search}%"
        filter_args.extend([pattern, pattern])
    where_str = " AND ".join(where_clauses)

    sql = f"""
    SELECT * FROM dim_players
    WHERE {where_str}
    LIMIT ? OFFSET ?
    """
    rows = query_db('l2', sql, filter_args + [per_page, offset])
    players = []
    for row in rows or []:
        d = dict(row)
        d['avatar_url'] = StatsService.resolve_avatar_url(d.get('steam_id_64'), d.get('avatar_url'))
        players.append(d)

    # Same filters, no LIMIT/OFFSET, for the total count.
    total = query_db(
        'l2',
        f"SELECT COUNT(*) as cnt FROM dim_players WHERE {where_str}",
        filter_args,
        one=True,
    )['cnt']
    return players, total
@staticmethod
def get_player_info(steam_id):
    """dim_players row for one steam id with resolved avatar, or None."""
    row = query_db(
        'l2',
        "SELECT * FROM dim_players WHERE steam_id_64 = ?",
        [steam_id],
        one=True,
    )
    if not row:
        return None
    info = dict(row)
    info['avatar_url'] = StatsService.resolve_avatar_url(steam_id, info.get('avatar_url'))
    return info
@staticmethod
def get_daily_match_counts(days=365):
    """Daily match counts over the trailing `days` days: [{day, count}]."""
    # SQLite's 'now' modifier expects a string like '-365 days'.
    window = f'-{days} days'
    daily_sql = """
    SELECT date(start_time, 'unixepoch') as day, COUNT(*) as count
    FROM fact_matches
    WHERE start_time > strftime('%s', 'now', ?)
    GROUP BY day
    ORDER BY day
    """
    return query_db('l2', daily_sql, [window])
@staticmethod
def get_players_by_ids(steam_ids):
    """dim_players rows for the given ids, avatars resolved; [] if empty."""
    if not steam_ids:
        return []
    marks = ','.join('?' for _ in steam_ids)
    rows = query_db(
        'l2',
        f"SELECT * FROM dim_players WHERE steam_id_64 IN ({marks})",
        steam_ids,
    )
    enriched = []
    for row in rows or []:
        rec = dict(row)
        rec['avatar_url'] = StatsService.resolve_avatar_url(
            rec.get('steam_id_64'), rec.get('avatar_url'))
        enriched.append(rec)
    return enriched
@staticmethod
def get_player_basic_stats(steam_id):
    """
    Basic career stats for one player.

    Prefers the precomputed L3 feature row; falls back to aggregating the
    L2 fact_match_players rows on the fly.  Returns a dict or None.
    """
    l3_row = query_db(
        "l3",
        """
        SELECT
        total_matches as matches_played,
        core_avg_rating as rating,
        core_avg_kd as kd,
        core_avg_kast as kast,
        core_avg_adr as adr
        FROM dm_player_features
        WHERE steam_id_64 = ?
        """,
        [steam_id],
        one=True,
    )
    if l3_row and (l3_row["matches_played"] or 0) > 0:
        return dict(l3_row)

    # L2 fallback: aggregate the raw per-match rows.
    fallback_sql = """
    SELECT
    AVG(rating) as rating,
    SUM(kills) as total_kills,
    SUM(deaths) as total_deaths,
    AVG(kd_ratio) as avg_kd,
    AVG(kast) as kast,
    AVG(adr) as adr,
    COUNT(*) as matches_played
    FROM fact_match_players
    WHERE steam_id_64 = ?
    """
    row = query_db("l2", fallback_sql, [steam_id], one=True)
    if not (row and row["matches_played"] > 0):
        return None
    res = dict(row)
    kills = res.get("total_kills") or 0
    deaths = res.get("total_deaths") or 0
    # Lifetime K/D from totals (kills alone when deathless); fall back to
    # the per-match average K/D if the totals produce zero.
    res["kd"] = kills / deaths if deaths > 0 else kills
    if res["kd"] == 0 and res["avg_kd"] and res["avg_kd"] > 0:
        res["kd"] = res["avg_kd"]
    if res["adr"] is None:
        res["adr"] = 0.0
    return res
@staticmethod
def get_shared_matches(steam_ids):
    """
    Matches in which ALL of the given steam ids played, newest first.

    Each result row carries `is_win` (bool, relative to the players'
    team) and `result_str` ('Win' / 'Loss' / 'Draw').  Assumes the
    players shared a team: MAX(team_id) just picks one representative
    value, and a falsy winner_team is reported as a Draw.
    """
    if not steam_ids or len(steam_ids) < 1:
        return []
    marks = ','.join('?' for _ in steam_ids)
    shared_sql = f"""
    SELECT m.match_id, m.start_time, m.map_name, m.score_team1, m.score_team2, m.winner_team,
    MAX(mp.team_id) as player_team_id -- Just take one team_id (assuming same)
    FROM fact_matches m
    JOIN fact_match_players mp ON m.match_id = mp.match_id
    WHERE mp.steam_id_64 IN ({marks})
    GROUP BY m.match_id
    HAVING COUNT(DISTINCT mp.steam_id_64) = ?
    ORDER BY m.start_time DESC
    """
    rows = query_db('l2', shared_sql, list(steam_ids) + [len(steam_ids)])
    enriched = []
    for row in rows:
        # Win when the winner matches the team our players were on.
        won = (row['winner_team'] == row['player_team_id'])
        if not row['winner_team']:
            label = 'Draw'
        else:
            label = 'Win' if won else 'Loss'
        rec = dict(row)
        rec['is_win'] = won          # Boolean for styling
        rec['result_str'] = label    # Text for display
        enriched.append(rec)
    return enriched
@staticmethod
def get_player_trend(steam_id, limit=20):
    """
    The player's last `limit` matches, returned in chronological order.

    Prefers L3 dm_player_match_history; when that is empty, falls back to
    an L2 join that also derives party_size and a global match_index.
    """
    history_sql = """
    SELECT *
    FROM (
        SELECT
            match_date as start_time,
            rating,
            kd_ratio,
            adr,
            kast,
            match_id,
            map_name,
            is_win,
            match_sequence as match_index
        FROM dm_player_match_history
        WHERE steam_id_64 = ?
        ORDER BY match_date DESC
        LIMIT ?
    )
    ORDER BY start_time ASC
    """
    history = query_db("l3", history_sql, [steam_id, limit])
    if history:
        return history

    # L2 fallback: party_size counts teammates sharing match_team_id;
    # match_index is the running count of all matches up to this one.
    fallback_sql = """
    SELECT * FROM (
        SELECT
            m.start_time,
            mp.rating,
            mp.kd_ratio,
            mp.adr,
            m.match_id,
            m.map_name,
            mp.is_win,
            mp.match_team_id,
            (SELECT COUNT(*)
             FROM fact_match_players p2
             WHERE p2.match_id = mp.match_id
             AND p2.match_team_id = mp.match_team_id
             AND p2.match_team_id > 0
            ) as party_size,
            (
             SELECT COUNT(*)
             FROM fact_matches m2
             WHERE m2.start_time <= m.start_time
            ) as match_index
        FROM fact_match_players mp
        JOIN fact_matches m ON mp.match_id = m.match_id
        WHERE mp.steam_id_64 = ?
        ORDER BY m.start_time DESC
        LIMIT ?
    ) ORDER BY start_time ASC
    """
    return query_db("l2", fallback_sql, [steam_id, limit])
@staticmethod
def get_recent_performance_stats(steam_id):
    """
    Average rating and population variance over recent windows:
    last 5/10/15 matches and last 5/10/15 days.

    Returns {'last_N_matches': {'avg','var','count'},
             'last_D_days':   {'avg','var','count'}} for N, D in {5,10,15},
    or {} when the player has no match history.  Match timestamps come
    from L3 when available, else L2 (assumed unix seconds — the day
    windows compare them against time.time()).
    """
    import time  # hoisted to function top (was buried mid-function)

    def avg_var(nums):
        """Mean and population variance; (0.0, 0.0) for empty input."""
        if not nums:
            return 0.0, 0.0
        n = len(nums)
        avg = sum(nums) / n
        var = sum((x - avg) ** 2 for x in nums) / n
        return avg, var

    # Prefer L3 history, newest first; fall back to L2 join.
    rows = query_db(
        "l3",
        """
        SELECT match_date as t, rating
        FROM dm_player_match_history
        WHERE steam_id_64 = ?
        ORDER BY match_date DESC
        """,
        [steam_id],
    )
    if not rows:
        rows = query_db(
            "l2",
            """
            SELECT m.start_time as t, mp.rating
            FROM fact_match_players mp
            JOIN fact_matches m ON mp.match_id = m.match_id
            WHERE mp.steam_id_64 = ?
            ORDER BY m.start_time DESC
            """,
            [steam_id],
        )
    if not rows:
        return {}

    matches = [{"time": r["t"], "rating": float(r["rating"] or 0)} for r in rows]
    stats = {}

    # Match-count windows (rows are already newest-first).
    for n in [5, 10, 15]:
        ratings = [m["rating"] for m in matches[:n]]
        avg, var = avg_var(ratings)
        stats[f"last_{n}_matches"] = {"avg": avg, "var": var, "count": len(ratings)}

    # Day windows relative to "now".
    now = time.time()
    for d in [5, 10, 15]:
        cutoff = now - (d * 24 * 3600)
        ratings = [m["rating"] for m in matches if (m["time"] or 0) >= cutoff]
        avg, var = avg_var(ratings)
        stats[f"last_{d}_days"] = {"avg": avg, "var": var, "count": len(ratings)}
    return stats
@staticmethod
def get_roster_stats_distribution(target_steam_id):
    """
    Calculates rank and distribution of the target player within the active roster.
    Now covers all L3 Basic Features for Detailed Panel.

    Returns:
        dict mapping each metric name to
        {'val', 'rank', 'total', 'min', 'max', 'avg', 'inverted'}
        (or None when the metric is non-numeric / has no values), plus
        legacy short aliases ('rating', 'kd', 'adr', 'kast').
        Returns None when there is no active roster or no L3 rows.
    """
    from web.services.web_service import WebService
    from web.services.feature_service import FeatureService
    import json
    # 1. Get Active Roster IDs
    lineups = WebService.get_lineups()
    active_roster_ids = []
    if lineups:
        try:
            raw_ids = json.loads(lineups[0]['player_ids_json'])
            active_roster_ids = [str(uid) for uid in raw_ids]
        except:  # NOTE(review): bare except silently ignores malformed lineup JSON
            pass
    if not active_roster_ids:
        return None
    placeholders = ",".join("?" for _ in active_roster_ids)
    rows = query_db("l3", f"SELECT * FROM dm_player_features WHERE steam_id_64 IN ({placeholders})", active_roster_ids)
    if not rows:
        return None
    # Normalize every roster member's L3 feature row via FeatureService.
    stats_map = {str(row["steam_id_64"]): FeatureService._normalize_features(dict(row)) for row in rows}
    target_steam_id = str(target_steam_id)
    # If target not in map (e.g. no L3 data), try to add empty default
    if target_steam_id not in stats_map:
        stats_map[target_steam_id] = {}
    # Full catalogue of ranked metrics (L3 dm_player_features columns).
    metrics = [
        # TIER 1: CORE
        # Basic Performance
        "core_avg_rating", "core_avg_rating2", "core_avg_kd", "core_avg_adr", "core_avg_kast",
        "core_avg_rws", "core_avg_hs_kills", "core_hs_rate", "core_total_kills", "core_total_deaths",
        "core_total_assists", "core_avg_assists", "core_kpr", "core_dpr", "core_survival_rate",
        # Match Stats
        "core_win_rate", "core_wins", "core_losses", "core_avg_match_duration", "core_avg_mvps",
        "core_mvp_rate", "core_avg_elo_change", "core_total_elo_gained",
        # Weapon Stats
        "core_avg_awp_kills", "core_awp_usage_rate", "core_avg_knife_kills", "core_avg_zeus_kills",
        "core_zeus_buy_rate", "core_top_weapon_kills", "core_top_weapon_hs_rate",
        "core_weapon_diversity", "core_rifle_hs_rate", "core_pistol_hs_rate", "core_smg_kills_total",
        # Objective Stats
        "core_avg_plants", "core_avg_defuses", "core_avg_flash_assists", "core_plant_success_rate",
        "core_defuse_success_rate", "core_objective_impact",
        # TIER 2: TACTICAL
        # Opening Impact
        "tac_avg_fk", "tac_avg_fd", "tac_fk_rate", "tac_fd_rate", "tac_fk_success_rate",
        "tac_entry_kill_rate", "tac_entry_death_rate", "tac_opening_duel_winrate",
        # Multi-Kill
        "tac_avg_2k", "tac_avg_3k", "tac_avg_4k", "tac_avg_5k", "tac_multikill_rate", "tac_ace_count",
        # Clutch Performance
        "tac_clutch_1v1_attempts", "tac_clutch_1v1_wins", "tac_clutch_1v1_rate",
        "tac_clutch_1v2_attempts", "tac_clutch_1v2_wins", "tac_clutch_1v2_rate",
        "tac_clutch_1v3_plus_attempts", "tac_clutch_1v3_plus_wins", "tac_clutch_1v3_plus_rate",
        "tac_clutch_impact_score",
        # Utility Mastery
        "tac_util_flash_per_round", "tac_util_smoke_per_round", "tac_util_molotov_per_round",
        "tac_util_he_per_round", "tac_util_usage_rate", "tac_util_nade_dmg_per_round",
        "tac_util_nade_dmg_per_nade", "tac_util_flash_time_per_round", "tac_util_flash_enemies_per_round",
        "tac_util_flash_efficiency", "tac_util_smoke_timing_score", "tac_util_impact_score",
        # Economy Efficiency
        "tac_eco_dmg_per_1k", "tac_eco_kpr_eco_rounds", "tac_eco_kd_eco_rounds",
        "tac_eco_kpr_force_rounds", "tac_eco_kpr_full_rounds", "tac_eco_save_discipline",
        "tac_eco_force_success_rate", "tac_eco_efficiency_score",
        # TIER 3: INTELLIGENCE
        # High IQ Kills
        "int_wallbang_kills", "int_wallbang_rate", "int_smoke_kills", "int_smoke_kill_rate",
        "int_blind_kills", "int_blind_kill_rate", "int_noscope_kills", "int_noscope_rate", "int_high_iq_score",
        # Timing Analysis
        "int_timing_early_kills", "int_timing_mid_kills", "int_timing_late_kills",
        "int_timing_early_kill_share", "int_timing_mid_kill_share", "int_timing_late_kill_share",
        "int_timing_avg_kill_time", "int_timing_early_deaths", "int_timing_early_death_rate",
        "int_timing_aggression_index", "int_timing_patience_score", "int_timing_first_contact_time",
        # Pressure Performance
        "int_pressure_comeback_kd", "int_pressure_comeback_rating", "int_pressure_losing_streak_kd",
        "int_pressure_matchpoint_kpr", "int_pressure_matchpoint_rating", "int_pressure_clutch_composure",
        "int_pressure_entry_in_loss", "int_pressure_performance_index", "int_pressure_big_moment_score",
        "int_pressure_tilt_resistance",
        # Position Mastery
        "int_pos_site_a_control_rate", "int_pos_site_b_control_rate", "int_pos_mid_control_rate",
        "int_pos_position_diversity", "int_pos_rotation_speed", "int_pos_map_coverage",
        "int_pos_lurk_tendency", "int_pos_site_anchor_score", "int_pos_entry_route_diversity",
        "int_pos_retake_positioning", "int_pos_postplant_positioning", "int_pos_spatial_iq_score",
        "int_pos_avg_distance_from_teammates",
        # Trade Network
        "int_trade_kill_count", "int_trade_kill_rate", "int_trade_response_time",
        "int_trade_given_count", "int_trade_given_rate", "int_trade_balance",
        "int_trade_efficiency", "int_teamwork_score",
        # TIER 4: META
        # Stability
        "meta_rating_volatility", "meta_recent_form_rating", "meta_win_rating", "meta_loss_rating",
        "meta_rating_consistency", "meta_time_rating_correlation", "meta_map_stability", "meta_elo_tier_stability",
        # Side Preference
        "meta_side_ct_rating", "meta_side_t_rating", "meta_side_ct_kd", "meta_side_t_kd",
        "meta_side_ct_win_rate", "meta_side_t_win_rate", "meta_side_ct_fk_rate", "meta_side_t_fk_rate",
        "meta_side_ct_kast", "meta_side_t_kast", "meta_side_rating_diff", "meta_side_kd_diff",
        "meta_side_balance_score",
        # Opponent Adaptation
        "meta_opp_vs_lower_elo_rating", "meta_opp_vs_similar_elo_rating", "meta_opp_vs_higher_elo_rating",
        "meta_opp_vs_lower_elo_kd", "meta_opp_vs_similar_elo_kd", "meta_opp_vs_higher_elo_kd",
        "meta_opp_elo_adaptation", "meta_opp_stomping_score", "meta_opp_upset_score",
        "meta_opp_consistency_across_elos", "meta_opp_rank_resistance", "meta_opp_smurf_detection",
        # Map Specialization
        "meta_map_best_rating", "meta_map_worst_rating", "meta_map_diversity", "meta_map_pool_size",
        "meta_map_specialist_score", "meta_map_versatility", "meta_map_comfort_zone_rate", "meta_map_adaptation",
        # Session Pattern
        "meta_session_avg_matches_per_day", "meta_session_longest_streak", "meta_session_weekend_rating",
        "meta_session_weekday_rating", "meta_session_morning_rating", "meta_session_afternoon_rating",
        "meta_session_evening_rating", "meta_session_night_rating",
        # TIER 5: COMPOSITE
        "score_aim", "score_clutch", "score_pistol", "score_defense", "score_utility",
        "score_stability", "score_economy", "score_pace", "score_overall", "tier_percentile",
        # Legacy Mappings (keep for compatibility if needed, or remove if fully migrated)
        "basic_avg_rating", "basic_avg_kd", "basic_avg_adr", "basic_avg_kast", "basic_avg_rws",
    ]
    # Metrics where a LOWER value should rank better (currently none).
    lower_is_better = []

    result = {}
    for m in metrics:
        # Collect every roster member's value for this metric; missing -> 0.
        values = []
        non_numeric = False
        for p in stats_map.values():
            raw = (p or {}).get(m)
            if raw is None:
                raw = 0
            try:
                values.append(float(raw))
            except Exception:
                non_numeric = True
                break
        raw_target = (stats_map.get(target_steam_id) or {}).get(m)
        if raw_target is None:
            raw_target = 0
        try:
            target_val = float(raw_target)
        except Exception:
            non_numeric = True
            target_val = 0

        # Non-numeric metrics (or an empty value list) cannot be ranked.
        if non_numeric:
            result[m] = None
            continue
        if not values:
            result[m] = None
            continue
        # Sort: Reverse (High to Low) by default, unless in lower_is_better
        is_reverse = m not in lower_is_better
        values.sort(reverse=is_reverse)
        # Rank: 1-based index of the target's value in the sorted list;
        # if the exact value is absent, pessimistically rank last.
        try:
            rank = values.index(target_val) + 1
        except ValueError:
            rank = len(values)
        result[m] = {
            'val': target_val,
            'rank': rank,
            'total': len(values),
            'min': min(values),
            'max': max(values),
            'avg': sum(values) / len(values),
            'inverted': not is_reverse  # Flag for frontend to invert bar
        }
        # Mirror selected metrics under their legacy short names.
        legacy_map = {
            "basic_avg_rating": "rating",
            "basic_avg_kd": "kd",
            "basic_avg_adr": "adr",
            "basic_avg_kast": "kast",
        }
        if m in legacy_map:
            result[legacy_map[m]] = result[m]
    return result
@staticmethod
def get_live_matches():
    """
    Matches started within the last 2 hours that have no winner yet.

    On a static (fully ingested) dataset this will normally be empty.
    """
    live_sql = """
    SELECT m.match_id, m.map_name, m.score_team1, m.score_team2, m.start_time
    FROM fact_matches m
    WHERE m.winner_team IS NULL
    AND m.start_time > strftime('%s', 'now', '-2 hours')
    """
    return query_db('l2', live_sql)
@staticmethod
def get_head_to_head_stats(match_id):
    """
    Kill matrix between players of one match.

    Returns rows of {attacker_steam_id, victim_steam_id, kills}.
    """
    h2h_sql = """
    SELECT attacker_steam_id, victim_steam_id, COUNT(*) as kills
    FROM fact_round_events
    WHERE match_id = ? AND event_type = 'kill'
    GROUP BY attacker_steam_id, victim_steam_id
    """
    return query_db('l2', h2h_sql, [match_id])
@staticmethod
def get_match_round_details(match_id):
    """
    Detailed per-round breakdown of a match:
    {
        round_num: {
            'info':    {...fact_rounds row...},
            'events':  [{...fact_round_events row...}, ...],
            'economy': {steam_id: {...fact_round_player_economy row...}}
        }
    }
    Returns {} when the match has no rounds.
    """
    # 1. Base round info seeds the result structure.
    base_rounds = query_db(
        'l2',
        "SELECT * FROM fact_rounds WHERE match_id = ? ORDER BY round_num",
        [match_id],
    )
    if not base_rounds:
        return {}
    detail = {
        r['round_num']: {'info': dict(r), 'events': [], 'economy': {}}
        for r in base_rounds
    }

    # 2. Attach events in chronological order within each round.
    event_rows = query_db(
        'l2',
        """
        SELECT * FROM fact_round_events
        WHERE match_id = ?
        ORDER BY round_num, event_time
        """,
        [match_id],
    )
    for ev in event_rows:
        bucket = detail.get(ev['round_num'])
        if bucket is not None:
            bucket['events'].append(dict(ev))

    # 3. Attach per-player economy rows, keyed by steam id.
    eco_rows = query_db(
        'l2',
        """
        SELECT * FROM fact_round_player_economy
        WHERE match_id = ?
        """,
        [match_id],
    )
    for eco in eco_rows:
        bucket = detail.get(eco['round_num'])
        if bucket is not None:
            bucket['economy'][eco['steam_id_64']] = dict(eco)
    return detail