Files
clutch/database/L3/processors/base_processor.py

321 lines
10 KiB
Python
Raw Permalink Normal View History

"""
Base processor classes and utility functions for L3 feature calculation
"""
import math
import sqlite3
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, List, Optional
class SafeAggregator:
    """Utility class for safe mathematical operations with NULL handling.

    Every aggregate skips ``None`` entries (SQL NULLs) and returns
    ``default`` when no usable value is left. Inputs may be any iterable
    (list, tuple, generator, ...); each input is traversed exactly once.
    """

    @staticmethod
    def _non_null(values: Optional[Iterable[Optional[float]]]) -> List[float]:
        """Materialise *values* once, dropping None entries (None -> [])."""
        if values is None:
            return []
        return [v for v in values if v is not None]

    @staticmethod
    def safe_divide(numerator: float, denominator: float, default: float = 0.0) -> float:
        """Safe division with NULL/zero handling.

        Args:
            numerator: Dividend; ``None`` yields ``default``.
            denominator: Divisor; ``None`` or zero yields ``default``.
            default: Value returned when the division cannot be performed.

        Returns:
            ``numerator / denominator`` or ``default``.
        """
        if denominator is None or denominator == 0:
            return default
        if numerator is None:
            return default
        return numerator / denominator

    @staticmethod
    def safe_avg(values: Iterable[Optional[float]], default: float = 0.0) -> float:
        """Arithmetic mean of the non-None values, or ``default`` if none exist."""
        valid_values = SafeAggregator._non_null(values)
        if not valid_values:
            return default
        return sum(valid_values) / len(valid_values)

    @staticmethod
    def safe_stddev(values: Iterable[Optional[float]], default: float = 0.0) -> float:
        """Population standard deviation of the non-None values.

        Returns ``default`` when fewer than two usable values are available.
        """
        valid_values = SafeAggregator._non_null(values)
        if len(valid_values) < 2:
            return default
        mean = sum(valid_values) / len(valid_values)
        # Population variance: divide by N, not N - 1.
        variance = sum((x - mean) ** 2 for x in valid_values) / len(valid_values)
        return math.sqrt(variance)

    @staticmethod
    def safe_sum(values: Iterable[Optional[float]], default: float = 0.0) -> float:
        """Sum of the non-None values, or ``default`` if none exist."""
        valid_values = SafeAggregator._non_null(values)
        return sum(valid_values) if valid_values else default

    @staticmethod
    def safe_min(values: Iterable[Optional[float]], default: float = 0.0) -> float:
        """Smallest non-None value, or ``default`` if none exist."""
        valid_values = SafeAggregator._non_null(values)
        return min(valid_values) if valid_values else default

    @staticmethod
    def safe_max(values: Iterable[Optional[float]], default: float = 0.0) -> float:
        """Largest non-None value, or ``default`` if none exist."""
        valid_values = SafeAggregator._non_null(values)
        return max(valid_values) if valid_values else default
