Add caching #167

Merged
merged 2 commits on Dec 9, 2024
6 changes: 3 additions & 3 deletions factgenie/analysis.py
@@ -25,11 +25,11 @@
# coloredlogs.install(level="INFO", logger=logger, fmt="%(asctime)s %(levelname)s %(message)s")


def create_example_record(line, metadata, annotation_span_categories, annotation_records):
def create_example_record(line, metadata, annotation_span_categories, annotation_records, jsonl_file):
# a record is created even if there are no annotations
j = json.loads(line)

example_record = workflows.create_annotation_example_record(j)
example_record = workflows.create_annotation_example_record(j, jsonl_file)

for i, category in enumerate(annotation_span_categories):
example_record["cat_" + str(i)] = 0
@@ -67,7 +67,7 @@ def load_annotations_for_campaign(campaign):
annotation_index += annotation_records

example_record = create_example_record(
line, campaign.metadata, annotation_span_categories, annotation_records
line, campaign.metadata, annotation_span_categories, annotation_records, jsonl_file
)
example_index.append(example_record)
except Exception as e:
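The only change on the analysis side is that each example record now carries the path of the JSONL file it came from. A minimal sketch of why that column matters for caching (the DataFrame contents and paths below are illustrative, not taken from the repository):

```python
import pandas as pd

# Hypothetical annotation index with the new "jsonl_file" column.
annotation_index = pd.DataFrame(
    [
        {"annotator_id": "a1", "jsonl_file": "campaigns/demo/files/a1.jsonl"},
        {"annotator_id": "a2", "jsonl_file": "campaigns/demo/files/a2.jsonl"},
    ]
)

# When a single file changes, only its rows need to be dropped and re-read;
# the rest of the index is kept as-is.
stale_file = "campaigns/demo/files/a1.jsonl"
annotation_index = annotation_index[annotation_index["jsonl_file"] != stale_file]
```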
2 changes: 2 additions & 0 deletions factgenie/app.py
@@ -36,7 +36,9 @@
app = Flask("factgenie", template_folder=TEMPLATES_DIR, static_folder=STATIC_DIR)
app.db = {}
app.db["annotation_index"] = None
app.db["annotation_index_cache"] = {}
app.db["output_index"] = None
app.db["output_index_cache"] = {}
app.db["lock"] = threading.Lock()
app.db["running_campaigns"] = set()
app.db["announcers"] = {}
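The two new `app.db` entries hold the cache state: each maps a JSONL file path to the modification time recorded when that file was last parsed. A rough sketch of the staleness check that the workflows code below runs against these maps (the helper name and directory are illustrative):

```python
from pathlib import Path


def find_stale_files(directory, cache):
    """Return JSONL files that are new or modified since they were last cached.

    `cache` maps a file path (str) to the mtime recorded when the file was
    last parsed, mirroring app.db["*_index_cache"] above.
    """
    stale = []
    for path in Path(directory).rglob("*.jsonl"):
        key = str(path)
        if key not in cache or cache[key] < path.stat().st_mtime:
            stale.append(key)
    return stale


# Usage sketch (the directory name is hypothetical):
# stale = find_stale_files("factgenie/campaigns", app.db["annotation_index_cache"])
```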
171 changes: 110 additions & 61 deletions factgenie/workflows.py
@@ -13,7 +13,6 @@
import zipfile
import traceback
import tempfile

import factgenie.utils as utils

from io import BytesIO
@@ -189,32 +188,18 @@ def generate_campaign_index(app, force_reload=True):
return app.db["campaign_index"]


def load_annotations_for_campaign(subdir):
def load_annotations_from_file(file_path):
annotations_campaign = []

# find metadata for the campaign
metadata_path = CAMPAIGN_DIR / subdir / "metadata.json"
if not metadata_path.exists():
return []

with open(metadata_path) as f:
metadata = json.load(f)

if metadata["mode"] == CampaignMode.HIDDEN or metadata["mode"] == CampaignMode.LLM_GEN:
return []

jsonl_files = (CAMPAIGN_DIR / subdir / "files").glob("*.jsonl")

for jsonl_file in jsonl_files:
with open(jsonl_file) as f:
for line in f:
annotation_records = load_annotations_from_record(line)
annotations_campaign.append(annotation_records[0])
with open(file_path) as f:
for line in f:
annotation_records = load_annotations_from_record(line, jsonl_file=file_path)
annotations_campaign.append(annotation_records[0])

return annotations_campaign


def create_annotation_example_record(j):
def create_annotation_example_record(j, jsonl_file):
return {
"annotation_span_categories": j["metadata"]["annotation_span_categories"],
"annotator_id": j["metadata"]["annotator_id"],
@@ -227,14 +212,15 @@ def create_annotation_example_record(j):
"flags": j.get("flags", []),
"options": j.get("options", []),
"text_fields": j.get("text_fields", []),
"jsonl_file": jsonl_file,
}


def load_annotations_from_record(line, split_spans=False):
def load_annotations_from_record(line, jsonl_file, split_spans=False):
j = json.loads(line)
annotation_records = []

record = create_annotation_example_record(j)
record = create_annotation_example_record(j, jsonl_file)

if split_spans:
for annotation in j["annotations"]:
@@ -250,27 +236,65 @@ def load_annotations_from_record(line, split_spans=False):
return annotation_records


def get_annotation_files():
"""Get dictionary of annotation JSONL files and their modification times"""
files_dict = {}
for jsonl_file in Path(CAMPAIGN_DIR).rglob("*.jsonl"):
campaign_dir = jsonl_file.parent.parent

# find metadata for the campaign
metadata_path = campaign_dir / "metadata.json"
if not metadata_path.exists():
continue

with open(metadata_path) as f:
metadata = json.load(f)

if metadata["mode"] == CampaignMode.HIDDEN or metadata["mode"] == CampaignMode.LLM_GEN:
continue

files_dict[str(jsonl_file)] = jsonl_file.stat().st_mtime

return files_dict


def remove_annotations(app, file_path):
"""Remove annotations from the annotation index for a specific file"""
if app.db["annotation_index"] is not None:
# Filter out annotations from the specified file
app.db["annotation_index"] = app.db["annotation_index"][app.db["annotation_index"]["jsonl_file"] != file_path]


def get_annotation_index(app, force_reload=True):
if app and app.db["annotation_index"] is not None and not force_reload:
return app.db["annotation_index"]

logger.debug("Reloading annotation index")

# contains annotations for each generated output
annotations = []
# Get current files and their modification times
current_files = get_annotation_files()
cached_files = app.db.get("annotation_index_cache", {})
new_annotations = []

# for all subdirectories in CAMPAIGN_DIR, load content of all the jsonl files
for subdir in os.listdir(CAMPAIGN_DIR):
try:
annotations += load_annotations_for_campaign(subdir)
except:
traceback.print_exc()
logger.error(f"Error while loading annotations for {subdir}")
# Handle modified files
for file_path, mod_time in current_files.items():
if file_path not in cached_files or cached_files[file_path] < mod_time:
remove_annotations(app, file_path)
new_annotations.extend(load_annotations_from_file(file_path))

# Handle deleted files
for file_path in set(cached_files.keys()) - set(current_files.keys()):
remove_annotations(app, file_path)

annotation_index = pd.DataFrame.from_records(annotations)
app.db["annotation_index"] = annotation_index
# Update the cache
app.db["annotation_index_cache"] = current_files

return annotation_index
if app.db["annotation_index"] is None:
app.db["annotation_index"] = pd.DataFrame.from_records(new_annotations)
else:
app.db["annotation_index"] = pd.concat([app.db["annotation_index"], pd.DataFrame.from_records(new_annotations)])

return app.db["annotation_index"]


def get_annotations(app, dataset_id, split, example_idx, setup_id):
@@ -292,45 +316,70 @@ def get_annotations(app, dataset_id, split, example_idx, setup_id):
return annotations.to_dict(orient="records")


def get_output_files():
"""Get dictionary of annotation JSONL files and their modification times"""
files_dict = {}
for jsonl_file in Path(OUTPUT_DIR).rglob("*.jsonl"):
files_dict[str(jsonl_file)] = jsonl_file.stat().st_mtime

return files_dict


def load_outputs_from_file(file_path, cols):
outputs = []

with open(file_path) as f:
for line_num, line in enumerate(f):
try:
j = json.loads(line)

for key in ["dataset", "split", "setup_id"]:
j[key] = slugify(j[key])

# drop any keys that are not in the key set
j = {k: v for k, v in j.items() if k in cols}
outputs.append(j)
except Exception as e:
logger.error(
f"Error parsing output file {file_path} at line {line_num + 1}:\n\t{e.__class__.__name__}: {e}"
)

return outputs


def get_output_index(app, force_reload=True):
if hasattr(app, "db") and app.db["output_index"] is not None and not force_reload:
return app.db["output_index"]

logger.debug("Reloading output index")

outputs = []
cols = ["dataset", "split", "setup_id", "example_idx", "output"]

# find recursively all JSONL files in the output directory
outs = list(Path(OUTPUT_DIR).rglob("*.jsonl"))
current_outs = get_output_files()
cached_outs = app.db.get("output_index_cache", {})
new_outputs = []

for out in outs:
with open(out) as f:
for line_num, line in enumerate(f):
try:
j = json.loads(line)

for key in ["dataset", "split", "setup_id"]:
j[key] = slugify(j[key])
# Handle modified files
for file_path, mod_time in current_outs.items():
if file_path not in cached_outs or cached_outs[file_path] < mod_time:
new_outputs.extend(load_outputs_from_file(file_path, cols))

# drop any keys that are not in the key set
j = {k: v for k, v in j.items() if k in cols}
# Handle deleted files
for file_path in set(cached_outs.keys()) - set(current_outs.keys()):
# Remove outputs for deleted files from the index
if app.db["output_index"] is not None:
file_mask = app.db["output_index"]["file_path"] == file_path
app.db["output_index"] = app.db["output_index"][~file_mask]

outputs.append(j)
except Exception as e:
logger.error(
f"Error parsing output file {out} at line {line_num + 1}:\n\t{e.__class__.__name__}: {e}"
)

if outputs:
output_index = pd.DataFrame.from_records(outputs)
else:
output_index = pd.DataFrame(columns=cols)
# Update the cache
app.db["output_index_cache"] = current_outs

if app:
app.db["output_index"] = output_index
if new_outputs:
app.db["output_index"] = pd.concat([app.db["output_index"], pd.DataFrame.from_records(new_outputs)])
elif app.db["output_index"] is None:
app.db["output_index"] = pd.DataFrame(columns=cols)

return output_index
return app.db["output_index"]


def export_campaign_outputs(campaign_id):
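Taken together, both index builders now follow the same incremental pattern: list the JSONL files with their modification times, drop index rows belonging to files that changed or disappeared, re-parse only the new or modified files, and append the fresh records. A condensed, self-contained sketch of that pattern, assuming a pandas index that tags each row with a `jsonl_file` column as the annotation index does (function and variable names are illustrative, not part of the repository):

```python
import json
from pathlib import Path

import pandas as pd


def refresh_index(index, cache, root):
    """Incrementally rebuild a record index from *.jsonl files under `root`.

    `index` is a DataFrame (or None) whose rows carry a "jsonl_file" column;
    `cache` maps file paths to the mtimes seen when they were last parsed.
    Returns the updated index and the new path-to-mtime map.
    """
    current = {str(p): p.stat().st_mtime for p in Path(root).rglob("*.jsonl")}
    new_records = []

    # Re-read files that are new or have changed since the cached mtime,
    # dropping their stale rows first.
    for path, mtime in current.items():
        if path not in cache or cache[path] < mtime:
            if index is not None and "jsonl_file" in index.columns:
                index = index[index["jsonl_file"] != path]
            with open(path) as f:
                for line in f:
                    record = json.loads(line)
                    record["jsonl_file"] = path
                    new_records.append(record)

    # Drop rows belonging to files that no longer exist on disk.
    for path in set(cache) - set(current):
        if index is not None and "jsonl_file" in index.columns:
            index = index[index["jsonl_file"] != path]

    if new_records:
        new_df = pd.DataFrame.from_records(new_records)
        index = new_df if index is None else pd.concat([index, new_df], ignore_index=True)

    return index if index is not None else pd.DataFrame(), current
```

The caller stores the returned path-to-mtime map back (in the diff, into `app.db["annotation_index_cache"]` or `app.db["output_index_cache"]`), so the next reload only touches files that changed in the meantime.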