Skip to content

Commit

Permalink
fixed Codacy reported error-prone issues #1
Browse files Browse the repository at this point in the history
  • Loading branch information
RalphSteinhagen committed Feb 15, 2021
1 parent 559421d commit 8a45278
Show file tree
Hide file tree
Showing 23 changed files with 195 additions and 575 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

import javax.validation.constraints.NotNull;
import org.jetbrains.annotations.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -317,7 +317,7 @@ public void run() { // NOPMD NOSONAR - complexity
housekeeping();
}
}
} catch (final Exception e) { // NOSONAR -- terminate normally beyond this point
} catch (final Exception e) { // NOPMD NOSONAR -- terminate normally beyond this point
LOGGER.atError().setCause(e).log("data acquisition loop abnormally terminated");
} finally {
externalSocket.close();
Expand Down
1 change: 1 addition & 0 deletions client/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
requires java.validation;
requires kotlin.stdlib;
requires it.unimi.dsi.fastutil;
requires org.jetbrains.annotations;

exports io.opencmw.client.cmwlight;
exports io.opencmw.client.rest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ private static ZFrame getRequest(final Socket socket) {
private static void handleRouterSocket(final Socket router) {
System.err.println("### called handleRouterSocket");
// Get [id, ] message on client connection.
ZFrame handle;
if ((handle = getConnectionID(router)) == null) {
ZFrame handle = getConnectionID(router);
if (handle == null) {
// did not receive proper [ID, null msg] frames
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@
* (ie. JIT, whether services are identified by single characters etc.)
*/
@SuppressWarnings("PMD.DoNotUseThreads")
class RoundTripAndNotifyEvaluation { // NOPMD -- nomen est omen
class RoundTripAndNotifyEvaluation {
// private static final String SUB_TOPIC = "x";
private static final String SUB_TOPIC = "<domain>/<property>?<filter>#<ctx> - a very long topic to test the dependence of pub/sub pairs on topic lengths";
private static final byte[] SUB_DATA = "D".getBytes(StandardCharsets.UTF_8); // custom minimal data
private static final byte[] CLIENT_ID = "C".getBytes(StandardCharsets.UTF_8); // client name
private static final byte[] WORKER_ID = "W".getBytes(StandardCharsets.UTF_8); // worker-service name
private static final byte[] PUBLISH_ID = "P".getBytes(StandardCharsets.UTF_8); // publish-service name
private static final byte[] SUBSCRIBER_ID = "S".getBytes(StandardCharsets.UTF_8); // subscriber name
public static final String START = "start";
public static final String TCP_LOCALHOST_5555 = "tcp://localhost:5555";
private static final AtomicBoolean run = new AtomicBoolean(true);
private static final boolean VERBOSE_PRINTOUT = false;
private static int sampleSize = 10_000;
Expand Down Expand Up @@ -251,7 +253,7 @@ public void run() {
worker.connect("inproc://broker");
while (run.get() && !Thread.currentThread().isInterrupted()) {
ZMsg msg = ZMsg.recvMsg(worker);
if ("start".equals(msg.getFirst().getString(ZMQ.CHARSET))) {
if (START.equals(msg.getFirst().getString(ZMQ.CHARSET))) {
// System.err.println("dealer (indirect): start pushing");
for (int requests = 0; requests < sampleSizePub; requests++) {
worker.send(SUB_TOPIC, ZMQ.SNDMORE);
Expand Down Expand Up @@ -313,7 +315,7 @@ public void run() {
worker.connect("tcp://localhost:5556");
while (run.get() && !Thread.currentThread().isInterrupted()) {
ZMsg msg = ZMsg.recvMsg(worker);
if ("start".equals(msg.getFirst().getString(ZMQ.CHARSET))) {
if (START.equals(msg.getFirst().getString(ZMQ.CHARSET))) {
// System.err.println("dealer (indirect): start pushing");
for (int requests = 0; requests < sampleSizePub; requests++) {
worker.send(SUB_TOPIC, ZMQ.SNDMORE);
Expand All @@ -337,7 +339,7 @@ public void run() {
worker.bind("tcp://localhost:5558");
while (run.get() && !Thread.currentThread().isInterrupted()) {
ZMsg msg = ZMsg.recvMsg(worker);
if ("start".equals(msg.getFirst().getString(ZMQ.CHARSET))) {
if (START.equals(msg.getFirst().getString(ZMQ.CHARSET))) {
// System.err.println("dealer (direct): start pushing");
for (int requests = 0; requests < sampleSizePub; requests++) {
worker.send(SUB_TOPIC, ZMQ.SNDMORE);
Expand All @@ -358,7 +360,7 @@ public void run() {
Socket client = ctx.createSocket(SocketType.DEALER);
client.setHWM(0);
client.setIdentity(CLIENT_ID);
client.connect("tcp://localhost:5555");
client.connect(TCP_LOCALHOST_5555);

Socket subClient = ctx.createSocket(SocketType.SUB);
subClient.setHWM(0);
Expand Down Expand Up @@ -418,24 +420,24 @@ public void run() {
});
subClient.unsubscribe(SUB_TOPIC.getBytes(ZMQ.CHARSET));

client.disconnect("tcp://localhost:5555");
client.disconnect(TCP_LOCALHOST_5555);
client.setIdentity(SUBSCRIBER_ID);
client.connect("tcp://localhost:5555");
ZMsg.newStringMsg("start").addFirst("E").send(client);
client.connect(TCP_LOCALHOST_5555);
ZMsg.newStringMsg(START).addFirst("E").send(client);
measure("Subscription (DEALER) test (TCP)", sampleSizePub, () -> {
ZMsg req = ZMsg.recvMsg(client);
req.destroy();
});

ZMsg.newStringMsg("start").addFirst("I").send(client);
ZMsg.newStringMsg(START).addFirst("I").send(client);
measure("Subscription (DEALER) test (InProc)", sampleSizePub, () -> {
ZMsg req = ZMsg.recvMsg(client);
req.destroy();
});

client.disconnect("tcp://localhost:5555");
client.disconnect(TCP_LOCALHOST_5555);
client.connect("tcp://localhost:5558");
ZMsg.newStringMsg("start").send(client);
ZMsg.newStringMsg(START).send(client);
measure("Subscription (direct DEALER) test", sampleSizePub, () -> {
ZMsg req = ZMsg.recvMsg(client);
req.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* default client time-out [s] is set by system property: 'OpenCMW.clientTimeOut' // default: 3600 [s] -- after which unanswered client messages and infos are being deleted
*
*/
@SuppressWarnings("PMD.DoNotUseThreads")
@SuppressWarnings({ "PMD.DoNotUseThreads", "PMD.TooManyMethods", "PMD.GodClass" }) // this is a concept
public class MajordomoBroker extends Thread {
private static final Logger LOGGER = LoggerFactory.getLogger(MajordomoBroker.class);
private static final byte[] INTERNAL_SENDER_ID = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ public class MajordomoClientV1 {
private static final AtomicInteger CLIENT_V1_INSTANCE = new AtomicInteger();
private final String uniqueID;
private final byte[] uniqueIdBytes;
private String broker;
private ZContext ctx;
private final String broker;
private final ZContext ctx;
private ZMQ.Socket clientSocket;
private long timeout = 2500;
private int retries = 3;
private Formatter log = new Formatter(System.out);
private final Formatter log = new Formatter(System.out);
private ZMQ.Poller poller;

public MajordomoClientV1(String broker, String clientName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void setTimeout(long timeout) {
/**
* Connect or reconnect to broker
*/
void reconnectToBroker() {
private void reconnectToBroker() {
if (clientSocket != null) {
clientSocket.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ public MdpMessage(final byte[] senderID, final MdpSubProtocol protocol, final by

public byte[] getRbacFrame() {
if (hasRbackToken()) {
return payload[payload.length - 1];
final byte[] rbacFrame = payload[payload.length - 1];
return Arrays.copyOf(rbacFrame, rbacFrame.length);
}
return null;
}
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/java/io/opencmw/EventStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public final LocalEventHandlerGroup then(final Predicate<RingBufferEvent> filter
}

public static class EventStoreFactory {
private boolean isSingleProducer = false;
private boolean singleProducer = false;
private int maxThreadNumber = 4;
private int ringbufferSize = 64;
private int lengthHistoryBuffer = 10;
Expand All @@ -291,7 +291,7 @@ public EventStore build() {
if (muxBuilder == null) {
muxBuilder = Cache.<String, Disruptor<RingBufferEvent>>builder().withLimit(lengthHistoryBuffer);
}
return new EventStore(muxBuilder, muxCtxFunction, ringbufferSize, lengthHistoryBuffer, maxThreadNumber, isSingleProducer, waitStrategy, filterConfig);
return new EventStore(muxBuilder, muxCtxFunction, ringbufferSize, lengthHistoryBuffer, maxThreadNumber, singleProducer, waitStrategy, filterConfig);
}

public Class<? extends Filter>[] getFilterConfig() {
Expand Down Expand Up @@ -376,11 +376,11 @@ public EventStoreFactory setWaitStrategy(final WaitStrategy waitStrategy) {
}

public boolean isSingleProducer() {
return isSingleProducer;
return singleProducer;
}

public EventStoreFactory setSingleProducer(final boolean singleProducer) {
isSingleProducer = singleProducer;
this.singleProducer = singleProducer;
return this;
}
}
Expand Down Expand Up @@ -422,7 +422,7 @@ public void onEvent(final RingBufferEvent event, final long sequence, final bool
final RingBufferEvent result;
try {
result = callback.onEvent(history, eventStore, sequence, endOfBatch);
} catch (Exception e) {
} catch (Exception e) { // NOPMD - part of exception handling/forwarding scheme
LOGGER.atError().setCause(e).addArgument(history.size()).addArgument(sequence).addArgument(endOfBatch) //
.log("caught error for arguments (history={}, eventStore, sequence={}, endOfBatch={})");
event.throwables.add(e);
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/io/opencmw/OpenCmwProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ public static MdpMessage receive(final Socket socket) {
* @param wait setting the flag to ZMQ.DONTWAIT does a non-blocking recv.
* @return MdpMessage if valid, or {@code null} otherwise
*/
@SuppressWarnings("NPath.Complexity")
public static MdpMessage receive(@NotNull final Socket socket, final boolean wait) {
final int flags = wait ? 0 : ZMQ.DONTWAIT;
final ZMsg msg = ZMsg.recvMsg(socket, flags);
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/io/opencmw/RingBufferEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.lmax.disruptor.EventHandler;

@SuppressWarnings("PMD.TooManyMethods")
public class RingBufferEvent implements FilterPredicate, Cloneable {
private final static Logger LOGGER = LoggerFactory.getLogger(RingBufferEvent.class);
/**
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/opencmw/filter/TimingCtx.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void clear() {
pid = -1;
gid = -1;
bpcts = -1;
ctxName = null;
ctxName = "";
}

@Override
Expand Down
Loading

0 comments on commit 8a45278

Please sign in to comment.