class NormalizationUtils:
    """Z-score normalization and scaling utilities"""

    @staticmethod
    def z_score_normalize(value: float, mean: float, std: float,
                          scale_min: float = 0.0, scale_max: float = 100.0) -> float:
        """
        Z-score normalization to a target range

        Args:
            value: Value to normalize
            mean: Population mean
            std: Population standard deviation
            scale_min: Target minimum (default: 0)
            scale_max: Target maximum (default: 100)

        Returns:
            Normalized value in [scale_min, scale_max] range; the midpoint
            of the range when std is None or zero.
        """
        midpoint = (scale_min + scale_max) / 2.0
        # Identity check first so a NULL std never reaches a numeric comparison.
        if std is None or std == 0:
            return midpoint
        z = (value - mean) / std
        # Map to target range (±3σ covers ~99.7% of data):
        # z = -3 → scale_min, z = 0 → midpoint, z = 3 → scale_max
        scale_range = (scale_max - scale_min) / 6.0  # 6σ total range
        normalized = midpoint + (z * scale_range)
        # Clamp to target range
        return max(scale_min, min(scale_max, normalized))

    @staticmethod
    def percentile_normalize(value: float, all_values: List[float],
                             scale_min: float = 0.0, scale_max: float = 100.0) -> float:
        """
        Percentile-based normalization

        The percentile is the share of population values strictly below
        ``value``. None entries in the population are ignored.

        Args:
            value: Value to normalize
            all_values: All values in population
            scale_min: Target minimum
            scale_max: Target maximum

        Returns:
            Normalized value based on percentile; scale_min when the
            population has no usable values.
        """
        if not all_values:
            return scale_min
        population = [v for v in all_values if v is not None]
        if not population:
            return scale_min
        # Counting needs no ordering, so the population is not sorted.
        rank = sum(1 for v in population if v < value)
        percentile = rank / len(population)
        return scale_min + (percentile * (scale_max - scale_min))

    @staticmethod
    def min_max_normalize(value: float, min_val: float, max_val: float,
                          scale_min: float = 0.0, scale_max: float = 100.0) -> float:
        """Min-max normalization to target range.

        Returns the midpoint of the target range when max_val == min_val.
        The result is not clamped, so a value outside [min_val, max_val]
        maps outside [scale_min, scale_max].
        """
        if max_val == min_val:
            return (scale_min + scale_max) / 2.0
        normalized = (value - min_val) / (max_val - min_val)
        return scale_min + (normalized * (scale_max - scale_min))

    @staticmethod
    def calculate_population_stats(conn_l3: sqlite3.Connection, column: str) -> Dict[str, float]:
        """
        Calculate population mean and std for a column in dm_player_features

        SQLite ships no STDDEV aggregate, so the population standard
        deviation is derived from a second query that averages the squared
        deviations from the mean.

        Args:
            conn_l3: L3 database connection
            column: Column name to analyze

        Returns:
            dict with 'mean', 'std', 'min', 'max'. When the column has no
            non-NULL rows the result is mean 0.0, std 1.0, min 0.0, max 0.0.
        """
        # NOTE(review): `column` is interpolated into the SQL text because
        # identifiers cannot be bound as parameters; pass only trusted,
        # code-defined column names.
        cursor = conn_l3.cursor()
        cursor.execute(f"""
            SELECT
                AVG({column}),
                MIN({column}),
                MAX({column})
            FROM dm_player_features
            WHERE {column} IS NOT NULL
        """)
        row = cursor.fetchone()
        mean = row[0]
        if mean is None:
            # No non-NULL rows: neutral defaults (std 1.0 avoids divide-by-zero).
            return {'mean': 0.0, 'std': 1.0, 'min': 0.0, 'max': 0.0}

        # Second pass: population variance as the mean squared deviation.
        cursor.execute(f"""
            SELECT AVG(({column} - ?) * ({column} - ?))
            FROM dm_player_features
            WHERE {column} IS NOT NULL
        """, (mean, mean))
        variance = cursor.fetchone()[0]
        std = math.sqrt(variance) if variance is not None and variance > 0 else 0.0

        return {
            'mean': mean,
            'std': std,
            'min': row[1] if row[1] is not None else 0.0,
            'max': row[2] if row[2] is not None else 0.0
        }
