Skip to content

Commit

Permalink
[NOCOMMIT] Brainstorming w/ Yupeng for pull-based ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
msfroh committed Oct 23, 2024
1 parent 9dd1a59 commit cca234b
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
public static final ParseField SETTINGS = new ParseField("settings");
public static final ParseField ALIASES = new ParseField("aliases");
public static final ParseField CONTEXT = new ParseField("context");
public static final ParseField INGESTION_SOURCE = new ParseField("ingestion_source");


private String cause = "";

Expand Down Expand Up @@ -533,6 +535,8 @@ public CreateIndexRequest source(Map<String, ?> source, DeprecationHandler depre
aliases((Map<String, Object>) entry.getValue());
} else if (CONTEXT.match(name, deprecationHandler)) {
context((Map<String, Object>) entry.getValue());
} else if (INGESTION_SOURCE.match(name, deprecationHandler)) {

} else {
throw new OpenSearchParseException("unknown key [{}] for create index", name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.indices.IndicesService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -87,6 +88,12 @@ protected GetMappingsResponse read(StreamInput in) throws IOException {
return new GetMappingsResponse(in);
}


@Override
protected void doExecute(Task task, GetMappingsRequest request, ActionListener<GetMappingsResponse> listener) {
super.doExecute(task, request, listener);
}

@Override
protected void doClusterManagerOperation(
final GetMappingsRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,11 @@ public static APIBlock readFrom(StreamInput input) throws IOException {

private final long[] primaryTerms;

public CompressedXContent getIngestionSourceDefinition() {
return ingestionSourceDefinition;
}

private final CompressedXContent ingestionSourceDefinition;
private final State state;

private final Map<String, AliasMetadata> aliases;
Expand Down Expand Up @@ -724,7 +729,7 @@ private IndexMetadata(
final State state,
final int numberOfShards,
final int numberOfReplicas,
final int numberOfSearchOnlyReplicas,
final int numberOfSearchOnlyReplicas, CompressedXContent ingestionSourceDefinition,
final Settings settings,
final Map<String, MappingMetadata> mappings,
final Map<String, AliasMetadata> aliases,
Expand All @@ -747,6 +752,7 @@ private IndexMetadata(

this.index = index;
this.version = version;
this.ingestionSourceDefinition = ingestionSourceDefinition;
assert mappingVersion >= 0 : mappingVersion;
this.mappingVersion = mappingVersion;
assert settingsVersion >= 0 : settingsVersion;
Expand Down Expand Up @@ -1747,7 +1753,7 @@ public IndexMetadata build() {
state,
numberOfShards,
numberOfReplicas,
numberOfSearchReplicas,
numberOfSearchReplicas, ,
tmpSettings,
mappings,
tmpAliases,
Expand All @@ -1765,8 +1771,7 @@ public IndexMetadata build() {
rolloverInfos,
isSystem,
indexTotalShardsPerNodeLimit,
context
);
context);
}

public static void toXContent(IndexMetadata indexMetadata, XContentBuilder builder, ToXContent.Params params) throws IOException {
Expand Down
32 changes: 32 additions & 0 deletions server/src/main/java/org/opensearch/index/IngestionSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.index.engine.Engine;

import java.util.List;

public interface IngestionSource<T extends IngestionSource.SourcePointer> {
interface SourcePointer {
byte[] serialize();
}

class ReadResult<T> {
T nextPointer;
Engine.Operation indexOperation;
}

T createSourcePointer(int shardNum);

List<ReadResult<T>> readNext(T pointer, int maxOperations);

T deserialize(byte[] serializedPointer);

}
18 changes: 18 additions & 0 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IngestionSource;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.cache.request.ShardRequestCache;
import org.opensearch.index.compositeindex.CompositeIndexSettings;
Expand Down Expand Up @@ -155,6 +156,7 @@
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.plugins.IngestionSourcePlugin;
import org.opensearch.plugins.PluginsService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -205,6 +207,7 @@
import static org.opensearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
import static org.opensearch.index.IndexService.IndexCreationContext.METADATA_VERIFICATION;
import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
import static org.opensearch.index.query.Rewriteable.rewrite;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.search.SearchService.ALLOW_EXPENSIVE_QUERIES;

Expand Down Expand Up @@ -360,6 +363,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;
private final Map<String, IngestionSourcePlugin.IngestionSourceFactory<?>> ingestionSourceFactories;

@Override
protected void doStart() {
Expand Down Expand Up @@ -507,6 +511,15 @@ protected void closeInternal() {
this.compositeIndexSettings = compositeIndexSettings;
this.fileCache = fileCache;
this.replicator = replicator;
this.ingestionSourceFactories = readIngestionSourceFactories(pluginsService.filterPlugins(IngestionSourcePlugin.class));
}

private static Map<String, IngestionSourcePlugin.IngestionSourceFactory<?>> readIngestionSourceFactories(List<IngestionSourcePlugin> ingestionSourcePlugins) {
Map<String, IngestionSourcePlugin.IngestionSourceFactory<?>> factories = new HashMap<>();
for (IngestionSourcePlugin plugin : ingestionSourcePlugins) {
factories.putAll(plugin.getIngestionSourceFactories());
}
return factories;
}

public IndicesService(
Expand Down Expand Up @@ -1000,6 +1013,11 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
return NoOpEngine::new;
}

if (indexMetadata.getIngestionSourceDefinition() != null) {
IngestionSourcePlugin.IngestionSourceFactory<?> ingestionSourceFactory =
return new PullBasedEngineFactory(...);
}

final List<Optional<EngineFactory>> engineFactories = engineFactoryProviders.stream()
.map(engineFactoryProvider -> engineFactoryProvider.apply(idxSettings))
.filter(maybe -> Objects.requireNonNull(maybe).isPresent())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugins;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.index.IngestionSource;

import java.util.Collections;
import java.util.Map;

public interface IngestionSourcePlugin {

interface IngestionSourceFactory<T extends IngestionSource<?>> {
T create(IndexMetadata indexMetadata);
}

default Map<String, IngestionSourceFactory<? extends IngestionSource<?>>> getIngestionSourceFactories() {
return Collections.emptyMap();
}
}

0 comments on commit cca234b

Please sign in to comment.