作者 | 張超
責編 | 伍杏玲
最近我在做連接服務的結構調整,梳理到其中的一個功能點:同帳號連續登錄,舊的連接會被新的連接踢掉的功能。
第一個感覺這是個簡單的需求點,不就是新的建立好了,然後把舊的連接踢掉?
可仔細想想就會發現一個問題:是在新的連接建立好了再踢掉舊的連接還是先踢掉舊的連接,然後再建立新的連接呢?如果存在多個同時進行的新連接呢,是不是就有數據競爭了?如果集群要是有多個機房,多個集群該如何高效的實現這個功能點?
如何既要考慮功能的實現,又要考慮工程實現的複雜程度以及後續的維護成本,是否滿足對服務的整體服務質量的要求?在本文中,我和大家一起探討下。
功能需求的描述
同一個帳號(這裡的同一個帳號,指的是系統的唯一識別碼,如果系統支持同一個帳號的多端登錄, 則指的式特定的某一個端)登錄系統時,後登錄的連接保留,而之前建立的連接將默認被踢掉(斷開連接,只是斷開的原因是踢掉)。踢掉舊的連接之後主要達到下面的兩個效果:
1、新連接能夠正常的進行操作。
一般的操作是新連接會覆蓋舊連接,在這種情況下,不會影響新連接的所有工作。在通常的實現下,確實是這樣的,覆蓋之後肯定舊連接沒法正常找到,所以發送給連接的數據能夠正常到達新的連接上。
但有些系統為了保證連接的活躍性,採用的連接是定時進行刷新自己的連接信息。在這種情況下,就會出現新連接被覆蓋的情況,具體關於連接的維護的後續的文章中在詳細的介紹。
2、舊連接不能再發送或者接收任何的數據。
如果不踢掉的話,舊的連接至少能夠進行數據上傳,該數據在特定的場景下就會帶來業務上的不一致性,甚至是錯誤;當然這種情況下收到數據也是有可能的,1中描述的實現策略是一種情況,我接觸的發布訂閱的模型就是新的訂閱默認是不會覆蓋舊的訂閱,所以是兩者共存。
實現方案
下面介紹幾種解決方案,並進行優缺點比較。該文章中的數據均是基於Erlang版本,其他的程式語言除去第一種之外都是一樣的。
一、利用可比較的程序標識實現:
%% 新創建連接的邏輯
open_session(SID, User, Server, Resource, Priority, Info) ->
set_session(SID, User, Server, Resource, Priority, Info),
check_for_sessions_to_replace(User, Server, Resource),
JID = jid:make(User, Server, Resource),
ejabberd_hooks:run(sm_register_connection_hook,
JID#jid.lserver, [SID, JID, Info]).
%% 設置新的連接信息,新的連接信息是可以存儲在不同的資料庫中,目前在ejabberd的實現中就
%% 已經支持了mnesia,redis,mysql,postgresql等資料庫的實現
set_session(SID, User, Server, Resource, Priority, Info) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
US = {LUser, LServer},
USR = {LUser, LServer, LResource},
set_session(#session{sid = SID, usr = USR, us = US,
priority = Priority, info = Info}).
-spec set_session(#session{}) -> ok | {error, any}.
set_session(#session{us = {LUser, LServer}} = Session) ->
Mod = get_sm_backend(LServer),
case Mod:set_session(Session) of
ok ->
case use_cache(Mod, LServer) of
true ->
ets_cache:delete(?SM_CACHE, {LUser, LServer},
cache_nodes(Mod, LServer));
false ->
ok
end;
{error, _} = Err ->
Err
end.
%% 檢查已經存在的連接,如果檢測到連接已經是離線的狀態,就直接進行刪除相應的連接信息
%% 連接是在線的狀態的話,則保留最大的連接,其他的連接則踢掉(這裡通過下面的replaced消息處理)
check_for_sessions_to_replace(User, Server, Resource) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
check_existing_resources(LUser, LServer, LResource),
check_max_sessions(LUser, LServer).
-spec check_existing_resources(binary, binary, binary) -> ok.
check_existing_resources(LUser, LServer, LResource) ->
Mod = get_sm_backend(LServer),
Ss = get_sessions(Mod, LUser, LServer, LResource),
if Ss == -> ok;
true ->
SIDs = [SID || #session{sid = SID} <- Ss],
MaxSID = lists:max(SIDs),
lists:foreach(fun ({_, Pid} = S) when S /= MaxSID ->
ejabberd_c2s:route(Pid, replaced);
(_) -> ok
end,
SIDs)
end.
%% 生成連接的唯一ID,這裡加入了時間戳,而且是單調遞增的數據
%% self加入到sid中,能夠保證sid的唯一性,pid在erlang中是能夠保證唯一性的,具體的
%% 實現原理有興趣可以自行查找
make_sid ->
{misc:unique_timestamp, self}.
%% misc.erl
unique_timestamp ->
{MS, S, _} = erlang:timestamp,
{MS, S, erlgang:unique_integer([positive, monotonic]) rem 1000000}.
原理說明:
1. 設置新的連接信息是這段代碼中的set_session方法;
2. 查找已經存在的連接是代碼中的get_sessions,並沒有列出完整的實現;
3. 踢掉舊的連接,通過上一步查到的所有連接,根據連接的大小關係,除了最大的連接保留之外,其他的所有連接都執行踢掉的邏輯。
關鍵點的說明:
1. 連接的唯一識別從單一的帳號信息,添加了一個唯一識別碼(即代碼中的sid),有了這個識別標誌之後,在寫入新的連接數據同時,不會覆蓋原有的舊連接,進而保證了後續的連接清理工作。
2. 在連接信息中添加了一個可進行比較的欄位sid,有了這規則之後,無論怎樣的踢掉操作都能夠保證競爭的雙方保留的結果是一致的。即使在競爭的情況下,也不會出現刪除錯誤的情況。
3. 下面列舉了一些可能的時序場景:
(1)正常的登錄場景
(2)登錄過程在setSession之後之後被其他的登錄過程打斷:
(3)新的登錄過程插入到另一個登錄過程中:
(4)雖然業務是A是先執行setSession但是,但是寫入的sid卻比後寫入的sid要大,比如A在寫入時網絡延時很大等情況都有可能出現如下的情況:
二、利用獨立的鎖結構實現
%% 1. 獲取新鍵連接的鎖,獲取到相應的鎖之後
%% 2. 踢掉之前已經存在的舊連接
%% 3. 初始化新的連接
open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
CleanStart = fun(_)->
ok = discard_session(ClientId),
%% 具體的初始化新連接的具體邏輯
Session = emqx_session:init(ClientInfo, ConnInfo),
{ok, #{session => Session, present => false}}
end,
%% 這裡沒有列舉獲取鎖的相關過程,處理完所有的邏輯之後,釋放獲取的鎖
emqx_cm_locker:trans(ClientId, CleanStart);
%% 根據連接的ClientID找到具體的ChanPid,然後循環執行discard_session
discard_session(ClientId) when is_binary(ClientId) ->
case lookup_channels(ClientId) of
-> ok;
ChanPids ->
lists:foreach(
fun(ChanPid)->
try
discard_session(ClientId, ChanPid)
catch
_:Error:_Stk ->
?LOG(error, "Failed to discard ~p: ~p", [ChanPid, Error])
end
end, ChanPids)
end.
%% 具體的操作踢掉的邏輯
discard_session(ClientId, ChanPid) when node(ChanPid) == node ->
case get_chan_attrs(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, discard);
undefined -> ok
end;
原理說明如下:
1. 根據連接的標識(ClientID)來獲取對應的鎖,該鎖是排他的,emqx的代碼中的鎖是程序實現通過集群中廣播的方式。
在集群中維護著對應鎖,對應的鎖可以藉助其他的資料庫,比如Redis等方式實現同樣的鎖的功能。不過要注意鎖的釋放,否則將會影響正常的用戶登錄。
2. 查找已經存在的連接,踢掉相應的連接。
3. 初始化新創建的連接。
討論方案的對比
以上兩種實現是現在使用相對較多的長連接服務的登錄,並且踢掉舊連接的邏輯。
1. 功能實現:
兩種方案都能完整地實現新連接踢掉舊連接的過程。
2. 實現的複雜度:
Ejabberd的實現通過添加一個可比較的sid的方式,保留最大sid的連接。在存在競爭的情況下,依然能保證邏輯的正確性;EMQX採用的是比較常見的加鎖的方式來避免競爭帶來的數據不一致性問題。
總的來說,實現的複雜度Ejabberd相對複雜一些,這裡不包括EMQX實現的鎖功能的複雜度。
3. 理解的難易程度:
Ejabberd的理解難度相對較大,因為要考慮多種情況下的服務表現是否符合預期;EMQX通過直接加鎖的方式來避免了這些竟態的出現。Ejabberd的理解難度更大。
4. 數據存儲方面:
Ejabberd的設計是同一個用戶的連接信息在資料庫是有一組的(不同的sid),而EMQX的存儲只會有一個(其中 ClientID 為唯一性的key)。
從這方面來看,EMQX在存儲上會有一定的優勢,在沒有考慮對鎖的存儲需求下。
5. 集群擴展性:
Ejabberd由於做到了無狀態的新創建連接,能進行很好地擴展。EMQX由於實現是在集群中進行全局鎖的情況,機器的擴展會加大獲取鎖的成本,雖然採用並發調用的邏輯,但是在網絡等不穩定的條件下,可能會出現獲取鎖比較慢,或者失敗的情況,所以整體的可擴展性會較差一些。
多機房、多集群的部署方案
在多機房、多集群的部署形態下,由於連接的信息都需要進行跨機房的訪問,會存在很大的延時和挑戰。
然而多機房的部署主要是提供多機房的活躍互相備份,以保證在其中任何一個機房出現問題也不影響整體的服務。
所以如果是在多機房的模式下,建議通過負載的方式來將同一個帳號連接到一個機房/集群,這樣問題就重新降到解決一個集群中的踢掉問題。
結語
登錄連接這個看似簡單、但又不簡單的需求,其中涉及到對服務的預期和要求。我們可採用結構簡單的加鎖方式來實現,也可以通過在業務層通過自己的選擇策略來進行踢出連接的操作。
選擇策略時要保證分布式情況下保持一致(切忌將除了自己之外的其他連接都踢掉,這樣就會出現多個連接的選擇策略不同)。實現功能的方式可以是多種選擇的,關鍵看具體的需求是什麼樣,選擇一種適合團隊的方式才是最重要的。
傳送門:
EMQX源碼:https://github.com/emqx/emqx
Ejabberd源碼:https://github.com/processone/ejabberd
作者簡介:張超,360 IoT 雲連接服務技術負責人,畢業於南京大學。曾從事過遊戲開發、IM 服務端開發,目前從事物聯網接入層相關工作。