class BaseFeatureProcessor(ABC):
    """
    Abstract base class for all feature processors

    Each processor implements the calculate() method which returns a dict
    of feature_name: value pairs. Subclasses may override
    MIN_MATCHES_REQUIRED; check_min_matches() honours the override.
    """

    MIN_MATCHES_REQUIRED = 5  # Minimum matches needed for feature calculation

    @staticmethod
    @abstractmethod
    def calculate(steam_id: str, conn_l2: sqlite3.Connection) -> Dict[str, Any]:
        """
        Calculate features for a specific player

        Args:
            steam_id: Player's Steam ID (steam_id_64)
            conn_l2: Connection to L2 database

        Returns:
            Dictionary of {feature_name: value}
        """
        pass

    @classmethod
    def check_min_matches(cls, steam_id: str, conn_l2: sqlite3.Connection,
                          min_required: Optional[int] = None) -> bool:
        """
        Check if player has minimum required matches

        Args:
            steam_id: Player's Steam ID
            conn_l2: L2 database connection
            min_required: Minimum matches (uses the calling class's
                MIN_MATCHES_REQUIRED if None)

        Returns:
            True if player has enough matches
        """
        if min_required is None:
            # Read through cls so a subclass threshold takes effect.
            min_required = cls.MIN_MATCHES_REQUIRED
        return cls.get_player_match_count(steam_id, conn_l2) >= min_required

    @staticmethod
    def get_player_match_count(steam_id: str, conn_l2: sqlite3.Connection) -> int:
        """Get total match count for player (rows in fact_match_players)"""
        cursor = conn_l2.cursor()
        cursor.execute("""
            SELECT COUNT(*) FROM fact_match_players
            WHERE steam_id_64 = ?
        """, (steam_id,))
        return cursor.fetchone()[0]

    @staticmethod
    def get_player_round_count(steam_id: str, conn_l2: sqlite3.Connection) -> int:
        """Get total round count for player (sum of round_total; 0 if no rows)"""
        cursor = conn_l2.cursor()
        cursor.execute("""
            SELECT SUM(round_total) FROM fact_match_players
            WHERE steam_id_64 = ?
        """, (steam_id,))
        result = cursor.fetchone()[0]
        # SUM over zero rows yields NULL, which is reported as 0.
        return result if result is not None else 0
class WeaponCategories:
    """Weapon categorization constants"""

    RIFLES = [
        'ak47', 'aug', 'm4a1', 'm4a1_silencer', 'sg556', 'galilar', 'famas'
    ]
    PISTOLS = [
        'glock', 'usp_silencer', 'hkp2000', 'p250', 'fiveseven', 'tec9',
        'cz75a', 'deagle', 'elite', 'revolver'
    ]
    SMGS = [
        'mac10', 'mp9', 'mp7', 'mp5sd', 'ump45', 'p90', 'bizon'
    ]
    SNIPERS = [
        'awp', 'ssg08', 'scar20', 'g3sg1'
    ]
    HEAVY = [
        'nova', 'xm1014', 'mag7', 'sawedoff', 'm249', 'negev'
    ]

    @classmethod
    def get_category(cls, weapon_name: Optional[str]) -> str:
        """Get category for a weapon

        The name is lower-cased and any 'weapon_' marker is removed before
        lookup.

        Args:
            weapon_name: Weapon identifier, e.g. 'weapon_ak47' or 'AWP'.
                None or an empty string is treated as unclassifiable.

        Returns:
            One of 'rifle', 'pistol', 'smg', 'sniper', 'heavy', 'knife',
            'grenade' or 'other'.
        """
        if not weapon_name:
            # NULL/empty weapon names cannot be classified.
            return 'other'
        weapon_clean = weapon_name.lower().replace('weapon_', '')
        # Tables are read at call time so subclass overrides are honoured;
        # the order matches the original precedence.
        lookup = (
            ('rifle', cls.RIFLES),
            ('pistol', cls.PISTOLS),
            ('smg', cls.SMGS),
            ('sniper', cls.SNIPERS),
            ('heavy', cls.HEAVY),
            ('knife', ('knife',)),
            ('grenade', ('hegrenade',)),
        )
        for category, members in lookup:
            if weapon_clean in members:
                return category
        return 'other'
class MapAreas:
    """Symbolic map-area labels and a position classifier stub (for position analysis)"""

    # Area labels. Per the original note, actual map coordinates are to be
    # added in IntelligenceProcessor.
    SITE_A = 'site_a'
    SITE_B = 'site_b'
    MID = 'mid'
    SPAWN_T = 'spawn_t'
    SPAWN_CT = 'spawn_ct'

    @staticmethod
    def classify_position(x: float, y: float, z: float, map_name: str) -> str:
        """
        Map a world position on a given map to an area label (simplified).

        Not implemented yet: map-specific coordinate ranges are required,
        so every input currently yields "unknown".
        """
        # Placeholder result until map data is available; arguments are unused.
        area = "unknown"
        return area
# Public API: the names exported by a star-import of this module.
__all__ = [
    "SafeAggregator",
    "NormalizationUtils",
    "BaseFeatureProcessor",
    "WeaponCategories",
    "MapAreas",
]