1. 首页 > 快讯

使用消息队列打造高效分布式WebSocket应用

这篇文章给大家聊聊关于使用消息队列打造高效分布式WebSocket应用,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站哦。

简单的概括一下:如果我们的项目是分布式环境,登录的用户被Nginx的反向代理分配到多个不同服务器,那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢?

今天就来解答一下球友的问题:其实,要解决这个问题就需要实现分布式WebSocket,而分布式WebSocket一般可以通过以下两种方案来实现:

  • 将消息(<用户id,消息内容>)统一推送到一个消息队列(Redis、Kafka等)的的topic,然后每个应用节点都订阅这个topic,在接收到WebSocket消息后取出这个消息的“消息接收者的用户ID/用户名”,然后再比对自身是否存在相应用户的连接,如果存在则推送消息,否则丢弃接收到的这个消息(这个消息接收者所在的应用节点会处理)
  • 在用户建立WebSocket连接后,使用Redis缓存记录用户的WebSocket建立在哪个应用节点上,然后同样使用消息队列将消息推送到接收者所在的应用节点上面(实现上比方案一要复杂,但是网络流量会更低)

实现方案

下面将以第一种方案来具体实现,实现方式如下:

1. 定义一个WebSocket Channel枚举类

publicenumWebSocketChannelEnum{
//测试使用的简易点对点聊天
CHAT("CHAT","测试使用的简易点对点聊天","/topic/reply");

WebSocketChannelEnum(String code,String description,String subscribeUrl){
this.codecode;
this.descriptiondescription;
this.subscribeUrlsubscribeUrl;
}

/**
* 唯一CODE
*/
private String code;
/**
* 描述
*/
private String description;
/**
* WebSocket客户端订阅的URL
*/
private String subscribeUrl;

public String getCode(){
return code;
}

public String getDescription(){
return description;
}

public String getSubscribeUrl(){
return subscribeUrl;
}

/**
* 通过CODE查找枚举类
*/
public static WebSocketChannelEnum fromCode(String code){
if(StringUtils.isNoneBlank(code)){
for(WebSocketChannelEnum channelEnum:values()){
if(channelEnum.code.equals(code)){
return channelEnum;
}
}
}

returnnull;
}

}

2. 配置基于Redis的消息队列

需要注意的是,在大中型正式项目中并不推荐使用Redis实现的消息队列,因为经过测试它并不是特别可靠,所以应该考虑使用Kafka、rabbitMQ等专业的消息队列中间件

@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig{

@Value("${spring.redis.timeout}")
private String timeOut;

@Value("${spring.redis.cluster.nodes}")
private String nodes;

@Value("${spring.redis.cluster.max-redirects}")
privateintmaxRedirects;

@Value("${spring.redis.jedis.pool.max-active}")
privateintmaxActive;

@Value("${spring.redis.jedis.pool.max-wait}")
privateintmaxWait;

@Value("${spring.redis.jedis.pool.max-idle}")
privateintmaxIdle;

@Value("${spring.redis.jedis.pool.min-idle}")
privateintminIdle;

@Value("${spring.redis.message.topic-name}")
private String topicName;

@Bean
public JedisPoolConfig jedisPoolConfig(){
JedisPoolConfig confignew JedisPoolConfig();
config.setMaxTotal(maxActive);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setMaxWaitMillis(maxWait);

return config;
}

@Bean
public RedisClusterConfiguration redisClusterConfiguration(){
RedisClusterConfiguration configurationnew RedisClusterConfiguration(Arrays.asList(nodes));
configuration.setMaxRedirects(maxRedirects);

return configuration;
}

/**
* JedisConnectionFactory
*/
@Bean
public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration,JedisPoolConfig jedisPoolConfig){
return new JedisConnectionFactory(configuration,jedisPoolConfig);
}

/**
* 使用Jackson序列化对象
*/
@Bean
public Jackson2JsonRedisSerializer<Object>jackson2JsonRedisSerializer(){
Jackson2JsonRedisSerializer<Object>serializernew Jackson2JsonRedisSerializer<Object>(Object.class);

ObjectMapper objectMappernew ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL,JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
serializer.setObjectMapper(objectMapper);

return serializer;
}

/**
* RedisTemplate
*/
@Bean
public RedisTemplate<String,Object>redisTemplate(JedisConnectionFactory factory,Jackson2JsonRedisSerializer<Object>jackson2JsonRedisSerializer){
RedisTemplate<String,Object>redisTemplatenew RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);

