0.5.3 +feat: L1A Incremental-refresh implemented.
Reviewed-on: #3. Accepted as fully meeting all requirements.
This commit was merged in pull request #3.
This commit is contained in:
37
ETL/L1A.py
37
ETL/L1A.py
@@ -1,7 +1,20 @@
|
|||||||
|
"""
|
||||||
|
L1A Data Ingestion Script
|
||||||
|
|
||||||
|
This script reads raw JSON files from the 'output_arena' directory and ingests them into the SQLite database.
|
||||||
|
It supports incremental updates by default, skipping files that have already been processed.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python ETL/L1A.py # Standard incremental run
|
||||||
|
python ETL/L1A.py --force # Force re-process all files (overwrite existing data)
|
||||||
|
"""
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import glob
|
import glob
|
||||||
|
import argparse # Added
|
||||||
|
|
||||||
# Paths
|
# Paths
|
||||||
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||||
@@ -26,17 +39,33 @@ def init_db():
|
|||||||
return conn
|
return conn
|
||||||
|
|
||||||
def process_files():
|
def process_files():
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument('--force', action='store_true', help='Force reprocessing of all files')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
conn = init_db()
|
conn = init_db()
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
|
|
||||||
|
# Get existing match_ids to skip
|
||||||
|
existing_ids = set()
|
||||||
|
if not args.force:
|
||||||
|
try:
|
||||||
|
cursor.execute("SELECT match_id FROM raw_iframe_network")
|
||||||
|
existing_ids = set(row[0] for row in cursor.fetchall())
|
||||||
|
print(f"Found {len(existing_ids)} existing matches in DB. Incremental mode active.")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error checking existing data: {e}")
|
||||||
|
|
||||||
# Pattern to match all iframe_network.json files
|
# Pattern to match all iframe_network.json files
|
||||||
# output_arena/*/iframe_network.json
|
# output_arena/*/iframe_network.json
|
||||||
pattern = os.path.join(OUTPUT_ARENA_DIR, '*', 'iframe_network.json')
|
pattern = os.path.join(OUTPUT_ARENA_DIR, '*', 'iframe_network.json')
|
||||||
files = glob.glob(pattern)
|
files = glob.glob(pattern)
|
||||||
|
|
||||||
print(f"Found {len(files)} files to process.")
|
print(f"Found {len(files)} files in directory.")
|
||||||
|
|
||||||
count = 0
|
count = 0
|
||||||
|
skipped = 0
|
||||||
|
|
||||||
for file_path in files:
|
for file_path in files:
|
||||||
try:
|
try:
|
||||||
# Extract match_id from directory name
|
# Extract match_id from directory name
|
||||||
@@ -44,6 +73,10 @@ def process_files():
|
|||||||
parent_dir = os.path.dirname(file_path)
|
parent_dir = os.path.dirname(file_path)
|
||||||
match_id = os.path.basename(parent_dir)
|
match_id = os.path.basename(parent_dir)
|
||||||
|
|
||||||
|
if match_id in existing_ids:
|
||||||
|
skipped += 1
|
||||||
|
continue
|
||||||
|
|
||||||
with open(file_path, 'r', encoding='utf-8') as f:
|
with open(file_path, 'r', encoding='utf-8') as f:
|
||||||
content = f.read()
|
content = f.read()
|
||||||
|
|
||||||
@@ -63,7 +96,7 @@ def process_files():
|
|||||||
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
print(f"Finished processing {count} files.")
|
print(f"Finished. Processed: {count}, Skipped: {skipped}.")
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
process_files()
|
process_files()
|
||||||
@@ -1,7 +1,23 @@
|
|||||||
L1A output_arena/iframe_network.json -> L1A.sqlite(Primary Key: match_id)
|
# ETL Pipeline Documentation
|
||||||
|
|
||||||
|
## 1. L1A (Raw Data Ingestion)
|
||||||
|
**Status**: ✅ Supports Incremental Update
|
||||||
|
|
||||||
|
This script ingests raw JSON files from `output_arena/` into `database/L1A/L1A.sqlite`.
|
||||||
|
|
||||||
|
### Usage
|
||||||
|
```bash
|
||||||
|
# Standard Run (Incremental)
|
||||||
|
# Only processes new files that are not yet in the database.
|
||||||
|
python ETL/L1A.py
|
||||||
|
|
||||||
|
# Force Refresh
|
||||||
|
# Reprocesses ALL files, overwriting existing records.
|
||||||
|
python ETL/L1A.py --force
|
||||||
|
```
|
||||||
|
|
||||||
L1B demoparser2 -> L1B.sqlite
|
L1B demoparser2 -> L1B.sqlite
|
||||||
|
|
||||||
L2 L1A.sqlite (+L1b.sqlite) -> L2.sqlite
|
L2 L1A.sqlite (+L1B.sqlite) -> L2.sqlite
|
||||||
|
|
||||||
L3 Deep Dive.
|
L3 Deep Dive
|
||||||
39
ETL/verify/L1A_incre_test/clean_dirty_data.py
Normal file
39
ETL/verify/L1A_incre_test/clean_dirty_data.py
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
import sqlite3
|
||||||
|
import os
|
||||||
|
|
||||||
|
# The cleanup targets the production database three directory levels up
# from this script (ETL/verify/L1A_incre_test -> project root).
BASE_DIR = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
DB_PATH = os.path.join(BASE_DIR, 'database', 'L1A', 'L1A.sqlite')
|
||||||
|
|
||||||
|
def clean_db(db_path=None):
    """Remove mock test records from the L1A database.

    Deletes the rows whose match_id matches the known mock identifiers
    (match_001..match_003) created by setup_test_data.py.

    Args:
        db_path: Path to the SQLite database file. Defaults to the
            module-level DB_PATH (production DB), preserving the
            original behaviour.
    """
    if db_path is None:
        db_path = DB_PATH

    if not os.path.exists(db_path):
        print(f"Database not found at {db_path}")
        return

    print(f"Connecting to production DB: {db_path}")
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Mock match_ids inserted by the test fixtures. Delete them precisely
    # rather than via a broad LIKE 'match_%' to avoid removing real data.
    dirty_ids = ['match_001', 'match_002', 'match_003']

    deleted_count = 0
    try:
        for mid in dirty_ids:
            # Parameterized query: safe against quoting/injection issues.
            cursor.execute("DELETE FROM raw_iframe_network WHERE match_id = ?", (mid,))
            if cursor.rowcount > 0:
                print(f"Deleted dirty record: {mid}")
                deleted_count += 1
        conn.commit()
    except sqlite3.OperationalError as e:
        # e.g. the raw_iframe_network table does not exist yet; the original
        # version crashed here with an unhandled exception.
        print(f"Error cleaning database: {e}")
    finally:
        conn.close()

    if deleted_count > 0:
        print(f"Cleanup complete. Removed {deleted_count} dirty records.")
    else:
        print("Cleanup complete. No dirty records found.")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Entry point: run the cleanup when executed as a standalone script.
    clean_db()
