3.0.1: fix
This commit is contained in:
@@ -4,6 +4,8 @@ import os
|
||||
import sys
|
||||
import sqlite3
|
||||
import json
|
||||
import argparse
|
||||
import concurrent.futures
|
||||
|
||||
# Setup logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
@@ -94,7 +96,61 @@ def _get_team_players():
|
||||
logger.error(f"Error reading Web DB: {e}")
|
||||
return set()
|
||||
|
||||
def main():
|
||||
def _get_match_date_range(steam_id: str, conn_l2: sqlite3.Connection):
    """
    Look up the earliest and latest match start times stored for a player.

    Aggregates MIN/MAX of ``fact_matches.start_time`` over the rows of
    ``fact_match_players`` whose ``steam_id_64`` equals ``steam_id``.

    Returns a ``(first_match_date, last_match_date)`` tuple. An element is
    ``None`` when the corresponding aggregate is NULL or otherwise falsy.
    """
    query = """
        SELECT MIN(m.start_time), MAX(m.start_time)
        FROM fact_match_players p
        JOIN fact_matches m ON p.match_id = m.match_id
        WHERE p.steam_id_64 = ?
    """
    row = conn_l2.cursor().execute(query, (steam_id,)).fetchone()
    if not row:
        return None, None

    earliest, latest = row[0], row[1]
    # Falsy aggregates are normalised to None, matching the caller contract.
    return (earliest or None), (latest or None)
|
||||
|
||||
def _build_player_record(steam_id: str):
    """
    Compute the complete L3 feature record for a single player.

    Opens its own connection to the L2 database at ``L2_DB_PATH``, runs the
    Basic, Tactical, Intelligence and Meta processors followed by the
    Composite processor, then collects the match count, round count and
    match date range for ``steam_id``.

    Args:
        steam_id: Player identifier handed to every processor and helper.

    Returns:
        A dict with the keys ``steam_id``, ``features``, ``match_count``,
        ``round_count``, ``first_match_date``, ``last_match_date`` and
        ``error``. On success ``error`` is ``None``. Exceptions (including a
        failed processor import) are never propagated: ``features`` is then
        ``None``, both counts are ``0``, both dates are ``None`` and ``error``
        holds ``str(exception)``.
    """
    from contextlib import closing

    try:
        # Imported inside the try so an import failure is reported through
        # the "error" field like any other failure.
        from database.L3.processors import (
            BasicProcessor,
            TacticalProcessor,
            IntelligenceProcessor,
            MetaProcessor,
            CompositeProcessor,
        )

        # closing() releases the connection even when a processor or helper
        # raises. A sqlite3 connection used directly as a context manager
        # only commits/rolls back; it does not close.
        with closing(sqlite3.connect(L2_DB_PATH)) as conn_l2:
            conn_l2.row_factory = sqlite3.Row

            features = {}
            for processor in (
                BasicProcessor,
                TacticalProcessor,
                IntelligenceProcessor,
                MetaProcessor,
            ):
                features.update(processor.calculate(steam_id, conn_l2))
            # The composite step also receives the features gathered so far.
            features.update(CompositeProcessor.calculate(steam_id, conn_l2, features))

            match_count = _get_match_count(steam_id, conn_l2)
            round_count = _get_round_count(steam_id, conn_l2)
            first_match_date, last_match_date = _get_match_date_range(steam_id, conn_l2)
    except Exception as e:
        return {
            "steam_id": steam_id,
            "features": None,
            "match_count": 0,
            "round_count": 0,
            "first_match_date": None,
            "last_match_date": None,
            "error": str(e),
        }

    return {
        "steam_id": steam_id,
        "features": features,
        "match_count": match_count,
        "round_count": round_count,
        "first_match_date": first_match_date,
        "last_match_date": last_match_date,
        "error": None,
    }
