Skip to content
This repository has been archived by the owner on Nov 4, 2024. It is now read-only.

Commit

Permalink
Insert to index with read lock and write lock
Browse files Browse the repository at this point in the history
  • Loading branch information
jschmieg committed Oct 4, 2017
1 parent b40d4ef commit b51d95a
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 73 deletions.
2 changes: 1 addition & 1 deletion src/pmse_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void InsertIndexChange::rollback() {
try {
transaction::exec_tx(_pop, [this] {
IndexKeyEntry entry(_key.getOwned(), _loc);
_tree->remove(_pop, entry, _dupsAllowed, _desc->keyPattern(), nullptr);
_tree->remove(_pop, entry, _dupsAllowed, _desc->keyPattern());
});
} catch (std::exception &e) {
log() << e.what();
Expand Down
40 changes: 21 additions & 19 deletions src/pmse_index_cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,20 @@ PmseCursor::PmseCursor(OperationContext* txn, bool isForward,
_ordering(ordering),
_first(tree->_first),
_last(tree->_last),
_unique(unique),
_tree(tree),
_endPositionIsDataEnd(false),
_locateFoundDataEnd(false),
_wasMoved(false),
_eofRestore(false) {}

// Find entry in tree which is equal or bigger to input entry
// Locates input cursor on that entry
// Sets _locateFoundDataEnd when result is after last entry in tree
bool PmseCursor::lower_bound(IndexKeyEntry entry, CursorObject& cursor, std::list<LocksPtr>& locks) {
bool PmseCursor::lower_bound(IndexKeyEntry entry, CursorObject& cursor, std::list<nvml::obj::shared_mutex *>& locks) {
uint64_t i = 0;
int64_t cmp;
(_tree->_root->_pmutex).lock_shared();
persistent_ptr<PmseTreeNode> current = _tree->_root;
(current->_pmutex).lock_shared();
persistent_ptr<PmseTreeNode> child;
while (!current->is_leaf) {
i = 0;
while (i < current->num_keys) {
Expand All @@ -81,11 +80,12 @@ bool PmseCursor::lower_bound(IndexKeyEntry entry, CursorObject& cursor, std::lis
break;
}
}
(current->children_array[i]->_pmutex).lock_shared();
current = current->children_array[i];
current->parent->_pmutex.unlock_shared();
child = current->children_array[i];
child->_pmutex.lock_shared();
current->_pmutex.unlock_shared();
current = child;
}
locks.push_back(LocksPtr(&(current->_pmutex)));
locks.push_back(&(current->_pmutex));
i = 0;
IndexEntryComparison c(Ordering::make(_ordering));

Expand All @@ -100,8 +100,10 @@ bool PmseCursor::lower_bound(IndexKeyEntry entry, CursorObject& cursor, std::lis
cursor.index = 0;
return true;
}
_locateFoundDataEnd = true;
return false;
else{
_locateFoundDataEnd = true;
return false;
}
}
cursor.node = current;
cursor.index = i;
Expand Down Expand Up @@ -133,7 +135,7 @@ bool PmseCursor::atOrPastEndPointAfterSeeking() {
}
}

void PmseCursor::locate(const BSONObj& key, const RecordId& loc, std::list<LocksPtr>& locks) {
void PmseCursor::locate(const BSONObj& key, const RecordId& loc, std::list<nvml::obj::shared_mutex *>& locks) {
bool locateFound;
CursorObject locateCursor;
_isEOF = false;
Expand Down Expand Up @@ -182,7 +184,7 @@ void PmseCursor::seekEndCursor() {

if (!_endState || !_tree->_root)
return;
std::list<LocksPtr> locks;
std::list<nvml::obj::shared_mutex *> locks;
found = lower_bound(_endState->query, endCursor, locks);
if (_locateFoundDataEnd) {
_endPositionIsDataEnd = true;
Expand Down Expand Up @@ -251,7 +253,7 @@ bool PmseCursor::atEndPoint() {

boost::optional<IndexKeyEntry> PmseCursor::next(
RequestedInfo parts = kKeyAndLoc) {
std::list<LocksPtr> locks;
std::list<nvml::obj::shared_mutex *> locks;
if (_wasRestore) {
locate(_cursorKey, RecordId(_cursorId), locks);
moveToNext();
Expand Down Expand Up @@ -319,22 +321,22 @@ void PmseCursor::moveToNext() {
}
}

void PmseCursor::unlockTree(std::list<LocksPtr>& locks) {
std::list<LocksPtr>::const_iterator iterator;
void PmseCursor::unlockTree(std::list<nvml::obj::shared_mutex*>& locks) {
std::list<nvml::obj::shared_mutex *>::const_iterator iterator;
try {
for (iterator = locks.begin(); iterator != locks.end(); ++iterator) {
iterator->ptr->unlock_shared();
(*iterator)->unlock_shared();
}
locks.erase(locks.begin(), locks.end());
}catch(std::exception &e) {}
} catch (std::exception &e) {}
}

boost::optional<IndexKeyEntry> PmseCursor::seek(const BSONObj& key,
bool inclusive,
RequestedInfo parts = kKeyAndLoc) {
if (!_tree->_root)
return {};
std::list<LocksPtr> locks;
std::list<nvml::obj::shared_mutex*> locks;

if (key.isEmpty()) {
if (inclusive) {
Expand Down Expand Up @@ -372,7 +374,7 @@ boost::optional<IndexKeyEntry> PmseCursor::seek(const IndexSeekPoint& seekPoint,

const BSONObj query = IndexEntryComparison::makeQueryObject(seekPoint, _forward);
auto discriminator = RecordId::min();
std::list<LocksPtr> locks;
std::list<nvml::obj::shared_mutex *> locks;
locate(query, _forward ? RecordId::min() : RecordId::max(), locks);

if (_isEOF) {
Expand Down
12 changes: 3 additions & 9 deletions src/pmse_index_cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,29 +95,24 @@ class PmseCursor final : public SortedDataInterface::Cursor {
}
return bb.obj();
}
void locate(const BSONObj& key, const RecordId& loc, std::list<LocksPtr>& locks);
void unlockTree(std::list<LocksPtr>& locks);
void locate(const BSONObj& key, const RecordId& loc, std::list<nvml::obj::shared_mutex*>& locks);
void unlockTree(std::list<nvml::obj::shared_mutex *>& locks);
void seekEndCursor();
bool lower_bound(IndexKeyEntry entry, CursorObject& cursor, std::list<LocksPtr>& locks);
bool previous(CursorObject&);
bool lower_bound(IndexKeyEntry entry, CursorObject& cursor, std::list<nvml::obj::shared_mutex*>& locks);
void moveToNext();
bool atOrPastEndPointAfterSeeking();
bool atEndPoint();
const bool _forward;
const BSONObj& _ordering;
persistent_ptr<PmseTreeNode> _first;
persistent_ptr<PmseTreeNode> _last;
const bool _unique;
persistent_ptr<PmseTree> _tree;
bool _isEOF = true;
/*
* Cursor used for iterating with next until "_endPosition"
*/
boost::optional<IndexKeyEntry> _endPosition;
CursorObject _cursor;
CursorObject _returnValue;
static IndexKeyEntry_PM min;
static IndexKeyEntry_PM max;

struct EndState {
EndState(BSONObj key, RecordId loc) : query(std::move(key), loc) {}
Expand All @@ -128,7 +123,6 @@ class PmseCursor final : public SortedDataInterface::Cursor {
int64_t _cursorId;
bool _endPositionIsDataEnd;
bool _locateFoundDataEnd;
bool _wasMoved;
bool _eofRestore;
bool _wasRestore = false;
};
Expand Down
2 changes: 1 addition & 1 deletion src/pmse_sorted_data_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void PmseSortedDataInterface::unindex(OperationContext* txn, const BSONObj& key,
IndexKeyEntry entry(key.getOwned(), loc);
try {
transaction::exec_tx(_pm_pool, [this, &entry, dupsAllowed, txn] {
_tree->remove(_pm_pool, entry, dupsAllowed, _desc->keyPattern(), txn);
_tree->remove(_pm_pool, entry, dupsAllowed, _desc->keyPattern());
});
} catch (std::exception &e) {
log() << e.what();
Expand Down
128 changes: 95 additions & 33 deletions src/pmse_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,11 @@ BSONObj IndexKeyEntry_PM::getBSON() {
}

bool PmseTree::remove(pool_base pop, IndexKeyEntry& entry,
bool dupsAllowed, const BSONObj& ordering,
OperationContext* txn = nullptr) {
bool dupsAllowed, const BSONObj& ordering) {
persistent_ptr<PmseTreeNode> node;
uint64_t i;
int64_t cmp;
std::list<LocksPtr> locks;
std::list<nvml::obj::shared_mutex *> locks;
persistent_ptr<PmseTreeNode> lockNode;
_ordering = ordering;
// find node with key
Expand Down Expand Up @@ -498,17 +497,35 @@ bool PmseTree::nodeIsSafeForOperation(persistent_ptr<PmseTreeNode> node, bool in

persistent_ptr<PmseTreeNode> PmseTree::locateLeafWithKeyPM(
persistent_ptr<PmseTreeNode> node, IndexKeyEntry& entry,
const BSONObj& ordering, std::list<LocksPtr>& locks,
const BSONObj& ordering, std::list<nvml::obj::shared_mutex *>& locks,
persistent_ptr<PmseTreeNode>& lockNode, bool insert) {
uint64_t i = 0;
int64_t cmp;
(node->_pmutex).lock();
locks.push_back(LocksPtr(&(node->_pmutex)));
persistent_ptr<PmseTreeNode> current = node;
persistent_ptr<PmseTreeNode> current = _root;
persistent_ptr<PmseTreeNode> child;

if (current == nullptr)
return current;

return nullptr;

if (current->is_leaf){
(current->_pmutex).lock();
if (current.raw_ptr()->off != _root.raw_ptr()->off)
{
(current->_pmutex).unlock();
current = _root;
}
else{
locks.push_back(&(_root->_pmutex));
return _root;
}
}
(current->_pmutex).lock_shared();
if (current.raw_ptr()->off != _root.raw_ptr()->off)
{
(current->_pmutex).unlock_shared();
(_root->_pmutex).lock_shared();
current = _root;
}
while (!current->is_leaf) {
i = 0;
while (i < current->num_keys) {
Expand All @@ -519,12 +536,49 @@ persistent_ptr<PmseTreeNode> PmseTree::locateLeafWithKeyPM(
break;
}
}
if (current->children_array[i]->is_leaf){
(current->children_array[i]->_pmutex).lock();
current = current->children_array[i];
if (nodeIsSafeForOperation(current, insert)) {
unlockTree(locks);
locks.push_back(&(current->children_array[i]->_pmutex));
}
else{
(current->children_array[i]->_pmutex).lock_shared();
}

current = current->children_array[i];
current->parent->_pmutex.unlock_shared();
}
if (!nodeIsSafeForOperation(current, insert)){
unlockTree(locks);
current = _root;
(current->_pmutex).lock();
while (current.raw_ptr()->off != _root.raw_ptr()->off)
{
(current->_pmutex).unlock();
current = _root;
(current->_pmutex).lock();
}
locks.push_back(&(current->_pmutex));
if (current == nullptr)
return nullptr;

while (!current->is_leaf) {
i = 0;
while (i < current->num_keys) {
cmp = IndexKeyEntry_PM::compareEntries(entry, current->keys[i], ordering);
if (cmp >= 0) {
i++;
} else {
break;
}
}
child = current->children_array[i];
(child->_pmutex).lock();
current = child;
if (nodeIsSafeForOperation(current, insert)) {
unlockTree(locks);
}
locks.push_back(&(current->_pmutex));
}
locks.push_back(LocksPtr(&(current->_pmutex)));
}
if (current->next) {
current->next->_pmutex.lock();
Expand All @@ -533,6 +587,7 @@ persistent_ptr<PmseTreeNode> PmseTree::locateLeafWithKeyPM(
return current;
}


/*
* Insert leaf into correct place.
*/
Expand Down Expand Up @@ -572,7 +627,8 @@ uint64_t PmseTree::cut(uint64_t length) {
*/
persistent_ptr<PmseTreeNode> PmseTree::splitFullNodeAndInsert(
pool_base pop, persistent_ptr<PmseTreeNode> node,
IndexKeyEntry& entry, const BSONObj& _ordering) {
IndexKeyEntry& entry, const BSONObj& _ordering,
std::list<nvml::obj::shared_mutex *>& locks) {
persistent_ptr<PmseTreeNode> new_leaf;
IndexKeyEntry_PM new_entry;
uint64_t insertion_index = 0;
Expand Down Expand Up @@ -633,6 +689,11 @@ persistent_ptr<PmseTreeNode> PmseTree::splitFullNodeAndInsert(
new_leaf->parent = node->parent;
new_entry = new_leaf->keys[0];
new_root = insertIntoNodeParent(pop, _root, node, new_entry, new_leaf);
if(new_root.raw_ptr()->off!=_root.raw_ptr()->off)
{
new_root->_pmutex.lock();
locks.push_back(&(new_root->_pmutex));
}
new_leaf->_pmutex.unlock();
_last = new_leaf;
return new_root;
Expand Down Expand Up @@ -788,11 +849,11 @@ persistent_ptr<PmseTreeNode> PmseTree::allocateNewRoot(
return new_root;
}

void PmseTree::unlockTree(std::list<LocksPtr>& locks) {
std::list<LocksPtr>::const_iterator iterator;
void PmseTree::unlockTree(std::list<nvml::obj::shared_mutex *>& locks) {
std::list<nvml::obj::shared_mutex *>::const_iterator iterator;
try {
for (iterator = locks.begin(); iterator != locks.end(); ++iterator) {
iterator->ptr->unlock();
(*iterator)->unlock();
}
locks.erase(locks.begin(), locks.end());
}catch(std::exception &e) {}
Expand All @@ -804,25 +865,26 @@ Status PmseTree::insert(pool_base pop, IndexKeyEntry& entry,
Status status = Status::OK();
uint64_t i;
int64_t cmp;
std::list<LocksPtr> locks;
std::list<nvml::obj::shared_mutex *> locks;
persistent_ptr<PmseTreeNode> lockNode;

if (!_root) {
// root not allocated yet
try {
transaction::exec_tx(pop, [this, &entry] {
_root = makeTreeRoot(entry);
_first = _root;
_last = _root;
});
} catch (std::exception &e) {
log() << "Index: " << e.what();
status = Status(ErrorCodes::CommandFailed, e.what());
stdx::lock_guard<nvml::obj::mutex> guard(globalMutex);
if(!_root){
// root not allocated yet
try {
transaction::exec_tx(pop, [this, &entry] {
_root = makeTreeRoot(entry);
_first = _root;
_last = _root;
});
} catch (std::exception &e) {
log() << "Index: " << e.what();
status = Status(ErrorCodes::CommandFailed, e.what());
}
return status;
}
return status;
}
node = locateLeafWithKeyPM(_root, entry, ordering, locks, lockNode, true);

/*
* Duplicate key check
*/
Expand Down Expand Up @@ -870,8 +932,8 @@ Status PmseTree::insert(pool_base pop, IndexKeyEntry& entry,
* splitting
*/
try {
transaction::exec_tx(pop, [this, pop, &node, &entry, ordering] {
_root = splitFullNodeAndInsert(pop, node, entry, ordering);
transaction::exec_tx(pop, [this, pop, &node, &entry, ordering, &locks] {
_root = splitFullNodeAndInsert(pop, node, entry, ordering, locks);
});
} catch (std::exception &e) {
log() << "Index: " << e.what();
Expand Down
Loading

0 comments on commit b51d95a

Please sign in to comment.