Skip to content

Commit

Permalink
Add KNNVector DocValues Fields
Browse files Browse the repository at this point in the history
Add FetchSubPhase supported Synthetic KNN field source
  • Loading branch information
luyuncheng committed Mar 27, 2024
1 parent dbaf420 commit fa4ff4c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 23 deletions.
29 changes: 22 additions & 7 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@
import org.opensearch.monitor.os.OsProbe;

import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -77,6 +72,7 @@ public class KNNSettings {
public static final String MODEL_CACHE_SIZE_LIMIT = "knn.model.cache.size.limit";
public static final String ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD = "index.knn.advanced.filtered_exact_search_threshold";
public static final String KNN_FAISS_AVX2_DISABLED = "knn.faiss.avx2.disabled";
public static final String KNN_SYNTHETIC_SOURCE_ENABLED = "knn.synthetic_source.enabled";

/**
* Default setting values
Expand Down Expand Up @@ -234,6 +230,12 @@ public class KNNSettings {

public static final Setting<Boolean> KNN_FAISS_AVX2_DISABLED_SETTING = Setting.boolSetting(KNN_FAISS_AVX2_DISABLED, false, NodeScope);

public static final Setting<Boolean> KNN_SYNTHETIC_SOURCE_ENABLED_SETTING = Setting.boolSetting(
KNN_SYNTHETIC_SOURCE_ENABLED,
false,
IndexScope
);

/**
* Dynamic settings
*/
Expand Down Expand Up @@ -347,6 +349,10 @@ private Setting<?> getSetting(String key) {
return KNN_FAISS_AVX2_DISABLED_SETTING;
}

if (KNN_SYNTHETIC_SOURCE_ENABLED.equals(key)) {
return KNN_SYNTHETIC_SOURCE_ENABLED_SETTING;
}

throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

Expand All @@ -364,7 +370,8 @@ public List<Setting<?>> getSettings() {
MODEL_INDEX_NUMBER_OF_REPLICAS_SETTING,
MODEL_CACHE_SIZE_LIMIT_SETTING,
ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD_SETTING,
KNN_FAISS_AVX2_DISABLED_SETTING
KNN_FAISS_AVX2_DISABLED_SETTING,
KNN_SYNTHETIC_SOURCE_ENABLED_SETTING
);
return Stream.concat(settings.stream(), dynamicCacheSettings.values().stream()).collect(Collectors.toList());
}
Expand Down Expand Up @@ -397,6 +404,14 @@ public static Integer getFilteredExactSearchThreshold(final String indexName) {
.getAsInt(ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD, ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD_DEFAULT_VALUE);
}

public static boolean isKNNSyntheticEnabled(final String indexName) {
return KNNSettings.state().clusterService.state()
.getMetadata()
.index(indexName)
.getSettings()
.getAsBoolean(KNN_SYNTHETIC_SOURCE_ENABLED, false);
}

public void initialize(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public boolean advanceExact(int doc) throws IOException {

@Override
public int docValueCount() {
return docExists ? floats.length : 0;
return docExists ? floats.length : 1;
}

@Override
Expand Down
68 changes: 53 additions & 15 deletions src/main/java/org/opensearch/knn/index/fetch/KNNFetchSubPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,104 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.LeafReaderContext;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.mapper.DocValueFetcher;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.ValueFetcher;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.mapper.KNNVectorFieldMapper;
import org.opensearch.search.SearchHit;
import org.opensearch.search.fetch.FetchContext;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.FetchSubPhaseProcessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.opensearch.knn.common.KNNConstants.BYTES_PER_KILOBYTES;


/**
* Fetch sub phase which pull data from doc values.
* and fulfill the value into source map
*/
public class KNNFetchSubPhase implements FetchSubPhase {
private static Logger logger = LogManager.getLogger(KNNFetchSubPhase.class);


@Override
public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) throws IOException {
return null;
if (!KNNSettings.isKNNSyntheticEnabled(fetchContext.getIndexName())) {
return null;
}
MapperService mapperService = fetchContext.mapperService();

List<DocValueField> fields = new ArrayList<>();
for (MappedFieldType mappedFieldType : mapperService.fieldTypes()) {
if (mappedFieldType != null && mappedFieldType instanceof KNNVectorFieldMapper.KNNVectorFieldType) {
String fieldName = mappedFieldType.name();
ValueFetcher fetcher = new DocValueFetcher(mappedFieldType.docValueFormat(null, null),
fetchContext.searchLookup().doc().getForField(mappedFieldType));
fields.add(new DocValueField(fieldName, fetcher));
}
}
return new KNNFetchSubPhaseProcessor(fetchContext, fields);
}

@AllArgsConstructor
@Getter
class KNNFetchSubPhaseProcessor implements FetchSubPhaseProcessor {

private final FetchContext fetchContext;

private final List<DocValueField> fields;

@Override
public void setNextReader(LeafReaderContext leafReaderContext) throws IOException {

public void setNextReader(LeafReaderContext readerContext) throws IOException {
for (DocValueField f : fields) {
f.fetcher.setNextReader(readerContext);
}
}

@Override
public void process(HitContext hitContext) throws IOException {
SearchHit hit = hitContext.hit();
Map<String, DocumentField> fields = hit.getFields();
MapperService mapperService = fetchContext.mapperService();
final boolean hasNested = mapperService.hasNested();
SearchHit hit = hitContext.hit();
Map<String, Object> maps = hit.getSourceAsMap();
if (maps == null) {
//when source is disabled, return
return;
}

for (Map.Entry<String, DocumentField> fieldsEntry : fields.entrySet()) {
String fieldName = fieldsEntry.getKey();
MappedFieldType mappedFieldType = mapperService.fieldType(fieldName);
if (mappedFieldType != null && mappedFieldType instanceof KNNVectorFieldMapper.KNNVectorFieldType) {
maps.put(fieldName, fieldsEntry.getValue());
if (hasNested) {
//TODO handle nested field
logger.debug("Use nested:" + hasNested);
}
for (DocValueField f : fields) {
if (maps.containsKey(f.field)) {
continue;
}
maps.put(f.field, f.fetcher.fetchValues(hitContext.sourceLookup()));
}

//TODO process nested
BytesStreamOutput streamOutput = new BytesStreamOutput(BYTES_PER_KILOBYTES);
XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), streamOutput);
builder.value(maps);
hitContext.hit().sourceRef(BytesReference.bytes(builder));
}
}

private static class DocValueField {
private final String field;
private final ValueFetcher fetcher;

DocValueField(String field, ValueFetcher fetcher) {
this.field = field;
this.fetcher = fetcher;
}
}
}

0 comments on commit fa4ff4c

Please sign in to comment.