%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
%% Guaranteed Multicast
%% ====================
%%
%% This module provides the ability to create named groups of
%% processes to which members can be dynamically added and removed,
%% and for messages to be broadcast within the group that are
%% guaranteed to reach all members of the group during the lifetime of
%% the message. The lifetime of a message is defined as being, at a
%% minimum, the time from which the message is first sent to any
%% member of the group, up until the time at which it is known by the
%% member who published the message that the message has reached all
%% group members.
%%
32 %% The guarantee given is that provided a message, once sent, makes it
33 %% to members who do not all leave the group, the message will
34 %% continue to propagate to all group members.
%% Another way of stating the guarantee is that if member P publishes
%% messages m and m', then for all members P', if P' is a member of
%% the group prior to the publication of m, and P' receives m', then
%% P' will receive m.
%%
%% Note that only local ordering is enforced: i.e. if member P sends
%% message m and then message m', then for all members P', if P'
%% receives m and m', then P' will receive m' after m. Causal
%% ordering is _not_ enforced: i.e. if member P receives message m
%% and as a result publishes message m', there is no guarantee that
%% other members P' will receive m before m'.
%%
52 %% Mnesia must be started. Use the idempotent create_tables/0 function
53 %% to create the tables required.
%% Provide the group name, the callback module name, and any arguments
%% you wish to be passed into the callback module's functions. The
%% joined/2 function will be called when we have joined the group,
%% with the arguments passed to start_link and a list of the current
%% members of the group. See the callback specs and the comments
%% below for further details of the callback functions.
64 %% Provide the Pid. Removes the Pid from the group. The callback
65 %% terminate/2 function will be called.
68 %% Provide the Pid and a Message. The message will be sent to all
69 %% members of the group as per the guarantees given above. This is a
70 %% cast and the function call will return immediately. There is no
71 %% guarantee that the message will reach any member of the group.
73 %% confirmed_broadcast/2
%% Provide the Pid and a Message. As per broadcast/2, except that this
%% is a call, not a cast, and only returns 'ok' once the Message has
%% reached every member of the group. Do not call
%% confirmed_broadcast/2 directly from the callback module, otherwise
%% you will deadlock the entire group.
81 %% Provide the Pid. Returns a proplist with various facts, including
82 %% the group name and the current group members.
85 %% Provide the group name. Removes its mnesia record. Makes no attempt
86 %% to ensure the group is empty.
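%%
%% Example usage (sketch)
%% ----------------------
%%
%% The following is a minimal sketch of a callback module and of a
%% caller driving the API described above, not a definitive recipe.
%% The names my_gm_handler and my_group are hypothetical, and the
%% TxnFun shown is an assumption: it is taken to run the fun it is
%% given inside a mnesia transaction and to return that fun's result.
%%
%%   -module(my_gm_handler).
%%   -behaviour(gm).
%%   -export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
%%
%%   joined(_Args, _Members)                  -> ok.
%%   members_changed(_Args, _Births, _Deaths) -> ok.
%%   handle_msg(_Args, _From, _Msg)           -> ok.
%%   terminate(_Args, _Reason)                -> ok.
%%
%% and, from a process that has already started mnesia:
%%
%%   ok = gm:create_tables(),
%%   TxnFun = fun (F) -> {atomic, R} = mnesia:sync_transaction(F), R end,
%%   {ok, Pid} = gm:start_link(my_group, my_gm_handler, [], TxnFun),
%%   ok = gm:broadcast(Pid, hello),
%%   ok = gm:leave(Pid).
%%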
88 %% Implementation Overview
89 %% -----------------------
91 %% One possible means of implementation would be a fan-out from the
92 %% sender to every member of the group. This would require that the
93 %% group is fully connected, and, in the event that the original
94 %% sender of the message disappears from the group before the message
95 %% has made it to every member of the group, raises questions as to
96 %% who is responsible for sending on the message to new group members.
97 %% In particular, the issue is with [ Pid ! Msg || Pid <- Members ] -
98 %% if the sender dies part way through, who is responsible for
99 %% ensuring that the remaining Members receive the Msg? In the event
100 %% that within the group, messages sent are broadcast from a subset of
101 %% the members, the fan-out arrangement has the potential to
102 %% substantially impact the CPU and network workload of such members,
103 %% as such members would have to accommodate the cost of sending each
104 %% message to every group member.
106 %% Instead, if the members of the group are arranged in a chain, then
107 %% it becomes easier to reason about who within the group has received
108 %% each message and who has not. It eases issues of responsibility: in
109 %% the event of a group member disappearing, the nearest upstream
110 %% member of the chain is responsible for ensuring that messages
111 %% continue to propagate down the chain. It also results in equal
112 %% distribution of sending and receiving workload, even if all
113 %% messages are being sent from just a single group member. This
114 %% configuration has the further advantage that it is not necessary
115 %% for every group member to know of every other group member, and
116 %% even that a group member does not have to be accessible from all
117 %% other group members.
%% Performance is kept high by permitting pipelining, and all
%% communication between joined group members is asynchronous. In the
121 %% chain A -> B -> C -> D, if A sends a message to the group, it will
122 %% not directly contact C or D. However, it must know that D receives
123 %% the message (in addition to B and C) before it can consider the
124 %% message fully sent. A simplistic implementation would require that
125 %% D replies to C, C replies to B and B then replies to A. This would
126 %% result in a propagation delay of twice the length of the chain. It
127 %% would also require, in the event of the failure of C, that D knows
128 %% to directly contact B and issue the necessary replies. Instead, the
129 %% chain forms a ring: D sends the message on to A: D does not
130 %% distinguish A as the sender, merely as the next member (downstream)
131 %% within the chain (which has now become a ring). When A receives
132 %% from D messages that A sent, it knows that all members have
133 %% received the message. However, the message is not dead yet: if C
134 %% died as B was sending to C, then B would need to detect the death
135 %% of C and forward the message on to D instead: thus every node has
136 %% to remember every message published until it is told that it can
137 %% forget about the message. This is essential not just for dealing
138 %% with failure of members, but also for the addition of new members.
140 %% Thus once A receives the message back again, it then sends to B an
141 %% acknowledgement for the message, indicating that B can now forget
142 %% about the message. B does so, and forwards the ack to C. C forgets
143 %% the message, and forwards the ack to D, which forgets the message
144 %% and finally forwards the ack back to A. At this point, A takes no
145 %% further action: the message and its acknowledgement have made it to
%% every member of the group. The message is now dead, and any new
%% member joining the group at this point will not receive the
%% message.
%%
150 %% We therefore have two roles:
152 %% 1. The sender, who upon receiving their own messages back, must
153 %% then send out acknowledgements, and upon receiving their own
154 %% acknowledgements back perform no further action.
%% 2. The other group members who upon receiving messages and
%% acknowledgements must update their own internal state accordingly
%% (the sending member must also do this in order to be able to
%% accommodate failures), and forward messages on to their downstream
%% neighbours.
%%
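%%
%% The two laps around the ring can be sketched as follows. This is
%% an illustration only: the module ring_sketch and the function
%% hops/2 are hypothetical and model the order in which members see a
%% publication and then its acknowledgement, not the message passing
%% itself. Members is assumed to list the ring in downstream order
%% and to contain Publisher.
%%
%%   -module(ring_sketch).
%%   -export([hops/2]).
%%
%%   hops(Publisher, Members) ->
%%       {Before, [Publisher | After]} =
%%           lists:splitwith(fun (M) -> M =/= Publisher end, Members),
%%       %% one lap: everyone downstream, then back to the publisher
%%       Lap = After ++ Before ++ [Publisher],
%%       [{pub, M} || M <- Lap] ++ [{ack, M} || M <- Lap].
%%
%% For the chain A -> B -> C -> D, hops(a, [a, b, c, d]) yields the
%% publication visiting b, c, d and then a, followed by the
%% acknowledgement visiting b, c, d and then a.
%%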
163 %% Implementation: It gets trickier
164 %% --------------------------------
166 %% Chain A -> B -> C -> D
168 %% A publishes a message which B receives. A now dies. B and D will
169 %% detect the death of A, and will link up, thus the chain is now B ->
170 %% C -> D. B forwards A's message on to C, who forwards it to D, who
171 %% forwards it to B. Thus B is now responsible for A's messages - both
172 %% publications and acknowledgements that were in flight at the point
173 %% at which A died. Even worse is that this is transitive: after B
174 %% forwards A's message to C, B dies as well. Now C is not only
175 %% responsible for B's in-flight messages, but is also responsible for
176 %% A's in-flight messages.
178 %% Lemma 1: A member can only determine which dead members they have
179 %% inherited responsibility for if there is a total ordering on the
180 %% conflicting additions and subtractions of members from the group.
182 %% Consider the simultaneous death of B and addition of B' that
183 %% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or
184 %% C is responsible for in-flight messages from B. It is easy to
185 %% ensure that at least one of them thinks they have inherited B, but
186 %% if we do not ensure that exactly one of them inherits B, then we
187 %% could have B' converting publishes to acks, which then will crash C
188 %% as C does not believe it has issued acks for those messages.
190 %% More complex scenarios are easy to concoct: A -> B -> C -> D -> E
191 %% becoming A -> C' -> E. Who has inherited which of B, C and D?
193 %% However, for non-conflicting membership changes, only a partial
194 %% ordering is required. For example, A -> B -> C becoming A -> A' ->
195 %% B. The addition of A', between A and B can have no conflicts with
196 %% the death of C: it is clear that A has inherited C's messages.
198 %% For ease of implementation, we adopt the simple solution, of
199 %% imposing a total order on all membership changes.
%% On the death of a member, the dead member's neighbours are made
%% aware of the death, and the upstream neighbour then sends its
%% state, including the messages pending acknowledgement, to its new
%% downstream neighbour. The downstream neighbour can then use this to
%% calculate which publishes and acknowledgements it has missed out
%% on, due to the death of its old upstream. Thus the downstream can
%% catch up, and continue the propagation of messages through the
%% group.
%%
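%%
%% As a sketch of that catch-up calculation, assume each side's
%% pending-acknowledgement state for one publisher is simplified to a
%% list of publication numbers (the module itself uses queues and
%% more state; catchup_sketch/2 is a hypothetical name). Publications
%% the upstream has already forgotten must have been acknowledged,
%% and publications only the upstream holds have not yet been seen:
%%
%%   catchup_sketch(PA, PAUpstream) ->
%%       MissedAcks = [N || N <- PA, not lists:member(N, PAUpstream)],
%%       MissedPubs = [N || N <- PAUpstream, not lists:member(N, PA)],
%%       {MissedAcks, MissedPubs}.
%%
%% With PA = [3, 4, 5] and PAUpstream = [5, 6, 7], the downstream has
%% missed the acknowledgements of 3 and 4 and the publications 6 and 7.
%%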
210 %% Lemma 2: When a member is joining, it must synchronously
211 %% communicate with its upstream member in order to receive its
212 %% starting state atomically with its addition to the group.
%% New members must start with the same state as their nearest
%% upstream neighbour. This ensures that they are not surprised by
%% acknowledgements they are sent, and that should their downstream
%% neighbour die, they are able to send the correct state to their new
%% downstream neighbour to ensure it can catch up. Thus in the
%% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' ->
%% C, A' must start with the state of A, so that it can send C the
%% correct state when B dies, allowing C to detect any missed
%% messages.
%%
%% If A' starts by adding itself to the group membership, A could then
%% die without A' having received the necessary state from A. This
%% would leave A' responsible for in-flight messages from A while
%% knowing less about those messages than any other member. Thus A'
%% must start by synchronously calling A, which then immediately sends
%% A' back its state. A then adds A' to the group. If A dies at this
230 %% point then A' will be able to see this (as A' will fail to appear
231 %% in the group membership), and thus A' will ignore the state it
232 %% receives from A, and will simply repeat the process, trying to now
233 %% join downstream from some other member. This ensures that should
234 %% the upstream die as soon as the new member has been joined, the new
235 %% member is guaranteed to receive the correct state, allowing it to
236 %% correctly process messages inherited due to the death of its
237 %% upstream neighbour.
239 %% The canonical definition of the group membership is held by a
240 %% distributed database. Whilst this allows the total ordering of
241 %% changes to be achieved, it is nevertheless undesirable to have to
242 %% query this database for the current view, upon receiving each
243 %% message. Instead, we wish for members to be able to cache a view of
244 %% the group membership, which then requires a cache invalidation
245 %% mechanism. Each member maintains its own view of the group
246 %% membership. Thus when the group's membership changes, members may
247 %% need to become aware of such changes in order to be able to
%% accurately process messages they receive. Because of the
%% requirement of a total ordering of conflicting membership changes,
%% it is not possible to use the guaranteed broadcast mechanism to
%% communicate these changes: to achieve the necessary ordering, it
%% would be necessary for such messages to be published by exactly one
%% member, which cannot be guaranteed given that such a member could
%% die.
%%
256 %% The total ordering we enforce on membership changes gives rise to a
257 %% view version number: every change to the membership creates a
258 %% different view, and the total ordering permits a simple
259 %% monotonically increasing view version number.
261 %% Lemma 3: If a message is sent from a member that holds view version
262 %% N, it can be correctly processed by any member receiving the
263 %% message with a view version >= N.
%% Initially, let us suppose that each view contains the ordering of
%% every member that was ever part of the group. Dead members are
%% marked as such. Thus we have a ring of members, some of which are
%% dead, and are thus inherited by the nearest alive downstream
%% member.
%%
%% In the chain A -> B -> C, all three members initially have view
%% version 1, which reflects reality. B publishes a message, which is
%% forwarded by C to A. B now dies, which A notices very quickly. Thus
%% A updates the view, creating version 2. It now forwards B's
%% publication, sending that message to its new downstream neighbour,
%% C. This happens before C is aware of the death of B. C must become
%% aware of the view change before it interprets the message it has
%% received, otherwise it will fail to learn of the death of B, and
%% thus will not realise it has inherited B's messages (and will
%% likely crash).
%%
282 %% Thus very simply, we have that each subsequent view contains more
283 %% information than the preceding view.
%% However, to avoid the views growing indefinitely, we need to be
%% able to delete members which have died _and_ for which no messages
%% are in-flight. This requires that upon inheriting a dead member, we
%% know the last publication sent by the dead member (this is easy: we
%% inherit a member because we are the nearest downstream member which
%% implies that we know at least as much as everyone else about the
%% publications of the dead member), and we know the earliest message
%% for which the acknowledgement is still in flight.
%%
294 %% In the chain A -> B -> C, when B dies, A will send to C its state
295 %% (as C is the new downstream from A), allowing C to calculate which
296 %% messages it has missed out on (described above). At this point, C
297 %% also inherits B's messages. If that state from A also includes the
298 %% last message published by B for which an acknowledgement has been
299 %% seen, then C knows exactly which further acknowledgements it must
300 %% receive (also including issuing acknowledgements for publications
%% still in-flight that it receives), after which it is known there
%% are no more messages in flight for B, thus all evidence that B was
%% ever part of the group can be safely removed from the canonical
%% group membership.
%%
306 %% Thus, for every message that a member sends, it includes with that
307 %% message its view version. When a member receives a message it will
308 %% update its view from the canonical copy, should its view be older
309 %% than the view version included in the message it has received.
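%%
%% A sketch of that check, under the simplifying assumption that a
%% view is a {Version, Members} tuple and that ReadCanonical is a fun
%% returning the current canonical view (both are illustrative
%% assumptions, as is the name maybe_refresh/3):
%%
%%   maybe_refresh(MsgVer, {MyVer, _} = View, _ReadCanonical)
%%     when MsgVer =< MyVer ->
%%       View;                 %% our view is new enough: see Lemma 3
%%   maybe_refresh(_MsgVer, _StaleView, ReadCanonical) ->
%%       ReadCanonical().      %% sender is ahead of us: re-read
%%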
311 %% The state held by each member therefore includes the messages from
312 %% each publisher pending acknowledgement, the last publication seen
313 %% from that publisher, and the last acknowledgement from that
314 %% publisher. In the case of the member's own publications or
315 %% inherited members, this last acknowledgement seen state indicates
316 %% the last acknowledgement retired, rather than sent.
%% We need to prove that with the provided operational semantics, we
%% can never reach a state that is not well formed from a well-formed
%% state.
%%
326 %% Operational semantics (small step): straight-forward message
327 %% sending, process monitoring, state updates.
%% Well-formed state: dead members inherited by exactly one non-dead
%% member; for every entry in anyone's pending-acks, either (the
%% publication of the message is in-flight downstream from the member
%% and upstream from the publisher) or (the acknowledgement of the
%% message is in-flight downstream from the publisher and upstream
%% from the member).
%%
336 %% Proof by induction on the applicable operational semantics.
342 %% The ring configuration and double traversal of messages around the
343 %% ring is similar (though developed independently) to the LCR
344 %% protocol by [Levy 2008]. However, LCR differs in several
345 %% ways. Firstly, by using vector clocks, it enforces a total order of
346 %% message delivery, which is unnecessary for our purposes. More
347 %% significantly, it is built on top of a "group communication system"
348 %% which performs the group management functions, taking
349 %% responsibility away from the protocol as to how to cope with safely
350 %% adding and removing members. When membership changes do occur, the
351 %% protocol stipulates that every member must perform communication
352 %% with every other member of the group, to ensure all outstanding
%% deliveries complete, before the entire group transitions to the new
%% view. This, in total, requires two sets of all-to-all synchronous
%% communications.
%%
357 %% This is not only rather inefficient, but also does not explain what
358 %% happens upon the failure of a member during this process. It does
359 %% though entirely avoid the need for inheritance of responsibility of
360 %% dead members that our protocol incorporates.
362 %% In [Marandi et al 2010], a Paxos-based protocol is described. This
363 %% work explicitly focuses on the efficiency of communication. LCR
364 %% (and our protocol too) are more efficient, but at the cost of
365 %% higher latency. The Ring-Paxos protocol is itself built on top of
366 %% IP-multicast, which rules it out for many applications where
367 %% point-to-point communication is all that can be required. They also
368 %% have an excellent related work section which I really ought to
%% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008.
%% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast
%% Protocol
377 -behaviour(gen_server2).
379 -export([create_tables/0, start_link/4, leave/1, broadcast/2,
380 confirmed_broadcast/2, info/1, forget_group/1]).
382 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
383 code_change/3, prioritise_info/3]).
386 -export([behaviour_info/1]).
389 -export([table_definitions/0]).
391 -define(GROUP_TABLE, gm_group).
392 -define(HIBERNATE_AFTER_MIN, 1000).
393 -define(DESIRED_HIBERNATE, 10000).
394 -define(BROADCAST_TIMER, 25).
395 -define(VERSION_START, 0).
396 -define(SETS, ordsets).
397 -define(DICT, orddict).
415 -record(gm_group, { name, version, members }).
417 -record(view_member, { id, aliases, left, right }).
419 -record(member, { pending_ack, last_pub, last_ack }).
421 -define(TABLE, {?GROUP_TABLE, [{record_name, gm_group},
422 {attributes, record_info(fields, gm_group)}]}).
423 -define(TABLE_MATCH, {match, #gm_group { _ = '_' }}).
429 -export_type([group_name/0]).
-type group_name() :: any().
-type txn_fun() :: fun((fun(() -> any())) -> any()).

-spec create_tables() -> 'ok' | {'aborted', any()}.
-spec start_link(group_name(), atom(), any(), txn_fun()) ->
          rabbit_types:ok_pid_or_error().
-spec leave(pid()) -> 'ok'.
-spec broadcast(pid(), any()) -> 'ok'.
-spec confirmed_broadcast(pid(), any()) -> 'ok'.
-spec info(pid()) -> rabbit_types:infos().
-spec forget_group(group_name()) -> 'ok'.
443 %% The joined, members_changed and handle_msg callbacks can all return
444 %% any of the following terms:
446 %% 'ok' - the callback function returns normally
448 %% {'stop', Reason} - the callback indicates the member should stop
449 %% with reason Reason and should leave the group.
451 %% {'become', Module, Args} - the callback indicates that the callback
452 %% module should be changed to Module and that the callback functions
453 %% should now be passed the arguments Args. This allows the callback
454 %% module to be dynamically changed.
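%%
%% For example, a handle_msg/3 implementation might use all three
%% forms. This is a sketch only: the message shapes {switch_to, _, _}
%% and shutdown are hypothetical and not defined by this module.
%%
%%   handle_msg(_Args, _From, {switch_to, Module, NewArgs}) ->
%%       {become, Module, NewArgs};
%%   handle_msg(_Args, _From, shutdown) ->
%%       {stop, normal};
%%   handle_msg(_Args, _From, _Msg) ->
%%       ok.
%%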
456 %% Called when we've successfully joined the group. Supplied with Args
457 %% provided in start_link, plus current group members.
458 -callback joined(Args :: term(), Members :: [pid()]) ->
459 ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
461 %% Supplied with Args provided in start_link, the list of new members
462 %% and the list of members previously known to us that have since
463 %% died. Note that if a member joins and dies very quickly, it's
464 %% possible that we will never see that member appear in either births
465 %% or deaths. However we are guaranteed that (1) we will see a member
466 %% joining either in the births here, or in the members passed to
467 %% joined/2 before receiving any messages from it; and (2) we will not
468 %% see members die that we have not seen born (or supplied in the
469 %% members to joined/2).
470 -callback members_changed(Args :: term(), Births :: [pid()],
471 Deaths :: [pid()]) ->
472 ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
474 %% Supplied with Args provided in start_link, the sender, and the
475 %% message. This does get called for messages injected by this member,
476 %% however, in such cases, there is no special significance of this
477 %% invocation: it does not indicate that the message has made it to
478 %% any other members, let alone all other members.
479 -callback handle_msg(Args :: term(), From :: pid(), Message :: term()) ->
480 ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
482 %% Called on gm member termination as per rules in gen_server, with
483 %% the Args provided in start_link plus the termination Reason.
484 -callback terminate(Args :: term(), Reason :: term()) ->
489 behaviour_info(callbacks) ->
490 [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {terminate, 2}];
491 behaviour_info(_Other) ->
497 create_tables([?TABLE]).
501 create_tables([{Table, Attributes} | Tables]) ->
502 case mnesia:create_table(Table, Attributes) of
503 {atomic, ok} -> create_tables(Tables);
504 {aborted, {already_exists, gm_group}} -> create_tables(Tables);
508 table_definitions() ->
509 {Name, Attributes} = ?TABLE,
510 [{Name, [?TABLE_MATCH | Attributes]}].
512 start_link(GroupName, Module, Args, TxnFun) ->
513 gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
516 gen_server2:cast(Server, leave).
518 broadcast(Server, Msg) ->
519 gen_server2:cast(Server, {broadcast, Msg}).
521 confirmed_broadcast(Server, Msg) ->
522 gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
525 gen_server2:call(Server, info, infinity).
527 forget_group(GroupName) ->
528 {atomic, ok} = mnesia:sync_transaction(
530 mnesia:delete({?GROUP_TABLE, GroupName})
534 init([GroupName, Module, Args, TxnFun]) ->
535 {MegaSecs, Secs, MicroSecs} = now(),
536 random:seed(MegaSecs, Secs, MicroSecs),
537 Self = make_member(GroupName),
538 gen_server2:cast(self(), join),
539 {ok, #state { self = Self,
540 left = {Self, undefined},
541 right = {Self, undefined},
542 group_name = GroupName,
546 members_state = undefined,
547 callback_args = Args,
548 confirms = queue:new(),
549 broadcast_buffer = [],
550 broadcast_timer = undefined,
551 txn_executor = TxnFun }, hibernate,
552 {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
555 handle_call({confirmed_broadcast, _Msg}, _From,
556 State = #state { members_state = undefined }) ->
557 reply(not_joined, State);
559 handle_call({confirmed_broadcast, Msg}, _From,
560 State = #state { self = Self,
561 right = {Self, undefined},
563 callback_args = Args }) ->
564 handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
567 handle_call({confirmed_broadcast, Msg}, From, State) ->
568 internal_broadcast(Msg, From, State);
570 handle_call(info, _From,
571 State = #state { members_state = undefined }) ->
572 reply(not_joined, State);
574 handle_call(info, _From, State = #state { group_name = GroupName,
577 reply([{group_name, GroupName},
579 {group_members, get_pids(alive_view_members(View))}], State);
581 handle_call({add_on_right, _NewMember}, _From,
582 State = #state { members_state = undefined }) ->
583 reply(not_ready, State);
585 handle_call({add_on_right, NewMember}, _From,
586 State = #state { self = Self,
587 group_name = GroupName,
589 members_state = MembersState,
591 callback_args = Args,
592 txn_executor = TxnFun }) ->
593 {MembersState1, Group} =
594 record_new_member_in_group(
595 GroupName, Self, NewMember,
597 View1 = group_to_view(Group1),
598 MembersState1 = remove_erased_members(MembersState, View1),
599 ok = send_right(NewMember, View1,
601 prepare_members_state(MembersState1)}),
604 View2 = group_to_view(Group),
605 State1 = check_neighbours(State #state { view = View2,
606 members_state = MembersState1 }),
607 Result = callback_view_changed(Args, Module, View, View2),
608 handle_callback_result({Result, {ok, Group}, State1}).
611 handle_cast({?TAG, ReqVer, Msg},
612 State = #state { view = View,
613 members_state = MembersState,
614 group_name = GroupName,
616 callback_args = Args }) ->
618 case needs_view_update(ReqVer, View) of
619 true -> View1 = group_to_view(read_group(GroupName)),
620 MemberState1 = remove_erased_members(MembersState, View1),
621 {callback_view_changed(Args, Module, View, View1),
623 State #state { view = View1,
624 members_state = MemberState1 })};
627 handle_callback_result(
629 Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
631 handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) ->
634 handle_cast({broadcast, Msg},
635 State = #state { self = Self,
636 right = {Self, undefined},
638 callback_args = Args }) ->
639 handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
642 handle_cast({broadcast, Msg}, State) ->
643 internal_broadcast(Msg, none, State);
645 handle_cast(join, State = #state { self = Self,
646 group_name = GroupName,
647 members_state = undefined,
649 callback_args = Args,
650 txn_executor = TxnFun }) ->
651 View = join_group(Self, GroupName, TxnFun),
653 case alive_view_members(View) of
654 [Self] -> blank_member_state();
657 State1 = check_neighbours(State #state { view = View,
658 members_state = MembersState }),
659 handle_callback_result(
660 {Module:joined(Args, get_pids(all_known_members(View))), State1});
662 handle_cast(leave, State) ->
663 {stop, normal, State}.
666 handle_info(flush, State) ->
668 flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
670 handle_info(timeout, State) ->
671 noreply(flush_broadcast_buffer(State));
673 handle_info({'DOWN', MRef, process, _Pid, Reason},
674 State = #state { self = Self,
677 group_name = GroupName,
680 callback_args = Args,
682 txn_executor = TxnFun }) ->
683 Member = case {Left, Right} of
684 {{Member1, MRef}, _} -> Member1;
685 {_, {Member1, MRef}} -> Member1;
688 case {Member, Reason} of
691 {_, {shutdown, ring_shutdown}} ->
695 group_to_view(record_dead_member_in_group(Member,
698 case alive_view_members(View1) of
700 {Result1, State1} = maybe_erase_aliases(State, View1),
701 {Result1, State1 #state {
702 members_state = blank_member_state(),
703 confirms = purge_confirms(Confirms) }};
            %% here we won't be pointing out any deaths:
            %% the concern is that there may be births
            %% which we'd otherwise miss.
708 {callback_view_changed(Args, Module, View, View1),
709 check_neighbours(State #state { view = View1 })}
711 handle_callback_result({Result, State2})
715 terminate(Reason, State = #state { module = Module,
716 callback_args = Args }) ->
717 flush_broadcast_buffer(State),
718 Module:terminate(Args, Reason).
721 code_change(_OldVsn, State, _Extra) ->
724 prioritise_info(flush, _Len, _State) ->
726 prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len,
727 #state { members_state = MS }) when MS /= undefined ->
729 prioritise_info(_, _Len, _State) ->
733 handle_msg(check_neighbours, State) ->
734 %% no-op - it's already been done by the calling handle_cast
737 handle_msg({catchup, Left, MembersStateLeft},
738 State = #state { self = Self,
739 left = {Left, _MRefL},
740 right = {Right, _MRefR},
742 members_state = undefined }) ->
743 ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
744 MembersStateLeft1 = build_members_state(MembersStateLeft),
745 {ok, State #state { members_state = MembersStateLeft1 }};
747 handle_msg({catchup, Left, MembersStateLeft},
748 State = #state { self = Self,
749 left = {Left, _MRefL},
751 members_state = MembersState })
752 when MembersState =/= undefined ->
753 MembersStateLeft1 = build_members_state(MembersStateLeft),
754 AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++
755 ?DICT:fetch_keys(MembersStateLeft1)),
756 {MembersState1, Activity} =
758 fun (Id, MembersStateActivity) ->
759 #member { pending_ack = PALeft, last_ack = LA } =
760 find_member_or_blank(Id, MembersStateLeft1),
762 fun (#member { pending_ack = PA } = Member, Activity1) ->
763 case is_member_alias(Id, Self, View) of
765 {_AcksInFlight, Pubs, _PA1} =
766 find_prefix_common_suffix(PALeft, PA),
767 {Member #member { last_ack = LA },
768 activity_cons(Id, pubs_from_queue(Pubs),
771 {Acks, _Common, Pubs} =
772 find_prefix_common_suffix(PA, PALeft),
774 activity_cons(Id, pubs_from_queue(Pubs),
775 acks_from_queue(Acks),
778 end, Id, MembersStateActivity)
779 end, {MembersState, activity_nil()}, AllMembers),
780 handle_msg({activity, Left, activity_finalise(Activity)},
781 State #state { members_state = MembersState1 });
handle_msg({catchup, _NotLeft, _MembersState}, State) ->

handle_msg({activity, Left, Activity},
           State = #state { self = Self,
                            left = {Left, _MRefL},
                            members_state = MembersState,
                            confirms = Confirms })
  when MembersState =/= undefined ->
    {MembersState1, {Confirms1, Activity1}} =
          fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
                    fun (Member = #member { pending_ack = PA,
                         {Confirms2, Activity2}) ->
                            case is_member_alias(Id, Self, View) of
                                        find_common(queue_from_pubs(Pubs), PA,
                                    LA1 = last_ack(Acks, LA),
                                    AckNums = acks_from_queue(ToAck),
                                    Confirms3 = maybe_confirm(
                                                  Self, Id, Confirms2, AckNums),
                                    {Member #member { pending_ack = PA1,
                                        Id, [], AckNums, Activity2)}};
                                    PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
                                    LA1 = last_ack(Acks, LA),
                                    LP1 = last_pub(Pubs, LP),
                                    {Member #member { pending_ack = PA1,
                                      activity_cons(Id, Pubs, Acks, Activity2)}}
                    end, Id, MembersStateConfirmsActivity)
          end, {MembersState, {Confirms, activity_nil()}}, Activity),
    State1 = State #state { members_state = MembersState1,
                            confirms = Confirms1 },
    Activity3 = activity_finalise(Activity1),
    ok = maybe_send_activity(Activity3, State1),
    {Result, State2} = maybe_erase_aliases(State1, View),
      Result, fun activity_true/3, fun activity_false/3, Activity3, State2);

handle_msg({activity, _NotLeft, _Activity}, State) ->

    {noreply, ensure_broadcast_timer(State), flush_timeout(State)}.

reply(Reply, State) ->
    {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}.

flush_timeout(#state{broadcast_buffer = []}) -> hibernate;
flush_timeout(_) -> 0.

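%% Note on the timeout values above (a reading of the code, assuming
%% gen_server2 follows the standard gen_server timeout contract): an
%% empty broadcast buffer lets the process hibernate, while a
%% non-empty buffer requests a timeout of 0, so a `timeout' message
%% arrives as soon as the mailbox is otherwise empty. For example
%% (msg is an illustrative payload):
%%   flush_timeout(#state { broadcast_buffer = [] })         %% hibernate
%%   flush_timeout(#state { broadcast_buffer = [{1, msg}] }) %% 0
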
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
                                        broadcast_timer = undefined }) ->
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
                                        broadcast_timer = TRef }) ->
    erlang:cancel_timer(TRef),
    State #state { broadcast_timer = undefined };
ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
    TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush),
    State #state { broadcast_timer = TRef };
ensure_broadcast_timer(State) ->

internal_broadcast(Msg, From, State = #state { self = Self,
                                               pub_count = PubCount,
                                               callback_args = Args,
                                               broadcast_buffer = Buffer }) ->
    PubCount1 = PubCount + 1,
    Result = Module:handle_msg(Args, get_pid(Self), Msg),
    Buffer1 = [{PubCount1, Msg} | Buffer],
    Confirms1 = case From of
                    _ -> queue:in({PubCount1, From}, Confirms)
    State1 = State #state { pub_count = PubCount1,
                            confirms = Confirms1,
                            broadcast_buffer = Buffer1 },
    case From =/= none of
            handle_callback_result({Result, flush_broadcast_buffer(State1)});
            handle_callback_result(
              {Result, State1 #state { broadcast_buffer = Buffer1 }})

flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
flush_broadcast_buffer(State = #state { self = Self,
                                        members_state = MembersState,
                                        broadcast_buffer = Buffer,
                                        pub_count = PubCount }) ->
    [{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount
    Pubs = lists:reverse(Buffer),
    Activity = activity_cons(Self, Pubs, [], activity_nil()),
    ok = maybe_send_activity(activity_finalise(Activity), State),
    MembersState1 = with_member(
                      fun (Member = #member { pending_ack = PA }) ->
                              PA1 = queue:join(PA, queue:from_list(Pubs)),
                              Member #member { pending_ack = PA1,
                                               last_pub = PubCount }
                      end, Self, MembersState),
    State #state { members_state = MembersState1,
                   broadcast_buffer = [] }.


%% ---------------------------------------------------------------------------
%% View construction and inspection
%% ---------------------------------------------------------------------------

needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.

view_version({Ver, _View}) -> Ver.

is_member_alive({dead, _Member}) -> false;
is_member_alive(_) -> true.

is_member_alias(Self, Self, _View) ->
is_member_alias(Member, Self, View) ->
    ?SETS:is_element(Member,
                     ((fetch_view_member(Self, View)) #view_member.aliases)).

dead_member_id({dead, Member}) -> Member.

store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
    {Ver, ?DICT:store(Id, VMember, View)}.

with_view_member(Fun, View, Id) ->
    store_view_member(Fun(fetch_view_member(Id, View)), View).

fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View).

find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View).

blank_view(Ver) -> {Ver, ?DICT:new()}.

alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View).

all_known_members({_Ver, View}) ->
      fun (Member, #view_member { aliases = Aliases }, Acc) ->
              ?SETS:to_list(Aliases) ++ [Member | Acc]

group_to_view(#gm_group { members = Members, version = Ver }) ->
    Alive = lists:filter(fun is_member_alive/1, Members),
    [_|_] = Alive, %% ASSERTION - can't have all dead members
    add_aliases(link_view(Alive ++ Alive ++ Alive, blank_view(Ver)), Members).

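%% Illustration of the list tripling in group_to_view/1 (a sketch; a,
%% b and c stand for hypothetical member ids). link_view/2 appears to
%% slide a three-element window [Left, Middle, Right] along the list,
%% so repeating the alive members lets the window wrap around the
%% ring:
%%   Alive = [a, b, c]
%%   [a, b, c | _] links b with left a and right c
%%   [b, c, a | _] links c with left b and right a
%%   [c, a, b | _] links a with left c and right b
%% With a single alive member the list is [a, a, a], so a becomes its
%% own left and right neighbour.
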
link_view([Left, Middle, Right | Rest], View) ->
    case find_view_member(Middle, View) of
              [Middle, Right | Rest],
              store_view_member(#view_member { id = Middle,
                                               aliases = ?SETS:new(),
                                               right = Right }, View));
link_view(_, View) ->

add_aliases(View, Members) ->
    Members1 = ensure_alive_suffix(Members),
    {EmptyDeadSet, View1} =
          fun (Member, {DeadAcc, ViewAcc}) ->
                  case is_member_alive(Member) of
                                #view_member { aliases = Aliases }) ->
                                   VMember #view_member {
                                     aliases = ?SETS:union(Aliases, DeadAcc) }
                           end, ViewAcc, Member)};
                          {?SETS:add_element(dead_member_id(Member), DeadAcc),
          end, {?SETS:new(), View}, Members1),
    0 = ?SETS:size(EmptyDeadSet), %% ASSERTION

ensure_alive_suffix(Members) ->
    queue:to_list(ensure_alive_suffix1(queue:from_list(Members))).

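%% Worked example for ensure_alive_suffix/1 (a sketch with
%% hypothetical ids a to d): trailing dead members are moved to the
%% front until the list ends with an alive member, e.g.
%%   [a, {dead, b}, c, {dead, d}]  becomes  [{dead, d}, a, {dead, b}, c]
%% add_aliases/2 seems to rely on this: every run of dead members is
%% then followed by an alive member that can take them on as aliases,
%% which is what its final ASSERTION on the empty dead set checks.
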
ensure_alive_suffix1(MembersQ) ->
    {{value, Member}, MembersQ1} = queue:out_r(MembersQ),
    case is_member_alive(Member) of
        false -> ensure_alive_suffix1(queue:in_r(Member, MembersQ1))


%% ---------------------------------------------------------------------------
%% ---------------------------------------------------------------------------

join_group(Self, GroupName, TxnFun) ->
    join_group(Self, GroupName, read_group(GroupName), TxnFun).

join_group(Self, GroupName, {error, not_found}, TxnFun) ->
    join_group(Self, GroupName,
               prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
    group_to_view(Group);
join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
    case lists:member(Self, Members) of
            group_to_view(Group);
            case lists:filter(fun is_member_alive/1, Members) of
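                    %% Pass TxnFun through so this hits join_group/4, as the
                    %% {error, not_found} clause does; without it the group
                    %% record would be taken for TxnFun by join_group/3.
                    join_group(Self, GroupName,
                               prune_or_create_group(Self, GroupName, TxnFun),
                               TxnFun);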
                    Left = lists:nth(random:uniform(length(Alive)), Alive),
                            record_dead_member_in_group(
                              Left, GroupName, TxnFun),
                        case gen_server2:call(
                               get_pid(Left), {add_on_right, Self}, infinity) of
                            {ok, Group1} -> group_to_view(Group1);
                            not_ready -> join_group(Self, GroupName, TxnFun)
                          when R =:= noproc; R =:= normal; R =:= shutdown ->
                          when R =:= nodedown; R =:= shutdown ->

read_group(GroupName) ->
    case mnesia:dirty_read(?GROUP_TABLE, GroupName) of
        [] -> {error, not_found};

prune_or_create_group(Self, GroupName, TxnFun) ->
              GroupNew = #gm_group { name = GroupName,
                                     version = ?VERSION_START },
              case mnesia:read({?GROUP_TABLE, GroupName}) of
                      mnesia:write(GroupNew),
                  [Group1 = #gm_group { members = Members }] ->
                      case lists:any(fun is_member_alive/1, Members) of
                          false -> mnesia:write(GroupNew),

record_dead_member_in_group(Member, GroupName, TxnFun) ->
          fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
                        mnesia:read({?GROUP_TABLE, GroupName}),
                    case lists:splitwith(
                           fun (Member1) -> Member1 =/= Member end, Members) of
                        {_Members1, []} -> %% not found - already recorded dead
                        {Members1, [Member | Members2]} ->
                            Members3 = Members1 ++ [{dead, Member} | Members2],
                            Group2 = Group1 #gm_group { members = Members3,
                                                        version = Ver + 1 },
                            mnesia:write(Group2),

record_new_member_in_group(GroupName, Left, NewMember, Fun, TxnFun) ->
                  [#gm_group { members = Members, version = Ver } = Group1] =
                      mnesia:read({?GROUP_TABLE, GroupName}),
                  {Prefix, [Left | Suffix]} =
                      lists:splitwith(fun (M) -> M =/= Left end, Members),
                  Members1 = Prefix ++ [Left, NewMember | Suffix],
                  Group2 = Group1 #gm_group { members = Members1,
                                              version = Ver + 1 },
                  Result = Fun(Group2),
                  mnesia:write(Group2),

erase_members_in_group(Members, GroupName, TxnFun) ->
    DeadMembers = [{dead, Id} || Id <- Members],
                  [Group1 = #gm_group { members = [_|_] = Members1,
                      mnesia:read({?GROUP_TABLE, GroupName}),
                  case Members1 -- DeadMembers of
                      Members2 -> Group2 =
                                      Group1 #gm_group { members = Members2,
                                                         version = Ver + 1 },
                                  mnesia:write(Group2),

maybe_erase_aliases(State = #state { self = Self,
                                     group_name = GroupName,
                                     members_state = MembersState,
                                     callback_args = Args,
                                     txn_executor = TxnFun }, View) ->
    #view_member { aliases = Aliases } = fetch_view_member(Self, View),
    {Erasable, MembersState1}
          fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
                  #member { last_pub = LP, last_ack = LA } =
                      find_member_or_blank(Id, MembersState),
                  case can_erase_view_member(Self, Id, LA, LP) of
                      true -> {[Id | ErasableAcc],
                               erase_member(Id, MembersStateAcc)};
          end, {[], MembersState}, Aliases),
    State1 = State #state { members_state = MembersState1 },
        [] -> {ok, State1 #state { view = View }};
        _ -> View1 = group_to_view(
                       erase_members_in_group(Erasable, GroupName, TxnFun)),
             {callback_view_changed(Args, Module, View0, View1),
              check_neighbours(State1 #state { view = View1 })}

can_erase_view_member(Self, Self, _LA, _LP) -> false;
can_erase_view_member(_Self, _Id, N, N) -> true;
can_erase_view_member(_Self, _Id, _LA, _LP) -> false.

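%% Reading of can_erase_view_member/4, shown as example calls (x and y
%% are hypothetical member ids): an alias may be erased only once its
%% last ack has caught up with its last publication, and a member
%% never erases itself.
%%   can_erase_view_member(x, x, 5, 5)    %% false: Self is never erasable
%%   can_erase_view_member(x, y, 5, 5)    %% true:  LA =:= LP
%%   can_erase_view_member(x, y, 3, 5)    %% false: LA is behind LP
%%   can_erase_view_member(x, y, -1, -1)  %% true:  blank member, nothing pending
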

%% ---------------------------------------------------------------------------
%% View monitoring and maintenance
%% ---------------------------------------------------------------------------

ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
    ok = gen_server2:cast(get_pid(RealNeighbour),
                          {?TAG, Ver, check_neighbours}),
    {RealNeighbour, maybe_monitor(RealNeighbour, Self)};
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
    {RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
    true = erlang:demonitor(MRef),
    Msg = {?TAG, Ver, check_neighbours},
    ok = gen_server2:cast(get_pid(RealNeighbour), Msg),
    ok = case Neighbour of
             _ -> gen_server2:cast(get_pid(Neighbour), Msg)
    {Neighbour, maybe_monitor(Neighbour, Self)}.

maybe_monitor( Self, Self) -> undefined;
maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).

check_neighbours(State = #state { self = Self,
                                  broadcast_buffer = Buffer }) ->
    #view_member { left = VLeft, right = VRight }
        = fetch_view_member(Self, View),
    Ver = view_version(View),
    Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
    Right1 = ensure_neighbour(Ver, Self, Right, VRight),
    Buffer1 = case Right1 of
                  {Self, undefined} -> [];
    State1 = State #state { left = Left1, right = Right1,
                            broadcast_buffer = Buffer1 },
    ok = maybe_send_catchup(Right, State1),

maybe_send_catchup(Right, #state { right = Right }) ->
maybe_send_catchup(_Right, #state { self = Self,
                                    right = {Self, undefined} }) ->
maybe_send_catchup(_Right, #state { members_state = undefined }) ->
maybe_send_catchup(_Right, #state { self = Self,
                                    right = {Right, _MRef},
                                    members_state = MembersState }) ->
    send_right(Right, View,
               {catchup, Self, prepare_members_state(MembersState)}).


%% ---------------------------------------------------------------------------
%% Catch_up delta detection
%% ---------------------------------------------------------------------------

find_prefix_common_suffix(A, B) ->
    {Prefix, A1} = find_prefix(A, B, queue:new()),
    {Common, Suffix} = find_common(A1, B, queue:new()),
    {Prefix, Common, Suffix}.

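%% Worked example (a sketch; the payloads a to e are illustrative, and
%% the results follow from the comments on find_prefix/3 and
%% find_common/3). Given two queues of {PubNum, Msg} pairs
%%   A = queue:from_list([{1, a}, {2, b}, {3, c}, {4, d}]),
%%   B = queue:from_list([{3, c}, {4, d}, {5, e}]),
%% find_prefix_common_suffix(A, B) is expected to yield queues holding
%%   Prefix = [{1, a}, {2, b}]   only in A, older than the head of B
%%   Common = [{3, c}, {4, d}]   shared by A and B
%%   Suffix = [{5, e}]           only in B
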
%% Returns the elements of A that occur before the first element of B,
%% plus the remainder of A.
find_prefix(A, B, Prefix) ->
    case {queue:out(A), queue:out(B)} of
        {{{value, Val}, _A1}, {{value, Val}, _B1}} ->
        {{empty, A1}, {{value, _A}, _B1}} ->
        {{{value, {NumA, _MsgA} = Val}, A1},
         {{value, {NumB, _MsgB}}, _B1}} when NumA < NumB ->
            find_prefix(A1, B, queue:in(Val, Prefix));
        {_, {empty, _B1}} ->
            {A, Prefix} %% Prefix will be empty here

%% A should be a prefix of B. Returns the commonality plus the
find_common(A, B, Common) ->
    case {queue:out(A), queue:out(B)} of
        {{{value, Val}, A1}, {{value, Val}, B1}} ->
            find_common(A1, B1, queue:in(Val, Common));


%% ---------------------------------------------------------------------------
%% ---------------------------------------------------------------------------

with_member(Fun, Id, MembersState) ->
      Id, Fun(find_member_or_blank(Id, MembersState)), MembersState).

with_member_acc(Fun, Id, {MembersState, Acc}) ->
    {MemberState, Acc1} = Fun(find_member_or_blank(Id, MembersState), Acc),
    {store_member(Id, MemberState, MembersState), Acc1}.

find_member_or_blank(Id, MembersState) ->
    case ?DICT:find(Id, MembersState) of
        {ok, Result} -> Result;
        error -> blank_member()

erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState).

    #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.

blank_member_state() -> ?DICT:new().

store_member(Id, MemberState, MembersState) ->
    ?DICT:store(Id, MemberState, MembersState).

prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).

build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).

make_member(GroupName) ->
    {case read_group(GroupName) of
         #gm_group { version = Version } -> Version;
         {error, not_found} -> ?VERSION_START

remove_erased_members(MembersState, View) ->
    lists:foldl(fun (Id, MembersState1) ->
                    store_member(Id, find_member_or_blank(Id, MembersState),
                end, blank_member_state(), all_known_members(View)).

get_pid({_Version, Pid}) -> Pid.

get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].

%% ---------------------------------------------------------------------------
%% Activity assembly
%% ---------------------------------------------------------------------------

activity_nil() -> queue:new().

activity_cons( _Id, [], [], Tail) -> Tail;
activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).

activity_finalise(Activity) -> queue:to_list(Activity).

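%% Example of assembling an activity (a sketch; p and q stand for
%% member ids and m for a message payload). Entries with neither
%% publications nor acks are skipped by activity_cons/4:
%%   A0 = activity_nil(),
%%   A1 = activity_cons(p, [{1, m}], [], A0),
%%   A2 = activity_cons(q, [], [], A1),      %% A2 is A1, nothing to add
%%   A3 = activity_cons(q, [], [1], A2),
%%   activity_finalise(A3)
%%   %% [{p, [{1, m}], []}, {q, [], [1]}]
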
maybe_send_activity([], _State) ->
maybe_send_activity(Activity, #state { self = Self,
                                       right = {Right, _MRefR},
    send_right(Right, View, {activity, Self, Activity}).

send_right(Right, View, Msg) ->
    ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).

callback(Args, Module, Activity) ->
          fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) ->
                  lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) ->
                                      case Module2:handle_msg(
                                             Args2, get_pid(Id), Pub) of
                                          {become, Module3, Args3} ->
                                              {Args3, Module3, ok};
                                          {stop, _Reason} = Error ->
                                  (_, Error = {stop, _Reason}) ->
                              end, {Args1, Module1, ok}, Pubs);
              (_, Error = {stop, _Reason}) ->
          end, {Args, Module, ok}, Activity),
        {Args, Module, ok} -> ok;
        {Args1, Module1, ok} -> {become, Module1, Args1};
        {stop, _Reason} = Error -> Error

callback_view_changed(Args, Module, OldView, NewView) ->
    OldMembers = all_known_members(OldView),
    NewMembers = all_known_members(NewView),
    Births = NewMembers -- OldMembers,
    Deaths = OldMembers -- NewMembers,
    case {Births, Deaths} of
        _ -> Module:members_changed(Args, get_pids(Births),

handle_callback_result({Result, State}) ->
    if_callback_success(
      Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State);
handle_callback_result({Result, Reply, State}) ->
    if_callback_success(
      Result, fun reply_true/3, fun reply_false/3, Reply, State).

no_reply_true (_Result, _Undefined, State) -> noreply(State).
no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}.

reply_true (_Result, Reply, State) -> reply(Reply, State).
reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}.

handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State).
handle_msg_false(Result, _Msg, State) -> {Result, State}.

activity_true(_Result, Activity, State = #state { module = Module,
                                                  callback_args = Args }) ->
    {callback(Args, Module, Activity), State}.
activity_false(Result, _Activity, State) ->

if_callback_success(ok, True, _False, Arg, State) ->
    True(ok, Arg, State);
if_callback_success(
  {become, Module, Args} = Result, True, _False, Arg, State) ->
    True(Result, Arg, State #state { module = Module,
                                     callback_args = Args });
if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) ->
    False(Result, Arg, State).

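%% How a callback result is routed by if_callback_success/5 (a summary
%% of the clauses above; T and F are the True and False funs passed
%% in):
%%   ok                      calls T(ok, Arg, State)
%%   {become, Module, Args}  calls T(Result, Arg, State1), where State1
%%                           has module and callback_args replaced
%%   {stop, Reason}          calls F(Result, Arg, State)
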
maybe_confirm(_Self, _Id, Confirms, []) ->
maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) ->
    case queue:out(Confirms) of
        {empty, _Confirms} ->
        {{value, {PubNum, From}}, Confirms1} ->
            gen_server2:reply(From, ok),
            maybe_confirm(Self, Self, Confirms1, PubNums);
        {{value, {PubNum1, _From}}, _Confirms} when PubNum1 > PubNum ->
            maybe_confirm(Self, Self, Confirms, PubNums)
maybe_confirm(_Self, _Id, Confirms, _PubNums) ->

purge_confirms(Confirms) ->
    [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)],


%% ---------------------------------------------------------------------------
%% Msg transformation
%% ---------------------------------------------------------------------------

acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].

pubs_from_queue(Q) -> queue:to_list(Q).

queue_from_pubs(Pubs) -> queue:from_list(Pubs).

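%% Examples of the conversions above (payloads a and b are
%% illustrative):
%%   Q = queue_from_pubs([{1, a}, {2, b}]),
%%   pubs_from_queue(Q)   %% [{1, a}, {2, b}]
%%   acks_from_queue(Q)   %% [1, 2]
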
apply_acks( [], Pubs) -> Pubs;
apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),

join_pubs(Q, []) -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).

last_ack( [], LA) -> LA;
last_ack(List, LA) -> LA1 = lists:last(List),
                      true = LA1 > LA, %% ASSERTION

last_pub( [], LP) -> LP;
last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
                      true = PubNum > LP, %% ASSERTION