WebSocket开发记录落地功能

前言

在上文:WebSocket开发(一对一聊天) 完成了一对一聊天的功能,但是消息补偿的功能并没有验证,这需要将客户端id的设置参数进行修改。

而且光日志打印记录WebSocket事件的流转有点不靠谱,所以需要将事件进行落地,结构化数据像用户登陆记录用户代收消息用户在线状态操作日志等业务线强的数据可以放到mysql中,像聊天记录图片漫游等已经落地的消息数据可以放到mongodb、es中备份存储。这里demo为了方便就都使用mysql存储。

在这里插入图片描述

1. 持久化设计

1.1 引入持久层框架

引入持久层框架,这里使用mybatis-plus

添加依赖

        <!-- mybatis-plus 所需依赖  -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.1</version>
        </dependency>
        <!-- MySQL连接 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

配置yml文件

server:
  port: 5822
mybatis-plus:
  global-config:
    db-config:
      id-type: auto
      field-strategy: not_empty
      column-underline: true
      logic-delete-value: 0
      logic-not-delete-value: 1
      db-type: mysql
    refresh: false
  configuration:
    map-underscore-to-camel-case: true
    cache-enabled: false
spring:
  datasource:
    url: jdbc:mysql://127.0.0.1/chatroom-im?useUnicode=true&characterEncoding=utf8
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password: xxxx

启动类配置

启动类注解指定mapper包地址

@MapperScan("com.an.im.mapper")

1.2 表设计

因为目前没有增加用户落地的概念先不加用户表了,只按客户端定义的id为客户端用户标识,统计一下目前的流程中需要增加那些表。

  • 用户连接记录表
    • 描述:客户端建立/断开连接的日志记录表;
    • 作用:追溯数据使用
  • 客户端发送消息表
    • 描述:客户端发送的消息记录表;
    • 作用:追溯跟对照数据使用
  • 服务端发送消息表
    • 描述:服务端发送的消息记录表;
    • 作用:追溯跟对照数据使用
  • 一对一消息记录表
    • 描述:客户端发送的消息记录表;
    • 作用:双客户端聊天数据记录漫游
  • 消息补偿表
    • 描述:客户端待接收的消息记录表
    • 作用:客户端连接补偿消息使用
  • 异常记录表
    • 描述:产生异常的日志收集表
    • 作用:排除异常情况使用

1.2.1 用户连接记录表

此表主要统计用户连接跟断连的日志,核心字段就是用户id时间事件类型(连接/断连)