|
||||
|
||||
def main(force_all: bool = False, workers: int = 1):
|
||||
"""
|
||||
Main L3 feature building pipeline using modular processors
|
||||
"""
|
||||
@@ -125,26 +181,29 @@ def main():
|
||||
conn_l3 = sqlite3.connect(L3_DB_PATH)
|
||||
|
||||
try:
|
||||
# 4. Get target players (Team Lineups only)
|
||||
team_players = _get_team_players()
|
||||
if not team_players:
|
||||
logger.warning("No players found in Team Lineups. Aborting L3 build.")
|
||||
return
|
||||
|
||||
# 5. Get distinct players from L2 matching Team Lineups
|
||||
cursor_l2 = conn_l2.cursor()
|
||||
|
||||
# Build placeholder string for IN clause
|
||||
placeholders = ','.join(['?' for _ in team_players])
|
||||
|
||||
sql = f"""
|
||||
SELECT DISTINCT steam_id_64
|
||||
FROM dim_players
|
||||
WHERE steam_id_64 IN ({placeholders})
|
||||
ORDER BY steam_id_64
|
||||
"""
|
||||
|
||||
cursor_l2.execute(sql, list(team_players))
|
||||
if force_all:
|
||||
logger.info("Force mode enabled: building L3 for all players in L2.")
|
||||
sql = """
|
||||
SELECT DISTINCT steam_id_64
|
||||
FROM dim_players
|
||||
ORDER BY steam_id_64
|
||||
"""
|
||||
cursor_l2.execute(sql)
|
||||
else:
|
||||
team_players = _get_team_players()
|
||||
if not team_players:
|
||||
logger.warning("No players found in Team Lineups. Aborting L3 build.")
|
||||
return
|
||||
|
||||
placeholders = ','.join(['?' for _ in team_players])
|
||||
sql = f"""
|
||||
SELECT DISTINCT steam_id_64
|
||||
FROM dim_players
|
||||
WHERE steam_id_64 IN ({placeholders})
|
||||
ORDER BY steam_id_64
|
||||
"""
|
||||
cursor_l2.execute(sql, list(team_players))
|
||||
|
||||
players = cursor_l2.fetchall()
|
||||
total_players = len(players)
|
||||
@@ -156,51 +215,61 @@ def main():
|
||||
|
||||
success_count = 0
|
||||
error_count = 0
|
||||
|
||||
# 6. Process each player
|
||||
for idx, row in enumerate(players, 1):
|
||||
steam_id = row[0]
|
||||
|
||||
try:
|
||||
# Calculate features from each processor tier by tier
|
||||
features = {}
|
||||
processed_count = 0
|
||||
|
||||
if workers and workers > 1:
|
||||
steam_ids = [row[0] for row in players]
|
||||
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
|
||||
futures = [executor.submit(_build_player_record, sid) for sid in steam_ids]
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
result = future.result()
|
||||
processed_count += 1
|
||||
if result.get("error"):
|
||||
error_count += 1
|
||||
logger.error(f"Error processing player {result.get('steam_id')}: {result.get('error')}")
|
||||
else:
|
||||
_upsert_features(
|
||||
conn_l3,
|
||||
result["steam_id"],
|
||||
result["features"],
|
||||
result["match_count"],
|
||||
result["round_count"],
|
||||
None,
|
||||
result["first_match_date"],
|
||||
result["last_match_date"],
|
||||
)
|
||||
success_count += 1
|
||||
if processed_count % 4 == 0:
|
||||
conn_l3.commit()
|
||||
logger.info(f"Progress: {processed_count}/{total_players} ({success_count} success, {error_count} errors)")
|
||||
else:
|
||||
for idx, row in enumerate(players, 1):
|
||||
steam_id = row[0]
|
||||
|
||||
# Tier 1: CORE (41 columns)
|
||||
features.update(BasicProcessor.calculate(steam_id, conn_l2))
|
||||
|
||||
# Tier 2: TACTICAL (44 columns)
|
||||
features.update(TacticalProcessor.calculate(steam_id, conn_l2))
|
||||
|
||||
# Tier 3: INTELLIGENCE (53 columns)
|
||||
features.update(IntelligenceProcessor.calculate(steam_id, conn_l2))
|
||||
|
||||
# Tier 4: META (52 columns)
|
||||
features.update(MetaProcessor.calculate(steam_id, conn_l2))
|
||||
|
||||
# Tier 5: COMPOSITE (11 columns) - requires previous features
|
||||
features.update(CompositeProcessor.calculate(steam_id, conn_l2, features))
|
||||
|
||||
# Add metadata
|
||||
match_count = _get_match_count(steam_id, conn_l2)
|
||||
round_count = _get_round_count(steam_id, conn_l2)
|
||||
|
||||
# Insert/Update features in L3
|
||||
_upsert_features(conn_l3, steam_id, features, match_count, round_count, conn_l2)
|
||||
|
||||
success_count += 1
|
||||
|
||||
# Batch commit and progress logging
|
||||
if idx % 50 == 0:
|
||||
try:
|
||||
features = {}
|
||||
features.update(BasicProcessor.calculate(steam_id, conn_l2))
|
||||
features.update(TacticalProcessor.calculate(steam_id, conn_l2))
|
||||
features.update(IntelligenceProcessor.calculate(steam_id, conn_l2))
|
||||
features.update(MetaProcessor.calculate(steam_id, conn_l2))
|
||||
features.update(CompositeProcessor.calculate(steam_id, conn_l2, features))
|
||||
match_count = _get_match_count(steam_id, conn_l2)
|
||||
round_count = _get_round_count(steam_id, conn_l2)
|
||||
first_match_date, last_match_date = _get_match_date_range(steam_id, conn_l2)
|
||||
_upsert_features(conn_l3, steam_id, features, match_count, round_count, conn_l2, first_match_date, last_match_date)
|
||||
success_count += 1
|
||||
except Exception as e:
|
||||
error_count += 1
|
||||
logger.error(f"Error processing player {steam_id}: {e}")
|
||||
if error_count <= 3:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
continue
|
||||
|
||||
processed_count = idx
|
||||
if processed_count % 4 == 0:
|
||||
conn_l3.commit()
|
||||
logger.info(f"Progress: {idx}/{total_players} ({success_count} success, {error_count} errors)")
|
||||
|
||||
except Exception as e:
|
||||
error_count += 1
|
||||
logger.error(f"Error processing player {steam_id}: {e}")
|
||||
if error_count <= 3: # Show details for first 3 errors
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
continue
|
||||
logger.info(f"Progress: {processed_count}/{total_players} ({success_count} success, {error_count} errors)")
|
||||
|
||||
# Final commit
|
||||
conn_l3.commit()
|
||||
@@ -244,23 +313,18 @@ def _get_round_count(steam_id: str, conn_l2: sqlite3.Connection) -> int:
|
||||
|
||||
|
||||
def _upsert_features(conn_l3: sqlite3.Connection, steam_id: str, features: dict,
|
||||
match_count: int, round_count: int, conn_l2: sqlite3.Connection):
|
||||
match_count: int, round_count: int, conn_l2: sqlite3.Connection | None,
|
||||
first_match_date=None, last_match_date=None):
|
||||
"""
|
||||
Insert or update player features in dm_player_features
|
||||
"""
|
||||
cursor_l3 = conn_l3.cursor()
|
||||
cursor_l2 = conn_l2.cursor()
|
||||
|
||||
# Get first and last match dates from L2
|
||||
cursor_l2.execute("""
|
||||
SELECT MIN(m.start_time), MAX(m.start_time)
|
||||
FROM fact_match_players p
|
||||
JOIN fact_matches m ON p.match_id = m.match_id
|
||||
WHERE p.steam_id_64 = ?
|
||||
""", (steam_id,))
|
||||
date_row = cursor_l2.fetchone()
|
||||
first_match_date = date_row[0] if date_row and date_row[0] else None
|
||||
last_match_date = date_row[1] if date_row and date_row[1] else None
|
||||
if first_match_date is None or last_match_date is None:
|
||||
if conn_l2 is not None:
|
||||
first_match_date, last_match_date = _get_match_date_range(steam_id, conn_l2)
|
||||
else:
|
||||
first_match_date = None
|
||||
last_match_date = None
|
||||
|
||||
# Add metadata to features
|
||||
features['total_matches'] = match_count
|
||||
@@ -289,5 +353,12 @@ def _upsert_features(conn_l3: sqlite3.Connection, steam_id: str, features: dict,
|
||||
|
||||
cursor_l3.execute(sql, values)
|
||||
|
||||
def _parse_args():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--force", action="store_true")
|
||||
parser.add_argument("--workers", type=int, default=1)
|
||||
return parser.parse_args()
|
||||
|
||||
if __name__ == "__main__":
    # Parse the CLI options first and run the pipeline exactly once with
    # them; an extra bare main() call here would trigger a second full build
    # that ignores --force and --workers.
    args = _parse_args()
    main(force_all=args.force, workers=args.workers)
|
||||
|
||||
Reference in New Issue
Block a user