Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close all resources gracefully on JVM shutdown #98

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 41 additions & 23 deletions core/src/main/java/dev/keva/core/aof/AOFContainer.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.keva.core.aof;

import dev.keva.core.config.KevaConfig;
import dev.keva.core.exception.StartupException;
import dev.keva.ioc.annotation.Autowired;
import dev.keva.ioc.annotation.Component;
import dev.keva.protocol.resp.Command;
Expand All @@ -16,11 +17,13 @@

@Slf4j
@Component
public class AOFContainer {
public class AOFContainer implements Closeable {
private ReentrantLock bufferLock;
private ObjectOutputStream output;
private FileDescriptor fd;
private boolean alwaysFlush;
private ScheduledExecutorService executorService;
private volatile boolean isOpen;

@Autowired
private KevaConfig kevaConfig;
Expand All @@ -47,7 +50,7 @@ public void init() {
}

if (!alwaysFlush) {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(() -> {
try {
flush();
Expand All @@ -59,10 +62,15 @@ public void init() {
} else {
log.info("AOF will trigger for every new mutate command");
}
isOpen = true;
}

public void write(Command command) {
bufferLock.lock();
if (!isOpen) {
log.warn("Dropping write to AOF as it is closed!");
return;
}
try {
output.writeObject(command.getObjects());
if (alwaysFlush) {
Expand All @@ -75,34 +83,44 @@ public void write(Command command) {
}
}

private void flush() throws IOException {
fd.sync();
}

public List<Command> read() throws IOException {
try {
List<Command> commands = new ArrayList<>(100);
FileInputStream fis = new FileInputStream(getWorkingDir() + "keva.aof");
final List<Command> commands = new ArrayList<>(100);
try (FileInputStream fis = new FileInputStream(getWorkingDir() + "keva.aof");
ObjectInputStream input = new ObjectInputStream(fis)) {
log.info("AOF size is: {}", fis.getChannel().size());
ObjectInputStream input = new ObjectInputStream(fis);
while (true) {
try {
byte[][] objects = (byte[][]) input.readObject();
commands.add(Command.newInstance(objects, false));
} catch (EOFException e) {
log.error("Error while reading AOF command", e);
fis.close();
return commands;
} catch (ClassNotFoundException e) {
log.error("Error reading AOF file", e);
return commands;
}
byte[][] objects = (byte[][]) input.readObject();
commands.add(Command.newInstance(objects, false));
}
} catch (final FileNotFoundException | EOFException ignored) {
return commands;
} catch (final ClassNotFoundException e) {
final String msg = "Error reading AOF file";
log.error(msg, e);
throw new StartupException(msg, e);
}
}

public void close() throws IOException {
bufferLock.lock();
isOpen = false;
log.info("Closing AOF log.");
try {
if (executorService != null) {
executorService.shutdown();
}
} catch (FileNotFoundException ignored) {
throw new FileNotFoundException("AOF file not found");
// Closing the stream should flush it, but still doing it explicitly!
flush();
output.close();
} finally {
bufferLock.unlock();
}
}

private void flush() throws IOException {
fd.sync();
}

private String getWorkingDir() {
String workingDir = kevaConfig.getWorkDirectory();
return workingDir.equals("./") ? "" : workingDir + "/";
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/dev/keva/core/aof/AOFManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ public void init() {

aof.init();
}

public void stop() throws IOException {
aof.close();
}
}
11 changes: 11 additions & 0 deletions core/src/main/java/dev/keva/core/exception/StartupException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package dev.keva.core.exception;

/**
* StartupException indicates any fatal error encountered during server boot.
*/
public class StartupException extends RuntimeException{

public StartupException(final String msg, final Throwable cause){
super(msg, cause);
}
}
145 changes: 103 additions & 42 deletions core/src/main/java/dev/keva/core/server/KevaServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public class KevaServer implements Server {
" | |/ / | __| \\ \\ / / /_\\ \n" +
" | ' < | _| \\ V / / _ \\ \n" +
" |_|\\_\\ |___| \\_/ /_/ \\_\\";
private static final int SHUTDOWN_TIMEOUT_MS = 1000;
private enum State {
CREATED, CREATING, RUNNING, TERMINATING, TERMINATED
}

private volatile State state;
private final KevaDatabase database;
private final KevaConfig config;
private final NettyChannelInitializer nettyChannelInitializer;
Expand All @@ -47,6 +53,7 @@ public KevaServer(KevaDatabase database, KevaConfig config, NettyChannelInitiali
this.nettyChannelInitializer = nettyChannelInitializer;
this.commandMapper = commandMapper;
this.aofManager = aofManager;
this.state = State.CREATED;
}

public static KevaServer ofDefaults() {
Expand All @@ -64,7 +71,102 @@ public static KevaServer ofCustomBeans(Object... beans) {
return context.getBean(KevaServer.class);
}

public ServerBootstrap bootstrapServer() throws NettyNativeTransportLoader.NettyNativeLoaderException {
@Override
public void shutdown() {
switch (state) {
case CREATED:
case CREATING:
throw new RuntimeException("Attempt to shutdown a non-started server!");
case RUNNING:
boolean set = updateState(State.TERMINATING);
if (!set) {
// The state was concurrently modified, so re-check the condition.
shutdown();
return;
}
try {
bossGroup.shutdownGracefully(0, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS).sync();
workerGroup.shutdownGracefully(0, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS).sync();
channel.close();
database.close();
aofManager.stop();
} catch (Exception e) {
log.warn("Encountered error while shutting down server, ignoring", e);
}
state = State.TERMINATED;
log.info("Keva server at {} stopped", config.getPort());
log.info("Bye bye!");
return;
default:
}
}

@Override
public void run() {
switch (state) {
case CREATING:
case RUNNING:
return;
case CREATED:
// take create lock and call run again
boolean set = updateState(State.CREATING);
if (!set) {
// The state was concurrently modified, so re-check the condition.
run();
return;
}
try {
stopwatch.start();
ServerBootstrap server = bootstrapServer();

aofManager.init();

ChannelFuture sync = server.bind(config.getPort()).sync();
log.info("{} server started at {}:{}, in {} ms",
KEVA_BANNER,
config.getHostname(), config.getPort(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
log.info("Ready to accept connections");
state = State.RUNNING;
System.out.println("Set state to running");
channel = sync.channel();
channel.closeFuture().sync(); //block
} catch (InterruptedException e) {
log.error("Failed to start server: ", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("Failed to start server: ", e);
} finally {
stopwatch.stop();
}
return;
default:
throw new RuntimeException("Attempt to run a stopped server");
}
}

@Override
public void clear() {
switch (state) {
case RUNNING:
database.flush();
return;
default:
throw new RuntimeException("Attempt to clear a non-running server");
}

}

// Do a CAS on state
private synchronized boolean updateState(State state) {
if (this.state.equals(state)) {
return false;
}
this.state = state;
return true;
}

private ServerBootstrap bootstrapServer() throws NettyNativeTransportLoader.NettyNativeLoaderException {
try {
commandMapper.init();
Class<? extends AbstractEventExecutorGroup> executorGroupClazz = NettyNativeTransportLoader.getEventExecutorGroupClazz();
Expand All @@ -85,45 +187,4 @@ public ServerBootstrap bootstrapServer() throws NettyNativeTransportLoader.Netty
throw new NettyNativeTransportLoader.NettyNativeLoaderException("Cannot load Netty classes");
}
}

@Override
public void shutdown() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
channel.close();
log.info("Keva server at {} stopped", config.getPort());
log.info("Bye bye!");
}

@Override
public void run() {
try {
stopwatch.start();
ServerBootstrap server = bootstrapServer();

aofManager.init();

ChannelFuture sync = server.bind(config.getPort()).sync();
log.info("{} server started at {}:{}, in {} ms",
KEVA_BANNER,
config.getHostname(), config.getPort(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
log.info("Ready to accept connections");

channel = sync.channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("Failed to start server: ", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("Failed to start server: ", e);
} finally {
stopwatch.stop();
}
}

@Override
public void clear() {
database.flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

public abstract class AbstractServerTest {
static Jedis jedis;
static Server server;
static volatile Server server;
static Jedis subscriber;

@Test
Expand Down
6 changes: 5 additions & 1 deletion store/src/main/java/dev/keva/store/KevaDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@

import dev.keva.util.hashbytes.BytesKey;

import java.io.Closeable;
import java.util.AbstractMap;
import java.util.concurrent.locks.Lock;

public interface KevaDatabase {
public interface KevaDatabase extends Closeable {
Lock getLock();

void flush();

@Override
default void close() {}

void put(byte[] key, byte[] val);

void expireAt(byte[] key, long timestampInMillis);
Expand Down
10 changes: 10 additions & 0 deletions store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ public void flush() {
}
}

@Override
public void close() {
lock.lock();
try {
chronicleMap.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @the123saurav, please consider this case https://github.com/OpenHFT/Chronicle-Map/blob/ea/docs/CM_Tutorial.adoc#close-chroniclemap

If you call close() too early before you have finished working with the map, this can cause your JVM to crash. Close MUST be the last thing that you do with the map.

So, although it is recommended that you close() when you have finished with the map, it is not something that you must do; it’s just something that we recommend you should do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm good point. But does that stop us from calling this?
It will be called from 2 places:

  • During JVM shutdown: In this case we are guranteed to not get any calls.
  • User initiates shutdown in embedded mode. In this case we can just add to our doc, that this should be the last thing done to KevaServer.

I am also okay to remove this from shutdown procedure.
Kindly let me know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure @the123saurav, I think we can add that warning to our doc

} finally {
lock.unlock();
}
}

private byte[] getExpireKey(byte[] key) {
return Bytes.concat(key, EXP_POSTFIX);
}
Expand Down