|
||||||
35
ETL/verify/L1A_incre_test/setup_test_data.py
Normal file
35
ETL/verify/L1A_incre_test/setup_test_data.py
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
import os
|
||||||
|
import json
|
||||||
|
|
||||||
|
# Path definitions.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# BUGFIX: this script lives in ETL/verify/L1A_incre_test/, so the project
# root is THREE levels up, not two. Two levels only reaches ETL/, which
# made the mock data land in ETL/output_arena instead of <root>/output_arena
# (the sibling clean_dirty_data.py already uses three dirname calls).
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(CURRENT_DIR)))
OUTPUT_ARENA_DIR = os.path.join(PROJECT_ROOT, 'output_arena')
|
||||||
|
|
||||||
|
def create_mock_data(base_dir=None):
    """Create three mock match directories with iframe_network.json fixtures.

    Each mock match gets a directory <base_dir>/<match_id>/ containing an
    iframe_network.json file. Existing files are left untouched, so the
    fixture setup is idempotent.

    Args:
        base_dir: Target directory for the mock matches. Defaults to the
            module-level OUTPUT_ARENA_DIR, preserving the original behaviour.
    """
    if base_dir is None:
        base_dir = OUTPUT_ARENA_DIR

    if not os.path.exists(base_dir):
        os.makedirs(base_dir)
        print(f"Created directory: {base_dir}")

    # Three simulated matches.
    mock_matches = ['match_001', 'match_002', 'match_003']

    for match_id in mock_matches:
        match_dir = os.path.join(base_dir, match_id)
        # exist_ok avoids the check-then-create race of the original
        # `if not exists: makedirs` pattern.
        os.makedirs(match_dir, exist_ok=True)

        file_path = os.path.join(match_dir, 'iframe_network.json')
        if not os.path.exists(file_path):
            mock_content = {
                "match_id": match_id,
                "data": "This is mock data for testing."
            }
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(mock_content, f)
            print(f"Created mock file: {file_path}")
        else:
            print(f"File already exists: {file_path}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Entry point: build the fixtures when executed as a standalone script.
    create_mock_data()
|
||||||
76
ETL/verify/L1A_incre_test/test_L1_incremental.py
Normal file
76
ETL/verify/L1A_incre_test/test_L1_incremental.py
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
import glob
import os
import sqlite3
import subprocess
import sys
|
||||||
|
|
||||||
|
# Path configuration.
# BUGFIX: this script lives in ETL/verify/L1A_incre_test/, so the project
# root is THREE levels above this file. The original comment claimed two,
# which resolved PROJECT_ROOT to ETL/ and produced broken paths such as
# ETL/ETL/L1A.py for the script under test.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(CURRENT_DIR)))

L1_SCRIPT = os.path.join(PROJECT_ROOT, 'ETL', 'L1A.py')
DB_PATH = os.path.join(PROJECT_ROOT, 'database', 'L1A', 'L1A.sqlite')
OUTPUT_ARENA_DIR = os.path.join(PROJECT_ROOT, 'output_arena')
|
||||||
|
|
||||||
|
def get_db_count(db_path=None):
    """Return the row count of raw_iframe_network, or 0 on any failure.

    A missing file, missing table, locked or corrupt database all yield 0
    so the harness can report state instead of crashing.

    Args:
        db_path: SQLite database path; defaults to the module-level DB_PATH.
    """
    if db_path is None:
        db_path = DB_PATH
    if not os.path.exists(db_path):
        return 0
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cur = conn.execute("SELECT COUNT(*) FROM raw_iframe_network")
        return cur.fetchone()[0]
    except sqlite3.Error:
        # Narrowed from `except Exception`: only DB-level failures map to 0.
        return 0
    finally:
        # The original leaked the connection when the query raised.
        if conn is not None:
            conn.close()
|
||||||
|
|
||||||
|
def get_file_count(arena_dir=None):
    """Return the number of */iframe_network.json source files.

    Args:
        arena_dir: Directory scanned for match subdirectories; defaults to
            the module-level OUTPUT_ARENA_DIR (original behaviour).
    """
    if arena_dir is None:
        arena_dir = OUTPUT_ARENA_DIR
    # One iframe_network.json is expected per match directory.
    pattern = os.path.join(arena_dir, '*', 'iframe_network.json')
    return len(glob.glob(pattern))
|
||||||
|
|
||||||
|
def run_l1_script(script_path=None):
    """Run the L1A ingestion script and return its stdout.

    BUGFIX: uses sys.executable instead of a bare 'python' so the child
    process runs under the same interpreter as this harness — 'python' may
    be absent from PATH or resolve to a different Python on some systems.

    Args:
        script_path: Script to execute; defaults to the module-level
            L1_SCRIPT (original behaviour).
    """
    if script_path is None:
        script_path = L1_SCRIPT
    # The script path is absolute, so the call does not depend on the CWD.
    result = subprocess.run(
        [sys.executable, script_path],
        capture_output=True,
        text=True,
    )
    return result.stdout
|
||||||
|
|
||||||
|
def main():
    """Drive the end-to-end incremental-ingestion check for L1A.py.

    Runs the ingestion script twice: the first pass may process new files,
    and the second pass must skip every source file if incremental mode
    is working.
    """
    print("=== 开始 L1 增量逻辑测试 ===")
    print(f"项目根目录: {PROJECT_ROOT}")

    # 1. Inspect the environment before touching anything.
    total_files = get_file_count()
    initial_db_count = get_db_count()
    print(f"[环境] 源文件总数: {total_files}")
    print(f"[环境] 数据库当前记录数: {initial_db_count}")

    # 2. First run: may ingest new files.
    print("\n--- 运行 L1A.py (Run 1) ---")
    first_output = run_l1_script()
    print(first_output.strip())

    rows_after_first = get_db_count()
    print(f"[状态] 运行后数据库记录数: {rows_after_first}")
    if rows_after_first < total_files:
        print("警告: 数据库记录数少于文件数,可能部分文件处理失败或尚未完成。")

    # 3. Second run: incremental mode should skip everything.
    print("\n--- 再次运行 L1A.py (Run 2 - 验证增量) ---")
    second_output = run_l1_script()
    print(second_output.strip())

    # 4. Verify: the second run must report skipping all source files.
    expected_fragment = f"Skipped: {total_files}"
    if expected_fragment in second_output:
        print("\n✅ 测试通过! 第二次运行跳过了所有文件,增量逻辑生效。")
    else:
        print(f"\n❌ 测试未通过。预期输出应包含 '{expected_fragment}'")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Entry point: run the incremental-ingestion check directly.
    main()
|
||||||
Reference in New Issue
Block a user