在开发过程中,咱们常常遇到需要对前台的列表数据,实现实时展示最新的几条数据,或者是调度的任务进度条实现实时的改写……,而对于这种需求,无状况的http协议显然无法满意咱们的需求,于是websocket协议应运而生。websocket协议本质上是一个根据tcp的协议,是双向通讯协议,实现了浏览器和客户端的实时通讯,接纳端和发送端能够互相发送或接纳音讯。

本文整合websocket办法选用后台自定义Endpoint,前端运用内置的WebSocket

一、SpringBoot装备

1、开发环境

SpringBoot:2.5.13 JDK:1.8

2、引进pom文件

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

3、新建WebSocketConfig装备类

在装备类中手动注入ServerEndpointExporter,这个bean会自动注册运用了@ServerEndpoint注解声明的Websocket端点。@ServerEndpoint注解与@Controller注解相似,都是用来装备恳求的uri。

@Configuration
public class WebSocketConfig {
    /**
     * ServerEndpointExporter类的作用是,会扫描所有的服务器端点,
     * 把带有@ServerEndpoint 注解的所有类都添加进来
     * 
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

4、新建WebSocketServer服务类

server类用来对与前端树立的websocket衔接做出相应的呼应,一起通过该类咱们能够自动向前台推送音讯。在该类中咱们无法运用通过@Resource和@Autowired注入spring容器中的bean,由于spring容器办理的bean都是以单例的形式存在的,而websocket服务类则能够对应多个客户端。 项目初始化启动时,会初始化websocket服务类,此时还没有用户衔接,spring会为其注入service, 所以该目标的service不是null,而当新用户树立websocket衔接时,体系会新建一个websocket服务类目标,但不会注入service,导致后续用户衔接websocket服务类中的service 都是null。

@Slf4j
@Component
@ServerEndpoint("/notice/{userId}")
public class WebSocketServer {
    /**
     * 解决无法注入bean:定义静态service目标,通过@Autowired在体系启动时为静态变量赋值
     * @Autowired 注解作用在办法上,假如办法没有参数,spring容器会在类加载完后履行一次这个办法,
     * 假如办法中有参数的话,还会从容器中自动注入这个办法的参数,然后履行一次这个办法。
     */
    public static XxService xxService;
    @Autowired
    public void setXxService(XxService xxService){
        WebSocketServer.xxService = xxService;
    }
    //存储客户端session信息
    public static Map<String, Session> clients = new ConcurrentHashMap<>();
    //存储把不同用户的客户端session信息调集
    public static Map<String, Set<String>> connection = new ConcurrentHashMap<>();
    //会话id
    private String sid = null;
    //树立衔接的用户id
    private String userId;
    /**
     * @description: 当与前端的websocket衔接成功时,履行该办法
     * @PathParam 获取ServerEndpoint路径中的占位符信息相似 控制层的 @PathVariable注解
     **/
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId){
        this.sid = UUID.randomUUID().toString();
        this.userId = userId;
        clients.put(this.sid,session);
        //判别该用户是否存在会话信息,不存在则添加
        Set<String> clientSet = connection.get(userId);
        if (clientSet == null){
            clientSet = new HashSet<>();
            connection.put(userId,clientSet);
        }
        clientSet.add(this.sid);
        log.info(this.userId + "用户树立衔接," + this.sid+"衔接敞开!");
    }
     /**
     * @description: 当衔接失败时,履行该办法
     **/
    @OnClose
    public void onClose(){
        clients.remove(this.sid);
        log.info(this.sid+"衔接断开");
    }
    /**
     * @description: 当收到前台发送的音讯时,履行该办法
     **/
    @OnMessage
    public void onMessage(String message,Session session) {
        log.info("收到来自用户:" + this.userId + "的信息   " + message);
        //自定义音讯实体
        ViewQueryInfoDTO viewQueryInfoDTO = JSON.parseObject(message, ViewQueryInfoDTO.class);
        viewQueryInfoDTO.setUserId(this.userId);
        //判别该次恳求的音讯类型是心跳检测仍是获取信息
         if (viewQueryInfoDTO.getType().equals("heartbeat")){
            //立刻向前台发送音讯,代表后台正常运转
            sendMessageByUserId(this.userId,new MessageInfo("heartbeat","ok"));
        }
        if (viewQueryInfoDTO.getType().equals("message")){
            //履行事务逻辑
            MessageInfo messageInfo = xxService.list(viewQueryInfoDTO);
            sendMessageByUserId(this.userId,messageInfo);
        }
    }
    /**
     * @description: 当衔接产生错误时,履行该办法
     **/
    @OnError
    public void onError(Throwable error){
        log.info("体系错误");
        error.printStackTrace();
    }
    /**
     * @description: 通过userId向用户发送信息
     * 该类定义成静态能够配合守时任务实现守时推送
     **/
    public static void sendMessageByUserId(String userId, MessageInfo message){
        if (!StringUtils.isEmpty(userId)) {
            Set<String> clientSet = connection.get(userId);
            //用户是否存在客户端衔接
            if (Objects.nonNull(clientSet)) {
                Iterator<String> iterator = clientSet.iterator();
                while (iterator.hasNext()) {
                    String sid = iterator.next();
                    Session session = clients.get(sid);
                    //向每个会话发送音讯
                    if (Objects.nonNull(session)){
                        try {
                            String jsonString = JSON.toJSONString(message);
                            //同步发送数据,需要等上一个sendText发送完成才履行下一个发送
                            session.getBasicRemote().sendText(jsonString);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}

5、修正SecurityConfig装备类

体系中运用权限结构时,需要对端点进行放行。

    @Override
    protected void configure(HttpSecurity httpSecurity) throws Exception
    {
        httpSecurity.authorizeRequests().antMatchers("/notice/**").anonymous();
    }

假如恳求仍是被阻拦,则再加入下面的装备。

    //忽略websocket阻拦
    @Override
    public void configure(WebSecurity webSecurity){
        webSecurity.ignoring().antMatchers("/notice/**");
    }

6、运用衔接工具测验

websocket在线调试网址:websocket/ws/wss在线调试测验工具 (jackxiang.com)

​ 模拟前台:

SpringBoot+Vue整合WebSocket实现实时通讯

后台信息:

SpringBoot+Vue整合WebSocket实现实时通讯

二、Vue装备

1、编写websocket.js文件

//露出自定义websocket目标
export const socket = {
  //后台恳求路径
  url: "",
  //websocket目标
  websocket: null,
  //websocket状况
  websocketState: false,
  //从头衔接次数
  reconnectNum: 0,
  //重连锁状况,保证重连按次序履行
  lockReconnect: false,
  //守时器信息
  timeout: null,
  clientTimeout: null,
  serverTimeout: null,
  //初始化办法,根据url创建websocket目标封装基本衔接办法,并重置心跳检测
  initWebSocket(newUrl) {
    socket.url = newUrl;
    socket.websocket = new WebSocket(socket.url);
    socket.websocket.onopen = socket.websocketOnOpen;
    socket.websocket.onerror = socket.websocketOnError;
    socket.websocket.onclose = socket.websocketOnClose;
    this.resetHeartbeat()
  },
  reconnect() {
    //判别衔接状况
    if (socket.lockReconnect) return;
    socket.reconnectNum += 1;
    //从头衔接三次还未成功调用衔接封闭办法
    if (socket.reconnectNum === 3) {
      socket.reconnectNum = 0;
      socket.websocket.onclose()
      return;
    }
    //等候本次重连完成后再进行下一次
    socket.lockReconnect = true;
    //5s后进行从头衔接
    socket.timeout = setTimeout(() => {
      socket.initWebSocket(socket.url);
      socket.lockReconnect = false;
    }, 5000);
  },
  //重置心跳检测
  resetHeartbeat() {
    socket.heartbeat();
  },
  //心跳检测
  heartbeat() {
    socket.clientTimeout = setTimeout(() => {
      if (socket.websocket) {
        //向后台发送音讯进行心跳检测
        socket.websocket.send(JSON.stringify({ type: "heartbeat" }));
        socket.websocketState = false;
        //一分钟内服务器不呼应则封闭衔接
        socket.serverTimeout = setTimeout(() => {
          if (!socket.websocketState) {
            socket.websocket.onclose()
          } else {
            this.resetHeartbeat()
          }
        }, 60 * 1000);
      }
    }, 3 * 1000);
  },
  //发送音讯
  sendMsg(message) {
    socket.websocket.send(message);
  },
  websocketOnOpen(event) {
    //衔接敞开后向后台发送音讯进行一次心跳检测
    socket.sendMsg(JSON.stringify({ type: "heartbeat" }));
  },
  websocketOnError(error) {
    console.log(error);
    socket.reconnect();
  },
  websocketOnClose() {
    socket.websocket.close();
  },
};

2、组件中运用websocket

<script>
//引进socket目标
import { socket } from "@/utils/websocket";
export default {
 data() {
    return {
      loading: true,
      websocketCount: -1,
      //查询条件
      queryCondition: {
        type: "message",
      },
   }
 },
 created() {
    //初始化websocket目标
    //window.location.host获取ip和端口,
    //process.env.VUE_APP_WEBSOCKET_BASE_API获取恳求前缀
    socket.initWebSocket(
      `ws:${window.location.host}${process.env.VUE_APP_WEBSOCKET_BASE_API}/notice/` +
        userId
    );
    //绑定接纳音讯办法
    socket.websocket.onmessage = this.websocketOnMessage;
 },
 methods: {
    init() {
      this.queryCondition.type = "message";
      socket.sendMsg(JSON.stringify(this.queryCondition));
    },
    websocketOnMessage(event) {
      //初始化界面时,自动向后台发送一次音讯,获取数据
      this.websocketCount += 1;
      if (this.websocketCount === 0) {
        this.init();
      }
      let info = JSON.parse(event.data);
      switch (info.type) {
        case "heartbeat":
          socket.websocketState = true;
          break;
        case "message":
          this.loading = true;
          this.$nextTick(() => {
            this.consumeMessage(info);
          })
          break;
        case "error":
          this.loading = false;
          break;
      }
    },
    consumeMessage(info) {
      //拿到最新数据从头渲染界面
    },
 }
}
</script>

三、前后台衔接测验

后端能够正常通信呼应数据,至此整合websocket完毕。

SpringBoot+Vue整合WebSocket实现实时通讯