diff --git a/ETL/L1A.py b/ETL/L1A.py
index 37c42ed..bcc6d6d 100644
--- a/ETL/L1A.py
+++ b/ETL/L1A.py
@@ -2,6 +2,7 @@
 import os
 import json
 import sqlite3
 import glob
+import argparse # Added
 # Paths
 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -26,17 +27,33 @@
     return conn
 
 def process_files():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--force', action='store_true', help='Force reprocessing of all files')
+    args = parser.parse_args()
+
     conn = init_db()
     cursor = conn.cursor()
 
+    # Get existing match_ids to skip
+    existing_ids = set()
+    if not args.force:
+        try:
+            cursor.execute("SELECT match_id FROM raw_iframe_network")
+            existing_ids = set(row[0] for row in cursor.fetchall())
+            print(f"Found {len(existing_ids)} existing matches in DB. Incremental mode active.")
+        except Exception as e:
+            print(f"Error checking existing data: {e}")
+
     # Pattern to match all iframe_network.json files
     # output_arena/*/iframe_network.json
     pattern = os.path.join(OUTPUT_ARENA_DIR, '*', 'iframe_network.json')
     files = glob.glob(pattern)
-    print(f"Found {len(files)} files to process.")
+    print(f"Found {len(files)} files in directory.")
 
     count = 0
+    skipped = 0
+
     for file_path in files:
         try:
             # Extract match_id from directory name
@@ -44,6 +61,10 @@
             parent_dir = os.path.dirname(file_path)
             match_id = os.path.basename(parent_dir)
 
+            if match_id in existing_ids:
+                skipped += 1
+                continue
+
             with open(file_path, 'r', encoding='utf-8') as f:
                 content = f.read()
 
@@ -63,7 +84,7 @@
     conn.commit()
     conn.close()
 
-    print(f"Finished processing {count} files.")
+    print(f"Finished. Processed: {count}, Skipped: {skipped}.")
 
 if __name__ == '__main__':
