Merge pull request #19 from salesforce/ashcoder.longIdSupport

Ashcoder.long id support

ashnacoder committed Dec 8, 2020
2 parents 73aa048 + db93a0f · commit f91b9f7

Showing 41 changed files with 965 additions and 219 deletions.
CarbonjAdmin.java

@@ -26,6 +26,7 @@
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+ import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
@@ -59,6 +60,10 @@ public class CarbonjAdmin

private final NameUtils nameUtils;

+ @Value( "${metrics.store.longId:false}" )
+ private boolean longId;
+
+
private Supplier<RuntimeException> notConfigured = ( ) -> new RuntimeException(
"Time Series Store is not configured." );

@@ -120,7 +125,7 @@ public void listMetrics2( @PathVariable final String pattern, Writer response )
}

@RequestMapping( value = "/dumpnames", method = RequestMethod.GET )
- public void dumpNames( @RequestParam( value = "startId", required = false, defaultValue = "0" ) int startId,
+ public void dumpNames( @RequestParam( value = "startId", required = false, defaultValue = "0" ) long startId,
@RequestParam( value = "startName", required = false ) String startName,
@RequestParam( value = "count", required = false ) Integer count,
@RequestParam( value = "filter", required = false ) String wildcard, Writer response )
@@ -138,7 +143,7 @@ public void dumpNames( @RequestParam( value = "startId", required = false, defau
}
try
{
- tsStore().scanMetrics( startId, Integer.MAX_VALUE, m -> {
+ tsStore().scanMetrics( startId, getMaxId(), m -> {
if ( !filter.test( m ) )
{
return;
@@ -164,6 +169,10 @@ public void dumpNames( @RequestParam( value = "startId", required = false, defau
}
}

+ private long getMaxId() {
+     return longId ? Long.MAX_VALUE : Integer.MAX_VALUE;
+ }
+
private boolean loadLock = false;

private volatile boolean abortLoad = false;
@@ -528,6 +537,7 @@ static class StopException

static boolean hasDataSince( TimeSeriesStore ts, String metric, int from )
{

for ( String dbName : Arrays.asList( "30m2y", "5m7d", "60s24h" ) )
{
if ( null != ts.getFirst( dbName, metric, from, Integer.MAX_VALUE ) )
@@ -554,7 +564,7 @@ public void cleanSeries( @RequestParam( value = "from", required = false, defaul

try
{
- ts.scanMetrics( 0, Integer.MAX_VALUE, m -> {
+ ts.scanMetrics( 0, getMaxId(), m -> {
if ( written.get() >= count )
{
// produced big enough result - interrupt execution through exception (signal "doneness")
@@ -622,7 +632,7 @@ public void dumpSeries( @PathVariable final String dbName,
try
{
ts.scanMetrics( cursor,
- Integer.MAX_VALUE,
+ getMaxId(),
m -> {
try
{
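A note on the startId widening above: once metric ids pass Integer.MAX_VALUE, the old int request parameter could not even be parsed from the query string, since Spring's parameter binding ultimately relies on the same integer parsing. A minimal standalone sketch of the failure mode (illustration only, not code from this PR):

public class StartIdWideningDemo {
    public static void main(String[] args) {
        String startId = "2147483648"; // Integer.MAX_VALUE + 1, a plausible post-overflow id

        // The old `int startId` parameter could not bind this value:
        // 2147483648 is out of int range, so parsing throws.
        try {
            Integer.parseInt(startId);
        } catch (NumberFormatException e) {
            System.out.println("int binding fails: " + e.getMessage());
        }

        // The widened `long startId` parameter parses it fine.
        System.out.println("long binding: " + Long.parseLong(startId)); // 2147483648
    }
}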
TimeSeriesStore.java

@@ -38,7 +38,7 @@ public interface TimeSeriesStore

DataPointExportResults exportPoints( String dbName, String metricName );

- DataPointExportResults exportPoints( String dbName, int metricId );
+ DataPointExportResults exportPoints( String dbName, long metricId );

// to support testing
Metric selectRandomMetric();
@@ -47,13 +47,13 @@ public interface TimeSeriesStore

Metric getMetric( String name, boolean createIfMissing );

- Metric getMetric( int metricId );
+ Metric getMetric( long metricId );

- String getMetricName( int metricId );
+ String getMetricName( long metricId );

void scanMetrics( Consumer<Metric> m );

- int scanMetrics( int start, int end, Consumer<Metric> m );
+ long scanMetrics( long start, long end, Consumer<Metric> m );

List<Metric> findMetrics( String pattern );

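Widening these signatures from int to long is source-compatible for callers that pass int arguments, because Java widens int to long implicitly; only code that consumed the int return value needs touching, which is why the scanMetrics call sites change throughout this PR. A small stand-in sketch (hypothetical types, not from the repo):

public class WideningDemo {
    // Stand-in for the widened scanMetrics signature.
    interface Scanner {
        long scan(long start, long end);
    }

    public static void main(String[] args) {
        Scanner s = (start, end) -> end - start;

        int oldStart = 0; // existing call sites may still pass int arguments...
        long scanned = s.scan(oldStart, Integer.MAX_VALUE); // ...and compile via int-to-long widening
        System.out.println(scanned); // 2147483647

        // But `int n = s.scan(0, 10);` would no longer compile without a cast,
        // so the changed return type ripples through implementations and callers.
    }
}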
TimeSeriesStoreImpl.java

@@ -105,6 +105,8 @@ public class TimeSeriesStoreImpl implements TimeSeriesStore

private volatile long logNoOfSeriesThreshold;

+ private boolean longId;
+
public static ThreadPoolExecutor newSerialTaskQueue(int queueSize) {
ThreadFactory tf =
new ThreadFactoryBuilder()
@@ -142,7 +144,8 @@ public TimeSeriesStoreImpl(MetricRegistry metricRegistry, MetricIndex nameIndex,
ThreadPoolExecutor heavyQueryTaskQueue, ThreadPoolExecutor serialTaskQueue,
DataPointStore pointStore, DatabaseMetrics dbMetrics,
boolean batchedSeriesRetrieval, int batchedSeriesSize, boolean dumpIndex,
- File dumpIndexFile, int maxNonLeafPointsLoggedPerMin, String metricsStoreConfigFile) {
+ File dumpIndexFile, int maxNonLeafPointsLoggedPerMin, String metricsStoreConfigFile,
+ boolean longId) {
this.nameIndex = Preconditions.checkNotNull(nameIndex);
this.eventLogger = eventLogger;
this.pointStore = Preconditions.checkNotNull(pointStore);
@@ -154,6 +157,7 @@ public TimeSeriesStoreImpl(MetricRegistry metricRegistry, MetricIndex nameIndex,
this.dumpIndex = dumpIndex;
this.dumpIndexFile = dumpIndexFile;
this.nonLeafPointsLogQuota = new Quota(maxNonLeafPointsLoggedPerMin, 60);
+ this.longId = longId;


rejectedCounter = metricRegistry.counter(
@@ -358,11 +362,11 @@ public DataPointExportResults exportPoints(String dbName, String metricName) {
}

@Override
- public DataPointExportResults exportPoints(String dbName, int metricId) {
+ public DataPointExportResults exportPoints(String dbName, long metricId) {
return exportPoints(dbName, null, metricId);
}

- private DataPointExportResults exportPoints(String dbName, String metricName, Integer metricId) {
+ private DataPointExportResults exportPoints(String dbName, String metricName, Long metricId) {
if (!RetentionPolicy.dbNameExists(dbName)) {
throw new RuntimeException(String.format("Unknown dbName [%s]", dbName));
}
@@ -641,13 +645,13 @@ public DeleteAPIResult deleteAPI( String name, boolean delete, Set<String> exclu
}

@Override
- public Metric getMetric( int metricId )
+ public Metric getMetric( long metricId )
{
return nameIndex.getMetric( metricId );
}

@Override
- public String getMetricName( int metricId )
+ public String getMetricName( long metricId )
{
return nameIndex.getMetricName( metricId );
}
@@ -664,11 +668,18 @@ public void deleteAll()
@Override
public void scanMetrics( Consumer<Metric> m )
{
-     scanMetrics( 0, Integer.MAX_VALUE, m );
+     if(longId)
+     {
+         scanMetrics( 0, Long.MAX_VALUE, m );
+     }
+     else
+     {
+         scanMetrics( 0, Integer.MAX_VALUE, m );
+     }
}

@Override
- public int scanMetrics( int start, int end, Consumer<Metric> m )
+ public long scanMetrics( long start, long end, Consumer<Metric> m )
{
return nameIndex.scanNames( start, end, m );
}
cfgTimeSeriesStorage.java

@@ -6,11 +6,12 @@
*/
package com.demandware.carbonj.service.db;

import java.io.File;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.demandware.carbonj.service.db.index.cfgMetricIndex;
import com.demandware.carbonj.service.db.model.DataPointStore;
import com.demandware.carbonj.service.db.model.MetricIndex;
import com.demandware.carbonj.service.db.points.cfgDataPoints;
import com.demandware.carbonj.service.db.util.DatabaseMetrics;
import com.demandware.carbonj.service.engine.cfgCentralThreadPools;
import com.demandware.carbonj.service.events.EventsLogger;
import com.demandware.carbonj.service.events.cfgCarbonjEventsLogger;
@@ -21,15 +22,12 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Import;

import com.demandware.carbonj.service.db.index.cfgMetricIndex;
import com.demandware.carbonj.service.db.model.DataPointStore;
import com.demandware.carbonj.service.db.model.MetricIndex;
import com.demandware.carbonj.service.db.points.cfgDataPoints;
import com.demandware.carbonj.service.db.util.DatabaseMetrics;
import java.io.File;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Import( { cfgMetricIndex.class, cfgDataPoints.class, cfgCentralThreadPools.class, cfgCarbonjEventsLogger.class } )
@ConditionalOnProperty(name=cfgTimeSeriesStorage.DB_ENABLED_PROPERTY_KEY, havingValue="true", matchIfMissing=true)
@@ -39,6 +37,9 @@ public class cfgTimeSeriesStorage

public static final String DB_ENABLED_PROPERTY_KEY = "metrics.store.enabled";

+ @Value( "${metrics.store.longId:false}" )
+ private boolean longId;
+
@Value( "${metrics.store.fetchSeriesThreads:20}" )
private int nTaskThreads;

@@ -85,7 +86,8 @@ TimeSeriesStore timeSeriesStore( MetricIndex nameIndex, DataPointStore pointStor
TimeSeriesStoreImpl.newHeavyQueryTaskQueue( nHeavyQueryThreads, heavyQueryBlockingQueueSize ),
TimeSeriesStoreImpl.newSerialTaskQueue( serialQueueSize ), pointStore,
dbMetrics, batchedSeriesRetrieval,
- batchedSeriesSize, dumpIndex, new File( dumpIndexFile ), maxNonLeafPointsLoggedPerMin, metricStoreConfigFile);
+ batchedSeriesSize, dumpIndex, new File( dumpIndexFile ), maxNonLeafPointsLoggedPerMin, metricStoreConfigFile,
+ longId);

s.scheduleWithFixedDelay(timeSeriesStore::reload, 60, 60, TimeUnit.SECONDS );
s.scheduleWithFixedDelay(timeSeriesStore::refreshStats, 60, 10, TimeUnit.SECONDS );
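Given the @Value defaults above, long ids are opt-in; a hypothetical application.properties for a deployment that has outgrown int ids might read:

# Enable the time series store (also the default when the property is absent).
metrics.store.enabled=true
# Opt in to 8-byte metric ids; false (the default) keeps the legacy 4-byte int keys.
metrics.store.longId=true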
IdRecord.java

@@ -9,19 +9,19 @@
import com.google.common.base.Preconditions;

class IdRecord
- implements Record<Integer>
+ implements Record<Long>
{
- private Integer key;
+ private Long key;

private String metricName;

- public IdRecord( Integer key, String metricName)
+ public IdRecord( Long key, String metricName)
{
this.key = Preconditions.checkNotNull(key);
this.metricName = Preconditions.checkNotNull(metricName);
}

- public Integer key()
+ public Long key()
{
return key;
}
IdRecordSerializer.java

@@ -7,40 +7,44 @@
package com.demandware.carbonj.service.db.index;

import com.google.common.primitives.Ints;
+ import com.google.common.primitives.Longs;

import static java.nio.charset.StandardCharsets.UTF_8;

class IdRecordSerializer
- implements RecordSerializer<Integer, IdRecord>
+ implements RecordSerializer<Long, IdRecord>
{
- public IdRecordSerializer()
+ private boolean longId;
+
+ public IdRecordSerializer(boolean longId)
{
+     this.longId = longId;
}

@Override
- public Integer key( byte[] keyBytes )
+ public Long key( byte[] keyBytes )
{
- return Ints.fromByteArray( keyBytes );
+ return longId ? Longs.fromByteArray( keyBytes ) : Integer.valueOf(Ints.fromByteArray(keyBytes)).longValue();
}

@Override
public IdRecord toIndexEntry( byte[] keyBytes, byte[] valueBytes)
{
- Integer key = key(keyBytes);
+ Long key = key(keyBytes);
return toIndexEntry( key, valueBytes);
}

@Override
- public IdRecord toIndexEntry( Integer key, byte[] valueBytes)
+ public IdRecord toIndexEntry( Long key, byte[] valueBytes)
{
String value = new String(valueBytes, UTF_8);
return new IdRecord( key, value );
}

@Override
- public byte[] keyBytes(Integer key)
+ public byte[] keyBytes(Long key)
{
- return Ints.toByteArray(key);
+ return longId ? Longs.toByteArray(key) : Ints.toByteArray(key.intValue());
}

@Override
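The serializer keeps both encodings readable: with longId=false it still writes 4-byte keys and widens them to long on read, so existing RocksDB index files stay compatible, while longId=true switches to 8-byte keys. A standalone round-trip sketch using the same Guava primitives (illustration only, not code from this PR):

import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;

public class IdKeyRoundTripDemo {
    public static void main(String[] args) {
        // longId = false: keys stay 4 bytes, matching existing databases,
        // and are widened back to long on read (as in IdRecordSerializer.key()).
        byte[] legacyKey = Ints.toByteArray(42);
        long legacyId = Integer.valueOf(Ints.fromByteArray(legacyKey)).longValue();
        System.out.println(legacyKey.length + " bytes -> " + legacyId); // 4 bytes -> 42

        // longId = true: keys become 8 bytes and can hold ids past Integer.MAX_VALUE.
        long wideId = (long) Integer.MAX_VALUE + 1;
        byte[] wideKey = Longs.toByteArray(wideId);
        System.out.println(wideKey.length + " bytes -> " + Longs.fromByteArray(wideKey)); // 8 bytes -> 2147483648
    }
}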
IndexStore.java

@@ -29,5 +29,5 @@ public interface IndexStore<K, R extends Record<K>>

K maxKey();

- int scan( K startKey, K endKey, Consumer<R> c );
+ long scan( K startKey, K endKey, Consumer<R> c );
}
IndexStoreRocksDB.java

@@ -6,25 +6,18 @@
*/
package com.demandware.carbonj.service.db.index;

import java.io.File;
import java.io.PrintWriter;
import java.util.function.Consumer;

import com.codahale.metrics.MetricRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.rocksdb.CompressionType;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.TtlDB;

import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
- import com.google.common.primitives.UnsignedBytes;
+ import com.google.common.primitives.SignedBytes;
import org.rocksdb.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.PrintWriter;
import java.util.function.Consumer;

class IndexStoreRocksDB<K, R extends Record<K>>
implements IndexStore<K, R>
@@ -88,17 +81,22 @@ public void dump( PrintWriter pw )

private static int keyCompare( byte[] keyBytes1, byte[] keyBytes2 )
{
-     return UnsignedBytes.lexicographicalComparator().compare( keyBytes1, keyBytes2 );
+     // Since a few of the old shards contain negative ids, ids are no longer unsigned.
+     return SignedBytes.lexicographicalComparator().compare( keyBytes1, keyBytes2 );
}

@Override
- public int scan( K startKey, K endKey, Consumer<R> c )
+ public long scan( K startKey, K endKey, Consumer<R> c )
{
-     int processed = 0;
+     long processed = 0;
byte[] endKeyBytes = null == endKey ? null : recSerializer.keyBytes( endKey );
try (RocksIterator iter = db.newIterator( new ReadOptions() ))
{
-     if ( null == startKey )
+     // The RocksDB JNI does not support the minimum negative value, Integer.MAX_VALUE + 1
+     // (which overflows to Integer.MIN_VALUE). This is a workaround to seek to the first value.
+     // Changing the method signature to the concrete type long would be preferable,
+     // but that would require many changes.
+     if ( null == startKey || (long)startKey < 0)
{
iter.seekToFirst();
}
Expand All @@ -109,7 +107,9 @@ public int scan( K startKey, K endKey, Consumer<R> c )
for ( ; iter.isValid(); iter.next() )
{
byte[] key = iter.key();
-     if ( null != endKey && keyCompare( key, endKeyBytes ) >= 0 )
+     // Don't stop after reaching Integer.MAX_VALUE, as there may be
+     // negative ids after integer overflow.
+     if ( null != endKey && keyCompare( key, endKeyBytes ) > 0 )
{
break;
}
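The comparator switch above is the subtle part of this file. Ids that overflow int wrap to negative, and their big-endian encodings start with byte 0x80 or higher, so an unsigned lexicographic compare treats them as larger than any positive end key and would break the scan too early; the signed compare keeps iterating. A standalone illustration with the same Guava comparators (my example, not repo code):

import com.google.common.primitives.Ints;
import com.google.common.primitives.SignedBytes;
import com.google.common.primitives.UnsignedBytes;

public class KeyCompareDemo {
    public static void main(String[] args) {
        int wrapped = Integer.MAX_VALUE + 1;                 // overflows to -2147483648
        byte[] wrappedKey = Ints.toByteArray(wrapped);       // 0x80 0x00 0x00 0x00
        byte[] endKey = Ints.toByteArray(Integer.MAX_VALUE); // 0x7F 0xFF 0xFF 0xFF

        // Unsigned compare: 0x80 > 0x7F, so a wrapped id looks greater than the
        // end key, and the old `keyCompare(...) >= 0` check would stop the scan.
        System.out.println(UnsignedBytes.lexicographicalComparator().compare(wrappedKey, endKey)); // positive

        // Signed compare: (byte) 0x80 is -128 < 0x7F, so wrapped (negative) ids sort
        // below the end key and the scan continues past the int overflow point.
        System.out.println(SignedBytes.lexicographicalComparator().compare(wrappedKey, endKey)); // negative
    }
}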
[Diff truncated: the remaining changed files were not loaded.]