diff --git a/factgenie/analysis.py b/factgenie/analysis.py
index f576025..52ae9eb 100644
--- a/factgenie/analysis.py
+++ b/factgenie/analysis.py
@@ -25,11 +25,11 @@
 # coloredlogs.install(level="INFO", logger=logger, fmt="%(asctime)s %(levelname)s %(message)s")
 
 
-def create_example_record(line, metadata, annotation_span_categories, annotation_records):
+def create_example_record(line, metadata, annotation_span_categories, annotation_records, jsonl_file):
     # a record is created even if there are no annotations
     j = json.loads(line)
 
-    example_record = workflows.create_annotation_example_record(j)
+    example_record = workflows.create_annotation_example_record(j, jsonl_file)
 
     for i, category in enumerate(annotation_span_categories):
         example_record["cat_" + str(i)] = 0
@@ -67,7 +67,7 @@ def load_annotations_for_campaign(campaign):
                     annotation_index += annotation_records
 
                     example_record = create_example_record(
-                        line, campaign.metadata, annotation_span_categories, annotation_records
+                        line, campaign.metadata, annotation_span_categories, annotation_records, jsonl_file
                     )
                     example_index.append(example_record)
             except Exception as e:
diff --git a/factgenie/app.py b/factgenie/app.py
index fdc9292..a50ab51 100644
--- a/factgenie/app.py
+++ b/factgenie/app.py
@@ -36,7 +36,9 @@
 app = Flask("factgenie", template_folder=TEMPLATES_DIR, static_folder=STATIC_DIR)
 app.db = {}
 app.db["annotation_index"] = None
+app.db["annotation_index_cache"] = {}
 app.db["output_index"] = None
+app.db["output_index_cache"] = {}
 app.db["lock"] = threading.Lock()
 app.db["running_campaigns"] = set()
 app.db["announcers"] = {}
diff --git a/factgenie/workflows.py b/factgenie/workflows.py
index 90783e9..b05c117 100644
--- a/factgenie/workflows.py
+++ b/factgenie/workflows.py
@@ -13,7 +13,6 @@
 import zipfile
 import traceback
 import tempfile
-
 import factgenie.utils as utils
 
 from io import BytesIO
@@ -189,32 +188,18 @@ def generate_campaign_index(app, force_reload=True):
     return app.db["campaign_index"]
 
 
-def load_annotations_for_campaign(subdir):
+def load_annotations_from_file(file_path):
     annotations_campaign = []
 
-    # find metadata for the campaign
-    metadata_path = CAMPAIGN_DIR / subdir / "metadata.json"
-    if not metadata_path.exists():
-        return []
-
-    with open(metadata_path) as f:
-        metadata = json.load(f)
-
-    if metadata["mode"] == CampaignMode.HIDDEN or metadata["mode"] == CampaignMode.LLM_GEN:
-        return []
-
-    jsonl_files = (CAMPAIGN_DIR / subdir / "files").glob("*.jsonl")
-
-    for jsonl_file in jsonl_files:
-        with open(jsonl_file) as f:
-            for line in f:
-                annotation_records = load_annotations_from_record(line)
-                annotations_campaign.append(annotation_records[0])
+    with open(file_path) as f:
+        for line in f:
+            annotation_records = load_annotations_from_record(line, jsonl_file=file_path)
+            annotations_campaign.append(annotation_records[0])
 
     return annotations_campaign
 
 
-def create_annotation_example_record(j):
+def create_annotation_example_record(j, jsonl_file):
     return {
         "annotation_span_categories": j["metadata"]["annotation_span_categories"],
         "annotator_id": j["metadata"]["annotator_id"],
@@ -227,14 +212,15 @@
         "flags": j.get("flags", []),
         "options": j.get("options", []),
         "text_fields": j.get("text_fields", []),
+        "jsonl_file": jsonl_file,
     }
 
 
-def load_annotations_from_record(line, split_spans=False):
+def load_annotations_from_record(line, jsonl_file, split_spans=False):
     j = json.loads(line)
 
     annotation_records = []
-    record = create_annotation_example_record(j)
+    record = create_annotation_example_record(j, jsonl_file)
 
     if split_spans:
         for annotation in j["annotations"]:
@@ -250,27 +236,65 @@ def load_annotations_from_record(line, split_spans=False):
     return annotation_records
 
 
+def get_annotation_files():
+    """Get dictionary of annotation JSONL files and their modification times"""
+    files_dict = {}
+    for jsonl_file in Path(CAMPAIGN_DIR).rglob("*.jsonl"):
+        campaign_dir = jsonl_file.parent.parent
+
+        # find metadata for the campaign
+        metadata_path = campaign_dir / "metadata.json"
+        if not metadata_path.exists():
+            continue
+
+        with open(metadata_path) as f:
+            metadata = json.load(f)
+
+        if metadata["mode"] == CampaignMode.HIDDEN or metadata["mode"] == CampaignMode.LLM_GEN:
+            continue
+
+        files_dict[str(jsonl_file)] = jsonl_file.stat().st_mtime
+
+    return files_dict
+
+
+def remove_annotations(app, file_path):
+    """Remove annotations from the annotation index for a specific file"""
+    # guard against an empty index that has no columns yet
+    if app.db["annotation_index"] is not None and "jsonl_file" in app.db["annotation_index"]:
+        # Filter out annotations from the specified file
+        app.db["annotation_index"] = app.db["annotation_index"][
+            app.db["annotation_index"]["jsonl_file"] != file_path
+        ]
+
+
 def get_annotation_index(app, force_reload=True):
     if app and app.db["annotation_index"] is not None and not force_reload:
         return app.db["annotation_index"]
 
     logger.debug("Reloading annotation index")
 
-    # contains annotations for each generated output
-    annotations = []
+    # Get current files and their modification times
+    current_files = get_annotation_files()
+    cached_files = app.db.get("annotation_index_cache", {})
+    new_annotations = []
 
-    # for all subdirectories in CAMPAIGN_DIR, load content of all the jsonl files
-    for subdir in os.listdir(CAMPAIGN_DIR):
-        try:
-            annotations += load_annotations_for_campaign(subdir)
-        except:
-            traceback.print_exc()
-            logger.error(f"Error while loading annotations for {subdir}")
+    # Handle new and modified files: drop their stale rows, then re-read them
+    for file_path, mod_time in current_files.items():
+        if file_path not in cached_files or cached_files[file_path] < mod_time:
+            remove_annotations(app, file_path)
+            new_annotations.extend(load_annotations_from_file(file_path))
+
+    # Handle deleted files
+    for file_path in set(cached_files.keys()) - set(current_files.keys()):
+        remove_annotations(app, file_path)
 
-    annotation_index = pd.DataFrame.from_records(annotations)
-    app.db["annotation_index"] = annotation_index
+    # Update the cache
+    app.db["annotation_index_cache"] = current_files
 
-    return annotation_index
+    if app.db["annotation_index"] is None:
+        app.db["annotation_index"] = pd.DataFrame.from_records(new_annotations)
+    else:
+        app.db["annotation_index"] = pd.concat(
+            [app.db["annotation_index"], pd.DataFrame.from_records(new_annotations)], ignore_index=True
+        )
+
+    return app.db["annotation_index"]
@@ -292,45 +316,70 @@ def get_annotations(app, dataset_id, split, example_idx, setup_id):
     return annotations.to_dict(orient="records")
 
 
+def get_output_files():
+    """Get dictionary of output JSONL files and their modification times"""
+    files_dict = {}
+    for jsonl_file in Path(OUTPUT_DIR).rglob("*.jsonl"):
+        files_dict[str(jsonl_file)] = jsonl_file.stat().st_mtime
+
+    return files_dict
+
+
+def load_outputs_from_file(file_path, cols):
+    outputs = []
+
+    with open(file_path) as f:
+        for line_num, line in enumerate(f):
+            try:
+                j = json.loads(line)
+
+                for key in ["dataset", "split", "setup_id"]:
+                    j[key] = slugify(j[key])
+
+                # drop any keys that are not in the key set
+                j = {k: v for k, v in j.items() if k in cols}
+                # remember the source file so its rows can be dropped when it changes
+                j["file_path"] = str(file_path)
+                outputs.append(j)
+            except Exception as e:
+                logger.error(
+                    f"Error parsing output file {file_path} at line {line_num + 1}:\n\t{e.__class__.__name__}: {e}"
+                )
+
+    return outputs
+
+
+def remove_outputs(app, file_path):
+    """Remove outputs from the output index for a specific file"""
+    # guard against an index that does not have the file_path column yet
+    if app.db["output_index"] is not None and "file_path" in app.db["output_index"]:
+        file_mask = app.db["output_index"]["file_path"] == file_path
+        app.db["output_index"] = app.db["output_index"][~file_mask]
+
+
 def get_output_index(app, force_reload=True):
     if hasattr(app, "db") and app.db["output_index"] is not None and not force_reload:
         return app.db["output_index"]
 
     logger.debug("Reloading output index")
 
-    outputs = []
     cols = ["dataset", "split", "setup_id", "example_idx", "output"]
 
-    # find recursively all JSONL files in the output directory
-    outs = list(Path(OUTPUT_DIR).rglob("*.jsonl"))
+    current_outs = get_output_files()
+    cached_outs = app.db.get("output_index_cache", {})
+    new_outputs = []
 
-    for out in outs:
-        with open(out) as f:
-            for line_num, line in enumerate(f):
-                try:
-                    j = json.loads(line)
-
-                    for key in ["dataset", "split", "setup_id"]:
-                        j[key] = slugify(j[key])
+    # Handle new and modified files: drop their stale rows, then re-read them
+    for file_path, mod_time in current_outs.items():
+        if file_path not in cached_outs or cached_outs[file_path] < mod_time:
+            remove_outputs(app, file_path)
+            new_outputs.extend(load_outputs_from_file(file_path, cols))
 
-                    # drop any keys that are not in the key set
-                    j = {k: v for k, v in j.items() if k in cols}
+    # Handle deleted files
+    for file_path in set(cached_outs.keys()) - set(current_outs.keys()):
+        remove_outputs(app, file_path)
 
-                    outputs.append(j)
-                except Exception as e:
-                    logger.error(
-                        f"Error parsing output file {out} at line {line_num + 1}:\n\t{e.__class__.__name__}: {e}"
-                    )
-
-    if outputs:
-        output_index = pd.DataFrame.from_records(outputs)
-    else:
-        output_index = pd.DataFrame(columns=cols)
+    # Update the cache
+    app.db["output_index_cache"] = current_outs
 
-    if app:
-        app.db["output_index"] = output_index
+    if new_outputs:
+        new_df = pd.DataFrame.from_records(new_outputs)
+        if app.db["output_index"] is None:
+            app.db["output_index"] = new_df
+        else:
+            app.db["output_index"] = pd.concat([app.db["output_index"], new_df], ignore_index=True)
+    elif app.db["output_index"] is None:
+        app.db["output_index"] = pd.DataFrame(columns=cols)
 
-    return output_index
+    return app.db["output_index"]
 
 
 def export_campaign_outputs(campaign_id):
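
Note on the caching pattern: both `get_annotation_index` and `get_output_index` above follow the same mtime-based invalidation scheme: scan the directory, compare each file's modification time against a cached snapshot, reload only files that are new or changed (dropping their stale rows first), and remove rows for files that disappeared. A minimal self-contained sketch of that scheme follows; the names `scan_files`, `refresh_index`, and the `load_records` callback are illustrative, not part of factgenie's API.

    from pathlib import Path

    import pandas as pd


    def scan_files(root):
        """Map each *.jsonl file under `root` to its last modification time."""
        return {str(p): p.stat().st_mtime for p in Path(root).rglob("*.jsonl")}


    def refresh_index(index, cache, root, load_records):
        """Incrementally refresh a DataFrame built from per-file records.

        index        -- the current pd.DataFrame, or None before the first scan
        cache        -- {file_path: mtime} snapshot from the previous scan
        load_records -- callable(path) -> list of dicts, each carrying "file_path"
        """
        current = scan_files(root)
        new_records = []

        # Reload files that are new or whose mtime advanced since the last
        # scan, dropping their stale rows first so nothing is indexed twice.
        for path, mtime in current.items():
            if cache.get(path, -1.0) < mtime:
                if index is not None and "file_path" in index:
                    index = index[index["file_path"] != path]
                new_records.extend(load_records(path))

        # Drop rows for files that were deleted since the last scan.
        for path in set(cache) - set(current):
            if index is not None and "file_path" in index:
                index = index[index["file_path"] != path]

        if new_records:
            new_df = pd.DataFrame.from_records(new_records)
            index = new_df if index is None else pd.concat([index, new_df], ignore_index=True)

        # The caller stores `current` as the cache for the next call.
        return index, current

One caveat: mtime granularity is filesystem-dependent (a full second on some systems), so a file rewritten twice within the same tick can be missed. Comparing (st_mtime_ns, st_size) pairs instead of st_mtime tightens this if it matters in practice.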