这篇文章给大家聊聊关于使用消息队列打造高效分布式WebSocket应用,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站哦。
简单的概括一下:如果我们的项目是分布式环境,登录的用户被Nginx的反向代理分配到多个不同服务器,那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢?
今天就来解答一下球友的问题:其实,要解决这个问题就需要实现分布式WebSocket,而分布式WebSocket一般可以通过以下两种方案来实现:
- 将消息(<用户id,消息内容>)统一推送到一个消息队列(Redis、Kafka等)的topic,然后每个应用节点都订阅这个topic,在接收到WebSocket消息后取出这个消息的“消息接收者的用户ID/用户名”,然后再比对自身是否存在相应用户的连接,如果存在则推送消息,否则丢弃接收到的这个消息(这个消息接收者所在的应用节点会处理)
- 在用户建立WebSocket连接后,使用Redis缓存记录用户的WebSocket建立在哪个应用节点上,然后同样使用消息队列将消息推送到接收者所在的应用节点上面(实现上比方案一要复杂,但是网络流量会更低)
实现方案
下面将以第一种方案来具体实现,实现方式如下:
1. 定义一个WebSocket Channel枚举类
/**
 * WebSocket channels known to the application.
 *
 * <p>Each constant carries a unique code, a human-readable description and the
 * destination URL that WebSocket clients subscribe to.
 */
public enum WebSocketChannelEnum {
    /** Simple point-to-point chat used for testing. */
    CHAT("CHAT", "测试使用的简易点对点聊天", "/topic/reply");

    /** Unique code of the channel. */
    private final String code;

    /** Description of the channel. */
    private final String description;

    /** URL that WebSocket clients subscribe to. */
    private final String subscribeUrl;

    WebSocketChannelEnum(String code, String description, String subscribeUrl) {
        this.code = code;
        this.description = description;
        this.subscribeUrl = subscribeUrl;
    }

    /**
     * @return the unique code of this channel
     */
    public String getCode() {
        return code;
    }

    /**
     * @return the description of this channel
     */
    public String getDescription() {
        return description;
    }

    /**
     * @return the URL that WebSocket clients subscribe to
     */
    public String getSubscribeUrl() {
        return subscribeUrl;
    }

