diff --git a/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetric.java b/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetric.java index 9a07ae72..976d1776 100644 --- a/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetric.java +++ b/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetric.java @@ -617,7 +617,7 @@ public int getFieldNumber(String name) { @XmlElement protected String queryId = null; @XmlElement - protected long setupTime = 0; + protected long setupTime = -1; @XmlElement protected String query = null; @XmlElement diff --git a/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetricListResponse.java b/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetricListResponse.java index 3fa12288..809118f5 100644 --- a/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetricListResponse.java +++ b/api/src/main/java/datawave/microservice/querymetric/BaseQueryMetricListResponse.java @@ -3,7 +3,9 @@ import java.text.SimpleDateFormat; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.TreeMap; import javax.xml.bind.annotation.XmlElement; @@ -13,6 +15,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringEscapeUtils; +import com.fasterxml.jackson.annotation.JsonIgnore; + import datawave.microservice.querymetric.BaseQueryMetric.PageMetric; import datawave.webservice.HtmlProvider; import datawave.webservice.result.BaseResponse; @@ -27,10 +31,27 @@ public abstract class BaseQueryMetricListResponse ext protected List result = null; @XmlElement protected int numResults = 0; + @XmlElement + protected boolean isGeoQuery = false; @XmlTransient private boolean administratorMode = false; - @XmlTransient - private boolean isGeoQuery = false; + private String JQUERY_INCLUDES; + protected String BASE_URL = "/DataWave/Query/Metrics"; + + public BaseQueryMetricListResponse() { + setHtmlIncludePaths(new HashMap<>()); + } + + public void setHtmlIncludePaths(Map pathMap) { + // @formatter:off + JQUERY_INCLUDES = + "\n"; + // @formatter:on + } + + public void setBaseUrl(String baseUrl) { + this.BASE_URL = baseUrl; + } private static String numToString(long number) { return (number == -1 || number == 0) ? "" : Long.toString(number); @@ -69,21 +90,27 @@ public void setGeoQuery(boolean geoQuery) { isGeoQuery = geoQuery; } + @JsonIgnore + @XmlTransient @Override public String getTitle() { return TITLE; } + @JsonIgnore + @XmlTransient @Override public String getPageHeader() { return getTitle(); } + @JsonIgnore + @XmlTransient @Override public String getHeadContent() { if (isGeoQuery) { // @formatter:off - return "" + + return JQUERY_INCLUDES + "\n"; + JQUERY_INCLUDES = + "\n"; + MAP_INCLUDES = + "\n" + + ""; + // @formatter:on + } + + @XmlElement(name = "queryId", nillable = true) + protected String queryId = null; + + @JsonIgnore + @XmlTransient + protected String basemaps = null; + + @XmlElementWrapper(name = "features") + @XmlElement(name = "feature") + protected List result = null; + + @JsonIgnore + @XmlTransient + @Override + public String getTitle() { + if (queryId != null) + return TITLE + " - " + queryId; + return TITLE; + } + + @JsonIgnore + @XmlTransient + @Override + public String getHeadContent() { + String basemapData = "\n"; + String featureData = "\n"; + return String.join("\n", featureData, JQUERY_INCLUDES, LEAFLET_INCLUDES, basemapData, MAP_INCLUDES); + } + + @JsonIgnore + @XmlTransient + @Override + public String getPageHeader() { + return getTitle(); + } + + @JsonIgnore + @XmlTransient + @Override + public String getMainContent() { + return "
"; + } + + private String toGeoJsonFeatures() { + if (!this.result.isEmpty()) + return "[ " + this.result.stream().map(QueryGeometry::toGeoJsonFeature).collect(Collectors.joining(", ")) + " ]"; + else + return "undefined"; + } + + public String getQueryId() { + return queryId; + } + + public void setQueryId(String queryId) { + this.queryId = queryId; + } + + public List getResult() { + return result; + } + + public void setResult(List result) { + this.result = result; + } + + public String getBasemaps() { + return basemaps; + } + + public void setBasemaps(String basemaps) { + this.basemaps = basemaps; + } +} diff --git a/api/src/main/java/datawave/microservice/querymetric/QueryMetricsDetailListResponse.java b/api/src/main/java/datawave/microservice/querymetric/QueryMetricsDetailListResponse.java index 777b1109..193e2fef 100644 --- a/api/src/main/java/datawave/microservice/querymetric/QueryMetricsDetailListResponse.java +++ b/api/src/main/java/datawave/microservice/querymetric/QueryMetricsDetailListResponse.java @@ -14,10 +14,13 @@ import javax.xml.bind.annotation.XmlAccessorOrder; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlTransient; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringEscapeUtils; +import com.fasterxml.jackson.annotation.JsonIgnore; + import datawave.microservice.query.QueryImpl.Parameter; import datawave.microservice.querymetric.BaseQueryMetric.PageMetric; import datawave.microservice.querymetric.BaseQueryMetric.Prediction; @@ -36,6 +39,8 @@ public String getHeadContent() { // metric page } + @JsonIgnore + @XmlTransient @Override public String getMainContent() { StringBuilder builder = new StringBuilder(), pageTimesBuilder = new StringBuilder(); @@ -88,7 +93,13 @@ public String getMainContent() { builder.append("").append(userDN == null ? "" : userDN).append(""); String proxyServers = metric.getProxyServers() == null ? "" : StringUtils.join(metric.getProxyServers(), "
"); builder.append("").append(proxyServers).append(""); - builder.append("").append(metric.getQueryId()).append(""); + if (this.isAdministratorMode()) { + builder.append("").append(metric.getQueryId()).append(""); + } else { + builder.append("").append(metric.getQueryId()) + .append(""); + } builder.append("").append(metric.getQueryType()).append(""); builder.append("").append(metric.getQueryLogic()).append(""); // Note the query and query plan are added to the table later (see the javascript at the end of this for loop) @@ -120,7 +131,7 @@ public String getMainContent() { builder.append(""); } builder.append("").append(numToString(metric.getLoginTime(), 0)).append(""); - builder.append("").append(metric.getSetupTime()).append(""); + builder.append("").append(numToString(metric.getSetupTime(), 0)).append(""); builder.append("").append(numToString(metric.getCreateCallTime(), 0)).append("\n"); builder.append("").append(metric.getNumPages()).append(""); builder.append("").append(metric.getNumResults()).append(""); diff --git a/pom.xml b/pom.xml index ee5f4d22..eb429e0e 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,7 @@ scm:git:https://github.com/NationalSecurityAgency/datawave-query-metric-service.git scm:git:git@github.com:NationalSecurityAgency/datawave-query-metric-service.git + HEAD https://github.com/NationalSecurityAgency/datawave-query-metric-service diff --git a/service/src/main/java/datawave/microservice/querymetric/Correlator.java b/service/src/main/java/datawave/microservice/querymetric/Correlator.java new file mode 100644 index 00000000..d473c647 --- /dev/null +++ b/service/src/main/java/datawave/microservice/querymetric/Correlator.java @@ -0,0 +1,84 @@ +package datawave.microservice.querymetric; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import datawave.microservice.querymetric.config.CorrelatorProperties; + +@Component +public class Correlator { + + private CorrelatorProperties correlatorProperties; + private boolean isShuttingDown = false; + private Map> updates = new HashMap<>(); + private LinkedHashMap created = new LinkedHashMap<>(); + + @Autowired + public Correlator(CorrelatorProperties correlatorProperties) { + this.correlatorProperties = correlatorProperties; + } + + public void addMetricUpdate(QueryMetricUpdate update) { + String queryId = update.getMetric().getQueryId(); + synchronized (this.updates) { + List updatesForQuery = this.updates.get(queryId); + if (updatesForQuery == null) { + updatesForQuery = new ArrayList<>(); + this.created.put(queryId, System.currentTimeMillis()); + this.updates.put(queryId, updatesForQuery); + } + updatesForQuery.add(update); + } + } + + public List getMetricUpdates(Set inProcess) { + long now = System.currentTimeMillis(); + long maxQueueSize = this.correlatorProperties.getMaxCorrelationQueueSize(); + long maxCorrelationTimeMs = this.correlatorProperties.getMaxCorrelationTimeMs(); + String oldestQueryId; + List returnedUpdates = null; + synchronized (this.updates) { + long numEntries = this.created.size(); + // Find the oldest entry that is not inProcessing on this instance + // If shuttingDown, then just return the oldest entry + Map.Entry oldestAvailableEntry = this.created.entrySet().stream().filter(e -> this.isShuttingDown || !inProcess.contains(e.getKey())) + .findFirst().orElse(null); + + // If we have reached the max queue size, then don't filter by !inProcess + if (oldestAvailableEntry == null && numEntries >= maxQueueSize) { + oldestAvailableEntry = this.created.entrySet().stream().findFirst().orElse(null); + } + + if (oldestAvailableEntry != null) { + long maxAge = now - oldestAvailableEntry.getValue(); + oldestQueryId = oldestAvailableEntry.getKey(); + if (numEntries >= maxQueueSize || maxAge > maxCorrelationTimeMs || this.isShuttingDown) { + returnedUpdates = this.updates.remove(oldestQueryId); + this.created.remove(oldestQueryId); + } + } + } + return returnedUpdates; + } + + public boolean isEnabled() { + return this.correlatorProperties.isEnabled(); + } + + public void shutdown(boolean isShuttingDown) { + this.isShuttingDown = isShuttingDown; + } + + public boolean isShuttingDown() { + return isShuttingDown; + } +} diff --git a/service/src/main/java/datawave/microservice/querymetric/MergeLockLifecycleListener.java b/service/src/main/java/datawave/microservice/querymetric/MergeLockLifecycleListener.java index d052e1e9..04ee8182 100644 --- a/service/src/main/java/datawave/microservice/querymetric/MergeLockLifecycleListener.java +++ b/service/src/main/java/datawave/microservice/querymetric/MergeLockLifecycleListener.java @@ -91,12 +91,12 @@ public void stateChanged(LifecycleEvent event) { switch (event.getState()) { case MERGING: // lock for a maximum time so that we don't lock forever - this.writeLockRunnable.lock(120000, event.getState()); + this.writeLockRunnable.lock(event.getState(), 5, TimeUnit.MINUTES); log.info(event + " [" + getLocalMemberUuid() + "]"); break; case SHUTTING_DOWN: // lock for a maximum time so that we don't lock forever - this.writeLockRunnable.lock(60000, event.getState()); + this.writeLockRunnable.lock(event.getState(), 60, TimeUnit.SECONDS); makeServiceUnavailable(); log.info(event + " [" + getLocalMemberUuid() + "]"); break; @@ -178,6 +178,11 @@ public void run() { } else { try { Thread.sleep(100); + // If a write lock timed out and the clusterLock is no longer locked when requestUnlock + // happens, we should reset requestUnlock to false and allow the requesting thread to continue + if (this.requestUnlock.get() == true && !this.clusterLock.isWriteLocked()) { + this.requestUnlock.set(false); + } } catch (InterruptedException e) { log.error(e.getMessage(), e); } @@ -185,11 +190,19 @@ public void run() { } } - public void lock(long maxLockMilliseconds, LifecycleEvent.LifecycleState state) { + public void lock(LifecycleEvent.LifecycleState state) { + lock(state, -1, TimeUnit.MILLISECONDS); + } + + public void lock(LifecycleEvent.LifecycleState state, long maxDuration, TimeUnit timeUnit) { log.info("locking for write [" + state + "]"); // prompt run() method to lock the writeLock if (this.requestLock.compareAndSet(false, true)) { - this.scheduledUnlockTime = System.currentTimeMillis() + maxLockMilliseconds; + if (maxDuration >= 0) { + this.scheduledUnlockTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(maxDuration, timeUnit); + } else { + this.scheduledUnlockTime = Long.MAX_VALUE; + } // wait until run() method locks the writeLock and sets requestLock to false while (requestLock.get() == true && !isShuttingDown()) { try { diff --git a/service/src/main/java/datawave/microservice/querymetric/MetricUpdateEntryProcessor.java b/service/src/main/java/datawave/microservice/querymetric/MetricUpdateEntryProcessor.java index 84843cbf..08c4cc7b 100644 --- a/service/src/main/java/datawave/microservice/querymetric/MetricUpdateEntryProcessor.java +++ b/service/src/main/java/datawave/microservice/querymetric/MetricUpdateEntryProcessor.java @@ -18,33 +18,34 @@ public MetricUpdateEntryProcessor(QueryMetricUpdateHolder metricUpdate, QueryMet @Override public Long process(Map.Entry entry) { - QueryMetricUpdateHolder updatedHolder; + QueryMetricUpdateHolder storedHolder; QueryMetricType metricType = this.metricUpdate.getMetricType(); BaseQueryMetric updatedMetric = this.metricUpdate.getMetric(); long start = System.currentTimeMillis(); if (entry.getValue() == null) { - updatedHolder = this.metricUpdate; + storedHolder = this.metricUpdate; } else { - updatedHolder = entry.getValue(); - BaseQueryMetric storedMetric = entry.getValue().getMetric(); + storedHolder = entry.getValue(); + BaseQueryMetric storedMetric = storedHolder.getMetric(); BaseQueryMetric combinedMetric; combinedMetric = this.combiner.combineMetrics(updatedMetric, storedMetric, metricType); - updatedHolder.setMetric(combinedMetric); - updatedHolder.setMetricType(metricType); + storedHolder.setMetric(combinedMetric); + storedHolder.setMetricType(metricType); + storedHolder.updateLowestLifecycle(this.metricUpdate.getLowestLifecycle()); } if (metricType.equals(QueryMetricType.DISTRIBUTED) && updatedMetric != null) { // these values are added incrementally in a distributed update. Because we can not be sure // exactly when the incomingQueryMetricCache value is stored, it would otherwise be possible // for updates to be included twice. These values are reset after being used in the AccumuloMapStore - updatedHolder.addValue("sourceCount", updatedMetric.getSourceCount()); - updatedHolder.addValue("nextCount", updatedMetric.getNextCount()); - updatedHolder.addValue("seekCount", updatedMetric.getSeekCount()); - updatedHolder.addValue("yieldCount", updatedMetric.getYieldCount()); - updatedHolder.addValue("docRanges", updatedMetric.getDocRanges()); - updatedHolder.addValue("fiRanges", updatedMetric.getFiRanges()); + storedHolder.addValue("sourceCount", updatedMetric.getSourceCount()); + storedHolder.addValue("nextCount", updatedMetric.getNextCount()); + storedHolder.addValue("seekCount", updatedMetric.getSeekCount()); + storedHolder.addValue("yieldCount", updatedMetric.getYieldCount()); + storedHolder.addValue("docRanges", updatedMetric.getDocRanges()); + storedHolder.addValue("fiRanges", updatedMetric.getFiRanges()); } - entry.setValue(updatedHolder); + entry.setValue(storedHolder); return Long.valueOf(System.currentTimeMillis() - start); } } diff --git a/service/src/main/java/datawave/microservice/querymetric/QueryMetricOperations.java b/service/src/main/java/datawave/microservice/querymetric/QueryMetricOperations.java index f4e71737..b8e456bf 100644 --- a/service/src/main/java/datawave/microservice/querymetric/QueryMetricOperations.java +++ b/service/src/main/java/datawave/microservice/querymetric/QueryMetricOperations.java @@ -2,19 +2,28 @@ import static datawave.microservice.querymetric.QueryMetricOperations.DEFAULT_DATETIME.BEGIN; import static datawave.microservice.querymetric.QueryMetricOperations.DEFAULT_DATETIME.END; -import static datawave.microservice.querymetric.QueryMetricOperationsStats.METERS; import static datawave.microservice.querymetric.QueryMetricOperationsStats.TIMERS; import static datawave.microservice.querymetric.config.HazelcastMetricCacheConfiguration.INCOMING_METRICS; -import static datawave.microservice.querymetric.config.HazelcastMetricCacheConfiguration.LAST_WRITTEN_METRICS; import java.net.InetAddress; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; +import java.util.Collections; import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import javax.annotation.PreDestroy; import javax.annotation.security.PermitAll; @@ -27,10 +36,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; import org.springframework.http.MediaType; -import org.springframework.messaging.support.MessageBuilder; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.security.access.annotation.Secured; @@ -43,13 +56,15 @@ import org.springframework.web.bind.annotation.RestController; import com.codahale.metrics.Timer; +import com.google.common.collect.Lists; import com.hazelcast.core.HazelcastInstanceNotActiveException; -import com.hazelcast.map.EntryProcessor; import com.hazelcast.map.IMap; import com.hazelcast.spring.cache.HazelcastCacheManager; import datawave.marking.MarkingFunctions; import datawave.microservice.authorization.user.DatawaveUserDetails; +import datawave.microservice.querymetric.config.QueryMetricProperties; +import datawave.microservice.querymetric.config.QueryMetricProperties.Retry; import datawave.microservice.querymetric.factory.BaseQueryMetricListResponseFactory; import datawave.microservice.querymetric.function.QueryMetricSupplier; import datawave.microservice.querymetric.handler.QueryGeometryHandler; @@ -60,7 +75,6 @@ import datawave.security.authorization.DatawaveUser; import datawave.webservice.query.exception.DatawaveErrorCode; import datawave.webservice.query.exception.QueryException; -import datawave.webservice.query.map.QueryGeometryResponse; import datawave.webservice.result.VoidResponse; import io.swagger.v3.oas.annotations.ExternalDocumentation; import io.swagger.v3.oas.annotations.Operation; @@ -77,23 +91,31 @@ @RestController @RequestMapping(path = "/v1") public class QueryMetricOperations { + // Note: This must match 'confirmAckChannel' in the service configuration. Default set in bootstrap.yml. + public static final String CONFIRM_ACK_CHANNEL = "confirmAckChannel"; private Logger log = LoggerFactory.getLogger(QueryMetricOperations.class); + private QueryMetricProperties queryMetricProperties; private ShardTableQueryMetricHandler handler; private QueryGeometryHandler geometryHandler; private CacheManager cacheManager; private Cache incomingQueryMetricsCache; - private Cache lastWrittenQueryMetricCache; private MarkingFunctions markingFunctions; private BaseQueryMetricListResponseFactory queryMetricListResponseFactory; private MergeLockLifecycleListener mergeLock; + private Correlator correlator; + private AtomicBoolean timedCorrelationInProgress = new AtomicBoolean(false); private MetricUpdateEntryProcessorFactory entryProcessorFactory; private QueryMetricOperationsStats stats; + private static Set inProcess = Collections.synchronizedSet(new HashSet<>()); + private final LinkedHashMap pathPrefixMap = new LinkedHashMap<>(); private final QueryMetricSupplier queryMetricSupplier; private final DnUtils dnUtils; + private static final Map correlationLatchMap = new ConcurrentHashMap<>(); + /** * The enum Default datetime. */ @@ -111,6 +133,8 @@ enum DEFAULT_DATETIME { /** * Instantiates a new QueryMetricOperations. * + * @param queryMetricProperties + * the query metric properties * @param cacheManager * the CacheManager * @param handler @@ -133,30 +157,87 @@ enum DEFAULT_DATETIME { * the stats */ @Autowired - public QueryMetricOperations(@Named("queryMetricCacheManager") CacheManager cacheManager, ShardTableQueryMetricHandler handler, - QueryGeometryHandler geometryHandler, MarkingFunctions markingFunctions, BaseQueryMetricListResponseFactory queryMetricListResponseFactory, - MergeLockLifecycleListener mergeLock, MetricUpdateEntryProcessorFactory entryProcessorFactory, QueryMetricOperationsStats stats, - QueryMetricSupplier queryMetricSupplier, DnUtils dnUtils) { + public QueryMetricOperations(QueryMetricProperties queryMetricProperties, @Named("queryMetricCacheManager") CacheManager cacheManager, + ShardTableQueryMetricHandler handler, QueryGeometryHandler geometryHandler, MarkingFunctions markingFunctions, + BaseQueryMetricListResponseFactory queryMetricListResponseFactory, MergeLockLifecycleListener mergeLock, Correlator correlator, + MetricUpdateEntryProcessorFactory entryProcessorFactory, QueryMetricOperationsStats stats, QueryMetricSupplier queryMetricSupplier, + DnUtils dnUtils) { + this.queryMetricProperties = queryMetricProperties; this.handler = handler; this.geometryHandler = geometryHandler; this.cacheManager = cacheManager; this.incomingQueryMetricsCache = cacheManager.getCache(INCOMING_METRICS); - this.lastWrittenQueryMetricCache = cacheManager.getCache(LAST_WRITTEN_METRICS); this.markingFunctions = markingFunctions; this.queryMetricListResponseFactory = queryMetricListResponseFactory; this.mergeLock = mergeLock; + this.correlator = correlator; this.entryProcessorFactory = entryProcessorFactory; this.stats = stats; this.queryMetricSupplier = queryMetricSupplier; this.dnUtils = dnUtils; + this.pathPrefixMap.put("jquery", "/querymetric/webjars/jquery"); + this.pathPrefixMap.put("leaflet", "/querymetric/webjars/leaflet"); + this.pathPrefixMap.put("css", "/querymetric/css"); + this.pathPrefixMap.put("js", "/querymetric/js"); } @PreDestroy public void shutdown() { + if (this.correlator.isEnabled()) { + this.correlator.shutdown(true); + // we've locked out the timer thread, but need to + // wait for it to complete the last write + while (isTimedCorrelationInProgress()) { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + + } + } + ensureUpdatesProcessed(false); + } this.stats.queueAggregatedQueryStatsForTimely(); this.stats.writeQueryStatsToTimely(); } + public boolean isTimedCorrelationInProgress() { + return this.timedCorrelationInProgress.get(); + } + + @Scheduled(fixedDelay = 2000) + public void ensureUpdatesProcessedScheduled() { + // don't write metrics from this thread while shutting down, + // so we can make sure that the process completes + if (!this.correlator.isShuttingDown()) { + this.timedCorrelationInProgress.set(true); + try { + ensureUpdatesProcessed(true); + } finally { + this.timedCorrelationInProgress.set(false); + } + } + } + + public void ensureUpdatesProcessed(boolean scheduled) { + if (this.correlator.isEnabled()) { + List correlatedUpdates; + do { + correlatedUpdates = this.correlator.getMetricUpdates(QueryMetricOperations.inProcess); + if (correlatedUpdates != null && !correlatedUpdates.isEmpty()) { + try { + String queryId = correlatedUpdates.get(0).getMetric().getQueryId(); + QueryMetricType metricType = correlatedUpdates.get(0).getMetricType(); + QueryMetricUpdateHolder metricUpdate = combineMetricUpdates(correlatedUpdates, metricType); + log.debug("storing correlated updates for {}", queryId); + storeMetricUpdate(metricUpdate); + } catch (Exception e) { + log.error("exception while combining correlated updates: " + e.getMessage(), e); + } + } + } while (!(scheduled && this.correlator.isShuttingDown()) && correlatedUpdates != null && !correlatedUpdates.isEmpty()); + } + } + /** * Update metrics void response. * @@ -177,17 +258,22 @@ public VoidResponse updateMetrics(@RequestBody List queryMetric if (!this.mergeLock.isAllowedReadLock()) { throw new IllegalStateException("service unavailable"); } - stats.getMeter(METERS.REST).mark(queryMetrics.size()); - VoidResponse response = new VoidResponse(); for (BaseQueryMetric m : queryMetrics) { if (log.isTraceEnabled()) { - log.trace("received metric update via REST: " + m.toString()); + log.trace("received bulk metric update via REST: " + m.toString()); } else { - log.debug("received metric update via REST: " + m.getQueryId()); + log.debug("received bulk metric update via REST: " + m.getQueryId()); } - queryMetricSupplier.send(MessageBuilder.withPayload(new QueryMetricUpdate(m, metricType)).build()); } - return response; + Timer.Context restTimer = this.stats.getTimer(TIMERS.REST).time(); + try { + if (!updateMetrics(queryMetrics.stream().map(m -> new QueryMetricUpdate<>(m, metricType)).collect(Collectors.toList()))) { + throw new RuntimeException("Unable to process bulk query metric update"); + } + } finally { + restTimer.stop(); + } + return new VoidResponse(); } /** @@ -210,47 +296,237 @@ public VoidResponse updateMetric(@RequestBody BaseQueryMetric queryMetric, if (!this.mergeLock.isAllowedReadLock()) { throw new IllegalStateException("service unavailable"); } - stats.getMeter(METERS.REST).mark(); - if (log.isTraceEnabled()) { - log.trace("received metric update via REST: " + queryMetric.toString()); - } else { - log.debug("received metric update via REST: " + queryMetric.getQueryId()); + Timer.Context restTimer = this.stats.getTimer(TIMERS.REST).time(); + try { + if (log.isTraceEnabled()) { + log.trace("received metric update via REST: " + queryMetric.toString()); + } else { + log.debug("received metric update via REST: " + queryMetric.getQueryId()); + } + if (!updateMetrics(Lists.newArrayList(new QueryMetricUpdate(queryMetric, metricType)))) { + throw new RuntimeException("Unable to process query metric update for query [" + queryMetric.getQueryId() + "]"); + } + } finally { + restTimer.stop(); } - queryMetricSupplier.send(MessageBuilder.withPayload(new QueryMetricUpdate(queryMetric, metricType)).build()); return new VoidResponse(); } /** - * Handle event. + * Receives producer confirm acks, and disengages the latch associated with the given correlation ID. * - * @param update - * the query metric update + * @param message + * the confirmation ack message + */ + @ConditionalOnProperty(value = "datawave.query.metric.confirmAckEnabled", havingValue = "true", matchIfMissing = true) + @ServiceActivator(inputChannel = CONFIRM_ACK_CHANNEL) + public void processConfirmAck(Message message) { + Object headerObj = message.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID); + + if (headerObj != null) { + String correlationId = headerObj.toString(); + if (correlationLatchMap.containsKey(correlationId)) { + correlationLatchMap.get(correlationId).countDown(); + } else { + log.warn("Unable to decrement latch for ID [{}]", correlationId); + } + } else { + log.warn("No correlation ID found in confirm ack message"); + } + } + + private boolean updateMetrics(List updates) { + Map updatesById = new LinkedHashMap<>(); + Map timersById = new LinkedHashMap<>(); + + boolean success; + final long updateStartTime = System.currentTimeMillis(); + long currentTime; + int attempts = 0; + + Retry retry = queryMetricProperties.getRetry(); + + List failedConfirmAck = new ArrayList<>(updates.size()); + do { + if (attempts++ > 0) { + try { + Thread.sleep(retry.getBackoffIntervalMillis()); + } catch (InterruptedException e) { + // Ignore -- we'll just end up retrying a little too fast + } + + // perform some retry upkeep + updates.addAll(failedConfirmAck); + failedConfirmAck.clear(); + } + + if (log.isDebugEnabled()) { + log.debug("Bulk update attempt {} of {}", attempts, retry.getMaxAttempts()); + } + + // send all of the remaining metric updates + success = sendMessages(updates, updatesById, timersById) && awaitConfirmAcks(updatesById, failedConfirmAck, timersById); + currentTime = System.currentTimeMillis(); + } while (!success && (currentTime - updateStartTime) < retry.getFailTimeoutMillis() && attempts < retry.getMaxAttempts()); + + // stop any timers that remain + timersById.values().stream().forEach(timer -> timer.stop()); + + if (!success) { + log.warn("Bulk update failed. {attempts = {}, elapsedMillis = {}}", attempts, (currentTime - updateStartTime)); + } else { + log.debug("Bulk update successful. {attempts = {}, elapsedMillis = {}}", attempts, (currentTime - updateStartTime)); + } + + return success; + } + + /** + * Passes query metric messages to the messaging infrastructure. + * + * @param updates + * The query metric updates to be sent, not null + * @param updatesById + * A map that will be populated with the correlation ids and associated metric updates, not null + * @param timersById + * A map of dropwizard timers to record the time to send or send/ack each message + * @return true if all messages were successfully sent, false otherwise */ - public void storeMetric(QueryMetricUpdate update) { - stats.getMeter(METERS.MESSAGE).mark(); - String queryId = update.getMetric().getQueryId(); - this.stats.queueTimelyMetrics(update); - log.debug("storing update for {}", queryId); - if (update.getMetric().getPositiveSelectors() == null) { - this.handler.populateMetricSelectors(update.getMetric()); + private boolean sendMessages(List updates, Map updatesById, Map timersById) { + + List failedSend = new ArrayList<>(updates.size()); + + boolean success = true; + // send all of the remaining metric updates + for (QueryMetricUpdate update : updates) { + Timer.Context timer = this.stats.getTimer(TIMERS.MESSAGE_SEND).time(); + String correlationId = UUID.randomUUID().toString(); + boolean sendSuccessful = false; + try { + sendSuccessful = sendMessage(correlationId, update); + if (sendSuccessful) { + if (queryMetricProperties.isConfirmAckEnabled()) { + updatesById.put(correlationId, update); + } + } else { + // if it failed, add it to the failed list + failedSend.add(update); + success = false; + } + } finally { + if (sendSuccessful && queryMetricProperties.isConfirmAckEnabled()) { + // message send successful but waiting for confirmAck, so store the timer by correlationId + timersById.put(correlationId, timer); + } else { + // either message send successful and not waiting for confirmAck or message send failed + timer.stop(); + } + } } - storeMetricUpdate(new QueryMetricUpdateHolder(update)); + + updates.retainAll(failedSend); + + return success; + } + + private boolean sendMessage(String correlationId, QueryMetricUpdate update) { + boolean success = false; + if (queryMetricSupplier.send(MessageBuilder.withPayload(update).setCorrelationId(correlationId).build())) { + success = true; + if (queryMetricProperties.isConfirmAckEnabled()) { + correlationLatchMap.put(correlationId, new CountDownLatch(1)); + } + } + return success; + } + + /** + * Waits for the producer confirm acks to be received for the updates that were sent. If a producer confirm ack is not received within the specified amount + * of time, a 500 Internal Server Error will be returned to the caller. + * + * @param updatesById + * A map of query metric updates keyed by their correlation id, not null + * @param failedConfirmAck + * A list that will be populated with the failed metric updates, not null + * @param timersById + * A map of dropwizard timers to record the time to send or send/ack each message + * @return true if all confirm acks were successfully received, false otherwise + */ + private boolean awaitConfirmAcks(Map updatesById, List failedConfirmAck, + Map timersById) { + boolean success = true; + // wait for the confirm acks only after all sends are successful + if (queryMetricProperties.isConfirmAckEnabled()) { + for (String correlationId : new HashSet<>(updatesById.keySet())) { + try { + if (!awaitConfirmAck(correlationId)) { + failedConfirmAck.add(updatesById.remove(correlationId)); + success = false; + } + } finally { + // either ack confirmed or we need to resend, so stop the timer for this update + Timer.Context timer = timersById.get(correlationId); + if (timer != null) { + timer.stop(); + } + } + } + } + return success; + } + + private boolean awaitConfirmAck(String correlationId) { + boolean success = false; + if (queryMetricProperties.isConfirmAckEnabled()) { + try { + success = correlationLatchMap.get(correlationId).await(queryMetricProperties.getConfirmAckTimeoutMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.warn("Interrupted waiting for confirm ack {}", correlationId); + } finally { + correlationLatchMap.remove(correlationId); + } + } + return success; } private String getClusterLocalMemberUuid() { return ((HazelcastCacheManager) this.cacheManager).getHazelcastInstance().getCluster().getLocalMember().getUuid().toString(); } - private void storeMetricUpdate(QueryMetricUpdateHolder metricUpdate) { + public QueryMetricUpdateHolder combineMetricUpdates(List updates, QueryMetricType metricType) throws Exception { + BaseQueryMetric combinedMetric = null; + BaseQueryMetric.Lifecycle lowestLifecycle = null; + for (QueryMetricUpdate u : updates) { + if (combinedMetric == null) { + combinedMetric = u.getMetric(); + lowestLifecycle = u.getMetric().getLifecycle(); + } else { + if (u.getMetric().getLifecycle().ordinal() < lowestLifecycle.ordinal()) { + lowestLifecycle = u.getMetric().getLifecycle(); + } + combinedMetric = this.handler.combineMetrics(u.getMetric(), combinedMetric, metricType); + } + } + QueryMetricUpdateHolder metricUpdateHolder = new QueryMetricUpdateHolder(combinedMetric, metricType); + metricUpdateHolder.updateLowestLifecycle(lowestLifecycle); + return metricUpdateHolder; + } + + public void storeMetricUpdate(QueryMetricUpdateHolder metricUpdate) { Timer.Context storeTimer = this.stats.getTimer(TIMERS.STORE).time(); String queryId = metricUpdate.getMetric().getQueryId(); try { IMap incomingQueryMetricsCacheHz = ((IMap) incomingQueryMetricsCache .getNativeCache()); + if (metricUpdate.getMetric().getPositiveSelectors() == null) { + this.handler.populateMetricSelectors(metricUpdate.getMetric()); + } this.mergeLock.lock(); + QueryMetricOperations.inProcess.add(queryId); try { incomingQueryMetricsCacheHz.executeOnKey(queryId, this.entryProcessorFactory.createEntryProcessor(metricUpdate)); } finally { + QueryMetricOperations.inProcess.remove(queryId); this.mergeLock.unlock(); } } catch (Exception e) { @@ -263,8 +539,13 @@ private void storeMetricUpdate(QueryMetricUpdateHolder metricUpdate) { } // fail the handling of the message throw new RuntimeException(e.getMessage()); + } finally { + storeTimer.stop(); } - storeTimer.stop(); + } + + public static Set getInProcess() { + return inProcess; } /** @@ -286,14 +567,16 @@ public BaseQueryMetricListResponse query(@AuthenticationPrincipal DatawaveUserDe @Parameter(description = "queryId to return") @PathVariable("queryId") String queryId) { BaseQueryMetricListResponse response = this.queryMetricListResponseFactory.createDetailedResponse(); + response.setHtmlIncludePaths(this.pathPrefixMap); + response.setBaseUrl("/querymetric/v1"); List metricList = new ArrayList<>(); try { BaseQueryMetric metric; QueryMetricUpdateHolder metricUpdate = incomingQueryMetricsCache.get(queryId, QueryMetricUpdateHolder.class); - if (metricUpdate != null && metricUpdate.isNewMetric()) { - metric = metricUpdate.getMetric(); - } else { + if (metricUpdate == null) { metric = this.handler.getQueryMetric(queryId); + } else { + metric = metricUpdate.getMetric(); } if (metric != null) { boolean allowAllMetrics = false; @@ -326,11 +609,27 @@ public BaseQueryMetricListResponse query(@AuthenticationPrincipal DatawaveUserDe response.addException(new QueryException(e.getMessage(), 500)); } // Set the result to have the formatted query and query plan - response.setResult(JexlFormattedStringBuildingVisitor.formatMetrics(metricList)); - if (metricList.isEmpty()) { + // StackOverflowErrors seen in JexlFormattedStringBuildingVisitor.formatMetrics, so protect + // this call for each metric with try/catch and add original metric if formatMetrics fails + List fmtMetricList = new ArrayList<>(); + for (BaseQueryMetric m : metricList) { + List formatted = null; + try { + formatted = JexlFormattedStringBuildingVisitor.formatMetrics(Collections.singletonList(m)); + } catch (StackOverflowError | Exception e) { + log.warn(String.format("%s while formatting metric %s: %s", e.getClass().getCanonicalName(), m.getQueryId(), e.getMessage())); + } + if (formatted == null || formatted.isEmpty()) { + fmtMetricList.add(m); + } else { + fmtMetricList.addAll(formatted); + } + } + response.setResult(fmtMetricList); + if (fmtMetricList.isEmpty()) { response.setHasResults(false); } else { - response.setGeoQuery(metricList.stream().anyMatch(SimpleQueryGeometryHandler::isGeoQuery)); + response.setGeoQuery(fmtMetricList.stream().anyMatch(SimpleQueryGeometryHandler::isGeoQuery)); response.setHasResults(true); } return response; @@ -354,11 +653,13 @@ public BaseQueryMetricListResponse query(@AuthenticationPrincipal DatawaveUserDe public QueryGeometryResponse map(@AuthenticationPrincipal DatawaveUserDetails currentUser, @PathVariable("queryId") String queryId) { QueryGeometryResponse queryGeometryResponse = new QueryGeometryResponse(); BaseQueryMetricListResponse metricResponse = query(currentUser, queryId); - if (!metricResponse.getExceptions().isEmpty()) { + if (metricResponse.getExceptions() == null || metricResponse.getExceptions().isEmpty()) { + QueryGeometryResponse response = geometryHandler.getQueryGeometryResponse(queryId, metricResponse.getResult()); + response.setHtmlIncludePaths(pathPrefixMap); + return response; + } else { metricResponse.getExceptions().forEach(e -> queryGeometryResponse.addException(new QueryException(e.getMessage(), e.getCause(), e.getCode()))); return queryGeometryResponse; - } else { - return geometryHandler.getQueryGeometryResponse(queryId, metricResponse.getResult()); } } @@ -478,8 +779,6 @@ public CacheStats getCacheStats() { CacheStats cacheStats = new CacheStats(); IMap incomingCacheHz = ((IMap) incomingQueryMetricsCache.getNativeCache()); cacheStats.setIncomingQueryMetrics(this.stats.getLocalMapStats(incomingCacheHz.getLocalMapStats())); - IMap lastWrittenCacheHz = ((IMap) lastWrittenQueryMetricCache.getNativeCache()); - cacheStats.setLastWrittenQueryMetrics(this.stats.getLocalMapStats(lastWrittenCacheHz.getLocalMapStats())); cacheStats.setServiceStats(this.stats.formatStats(this.stats.getServiceStats(), true)); cacheStats.setMemberUuid(getClusterLocalMemberUuid()); try { diff --git a/service/src/main/java/datawave/microservice/querymetric/QueryMetricOperationsStats.java b/service/src/main/java/datawave/microservice/querymetric/QueryMetricOperationsStats.java index 18f491da..d3bed256 100644 --- a/service/src/main/java/datawave/microservice/querymetric/QueryMetricOperationsStats.java +++ b/service/src/main/java/datawave/microservice/querymetric/QueryMetricOperationsStats.java @@ -3,7 +3,6 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static datawave.microservice.querymetric.config.HazelcastMetricCacheConfiguration.INCOMING_METRICS; -import static datawave.microservice.querymetric.config.HazelcastMetricCacheConfiguration.LAST_WRITTEN_METRICS; import java.net.InetAddress; import java.text.DecimalFormat; @@ -20,6 +19,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; @@ -40,6 +40,17 @@ public class QueryMetricOperationsStats { private Logger log = LoggerFactory.getLogger(getClass()); + private static final String RatePerSec_1_Min_Avg = ".RatePerSec_1_Min_Avg"; + private static final String RatePerSec_5_Min_Avg = ".RatePerSec_5_Min_Avg"; + private static final String RatePerSec_15_Min_Avg = ".RatePerSec_15_Min_Avg"; + private static final String Latency_Mean = ".Latency_Mean"; + private static final String Latency_Median = ".Latency_Median"; + private static final String Latency_Max = ".Latency_Max"; + private static final String Latency_Min = ".Latency_Min"; + private static final String Latency_75 = ".Latency_75"; + private static final String Latency_95 = ".Latency_95"; + private static final String Latency_99 = ".Latency_99"; + private static final String Latency_999 = ".Latency_999"; private Map timerMap = new HashMap<>(); private Map meterMap = new HashMap<>(); private TcpClient timelyTcpClient; @@ -49,6 +60,7 @@ public class QueryMetricOperationsStats { protected ShardTableQueryMetricHandler handler; protected AccumuloMapStore mapStore; protected CacheManager cacheManager; + protected Cache lastWrittenCache; protected Map hostCountMap = new HashMap<>(); protected Map userCountMap = new HashMap<>(); protected Map logicCountMap = new HashMap<>(); @@ -56,11 +68,11 @@ public class QueryMetricOperationsStats { protected Map staticTags = new LinkedHashMap<>(); public enum TIMERS { - STORE + MESSAGE_SEND, REST, STORE } public enum METERS { - REST, MESSAGE + MESSAGE_RECEIVE } /* @@ -70,11 +82,12 @@ public enum METERS { */ public QueryMetricOperationsStats(TimelyProperties timelyProperties, ShardTableQueryMetricHandler handler, CacheManager cacheManager, - AccumuloMapStore mapStore) { + @Qualifier("lastWrittenQueryMetrics") Cache lastWrittenCache, AccumuloMapStore mapStore) { this.timelyProperties = timelyProperties; this.handler = handler; this.mapStore = mapStore; this.cacheManager = cacheManager; + this.lastWrittenCache = lastWrittenCache; for (TIMERS name : TIMERS.values()) { this.timerMap.put(name, new Timer(new SlidingTimeWindowArrayReservoir(1, MINUTES))); } @@ -109,18 +122,16 @@ public Meter getMeter(METERS name) { return this.meterMap.get(name); } - protected void addLocalMapStats(Map serviceStats) { + protected void addCacheStats(Map serviceStats) { Cache incomingCache = this.cacheManager.getCache(INCOMING_METRICS); if (incomingCache != null) { IMap incomingQueryMetricsCache = ((IMap) incomingCache.getNativeCache()); serviceStats.put("incomingQueryMetrics.dirtyEntryCount", Double.valueOf(incomingQueryMetricsCache.getLocalMapStats().getDirtyEntryCount())); serviceStats.put("incomingQueryMetrics.ownedEntryCount", Double.valueOf(incomingQueryMetricsCache.getLocalMapStats().getOwnedEntryCount())); } - Cache lastWrittenCache = this.cacheManager.getCache(LAST_WRITTEN_METRICS); - if (lastWrittenCache != null) { - IMap lastWrittenQueryMetricCache = ((IMap) lastWrittenCache.getNativeCache()); - serviceStats.put("lastWrittenQueryMetric.dirtyEntryCount", Double.valueOf(lastWrittenQueryMetricCache.getLocalMapStats().getDirtyEntryCount())); - serviceStats.put("lastWrittenQueryMetric.ownedEntryCount", Double.valueOf(lastWrittenQueryMetricCache.getLocalMapStats().getOwnedEntryCount())); + if (this.lastWrittenCache != null) { + long estimatedSize = ((com.github.benmanes.caffeine.cache.Cache) this.lastWrittenCache.getNativeCache()).estimatedSize(); + serviceStats.put("lastWrittenQueryMetric.estimatedSize", Double.valueOf(estimatedSize)); } } @@ -130,7 +141,7 @@ public void writeServiceStatsToTimely() { long timestamp = System.currentTimeMillis(); Map serviceStatsDouble = getServiceStats(); - addLocalMapStats(serviceStatsDouble); + addCacheStats(serviceStatsDouble); Map serviceStats = formatStats(serviceStatsDouble, false); serviceStats.entrySet().forEach(entry -> { serviceStatsToWriteToTimely @@ -234,9 +245,11 @@ public Map formatStats(Map stats, boolean useSepar public Map getServiceStats() { Map stats = new LinkedHashMap<>(); addTimerStats("store", getTimer(TIMERS.STORE), stats); - addTimerStats("accumulo", this.mapStore.getWriteTimer(), stats); - addMeterStats("message", getMeter(METERS.MESSAGE), stats); - addMeterStats("rest", getMeter(METERS.REST), stats); + addTimerStats("accumulo.write", this.mapStore.getWriteTimer(), stats); + addTimerStats("accumulo.read", this.mapStore.getReadTimer(), stats); + addTimerStats("message.send", getTimer(TIMERS.MESSAGE_SEND), stats); + addTimerStats("rest", getTimer(TIMERS.REST), stats); + addMeterStats("message.receive", getMeter(METERS.MESSAGE_RECEIVE), stats); return stats; } @@ -245,38 +258,40 @@ public void queueTimelyMetrics(QueryMetricUpdate update) { } public void queueTimelyMetrics(BaseQueryMetric queryMetric) { - String queryType = queryMetric.getQueryType(); - if (this.timelyProperties.isEnabled() && queryType != null && queryType.equalsIgnoreCase("RunningQuery")) { - BaseQueryMetric.Lifecycle lifecycle = queryMetric.getLifecycle(); - String host = queryMetric.getHost(); - String user = queryMetric.getUser(); - String logic = queryMetric.getQueryLogic(); - if (lifecycle.equals(BaseQueryMetric.Lifecycle.CLOSED) || lifecycle.equals(BaseQueryMetric.Lifecycle.CANCELLED)) { - long createDate = queryMetric.getCreateDate().getTime(); - // write ELAPSED_TIME - this.queryStatsToWriteToTimely.add("put dw.query.metrics.ELAPSED_TIME " + createDate + " " + queryMetric.getElapsedTime() + " HOST=" + host - + getCommonTags() + "\n"); - this.queryStatsToWriteToTimely.add("put dw.query.metrics.ELAPSED_TIME " + createDate + " " + queryMetric.getElapsedTime() + " USER=" + user - + getCommonTags() + "\n"); - this.queryStatsToWriteToTimely.add("put dw.query.metrics.ELAPSED_TIME " + createDate + " " + queryMetric.getElapsedTime() + " QUERY_LOGIC=" - + logic + getCommonTags() + "\n"); - - // write NUM_RESULTS - this.queryStatsToWriteToTimely.add("put dw.query.metrics.NUM_RESULTS " + createDate + " " + queryMetric.getNumResults() + " HOST=" + host - + getCommonTags() + "\n"); - this.queryStatsToWriteToTimely.add("put dw.query.metrics.NUM_RESULTS " + createDate + " " + queryMetric.getNumResults() + " USER=" + user - + getCommonTags() + "\n"); - this.queryStatsToWriteToTimely.add("put dw.query.metrics.NUM_RESULTS " + createDate + " " + queryMetric.getNumResults() + " QUERY_LOGIC=" - + logic + getCommonTags() + "\n"); - } else if (lifecycle.equals(BaseQueryMetric.Lifecycle.INITIALIZED)) { - // aggregate these metrics for later writing to timely - synchronized (this.hostCountMap) { - Long hostCount = this.hostCountMap.get(host); - this.hostCountMap.put(host, hostCount == null ? 1l : hostCount + 1); - Long userCount = this.userCountMap.get(user); - this.userCountMap.put(user, userCount == null ? 1l : userCount + 1); - Long logicCount = this.logicCountMap.get(logic); - this.logicCountMap.put(logic, logicCount == null ? 1l : logicCount + 1); + if (this.timelyProperties.isEnabled()) { + String queryType = queryMetric.getQueryType(); + if (queryType != null && queryType.equalsIgnoreCase("RunningQuery")) { + BaseQueryMetric.Lifecycle lifecycle = queryMetric.getLifecycle(); + String host = queryMetric.getHost(); + String user = queryMetric.getUser(); + String logic = queryMetric.getQueryLogic(); + if (lifecycle.equals(BaseQueryMetric.Lifecycle.CLOSED) || lifecycle.equals(BaseQueryMetric.Lifecycle.CANCELLED)) { + long createDate = queryMetric.getCreateDate().getTime(); + // write ELAPSED_TIME + this.queryStatsToWriteToTimely.add("put dw.query.metrics.ELAPSED_TIME " + createDate + " " + queryMetric.getElapsedTime() + " HOST=" + host + + getCommonTags() + "\n"); + this.queryStatsToWriteToTimely.add("put dw.query.metrics.ELAPSED_TIME " + createDate + " " + queryMetric.getElapsedTime() + " USER=" + user + + getCommonTags() + "\n"); + this.queryStatsToWriteToTimely.add("put dw.query.metrics.ELAPSED_TIME " + createDate + " " + queryMetric.getElapsedTime() + " QUERY_LOGIC=" + + logic + getCommonTags() + "\n"); + + // write NUM_RESULTS + this.queryStatsToWriteToTimely.add("put dw.query.metrics.NUM_RESULTS " + createDate + " " + queryMetric.getNumResults() + " HOST=" + host + + getCommonTags() + "\n"); + this.queryStatsToWriteToTimely.add("put dw.query.metrics.NUM_RESULTS " + createDate + " " + queryMetric.getNumResults() + " USER=" + user + + getCommonTags() + "\n"); + this.queryStatsToWriteToTimely.add("put dw.query.metrics.NUM_RESULTS " + createDate + " " + queryMetric.getNumResults() + " QUERY_LOGIC=" + + logic + getCommonTags() + "\n"); + } else if (lifecycle.equals(BaseQueryMetric.Lifecycle.INITIALIZED)) { + // aggregate these metrics for later writing to timely + synchronized (this.hostCountMap) { + Long hostCount = this.hostCountMap.get(host); + this.hostCountMap.put(host, hostCount == null ? 1l : hostCount + 1); + Long userCount = this.userCountMap.get(user); + this.userCountMap.put(user, userCount == null ? 1l : userCount + 1); + Long logicCount = this.logicCountMap.get(logic); + this.logicCountMap.put(logic, logicCount == null ? 1l : logicCount + 1); + } } } } @@ -296,15 +311,15 @@ protected String getCommonTags() { public void logServiceStats() { Map stats = getServiceStats(); Map serviceStats = formatStats(stats, true); - String storeRate1 = serviceStats.get("storeRatePerSec_1_Min_Avg"); - String storeRate5 = serviceStats.get("storeRatePerSec_5_Min_Avg"); - String storeRate15 = serviceStats.get("storeRatePerSec_15_Min_Avg"); - String storeLat = serviceStats.get("storeLatency_Mean"); + String storeRate1 = serviceStats.get("store" + RatePerSec_1_Min_Avg); + String storeRate5 = serviceStats.get("store" + RatePerSec_5_Min_Avg); + String storeRate15 = serviceStats.get("store" + RatePerSec_15_Min_Avg); + String storeLat = serviceStats.get("store" + Latency_Mean); log.info("storeMetric rates/sec 1m={} 5m={} 15m={} opLatMs={}", storeRate1, storeRate5, storeRate15, storeLat); - String accumuloRate1 = serviceStats.get("accumuloRatePerSec_1_Min_Avg"); - String accumuloRate5 = serviceStats.get("accumuloRatePerSec_5_Min_Avg"); - String accumuloRate15 = serviceStats.get("accumuloRatePerSec_15_Min_Avg"); - String accumuloLat = serviceStats.get("accumuloLatency_Mean"); + String accumuloRate1 = serviceStats.get("accumulo.write" + RatePerSec_1_Min_Avg); + String accumuloRate5 = serviceStats.get("accumulo.write" + RatePerSec_5_Min_Avg); + String accumuloRate15 = serviceStats.get("accumulo.write" + RatePerSec_15_Min_Avg); + String accumuloLat = serviceStats.get("accumulo.write" + Latency_Mean); log.info("accumulo rates/sec 1m={} 5m={} 15m={} opLatMs={}", accumuloRate1, accumuloRate5, accumuloRate15, accumuloLat); } @@ -333,22 +348,22 @@ public void queueAggregatedQueryStatsForTimely() { private void addTimerStats(String baseName, Timer timer, Map stats) { Snapshot snapshot = timer.getSnapshot(); - stats.put(baseName + "Latency_Mean", snapshot.getMean() / 1000000); - stats.put(baseName + "Latency_Median", snapshot.getMedian() / 1000000); - stats.put(baseName + "Latency_Max", ((Number) snapshot.getMax()).doubleValue() / 1000000); - stats.put(baseName + "Latency_Min", ((Number) snapshot.getMin()).doubleValue() / 1000000); - stats.put(baseName + "Latency_75", snapshot.get75thPercentile() / 1000000); - stats.put(baseName + "Latency_95", snapshot.get95thPercentile() / 1000000); - stats.put(baseName + "Latency_99", snapshot.get99thPercentile() / 1000000); - stats.put(baseName + "Latency_999", snapshot.get999thPercentile() / 1000000); - stats.put(baseName + "RatePerSec_1_Min_Avg", timer.getOneMinuteRate()); - stats.put(baseName + "RatePerSec_5_Min_Avg", timer.getFiveMinuteRate()); - stats.put(baseName + "RatePerSec_15_Min_Avg", timer.getFifteenMinuteRate()); + stats.put(baseName + Latency_Mean, snapshot.getMean() / 1000000); + stats.put(baseName + Latency_Median, snapshot.getMedian() / 1000000); + stats.put(baseName + Latency_Max, ((Number) snapshot.getMax()).doubleValue() / 1000000); + stats.put(baseName + Latency_Min, ((Number) snapshot.getMin()).doubleValue() / 1000000); + stats.put(baseName + Latency_75, snapshot.get75thPercentile() / 1000000); + stats.put(baseName + Latency_95, snapshot.get95thPercentile() / 1000000); + stats.put(baseName + Latency_99, snapshot.get99thPercentile() / 1000000); + stats.put(baseName + Latency_999, snapshot.get999thPercentile() / 1000000); + stats.put(baseName + RatePerSec_1_Min_Avg, timer.getOneMinuteRate()); + stats.put(baseName + RatePerSec_5_Min_Avg, timer.getFiveMinuteRate()); + stats.put(baseName + RatePerSec_15_Min_Avg, timer.getFifteenMinuteRate()); } private void addMeterStats(String baseName, Metered meter, Map stats) { - stats.put(baseName + "RatePerSec_1_Min_Avg", meter.getOneMinuteRate()); - stats.put(baseName + "RatePerSec_5_Min_Avg", meter.getFiveMinuteRate()); - stats.put(baseName + "RatePerSec_15_Min_Avg", meter.getFifteenMinuteRate()); + stats.put(baseName + RatePerSec_1_Min_Avg, meter.getOneMinuteRate()); + stats.put(baseName + RatePerSec_5_Min_Avg, meter.getFiveMinuteRate()); + stats.put(baseName + RatePerSec_15_Min_Avg, meter.getFifteenMinuteRate()); } } diff --git a/service/src/main/java/datawave/microservice/querymetric/QueryMetricUpdateHolder.java b/service/src/main/java/datawave/microservice/querymetric/QueryMetricUpdateHolder.java index 8d12812d..99112e42 100644 --- a/service/src/main/java/datawave/microservice/querymetric/QueryMetricUpdateHolder.java +++ b/service/src/main/java/datawave/microservice/querymetric/QueryMetricUpdateHolder.java @@ -8,12 +8,12 @@ public class QueryMetricUpdateHolder extends QueryMetricUpdate { private boolean persisted = false; - private Lifecycle lowestLifecycleSincePersist; + private Lifecycle lowestLifecycle; private Map values = new HashMap<>(); public QueryMetricUpdateHolder(T metric, QueryMetricType metricType) { super(metric, metricType); - this.lowestLifecycleSincePersist = this.metric.getLifecycle(); + this.lowestLifecycle = this.metric.getLifecycle(); } public QueryMetricUpdateHolder(T metric) { @@ -27,7 +27,7 @@ public QueryMetricUpdateHolder(QueryMetricUpdate metricUpdate) { // If we know that this metric has been persisted by the AccumuloMapStore, then it is not new // Because the metric can be ejected from the incoming cache, we also track the lowest lifecycle public boolean isNewMetric() { - return !persisted && (lowestLifecycleSincePersist == null || lowestLifecycleSincePersist.equals(Lifecycle.DEFINED)); + return !persisted && (lowestLifecycle == null || lowestLifecycle.equals(Lifecycle.DEFINED)); } public void addValue(String key, Long value) { @@ -46,21 +46,29 @@ public Long getValue(String key) { } } - public void persisted() { + public void setPersisted() { persisted = true; values.clear(); - lowestLifecycleSincePersist = null; + lowestLifecycle = null; } - public Lifecycle getLowestLifecycleSincePersist() { - return lowestLifecycleSincePersist; + public Lifecycle getLowestLifecycle() { + return lowestLifecycle; + } + + public void updateLowestLifecycle(Lifecycle lifecycle) { + if (!persisted && lifecycle != null) { + if (this.lowestLifecycle == null || (lifecycle.ordinal() < this.lowestLifecycle.ordinal())) { + this.lowestLifecycle = lifecycle; + } + } } @Override public void setMetric(T metric) { super.setMetric(metric); - if (this.lowestLifecycleSincePersist == null || this.metric.getLifecycle().ordinal() < this.lowestLifecycleSincePersist.ordinal()) { - this.lowestLifecycleSincePersist = this.metric.getLifecycle(); + if (this.lowestLifecycle == null || this.metric.getLifecycle().ordinal() < this.lowestLifecycle.ordinal()) { + this.lowestLifecycle = this.metric.getLifecycle(); } } } diff --git a/service/src/main/java/datawave/microservice/querymetric/config/CorrelatorConfiguration.java b/service/src/main/java/datawave/microservice/querymetric/config/CorrelatorConfiguration.java new file mode 100644 index 00000000..d2ba0f95 --- /dev/null +++ b/service/src/main/java/datawave/microservice/querymetric/config/CorrelatorConfiguration.java @@ -0,0 +1,10 @@ +package datawave.microservice.querymetric.config; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@EnableConfigurationProperties({CorrelatorProperties.class}) +public class CorrelatorConfiguration { + +} diff --git a/service/src/main/java/datawave/microservice/querymetric/config/CorrelatorProperties.java b/service/src/main/java/datawave/microservice/querymetric/config/CorrelatorProperties.java new file mode 100644 index 00000000..7e8e6afc --- /dev/null +++ b/service/src/main/java/datawave/microservice/querymetric/config/CorrelatorProperties.java @@ -0,0 +1,35 @@ +package datawave.microservice.querymetric.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "datawave.query.metric.correlator") +public class CorrelatorProperties { + + private long maxCorrelationTimeMs = 30000; + private long maxCorrelationQueueSize = 1000; + private boolean enabled = true; + + public long getMaxCorrelationTimeMs() { + return maxCorrelationTimeMs; + } + + public void setMaxCorrelationTimeMs(long maxCorrelationTimeMs) { + this.maxCorrelationTimeMs = maxCorrelationTimeMs; + } + + public long getMaxCorrelationQueueSize() { + return maxCorrelationQueueSize; + } + + public void setMaxCorrelationQueueSize(long maxCorrelationQueueSize) { + this.maxCorrelationQueueSize = maxCorrelationQueueSize; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public boolean isEnabled() { + return enabled; + } +} diff --git a/service/src/main/java/datawave/microservice/querymetric/config/HazelcastMetricCacheConfiguration.java b/service/src/main/java/datawave/microservice/querymetric/config/HazelcastMetricCacheConfiguration.java index 2b76eb4c..dc646e19 100644 --- a/service/src/main/java/datawave/microservice/querymetric/config/HazelcastMetricCacheConfiguration.java +++ b/service/src/main/java/datawave/microservice/querymetric/config/HazelcastMetricCacheConfiguration.java @@ -14,6 +14,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cache.Cache; import org.springframework.cloud.consul.discovery.ConsulDiscoveryProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -25,7 +26,6 @@ import com.hazelcast.config.JoinConfig; import com.hazelcast.config.ListenerConfig; import com.hazelcast.config.MapConfig; -import com.hazelcast.config.MapStoreConfig; import com.hazelcast.config.TcpIpConfig; import com.hazelcast.config.XmlConfigBuilder; import com.hazelcast.core.Hazelcast; @@ -49,7 +49,6 @@ public class HazelcastMetricCacheConfiguration { private Logger log = LoggerFactory.getLogger(HazelcastMetricCacheConfiguration.class); - public static final String LAST_WRITTEN_METRICS = "lastWrittenQueryMetrics"; public static final String INCOMING_METRICS = "incomingQueryMetrics"; @Value("${hazelcast.clusterName:${spring.application.name}}") @@ -63,27 +62,19 @@ public HazelcastCacheManager queryMetricCacheManager(@Qualifier("metrics") Hazel @Bean @Qualifier("metrics") HazelcastInstance hazelcastInstance(Config config, @Qualifier("store") AccumuloMapStore mapStore, @Qualifier("loader") AccumuloMapLoader mapLoader, - MergeLockLifecycleListener lifecycleListener) { + MergeLockLifecycleListener lifecycleListener, @Qualifier("lastWrittenQueryMetrics") Cache lastWrittenCache) { // Autowire both the AccumuloMapStore and AccumuloMapLoader so that they both get created // Ensure that the lastWrittenQueryMetricCache is set into the MapStore before the instance is active and the writeLock is released - lifecycleListener.writeLockRunnable.lock(60000, LifecycleEvent.LifecycleState.STARTING); + lifecycleListener.writeLockRunnable.lock(LifecycleEvent.LifecycleState.STARTING); HazelcastInstance instance = Hazelcast.newHazelcastInstance(config); try { HazelcastCacheManager cacheManager = new HazelcastCacheManager(instance); - HazelcastCache lastWrittenQueryMetricsCache = (HazelcastCache) cacheManager.getCache(LAST_WRITTEN_METRICS); - lastWrittenQueryMetricsCache.getNativeCache().addEntryListener(new MetricMapListener(LAST_WRITTEN_METRICS), true); - HazelcastCache incomingMetricsCache = (HazelcastCache) cacheManager.getCache(INCOMING_METRICS); incomingMetricsCache.getNativeCache().addEntryListener(new MetricMapListener(INCOMING_METRICS), true); - MapStoreConfig mapStoreConfig = config.getMapConfigs().get(LAST_WRITTEN_METRICS).getMapStoreConfig(); - if (mapStoreConfig.getInitialLoadMode().equals(MapStoreConfig.InitialLoadMode.LAZY)) { - // prompts loading all keys otherwise we are getting a deadlock - lastWrittenQueryMetricsCache.getNativeCache().size(); - } - mapStore.setLastWrittenQueryMetricCache(lastWrittenQueryMetricsCache); + mapStore.setLastWrittenQueryMetricCache(lastWrittenCache); System.setProperty("hzAddress", instance.getCluster().getLocalMember().getAddress().toString()); System.setProperty("hzUuid", instance.getCluster().getLocalMember().getUuid().toString()); } catch (Exception e) { diff --git a/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricCacheConfiguration.java b/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricCacheConfiguration.java new file mode 100644 index 00000000..f8cbef23 --- /dev/null +++ b/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricCacheConfiguration.java @@ -0,0 +1,39 @@ +package datawave.microservice.querymetric.config; + +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cache.Cache; +import org.springframework.cache.caffeine.CaffeineCache; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.github.benmanes.caffeine.cache.Caffeine; + +import datawave.microservice.querymetric.persistence.MetricCacheListener; + +@Configuration +@EnableConfigurationProperties({QueryMetricCacheProperties.class}) +public class QueryMetricCacheConfiguration { + + private Logger log = LoggerFactory.getLogger(QueryMetricCacheConfiguration.class); + public static final String LAST_WRITTEN_METRICS = "lastWrittenQueryMetrics"; + + @Bean + @Qualifier("lastWrittenQueryMetrics") + public Cache lastWrittenQueryMetrics(QueryMetricCacheProperties cacheProperties) { + // @formatter:off + QueryMetricCacheProperties.Cache lastWrittenQueryMetrics = cacheProperties.getLastWrittenQueryMetrics(); + return new CaffeineCache("lastWrittenQueryMetrics", + Caffeine.newBuilder() + .initialCapacity(lastWrittenQueryMetrics.getMaximumSize()) + .maximumSize(lastWrittenQueryMetrics.getMaximumSize()) + .expireAfterWrite(lastWrittenQueryMetrics.getTtlSeconds(), TimeUnit.SECONDS) + .removalListener(new MetricCacheListener(LAST_WRITTEN_METRICS)) + .build(), false); + // @formatter:on + } +} diff --git a/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricCacheProperties.java b/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricCacheProperties.java new file mode 100644 index 00000000..6a6ad974 --- /dev/null +++ b/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricCacheProperties.java @@ -0,0 +1,44 @@ +package datawave.microservice.querymetric.config; + +import javax.validation.constraints.PositiveOrZero; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +@Validated +@ConfigurationProperties(prefix = "datawave.query.metric.cache") +public class QueryMetricCacheProperties { + + private Cache lastWrittenQueryMetrics = new Cache(); + + public void setLastWrittenQueryMetrics(Cache lastWrittenQueryMetrics) { + this.lastWrittenQueryMetrics = lastWrittenQueryMetrics; + } + + public Cache getLastWrittenQueryMetrics() { + return lastWrittenQueryMetrics; + } + + public class Cache { + @PositiveOrZero + private int maximumSize = 5000; + @PositiveOrZero + private long ttlSeconds = 600; + + public void setMaximumSize(int maximumSize) { + this.maximumSize = maximumSize; + } + + public int getMaximumSize() { + return maximumSize; + } + + public void setTtlSeconds(long ttlSeconds) { + this.ttlSeconds = ttlSeconds; + } + + public long getTtlSeconds() { + return ttlSeconds; + } + } +} diff --git a/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricHandlerConfiguration.java b/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricHandlerConfiguration.java index 493fc573..39020c01 100644 --- a/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricHandlerConfiguration.java +++ b/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricHandlerConfiguration.java @@ -31,9 +31,11 @@ import datawave.core.query.result.event.DefaultResponseObjectFactory; import datawave.marking.MarkingFunctions; import datawave.microservice.querymetric.BaseQueryMetric; +import datawave.microservice.querymetric.Correlator; import datawave.microservice.querymetric.QueryMetricFactory; import datawave.microservice.querymetric.QueryMetricFactoryImpl; import datawave.microservice.querymetric.QueryMetricOperations; +import datawave.microservice.querymetric.QueryMetricOperationsStats; import datawave.microservice.querymetric.factory.BaseQueryMetricListResponseFactory; import datawave.microservice.querymetric.factory.QueryMetricListResponseFactory; import datawave.microservice.querymetric.factory.QueryMetricQueryLogicFactory; @@ -56,12 +58,12 @@ import datawave.webservice.query.result.event.ResponseObjectFactory; @Configuration -@EnableConfigurationProperties({QueryMetricHandlerProperties.class, TimelyProperties.class}) +@EnableConfigurationProperties({QueryMetricProperties.class, QueryMetricHandlerProperties.class, TimelyProperties.class}) public class QueryMetricHandlerConfiguration { @Bean - public QueryMetricConsumer queryMetricSink(QueryMetricOperations queryMetricOperations) { - return new QueryMetricConsumer(queryMetricOperations); + public QueryMetricConsumer queryMetricSink(QueryMetricOperations queryMetricOperations, Correlator correlator, QueryMetricOperationsStats stats) { + return new QueryMetricConsumer(queryMetricOperations, correlator, stats); } @Bean @@ -94,10 +96,11 @@ QueryMetricFactory queryMetricFactory() { public ShardTableQueryMetricHandler shardTableQueryMetricHandler(QueryMetricHandlerProperties queryMetricHandlerProperties, @Qualifier("warehouse") AccumuloClientPool accumuloClientPool, QueryMetricQueryLogicFactory logicFactory, QueryMetricFactory metricFactory, MarkingFunctions markingFunctions, QueryMetricCombiner queryMetricCombiner, LuceneToJexlQueryParser luceneToJexlQueryParser, - WebClient.Builder webClientBuilder, @Autowired(required = false) JWTTokenHandler jwtTokenHandler, DnUtils dnUtils) { + ResponseObjectFactory responseObjectFactory, WebClient.Builder webClientBuilder, + @Autowired(required = false) JWTTokenHandler jwtTokenHandler, DnUtils dnUtils) { if (queryMetricHandlerProperties.isUseRemoteQuery()) { return new RemoteShardTableQueryMetricHandler(queryMetricHandlerProperties, accumuloClientPool, logicFactory, metricFactory, markingFunctions, - queryMetricCombiner, luceneToJexlQueryParser, webClientBuilder, jwtTokenHandler, dnUtils); + queryMetricCombiner, luceneToJexlQueryParser, responseObjectFactory, webClientBuilder, jwtTokenHandler, dnUtils); } else { return new LocalShardTableQueryMetricHandler(queryMetricHandlerProperties, accumuloClientPool, logicFactory, metricFactory, markingFunctions, queryMetricCombiner, luceneToJexlQueryParser, dnUtils); diff --git a/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricProperties.java b/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricProperties.java new file mode 100644 index 00000000..a43af671 --- /dev/null +++ b/service/src/main/java/datawave/microservice/querymetric/config/QueryMetricProperties.java @@ -0,0 +1,79 @@ +package datawave.microservice.querymetric.config; + +import java.util.concurrent.TimeUnit; + +import javax.validation.Valid; +import javax.validation.constraints.PositiveOrZero; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +@Validated +@ConfigurationProperties(prefix = "datawave.query.metric") +public class QueryMetricProperties { + private boolean confirmAckEnabled = true; + private long confirmAckTimeoutMillis = 500L; + + @Valid + private Retry retry = new Retry(); + + public boolean isConfirmAckEnabled() { + return confirmAckEnabled; + } + + public void setConfirmAckEnabled(boolean confirmAckEnabled) { + this.confirmAckEnabled = confirmAckEnabled; + } + + public long getConfirmAckTimeoutMillis() { + return confirmAckTimeoutMillis; + } + + public void setConfirmAckTimeoutMillis(long confirmAckTimeoutMillis) { + this.confirmAckTimeoutMillis = confirmAckTimeoutMillis; + } + + public Retry getRetry() { + return retry; + } + + public void setRetry(Retry retry) { + this.retry = retry; + } + + @Validated + public static class Retry { + @PositiveOrZero + private int maxAttempts = 10; + + @PositiveOrZero + private long failTimeoutMillis = TimeUnit.MINUTES.toMillis(5); + + @PositiveOrZero + private long backoffIntervalMillis = TimeUnit.SECONDS.toMillis(5); + + public int getMaxAttempts() { + return maxAttempts; + } + + public void setMaxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + } + + public long getFailTimeoutMillis() { + return failTimeoutMillis; + } + + public void setFailTimeoutMillis(long failTimeoutMillis) { + this.failTimeoutMillis = failTimeoutMillis; + } + + public long getBackoffIntervalMillis() { + return backoffIntervalMillis; + } + + public void setBackoffIntervalMillis(long backoffIntervalMillis) { + this.backoffIntervalMillis = backoffIntervalMillis; + } + } +} diff --git a/service/src/main/java/datawave/microservice/querymetric/config/StatsConfiguration.java b/service/src/main/java/datawave/microservice/querymetric/config/StatsConfiguration.java index 9d231c20..fec48ef0 100644 --- a/service/src/main/java/datawave/microservice/querymetric/config/StatsConfiguration.java +++ b/service/src/main/java/datawave/microservice/querymetric/config/StatsConfiguration.java @@ -2,7 +2,9 @@ import javax.inject.Named; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -17,7 +19,8 @@ public class StatsConfiguration { @Bean @ConditionalOnMissingBean QueryMetricOperationsStats queryMetricOperationsStats(TimelyProperties timelyProperties, ShardTableQueryMetricHandler handler, - @Named("queryMetricCacheManager") CacheManager cacheManager, AccumuloMapStore mapStore) { - return new QueryMetricOperationsStats(timelyProperties, handler, cacheManager, mapStore); + @Named("queryMetricCacheManager") CacheManager cacheManager, @Qualifier("lastWrittenQueryMetrics") Cache lastWrittenCache, + AccumuloMapStore mapStore) { + return new QueryMetricOperationsStats(timelyProperties, handler, cacheManager, lastWrittenCache, mapStore); } } diff --git a/service/src/main/java/datawave/microservice/querymetric/function/QueryMetricConsumer.java b/service/src/main/java/datawave/microservice/querymetric/function/QueryMetricConsumer.java index cabae393..3ef1e6b8 100644 --- a/service/src/main/java/datawave/microservice/querymetric/function/QueryMetricConsumer.java +++ b/service/src/main/java/datawave/microservice/querymetric/function/QueryMetricConsumer.java @@ -1,29 +1,77 @@ package datawave.microservice.querymetric.function; +import java.util.List; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import datawave.microservice.querymetric.BaseQueryMetric; +import datawave.microservice.querymetric.Correlator; import datawave.microservice.querymetric.QueryMetricOperations; +import datawave.microservice.querymetric.QueryMetricOperationsStats; +import datawave.microservice.querymetric.QueryMetricType; import datawave.microservice.querymetric.QueryMetricUpdate; +import datawave.microservice.querymetric.QueryMetricUpdateHolder; public class QueryMetricConsumer implements Consumer { private final Logger log = LoggerFactory.getLogger(this.getClass()); private QueryMetricOperations queryMetricOperations; + private Correlator correlator; + private QueryMetricOperationsStats stats; - public QueryMetricConsumer(QueryMetricOperations queryMetricOperations) { + public QueryMetricConsumer(QueryMetricOperations queryMetricOperations, Correlator correlator, QueryMetricOperationsStats stats) { this.queryMetricOperations = queryMetricOperations; + this.correlator = correlator; + this.stats = stats; } @Override public void accept(QueryMetricUpdate queryMetricUpdate) { try { - queryMetricOperations.storeMetric(queryMetricUpdate); + this.stats.queueTimelyMetrics(queryMetricUpdate); + this.stats.getMeter(QueryMetricOperationsStats.METERS.MESSAGE_RECEIVE).mark(); + if (shouldCorrelate(queryMetricUpdate)) { + log.debug("adding update for {} to correlator", queryMetricUpdate.getMetric().getQueryId()); + this.correlator.addMetricUpdate(queryMetricUpdate); + } else { + log.debug("storing update for {}", queryMetricUpdate.getMetric().getQueryId()); + this.queryMetricOperations.storeMetricUpdate(new QueryMetricUpdateHolder(queryMetricUpdate)); + } + + if (this.correlator.isEnabled()) { + List correlatedUpdates; + do { + correlatedUpdates = this.correlator.getMetricUpdates(QueryMetricOperations.getInProcess()); + if (correlatedUpdates != null && !correlatedUpdates.isEmpty()) { + try { + String queryId = correlatedUpdates.get(0).getMetric().getQueryId(); + QueryMetricType metricType = correlatedUpdates.get(0).getMetricType(); + QueryMetricUpdateHolder metricUpdate = this.queryMetricOperations.combineMetricUpdates(correlatedUpdates, metricType); + log.debug("storing correlated updates for {}", queryId); + queryMetricOperations.storeMetricUpdate(metricUpdate); + } catch (Exception e) { + log.error("exception while combining correlated updates: " + e.getMessage(), e); + } + } + } while (correlatedUpdates != null && !correlatedUpdates.isEmpty()); + } } catch (Exception e) { log.error("Error processing query metric update message: {}", e.getMessage()); throw new RuntimeException(e); } } + + private boolean shouldCorrelate(QueryMetricUpdate update) { + // add the first update for a metric to get it into the cache + if ((update.getMetric().getLifecycle().ordinal() <= BaseQueryMetric.Lifecycle.DEFINED.ordinal())) { + return false; + } + if (this.correlator.isEnabled()) { + return true; + } else { + return false; + } + } } diff --git a/service/src/main/java/datawave/microservice/querymetric/handler/BaseQueryMetricHandler.java b/service/src/main/java/datawave/microservice/querymetric/handler/BaseQueryMetricHandler.java index d324b317..1467a68f 100644 --- a/service/src/main/java/datawave/microservice/querymetric/handler/BaseQueryMetricHandler.java +++ b/service/src/main/java/datawave/microservice/querymetric/handler/BaseQueryMetricHandler.java @@ -5,8 +5,8 @@ import java.util.Date; import java.util.List; -import org.apache.commons.jexl2.parser.ASTEQNode; -import org.apache.commons.jexl2.parser.ASTJexlScript; +import org.apache.commons.jexl3.parser.ASTEQNode; +import org.apache.commons.jexl3.parser.ASTJexlScript; import org.apache.commons.lang.time.DateUtils; import org.apache.log4j.Logger; diff --git a/service/src/main/java/datawave/microservice/querymetric/handler/ContentQueryMetricsIngestHelper.java b/service/src/main/java/datawave/microservice/querymetric/handler/ContentQueryMetricsIngestHelper.java index 467d4ff0..36bbd512 100644 --- a/service/src/main/java/datawave/microservice/querymetric/handler/ContentQueryMetricsIngestHelper.java +++ b/service/src/main/java/datawave/microservice/querymetric/handler/ContentQueryMetricsIngestHelper.java @@ -2,7 +2,7 @@ import java.text.SimpleDateFormat; import java.util.Collection; -import java.util.Date; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -34,8 +34,8 @@ public class ContentQueryMetricsIngestHelper extends CSVIngestHelper implements private Set contentIndexFields = new HashSet<>(); private HelperDelegate delegate; - public ContentQueryMetricsIngestHelper(boolean deleteMode) { - this(deleteMode, new HelperDelegate<>()); + public ContentQueryMetricsIngestHelper(boolean deleteMode, Collection ignoredFields) { + this(deleteMode, new HelperDelegate<>(ignoredFields)); } public ContentQueryMetricsIngestHelper(boolean deleteMode, HelperDelegate delegate) { @@ -94,6 +94,13 @@ public int getFieldSizeThreshold() { } public static class HelperDelegate { + private Collection ignoredFields = Collections.EMPTY_LIST; + + public HelperDelegate() {} + + public HelperDelegate(Collection ignoredFields) { + this.ignoredFields = ignoredFields; + } protected boolean isChanged(String updated, String stored) { if ((StringUtils.isBlank(stored) && StringUtils.isNotBlank(updated)) || (stored != null && updated != null && !stored.equals(updated))) { @@ -166,17 +173,25 @@ public Multimap getEventFieldsToWrite(T updated, T stored) { SimpleDateFormat sdf_date_time1 = new SimpleDateFormat("yyyyMMdd HHmmss"); SimpleDateFormat sdf_date_time2 = new SimpleDateFormat("yyyyMMdd HHmmss"); - if (isFirstWrite(updated.getPositiveSelectors(), stored == null ? null : stored.getPositiveSelectors())) { - fields.putAll("POSITIVE_SELECTORS", updated.getPositiveSelectors()); + if (!ignoredFields.contains("POSITIVE_SELECTORS")) { + if (isFirstWrite(updated.getPositiveSelectors(), stored == null ? null : stored.getPositiveSelectors())) { + fields.putAll("POSITIVE_SELECTORS", updated.getPositiveSelectors()); + } } - if (isFirstWrite(updated.getNegativeSelectors(), stored == null ? null : stored.getNegativeSelectors())) { - fields.putAll("NEGATIVE_SELECTORS", updated.getNegativeSelectors()); + if (!ignoredFields.contains("NEGATIVE_SELECTORS")) { + if (isFirstWrite(updated.getNegativeSelectors(), stored == null ? null : stored.getNegativeSelectors())) { + fields.putAll("NEGATIVE_SELECTORS", updated.getNegativeSelectors()); + } } - if (isFirstWrite(updated.getQueryAuthorizations(), stored == null ? null : stored.getQueryAuthorizations())) { - fields.put("AUTHORIZATIONS", updated.getQueryAuthorizations()); + if (!ignoredFields.contains("AUTHORIZATIONS")) { + if (isFirstWrite(updated.getQueryAuthorizations(), stored == null ? null : stored.getQueryAuthorizations())) { + fields.put("AUTHORIZATIONS", updated.getQueryAuthorizations()); + } } - if (isFirstWrite(updated.getBeginDate(), stored == null ? null : stored.getBeginDate())) { - fields.put("BEGIN_DATE", sdf_date_time1.format(updated.getBeginDate())); + if (!ignoredFields.contains("BEGIN_DATE")) { + if (isFirstWrite(updated.getBeginDate(), stored == null ? null : stored.getBeginDate())) { + fields.put("BEGIN_DATE", sdf_date_time1.format(updated.getBeginDate())); + } } if (isChanged(updated.getCreateCallTime(), stored == null ? -1 : stored.getCreateCallTime())) { fields.put("CREATE_CALL_TIME", Long.toString(updated.getCreateCallTime())); @@ -190,8 +205,10 @@ public Multimap getEventFieldsToWrite(T updated, T stored) { if (isChanged(updated.getElapsedTime(), stored == null ? -1 : stored.getElapsedTime())) { fields.put("ELAPSED_TIME", Long.toString(updated.getElapsedTime())); } - if (isFirstWrite(updated.getEndDate(), stored == null ? null : stored.getEndDate())) { - fields.put("END_DATE", sdf_date_time1.format(updated.getEndDate())); + if (!ignoredFields.contains("END_DATE")) { + if (isFirstWrite(updated.getEndDate(), stored == null ? null : stored.getEndDate())) { + fields.put("END_DATE", sdf_date_time1.format(updated.getEndDate())); + } } if (isChanged(updated.getErrorCode(), stored == null ? null : stored.getErrorCode())) { fields.put("ERROR_CODE", updated.getErrorCode()); @@ -237,14 +254,18 @@ public Multimap getEventFieldsToWrite(T updated, T stored) { if (isChanged(updated.getNumUpdates(), stored == null ? -1 : stored.getNumUpdates())) { fields.put("NUM_UPDATES", Long.toString(updated.getNumUpdates())); } - if (isFirstWrite(updated.getParameters(), stored == null ? null : stored.getParameters())) { - fields.put("PARAMETERS", QueryUtil.toParametersString(updated.getParameters())); + if (!ignoredFields.contains("PARAMETERS")) { + if (isFirstWrite(updated.getParameters(), stored == null ? null : stored.getParameters())) { + fields.put("PARAMETERS", QueryUtil.toParametersString(updated.getParameters())); + } } - if (isFirstWrite(updated.getPlan(), stored == null ? null : stored.getPlan())) { + if (isChanged(updated.getPlan(), stored == null ? null : stored.getPlan())) { fields.put("PLAN", updated.getPlan()); } - if (isFirstWrite(updated.getProxyServers(), stored == null ? null : stored.getProxyServers())) { - fields.put("PROXY_SERVERS", StringUtils.join(updated.getProxyServers(), ",")); + if (!ignoredFields.contains("PROXY_SERVERS")) { + if (isFirstWrite(updated.getProxyServers(), stored == null ? null : stored.getProxyServers())) { + fields.put("PROXY_SERVERS", StringUtils.join(updated.getProxyServers(), ",")); + } } Map storedPageMetricMap = new HashMap<>(); @@ -276,20 +297,28 @@ public Multimap getEventFieldsToWrite(T updated, T stored) { } } } - if (isFirstWrite(updated.getQuery(), stored == null ? null : stored.getQuery())) { - fields.put("QUERY", updated.getQuery()); + if (!ignoredFields.contains("QUERY")) { + if (isFirstWrite(updated.getQuery(), stored == null ? null : stored.getQuery())) { + fields.put("QUERY", updated.getQuery()); + } } if (isFirstWrite(updated.getQueryId(), stored == null ? null : stored.getQueryId())) { fields.put("QUERY_ID", updated.getQueryId()); } - if (isFirstWrite(updated.getQueryLogic(), stored == null ? null : stored.getQueryLogic())) { - fields.put("QUERY_LOGIC", updated.getQueryLogic()); + if (!ignoredFields.contains("QUERY_LOGIC")) { + if (isFirstWrite(updated.getQueryLogic(), stored == null ? null : stored.getQueryLogic())) { + fields.put("QUERY_LOGIC", updated.getQueryLogic()); + } } - if (isFirstWrite(updated.getQueryName(), stored == null ? null : stored.getQueryName())) { - fields.put("QUERY_NAME", updated.getQueryName()); + if (!ignoredFields.contains("QUERY_NAME")) { + if (isFirstWrite(updated.getQueryName(), stored == null ? null : stored.getQueryName())) { + fields.put("QUERY_NAME", updated.getQueryName()); + } } - if (isFirstWrite(updated.getQueryType(), stored == null ? null : stored.getQueryType())) { - fields.put("QUERY_TYPE", updated.getQueryType()); + if (!ignoredFields.contains("QUERY_TYPE")) { + if (isFirstWrite(updated.getQueryType(), stored == null ? null : stored.getQueryType())) { + fields.put("QUERY_TYPE", updated.getQueryType()); + } } if (isFirstWrite(updated.getSetupTime(), stored == null ? 0 : stored.getSetupTime(), 0)) { fields.put("SETUP_TIME", Long.toString(updated.getSetupTime())); @@ -300,18 +329,24 @@ public Multimap getEventFieldsToWrite(T updated, T stored) { if (isChanged(updated.getSourceCount(), stored == null ? -1 : stored.getSourceCount())) { fields.put("SOURCE_COUNT", Long.toString(updated.getSourceCount())); } - if (isFirstWrite(updated.getUser(), stored == null ? null : stored.getUser())) { - fields.put("USER", updated.getUser()); + if (!ignoredFields.contains("USER")) { + if (isFirstWrite(updated.getUser(), stored == null ? null : stored.getUser())) { + fields.put("USER", updated.getUser()); + } } - if (isFirstWrite(updated.getUserDN(), stored == null ? null : stored.getUserDN())) { - fields.put("USER_DN", updated.getUserDN()); + if (!ignoredFields.contains("USER_DN")) { + if (isFirstWrite(updated.getUserDN(), stored == null ? null : stored.getUserDN())) { + fields.put("USER_DN", updated.getUserDN()); + } } - if (isFirstWrite(updated.getVersionMap(), stored == null ? null : stored.getVersionMap())) { - Map versionMap = updated.getVersionMap(); - if (versionMap != null) { - versionMap.entrySet().stream().forEach(e -> { - fields.put("VERSION." + e.getKey().toUpperCase(), e.getValue()); - }); + if (!ignoredFields.contains("VERSION")) { + if (isFirstWrite(updated.getVersionMap(), stored == null ? null : stored.getVersionMap())) { + Map versionMap = updated.getVersionMap(); + if (versionMap != null) { + versionMap.entrySet().stream().forEach(e -> { + fields.put("VERSION." + e.getKey().toUpperCase(), e.getValue()); + }); + } } } if (isChanged(updated.getYieldCount(), stored == null ? -1 : stored.getYieldCount())) { @@ -404,6 +439,11 @@ public Multimap getEventFieldsToDelete(T updated, T stored) { } } } + if (stored.getPlan() != null) { + if (stored.getPlan() != null && isChanged(updated.getPlan(), stored.getPlan())) { + fields.put("PLAN", stored.getPlan()); + } + } if (isChanged(updated.getSeekCount(), stored.getSeekCount())) { fields.put("SEEK_COUNT", Long.toString(stored.getSeekCount())); } diff --git a/service/src/main/java/datawave/microservice/querymetric/handler/QueryGeometryHandler.java b/service/src/main/java/datawave/microservice/querymetric/handler/QueryGeometryHandler.java index 2d3e06ae..79cb1acb 100644 --- a/service/src/main/java/datawave/microservice/querymetric/handler/QueryGeometryHandler.java +++ b/service/src/main/java/datawave/microservice/querymetric/handler/QueryGeometryHandler.java @@ -3,7 +3,7 @@ import java.util.List; import datawave.microservice.querymetric.BaseQueryMetric; -import datawave.webservice.query.map.QueryGeometryResponse; +import datawave.microservice.querymetric.QueryGeometryResponse; public interface QueryGeometryHandler { diff --git a/service/src/main/java/datawave/microservice/querymetric/handler/QueryMetricCombiner.java b/service/src/main/java/datawave/microservice/querymetric/handler/QueryMetricCombiner.java index 8a33815d..9abe789a 100644 --- a/service/src/main/java/datawave/microservice/querymetric/handler/QueryMetricCombiner.java +++ b/service/src/main/java/datawave/microservice/querymetric/handler/QueryMetricCombiner.java @@ -6,6 +6,7 @@ import java.util.Map; import java.util.TreeMap; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +26,11 @@ public T combineMetrics(T updatedQueryMetric, T cachedQueryMetric, QueryMetricTy // duplicate cachedQueryMetric so that we leave that object unchanged and return a combined metric combinedMetric = (T) cachedQueryMetric.duplicate(); + boolean inOrderUpdate = true; + if (updatedQueryMetric.getLastUpdated() != null && cachedQueryMetric.getLastUpdated() != null) { + inOrderUpdate = updatedQueryMetric.getLastUpdated().after(cachedQueryMetric.getLastUpdated()); + } + // only update once if (combinedMetric.getQueryType() == null && updatedQueryMetric.getQueryType() != null) { combinedMetric.setQueryType(updatedQueryMetric.getQueryType()); @@ -49,6 +55,7 @@ public T combineMetrics(T updatedQueryMetric, T cachedQueryMetric, QueryMetricTy if (combinedMetric.getQuery() == null && updatedQueryMetric.getQuery() != null) { combinedMetric.setQuery(updatedQueryMetric.getQuery()); } + // only update once if (combinedMetric.getHost() == null && updatedQueryMetric.getHost() != null) { combinedMetric.setHost(updatedQueryMetric.getHost()); @@ -111,7 +118,9 @@ public T combineMetrics(T updatedQueryMetric, T cachedQueryMetric, QueryMetricTy combinedMetric.setErrorCode(updatedQueryMetric.getErrorCode()); } // use updated lifecycle unless trying to update a final lifecycle with a non-final lifecycle - if ((combinedMetric.isLifecycleFinal() && !updatedQueryMetric.isLifecycleFinal()) == false) { + // or if updating with a lifecycle that is less than the current + if ((combinedMetric.isLifecycleFinal() && !updatedQueryMetric.isLifecycleFinal()) == false + && updatedQueryMetric.getLifecycle().compareTo(combinedMetric.getLifecycle()) > 0) { combinedMetric.setLifecycle(updatedQueryMetric.getLifecycle()); } // only update once @@ -156,16 +165,16 @@ public T combineMetrics(T updatedQueryMetric, T cachedQueryMetric, QueryMetricTy if (combinedMetric.getParameters() == null && updatedQueryMetric.getParameters() != null) { combinedMetric.setParameters(updatedQueryMetric.getParameters()); } - // only update once - if (combinedMetric.getSetupTime() > -1) { + // if updatedQueryMetric.setupTime is greater than combinedMetric.setupTime then update + if (updatedQueryMetric.getSetupTime() > combinedMetric.getSetupTime()) { combinedMetric.setSetupTime(updatedQueryMetric.getSetupTime()); } - // only update once - if (combinedMetric.getCreateCallTime() > -1) { + // if updatedQueryMetric.createCallTime is greater than combinedMetric.createCallTime then update + if (updatedQueryMetric.getCreateCallTime() > combinedMetric.getCreateCallTime()) { combinedMetric.setCreateCallTime(updatedQueryMetric.getCreateCallTime()); } - // only update once - if (combinedMetric.getLoginTime() > -1) { + // if updatedQueryMetric.loginTime is greater than combinedMetric.loginTime then update + if (updatedQueryMetric.getLoginTime() > combinedMetric.getLoginTime()) { combinedMetric.setLoginTime(updatedQueryMetric.getLoginTime()); } @@ -184,8 +193,8 @@ public T combineMetrics(T updatedQueryMetric, T cachedQueryMetric, QueryMetricTy combinedMetric.setDocRanges(updatedQueryMetric.getDocRanges()); combinedMetric.setFiRanges(updatedQueryMetric.getFiRanges()); } - // only update once - if (combinedMetric.getPlan() == null && updatedQueryMetric.getPlan() != null) { + // update if the update is in-order and the value changed + if (inOrderUpdate && isChanged(updatedQueryMetric.getPlan(), combinedMetric.getPlan())) { combinedMetric.setPlan(updatedQueryMetric.getPlan()); } // only update once @@ -250,4 +259,12 @@ protected PageMetric combinePageMetrics(PageMetric updated, PageMetric stored) { } return pm; } + + protected boolean isChanged(String updated, String stored) { + if ((StringUtils.isBlank(stored) && StringUtils.isNotBlank(updated)) || (stored != null && updated != null && !stored.equals(updated))) { + return true; + } else { + return false; + } + } } diff --git a/service/src/main/java/datawave/microservice/querymetric/handler/QueryMetricHandler.java b/service/src/main/java/datawave/microservice/querymetric/handler/QueryMetricHandler.java index 9bf00034..85e1fb21 100644 --- a/service/src/main/java/datawave/microservice/querymetric/handler/QueryMetricHandler.java +++ b/service/src/main/java/datawave/microservice/querymetric/handler/QueryMetricHandler.java @@ -1,5 +1,6 @@ package datawave.microservice.querymetric.handler; +import java.util.Collection; import java.util.Date; import java.util.Map; @@ -17,7 +18,7 @@ public interface QueryMetricHandler { Map getEventFields(BaseQueryMetric queryMetric); - ContentQueryMetricsIngestHelper getQueryMetricsIngestHelper(boolean deleteMode); + ContentQueryMetricsIngestHelper getQueryMetricsIngestHelper(boolean deleteMode, Collection ignoredFields); Query createQuery(); diff --git a/service/src/main/java/datawave/microservice/querymetric/handler/RemoteShardTableQueryMetricHandler.java b/service/src/main/java/datawave/microservice/querymetric/handler/RemoteShardTableQueryMetricHandler.java index c0c67fc1..1203d53b 100644 --- a/service/src/main/java/datawave/microservice/querymetric/handler/RemoteShardTableQueryMetricHandler.java +++ b/service/src/main/java/datawave/microservice/querymetric/handler/RemoteShardTableQueryMetricHandler.java @@ -28,22 +28,26 @@ import datawave.query.language.parser.jexl.LuceneToJexlQueryParser; import datawave.security.authorization.DatawaveUser; import datawave.security.authorization.JWTTokenHandler; +import datawave.webservice.query.result.event.ResponseObjectFactory; import datawave.webservice.result.BaseQueryResponse; public class RemoteShardTableQueryMetricHandler extends ShardTableQueryMetricHandler { private static final Logger log = LoggerFactory.getLogger(RemoteShardTableQueryMetricHandler.class); private DatawaveUserDetails userDetails; + private final ResponseObjectFactory responseObjectFactory; private final WebClient webClient; private final WebClient authWebClient; private final JWTTokenHandler jwtTokenHandler; public RemoteShardTableQueryMetricHandler(QueryMetricHandlerProperties queryMetricHandlerProperties, @Qualifier("warehouse") AccumuloClientPool clientPool, QueryMetricQueryLogicFactory logicFactory, QueryMetricFactory metricFactory, MarkingFunctions markingFunctions, - QueryMetricCombiner queryMetricCombiner, LuceneToJexlQueryParser luceneToJexlQueryParser, WebClient.Builder webClientBuilder, - JWTTokenHandler jwtTokenHandler, DnUtils dnUtils) { + QueryMetricCombiner queryMetricCombiner, LuceneToJexlQueryParser luceneToJexlQueryParser, ResponseObjectFactory responseObjectFactory, + WebClient.Builder webClientBuilder, JWTTokenHandler jwtTokenHandler, DnUtils dnUtils) { super(queryMetricHandlerProperties, clientPool, logicFactory, metricFactory, markingFunctions, queryMetricCombiner, luceneToJexlQueryParser, dnUtils); + this.responseObjectFactory = responseObjectFactory; + this.webClient = webClientBuilder.baseUrl(queryMetricHandlerProperties.getQueryServiceUri()).build(); this.jwtTokenHandler = jwtTokenHandler; @@ -90,7 +94,6 @@ protected BaseQueryResponse createAndNext(Query query) throws Exception { return webClient.post() .uri(uriBuilder -> uriBuilder .path("/" + queryMetricHandlerProperties.getQueryMetricsLogic() + "/createAndNext") - .queryParam(QueryParameters.QUERY_POOL, queryMetricHandlerProperties.getQueryPool()) .queryParam(QueryParameters.QUERY_BEGIN, beginDate) .queryParam(QueryParameters.QUERY_END, endDate) .queryParam(QueryParameters.QUERY_LOGIC_NAME, query.getQueryLogicName()) @@ -103,9 +106,10 @@ protected BaseQueryResponse createAndNext(Query query) throws Exception { .queryParam(QueryParameters.QUERY_PARAMS, query.getParameters().stream().map(p -> String.join(":", p.getParameterName(), p.getParameterValue())).collect(Collectors.joining(";"))) .build()) .header("Authorization", bearerHeader) + .header("Pool", queryMetricHandlerProperties.getQueryPool()) .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE) .retrieve() - .bodyToMono(BaseQueryResponse.class) + .bodyToMono(responseObjectFactory.getEventQueryResponse().getClass()) .block(Duration.ofMillis(queryMetricHandlerProperties.getRemoteQueryTimeoutMillis())); // @formatter:on } catch (IllegalStateException e) { @@ -125,7 +129,7 @@ protected BaseQueryResponse next(String queryId) throws Exception { .header("Authorization", bearerHeader) .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE) .retrieve() - .bodyToMono(BaseQueryResponse.class) + .bodyToMono(responseObjectFactory.getEventQueryResponse().getClass()) .block(Duration.ofMillis(queryMetricHandlerProperties.getRemoteQueryTimeoutMillis())); // @formatter:on } catch (IllegalStateException e) { diff --git a/service/src/main/java/datawave/microservice/querymetric/handler/ShardTableQueryMetricHandler.java b/service/src/main/java/datawave/microservice/querymetric/handler/ShardTableQueryMetricHandler.java index fec57598..bb041ede 100644 --- a/service/src/main/java/datawave/microservice/querymetric/handler/ShardTableQueryMetricHandler.java +++ b/service/src/main/java/datawave/microservice/querymetric/handler/ShardTableQueryMetricHandler.java @@ -9,6 +9,7 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -74,6 +75,7 @@ import datawave.microservice.querymetric.config.QueryMetricHandlerProperties; import datawave.microservice.querymetric.factory.QueryMetricQueryLogicFactory; import datawave.microservice.security.util.DnUtils; +import datawave.query.QueryParameters; import datawave.query.iterator.QueryOptions; import datawave.query.language.parser.jexl.LuceneToJexlQueryParser; import datawave.security.authorization.DatawaveUser; @@ -175,7 +177,7 @@ public void verifyTables() { AbstractColumnBasedHandler handler = new ContentIndexingColumnBasedHandler() { @Override public AbstractContentIngestHelper getContentIndexingDataTypeHelper() { - return getQueryMetricsIngestHelper(false); + return getQueryMetricsIngestHelper(false, Collections.EMPTY_LIST); } }; Map trackingMap = AccumuloClientTracking.getTrackingMap(Thread.currentThread().getStackTrace()); @@ -205,6 +207,11 @@ private void writeMetric(T updated, T stored, long timestamp, boolean delete, Co } public void writeMetric(T updatedQueryMetric, List storedQueryMetrics, long timestamp, boolean delete) throws Exception { + writeMetric(updatedQueryMetric, storedQueryMetrics, timestamp, delete, Collections.EMPTY_LIST); + } + + public void writeMetric(T updatedQueryMetric, List storedQueryMetrics, long timestamp, boolean delete, Collection ignoredFields) + throws Exception { try { TaskAttemptID taskId = new TaskAttemptID(new TaskID(new JobID(JOB_ID, 1), TaskType.MAP, 1), 1); this.accumuloRecordWriterLock.readLock().lock(); @@ -214,7 +221,7 @@ public void writeMetric(T updatedQueryMetric, List storedQueryMetrics, long t ContentIndexingColumnBasedHandler handler = new ContentIndexingColumnBasedHandler() { @Override public AbstractContentIngestHelper getContentIndexingDataTypeHelper() { - return getQueryMetricsIngestHelper(delete); + return getQueryMetricsIngestHelper(delete, ignoredFields); } }; handler.setup(context); @@ -254,7 +261,7 @@ private Mutation getMutation(Key key, Value value) { public Map getEventFields(BaseQueryMetric queryMetric) { // ignore duplicates as none are expected Map eventFields = new HashMap<>(); - ContentQueryMetricsIngestHelper ingestHelper = getQueryMetricsIngestHelper(false); + ContentQueryMetricsIngestHelper ingestHelper = getQueryMetricsIngestHelper(false, Collections.EMPTY_LIST); ingestHelper.setup(conf); Multimap fieldsToWrite = ingestHelper.getEventFieldsToWrite(queryMetric, null); for (Entry entry : fieldsToWrite.entries()) { @@ -350,7 +357,11 @@ public T combineMetrics(T updatedQueryMetric, T cachedQueryMetric, QueryMetricTy } public T getQueryMetric(final String queryId) throws Exception { - List queryMetrics = getQueryMetrics("QUERY_ID == '" + queryId + "'"); + return getQueryMetric(queryId, Collections.emptySet()); + } + + public T getQueryMetric(final String queryId, Collection ignoredFields) throws Exception { + List queryMetrics = getQueryMetrics("QUERY_ID == '" + queryId + "'", ignoredFields); return queryMetrics.isEmpty() ? null : queryMetrics.get(0); } @@ -358,7 +369,7 @@ public Query createQuery() { return new QueryImpl(); } - public List getQueryMetrics(final String query) throws Exception { + public List getQueryMetrics(final String query, Collection ignoredFields) throws Exception { Date end = new Date(); Date begin = DateUtils.setYears(end, 2000); Query queryImpl = createQuery(); @@ -372,7 +383,13 @@ public List getQueryMetrics(final String query) throws Exception { queryImpl.setExpirationDate(DateUtils.addDays(new Date(), 1)); queryImpl.setPagesize(1000); queryImpl.setId(UUID.randomUUID()); - queryImpl.setParameters(ImmutableMap.of(QueryOptions.INCLUDE_GROUPING_CONTEXT, "true")); + Map parameters = new LinkedHashMap<>(); + parameters.put(QueryOptions.INCLUDE_GROUPING_CONTEXT, "true"); + parameters.put(QueryOptions.DATATYPE_FILTER, "querymetrics"); + if (ignoredFields != null && !ignoredFields.isEmpty()) { + parameters.put(QueryOptions.DISALLOWLISTED_FIELDS, StringUtils.join(ignoredFields, ",")); + } + queryImpl.setParameters(parameters); return getQueryMetrics(queryImpl); } @@ -740,8 +757,8 @@ public void reload() { } @Override - public ContentQueryMetricsIngestHelper getQueryMetricsIngestHelper(boolean deleteMode) { - return new ContentQueryMetricsIngestHelper(deleteMode); + public ContentQueryMetricsIngestHelper getQueryMetricsIngestHelper(boolean deleteMode, Collection ignoredFields) { + return new ContentQueryMetricsIngestHelper(deleteMode, ignoredFields); } @Override @@ -776,7 +793,10 @@ public QueryMetricsSummaryResponse getQueryMetricsSummary(Date begin, Date end, query.setPagesize(1000); query.setUserDN(datawaveUserShortName); query.setId(UUID.randomUUID()); - query.setParameters(ImmutableMap.of(QueryOptions.INCLUDE_GROUPING_CONTEXT, "true")); + Map parameters = new LinkedHashMap<>(); + parameters.put(QueryOptions.INCLUDE_GROUPING_CONTEXT, "true"); + parameters.put(QueryParameters.DATATYPE_FILTER_SET, "querymetrics"); + query.setParameters(parameters); List queryMetrics = getQueryMetrics(query); response = processQueryMetricsSummary(queryMetrics, end); diff --git a/service/src/main/java/datawave/microservice/querymetric/handler/SimpleQueryGeometryHandler.java b/service/src/main/java/datawave/microservice/querymetric/handler/SimpleQueryGeometryHandler.java index 59936ddb..961b7795 100644 --- a/service/src/main/java/datawave/microservice/querymetric/handler/SimpleQueryGeometryHandler.java +++ b/service/src/main/java/datawave/microservice/querymetric/handler/SimpleQueryGeometryHandler.java @@ -6,20 +6,21 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; -import org.apache.commons.jexl2.parser.JexlNode; +import org.apache.commons.jexl3.parser.JexlNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import datawave.microservice.query.QueryImpl; import datawave.microservice.querymetric.BaseQueryMetric; +import datawave.microservice.querymetric.QueryGeometry; +import datawave.microservice.querymetric.QueryGeometryResponse; import datawave.microservice.querymetric.config.QueryMetricHandlerProperties; import datawave.query.jexl.JexlASTHelper; import datawave.query.jexl.visitors.GeoFeatureVisitor; import datawave.query.language.parser.ParseException; import datawave.query.language.parser.jexl.LuceneToJexlQueryParser; -import datawave.webservice.query.map.QueryGeometry; -import datawave.webservice.query.map.QueryGeometryResponse; /** * This class is used to extract query geometries from the query metrics in an effort to provide those geometries for subsequent display to the user. @@ -40,7 +41,7 @@ public SimpleQueryGeometryHandler(QueryMetricHandlerProperties queryMetricHandle @Override public QueryGeometryResponse getQueryGeometryResponse(String id, List metrics) { - QueryGeometryResponse response = new QueryMetricGeometryResponse(id, basemaps); + QueryGeometryResponse response = new QueryGeometryResponse(id, basemaps); if (metrics != null) { Set queryGeometries = new LinkedHashSet<>(); @@ -49,7 +50,8 @@ public QueryGeometryResponse getQueryGeometryResponse(String id, List features = GeoFeatureVisitor.getGeoFeatures(queryNode, isLuceneQuery); + queryGeometries.addAll(features.stream().map(f -> new QueryGeometry(f.getFunction(), f.getGeometry())).collect(Collectors.toList())); } catch (Exception e) { log.error(e.getMessage(), e); response.addException(new Exception("Unable to parse the geo features")); diff --git a/service/src/main/java/datawave/microservice/querymetric/persistence/AccumuloMapStore.java b/service/src/main/java/datawave/microservice/querymetric/persistence/AccumuloMapStore.java index e4a99b9e..4433b356 100644 --- a/service/src/main/java/datawave/microservice/querymetric/persistence/AccumuloMapStore.java +++ b/service/src/main/java/datawave/microservice/querymetric/persistence/AccumuloMapStore.java @@ -2,13 +2,15 @@ import static java.util.concurrent.TimeUnit.MINUTES; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import javax.annotation.PreDestroy; @@ -22,14 +24,12 @@ import com.codahale.metrics.SlidingTimeWindowArrayReservoir; import com.codahale.metrics.Timer; -import com.google.common.cache.CacheBuilder; -import com.hazelcast.map.IMap; +import com.github.benmanes.caffeine.cache.Caffeine; import com.hazelcast.map.MapLoader; import com.hazelcast.map.MapStore; import com.hazelcast.map.MapStoreFactory; import datawave.microservice.querymetric.BaseQueryMetric; -import datawave.microservice.querymetric.MergeLockLifecycleListener; import datawave.microservice.querymetric.QueryMetricType; import datawave.microservice.querymetric.QueryMetricUpdate; import datawave.microservice.querymetric.QueryMetricUpdateHolder; @@ -38,13 +38,25 @@ @Component("store") @ConditionalOnProperty(name = "hazelcast.server.enabled") public class AccumuloMapStore extends AccumuloMapLoader implements MapStore> { + // A list of fields that won't change once written. There is no need to retrieve these fields from Accumulo + // Any change here must be accounted for in ContentQueryMetricIngestHelper. + public static final List ignoreFieldsOnQuery = Arrays.asList("POSITIVE_SELECTORS", "NEGATIVE_SELECTORS", "AUTHORIZATIONS", "BEGIN_DATE", "END_DATE", + "PARAMETERS", "PROXY_SERVERS", "PREDICTION", "QUERY", "QUERY_LOGIC", "QUERY_NAME", "QUERY_TYPE", "USER", "USER_DN", "VERSION", + "PAGE_METRICS"); + // Exclude PREDICTION, PAGE_METRICS which we don't want to pull from Accumulo but which can change after query creation + public static final List ignoreFieldsOnWrite = new ArrayList<>(); + + static { + AccumuloMapStore.ignoreFieldsOnWrite.addAll(ignoreFieldsOnQuery); + AccumuloMapStore.ignoreFieldsOnWrite.removeAll(Arrays.asList("PREDICTION", "PAGE_METRICS")); + } private static AccumuloMapStore instance; private Logger log = LoggerFactory.getLogger(AccumuloMapStore.class); - private IMap lastWrittenQueryMetricCache; - private MergeLockLifecycleListener mergeLock; - private com.google.common.cache.Cache failures; + private Cache lastWrittenQueryMetricCache; + private com.github.benmanes.caffeine.cache.Cache failures; private Timer writeTimer = new Timer(new SlidingTimeWindowArrayReservoir(1, MINUTES)); + private Timer readTimer = new Timer(new SlidingTimeWindowArrayReservoir(1, MINUTES)); private boolean shuttingDown = false; public static class Factory implements MapStoreFactory { @@ -55,10 +67,13 @@ public MapLoader newMapStore(String mapName, Properties } @Autowired - public AccumuloMapStore(ShardTableQueryMetricHandler handler, MergeLockLifecycleListener mergeLock) { + public AccumuloMapStore(ShardTableQueryMetricHandler handler) { this.handler = handler; - this.mergeLock = mergeLock; - this.failures = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build(); + // @formatter:off + this.failures = Caffeine.newBuilder() + .expireAfterWrite(60, TimeUnit.MINUTES) + .build(); + // @formatter:on AccumuloMapStore.instance = this; } @@ -75,7 +90,7 @@ public void shutdown() { } public void setLastWrittenQueryMetricCache(Cache lastWrittenQueryMetricCache) { - this.lastWrittenQueryMetricCache = (IMap) lastWrittenQueryMetricCache.getNativeCache(); + this.lastWrittenQueryMetricCache = lastWrittenQueryMetricCache; } @Override @@ -107,16 +122,34 @@ public void storeWithRetry(QueryMetricUpdateHolder queryMetricUpdate) { } public void store(QueryMetricUpdateHolder queryMetricUpdate) throws Exception { + String queryId = queryMetricUpdate.getMetric().getQueryId(); T updatedMetric = null; - this.mergeLock.lock(); try { updatedMetric = (T) queryMetricUpdate.getMetric().duplicate(); QueryMetricType metricType = queryMetricUpdate.getMetricType(); QueryMetricUpdateHolder lastQueryMetricUpdate = null; + final List ignoredFields = new ArrayList<>(); if (!queryMetricUpdate.isNewMetric()) { - lastQueryMetricUpdate = (QueryMetricUpdateHolder) lastWrittenQueryMetricCache.get(queryId); + lastQueryMetricUpdate = lastWrittenQueryMetricCache.get(queryId, QueryMetricUpdateHolder.class); + if (lastQueryMetricUpdate == null) { + log.debug("getting metric {} from accumulo", queryId); + Timer.Context readTimerContext = readTimer.time(); + try { + T m = handler.getQueryMetric(queryId, ignoreFieldsOnQuery); + if (m != null) { + // these fields will not be populated in the returned metric, + // so we should not compare them later for writing mutations + ignoredFields.addAll(ignoreFieldsOnWrite); + lastQueryMetricUpdate = new QueryMetricUpdateHolder(m, metricType); + } + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + readTimerContext.stop(); + } + } } if (lastQueryMetricUpdate != null) { @@ -146,15 +179,13 @@ public void store(QueryMetricUpdateHolder queryMetricUpdate) throws Exception } long writeTimestamp = deleteTimestamp + 1; updatedMetric.setNumUpdates(numUpdates + 1); - updatedMetric.setLastUpdated(new Date(updatedMetric.getLastUpdated().getTime() + 1)); if (lastQueryMetric.getLastUpdated() != null) { - handler.writeMetric(updatedMetric, Collections.singletonList(lastQueryMetric), deleteTimestamp, true); + handler.writeMetric(updatedMetric, Collections.singletonList(lastQueryMetric), deleteTimestamp, true, ignoredFields); } - handler.writeMetric(updatedMetric, Collections.singletonList(lastQueryMetric), writeTimestamp, false); + handler.writeMetric(updatedMetric, Collections.singletonList(lastQueryMetric), writeTimestamp, false, ignoredFields); } else { - updatedMetric.setLastUpdated(updatedMetric.getCreateDate()); - handler.writeMetric(updatedMetric, Collections.emptyList(), updatedMetric.getCreateDate().getTime(), false); + handler.writeMetric(updatedMetric, Collections.emptyList(), updatedMetric.getCreateDate().getTime(), false, ignoredFields); } if (log.isTraceEnabled()) { log.trace("writing metric to accumulo: " + queryId + " - " + queryMetricUpdate.getMetric()); @@ -162,8 +193,8 @@ public void store(QueryMetricUpdateHolder queryMetricUpdate) throws Exception log.debug("writing metric to accumulo: " + queryId); } - lastWrittenQueryMetricCache.set(queryId, new QueryMetricUpdateHolder(updatedMetric)); - queryMetricUpdate.persisted(); + lastWrittenQueryMetricCache.put(queryId, new QueryMetricUpdateHolder(updatedMetric)); + queryMetricUpdate.setPersisted(); failures.invalidate(queryId); } finally { if (queryMetricUpdate.getMetricType().equals(QueryMetricType.DISTRIBUTED)) { @@ -178,7 +209,6 @@ public void store(QueryMetricUpdateHolder queryMetricUpdate) throws Exception queryMetricUpdate.getMetric().setFiRanges(updatedMetric.getFiRanges()); } } - this.mergeLock.unlock(); } } @@ -186,8 +216,8 @@ private boolean retryOnException(QueryMetricUpdate update, Exception e) { String queryId = update.getMetric().getQueryId(); Integer numFailures = 1; try { - numFailures = (Integer) this.failures.get(queryId, () -> 0) + 1; - } catch (ExecutionException e1) { + numFailures = (Integer) this.failures.get(queryId, o -> 0) + 1; + } catch (Exception e1) { log.error(e1.getMessage(), e1); } if (numFailures < 3) { @@ -243,4 +273,8 @@ public void deleteAll(Collection keys) { public Timer getWriteTimer() { return writeTimer; } + + public Timer getReadTimer() { + return readTimer; + } } diff --git a/service/src/main/java/datawave/microservice/querymetric/persistence/MetricCacheListener.java b/service/src/main/java/datawave/microservice/querymetric/persistence/MetricCacheListener.java new file mode 100644 index 00000000..abaa8960 --- /dev/null +++ b/service/src/main/java/datawave/microservice/querymetric/persistence/MetricCacheListener.java @@ -0,0 +1,48 @@ +package datawave.microservice.querymetric.persistence; + +import java.text.SimpleDateFormat; + +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; + +import datawave.microservice.querymetric.BaseQueryMetric; +import datawave.microservice.querymetric.QueryMetricUpdate; + +public class MetricCacheListener implements RemovalListener { + + private Logger log = LoggerFactory.getLogger(MetricCacheListener.class); + + private final String cacheName; + + public MetricCacheListener(String cacheName) { + this.cacheName = cacheName; + } + + @Override + public void onRemoval(@Nullable Object key, @Nullable Object value, @NonNull RemovalCause cause) { + if (!cause.equals(RemovalCause.REPLACED)) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HHmmss"); + String queryId = (String) key; + StringBuilder sb = new StringBuilder(); + sb.append(String.format("removalCause=%s, key=%s", cause, queryId)); + BaseQueryMetric metric = null; + if (value != null && value instanceof QueryMetricUpdate) { + metric = ((QueryMetricUpdate) value).getMetric(); + sb.append(String.format(" metric[createDate=%s, host=%s, lifecycle=%s, numPages=%s, numUpdates=%s]", sdf.format(metric.getCreateDate()), + metric.getHost(), metric.getLifecycle(), metric.getPageTimes().size(), metric.getNumUpdates())); + } + if (cause.equals(RemovalCause.SIZE) && metric != null && !metric.isLifecycleFinal()) { + // reaching max cache size and evicting metrics that are not done writing + log.info(cacheName + " " + sb); + } else { + // more routine - evicting due to expired time + log.debug(cacheName + " " + sb); + } + } + } +} diff --git a/service/src/main/resources/config/bootstrap.yml b/service/src/main/resources/config/bootstrap.yml index bb3cf0fe..e6d49725 100644 --- a/service/src/main/resources/config/bootstrap.yml +++ b/service/src/main/resources/config/bootstrap.yml @@ -13,7 +13,13 @@ spring: max-attempts: 60 uri: '${CONFIG_SERVER_URL:http://configuration:8888/configserver}' allow-override: true - + stream: + rabbit: + bindings: + queryMetricSource-out-0: + producer: + # Note: This must match CONFIRM_ACK_CHANNEL in QueryMetricOperations.java or producer confirms will not work. + confirmAckChannel: 'confirmAckChannel' datawave: table: cache: diff --git a/service/src/test/java/datawave/microservice/querymetric/CorrelatorTest.java b/service/src/test/java/datawave/microservice/querymetric/CorrelatorTest.java new file mode 100644 index 00000000..22537571 --- /dev/null +++ b/service/src/test/java/datawave/microservice/querymetric/CorrelatorTest.java @@ -0,0 +1,150 @@ +package datawave.microservice.querymetric; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import datawave.microservice.querymetric.config.CorrelatorProperties; +import datawave.microservice.querymetric.function.QueryMetricConsumer; + +@ExtendWith(SpringExtension.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@ActiveProfiles({"CorrelatorTest", "QueryMetricTest", "hazelcast-writebehind", "correlator"}) +public class CorrelatorTest extends QueryMetricTestBase { + + @Autowired + private QueryMetricOperations queryMetricOperations; + + @Autowired + private CorrelatorProperties correlatorProperties; + + @Autowired + private Correlator correlator; + + @Autowired + private QueryMetricConsumer queryMetricConsumer; + + @BeforeEach + public void setup() { + super.setup(); + } + + @AfterEach + public void cleanup() { + super.cleanup(); + this.correlator.shutdown(false); + } + + @Test + public void TestSizeLimitedQueue() throws Exception { + this.correlatorProperties.setMaxCorrelationTimeMs(30000); + this.correlatorProperties.setMaxCorrelationQueueSize(95); + testMetricsCorrelated(100, 100); + } + + @Test + public void TestTimeLimitedQueue() throws Exception { + this.correlatorProperties.setMaxCorrelationTimeMs(500); + this.correlatorProperties.setMaxCorrelationQueueSize(100); + testMetricsCorrelated(100, 100); + } + + @Test + public void TestNoStorageUntilShutdown() throws Exception { + this.correlatorProperties.setMaxCorrelationTimeMs(60000); + this.correlatorProperties.setMaxCorrelationQueueSize(100); + testMetricsCorrelated(100, 10); + } + + @Test + public void TestManyQueries() throws Exception { + this.correlatorProperties.setMaxCorrelationTimeMs(500); + this.correlatorProperties.setMaxCorrelationQueueSize(100); + testMetricsCorrelated(1000, 20); + } + + public void testMetricsCorrelated(int numMetrics, int maxPages) throws Exception { + List updates = new ArrayList<>(); + List metrics = new ArrayList<>(); + for (int x = 0; x < numMetrics; x++) { + BaseQueryMetric m = createMetric(); + metrics.add(m); + updates.add(m.duplicate()); + } + + List shuffledUpdates = new ArrayList<>(); + Random r = new Random(); + for (BaseQueryMetric m : metrics) { + int numPages = r.nextInt(maxPages); + BaseQueryMetric m2 = m; + for (int x = 0; x < numPages; x++) { + m2 = m2.duplicate(); + m.setLifecycle(BaseQueryMetric.Lifecycle.RESULTS); + m2.setLifecycle(BaseQueryMetric.Lifecycle.RESULTS); + BaseQueryMetric.PageMetric pageMetric = new BaseQueryMetric.PageMetric("localhost", 100, 100, 100, 100, -1, -1, -1, -1); + m.addPageMetric(pageMetric); + m2.addPageMetric(pageMetric); + shuffledUpdates.add(m2); + } + } + + // randomize the order of metric updates + Collections.shuffle(shuffledUpdates); + updates.addAll(shuffledUpdates); + + LinkedBlockingDeque updateDeque = new LinkedBlockingDeque<>(); + updateDeque.addAll(updates); + ExecutorService executorService = Executors.newFixedThreadPool(20); + for (int x = 0; x < 20; x++) { + Runnable runnable = () -> { + BaseQueryMetric m; + do { + m = updateDeque.poll(); + if (m != null) { + queryMetricConsumer.accept(new QueryMetricUpdate(m, QueryMetricType.COMPLETE)); + } + } while (m != null); + }; + executorService.submit(runnable); + } + log.debug("done submitting metrics"); + executorService.shutdown(); + boolean completed = executorService.awaitTermination(2, TimeUnit.MINUTES); + assertTrue(completed, "executor tasks completed"); + // flush the correlator + this.correlator.shutdown(true); + while (this.queryMetricOperations.isTimedCorrelationInProgress()) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + + } + } + this.queryMetricOperations.ensureUpdatesProcessed(false); + for (BaseQueryMetric m : metrics) { + String queryId = m.getQueryId(); + ensureDataStored(incomingQueryMetricsCache, queryId); + QueryMetricUpdate metricUpdate = incomingQueryMetricsCache.get(queryId, QueryMetricUpdateHolder.class); + assertNotNull(metricUpdate, "missing metric " + queryId); + assertEquals(m, metricUpdate.getMetric(), "incomingQueryMetricsCache metric wrong for id:" + m.getQueryId()); + } + } +} diff --git a/service/src/test/java/datawave/microservice/querymetric/HazelcastCachingTest.java b/service/src/test/java/datawave/microservice/querymetric/HazelcastCachingTest.java index 0a14792c..8239a525 100644 --- a/service/src/test/java/datawave/microservice/querymetric/HazelcastCachingTest.java +++ b/service/src/test/java/datawave/microservice/querymetric/HazelcastCachingTest.java @@ -3,8 +3,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; -import java.util.Collections; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -30,21 +28,6 @@ public void cleanup() { super.cleanup(); } - @Test - public void TestReadThroughCache() { - - try { - String queryId = createQueryId(); - BaseQueryMetric m = createMetric(queryId); - shardTableQueryMetricHandler.writeMetric(m, Collections.emptyList(), m.getCreateDate().getTime(), false); - BaseQueryMetric metricFromReadThroughCache = lastWrittenQueryMetricCache.get(queryId, QueryMetricUpdate.class).getMetric(); - metricAssertEquals("read through cache failed", m, metricFromReadThroughCache); - } catch (Exception e) { - log.error(e.getMessage(), e); - fail(e.getMessage()); - } - } - @Test public void TestWriteThroughCache() { diff --git a/service/src/test/java/datawave/microservice/querymetric/NonWebApplicationMessagingTest.java b/service/src/test/java/datawave/microservice/querymetric/NonWebApplicationMessagingTest.java index eb9fdc26..52c2097e 100644 --- a/service/src/test/java/datawave/microservice/querymetric/NonWebApplicationMessagingTest.java +++ b/service/src/test/java/datawave/microservice/querymetric/NonWebApplicationMessagingTest.java @@ -83,7 +83,7 @@ public CacheManager queryMetricCacheManager() { @Bean public QueryMetricOperationsStats queryMetricOperationStats() { - return new QueryMetricOperationsStats(new TimelyProperties(), null, queryMetricCacheManager(), null); + return new QueryMetricOperationsStats(new TimelyProperties(), null, queryMetricCacheManager(), null, null); } @Bean diff --git a/service/src/test/java/datawave/microservice/querymetric/QueryMetricConsistencyTest.java b/service/src/test/java/datawave/microservice/querymetric/QueryMetricConsistencyTest.java index a85d3b48..d67e8fdd 100644 --- a/service/src/test/java/datawave/microservice/querymetric/QueryMetricConsistencyTest.java +++ b/service/src/test/java/datawave/microservice/querymetric/QueryMetricConsistencyTest.java @@ -2,6 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.fail; import java.util.ArrayList; @@ -130,6 +131,41 @@ public void OutOfOrderLifecycleTest() throws Exception { assertNoDuplicateFields(queryId); } + @Test + public void ChangePlanTest() throws Exception { + int port = this.webServicePort; + String queryId = createQueryId(); + BaseQueryMetric m = createMetric(queryId); + UriComponents metricUri = UriComponentsBuilder.newInstance().scheme("https").host("localhost").port(port).path(String.format(getMetricsUrl, queryId)) + .build(); + m.setPlan("InitialPlan"); + // @formatter:off + client.submit(new QueryMetricClient.Request.Builder() + .withMetric(m) + .withMetricType(QueryMetricType.COMPLETE) + .withUser(this.adminUser) + .build()); + // @formatter:on + m.setPlan("RevisedPlan"); + m.setLastUpdated(new Date(m.getLastUpdated().getTime() + 1)); + // @formatter:off + client.submit(new QueryMetricClient.Request.Builder() + .withMetric(m) + .withMetricType(QueryMetricType.COMPLETE) + .withUser(this.adminUser) + .build()); + // @formatter:on + + HttpEntity metricRequestEntity = createRequestEntity(null, this.adminUser, null); + ResponseEntity metricResponse = this.restTemplate.exchange(metricUri.toUri(), HttpMethod.GET, metricRequestEntity, + BaseQueryMetricListResponse.class); + + assertEquals(1, metricResponse.getBody().getNumResults()); + BaseQueryMetric returnedMetric = (BaseQueryMetric) metricResponse.getBody().getResult().get(0); + assertEquals(m.getPlan(), returnedMetric.getPlan(), "plan incorrect"); + assertNoDuplicateFields(queryId); + } + @Test public void DistributedUpdateTest() throws Exception { int port = this.webServicePort; @@ -244,6 +280,26 @@ public void ToMetricTest() { metricAssertEquals("metrics are not equal", queryMetric, newMetric); } + /* + * Check that the last updated time (which is used to calculate the elapsed time) does not get changed when being written to Accumulo + */ + @Test + public void LastUpdatedTest() { + QueryMetric queryMetric = (QueryMetric) createMetric(); + String queryId = queryMetric.getQueryId(); + Date lastUpdated = new Date(queryMetric.getCreateDate().getTime() + 60000); + queryMetric.setLastUpdated(lastUpdated); + queryMetric.setLifecycle(BaseQueryMetric.Lifecycle.CLOSED); + incomingQueryMetricsCache.put(queryId, new QueryMetricUpdateHolder(queryMetric.duplicate())); + ensureDataWritten(incomingQueryMetricsCache, lastWrittenQueryMetricCache, queryId); + + QueryMetricUpdateHolder storedMetricHolder = lastWrittenQueryMetricCache.get(queryId, QueryMetricUpdateHolder.class); + assertNotNull(storedMetricHolder, "storedQueryMetric is null"); + metricAssertEquals("metric should not change", queryMetric, storedMetricHolder.getMetric()); + assertEquals(60000, storedMetricHolder.getMetric().getElapsedTime(), "Elapsed time incorrect"); + assertEquals(lastUpdated, storedMetricHolder.getMetric().getLastUpdated(), "Last updated incorrect"); + } + @Test public void CombineMetricsTest() throws Exception { QueryMetric storedQueryMetric = (QueryMetric) createMetric(); @@ -313,7 +369,7 @@ public void DuplicateAccumuloEntryTest() throws Exception { String queryId = createQueryId(); QueryMetric storedQueryMetric = (QueryMetric) createMetric(queryId); QueryMetric updatedQueryMetric = (QueryMetric) storedQueryMetric.duplicate(); - updatedQueryMetric.setLifecycle(BaseQueryMetric.Lifecycle.CLOSED); + updatedQueryMetric.setLifecycle(BaseQueryMetric.Lifecycle.RESULTS); updatedQueryMetric.setNumResults(2000); updatedQueryMetric.setDocRanges(400); updatedQueryMetric.setNextCount(400); diff --git a/service/src/test/java/datawave/microservice/querymetric/QueryMetricOperationsTest.java b/service/src/test/java/datawave/microservice/querymetric/QueryMetricOperationsTest.java index 1484596e..9141c178 100644 --- a/service/src/test/java/datawave/microservice/querymetric/QueryMetricOperationsTest.java +++ b/service/src/test/java/datawave/microservice/querymetric/QueryMetricOperationsTest.java @@ -36,8 +36,8 @@ public void MetricStoredCorrectlyInCachesAndAccumulo() throws Exception { .build()); // @formatter:on ensureDataWritten(incomingQueryMetricsCache, lastWrittenQueryMetricCache, queryId); - metricAssertEquals("lastWrittenQueryMetricCache metric wrong", m, lastWrittenQueryMetricCache.get(queryId, QueryMetricUpdate.class).getMetric()); - metricAssertEquals("incomingQueryMetricsCache metric wrong", m, incomingQueryMetricsCache.get(queryId, QueryMetricUpdate.class).getMetric()); + metricAssertEquals("lastWrittenQueryMetricCache metric wrong", m, lastWrittenQueryMetricCache.get(queryId, QueryMetricUpdateHolder.class).getMetric()); + metricAssertEquals("incomingQueryMetricsCache metric wrong", m, incomingQueryMetricsCache.get(queryId, QueryMetricUpdateHolder.class).getMetric()); metricAssertEquals("accumulo metric wrong", m, shardTableQueryMetricHandler.getQueryMetric(queryId)); } @@ -60,14 +60,15 @@ public void MultipleMetricsStoredCorrectlyInCachesAndAccumulo() throws Exception metrics.forEach((m) -> { String queryId = m.getQueryId(); ensureDataWritten(incomingQueryMetricsCache, lastWrittenQueryMetricCache, queryId); - metricAssertEquals("lastWrittenQueryMetricCache metric wrong", m, lastWrittenQueryMetricCache.get(queryId, QueryMetricUpdate.class).getMetric()); + metricAssertEquals("lastWrittenQueryMetricCache metric wrong", m, + lastWrittenQueryMetricCache.get(queryId, QueryMetricUpdateHolder.class).getMetric()); try { metricAssertEquals("accumulo metric wrong", m, shardTableQueryMetricHandler.getQueryMetric(queryId)); } catch (Exception e) { log.error(e.getMessage(), e); fail(e.getMessage()); } - metricAssertEquals("incomingQueryMetricsCache metric wrong", m, incomingQueryMetricsCache.get(queryId, QueryMetricUpdate.class).getMetric()); + metricAssertEquals("incomingQueryMetricsCache metric wrong", m, incomingQueryMetricsCache.get(queryId, QueryMetricUpdateHolder.class).getMetric()); }); } @@ -90,8 +91,9 @@ public void MultipleMetricsAsListStoredCorrectlyInCachesAndAccumulo() throws Exc metrics.forEach((m) -> { String queryId = m.getQueryId(); ensureDataWritten(incomingQueryMetricsCache, lastWrittenQueryMetricCache, queryId); - metricAssertEquals("lastWrittenQueryMetricCache metric wrong", m, lastWrittenQueryMetricCache.get(queryId, QueryMetricUpdate.class).getMetric()); - metricAssertEquals("incomingQueryMetricsCache metric wrong", m, incomingQueryMetricsCache.get(queryId, QueryMetricUpdate.class).getMetric()); + metricAssertEquals("lastWrittenQueryMetricCache metric wrong", m, + lastWrittenQueryMetricCache.get(queryId, QueryMetricUpdateHolder.class).getMetric()); + metricAssertEquals("incomingQueryMetricsCache metric wrong", m, incomingQueryMetricsCache.get(queryId, QueryMetricUpdateHolder.class).getMetric()); try { metricAssertEquals("accumulo metric wrong", m, shardTableQueryMetricHandler.getQueryMetric(queryId)); } catch (Exception e) { diff --git a/service/src/test/java/datawave/microservice/querymetric/QueryMetricTestBase.java b/service/src/test/java/datawave/microservice/querymetric/QueryMetricTestBase.java index ebaa0a3c..2bff3ab3 100644 --- a/service/src/test/java/datawave/microservice/querymetric/QueryMetricTestBase.java +++ b/service/src/test/java/datawave/microservice/querymetric/QueryMetricTestBase.java @@ -1,7 +1,6 @@ package datawave.microservice.querymetric; import static datawave.microservice.querymetric.config.HazelcastMetricCacheConfiguration.INCOMING_METRICS; -import static datawave.microservice.querymetric.config.HazelcastMetricCacheConfiguration.LAST_WRITTEN_METRICS; import static datawave.security.authorization.DatawaveUser.UserType.USER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -129,9 +128,12 @@ public class QueryMetricTestBase { @Autowired private DnUtils dnUtils; - protected Cache incomingQueryMetricsCache; + @Autowired + @Qualifier("lastWrittenQueryMetrics") protected Cache lastWrittenQueryMetricCache; + protected Cache incomingQueryMetricsCache; + @LocalServerPort protected int webServicePort; @@ -169,7 +171,6 @@ public void setup() { QueryMetricTestBase.isHazelCast = cacheManager instanceof HazelcastCacheManager; QueryMetricTestBase.staticCacheManager = cacheManager; this.incomingQueryMetricsCache = cacheManager.getCache(INCOMING_METRICS); - this.lastWrittenQueryMetricCache = cacheManager.getCache(LAST_WRITTEN_METRICS); this.shardTableQueryMetricHandler.verifyTables(); BaseQueryMetric m = createMetric(); // this is to ensure that the QueryMetrics_m table @@ -520,7 +521,7 @@ protected void ensureDataStored(Cache incomingCache, String queryId) { found = hzCache.containsKey(queryId); if (!found) { try { - Thread.sleep(250); + Thread.sleep(50); } catch (InterruptedException e) {} } } @@ -530,14 +531,13 @@ protected void ensureDataWritten(Cache incomingCache, Cache lastWrittenCache, St long now = System.currentTimeMillis(); Config config = ((HazelcastCacheManager) this.cacheManager).getHazelcastInstance().getConfig(); MapStoreConfig mapStoreConfig = config.getMapConfig(incomingCache.getName()).getMapStoreConfig(); - int writeDelaySeconds = Math.min(mapStoreConfig.getWriteDelaySeconds(), 1000); + int writeDelaySeconds = 2 * Math.min(mapStoreConfig.getWriteDelaySeconds(), 1000); boolean found = false; - IMap hzCache = ((IMap) lastWrittenCache.getNativeCache()); while (!found && System.currentTimeMillis() < (now + (1000 * (writeDelaySeconds + 1)))) { - found = hzCache.containsKey(queryId); + found = lastWrittenCache.get(queryId, QueryMetricUpdateHolder.class) != null; if (!found) { try { - Thread.sleep(250); + Thread.sleep(50); } catch (InterruptedException e) {} } } @@ -627,7 +627,7 @@ public QueryMetricSupplier testQueryMetricSource(@Lazy QueryMetricOperations que return new QueryMetricSupplier() { @Override public boolean send(Message queryMetricUpdate) { - queryMetricOperations.storeMetric(queryMetricUpdate.getPayload()); + queryMetricOperations.storeMetricUpdate(new QueryMetricUpdateHolder(queryMetricUpdate.getPayload())); return true; } }; diff --git a/service/src/test/java/datawave/microservice/querymetric/config/AlternateContentQueryMetricsIngestHelper.java b/service/src/test/java/datawave/microservice/querymetric/config/AlternateContentQueryMetricsIngestHelper.java index a68e0b17..5e85267e 100644 --- a/service/src/test/java/datawave/microservice/querymetric/config/AlternateContentQueryMetricsIngestHelper.java +++ b/service/src/test/java/datawave/microservice/querymetric/config/AlternateContentQueryMetricsIngestHelper.java @@ -1,16 +1,23 @@ package datawave.microservice.querymetric.config; +import java.util.Collection; + import com.google.common.collect.Multimap; import datawave.microservice.querymetric.handler.ContentQueryMetricsIngestHelper; public class AlternateContentQueryMetricsIngestHelper extends ContentQueryMetricsIngestHelper { - public AlternateContentQueryMetricsIngestHelper(boolean deleteMode) { - super(deleteMode, new HelperDelegate()); + public AlternateContentQueryMetricsIngestHelper(boolean deleteMode, Collection ignoredFields) { + super(deleteMode, new HelperDelegate(ignoredFields)); } private static class HelperDelegate extends ContentQueryMetricsIngestHelper.HelperDelegate { + + public HelperDelegate(Collection ignoredFields) { + super(ignoredFields); + } + @Override protected void putExtendedFieldsToWrite(AlternateQueryMetric updated, AlternateQueryMetric stored, Multimap fields) { if (isFirstWrite(updated.getExtraField(), stored == null ? null : stored.getExtraField())) { diff --git a/service/src/test/java/datawave/microservice/querymetric/config/AlternateShardTableQueryMetricHandler.java b/service/src/test/java/datawave/microservice/querymetric/config/AlternateShardTableQueryMetricHandler.java index 37a43aed..bd1e89ff 100644 --- a/service/src/test/java/datawave/microservice/querymetric/config/AlternateShardTableQueryMetricHandler.java +++ b/service/src/test/java/datawave/microservice/querymetric/config/AlternateShardTableQueryMetricHandler.java @@ -1,5 +1,6 @@ package datawave.microservice.querymetric.config; +import java.util.Collection; import java.util.List; import org.springframework.beans.factory.annotation.Qualifier; @@ -27,8 +28,8 @@ public AlternateShardTableQueryMetricHandler(QueryMetricHandlerProperties queryM } @Override - public ContentQueryMetricsIngestHelper getQueryMetricsIngestHelper(boolean deleteMode) { - return new AlternateContentQueryMetricsIngestHelper(deleteMode); + public ContentQueryMetricsIngestHelper getQueryMetricsIngestHelper(boolean deleteMode, Collection ignoredFields) { + return new AlternateContentQueryMetricsIngestHelper(deleteMode, ignoredFields); } @Override diff --git a/service/src/test/java/datawave/microservice/querymetric/handler/SimpleQueryGeometryHandlerTest.java b/service/src/test/java/datawave/microservice/querymetric/handler/SimpleQueryGeometryHandlerTest.java index d0029550..662fc72c 100644 --- a/service/src/test/java/datawave/microservice/querymetric/handler/SimpleQueryGeometryHandlerTest.java +++ b/service/src/test/java/datawave/microservice/querymetric/handler/SimpleQueryGeometryHandlerTest.java @@ -13,11 +13,11 @@ import org.junit.jupiter.api.Test; import datawave.microservice.query.QueryImpl; +import datawave.microservice.querymetric.QueryGeometry; +import datawave.microservice.querymetric.QueryGeometryResponse; import datawave.microservice.querymetric.QueryMetric; import datawave.microservice.querymetric.config.QueryMetricHandlerProperties; import datawave.webservice.query.exception.QueryExceptionType; -import datawave.webservice.query.map.QueryGeometry; -import datawave.webservice.query.map.QueryGeometryResponse; public class SimpleQueryGeometryHandlerTest { diff --git a/service/src/test/resources/config/application-correlator.yml b/service/src/test/resources/config/application-correlator.yml new file mode 100644 index 00000000..77a0d5a1 --- /dev/null +++ b/service/src/test/resources/config/application-correlator.yml @@ -0,0 +1,5 @@ +datawave: + query: + metric: + correlator: + enabled: true diff --git a/service/src/test/resources/config/application-hazelcast-writebehind.yml b/service/src/test/resources/config/application-hazelcast-writebehind.yml index b3d904c2..204efa74 100644 --- a/service/src/test/resources/config/application-hazelcast-writebehind.yml +++ b/service/src/test/resources/config/application-hazelcast-writebehind.yml @@ -31,21 +31,4 @@ hazelcast: 1000 - - OBJECT - - 1 - - 600 - - 3600 - com.hazelcast.spi.merge.LatestUpdateMergePolicy - true - - - - datawave.microservice.querymetric.persistence.AccumuloMapLoader$Factory - - diff --git a/service/src/test/resources/config/application-hazelcast-writethrough.yml b/service/src/test/resources/config/application-hazelcast-writethrough.yml index ba12192a..f42e8707 100644 --- a/service/src/test/resources/config/application-hazelcast-writethrough.yml +++ b/service/src/test/resources/config/application-hazelcast-writethrough.yml @@ -31,21 +31,4 @@ hazelcast: 1000 - - OBJECT - - 1 - - 3600 - - 7200 - com.hazelcast.spi.merge.LatestUpdateMergePolicy - true - - - - datawave.microservice.querymetric.persistence.AccumuloMapLoader$Factory - - diff --git a/service/src/test/resources/config/application.yml b/service/src/test/resources/config/application.yml index ee4768bc..46508a65 100644 --- a/service/src/test/resources/config/application.yml +++ b/service/src/test/resources/config/application.yml @@ -59,7 +59,6 @@ datawave: username: ${warehouse-cluster.accumulo.username} password: ${warehouse-cluster.accumulo.password} accumuloClientPoolSize: 16 - mapStoreWriteThreads: 1 numShards: 10 fieldLengthThreshold: 4049 shardTableName: QueryMetrics_e @@ -81,11 +80,16 @@ datawave: remoteQueryTimeout: 10 remoteQueryTimeUnit: SECONDS maxReadMilliseconds: 10000 + cache: + lastWrittenQueryMetrics: + maximumSize: 5000 + ttlSeconds: 600 client: enabled: true transport: message host: localhost port: ${server.port} + confirmAckEnabled: false timely: enabled: false host: localhost @@ -96,6 +100,9 @@ datawave: logServiceStatsRateMs: 300000 publishServiceStatsToTimelyRateMs: 60000 publishQueryStatsToTimelyRateMs: 60000 + confirmAckEnabled: false + correlator: + enabled: false metadata: all-metadata-auths: