From 7ae06b28189455c79ed07f6f8a271569b0aded9f Mon Sep 17 00:00:00 2001 From: lyf666 Date: Fri, 24 Mar 2017 15:37:37 +0800 Subject: [PATCH] . --- pom.xml | 8 +- .../cloudowr/sdk/log/BaseAbsAioHandler.java | 86 +++++++++++++++++++ src/main/java/cn/cloudowr/sdk/log/Const.java | 8 ++ .../java/cn/cloudowr/sdk/log/Handler.java | 8 ++ src/main/java/cn/cloudowr/sdk/log/Logger.java | 54 ++++++++++++ src/main/java/cn/cloudowr/sdk/log/Main.java | 21 +++++ .../sdk/log/client/BaseClientAioHandler.java | 24 ++++++ .../sdk/log/entity/HeartBeatMessage.java | 17 ++++ .../sdk/log/entity/JSONStringMessage.java | 23 +++++ .../cn/cloudowr/sdk/log/entity/Message.java | 16 ++++ .../sdk/log/server/BaseServerAioHandler.java | 23 +++++ .../cloudowr/sdk/log/server/HandlerChain.java | 22 +++++ .../cloudowr/sdk/log/server/LoggerServer.java | 47 ++++++++++ 13 files changed, 356 insertions(+), 1 deletion(-) create mode 100644 src/main/java/cn/cloudowr/sdk/log/BaseAbsAioHandler.java create mode 100644 src/main/java/cn/cloudowr/sdk/log/Const.java create mode 100644 src/main/java/cn/cloudowr/sdk/log/Handler.java create mode 100644 src/main/java/cn/cloudowr/sdk/log/Logger.java create mode 100644 src/main/java/cn/cloudowr/sdk/log/Main.java create mode 100644 src/main/java/cn/cloudowr/sdk/log/client/BaseClientAioHandler.java create mode 100644 src/main/java/cn/cloudowr/sdk/log/entity/HeartBeatMessage.java create mode 100644 src/main/java/cn/cloudowr/sdk/log/entity/JSONStringMessage.java create mode 100644 src/main/java/cn/cloudowr/sdk/log/entity/Message.java create mode 100644 src/main/java/cn/cloudowr/sdk/log/server/BaseServerAioHandler.java create mode 100644 src/main/java/cn/cloudowr/sdk/log/server/HandlerChain.java create mode 100644 src/main/java/cn/cloudowr/sdk/log/server/LoggerServer.java diff --git a/pom.xml b/pom.xml index a896bfa..7c8006f 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ cn.cloudowr sdk - 1.3.6 + 1.4.12 @@ -51,5 +51,11 @@ 3.4.2 provided + + com.talent-aio + talent-aio-common + 1.6.6.v20170318-RELEASE + provided + \ No newline at end of file diff --git a/src/main/java/cn/cloudowr/sdk/log/BaseAbsAioHandler.java b/src/main/java/cn/cloudowr/sdk/log/BaseAbsAioHandler.java new file mode 100644 index 0000000..93c31d6 --- /dev/null +++ b/src/main/java/cn/cloudowr/sdk/log/BaseAbsAioHandler.java @@ -0,0 +1,86 @@ +package cn.cloudowr.sdk.log; + +import cn.cloudowr.sdk.log.entity.HeartBeatMessage; +import cn.cloudowr.sdk.log.entity.JSONStringMessage; +import cn.cloudowr.sdk.log.entity.Message; +import com.talent.aio.common.ChannelContext; +import com.talent.aio.common.GroupContext; +import com.talent.aio.common.exception.AioDecodeException; +import com.talent.aio.common.intf.AioHandler; + +import java.nio.ByteBuffer; + +/** + * Created by lyf66 on 2017/3/24. + */ +public abstract class BaseAbsAioHandler implements AioHandler{ + + @Override + public ByteBuffer encode(Message message, GroupContext groupContext, ChannelContext channelContext) { + byte[] body = message.getBody(); + int bodyLen = 0; + if (body != null) { + bodyLen = body.length; + } + + int allLen = message.HEADER_LENGTH + bodyLen; + ByteBuffer buffer = ByteBuffer.allocate(allLen); + buffer.order(groupContext.getByteOrder()); + + buffer.putInt(message.getType()); + buffer.putInt(bodyLen); + + if (body != null) { + buffer.put(body); + } + return buffer; + } + + @Override + public Message decode(ByteBuffer byteBuffer, ChannelContext channelContext) throws AioDecodeException { + int readableLen = byteBuffer.limit() - byteBuffer.position(); + if (readableLen < Message.HEADER_LENGTH) { + return null; + } + + int type = byteBuffer.getInt(); + if (type < 0) { + throw new AioDecodeException("type [" + type + "] is not right, remote:" + channelContext.getClientNode()); + } + + int bodyLen = byteBuffer.getInt(); + if (bodyLen < 0) { + throw new AioDecodeException("bodyLength [" + bodyLen + "] is not right, remote:" + channelContext.getClientNode()); + } + + int neededLen = Message.HEADER_LENGTH + bodyLen; + int test = readableLen - neededLen; + if (test < 0) { + return null; + } else { + Message packet = null; + packet = switchMessage(byteBuffer, type, bodyLen, packet); + if (packet == null) { + System.out.println("packet is not recognized, remote:" + channelContext.getClientNode()); + } + return packet; + } + } + + //todo 新增消息类型后在此添加消息体解析 + private Message switchMessage(ByteBuffer byteBuffer, int type, int bodyLen, Message packet) { + switch (type) { + case Message.TYPE_HEART_BEAT: + packet = HeartBeatMessage.PACKET; + break; + case Message.TYPE_JSONO_STRING_LOG: + if (bodyLen > 0) { + byte[] dst = new byte[bodyLen]; + byteBuffer.get(dst); + packet = new JSONStringMessage(dst); + } + break; + } + return packet; + } +} diff --git a/src/main/java/cn/cloudowr/sdk/log/Const.java b/src/main/java/cn/cloudowr/sdk/log/Const.java new file mode 100644 index 0000000..de3ac74 --- /dev/null +++ b/src/main/java/cn/cloudowr/sdk/log/Const.java @@ -0,0 +1,8 @@ +package cn.cloudowr.sdk.log; + +/** + * Created by lyf66 on 2017/3/24. + */ +public class Const { + public static final int PORT = 13579; +} diff --git a/src/main/java/cn/cloudowr/sdk/log/Handler.java b/src/main/java/cn/cloudowr/sdk/log/Handler.java new file mode 100644 index 0000000..74172e1 --- /dev/null +++ b/src/main/java/cn/cloudowr/sdk/log/Handler.java @@ -0,0 +1,8 @@ +package cn.cloudowr.sdk.log; + +/** + * Created by lyf66 on 2017/3/24. + */ +public interface Handler { + void handle(String msg); +} diff --git a/src/main/java/cn/cloudowr/sdk/log/Logger.java b/src/main/java/cn/cloudowr/sdk/log/Logger.java new file mode 100644 index 0000000..ec489ed --- /dev/null +++ b/src/main/java/cn/cloudowr/sdk/log/Logger.java @@ -0,0 +1,54 @@ +package cn.cloudowr.sdk.log; + +import cn.cloudowr.sdk.log.client.BaseClientAioHandler; +import cn.cloudowr.sdk.log.entity.JSONStringMessage; +import cn.cloudowr.sdk.log.entity.Message; +import cn.cloudowr.sdk.log.server.HandlerChain; +import com.talent.aio.client.AioClient; +import com.talent.aio.client.ClientChannelContext; +import com.talent.aio.client.ClientGroupContext; +import com.talent.aio.client.ReconnConf; +import com.talent.aio.client.intf.ClientAioHandler; +import com.talent.aio.client.intf.ClientAioListener; +import com.talent.aio.common.Aio; +import com.talent.aio.common.Node; + +import java.io.UnsupportedEncodingException; + +/** + * Created by lyf66 on 2017/3/20. + */ +public class Logger { + private static Node serverNode = null; + private static AioClient aioClient; + private static ClientGroupContext clientGroupContext = null; + private static ClientAioHandler aioClientHandler = null; + private static ClientAioListener aioListener = null; + + //用来自动连接的,不想自动连接请设为null + private static ReconnConf reconnConf = new ReconnConf(5000L); + private static ClientChannelContext clientChannelContext; + + public static void initLogger() throws Exception { + String serverIp = "log.cloudowr.cn"; + int serverPort = Const.PORT; + serverNode = new Node(serverIp, serverPort); + aioClientHandler = new BaseClientAioHandler(); + aioListener = null; + + clientGroupContext = new ClientGroupContext<>(aioClientHandler, aioListener, reconnConf); + aioClient = new AioClient<>(clientGroupContext); + + clientChannelContext = aioClient.connect(serverNode); + } + + public static void log(String msg) { + try { + Message packet = new JSONStringMessage(msg.getBytes(Message.CHARSET)); + Aio.send(clientChannelContext, packet); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + } + +} diff --git a/src/main/java/cn/cloudowr/sdk/log/Main.java b/src/main/java/cn/cloudowr/sdk/log/Main.java new file mode 100644 index 0000000..b50332a --- /dev/null +++ b/src/main/java/cn/cloudowr/sdk/log/Main.java @@ -0,0 +1,21 @@ +package cn.cloudowr.sdk.log; + +import cn.cloudowr.sdk.log.server.LoggerServer; + +import java.io.IOException; + +/** + * Created by lyf66 on 2017/3/24. + */ +public class Main { + public static void main(String[] args) throws IOException { + LoggerServer server = new LoggerServer(); + server.addHandler(new Handler() { + @Override + public void handle(String msg) { + System.out.println(msg); + } + }); + server.start(); + } +} diff --git a/src/main/java/cn/cloudowr/sdk/log/client/BaseClientAioHandler.java b/src/main/java/cn/cloudowr/sdk/log/client/BaseClientAioHandler.java new file mode 100644 index 0000000..661de00 --- /dev/null +++ b/src/main/java/cn/cloudowr/sdk/log/client/BaseClientAioHandler.java @@ -0,0 +1,24 @@ +package cn.cloudowr.sdk.log.client; + +import cn.cloudowr.sdk.log.BaseAbsAioHandler; +import cn.cloudowr.sdk.log.entity.HeartBeatMessage; +import cn.cloudowr.sdk.log.entity.Message; +import com.talent.aio.client.intf.ClientAioHandler; +import com.talent.aio.common.ChannelContext; + +/** + * Created by lyf66 on 2017/3/24. + */ +public class BaseClientAioHandler extends BaseAbsAioHandler implements ClientAioHandler { + + @Override + public Message heartbeatPacket() { + return HeartBeatMessage.PACKET; + } + + @Override + public Object handler(Message packet, ChannelContext channelContext) throws Exception { + + return null; + } +} diff --git a/src/main/java/cn/cloudowr/sdk/log/entity/HeartBeatMessage.java b/src/main/java/cn/cloudowr/sdk/log/entity/HeartBeatMessage.java new file mode 100644 index 0000000..a37866b --- /dev/null +++ b/src/main/java/cn/cloudowr/sdk/log/entity/HeartBeatMessage.java @@ -0,0 +1,17 @@ +package cn.cloudowr.sdk.log.entity; + +/** + * Created by lyf66 on 2017/3/24. + */ +public class HeartBeatMessage extends Message { + public static final HeartBeatMessage PACKET = new HeartBeatMessage(); + private HeartBeatMessage() {} + public byte[] getBody() { + return new byte[]{0}; + } + + @Override + public int getType() { + return TYPE_HEART_BEAT; + } +} diff --git a/src/main/java/cn/cloudowr/sdk/log/entity/JSONStringMessage.java b/src/main/java/cn/cloudowr/sdk/log/entity/JSONStringMessage.java new file mode 100644 index 0000000..d12c2fe --- /dev/null +++ b/src/main/java/cn/cloudowr/sdk/log/entity/JSONStringMessage.java @@ -0,0 +1,23 @@ +package cn.cloudowr.sdk.log.entity; + +/** + * Created by lyf66 on 2017/3/24. + */ +public class JSONStringMessage extends Message { + + public JSONStringMessage(byte[] body) { + this.body = body; + } + + private byte[] body; + + @Override + public byte[] getBody() { + return body; + } + + @Override + public int getType() { + return TYPE_JSONO_STRING_LOG; + } +} diff --git a/src/main/java/cn/cloudowr/sdk/log/entity/Message.java b/src/main/java/cn/cloudowr/sdk/log/entity/Message.java new file mode 100644 index 0000000..21c4d3a --- /dev/null +++ b/src/main/java/cn/cloudowr/sdk/log/entity/Message.java @@ -0,0 +1,16 @@ +package cn.cloudowr.sdk.log.entity; + +import com.talent.aio.common.intf.Packet; + +/** + * Created by lyf66 on 2017/3/24. + */ +public abstract class Message extends Packet { + public static final int TYPE_HEART_BEAT = 0; + public static final int TYPE_JSONO_STRING_LOG = 1; + + public static final int HEADER_LENGTH = 8; + public static final String CHARSET = "utf-8"; + public abstract byte[] getBody(); + public abstract int getType(); +} diff --git a/src/main/java/cn/cloudowr/sdk/log/server/BaseServerAioHandler.java b/src/main/java/cn/cloudowr/sdk/log/server/BaseServerAioHandler.java new file mode 100644 index 0000000..18d4671 --- /dev/null +++ b/src/main/java/cn/cloudowr/sdk/log/server/BaseServerAioHandler.java @@ -0,0 +1,23 @@ +package cn.cloudowr.sdk.log.server; + +import cn.cloudowr.sdk.log.BaseAbsAioHandler; +import cn.cloudowr.sdk.log.entity.Message; +import com.talent.aio.common.ChannelContext; +import com.talent.aio.server.intf.ServerAioHandler; + +/** + * Created by lyf66 on 2017/3/24. + */ +public class BaseServerAioHandler extends BaseAbsAioHandler implements ServerAioHandler { + + + @Override + public Object handler(Message message, ChannelContext channelContext) throws Exception { + byte[] body = message.getBody(); + if (body != null) { + String str = new String(body, Message.CHARSET); + LoggerServer.getChain().next(str); + } + return null; + } +} diff --git a/src/main/java/cn/cloudowr/sdk/log/server/HandlerChain.java b/src/main/java/cn/cloudowr/sdk/log/server/HandlerChain.java new file mode 100644 index 0000000..10e2f28 --- /dev/null +++ b/src/main/java/cn/cloudowr/sdk/log/server/HandlerChain.java @@ -0,0 +1,22 @@ +package cn.cloudowr.sdk.log.server; + +import cn.cloudowr.sdk.log.Handler; + +import java.util.LinkedList; + +/** + * Created by lyf66 on 2017/3/24. + */ +public class HandlerChain { + LinkedList handlers = new LinkedList<>(); + + public void next(String str) { + for (Handler handler : handlers) { + handler.handle(str); + } + } + + public LinkedList getHandlers() { + return handlers; + } +} diff --git a/src/main/java/cn/cloudowr/sdk/log/server/LoggerServer.java b/src/main/java/cn/cloudowr/sdk/log/server/LoggerServer.java new file mode 100644 index 0000000..cfc5d16 --- /dev/null +++ b/src/main/java/cn/cloudowr/sdk/log/server/LoggerServer.java @@ -0,0 +1,47 @@ +package cn.cloudowr.sdk.log.server; + +import cn.cloudowr.sdk.log.Const; +import cn.cloudowr.sdk.log.Handler; +import cn.cloudowr.sdk.log.entity.Message; +import com.talent.aio.common.ChannelContext; +import com.talent.aio.server.AioServer; +import com.talent.aio.server.ServerGroupContext; +import com.talent.aio.server.intf.ServerAioHandler; +import com.talent.aio.server.intf.ServerAioListener; + +import java.io.IOException; + +/** + * Created by lyf66 on 2017/3/24. + */ +public class LoggerServer { + private ServerGroupContext serverGroupContext = null; + private AioServer aioServer = null; //可以为空 + private ServerAioHandler aioHandler = null; + private ServerAioListener aioListener = null; + private static HandlerChain chain; + + public static HandlerChain getChain() { + return chain; + } + + public void addHandler(Handler handler) { + chain.handlers.add(handler); + } + + public LoggerServer() { + chain = new HandlerChain(); + } + + public void start() throws IOException { + aioHandler = new BaseServerAioHandler(); + serverGroupContext = new ServerGroupContext<>(aioHandler, aioListener); + aioServer = new AioServer<>(serverGroupContext); + aioServer.start("0.0.0.0", Const.PORT); + } + + public void stop() { + aioServer.stop(); + chain = null; + } +}