//字符串方式序列化KEY
StringRedisSerializer stringRedisSerializernew StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);

//JSON方式序列化VALUE
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

redisTemplate.afterPropertiesSet();

return redisTemplate;
}

/**
* 消息监听器
*/
@Bean
MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver,Jackson2JsonRedisSerializer<Object>jackson2JsonRedisSerializer){
//消息接收者以及对应的默认处理方法
MessageListenerAdapter messageListenerAdapternew MessageListenerAdapter(messageReceiver,"receiveMessage");
//消息的反序列化方式
messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);

return messageListenerAdapter;
}

/**
* message listener container
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory
,MessageListenerAdapter messageListenerAdapter){
RedisMessageListenerContainer containernew RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//添加消息监听器
container.addMessageListener(messageListenerAdapter,new PatternTopic(topicName));

return container;
}

}

需要注意的是,这里使用的配置如下所示:

spring:
...
#redis
redis:
cluster:
nodes:namenode22:6379,datanode23:6379,datanode24:6379
maxredirects:6
timeout:300000
jedis:
pool:
maxactive:8
maxwait:100000
maxidle:8
minidle:0
#自定义的监听的TOPIC路径
message:
topicname:topictest

3. 定义一个Redis消息的处理者

@Component
public class MessageReceiver{
private final Logger loggerLoggerFactory.getLogger(getClass());

@Autowired
private SimpMessagingTemplate messagingTemplate;

@Autowired
private SimpUserRegistry userRegistry;

/**
* 处理WebSocket消息
*/
public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg){
logger.info(MessageFormat.format("Received Message: {0}",redisWebsocketMsg));
//1. 取出用户名并判断是否连接到当前应用节点的WebSocket
SimpUser simpUseruserRegistry.getUser(redisWebsocketMsg.getReceiver());

if(simpUser!=null&&StringUtils.isNoneBlank(simpUser.getName())){
//2. 获取WebSocket客户端的订阅地址
WebSocketChannelEnum channelEnumWebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());

if(channelEnum!=null){
//3. 给WebSocket客户端发送消息
messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(),channelEnum.getSubscribeUrl(),redisWebsocketMsg.getContent());
}
}

}
}

4. 在Controller中发送WebSocket消息

@Controller
@RequestMapping(("/wsTemplate"))
public class RedisMessageController{
private final Logger loggerLoggerFactory.getLogger(getClass());

@Value("${spring.redis.message.topic-name}")
private String topicName;

@Autowired
private SimpMessagingTemplate messagingTemplate;

@Autowired
private SimpUserRegistry userRegistry;

@Resource(name"redisServiceImpl")
private RedisService redisService;

/**
* 给指定用户发送WebSocket消息
*/
@PostMapping("/sendToUser")
@ResponseBody
public String chat(HttpServletRequest request){
//消息接收者
String receiverrequest.getParameter("receiver");
//消息内容
String msgrequest.getParameter("msg");
HttpSession sessionSpringContextUtils.getSession();
User loginUser(User)session.getAttribute(Constants.SESSION_USER);

HelloMessage resultDatanew HelloMessage(MessageFormat.format("{0} say: {1}",loginUser.getUsername(),msg));
this.sendToUser(loginUser.getUsername(),receiver,WebSocketChannelEnum.CHAT.getSubscribeUrl(),JsonUtils.toJson(resultData));

return"ok";
}

/**
* 给指定用户发送消息,并处理接收者不在线的情况
* @param sender 消息发送者
* @param receiver 消息接收者
* @param destination 目的地
* @param payload 消息正文
*/
private void sendToUser(String sender,String receiver,String destination,String payload){
SimpUser simpUseruserRegistry.getUser(receiver);

//如果接收者存在,则发送消息
if(simpUser!=null&&StringUtils.isNoneBlank(simpUser.getName())){
messagingTemplate.convertAndSendToUser(receiver,destination,payload);
}
//如果接收者在线,则说明接收者连接了集群的其他节点,需要通知接收者连接的那个节点发送消息
else if(redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET,receiver)){
RedisWebsocketMsg<String>redisWebsocketMsgnew RedisWebsocketMsg<>(receiver,WebSocketChannelEnum.CHAT.getCode(),payload);

redisService.convertAndSend(topicName,redisWebsocketMsg);
}
//否则将消息存储到redis,等用户上线后主动拉取未读消息
else{
//存储消息的Redis列表名
String listKeyConstants.REDIS_UNREAD_MSG_PREFIX+receiver+":"+destination;
logger.info(MessageFormat.format("消息接收者{0}还未建立WebSocket连接,{1}发送的消息【{2}】将被存储到Redis的【{3}】列表中",receiver,sender,payload,listKey));

//存储消息到Redis中
redisService.addToListRight(listKey,ExpireEnum.UNREAD_MSG,payload);
}

}


