Skip to content

Commit

Permalink
changed to write via tensorstore
Browse files Browse the repository at this point in the history
  • Loading branch information
fcollman committed Jan 31, 2024
1 parent aa0b558 commit 04e623b
Showing 1 changed file with 89 additions and 32 deletions.
121 changes: 89 additions & 32 deletions python/neuroglancer/write_annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,7 @@
from collections.abc import Sequence
from typing import Literal, NamedTuple, Optional, Union, cast
from logging import warning
try:
from cloudvolume.datasource.precomputed.sharding import (
ShardingSpecification, synthesize_shard_files
)
except ImportError:
class ShardingSpecification:
def __init__(self, *args, **kwargs):
raise NotImplementedError("cloudvolume is not installed")

def synthesize_shard_files(*args, **kwargs):
raise NotImplementedError("cloudvolume is not installed")

warning("cloudvolume is not installed, so sharding is not supported."
"pip install cloud-volume to install")

import tensorstore as ts
import numpy as np

from . import coordinate_space, viewer_state
Expand Down Expand Up @@ -74,8 +60,50 @@ class Annotation(NamedTuple):
}

AnnotationType = Literal["point", "line", "axis_aligned_bounding_box", "ellipsoid"]
ShardHashType = Literal["murmurhash3_x86_128", "identity_hash"]

MINISHARD_TARGET_COUNT = 1000
SHARD_TARGET_SIZE = 50000000


def choose_output_spec(total_count, total_bytes,
hash: ShardHashType = "murmurhash3_x86_128",
gzip_compress=True):
if total_count ==1:
return None

options = {
'@type': 'neuroglancer_uint64_sharded_v1',
'hash': hash,
}

total_minishard_bits = 0
while (total_count >> total_minishard_bits) > MINISHARD_TARGET_COUNT:
total_minishard_bits += 1

shard_bits = 0
while (total_bytes >> shard_bits) > SHARD_TARGET_SIZE:
shard_bits += 1

preshift_bits = 0
while MINISHARD_TARGET_COUNT >> preshift_bits:
preshift_bits += 1

options['preshift_bits'] = preshift_bits
options['shard_bits'] = shard_bits
options['minishard_bits'] = total_minishard_bits - min(total_minishard_bits, shard_bits)
if gzip_compress:
options['data_encoding'] = 'gzip'
options['minishard_index_encoding'] = 'gzip'
else:
options['data_encoding'] = 'raw'
options['minishard_index_encoding'] = 'raw'

# options.setdefault('minishard_index_compression', {}).setdefault('gzip_compression', {})['level'] = minishard_index_compression
# options.setdefault('data_compression', {}).setdefault('gzip_compression', {})['level'] = data_compression

return options

def _get_dtype_for_geometry(annotation_type: AnnotationType, rank: int):
geometry_size = rank if annotation_type == "point" else 2 * rank
return [("geometry", "<f4", geometry_size)]
Expand Down Expand Up @@ -116,9 +144,30 @@ def __init__(
lower_bound: Sequence,
relationships: Sequence[str] = (),
properties: Sequence[viewer_state.AnnotationPropertySpec] = (),
chunk_size: Sequence[int] = [256, 256, 256],
id_sharding_spec: ShardingSpecification = None
chunk_size: Sequence[int] = [256, 256, 256]
):
"""Initializes an `AnnotationWriter`.
Args:
coordinate_space: The coordinate space in which the annotations are
defined. is a `CoordinateSpace` object.
annotation_type: The type of annotation. Must be one of "point",
"line", "axis_aligned_bounding_box", or "ellipsoid".
lower_bound: The lower bound of the bounding box of the annotations.
relationships: The names of relationships between annotations. Each
relationship is a string that is used as a key in the `relationships`
field of each annotation. For example, if `relationships` is
`["parent", "child"]`, then each annotation may have a `parent` and
`child` relationship, and the `relationships` field of each annotation
is a dictionary with keys `"parent"` and `"child"`.
properties: The properties of each annotation. Each property is a
`AnnotationPropertySpec` object.
chunk_size: The size of each chunk in the spatial index. Must have the
same length as `coordinate_space.rank`.
write_id_sharded: If True, the annotations will be sharded by id.
id_sharding_spec: The sharding specification for the id sharding. If
not specified spec will be automatically configured
"""
self.chunk_size = np.array(chunk_size)
self.coordinate_space = coordinate_space
self.relationships = list(relationships)
Expand All @@ -137,8 +186,7 @@ def __init__(
shape=(self.rank,), fill_value=float("-inf"), dtype=np.float32
)
self.related_annotations = [{} for _ in self.relationships]
self.id_sharding_spec = id_sharding_spec



