Files
clutch/src/etl/auto_pipeline.py

191 lines
6.5 KiB
Python
Raw Normal View History

"""
Clutch-IQ Auto Pipeline
-----------------------
This script continuously monitors a watch directory (default: `data/demos`,
override with `--watch-dir`) for new .dem files.
When a new file appears, it:
1. Waits for the file to be fully written (size stability check).
2. Moves it into the `data/demos` staging directory if it is not already there.
3. Runs `src/etl/extract_snapshots.py` on the staging directory with
   `--delete-source`, so the .dem file is deleted after successful processing.
Usage:
python src/etl/auto_pipeline.py [--watch-dir DIR]
Stop:
Press Ctrl+C to stop.
"""
import os
import time
import subprocess
import logging
import sys
import argparse
# Configuration
# NOTE: the data paths below are resolved against the current working
# directory at import time, so the script expects to be launched from the
# project root (see Usage in the module docstring).

# Folder watched by default; main() lets the caller override it via --watch-dir.
DEFAULT_WATCH_DIR = os.path.abspath("data/demos")
# Passed to extract_snapshots.py as --output_dir.
OUTPUT_DIR = os.path.abspath("data/processed")

CHECK_INTERVAL = 5  # seconds between two scans of the watch directory
STABILITY_WAIT = 2  # seconds between the two size samples in is_file_stable()

# The extractor script lives next to this module.
EXTRACT_SCRIPT = os.path.join(os.path.dirname(__file__), "extract_snapshots.py")

# Log everything at INFO and above to stdout with a pipeline-specific prefix.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format='%(asctime)s - [AutoPipeline] - %(message)s',
    level=logging.INFO,
)
def is_file_stable(filepath, wait_seconds=2):
    """Heuristically decide whether *filepath* has finished being written.

    The file size is sampled twice, *wait_seconds* apart. The file counts as
    stable only if both samples match and the file is non-empty.

    Returns:
        True if the size is unchanged and non-zero; False otherwise, including
        when the file cannot be stat'ed (e.g. it was removed in the meantime).
    """
    try:
        before = os.path.getsize(filepath)
        time.sleep(wait_seconds)
        after = os.path.getsize(filepath)
    except OSError:
        return False
    return before > 0 and before == after
def _stage_file(filepath, staging_dir):
    """Ensure *filepath* lives in *staging_dir*, moving it there if needed.

    Returns:
        The path of the staged file, or None if the move failed (the error is
        logged). If a file with the same name already exists in the staging
        directory, it is overwritten.
    """
    import shutil  # local import keeps this helper self-contained

    src_dir = os.path.normcase(os.path.abspath(os.path.dirname(filepath)))
    if src_dir == os.path.normcase(os.path.abspath(staging_dir)):
        # Already staged, e.g. when the watch directory is data/demos itself.
        return filepath

    filename = os.path.basename(filepath)
    staged_path = os.path.join(staging_dir, filename)
    logging.info(f"Moving {filename} to staging area...")
    try:
        # Unlike os.rename, shutil.move falls back to copy + delete when source
        # and destination are on different filesystems/drives (e.g. a Downloads
        # folder on C: and the project on D:).
        shutil.move(filepath, staged_path)
    except OSError as e:
        logging.error(f"Failed to move file: {e}")
        return None
    return staged_path


