小小小丶盘子

Exchanging messages between App clients (Socket) and Web clients (WebSocket)

Posted: 2019-5-12 18:02:18

Preface

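In real business scenarios, especially IM, a user often has several clients, for example a mobile app and a PC/Web page logged in at the same time. This raises a problem: the WebSocket port and the Socket port are different, so how can the two sides talk to each other?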

Approach

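The first idea that comes to mind is that as long as we can get hold of the ServerGroupContext, we can send messages. That is what I did when I first used t-io. The code looks like this: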

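// An HTTP endpoint that sends by fetching the ServerGroupContext of the WebSocket starter.
// The route annotation below is shorthand; use your HTTP framework's own mapping.
@Request(HttpMethod.GET,value="/send")
public void sendByHttp(String to,String msg){
   ServerGroupContext context = TioWsServer.starter.getServerGroupContext();
   // Wrap the text in a WebSocket text packet before sending
   Tio.sendToUser(context,to,WsResponse.fromText(msg,"utf-8"));
}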

What I want to introduce today, though, is a method already built into t-io: ServerGroupContext.share()

public void share(GroupContext groupContext) {
    this.clientNodes = groupContext.clientNodes;
    this.connections = groupContext.connections;
    this.groups = groupContext.groups;
    this.users = groupContext.users;
    this.tokens = groupContext.tokens;
    this.ids = groupContext.ids;
    this.bsIds = groupContext.bsIds;
    this.ipBlacklist = groupContext.ipBlacklist;
    this.ips = groupContext.ips;
}
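
The important detail in share() is that it copies references, not contents: afterwards both contexts point at the very same user, group and connection registries, so a binding made through one server can be looked up through the other. Here is a minimal sketch of that idea using only the JDK. MiniContext and its members are hypothetical stand-ins that I made up for illustration; they are not t-io classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a group context; NOT a t-io class.
class MiniContext {
    final String name;
    // userId -> description of the channel bound to that user
    Map<String, String> users = new ConcurrentHashMap<>();

    MiniContext(String name) {
        this.name = name;
    }

    // Same idea as share(): adopt the other context's registry by reference.
    void share(MiniContext other) {
        this.users = other.users;
    }

    void bindUser(String userId, String channel) {
        users.put(userId, channel);
    }

    String lookup(String userId) {
        return users.get(userId);
    }
}

class ShareDemo {
    public static void main(String[] args) {
        MiniContext socket = new MiniContext("socket");
        MiniContext ws = new MiniContext("websocket");
        ws.share(socket); // share first, bind afterwards

        socket.bindUser("SOCKET-1", "tcp:8899");
        ws.bindUser("WEBSOCKET-1", "ws:9000");

        System.out.println(ws.lookup("SOCKET-1"));        // prints tcp:8899
        System.out.println(socket.lookup("WEBSOCKET-1")); // prints ws:9000
    }
}
```

In this sketch, anything bound on the sharing side before share() is called would be dropped together with its old map, which is a good reason to share before any connection is accepted.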

Demo

Define a simple ImPacket

public class ImPacket extends Packet {

    public static final ImPacket newPacket(byte[] body){
        ImPacket packet = new ImPacket();
        packet.setBody(body);
        return packet;
    }

    public void setBody(byte[] body){
        this.body = body;
    }

    public byte[] getBody() {
        return body;
    }

    private byte[] body;
}

The client and the server use the same encoding and decoding

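 public static Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
        // Frame layout: 4-byte bodyLength + body
        // Not even a full header yet: return null and wait for more data
        if (readableLength < 4) {
            return null;
        }
        int bodyLength = buffer.getInt();
        // A negative length means corrupt data; waiting for more bytes would never help
        if (bodyLength < 0) {
            throw new AioDecodeException("bodyLength [" + bodyLength + "] is not right");
        }
        // Not enough bytes for the whole body yet: return null
        if (buffer.remaining() < bodyLength) {
            return null;
        }
        byte[] body = new byte[bodyLength];
        buffer.get(body);
        return ImPacket.newPacket(body);
    }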

    public static ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
        ImPacket packet1 = (ImPacket) packet;
        int bodyLength = packet1.getBody().length;
        ByteBuffer buffer = ByteBuffer.allocate(4 + bodyLength);
        buffer.putInt(bodyLength);
        buffer.put(packet1.getBody());
        return buffer;
    }
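
To make the framing easier to see without starting a server, here is a stand-alone round trip of the same "4-byte length + body" layout using only java.nio. FrameCodec is a hypothetical helper written for this post, not part of t-io, and it flips the buffer itself only because the demo reads its own output straight back.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; NOT a t-io class.
class FrameCodec {
    static final int HEADER_LENGTH = 4;

    // Writes bodyLength followed by the body and returns a buffer ready for reading.
    static ByteBuffer encode(byte[] body) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buffer.putInt(body.length);
        buffer.put(body);
        buffer.flip(); // the demo reads this buffer directly
        return buffer;
    }

    // Returns the body, or null when the frame is still incomplete.
    // On null the buffer position is restored so the caller can retry later.
    static byte[] decode(ByteBuffer buffer) {
        if (buffer.remaining() < HEADER_LENGTH) {
            return null; // header not complete
        }
        buffer.mark();
        int bodyLength = buffer.getInt();
        if (bodyLength < 0) {
            throw new IllegalArgumentException("negative body length: " + bodyLength);
        }
        if (buffer.remaining() < bodyLength) {
            buffer.reset(); // half packet: rewind past the header we just read
            return null;
        }
        byte[] body = new byte[bodyLength];
        buffer.get(body);
        return body;
    }

    public static void main(String[] args) {
        ByteBuffer full = encode("hello".getBytes(StandardCharsets.UTF_8));

        // Simulate a half packet: only the first 6 of 9 bytes have arrived.
        ByteBuffer partial = full.duplicate();
        partial.limit(6);
        System.out.println(decode(partial) == null); // prints true

        System.out.println(new String(decode(full), StandardCharsets.UTF_8)); // prints hello
    }
}
```

The half-packet case is the one worth paying attention to: a decoder has to tell "not enough data yet" apart from "bad data", otherwise the connection either stalls or drops valid messages.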

SocketServer side

public class TioSocketServer {

    private  ServerGroupContext serverGroupContext;

    public ServerGroupContext getServerGroupContext() {
        return serverGroupContext;
    }

    public void start() throws IOException {
        serverGroupContext = new ServerGroupContext("Tio Socket Server", new ImSocketServerAioHandler(), new ImSocketServerAioListener(), Threads.getTioExecutor(), Threads.getGroupExecutor());
        TioServer server = new TioServer(serverGroupContext);
        server.start("127.0.0.1", 8899);
    }
}

SocketClient side

private static void start() throws Exception {
        ClientGroupContext clientGroupContext = new ClientGroupContext(new ImSocketClientAioHandler(), new DefaultClientAioListerner(), new ReconnConf(2000), Threads.getTioExecutor(), Threads.getGroupExecutor());
        TioClient client = new TioClient(clientGroupContext);
        clientChannelContext = client.connect(new Node("127.0.0.1", 8899));
    }

WebSocketServer side

    private ServerGroupContext sharedServerGroupContext;
    // Pass in the ServerGroupContext of the SocketServer here
    public TioWsServer(ServerGroupContext groupContext){
        this.sharedServerGroupContext = groupContext;
    }

    private ServerGroupContext wsServerGroupContext;
    public void start() throws Exception{
        WsServerConfig config = new WsServerConfig(9000);
        WsServerStarter wsServerStarter = new WsServerStarter(config,new ImWsHandler(), Threads.getTioExecutor(),Threads.getGroupExecutor());

        wsServerGroupContext = wsServerStarter.getServerGroupContext();
        // Core step: share the GroupContext
        wsServerGroupContext.share(sharedServerGroupContext);
        // Protocol converter: messages are sent as ImPacket, so implement PacketConverter.convert to turn a plain packet into a WsPacket.
        wsServerGroupContext.packetConverter = new ImPackageConvert();
        wsServerStarter.start();
    }

PacketConverter code

public class ImPackageConvert implements PacketConverter {
    /**
     * @param packet
     * @param channelContext the ChannelContext the packet is being sent to
     * @return
     * @author tanyaowu
     */
    @Override
    public Packet convert(Packet packet, ChannelContext channelContext) {
        if (packet instanceof ImPacket){
            ImPacket p = (ImPacket) packet;
            WsResponse wsResponse = new WsResponse();
            wsResponse.setWsOpcode(Opcode.TEXT);
            wsResponse.setBody(p.getBody());
            // StandardCharsets.UTF_8 (java.nio.charset) avoids the checked UnsupportedEncodingException
            wsResponse.setWsBodyText(new String(p.getBody(), StandardCharsets.UTF_8));
            return wsResponse;
        }
        return packet;
    }
}
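
The converter is needed because the two transports frame bytes differently: the socket side uses the 4-byte length prefix from the codec earlier in this post, while a browser only understands RFC 6455 frames. As a rough illustration of what a text message has to look like on the wire for the browser, here is a hand-rolled encoder for an unmasked (server-to-client) text frame. WsTextFrame is a hypothetical class for this post; in a real project I would expect the framework's WebSocket encoder to do this work once it receives a WsResponse.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of RFC 6455 base framing; NOT a t-io class.
class WsTextFrame {
    // Builds a single unfragmented, unmasked text frame (server-to-client direction).
    static byte[] encode(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        out.write(0x81); // FIN = 1, opcode = 0x1 (text)

        if (payload.length <= 125) {
            // short payload: the length fits in the 7-bit field
            out.write(payload.length);
        } else if (payload.length <= 0xFFFF) {
            // 126 marks a 16-bit big-endian extended length
            out.write(126);
            out.write(payload.length >>> 8);
            out.write(payload.length & 0xFF);
        } else {
            // 127 marks a 64-bit big-endian extended length
            out.write(127);
            long len = payload.length;
            for (int shift = 56; shift >= 0; shift -= 8) {
                out.write((int) (len >>> shift) & 0xFF);
            }
        }

        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] frame = encode("hi");
        // header byte, length byte, then the two payload bytes
        System.out.println(frame.length); // prints 4
    }
}
```

Compare this with the ImPacket frame: the same body "hi" would be 00 00 00 02 68 69 on the socket side and 81 02 68 69 on the WebSocket side, which is why one packet object cannot simply be written to both kinds of connection unchanged.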

Sending a message works the same as usual. The ServerGroupContext here can be either SocketServer.getServerGroupContext() or WebSocketServer.getServerGroupContext()

 Tio.sendToUser(serverGroupContext, "userId", packet);
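
Putting the pieces together, the mental model I use is: one shared user registry, and a conversion step applied per destination channel right before the bytes go out. The sketch below only models that idea with JDK classes. MiniRouter and MiniChannel are hypothetical names, and this is not how t-io is implemented internally.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical channel: remembers what was "sent" so the demo can inspect it.
class MiniChannel {
    final String transport;
    final Function<byte[], String> converter; // applied just before sending
    final List<String> sent = new ArrayList<>();

    MiniChannel(String transport, Function<byte[], String> converter) {
        this.transport = transport;
        this.converter = converter;
    }

    void send(byte[] body) {
        sent.add(converter.apply(body));
    }
}

// Hypothetical router holding the registry that both servers would share.
class MiniRouter {
    final Map<String, MiniChannel> users = new ConcurrentHashMap<>();

    // Returns false when no channel is bound to the user.
    boolean sendToUser(String userId, byte[] body) {
        MiniChannel channel = users.get(userId);
        if (channel == null) {
            return false;
        }
        channel.send(body);
        return true;
    }
}

class RouterDemo {
    public static void main(String[] args) {
        MiniRouter router = new MiniRouter();
        MiniChannel tcp = new MiniChannel("tcp", body -> "length-prefixed, " + body.length + " bytes");
        MiniChannel ws = new MiniChannel("ws", body -> "text frame: " + new String(body, StandardCharsets.UTF_8));
        router.users.put("SOCKET-1", tcp);
        router.users.put("WEBSOCKET-1", ws);

        byte[] body = "hi".getBytes(StandardCharsets.UTF_8);
        router.sendToUser("SOCKET-1", body);
        router.sendToUser("WEBSOCKET-1", body);

        System.out.println(tcp.sent.get(0)); // prints length-prefixed, 2 bytes
        System.out.println(ws.sent.get(0));  // prints text frame: hi
    }
}
```

The caller never needs to know which transport the target user is on; it just hands over one packet and a user id.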

Start both servers, one client, and one browser.
The SOCKET side binds the user SOCKET-1, and the WEBSOCKET side binds the user WEBSOCKET-1.

Demonstration

