lyf666 2017-03-24 15:37:37 +08:00
parent 0f37197cc3
commit 7ae06b2818
13 changed files with 356 additions and 1 deletions

View File

@ -6,7 +6,7 @@
<groupId>cn.cloudowr</groupId> <groupId>cn.cloudowr</groupId>
<artifactId>sdk</artifactId> <artifactId>sdk</artifactId>
<version>1.3.6</version> <version>1.4.12</version>
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
@ -51,5 +51,11 @@
<version>3.4.2</version> <version>3.4.2</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>com.talent-aio</groupId>
<artifactId>talent-aio-common</artifactId>
<version>1.6.6.v20170318-RELEASE</version>
<scope>provided</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,86 @@
package cn.cloudowr.sdk.log;
import cn.cloudowr.sdk.log.entity.HeartBeatMessage;
import cn.cloudowr.sdk.log.entity.JSONStringMessage;
import cn.cloudowr.sdk.log.entity.Message;
import com.talent.aio.common.ChannelContext;
import com.talent.aio.common.GroupContext;
import com.talent.aio.common.exception.AioDecodeException;
import com.talent.aio.common.intf.AioHandler;
import java.nio.ByteBuffer;
/**
* Created by lyf66 on 2017/3/24.
*/
public abstract class BaseAbsAioHandler implements AioHandler<Object, Message, Object>{
@Override
public ByteBuffer encode(Message message, GroupContext<Object, Message, Object> groupContext, ChannelContext<Object, Message, Object> channelContext) {
byte[] body = message.getBody();
int bodyLen = 0;
if (body != null) {
bodyLen = body.length;
}
int allLen = message.HEADER_LENGTH + bodyLen;
ByteBuffer buffer = ByteBuffer.allocate(allLen);
buffer.order(groupContext.getByteOrder());
buffer.putInt(message.getType());
buffer.putInt(bodyLen);
if (body != null) {
buffer.put(body);
}
return buffer;
}
@Override
public Message decode(ByteBuffer byteBuffer, ChannelContext<Object, Message, Object> channelContext) throws AioDecodeException {
int readableLen = byteBuffer.limit() - byteBuffer.position();
if (readableLen < Message.HEADER_LENGTH) {
return null;
}
int type = byteBuffer.getInt();
if (type < 0) {
throw new AioDecodeException("type [" + type + "] is not right, remote:" + channelContext.getClientNode());
}
int bodyLen = byteBuffer.getInt();
if (bodyLen < 0) {
throw new AioDecodeException("bodyLength [" + bodyLen + "] is not right, remote:" + channelContext.getClientNode());
}
int neededLen = Message.HEADER_LENGTH + bodyLen;
int test = readableLen - neededLen;
if (test < 0) {
return null;
} else {
Message packet = null;
packet = switchMessage(byteBuffer, type, bodyLen, packet);
if (packet == null) {
System.out.println("packet is not recognized, remote:" + channelContext.getClientNode());
}
return packet;
}
}
//todo 新增消息类型后在此添加消息体解析
private Message switchMessage(ByteBuffer byteBuffer, int type, int bodyLen, Message packet) {
switch (type) {
case Message.TYPE_HEART_BEAT:
packet = HeartBeatMessage.PACKET;
break;
case Message.TYPE_JSONO_STRING_LOG:
if (bodyLen > 0) {
byte[] dst = new byte[bodyLen];
byteBuffer.get(dst);
packet = new JSONStringMessage(dst);
}
break;
}
return packet;
}
}

View File

@ -0,0 +1,8 @@
package cn.cloudowr.sdk.log;
/**
* Created by lyf66 on 2017/3/24.
*/
public class Const {
public static final int PORT = 13579;
}

View File

@ -0,0 +1,8 @@
package cn.cloudowr.sdk.log;
/**
* Created by lyf66 on 2017/3/24.
*/
public interface Handler {
void handle(String msg);
}

View File

