Files
clutch/database/L1/L1_Builder.py
2026-02-05 23:26:03 +08:00

102 lines
3.1 KiB
Python

"""
L1A Data Ingestion Script
This script reads raw JSON files from the 'output_arena' directory and ingests them into the SQLite database.
It supports incremental updates by default, skipping files that have already been processed.
Usage:
python ETL/L1A.py # Standard incremental run
python ETL/L1A.py --force # Force re-process all files (overwrite existing data)
"""
import os
import json
import sqlite3
import glob
import argparse # Added
# Paths
# Project root: three levels up from this file (…/clutch/database/L1/ -> repo root).
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Directory scanned for raw per-match JSON dumps (one subdirectory per match).
OUTPUT_ARENA_DIR = os.path.join(BASE_DIR, 'output_arena')
# Location of the L1 SQLite database (created on first run if missing).
DB_DIR = os.path.join(BASE_DIR, 'database', 'L1')
DB_PATH = os.path.join(DB_DIR, 'L1.db')
def init_db(db_path=None):
    """Open (creating if necessary) the L1 SQLite database and return a connection.

    Ensures the parent directory exists and that the raw_iframe_network
    table is present. Safe to call repeatedly (idempotent DDL).

    Args:
        db_path: Optional path to the SQLite file. Defaults to the
            module-level DB_PATH, preserving the original call signature.

    Returns:
        An open sqlite3.Connection.
    """
    if db_path is None:
        db_path = DB_PATH
    db_dir = os.path.dirname(db_path)
    if db_dir:
        # exist_ok avoids the check-then-create race of the old
        # `if not os.path.exists(...): os.makedirs(...)` pattern.
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS raw_iframe_network (
            match_id TEXT PRIMARY KEY,
            content TEXT,
            processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.commit()
    return conn
def process_files():
    """Ingest every output_arena/*/iframe_network.json file into the L1 database.

    Incremental by default: match_ids already stored in raw_iframe_network
    are skipped. With --force, every file is re-read and upserted
    (INSERT OR REPLACE). The match_id is taken from the name of the
    directory containing each JSON file.

    Side effects: creates/updates the SQLite DB at DB_PATH, prints progress.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--force', action='store_true', help='Force reprocessing of all files')
    args = parser.parse_args()

    conn = init_db()
    try:
        cursor = conn.cursor()

        # Collect match_ids already ingested so they can be skipped.
        existing_ids = set()
        if not args.force:
            try:
                cursor.execute("SELECT match_id FROM raw_iframe_network")
                existing_ids = {row[0] for row in cursor.fetchall()}
                print(f"Found {len(existing_ids)} existing matches in DB. Incremental mode active.")
            except sqlite3.Error as e:
                # Best-effort: if the lookup fails, fall back to processing everything.
                print(f"Error checking existing data: {e}")

        # One JSON dump per match directory: output_arena/<match_id>/iframe_network.json
        pattern = os.path.join(OUTPUT_ARENA_DIR, '*', 'iframe_network.json')
        files = glob.glob(pattern)
        print(f"Found {len(files)} files in directory.")

        count = 0
        skipped = 0
        for file_path in files:
            # match_id is the parent directory name, e.g. .../output_arena/g161-xxx/
            match_id = os.path.basename(os.path.dirname(file_path))
            if match_id in existing_ids:
                skipped += 1
                continue
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                cursor.execute(
                    'INSERT OR REPLACE INTO raw_iframe_network (match_id, content) VALUES (?, ?)',
                    (match_id, content),
                )
            except (OSError, UnicodeDecodeError, sqlite3.Error) as e:
                # Narrow catch: skip unreadable files / DB errors, keep going.
                print(f"Error processing {file_path}: {e}")
                continue
            count += 1
            # Periodic commit so a crash mid-run loses at most ~100 files of work.
            if count % 100 == 0:
                print(f"Processed {count} files...")
                conn.commit()

        conn.commit()
        print(f"Finished. Processed: {count}, Skipped: {skipped}.")
    finally:
        # Close even if an unexpected error escapes the loop.
        conn.close()
# Script entry point: run the ingestion when executed directly (not on import).
if __name__ == '__main__':
process_files()