Skip to content

Commit

Permalink
Merge pull request #46 from lalithkota/develop
Browse files Browse the repository at this point in the history
Removed .keyword for types not string. Added query keyword addition in ES Query. Removed error when hit is found but field is not found
  • Loading branch information
pjoshi751 committed Aug 10, 2023
2 parents 1d2827a + 1b73d71 commit cd3cd65
Showing 1 changed file with 28 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,20 @@ private class ESQueryConfig extends Config{
String esIndex;
String[] esInputFields;
String esOutputField;
String esInputQueryAddKeyword;

// RestHighLevelClient esClient;
CloseableHttpClient hClient;
HttpGet hGet;

ESQueryConfig(String type, String esUrl, String esIndex, String[] esInputFields, String esOutputField, String[] inputFields, String[] inputDefaultValues,String outputField) {
super(type,inputFields,inputDefaultValues,outputField,Schema.STRING_SCHEMA);
ESQueryConfig(String type, String esUrl, String esIndex, String[] esInputFields, String esOutputField, String[] inputFields, String[] inputDefaultValues,String outputField, String esInputQueryAddKeyword) {
super(type,inputFields,inputDefaultValues,outputField,Schema.OPTIONAL_STRING_SCHEMA);

this.esUrl=esUrl;
this.esIndex=esIndex;
this.esInputFields=esInputFields;
this.esOutputField=esOutputField;
this.esInputQueryAddKeyword=esInputQueryAddKeyword;

// esClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(this.esUrl)));
hClient = HttpClients.createDefault();
Expand All @@ -105,7 +107,18 @@ else if(inputValues.size()==0){

for(int i=0; i<inputFields.length; i++){
if(i!=0)requestJson+=",";
requestJson += "{\"term\": {\"" + esInputFields[i] + ".keyword\": \"" + inputValues.get(i) + "\"}}";
requestJson += "{\"term\": {";

Object value = inputValues.get(i);

if(!"true".equals(this.esInputQueryAddKeyword)){
requestJson += "\"" + esInputFields[i] + "\": ";
requestJson += value;
} else {
requestJson += "\"" + esInputFields[i] + ".keyword\": ";
requestJson += "\"" + value + "\"";
}
requestJson += "}}";
}
requestJson += "]}}}";

Expand All @@ -125,15 +138,21 @@ else if(inputValues.size()==0){
else continue;
}

JSONObject responseSource;
// if(responseJson.getJSONObject("hits").getJSONArray("hits").length()!=0){
try{
// get the top hit .. error handling not done properly
return responseJson.getJSONObject("hits").getJSONArray("hits").getJSONObject(0).getJSONObject("_source").getString(esOutputField);
responseSource = responseJson.getJSONObject("hits").getJSONArray("hits").getJSONObject(0).getJSONObject("_source");
}
catch(JSONException je){
if(i==MAX_RETRIES) return "Error: No hits found";
else continue;
}
if(responseSource.has(esOutputField) && !responseSource.isNull(esOutputField)){
return responseSource.get(esOutputField);
} else {
return null;
}
}
// control shouldn't reach here .. it shouldve thrown exception before or returned
return "EMPTY";
Expand Down Expand Up @@ -243,6 +262,7 @@ void close(){
public static final String INPUT_FIELDS_CONFIG = "input.fields";
public static final String OUTPUT_FIELD_CONFIG = "output.field";
public static final String DEFAULT_VALUE_CONFIG = "input.default.values";
public static final String ES_INPUT_QUERY_ADD_KEYWORD = "es.input.query.add.keyword";

private Config config;
private Cache<Schema, Schema> schemaUpdateCache;
Expand All @@ -255,7 +275,8 @@ void close(){
.define(ES_OUTPUT_FIELD_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "If a successful match is made with the above input field+value, the value of this output field from the same document will be returned")
.define(INPUT_FIELDS_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Name of the field in the current index")
.define(OUTPUT_FIELD_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Name to give to the new field")
.define(DEFAULT_VALUE_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Default vlaues for input fields");
.define(DEFAULT_VALUE_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Default vlaues for input fields")
.define(ES_INPUT_QUERY_ADD_KEYWORD, ConfigDef.Type.STRING, "true", ConfigDef.Importance.HIGH, "Should add the .keyword suffix while querying ES?");


@Override
Expand All @@ -274,6 +295,7 @@ public void configure(Map<String, ?> configs) {
String inputFieldBulk = absconf.getString(INPUT_FIELDS_CONFIG);
String outputField = absconf.getString(OUTPUT_FIELD_CONFIG);
String inputDefaultValuesBulk = absconf.getString(DEFAULT_VALUE_CONFIG);
String esInputQueryAddKeyword = absconf.getString(ES_INPUT_QUERY_ADD_KEYWORD);

if(type.isEmpty() || esUrl.isEmpty() || esIndex.isEmpty() || esInputFieldBulk.isEmpty() || esOutputField.isEmpty() || inputFieldBulk.isEmpty() || outputField.isEmpty() || inputDefaultValuesBulk.isEmpty()){
throw new ConfigException("One of required transform config fields not set. Required field in tranforms: " + ES_URL_CONFIG + " ," + ES_INDEX_CONFIG + " ," + ES_INPUT_FIELDS_CONFIG + " ," + ES_OUTPUT_FIELD_CONFIG + " ," + INPUT_FIELDS_CONFIG + " ," + OUTPUT_FIELD_CONFIG + " ," + DEFAULT_VALUE_CONFIG);
Expand All @@ -288,7 +310,7 @@ public void configure(Map<String, ?> configs) {
}

try{
config = new ESQueryConfig(type,esUrl,esIndex,esInputFields,esOutputField,inputFields,inputDefaultValues,outputField);
config = new ESQueryConfig(type,esUrl,esIndex,esInputFields,esOutputField,inputFields,inputDefaultValues,outputField,esInputQueryAddKeyword);
}
catch(Exception e){
throw new ConfigException("Can't connect to ElasticSearch. Given url : " + esUrl + " Error: " + e.getMessage());
Expand Down

0 comments on commit cd3cd65

Please sign in to comment.