教你用純Java實現一個即時通訊系統(附源碼)

雜文論 發佈 2022-08-10T04:53:30.177283+00:00

項目背景和各位讀者大致介紹下具體場景,線上的小程序中開放一些語音麥克風的房間,讓用戶進入房間之後可以互相通過語音聊天的方式進行互動。這裡分享一下相關的技術設計方案。這款系統的核心點設計在於如何能讓一個用戶發出的語音通知到其他用戶上邊。

項目背景

和各位讀者大致介紹下具體場景,線上的小程序中開放一些語音麥克風的房間,讓用戶進入房間之後可以互相通過語音聊天的方式進行互動。

這裡分享一下相關的技術設計方案。這款系統的核心點設計在於如何能讓一個用戶發出的語音通知到其他用戶上邊。語音數據在客戶端同事的處理下最終變成了io數據流請求到了後端,後端只需要將這些數據流傳達給各個不同的終端即可達到廣播通知的效果。

單機版架構

最初期上線的時候,為了趕速度,快速試錯,所以簡單地採用了單機版架構去設計。結合技術棧為 SpringBoot,Websocket,MySQL技術。

線上一間語音房間的同時在線人數並不會特別多,大概在15-50人的區間段內,系統核心代碼是通過SpringBoot內部的WebSocket技術去進行數據的主動推送。

設計思路

整體的設計圖比較簡單,基本就是一台伺服器存儲WebSocket連接,如下圖所示:

用戶進行WebSocket初始化連接的時候需要一個連接分配和存儲的過程:

早期的存儲是存放在了伺服器本地的一個Map集合中。

當WebSocket進行連接的時候就會往內存中寫入一條數據信息,當連結斷開的時候,就將內存中的數據移除。然後進行語音廣播的時候需要結合WebSocket內部的廣播發送功能進行通知

看似設計比較簡單,但是在後期業務變得龐大的時候出現了瓶頸。因為隨著參加語音活動用戶的增加,越來越多的WebSocketSession對象需要被存儲到內存當中,這種有狀態性的存儲對於單機擴容不靈活。

設計缺陷

1.假設原先的伺服器擴容到了A,B兩台機器,A用戶在A機器上邊建立了WebSocketSession,B用戶在B機器上邊建立的WebSocketSession連接。此時如果A想要和B進行對話發送,需要先查找到具體WebSocketSession存放在哪台機器上邊。

2.當用戶出現了網絡異常,臨時斷開連接進行重連的時候,也可能會出現1所說的問題。

集群架構

設計思路

一旦出現需要發送語音通知的時候,發送一條廣播的mq消息,每個機器都接收到消息之後,觸發自己的廣播操作即可。

RocketMq的接入系統設計裡面mq採用的是廣播模式,這和我們通常使用的集群模式有一定的區別。

消息隊列RocketMQ版是基於發布或訂閱模型的消息系統。消費者,即消息的訂閱方訂閱關注的Topic,以獲取並消費消息。由於消費者應用一般是分布式系統,以集群方式部署,因此消息隊列RocketMQ版約定以下概念:

集群:使用相同Group ID的消費者屬於同一個集群。同一個集群下的消費者消費邏輯必須完全一致(包括Tag的使用)。

集群消費:當使用集群消費模式時,消息隊列RocketMQ版認為任意一條消息只需要被集群內的任意一個消費者處理即可。

廣播消費:當使用廣播消費模式時,消息隊列RocketMQ版會將每條消息推送給集群內所有註冊過的消費者,保證消息至少被每個消費者消費一次。

集群消費模式適用場景 適用於消費端集群化部署,每條消息只需要被處理一次的場景。此外,由於消費進度在服務端維護,可靠性更高。具體消費示例如下圖所示。

注意事項

集群消費模式下,每一條消息都只會被分發到一台機器上處理。如果需要被集群下的每一台機器都處理,請使用廣播模式。

集群消費模式下,不保證每一次失敗重投的消息路由到同一台機器上。

廣播消費模式適用場景 適用於消費端集群化部署,每條消息需要被集群下的每個消費者處理的場景。具體消費示例如下圖所示。

注意事項

廣播消費模式下不支持順序消息。

廣播消費模式下不支持重置消費位點。

每條消息都需要被相同訂閱邏輯的多台機器處理。

消費進度在客戶端維護,出現重複消費的概率稍大於集群模式。

廣播模式下,消息隊列RocketMQ版保證每條消息至少被每台客戶端消費一次,但是並不會重投消費失敗的消息,因此業務方需要關注消費失敗的情況。

廣播模式下,客戶端每一次重啟都會從最新消息消費。客戶端在被停止期間發送至服務端的消息將會被自動跳過,請謹慎選擇。

廣播模式下,每條消息都會被大量的客戶端重複處理,因此推薦儘可能使用集群模式。

廣播模式下服務端不維護消費進度,所以消息隊列RocketMQ版控制台不支持消息堆積查詢、消息堆積報警和訂閱關係查詢功能。