@ -0,0 +1,54 @@
package cn.cloudowr.sdk.log;
import cn.cloudowr.sdk.log.client.BaseClientAioHandler;
import cn.cloudowr.sdk.log.entity.JSONStringMessage;
import cn.cloudowr.sdk.log.entity.Message;
import cn.cloudowr.sdk.log.server.HandlerChain;
import com.talent.aio.client.AioClient;
import com.talent.aio.client.ClientChannelContext;
import com.talent.aio.client.ClientGroupContext;
import com.talent.aio.client.ReconnConf;
import com.talent.aio.client.intf.ClientAioHandler;
import com.talent.aio.client.intf.ClientAioListener;
import com.talent.aio.common.Aio;
import com.talent.aio.common.Node;
import java.io.UnsupportedEncodingException;
/**
* Created by lyf66 on 2017/3/20.
*/
public class Logger {
private static Node serverNode = null;
private static AioClient<Object, Message, Object> aioClient;
private static ClientGroupContext<Object, Message, Object> clientGroupContext = null;
private static ClientAioHandler<Object, Message, Object> aioClientHandler = null;
private static ClientAioListener<Object, Message, Object> aioListener = null;
//用来自动连接的不想自动连接请设为null
private static ReconnConf<Object, Message, Object> reconnConf = new ReconnConf<Object, Message, Object>(5000L);
private static ClientChannelContext<Object, Message, Object> clientChannelContext;
public static void initLogger() throws Exception {
String serverIp = "log.cloudowr.cn";
int serverPort = Const.PORT;
serverNode = new Node(serverIp, serverPort);
aioClientHandler = new BaseClientAioHandler();
aioListener = null;
clientGroupContext = new ClientGroupContext<>(aioClientHandler, aioListener, reconnConf);
aioClient = new AioClient<>(clientGroupContext);
clientChannelContext = aioClient.connect(serverNode);
}
public static void log(String msg) {
try {
Message packet = new JSONStringMessage(msg.getBytes(Message.CHARSET));
Aio.send(clientChannelContext, packet);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,21 @@
package cn.cloudowr.sdk.log;
import cn.cloudowr.sdk.log.server.LoggerServer;
import java.io.IOException;
/**
* Created by lyf66 on 2017/3/24.
*/
public class Main {
public static void main(String[] args) throws IOException {
LoggerServer server = new LoggerServer();
server.addHandler(new Handler() {
@Override
public void handle(String msg) {
System.out.println(msg);
}
});
server.start();
}
}

View File

@ -0,0 +1,24 @@
package cn.cloudowr.sdk.log.client;
import cn.cloudowr.sdk.log.BaseAbsAioHandler;
import cn.cloudowr.sdk.log.entity.HeartBeatMessage;
import cn.cloudowr.sdk.log.entity.Message;
import com.talent.aio.client.intf.ClientAioHandler;
import com.talent.aio.common.ChannelContext;
/**
* Created by lyf66 on 2017/3/24.
*/
public class BaseClientAioHandler extends BaseAbsAioHandler implements ClientAioHandler<Object, Message, Object> {
@Override
public Message heartbeatPacket() {
return HeartBeatMessage.PACKET;
}
@Override
public Object handler(Message packet, ChannelContext<Object, Message, Object> channelContext) throws Exception {
return null;
}
}

View File

@ -0,0 +1,17 @@
package cn.cloudowr.sdk.log.entity;
/**
* Created by lyf66 on 2017/3/24.
*/
public class HeartBeatMessage extends Message {
public static final HeartBeatMessage PACKET = new HeartBeatMessage();
private HeartBeatMessage() {}
public byte[] getBody() {
return new byte[]{0};
}
@Override
public int getType() {
return TYPE_HEART_BEAT;
}
}

View File

@ -0,0 +1,23 @@
package cn.cloudowr.sdk.log.entity;
/**
* Created by lyf66 on 2017/3/24.
*/
public class JSONStringMessage extends Message {
public JSONStringMessage(byte[] body) {
this.body = body;
}
private byte[] body;
@Override
public byte[] getBody() {
return body;
}
@Override
public int getType() {
return TYPE_JSONO_STRING_LOG;
}
}

View File

@ -0,0 +1,16 @@
package cn.cloudowr.sdk.log.entity;
import com.talent.aio.common.intf.Packet;
/**
* Created by lyf66 on 2017/3/24.
*/
public abstract class Message extends Packet {
public static final int TYPE_HEART_BEAT = 0;
public static final int TYPE_JSONO_STRING_LOG = 1;
public static final int HEADER_LENGTH = 8;
public static final String CHARSET = "utf-8";
public abstract byte[] getBody();
public abstract int getType();
}

View File

@ -0,0 +1,23 @@
package cn.cloudowr.sdk.log.server;
import cn.cloudowr.sdk.log.BaseAbsAioHandler;
import cn.cloudowr.sdk.log.entity.Message;
import com.talent.aio.common.ChannelContext;
import com.talent.aio.server.intf.ServerAioHandler;
/**
* Created by lyf66 on 2017/3/24.
*/
public class BaseServerAioHandler extends BaseAbsAioHandler implements ServerAioHandler<Object, Message, Object> {
@Override
public Object handler(Message message, ChannelContext<Object, Message, Object> channelContext) throws Exception {
byte[] body = message.getBody();
if (body != null) {
String str = new String(body, Message.CHARSET);
LoggerServer.getChain().next(str);
}
return null;
}
}

View File

@ -0,0 +1,22 @@
package cn.cloudowr.sdk.log.server;
import cn.cloudowr.sdk.log.Handler;
import java.util.LinkedList;
/**
* Created by lyf66 on 2017/3/24.
*/
public class HandlerChain {
LinkedList<Handler> handlers = new LinkedList<>();
public void next(String str) {
for (Handler handler : handlers) {
handler.handle(str);
}
}
public LinkedList<Handler> getHandlers() {
return handlers;
}
}

View File

@ -0,0 +1,47 @@
package cn.cloudowr.sdk.log.server;
import cn.cloudowr.sdk.log.Const;
import cn.cloudowr.sdk.log.Handler;
import cn.cloudowr.sdk.log.entity.Message;
import com.talent.aio.common.ChannelContext;
import com.talent.aio.server.AioServer;
import com.talent.aio.server.ServerGroupContext;
import com.talent.aio.server.intf.ServerAioHandler;
import com.talent.aio.server.intf.ServerAioListener;
import java.io.IOException;
/**
* Created by lyf66 on 2017/3/24.
*/
public class LoggerServer {
private ServerGroupContext<Object, Message, Object> serverGroupContext = null;
private AioServer<Object, Message, Object> aioServer = null; //可以为空
private ServerAioHandler<Object, Message, Object> aioHandler = null;
private ServerAioListener<Object, Message, Object> aioListener = null;
private static HandlerChain chain;
public static HandlerChain getChain() {
return chain;
}
public void addHandler(Handler handler) {
chain.handlers.add(handler);
}
public LoggerServer() {
chain = new HandlerChain();
}
public void start() throws IOException {
aioHandler = new BaseServerAioHandler();
serverGroupContext = new ServerGroupContext<>(aioHandler, aioListener);
aioServer = new AioServer<>(serverGroupContext);
aioServer.start("0.0.0.0", Const.PORT);
}
public void stop() {
aioServer.stop();
chain = null;
}
}