k-NN: add opt-in synthetic source for knn_vector fields

Adds the index-scoped boolean setting index.knn.synthetic_source.enabled
(default false) and implements KNNFetchSubPhase so that, when the setting is
on, knn_vector values are read from doc values and written into the _source
of every hit that does not already contain the field.

Review notes on this revision of the patch:
* The setting key carries the "index." prefix; it is registered with
  IndexScope, and the other index-level k-NN keys use the same prefix.
* The explicit java.util imports in KNNSettings are kept (no wildcard import).
* Hits without a stored vector no longer receive an empty field in _source.
* Line breaks, indentation and generic type arguments of context lines were
  reconstructed from a whitespace-collapsed copy of the patch; if a hunk does
  not match, apply with whitespace tolerance (e.g. git apply --ignore-whitespace).
* The post-image blob ids on the "index" lines of KNNSettings.java and
  KNNFetchSubPhase.java predate these edits and are informational only.

diff --git a/src/main/java/org/opensearch/knn/index/KNNSettings.java b/src/main/java/org/opensearch/knn/index/KNNSettings.java
index 572c9220e..caf5c6625 100644
--- a/src/main/java/org/opensearch/knn/index/KNNSettings.java
+++ b/src/main/java/org/opensearch/knn/index/KNNSettings.java
@@ -77,6 +77,7 @@ public class KNNSettings {
     public static final String MODEL_CACHE_SIZE_LIMIT = "knn.model.cache.size.limit";
     public static final String ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD = "index.knn.advanced.filtered_exact_search_threshold";
     public static final String KNN_FAISS_AVX2_DISABLED = "knn.faiss.avx2.disabled";
+    public static final String KNN_SYNTHETIC_SOURCE_ENABLED = "index.knn.synthetic_source.enabled";
 
     /**
      * Default setting values
@@ -234,6 +235,12 @@ public class KNNSettings {
 
     public static final Setting<Boolean> KNN_FAISS_AVX2_DISABLED_SETTING = Setting.boolSetting(KNN_FAISS_AVX2_DISABLED, false, NodeScope);
 
+    public static final Setting<Boolean> KNN_SYNTHETIC_SOURCE_ENABLED_SETTING = Setting.boolSetting(
+        KNN_SYNTHETIC_SOURCE_ENABLED,
+        false,
+        IndexScope
+    );
+
     /**
      * Dynamic settings
      */
@@ -347,6 +354,10 @@ private Setting<?> getSetting(String key) {
             return KNN_FAISS_AVX2_DISABLED_SETTING;
         }
 
+        if (KNN_SYNTHETIC_SOURCE_ENABLED.equals(key)) {
+            return KNN_SYNTHETIC_SOURCE_ENABLED_SETTING;
+        }
+
         throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
     }
 
@@ -364,7 +375,8 @@ public List<Setting<?>> getSettings() {
             MODEL_INDEX_NUMBER_OF_REPLICAS_SETTING,
             MODEL_CACHE_SIZE_LIMIT_SETTING,
             ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD_SETTING,
-            KNN_FAISS_AVX2_DISABLED_SETTING
+            KNN_FAISS_AVX2_DISABLED_SETTING,
+            KNN_SYNTHETIC_SOURCE_ENABLED_SETTING
         );
         return Stream.concat(settings.stream(), dynamicCacheSettings.values().stream()).collect(Collectors.toList());
     }
@@ -397,6 +409,20 @@ public static Integer getFilteredExactSearchThreshold(final String indexName) {
             .getAsInt(ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD, ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD_DEFAULT_VALUE);
     }
 