這裡面的應用場景需要對集群內部對每個消費者都對伺服器內存中的socket連接進行session是否存在對判斷,因此需要採用mq的廣播模式。

關於mq部分的接入代碼

Consumer模塊的配置:

package org.idea.web.socket.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**

* @Author linhao

* @Date created in 10:30 上午 2021/5/10

*/

@ConfigurationProperties(prefix = "rocketmq.consumer")

public class MqConsumerConfig {

private boolean isOn;

private String groupName;

private String nameSrvAddr;

private String topics;

private Integer consumeThreadMin;

private Integer consumeThreadMax;

private Integer consumeMessageBatchMaxSize;


/**

getter 和 setter部分省略

**/

}


Producer模塊的配置展示:


package org.idea.web.socket.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**

* @Author linhao

* @Date created in 10:26 上午 2021/5/10

*/

@ConfigurationProperties(prefix = "rocketmq.producer")

public class MqProducerConfig {

private boolean isOn;

private String groupName;

private String nameSrvAddr;

private Integer maxMessageSize;

private Integer sendMsgTimeout;

private Integer retryTimesWhenSendFailed;


/**

getter 和 setter部分省略

**/

}

RocketMq內部的消費端Bean配置


package org.idea.web.socket.mq;

import lombok.extern.slf4j.Slf4j;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.client.Exception.MQClientException;

import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import org.idea.web.socket.config.MqConsumerConfig;

import org.idea.web.socket.config.MqProducerConfig;

import org.springframework.boot.autoconfigure.AutoConfigureAfter;

import org.springframework.boot.autoconfigure.AutoConfigureBefore;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;

import org.springframework.boot.context.properties.EnableConfigurationProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**

* @Author linhao

* @Date created in 10:34 上午 2021/5/10

*/

@Configuration

@Slf4j

@EnableConfigurationProperties({MqConsumerConfig.class})

public class MqConsumerAutoConfig {

@Resource

private MqConsumerConfig mqConsumerConfig;

@Resource

//這個接口需要手動實現順序消費的邏輯 每次獲取到消息隊列的第一條數據

private MessageListenerHandler messageListenerConcurrently;

@Bean

@ConditionalOnMissingBean

public DefaultMQPushConsumer defaultMQPushConsumer() {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

consumer.setNamesrvAddr(mqConsumerConfig.getNameSrvAddr());

consumer.setConsumerGroup(mqConsumerConfig.getGroupName());

consumer.setConsumeThreadMin(mqConsumerConfig.getConsumeThreadMin());

consumer.setConsumeThreadMax(mqConsumerConfig.getConsumeThreadMax());

consumer.registerMessageListener(messageListenerConcurrently);

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//消費模型是什麼?

consumer.setMessageModel(MessageModel.BROADCASTING);

//默認一次拉取一條消費

consumer.setConsumeMessageBatchMaxSize(mqConsumerConfig.getConsumeMessageBatchMaxSize());

//*表示訂閱所有的tag

try {

consumer.subscribe(mqConsumerConfig.getTopics(), "*");

consumer.start();

log.info("【 MqConsumerAutoConfig 】mq consumer is started!");

} catch (Exception e) {

log.error("mq start fail,e is ", e);

}

return consumer;

}

}


RocketMq的服務生產者Bean配置

package org.idea.web.socket.mq;

import lombok.extern.slf4j.Slf4j;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.idea.web.socket.config.MqProducerConfig;

import org.springframework.boot.autoconfigure.AutoConfigureAfter;

import org.springframework.boot.autoconfigure.AutoConfigureBefore;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;

import org.springframework.boot.context.properties.EnableConfigurationProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**

* @Author linhao

* @Date created in 11:05 上午 2021/5/10

*/

@Configuration

@Slf4j

@EnableConfigurationProperties({MqProducerConfig.class})

public class MqProducerAutoConfig {

@Resource

private MqProducerConfig mqProducerConfig;

@Bean

@ConditionalOnMissingBean

//意味著DefaultMQProducer的配置可以被覆蓋

public DefaultMQProducer defaultMQProducer() {

DefaultMQProducer producer = new DefaultMQProducer(mqProducerConfig.getGroupName());

producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr());

//沒有則自動創建topic的key

// producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");

producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize());

producer.setSendMsgTimeout(mqProducerConfig.getSendMsgTimeout());

producer.setRetryTimesWhenSendFailed(mqProducerConfig.getRetryTimesWhenSendFailed());

try {

producer.start();

log.info("【 MqProducerAutoConfig 】mq producer is started!");

} catch (Exception e) {

log.error("[MqProducerAutoConfig] start fail, e is ", e);

}

return producer;

}

}

然後是對RocketMq內部發送消息事件的一層函數封裝

package org.idea.web.socket.mq;

import com.alibaba.fastjson.JSON;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.common.RemotingHelper;

