Use SegmentReplicationTarget test to validate non-active on-disk files are reused for replication

Signed-off-by: Suraj Singh <[email protected]>
dreamer-89 committed Jan 6, 2024
1 parent 8440468 commit 8b69e04
Showing 2 changed files with 64 additions and 106 deletions.
@@ -366,97 +366,6 @@ public void testPrimaryRestart() throws Exception {
}
}

/**
* This test validates that unreferenced on-disk files are ignored while requesting files from the replication source to
* prevent FileAlreadyExistsException. It does so by copying files only in the first round of segment replication, without
* committing locally, so that in the next round of segment replication those files are not considered for download again.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/10885")
public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

shards.indexDocs(10);
primary.refresh("Test");

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
when(sourceFactory.get(any())).thenReturn(
getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
);
CountDownLatch latch = new CountDownLatch(1);

// Start the first round of segment replication. This should fail with the simulated error, leaving the replica with
// files in its local store but not in the active reader.
final SegmentReplicationTarget target = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
latch.countDown();
Assert.fail("Replication should fail with simulated error");
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
latch.countDown();
assertFalse(sendShardFailure);
logger.error("Replication error", e);
}
}
);
latch.await();
Set<String> onDiskFiles = new HashSet<>(Arrays.asList(replica.store().directory().listAll()));
onDiskFiles.removeIf(name -> EXCLUDE_FILES.contains(name) || name.startsWith(IndexFileNames.SEGMENTS));
List<String> activeFiles = replica.getSegmentMetadataMap()
.values()
.stream()
.map(metadata -> metadata.name())
.collect(Collectors.toList());
assertTrue("Files should not be committed", activeFiles.isEmpty());
assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);

// Start the next round of segment replication; no exception is thrown this time, resulting in a commit on the replica
when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
CountDownLatch waitForSecondRound = new CountDownLatch(1);
final SegmentReplicationTarget newTarget = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
waitForSecondRound.countDown();
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
waitForSecondRound.countDown();
logger.error("Replication error", e);
Assert.fail("Replication should not fail");
}
}
);
waitForSecondRound.await();
assertEquals(newTarget.state().getStage(), SegmentReplicationState.Stage.DONE);
activeFiles = replica.getSegmentMetadataMap().values().stream().map(metadata -> metadata.name()).collect(Collectors.toList());
assertTrue("Replica should have consistent disk & reader", activeFiles.containsAll(onDiskFiles));
shards.removeReplica(replica);
closeShards(replica);
}
}

/**
* This test validates that local non-readable (corrupt, partially written) on-disk files are deleted rather than failing the
* replication event. This test mimics local files (not referenced by the reader) by throwing an exception post file copy and
@@ -29,21 +29,16 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.StoreTests;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.DummyShardLock;
import org.opensearch.test.IndexSettingsModule;
import org.junit.Assert;

import java.io.FileNotFoundException;
@@ -81,11 +76,6 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase {

private static final Map<String, StoreFileMetadata> SI_SNAPSHOT_DIFFERENT = Map.of(SEGMENT_FILE_DIFF.name(), SEGMENT_FILE_DIFF);

private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
"index",
Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build()
);

private SegmentInfos testSegmentInfos;

@Override
@@ -441,7 +431,10 @@ public void test_MissingFiles_NotCausingFailure() throws IOException {
// Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
// generated due to delete operations. These two snapshots can then be used in the test to mock the primary shard
// snapshot (the 2nd element, which contains delete operations) and the replica's existing snapshot (the 1st element).
List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);
List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);
// Delete on-disk files so that they are not considered for file diff
deleteContent(spyIndexShard.store().directory());
spyIndexShard.store().close();

SegmentReplicationSource segrepSource = new TestReplicationSource() {
@Override
@@ -486,14 +479,72 @@ public void onFailure(Exception e) {
});
}

/**
* This test ensures that on-disk files on the replica are taken into consideration while evaluating the file diff
* from the primary. The test mocks the files referred to by the active reader down to a smaller subset so that the logic to
* filter out on-disk files is exercised.
* @throws IOException if an indexing operation fails or segment replication fails
*/
public void test_OnDiskFiles_ReusedForReplication() throws IOException {
int docCount = 1 + random().nextInt(10);
// Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
// generated due to delete operations. These two snapshots can then be used in the test to mock the primary shard
// snapshot (the 2nd element, which contains delete operations) and the replica's existing snapshot (the 1st element).
List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);

SegmentReplicationSource segrepSource = new TestReplicationSource() {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
listener.onResponse(new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1).asMap(), buffer.toArrayCopy()));
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
BiConsumer<String, Long> fileProgressTracker,
ActionListener<GetSegmentFilesResponse> listener
) {
// No files should be requested from replication source
assertEquals(0, filesToFetch.size());
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
}
};

segrepTarget = new SegmentReplicationTarget(spyIndexShard, repCheckpoint, segrepSource, mock(
SegmentReplicationTargetService.SegmentReplicationListener.class
));
// Mask the files returned by the active reader. This is needed so that the logic to filter out on-disk files is exercised
when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap());
segrepTarget.startReplication(new ActionListener<Void>() {
@Override
public void onResponse(Void replicationResponse) {
segrepTarget.markAsDone();
}

@Override
public void onFailure(Exception e) {
Assert.fail();
}
});
}


/**
* Generates a list of Store.MetadataSnapshot with two elements, where the second snapshot has extra files due to a delete
* operation. A list of snapshots is returned so that identical files have the same checksum.
* @param docCount the number of documents to index in the first snapshot
* @param shard the IndexShard whose store is used for writing
* @return a list of Store.MetadataSnapshot with two elements, where the second snapshot has extra files due to the delete
* @throws IOException if one of the indexing operations fails
*/
private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount, IndexShard shard) throws IOException {
List<Document> docList = new ArrayList<>();
for (int i = 0; i < docCount; i++) {
Document document = new Document();
@@ -507,8 +558,7 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount)
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
iwc.setUseCompoundFile(true);
final ShardId shardId = new ShardId("index", "_na_", 1);
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
Store store = shard.store();
IndexWriter writer = new IndexWriter(store.directory(), iwc);
for (Document d : docList) {
writer.addDocument(d);
@@ -519,7 +569,6 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount)
writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
writer.commit();
Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
deleteContent(store.directory());
writer.close();
store.close();
return Arrays.asList(storeMetadata, storeMetadataWithDeletes);
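The new test_OnDiskFiles_ReusedForReplication above asserts that zero files are requested when every primary file already exists on the replica's disk, even though the active reader only references a subset of them. The following is a minimal, self-contained sketch of that idea, assuming a simplified model in which each side is just a map from file name to checksum; the class and method names (OnDiskReuseSketch, computeFilesToFetch) are illustrative assumptions and not part of the OpenSearch API.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Illustrative sketch only: models the replica-side decision the test exercises.
 * A primary file is skipped when either the active reader or a bare on-disk copy
 * already holds an identical (same-checksum) file, so nothing is re-downloaded.
 */
public class OnDiskReuseSketch {

    // Each map is file name -> checksum for one view of the segment files.
    static Set<String> computeFilesToFetch(
        Map<String, String> primaryFiles,
        Map<String, String> activeReaderFiles,
        Map<String, String> onDiskFiles
    ) {
        Set<String> toFetch = new HashSet<>();
        for (Map.Entry<String, String> entry : primaryFiles.entrySet()) {
            String name = entry.getKey();
            String checksum = entry.getValue();
            boolean inReader = checksum.equals(activeReaderFiles.get(name));
            boolean onDisk = checksum.equals(onDiskFiles.get(name));
            // Reuse the on-disk copy even when the reader does not reference it;
            // re-fetching such a file is what can trigger FileAlreadyExistsException.
            if (!inReader && !onDisk) {
                toFetch.add(name);
            }
        }
        return toFetch;
    }

    public static void main(String[] args) {
        Map<String, String> primary = new HashMap<>();
        primary.put("_0.cfs", "c1");
        primary.put("_1.cfs", "c2");

        // The replica's reader only references _0.cfs ...
        Map<String, String> reader = Map.of("_0.cfs", "c1");
        // ... but an earlier, failed replication round already copied _1.cfs to disk.
        Map<String, String> disk = Map.of("_0.cfs", "c1", "_1.cfs", "c2");

        // Prints []: nothing needs to be fetched, the on-disk _1.cfs is reused.
        System.out.println(computeFilesToFetch(primary, reader, disk));
    }
}

In the unit test above, the same situation is created by masking spyIndexShard.getSegmentMetadataMap() to the smaller snapshot while the larger snapshot's files remain in the store, and asserting in getSegmentFiles that filesToFetch is empty.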
