I/O Stack Simplification and Optimization #430

Merged (17 commits) on May 27, 2024
8 changes: 8 additions & 0 deletions docs/changes.md
@@ -5,6 +5,14 @@
can be tweaked with the `numHighlightingThreads` and `maxQueuedPerThread` attributes on the
`OcrHighlightComponent` in `solrconfig.xml`.
- Removed `PageCacheWarmer`, no longer needed due to multithreading support.
- Completely refactored, simplified, and optimized the I/O stack to reduce the number of file system reads
and allocations/data copies during highlighting, resulting in a significant performance improvement
over previous versions (4-8 times faster in a synthetic benchmark that was not I/O-bound).
- We no longer memory-map files for reading. Benchmarking revealed that memory mapping did not improve
performance with the new I/O stack (probably due to the reduced number of actual reads); on the contrary,
plain reads performed better under many concurrent queries. A major drawback of the memory-mapped approach
was that in the presence of I/O errors like disappearing mounts or truncated files, the JVM could simply
crash, because the kernel sends a `SIGBUS` signal when it encounters such an error. (A minimal sketch of
the plain positioned-read approach follows below.)
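
For illustration only, here is a hedged sketch of the kind of plain positioned read that replaces memory
mapping; the class and method names are hypothetical and not part of the plugin. The practical difference
is that an I/O problem surfaces as a catchable `IOException` rather than a kernel `SIGBUS` that can take
down the whole JVM.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical example, not part of the plugin: positioned reads instead of mmap. */
public final class PositionedReadSketch {
  /** Reads up to {@code length} bytes starting at {@code offset}. */
  public static ByteBuffer readRange(Path path, long offset, int length) throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(length);
    try (FileChannel chan = FileChannel.open(path, StandardOpenOption.READ)) {
      // Positioned read: does not modify the channel's own position, so a shared
      // channel can be used safely from multiple threads.
      int read = chan.read(buf, offset);
      if (read < 0) {
        throw new IOException("Offset " + offset + " is past the end of " + path);
      }
    }
    buf.flip();
    // A vanished mount or truncated file shows up here as an IOException the caller
    // can handle, rather than as a SIGBUS-induced JVM crash.
    return buf;
  }
}
```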


## 0.8.5 (2024-04-25)
12 changes: 12 additions & 0 deletions docs/performance.md
@@ -37,6 +37,18 @@ Generally speaking, local storage is better than remote storage (like NFS or CIF
flash-based storage is better than disk-based storage, due to the lower random read latency and the possibility to
do parallel reads. A RAID1/10 setup is preferred over a RAID0/JBOD setup, due to the increased potential for parallel reads.

When building passages during highlighting (i.e. determining where a snippet starts and ends), the plugin reads
the OCR files in aligned sections and caches these sections to reduce the number of reads and allocations
(a conceptual sketch of this scheme follows the parameter list below). The larger the sections, the more data
each read pulls from disk and the higher the chance of subsequent cache hits; a larger cache likewise increases
the hit rate. However, both come at the cost of more memory usage and more allocations in the JVM, which can
itself have a performance impact. By default, the plugin uses a section size of 8KiB and caches at most 10
sections, a trade-off that performed well in our benchmarks and should suit most setups. If you want to tweak
these settings, use the `sectionReadSizeKib` and `maxSectionCacheSizeKib` parameters on the `OcrHighlightComponent`
in your `solrconfig.xml`:

- `sectionReadSizeKib`: The size of the sections that are read from the OCR files. The default is 8KiB.
- `maxSectionCacheSizeKib`: The maximum memory that is used for caching sections. The default is 10 * `sectionReadSizeKib`.
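
To make the trade-off more concrete, the following is a minimal, hypothetical sketch of the
aligned-read-plus-cache idea. The class, field names, and LRU policy are invented for illustration and are
not the plugin's actual implementation; they only mirror the roles of `sectionReadSizeKib` (section size)
and `maxSectionCacheSizeKib` (cache bound).

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: aligned section reads with a small, bounded cache. */
final class SectionCacheSketch {
  private final FileChannel channel;
  private final int sectionSize;          // e.g. 8 KiB, cf. sectionReadSizeKib
  private final Map<Long, byte[]> cache;  // section index -> section bytes

  SectionCacheSketch(FileChannel channel, int sectionSize, int maxCachedSections) {
    this.channel = channel;
    this.sectionSize = sectionSize;
    // Access-ordered LinkedHashMap as a simple LRU: evict the least recently used
    // section once more than maxCachedSections are cached.
    this.cache = new LinkedHashMap<Long, byte[]>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Long, byte[]> eldest) {
        return size() > maxCachedSections;
      }
    };
  }

  /** Returns the byte at {@code offset}, reading its aligned section on a cache miss. */
  byte byteAt(long offset) throws IOException {
    long sectionIdx = offset / sectionSize;
    byte[] section = cache.get(sectionIdx);
    if (section == null) {
      ByteBuffer buf = ByteBuffer.allocate(sectionSize);
      // Read one whole section, aligned to a multiple of the section size.
      // (For brevity, this sketch ignores short reads at the end of the file.)
      channel.read(buf, sectionIdx * (long) sectionSize);
      section = buf.array();
      cache.put(sectionIdx, section);
    }
    return section[(int) (offset % sectionSize)];
  }
}
```

Because reads are aligned to section boundaries, repeated lookups that fall into the same 8KiB window are
served from the cache instead of triggering another file system read.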

## Concurrency
The plugin can read multiple files in parallel and also process them concurrently. By default, it will
use as many threads as there are available logical CPU cores on the machine, but this can be tweaked
41 changes: 28 additions & 13 deletions example/bench.py
@@ -22,6 +22,7 @@
"""

import argparse
import gzip
import json
import os
import random
@@ -33,7 +33,7 @@
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count
from pathlib import Path
from typing import Iterable, Mapping, NamedTuple, TextIO, cast
from typing import Iterable, Mapping, NamedTuple, Sequence, TextIO, cast
from urllib.parse import urlencode
from urllib.request import Request, urlopen
from collections import Counter
@@ -43,8 +43,9 @@


class BenchmarkResult(NamedTuple):
query_times_ms: list[Mapping[str, float]]
hl_times_ms: list[Mapping[str, float]]
query_times_ms: Sequence[Mapping[str, float]]
hl_times_ms: Sequence[Mapping[str, float]]
responses: Sequence[Mapping[str, dict]] = []

def mean_query_time(self) -> float:
return statistics.mean(
@@ -111,14 +113,18 @@ def parse_hocr(hocr_path: Path) -> Iterable[tuple[str, ...]]:
yield passage


def _queryset_worker_fn(p):
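# Module-level function instead of a lambda: ProcessPoolExecutor has to pickle the
# callable it sends to worker processes, and lambdas cannot be pickled.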
return analyze_phrases(parse_hocr(p))


def build_query_set(
hocr_base_path: Path, min_count=8, max_count=256
) -> Iterable[tuple[str, int]]:
# Counts in how many documents a phrase occurs
phrase_counter = Counter()
with ProcessPoolExecutor(max_workers=cpu_count()) as pool:
futs = [
pool.submit(lambda p: analyze_phrases(parse_hocr(p)), hocr_path)
pool.submit(_queryset_worker_fn, hocr_path)
for hocr_path in hocr_base_path.glob("**/*.hocr")
]
num_completed = 0
@@ -146,7 +152,7 @@ def build_query_set(

def run_query(
query: str, solr_handler: str, num_rows: int, num_snippets: int
) -> tuple[float, float]:
) -> tuple[float, float, dict]:
query_params = {
"q": f"ocr_text:{query}",
"hl": "on",
Expand All @@ -161,7 +167,7 @@ def run_query(
solr_resp = json.load(http_resp)
hl_duration = solr_resp["debug"]["timing"]["process"]["ocrHighlight"]["time"]
query_duration = solr_resp["debug"]["timing"]["time"]
return query_duration, hl_duration
return query_duration, hl_duration, solr_resp


def run_benchmark(
@@ -185,19 +191,21 @@ def run_benchmark(
def _run_query(query):
return query, run_query(query, solr_handler, num_rows, num_snippets)

responses: list[dict[str, dict]] = []
for iteration_idx in range(iterations):
iter_futs = [pool.submit(_run_query, query) for query in queries]

query_times = {}
hl_times = {}
for idx, fut in enumerate(as_completed(iter_futs)):
try:
query, (query_time, hl_time) = fut.result()
query, (query_time, hl_time, resp) = fut.result()
except Exception as e:
print(f"\nError: {e}", file=sys.stderr)
continue
query_times[query] = query_time
hl_times[query] = hl_time
responses.append({query: resp})
hl_factor = statistics.mean(hl_times.values()) / statistics.mean(
query_times.values()
)
@@ -212,7 +220,7 @@ def _run_query(query):
all_query_times.append(query_times)
all_hl_times.append(hl_times)

return BenchmarkResult(all_query_times, all_hl_times)
return BenchmarkResult(all_query_times, all_hl_times, responses)


if __name__ == "__main__":
@@ -245,7 +253,14 @@ def _run_query(query):
type=str,
default=None,
metavar="PATH",
help="Path to save the results to as a JSON file (optional)",
help="Path to save the benchmarking results for every query to as a JSON file (optional)",
)
parser.add_argument(
"--save-responses",
type=str,
default=None,
metavar="PATH",
help="Path to save the responses for every query to as a JSON file (optional)",
)
parser.add_argument(
"--num-rows",
@@ -271,8 +286,6 @@

if os.path.exists(args.queries_path):
if args.queries_path.endswith(".gz"):
import gzip

with gzip.open(args.queries_path, "rt") as f:
queries = set(
q for q in (line.strip() for line in cast(TextIO, f)) if q
@@ -284,8 +297,6 @@
hocr_base_path = Path("./data/google1000")
queries = set(q for q, _ in build_query_set(hocr_base_path))
if args.queries_path.endswith(".gz"):
import gzip

with cast(TextIO, gzip.open(args.queries_path, "wt", compresslevel=9)) as f:
f.write("\n".join(queries))
else:
@@ -318,3 +329,7 @@ def _run_query(query):
},
f,
)

if args.save_responses:
with cast(TextIO, gzip.open(args.save_responses, "wt", compresslevel=9)) as f:
json.dump(results.responses, f)
1 change: 0 additions & 1 deletion example/docker-compose.yml
@@ -16,7 +16,6 @@ services:
- ENABLE_REMOTE_JMX_OPTS=true
- SOLR_HEAP=4g
- ADDITIONAL_CMD_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:1044 -XX:StartFlightRecording=settings=profile,filename=/flightrecords/profile.jfr,maxage=30m -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+PreserveFramePointer

- SOLR_SECURITY_MANAGER_ENABLED=false
entrypoint:
- docker-entrypoint.sh
@@ -2,11 +2,11 @@

import com.github.dbmdz.solrocr.formats.OcrParser;
import com.github.dbmdz.solrocr.iter.BreakLocator;
import com.github.dbmdz.solrocr.iter.IterableCharSequence;
import com.github.dbmdz.solrocr.iter.TagBreakLocator;
import com.github.dbmdz.solrocr.model.OcrBlock;
import com.github.dbmdz.solrocr.model.OcrFormat;
import com.github.dbmdz.solrocr.model.OcrPage;
import com.github.dbmdz.solrocr.reader.SourceReader;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import java.awt.Dimension;
@@ -25,11 +25,11 @@ public class AltoFormat implements OcrFormat {
OcrBlock.WORD, "String");

@Override
public BreakLocator getBreakLocator(IterableCharSequence text, OcrBlock... blockTypes) {
public BreakLocator getBreakLocator(SourceReader reader, OcrBlock... blockTypes) {
// NOTE: The ALTO hierarchy we support is pretty rigid, i.e. Page > TextBlock > TextLine >
// String is a given, hence we only grab the lowest-hierarchy block and call it a day
String breakTag = blockTagMapping.get(blockTypes[0]);
return new TagBreakLocator(text, breakTag);
return new TagBreakLocator(reader, breakTag);
}

@Override