POC for segment replication. (#2075)
* Initial POC for segment replication.

In this POC, replicas are configured as read-only by not creating an IndexWriter.
After primary shards refresh, a checkpoint is sent over the transport layer to replicas.
Once received, replicas fetch files in the checkpoint from the primary shard.
This initial commit ignores failover, retention leases, and shard allocation.

Signed-off-by: Marc Handalian <[email protected]>
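
The checkpoint flow described in the entry above can be sketched roughly as follows. This is an illustration under simplifying assumptions, not the commit's code: a checkpoint is modeled as a map of file names to lengths, and `SketchCheckpoint`, `SketchReplica`, `filesToFetch`, and `onCheckpoint` are hypothetical names. The real change sends checkpoints over the transport layer and copies Lucene segment files.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a published checkpoint; not an OpenSearch type.
final class SketchCheckpoint {
    final long segmentsGen;
    final Map<String, Long> fileLengths; // file name -> length in bytes

    SketchCheckpoint(long segmentsGen, Map<String, Long> fileLengths) {
        this.segmentsGen = segmentsGen;
        this.fileLengths = new HashMap<>(fileLengths);
    }
}

// Hypothetical read-only replica that only tracks which files it holds.
final class SketchReplica {
    private final Map<String, Long> localFiles = new HashMap<>();

    // Returns the files missing locally or whose length differs from the checkpoint.
    Set<String> filesToFetch(SketchCheckpoint checkpoint) {
        Set<String> missing = new LinkedHashSet<>();
        for (Map.Entry<String, Long> entry : checkpoint.fileLengths.entrySet()) {
            Long localLength = localFiles.get(entry.getKey());
            if (localLength == null || !localLength.equals(entry.getValue())) {
                missing.add(entry.getKey());
            }
        }
        return missing;
    }

    // Simulates fetching the missing files from the primary.
    void onCheckpoint(SketchCheckpoint checkpoint) {
        for (String name : filesToFetch(checkpoint)) {
            localFiles.put(name, checkpoint.fileLengths.get(name));
        }
    }
}
```

After `onCheckpoint` runs, a second call to `filesToFetch` with the same checkpoint returns an empty set, which is the property the replica relies on to avoid copying files it already has.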

* Remove bypass of fsync on primaries and force fsync on replicas.

This change will force an fsync on replicas when a new commit point is received.

Signed-off-by: Marc Handalian <[email protected]>

* Prevent replicas from processing checkpoints from other indices.

Signed-off-by: Marc Handalian <[email protected]>

* Remove explicit fsync when every file is copied.

Signed-off-by: Marc Handalian <[email protected]>

* Fix recovery states to move to completed during initial shard recovery and mark the shard as active.

With this change IndexShard.startRecovery will only set up a replica shard and mark
it as tracked with the primary.  We will then only start replication after the
primary has refreshed after performing the first operation.
This also avoids a condition when the initial recovery is trying to replicate
from a primary shard that has not performed any operations and waits indefinitely for
a replica to catch up to the latest sequence number.  This change also ensures that
we are only ever performing one replication event at any given moment.

Signed-off-by: Marc Handalian <[email protected]>
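
The entry above says only one replication event runs at any given moment. One common way to get that property is a compare-and-set flag. The sketch below is an assumption about how such a guard could look, not the mechanism this commit uses; `SingleReplicationGuard` and `tryReplicate` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class SingleReplicationGuard {
    private final AtomicBoolean replicating = new AtomicBoolean(false);

    // Runs the task only if no other replication event is in flight.
    // Returns false when the event was skipped.
    boolean tryReplicate(Runnable replicationTask) {
        if (!replicating.compareAndSet(false, true)) {
            return false; // another event holds the flag
        }
        try {
            replicationTask.run();
            return true;
        } finally {
            replicating.set(false); // release even if the task throws
        }
    }
}
```

A skipped event is not lost in this model as long as the next published checkpoint supersedes it, which matches the checkpoint-based design described in this commit.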

* Ignore replication checkpoints if we are already up to the published checkpoint.

This change ensures we do not start a replication sequence if we already have the checkpoint.
It also changes the checkpoint published by the primary from the latest persisted checkpoint to the latest processed checkpoint.

Signed-off-by: Marc Handalian <[email protected]>
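
A minimal sketch of the check described above, combined with the earlier fix that stops replicas from acting on checkpoints from other indices. The `Checkpoint` class, its fields, and `shouldReplicate` are hypothetical and only illustrate the comparison; the commit's checkpoint type is not shown in this section.

```java
final class CheckpointFilterSketch {
    // Hypothetical checkpoint: which shard it belongs to and how far it has processed.
    static final class Checkpoint {
        final String shardId;
        final long processedSeqNo;

        Checkpoint(String shardId, long processedSeqNo) {
            this.shardId = shardId;
            this.processedSeqNo = processedSeqNo;
        }
    }

    // A replica starts replication only for its own shard and only when the
    // published checkpoint is ahead of what it has already processed.
    static boolean shouldReplicate(Checkpoint local, Checkpoint published) {
        if (!local.shardId.equals(published.shardId)) {
            return false; // checkpoint belongs to a different index or shard
        }
        return published.processedSeqNo > local.processedSeqNo;
    }
}
```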

* Fix retention lease invariant in ReplicationTracker.

To satisfy this invariant, this change updates the TRACK_SHARD action to clone
the primary's retention lease and use it as the replica's.

Signed-off-by: Marc Handalian <[email protected]>

* Fix SegmentReplicationPrimaryService to wait until the replica ShardRouting is set up.

Sleeps the thread instead of incorrectly using a monitor.

Signed-off-by: Marc Handalian <[email protected]>

* Remove duplicate method to fetch local checkpoint.

Signed-off-by: Marc Handalian <[email protected]>

* Fix startup when replicas are not listed in primary's routing table.

This removes a wait in favor of throwing a retryable exception.

Signed-off-by: Marc Handalian <[email protected]>
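
The entry above replaces a blocking wait with a retryable exception. The sketch below shows the general pattern under stated assumptions: `RetrySketch`, `RetryableException`, and `withRetries` are hypothetical names, and the retry loop stands in for whatever retry handling the caller already provides.

```java
import java.util.function.Supplier;

final class RetrySketch {
    // Hypothetical marker for failures that are safe to retry.
    static final class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    // Calls the action until it succeeds or maxAttempts is exhausted,
    // rethrowing the last retryable failure in the latter case.
    static <T> T withRetries(int maxAttempts, Supplier<T> action) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetryableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RetryableException e) {
                last = e; // e.g. the replica is not yet in the routing table
            }
        }
        throw last;
    }
}
```

Throwing instead of waiting keeps the calling thread free and leaves the retry policy to the caller.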

* PR cleanup.

Renamed SegmentReplicationService -> SegmentReplicationReplicaService.
Removed if conditions in SyncedFlushService and TransportShardFlushAction.
Improved comments and documentation.

Signed-off-by: Marc Handalian <[email protected]>
mch2 authored Feb 22, 2022
1 parent 4822c28 commit 11a1f52
Showing 48 changed files with 3,802 additions and 141 deletions.
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
@@ -270,6 +270,8 @@
import org.opensearch.index.seqno.RetentionLeaseActions;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.breaker.CircuitBreakerService;
+ import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
+ import org.opensearch.indices.replication.checkpoint.TransportPublishCheckpointAction;
import org.opensearch.persistent.CompletionPersistentTaskAction;
import org.opensearch.persistent.RemovePersistentTaskAction;
import org.opensearch.persistent.StartPersistentTaskAction;
@@ -588,6 +590,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(SimulateTemplateAction.INSTANCE, TransportSimulateTemplateAction.class);
actions.register(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);
+ actions.register(PublishCheckpointAction.INSTANCE, TransportPublishCheckpointAction.class);
actions.register(FlushAction.INSTANCE, TransportFlushAction.class);
actions.register(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
actions.register(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
@@ -61,11 +61,11 @@ public class RefreshResponse extends BroadcastResponse {
declareBroadcastFields(PARSER);
}

- RefreshResponse(StreamInput in) throws IOException {
+ public RefreshResponse(StreamInput in) throws IOException {
super(in);
}

- RefreshResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) {
+ public RefreshResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}

@@ -135,6 +135,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.metadata.IndexMetadata.APIBlock;
import org.opensearch.common.Nullable;
+ import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;

/**
* Administrative actions/operations against indices.
@@ -405,6 +406,13 @@ public interface IndicesAdminClient extends OpenSearchClient {
*/
void refresh(RefreshRequest request, ActionListener<RefreshResponse> listener);

+ /**
+ * Publish the latest primary checkpoint to replica shards.
+ * @param request {@link PublishCheckpointRequest} The PublishCheckpointRequest
+ * @param listener A listener to be notified with a result
+ */
+ void publishCheckpoint(PublishCheckpointRequest request, ActionListener<RefreshResponse> listener);
+
/**
* Explicitly refresh one or more indices (making the content indexed since the last refresh searchable).
*/
@@ -372,6 +372,8 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
+ import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
+ import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.tasks.TaskId;
import org.opensearch.threadpool.ThreadPool;

@@ -1632,6 +1634,11 @@ public void refresh(final RefreshRequest request, final ActionListener<RefreshRe
execute(RefreshAction.INSTANCE, request, listener);
}

+ @Override
+ public void publishCheckpoint(PublishCheckpointRequest request, ActionListener<RefreshResponse> listener) {
+ execute(PublishCheckpointAction.INSTANCE, request, listener);
+ }
+
@Override
public RefreshRequestBuilder prepareRefresh(String... indices) {
return new RefreshRequestBuilder(this, RefreshAction.INSTANCE).setIndices(indices);
8 changes: 5 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexModule.java
@@ -75,6 +75,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
+ import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
@@ -466,7 +467,8 @@ public IndexService newIndexService(
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
- ValuesSourceRegistry valuesSourceRegistry
+ ValuesSourceRegistry valuesSourceRegistry,
+ TransportCheckpointPublisher checkpointPublisher
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
@@ -520,8 +522,8 @@ public IndexService newIndexService(
allowExpensiveQueries,
expressionResolver,
valuesSourceRegistry,
- recoveryStateFactory
- );
+ recoveryStateFactory,
+ checkpointPublisher);
success = true;
return indexService;
} finally {
15 changes: 10 additions & 5 deletions server/src/main/java/org/opensearch/index/IndexService.java
@@ -94,6 +94,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
+ import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
@@ -165,6 +166,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexNameExpressionResolver expressionResolver;
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
+ private final TransportCheckpointPublisher checkpointPublisher;

public IndexService(
IndexSettings indexSettings,
@@ -195,8 +197,8 @@ public IndexService(
BooleanSupplier allowExpensiveQueries,
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
- IndexStorePlugin.RecoveryStateFactory recoveryStateFactory
- ) {
+ IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
+ TransportCheckpointPublisher checkpointPublisher) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
this.indexSettings = indexSettings;
@@ -206,6 +208,7 @@ public IndexService(
this.circuitBreakerService = circuitBreakerService;
this.expressionResolver = expressionResolver;
this.valuesSourceRegistry = valuesSourceRegistry;
+ this.checkpointPublisher = checkpointPublisher;
if (needsMapperService(indexSettings, indexCreationContext)) {
assert indexAnalyzers != null;
this.mapperService = new MapperService(
@@ -520,8 +523,8 @@ public synchronized IndexShard createShard(
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
- circuitBreakerService
- );
+ circuitBreakerService,
+ checkpointPublisher);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
@@ -911,7 +914,9 @@ private void maybeRefreshEngine(boolean force) {
if (indexSettings.getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
try {
- shard.scheduledRefresh();
+ if (shard.routingEntry().primary()) {
+ shard.scheduledRefresh();
+ }
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
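
The hunk above restricts scheduled refreshes to primary shards, since replicas in this POC receive segments from the primary instead of refreshing on their own. A simplified model of that loop follows; `PrimaryOnlyRefreshSketch`, its `Shard` class, and `maybeRefresh` are hypothetical names that only mirror the shape of the change.

```java
import java.util.List;

final class PrimaryOnlyRefreshSketch {
    // Hypothetical shard model that counts how often it was refreshed.
    static final class Shard {
        final boolean primary;
        int refreshCount;

        Shard(boolean primary) {
            this.primary = primary;
        }

        void scheduledRefresh() {
            refreshCount++;
        }
    }

    // Refreshes primaries only; replicas are left untouched.
    static void maybeRefresh(List<Shard> shards) {
        for (Shard shard : shards) {
            if (shard.primary) {
                shard.scheduledRefresh();
            }
        }
    }
}
```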
40 changes: 40 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -249,6 +249,10 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
}
}

+ public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {};
+
+ public long getProcessedLocalCheckpoint() { return 0L; };
+
/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
@@ -1196,6 +1200,20 @@ public abstract void forceMerge(
*/
public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException;

+ /**
+ * Fetch a snapshot of the latest SegmentInfos from the engine and ensure that segment files are retained in the directory
+ * until closed.
+ * @return {@link SegmentInfosRef} - A ref to segmentInfos that must be closed for segment files to be deleted.
+ */
+ public SegmentInfosRef getLatestSegmentInfosSafe() { return null; };
+
+ /**
+ * Fetch a snapshot of the latest SegmentInfos from the engine.
+ * This method does not ensure that segment files are retained in the directory.
+ * @return {@link SegmentInfos}
+ */
+ public SegmentInfos getLatestSegmentInfos() { return null; };
+
/**
* Snapshots the most recent safe index commit from the engine.
*/
@@ -1999,6 +2017,28 @@ public IndexCommit getIndexCommit() {
}
}

+ public static class SegmentInfosRef implements Closeable {
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final CheckedRunnable<IOException> onClose;
+ private final SegmentInfos segmentInfos;
+
+ public SegmentInfosRef(SegmentInfos segmentInfos, CheckedRunnable<IOException> onClose) {
+ this.segmentInfos = segmentInfos;
+ this.onClose = onClose;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed.compareAndSet(false, true)) {
+ onClose.run();
+ }
+ }
+
+ public SegmentInfos getSegmentInfos() {
+ return segmentInfos;
+ }
+ }
+
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {

}
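
The `SegmentInfosRef` class added above runs its `onClose` callback at most once, so callers can close it defensively. The standalone sketch below mirrors that idea for illustration; `CloseOnceRefSketch` is a hypothetical generic stand-in that uses a plain `Runnable` instead of `CheckedRunnable<IOException>` so it compiles without OpenSearch or Lucene on the classpath.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;

final class CloseOnceRefSketch<T> implements Closeable {
    private final AtomicBoolean closed = new AtomicBoolean();
    private final T value;
    private final Runnable onClose;

    CloseOnceRefSketch(T value, Runnable onClose) {
        this.value = value;
        this.onClose = onClose;
    }

    T get() {
        return value;
    }

    // Only the first close triggers the release callback; later calls are no-ops.
    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            onClose.run();
        }
    }
}
```

Used in a try-with-resources statement, the reference is released exactly once even if other code also calls `close()` afterwards.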
14 changes: 10 additions & 4 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
@@ -95,6 +95,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
+ private boolean isPrimary;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -169,8 +170,8 @@ public EngineConfig(
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
- TombstoneDocSupplier tombstoneDocSupplier
- ) {
+ TombstoneDocSupplier tombstoneDocSupplier,
+ boolean isPrimary) {
this(
shardId,
threadPool,
@@ -193,7 +194,7 @@ public EngineConfig(
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
- primaryTermSupplier,
+ isPrimary, primaryTermSupplier,
tombstoneDocSupplier
);
}
@@ -223,7 +224,7 @@ public EngineConfig(
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
- LongSupplier primaryTermSupplier,
+ boolean isPrimary, LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
this.shardId = shardId;
@@ -237,6 +238,7 @@ public EngineConfig(
this.codecService = codecService;
this.eventListener = eventListener;
codecName = indexSettings.getValue(INDEX_CODEC_SETTING);
+ this.isPrimary = isPrimary;
// We need to make the indexing buffer for this shard at least as large
// as the amount of memory that is available for all engines on the
// local node so that decisions to flush segments to disk are made by
@@ -458,6 +460,10 @@ public LongSupplier getPrimaryTermSupplier() {
return primaryTermSupplier;
}

+ public boolean isPrimary() {
+ return isPrimary;
+ }
+
/**
* A supplier supplies tombstone documents which will be used in soft-update methods.
* The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.
@@ -112,7 +112,8 @@ public EngineConfig newEngineConfig(
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
- EngineConfig.TombstoneDocSupplier tombstoneDocSupplier
+ EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
+ Boolean isPrimary
) {

return new EngineConfig(
@@ -137,6 +138,7 @@ public EngineConfig newEngineConfig(
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
+ isPrimary,
primaryTermSupplier,
tombstoneDocSupplier
);