diff --git a/api/pom.xml b/api/pom.xml index a16d6200..7cf8e592 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -8,7 +8,7 @@ query-metric-api - 2.1.1-1229 + 2.1.3-SNAPSHOT https://code.nsa.gov/datawave-query-metric-service scm:git:https://github.com/NationalSecurityAgency/datawave-query-metric-service.git @@ -21,7 +21,7 @@ 2.1.0 3.12.0 1.9 - 5.1.0 + 5.9.1 2.3.6 31.1-jre 2.12.5 @@ -103,7 +103,7 @@ org.slf4j - slf4j-log4j12 + slf4j-api ${version.slf4j} @@ -186,7 +186,7 @@ org.slf4j - slf4j-log4j12 + slf4j-api junit @@ -220,4 +220,41 @@ https://raw.githubusercontent.com/NationalSecurityAgency/datawave/mvn-repo + + + + org.apache.maven.plugins + maven-enforcer-plugin + 1.4.1 + + + enforce-banned-dependencies + + enforce + + + + + + + ch.qos.reload4j:reload4j + org.slf4j:slf4j-reload4j + + log4j:log4j + + junit:junit + + + + junit:junit:*:*:test + + + + true + + + + + + diff --git a/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetric.java b/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetric.java index adb26ce0..3e6f8839 100644 --- a/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetric.java +++ b/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetric.java @@ -685,7 +685,7 @@ public int getFieldNumber(String name) { @XmlElement(name = "subplans") @XmlJavaTypeAdapter(StringMapAdapter.class) - protected Map subPlans = new HashMap<>(); + protected Map subPlans = new HashMap<>(); public static final String DATAWAVE = "DATAWAVE"; protected static final Map discoveredVersionMap = BaseQueryMetric.getVersionsFromClasspath(); @@ -696,15 +696,15 @@ public enum Lifecycle { NONE, DEFINED, INITIALIZED, RESULTS, CLOSED, CANCELLED, MAXRESULTS, NEXTTIMEOUT, TIMEOUT, SHUTDOWN, MAXWORK } - public void addSubPlan(String range, String plan) { - subPlans.put(range, plan); + public void addSubPlan(String plan, int[] rangeCounts) { + subPlans.put(plan, rangeCounts); } - public Map getSubPlans() { + public Map getSubPlans() { return subPlans; } - public void setSubPlans(Map subPlans) { + public void setSubPlans(Map subPlans) { this.subPlans = subPlans; } diff --git a/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetricSubplanResponse.java b/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetricSubplanResponse.java index ae383494..db1e8652 100644 --- a/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetricSubplanResponse.java +++ b/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetricSubplanResponse.java @@ -1,21 +1,26 @@ package datawave.microservice.querymetric; import datawave.webservice.HtmlProvider; +import datawave.webservice.query.QueryImpl; import datawave.webservice.result.BaseResponse; +import org.apache.commons.text.StringEscapeUtils; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlElementWrapper; import javax.xml.bind.annotation.XmlTransient; +import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; -public class BaseQueryMetricSubplanResponse extends BaseResponse implements HtmlProvider { +public abstract class BaseQueryMetricSubplanResponse extends BaseResponse implements HtmlProvider { private static final long serialVersionUID = 1L; - private static final String TITLE = "Query Subplans"; + private static final String TITLE = "Query Metrics / Subplans"; private static final String EMPTY = ""; @XmlElementWrapper(name = "querySubplans") @XmlElement(name = "querySubplan") @@ -23,14 +28,8 @@ public class BaseQueryMetricSubplanResponse extends B @XmlElement protected int numResults = 0; @XmlTransient - private boolean administratorMode = false; - @XmlTransient private boolean isGeoQuery = false; - private static String numToString(long number) { - return (number == -1 || number == 0) ? "" : Long.toString(number); - } - public List getResult() { return result; } @@ -48,14 +47,6 @@ public void setNumResults(int numResults) { this.numResults = numResults; } - public boolean isAdministratorMode() { - return administratorMode; - } - - public void setAdministratorMode(boolean administratorMode) { - this.administratorMode = administratorMode; - } - public boolean isGeoQuery() { return isGeoQuery; } @@ -95,24 +86,27 @@ public String getHeadContent() { @Override public String getMainContent() { - StringBuilder builder = new StringBuilder(), pageTimesBuilder = new StringBuilder(); - - builder.append("\n"); - builder.append(""); - builder.append(""); - - pageTimesBuilder.append("
Subplan(s)
\n"); - - builder.append(pageTimesBuilder); - TreeMap metricMap = new TreeMap<>(Collections.reverseOrder()); - for (T metric : this.getResult()) { metricMap.put(metric.getCreateDate(), metric); } - + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HHmmss"); + StringBuilder builder = new StringBuilder(); int x = 0; for (T metric : metricMap.values()) { + builder.append("
\n"); + builder.append(""); + builder.append(""); + builder.append(""); + builder.append(""); + builder.append(""); + builder.append(""); + builder.append(""); + builder.append(""); + builder.append(""); + builder.append(""); + + Set parameters = metric.getParameters(); // highlight alternating rows if (x % 2 == 0) { builder.append("\n"); @@ -120,23 +114,48 @@ public String getMainContent() { builder.append("\n"); } x++; + + builder.append(""); + builder.append(""); + builder.append(""); + String userDN = metric.getUserDN(); + builder.append(""); + builder.append(""); + builder.append(""); + builder.append(isJexlQuery(parameters) ? ""); + builder.append(""); + builder.append("\n"); + builder.append("
VisibilityQuery DateUserUserDNQuery IDQuery LogicQueryQuery Plan
").append(metric.getColumnVisibility()).append("").append(sdf.format(metric.getCreateDate())).append("").append(metric.getUser()).append("").append(userDN == null ? "" : userDN).append("").append(metric.getQueryId()).append("").append(metric.getQueryLogic()).append("" : "") + .append(StringEscapeUtils.escapeHtml4(metric.getQuery())).append("").append(StringEscapeUtils.escapeHtml4(metric.getPlan())).append("
\n"); + + builder.append("
"); + builder.append("\n"); + builder.append(""); if (metric.getSubPlans() != null && !metric.getSubPlans().isEmpty()) { - for (Map.Entry subplan : metric.getSubPlans().entrySet()) { - builder.append(""); + int s = 0; + for (Map.Entry e : metric.getSubPlans().entrySet()) { + // highlight alternating rows + if (s % 2 == 0) { + builder.append(""); + } else { + builder.append(""); + } + builder.append(""); + builder.append(""); + builder.append("\n\n"); + s++; } } else { - builder.append(""); } - - builder.append("\n\n"); + builder.append("\n"); + builder.append("
RangeSub Plan
").append(subplan.getKey()).append(subplan.getValue()).append("
").append(e.getKey()).append("").append(Arrays.toString(e.getValue())).append("
NONE"); + builder.append("
NONE
\n"); + builder.append("\n

\n"); } - - builder.append("\n
\n"); - pageTimesBuilder.append("\n"); - - builder.append(pageTimesBuilder); - return builder.toString(); } + private static boolean isJexlQuery(Set params) { + return params.stream().anyMatch(p -> p.getParameterName().equals("query.syntax") && p.getParameterValue().equals("JEXL")); + } } \ No newline at end of file diff --git a/api/src/main/java/datawave/microservice/querymetric/QueryMetric.java b/api/src/main/java/datawave/microservice/querymetric/QueryMetric.java index 361d857a..fefbd90e 100644 --- a/api/src/main/java/datawave/microservice/querymetric/QueryMetric.java +++ b/api/src/main/java/datawave/microservice/querymetric/QueryMetric.java @@ -410,7 +410,7 @@ public void writeTo(Output output, QueryMetric message) throws IOException { } if (message.subPlans != null) { - for (Map.Entry entry : message.subPlans.entrySet()) { + for (Map.Entry entry : message.subPlans.entrySet()) { output.writeString(39, StringUtils.join(Arrays.asList(entry.getKey(), entry.getValue()), "\0"), true); } } @@ -570,7 +570,13 @@ public void mergeFrom(Input input, QueryMetric message) throws IOException { String encodedPlans = input.readString(); String[] splitPlans = StringUtils.split(encodedPlans, "\0"); if (splitPlans.length == 2) { - message.subPlans.put(splitPlans[0], splitPlans[1]); + int[] rangeCounts = new int[2]; + int index = 0; + for (String count : splitPlans[1].substring(1, splitPlans[1].length() - 1).split(", ")) { + rangeCounts[index] = Integer.parseInt(count); + index++; + } + message.subPlans.put(splitPlans[0], rangeCounts); } break; default: diff --git a/api/src/main/java/datawave/microservice/querymetric/QueryMetricUpdate.java b/api/src/main/java/datawave/microservice/querymetric/QueryMetricUpdate.java index e319bb1c..75801d63 100644 --- a/api/src/main/java/datawave/microservice/querymetric/QueryMetricUpdate.java +++ b/api/src/main/java/datawave/microservice/querymetric/QueryMetricUpdate.java @@ -9,10 +9,10 @@ public class QueryMetricUpdate implements Serializable { @XmlElement - private T metric; + protected T metric; @XmlElement - private QueryMetricType metricType; + protected QueryMetricType metricType; /* constructor for deserializing JSON messages */ public QueryMetricUpdate() { diff --git a/api/src/main/java/datawave/microservice/querymetric/QueryMetricsDetailListResponse.java b/api/src/main/java/datawave/microservice/querymetric/QueryMetricsDetailListResponse.java index d22487d3..d71154b4 100644 --- a/api/src/main/java/datawave/microservice/querymetric/QueryMetricsDetailListResponse.java +++ b/api/src/main/java/datawave/microservice/querymetric/QueryMetricsDetailListResponse.java @@ -86,12 +86,8 @@ public String getMainContent() { builder.append(isJexlQuery(parameters) ? "" : "") .append(StringEscapeUtils.escapeHtml4(metric.getQuery())).append(""); builder.append("").append(StringEscapeUtils.escapeHtml4(metric.getPlan())).append(""); - if (metric.getSubPlans() != null && !metric.getSubPlans().isEmpty()) { - builder.append("").append(" Subplan(s)").append(""); - } else { - builder.append(""); - } + builder.append("").append("Subplan(s)").append(""); builder.append("").append(metric.getQueryName()).append(""); String beginDate = metric.getBeginDate() == null ? "" : sdf.format(metric.getBeginDate()); diff --git a/api/src/main/java/datawave/microservice/querymetric/QueryMetricsSubplanResponse.java b/api/src/main/java/datawave/microservice/querymetric/QueryMetricsSubplanResponse.java index 7b11874b..fbbbc34e 100644 --- a/api/src/main/java/datawave/microservice/querymetric/QueryMetricsSubplanResponse.java +++ b/api/src/main/java/datawave/microservice/querymetric/QueryMetricsSubplanResponse.java @@ -1,83 +1,15 @@ package datawave.microservice.querymetric; -import datawave.webservice.query.QueryImpl.Parameter; -import datawave.microservice.querymetric.BaseQueryMetric.PageMetric; -import datawave.microservice.querymetric.BaseQueryMetric.Prediction; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.text.StringEscapeUtils; - import javax.xml.bind.annotation.XmlAccessOrder; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorOrder; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Set; -import java.util.TreeMap; -import java.util.Map; -import java.util.stream.Collectors; @XmlRootElement(name = "QueryMetricListResponse") @XmlAccessorType(XmlAccessType.NONE) @XmlAccessorOrder(XmlAccessOrder.ALPHABETICAL) -public class QueryMetricsSubplanResponse extends QueryMetricListResponse { +public class QueryMetricsSubplanResponse extends BaseQueryMetricSubplanResponse { private static final long serialVersionUID = 1L; - - private static final String TITLE = "Query Subplans"; - - @Override - public String getTitle() { - return TITLE; - } - - @Override - public String getMainContent() { - StringBuilder builder = new StringBuilder(), pageTimesBuilder = new StringBuilder(); - - builder.append("\n"); - builder.append(""); - builder.append(""); - - pageTimesBuilder.append("
Subplan(s)
\n"); - - builder.append(pageTimesBuilder); - - TreeMap metricMap = new TreeMap<>(Collections.reverseOrder()); - - for (QueryMetric metric : this.getResult()) { - metricMap.put(metric.getCreateDate(), metric); - } - - int x = 0; - for (QueryMetric metric : metricMap.values()) { - // highlight alternating rows - if (x % 2 == 0) { - builder.append("\n"); - } else { - builder.append("\n"); - } - x++; - if (metric.getSubPlans() != null && !metric.getSubPlans().isEmpty()) { - for (Map.Entry subplan : metric.getSubPlans().entrySet()) { - builder.append(""); - } - } else { - builder.append("\n"); - } - - builder.append("
").append(subplan.getKey()).append(subplan.getValue()).append("NONE"); - } - - builder.append("\n
\n
\n"); - pageTimesBuilder.append("\n"); - - builder.append(pageTimesBuilder); - - return builder.toString(); - } } diff --git a/pom.xml b/pom.xml index 184b3e1b..65fb2d90 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ query-metric-parent - 2.1.1-1229 + 2.1.3-SNAPSHOT pom https://code.nsa.gov/datawave-query-metric-service diff --git a/service/pom.xml b/service/pom.xml index d3dfe7e5..446a210b 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -8,7 +8,7 @@ query-metric-service - 2.1.1-1229 + 2.1.3-SNAPSHOT DATAWAVE Query Metric Microservice https://code.nsa.gov/datawave-query-metric-service @@ -21,18 +21,18 @@ datawave.microservice.querymetric.QueryMetricService 2.1.0 1.1.1 - 5.1.0 + 5.9.1 1.4 2.1.0 2.17.1 - 2.1.1 + 2.1.2 2.1.2 2.1.1 - 2.1.1 + 2.1.7 ${project.version} 2.1.1 1.5.5 - 1.11 + 1.15 3.7.1 2.1.4.RELEASE 2.0.2 @@ -74,6 +74,10 @@ datawave-query-core ${version.datawave} + + reload4j + ch.qos.reload4j + log4j log4j @@ -418,7 +422,7 @@ org.apache.maven.plugins maven-enforcer-plugin - 3.0.0 + 1.4.1 enforce-banned-dependencies @@ -430,6 +434,7 @@ + ch.qos.reload4j:reload4j org.slf4j:slf4j-reload4j log4j:log4j diff --git a/service/src/main/java/datawave/microservice/querymetric/MetricUpdateEntryProcessor.java b/service/src/main/java/datawave/microservice/querymetric/MetricUpdateEntryProcessor.java index 902a0a10..ac8826b4 100644 --- a/service/src/main/java/datawave/microservice/querymetric/MetricUpdateEntryProcessor.java +++ b/service/src/main/java/datawave/microservice/querymetric/MetricUpdateEntryProcessor.java @@ -2,6 +2,7 @@ import com.hazelcast.map.AbstractEntryProcessor; import datawave.microservice.querymetric.handler.QueryMetricCombiner; + import java.util.Map; public class MetricUpdateEntryProcessor extends AbstractEntryProcessor { @@ -16,22 +17,33 @@ public MetricUpdateEntryProcessor(QueryMetricUpdateHolder metricUpdate, QueryMet @Override public Long process(Map.Entry entry) { + QueryMetricUpdateHolder updatedHolder; + QueryMetricType metricType = this.metricUpdate.getMetricType(); + BaseQueryMetric updatedMetric = this.metricUpdate.getMetric(); long start = System.currentTimeMillis(); - BaseQueryMetric combinedMetric; if (entry.getValue() == null) { - entry.setValue(this.metricUpdate); + updatedHolder = this.metricUpdate; } else { - QueryMetricType metricType = this.metricUpdate.getMetricType(); + updatedHolder = entry.getValue(); BaseQueryMetric storedMetric = entry.getValue().getMetric(); - BaseQueryMetric updatedMetric = this.metricUpdate.getMetric(); + BaseQueryMetric combinedMetric; combinedMetric = this.combiner.combineMetrics(updatedMetric, storedMetric, metricType); - combinedMetric.setNumUpdates(storedMetric.getNumUpdates() + updatedMetric.getNumUpdates()); - boolean isNewMetric = entry.getValue().isNewMetric(); - if (isNewMetric == false) { - isNewMetric = QueryMetricUpdateHolder.isNewMetric(storedMetric) || QueryMetricUpdateHolder.isNewMetric(updatedMetric); - } - entry.setValue(new QueryMetricUpdateHolder(combinedMetric, metricType, isNewMetric)); + updatedHolder.setMetric(combinedMetric); + updatedHolder.setMetricType(metricType); + } + + if (metricType.equals(QueryMetricType.DISTRIBUTED) && updatedMetric != null) { + // these values are added incrementally in a distributed update. Because we can not be sure + // exactly when the incomingQueryMetricCache value is stored, it would otherwise be possible + // for updates to be included twice. These values are reset after being used in the AccumuloMapStore + updatedHolder.addValue("sourceCount", updatedMetric.getSourceCount()); + updatedHolder.addValue("nextCount", updatedMetric.getNextCount()); + updatedHolder.addValue("seekCount", updatedMetric.getSeekCount()); + updatedHolder.addValue("yieldCount", updatedMetric.getYieldCount()); + updatedHolder.addValue("docRanges", updatedMetric.getDocRanges()); + updatedHolder.addValue("fiRanges", updatedMetric.getFiRanges()); } + entry.setValue(updatedHolder); return Long.valueOf(System.currentTimeMillis() - start); } } diff --git a/service/src/main/java/datawave/microservice/querymetric/QueryMetricUpdateHolder.java b/service/src/main/java/datawave/microservice/querymetric/QueryMetricUpdateHolder.java index 5cdb0420..c1803a95 100644 --- a/service/src/main/java/datawave/microservice/querymetric/QueryMetricUpdateHolder.java +++ b/service/src/main/java/datawave/microservice/querymetric/QueryMetricUpdateHolder.java @@ -1,16 +1,19 @@ package datawave.microservice.querymetric; +import datawave.microservice.querymetric.BaseQueryMetric.Lifecycle; + +import java.util.HashMap; +import java.util.Map; + public class QueryMetricUpdateHolder extends QueryMetricUpdate { - private boolean newMetric; - - public QueryMetricUpdateHolder(T metric, QueryMetricType metricType, boolean newMetric) { - super(metric, metricType); - this.newMetric = newMetric; - } + private boolean persisted = false; + private Lifecycle lowestLifecycleSincePersist; + private Map values = new HashMap<>(); public QueryMetricUpdateHolder(T metric, QueryMetricType metricType) { - this(metric, metricType, QueryMetricUpdateHolder.isNewMetric(metric)); + super(metric, metricType); + this.lowestLifecycleSincePersist = this.metric.getLifecycle(); } public QueryMetricUpdateHolder(T metric) { @@ -21,11 +24,43 @@ public QueryMetricUpdateHolder(QueryMetricUpdate metricUpdate) { this((T) metricUpdate.getMetric(), metricUpdate.getMetricType()); } + // If we know that this metric has been persisted by the AccumuloMapStore, then it is not new + // Because the metric can be ejected from the incoming cache, we also track the lowest lifecycle public boolean isNewMetric() { - return newMetric; + return !persisted && (lowestLifecycleSincePersist == null || lowestLifecycleSincePersist.equals(Lifecycle.DEFINED)); + } + + public void addValue(String key, Long value) { + if (values.containsKey(key)) { + values.put(key, values.get(key) + value); + } else { + values.put(key, value); + } + } + + public Long getValue(String key) { + if (values.containsKey(key)) { + return values.get(key); + } else { + return 0l; + } + } + + public void persisted() { + persisted = true; + values.clear(); + lowestLifecycleSincePersist = null; + } + + public Lifecycle getLowestLifecycleSincePersist() { + return lowestLifecycleSincePersist; } - public static boolean isNewMetric(BaseQueryMetric metric) { - return metric.getLifecycle().equals(BaseQueryMetric.Lifecycle.DEFINED) || metric.getLifecycle().equals(BaseQueryMetric.Lifecycle.INITIALIZED); + @Override + public void setMetric(T metric) { + super.setMetric(metric); + if (this.lowestLifecycleSincePersist == null || this.metric.getLifecycle().ordinal() < this.lowestLifecycleSincePersist.ordinal()) { + this.lowestLifecycleSincePersist = this.metric.getLifecycle(); + } } } diff --git a/service/src/main/java/datawave/microservice/querymetric/config/HazelcastMetricCacheConfiguration.java b/service/src/main/java/datawave/microservice/querymetric/config/HazelcastMetricCacheConfiguration.java index 54b5192b..fe92f061 100644 --- a/service/src/main/java/datawave/microservice/querymetric/config/HazelcastMetricCacheConfiguration.java +++ b/service/src/main/java/datawave/microservice/querymetric/config/HazelcastMetricCacheConfiguration.java @@ -1,14 +1,18 @@ package datawave.microservice.querymetric.config; import com.hazelcast.config.Config; +import com.hazelcast.config.ConfigurationException; import com.hazelcast.config.DiscoveryStrategyConfig; +import com.hazelcast.config.InMemoryFormat; import com.hazelcast.config.JoinConfig; import com.hazelcast.config.ListenerConfig; +import com.hazelcast.config.MapConfig; import com.hazelcast.config.MapStoreConfig; import com.hazelcast.config.TcpIpConfig; import com.hazelcast.config.XmlConfigBuilder; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IMap; import com.hazelcast.core.LifecycleEvent; import com.hazelcast.kubernetes.HazelcastKubernetesDiscoveryStrategyFactory; import com.hazelcast.kubernetes.KubernetesProperties; @@ -35,6 +39,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.Collection; +import java.util.Map; import static java.nio.charset.StandardCharsets.UTF_8; @@ -188,6 +193,15 @@ private Config generateDefaultConfig(HazelcastMetricCacheProperties cachePropert ListenerConfig membershipListenerConfig = new ListenerConfig(); membershipListenerConfig.setImplementation(new ClusterMembershipListener()); config.addListenerConfig(membershipListenerConfig); + + Map mapConfigs = config.getMapConfigs(); + for (Map.Entry e : mapConfigs.entrySet()) { + InMemoryFormat inMemoryFormat = e.getValue().getInMemoryFormat(); + if (!inMemoryFormat.equals(InMemoryFormat.OBJECT)) { + log.info("overriding in-memory-format:" + inMemoryFormat + " for map " + e.getKey() + " to OBJECT"); + e.getValue().setInMemoryFormat(InMemoryFormat.OBJECT); + } + } return config; } } diff --git a/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricHandlerProperties.java b/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricHandlerProperties.java index 5f461c56..f6376dcb 100644 --- a/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricHandlerProperties.java +++ b/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricHandlerProperties.java @@ -39,7 +39,6 @@ public class QueryMetricHandlerProperties { @NotBlank protected String password; protected int accumuloClientPoolSize = 16; - protected int mapStoreWriteThreads = 16; protected int numShards = 10; protected String shardTableName = "QueryMetrics_e"; protected String indexTableName = "QueryMetrics_i"; @@ -98,7 +97,8 @@ public class QueryMetricHandlerProperties { "SEEK_COUNT", "SETUP_TIME", "SOURCE_COUNT", - "USER"); + "USER", + "YIELD_COUNT"); protected List additionalIndexFields = Collections.EMPTY_LIST; @@ -122,12 +122,18 @@ public class QueryMetricHandlerProperties { protected List numericFields = Arrays.asList( "CREATE_CALL_TIME", + "DOC_RANGES", "ELAPSED_TIME", + "FI_RANGES", "LOGIN_TIME", + "NEXT_COUNT", + "SEEK_COUNT", "SETUP_TIME", + "SOURCE_COUNT", "NUM_PAGES", "NUM_RESULTS", - "NUM_UPDATES"); + "NUM_UPDATES", + "YIELD_COUNT"); protected List additionalNumericFields = Collections.EMPTY_LIST; //@formatter:on @@ -242,14 +248,6 @@ public void setAccumuloClientPoolSize(int accumuloClientPoolSize) { this.accumuloClientPoolSize = accumuloClientPoolSize; } - public int getMapStoreWriteThreads() { - return mapStoreWriteThreads; - } - - public void setMapStoreWriteThreads(int mapStoreWriteThreads) { - this.mapStoreWriteThreads = mapStoreWriteThreads; - } - public int getNumShards() { return numShards; } diff --git a/service/src/main/java/datawave/microservice/querymetric/factory/QueryMetricSubplanResponseFactory.java b/service/src/main/java/datawave/microservice/querymetric/factory/QueryMetricSubplanResponseFactory.java index 948e0ca9..4d73449f 100644 --- a/service/src/main/java/datawave/microservice/querymetric/factory/QueryMetricSubplanResponseFactory.java +++ b/service/src/main/java/datawave/microservice/querymetric/factory/QueryMetricSubplanResponseFactory.java @@ -1,11 +1,12 @@ package datawave.microservice.querymetric.factory; import datawave.microservice.querymetric.BaseQueryMetricSubplanResponse; +import datawave.microservice.querymetric.QueryMetricsSubplanResponse; public class QueryMetricSubplanResponseFactory implements BaseQueryMetricSubplanResponseFactory { @Override public BaseQueryMetricSubplanResponse createSubplanResponse() { - return new BaseQueryMetricSubplanResponse(); + return new QueryMetricsSubplanResponse(); } } diff --git a/service/src/main/java/datawave/microservice/querymetric/handler/ContentQueryMetricsIngestHelper.java b/service/src/main/java/datawave/microservice/querymetric/handler/ContentQueryMetricsIngestHelper.java index dae30e8b..b859969c 100644 --- a/service/src/main/java/datawave/microservice/querymetric/handler/ContentQueryMetricsIngestHelper.java +++ b/service/src/main/java/datawave/microservice/querymetric/handler/ContentQueryMetricsIngestHelper.java @@ -9,13 +9,14 @@ import datawave.microservice.querymetric.BaseQueryMetric; import datawave.microservice.querymetric.BaseQueryMetric.PageMetric; import datawave.microservice.querymetric.BaseQueryMetric.Prediction; -import datawave.webservice.query.QueryImpl; import datawave.webservice.query.util.QueryUtil; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -66,8 +67,8 @@ public Multimap normalize(Multimap getEventFieldsToWrite(BaseQueryMetric updatedQueryMetric) { - return normalize(delegate.getEventFieldsToWrite(updatedQueryMetric)); + public Multimap getEventFieldsToWrite(BaseQueryMetric updatedQueryMetric, BaseQueryMetric storedQueryMetric) { + return normalize(delegate.getEventFieldsToWrite(updatedQueryMetric, storedQueryMetric)); } @Override @@ -91,116 +92,239 @@ public int getFieldSizeThreshold() { public static class HelperDelegate { - public Multimap getEventFieldsToWrite(T updatedQueryMetric) { + protected boolean isChanged(String updated, String stored) { + if ((StringUtils.isBlank(stored) && StringUtils.isNotBlank(updated)) || (stored != null && updated != null && !stored.equals(updated))) { + return true; + } else { + return false; + } + } + + protected boolean isChanged(long updated, long stored) { + if (updated != stored) { + return true; + } else { + return false; + } + } + + protected boolean isChanged(BaseQueryMetric.Lifecycle updated, BaseQueryMetric.Lifecycle stored) { + if ((stored == null && updated != null) || (stored != null && updated != null && (stored.ordinal() != updated.ordinal()))) { + return true; + } else { + return false; + } + } + + protected boolean isFirstWrite(Collection updated, Collection stored) { + if ((stored == null || stored.isEmpty()) && (updated != null && !updated.isEmpty())) { + return true; + } else { + return false; + } + } + + protected boolean isFirstWrite(Map updated, Map stored) { + if ((stored == null || stored.isEmpty()) && (updated != null && !updated.isEmpty())) { + return true; + } else { + return false; + } + } + + protected boolean isFirstWrite(String updated, String stored) { + if (stored == null && StringUtils.isNotBlank(updated)) { + return true; + } else { + return false; + } + } + + protected boolean isFirstWrite(Object updated, Object stored) { + if (stored == null && updated != null) { + return true; + } else { + return false; + } + } + + protected boolean isFirstWrite(long updated, long stored, long initValue) { + if (stored == initValue && updated != initValue) { + return true; + } else { + return false; + } + } + + public Multimap getEventFieldsToWrite(T updated, T stored) { HashMultimap fields = HashMultimap.create(); SimpleDateFormat sdf_date_time1 = new SimpleDateFormat("yyyyMMdd HHmmss"); SimpleDateFormat sdf_date_time2 = new SimpleDateFormat("yyyyMMdd HHmmss"); - if (updatedQueryMetric.getPositiveSelectors() != null) { - fields.putAll("POSITIVE_SELECTORS", updatedQueryMetric.getPositiveSelectors()); + if (isFirstWrite(updated.getPositiveSelectors(), stored == null ? null : stored.getPositiveSelectors())) { + fields.putAll("POSITIVE_SELECTORS", updated.getPositiveSelectors()); } - if (updatedQueryMetric.getNegativeSelectors() != null) { - fields.putAll("NEGATIVE_SELECTORS", updatedQueryMetric.getNegativeSelectors()); + if (isFirstWrite(updated.getNegativeSelectors(), stored == null ? null : stored.getNegativeSelectors())) { + fields.putAll("NEGATIVE_SELECTORS", updated.getNegativeSelectors()); } - if (updatedQueryMetric.getQueryAuthorizations() != null) { - fields.put("AUTHORIZATIONS", updatedQueryMetric.getQueryAuthorizations()); + if (isFirstWrite(updated.getQueryAuthorizations(), stored == null ? null : stored.getQueryAuthorizations())) { + fields.put("AUTHORIZATIONS", updated.getQueryAuthorizations()); } - if (updatedQueryMetric.getBeginDate() != null) { - fields.put("BEGIN_DATE", sdf_date_time1.format(updatedQueryMetric.getBeginDate())); + if (isFirstWrite(updated.getBeginDate(), stored == null ? null : stored.getBeginDate())) { + fields.put("BEGIN_DATE", sdf_date_time1.format(updated.getBeginDate())); } - fields.put("CREATE_CALL_TIME", Long.toString(updatedQueryMetric.getCreateCallTime())); - if (updatedQueryMetric.getCreateDate() != null) { - fields.put("CREATE_DATE", sdf_date_time2.format(updatedQueryMetric.getCreateDate())); + if (isChanged(updated.getCreateCallTime(), stored == null ? -1 : stored.getCreateCallTime())) { + fields.put("CREATE_CALL_TIME", Long.toString(updated.getCreateCallTime())); } - fields.put("DOC_RANGES", Long.toString(updatedQueryMetric.getDocRanges())); - fields.put("ELAPSED_TIME", Long.toString(updatedQueryMetric.getElapsedTime())); - if (updatedQueryMetric.getEndDate() != null) { - fields.put("END_DATE", sdf_date_time1.format(updatedQueryMetric.getEndDate())); + if (isFirstWrite(updated.getCreateDate(), stored == null ? null : stored.getCreateDate())) { + fields.put("CREATE_DATE", sdf_date_time2.format(updated.getCreateDate())); } - if (updatedQueryMetric.getErrorCode() != null) { - fields.put("ERROR_CODE", updatedQueryMetric.getErrorCode()); + if (isChanged(updated.getDocRanges(), stored == null ? -1 : stored.getDocRanges())) { + fields.put("DOC_RANGES", Long.toString(updated.getDocRanges())); } - if (updatedQueryMetric.getErrorMessage() != null) { - fields.put("ERROR_MESSAGE", updatedQueryMetric.getErrorMessage()); + if (isChanged(updated.getElapsedTime(), stored == null ? -1 : stored.getElapsedTime())) { + fields.put("ELAPSED_TIME", Long.toString(updated.getElapsedTime())); } - fields.put("FI_RANGES", Long.toString(updatedQueryMetric.getFiRanges())); - if (updatedQueryMetric.getHost() != null) { - fields.put("HOST", updatedQueryMetric.getHost()); + if (isFirstWrite(updated.getEndDate(), stored == null ? null : stored.getEndDate())) { + fields.put("END_DATE", sdf_date_time1.format(updated.getEndDate())); } - if (updatedQueryMetric.getLastUpdated() != null) { - fields.put("LAST_UPDATED", sdf_date_time2.format(updatedQueryMetric.getLastUpdated())); + if (isChanged(updated.getErrorCode(), stored == null ? null : stored.getErrorCode())) { + fields.put("ERROR_CODE", updated.getErrorCode()); } - if (updatedQueryMetric.getLifecycle() != null) { - fields.put("LIFECYCLE", updatedQueryMetric.getLifecycle().toString()); + if (isChanged(updated.getErrorMessage(), stored == null ? null : stored.getErrorMessage())) { + fields.put("ERROR_MESSAGE", updated.getErrorMessage()); } - fields.put("LOGIN_TIME", Long.toString(updatedQueryMetric.getLoginTime())); - fields.put("NEXT_COUNT", Long.toString(updatedQueryMetric.getNextCount())); - fields.put("NUM_RESULTS", Long.toString(updatedQueryMetric.getNumResults())); - fields.put("NUM_PAGES", Long.toString(updatedQueryMetric.getNumPages())); - fields.put("NUM_UPDATES", Long.toString(updatedQueryMetric.getNumUpdates())); - Set parameters = updatedQueryMetric.getParameters(); - if (parameters != null && !parameters.isEmpty()) { - fields.put("PARAMETERS", QueryUtil.toParametersString(parameters)); + if (isChanged(updated.getFiRanges(), stored == null ? -1 : stored.getFiRanges())) { + fields.put("FI_RANGES", Long.toString(updated.getFiRanges())); } - if (updatedQueryMetric.getPlan() != null) { - fields.put("PLAN", updatedQueryMetric.getPlan()); + if (isFirstWrite(updated.getHost(), stored == null ? null : stored.getHost())) { + fields.put("HOST", updated.getHost()); } - Map subPlans = updatedQueryMetric.getSubPlans(); - if (subPlans != null && !subPlans.isEmpty()) { - for (Map.Entry entry : subPlans.entrySet()) { - fields.put("SUBPLAN", entry.getKey() + " : " + entry.getValue()); + if (updated.getLastUpdated() != null) { + try { + String storedValue = ""; + if (stored != null && stored.getLastUpdated() != null) { + storedValue = sdf_date_time2.format(stored.getLastUpdated()); + } + String updatedValue = sdf_date_time2.format(updated.getLastUpdated()); + if (!updatedValue.isEmpty() && !updatedValue.equals(storedValue)) { + fields.put("LAST_UPDATED", updatedValue); + } + } catch (Exception e) { + log.error("lastUpdated:" + e.getMessage()); } } - if (updatedQueryMetric.getProxyServers() != null) { - fields.put("PROXY_SERVERS", StringUtils.join(updatedQueryMetric.getProxyServers(), ",")); + if (isChanged(updated.getLifecycle(), stored == null ? null : stored.getLifecycle())) { + fields.put("LIFECYCLE", updated.getLifecycle().toString()); + } + if (isChanged(updated.getLoginTime(), stored == null ? -1 : stored.getLoginTime())) { + fields.put("LOGIN_TIME", Long.toString(updated.getLoginTime())); + } + if (isChanged(updated.getNextCount(), stored == null ? -1 : stored.getNextCount())) { + fields.put("NEXT_COUNT", Long.toString(updated.getNextCount())); + } + if (isChanged(updated.getNumResults(), stored == null ? -1 : stored.getNumResults())) { + fields.put("NUM_RESULTS", Long.toString(updated.getNumResults())); + } + if (isChanged(updated.getNumPages(), stored == null ? -1 : stored.getNumPages())) { + fields.put("NUM_PAGES", Long.toString(updated.getNumPages())); + } + if (isChanged(updated.getNumUpdates(), stored == null ? -1 : stored.getNumUpdates())) { + fields.put("NUM_UPDATES", Long.toString(updated.getNumUpdates())); + } + if (isFirstWrite(updated.getParameters(), stored == null ? null : stored.getParameters())) { + fields.put("PARAMETERS", QueryUtil.toParametersString(updated.getParameters())); + } + if (isFirstWrite(updated.getPlan(), stored == null ? null : stored.getPlan())) { + fields.put("PLAN", updated.getPlan()); } - List pageMetrics = updatedQueryMetric.getPageTimes(); - if (pageMetrics != null && !pageMetrics.isEmpty()) { - for (PageMetric p : pageMetrics) { - fields.put("PAGE_METRICS." + p.getPageNumber(), p.toEventString()); + if (isFirstWrite(updated.getProxyServers(), stored == null ? null : stored.getProxyServers())) { + fields.put("PROXY_SERVERS", StringUtils.join(updated.getProxyServers(), ",")); + } + + Map storedPageMetricMap = new HashMap<>(); + if (stored != null) { + List storedPageMetrics = stored.getPageTimes(); + if (storedPageMetrics != null) { + for (PageMetric p : storedPageMetrics) { + storedPageMetricMap.put(p.getPageNumber(), p); + } + } + } + if (updated != null) { + List updatedPageMetrics = updated.getPageTimes(); + if (updatedPageMetrics != null) { + for (PageMetric p : updatedPageMetrics) { + long pageNum = p.getPageNumber(); + PageMetric storedPageMetric = storedPageMetricMap.get(pageNum); + if (storedPageMetric == null || !storedPageMetric.equals(p)) { + fields.put("PAGE_METRICS." + p.getPageNumber(), p.toEventString()); + } + } } } - Set predictions = updatedQueryMetric.getPredictions(); - if (predictions != null && !predictions.isEmpty()) { - for (Prediction prediction : predictions) { - fields.put("PREDICTION", prediction.getName() + ":" + prediction.getPrediction()); + if (isFirstWrite(updated.getPredictions(), stored == null ? null : stored.getPredictions())) { + Set predictions = updated.getPredictions(); + if (predictions != null && !predictions.isEmpty()) { + for (Prediction prediction : predictions) { + fields.put("PREDICTION", prediction.getName() + ":" + prediction.getPrediction()); + } } } - if (updatedQueryMetric.getQuery() != null) { - fields.put("QUERY", updatedQueryMetric.getQuery()); + if (isFirstWrite(updated.getQuery(), stored == null ? null : stored.getQuery())) { + fields.put("QUERY", updated.getQuery()); } - if (updatedQueryMetric.getQueryId() != null) { - fields.put("QUERY_ID", updatedQueryMetric.getQueryId()); + if (isFirstWrite(updated.getQueryId(), stored == null ? null : stored.getQueryId())) { + fields.put("QUERY_ID", updated.getQueryId()); } - if (updatedQueryMetric.getQueryLogic() != null) { - fields.put("QUERY_LOGIC", updatedQueryMetric.getQueryLogic()); + if (isFirstWrite(updated.getQueryLogic(), stored == null ? null : stored.getQueryLogic())) { + fields.put("QUERY_LOGIC", updated.getQueryLogic()); } - if (updatedQueryMetric.getQueryName() != null) { - fields.put("QUERY_NAME", updatedQueryMetric.getQueryName()); + if (isFirstWrite(updated.getQueryName(), stored == null ? null : stored.getQueryName())) { + fields.put("QUERY_NAME", updated.getQueryName()); } - if (updatedQueryMetric.getQueryType() != null) { - fields.put("QUERY_TYPE", updatedQueryMetric.getQueryType()); + if (isFirstWrite(updated.getQueryType(), stored == null ? null : stored.getQueryType())) { + fields.put("QUERY_TYPE", updated.getQueryType()); } - fields.put("SETUP_TIME", Long.toString(updatedQueryMetric.getSetupTime())); - fields.put("SEEK_COUNT", Long.toString(updatedQueryMetric.getSeekCount())); - fields.put("SOURCE_COUNT", Long.toString(updatedQueryMetric.getSourceCount())); - if (updatedQueryMetric.getUser() != null) { - fields.put("USER", updatedQueryMetric.getUser()); + if (isFirstWrite(updated.getSetupTime(), stored == null ? 0 : stored.getSetupTime(), 0)) { + fields.put("SETUP_TIME", Long.toString(updated.getSetupTime())); } - if (updatedQueryMetric.getUserDN() != null) { - fields.put("USER_DN", updatedQueryMetric.getUserDN()); + if (isChanged(updated.getSeekCount(), stored == null ? -1 : stored.getSeekCount())) { + fields.put("SEEK_COUNT", Long.toString(updated.getSeekCount())); } - Map versionMap = updatedQueryMetric.getVersionMap(); - if (versionMap != null) { - versionMap.entrySet().stream().forEach(e -> { - fields.put("VERSION." + e.getKey().toUpperCase(), e.getValue()); - }); + if (isChanged(updated.getSourceCount(), stored == null ? -1 : stored.getSourceCount())) { + fields.put("SOURCE_COUNT", Long.toString(updated.getSourceCount())); + } + Map updatedSubPlans = updated.getSubPlans(); + if (updatedSubPlans != null && !updatedSubPlans.isEmpty()) { + Map storedSubPlans = stored == null ? null : stored.getSubPlans(); + for (Map.Entry entry : updatedSubPlans.entrySet()) { + if (storedSubPlans == null || !storedSubPlans.containsKey(entry.getKey())) { + fields.put("SUBPLAN", entry.getKey() + " : " + Arrays.toString(entry.getValue())); + } + } + } + if (isFirstWrite(updated.getUser(), stored == null ? null : stored.getUser())) { + fields.put("USER", updated.getUser()); + } + if (isFirstWrite(updated.getUserDN(), stored == null ? null : stored.getUserDN())) { + fields.put("USER_DN", updated.getUserDN()); + } + if (isFirstWrite(updated.getVersionMap(), stored == null ? null : stored.getVersionMap())) { + Map versionMap = updated.getVersionMap(); + if (versionMap != null) { + versionMap.entrySet().stream().forEach(e -> { + fields.put("VERSION." + e.getKey().toUpperCase(), e.getValue()); + }); + } + } + if (isChanged(updated.getYieldCount(), stored == null ? -1 : stored.getYieldCount())) { + fields.put("YIELD_COUNT", Long.toString(updated.getYieldCount())); } - fields.put("YIELD_COUNT", Long.toString(updatedQueryMetric.getYieldCount())); - putExtendedFieldsToWrite(updatedQueryMetric, fields); + putExtendedFieldsToWrite(updated, stored, fields); HashMultimap truncatedFields = HashMultimap.create(); fields.entries().forEach(e -> { @@ -213,86 +337,93 @@ public Multimap getEventFieldsToWrite(T updatedQueryMetric) { return truncatedFields; } - protected void putExtendedFieldsToWrite(T updatedQueryMetric, Multimap fields) { + protected void putExtendedFieldsToWrite(T updated, T stored, Multimap fields) { } - public Multimap getEventFieldsToDelete(T updatedQueryMetric, T storedQueryMetric) { + public Multimap getEventFieldsToDelete(T updated, T stored) { HashMultimap fields = HashMultimap.create(); - - SimpleDateFormat sdf_date_time2 = new SimpleDateFormat("yyyyMMdd HHmmss"); - - if (updatedQueryMetric.getCreateCallTime() != storedQueryMetric.getCreateCallTime()) { - fields.put("CREATE_CALL_TIME", Long.toString(storedQueryMetric.getCreateCallTime())); - } - if (updatedQueryMetric.getDocRanges() != storedQueryMetric.getDocRanges()) { - fields.put("DOC_RANGES", Long.toString(storedQueryMetric.getDocRanges())); - } - if (updatedQueryMetric.getElapsedTime() != storedQueryMetric.getElapsedTime()) { - fields.put("ELAPSED_TIME", Long.toString(storedQueryMetric.getElapsedTime())); - } - if (updatedQueryMetric.getFiRanges() != storedQueryMetric.getFiRanges()) { - fields.put("FI_RANGES", Long.toString(storedQueryMetric.getFiRanges())); - } - if (storedQueryMetric.getLastUpdated() != null && updatedQueryMetric.getLastUpdated() != null) { - String storedValue = sdf_date_time2.format(storedQueryMetric.getLastUpdated()); - String updatedValue = sdf_date_time2.format(updatedQueryMetric.getLastUpdated()); - if (!updatedValue.equals(storedValue)) { - fields.put("LAST_UPDATED", storedValue); + if (updated != null && stored != null) { + + SimpleDateFormat sdf_date_time2 = new SimpleDateFormat("yyyyMMdd HHmmss"); + + if (isChanged(updated.getCreateCallTime(), stored.getCreateCallTime())) { + fields.put("CREATE_CALL_TIME", Long.toString(stored.getCreateCallTime())); } - } - if (!updatedQueryMetric.getLifecycle().equals(storedQueryMetric.getLifecycle())) { - if (storedQueryMetric.getLifecycle() != null) { - fields.put("LIFECYCLE", storedQueryMetric.getLifecycle().toString()); + if (isChanged(updated.getDocRanges(), stored.getDocRanges())) { + fields.put("DOC_RANGES", Long.toString(stored.getDocRanges())); } - } - if (updatedQueryMetric.getLoginTime() != storedQueryMetric.getLoginTime()) { - fields.put("LOGIN_TIME", Long.toString(storedQueryMetric.getLoginTime())); - } - if (updatedQueryMetric.getNumUpdates() != storedQueryMetric.getNumUpdates()) { - fields.put("NUM_UPDATES", Long.toString(storedQueryMetric.getNumUpdates())); - } - if (updatedQueryMetric.getNextCount() != storedQueryMetric.getNextCount()) { - fields.put("NEXT_COUNT", Long.toString(storedQueryMetric.getNextCount())); - } - if (updatedQueryMetric.getNumPages() != storedQueryMetric.getNumPages()) { - fields.put("NUM_PAGES", Long.toString(storedQueryMetric.getNumPages())); - } - if (updatedQueryMetric.getNumResults() != storedQueryMetric.getNumResults()) { - fields.put("NUM_RESULTS", Long.toString(storedQueryMetric.getNumResults())); - } - Map storedPageMetricMap = new HashMap<>(); - List storedPageMetrics = storedQueryMetric.getPageTimes(); - if (storedPageMetrics != null) { - for (PageMetric p : storedPageMetrics) { - storedPageMetricMap.put(p.getPageNumber(), p); + if (isChanged(updated.getElapsedTime(), stored.getElapsedTime())) { + fields.put("ELAPSED_TIME", Long.toString(stored.getElapsedTime())); } - } - List updatedPageMetrics = updatedQueryMetric.getPageTimes(); - if (updatedPageMetrics != null) { - for (PageMetric p : updatedPageMetrics) { - long pageNum = p.getPageNumber(); - PageMetric storedPageMetric = storedPageMetricMap.get(pageNum); - if (storedPageMetric != null && !storedPageMetric.equals(p)) { - fields.put("PAGE_METRICS." + storedPageMetric.getPageNumber(), storedPageMetric.toEventString()); + if (isChanged(updated.getFiRanges(), stored.getFiRanges())) { + fields.put("FI_RANGES", Long.toString(stored.getFiRanges())); + } + if (stored.getLastUpdated() != null && updated.getLastUpdated() != null) { + try { + String storedValue = sdf_date_time2.format(stored.getLastUpdated()); + String updatedValue = sdf_date_time2.format(updated.getLastUpdated()); + if (!updatedValue.equals(storedValue)) { + fields.put("LAST_UPDATED", storedValue); + } + } catch (Exception e) { + log.error("lastUpdated:" + e.getMessage()); } } + if (isChanged(updated.getLifecycle(), stored.getLifecycle())) { + fields.put("LIFECYCLE", stored.getLifecycle().toString()); + } + if (isChanged(updated.getLoginTime(), stored.getLoginTime())) { + fields.put("LOGIN_TIME", Long.toString(stored.getLoginTime())); + } + if (isChanged(updated.getNumUpdates(), stored.getNumUpdates())) { + fields.put("NUM_UPDATES", Long.toString(stored.getNumUpdates())); + } + if (isChanged(updated.getNextCount(), stored.getNextCount())) { + fields.put("NEXT_COUNT", Long.toString(stored.getNextCount())); + } + if (isChanged(updated.getNumPages(), stored.getNumPages())) { + fields.put("NUM_PAGES", Long.toString(stored.getNumPages())); + } + if (isChanged(updated.getNumResults(), stored.getNumResults())) { + fields.put("NUM_RESULTS", Long.toString(stored.getNumResults())); + } + Map storedPageMetricMap = new HashMap<>(); + if (stored != null) { + List storedPageMetrics = stored.getPageTimes(); + if (storedPageMetrics != null) { + for (PageMetric p : storedPageMetrics) { + storedPageMetricMap.put(p.getPageNumber(), p); + } + } + } + if (updated != null) { + List updatedPageMetrics = updated.getPageTimes(); + if (updatedPageMetrics != null) { + for (PageMetric p : updatedPageMetrics) { + long pageNum = p.getPageNumber(); + PageMetric storedPageMetric = storedPageMetricMap.get(pageNum); + if (storedPageMetric != null && !storedPageMetric.equals(p)) { + fields.put("PAGE_METRICS." + storedPageMetric.getPageNumber(), storedPageMetric.toEventString()); + } + } + } + } + if (isChanged(updated.getSeekCount(), stored.getSeekCount())) { + fields.put("SEEK_COUNT", Long.toString(stored.getSeekCount())); + } + if (isChanged(updated.getSetupTime(), stored.getSetupTime())) { + fields.put("SETUP_TIME", Long.toString(stored.getSetupTime())); + } + if (isChanged(updated.getSourceCount(), stored.getSourceCount())) { + fields.put("SOURCE_COUNT", Long.toString(stored.getSourceCount())); + } + if (isChanged(updated.getYieldCount(), stored.getYieldCount())) { + fields.put("YIELD_COUNT", Long.toString(stored.getYieldCount())); + } + putExtendedFieldsToDelete(updated, stored, fields); } - if (updatedQueryMetric.getSeekCount() != storedQueryMetric.getSeekCount()) { - fields.put("SEEK_COUNT", Long.toString(storedQueryMetric.getSeekCount())); - } - if (updatedQueryMetric.getSetupTime() != storedQueryMetric.getSetupTime()) { - fields.put("SETUP_TIME", Long.toString(storedQueryMetric.getSetupTime())); - } - if (updatedQueryMetric.getSourceCount() != storedQueryMetric.getSourceCount()) { - fields.put("SOURCE_COUNT", Long.toString(storedQueryMetric.getSourceCount())); - } - if (updatedQueryMetric.getYieldCount() != storedQueryMetric.getYieldCount()) { - fields.put("YIELD_COUNT", Long.toString(storedQueryMetric.getYieldCount())); - } - putExtendedFieldsToDelete(updatedQueryMetric, fields); - HashMultimap truncatedFields = HashMultimap.create(); fields.entries().forEach(e -> { if (e.getValue().length() > MAX_FIELD_VALUE_LENGTH) { @@ -304,7 +435,7 @@ public Multimap getEventFieldsToDelete(T updatedQueryMetric, T st return truncatedFields; } - protected void putExtendedFieldsToDelete(T updatedQueryMetric, Multimap fields) { + protected void putExtendedFieldsToDelete(T updated, T stored, Multimap fields) { } } diff --git a/service/src/main/java/datawave/microservice/querymetric/handler/QueryMetricCombiner.java b/service/src/main/java/datawave/microservice/querymetric/handler/QueryMetricCombiner.java index 61e45566..c141f682 100644 --- a/service/src/main/java/datawave/microservice/querymetric/handler/QueryMetricCombiner.java +++ b/service/src/main/java/datawave/microservice/querymetric/handler/QueryMetricCombiner.java @@ -36,11 +36,10 @@ public T combineMetrics(T updatedQueryMetric, T cachedQueryMetric, QueryMetricTy if (combinedMetric.getUserDN() == null && updatedQueryMetric.getUserDN() != null) { combinedMetric.setUserDN(updatedQueryMetric.getUserDN()); } - // keep the earliest create date - long cachedCreate = combinedMetric.getCreateDate() == null ? Long.MAX_VALUE : combinedMetric.getCreateDate().getTime(); - long updatedCreate = updatedQueryMetric.getCreateDate() == null ? Long.MAX_VALUE : updatedQueryMetric.getCreateDate().getTime(); - if (updatedCreate < cachedCreate) { - combinedMetric.setCreateDate(updatedQueryMetric.getCreateDate()); + + // keep the original createDate + if (cachedQueryMetric.getCreateDate() != null) { + combinedMetric.setCreateDate(cachedQueryMetric.getCreateDate()); } // Do not update queryId -- shouldn't change anyway @@ -189,11 +188,13 @@ public T combineMetrics(T updatedQueryMetric, T cachedQueryMetric, QueryMetricTy combinedMetric.setPlan(updatedQueryMetric.getPlan()); } // only update once - if (combinedMetric.getSubPlans() == null && updatedQueryMetric.getSubPlans() != null) { + if ((combinedMetric.getSubPlans() == null || combinedMetric.getSubPlans().isEmpty()) && updatedQueryMetric.getSubPlans() != null + && !updatedQueryMetric.getSubPlans().isEmpty()) { combinedMetric.setSubPlans(updatedQueryMetric.getSubPlans()); } // only update once - if (combinedMetric.getPredictions() == null && updatedQueryMetric.getPredictions() != null) { + if ((combinedMetric.getPredictions() == null || combinedMetric.getPredictions().isEmpty()) && updatedQueryMetric.getPredictions() != null + && !updatedQueryMetric.getPredictions().isEmpty()) { combinedMetric.setPredictions(updatedQueryMetric.getPredictions()); } // use the max numUpdates diff --git a/service/src/main/java/datawave/microservice/querymetric/handler/ShardTableQueryMetricHandler.java b/service/src/main/java/datawave/microservice/querymetric/handler/ShardTableQueryMetricHandler.java index 357910a2..aebfe458 100644 --- a/service/src/main/java/datawave/microservice/querymetric/handler/ShardTableQueryMetricHandler.java +++ b/service/src/main/java/datawave/microservice/querymetric/handler/ShardTableQueryMetricHandler.java @@ -26,13 +26,11 @@ import datawave.microservice.querymetric.QueryMetricFactory; import datawave.microservice.querymetric.QueryMetricType; import datawave.microservice.querymetric.QueryMetricsSummaryResponse; -import datawave.microservice.querymetric.QueryMetricsSubplanResponse; import datawave.microservice.querymetric.config.QueryMetricHandlerProperties; import datawave.microservice.querymetric.factory.QueryMetricQueryLogicFactory; import datawave.query.QueryParameters; import datawave.query.iterator.QueryOptions; import datawave.query.language.parser.jexl.LuceneToJexlQueryParser; -import datawave.query.map.SimpleQueryGeometryHandler; import datawave.security.authorization.DatawavePrincipal; import datawave.security.authorization.DatawaveUser; import datawave.security.authorization.SubjectIssuerDNPair; @@ -214,31 +212,39 @@ public AbstractContentIngestHelper getContentIndexingDataTypeHelper() { } } - public void writeMetric(T updatedQueryMetric, List storedQueryMetrics, Date lastUpdated, boolean delete) throws Exception { + private void writeMetric(T updated, T stored, long timestamp, boolean delete, ContentIndexingColumnBasedHandler handler) throws Exception { + Multimap r = getEntries(handler, updated, stored, timestamp); + if (r != null) { + for (Entry e : r.entries()) { + recordWriter.write(e.getKey().getTableName(), getMutation(e.getKey().getKey(), e.getValue())); + } + } + if (!delete && handler.getMetadata() != null) { + for (Entry e : handler.getMetadata().getBulkMetadata().entries()) { + recordWriter.write(e.getKey().getTableName(), getMutation(e.getKey().getKey(), e.getValue())); + } + } + } + + public void writeMetric(T updatedQueryMetric, List storedQueryMetrics, long timestamp, boolean delete) throws Exception { try { TaskAttemptID taskId = new TaskAttemptID(new TaskID(new JobID(JOB_ID, 1), TaskType.MAP, 1), 1); - this.accumuloRecordWriterLock.readLock().lock(); + try { MapContext context = new MapContextImpl<>(conf, taskId, null, this.recordWriter, null, reporter, null); - for (T storedQueryMetric : storedQueryMetrics) { - ContentIndexingColumnBasedHandler handler = new ContentIndexingColumnBasedHandler() { - @Override - public AbstractContentIngestHelper getContentIndexingDataTypeHelper() { - return getQueryMetricsIngestHelper(delete); - } - }; - handler.setup(context); - Multimap r = getEntries(handler, updatedQueryMetric, storedQueryMetric, lastUpdated); - if (r != null) { - for (Entry e : r.entries()) { - recordWriter.write(e.getKey().getTableName(), getMutation(e.getKey().getKey(), e.getValue())); - } + ContentIndexingColumnBasedHandler handler = new ContentIndexingColumnBasedHandler() { + @Override + public AbstractContentIngestHelper getContentIndexingDataTypeHelper() { + return getQueryMetricsIngestHelper(delete); } - if (!delete && handler.getMetadata() != null) { - for (Entry e : handler.getMetadata().getBulkMetadata().entries()) { - recordWriter.write(e.getKey().getTableName(), getMutation(e.getKey().getKey(), e.getValue())); - } + }; + handler.setup(context); + if (storedQueryMetrics.isEmpty()) { + writeMetric(updatedQueryMetric, null, timestamp, delete, handler); + } else { + for (T storedQueryMetric : storedQueryMetrics) { + writeMetric(updatedQueryMetric, storedQueryMetric, timestamp, delete, handler); } } } finally { @@ -272,14 +278,14 @@ public Map getEventFields(BaseQueryMetric queryMetric) { Map eventFields = new HashMap<>(); ContentQueryMetricsIngestHelper ingestHelper = getQueryMetricsIngestHelper(false); ingestHelper.setup(conf); - Multimap fieldsToWrite = ingestHelper.getEventFieldsToWrite(queryMetric); + Multimap fieldsToWrite = ingestHelper.getEventFieldsToWrite(queryMetric, null); for (Entry entry : fieldsToWrite.entries()) { eventFields.put(entry.getKey(), entry.getValue().getEventFieldValue()); } return eventFields; } - private Multimap getEntries(ContentIndexingColumnBasedHandler handler, T updatedQueryMetric, T storedQueryMetric, Date lastUpdated) { + private Multimap getEntries(ContentIndexingColumnBasedHandler handler, T updatedQueryMetric, T storedQueryMetric, long timestamp) { Type type = TypeRegistry.getType("querymetrics"); ContentQueryMetricsIngestHelper ingestHelper = (ContentQueryMetricsIngestHelper) handler.getContentIndexingDataTypeHelper(); boolean deleteMode = ingestHelper.getDeleteMode(); @@ -288,7 +294,7 @@ private Multimap getEntries(ContentIndexingColumnBasedHandl RawRecordContainerImpl event = new RawRecordContainerImpl(); event.setConf(this.conf); event.setDataType(type); - event.setDate(storedQueryMetric.getCreateDate().getTime()); + event.setDate(updatedQueryMetric.getCreateDate().getTime()); // get markings from metric, otherwise use the default markings Map markings = updatedQueryMetric.getMarkings(); if (markings != null && !markings.isEmpty()) { @@ -301,18 +307,18 @@ private Multimap getEntries(ContentIndexingColumnBasedHandl } else { event.setVisibility(this.queryMetricHandlerProperties.getDefaultMetricVisibility()); } - event.setAuxData(storedQueryMetric); + event.setAuxData(updatedQueryMetric); event.setRawRecordNumber(1000L); - event.addAltId(storedQueryMetric.getQueryId()); + event.addAltId(updatedQueryMetric.getQueryId()); - event.setId(uidBuilder.newId(storedQueryMetric.getQueryId().getBytes(Charset.forName("UTF-8")), (Date) null)); + event.setId(uidBuilder.newId(updatedQueryMetric.getQueryId().getBytes(Charset.forName("UTF-8")), (Date) null)); final Multimap fields; if (deleteMode) { fields = ingestHelper.getEventFieldsToDelete(updatedQueryMetric, storedQueryMetric); } else { - fields = ingestHelper.getEventFieldsToWrite(updatedQueryMetric); + fields = ingestHelper.getEventFieldsToWrite(updatedQueryMetric, storedQueryMetric); } Key key = new Key(); @@ -345,19 +351,14 @@ private Multimap getEntries(ContentIndexingColumnBasedHandl r.removeAll(b); } - // replace the longest of the keys from fields that get parse - // d as content + // replace the longest of the keys from + // fields that get parsed as content for (Entry l : tfFields.entrySet()) { r.put(l.getValue(), new Value(new byte[0])); } for (Entry> entry : r.asMap().entrySet()) { - if (deleteMode) { - entry.getKey().getKey().setTimestamp(lastUpdated.getTime()); - } else { - // this will ensure that the QueryMetrics can be found within second precision in most cases - entry.getKey().getKey().setTimestamp(storedQueryMetric.getCreateDate().getTime() + storedQueryMetric.getNumUpdates()); - } + entry.getKey().getKey().setTimestamp(timestamp); entry.getKey().getKey().setDeleted(deleteMode); } @@ -370,6 +371,11 @@ public T combineMetrics(T updatedQueryMetric, T cachedQueryMetric, QueryMetricTy return (T) queryMetricCombiner.combineMetrics(updatedQueryMetric, cachedQueryMetric, metricType); } + public T getQueryMetric(final String queryId) throws Exception { + List queryMetrics = getQueryMetrics("QUERY_ID == '" + queryId + "'", null); + return queryMetrics.isEmpty() ? null : queryMetrics.get(0); + } + public T getQueryMetric(final String queryId, String blacklistedFields) throws Exception { List queryMetrics = getQueryMetrics("QUERY_ID == '" + queryId + "'", blacklistedFields); return queryMetrics.isEmpty() ? null : queryMetrics.get(0); @@ -394,7 +400,7 @@ public List getQueryMetrics(final String query, String blacklistedFields) thr queryImpl.setPagesize(1000); queryImpl.setId(UUID.randomUUID()); queryImpl.setParameters(ImmutableMap.of(QueryOptions.INCLUDE_GROUPING_CONTEXT, "true")); - if (!blacklistedFields.isEmpty()) { + if (StringUtils.isNotBlank(blacklistedFields)) { queryImpl.addParameter(QueryParameters.BLACKLISTED_FIELDS, blacklistedFields); } return getQueryMetrics(queryImpl); @@ -503,7 +509,7 @@ public T toMetric(EventBase event) { List field = event.getFields(); m.setMarkings(event.getMarkings()); TreeMap pageMetrics = Maps.newTreeMap(); - Map subplans = new HashMap<>(); + Map subplans = new HashMap<>(); boolean createDateSet = false; for (FieldBase f : field) { @@ -518,7 +524,7 @@ public T toMetric(EventBase event) { Date d = sdf_date_time1.parse(fieldValue); m.setBeginDate(d); } catch (Exception e) { - log.error(e.getMessage()); + log.error(fieldName + ":" + fieldValue + ":" + e.getMessage()); } } else if (fieldName.equals("CREATE_CALL_TIME")) { m.setCreateCallTime(Long.parseLong(fieldValue)); @@ -528,34 +534,55 @@ public T toMetric(EventBase event) { m.setCreateDate(d); createDateSet = true; } catch (Exception e) { - log.error(e.getMessage()); + log.error(fieldName + ":" + fieldValue + ":" + e.getMessage()); } } else if (fieldName.equals("DOC_RANGES")) { - m.setDocRanges(Long.parseLong(fieldValue)); + try { + long l = Long.parseLong(fieldValue); + if (l > m.getDocRanges()) { + m.setDocRanges(l); + } + } catch (Exception e) { + log.error(fieldName + ":" + fieldValue + ":" + e.getMessage()); + } } else if (fieldName.equals("END_DATE")) { try { Date d = sdf_date_time1.parse(fieldValue); m.setEndDate(d); } catch (Exception e) { - log.error(e.getMessage()); + log.error(fieldName + ":" + fieldValue + ":" + e.getMessage()); } } else if (fieldName.equals("ERROR_CODE")) { m.setErrorCode(fieldValue); } else if (fieldName.equals("ERROR_MESSAGE")) { m.setErrorMessage(fieldValue); } else if (fieldName.equals("FI_RANGES")) { - m.setFiRanges(Long.parseLong(fieldValue)); + try { + long l = Long.parseLong(fieldValue); + if (l > m.getFiRanges()) { + m.setFiRanges(l); + } + } catch (Exception e) { + log.error(fieldName + ":" + fieldValue + ":" + e.getMessage()); + } } else if (fieldName.equals("HOST")) { m.setHost(fieldValue); } else if (fieldName.equals("LAST_UPDATED")) { try { Date d = sdf_date_time2.parse(fieldValue); - m.setLastUpdated(d); + // protect against multiple values by coosing the latest + if (m.getLastUpdated() == null || d.after(m.getLastUpdated())) { + m.setLastUpdated(d); + } } catch (Exception e) { - log.error(e.getMessage()); + log.error(fieldName + ":" + fieldValue + ":" + e.getMessage()); } } else if (fieldName.equals("LIFECYCLE")) { - m.setLifecycle(Lifecycle.valueOf(fieldValue)); + Lifecycle l = Lifecycle.valueOf(fieldValue); + // protect against multiple values by choosing the last by ordinal + if (m.getLifecycle() == null || l.ordinal() > m.getLifecycle().ordinal()) { + m.setLifecycle(Lifecycle.valueOf(fieldValue)); + } } else if (fieldName.equals("LOGIN_TIME")) { m.setLoginTime(Long.parseLong(fieldValue)); } else if (fieldName.equals("NEGATIVE_SELECTORS")) { @@ -566,13 +593,22 @@ public T toMetric(EventBase event) { negativeSelectors.add(fieldValue); m.setNegativeSelectors(negativeSelectors); } else if (fieldName.equals("NEXT_COUNT")) { - m.setNextCount(Long.parseLong(fieldValue)); + try { + long l = Long.parseLong(fieldValue); + if (l > m.getNextCount()) { + m.setNextCount(l); + } + } catch (Exception e) { + log.error(fieldName + ":" + fieldValue + ":" + e.getMessage()); + } } else if (fieldName.equals("NUM_UPDATES")) { try { - long numUpdates = Long.parseLong(fieldValue); - m.setNumUpdates(numUpdates); + long l = Long.parseLong(fieldValue); + if (l > m.getNumUpdates()) { + m.setNumUpdates(l); + } } catch (Exception e) { - log.error(e.getMessage()); + log.error(fieldName + ":" + fieldValue + ":" + e.getMessage()); } } else if (fieldName.startsWith("PAGE_METRICS")) { int index = fieldName.indexOf("."); @@ -597,8 +633,18 @@ public T toMetric(EventBase event) { } else if (fieldName.equals("PLAN")) { m.setPlan(fieldValue); } else if (fieldName.equals("SUBPLAN")) { - String[] arr = fieldValue.split(" : ", 2); - subplans.put(arr[0], arr[1]); + if (fieldValue != null) { + String[] arr = fieldValue.split(" : ", 2); + int[] rangeCounts = new int[2]; + int index = 0; + for (String count : arr[1].substring(1, arr[1].length() - 1).split(", ")) { + rangeCounts[index] = Integer.parseInt(count); + index++; + } + if (arr.length >= 2) { + subplans.put(arr[0], rangeCounts); + } + } } else if (fieldName.equals("POSITIVE_SELECTORS")) { List positiveSelectors = m.getPositiveSelectors(); if (positiveSelectors == null) { @@ -632,11 +678,25 @@ public T toMetric(EventBase event) { } else if (fieldName.equals("QUERY_TYPE")) { m.setQueryType(fieldValue); } else if (fieldName.equals("SEEK_COUNT")) { - m.setSeekCount(Long.parseLong(fieldValue)); + try { + long l = Long.parseLong(fieldValue); + if (l > m.getSeekCount()) { + m.setSeekCount(l); + } + } catch (Exception e) { + log.error(fieldName + ":" + fieldValue + ":" + e.getMessage()); + } } else if (fieldName.equals("SETUP_TIME")) { m.setSetupTime(Long.parseLong(fieldValue)); } else if (fieldName.equals("SOURCE_COUNT")) { - m.setSourceCount(Long.parseLong(fieldValue)); + try { + long l = Long.parseLong(fieldValue); + if (l > m.getSourceCount()) { + m.setSourceCount(l); + } + } catch (Exception e) { + log.error(fieldName + ":" + fieldValue + ":" + e.getMessage()); + } } else if (fieldName.equals("USER")) { m.setUser(fieldValue); } else if (fieldName.equals("USER_DN")) { @@ -646,7 +706,14 @@ public T toMetric(EventBase event) { } else if (fieldName.startsWith("VERSION.")) { m.addVersion(fieldName.substring(8), fieldValue); } else if (fieldName.equals("YIELD_COUNT")) { - m.setYieldCount(Long.parseLong(fieldValue)); + try { + long l = Long.parseLong(fieldValue); + if (l > m.getYieldCount()) { + m.setYieldCount(l); + } + } catch (Exception e) { + log.error(fieldName + ":" + fieldValue + ":" + e.getMessage()); + } } } } diff --git a/service/src/main/java/datawave/microservice/querymetric/persistence/AccumuloMapStore.java b/service/src/main/java/datawave/microservice/querymetric/persistence/AccumuloMapStore.java index 734965f3..0f5491d9 100644 --- a/service/src/main/java/datawave/microservice/querymetric/persistence/AccumuloMapStore.java +++ b/service/src/main/java/datawave/microservice/querymetric/persistence/AccumuloMapStore.java @@ -3,7 +3,6 @@ import com.codahale.metrics.SlidingTimeWindowArrayReservoir; import com.codahale.metrics.Timer; import com.google.common.cache.CacheBuilder; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.hazelcast.core.IMap; import com.hazelcast.core.MapLoader; import com.hazelcast.core.MapStore; @@ -12,7 +11,6 @@ import datawave.microservice.querymetric.MergeLockLifecycleListener; import datawave.microservice.querymetric.QueryMetricType; import datawave.microservice.querymetric.QueryMetricUpdateHolder; -import datawave.microservice.querymetric.config.QueryMetricHandlerProperties; import datawave.microservice.querymetric.handler.ShardTableQueryMetricHandler; import datawave.microservice.querymetric.QueryMetricUpdate; import org.slf4j.Logger; @@ -23,22 +21,16 @@ import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.MINUTES; -import static java.util.concurrent.TimeUnit.SECONDS; @Component("store") @ConditionalOnProperty(name = "hazelcast.server.enabled") @@ -48,11 +40,7 @@ public class AccumuloMapStore extends AccumuloMapLoad private Logger log = LoggerFactory.getLogger(AccumuloMapStore.class); private IMap lastWrittenQueryMetricCache; private MergeLockLifecycleListener mergeLock; - private QueryMetricHandlerProperties queryMetricHandlerProperties; private com.google.common.cache.Cache failures; - private ExecutorService executorService = null; - private SynchronousQueue updateQueue = new SynchronousQueue<>(true); - private List updateWriters = new ArrayList<>(); private Timer writeTimer = new Timer(new SlidingTimeWindowArrayReservoir(1, MINUTES)); private boolean shuttingDown = false; @@ -63,68 +51,17 @@ public MapLoader newMapStore(String mapName, Properties } } - public class Writer implements Runnable { - private SynchronousQueue updateQueue; - private boolean shuttingDown = false; - - public Writer(SynchronousQueue updateQueue) { - this.updateQueue = updateQueue; - } - - public void setShuttingDown(boolean shuttingDown) { - this.shuttingDown = shuttingDown; - } - - @Override - public void run() { - while (!this.shuttingDown) { - try { - QueryMetricUpdateHolder queryMetricUpdate = this.updateQueue.take(); - if (queryMetricUpdate != null) { - Timer.Context writeTimerContext = writeTimer.time(); - try { - AccumuloMapStore.this.storeWithRetry(queryMetricUpdate); - } finally { - writeTimerContext.stop(); - } - } - } catch (InterruptedException e) { - if (!this.shuttingDown) { - log.error(e.getMessage(), e); - } - } catch (Exception e) { - log.error(e.getMessage(), e); - } - } - } - } - @Autowired - public AccumuloMapStore(ShardTableQueryMetricHandler handler, QueryMetricHandlerProperties queryMetricHandlerProperties, - MergeLockLifecycleListener mergeLock) { + public AccumuloMapStore(ShardTableQueryMetricHandler handler, MergeLockLifecycleListener mergeLock) { this.handler = handler; this.mergeLock = mergeLock; - this.queryMetricHandlerProperties = queryMetricHandlerProperties; this.failures = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build(); AccumuloMapStore.instance = this; - int writerThreads = queryMetricHandlerProperties.getMapStoreWriteThreads(); - if (writerThreads > 1) { - this.executorService = Executors.newFixedThreadPool(writerThreads, new ThreadFactoryBuilder().setNameFormat("map-store-write-thread-%d").build()); - for (int x = 0; x < writerThreads; x++) { - Writer w = new Writer(this.updateQueue); - this.updateWriters.add(w); - this.executorService.submit(w); - } - } } @PreDestroy public void shutdown() { this.shuttingDown = true; - // stop the writer threads - for (Writer w : this.updateWriters) { - w.setShuttingDown(true); - } // ensure that queued updates written to the handler's // MultiTabletBatchWriter are flushed to Accumulo on shutdown try { @@ -132,19 +69,6 @@ public void shutdown() { } catch (Exception e) { log.error(e.getMessage(), e); } - - if (this.executorService != null) { - // ensure the writer threads exit - boolean shutdownSuccess = false; - try { - shutdownSuccess = this.executorService.awaitTermination(60, SECONDS); - } catch (InterruptedException e) { - - } - if (!shutdownSuccess) { - this.executorService.shutdownNow(); - } - } } public void setLastWrittenQueryMetricCache(Cache lastWrittenQueryMetricCache) { @@ -153,16 +77,7 @@ public void setLastWrittenQueryMetricCache(Cache lastWrittenQueryMetricCache) { @Override public void store(String queryId, QueryMetricUpdateHolder queryMetricUpdate) { - if (queryMetricHandlerProperties.getMapStoreWriteThreads() > 1) { - try { - this.updateQueue.put(queryMetricUpdate); - } catch (Exception e) { - // hazelcast will retry storing the update - throw new RuntimeException(e); - } - } else { - storeWithRetry(queryMetricUpdate); - } + storeWithRetry(queryMetricUpdate); } public void storeWithRetry(QueryMetricUpdateHolder queryMetricUpdate) { @@ -185,24 +100,53 @@ public void storeWithRetry(QueryMetricUpdateHolder queryMetricUpdate) { public void store(QueryMetricUpdateHolder queryMetricUpdate) throws Exception { String queryId = queryMetricUpdate.getMetric().getQueryId(); - T updatedMetric; + T updatedMetric = null; this.mergeLock.lock(); try { - updatedMetric = queryMetricUpdate.getMetric(); + updatedMetric = (T) queryMetricUpdate.getMetric().duplicate(); QueryMetricType metricType = queryMetricUpdate.getMetricType(); QueryMetricUpdateHolder lastQueryMetricUpdate = null; + if (!queryMetricUpdate.isNewMetric()) { lastQueryMetricUpdate = (QueryMetricUpdateHolder) lastWrittenQueryMetricCache.get(queryId); } if (lastQueryMetricUpdate != null) { T lastQueryMetric = lastQueryMetricUpdate.getMetric(); + if (metricType.equals(QueryMetricType.DISTRIBUTED)) { + // these values are added incrementally in a distributed update. Because we can not be sure + // exactly when the incomingQueryMetricCache value is stored, it would otherwise be possible + // for updates to be included twice. + updatedMetric.setSourceCount(queryMetricUpdate.getValue("sourceCount")); + updatedMetric.setNextCount(queryMetricUpdate.getValue("nextCount")); + updatedMetric.setSeekCount(queryMetricUpdate.getValue("seekCount")); + updatedMetric.setYieldCount(queryMetricUpdate.getValue("yieldCount")); + updatedMetric.setDocRanges(queryMetricUpdate.getValue("docRanges")); + updatedMetric.setFiRanges(queryMetricUpdate.getValue("fiRanges")); + } + updatedMetric = handler.combineMetrics(updatedMetric, lastQueryMetric, metricType); - // if for some reason, lastQueryMetric doesn't have lastUpdated set, - // we can not delete the previous entries and will cause an NPE if we try + long numUpdates = updatedMetric.getNumUpdates(); + // The createDate shouldn't change once it is set, so this is just insurance + // We use the higher timestamp to ensure that the deletes and successive writes persist + // As long as this timestamp is greater than when it was written, then the delete will be effective + long deleteTimestamp; + if (lastQueryMetric.getCreateDate().after(updatedMetric.getCreateDate())) { + deleteTimestamp = updatedMetric.getCreateDate().getTime() + numUpdates; + } else { + deleteTimestamp = lastQueryMetric.getCreateDate().getTime() + numUpdates; + } + long writeTimestamp = deleteTimestamp + 1; + updatedMetric.setNumUpdates(numUpdates + 1); + updatedMetric.setLastUpdated(new Date(updatedMetric.getLastUpdated().getTime() + 1)); + if (lastQueryMetric.getLastUpdated() != null) { - handler.writeMetric(updatedMetric, Collections.singletonList(lastQueryMetric), lastQueryMetric.getLastUpdated(), true); + handler.writeMetric(updatedMetric, Collections.singletonList(lastQueryMetric), deleteTimestamp, true); } + handler.writeMetric(updatedMetric, Collections.singletonList(lastQueryMetric), writeTimestamp, false); + } else { + updatedMetric.setLastUpdated(updatedMetric.getCreateDate()); + handler.writeMetric(updatedMetric, Collections.emptyList(), updatedMetric.getCreateDate().getTime(), false); } if (log.isTraceEnabled()) { log.trace("writing metric to accumulo: " + queryId + " - " + queryMetricUpdate.getMetric()); @@ -210,14 +154,22 @@ public void store(QueryMetricUpdateHolder queryMetricUpdate) throws Exception log.debug("writing metric to accumulo: " + queryId); } - if (updatedMetric.getLastUpdated() == null) { - updatedMetric.setLastUpdated(new Date()); - } - - handler.writeMetric(updatedMetric, Collections.singletonList(updatedMetric), updatedMetric.getLastUpdated(), false); lastWrittenQueryMetricCache.set(queryId, new QueryMetricUpdateHolder(updatedMetric)); + queryMetricUpdate.persisted(); failures.invalidate(queryId); } finally { + if (queryMetricUpdate.getMetricType().equals(QueryMetricType.DISTRIBUTED)) { + // we've added the accumulated updates, so they can be reset + if (updatedMetric != null) { + // this ensures that the incomingQueryMetricsCache has the latest values + queryMetricUpdate.getMetric().setSourceCount(updatedMetric.getSourceCount()); + queryMetricUpdate.getMetric().setNextCount(updatedMetric.getNextCount()); + queryMetricUpdate.getMetric().setSeekCount(updatedMetric.getSeekCount()); + queryMetricUpdate.getMetric().setYieldCount(updatedMetric.getYieldCount()); + queryMetricUpdate.getMetric().setDocRanges(updatedMetric.getDocRanges()); + queryMetricUpdate.getMetric().setFiRanges(updatedMetric.getFiRanges()); + } + } this.mergeLock.unlock(); } } diff --git a/service/src/test/java/datawave/microservice/querymetric/HazelcastCachingTest.java b/service/src/test/java/datawave/microservice/querymetric/HazelcastCachingTest.java index 1ce6fdca..4b99d507 100644 --- a/service/src/test/java/datawave/microservice/querymetric/HazelcastCachingTest.java +++ b/service/src/test/java/datawave/microservice/querymetric/HazelcastCachingTest.java @@ -33,7 +33,7 @@ public void TestReadThroughCache() { try { String queryId = createQueryId(); BaseQueryMetric m = createMetric(queryId); - shardTableQueryMetricHandler.writeMetric(m, Collections.singletonList(m), m.getLastUpdated(), false); + shardTableQueryMetricHandler.writeMetric(m, Collections.emptyList(), m.getCreateDate().getTime(), false); BaseQueryMetric metricFromReadThroughCache = lastWrittenQueryMetricCache.get(queryId, QueryMetricUpdate.class).getMetric(); assertEquals("read through cache failed", m, metricFromReadThroughCache); } catch (Exception e) { diff --git a/service/src/test/java/datawave/microservice/querymetric/NonWebApplicationMessagingTest.java b/service/src/test/java/datawave/microservice/querymetric/NonWebApplicationMessagingTest.java index ec0a55d1..4b474e4a 100644 --- a/service/src/test/java/datawave/microservice/querymetric/NonWebApplicationMessagingTest.java +++ b/service/src/test/java/datawave/microservice/querymetric/NonWebApplicationMessagingTest.java @@ -76,7 +76,7 @@ public void setup() { // this is to ensure that the QueryMetrics_m table // is populated so that queries work properly try { - this.shardTableQueryMetricHandler.writeMetric(m, Collections.singletonList(m), m.getLastUpdated(), false); + this.shardTableQueryMetricHandler.writeMetric(m, Collections.emptyList(), m.getCreateDate().getTime(), false); this.shardTableQueryMetricHandler.flush(); } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/service/src/test/java/datawave/microservice/querymetric/QueryMetricConsistencyTest.java b/service/src/test/java/datawave/microservice/querymetric/QueryMetricConsistencyTest.java index 0ce368fe..f5cb513f 100644 --- a/service/src/test/java/datawave/microservice/querymetric/QueryMetricConsistencyTest.java +++ b/service/src/test/java/datawave/microservice/querymetric/QueryMetricConsistencyTest.java @@ -1,6 +1,7 @@ package datawave.microservice.querymetric; import com.google.common.collect.Multimap; +import datawave.data.hash.UID; import datawave.microservice.querymetric.handler.ContentQueryMetricsIngestHelper; import datawave.microservice.querymetric.persistence.AccumuloMapStore; import datawave.util.StringUtils; @@ -9,6 +10,8 @@ import datawave.webservice.query.result.event.EventBase; import datawave.webservice.query.result.event.FieldBase; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.junit.After; import org.junit.Assert; @@ -25,21 +28,23 @@ import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @ActiveProfiles({"QueryMetricConsistencyTest", "QueryMetricTest", "hazelcast-writethrough"}) public class QueryMetricConsistencyTest extends QueryMetricTestBase { + private int documentRangeCount; + private int shardRangeCount; + @Autowired AccumuloMapStore mapStore; @@ -80,6 +85,59 @@ public void PageMetricTest() throws Exception { Assert.assertEquals(i + 1, returnedMetric.getPageTimes().size()); assertEquals(m, returnedMetric); } + assertNoDuplicateFields(queryId); + } + + public void updateRangeCounts(Range range) { + Key key = range.getStartKey(); + String cf = key.getColumnFamily().toString(); + if (cf.length() > 0) { + documentRangeCount++; + } else { + shardRangeCount++; + } + } + + @Test + public void SubPlanTest() throws Exception { + String queryId = createQueryId(); + BaseQueryMetric m = createMetric(queryId); + documentRangeCount = 0; + shardRangeCount = 0; + + Date now = new Date(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); + String date = sdf.format(now); + int numShardSubplans = 10; + for (int i = 0; i < numShardSubplans; i++) { + String shard = date + "_" + i; + Key begin = new Key(shard); + Key end = begin.followingKey(PartialKey.ROW); + updateRangeCounts(new Range(begin, end)); + } + int numDocSubplans = 10; + for (int i = 0; i < numDocSubplans; i++) { + String shard = date + "_" + i; + String uid = UID.builder().newId().toString(); + Key begin = new Key(shard, "datatype\0" + uid); + Key end = begin.followingKey(PartialKey.ROW_COLFAM); + updateRangeCounts(new Range(begin, end)); + } + m.addSubPlan("subplan", new int[] {shardRangeCount, documentRangeCount}); + Assert.assertEquals(numDocSubplans, documentRangeCount); + Assert.assertEquals(numShardSubplans, shardRangeCount); + + // @formatter:off + client.submit(new QueryMetricClient.Request.Builder() + .withMetric(m) + .withMetricType(QueryMetricType.COMPLETE) + .withUser(this.adminUser) + .build()); + // @formatter:on + this.ensureDataWritten(incomingQueryMetricsCache, lastWrittenQueryMetricCache, queryId); + BaseQueryMetric storedMetric = shardTableQueryMetricHandler.getQueryMetric(queryId); + Assert.assertEquals(1, storedMetric.getSubPlans().size()); + assertEquals(m, storedMetric); } @Test @@ -123,6 +181,7 @@ public void OutOfOrderLifecycleTest() throws Exception { returnedMetric = (BaseQueryMetric) metricResponse.getBody().getResult().get(0); // metric should have been updated without backtracking on the lifecycle Assert.assertEquals("lifecycle incorrect", BaseQueryMetric.Lifecycle.CLOSED, returnedMetric.getLifecycle()); + assertNoDuplicateFields(queryId); } @Test @@ -178,7 +237,7 @@ public void DistributedUpdateTest() throws Exception { Assert.assertEquals(1, metricResponse.getBody().getNumResults()); BaseQueryMetric returnedMetric = (BaseQueryMetric) metricResponse.getBody().getResult().get(0); - Assert.assertEquals("create date should be the earlier of the two values", formatDate(new Date(now - 1000)), + Assert.assertEquals("create date should be the first received of the two values", formatDate(new Date(now)), formatDate(returnedMetric.getCreateDate())); Assert.assertEquals("last updated should only increase", formatDate(new Date(now)), formatDate(returnedMetric.getLastUpdated())); Assert.assertEquals("source count should be additive", 200, returnedMetric.getSourceCount()); @@ -216,6 +275,7 @@ public void DistributedUpdateTest() throws Exception { Assert.assertEquals("latest yield count should be used", 1000, returnedMetric.getYieldCount()); Assert.assertEquals("latest doc ranges count should be used", 1000, returnedMetric.getDocRanges()); Assert.assertEquals("latest fi ranges should be used", 1000, returnedMetric.getFiRanges()); + assertNoDuplicateFields(queryId); } @Test @@ -223,7 +283,7 @@ public void ToMetricTest() { ContentQueryMetricsIngestHelper.HelperDelegate helper = new ContentQueryMetricsIngestHelper.HelperDelegate<>(); QueryMetric queryMetric = (QueryMetric) createMetric(); - Multimap fieldsToWrite = helper.getEventFieldsToWrite(queryMetric); + Multimap fieldsToWrite = helper.getEventFieldsToWrite(queryMetric, null); EventBase event = new DefaultEvent(); long now = System.currentTimeMillis(); @@ -258,23 +318,21 @@ public void CombineMetricsTest() throws Exception { @Test public void MetricUpdateTest() throws Exception { - QueryMetric storedQueryMetric = (QueryMetric) createMetric(); + String queryId = createQueryId(); + QueryMetric storedQueryMetric = (QueryMetric) createMetric(queryId); QueryMetric updatedQueryMetric = (QueryMetric) storedQueryMetric.duplicate(); updatedQueryMetric.setLifecycle(BaseQueryMetric.Lifecycle.CLOSED); updatedQueryMetric.setNumResults(2000); - updatedQueryMetric.setNumUpdates(200); updatedQueryMetric.setDocRanges(400); updatedQueryMetric.setNextCount(400); updatedQueryMetric.setSeekCount(400); Date now = new Date(); - this.shardTableQueryMetricHandler.writeMetric(storedQueryMetric, Collections.singletonList(storedQueryMetric), now, false); - this.shardTableQueryMetricHandler.writeMetric(updatedQueryMetric, Collections.singletonList(storedQueryMetric), now, true); + this.shardTableQueryMetricHandler.writeMetric(storedQueryMetric, Collections.emptyList(), now.getTime(), false); + this.shardTableQueryMetricHandler.writeMetric(updatedQueryMetric, Collections.singletonList(storedQueryMetric), now.getTime(), true); - Collection> entries = QueryMetricTestBase.getAccumuloEntries(this.accumuloClient, - this.queryMetricHandlerProperties.getShardTableName(), this.auths); + Collection> entries = getEventEntriesFromAccumulo(queryId); Map updatedFields = new HashMap(); - updatedFields.put("NUM_UPDATES", "200"); updatedFields.put("NUM_RESULTS", "2000"); updatedFields.put("LIFECYCLE", "CLOSED"); updatedFields.put("DOC_RANGES", "400"); @@ -290,8 +348,8 @@ public void MetricUpdateTest() throws Exception { } } - shardTableQueryMetricHandler.writeMetric(updatedQueryMetric, Collections.singletonList(storedQueryMetric), now, false); - entries = QueryMetricTestBase.getAccumuloEntries(this.accumuloClient, this.queryMetricHandlerProperties.getShardTableName(), this.auths); + shardTableQueryMetricHandler.writeMetric(updatedQueryMetric, Collections.singletonList(storedQueryMetric), now.getTime(), false); + entries = getEventEntriesFromAccumulo(queryId); Assert.assertFalse("There should be entries in Accumulo", entries.isEmpty()); for (Map.Entry e : entries) { if (e.getKey().getColumnFamily().toString().startsWith("querymetrics")) { @@ -302,6 +360,7 @@ public void MetricUpdateTest() throws Exception { } } } + assertNoDuplicateFields(storedQueryMetric.getQueryId()); } @Test @@ -311,35 +370,25 @@ public void DuplicateAccumuloEntryTest() throws Exception { QueryMetric updatedQueryMetric = (QueryMetric) storedQueryMetric.duplicate(); updatedQueryMetric.setLifecycle(BaseQueryMetric.Lifecycle.CLOSED); updatedQueryMetric.setNumResults(2000); - updatedQueryMetric.setNumUpdates(200); updatedQueryMetric.setDocRanges(400); updatedQueryMetric.setNextCount(400); updatedQueryMetric.setSeekCount(400); - mapStore.store(queryId, new QueryMetricUpdateHolder(storedQueryMetric, QueryMetricType.COMPLETE)); + QueryMetricUpdateHolder holder = new QueryMetricUpdateHolder(storedQueryMetric, QueryMetricType.COMPLETE); + mapStore.store(queryId, holder); QueryMetricUpdateHolder lastWrittenMetricUpdate = this.lastWrittenQueryMetricCache.get(queryId, QueryMetricUpdateHolder.class); assertEquals(storedQueryMetric, lastWrittenMetricUpdate.getMetric()); - mapStore.store(queryId, new QueryMetricUpdateHolder(updatedQueryMetric, QueryMetricType.COMPLETE)); + holder.setMetric(updatedQueryMetric); + mapStore.store(queryId, holder); lastWrittenMetricUpdate = this.lastWrittenQueryMetricCache.get(queryId, QueryMetricUpdateHolder.class); // all fields that were changed should be reflected in the updated metric assertEquals(updatedQueryMetric, lastWrittenMetricUpdate.getMetric()); - Collection> entries = QueryMetricTestBase.getAccumuloEntries(this.accumuloClient, - this.queryMetricHandlerProperties.getShardTableName(), this.auths); - + Collection> entries = getEventEntriesFromAccumulo(queryId); Assert.assertFalse("There should be entries in Accumulo", entries.isEmpty()); - Set foundFields = new HashSet<>(); - for (Map.Entry e : entries) { - if (e.getKey().getColumnFamily().toString().startsWith("querymetrics")) { - String fieldName = fieldSplit(e, 0); - if (foundFields.contains(fieldName)) { - Assert.fail("duplicate field " + fieldName + " found in Accumulo"); - } else { - foundFields.add(fieldName); - } - } - } + + assertNoDuplicateFields(queryId); } private String fieldSplit(Map.Entry entry, int part) { diff --git a/service/src/test/java/datawave/microservice/querymetric/QueryMetricTestBase.java b/service/src/test/java/datawave/microservice/querymetric/QueryMetricTestBase.java index 0f2be100..68519dad 100644 --- a/service/src/test/java/datawave/microservice/querymetric/QueryMetricTestBase.java +++ b/service/src/test/java/datawave/microservice/querymetric/QueryMetricTestBase.java @@ -7,6 +7,7 @@ import com.hazelcast.config.MapStoreConfig; import com.hazelcast.core.IMap; import com.hazelcast.spring.cache.HazelcastCacheManager; +import datawave.ingest.protobuf.Uid; import datawave.marking.MarkingFunctions; import datawave.microservice.authorization.preauth.ProxiedEntityX509Filter; import datawave.microservice.authorization.user.ProxiedUserDetails; @@ -28,8 +29,10 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.time.DateUtils; +import org.apache.hadoop.io.Text; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -55,9 +58,11 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import static datawave.microservice.querymetric.config.HazelcastMetricCacheConfiguration.INCOMING_METRICS; import static datawave.microservice.querymetric.config.HazelcastMetricCacheConfiguration.LAST_WRITTEN_METRICS; @@ -146,7 +151,7 @@ public void setup() { // this is to ensure that the QueryMetrics_m table // is populated so that queries work properly try { - this.shardTableQueryMetricHandler.writeMetric(m, Collections.singletonList(m), m.getLastUpdated(), false); + this.shardTableQueryMetricHandler.writeMetric(m, Collections.emptyList(), m.getCreateDate().getTime(), false); this.shardTableQueryMetricHandler.flush(); } catch (Exception e) { e.printStackTrace(); @@ -288,6 +293,7 @@ public static void assertEquals(String message, BaseQueryMetric m1, BaseQueryMet Assert.assertEquals(message + "docRanges", m1.getDocRanges(), m2.getDocRanges()); Assert.assertEquals(message + "fiRanges", m1.getFiRanges(), m2.getFiRanges()); Assert.assertTrue(message + "plan", assertObjectsEqual(m1.getPlan(), m2.getPlan())); + Assert.assertTrue(message + "subPlans", assertObjectsEqual(m1.getSubPlans(), m2.getSubPlans())); Assert.assertEquals(message + "loginTime", m1.getLoginTime(), m2.getLoginTime()); Assert.assertTrue(message + "predictions", assertObjectsEqual(m1.getPredictions(), m2.getPredictions())); Assert.assertEquals(message + "versionMap", m1.getVersionMap(), m2.getVersionMap()); @@ -418,4 +424,78 @@ protected void ensureDataWritten(Cache incomingCache, Cache lastWrittenCache, St } } } + + public Collection> getEventEntriesFromAccumulo(String queryId) { + Collection> entries = new ArrayList<>(); + try { + String indexTable = this.queryMetricHandlerProperties.getIndexTableName(); + String shardTable = this.queryMetricHandlerProperties.getShardTableName(); + Collection> indexEntries = getAccumuloEntries(this.accumuloClient, indexTable, this.auths); + Key key = null; + Value value = null; + for (Map.Entry e : indexEntries) { + Key k = e.getKey(); + if (k.getRow().toString().equals(queryId) && k.getColumnFamily().toString().equals("QUERY_ID")) { + key = k; + value = e.getValue(); + break; + } + } + String uid = null; + if (value != null) { + Uid.List l = Uid.List.parseFrom(value.get()); + if (l.getUIDCount() > 0) { + uid = l.getUID(0); + } + if (uid != null) { + String[] split = key.getColumnQualifier().toString().split("\0"); + if (split.length > 0) { + Text eventRow = new Text(split[0]); + Text eventColFam = new Text("querymetrics\0" + uid); + Collection> shardEntries = getAccumuloEntries(this.accumuloClient, shardTable, this.auths); + for (Map.Entry e : shardEntries) { + if (e.getKey().getRow().equals(eventRow) && e.getKey().getColumnFamily().equals(eventColFam)) { + entries.add(e); + } + } + } + } + } + } catch (Exception e) { + log.error(e.getMessage(), e); + } + return entries; + } + + public void printEventEntriesFromAccumulo(String queryId) { + Collection> entries = getEventEntriesFromAccumulo(queryId); + for (Map.Entry e : entries) { + System.out.println(e.getKey().toString()); + } + } + + public void assertNoDuplicateFields(String queryId) { + Collection> entries = getEventEntriesFromAccumulo(queryId); + Set fields = new HashSet<>(); + Set duplicateFields = new HashSet<>(); + for (Map.Entry e : entries) { + Key k = e.getKey(); + String[] split = k.getColumnQualifier().toString().split("\0"); + if (split.length > 0) { + String f = split[0]; + if (!fields.contains(f)) { + fields.add(f); + } else { + duplicateFields.add(f); + } + } + } + + if (!duplicateFields.isEmpty()) { + for (Map.Entry e : entries) { + System.out.println(e.getKey().toString()); + } + Assert.fail("Duplicate field values found for:" + duplicateFields); + } + } } diff --git a/service/src/test/java/datawave/microservice/querymetric/config/AlternateContentQueryMetricsIngestHelper.java b/service/src/test/java/datawave/microservice/querymetric/config/AlternateContentQueryMetricsIngestHelper.java index 62aadc1b..6cdd388e 100644 --- a/service/src/test/java/datawave/microservice/querymetric/config/AlternateContentQueryMetricsIngestHelper.java +++ b/service/src/test/java/datawave/microservice/querymetric/config/AlternateContentQueryMetricsIngestHelper.java @@ -11,9 +11,9 @@ public AlternateContentQueryMetricsIngestHelper(boolean deleteMode) { private static class HelperDelegate extends ContentQueryMetricsIngestHelper.HelperDelegate { @Override - protected void putExtendedFieldsToWrite(AlternateQueryMetric updatedQueryMetric, Multimap fields) { - if (updatedQueryMetric.getExtraField() != null) { - fields.put("EXTRA_FIELD", updatedQueryMetric.getExtraField()); + protected void putExtendedFieldsToWrite(AlternateQueryMetric updated, AlternateQueryMetric stored, Multimap fields) { + if (isFirstWrite(updated.getExtraField(), stored == null ? null : stored.getExtraField())) { + fields.put("EXTRA_FIELD", updated.getExtraField()); } } } diff --git a/service/src/test/java/datawave/microservice/querymetric/config/AlternateQueryMetric.java b/service/src/test/java/datawave/microservice/querymetric/config/AlternateQueryMetric.java index 8c23160a..1afb1674 100644 --- a/service/src/test/java/datawave/microservice/querymetric/config/AlternateQueryMetric.java +++ b/service/src/test/java/datawave/microservice/querymetric/config/AlternateQueryMetric.java @@ -1,5 +1,6 @@ package datawave.microservice.querymetric.config; +import datawave.microservice.querymetric.BaseQueryMetric; import datawave.microservice.querymetric.QueryMetric; import javax.xml.bind.annotation.XmlElement; @@ -13,6 +14,16 @@ public AlternateQueryMetric() { super(); } + public AlternateQueryMetric(AlternateQueryMetric other) { + super(other); + this.extraField = other.extraField; + } + + @Override + public BaseQueryMetric duplicate() { + return new AlternateQueryMetric(this); + } + public void setExtraField(String extraField) { this.extraField = extraField; }