srouin

tio集群配置不生效

2019-5-21 13:55:28 0 待实现

参照 https://www.t-io.org/322 配置了 tio 集群，但集群不生效：连接到不同服务器的客户端之间收不到彼此的消息。

package com.example.websocket;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.tio.cluster.TioClusterConfig;
import org.tio.cluster.redisson.RedissonTioClusterTopic;
import org.tio.server.ServerGroupContext;
import org.tio.websocket.server.WsServerStarter;

/**
 * Spring configuration for the t-io WebSocket server.
 *
 * <p>Declares three beans: the {@link WsServerStarter}, its
 * {@link ServerGroupContext} with a Redisson-backed cluster topic attached,
 * and the {@link RedissonClient} that topic publishes through.
 */
@SpringBootConfiguration
public class TioWsConfiguration {
    /** Port the t-io WebSocket server listens on. */
    @Value("${tio.port}")
    private Integer port;
    @Value("${spring.redis.host}")
    private String redisHost;
    @Value("${spring.redis.port}")
    private String redisPort;
    @Value("${spring.redis.password}")
    private String redisPassword;
    /** Reused as the Redisson connection pool size. */
    @Value("${spring.redis.lettuce.pool.max-active}")
    private Integer redisMaxActive;
    /** Reused as the Redisson minimum idle connection count. */
    @Value("${spring.redis.lettuce.pool.min-idle}")
    private Integer redisMinIdle;

    /**
     * Creates the WebSocket server starter bound to {@code tio.port}.
     *
     * @return the starter, with a {@link TioWsMessageHandler} as message handler
     * @throws Exception if the starter cannot be constructed
     */
    @Bean
    public WsServerStarter wsServerStarter() throws Exception {
        return new WsServerStarter(port, new TioWsMessageHandler(port));
    }

    /**
     * Configures the group context owned by the starter and attaches the
     * cluster configuration to it.
     *
     * @param redissonClient  client used by the cluster topic
     * @param wsServerStarter starter whose group context is configured
     * @return the starter's group context, after configuration
     */
    @Bean
    public ServerGroupContext serverGroupContext(RedissonClient redissonClient, WsServerStarter wsServerStarter) {
        RedissonTioClusterTopic tioClusterTopic = new RedissonTioClusterTopic("_TOPIC", redissonClient);
        TioClusterConfig tioClusterConfig = new TioClusterConfig(tioClusterTopic);
        // Group sends (Tio.sendToGroup) must be relayed to the other nodes as well.
        tioClusterConfig.setCluster4group(true);

        ServerGroupContext serverGroupContext = wsServerStarter.getServerGroupContext();
        serverGroupContext.setName("tio");
        serverGroupContext.setServerAioListener(new TioWsServerListener());
        serverGroupContext.setGroupListener(new TioWsGroupListener());
        // 60 seconds, expressed in milliseconds.
        serverGroupContext.setHeartbeatTimeout(1000 * 60);
        serverGroupContext.setTioClusterConfig(tioClusterConfig);
        return serverGroupContext;
    }

    /**
     * Builds the single-server Redisson client used by the cluster topic.
     *
     * <p>No codec is set on purpose. The cluster topic publishes message
     * objects through this client; a {@code StringCodec} would write them as
     * {@code toString()} text and hand plain {@code String}s to subscribers,
     * so cluster messages would never be delivered in usable form. Redisson's
     * default object codec is therefore kept.
     *
     * @return the Redisson client; Spring calls {@code shutdown} on destroy
     */
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient() {
        Config config = new Config();
        // 30 seconds, expressed in milliseconds.
        config.setLockWatchdogTimeout(1000 * 30);
        config.useSingleServer()
                .setAddress("redis://" + redisHost + ":" + redisPort)
                .setConnectionPoolSize(redisMaxActive)
                .setConnectionMinimumIdleSize(redisMinIdle)
                .setPassword(redisPassword);
        return Redisson.create(config);
    }
}
package com.example.websocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.http.common.HttpResponseStatus;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.json.Json;
import org.tio.websocket.common.Opcode;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.server.handler.IWsMsgHandler;

import java.io.UnsupportedEncodingException;

/**
 * WebSocket message handler for the t-io server.
 *
 * <p>Binds every connection to one shared group and to the user id taken from
 * the {@code uid} request parameter, then relays text messages to that group.
 * The server port is included in outgoing messages so a client can see which
 * node produced them.
 */
public class TioWsMessageHandler implements IWsMsgHandler {
    private static final Logger logger = LoggerFactory.getLogger(TioWsMessageHandler.class);
    /** Name of the group every handshaked connection is bound to. */
    private static final String GROUP_ALL = "TIO-WEBSOCKET-SPRING-BOOT-STARTER-ALL";

    /** Port of the server this handler belongs to; only used in message text. */
    private Integer port;

    /**
     * @param port port of the owning server, echoed in outgoing messages
     */
    public TioWsMessageHandler(Integer port) {
        this.port = port;
    }

