import os import sqlite3 import subprocess import glob # 配置路径 # 当前脚本位于 ETL/verify/ 目录下,需要向上两级找到项目根目录 CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) PROJECT_ROOT = os.path.dirname(os.path.dirname(CURRENT_DIR)) L1_SCRIPT = os.path.join(PROJECT_ROOT, 'ETL', 'L1A.py') DB_PATH = os.path.join(PROJECT_ROOT, 'database', 'L1A', 'L1A.sqlite') OUTPUT_ARENA_DIR = os.path.join(PROJECT_ROOT, 'output_arena') def get_db_count(): """获取数据库中的记录数""" if not os.path.exists(DB_PATH): return 0 try: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM raw_iframe_network") count = cursor.fetchone()[0] conn.close() return count except Exception: return 0 def get_file_count(): """获取源文件总数""" pattern = os.path.join(OUTPUT_ARENA_DIR, '*', 'iframe_network.json') files = glob.glob(pattern) return len(files) def run_l1_script(): """运行 L1 脚本并返回输出""" # 必须在项目根目录下运行,或者正确处理 Python 路径 # 这里我们使用绝对路径调用脚本 result = subprocess.run(['python', L1_SCRIPT], capture_output=True, text=True) return result.stdout def main(): print("=== 开始 L1 增量逻辑测试 ===") print(f"项目根目录: {PROJECT_ROOT}") # 1. 检查环境 total_files = get_file_count() initial_db_count = get_db_count() print(f"[环境] 源文件总数: {total_files}") print(f"[环境] 数据库当前记录数: {initial_db_count}") # 2. 运行脚本 (第一次) print("\n--- 运行 L1A.py (Run 1) ---") output1 = run_l1_script() print(output1.strip()) mid_db_count = get_db_count() print(f"[状态] 运行后数据库记录数: {mid_db_count}") if mid_db_count < total_files: print("警告: 数据库记录数少于文件数,可能部分文件处理失败或尚未完成。") # 3. 运行脚本 (第二次 - 验证增量) print("\n--- 再次运行 L1A.py (Run 2 - 验证增量) ---") output2 = run_l1_script() print(output2.strip()) # 4. 验证结果 expected_msg = f"Skipped: {total_files}" if expected_msg in output2: print("\n✅ 测试通过! 第二次运行跳过了所有文件,增量逻辑生效。") else: print(f"\n❌ 测试未通过。预期输出应包含 '{expected_msg}'") if __name__ == "__main__": main()