%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%

%% Guaranteed Multicast
%% ====================
%%
%% This module provides the ability to create named groups of
%% processes to which members can be dynamically added and removed,
%% and for messages to be broadcast within the group that are
%% guaranteed to reach all members of the group during the lifetime of
%% the message. The lifetime of a message is defined as being, at a
%% minimum, the time from which the message is first sent to any
%% member of the group, up until the time at which it is known by the
%% member who published the message that the message has reached all
%% members of the group.
%%
%% The guarantee given is that provided a message, once sent, makes it
%% to members who do not all leave the group, the message will
%% continue to propagate to all group members.
%%
%% Another way of stating the guarantee is that if member P publishes
%% messages m and m', then for all members P', if P' is a member of
%% the group prior to the publication of m, and P' receives m', then
%% P' will receive m.
%%
%% Note that only local ordering is enforced: i.e. if member P sends
%% message m and then message m', then for all members P', if P'
%% receives m and m', then they will receive m' after m. Causal
%% ordering is _not_ enforced: i.e. if member P receives message m
%% and as a result publishes message m', there is no guarantee that
%% other members P' will receive m before m'.
%%
%% Mnesia must be started. Use the idempotent create_tables/0 function
%% to create the tables required.
%%
%% start_link/3
%% Provide the group name, the callback module name, and any arguments
%% you wish to be passed into the callback module's functions. The
%% joined/2 function will be called when we have joined the group,
%% with the arguments passed to start_link and a list of the current
%% members of the group. See the callback specs and the comments
%% below for further details of the callback functions.
%%
%% leave/1
%% Provide the Pid. Removes the Pid from the group. The callback
%% terminate/2 function will be called.
%%
%% broadcast/2
%% Provide the Pid and a Message. The message will be sent to all
%% members of the group as per the guarantees given above. This is a
%% cast and the function call will return immediately. There is no
%% guarantee that the message will reach any member of the group.
%%
%% confirmed_broadcast/2
%% Provide the Pid and a Message. As per broadcast/2 except that this
%% is a call, not a cast, and only returns 'ok' once the Message has
%% reached every member of the group. Do not call
%% confirmed_broadcast/2 directly from the callback module, otherwise
%% you will deadlock the entire group.
%%
%% group_members/1
%% Provide the Pid. Returns a list of the current group members.
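%%
%% A minimal, hypothetical example of using this API follows. It
%% assumes that this module is compiled as gm and that Mnesia is
%% already running. The callback module my_gm_callback and the group
%% name my_group are illustrative names only:
%%
%%     -module(my_gm_callback).
%%     -behaviour(gm).
%%     -export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
%%
%%     %% Each callback just logs what it was given and returns ok.
%%     joined(_Args, Members) ->
%%         io:format("joined, members: ~p~n", [Members]), ok.
%%     members_changed(_Args, Births, Deaths) ->
%%         io:format("births: ~p, deaths: ~p~n", [Births, Deaths]), ok.
%%     handle_msg(_Args, From, Msg) ->
%%         io:format("~p sent ~p~n", [From, Msg]), ok.
%%     terminate(_Args, _Reason) ->
%%         ok.
%%
%% and then, from a process that is not itself a gm callback:
%%
%%     ok = gm:create_tables(),
%%     {ok, Pid} = gm:start_link(my_group, my_gm_callback, []),
%%     ok = gm:broadcast(Pid, hello),
%%     %% Both calls below may return not_joined if the member has
%%     %% not finished joining the group yet.
%%     _ = gm:confirmed_broadcast(Pid, world),
%%     _ = gm:group_members(Pid),
%%     ok = gm:leave(Pid).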
%%
%%
%% Implementation Overview
%% -----------------------
%%
%% One possible means of implementation would be a fan-out from the
%% sender to every member of the group. This would require that the
%% group is fully connected, and, in the event that the original
%% sender of the message disappears from the group before the message
%% has made it to every member of the group, it would raise questions
%% as to who is responsible for sending on the message to new group
%% members. In particular, the issue is with [ Pid ! Msg || Pid <- Members ]:
%% if the sender dies part way through, who is responsible for
%% ensuring that the remaining Members receive the Msg? In the event
%% that messages within the group are broadcast from only a subset of
%% the members, the fan-out arrangement has the potential to
%% substantially increase the CPU and network workload of those
%% members, as they would have to bear the cost of sending each
%% message to every group member.
%%
%% Instead, if the members of the group are arranged in a chain, then
%% it becomes easier to reason about who within the group has received
%% each message and who has not. It eases issues of responsibility: in
%% the event of a group member disappearing, the nearest upstream
%% member of the chain is responsible for ensuring that messages
%% continue to propagate down the chain. It also results in an equal
%% distribution of sending and receiving workload, even if all
%% messages are being sent from just a single group member. This
%% configuration has the further advantage that it is not necessary
%% for every group member to know of every other group member, nor
%% even for a group member to be accessible from all other group
%% members.
%%
%% Performance is kept high by permitting pipelining: all
%% communication between joined group members is asynchronous. In the
%% chain A -> B -> C -> D, if A sends a message to the group, it will
%% not directly contact C or D. However, it must know that D receives
%% the message (in addition to B and C) before it can consider the
%% message fully sent. A simplistic implementation would require that
%% D replies to C, C replies to B and B then replies to A. This would
%% result in a propagation delay of twice the length of the chain. It
%% would also require, in the event of the failure of C, that D knows
%% to directly contact B and issue the necessary replies. Instead, the
%% chain forms a ring: D sends the message on to A. D does not
%% distinguish A as the sender, merely as the next member (downstream)
%% within the chain (which has now become a ring). When A receives
%% from D messages that A sent, it knows that all members have
%% received the message. However, the message is not dead yet: if C
%% died as B was sending to C, then B would need to detect the death
%% of C and forward the message on to D instead. Thus every node has
%% to remember every message published until it is told that it can
%% forget about the message. This is essential not just for dealing
%% with failure of members, but also for the addition of new members.
%%
%% Thus once A receives the message back again, it then sends to B an
%% acknowledgement for the message, indicating that B can now forget
%% about the message. B does so, and forwards the ack to C. C forgets
%% the message, and forwards the ack to D, which forgets the message
%% and finally forwards the ack back to A. At this point, A takes no
%% further action: the message and its acknowledgement have made it to
%% every member of the group. The message is now dead, and any new
%% member joining the group at this point will not receive the
%% message.
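%%
%% The two laps described above can be sketched as a pure function
%% that lists the hops a single message makes around the ring. This is
%% an illustrative sketch only: gm_ring_sketch and hops/2 are
%% hypothetical names, and real members exchange asynchronous messages
%% rather than computing a hop list:
%%
%%     -module(gm_ring_sketch).
%%     -export([hops/2]).
%%
%%     %% Hops taken by a message from Sender (which must be in Ring):
%%     %% one full lap as a publication, then one full lap as an ack.
%%     hops(Ring, Sender) ->
%%         Order = rotate(Ring, Sender),
%%         Lap = lists:zip(Order, tl(Order) ++ [Sender]),
%%         [{From, To, pub} || {From, To} <- Lap] ++
%%             [{From, To, ack} || {From, To} <- Lap].
%%
%%     %% Rotate Ring so that Member is at the head.
%%     rotate(Ring, Member) ->
%%         {Before, After} =
%%             lists:splitwith(fun (M) -> M =/= Member end, Ring),
%%         After ++ Before.
%%
%% For the ring [a, b, c, d] with sender a, hops/2 yields four pub hops
%% (a to b, b to c, c to d, d to a) followed by the same four hops as
%% acks, so each member sends exactly two messages per publication,
%% whichever member the sender is.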
%%
%% We therefore have two roles:
%%
%% 1. The sender, who upon receiving their own messages back, must
%% then send out acknowledgements, and upon receiving their own
%% acknowledgements back perform no further action.
%%
%% 2. The other group members who, upon receiving messages and
%% acknowledgements, must update their own internal state accordingly
%% (the sending member must also do this in order to be able to
%% accommodate failures), and forward messages on to their downstream
%% neighbours.
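%%
%% The two roles can be sketched as a pair of decision functions. This
%% is hypothetical code for illustration (gm_roles_sketch, on_pub/2 and
%% on_ack/2 are not part of this module): a message is identified by
%% its publisher and a sequence number, and the internal state updates
%% every member performs are elided:
%%
%%     -module(gm_roles_sketch).
%%     -export([on_pub/2, on_ack/2]).
%%
%%     %% Our own publication has come back: every member has seen it,
%%     %% so start the acknowledgement lap.
%%     on_pub(Self, {Self, Seq}) -> {send_downstream, {ack, Self, Seq}};
%%     %% Another member's publication: remember it, then pass it on.
%%     on_pub(_Self, {Pub, Seq}) -> {remember_and_forward, {pub, Pub, Seq}}.
%%
%%     %% Our own acknowledgement has come back: nothing more to send.
%%     on_ack(Self, {Self, Seq}) -> {done, Seq};
%%     %% Another member's acknowledgement: forget the message, pass it on.
%%     on_ack(_Self, {Pub, Seq}) -> {forget_and_forward, {ack, Pub, Seq}}.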
%%
%%
%% Implementation: It gets trickier
%% --------------------------------
%%
%% Chain A -> B -> C -> D
%%
%% A publishes a message which B receives. A now dies. B and D will
%% detect the death of A, and will link up, thus the chain is now B ->
%% C -> D. B forwards A's message on to C, who forwards it to D, who
%% forwards it to B. Thus B is now responsible for A's messages - both
%% publications and acknowledgements that were in flight at the point
%% at which A died. Even worse is that this is transitive: after B
%% forwards A's message to C, B dies as well. Now C is not only
%% responsible for B's in-flight messages, but is also responsible for
%% A's in-flight messages.
%%
%% Lemma 1: A member can only determine which dead members they have
%% inherited responsibility for if there is a total ordering on the
%% conflicting additions and subtractions of members from the group.
%%
%% Consider the simultaneous death of B and addition of B' that
%% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or
%% C is responsible for in-flight messages from B. It is easy to
%% ensure that at least one of them thinks they have inherited B, but
%% if we do not ensure that exactly one of them inherits B, then we
%% could have B' converting publishes to acks, which then will crash C
%% as C does not believe it has issued acks for those messages.
%%
%% More complex scenarios are easy to concoct: A -> B -> C -> D -> E
%% becoming A -> C' -> E. Who has inherited which of B, C and D?
%%
%% However, for non-conflicting membership changes, only a partial
%% ordering is required. For example, A -> B -> C becoming A -> A' ->
%% B. The addition of A' between A and B can have no conflicts with
%% the death of C: it is clear that A has inherited C's messages.
%%
%% For ease of implementation, we adopt the simple solution of
%% imposing a total order on all membership changes.
%%
%% On the death of a member, it is ensured that the dead member's
%% neighbours become aware of the death, and the upstream neighbour
%% now sends to its new downstream neighbour its state, including the
%% messages pending acknowledgement. The downstream neighbour can then
%% use this to calculate which publishes and acknowledgements it has
%% missed out on, due to the death of its old upstream. Thus the
%% downstream can catch up, and continue the propagation of messages
%% through the group.
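%%
%% The catch-up calculation can be sketched over plain lists of
%% sequence numbers pending acknowledgement for one publisher, oldest
%% first. This is a simplified, hypothetical illustration
%% (gm_catchup_sketch and missed/2 are not part of this module, which
%% works on queues via find_prefix_common_suffix/2). Anything the
%% upstream still holds that we lack is a publication we missed.
%% Anything we hold that the upstream has already dropped must have
%% been acknowledged upstream, so it is an acknowledgement we missed:
%%
%%     -module(gm_catchup_sketch).
%%     -export([missed/2]).
%%
%%     %% Returns {MissedPubs, MissedAcks}.
%%     missed(Ours, Upstream) ->
%%         MissedPubs = [S || S <- Upstream, not lists:member(S, Ours)],
%%         MissedAcks = [S || S <- Ours, not lists:member(S, Upstream)],
%%         {MissedPubs, MissedAcks}.
%%
%% For example, missed([3, 4, 5], [4, 5, 6, 7]) returns {[6, 7], [3]}.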
%%
%% Lemma 2: When a member is joining, it must synchronously
%% communicate with its upstream member in order to receive its
%% starting state atomically with its addition to the group.
%%
%% New members must start with the same state as their nearest
%% upstream neighbour. This ensures that a new member is not surprised
%% by the acknowledgements it is sent, and that, should its downstream
%% neighbour die, it is able to send the correct state to its new
%% downstream neighbour to ensure it can catch up. Thus in the
%% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' ->
%% C, A' must start with the state of A, so that it can send C the
%% correct state when B dies, allowing C to detect any missed
%% messages.
%%
%% If A' starts by adding itself to the group membership, A could then
%% die, without A' having received the necessary state from A. This
%% would leave A' responsible for in-flight messages from A, while A'
%% has the least knowledge of those messages of any member. Thus A' must
%% start by synchronously calling A, which then immediately sends A'
%% back its state. A then adds A' to the group. If A dies at this
%% point then A' will be able to see this (as A' will fail to appear
%% in the group membership), and thus A' will ignore the state it
%% receives from A, and will simply repeat the process, now trying to
%% join downstream from some other member. This ensures that should
%% the upstream die as soon as the new member has been joined, the new
%% member is guaranteed to receive the correct state, allowing it to
%% correctly process messages inherited due to the death of its
%% upstream neighbour.
%%
%% The canonical definition of the group membership is held by a
%% distributed database. Whilst this allows the total ordering of
%% changes to be achieved, it is nevertheless undesirable to have to
%% query this database for the current view upon receiving each
%% message. Instead, we wish for members to be able to cache a view of
%% the group membership, which then requires a cache invalidation
%% mechanism. Each member maintains its own view of the group
%% membership. Thus when the group's membership changes, members may
%% need to become aware of such changes in order to be able to
%% accurately process messages they receive. Because of the
%% requirement of a total ordering of conflicting membership changes,
%% it is not possible to use the guaranteed broadcast mechanism to
%% communicate these changes: to achieve the necessary ordering, it
%% would be necessary for such messages to be published by exactly one
%% member, which cannot be guaranteed given that such a member could
%% die.
%%
%% The total ordering we enforce on membership changes gives rise to a
%% view version number: every change to the membership creates a
%% different view, and the total ordering permits a simple
%% monotonically increasing view version number.
%%
%% Lemma 3: If a message is sent from a member that holds view version
%% N, it can be correctly processed by any member receiving the
%% message with a view version >= N.
%%
%% Initially, let us suppose that each view contains the ordering of
%% every member that was ever part of the group. Dead members are
%% marked as such. Thus we have a ring of members, some of which are
%% dead, and are thus inherited by the nearest alive downstream
%% member.
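%%
%% Under that supposition, finding who has inherited a dead member is a
%% walk downstream around the ring. A hypothetical sketch follows
%% (gm_inherit_sketch and inheritor/2 are illustrative names only); it
%% borrows this module's {dead, Id} marking for dead members:
%%
%%     -module(gm_inherit_sketch).
%%     -export([inheritor/2]).
%%
%%     %% Ring lists members in ring order, dead ones as {dead, Id}.
%%     %% Returns the nearest alive member downstream of Dead,
%%     %% wrapping around; crashes if no member is alive.
%%     inheritor(Ring, Dead) ->
%%         {Before, [{dead, Dead} | After]} =
%%             lists:splitwith(fun (M) -> M =/= {dead, Dead} end, Ring),
%%         [Heir | _] = [M || M <- After ++ Before, not is_dead(M)],
%%         Heir.
%%
%%     is_dead({dead, _}) -> true;
%%     is_dead(_)         -> false.
%%
%% For example, inheritor([a, {dead, b}, {dead, c}, d], b) returns d,
%% which has inherited both b and c, reflecting the transitivity
%% described earlier.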
%%
%% In the chain A -> B -> C, all three members initially have view
%% version 1, which reflects reality. B publishes a message, which is
%% forwarded by C to A. B now dies, which A notices very quickly. Thus A
%% updates the view, creating version 2. It now forwards B's
%% publication, sending that message to its new downstream neighbour,
%% C. This happens before C is aware of the death of B. C must become
%% aware of the view change before it interprets the message it has
%% received, otherwise it will fail to learn of the death of B, and
%% thus will not realise it has inherited B's messages (and will
%%
%% Thus very simply, we have that each subsequent view contains more
%% information than the preceding view.
%%
%% However, to avoid the views growing indefinitely, we need to be
%% able to delete members which have died _and_ for which no messages
%% are in-flight. This requires that upon inheriting a dead member, we
%% know the last publication sent by the dead member (this is easy: we
%% inherit a member because we are the nearest downstream member, which
%% implies that we know at least as much as everyone else about the
%% publications of the dead member), and we know the earliest message
%% for which the acknowledgement is still in flight.
%%
%% In the chain A -> B -> C, when B dies, A will send to C its state
%% (as C is the new downstream from A), allowing C to calculate which
%% messages it has missed out on (described above). At this point, C
%% also inherits B's messages. If that state from A also includes the
%% last message published by B for which an acknowledgement has been
%% seen, then C knows exactly which further acknowledgements it must
%% receive (also including issuing acknowledgements for publications
%% still in-flight that it receives), after which it is known there
%% are no more messages in flight for B, thus all evidence that B was
%% ever part of the group can be safely removed from the canonical
%%
%% Thus, for every message that a member sends, it includes with that
%% message its view version. When a member receives a message it will
%% update its view from the canonical copy, should its view be older
%% than the view version included in the message it has received.
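%%
%% That rule can be sketched as follows, with a view represented as a
%% {Version, Members} pair, mirroring the {Ver, View} pairs used by
%% this module's view functions. The module gm_view_sketch, the
%% function maybe_refresh/3 and the ReadCanonical fun standing in for
%% the database read are all hypothetical:
%%
%%     -module(gm_view_sketch).
%%     -export([maybe_refresh/3]).
%%
%%     %% Re-read the canonical view only if the message was sent by a
%%     %% member holding a newer view than ours; by Lemma 3 a view at
%%     %% least as new as the sender's is sufficient.
%%     maybe_refresh(MsgVer, {Ver, _Members}, ReadCanonical)
%%       when Ver < MsgVer ->
%%         ReadCanonical();
%%     maybe_refresh(_MsgVer, View, _ReadCanonical) ->
%%         View.
%%
%% For example, maybe_refresh(2, {1, [a, b, c]}, fun () -> {2, [a, c]} end)
%% returns {2, [a, c]}, whereas a message carrying version 1 leaves the
%% view untouched.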
%%
%% The state held by each member therefore includes the messages from
%% each publisher pending acknowledgement, the last publication seen
%% from that publisher, and the last acknowledgement from that
%% publisher. In the case of the member's own publications or
%% inherited members, this last acknowledgement seen state indicates
%% the last acknowledgement retired, rather than sent.
%%
%%
%% We need to prove that with the provided operational semantics, we
%% can never reach a state that is not well formed from a well-formed
%%
%% Operational semantics (small step): straightforward message
%% sending, process monitoring, state updates.
%%
%% Well formed state: dead members inherited by exactly one non-dead
%% member; for every entry in anyone's pending-acks, either (the
%% publication of the message is in-flight downstream from the member
%% and upstream from the publisher) or (the acknowledgement of the
%% message is in-flight downstream from the publisher and upstream
%%
%% Proof by induction on the applicable operational semantics.
%%
%%
%% The ring configuration and double traversal of messages around the
%% ring are similar (though developed independently) to the LCR
%% protocol by [Levy 2008]. However, LCR differs in several
%% ways. Firstly, by using vector clocks, it enforces a total order of
%% message delivery, which is unnecessary for our purposes. More
%% significantly, it is built on top of a "group communication system"
%% which performs the group management functions, taking
%% responsibility away from the protocol as to how to cope with safely
%% adding and removing members. When membership changes do occur, the
%% protocol stipulates that every member must perform communication
%% with every other member of the group, to ensure all outstanding
%% deliveries complete, before the entire group transitions to the new
%% view. This, in total, requires two sets of all-to-all synchronous
%%
%% This is not only rather inefficient, but also does not explain what
%% happens upon the failure of a member during this process. It does,
%% though, entirely avoid the need for inheritance of responsibility of
%% dead members that our protocol incorporates.
%%
%% In [Marandi et al 2010], a Paxos-based protocol is described. This
%% work explicitly focuses on the efficiency of communication. LCR
%% (and our protocol too) is more efficient, but at the cost of
%% higher latency. The Ring-Paxos protocol is itself built on top of
%% IP-multicast, which rules it out for many applications where
%% point-to-point communication is all that is available. They also
%% have an excellent related work section which I really ought to
%%
%% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008.
%% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast

-behaviour(gen_server2).

-export([create_tables/0, start_link/3, leave/1, broadcast/2,
         confirmed_broadcast/2, group_members/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3, prioritise_info/2]).

-export([behaviour_info/1]).

-export([table_definitions/0]).

-define(GROUP_TABLE, gm_group).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
-define(VERSION_START, 0).
-define(SETS, ordsets).
-define(DICT, orddict).

-record(gm_group, { name, version, members }).

-record(view_member, { id, aliases, left, right }).

-record(member, { pending_ack, last_pub, last_ack }).

-define(TABLE, {?GROUP_TABLE, [{record_name, gm_group},
                               {attributes, record_info(fields, gm_group)}]}).
-define(TABLE_MATCH, {match, #gm_group { _ = '_' }}).

-export_type([group_name/0]).

-type(group_name() :: any()).

-spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}).
-spec(start_link/3 :: (group_name(), atom(), any()) ->
                           rabbit_types:ok_pid_or_error()).
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
-spec(group_members/1 :: (pid()) -> [pid()]).

%% The joined, members_changed and handle_msg callbacks can all
%% return any of the following terms:
%%
%% 'ok' - the callback function returns normally
%%
%% {'stop', Reason} - the callback indicates the member should
%% stop with reason Reason and should leave the group.
%%
%% {'become', Module, Args} - the callback indicates that the
%% callback module should be changed to Module and that the
%% callback functions should now be passed the arguments
%% Args. This allows the callback module to be dynamically
%% changed.

%% Called when we've successfully joined the group. Supplied with
%% Args provided in start_link, plus current group members.
-callback joined(Args :: term(), Members :: [pid()]) ->
    ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.

%% Supplied with Args provided in start_link, the list of new
%% members and the list of members previously known to us that
%% have since died. Note that if a member joins and dies very
%% quickly, it's possible that we will never see that member
%% appear in either births or deaths. However we are guaranteed
%% that (1) we will see a member joining either in the births
%% here, or in the members passed to joined/2 before receiving
%% any messages from it; and (2) we will not see members die that
%% we have not seen born (or supplied in the members to
%% joined/2).
-callback members_changed(Args :: term(), Births :: [pid()],
                          Deaths :: [pid()]) ->
    ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.

%% Supplied with Args provided in start_link, the sender, and the
%% message. This does get called for messages injected by this
%% member; however, in such cases, there is no special
%% significance of this invocation: it does not indicate that the
%% message has made it to any other members, let alone all other
%% members.
-callback handle_msg(Args :: term(), From :: pid(), Message :: term()) ->
    ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.

%% Called on gm member termination as per rules in gen_server,
%% with the Args provided in start_link plus the termination
%% Reason.
-callback terminate(Args :: term(), Reason :: term()) ->

behaviour_info(callbacks) ->
    [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {terminate, 2}];
behaviour_info(_Other) ->

    create_tables([?TABLE]).

create_tables([{Table, Attributes} | Tables]) ->
    case mnesia:create_table(Table, Attributes) of
        {atomic, ok} -> create_tables(Tables);
        {aborted, {already_exists, Table}} -> create_tables(Tables);

table_definitions() ->
    {Name, Attributes} = ?TABLE,
    [{Name, [?TABLE_MATCH | Attributes]}].

start_link(GroupName, Module, Args) ->
    gen_server2:start_link(?MODULE, [GroupName, Module, Args], []).

    gen_server2:cast(Server, leave).

broadcast(Server, Msg) ->
    gen_server2:cast(Server, {broadcast, Msg}).

confirmed_broadcast(Server, Msg) ->
    gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).

group_members(Server) ->
    gen_server2:call(Server, group_members, infinity).

init([GroupName, Module, Args]) ->
    {MegaSecs, Secs, MicroSecs} = now(),
    random:seed(MegaSecs, Secs, MicroSecs),
    Self = make_member(GroupName),
    gen_server2:cast(self(), join),
    {ok, #state { self = Self,
                  left = {Self, undefined},
                  right = {Self, undefined},
                  group_name = GroupName,
                  members_state = undefined,
                  callback_args = Args,
                  confirms = queue:new(),
                  broadcast_buffer = [],
                  broadcast_timer = undefined }, hibernate,
     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.


handle_call({confirmed_broadcast, _Msg}, _From,
            State = #state { members_state = undefined }) ->
    reply(not_joined, State);

handle_call({confirmed_broadcast, Msg}, _From,
            State = #state { self = Self,
                             right = {Self, undefined},
                             callback_args = Args }) ->
    handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),

handle_call({confirmed_broadcast, Msg}, From, State) ->
    internal_broadcast(Msg, From, State);

handle_call(group_members, _From,
            State = #state { members_state = undefined }) ->
    reply(not_joined, State);

handle_call(group_members, _From, State = #state { view = View }) ->
    reply(alive_view_members(View), State);

handle_call({add_on_right, _NewMember}, _From,
            State = #state { members_state = undefined }) ->
    reply(not_ready, State);

handle_call({add_on_right, NewMember}, _From,
            State = #state { self = Self,
                             group_name = GroupName,
                             members_state = MembersState,
                             callback_args = Args }) ->
    Group = record_new_member_in_group(
              GroupName, Self, NewMember,
                  View1 = group_to_view(Group1),
                  ok = send_right(NewMember, View1,
                                  {catchup, Self, prepare_members_state(
    View2 = group_to_view(Group),
    State1 = check_neighbours(State #state { view = View2 }),
    Result = callback_view_changed(Args, Module, View, View2),
    handle_callback_result({Result, {ok, Group}, State1}).


handle_cast({?TAG, ReqVer, Msg},
            State = #state { view = View,
                             group_name = GroupName,
                             callback_args = Args }) ->
        case needs_view_update(ReqVer, View) of
                View1 = group_to_view(read_group(GroupName)),
                {callback_view_changed(Args, Module, View, View1),
                 check_neighbours(State #state { view = View1 })};
    handle_callback_result(
        Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));

handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) ->

handle_cast({broadcast, Msg},
            State = #state { self = Self,
                             right = {Self, undefined},
                             callback_args = Args }) ->
    handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),

handle_cast({broadcast, Msg}, State) ->
    internal_broadcast(Msg, none, State);

handle_cast(join, State = #state { self = Self,
                                   group_name = GroupName,
                                   members_state = undefined,
                                   callback_args = Args }) ->
    View = join_group(Self, GroupName),
        case alive_view_members(View) of
            [Self] -> blank_member_state();
    State1 = check_neighbours(State #state { view = View,
                                             members_state = MembersState }),
    handle_callback_result(
      {Module:joined(Args, get_pids(all_known_members(View))), State1});

handle_cast(leave, State) ->
    {stop, normal, State}.


handle_info(flush, State) ->
      flush_broadcast_buffer(State #state { broadcast_timer = undefined }));

handle_info({'DOWN', MRef, process, _Pid, _Reason},
            State = #state { self = Self,
                             group_name = GroupName,
                             callback_args = Args,
                             confirms = Confirms }) ->
    Member = case {Left, Right} of
                 {{Member1, MRef}, _} -> Member1;
                 {_, {Member1, MRef}} -> Member1;
                group_to_view(record_dead_member_in_group(Member, GroupName)),
            State1 = State #state { view = View1 },
            case alive_view_members(View1) of
                                  members_state = blank_member_state(),
                                  confirms = purge_confirms(Confirms) });
                    %% here we won't be pointing out any deaths:
                    %% the concern is that there may be births
                    %% which we'd otherwise miss.
                    {callback_view_changed(Args, Module, View, View1),
            handle_callback_result({Result, check_neighbours(State2)})


terminate(Reason, State = #state { module = Module,
                                   callback_args = Args }) ->
    flush_broadcast_buffer(State),
    Module:terminate(Args, Reason).


code_change(_OldVsn, State, _Extra) ->

prioritise_info(flush, _State) -> 1;
prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
prioritise_info(_ , _State) -> 0.


handle_msg(check_neighbours, State) ->
    %% no-op - it's already been done by the calling handle_cast

handle_msg({catchup, Left, MembersStateLeft},
           State = #state { self = Self,
                            left = {Left, _MRefL},
                            right = {Right, _MRefR},
                            members_state = undefined }) ->
    ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
    MembersStateLeft1 = build_members_state(MembersStateLeft),
    {ok, State #state { members_state = MembersStateLeft1 }};

handle_msg({catchup, Left, MembersStateLeft},
           State = #state { self = Self,
                            left = {Left, _MRefL},
                            members_state = MembersState })
  when MembersState =/= undefined ->
    MembersStateLeft1 = build_members_state(MembersStateLeft),
    AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++
                                 ?DICT:fetch_keys(MembersStateLeft1)),
    {MembersState1, Activity} =
          fun (Id, MembersStateActivity) ->
                  #member { pending_ack = PALeft, last_ack = LA } =
                      find_member_or_blank(Id, MembersStateLeft1),
                    fun (#member { pending_ack = PA } = Member, Activity1) ->
                            case is_member_alias(Id, Self, View) of
                                    {_AcksInFlight, Pubs, _PA1} =
                                        find_prefix_common_suffix(PALeft, PA),
                                    {Member #member { last_ack = LA },
                                     activity_cons(Id, pubs_from_queue(Pubs),
                                    {Acks, _Common, Pubs} =
                                        find_prefix_common_suffix(PA, PALeft),
                                     activity_cons(Id, pubs_from_queue(Pubs),
                                                   acks_from_queue(Acks),
                    end, Id, MembersStateActivity)
          end, {MembersState, activity_nil()}, AllMembers),
    handle_msg({activity, Left, activity_finalise(Activity)},
               State #state { members_state = MembersState1 });

handle_msg({catchup, _NotLeft, _MembersState}, State) ->

handle_msg({activity, Left, Activity},
           State = #state { self = Self,
                            left = {Left, _MRefL},
                            members_state = MembersState,
                            confirms = Confirms })
  when MembersState =/= undefined ->
    {MembersState1, {Confirms1, Activity1}} =
          fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
                    fun (Member = #member { pending_ack = PA,
                         {Confirms2, Activity2}) ->
                            case is_member_alias(Id, Self, View) of
                                        find_common(queue_from_pubs(Pubs), PA,
                                    LA1 = last_ack(Acks, LA),
                                    AckNums = acks_from_queue(ToAck),
                                    Confirms3 = maybe_confirm(
                                                  Self, Id, Confirms2, AckNums),
                                    {Member #member { pending_ack = PA1,
                                                  Id, [], AckNums, Activity2)}};
                                    PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
                                    LA1 = last_ack(Acks, LA),
                                    LP1 = last_pub(Pubs, LP),
                                    {Member #member { pending_ack = PA1,
                                     activity_cons(Id, Pubs, Acks, Activity2)}}
                    end, Id, MembersStateConfirmsActivity)
          end, {MembersState, {Confirms, activity_nil()}}, Activity),
    State1 = State #state { members_state = MembersState1,
                            confirms = Confirms1 },
    Activity3 = activity_finalise(Activity1),
    {Result, State2} = maybe_erase_aliases(State1),
    ok = maybe_send_activity(Activity3, State2),
      Result, fun activity_true/3, fun activity_false/3, Activity3, State2);

handle_msg({activity, _NotLeft, _Activity}, State) ->


    {noreply, ensure_broadcast_timer(State), hibernate}.
810 reply(Reply, State) ->
811 {reply, Reply, ensure_broadcast_timer(State), hibernate}.
813 ensure_broadcast_timer(State = #state { broadcast_buffer = [],
814 broadcast_timer = undefined }) ->
816 ensure_broadcast_timer(State = #state { broadcast_buffer = [],
817 broadcast_timer = TRef }) ->
818 erlang:cancel_timer(TRef),
819 State #state { broadcast_timer = undefined };
820 ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
821 TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush),
822 State #state { broadcast_timer = TRef };
823 ensure_broadcast_timer(State) ->
826 internal_broadcast(Msg, From, State = #state { self = Self,
827 pub_count = PubCount,
830 callback_args = Args,
831 broadcast_buffer = Buffer }) ->
832 Result = Module:handle_msg(Args, get_pid(Self), Msg),
833 Buffer1 = [{PubCount, Msg} | Buffer],
834 Confirms1 = case From of
836 _ -> queue:in({PubCount, From}, Confirms)
838 State1 = State #state { pub_count = PubCount + 1,
839 confirms = Confirms1,
840 broadcast_buffer = Buffer1 },
841 case From =/= none of
843 handle_callback_result({Result, flush_broadcast_buffer(State1)});
845 handle_callback_result(
846 {Result, State1 #state { broadcast_buffer = Buffer1 }})
849 flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
851 flush_broadcast_buffer(State = #state { self = Self,
852 members_state = MembersState,
853 broadcast_buffer = Buffer }) ->
854 Pubs = lists:reverse(Buffer),
855 Activity = activity_cons(Self, Pubs, [], activity_nil()),
856 ok = maybe_send_activity(activity_finalise(Activity), State),
857 MembersState1 = with_member(
858 fun (Member = #member { pending_ack = PA }) ->
859 PA1 = queue:join(PA, queue:from_list(Pubs)),
860 Member #member { pending_ack = PA1 }
861 end, Self, MembersState),
862 State #state { members_state = MembersState1,
863 broadcast_buffer = [] }.
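%% Sketch (illustrative only, not part of gm): the pure helpers below restate
%% the broadcast buffering above without processes or timers. The module and
%% function names (gm_buffer_sketch, publish/3, flush/1, timer_action/2) are
%% hypothetical. timer_action/2 returns an atom naming what each
%% ensure_broadcast_timer/1 clause does. The first clause's body is elided in
%% the listing, so "nothing" (leave the state alone) is an assumption.

```erlang
-module(gm_buffer_sketch).
-export([publish/3, flush/1, timer_action/2]).

%% Prepend {PubCount, Msg} (newest first), as internal_broadcast/3 does,
%% and return the incremented publication counter with the new buffer.
publish(Msg, PubCount, Buffer) ->
    {PubCount + 1, [{PubCount, Msg} | Buffer]}.

%% Reverse once so publications leave in the order they were made, as
%% flush_broadcast_buffer/1 does; the buffer is emptied afterwards.
flush(Buffer) ->
    {lists:reverse(Buffer), []}.

%% One clause per ensure_broadcast_timer/1 clause:
%% empty buffer, no timer  -> nothing to do
%% empty buffer, timer set -> cancel the timer
%% pending pubs, no timer  -> start a flush timer
%% pending pubs, timer set -> keep the existing timer
timer_action([], undefined)      -> nothing;
timer_action([], _TRef)          -> cancel;
timer_action(_Buffer, undefined) -> start;
timer_action(_Buffer, _TRef)     -> keep.
```

Prepending and reversing once on flush keeps each publish constant-time.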
866 %% ---------------------------------------------------------------------------
867 %% View construction and inspection
868 %% ---------------------------------------------------------------------------
870 needs_view_update(ReqVer, {Ver, _View}) ->
873 view_version({Ver, _View}) ->
876 is_member_alive({dead, _Member}) -> false;
877 is_member_alive(_) -> true.
879 is_member_alias(Self, Self, _View) ->
881 is_member_alias(Member, Self, View) ->
882 ?SETS:is_element(Member,
883 ((fetch_view_member(Self, View)) #view_member.aliases)).
885 dead_member_id({dead, Member}) -> Member.
887 store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
888 {Ver, ?DICT:store(Id, VMember, View)}.
890 with_view_member(Fun, View, Id) ->
891 store_view_member(Fun(fetch_view_member(Id, View)), View).
893 fetch_view_member(Id, {_Ver, View}) ->
894 ?DICT:fetch(Id, View).
896 find_view_member(Id, {_Ver, View}) ->
897 ?DICT:find(Id, View).
902 alive_view_members({_Ver, View}) ->
903 ?DICT:fetch_keys(View).
905 all_known_members({_Ver, View}) ->
907 fun (Member, #view_member { aliases = Aliases }, Acc) ->
908 ?SETS:to_list(Aliases) ++ [Member | Acc]
911 group_to_view(#gm_group { members = Members, version = Ver }) ->
912 Alive = lists:filter(fun is_member_alive/1, Members),
913 [_|_] = Alive, %% ASSERTION - can't have all dead members
914 add_aliases(link_view(Alive ++ Alive ++ Alive, blank_view(Ver)), Members).
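%% Sketch (hypothetical module gm_ring_sketch, with an orddict standing in for
%% the view dictionary): link_view/2 slides a three-element window over
%% Alive ++ Alive ++ Alive. For each window it records the middle element's
%% left and right neighbours, and it stops at the first member already stored.
%% That stop condition is assumed, since part of the clause is elided above.
%% Tripling the list guarantees that even a single live member yields one
%% window, so that member becomes its own neighbour on both sides.

```erlang
-module(gm_ring_sketch).
-export([ring/1]).

%% Returns an orddict mapping each live member to {Left, Right}.
ring([_ | _] = Alive) ->
    ring(Alive ++ Alive ++ Alive, orddict:new()).

ring([Left, Middle, Right | Rest], Acc) ->
    case orddict:is_key(Middle, Acc) of
        true  -> Acc; %% every member linked once: the ring is closed
        false -> ring([Middle, Right | Rest],
                      orddict:store(Middle, {Left, Right}, Acc))
    end;
ring(_TooShort, Acc) ->
    Acc.
```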
916 link_view([Left, Middle, Right | Rest], View) ->
917 case find_view_member(Middle, View) of
920 [Middle, Right | Rest],
921 store_view_member(#view_member { id = Middle,
922 aliases = ?SETS:new(),
924 right = Right }, View));
928 link_view(_, View) ->
931 add_aliases(View, Members) ->
932 Members1 = ensure_alive_suffix(Members),
933 {EmptyDeadSet, View1} =
935 fun (Member, {DeadAcc, ViewAcc}) ->
936 case is_member_alive(Member) of
941 #view_member { aliases = Aliases }) ->
942 VMember #view_member {
943 aliases = ?SETS:union(Aliases, DeadAcc) }
944 end, ViewAcc, Member)};
946 {?SETS:add_element(dead_member_id(Member), DeadAcc),
949 end, {?SETS:new(), View}, Members1),
950 0 = ?SETS:size(EmptyDeadSet), %% ASSERTION
953 ensure_alive_suffix(Members) ->
954 queue:to_list(ensure_alive_suffix1(queue:from_list(Members))).
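%% Sketch (hypothetical module gm_alias_sketch; members are atoms when alive
%% and {dead, Id} when dead): add_aliases/2 folds over the member list and
%% collects dead ids until it reaches a live member, which absorbs them as
%% aliases. Before the fold, ensure_alive_suffix/1 rotates trailing dead
%% members to the front. The fold therefore always ends on a live member and
%% the final dead set is empty, which `0 = ?SETS:size(EmptyDeadSet)` asserts.
%% Resetting the accumulator after each live member is assumed, since that
%% line is elided in the listing.

```erlang
-module(gm_alias_sketch).
-export([aliases/1]).

%% Returns an orddict mapping each live member to the sorted list of dead
%% ids that precede it (wrapping around the end of the list).
aliases(Members) ->
    Rotated = rotate_alive_last(Members),
    %% The match asserts that no dead ids are left over.
    {[], Map} = lists:foldl(fun step/2, {[], orddict:new()}, Rotated),
    Map.

step({dead, Id}, {DeadAcc, Map}) ->
    {[Id | DeadAcc], Map};
step(Alive, {DeadAcc, Map}) ->
    {[], orddict:store(Alive, lists:sort(DeadAcc), Map)}.

%% Move the last member to the front until the last one is alive, as
%% ensure_alive_suffix1/1 does with queue:out_r/1 and queue:in_r/2.
%% Loops forever if nothing is alive; group_to_view/1 asserts against that.
rotate_alive_last(Members) ->
    case lists:last(Members) of
        {dead, _} = Dead -> rotate_alive_last([Dead | lists:droplast(Members)]);
        _Alive           -> Members
    end.
```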
956 ensure_alive_suffix1(MembersQ) ->
957 {{value, Member}, MembersQ1} = queue:out_r(MembersQ),
958 case is_member_alive(Member) of
960 false -> ensure_alive_suffix1(queue:in_r(Member, MembersQ1))
964 %% ---------------------------------------------------------------------------
966 %% ---------------------------------------------------------------------------
968 join_group(Self, GroupName) ->
969 join_group(Self, GroupName, read_group(GroupName)).
971 join_group(Self, GroupName, {error, not_found}) ->
972 join_group(Self, GroupName, prune_or_create_group(Self, GroupName));
973 join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) ->
974 group_to_view(Group);
975 join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
976 case lists:member(Self, Members) of
978 group_to_view(Group);
980 case lists:filter(fun is_member_alive/1, Members) of
982 join_group(Self, GroupName,
983 prune_or_create_group(Self, GroupName));
985 Left = lists:nth(random:uniform(length(Alive)), Alive),
990 record_dead_member_in_group(Left, GroupName))
993 case gen_server2:call(
994 get_pid(Left), {add_on_right, Self}, infinity) of
995 {ok, Group1} -> group_to_view(Group1);
996 not_ready -> join_group(Self, GroupName)
1000 when R =:= noproc; R =:= normal; R =:= shutdown ->
1003 when R =:= nodedown; R =:= shutdown ->
1009 read_group(GroupName) ->
1010 case mnesia:dirty_read(?GROUP_TABLE, GroupName) of
1011 [] -> {error, not_found};
1015 prune_or_create_group(Self, GroupName) ->
1017 mnesia:sync_transaction(
1018 fun () -> GroupNew = #gm_group { name = GroupName,
1020 version = ?VERSION_START },
1021 case mnesia:read({?GROUP_TABLE, GroupName}) of
1023 mnesia:write(GroupNew),
1025 [Group1 = #gm_group { members = Members }] ->
1026 case lists:any(fun is_member_alive/1, Members) of
1028 false -> mnesia:write(GroupNew),
1035 record_dead_member_in_group(Member, GroupName) ->
1037 mnesia:sync_transaction(
1038 fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
1039 mnesia:read({?GROUP_TABLE, GroupName}),
1040 case lists:splitwith(
1041 fun (Member1) -> Member1 =/= Member end, Members) of
1042 {_Members1, []} -> %% not found - already recorded dead
1044 {Members1, [Member | Members2]} ->
1045 Members3 = Members1 ++ [{dead, Member} | Members2],
1046 Group2 = Group1 #gm_group { members = Members3,
1047 version = Ver + 1 },
1048 mnesia:write(Group2),
1054 record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
1056 mnesia:sync_transaction(
1058 [#gm_group { members = Members, version = Ver } = Group1] =
1059 mnesia:read({?GROUP_TABLE, GroupName}),
1060 {Prefix, [Left | Suffix]} =
1061 lists:splitwith(fun (M) -> M =/= Left end, Members),
1062 Members1 = Prefix ++ [Left, NewMember | Suffix],
1063 Group2 = Group1 #gm_group { members = Members1,
1064 version = Ver + 1 },
1066 mnesia:write(Group2),
1071 erase_members_in_group(Members, GroupName) ->
1072 DeadMembers = [{dead, Id} || Id <- Members],
1074 mnesia:sync_transaction(
1076 [Group1 = #gm_group { members = [_|_] = Members1,
1078 mnesia:read({?GROUP_TABLE, GroupName}),
1079 case Members1 -- DeadMembers of
1081 Members2 -> Group2 =
1082 Group1 #gm_group { members = Members2,
1083 version = Ver + 1 },
1084 mnesia:write(Group2),
1090 maybe_erase_aliases(State = #state { self = Self,
1091 group_name = GroupName,
1093 members_state = MembersState,
1095 callback_args = Args }) ->
1096 #view_member { aliases = Aliases } = fetch_view_member(Self, View),
1097 {Erasable, MembersState1}
1099 fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
1100 #member { last_pub = LP, last_ack = LA } =
1101 find_member_or_blank(Id, MembersState),
1102 case can_erase_view_member(Self, Id, LA, LP) of
1103 true -> {[Id | ErasableAcc],
1104 erase_member(Id, MembersStateAcc)};
1107 end, {[], MembersState}, Aliases),
1108 State1 = State #state { members_state = MembersState1 },
1111 _ -> View1 = group_to_view(
1112 erase_members_in_group(Erasable, GroupName)),
1113 {callback_view_changed(Args, Module, View, View1),
1114 State1 #state { view = View1 }}
1117 can_erase_view_member(Self, Self, _LA, _LP) -> false;
1118 can_erase_view_member(_Self, _Id, N, N) -> true;
1119 can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
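%% Sketch (hypothetical module gm_group_sketch, plain lists with no mnesia):
%% the transactions in record_dead_member_in_group/2,
%% record_new_member_in_group/4 and erase_members_in_group/2 each rewrite the
%% gm_group members list and bump its version when the list changes. The list
%% rewrites alone look like this. Version handling and the transaction
%% wrapper are left out.

```erlang
-module(gm_group_sketch).
-export([mark_dead/2, add_right_of/3, erase_dead/2]).

%% Replace Member with {dead, Member}. If Member is absent, it has already
%% been recorded dead, so the list is returned unchanged.
mark_dead(Member, Members) ->
    case lists:splitwith(fun (M) -> M =/= Member end, Members) of
        {_Prefix, []}               -> Members;
        {Prefix, [Member | Suffix]} -> Prefix ++ [{dead, Member} | Suffix]
    end.

%% Insert NewMember immediately to the right of Left (crashes if Left is
%% absent, as the match in record_new_member_in_group/4 would).
add_right_of(Left, NewMember, Members) ->
    {Prefix, [Left | Suffix]} =
        lists:splitwith(fun (M) -> M =/= Left end, Members),
    Prefix ++ [Left, NewMember | Suffix].

%% Drop the dead markers for the given ids.
erase_dead(Ids, Members) ->
    Members -- [{dead, Id} || Id <- Ids].
```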
1122 %% ---------------------------------------------------------------------------
1123 %% View monitoring and maintenance
1124 %% ---------------------------------------------------------------------------
1126 ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
1128 ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
1129 ok = gen_server2:cast(get_pid(RealNeighbour),
1130 {?TAG, Ver, check_neighbours}),
1131 {RealNeighbour, maybe_monitor(RealNeighbour, Self)};
1132 ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
1133 {RealNeighbour, MRef};
1134 ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
1135 true = erlang:demonitor(MRef),
1136 Msg = {?TAG, Ver, check_neighbours},
1137 ok = gen_server2:cast(get_pid(RealNeighbour), Msg),
1138 ok = case Neighbour of
1140 _ -> gen_server2:cast(get_pid(Neighbour), Msg)
1142 {Neighbour, maybe_monitor(Neighbour, Self)}.
1144 maybe_monitor(Self, Self) ->
1146 maybe_monitor(Other, _Self) ->
1147 erlang:monitor(process, get_pid(Other)).
1149 check_neighbours(State = #state { self = Self,
1153 broadcast_buffer = Buffer }) ->
1154 #view_member { left = VLeft, right = VRight }
1155 = fetch_view_member(Self, View),
1156 Ver = view_version(View),
1157 Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
1158 Right1 = ensure_neighbour(Ver, Self, Right, VRight),
1159 Buffer1 = case Right1 of
1160 {Self, undefined} -> [];
1163 State1 = State #state { left = Left1, right = Right1,
1164 broadcast_buffer = Buffer1 },
1165 ok = maybe_send_catchup(Right, State1),
1168 maybe_send_catchup(Right, #state { right = Right }) ->
1170 maybe_send_catchup(_Right, #state { self = Self,
1171 right = {Self, undefined} }) ->
1173 maybe_send_catchup(_Right, #state { members_state = undefined }) ->
1175 maybe_send_catchup(_Right, #state { self = Self,
1176 right = {Right, _MRef},
1178 members_state = MembersState }) ->
1179 send_right(Right, View,
1180 {catchup, Self, prepare_members_state(MembersState)}).
1183 %% ---------------------------------------------------------------------------
1184 %% Catch-up delta detection
1185 %% ---------------------------------------------------------------------------
1187 find_prefix_common_suffix(A, B) ->
1188 {Prefix, A1} = find_prefix(A, B, queue:new()),
1189 {Common, Suffix} = find_common(A1, B, queue:new()),
1190 {Prefix, Common, Suffix}.
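%% Sketch (hypothetical module gm_delta_sketch, using lists rather than
%% queues): both arguments hold {PubNum, Msg} pairs in ascending PubNum order.
%% The prefix is everything in A numbered below the head of B, or all of A
%% when B is empty. The common part is the following run where A and B agree.
%% The suffix is assumed to be what remains of B, which matches how
%% handle_msg/2 uses find_common/3 to split pending_ack. Unlike find_prefix/3,
%% the sketch does not crash when A's head is numbered above B's head; it
%% simply ends the prefix there.

```erlang
-module(gm_delta_sketch).
-export([prefix_common_suffix/2]).

prefix_common_suffix(A, B) ->
    {Prefix, A1} = prefix(A, B, []),
    {Common, Suffix} = common(A1, B, []),
    {Prefix, Common, Suffix}.

%% Take elements of A while they are numbered below the head of B.
prefix([{NA, _} = X | A1], [{NB, _} | _] = B, Acc) when NA < NB ->
    prefix(A1, B, [X | Acc]);
prefix(A, [], Acc) ->
    {lists:reverse(Acc, A), []};   %% empty B: all of A is prefix
prefix(A, _B, Acc) ->
    {lists:reverse(Acc), A}.

%% Take elements while A and B have identical heads; return rest of B.
common([X | A1], [X | B1], Acc) -> common(A1, B1, [X | Acc]);
common(_A, B, Acc)              -> {lists:reverse(Acc), B}.
```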
1192 %% Returns the elements of A that occur before the first element of B,
1193 %% plus the remainder of A.
1194 find_prefix(A, B, Prefix) ->
1195 case {queue:out(A), queue:out(B)} of
1196 {{{value, Val}, _A1}, {{value, Val}, _B1}} ->
1198 {{empty, A1}, {{value, _A}, _B1}} ->
1200 {{{value, {NumA, _MsgA} = Val}, A1},
1201 {{value, {NumB, _MsgB}}, _B1}} when NumA < NumB ->
1202 find_prefix(A1, B, queue:in(Val, Prefix));
1203 {_, {empty, _B1}} ->
1204 {A, Prefix} %% Prefix will be empty here
1207 %% A should be a prefix of B. Returns the commonality plus the
1209 find_common(A, B, Common) ->
1210 case {queue:out(A), queue:out(B)} of
1211 {{{value, Val}, A1}, {{value, Val}, B1}} ->
1212 find_common(A1, B1, queue:in(Val, Common));
1218 %% ---------------------------------------------------------------------------
1220 %% ---------------------------------------------------------------------------
1222 with_member(Fun, Id, MembersState) ->
1224 Id, Fun(find_member_or_blank(Id, MembersState)), MembersState).
1226 with_member_acc(Fun, Id, {MembersState, Acc}) ->
1227 {MemberState, Acc1} = Fun(find_member_or_blank(Id, MembersState), Acc),
1228 {store_member(Id, MemberState, MembersState), Acc1}.
1230 find_member_or_blank(Id, MembersState) ->
1231 case ?DICT:find(Id, MembersState) of
1232 {ok, Result} -> Result;
1233 error -> blank_member()
1236 erase_member(Id, MembersState) ->
1237 ?DICT:erase(Id, MembersState).
1240 #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
1242 blank_member_state() ->
1245 store_member(Id, MemberState, MembersState) ->
1246 ?DICT:store(Id, MemberState, MembersState).
1248 prepare_members_state(MembersState) ->
1249 ?DICT:to_list(MembersState).
1251 build_members_state(MembersStateList) ->
1252 ?DICT:from_list(MembersStateList).
1254 make_member(GroupName) ->
1255 {case read_group(GroupName) of
1256 #gm_group { version = Version } -> Version;
1257 {error, not_found} -> ?VERSION_START
1260 get_pid({_Version, Pid}) -> Pid.
1262 get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
1264 %% ---------------------------------------------------------------------------
1265 %% Activity assembly
1266 %% ---------------------------------------------------------------------------
1271 activity_cons(_Id, [], [], Tail) ->
1273 activity_cons(Sender, Pubs, Acks, Tail) ->
1274 queue:in({Sender, Pubs, Acks}, Tail).
1276 activity_finalise(Activity) ->
1277 queue:to_list(Activity).
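%% Sketch (hypothetical module gm_activity_sketch): an activity is a FIFO
%% queue of {Sender, Pubs, Acks} entries that is flattened to a list before
%% sending. Entries with neither pubs nor acks are dropped, so
%% maybe_send_activity/2 can skip the send entirely when the finalised list is
%% empty. activity_nil/0 is assumed to be queue:new(), since its body is
%% elided above.

```erlang
-module(gm_activity_sketch).
-export([nil/0, cons/4, finalise/1]).

nil() -> queue:new().

%% Skip empty entries; otherwise append to the back of the queue.
cons(_Id, [], [], Tail)        -> Tail;
cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).

%% Oldest entry first.
finalise(Activity) -> queue:to_list(Activity).
```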
1279 maybe_send_activity([], _State) ->
1281 maybe_send_activity(Activity, #state { self = Self,
1282 right = {Right, _MRefR},
1284 send_right(Right, View, {activity, Self, Activity}).
1286 send_right(Right, View, Msg) ->
1287 ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
1289 callback(Args, Module, Activity) ->
1291 fun ({Id, Pubs, _Acks}, ok) ->
1292 lists:foldl(fun ({_PubNum, Pub}, ok) ->
1293 Module:handle_msg(Args, get_pid(Id), Pub);
1301 callback_view_changed(Args, Module, OldView, NewView) ->
1302 OldMembers = all_known_members(OldView),
1303 NewMembers = all_known_members(NewView),
1304 Births = NewMembers -- OldMembers,
1305 Deaths = OldMembers -- NewMembers,
1306 case {Births, Deaths} of
1308 _ -> Module:members_changed(Args, get_pids(Births),
1312 handle_callback_result({Result, State}) ->
1313 if_callback_success(
1314 Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State);
1315 handle_callback_result({Result, Reply, State}) ->
1316 if_callback_success(
1317 Result, fun reply_true/3, fun reply_false/3, Reply, State).
1319 no_reply_true (_Result, _Undefined, State) -> noreply(State).
1320 no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}.
1322 reply_true (_Result, Reply, State) -> reply(Reply, State).
1323 reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}.
1325 handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State).
1326 handle_msg_false(Result, _Msg, State) -> {Result, State}.
1328 activity_true(_Result, Activity, State = #state { module = Module,
1329 callback_args = Args }) ->
1330 {callback(Args, Module, Activity), State}.
1331 activity_false(Result, _Activity, State) ->
1334 if_callback_success(ok, True, _False, Arg, State) ->
1335 True(ok, Arg, State);
1336 if_callback_success(
1337 {become, Module, Args} = Result, True, _False, Arg, State) ->
1338 True(Result, Arg, State #state { module = Module,
1339 callback_args = Args });
1340 if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) ->
1341 False(Result, Arg, State).
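%% Sketch (hypothetical module gm_confirm_sketch): maybe_confirm/4, which
%% follows, walks a queue of {PubNum, From} waiting callers alongside the
%% ascending list of acknowledged publication numbers. If the head matches the
%% ack, the caller is replied to and the head is popped. An ack numbered below
%% the head belonged to a publication nobody is waiting on, so it is skipped.
%% The sketch returns the callers it would reply to instead of calling
%% gen_server2:reply/2, and it omits the Self =:= Id check.

```erlang
-module(gm_confirm_sketch).
-export([confirm/2]).

%% Confirms: queue of {PubNum, From}; AckNums: ascending list of integers.
%% Returns {RemainingConfirms, CallersRepliedTo}.
confirm(Confirms, AckNums) ->
    confirm(Confirms, AckNums, []).

confirm(Confirms, [], Acc) ->
    {Confirms, lists:reverse(Acc)};
confirm(Confirms, [PubNum | PubNums], Acc) ->
    case queue:out(Confirms) of
        {empty, _} ->
            {Confirms, lists:reverse(Acc)};
        {{value, {PubNum, From}}, Confirms1} ->
            confirm(Confirms1, PubNums, [From | Acc]);
        {{value, {PubNum1, _From}}, _} when PubNum1 > PubNum ->
            confirm(Confirms, PubNums, Acc)
        %% PubNum1 < PubNum raises case_clause, as in the original.
    end.
```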
1343 maybe_confirm(_Self, _Id, Confirms, []) ->
1345 maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) ->
1346 case queue:out(Confirms) of
1347 {empty, _Confirms} ->
1349 {{value, {PubNum, From}}, Confirms1} ->
1350 gen_server2:reply(From, ok),
1351 maybe_confirm(Self, Self, Confirms1, PubNums);
1352 {{value, {PubNum1, _From}}, _Confirms} when PubNum1 > PubNum ->
1353 maybe_confirm(Self, Self, Confirms, PubNums)
1355 maybe_confirm(_Self, _Id, Confirms, _PubNums) ->
1358 purge_confirms(Confirms) ->
1359 [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)],
1363 %% ---------------------------------------------------------------------------
1364 %% Msg transformation
1365 %% ---------------------------------------------------------------------------
1367 acks_from_queue(Q) ->
1368 [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
1370 pubs_from_queue(Q) ->
1373 queue_from_pubs(Pubs) ->
1374 queue:from_list(Pubs).
1376 apply_acks([], Pubs) ->
1378 apply_acks(List, Pubs) ->
1379 {_, Pubs1} = queue:split(length(List), Pubs),
1382 join_pubs(Q, []) -> Q;
1383 join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
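%% Sketch (hypothetical module gm_pending_sketch): pending_ack is a queue of
%% {PubNum, Msg} pairs. apply_acks/2 relies on acks covering the oldest pending
%% publications in order, so it only drops as many elements from the front as
%% there are acks. last_ack/2 asserts that acknowledgement numbers only grow.
%% The empty-list clauses are assumed, since their bodies are elided above.

```erlang
-module(gm_pending_sketch).
-export([join_pubs/2, apply_acks/2, last_ack/2]).

%% Append newly seen publications to the back of the pending queue.
join_pubs(Q, [])   -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue:from_list(Pubs)).

%% Drop one pending publication per ack. queue:split/2 raises badarg
%% if there are more acks than pending publications.
apply_acks([], Pubs) -> Pubs;
apply_acks(Acks, Pubs) ->
    {_Acked, Rest} = queue:split(length(Acks), Pubs),
    Rest.

%% The newest ack must be strictly greater than the previous one.
last_ack([], LA) -> LA;
last_ack(Acks, LA) ->
    LA1 = lists:last(Acks),
    true = LA1 > LA,
    LA1.
```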
1387 last_ack(List, LA) ->
1388 LA1 = lists:last(List),
1389 true = LA1 > LA, %% ASSERTION
1394 last_pub(List, LP) ->
1395 {PubNum, _Msg} = lists:last(List),
1396 true = PubNum > LP, %% ASSERTION