    /**
     * Marks the handshake response as 401 when the {@code uid} parameter is
     * missing or blank. The response object is returned in both cases.
     *
     * NOTE(review): whether t-io aborts the handshake on a 401 status is not
     * visible here - confirm against the t-io handshake handling.
     */
    @Override
    public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        String userId = httpRequest.getParam("uid");
        if (StrUtil.isBlank(userId)) {
            httpResponse.setStatus(HttpResponseStatus.C401);
        }
        return httpResponse;
    }

    /**
     * Binds the channel to the shared group and to its user id, then announces
     * the new user with {@code Tio.sendToAll}.
     */
    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        Tio.bindGroup(channelContext, GROUP_ALL);
        String userId = httpRequest.getParam("uid");
        Tio.bindUser(channelContext, userId);
        Tio.sendToAll(channelContext.groupContext, wsPacket("new User:" + userId + " said :I come from :" + port));
    }

    /** Binary frames are ignored. */
    @Override
    public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        return null;
    }

    /** Removes the channel when the peer sends a close frame. */
    @Override
    public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        Tio.remove(channelContext, "websocket closed");
        return null;
    }

    /** Relays a text message to the shared group, prefixed with this server's port. */
    @Override
    public Object onText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception {
        Tio.sendToGroup(channelContext.groupContext, GROUP_ALL, wsPacket("msg from " + port + ":" + s));
        return null;
    }

    /**
     * Wraps a message in a text {@link WsResponse}.
     *
     * @param msg a ready-made {@link Packet}, returned unchanged, or any other
     *            object, which is converted with {@code Json.toJson} first
     * @return the packet to send
     */
    private static Packet wsPacket(Object msg) {
        if (msg instanceof Packet) {
            return (Packet) msg;
        }
        WsResponse response = new WsResponse();
        response.setWsOpcode(Opcode.TEXT);
        final String json = Json.toJson(msg);
        response.setWsBodyText(json);
        response.setBody(getBytes(json));
        return response;
    }

    /**
     * Encodes a string as UTF-8 bytes.
     *
     * <p>Uses the {@code Charset} constant rather than the charset name, so
     * there is no checked exception to handle and no {@code null} result.
     */
    private static byte[] getBytes(String value) {
        return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}
package com.example.websocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.common.WsSessionContext;
import org.tio.websocket.server.WsServerAioListener;

/**
 * Server listener that traces connection lifecycle callbacks.
 *
 * <p>Each override delegates to {@link WsServerAioListener} first and then
 * logs the callback name through the class logger instead of standard output.
 */
public class TioWsServerListener extends WsServerAioListener {
    private static final Logger logger = LoggerFactory.getLogger(TioWsServerListener.class);

    /** Logs after a packet send attempt has completed. */
    @Override
    public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
        super.onAfterSent(channelContext, packet, isSentSuccess);
        logger.info("onAfterSent");
    }

    /** Logs after a connection attempt has completed. */
    @Override
    public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
        super.onAfterConnected(channelContext, isConnected, isReconnect);
        logger.info("onAfterConnected");
    }

    /** Logs just before a channel is closed. */
    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
        super.onBeforeClose(channelContext, throwable, remark, isRemove);
        logger.info("onBeforeClose");
    }
}
server.port=8182
tio.port=8011
#redis
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.password=redis
# 连接超时时间(毫秒)
spring.redis.timeout=10000
# Redis默认情况下有16个数据库,这里配置具体使用的数据库索引,默认是0
spring.redis.database=1
# 连接池最大连接数(使用负值表示没有限制) 默认 8
spring.redis.lettuce.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
spring.redis.lettuce.pool.max-wait=-1
# 连接池中的最大空闲连接 默认 8
spring.redis.lettuce.pool.max-idle=8
# 连接池中的最小空闲连接 默认 0
spring.redis.lettuce.pool.min-idle=0
spring.resources.static-locations=classpath:/static/

客户端

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script>
        // Random numeric id sent as the "uid" query parameter of the handshake.
        var uid = Math.floor(Math.random() * 10000);
        var url = "ws://localhost:8011";
        var ws = new WebSocket(url + "?uid=" + uid);

        ws.onopen = function (event) {
            console.log("opened");
            ws.send("Hello Tio WebSocket");
        };

        // Show each incoming message as a list item. textContent is used so
        // text typed by other users is never interpreted as markup.
        ws.onmessage = function (p1) {
            var item = document.createElement("li");
            item.textContent = p1.data;
            document.getElementById('container').appendChild(item);
        };

        // Sends the current content of the text input.
        function send() {
            ws.send(document.getElementById('txtInput').value);
        }

        // Parses an HTML string and appends the resulting nodes to elem.
        // Only pass trusted markup: the string is parsed with innerHTML.
        function appendHtml(elem, value) {
            var node = document.createElement("div"),
                fragment = document.createDocumentFragment();
            node.innerHTML = value;
            // childNodes is live: moving a node out shrinks it, so drain from
            // the front instead of indexing, which would skip every other node.
            while (node.firstChild) {
                fragment.appendChild(node.firstChild);
            }
            elem.appendChild(fragment);
        }
    </script>
</head>
<body>

<div>
    <ul id="container">

    </ul>

    <input type="text"  id="txtInput"/>
    <input type="button" value="发言" onclick="send()">
</div>
</body>
</html>

连接同一服务器

连接不同服务器

TCP连接数:, IP数:
    发 送