def process_file(filepath):
    """Stage *filepath* and run extract_snapshots.py on the staging directory.

    Steps:
      1. Move the file into the ``data/demos`` staging directory (resolved
         against the current working directory) unless it is already there.
         This avoids touching unrelated .dem files in a shared watch folder
         such as Downloads.
      2. Run EXTRACT_SCRIPT in a subprocess with ``--demo_dir`` set to the
         staging directory, ``--output_dir`` set to OUTPUT_DIR and
         ``--delete-source``. Per this module's docstring, that flag makes the
         extractor delete the processed .dem file.

    NOTE(review): the extractor scans the *whole* staging directory. Any other
    .dem files already there are processed, and deleted, in the same batch.
    When the watch directory is the staging directory, this includes files
    that may still be being written. Passing only the staged file would be
    safer once extract_snapshots.py supports it.

    Errors are logged rather than raised, so one bad file cannot stop the
    monitor loop.
    """
    logging.info(f"Processing new file: {filepath}")
    try:
        staging_dir = os.path.abspath("data/demos")
        os.makedirs(staging_dir, exist_ok=True)

        if _stage_file(filepath, staging_dir) is None:
            return

        # A subprocess isolates memory usage and gives each run a clean state.
        cmd = [
            sys.executable,
            EXTRACT_SCRIPT,
            "--demo_dir", staging_dir,
            "--output_dir", OUTPUT_DIR,
            "--delete-source",
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            logging.info("Successfully processed batch.")
            logging.info(result.stdout)
        else:
            logging.error(f"Processing failed with code {result.returncode}")
            logging.error(result.stderr)
    except Exception as e:
        # Per-file boundary: record the traceback and keep the monitor alive.
        logging.exception(f"Execution error: {e}")
import threading
def monitor_loop(monitor_dir, stop_event=None):
    """Poll *monitor_dir* for .dem files and hand stable ones to process_file().

    Can run in the foreground (main) or in a daemon thread
    (start_background_monitor).

    Args:
        monitor_dir: Directory scanned every CHECK_INTERVAL seconds. It is
            created on a best-effort basis if it is missing.
        stop_event: Optional threading.Event. Setting it ends the loop. The
            same event is used for the pause between scans, so a stop request
            takes effect without waiting out the full interval. With None, the
            loop runs forever.
    """
    logging.info(f"Monitoring {monitor_dir} for new .dem files...")
    logging.info("Files will be MOVED to staging, PROCESSED, and then DELETED.")

    def stop_requested():
        return stop_event is not None and stop_event.is_set()

    def pause(seconds):
        # Event.wait() returns early once the event is set; time.sleep() does not.
        if stop_event is not None:
            stop_event.wait(seconds)
        else:
            time.sleep(seconds)

    def ctime_or_last(name):
        try:
            return os.path.getctime(os.path.join(monitor_dir, name))
        except OSError:
            # The file vanished between listdir() and sorting. Order it last
            # and let the existence check below skip it, instead of letting the
            # exception kill the monitor thread.
            return float("inf")

    while not stop_requested():
        try:
            if not os.path.exists(monitor_dir):
                # Best effort: if creation fails, this round simply sees no files.
                try:
                    os.makedirs(monitor_dir)
                except OSError:
                    pass
            if os.path.exists(monitor_dir):
                files = [
                    f for f in os.listdir(monitor_dir)
                    if f.endswith('.dem')
                    and os.path.isfile(os.path.join(monitor_dir, f))
                ]
            else:
                files = []
        except Exception as e:
            logging.error(f"Error accessing watch directory: {e}")
            pause(CHECK_INTERVAL)
            continue

        if files:
            logging.info(f"Found {len(files)} files pending in {monitor_dir}...")
            # Oldest first. os.path.getctime is creation time on Windows but
            # last-metadata-change time on Unix.
            files.sort(key=ctime_or_last)
            for f in files:
                if stop_requested():
                    break
                filepath = os.path.join(monitor_dir, f)
                if not os.path.exists(filepath):
                    continue
                if is_file_stable(filepath, STABILITY_WAIT):
                    process_file(filepath)
                else:
                    logging.info(f"File {f} is still being written... skipping.")

        pause(CHECK_INTERVAL)

    logging.info("Stopping Auto Pipeline thread...")
def start_background_monitor(watch_dir=DEFAULT_WATCH_DIR, *, stop_event=None):
    """Start monitor_loop() for *watch_dir* in a daemon thread.

    Args:
        watch_dir: Directory to watch. It is normalised to an absolute path,
            as main() does with --watch-dir.
        stop_event: Optional threading.Event forwarded to monitor_loop(). Set it
            to stop the background monitor. Without one, the daemon thread runs
            until the process exits.

    Returns:
        The started threading.Thread.
    """
    monitor_thread = threading.Thread(
        target=monitor_loop,
        args=(os.path.abspath(watch_dir),),
        kwargs={"stop_event": stop_event},
        daemon=True,
    )
    monitor_thread.start()
    logging.info("Auto Pipeline service started in background.")
    return monitor_thread
def main():
    """CLI entry point: resolve the watch directory and run the monitor loop.

    The loop runs in the foreground. Ctrl+C is logged as a clean shutdown
    instead of surfacing a KeyboardInterrupt traceback.
    """
    cli = argparse.ArgumentParser(description="Auto Pipeline Monitor")
    cli.add_argument(
        "--watch-dir",
        default=DEFAULT_WATCH_DIR,
        help="Directory to monitor for .dem files (e.g. C:/Users/Name/Downloads)",
    )
    options = cli.parse_args()
    watch_dir = os.path.abspath(options.watch_dir)

    # Create the watch directory up front so the loop starts on a real path.
    if not os.path.exists(watch_dir):
        logging.warning(f"Watch directory {watch_dir} does not exist. Creating it...")
        os.makedirs(watch_dir)

    try:
        monitor_loop(watch_dir)
    except KeyboardInterrupt:
        logging.info("Stopping Auto Pipeline...")
# Script entry point: run the foreground monitor only when this file is
# executed directly (e.g. `python src/etl/auto_pipeline.py --watch-dir DIR`).
if __name__ == "__main__":
    main()