Spring boot集成websocket实现分布式websocketsession共享(一)--基于redis的发布订阅

  • 时间:
  • 来源:互联网

本文主要是针对分布式场景下的使用websocket的一个解决方案。我们以下面的图来说明下业务使用场景。

 针对如图的情况,很多人第一时间想到的是websocket的session共享,这是大多数的第一反应。很遗憾的是,websocketsession是不支持序列化操作,所以也就不可能存在redis中。那么我们有什么其他的方式解决呢。

  我们知道在单节点中我们只需要吧websocketsession存储在Map中就OK,每次发送通知都从map中根据clientID获取对应的websocket的session进行消息通知。那么我们是不是可以这样,不管是哪个服务节点要发送消息,我都告诉其他的服务,当前要发送到的客户端和发送的内容。然后各个服务节点判断自己是否存储了对应的clientID,然后将消息推送给出去呢。

 具体的实现逻辑,redis配置:

@Configuration
public class RedisConfig {
      // 注入 RedisConnectionFactory
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;
 
    @Bean
    public RedisTemplate<String, Object> functionDomainRedisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        initDomainRedisTemplate(redisTemplate, redisConnectionFactory);
        return redisTemplate;
    }
 
    /**
     * 设置数据存入 redis 的序列化方式
     * @param redisTemplate
     * @param factory
     */
    private void initDomainRedisTemplate(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) {
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setConnectionFactory(factory);
    }
    
     /**
     * 实例化 HashOperations 对象,可以使用 Hash 类型操作
     * @param redisTemplate
     * @return
     */
    @Bean
    public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForHash();
    }
 
    /**
     * 实例化 ValueOperations 对象,可以使用 String 操作
     * @param redisTemplate
     * @return
     */
    @Bean
    public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForValue();
    }
 
    /**
     * 实例化 ListOperations 对象,可以使用 List 操作
     * @param redisTemplate
     * @return
     */
    @Bean
    public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForList();
    }
 
    /**
     * 实例化 SetOperations 对象,可以使用 Set 操作
     * @param redisTemplate
     * @return
     */
    @Bean
    public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForSet();
    }
    
    /**
     * 实例化 ZSetOperations 对象,可以使用 ZSet 操作
     * @param redisTemplate
     * @return
     */
    @Bean
    public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForZSet();
    }
}
redis发布订阅配置:

@Configuration
public class RedisPublishConfig {
    @Autowired
    private StaticProperties staticProperties;
    /**
     * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * 
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean // 相当于xml中的bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅了一个叫chat 的通道
        container.addMessageListener(listenerAdapter, new PatternTopic(staticProperties.getWS_CHANNEL()));
        // 这个container 可以添加多个 messageListener
        return container;
    }
 
    /**
     * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
     * 
     * @param receiver
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter(RedisMsg receiver) {
        // 这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
        // 也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
 
}
 定义接受信息的接口

@Component
public interface RedisMsg {
    /**
     * 接受信息
     * @param message
     */
    public void receiveMessage(String message);
}
websocket配置

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer{
 
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        
        //handler是webSocket的核心,配置入口
        registry.addHandler(new CTIHandler(), "/webscoket/{ID}").setAllowedOrigins("*").addInterceptors(new WebSocketInterceptor());
        
    }
 
}
 

interceptor配置

public class WebSocketInterceptor extends HttpSessionHandshakeInterceptor {
    /**
     * 配置日志
     */
    private final static Logger logger = LoggerFactory.getLogger(WebSocketInterceptor.class);
 
    @Override
    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse seHttpResponse,
            WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
//        HttpServletRequest request = ((ServletServerHttpRequest) serverHttpRequest).getServletRequest();
        String userName = serverHttpRequest.getURI().toString().split("ID=")[1];
        attributes.put("userName", userName);
        logger.info("握手之前");
        //从request里面获取对象,存放attributes
        return super.beforeHandshake(serverHttpRequest, seHttpResponse, wsHandler, attributes);
    }
 
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
            Exception ex) {
        logger.info("握手之后");
        super.afterHandshake(request, response, wsHandler, ex);
    }
}
配置websocket的handler,并配置为redis的接受消息的实现类:

@Service
public class CTIHandler implements WebSocketHandler ,RedisMsg{
    /**
     * 配置日志
     */
    private final static Logger logger = LoggerFactory.getLogger(CTIHandler.class);
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static Map<String,WebSocketSession> socketMap = new HashMap<String, WebSocketSession>();
    //新增socket
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        logger.info("成功建立连接");
        //获取用户信息
        String userName = (String) session.getAttributes().get("userName");
        logger.info("获取当前"+socketMap.get(userName));
        if(socketMap.get(userName)==null) {
            socketMap.put(userName,session);
            sendMessageToUser(userName, new TextMessage("链接建立成功"));
            //并且通过redis发布和订阅广播给其他的的机器,或者通过消息队列
        }
        logger.info("链接成功");
    }
 
    //接收socket信息
    @Override
    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
        logger.info("收到信息"+webSocketMessage.toString());
        String userName = (String) webSocketSession.getAttributes().get("userName");
        webSocketSession.sendMessage(new TextMessage("aaa"));
        sendMessageToUser(userName, new TextMessage("我收到你的信息了"));
    }
 
    /**
     * 发送信息给指定用户
     * @param clientId
     * @param message
     * @return
     */
    public boolean sendMessageToUser(String clientId, TextMessage message) {
        WebSocketSession session = socketMap.get(clientId);
        if(session==null) {
            return false;
        }
        logger.info("进入发送消息");
        if (!session.isOpen()) {
            return false;
        }
        try {
            logger.info("正在发送消息");
            session.sendMessage(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return true;
    }
 
 
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            session.close();
        }
        logger.info("连接出错");
    }
 
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        //获取用户信息
        String userName = (String) session.getAttributes().get("userName");
        if(socketMap.get(userName)!=null) {
            socketMap.remove(userName);
            //并且通过redis发布和订阅广播给其他的的机器,或者通过消息队列
        }
        logger.info("连接已关闭:" + status);
    }
 
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
    /**
     * 接受订阅信息
     */
    @Override
    public void receiveMessage(String message) {
        // TODO Auto-generated method stub
        JSONObject sendMsg = JSONObject.fromObject(message.substring(message.indexOf("{")));
        String clientId = sendMsg.getString("userName");
        TextMessage receiveMessage = new TextMessage(sendMsg.getString("message"));
        boolean flag = sendMessageToUser(clientId, receiveMessage);
        if(flag) {
            logger.info("我发送消息成功了!");
        }
    }
 
 
}
配置文件配置

spring:
  application:
    name: crm-cti
  #redis配置
  redis:
    host: 47.95.250.218
    password: zhudaxian;.,68NB
    port: 6379
    database: 0
POM文件配置 

 <!-- websocket支持 -->
     <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
     </dependency>
     <!-- redis的支持 -->
     <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis</artifactId>
      </dependency>
已上就是大体的环境配置,业务接口发送消息

    @Override
    public String tjJTPush(String cno) {
        // TODO Auto-generated method stub
        JSONObject result = new JSONObject();
        if(null==Pub_Tools.getString_TrimZeroLenAsNull(cno)) {
            result.put("result", "error");
        }else {
            try {
                Map<String, Object> userData = crmCallBindDao.findSalesMessage(cno);
                //单节点实现方式,如果是单节点建议使用该方式,如果是分布式部署废弃该方式
                Boolean flag = ctiHanler.sendMessageToUser(userData.get("userName").toString(), new TextMessage("hangup"));
                if(!flag) {//发送失败广播出去,让其他节点发送
                    //广播消息到各个订阅者
                    JSONObject message = new JSONObject();
                    message.put("userName", userData.get("userName"));
                    message.put("message", "connect");
                    redisTemplate.convertAndSend(staticProperties.getWS_CHANNEL(),message.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("推送给客户端失败");
            }
            result.put("result", "success");
        }
        return result.toString();
    }

本文链接http://element-ui.cn/news/show-2197.aspx