import org.idea.web.socket.config.MqProducerConfig;

import org.idea.web.socket.dto.BroadcastMqDTO;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

import java.io.UnsupportedEncodingException;

/**

* 消息廣播發送端

*

* @Author linhao

* @Date created in 10:43 下午 2021/5/9

*/

@Component

@Slf4j

public class BroadcastMqProducer {

@Resource

private DefaultMQProducer defaultMQProducer;

@Resource

private MqProducerConfig mqProducerConfig;

private static String TOPIC = "ws-topic";

private static String TAGS = "ws-tag";


public static Integer ALL_USER_RECEIVE_TYPE = 1;

public static Integer ONE_USER_RECEIVE_TYPE = 2;

/**

* 點對點之間的消息發送

*

* @param destSessionKey

* @param msg

* @return

*/

public SendResult sendWebSocketToUser(String destSessionKey,String msg) {

if (StringUtils.isEmpty(msg)) {

log.error("[sendWebSocketToUser] msg can not be null!");

return null;

}

Message message = null;

SendResult sendResult = null;

try {

BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();

broadcastMqDTO.setEventType(ONE_USER_RECEIVE_TYPE);

broadcastMqDTO.setMessage(msg);

broadcastMqDTO.setSessionKey(destSessionKey);

message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));

sendResult = defaultMQProducer.send(message);

} catch (Exception e) {

log.error("[sendWebSocketBroadcastMsg] e is ", e);

}

return sendResult;

}

/**

* 廣播消息發送

*

* @param msg

* @return

*/

public SendResult sendWebSocketBroadcastMsg(String msg) {

if (StringUtils.isEmpty(msg)) {

log.error("[sendWebSocketBroadcastMsg] msg can not be null!");

return null;

}

Message message = null;

SendResult sendResult = null;

try {

BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();

broadcastMqDTO.setEventType(ALL_USER_RECEIVE_TYPE);

broadcastMqDTO.setMessage(msg);

message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));

sendResult = defaultMQProducer.send(message);

} catch (Exception e) {

log.error("[sendWebSocketBroadcastMsg] e is ", e);

}

return sendResult;

}

}

對消息的訂閱模塊實現代碼如下:

package org.idea.web.socket.mq;

import com.alibaba.fastjson.JSON;

import com.oracle.tools.packager.Log;

import lombok.extern.slf4j.Slf4j;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.message.MessageExt;

import org.idea.web.socket.dto.BroadcastMqDTO;

import org.idea.web.socket.manager.SocketManager;

import org.springframework.messaging.simp.SimpMessagingTemplate;

import org.springframework.stereotype.Component;

import org.springframework.util.CollectionUtils;

import org.springframework.web.socket.WebSocketSession;

import javax.annotation.Resource;

import java.util.List;

import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE;

import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;

/**

* @Author linhao

* @Date created in 10:59 上午 2021/5/10

*/

@Component

@Slf4j

public class MessageListenerHandler implements MessageListenerConcurrently {

@Resource

private SocketManager socketManager;

@Resource

private SimpMessagingTemplate template;

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

if (CollectionUtils.isEmpty(list)) {

Log.info("receive empty msg");

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

MessageExt messageExt = list.get(0);

byte[] bytes = messageExt.getBody();

String json = new String(bytes);

BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class);

log.info("[MessageListenerHandler] broadcastMqDTO is " + broadcastMqDTO);

if (ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {

log.info("[consumeMessage] 廣播發送消息:觸發----》消息內容為:" + broadcastMqDTO);

template.convertAndSend("/topic/sendTopic", broadcastMqDTO);

} else if (ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {

String sessionKey = broadcastMqDTO.getSessionKey();

WebSocketSession webSocketSession = socketManager.get(sessionKey);

if (webSocketSession != null) {

template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage());

log.info("[consumeMessage] 點對點發送消息;觸發----》消息內容為:" + broadcastMqDTO);

}

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

}

整體設計結構如下圖:

於是按照這個結構進行了一版本的緊急開發疊代,原先的單台伺服器擴展為了服務集群。

業務拓展後續產品經理提出一個需求,要求支持在同一間房內的兩個用戶之間發送悄悄話功能。這就需要我們進行一個點對點之間傳輸通訊的功能了。因此需要在mq通知到每台機器的時候加一個本地Session遍歷的邏輯,如果當前機器存有用戶token對應的session變量,那麼就單獨針對那個Session進行WebSocket的發送通知。

設計弊端一旦某台機器出現了異常崩潰,那麼就意味著這台機器上的所有語音連接可能會出現中斷情況。目前這一塊的問題也在考慮解決,計劃是將WebSocketSession存入到分布式緩存的redis中保證數據可靠存儲,但是在後續嘗試的時候發現WebSocketSession對象沒有實現序列化接口,在存儲到Redis的時候會出現異常。目前這個問題還在尋找解決思路中,不知道各位讀者朋友們有什麼好的思路。

關鍵字: