Skip to content

Commit

Permalink
Allow system clock to be overridden
Browse files Browse the repository at this point in the history
Implements #3.
  • Loading branch information
jdhoek committed Jul 1, 2019
1 parent 651d74c commit e0ca30d
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
*/
public class BaseUniqueIDGenerator implements IDGenerator {
protected final GeneratorIdentityHolder generatorIdentityHolder;
private final Clock clock;
private final Mode mode;

long previousTimestamp = 0;
Expand All @@ -45,8 +46,12 @@ public class BaseUniqueIDGenerator implements IDGenerator {
* @param generatorIdentityHolder Generator identity holder.
* @param mode Generator mode.
*/
public BaseUniqueIDGenerator(GeneratorIdentityHolder generatorIdentityHolder, Mode mode) {
public BaseUniqueIDGenerator(GeneratorIdentityHolder generatorIdentityHolder,
Clock clock,
Mode mode) {
this.generatorIdentityHolder = generatorIdentityHolder;
// Fall back to the default wall clock if no alternative is passed.
this.clock = clock == null ? System::currentTimeMillis : clock;
this.mode = mode == null ? Mode.defaultMode() : mode;
}

Expand All @@ -55,17 +60,25 @@ public BaseUniqueIDGenerator(GeneratorIdentityHolder generatorIdentityHolder, Mo
*/
@Override
public synchronized byte[] generate() throws GeneratorException {
return generate(0);
}

synchronized byte[] generate(int attempt) throws GeneratorException {
// To prevent the generator from becoming stuck in a loop when the supplied clock
// doesn't progress, this safety valve will trigger after waiting too long for the
// next clock tick.
if (attempt > 10) throw new GeneratorException("Clock supplied to generator failed to progress.");

long now = System.currentTimeMillis();
long now = clock.getCurrentTimeMillis();
if (now == previousTimestamp) {
sequence++;
} else {
sequence = 0;
}
if (sequence > Blueprint.MAX_SEQUENCE_COUNTER) {
try {
TimeUnit.MILLISECONDS.sleep(1);
return generate();
TimeUnit.MICROSECONDS.sleep(400);
return generate(attempt + 1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down
29 changes: 29 additions & 0 deletions uniqueid-core/src/main/java/org/lable/oss/uniqueid/Clock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2014 Lable ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lable.oss.uniqueid;

/**
* Abstraction for the clock implementation. This allows for use of this library in deterministic systems and tests.
*
* @implNote Clocks should at a minimum progress once every millisecond.
*/
@FunctionalInterface
public interface Clock {
/**
* @return The current time in milliseconds.
*/
long getCurrentTimeMillis();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,31 @@ public class LocalUniqueIDGeneratorFactory {
*
* @param generatorId Generator ID to use (0 ≤ n ≤ 255).
* @param clusterId Cluster ID to use (0 ≤ n ≤ 15).
* @param clock Clock implementation.
* @param mode Generator mode.
* @return A thread-safe UniqueIDGenerator instance.
*/
public synchronized static IDGenerator generatorFor(int generatorId, int clusterId, Mode mode) {
public synchronized static IDGenerator generatorFor(int generatorId, int clusterId, Clock clock, Mode mode) {
assertParameterWithinBounds("generatorId", 0, Blueprint.MAX_GENERATOR_ID, generatorId);
assertParameterWithinBounds("clusterId", 0, Blueprint.MAX_CLUSTER_ID, clusterId);
String generatorAndCluster = String.format("%d_%d", generatorId, clusterId);
if (!instances.containsKey(generatorAndCluster)) {
GeneratorIdentityHolder identityHolder = LocalGeneratorIdentity.with(clusterId, generatorId);
instances.putIfAbsent(generatorAndCluster, new BaseUniqueIDGenerator(identityHolder, mode));
instances.putIfAbsent(generatorAndCluster, new BaseUniqueIDGenerator(identityHolder, clock, mode));
}
return instances.get(generatorAndCluster);
}

/**
* Return the UniqueIDGenerator instance for this specific generator-ID, cluster-ID combination. If one was
* already created, that is returned.
*
* @param generatorId Generator ID to use (0 ≤ n ≤ 255).
* @param clusterId Cluster ID to use (0 ≤ n ≤ 15).
* @param mode Generator mode.
* @return A thread-safe UniqueIDGenerator instance.
*/
public synchronized static IDGenerator generatorFor(int generatorId, int clusterId, Mode mode) {
return generatorFor(generatorId, clusterId, null, mode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void batchTest() throws Exception {

@Test
public void highGeneratorIdTest() throws Exception {
final int GENERATOR_ID = 255;
final int GENERATOR_ID = 10;
final int CLUSTER_ID = 15;
IDGenerator generator = LocalUniqueIDGeneratorFactory.generatorFor(GENERATOR_ID, CLUSTER_ID, Mode.SPREAD);

Expand All @@ -55,4 +55,43 @@ public void highGeneratorIdTest() throws Exception {
assertThat(blueprint.getGeneratorId(), is(GENERATOR_ID));
assertThat(blueprint.getClusterId(), is(CLUSTER_ID));
}

@Test
public void clockTest() throws Exception {
final int GENERATOR_ID = 20;
final int CLUSTER_ID = 15;
IDGenerator generator = LocalUniqueIDGeneratorFactory.generatorFor(
GENERATOR_ID,
CLUSTER_ID,
() -> 1,
Mode.SPREAD
);
byte[] id = null;
for (int i = 0; i < 64; i++) {
id = generator.generate();
}

Blueprint blueprint = IDBuilder.parse(id);
assertThat(blueprint.getGeneratorId(), is(GENERATOR_ID));
assertThat(blueprint.getClusterId(), is(CLUSTER_ID));
assertThat(blueprint.getTimestamp(), is(1L));
assertThat(blueprint.getSequence(), is(63));
}

@Test(expected = GeneratorException.class)
public void clockTestFails() throws Exception {
final int GENERATOR_ID = 30;
final int CLUSTER_ID = 15;
IDGenerator generator = LocalUniqueIDGeneratorFactory.generatorFor(
GENERATOR_ID,
CLUSTER_ID,
() -> 1,
Mode.SPREAD
);

// If the clock doesn't progress, no more then 64 ids can be generated.
for (int i = 0; i < 65; i++) {
generator.generate();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.lable.oss.dynamicconfig.zookeeper.MonitoringZookeeperConnection;
import org.lable.oss.uniqueid.BaseUniqueIDGenerator;
import org.lable.oss.uniqueid.Clock;
import org.lable.oss.uniqueid.GeneratorIdentityHolder;
import org.lable.oss.uniqueid.IDGenerator;
import org.lable.oss.uniqueid.bytes.Mode;
Expand Down Expand Up @@ -45,13 +46,15 @@ public class SynchronizedUniqueIDGeneratorFactory {
*
* @param zooKeeperConnection Connection to the ZooKeeper quorum.
* @param znode Base-path of the resource pool in ZooKeeper.
* @param clock Clock implementation.
* @param mode Generator mode.
* @return An instance of this class.
* @throws IOException Thrown when something went wrong trying to find the cluster ID or trying to claim a
* generator ID.
*/
public static synchronized IDGenerator generatorFor(MonitoringZookeeperConnection zooKeeperConnection,
String znode,
Clock clock,
Mode mode)
throws IOException {

Expand All @@ -60,29 +63,48 @@ public static synchronized IDGenerator generatorFor(MonitoringZookeeperConnectio
SynchronizedGeneratorIdentity generatorIdentityHolder =
new SynchronizedGeneratorIdentity(zooKeeperConnection, znode, clusterId, null, null);

return generatorFor(generatorIdentityHolder, mode);
return generatorFor(generatorIdentityHolder, clock, mode);
}
return instances.get(znode);
}

/**
* Get the synchronized ID generator instance.
*
* @param zooKeeperConnection Connection to the ZooKeeper quorum.
* @param znode Base-path of the resource pool in ZooKeeper.
* @param mode Generator mode.
* @return An instance of this class.
* @throws IOException Thrown when something went wrong trying to find the cluster ID or trying to claim a
* generator ID.
*/
public static synchronized IDGenerator generatorFor(MonitoringZookeeperConnection zooKeeperConnection,
String znode,
Mode mode)
throws IOException {
return generatorFor(zooKeeperConnection, znode, null, mode);
}

/**
* Get the synchronized ID generator instance.
*
* @param synchronizedGeneratorIdentity An instance of {@link SynchronizedGeneratorIdentity} to (re)use for
* acquiring the generator ID.
* @param clock Clock implementation.
* @param mode Generator mode.
* @return An instance of this class.
* @throws IOException Thrown when something went wrong trying to find the cluster ID or trying to claim a
* generator ID.
*/
public static synchronized IDGenerator generatorFor(SynchronizedGeneratorIdentity synchronizedGeneratorIdentity,
Clock clock,
Mode mode)
throws IOException {

String instanceKey = synchronizedGeneratorIdentity.getZNode();
if (!instances.containsKey(instanceKey)) {
logger.debug("Creating new instance.");
instances.putIfAbsent(instanceKey, new BaseUniqueIDGenerator(synchronizedGeneratorIdentity, mode));
instances.putIfAbsent(instanceKey, new BaseUniqueIDGenerator(synchronizedGeneratorIdentity, clock, mode));
}
return instances.get(instanceKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -67,9 +68,20 @@ public void simpleTest() throws Exception {

@Test
public void timeSequentialTest() throws Exception {
SynchronizedGeneratorIdentity generatorIdentityHolder =
new SynchronizedGeneratorIdentity(zooKeeperConnection, znode, 0, null, null);
IDGenerator generator = generatorFor(generatorIdentityHolder, Mode.TIME_SEQUENTIAL);
// Explicitly implement a clock ourselves for testing.
AtomicLong time = new AtomicLong(1_500_000_000);
SynchronizedGeneratorIdentity generatorIdentityHolder = new SynchronizedGeneratorIdentity(
zooKeeperConnection,
znode,
0,
null,
null
);
IDGenerator generator = generatorFor(
generatorIdentityHolder,
time::getAndIncrement,
Mode.TIME_SEQUENTIAL
);

Set<ByteArray> ids = new HashSet<>();
for (int i = 0; i < 100_000; i++) {
Expand Down

0 comments on commit e0ca30d

Please sign in to comment.