/**
* 拉取指定监听路径的未读的WebSocket消息
* @param destination 指定监听路径
* @return java.util.Map
*/
@PostMapping("/pullUnreadMessage")
@ResponseBody
public Map<String,Object>pullUnreadMessage(String destination){
Map<String,Object>resultnew HashMap<>();
try{
HttpSession sessionSpringContextUtils.getSession();
//当前登录用户
User loginUser(User)session.getAttribute(Constants.SESSION_USER);

//存储消息的Redis列表名
String listKeyConstants.REDIS_UNREAD_MSG_PREFIX+loginUser.getUsername()+":"+destination;
//从Redis中拉取所有未读消息
List<Object>messageListredisService.rangeList(listKey,0,1);

result.put("code","200");
if(messageList!=null&&messageList.size()>0){
//删除Redis中的这个未读消息列表
redisService.delete(listKey);
//将数据添加到返回集,供前台页面展示
result.put("result",messageList);
}
}catch(Exception e){
result.put("code","500");
result.put("msg",e.getMessage());
}

return result;
}

}

5. WebSocket相关配置

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
@Autowired
private AuthHandshakeInterceptor authHandshakeInterceptor;

@Autowired
private MyHandshakeHandler myHandshakeHandler;

@Autowired
private MyChannelInterceptor myChannelInterceptor;

@Override
public void registerStompEndpoints(StompEndpointRegistry registry){
registry.addEndpoint("/chat-websocket")
.addInterceptors(authHandshakeInterceptor)
.setHandshakeHandler(myHandshakeHandler)
.withSockJS();
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry){
//客户端需要把消息发送到/message/xxx地址
registry.setApplicationDestinationPrefixes("/message");
//服务端广播消息的路径前缀,客户端需要相应订阅/topic/yyy这个地址的消息
registry.enableSimpleBroker("/topic");
//给指定用户发送消息的路径前缀,默认值是/user/
registry.setUserDestinationPrefix("/user/");
}

@Override
public void configureClientInboundChannel(ChannelRegistration registration){
registration.interceptors(myChannelInterceptor);
}

}

6. 示例页面

<head>
<meta content"text/html;charset=UTF-8"/>
<meta httpequiv"Content-Type"content"text/html; charset=utf-8"/>
<meta httpequiv"X-UA-Compatible"content"IE=edge"/>
<meta name"viewport"content"width=device-width, initial-scale=1"/>
<title>Chat With STOMP Messagetitle>
<script src"https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js">script>
<script src"https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js">script>
<script src"https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js">script>
<script th:src"@{/layui/layui.js}">script>
<script th:src"@{/layui/lay/modules/layer.js}">script>
<link th:href"@{/layui/css/layui.css}"rel"stylesheet">
<link th:href"@{/layui/css/modules/layer/default/layer.css}"rel"stylesheet">
<link th:href"@{/css/style.css}"rel"stylesheet">
<style type"text/css">
#connectcontainer{
margin:0auto;
width:400px;
}

#connectcontainer div{
padding:5px;
margin:07px10px0;
}

.messageinput{
padding:5px;
margin:07px10px0;
}

.layuibtn{
display:inlineblock;
}
style>
<script type"text/javascript">
var stompClientnull;

$(function(){
var target$("#target");
if(window.location.protocol'http:'){
target.val('http://'+window.location.host+target.val());
}else{
target.val('https://'+window.location.host+target.val());
}
});

function setConnected(connected){
var connect$("#connect");
var disconnect$("#disconnect");
var echo$("#echo");

if(connected){
connect.addClass("layui-btn-disabled");
disconnect.removeClass("layui-btn-disabled");
echo.removeClass("layui-btn-disabled");
}else{
connect.removeClass("layui-btn-disabled");
disconnect.addClass("layui-btn-disabled");
echo.addClass("layui-btn-disabled");
}

connect.attr("disabled",connected);
disconnect.attr("disabled",!connected);
echo.attr("disabled",!connected);
}