-    process_files()
+    process_files()
\ No newline at end of file
diff --git a/ETL/verify/clean_dirty_data.py b/ETL/verify/clean_dirty_data.py
new file mode 100644
index 0000000..60280d5
--- /dev/null
+++ b/ETL/verify/clean_dirty_data.py
@@ -0,0 +1,39 @@
+import sqlite3
+import os
+
+# 路径指向正式数据库
+BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+DB_PATH = os.path.join(BASE_DIR, 'database', 'L1A', 'L1A.sqlite')
+
+def clean_db():
+    if not os.path.exists(DB_PATH):
+        print(f"Database not found at {DB_PATH}")
+        return
+
+    print(f"Connecting to production DB: {DB_PATH}")
+    conn = sqlite3.connect(DB_PATH)
+    cursor = conn.cursor()
+
+    # 查找脏数据 (假设模拟数据的 match_id 是 match_001, match_002, match_003)
+    dirty_ids = ['match_001', 'match_002', 'match_003']
+
+    # 也可以用 LIKE 'match_%' 如果您想删得更彻底,但要小心误删
+    # 这里我们精准删除
+
+    deleted_count = 0
+    for mid in dirty_ids:
+        cursor.execute("DELETE FROM raw_iframe_network WHERE match_id = ?", (mid,))
+        if cursor.rowcount > 0:
+            print(f"Deleted dirty record: {mid}")
+            deleted_count += 1
+
+    conn.commit()
+    conn.close()
+
+    if deleted_count > 0:
+        print(f"Cleanup complete. Removed {deleted_count} dirty records.")
+    else:
+        print("Cleanup complete. No dirty records found.")
+
+if __name__ == "__main__":
+    clean_db()
\ No newline at end of file
diff --git a/ETL/verify/setup_test_data.py b/ETL/verify/setup_test_data.py
new file mode 100644
index 0000000..0641b87
--- /dev/null
+++ b/ETL/verify/setup_test_data.py
@@ -0,0 +1,35 @@
+import os
+import json
+
+# 定义路径
+CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
+PROJECT_ROOT = os.path.dirname(os.path.dirname(CURRENT_DIR))
+OUTPUT_ARENA_DIR = os.path.join(PROJECT_ROOT, 'output_arena')
+
+def create_mock_data():
+    if not os.path.exists(OUTPUT_ARENA_DIR):
+        os.makedirs(OUTPUT_ARENA_DIR)
+        print(f"Created directory: {OUTPUT_ARENA_DIR}")
+
+    # 创建 3 个模拟比赛数据
+    mock_matches = ['match_001', 'match_002', 'match_003']
+
+    for match_id in mock_matches:
+        match_dir = os.path.join(OUTPUT_ARENA_DIR, match_id)
+        if not os.path.exists(match_dir):
+            os.makedirs(match_dir)
+
+        file_path = os.path.join(match_dir, 'iframe_network.json')
+        if not os.path.exists(file_path):
+            mock_content = {
+                "match_id": match_id,
+                "data": "This is mock data for testing."
+            }
+            with open(file_path, 'w', encoding='utf-8') as f:
+                json.dump(mock_content, f)
+            print(f"Created mock file: {file_path}")
+        else:
+            print(f"File already exists: {file_path}")
+
+if __name__ == "__main__":
+    create_mock_data()
\ No newline at end of file
diff --git a/ETL/verify/test_L1_incremental.py b/ETL/verify/test_L1_incremental.py
new file mode 100644
index 0000000..e6ab1a1
--- /dev/null
+++ b/ETL/verify/test_L1_incremental.py
@@ -0,0 +1,76 @@
+import os
+import sqlite3
+import subprocess
+import glob
+
+# 配置路径
+# 当前脚本位于 ETL/verify/ 目录下,需要向上两级找到项目根目录
+CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
+PROJECT_ROOT = os.path.dirname(os.path.dirname(CURRENT_DIR))
+
+L1_SCRIPT = os.path.join(PROJECT_ROOT, 'ETL', 'L1A.py')
+DB_PATH = os.path.join(PROJECT_ROOT, 'database', 'L1A', 'L1A.sqlite')
+OUTPUT_ARENA_DIR = os.path.join(PROJECT_ROOT, 'output_arena')
+
+def get_db_count():
+    """获取数据库中的记录数"""
+    if not os.path.exists(DB_PATH):
+        return 0
+    try:
+        conn = sqlite3.connect(DB_PATH)
+        cursor = conn.cursor()
+        cursor.execute("SELECT COUNT(*) FROM raw_iframe_network")
+        count = cursor.fetchone()[0]
+        conn.close()
+        return count
+    except Exception:
+        return 0
+
+def get_file_count():
+    """获取源文件总数"""
+    pattern = os.path.join(OUTPUT_ARENA_DIR, '*', 'iframe_network.json')
+    files = glob.glob(pattern)
+    return len(files)
+
+def run_l1_script():
+    """运行 L1 脚本并返回输出"""
+    # 必须在项目根目录下运行,或者正确处理 Python 路径
+    # 这里我们使用绝对路径调用脚本
+    result = subprocess.run(['python', L1_SCRIPT], capture_output=True, text=True)
+    return result.stdout
+
+def main():
+    print("=== 开始 L1 增量逻辑测试 ===")
+    print(f"项目根目录: {PROJECT_ROOT}")
+
+    # 1. 检查环境
+    total_files = get_file_count()
+    initial_db_count = get_db_count()
+    print(f"[环境] 源文件总数: {total_files}")
+    print(f"[环境] 数据库当前记录数: {initial_db_count}")
+
+    # 2. 运行脚本 (第一次)
+    print("\n--- 运行 L1A.py (Run 1) ---")
+    output1 = run_l1_script()
+    print(output1.strip())
+
+    mid_db_count = get_db_count()
+    print(f"[状态] 运行后数据库记录数: {mid_db_count}")
+
+    if mid_db_count < total_files:
+        print("警告: 数据库记录数少于文件数,可能部分文件处理失败或尚未完成。")
+
+    # 3. 运行脚本 (第二次 - 验证增量)
+    print("\n--- 再次运行 L1A.py (Run 2 - 验证增量) ---")
+    output2 = run_l1_script()
+    print(output2.strip())
+
+    # 4. 验证结果
+    expected_msg = f"Skipped: {total_files}"
+    if expected_msg in output2:
+        print("\n✅ 测试通过! 第二次运行跳过了所有文件,增量逻辑生效。")
+    else:
+        print(f"\n❌ 测试未通过。预期输出应包含 '{expected_msg}'")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file