Commit

More changes to derive source
mgodwan committed Sep 30, 2024
1 parent 5082b0e commit 76bc64e
Showing 5 changed files with 65 additions and 20 deletions.
6 changes: 5 additions & 1 deletion .idea/runConfigurations/Debug_OpenSearch.xml

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
@@ -677,7 +677,8 @@ protected void closeInternal() {
                recoverySettings,
                remoteStoreSettings,
                seedRemote,
-               discoveryNodes
+               discoveryNodes,
+               indexFieldData
            );
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
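The hunk above wires the per-index fielddata service (IndexService's `indexFieldData` field, an `IndexFieldDataService`) into every shard the service creates. A minimal, hypothetical sketch of that constructor-injection pattern — stand-in types only, since the real `IndexShard` constructor takes dozens of arguments:

    // ShardWiringSketch.java — illustrative stand-ins, not OpenSearch classes.
    public class ShardWiringSketch {
        static class FieldDataService { }                  // stand-in for IndexFieldDataService

        static class Shard {
            private final FieldDataService fieldDataService;

            Shard(FieldDataService fieldDataService) {     // injected at shard creation
                this.fieldDataService = fieldDataService;
            }

            FieldDataService fieldDataService() {          // mirrors indexFieldDataService() in IndexShard
                return fieldDataService;
            }
        }

        public static void main(String[] args) {
            FieldDataService perIndexService = new FieldDataService();    // one per index
            Shard shard = new Shard(perIndexService);                     // each shard of the index shares it
            System.out.println(shard.fieldDataService() == perIndexService); // prints: true
        }
    }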
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/index/fielddata/IndexFieldDataService.java
@@ -34,6 +34,7 @@

import org.apache.lucene.util.Accountable;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.core.index.shard.ShardId;
@@ -59,6 +60,7 @@
*
* @opensearch.internal
*/
@PublicApi(since = "1.0.0")
public class IndexFieldDataService extends AbstractIndexComponent implements Closeable {
public static final String FIELDDATA_CACHE_VALUE_NODE = "node";
public static final String FIELDDATA_CACHE_KEY = "index.fielddata.cache";
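The annotation aside, `IndexFieldDataService` is the component the get path below now calls into. Its lookup-then-load flow, sketched in comments based on the call this commit makes (hedged; not a stable API):

    // Assumed flow, from the call site in ShardGetService below:
    // IndexFieldData<?> fieldData = indexFieldDataService
    //     .getForField(fieldType, "", () -> null);         // field type, index name, search-lookup supplier
    // LeafFieldData leaf = fieldData.load(leafReaderContext); // per-segment view, served via the fielddata cache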
62 changes: 46 additions & 16 deletions server/src/main/java/org/opensearch/index/get/ShardGetService.java
@@ -56,15 +56,22 @@
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.support.XContentMapValues;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.TranslogLeafReader;
import org.opensearch.index.fielddata.IndexFieldDataCache;
import org.opensearch.index.fielddata.IndexFieldDataService;
import org.opensearch.index.fielddata.LeafNumericFieldData;
import org.opensearch.index.fielddata.SortedNumericDoubleValues;
import org.opensearch.index.fieldvisitor.CustomFieldsVisitor;
import org.opensearch.index.fieldvisitor.FieldsVisitor;
import org.opensearch.index.mapper.DocumentMapper;
@@ -80,14 +87,18 @@
import org.opensearch.index.mapper.Uid;
import org.opensearch.index.shard.AbstractIndexShardComponent;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
@@ -291,12 +302,6 @@ private GetResult innerGetLoadFromStoredFields(
}
source = fieldVisitor.source();

-           try {
-               Map<String, Object> sourceAsMap = buildUsingDocValues(docIdAndVersion.docId, docIdAndVersion.reader, mapperService);
-           } catch (IOException ex) {
-               throw new RuntimeException(ex);
-           }

// in case we read from translog, some extra steps are needed to make _source consistent and to load stored fields
if (get.isFromTranslog()) {
// Fast path: if only asked for the source or stored fields that have been already provided by TranslogLeafReader,
@@ -428,6 +433,16 @@ private GetResult innerGetLoadFromStoredFields(
}
}

+           try {
+               Map<String, Object> sourceAsMap = buildUsingDocValues(docIdAndVersion.docId, docIdAndVersion.reader, mapperService, indexShard);
+               try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
+                   builder.map(sourceAsMap);
+                   source = BytesReference.bytes(builder);
+               }
+           } catch (IOException ex) {
+               throw new RuntimeException(ex);
+           }

return new GetResult(
shardId.getIndexName(),
id,
@@ -449,7 +464,7 @@ private static FieldsVisitor buildFieldsVisitors(String[] fields, FetchSourceContext fetchSourceContext) {
return new CustomFieldsVisitor(Sets.newHashSet(fields), fetchSourceContext.fetchSource());
}

-   private static Map<String, Object> buildUsingDocValues(int docId, LeafReader reader, MapperService mapperService) throws IOException {
+   private static Map<String, Object> buildUsingDocValues(int docId, LeafReader reader, MapperService mapperService, IndexShard indexShard) throws IOException {
Map<String, Object> docValues = new HashMap<>();
for (Mapper mapper: mapperService.documentMapper().mappers()) {
if (mapper instanceof MetadataFieldMapper) {
@@ -461,6 +476,7 @@ private static Map<String, Object> buildUsingDocValues(int docId, LeafReader reader, MapperService mapperService, IndexShard indexShard) throws IOException {
if (fieldMapper.fieldType().hasDocValues()) {
String fieldName = fieldMapper.name();
FieldInfo fieldInfo = reader.getFieldInfos().fieldInfo(fieldName);
DocValueFormat format = fieldMapper.fieldType().docValueFormat(null, null);
if (fieldInfo != null) {
switch (fieldInfo.getDocValuesType()) {
case SORTED_SET:
@@ -471,26 +487,40 @@ private static Map<String, Object> buildUsingDocValues(int docId, LeafReader reader, MapperService mapperService, IndexShard indexShard) throws IOException {
values[i] = dv.lookupOrd(dv.nextOrd());
}
                                    if (values.length > 1) {
-                                       docValues.put(fieldName, values);
+                                       docValues.put(fieldName, Arrays.stream(values).map(format::format).collect(Collectors.toList()));
                                    } else {
-                                       docValues.put(fieldName, values[0]);
+                                       docValues.put(fieldName, format.format(values[0]));
                                    }
                                }
                                break;
                            case SORTED_NUMERIC:
                                SortedNumericDocValues sndv = reader.getSortedNumericDocValues(fieldName);
-                               if (sndv.advanceExact(docId)) {
-                                   Long[] values = new Long[sndv.docValueCount()];
+                               // if (sndv.advanceExact(docId)) {
+                               //     Long[] values = new Long[sndv.docValueCount()];
+                               //     for (int i = 0; i < sndv.docValueCount(); i++) {
+                               //         values[i] = sndv.nextValue();
+                               //     }
+                               //     if (values.length > 1) {
+                               //         docValues.put(fieldName, Arrays.stream(values).map(format::format).collect(Collectors.toList()));
+                               //     } else {
+                               //         docValues.put(fieldName, format.format(values[0]));
+                               //     }
+                               // }
+
+                               SortedNumericDoubleValues doubleValues = ((LeafNumericFieldData) indexShard.indexFieldDataService().getForField(fieldMapper.fieldType(), "", () -> null)
+                                   .load(reader.getContext())).getDoubleValues();
+                               if (doubleValues.advanceExact(docId)) {
+                                   double[] vals = new double[doubleValues.docValueCount()];
                                    for (int i = 0; i < sndv.docValueCount(); i++) {
-                                       fieldMapper.fieldType().fielddataBuilder()
-                                       values[i] = sndv.nextValue();
+                                       vals[i] = doubleValues.nextValue();
                                    }
-                                   if (values.length > 1) {
-                                       docValues.put(fieldName, vals);
+                                   if (vals.length > 1) {
+                                       docValues.put(fieldName, vals);
                                    } else {
-                                       docValues.put(fieldName, values[0]);
+                                       docValues.put(fieldName, vals[0]);
                                    }
                                }
+
                                break;
}
}
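The heart of the change is `buildUsingDocValues`: reconstruct a document's `_source` from columnar doc values instead of reading the stored `_source` field. A self-contained, plain-Lucene sketch of that technique — the field names, values, and single-segment setup are illustrative assumptions, not the commit's code:

    // DeriveSourceSketch.java — compile and run against lucene-core.
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.SortedNumericDocValuesField;
    import org.apache.lucene.document.SortedSetDocValuesField;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.SortedNumericDocValues;
    import org.apache.lucene.index.SortedSetDocValues;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.util.BytesRef;

    import java.util.HashMap;
    import java.util.Map;

    public class DeriveSourceSketch {
        public static void main(String[] args) throws Exception {
            try (ByteBuffersDirectory dir = new ByteBuffersDirectory();
                 IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
                // Index one document carrying doc values only — no stored _source at all.
                Document doc = new Document();
                doc.add(new SortedNumericDocValuesField("rating", 42L));
                doc.add(new SortedSetDocValuesField("tag", new BytesRef("opensearch")));
                writer.addDocument(doc);
                writer.commit();

                try (DirectoryReader reader = DirectoryReader.open(dir)) {
                    LeafReader leaf = reader.leaves().get(0).reader();
                    Map<String, Object> source = new HashMap<>();

                    // Numeric field: advance the iterator to the doc, then read its values.
                    SortedNumericDocValues nums = leaf.getSortedNumericDocValues("rating");
                    if (nums != null && nums.advanceExact(0)) {
                        source.put("rating", nums.nextValue()); // single-valued in this example
                    }

                    // Keyword-style field: ordinals are looked up back into the original bytes.
                    SortedSetDocValues strs = leaf.getSortedSetDocValues("tag");
                    if (strs != null && strs.advanceExact(0)) {
                        source.put("tag", strs.lookupOrd(strs.nextOrd()).utf8ToString());
                    }

                    System.out.println(source); // e.g. {rating=42, tag=opensearch}
                }
            }
        }
    }

The commit's version goes further: numerics are routed through fielddata (`LeafNumericFieldData.getDoubleValues()`) and each value passes through the mapper's `DocValueFormat`, presumably so that types such as dates come back in their source representation rather than as raw encoded longs.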
12 changes: 10 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -133,6 +133,7 @@
import org.opensearch.index.engine.Segment;
import org.opensearch.index.engine.SegmentsStats;
import org.opensearch.index.fielddata.FieldDataStats;
import org.opensearch.index.fielddata.IndexFieldDataService;
import org.opensearch.index.fielddata.ShardFieldData;
import org.opensearch.index.flush.FlushStats;
import org.opensearch.index.get.GetStats;
@@ -361,6 +362,7 @@ Runnable getGlobalCheckpointSyncer() {
*/
private final ShardMigrationState shardMigrationState;
private DiscoveryNodes discoveryNodes;
private final IndexFieldDataService indexFieldDataService;

public IndexShard(
final ShardRouting shardRouting,
@@ -391,8 +393,9 @@ public IndexShard(
final RecoverySettings recoverySettings,
final RemoteStoreSettings remoteStoreSettings,
boolean seedRemote,
-       final DiscoveryNodes discoveryNodes
-   ) throws IOException {
+       final DiscoveryNodes discoveryNodes,
+       final IndexFieldDataService indexFieldDataService
+   ) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
this.shardRouting = shardRouting;
@@ -493,6 +496,7 @@ public boolean shouldCache(Query query) {
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote);
this.discoveryNodes = discoveryNodes;
this.indexFieldDataService = indexFieldDataService;
}

public ThreadPool getThreadPool() {
@@ -513,6 +517,10 @@ public boolean shouldSeedRemoteStore() {
return shardMigrationState == REMOTE_MIGRATING_UNSEEDED;
}

public IndexFieldDataService indexFieldDataService() {
return indexFieldDataService;
}

/**
* To be delegated to {@link ReplicationTracker} so that relevant remote store based
* operations can be ignored during engine migration
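Read together with the ShardGetService hunk, the new accessor completes the chain from a GET request to fielddata-backed doc values. Condensed as comments, with argument values copied from the diff (not a stable API):

    // Inside buildUsingDocValues (per this commit), for a SORTED_NUMERIC field:
    // SortedNumericDoubleValues doubleValues =
    //     ((LeafNumericFieldData) indexShard.indexFieldDataService()   // accessor added above
    //         .getForField(fieldMapper.fieldType(), "", () -> null)    // resolve fielddata for the field
    //         .load(reader.getContext()))                              // load the per-leaf (segment) view
    //         .getDoubleValues();                                      // numerics surfaced as doubles
    // if (doubleValues.advanceExact(docId)) { /* read docValueCount() values */ }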
