244 lines
8.4 KiB
Python
244 lines
8.4 KiB
Python
import json
|
|
import os
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse
|
|
from collections import defaultdict
|
|
from .rules import is_ignored_url, get_key_mask, get_value_type
|
|
|
|
class SchemaExtractor:
    """Build a merged structural schema of JSON response bodies, grouped by URL category.

    Captured network entries are read from ``iframe_network.json`` files. Each
    usable entry's URL is reduced to a category (see ``get_url_category``) and
    its body is folded into that category's schema node.

    A schema node is a dict with:
        ``types``      -- set of type labels returned by ``get_value_type``
        ``count``      -- number of values merged into the node
        ``properties`` -- (dict values only) masked key -> child node
        ``items``      -- (list values only) child node shared by all elements,
                          or ``None`` if only empty lists were seen
        ``examples``   -- (other values only) set of up to ``MAX_EXAMPLES``
                          stringified sample values
    """

    # Path segments starting with this prefix are replaced by ``{match_id}``.
    MATCH_ID_PREFIX = 'g161-'
    # All-digit path segments longer than this are replaced by ``{id}``.
    NUMERIC_ID_MAX_PLAIN_LENGTH = 4
    # Upper bound on distinct example values kept per primitive node.
    MAX_EXAMPLES = 5

    def __init__(self):
        # schemas: category -> schema node (see class docstring)
        self.schemas = {}
        # url_counts: category -> number of entries merged into that category
        self.url_counts = defaultdict(int)

    def _mask_segment(self, segment):
        """Return the placeholder for an ID-like path segment, else the segment itself."""
        if segment.startswith(self.MATCH_ID_PREFIX):
            return '{match_id}'
        if segment.isdigit() and len(segment) > self.NUMERIC_ID_MAX_PLAIN_LENGTH:
            return '{id}'
        return segment

    def get_url_category(self, url):
        """Derive a category name from the path of ``url``.

        Only the path component is used. Segments are masked by
        ``_mask_segment`` and re-joined with ``/``.

        Args:
            url: URL string to categorise.

        Returns:
            The masked path without leading/trailing slashes, or ``"root"``
            when the path is empty.
        """
        segments = urlparse(url).path.strip('/').split('/')
        category = "/".join(self._mask_segment(segment) for segment in segments)
        return category or "root"

    def process_directory(self, root_dir):
        """Process every ``iframe_network.json`` found recursively under ``root_dir``.

        Files are visited in sorted path order so that the examples captured
        (the first ``MAX_EXAMPLES`` distinct values seen) do not depend on the
        filesystem's directory ordering.

        Args:
            root_dir: Directory to search.
        """
        files = sorted(Path(root_dir).rglob("iframe_network.json"))
        print(f"Found {len(files)} files to process.")

        for i, filepath in enumerate(files):
            # Progress line every tenth file.
            if i % 10 == 0:
                print(f"Processing {i}/{len(files)}: {filepath}")
            self.process_file(filepath)

    def process_file(self, filepath):
        """Merge all usable entries of one capture file into ``self.schemas``.

        The file is expected to contain a JSON list of entry dicts. An entry
        is used only if it has a non-empty ``url`` not rejected by
        ``is_ignored_url``, a ``status`` equal to 200 and a truthy ``body``.
        Files that cannot be read or parsed, files whose top level is not a
        list, and entries that are not dicts are skipped.

        Args:
            filepath: Path of the JSON file to read.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            # Stay best-effort, but report the file instead of dropping it silently.
            print(f"Skipping unreadable file {filepath}: {exc}")
            return

        if not isinstance(data, list):
            return

        for entry in data:
            # A malformed (non-dict) entry must not abort the whole run.
            if not isinstance(entry, dict):
                continue

            url = entry.get('url', '')
            if not url or is_ignored_url(url):
                continue

            if entry.get('status') != 200:
                continue

            body = entry.get('body')
            # Empty bodies (None, "", {}, []) carry no structure worth recording.
            if not body:
                continue

            category = self.get_url_category(url)
            self.url_counts[category] += 1
            self.schemas[category] = self.merge_value(
                self.schemas.get(category), body
            )

    def merge_value(self, schema, value):
        """Merge ``value`` into ``schema`` and return the resulting node.

        Args:
            schema: Existing schema node, or ``None`` to create a new one.
                An existing node is updated in place.
            value: Decoded JSON value to merge.

        Returns:
            The updated (or newly created) schema node.
        """
        val_type = get_value_type(value)

        if schema is None:
            schema = {"types": {val_type}, "count": 1}
        else:
            schema["count"] += 1
            schema["types"].add(val_type)

        if isinstance(value, dict):
            properties = schema.setdefault("properties", {})
            for key, child_value in value.items():
                # Keys are passed through get_key_mask before being used as
                # property names (presumably to collapse ID-like keys -- the
                # helper lives in .rules; verify there).
                masked_key = get_key_mask(key)
                properties[masked_key] = self.merge_value(
                    properties.get(masked_key), child_value
                )
        elif isinstance(value, list):
            # All elements of a list share a single child node.
            schema.setdefault("items", None)
            for item in value:
                schema["items"] = self.merge_value(schema["items"], item)
        else:
            examples = schema.setdefault("examples", set())
            if len(examples) < self.MAX_EXAMPLES:
                # Stored as strings so values of different types can share one set.
                examples.add(str(value))

        return schema

    def to_serializable(self, schema):
        """Convert a schema node (which contains sets) into JSON-serializable data.

        Sets become sorted lists and properties are emitted in sorted key
        order, so the result is deterministic.

        Args:
            schema: Schema node, or ``None``.

        Returns:
            A dict of plain lists/dicts/ints/strings, or ``None`` if
            ``schema`` is ``None``.
        """
        if schema is None:
            return None

        res = {
            "types": sorted(schema["types"]),
            "count": schema["count"],
        }

        if "properties" in schema:
            properties = schema["properties"]
            res["properties"] = {
                key: self.to_serializable(properties[key])
                for key in sorted(properties)
            }

        if "items" in schema:
            res["items"] = self.to_serializable(schema["items"])

        if "examples" in schema:
            res["examples"] = sorted(schema["examples"])

        return res

    def export_report(self, output_path):
        """Write all category schemas to ``output_path`` as indented JSON.

        Categories are written in sorted order, matching the Markdown and CSV
        exports.

        Args:
            output_path: Destination file path.
        """
        report = {
            category: self.to_serializable(self.schemas[category])
            for category in sorted(self.schemas)
        }

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        print(f"Report saved to {output_path}")

    def export_markdown_summary(self, output_path):
        """Write a Markdown summary of each category's schema hierarchy.

        Args:
            output_path: Destination file path.
        """
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write("# Schema Hierarchy Report\n\n")

            for category, schema in sorted(self.schemas.items()):
                f.write(f"## Category: `{category}`\n")
                # .get() avoids inserting new keys into the defaultdict on read.
                f.write(f"**Total Requests**: {self.url_counts.get(category, 0)}\n\n")

                self._write_markdown_schema(f, schema, level=0)
                f.write("\n---\n\n")
        print(f"Markdown summary saved to {output_path}")

    def export_csv_summary(self, output_path):
        """Write a flattened CSV summary with one row per leaf path.

        Columns are ``Category``, ``Path``, ``Types`` and ``Examples``.

        Args:
            output_path: Destination file path.
        """
        import csv

        with open(output_path, 'w', encoding='utf-8', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(["Category", "Path", "Types", "Examples"])

            for category, schema in sorted(self.schemas.items()):
                self._write_csv_schema(writer, category, schema, path="")
        print(f"CSV summary saved to {output_path}")

    def _write_csv_schema(self, writer, category, schema, path):
        """Recursively write CSV rows for ``schema`` rooted at ``path``.

        A row is written for every node that has no child structure to
        descend into: primitives, and containers that were only ever seen
        empty (``{}`` or ``[]``), which would otherwise be missing from the
        output. Dict keys extend the path with ``.key`` and list items with
        ``[]``. Properties and examples are emitted in sorted order so the
        file is deterministic.

        Args:
            writer: ``csv.writer``-like object with a ``writerow`` method.
            category: Category name written in the first column.
            schema: Schema node, or ``None`` (nothing is written).
            path: Dotted path of ``schema`` within the category body.
        """
        if schema is None:
            return

        type_str = ", ".join(map(str, sorted(schema["types"])))

        properties = schema.get("properties")
        has_children = bool(properties) or schema.get("items") is not None

        if not has_children:
            examples = sorted(schema.get("examples", ()))
            writer.writerow([category, path, type_str, "; ".join(examples[:3])])

        if properties:
            for key in sorted(properties):
                child_path = f"{path}.{key}" if path else key
                self._write_csv_schema(writer, category, properties[key], child_path)

        if "items" in schema:
            self._write_csv_schema(writer, category, schema["items"], f"{path}[]")

    def _write_markdown_schema(self, f, schema, level=0):
        """Recursively write ``schema`` as a nested Markdown bullet list.

        Each property is written as ``- **key** (types[, e.g. example])``.
        List nodes add a ``- *[Array Items]*`` bullet followed by the item
        schema one level deeper. A primitive node reached directly (the root
        of a category, or the items of a list) gets its own
        ``- types (e.g., a, b, c)`` bullet; primitive properties are fully
        described by their parent's bullet and are not repeated.

        Nesting uses two spaces per level, the minimum at which Markdown
        treats a ``- `` bullet as a child of the preceding one.

        Args:
            f: Writable text stream.
            schema: Schema node, or ``None`` (nothing is written).
            level: Current nesting depth.
        """
        if schema is None:
            return

        indent = "  " * level
        has_properties = "properties" in schema
        has_items = "items" in schema

        if not has_properties and not has_items:
            # Primitive root or primitive array items: no parent bullet has
            # described this node, so describe it here.
            type_str = ", ".join(str(t) for t in sorted(schema["types"]))
            examples = sorted(schema.get("examples", ()))
            ex_str = f" (e.g., {', '.join(examples[:3])})" if examples else ""
            f.write(f"{indent}- {type_str}{ex_str}\n")
            return

        if has_properties:
            properties = schema["properties"]
            for key in sorted(properties):
                child = properties[key]
                child_types = child["types"]
                v_types = ", ".join(sorted(child_types))
                # Sorted so the sample shown is stable between runs.
                v_ex = sorted(child.get("examples", ()))
                show_example = (
                    v_ex and "dict" not in child_types and "list" not in child_types
                )
                v_ex_str = f", e.g. {v_ex[0]}" if show_example else ""

                f.write(f"{indent}- **{key}** ({v_types}{v_ex_str})\n")
                # Only descend when there is structure below this property.
                if "properties" in child or "items" in child:
                    self._write_markdown_schema(f, child, level + 1)

        if has_items:
            f.write(f"{indent}- *[Array Items]*\n")
            self._write_markdown_schema(f, schema["items"], level + 1)