import csv
import json
import os
from collections import defaultdict
from pathlib import Path
from urllib.parse import urlparse

from .rules import is_ignored_url, get_key_mask, get_value_type


class SchemaExtractor:
    """Build a merged schema per URL category from captured network responses.

    A *schema node* is a plain dict with the following keys:

    - ``"types"``: set of type names returned by ``get_value_type``.
    - ``"count"``: number of values merged into the node.
    - ``"properties"`` (only once a dict was seen): masked key -> schema node.
    - ``"items"`` (only once a list was seen): schema node shared by all
      elements, or ``None`` if every list seen so far was empty.
    - ``"examples"`` (only once a non-dict, non-list value was seen): set of
      up to five ``str()`` renderings of the values.
    """

    def __init__(self):
        # schemas: category -> schema_node
        self.schemas = {}
        # Number of responses merged into each category.
        self.url_counts = defaultdict(int)

    def get_url_category(self, url):
        """Derive a category name from the path of ``url``.

        Path segments starting with ``g161-`` become ``{match_id}`` and
        all-digit segments longer than four characters become ``{id}``.
        An empty path yields ``"root"``.
        """
        segments = urlparse(url).path.strip('/').split('/')
        cleaned_parts = []
        for segment in segments:
            if segment.startswith('g161-'):
                # Mask match IDs (e.g., g161-...)
                cleaned_parts.append('{match_id}')
            elif segment.isdigit() and len(segment) > 4:
                # Mask other long numeric IDs
                cleaned_parts.append('{id}')
            else:
                cleaned_parts.append(segment)
        return "/".join(cleaned_parts) or "root"

    def process_directory(self, root_dir):
        """Process every ``iframe_network.json`` found below ``root_dir``."""
        # Sorted so that the (capped) example sets and the property order in
        # the reports do not depend on filesystem enumeration order.
        files = sorted(Path(root_dir).rglob("iframe_network.json"))
        print(f"Found {len(files)} files to process.")

        for i, filepath in enumerate(files):
            if i % 10 == 0:
                print(f"Processing {i}/{len(files)}: {filepath}")
            self.process_file(filepath)

    def process_file(self, filepath):
        """Merge all usable entries of one capture file into the schemas.

        The file is expected to contain a JSON list of entry dicts. Files
        that cannot be read or parsed, and files whose top level is not a
        list, are skipped silently (best effort). An entry is used only if
        it has a non-ignored ``url``, a ``status`` equal to 200 and a truthy
        ``body``.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError, RecursionError):
            # ValueError covers both invalid JSON and invalid UTF-8.
            return

        if not isinstance(data, list):
            return

        for entry in data:
            # Guard against malformed captures (e.g. a list of strings);
            # calling .get() on a non-dict would raise AttributeError.
            if not isinstance(entry, dict):
                continue

            url = entry.get('url', '')
            if not url or is_ignored_url(url):
                continue

            if entry.get('status') != 200:
                continue

            body = entry.get('body')
            # Skip empty bodies (None, "", {}, []): nothing to learn from them.
            if not body:
                continue

            category = self.get_url_category(url)
            self.url_counts[category] += 1
            self.schemas[category] = self.merge_value(
                self.schemas.get(category), body
            )

    def merge_value(self, schema, value):
        """Merge ``value`` into ``schema`` and return the resulting node.

        ``schema`` may be ``None``, in which case a new node is created;
        otherwise the given node is updated in place and returned.
        """
        val_type = get_value_type(value)

        if schema is None:
            schema = {"types": {val_type}, "count": 1}
        else:
            schema["count"] += 1
            schema["types"].add(val_type)

        if isinstance(value, dict):
            properties = schema.setdefault("properties", {})
            for key, child in value.items():
                masked_key = get_key_mask(key)
                properties[masked_key] = self.merge_value(
                    properties.get(masked_key), child
                )
        elif isinstance(value, list):
            # All elements of a list share a single item node.
            schema.setdefault("items", None)
            for item in value:
                schema["items"] = self.merge_value(schema["items"], item)
        else:
            # Primitives: keep a handful of examples alongside the types.
            examples = schema.setdefault("examples", set())
            if len(examples) < 5:
                # Store the string representation to avoid type issues in the set
                examples.add(str(value))

        return schema

    def to_serializable(self, schema):
        """Return a JSON-serializable copy of ``schema`` (sets -> sorted lists).

        Returns ``None`` when ``schema`` is ``None``.
        """
        if schema is None:
            return None

        res = {
            "types": sorted(schema["types"]),
            "count": schema["count"],
        }

        if "properties" in schema:
            res["properties"] = {
                k: self.to_serializable(v)
                for k, v in sorted(schema["properties"].items())
            }

        if "items" in schema:
            res["items"] = self.to_serializable(schema["items"])

        if "examples" in schema:
            res["examples"] = sorted(schema["examples"])

        return res

    def export_report(self, output_path):
        """Write all schemas to ``output_path`` as one JSON document."""
        report = {
            category: self.to_serializable(schema)
            for category, schema in self.schemas.items()
        }

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        print(f"Report saved to {output_path}")

    def export_markdown_summary(self, output_path):
        """Write a Markdown summary of the schema hierarchy to ``output_path``."""
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write("# Schema Hierarchy Report\n\n")

            for category, schema in sorted(self.schemas.items()):
                f.write(f"## Category: `{category}`\n")
                f.write(f"**Total Requests**: {self.url_counts[category]}\n\n")
                self._write_markdown_schema(f, schema, level=0)
                f.write("\n---\n\n")

        print(f"Markdown summary saved to {output_path}")

    def export_csv_summary(self, output_path):
        """Write a flattened CSV summary of all schemas to ``output_path``."""
        with open(output_path, 'w', encoding='utf-8', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(["Category", "Path", "Types", "Examples"])

            for category, schema in sorted(self.schemas.items()):
                self._write_csv_schema(writer, category, schema, path="")

        print(f"CSV summary saved to {output_path}")

    def _write_csv_schema(self, writer, category, schema, path):
        """Emit CSV rows for ``schema`` and, recursively, for its children.

        A row is written for a node when it carries primitive examples or
        when it has no children to descend into. The latter keeps fields
        that were only ever seen as ``{}`` or ``[]`` visible in the output.
        Child paths use ``parent.key`` for properties and ``parent[]`` for
        list items.
        """
        if schema is None:
            return

        properties = schema.get("properties")
        items = schema.get("items")

        has_children = bool(properties) or items is not None
        if not has_children or "examples" in schema:
            type_str = ", ".join(map(str, sorted(schema["types"])))
            # Sorted so the chosen examples are stable between runs.
            ex_str = "; ".join(sorted(schema.get("examples", ()))[:3])
            writer.writerow([category, path, type_str, ex_str])

        if properties:
            for key, child in properties.items():
                child_path = f"{path}.{key}" if path else key
                self._write_csv_schema(writer, category, child, child_path)

        if items is not None:
            self._write_csv_schema(writer, category, items, f"{path}[]")

    @staticmethod
    def _describe_node(node):
        """Return ``"type1, type2[, e.g. example]"`` for a schema node.

        The example is omitted when the node's types include ``"dict"`` or
        ``"list"``.
        """
        types = node["types"]
        description = ", ".join(map(str, sorted(types)))
        # Sorted so the displayed example is stable between runs.
        examples = sorted(node.get("examples", ()))
        if examples and "dict" not in types and "list" not in types:
            description += f", e.g. {examples[0]}"
        return description

    def _write_markdown_schema(self, f, schema, level=0):
        """Write ``schema`` to ``f`` as a nested Markdown bullet list.

        Property nodes are described on the line their parent writes for
        them, so a childless node writes nothing here unless it is the root
        (``level == 0``), where no parent line exists.
        """
        if schema is None:
            return

        # Two spaces per level: a single space does not nest list items in
        # Markdown.
        indent = "  " * level

        has_properties = "properties" in schema
        has_items = "items" in schema

        if not has_properties and not has_items:
            if level == 0:
                f.write(f"{indent}- *[Value]* ({self._describe_node(schema)})\n")
            return

        if has_properties:
            for key, child in schema["properties"].items():
                f.write(f"{indent}- **{key}** ({self._describe_node(child)})\n")
                self._write_markdown_schema(f, child, level + 1)

        if has_items:
            items = schema["items"]
            if items is None:
                # Only empty lists were seen; nothing is known about items.
                f.write(f"{indent}- *[Array Items]*\n")
            else:
                f.write(
                    f"{indent}- *[Array Items]* ({self._describe_node(items)})\n"
                )
                self._write_markdown_schema(f, items, level + 1)