Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Forward pointers implementation (#651)" #800

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
### 1.73 (2018-11-27)
* Bugfix: #658 Write/append errors for Panel objects from older pandas versions
* Feature: #653 Add version meta-info in arctic module
* Feature: #663 Include arctic numerical version in the metadata of the version document
* Feature: #650 Implemented forward pointers for chunks in VersionStore (modes: enabled/disabled/hybrid)

### 1.72 (2018-11-06)
* Feature: #577 Added implementation for incremental serializer for numpy records
Expand Down
6 changes: 0 additions & 6 deletions arctic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
""" The Arctic TimeSeries and Tick store."""

from .arctic import Arctic, register_library_type
from .arctic import VERSION_STORE, TICK_STORE, CHUNK_STORE
from .store._ndarray_store import NdarrayStore
from .store._pandas_ndarray_store import PandasDataFrameStore, PandasSeriesStore, PandasPanelStore
from .store.version_store import register_versioned_storage, register_version
Expand All @@ -10,16 +8,12 @@
from pkg_resources import get_distribution
str_version = get_distribution(__name__).version.strip()
int_parts = tuple(int(x) for x in str_version.split('.'))
num_version = sum([1000 ** i * v for i, v in enumerate(reversed(int_parts))])
register_version(str_version, num_version)
except Exception:
__version__ = None
__version_parts__ = tuple()
__version_numerical__ = 0
else:
__version__ = str_version
__version_parts__ = int_parts
__version_numerical__ = num_version


register_versioned_storage(PandasDataFrameStore)
Expand Down
4 changes: 0 additions & 4 deletions arctic/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
_use_new_count_api = None


def get_fwptr_config(version):
return FwPointersCfg[version.get(FW_POINTERS_CONFIG_KEY, FwPointersCfg.DISABLED.name)]


def _detect_new_count_api():
try:
mongo_v = [int(v) for v in pymongo.version.split('.')]
Expand Down
2 changes: 0 additions & 2 deletions arctic/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@ class DataIntegrityException(ArcticException):
"""
pass


class ArcticSerializationException(ArcticException):
pass


class ConcurrentModificationException(DataIntegrityException):
pass

Expand Down
288 changes: 66 additions & 222 deletions arctic/store/_ndarray_store.py

Large diffs are not rendered by default.

82 changes: 9 additions & 73 deletions arctic/store/_version_store_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
from pandas.compat import pickle_compat
from pymongo.errors import OperationFailure

from arctic._config import FW_POINTERS_REFS_KEY, FW_POINTERS_CONFIG_KEY, FwPointersCfg
from arctic._util import mongo_count, get_fwptr_config
from arctic._util import mongo_count


def _split_arrs(array_2d, slices):
Expand Down Expand Up @@ -47,28 +46,15 @@ def checksum(symbol, doc):
return Binary(sha.digest())


def get_symbol_alive_shas(symbol, versions_coll):
return set(Binary(x) for x in versions_coll.distinct(FW_POINTERS_REFS_KEY, {'symbol': symbol}))


def _cleanup_fw_pointers(collection, symbol, version_ids, versions_coll, shas_to_delete, do_clean=True):
shas_to_delete = set(shas_to_delete) if shas_to_delete else set()

if not version_ids or not shas_to_delete:
return shas_to_delete

symbol_alive_shas = get_symbol_alive_shas(symbol, versions_coll)

# This is the set of shas which are not referenced by any FW pointers
shas_safe_to_delete = shas_to_delete - symbol_alive_shas

if do_clean and shas_safe_to_delete:
collection.delete_many({'symbol': symbol, 'sha': {'$in': list(shas_safe_to_delete)}})

return shas_safe_to_delete

def cleanup(arctic_lib, symbol, version_ids):
"""
Helper method for cleaning up chunks from a version store
"""
collection = arctic_lib.get_top_level_collection()

def _cleanup_parent_pointers(collection, symbol, version_ids):
# Remove any chunks which contain just the parents, at the outset
# We do this here, because $pullAll will make an empty array: []
# and the index which contains the parents field will fail the unique constraint.
for v in version_ids:
# Remove all documents which only contain the parent
collection.delete_many({'symbol': symbol,
Expand All @@ -83,56 +69,6 @@ def _cleanup_parent_pointers(collection, symbol, version_ids):
collection.delete_one({'symbol': symbol, 'parent': []})


def _cleanup_mixed(symbol, collection, version_ids, versions_coll):
# Pull the deleted version IDs from the parents field
collection.update_many({'symbol': symbol, 'parent': {'$in': version_ids}}, {'$pullAll': {'parent': version_ids}})

# All-inclusive set of segments which are pointed by at least one version (SHA fw pointers)
symbol_alive_shas = get_symbol_alive_shas(symbol, versions_coll)

spec = {'symbol': symbol, 'parent': []}
if symbol_alive_shas:
# This query unfortunately, while it hits the index (symbol, sha) to find the documents, in order to filter
# the documents by "parent: []" it fetches at server side, and pollutes the cache of WiredTiger
# TODO: add a new index for segments collection: (symbol, sha, parent)
spec['sha'] = {'$nin': list(symbol_alive_shas)}
collection.delete_many(spec)


def _get_symbol_pointer_cfgs(symbol, versions_coll):
return set(get_fwptr_config(v)
for v in versions_coll.find({'symbol': symbol}, projection={FW_POINTERS_CONFIG_KEY: 1}))


def cleanup(arctic_lib, symbol, version_ids, versions_coll, shas_to_delete=None, pointers_cfgs=None):
"""
Helper method for cleaning up chunks from a version store
"""
pointers_cfgs = set(pointers_cfgs) if pointers_cfgs else set()
collection = arctic_lib.get_top_level_collection()
version_ids = list(version_ids)

# Iterate versions to check if they are created only with fw pointers, parent pointers (old), or mixed
# Keep in mind that the version is not yet inserted.
all_symbol_pointers_cfgs = _get_symbol_pointer_cfgs(symbol, versions_coll)
all_symbol_pointers_cfgs.update(pointers_cfgs)

# All the versions of the symbol have been created with old arctic or with disabled forward pointers.
# Preserves backwards compatibility and regression for old pointers implementation.
if all_symbol_pointers_cfgs == {FwPointersCfg.DISABLED} or not all_symbol_pointers_cfgs:
_cleanup_parent_pointers(collection, symbol, version_ids)
return

# All the versions of the symbol we wish to delete have been created with forward pointers
if FwPointersCfg.DISABLED not in all_symbol_pointers_cfgs:
_cleanup_fw_pointers(collection, symbol, version_ids, versions_coll,
shas_to_delete=shas_to_delete, do_clean=True)
return

# Reaching here means the symbol has versions with mixed forward pointers and legacy/parent pointer configurations
_cleanup_mixed(symbol, collection, version_ids, versions_coll)


def version_base_or_id(version):
return version.get('base_version_id', version['_id'])

Expand Down
108 changes: 16 additions & 92 deletions arctic/store/version_store.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
from datetime import datetime as dt, timedelta

import bson
Expand All @@ -8,10 +9,10 @@
from pymongo.errors import OperationFailure, AutoReconnect, DuplicateKeyError

from ._pickle_store import PickleStore
from ._version_store_utils import cleanup, get_symbol_alive_shas, _get_symbol_pointer_cfgs
from ._version_store_utils import cleanup
from .versioned_item import VersionedItem
from .._config import STRICT_WRITE_HANDLER_MATCH, FW_POINTERS_REFS_KEY, FW_POINTERS_CONFIG_KEY, FwPointersCfg
from .._util import indent, enable_sharding, mongo_count, get_fwptr_config
from .._config import STRICT_WRITE_HANDLER_MATCH
from .._util import indent, enable_sharding, mongo_count
from ..date import mktz, datetime_to_ms, ms_to_datetime
from ..decorators import mongo_retry
from ..exceptions import NoDataFoundException, DuplicateSnapshotException, \
Expand All @@ -22,14 +23,7 @@

VERSION_STORE_TYPE = 'VersionStore'
_TYPE_HANDLERS = []
ARCTIC_VERSION = None
ARCTIC_VERSION_NUMERICAL = None


def register_version(version, numerical):
global ARCTIC_VERSION, ARCTIC_VERSION_NUMERICAL
ARCTIC_VERSION = version
ARCTIC_VERSION_NUMERICAL = numerical
STRICT_WRITE_HANDLER_MATCH = bool(os.environ.get('STRICT_WRITE_HANDLER_MATCH'))


def register_versioned_storage(storageClass):
Expand Down Expand Up @@ -417,28 +411,6 @@ def handler_supports_read_option(handler, option):
# that it does support this option (i.e. fail-open)
return True

def get_arctic_version(self, symbol, as_of=None):
"""
Return the numerical representation of the arctic version used to write the last (or as_of) version for
the given symbol.

Parameters
----------
symbol : `str`
symbol name for the item
as_of : `str` or int or `datetime.datetime`
Return the data as it was as_of the point in time.
`int` : specific version number
`str` : snapshot name which contains the version
`datetime.datetime` : the version of the data that existed as_of the requested point in time

Returns
-------
arctic_version : int
The numerical representation of Arctic version, used to create the specified symbol version
"""
return self._read_metadata(symbol, as_of=as_of).get('arctic_version', 0)

def _do_read(self, symbol, version, from_version=None, **kwargs):
if version.get('deleted'):
raise NoDataFoundException("No data found for %s in library %s" % (symbol, self._arctic_lib.get_name()))
Expand Down Expand Up @@ -553,7 +525,6 @@ def append(self, symbol, data, metadata=None, prune_previous_version=True, upser
"""
self._arctic_lib.check_quota()
version = {'_id': bson.ObjectId()}
version['arctic_version'] = ARCTIC_VERSION_NUMERICAL
version['symbol'] = symbol
spec = {'symbol': symbol}
previous_version = self._versions.find_one(spec,
Expand Down Expand Up @@ -609,7 +580,6 @@ def append(self, symbol, data, metadata=None, prune_previous_version=True, upser
self._prune_previous_versions(
symbol,
keep_version=version.get('base_version_id'),
new_version_shas=version.get(FW_POINTERS_REFS_KEY),
keep_mins=kwargs.get('keep_mins', 120)
)

Expand Down Expand Up @@ -648,7 +618,6 @@ def write(self, symbol, data, metadata=None, prune_previous_version=True, **kwar
"""
self._arctic_lib.check_quota()
version = {'_id': bson.ObjectId()}
version['arctic_version'] = ARCTIC_VERSION_NUMERICAL
version['symbol'] = symbol
version['version'] = self._version_nums.find_one_and_update({'symbol': symbol},
{'$inc': {'version': 1}},
Expand All @@ -665,7 +634,6 @@ def write(self, symbol, data, metadata=None, prune_previous_version=True, **kwar
self._prune_previous_versions(
symbol,
keep_mins=kwargs.get('keep_mins', 120),
new_version_shas=version.get(FW_POINTERS_REFS_KEY)
)

# Insert the new version into the version DB
Expand Down Expand Up @@ -711,8 +679,9 @@ def _add_new_version_using_reference(self, symbol, new_version, reference_versio
"The previous version (%s, %d) has been removed during the update" %
(symbol, str(reference_version['_id']), reference_version['version']))


if prune_previous_version and reference_version:
self._prune_previous_versions(symbol, new_version_shas=new_version.get(FW_POINTERS_REFS_KEY))
self._prune_previous_versions(symbol)

logger.debug('Finished updating versions with new metadata for %s', symbol)

Expand Down Expand Up @@ -846,10 +815,9 @@ def _find_prunable_version_ids(self, symbol, keep_mins):
sort=[('version', pymongo.DESCENDING)],
# Guarantees at least one version is kept
skip=1,
projection={'_id': 1, FW_POINTERS_REFS_KEY: 1, FW_POINTERS_CONFIG_KEY: 1},
projection=['_id'],
)
return {v['_id']: ([bson.binary.Binary(x) for x in v.get(FW_POINTERS_REFS_KEY, [])], get_fwptr_config(v))
for v in cursor}
return [version["_id"] for version in cursor]

@mongo_retry
def _find_base_version_ids(self, symbol, version_ids):
Expand All @@ -860,17 +828,16 @@ def _find_base_version_ids(self, symbol, version_ids):
'_id': {'$nin': version_ids},
'base_version_id': {'$exists': True},
},
projection={'base_version_id': 1})
projection=['base_version_id'],
)
return [version["base_version_id"] for version in cursor]

def _prune_previous_versions(self, symbol, keep_mins=120, keep_version=None, new_version_shas=None):
def _prune_previous_versions(self, symbol, keep_mins=120, keep_version=None):
"""
Prune versions, not pointed at by snapshots which are at least keep_mins old. Prune will never
remove all versions.
"""
new_version_shas = new_version_shas if new_version_shas else []
prunable_ids_to_shas = self._find_prunable_version_ids(symbol, keep_mins)
prunable_ids = list(prunable_ids_to_shas.keys())
prunable_ids = self._find_prunable_version_ids(symbol, keep_mins)
if keep_version is not None:
try:
prunable_ids.remove(keep_version)
Expand All @@ -886,16 +853,8 @@ def _prune_previous_versions(self, symbol, keep_mins=120, keep_version=None, new

# Delete the version documents
mongo_retry(self._versions.delete_many)({'_id': {'$in': version_ids}})

prunable_ids_to_shas = {k: prunable_ids_to_shas[k] for k in version_ids}

# The new version has not been written yet, so make sure that any SHAs pointed by it are preserved
shas_to_delete = [sha for v in prunable_ids_to_shas.values() for sha in v[0] if sha not in new_version_shas]

# Cleanup any chunks
mongo_retry(cleanup)(self._arctic_lib, symbol, version_ids, self._versions,
shas_to_delete=shas_to_delete,
pointers_cfgs=[v[1] for v in prunable_ids_to_shas.values()])
mongo_retry(cleanup)(self._arctic_lib, symbol, version_ids)

@mongo_retry
def _delete_version(self, symbol, version_num, do_cleanup=True):
Expand All @@ -916,13 +875,8 @@ def _delete_version(self, symbol, version_num, do_cleanup=True):
snap_name))
return
self._versions.delete_one({'_id': version['_id']})
# TODO: for FW pointers, if the above statement fails, they we have no way to delete the orphaned segments.
# This would be possible only via FSCK, or by moving the above statement at the end of this method,
# but with the risk of failing to delete the version catastrophically, and ending up with a corrupted version.
if do_cleanup:
cleanup(self._arctic_lib, symbol, [version['_id']], self._versions,
shas_to_delete=tuple(bson.binary.Binary(s) for s in version.get(FW_POINTERS_REFS_KEY, [])),
pointers_cfgs=(get_fwptr_config(version), ))
cleanup(self._arctic_lib, symbol, [version['_id']])

@mongo_retry
def delete(self, symbol):
Expand Down Expand Up @@ -1082,38 +1036,9 @@ def _fsck(self, dry_run):
"""
# Cleanup Orphaned Chunks
self._cleanup_orphaned_chunks(dry_run)
# Cleanup unreachable SHAs (forward pointers)
self._cleanup_unreachable_shas(dry_run)
# Cleanup Orphaned Snapshots
self._cleanup_orphaned_versions(dry_run)

def _cleanup_unreachable_shas(self, dry_run):
lib = self
chunks_coll = lib._collection
versions_coll = chunks_coll.versions

for symbol in chunks_coll.distinct('symbol'):
logger.debug('Checking %s (forward pointers)' % symbol)

all_symbol_pointers_cfgs = _get_symbol_pointer_cfgs(symbol, versions_coll)

if FwPointersCfg.DISABLED not in all_symbol_pointers_cfgs:
# Obtain the SHAs which are no longer pointed to by any version
symbol_alive_shas = get_symbol_alive_shas(symbol, versions_coll)
all_symbol_shas = set(chunks_coll.distinct('sha', {'symbol': symbol}))
unreachable_shas = all_symbol_shas - symbol_alive_shas

logger.info("Cleaning up {} SHAs for symbol {}".format(len(unreachable_shas), symbol))
if not dry_run:
# Be liberal with the generation time.
id_time_constraint = {'$lt': bson.ObjectId.from_datetime(dt.now() - timedelta(days=1))}
# Do delete the data segments
chunks_coll.delete_many({
'_id': id_time_constraint, # can't rely on the parent field only for fw-pointers
'symbol': symbol,
'parent': id_time_constraint,
'sha': {'$in': list(unreachable_shas)}})

def _cleanup_orphaned_chunks(self, dry_run):
"""
Fixes any chunks who have parent pointers to missing versions.
Expand Down Expand Up @@ -1158,8 +1083,7 @@ def _cleanup_orphaned_chunks(self, dry_run):
(x, symbol))
# Now cleanup the leaked versions
if not dry_run:
# This is now able to handle safely symbols which have both forward and legacy/parent pointers
cleanup(lib._arctic_lib, symbol, leaked_versions, versions_coll)
cleanup(lib._arctic_lib, symbol, leaked_versions)

def _cleanup_orphaned_versions(self, dry_run):
"""
Expand Down
Loading