def get_chunk_index(self, coords):
return tuple(((coords-self.lower_bound) // self.chunk_size).astype(np.int32))
Expand All @@ -155,7 +203,6 @@ def add_point(self, point: Sequence[float], id: Optional[int] = None, **kwargs):

#self.lower_bound = np.minimum(self.lower_bound, point)
self.upper_bound = np.maximum(self.upper_bound, point)
self.kdtree.add(point)
self._add_obj(point, id, **kwargs)

def add_axis_aligned_bounding_box(
Expand Down Expand Up @@ -241,13 +288,20 @@ def _add_obj(self, coords: Sequence[float], id: Optional[int], **kwargs):
rel_index = self.related_annotations[i]
rel_index_list = rel_index.setdefault(segment_id, [])
rel_index_list.append(annotation)

def _serialize_annotations_sharded(self, path, annotations: list[Annotation], shard_spec: ShardingSpecification):
ann_dict_encoding = {a.id: a.encoded for a in annotations}
shard_files = synthesize_shard_files(shard_spec, ann_dict_encoding)
for shard_id, shard_file in shard_files.items():
with open(os.path.join(path, f"{shard_id}"), "wb") as f:
f.write(shard_file)

def _serialize_annotations_sharded(self, path, annotations, shard_spec):
spec = {
'driver': 'neuroglancer_uint64_sharded',
'metadata': shard_spec,
"base": f"file://{path}"
}
dataset = ts.KvStore.open(spec).result()
txn = ts.Transaction()
for ann in annotations:
# convert the ann.id to a binary representation of a uint64
key = ann.id.to_bytes(8, 'little')
dataset.with_transaction(txn)[key]=ann.encoded
txn.commit_async().result()

def _serialize_annotations(self, f, annotations: list[Annotation]):
f.write(struct.pack("<Q", len(annotations)))
Expand Down Expand Up @@ -279,9 +333,12 @@ def write(self, path: Union[str, pathlib.Path]):
"key": "by_id"
}
}
if self.id_sharding_spec is not None:
metadata["by_id"]["sharding"] = self.id_sharding_spec.to_dict()

sharding_spec = choose_output_spec(len(self.annotations),
sum(len(a.encoded) for a in self.annotations))
if sharding_spec is not None:
metadata["by_id"]["sharding"] = sharding_spec


# calculate the number of chunks in each dimension
num_chunks = np.ceil((self.upper_bound - self.lower_bound) / self.chunk_size).astype(int)

Expand Down Expand Up @@ -314,8 +371,8 @@ def write(self, path: Union[str, pathlib.Path]):
) as f:
self._serialize_annotations(f, annotations)

if self.id_sharding_spec is not None:
self._serialize_annotations_sharded(os.path.join(path, "by_id"), self.annotations, self.id_sharding_spec)
if sharding_spec is not None:
self._serialize_annotations_sharded(os.path.join(path, "by_id"), self.annotations, sharding_spec)
else:
for annotation in self.annotations:
with open(os.path.join(path, "by_id", str(annotation.id)), "wb") as f:
Expand Down

0 comments on commit 04e623b

Please sign in to comment.