+    /**
+     * Reads the synthetic source flag from the settings stored in the cluster state metadata of an index.
+     *
+     * @param indexName name of the index whose settings are consulted
+     * @return the configured value of {@code KNN_SYNTHETIC_SOURCE_ENABLED}, or {@code false} when it is not set
+     */
+    public static boolean isKNNSyntheticEnabled(final String indexName) {
+        return KNNSettings.state().clusterService.state()
+            .getMetadata()
+            .index(indexName)
+            .getSettings()
+            .getAsBoolean(KNN_SYNTHETIC_SOURCE_ENABLED, false);
+    }
+
     public void initialize(Client client, ClusterService clusterService) {
         this.client = client;
         this.clusterService = clusterService;
diff --git a/src/main/java/org/opensearch/knn/index/KNNVectorDVLeafFieldData.java b/src/main/java/org/opensearch/knn/index/KNNVectorDVLeafFieldData.java
index b28ce5ee2..ea2389d39 100644
--- a/src/main/java/org/opensearch/knn/index/KNNVectorDVLeafFieldData.java
+++ b/src/main/java/org/opensearch/knn/index/KNNVectorDVLeafFieldData.java
@@ -73,7 +73,7 @@ public boolean advanceExact(int doc) throws IOException {
 
             @Override
             public int docValueCount() {
-                return docExists ? floats.length : 0;
+                return docExists ? floats.length : 1;
             }
 
             @Override
diff --git a/src/main/java/org/opensearch/knn/index/fetch/KNNFetchSubPhase.java b/src/main/java/org/opensearch/knn/index/fetch/KNNFetchSubPhase.java
index bf9a50cb2..c1d8f082b 100644
--- a/src/main/java/org/opensearch/knn/index/fetch/KNNFetchSubPhase.java
+++ b/src/main/java/org/opensearch/knn/index/fetch/KNNFetchSubPhase.java
@@ -16,13 +16,15 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.LeafReaderContext;
-import org.opensearch.common.document.DocumentField;
 import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.index.mapper.DocValueFetcher;
 import org.opensearch.index.mapper.MappedFieldType;
 import org.opensearch.index.mapper.MapperService;
+import org.opensearch.index.mapper.ValueFetcher;
+import org.opensearch.knn.index.KNNSettings;
 import org.opensearch.knn.index.mapper.KNNVectorFieldMapper;
 import org.opensearch.search.SearchHit;
 import org.opensearch.search.fetch.FetchContext;
@@ -30,18 +32,37 @@
 import org.opensearch.search.fetch.FetchSubPhaseProcessor;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import static org.opensearch.knn.common.KNNConstants.BYTES_PER_KILOBYTES;
 
+/**
+ * Fetch sub phase that reads k-NN vector values from doc values
+ * and fills them into the source map of each hit.
+ */
 public class KNNFetchSubPhase implements FetchSubPhase {
 
     private static Logger logger = LogManager.getLogger(KNNFetchSubPhase.class);
-
 
     @Override
     public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) throws IOException {
-        return null;
+        if (!KNNSettings.isKNNSyntheticEnabled(fetchContext.getIndexName())) {
+            return null;
+        }
+        MapperService mapperService = fetchContext.mapperService();
+
+        List<DocValueField> fields = new ArrayList<>();
+        for (MappedFieldType mappedFieldType : mapperService.fieldTypes()) {
+            if (mappedFieldType instanceof KNNVectorFieldMapper.KNNVectorFieldType) {
+                String fieldName = mappedFieldType.name();
+                ValueFetcher fetcher = new DocValueFetcher(mappedFieldType.docValueFormat(null, null),
+                        fetchContext.searchLookup().doc().getForField(mappedFieldType));
+                fields.add(new DocValueField(fieldName, fetcher));
+            }
+        }
+        return new KNNFetchSubPhaseProcessor(fetchContext, fields);
     }
 
     @AllArgsConstructor
@@ -49,33 +70,57 @@ public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) throws IOE
     class KNNFetchSubPhaseProcessor implements FetchSubPhaseProcessor {
 
         private final FetchContext fetchContext;
-
+        private final List<DocValueField> fields;
 
         @Override
-        public void setNextReader(LeafReaderContext leafReaderContext) throws IOException {
-
+        public void setNextReader(LeafReaderContext readerContext) throws IOException {
+            for (DocValueField f : fields) {
+                f.fetcher.setNextReader(readerContext);
+            }
         }
 
         @Override
         public void process(HitContext hitContext) throws IOException {
-            SearchHit hit = hitContext.hit();
-            Map<String, DocumentField> fields = hit.getFields();
             MapperService mapperService = fetchContext.mapperService();
+            final boolean hasNested = mapperService.hasNested();
+            SearchHit hit = hitContext.hit();
             Map<String, Object> maps = hit.getSourceAsMap();
+            if (maps == null) {
+                //when source is disabled, return
+                return;
+            }
 
-            for (Map.Entry<String, DocumentField> fieldsEntry : fields.entrySet()) {
-                String fieldName = fieldsEntry.getKey();
-                MappedFieldType mappedFieldType = mapperService.fieldType(fieldName);
-                if (mappedFieldType != null && mappedFieldType instanceof KNNVectorFieldMapper.KNNVectorFieldType) {
-                    maps.put(fieldName, fieldsEntry.getValue());
+            if (hasNested) {
+                //TODO handle nested field
+                logger.debug("Use nested:{}", hasNested);
+            }
+            for (DocValueField f : fields) {
+                if (maps.containsKey(f.field)) {
+                    continue;
                 }
+                // a hit without a stored vector must not gain an empty field in _source
+                List<Object> values = f.fetcher.fetchValues(hitContext.sourceLookup());
+                if (!values.isEmpty()) {
+                    maps.put(f.field, values);
+                }
             }
-
-            //TODO process nested
             BytesStreamOutput streamOutput = new BytesStreamOutput(BYTES_PER_KILOBYTES);
             XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), streamOutput);
             builder.value(maps);
             hitContext.hit().sourceRef(BytesReference.bytes(builder));
         }
     }
+
+    /**
+     * Pairs the name of a k-NN vector field with the fetcher used to read its values.
+     */
+    private static class DocValueField {
+        private final String field;
+        private final ValueFetcher fetcher;
+
+        DocValueField(String field, ValueFetcher fetcher) {
+            this.field = field;
+            this.fetcher = fetcher;
+        }
+    }
 }