Skip to content

Commit

Permalink
fixed Codacy reported error-prone issues #1
Browse files Browse the repository at this point in the history
  • Loading branch information
RalphSteinhagen committed Feb 15, 2021
1 parent 559421d commit 2add8cd
Show file tree
Hide file tree
Showing 40 changed files with 312 additions and 832 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* Serializes CmwLightMessage to ZeroMQ messages and vice versa.
*/
@SuppressWarnings("PMD.UnusedLocalVariable") // Unused variables are taken from the protocol and should be available for reference
public class CmwLightProtocol {
public class CmwLightProtocol { //NOPMD -- nomen est omen
private static final String CONTEXT_ACQ_STAMP = "ContextAcqStamp";
private static final String CONTEXT_CYCLE_STAMP = "ContextCycleStamp";
private static final String MESSAGE = "Message";
Expand All @@ -37,6 +37,10 @@ public class CmwLightProtocol {
public static final String VERSION = "1.0.0"; // Protocol version used if msg.version is null or empty
private static final int SERIALISER_QUIRK = 100; // there seems to be a bug in the serialiser which does not update the buffer position correctly, so send more

private CmwLightProtocol() {
// utility class
}

/**
* The message specified by the byte contained in the first frame of a message defines what type of message is present
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

import javax.validation.constraints.NotNull;

import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.spi.LoggingEventBuilder;
Expand Down Expand Up @@ -106,7 +105,7 @@ public RestDataSource(final ZContext ctx, final String endpoint, final Duration
super(endpoint);
synchronized (LOGGER) { // prevent race condition between multiple constructor invocations
if (okClient == null) {
okClient = new OkHttpClient();
okClient = new OkHttpClient(); // NOPMD
eventSourceFactory = EventSources.createFactory(okClient);
}
}
Expand Down Expand Up @@ -317,7 +316,7 @@ public void run() { // NOPMD NOSONAR - complexity
housekeeping();
}
}
} catch (final Exception e) { // NOSONAR -- terminate normally beyond this point
} catch (final Exception e) { // NOPMD NOSONAR -- terminate normally beyond this point
LOGGER.atError().setCause(e).log("data acquisition loop abnormally terminated");
} finally {
externalSocket.close();
Expand All @@ -328,7 +327,7 @@ public void run() { // NOPMD NOSONAR - complexity

public void start() {
createPair();
new Thread(this).start();
new Thread(this).start(); // NOPMD
}

public void stop() {
Expand Down
1 change: 1 addition & 0 deletions client/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
requires java.validation;
requires kotlin.stdlib;
requires it.unimi.dsi.fastutil;
requires org.jetbrains.annotations;

exports io.opencmw.client.cmwlight;
exports io.opencmw.client.rest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ public class ManyVsLargeFrameEvaluation { // NOPMD -- nomen est omen
private static int sampleSize = 100_000;
private static final int N_BUFFER_SIZE = 8;
private static final int N_FRAMES = 10;
public static volatile byte[] smallMessage = new byte[N_BUFFER_SIZE * N_FRAMES];
public static volatile byte[] largeMessage = new byte[N_BUFFER_SIZE];
public static volatile byte[] smallMessage = new byte[N_BUFFER_SIZE * N_FRAMES]; // NOPMD - volatile on purpose
public static volatile byte[] largeMessage = new byte[N_BUFFER_SIZE]; // NOPMD - volatile on purpose

private static final int N_LOOPS = 5;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ private static ZFrame getRequest(final Socket socket) {
private static void handleRouterSocket(final Socket router) {
System.err.println("### called handleRouterSocket");
// Get [id, ] message on client connection.
ZFrame handle;
if ((handle = getConnectionID(router)) == null) {
ZFrame handle = getConnectionID(router);
if (handle == null) {
// did not receive proper [ID, null msg] frames
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@
* (ie. JIT, whether services are identified by single characters etc.)
*/
@SuppressWarnings("PMD.DoNotUseThreads")
class RoundTripAndNotifyEvaluation { // NOPMD -- nomen est omen
class RoundTripAndNotifyEvaluation {
// private static final String SUB_TOPIC = "x";
private static final String SUB_TOPIC = "<domain>/<property>?<filter>#<ctx> - a very long topic to test the dependence of pub/sub pairs on topic lengths";
private static final byte[] SUB_DATA = "D".getBytes(StandardCharsets.UTF_8); // custom minimal data
private static final byte[] CLIENT_ID = "C".getBytes(StandardCharsets.UTF_8); // client name
private static final byte[] WORKER_ID = "W".getBytes(StandardCharsets.UTF_8); // worker-service name
private static final byte[] PUBLISH_ID = "P".getBytes(StandardCharsets.UTF_8); // publish-service name
private static final byte[] SUBSCRIBER_ID = "S".getBytes(StandardCharsets.UTF_8); // subscriber name
public static final String START = "start";
public static final String TCP_LOCALHOST_5555 = "tcp://localhost:5555";
private static final AtomicBoolean run = new AtomicBoolean(true);
private static final boolean VERBOSE_PRINTOUT = false;
public static final char TAG_INTERNAL = 'I';
public static final char TAG_EXTERNAL = 'E';
private static int sampleSize = 10_000;
private static int sampleSizePub = 100_000;

Expand Down Expand Up @@ -147,18 +151,18 @@ public void run() { // NOPMD single-loop broker ... simplifies reading
ZFrame address = msg.pop();
ZFrame internal = msg.pop();
if (address.getData()[0] == CLIENT_ID[0]) {
if ('E' == internal.getData()[0]) {
if (TAG_EXTERNAL == internal.getData()[0]) {
msg.addFirst(new ZFrame(WORKER_ID));
msg.send(tcpBackend);
} else if ('I' == internal.getData()[0]) {
} else if (TAG_INTERNAL == internal.getData()[0]) {
msg.addFirst(new ZFrame(WORKER_ID));
msg.send(inprocBackend);
}
} else {
if ('E' == internal.getData()[0]) {
if (TAG_EXTERNAL == internal.getData()[0]) {
msg.addFirst(new ZFrame(PUBLISH_ID));
msg.send(tcpBackend);
} else if ('I' == internal.getData()[0]) {
} else if (TAG_INTERNAL == internal.getData()[0]) {
msg.addFirst(new ZFrame(PUBLISH_ID));
msg.send(inprocBackend);
}
Expand Down Expand Up @@ -251,7 +255,7 @@ public void run() {
worker.connect("inproc://broker");
while (run.get() && !Thread.currentThread().isInterrupted()) {
ZMsg msg = ZMsg.recvMsg(worker);
if ("start".equals(msg.getFirst().getString(ZMQ.CHARSET))) {
if (START.equals(msg.getFirst().getString(ZMQ.CHARSET))) {
// System.err.println("dealer (indirect): start pushing");
for (int requests = 0; requests < sampleSizePub; requests++) {
worker.send(SUB_TOPIC, ZMQ.SNDMORE);
Expand Down Expand Up @@ -313,7 +317,7 @@ public void run() {
worker.connect("tcp://localhost:5556");
while (run.get() && !Thread.currentThread().isInterrupted()) {
ZMsg msg = ZMsg.recvMsg(worker);
if ("start".equals(msg.getFirst().getString(ZMQ.CHARSET))) {
if (START.equals(msg.getFirst().getString(ZMQ.CHARSET))) {
// System.err.println("dealer (indirect): start pushing");
for (int requests = 0; requests < sampleSizePub; requests++) {
worker.send(SUB_TOPIC, ZMQ.SNDMORE);
Expand All @@ -337,7 +341,7 @@ public void run() {
worker.bind("tcp://localhost:5558");
while (run.get() && !Thread.currentThread().isInterrupted()) {
ZMsg msg = ZMsg.recvMsg(worker);
if ("start".equals(msg.getFirst().getString(ZMQ.CHARSET))) {
if (START.equals(msg.getFirst().getString(ZMQ.CHARSET))) {
// System.err.println("dealer (direct): start pushing");
for (int requests = 0; requests < sampleSizePub; requests++) {
worker.send(SUB_TOPIC, ZMQ.SNDMORE);
Expand All @@ -358,7 +362,7 @@ public void run() {
Socket client = ctx.createSocket(SocketType.DEALER);
client.setHWM(0);
client.setIdentity(CLIENT_ID);
client.connect("tcp://localhost:5555");
client.connect(TCP_LOCALHOST_5555);

Socket subClient = ctx.createSocket(SocketType.SUB);
subClient.setHWM(0);
Expand Down Expand Up @@ -418,24 +422,24 @@ public void run() {
});
subClient.unsubscribe(SUB_TOPIC.getBytes(ZMQ.CHARSET));

client.disconnect("tcp://localhost:5555");
client.disconnect(TCP_LOCALHOST_5555);
client.setIdentity(SUBSCRIBER_ID);
client.connect("tcp://localhost:5555");
ZMsg.newStringMsg("start").addFirst("E").send(client);
client.connect(TCP_LOCALHOST_5555);
ZMsg.newStringMsg(START).addFirst("E").send(client);
measure("Subscription (DEALER) test (TCP)", sampleSizePub, () -> {
ZMsg req = ZMsg.recvMsg(client);
req.destroy();
});

ZMsg.newStringMsg("start").addFirst("I").send(client);
ZMsg.newStringMsg(START).addFirst("I").send(client);
measure("Subscription (DEALER) test (InProc)", sampleSizePub, () -> {
ZMsg req = ZMsg.recvMsg(client);
req.destroy();
});

client.disconnect("tcp://localhost:5555");
client.disconnect(TCP_LOCALHOST_5555);
client.connect("tcp://localhost:5558");
ZMsg.newStringMsg("start").send(client);
ZMsg.newStringMsg(START).send(client);
measure("Subscription (direct DEALER) test", sampleSizePub, () -> {
ZMsg req = ZMsg.recvMsg(client);
req.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.LockSupport;

import io.opencmw.utils.Cache;
Expand Down Expand Up @@ -33,7 +32,7 @@ public class DemuxEventDispatcher implements SequenceReportingEventHandler<TestE
private final List<AggregationHandler> freeWorkers = Collections.synchronizedList(new ArrayList<>(N_WORKERS));
private final RingBuffer<TestEventSource.IngestedEvent> rb;
// private Map<Long, Object> aggregatedBpcts = new SoftHashMap<>(RETENTION_SIZE);
private Map<Long, Object> aggregatedBpcts = new Cache<>(RETENTION_SIZE);
private final Cache<Long, Object> aggregatedBpcts = new Cache<>(RETENTION_SIZE);
private Sequence seq;

public DemuxEventDispatcher(final RingBuffer<TestEventSource.IngestedEvent> ringBuffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
/**
* Majordomo Protocol client example. Uses the mdcli API to hide all OpenCmwProtocol aspects
*/
public class ClientSampleV1 {
public final class ClientSampleV1 { // nomen est omen
private static final int N_SAMPLES = 50_000;

private ClientSampleV1() {
// requires only static methods for testing
}

public static void main(String[] args) {
MajordomoClientV1 clientSession = new MajordomoClientV1("tcp://localhost:5555", "customClientName");
final byte[] serviceBytes = "mmi.echo".getBytes(StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@
* all OpenCmwProtocol aspects
*/

public class ClientSampleV2 {
public final class ClientSampleV2 { // NOPMD -- nomen est omen
private static final int N_SAMPLES = 1_000_000;

private ClientSampleV2() {
// requires only static methods for testing
}

public static void main(String[] args) {
MajordomoClientV2 clientSession = new MajordomoClientV2("tcp://localhost:5555");
final byte[] serviceBytes = "mmi.echo".getBytes(StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
*
* default client time-out [s] is set by system property: 'OpenCMW.clientTimeOut' // default: 3600 [s] -- after which unanswered client messages and infos are being deleted
*
*/
@SuppressWarnings("PMD.DoNotUseThreads")
*/
@SuppressWarnings({ "PMD.DoNotUseThreads", "PMD.TooManyMethods", "PMD.GodClass", "PMD.UseConcurrentHashMap" }) // this is a concept, HashMap invoked in single-threaded context
public class MajordomoBroker extends Thread {
private static final Logger LOGGER = LoggerFactory.getLogger(MajordomoBroker.class);
private static final byte[] INTERNAL_SENDER_ID = null;
Expand All @@ -50,7 +50,7 @@ public class MajordomoBroker extends Thread {
private final Socket internalRouterSocket;
private final Socket internalServiceSocket;
private final List<Socket> routerSockets = new ArrayList<>(); // Sockets for clients & public external workers
private final AtomicBoolean run = new AtomicBoolean(false);
private final AtomicBoolean run = new AtomicBoolean(false); // NOPMD
private final SortedSet<RbacRole<?>> rbacRoles;
private final Map<String, Service> services = new HashMap<>(); // known services Map<'service name', Service>
private final Map<String, Worker> workers = new HashMap<>(); // known workers Map<addressHex, Worker>
Expand Down Expand Up @@ -174,7 +174,7 @@ public void run() {
}

@Override
public synchronized void start() {
public void start() {
run.set(true);
services.forEach((serviceName, service) -> service.internalWorkers.forEach(Thread::start));
super.start();
Expand Down Expand Up @@ -352,7 +352,7 @@ protected void processWorker(final Socket receiveSocket, final MdpWorkerMessage
/**
* Look for &amp; kill expired clients.
*/
protected /*synchronized*/ void purgeClients() {
protected void purgeClients() {
if (CLIENT_TIMEOUT <= 0) {
return;
}
Expand Down Expand Up @@ -508,7 +508,7 @@ protected class Service {
protected final byte[] nameBytes; // Service name as byte array
protected final MajordomoWorker mdpWorker;
protected final boolean isInternal;
protected final Map<RbacRole<?>, Queue<MdpClientMessage>> requests = new HashMap<>(); // RBAC-based queuing
protected final Map<RbacRole<?>, Queue<MdpClientMessage>> requests = new HashMap<>(); // NOPMD RBAC-based queuing - thread-safe use of HashMap
protected final Deque<Worker> waiting = new ArrayDeque<>(); // List of waiting workers
protected final List<Thread> internalWorkers = new ArrayList<>();
protected final Socket internalDispatchSocket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ public class MajordomoClientV1 {
private static final AtomicInteger CLIENT_V1_INSTANCE = new AtomicInteger();
private final String uniqueID;
private final byte[] uniqueIdBytes;
private String broker;
private ZContext ctx;
private final String broker;
private final ZContext ctx;
private ZMQ.Socket clientSocket;
private long timeout = 2500;
private int retries = 3;
private Formatter log = new Formatter(System.out);
private final Formatter log = new Formatter(System.out);
private ZMQ.Poller poller;

public MajordomoClientV1(String broker, String clientName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void setTimeout(long timeout) {
/**
* Connect or reconnect to broker
*/
void reconnectToBroker() {
private void reconnectToBroker() {
if (clientSocket != null) {
clientSocket.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,10 @@ public MdpMessage(final byte[] senderID, final MdpSubProtocol protocol, final by

public byte[] getRbacFrame() {
if (hasRbackToken()) {
return payload[payload.length - 1];
final byte[] rbacFrame = payload[payload.length - 1];
return Arrays.copyOf(rbacFrame, rbacFrame.length);
}
return null;
return new byte[0];
}

public boolean hasRbackToken() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ protected void handleReceive() { // NOPMD -- single serial function .. easier to
}
liveness = HEARTBEAT_LIVENESS;
// Don't try to handle errors, just assert noisily
assert msg.payload.length > 0 : "MdpWorkerMessage payload is equal or less than zero: " + msg.payload.length;
assert msg.payload != null : "MdpWorkerMessage payload is null";
if (!(msg instanceof MdpWorkerMessage)) {
assert false : "msg is not instance of MdpWorkerMessage";
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
* Majordomo Protocol worker example. Uses the mdwrk API to hide all OpenCmwProtocol aspects
*
*/
public class SimpleEchoServiceWorker {
public class SimpleEchoServiceWorker { // NOPMD - nomen est omen

private SimpleEchoServiceWorker() {
// private helper/test class
}

public static void main(String[] args) {
MajordomoWorker workerSession = new MajordomoWorker("tcp://localhost:5556", "echo", BasicRbacRole.ADMIN);
// workerSession.setDaemon(true); // use this if running in another app that controls threads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*/
@Deprecated(since = "2020")
public class CmwLightClient {
public static final String FAIR_SELECTOR_ALL = "FAIR.SELECTOR.ALL";
private final ZContext context = new ZContext();
private final ZMQ.Socket controlChannel;
private final AtomicInteger connectionState = new AtomicInteger(0);
Expand Down Expand Up @@ -187,7 +188,7 @@ public void subscribeSnoopFromDigitizer() {

connectClient();

subscribe("GSCD001", "SnoopTriggerEvents", "FAIR.SELECTOR.ALL");
subscribe("GSCD001", "SnoopTriggerEvents", FAIR_SELECTOR_ALL);

while (!Thread.currentThread().isInterrupted()) {
receiveData();
Expand Down Expand Up @@ -217,7 +218,7 @@ public void getLocal() {

sendHeartBeat();

get("testdevice", "unknownProp", "FAIR.SELECTOR.ALL");
get("testdevice", "unknownProp", FAIR_SELECTOR_ALL);

// return reply to data
while (true) {
Expand All @@ -228,12 +229,12 @@ public void getLocal() {
}
System.out.println("Received GET Exception");

get("testdevice", "testproperty", "FAIR.SELECTOR.ALL");
get("testdevice", "testproperty", FAIR_SELECTOR_ALL);

receiveData();
receiveData();

subscribe("testdevice", "testproperty", "FAIR.SELECTOR.ALL");
subscribe("testdevice", "testproperty", FAIR_SELECTOR_ALL);
// just return all data
while (!Thread.currentThread().isInterrupted()) {
receiveData();
Expand Down
Loading

0 comments on commit 2add8cd

Please sign in to comment.