CREATE TABLE `chatroom-im`.`USER_LOGIN_EVENT`  (
  `id` int(0) NOT NULL,
  `uid` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '用户id',
  `event_type` tinyint(1) NOT NULL COMMENT '0:连接;1:断连',
  `trigger_date` datetime(0) NOT NULL COMMENT '事件触发时间',
  `create_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '创建日期',
  `update_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改日期',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

1.2.2 客户端发送消息表

客户端发送的所有消息都要记录下来,这一步可以异步操作,作为消息的落地存储,核心字段为客户端id时间消息内容

CREATE TABLE `chatroom-im`.`client_send_msg`  (
  `id` int(0) NOT NULL,
  `uid` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '客户端id',
  `info_msg` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '信息明细',
  `send_date` datetime(0) NOT NULL COMMENT '发送时间',
  `create_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '创建日期',
  `update_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改日期',
  `del_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否删除(0:未删除;1:已删除)',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

1.2.2 服务端发送消息表

服务端发送消息存储的信息跟客户端的类似,将存储的客户端id修改为接收端id就可以复用

CREATE TABLE `chatroom-im`.`server_send_msg`  (
  `id` int(0) NOT NULL,
  `accept_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '接收端id',
  `info_msg` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '信息明细',
  `send_date` datetime(0) NOT NULL COMMENT '发送时间',
  `create_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '创建日期',
  `update_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改日期',
  `del_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否删除(0:未删除;1:已删除)',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

1.2.3 一对一消息记录表

明确已经双端接收的消息进行记录作为漫游使用,核心字段:发送端id接收端id发送消息明细id接收消息明细id消息内容发送时间接收时间

CREATE TABLE `chatroom-im`.`Untitled`  (
  `id` int(0) NOT NULL,
  `send_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '发送端id',
  `accept_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '接收端id',
  `send_msg_id` int(0) NULL DEFAULT NULL COMMENT '发送消息明细id',
  `accept_msg_id` int(0) NULL DEFAULT NULL COMMENT '接收消息明细id',
  `info_msg` text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息内容',
  `msg_send_date` datetime(0) NULL DEFAULT NULL COMMENT '消息发送时间',
  `accept_date` datetime(0) NULL DEFAULT NULL COMMENT '消息接收时间',
  `create_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '创建日期',
  `update_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改日期',
  `del_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否删除(0:未删除;1:已删除)',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

1.2.4 消息补偿表

这个表主要记录需要补充的消息记录来做消息补偿历史数据追溯。核心字段:接收端id消息内容补偿时间补偿状态

CREATE TABLE `chatroom-im`.`client_compensate_msg`  (
  `id` int(0) NOT NULL,
  `accept_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '接收端id',
  `info_msg` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '信息明细',
  `send_date` datetime(0) NOT NULL COMMENT '发送时间',
	`compensate_satus` tinyint(1) NOT NULL COMMENT '补偿状态(0:未补偿;1:已补偿;2:失败)',
  `create_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '创建日期',
  `update_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改日期',
  `del_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否删除(0:未删除;1:已删除)',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

1.2.5 异常记录表

异常记录主要作为在OnError事件中发生异常内容的记录,核心字段:客户端id异常内容触发事件

CREATE TABLE `chatroom-im`.`error_event_msg`  (
  `id` int(0) NOT NULL,
  `uid` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '接收端id',
  `error_msg` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '异常明细',
  `trigger_date` datetime(0) NOT NULL COMMENT '触发时间',
  `create_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '创建日期',
  `update_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改日期',
  `del_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否删除(0:未删除;1:已删除)',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

2. 事件持久化

表结构既然定义好了,就在各事件触发时进行持久化操作,需要先对这些表建立Mybatis-plus的实体跟Maaper类,这里不写出来了,后面会给出git地址

WebSocketserver里注入对应会有以下问题:

问题在websocket的server文件里是无法使用@autowired注解自动注入的
原因spring容器管理的是单例的,他只会注入一次,而websocket是多对象的,当有新的用户使用的时候,他就会新创建一个websocket对象,这就导致了用户创建的websocket对象都不能注入对象了,所以在运行的时候就会发生注入对象为null的情况;
解决方法把需要注入的service声明为静态对象,如下代码:

    private static BaseWebSocketService baseWebSocketService;

    @Autowired
    public void setService(BaseWebSocketService baseWebSocketService){
        WebSocketClient.baseWebSocketService = baseWebSocketService;
    }

这里的持久化操作我使用一个统一的接口BaseWebSocketService异步来进行处理,不会影响主业务并且方便以后可以调整是否持久化

2.1 用户连接记录持久化

用户记录的持久化是在OnOpen事件中进行的添加,代码如下:

    @OnOpen
    public void onOpen(Session session,@PathParam("clientId") String clientId){
        if (!webSocketClientMap.containsKey(clientId)){
            onlineUsers.addAndGet(1);
        }
        webSocketClientMap.put(clientId,this);
        infoSession = session;
        log.info("客户端:{}建立连接,当前在线人数:{}",clientId,onlineUsers.get());
        /**
         * 持久化
         */
        baseWebSocketService.saveUserLoginEvent(clientId,(byte) 0,new Date());
        /**
         * 消息补偿
         */
        if (!CollectionUtils.isEmpty(this.ToBeSentMap.get(clientId))){
            this.ToBeSentMap.get(clientId).forEach(userMessageModel->{
                this.sendMessage(BaseResponseMessage.success(userMessageModel));
            });
        }
    }
  • 持久化操作:saveUserLoginEvent
  • 核心字段:用户id事件类型(连接/断连)时间

user_login_event 表内数据验证:

在这里插入图片描述

2.2 客户端发送记录持久化

客户端发送信息服务端是在onMessage这个事件中接收的,因此持久化操作也在这个方法里实现,这个持久化是做信息记录的,所以只要是发送上来的数据都进行记录,将它放到方法的最前面。

代码如下:

	@OnMessage
    public void onMessage(String message, Session session,@PathParam("clientId") String clientId){
        /**
         * 持久化
         */
        baseWebSocketService.saveClientSendMsg(clientId,message,new Date());
        /**
         * 处理消息
         */
        UserMessageModel userMessageModel = JSONObject.parseObject(message, UserMessageModel.class);
        if (userMessageModel == null){
            this.sendMessage(BaseResponseMessage.error(null,"传递参数结构异常"));
        }
        if(!webSocketClientMap.containsKey(userMessageModel.getAcceptId())){
            // 放到待发送列表里
            if(!this.ToBeSentMap.containsKey(userMessageModel.getAcceptId())){
                this.ToBeSentMap.put(userMessageModel.getAcceptId(),new CopyOnWriteArrayList<>());
            }
            List<UserMessageModel> addList = this.ToBeSentMap.get(userMessageModel.getAcceptId());
            addList.add(userMessageModel);
            log.info("客户端:{} 发送消息到接受端:{} 不在线,放置到代发送列表,当前待发送列表:{}条",clientId,userMessageModel.getAcceptId(), addList.size());
            this.sendMessage(BaseResponseMessage.error(null,"接收端不在线"));
        }else{
            log.info("客户端:{} 发送到客户端:{},消息内容:{}",clientId,userMessageModel.getAcceptId(),userMessageModel.getMessage());
            webSocketClientMap.get(userMessageModel.getAcceptId()).sendMessage(BaseResponseMessage.success(userMessageModel));
            this.sendMessage(BaseResponseMessage.success(userMessageModel));
        }
    }
  • 持久化操作:saveClientSendMsg
  • 核心字段:客户端id时间消息内容

client_send_msg 表内数据验证:

在这里插入图片描述

2.3 服务端发送记录持久化

服务端发送消息是需要找到对应客户端的Session进行send事件的,我们之前创建了一个方法sendMessage专门用来做发送消息使用,所以将持久化的操作放到这里来。

但是在这里发现传参没有加客户端id,但是每次发送数据都传参客户端id并不太方便也不好维护,所以定义一个类的局部变量,在建立连接时将客户端id放到这个局部变量中

伪代码:

    private String clientId;
    
    @OnOpen
    public void onOpen(Session session,@PathParam("clientId") String clientId){
		this.clientId = clientId;
	}

这样可以直接在sendMessage方法中拿到所属的客户端id

代码如下:

    private void  sendMessage(Object message){
        try {
            baseWebSocketService.saveServerSendMsg(message,this.clientId,new Date());
            this.infoSession.getBasicRemote().sendObject(message);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (EncodeException e) {
            throw new RuntimeException(e);
        }
    }
  • 持久化操作:saveServerSendMsg
  • 核心字段:接收端id时间消息内容

server_send_msg 表内数据验证:

在这里插入图片描述

2.4 一对一消息记录持久化

一对一的记录需要摘选发送端id接收端id消息内容发送时间接受时间等,处理逻辑相较复杂写,这里不考虑数据一致性,否则还有很多事情需要做,只是建立基础的信息记录。

创建一个新的方法来拆分需要记录的参数和异步记录,代码如下:

    private void toCSucceed(UserMessageModel userMessageModel){
        WebSocketClient webSocketClient = webSocketClientMap.get(userMessageModel.getAcceptId());
        BaseResponseMessage infoMsg = BaseResponseMessage.success(userMessageModel);
        /**
         * 持久化
         */
        baseWebSocketService.saveCTOCMsg(this.clientId,webSocketClient.clientId,JSONObject.toJSONString(infoMsg),new Date(),new Date());
        /**
         * 发送消息
         */
        webSocketClient.sendMessage(infoMsg);
        this.sendMessage(infoMsg);
        log.info("客户端:{} 发送到客户端:{},消息内容:{}",clientId,userMessageModel.getAcceptId(),userMessageModel.getMessage());
    }
  • 持久化操作:saveCTOCMsg
  • 核心字段:发送端id接收端id消息内容发送时间接收时间 (因为是异步操作就不计消息明细的对应id了)

c_to_c_msg 表内数据验证:

在这里插入图片描述

2.5 消息补偿记录持久化

这个也是一个比较核心的功能,记录肯定要补偿的,这一部分可以替换ConcurrentHashMap<String,List<UserMessageModel>>直接存到Mysql中,每次连接去mysql读取有没有需要补偿的记录。

2.5.1 写入消息补偿记录

之前是在服务端接受信息OnMessage事件中如果接收端不在线就放入补偿列表里,现在直接将这步调整为写入mysql消息补偿表

代码如下:

    @OnMessage
    public void onMessage(String message, Session session,@PathParam("clientId") String clientId){
        /**
         * 持久化
         */
        baseWebSocketService.saveClientSendMsg(clientId,message,new Date());
        /**
         * 处理消息
         */
        UserMessageModel userMessageModel = JSONObject.parseObject(message, UserMessageModel.class);
        if (userMessageModel == null){
            this.sendMessage(BaseResponseMessage.error(null,"传递参数结构异常"));
        }
        if(!webSocketClientMap.containsKey(userMessageModel.getAcceptId())){
            // 放到待发送列表里
            /*if(!this.ToBeSentMap.containsKey(userMessageModel.getAcceptId())){
                this.ToBeSentMap.put(userMessageModel.getAcceptId(),new CopyOnWriteArrayList<>());
            }
            List<UserMessageModel> addList = this.ToBeSentMap.get(userMessageModel.getAcceptId());
            addList.add(userMessageModel);*/
            baseWebSocketService.saveClientCompensateMsg(userMessageModel.getAcceptId(),message,(byte) 0);
            log.info("客户端:{} 发送消息到接受端:{} 不在线,放置到代发送列表,当前待发送列表:{}条",clientId,userMessageModel.getAcceptId());
            this.sendMessage(BaseResponseMessage.error(null,"接收端不在线"));
        }else{
            this.toCSucceed(userMessageModel);
        }
    }
  • 持久化操作:saveClientCompensateMsg
  • 核心字段:接收端id消息内容补偿状态

client_compensate_msg 表内数据验证:

在这里插入图片描述

2.5.2 消息补偿

消息补偿是在OnOpen事件中进行的,不再通过内存中的Map结构进行补偿,改为根据客户端id查看mysql中有没有需要补偿的数据。

代码如下:

    @OnOpen
    public void onOpen(Session session,@PathParam("clientId") String clientId){
        if (!webSocketClientMap.containsKey(clientId)){
            onlineUsers.addAndGet(1);
        }
        this.clientId = clientId;
        webSocketClientMap.put(clientId,this);
        infoSession = session;
        log.info("客户端:{}建立连接,当前在线人数:{}",clientId,onlineUsers.get());
        /**
         * 持久化
         */
        baseWebSocketService.saveUserLoginEvent(clientId,(byte) 0,new Date());
        /**
         * 消息补偿
         */
        /*
        if (!CollectionUtils.isEmpty(this.ToBeSentMap.get(clientId))){
            this.ToBeSentMap.get(clientId).forEach(userMessageModel->{
                this.sendMessage(BaseResponseMessage.success(userMessageModel));
            });
        }
        */
        List<ClientCompensateMsg> list = baseWebSocketService.queryClientCompensateMsg(clientId,0);
        if (!CollectionUtils.isEmpty(list)){
            list.forEach(userMessageModel->{
            	log.info("消息补偿记录,客户端:{},消息内容:{}",clientId,userMessageModel);
                this.sendMessage(BaseResponseMessage.success(userMessageModel));
            });
        }
    }
  • 获取补偿列表:queryClientCompensateMsg
  • 核心字段条件:接收端id未补偿状态

验证:

补偿表中有一条110ID的客户端有代发送记录,将前端的uid参数设置由时间戳改为110

var uid = 110;

重启服务进行连接验证

日志验证:

在这里插入图片描述


web验证:

在这里插入图片描述


补偿成功后将补偿表对应数据状态进行修改

2.6 异常记录持久化

这个操作比较简单,只要触发onError事件就将信息存储起来即可

代码如下:

    @OnError
    public void onError(Session session, Throwable error, @PathParam("clientId") String clientId){
        log.error("连接异常:{}",error.getMessage());
        baseWebSocketService.saveErrorEventMsg(clientId,error.getMessage());
    }
  • 持久化操作:saveErrorEventMsg
  • 核心字段:客户端id异常内容

验证:

模拟一个WebSocket的异常,随便抛出一个异常即可,只要触发onError事件即可

在这里插入图片描述


数据库记录:

在这里插入图片描述


到此完成了基础的数据落地,以上代码细节没有经过推敲所以并不太完善,大概模拟了一个落地的场景,实际记录落地还要考虑各样的 数据一致性丢数据补数据异常状态场景机制重试等等功能,所以理解这个想法即可。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340