//连接
function connect(){
var target$("#target").val();

var wsnew SockJS(target);
stompClientStomp.over(ws);

stompClient.connect({},function(){
setConnected(true);
log('Info: STOMP connection opened.');

//连接成功后,主动拉取未读消息
pullUnreadMessage("/topic/reply");

//订阅服务端的/topic/reply地址
stompClient.subscribe("/user/topic/reply",function(response){
log(JSON.parse(response.body).content);
})
},function(){
//断开处理
setConnected(false);
log('Info: STOMP connection closed.');
});
}

//断开连接
function disconnect(){
if(stompClient!=null){
stompClient.disconnect();
stompClientnull;
}
setConnected(false);
log('Info: STOMP connection closed.');
}

//向指定用户发送消息
function sendMessage(){
if(stompClient!=null){
var receiver$("#receiver").val();
var msg$("#message").val();
log('Sent: '+JSON.stringify({'receiver':receiver,'msg':msg}));

$.ajax({
url:"/wsTemplate/sendToUser",
type:"POST",
dataType:"json",
async:true,
data:{
"receiver":receiver,
"msg":msg
},
success:function(data){

}
});
}else{
layer.msg('STOMP connection not established, please connect.',{
offset:'auto'
,icon:2
});
}
}

//从服务器拉取未读消息
function pullUnreadMessage(destination){
$.ajax({
url:"/wsTemplate/pullUnreadMessage",
type:"POST",
dataType:"json",
async:true,
data:{
"destination":destination
},
success:function(data){
if(data.result!=null){
$.each(data.result,function(i,item){
log(JSON.parse(item).content);
})
}else if(data.code!=null&&data.code"500"){
layer.msg(data.msg,{
offset:'auto'
,icon:2
});
}
}
});
}

//日志输出
function log(message){
console.debug(message);
}
script>
head>
<body>
<noscript><h2 style"color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being
enabled. Please enable
Javascript and reload this page!


Chat With STOMP Message



















用户评论

三年约

哇,消息队列真强大!能把东西传递到不同的地方,太酷了!

    有9位网友表示赞同!

无关风月

所以说这个方案是用来解决WebSocket连接的问题吗?分散压力感觉不错!

    有8位网友表示赞同!

念初

看起来挺专业的啊,我需要好好学习一下消息队列的使用方法。

    有18位网友表示赞同!

等量代换

分布式系统越来越常见了,这篇文章正好让我了解一下如何使用消息队列来提高性能。

    有17位网友表示赞同!

一别经年

WebSocket-websocket 分布式?听着很复杂哎,能不能简单解释一下?

    有17位网友表示赞同!

我的黑色迷你裙

我一直对分布式技术感兴趣,这篇文章看起来挺值得一看的!

    有6位网友表示赞同!

風景綫つ

有没有什么专门用于 WebSocket 的消息队列软件推荐?

    有19位网友表示赞同!

一笑傾城゛

这个方案的好处在哪里呢?性能方面会有哪些提升?

    有8位网友表示赞同!

入骨相思

感觉使用消息队列实现分布式 WebSocket 会提高系统的稳定性吧?

    有15位网友表示赞同!

残花为谁悲丶

我之前接触过WebSocket,但是没有听说过用消息队列来实现分发的,真开眼界!

    有8位网友表示赞同!

灵魂摆渡人

这篇文章让我对消息队列和 WebSocket 的结合有了更深入的理解。

    有8位网友表示赞同!

青楼买醉

学习一下这个方案,说不定以后可以用到实际项目中呢!

    有17位网友表示赞同!

■□丶一切都无所谓

看来使用消息队列确实能带来很多好处,将来得好好研究一下。

    有17位网友表示赞同!

(り。薆情海

这篇文章的标题写的真简洁明了,容易让人理解文章内容。

    有13位网友表示赞同!

夏至离别

我对大数据和分布式系统很有兴趣,这个主题非常吸引我!

    有6位网友表示赞同!

米兰

看评论区里大家都在讨论分布式技术,感觉这个领域很热门呢!

    有17位网友表示赞同!

花菲

希望能看到更多关于消息队列和 WebSocket 的文章!

    有14位网友表示赞同!

本文采摘于网络,不代表本站立场,转载联系作者并注明出处:https://www.iotsj.com//kuaixun/7468.html

联系我们

在线咨询:点击这里给我发消息

微信号:666666