最近在项目中在做一个音讯推送的功能,比方客户下单之后告诉给给对应的客户发送系统告诉,这种音讯推送需要运用到全双工的websocket推送音讯。

所谓的全双工表示客户端和服务端都能向对方发送音讯。不运用同样是全双工的http是因为http只能由客户端主动发起恳求,服务接纳后返回音讯。websocket树立起衔接之后,客户端和服务端都能主动向对方发送音讯。

上一篇文章Spring Boot 整合单机websocket介绍了websocket在单机形式下进行音讯的发送和接纳:

Websocket集群解决方案

用户A用户Bweb服务器树立衔接之后,用户A发送一条音讯到服务器,服务器再推送给用户B,在单机系统上一切的用户都和同一个服务器树立衔接,一切的session都存储在同一个服务器中。

单个服务器是无法支撑几万人一起衔接同一个服务器,需要运用到分布式或许集群将恳求衔接负载均衡到到不同的服务下。音讯的发送方和接纳方在同一个服务器,这就和单体服务器类似,能成功接纳到音讯:

Websocket集群解决方案

但负载均衡运用轮询的算法,无法保证音讯发送方和接纳方处于同一个服务器,当发送方和接纳方不是在同一个服务器时,接纳方是无法接受到音讯的:

Websocket集群解决方案

websocket集群问题处理思路

客户端和服务端每次树立衔接时分,会创立有状态的会话session,服务器的保存维持衔接的session。客户端每次只能和集群服务器其间的一个服务器衔接,后续也是和该服务器进行数据传输。

要处理集群的问题,应该考虑session同享的问题,客户端成功衔接服务器之后,其他服务器也知道客户端衔接成功。

计划一:session 同享(不可行)

websocket类似的http是如何处理集群问题的?处理计划之一便是同享session,客户端登录服务端之后,将session信息存储在Redis数据库中,衔接其他服务器时,从Redis获取session,实践便是将session信息存储在Redis中,完成redis的同享。

session能够被同享的条件是能够被序列化,而websocketsession是无法被序列化的,httpsession记录的是恳求的数据,而websocketsession对应的是衔接,衔接到不同的服务器,session也不同,无法被序列化。

计划二:ip hash(不可行)

http不运用session同享,就能够运用Nginx负载均衡的ip hash算法,客户端每次都是恳求同一个服务器,客户端的session都保存在服务器上,而后续恳求都是恳求该服务器,都能获取到session,就不存在分布式session问题了。

websocket相对http来说,能够由服务端主动推进音讯给客户端,假如接纳音讯的服务端和发送音讯音讯的服务端不是同一个服务端,发送音讯的服务端无法找到接纳音讯对应的session,即两个session不处于同一个服务端,也就无法推送音讯。如下图所示:

Websocket集群解决方案

处理问题的办法是将一切音讯的发送方和接纳方都处于同一个服务器下,而音讯发送方和接纳方都是不确定的,显然是无法完成的。

计划三:播送形式

将音讯的发送方和接纳方都处于同一个服务器下才能发送音讯,那么能够转换一下思路,能够将音讯以音讯播送的方法告诉给一切的服务器,能够运用音讯中间件发布订阅形式,音讯脱离了服务器的限制,通过发送到中间件,再发送给订阅的服务器,类似播送相同,只要订阅了音讯,都能接纳到音讯的告诉:

Websocket集群解决方案

发布者发布音讯到音讯中间件,音讯中间件再将发送给一切订阅者:

Websocket集群解决方案

播送形式的完成

建立单机 websocket

参考曾经写的websocket单机建立 文章,先建立单机websocket完成音讯的推送。

1. 增加依靠

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 创立 ServerEndpointExporter 的 bean 实例

ServerEndpointExporter 的 bean 实例主动注册 @ServerEndpoint 注解声明的 websocket endpoint,运用springboot自带tomcat发动需要该装备,运用独立 tomcat 则不需要该装备。

@Configuration
public class WebSocketConfig {
    //tomcat发动无需该装备
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3. 创立服务端点 ServerEndpoint 和 客户端端

