Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Index Operations to Translog from Engine #2977

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ public Condition newCondition() {
*/
public abstract IndexResult index(Index index) throws IOException;

public abstract IndexResult addIndexOperationToTranslog(Index index) throws IOException;

/**
* Perform document delete operation on the engine
* @param delete operation to perform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1024,23 +1024,7 @@ public IndexResult index(Index index) throws IOException {
}
}
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
final NoOp noOp = new NoOp(
indexResult.getSeqNo(),
index.primaryTerm(),
index.origin(),
index.startTime(),
indexResult.getFailure().toString()
);
location = innerNoOp(noOp).getTranslogLocation();
} else {
location = null;
}
indexResult.setTranslogLocation(location);
addIndexOperationToTranslog(index, indexResult);
}
if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
Expand Down Expand Up @@ -1075,6 +1059,44 @@ public IndexResult index(Index index) throws IOException {
}
}

@Override
public Engine.IndexResult addIndexOperationToTranslog(Index index) throws IOException {
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes())) {
Copy link
Collaborator

@reta reta Apr 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you need to acquire readLock lock here (as all other public methods do):

try (ReleasableLock ignored = readLock.acquire()) {
 ....
}

Also, it probably makes sense to do basic checks like ensureOpen(); and assertIncomingSequenceNumber before adding anything to translog.

ensureOpen();
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());

Another concern I have is that write operations like index and delete do update lastWriteNanos but this is not done here, is there a reason?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd actually prefer to leave this method out until a segrep PR needs it. Nothing in this PR, except tests, call this method so it doesn't make sense to add at this point until we have a change that's actually using it. That will also concretely communicate why the method is needed to those not intimately working segrep; along with how to best structure the method.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 no objections, certainly

IndexingStrategy plan = indexingStrategyForOperation(index);
/**
* Matches the logic in {@link #indexIntoLucene(Index, IndexingStrategy)}
*/
IndexResult indexResult = new IndexResult(
plan.versionForIndexing,
index.primaryTerm(),
index.seqNo(),
plan.currentNotFoundOrDeleted
);
addIndexOperationToTranslog(index, indexResult);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
return indexResult;
}
}

private void addIndexOperationToTranslog(Index index, IndexResult indexResult) throws IOException {
Translog.Location location = null;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
final NoOp noOp = new NoOp(
indexResult.getSeqNo(),
index.primaryTerm(),
index.origin(),
index.startTime(),
indexResult.getFailure().toString()
);
location = innerNoOp(noOp).getTranslogLocation();
}
indexResult.setTranslogLocation(location);
}

protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
assert assertNonPrimaryOrigin(index);
// needs to maintain the auto_id timestamp in case this replica becomes primary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,12 @@ public IndexResult index(Index index) {
throw new UnsupportedOperationException("indexing is not supported on a read-only engine");
}

@Override
public IndexResult addIndexOperationToTranslog(Index index) throws IOException {
assert false : "this should not be called";
throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine");
}

@Override
public DeleteResult delete(Delete delete) {
assert false : "this should not be called";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1656,6 +1656,15 @@ public void testUpdateWithFullyDeletedSegments() throws IOException {
}
}

public void testAddIndexOperationToTranslog() throws IOException {
try (Store store = createStore(); InternalEngine engine = createEngine(store, createTempDir())) {
ParsedDocument doc = createParsedDoc("1", null);
Engine.IndexResult result = engine.addIndexOperationToTranslog(indexForDoc(doc));
assertEquals(Engine.Operation.TYPE.INDEX, result.getOperationType());
assertNotNull(result.getTranslogLocation());
}
}

public void testForceMergeWithSoftDeletesRetention() throws Exception {
final long retainedExtraOps = randomLongBetween(0, 10);
Settings.Builder settings = Settings.builder()
Expand Down