    /**
     * Looks up a channel by its code. The comparison is case-sensitive.
     *
     * @param code the channel code; may be {@code null} or blank
     * @return the matching channel, or {@code null} when {@code code} is
     *         {@code null}, blank, or matches no constant
     */
    public static WebSocketChannelEnum fromCode(String code) {
        // Blank input can never match a constant; bail out before scanning.
        if (code == null || code.trim().isEmpty()) {
            return null;
        }
        for (WebSocketChannelEnum channelEnum : values()) {
            if (channelEnum.code.equals(code)) {
                return channelEnum;
            }
        }
        return null;
    }
}
2. 配置基于Redis的消息队列
需要注意的是,在大中型正式项目中并不推荐使用Redis实现的消息队列,因为经过测试它并不是特别可靠,所以应该考虑使用Kafka、RabbitMQ等专业的消息队列中间件。
@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig{
@Value("${spring.redis.timeout}")
private String timeOut;
@Value("${spring.redis.cluster.nodes}")
private String nodes;
@Value("${spring.redis.cluster.max-redirects}")
privateintmaxRedirects;
@Value("${spring.redis.jedis.pool.max-active}")
privateintmaxActive;
@Value("${spring.redis.jedis.pool.max-wait}")
privateintmaxWait;
@Value("${spring.redis.jedis.pool.max-idle}")
privateintmaxIdle;
@Value("${spring.redis.jedis.pool.min-idle}")
privateintminIdle;
@Value("${spring.redis.message.topic-name}")
private String topicName;
@Bean
public JedisPoolConfig jedisPoolConfig(){
JedisPoolConfig confignew JedisPoolConfig();
config.setMaxTotal(maxActive);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setMaxWaitMillis(maxWait);
return config;
}
@Bean
public RedisClusterConfiguration redisClusterConfiguration(){
RedisClusterConfiguration configurationnew RedisClusterConfiguration(Arrays.asList(nodes));
configuration.setMaxRedirects(maxRedirects);
return configuration;
}
/**
* JedisConnectionFactory
*/
@Bean
public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration,JedisPoolConfig jedisPoolConfig){
return new JedisConnectionFactory(configuration,jedisPoolConfig);
}
/**
* 使用Jackson序列化对象
*/
@Bean
public Jackson2JsonRedisSerializer<Object>jackson2JsonRedisSerializer(){
Jackson2JsonRedisSerializer<Object>serializernew Jackson2JsonRedisSerializer<Object>(Object.class);
ObjectMapper objectMappernew ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL,JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
serializer.setObjectMapper(objectMapper);
return serializer;
}
/**
* RedisTemplate
*/
@Bean
public RedisTemplate<String,Object>redisTemplate(JedisConnectionFactory factory,Jackson2JsonRedisSerializer<Object>jackson2JsonRedisSerializer){
RedisTemplate<String,Object>redisTemplatenew RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
//字符串方式序列化KEY
StringRedisSerializer stringRedisSerializernew StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
//JSON方式序列化VALUE
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* 消息监听器
*/
@Bean
MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver,Jackson2JsonRedisSerializer<Object>jackson2JsonRedisSerializer){
//消息接收者以及对应的默认处理方法
MessageListenerAdapter messageListenerAdapternew MessageListenerAdapter(messageReceiver,"receiveMessage");
//消息的反序列化方式
messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);
return messageListenerAdapter;
}
/**
* message listener container
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory
,MessageListenerAdapter messageListenerAdapter){
RedisMessageListenerContainer containernew RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//添加消息监听器
container.addMessageListener(messageListenerAdapter,new PatternTopic(topicName));
return container;
}
}
需要注意的是,这里使用的配置如下所示:
spring:
  # ... (other spring settings omitted)
  # redis
  redis:
    cluster:
      # comma-separated host:port entries
      nodes: namenode22:6379,datanode23:6379,datanode24:6379
      max-redirects: 6
    timeout: 300000
    jedis:
      pool:
        max-active: 8
        max-wait: 100000
        max-idle: 8
        min-idle: 0
    # custom topic that every node listens on
    message:
      topic-name: topictest
3. 定义一个Redis消息的处理者
@Component
public class MessageReceiver{
private final Logger loggerLoggerFactory.getLogger(getClass());
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;
/**
* 处理WebSocket消息
*/
public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg){
logger.info(MessageFormat.format("Received Message: {0}",redisWebsocketMsg));
//1. 取出用户名并判断是否连接到当前应用节点的WebSocket
SimpUser simpUseruserRegistry.getUser(redisWebsocketMsg.getReceiver());
if(simpUser!=null&&StringUtils.isNoneBlank(simpUser.getName())){
//2. 获取WebSocket客户端的订阅地址
WebSocketChannelEnum channelEnumWebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
if(channelEnum!=null){
//3. 给WebSocket客户端发送消息
messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(),channelEnum.getSubscribeUrl(),redisWebsocketMsg.getContent());
}
}
}
}
4. 在Controller中发送WebSocket消息
@Controller
@RequestMapping(("/wsTemplate"))
public class RedisMessageController{
private final Logger loggerLoggerFactory.getLogger(getClass());
@Value("${spring.redis.message.topic-name}")
private String topicName;
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;
@Resource(name"redisServiceImpl")
private RedisService redisService;
/**
* 给指定用户发送WebSocket消息
*/
@PostMapping("/sendToUser")
@ResponseBody
public String chat(HttpServletRequest request){
//消息接收者
String receiverrequest.getParameter("receiver");
//消息内容
String msgrequest.getParameter("msg");
HttpSession sessionSpringContextUtils.getSession();
User loginUser(User)session.getAttribute(Constants.SESSION_USER);
HelloMessage resultDatanew HelloMessage(MessageFormat.format("{0} say: {1}",loginUser.getUsername(),msg));
this.sendToUser(loginUser.getUsername(),receiver,WebSocketChannelEnum.CHAT.getSubscribeUrl(),JsonUtils.toJson(resultData));
return"ok";
}
/**
* 给指定用户发送消息,并处理接收者不在线的情况
* @param sender 消息发送者
* @param receiver 消息接收者
* @param destination 目的地
* @param payload 消息正文
*/
private void sendToUser(String sender,String receiver,String destination,String payload){
SimpUser simpUseruserRegistry.getUser(receiver);
//如果接收者存在,则发送消息
if(simpUser!=null&&StringUtils.isNoneBlank(simpUser.getName())){
messagingTemplate.convertAndSendToUser(receiver,destination,payload);
}
//如果接收者在线,则说明接收者连接了集群的其他节点,需要通知接收者连接的那个节点发送消息
else if(redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET,receiver)){
RedisWebsocketMsg<String>redisWebsocketMsgnew RedisWebsocketMsg<>(receiver,WebSocketChannelEnum.CHAT.getCode(),payload);
redisService.convertAndSend(topicName,redisWebsocketMsg);
}
//否则将消息存储到redis,等用户上线后主动拉取未读消息
else{
//存储消息的Redis列表名
String listKeyConstants.REDIS_UNREAD_MSG_PREFIX+receiver+":"+destination;
logger.info(MessageFormat.format("消息接收者{0}还未建立WebSocket连接,{1}发送的消息【{2}】将被存储到Redis的【{3}】列表中",receiver,sender,payload,listKey));
//存储消息到Redis中
redisService.addToListRight(listKey,ExpireEnum.UNREAD_MSG,payload);
}
}
/**
* 拉取指定监听路径的未读的WebSocket消息
* @param destination 指定监听路径
* @return java.util.Map
*/
@PostMapping("/pullUnreadMessage")
@ResponseBody
public Map<String,Object>pullUnreadMessage(String destination){
Map<String,Object>resultnew HashMap<>();
try{
HttpSession sessionSpringContextUtils.getSession();
//当前登录用户
User loginUser(User)session.getAttribute(Constants.SESSION_USER);
//存储消息的Redis列表名
String listKeyConstants.REDIS_UNREAD_MSG_PREFIX+loginUser.getUsername()+":"+destination;
//从Redis中拉取所有未读消息
List<Object>messageListredisService.rangeList(listKey,0,1);
result.put("code","200");
if(messageList!=null&&messageList.size()>0){
//删除Redis中的这个未读消息列表
redisService.delete(listKey);
//将数据添加到返回集,供前台页面展示
result.put("result",messageList);
}
}catch(Exception e){
result.put("code","500");
result.put("msg",e.getMessage());
}
return result;
}
}
5. WebSocket相关配置
/**
 * STOMP-over-WebSocket configuration: registers the SockJS endpoint, the
 * destination prefixes of the simple broker, and the inbound channel interceptor.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private MyChannelInterceptor myChannelInterceptor;

    @Autowired
    private MyHandshakeHandler myHandshakeHandler;

    @Autowired
    private AuthHandshakeInterceptor authHandshakeInterceptor;

    /**
     * Attaches the channel interceptor to messages arriving from clients.
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptor);
    }

    /**
     * Sets up the destination prefixes used by clients and by the broker.
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Prefix for sending a message to a specific user; the default value is /user/
        registry.setUserDestinationPrefix("/user/");
        // Prefix for server broadcasts; clients subscribe to /topic/yyy accordingly
        registry.enableSimpleBroker("/topic");
        // Clients send their messages to /message/xxx
        registry.setApplicationDestinationPrefixes("/message");
    }

    /**
     * Exposes the "/chat-websocket" endpoint with the handshake interceptor,
     * the custom handshake handler and SockJS enabled.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry
                .addEndpoint("/chat-websocket")
                .addInterceptors(authHandshakeInterceptor)
                .setHandshakeHandler(myHandshakeHandler)
                .withSockJS();
    }
}
6. 示例页面
<head>
    <meta content="text/html;charset=UTF-8"/>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <title>Chat With STOMP Message</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
    <script th:src="@{/layui/layui.js}"></script>
    <script th:src="@{/layui/lay/modules/layer.js}"></script>
    <link th:href="@{/layui/css/layui.css}" rel="stylesheet"/>
    <link th:href="@{/layui/css/modules/layer/default/layer.css}" rel="stylesheet"/>
    <link th:href="@{/css/style.css}" rel="stylesheet"/>
    <style type="text/css">
        #connect-container {
            margin: 0 auto;
            width: 400px;
        }

        #connect-container div {
            padding: 5px;
            margin: 0 7px 10px 0;
        }

        /* NOTE(review): selector name assumed; the body markup is not visible here - confirm. */
        .message-input {
            padding: 5px;
            margin: 0 7px 10px 0;
        }

        .layui-btn {
            display: inline-block;
        }
    </style>
    <script type="text/javascript">
        // STOMP client of the current connection; null while disconnected.
        var stompClient = null;

        // On page load, prefix the #target value with the page's scheme and host.
        $(function () {
            var target = $("#target");
            if (window.location.protocol === 'http:') {
                target.val('http://' + window.location.host + target.val());
            } else {
                target.val('https://' + window.location.host + target.val());
            }
        });

        // Toggle the connect / disconnect / echo buttons to match the connection state.
        function setConnected(connected) {
            var connect = $("#connect");
            var disconnect = $("#disconnect");
            var echo = $("#echo");

            if (connected) {
                connect.addClass("layui-btn-disabled");
                disconnect.removeClass("layui-btn-disabled");
                echo.removeClass("layui-btn-disabled");
            } else {
                connect.removeClass("layui-btn-disabled");
                disconnect.addClass("layui-btn-disabled");
                echo.addClass("layui-btn-disabled");
            }

            connect.attr("disabled", connected);
            disconnect.attr("disabled", !connected);
            echo.attr("disabled", !connected);
        }

        // Connect
        function connect() {
            var target = $("#target").val();

            var ws = new SockJS(target);
            stompClient = Stomp.over(ws);

            stompClient.connect({}, function () {
                setConnected(true);
                log('Info: STOMP connection opened.');

                // Once connected, pull the unread messages
                pullUnreadMessage("/topic/reply");

                // Subscribe to this user's /topic/reply destination on the server
                stompClient.subscribe("/user/topic/reply", function (response) {
                    log(JSON.parse(response.body).content);
                })
            }, function () {
                // Connection lost / failed
                setConnected(false);
                log('Info: STOMP connection closed.');
            });
        }

        // Disconnect
        function disconnect() {
            if (stompClient != null) {
                stompClient.disconnect();
                stompClient = null;
            }
            setConnected(false);
            log('Info: STOMP connection closed.');
        }

        // Send a message to the given user
        function sendMessage() {
            if (stompClient != null) {
                var receiver = $("#receiver").val();
                var msg = $("#message").val();
                log('Sent: ' + JSON.stringify({'receiver': receiver, 'msg': msg}));

                $.ajax({
                    url: "/wsTemplate/sendToUser",
                    type: "POST",
                    // The endpoint replies with the plain string "ok", which is not
                    // valid JSON, so the response is read as text.
                    dataType: "text",
                    async: true,
                    data: {
                        "receiver": receiver,
                        "msg": msg
                    },
                    success: function (data) {
                    }
                });
            } else {
                layer.msg('STOMP connection not established, please connect.', {
                    offset: 'auto'
                    , icon: 2
                });
            }
        }

        // Pull the unread messages from the server
        function pullUnreadMessage(destination) {
            $.ajax({
                url: "/wsTemplate/pullUnreadMessage",
                type: "POST",
                dataType: "json",
                async: true,
                data: {
                    "destination": destination
                },
                success: function (data) {
                    if (data.result != null) {
                        // Each stored item is a JSON string; show its content field.
                        $.each(data.result, function (i, item) {
                            log(JSON.parse(item).content);
                        })
                    } else if (data.code != null && data.code === "500") {
                        layer.msg(data.msg, {
                            offset: 'auto'
                            , icon: 2
                        });
                    }
                }
            });
        }

        // Log output
        function log(message) {
            console.debug(message);
        }
    </script>
</head>
<body>
<noscript><h2 style"color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being
enabled. Please enable
Javascript and reload this page!