  • 服务端点
@Component
@ServerEndpoint(value = "/message")
@Slf4j
public class WebSocket {
	private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();
	private Session session;
	@OnOpen
	public void onOpen(Session session) throws SocketException {
		this.session = session;
		webSocketSet.put(this.session.getId(),this);
		log.info("【websocket】有新的衔接,总数:{}",webSocketSet.size());
	}
	@OnClose
	public void onClose(){
		String id = this.session.getId();
		if (id != null){
			webSocketSet.remove(id);
			log.info("【websocket】衔接断开:总数:{}",webSocketSet.size());
		}
	}
	@OnMessage
	public void onMessage(String message){
		if (!message.equals("ping")){
			log.info("【wesocket】收到客户端发送的音讯,message={}",message);
			sendMessage(message);
		}
	}
	/**
	 * 发送音讯
	 * @param message
	 * @return
	 */
	public void sendMessage(String message){
		for (WebSocket webSocket : webSocketSet.values()) {
			webSocket.session.getAsyncRemote().sendText(message);
		}
		log.info("【wesocket】发送音讯,message={}", message);
	}
}
  • 客户端点
<div>
    <input type="text" name="message" id="message">
    <button id="sendBtn">发送</button>
</div>
<div style="width:100px;height: 500px;" id="content">
</div>
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script>
<script type="text/javascript">
    var ws = new WebSocket("ws://127.0.0.1:8080/message");
    ws.onopen = function(evt) {
        console.log("Connection open ...");
    };
    ws.onmessage = function(evt) {
        console.log( "Received Message: " + evt.data);
        var p = $("<p>"+evt.data+"</p>")
        $("#content").prepend(p);
        $("#message").val("");
    };
    ws.onclose = function(evt) {
        console.log("Connection closed.");
    };
    $("#sendBtn").click(function(){
        var aa = $("#message").val();
        ws.send(aa);
    })
</script>

服务端和客户端中的OnOpenoncloseonmessage都是一一对应的。

  • 服务发动后,客户端ws.onopen调用服务端的@OnOpen注解的办法,储存客户端的session信息,握手树立衔接。
  • 客户端调用ws.send发送音讯,对应服务端的@OnMessage注解下面的办法接纳音讯。
  • 服务端调用session.getAsyncRemote().sendText发送音讯,对应的客户端ws.onmessage接纳音讯。

增加 controller

@GetMapping({"","index.html"})
public ModelAndView index() {
	ModelAndView view = new ModelAndView("index");
	return view;
}

作用展现

翻开两个客户端,其间的一个客户端发送音讯,另一个客户端也能接纳到音讯。

Websocket集群解决方案

增加 RabbitMQ 中间件

这儿运用比较常用的RabbitMQ作为音讯中间件,而RabbitMQ支持发布订阅形式

Websocket集群解决方案

增加音讯订阅

交换机运用扇形交换机,音讯分发给每一条绑定该交换机的行列。以服务器地点的IP + 端口作为仅有标识作为行列的命名,发动一个服务,运用行列绑定交换机,完成音讯的订阅:

@Configuration
public class RabbitConfig {
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");
    }
    @Bean
    public Queue psQueue() throws SocketException {
        // ip + 端口 为行列名 
        String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort();
        return new Queue("ps_" + ip);
    }
    @Bean
    public Binding routingFirstBinding() throws SocketException {
        return BindingBuilder.bind(psQueue()).to(fanoutExchange());
    }
}

获取服务器IP和端口能够具体查看Github源码,这儿就不做详细描述了。

修正服务端点 ServerEndpoint

WebSocket增加音讯的接纳办法,@RabbitListener 接纳音讯,行列称号运用常量命名,动态行列称号运用 #{name},其间的nameQueuebean 称号:

@RabbitListener(queues= "#{psQueue.name}")
public void pubsubQueueFirst(String message) {
  System.out.println(message);
  sendMessage(message);
}

然后再调用sendMessage办法发送给地点衔接的客户端。

修正音讯发送

WebSocket类的onMessage办法将音讯发送改成RabbitMQ方法发送:

@OnMessage
public void onMessage(String message){
  if (!message.equals("ping")){
    log.info("【wesocket】收到客户端发送的音讯,message={}",message);
    //sendMessage(message);
    if (rabbitTemplate == null) {
      rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate");
    }
    rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, message);
  }
}

音讯告诉流程如下所示:

Websocket集群解决方案

发动两个实例,模仿集群环境

翻开idea的Edit Configurations

Websocket集群解决方案

点击左上角的COPY,然后增加端口server.port=8081

Websocket集群解决方案

发动两个服务,端口分别是80808081。在发动8081端口的服务,将前端衔接端口改成8081:

var ws = new WebSocket("ws://127.0.0.1:8081/message");

作用展现

Websocket集群解决方案

源码

github源码

参考

  • Spring Websocket in a tomcat cluster

  • WebSocket 集群计划