diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index bba1d8c069c68..2b22f6a662d89 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -350,6 +350,14 @@ public Condition newCondition() { */ public abstract IndexResult index(Index index) throws IOException; + /** + * Add document index operation to Translog + * @param index operation to perform + * @return {@link IndexResult} containing updated translog location, version and + * document specific failures + */ + public abstract IndexResult addIndexOperationToTranslog(Index index) throws IOException; + /** * Perform document delete operation on the engine * @param delete operation to perform diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 6bef118e0b61f..6283f5634d4ba 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1024,23 +1024,7 @@ public IndexResult index(Index index) throws IOException { } } if (index.origin().isFromTranslog() == false) { - final Translog.Location location; - if (indexResult.getResultType() == Result.Type.SUCCESS) { - location = translog.add(new Translog.Index(index, indexResult)); - } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no - final NoOp noOp = new NoOp( - indexResult.getSeqNo(), - index.primaryTerm(), - index.origin(), - index.startTime(), - indexResult.getFailure().toString() - ); - location = innerNoOp(noOp).getTranslogLocation(); - } else { - location = null; - } - indexResult.setTranslogLocation(location); + addIndexOperationToTranslog(index, indexResult); } if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) { final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null; @@ -1075,6 +1059,44 @@ public IndexResult index(Index index) throws IOException { } } + @Override + public Engine.IndexResult addIndexOperationToTranslog(Index index) throws IOException { + try (Releasable ignored = versionMap.acquireLock(index.uid().bytes())) { + IndexingStrategy plan = indexingStrategyForOperation(index); + /** + * Matches the logic in {@link #indexIntoLucene(Index, IndexingStrategy)} + */ + IndexResult indexResult = new IndexResult( + plan.versionForIndexing, + index.primaryTerm(), + index.seqNo(), + plan.currentNotFoundOrDeleted + ); + addIndexOperationToTranslog(index, indexResult); + indexResult.setTook(System.nanoTime() - index.startTime()); + indexResult.freeze(); + return indexResult; + } + } + + private void addIndexOperationToTranslog(Index index, IndexResult indexResult) throws IOException { + Translog.Location location = null; + if (indexResult.getResultType() == Result.Type.SUCCESS) { + location = translog.add(new Translog.Index(index, indexResult)); + } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no + final NoOp noOp = new NoOp( + indexResult.getSeqNo(), + index.primaryTerm(), + index.origin(), + index.startTime(), + indexResult.getFailure().toString() + ); + location = innerNoOp(noOp).getTranslogLocation(); + } + indexResult.setTranslogLocation(location); + } + protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { assert assertNonPrimaryOrigin(index); // needs to maintain the auto_id timestamp in case this replica becomes primary diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 43fe10c217270..cd04cb9a7a54c 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -294,6 +294,12 @@ public IndexResult index(Index index) { throw new UnsupportedOperationException("indexing is not supported on a read-only engine"); } + @Override + public IndexResult addIndexOperationToTranslog(Index index) throws IOException { + assert false : "this should not be called"; + throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine"); + } + @Override public DeleteResult delete(Delete delete) { assert false : "this should not be called"; diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index c33adf3bcb558..337e0a93aa2f7 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -1656,6 +1656,15 @@ public void testUpdateWithFullyDeletedSegments() throws IOException { } } + public void testAddIndexOperationToTranslog() throws IOException { + try (Store store = createStore(); InternalEngine engine = createEngine(store, createTempDir())) { + ParsedDocument doc = createParsedDoc("1", null); + Engine.IndexResult result = engine.addIndexOperationToTranslog(indexForDoc(doc)); + assertEquals(Engine.Operation.TYPE.INDEX, result.getOperationType()); + assertNotNull(result.getTranslogLocation()); + } + } + public void testForceMergeWithSoftDeletesRetention() throws Exception { final long retainedExtraOps = randomLongBetween(0, 10); Settings.Builder settings = Settings.builder()