Skip to content

Commit

Permalink
Merge pull request #619 from bytedance/fix-no-data
Browse files Browse the repository at this point in the history
Resolving the issue of no data reporting in the BOE environment due t…
  • Loading branch information
yoloyyh authored May 23, 2024
2 parents 7e0de6d + 62b9920 commit e9f9e40
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,63 +47,3 @@ public void setData(JsonNode data) {
this.data = data;
}
}


class MessageDeserializer extends StdDeserializer<Message> {
protected MessageDeserializer() {
super(Message.class);
}

protected MessageDeserializer(Class<?> vc) {
super(vc);
}

@Override
public Message deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
JsonNode node = p.getCodec().readTree(p);

Message message = new Message();

message.setOperate(node.get("message_type").asInt());
message.setData(node.get("data"));

return message;
}
}

class MessageEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();

byte[] payload = objectMapper.writeValueAsBytes(msg);
int payloadSize = payload.length;

ByteBuffer buffer = ByteBuffer.allocate(payloadSize + Message.PROTOCOL_HEADER_SIZE);

buffer.putInt(payloadSize);
buffer.put(payload);

buffer.flip();

out.writeBytes(buffer);
}
}

class MessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException {
long payloadSize = in.readUnsignedInt();

if (payloadSize > Message.MAX_PAYLOAD_SIZE)
return;

byte[] buffer = new byte[(int) payloadSize];
in.readBytes(buffer);

Message message = new ObjectMapper().readValue(buffer, Message.class);

if (message != null)
out.add(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.security.smith.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.io.IOException;

public class MessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException {
long payloadSize = in.readUnsignedInt();

if (payloadSize > Message.MAX_PAYLOAD_SIZE)
return;

byte[] buffer = new byte[(int) payloadSize];
in.readBytes(buffer);

Message message = new ObjectMapper().readValue(buffer, Message.class);

if (message != null)
out.add(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.security.smith.client;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;

public class MessageDeserializer extends StdDeserializer<Message> {
protected MessageDeserializer() {
super(Message.class);
}

protected MessageDeserializer(Class<?> vc) {
super(vc);
}

@Override
public Message deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
JsonNode node = p.getCodec().readTree(p);

Message message = new Message();

message.setOperate(node.get("message_type").asInt());
message.setData(node.get("data"));

return message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.security.smith.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;

public class MessageEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();

byte[] payload = objectMapper.writeValueAsBytes(msg);
int payloadSize = payload.length;

ByteBuffer buffer = ByteBuffer.allocate(payloadSize + Message.PROTOCOL_HEADER_SIZE);

buffer.putInt(payloadSize);
buffer.put(payload);

buffer.flip();

out.writeBytes(buffer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ public class MessageSerializer extends StdSerializer<Message> {
static {
pid = ProcessHelper.getCurrentPID();
jvmVersion = ManagementFactory.getRuntimeMXBean().getSpecVersion();
probeVersion = MessageSerializer.class.getPackage().getImplementationVersion();
}

public static void setProbeVersion(String probeVer) {
Expand Down

0 comments on commit e9f9e40

Please sign in to comment.