src/gm.erl
author Matthias Radestock <matthias@rabbitmq.com>
Wed May 16 17:39:07 2012 +0100 (11 hours ago)
changeset 9570 2fb4318aa9ee
parent 8906 a27fb2043278
permissions -rw-r--r--
start supervisor children as, er, supervisors, not workers
     1 %% The contents of this file are subject to the Mozilla Public License
     2 %% Version 1.1 (the "License"); you may not use this file except in
     3 %% compliance with the License. You may obtain a copy of the License at
     4 %% http://www.mozilla.org/MPL/
     5 %%
     6 %% Software distributed under the License is distributed on an "AS IS"
     7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
     8 %% License for the specific language governing rights and limitations
     9 %% under the License.
    10 %%
    11 %% The Original Code is RabbitMQ.
    12 %%
    13 %% The Initial Developer of the Original Code is VMware, Inc.
    14 %% Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
    15 %%
    16 
    17 -module(gm).
    18 
    19 %% Guaranteed Multicast
    20 %% ====================
    21 %%
    22 %% This module provides the ability to create named groups of
    23 %% processes to which members can be dynamically added and removed,
    24 %% and for messages to be broadcast within the group that are
    25 %% guaranteed to reach all members of the group during the lifetime of
    26 %% the message. The lifetime of a message is defined as being, at a
    27 %% minimum, the time from which the message is first sent to any
    28 %% member of the group, up until the time at which it is known by the
    29 %% member who published the message that the message has reached all
    30 %% group members.
    31 %%
    32 %% The guarantee given is that provided a message, once sent, makes it
    33 %% to members who do not all leave the group, the message will
    34 %% continue to propagate to all group members.
    35 %%
    36 %% Another way of stating the guarantee is that if member P publishes
    37 %% messages m and m', then for all members P', if P' is a member of
    38 %% the group prior to the publication of m, and P' receives m', then
    39 %% P' will receive m.
    40 %%
    41 %% Note that only local-ordering is enforced: i.e. if member P sends
    42 %% message m and then message m', then for-all members P', if P'
    43 %% receives m and m', then they will receive m' after m. Causality
    44 %% ordering is _not_ enforced. I.e. if member P receives message m
    45 %% and as a result publishes message m', there is no guarantee that
    46 %% other members P' will receive m before m'.
    47 %%
    48 %%
    49 %% API Use
    50 %% -------
    51 %%
    52 %% Mnesia must be started. Use the idempotent create_tables/0 function
    53 %% to create the tables required.
    54 %%
    55 %% start_link/3
    56 %% Provide the group name, the callback module name, and any arguments
    57 %% you wish to be passed into the callback module's functions. The
    58 %% joined/2 function will be called when we have joined the group,
    59 %% with the arguments passed to start_link and a list of the current
    60 %% members of the group. See the callbacks specs and the comments
    61 %% below for further details of the callback functions.
    62 %%
    63 %% leave/1
    64 %% Provide the Pid. Removes the Pid from the group. The callback
    65 %% terminate/2 function will be called.
    66 %%
    67 %% broadcast/2
    68 %% Provide the Pid and a Message. The message will be sent to all
    69 %% members of the group as per the guarantees given above. This is a
    70 %% cast and the function call will return immediately. There is no
    71 %% guarantee that the message will reach any member of the group.
    72 %%
    73 %% confirmed_broadcast/2
    74 %% Provide the Pid and a Message. As per broadcast/2 except that this
    75 %% is a call, not a cast, and only returns 'ok' once the Message has
    76 %% reached every member of the group. Do not call
    77 %% confirmed_broadcast/2 directly from the callback module otherwise
    78 %% you will deadlock the entire group.
    79 %%
    80 %% group_members/1
    81 %% Provide the Pid. Returns a list of the current group members.
    82 %%
    83 %%
    84 %% Implementation Overview
    85 %% -----------------------
    86 %%
    87 %% One possible means of implementation would be a fan-out from the
    88 %% sender to every member of the group. This would require that the
    89 %% group is fully connected, and, in the event that the original
    90 %% sender of the message disappears from the group before the message
    91 %% has made it to every member of the group, raises questions as to
    92 %% who is responsible for sending on the message to new group members.
    93 %% In particular, the issue is with [ Pid ! Msg || Pid <- Members ] -
    94 %% if the sender dies part way through, who is responsible for
    95 %% ensuring that the remaining Members receive the Msg? In the event
    96 %% that within the group, messages sent are broadcast from a subset of
    97 %% the members, the fan-out arrangement has the potential to
    98 %% substantially impact the CPU and network workload of such members,
    99 %% as such members would have to accommodate the cost of sending each
   100 %% message to every group member.
   101 %%
   102 %% Instead, if the members of the group are arranged in a chain, then
   103 %% it becomes easier to reason about who within the group has received
   104 %% each message and who has not. It eases issues of responsibility: in
   105 %% the event of a group member disappearing, the nearest upstream
   106 %% member of the chain is responsible for ensuring that messages
   107 %% continue to propagate down the chain. It also results in equal
   108 %% distribution of sending and receiving workload, even if all
   109 %% messages are being sent from just a single group member. This
   110 %% configuration has the further advantage that it is not necessary
   111 %% for every group member to know of every other group member, and
   112 %% even that a group member does not have to be accessible from all
   113 %% other group members.
   114 %%
   115 %% Performance is kept high by permitting pipelining and all
   116 %% communication between joined group members is asynchronous. In the
   117 %% chain A -> B -> C -> D, if A sends a message to the group, it will
   118 %% not directly contact C or D. However, it must know that D receives
   119 %% the message (in addition to B and C) before it can consider the
   120 %% message fully sent. A simplistic implementation would require that
   121 %% D replies to C, C replies to B and B then replies to A. This would
   122 %% result in a propagation delay of twice the length of the chain. It
   123 %% would also require, in the event of the failure of C, that D knows
   124 %% to directly contact B and issue the necessary replies. Instead, the
   125 %% chain forms a ring: D sends the message on to A: D does not
   126 %% distinguish A as the sender, merely as the next member (downstream)
   127 %% within the chain (which has now become a ring). When A receives
   128 %% from D messages that A sent, it knows that all members have
   129 %% received the message. However, the message is not dead yet: if C
   130 %% died as B was sending to C, then B would need to detect the death
   131 %% of C and forward the message on to D instead: thus every node has
   132 %% to remember every message published until it is told that it can
   133 %% forget about the message. This is essential not just for dealing
   134 %% with failure of members, but also for the addition of new members.
   135 %%
   136 %% Thus once A receives the message back again, it then sends to B an
   137 %% acknowledgement for the message, indicating that B can now forget
   138 %% about the message. B does so, and forwards the ack to C. C forgets
   139 %% the message, and forwards the ack to D, which forgets the message
   140 %% and finally forwards the ack back to A. At this point, A takes no
   141 %% further action: the message and its acknowledgement have made it to
   142 %% every member of the group. The message is now dead, and any new
   143 %% member joining the group at this point will not receive the
   144 %% message.
   145 %%
   146 %% We therefore have two roles:
   147 %%
   148 %% 1. The sender, who upon receiving their own messages back, must
   149 %% then send out acknowledgements, and upon receiving their own
   150 %% acknowledgements back perform no further action.
   151 %%
   152 %% 2. The other group members who upon receiving messages and
   153 %% acknowledgements must update their own internal state accordingly
   154 %% (the sending member must also do this in order to be able to
    155 %% accommodate failures), and forward messages on to their downstream
   156 %% neighbours.
   157 %%
   158 %%
   159 %% Implementation: It gets trickier
   160 %% --------------------------------
   161 %%
   162 %% Chain A -> B -> C -> D
   163 %%
   164 %% A publishes a message which B receives. A now dies. B and D will
   165 %% detect the death of A, and will link up, thus the chain is now B ->
   166 %% C -> D. B forwards A's message on to C, who forwards it to D, who
   167 %% forwards it to B. Thus B is now responsible for A's messages - both
   168 %% publications and acknowledgements that were in flight at the point
   169 %% at which A died. Even worse is that this is transitive: after B
   170 %% forwards A's message to C, B dies as well. Now C is not only
   171 %% responsible for B's in-flight messages, but is also responsible for
   172 %% A's in-flight messages.
   173 %%
   174 %% Lemma 1: A member can only determine which dead members they have
   175 %% inherited responsibility for if there is a total ordering on the
   176 %% conflicting additions and subtractions of members from the group.
   177 %%
   178 %% Consider the simultaneous death of B and addition of B' that
   179 %% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or
   180 %% C is responsible for in-flight messages from B. It is easy to
   181 %% ensure that at least one of them thinks they have inherited B, but
   182 %% if we do not ensure that exactly one of them inherits B, then we
   183 %% could have B' converting publishes to acks, which then will crash C
   184 %% as C does not believe it has issued acks for those messages.
   185 %%
   186 %% More complex scenarios are easy to concoct: A -> B -> C -> D -> E
   187 %% becoming A -> C' -> E. Who has inherited which of B, C and D?
   188 %%
   189 %% However, for non-conflicting membership changes, only a partial
   190 %% ordering is required. For example, A -> B -> C becoming A -> A' ->
   191 %% B. The addition of A', between A and B can have no conflicts with
   192 %% the death of C: it is clear that A has inherited C's messages.
   193 %%
   194 %% For ease of implementation, we adopt the simple solution, of
   195 %% imposing a total order on all membership changes.
   196 %%
   197 %% On the death of a member, it is ensured the dead member's
   198 %% neighbours become aware of the death, and the upstream neighbour
   199 %% now sends to its new downstream neighbour its state, including the
   200 %% messages pending acknowledgement. The downstream neighbour can then
   201 %% use this to calculate which publishes and acknowledgements it has
   202 %% missed out on, due to the death of its old upstream. Thus the
   203 %% downstream can catch up, and continues the propagation of messages
   204 %% through the group.
   205 %%
   206 %% Lemma 2: When a member is joining, it must synchronously
   207 %% communicate with its upstream member in order to receive its
   208 %% starting state atomically with its addition to the group.
   209 %%
   210 %% New members must start with the same state as their nearest
   211 %% upstream neighbour. This ensures that it is not surprised by
   212 %% acknowledgements they are sent, and that should their downstream
   213 %% neighbour die, they are able to send the correct state to their new
   214 %% downstream neighbour to ensure it can catch up. Thus in the
   215 %% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' ->
   216 %% C, A' must start with the state of A, so that it can send C the
   217 %% correct state when B dies, allowing C to detect any missed
   218 %% messages.
   219 %%
   220 %% If A' starts by adding itself to the group membership, A could then
   221 %% die, without A' having received the necessary state from A. This
   222 %% would leave A' responsible for in-flight messages from A, but
   223 %% having the least knowledge of all, of those messages. Thus A' must
   224 %% start by synchronously calling A, which then immediately sends A'
   225 %% back its state. A then adds A' to the group. If A dies at this
   226 %% point then A' will be able to see this (as A' will fail to appear
   227 %% in the group membership), and thus A' will ignore the state it
   228 %% receives from A, and will simply repeat the process, trying to now
   229 %% join downstream from some other member. This ensures that should
   230 %% the upstream die as soon as the new member has been joined, the new
   231 %% member is guaranteed to receive the correct state, allowing it to
   232 %% correctly process messages inherited due to the death of its
   233 %% upstream neighbour.
   234 %%
   235 %% The canonical definition of the group membership is held by a
   236 %% distributed database. Whilst this allows the total ordering of
   237 %% changes to be achieved, it is nevertheless undesirable to have to
   238 %% query this database for the current view, upon receiving each
   239 %% message. Instead, we wish for members to be able to cache a view of
   240 %% the group membership, which then requires a cache invalidation
   241 %% mechanism. Each member maintains its own view of the group
   242 %% membership. Thus when the group's membership changes, members may
   243 %% need to become aware of such changes in order to be able to
   244 %% accurately process messages they receive. Because of the
   245 %% requirement of a total ordering of conflicting membership changes,
   246 %% it is not possible to use the guaranteed broadcast mechanism to
   247 %% communicate these changes: to achieve the necessary ordering, it
   248 %% would be necessary for such messages to be published by exactly one
   249 %% member, which can not be guaranteed given that such a member could
   250 %% die.
   251 %%
   252 %% The total ordering we enforce on membership changes gives rise to a
   253 %% view version number: every change to the membership creates a
   254 %% different view, and the total ordering permits a simple
   255 %% monotonically increasing view version number.
   256 %%
   257 %% Lemma 3: If a message is sent from a member that holds view version
   258 %% N, it can be correctly processed by any member receiving the
   259 %% message with a view version >= N.
   260 %%
   261 %% Initially, let us suppose that each view contains the ordering of
   262 %% every member that was ever part of the group. Dead members are
   263 %% marked as such. Thus we have a ring of members, some of which are
   264 %% dead, and are thus inherited by the nearest alive downstream
   265 %% member.
   266 %%
   267 %% In the chain A -> B -> C, all three members initially have view
   268 %% version 1, which reflects reality. B publishes a message, which is
    269 %% forwarded by C to A. B now dies, which A notices very quickly. Thus A
   270 %% updates the view, creating version 2. It now forwards B's
   271 %% publication, sending that message to its new downstream neighbour,
   272 %% C. This happens before C is aware of the death of B. C must become
    273 %% aware of the view change before it interprets the message it has
   274 %% received, otherwise it will fail to learn of the death of B, and
   275 %% thus will not realise it has inherited B's messages (and will
   276 %% likely crash).
   277 %%
   278 %% Thus very simply, we have that each subsequent view contains more
   279 %% information than the preceding view.
   280 %%
   281 %% However, to avoid the views growing indefinitely, we need to be
   282 %% able to delete members which have died _and_ for which no messages
   283 %% are in-flight. This requires that upon inheriting a dead member, we
   284 %% know the last publication sent by the dead member (this is easy: we
   285 %% inherit a member because we are the nearest downstream member which
    286 %% implies that we know at least as much as everyone else about the
   287 %% publications of the dead member), and we know the earliest message
   288 %% for which the acknowledgement is still in flight.
   289 %%
   290 %% In the chain A -> B -> C, when B dies, A will send to C its state
   291 %% (as C is the new downstream from A), allowing C to calculate which
   292 %% messages it has missed out on (described above). At this point, C
   293 %% also inherits B's messages. If that state from A also includes the
   294 %% last message published by B for which an acknowledgement has been
   295 %% seen, then C knows exactly which further acknowledgements it must
   296 %% receive (also including issuing acknowledgements for publications
   297 %% still in-flight that it receives), after which it is known there
   298 %% are no more messages in flight for B, thus all evidence that B was
   299 %% ever part of the group can be safely removed from the canonical
   300 %% group membership.
   301 %%
   302 %% Thus, for every message that a member sends, it includes with that
   303 %% message its view version. When a member receives a message it will
   304 %% update its view from the canonical copy, should its view be older
   305 %% than the view version included in the message it has received.
   306 %%
   307 %% The state held by each member therefore includes the messages from
   308 %% each publisher pending acknowledgement, the last publication seen
   309 %% from that publisher, and the last acknowledgement from that
   310 %% publisher. In the case of the member's own publications or
   311 %% inherited members, this last acknowledgement seen state indicates
   312 %% the last acknowledgement retired, rather than sent.
   313 %%
   314 %%
   315 %% Proof sketch
   316 %% ------------
   317 %%
   318 %% We need to prove that with the provided operational semantics, we
   319 %% can never reach a state that is not well formed from a well-formed
   320 %% starting state.
   321 %%
   322 %% Operational semantics (small step): straight-forward message
   323 %% sending, process monitoring, state updates.
   324 %%
   325 %% Well formed state: dead members inherited by exactly one non-dead
   326 %% member; for every entry in anyone's pending-acks, either (the
   327 %% publication of the message is in-flight downstream from the member
   328 %% and upstream from the publisher) or (the acknowledgement of the
   329 %% message is in-flight downstream from the publisher and upstream
   330 %% from the member).
   331 %%
   332 %% Proof by induction on the applicable operational semantics.
   333 %%
   334 %%
   335 %% Related work
   336 %% ------------
   337 %%
   338 %% The ring configuration and double traversal of messages around the
   339 %% ring is similar (though developed independently) to the LCR
   340 %% protocol by [Levy 2008]. However, LCR differs in several
   341 %% ways. Firstly, by using vector clocks, it enforces a total order of
   342 %% message delivery, which is unnecessary for our purposes. More
   343 %% significantly, it is built on top of a "group communication system"
   344 %% which performs the group management functions, taking
   345 %% responsibility away from the protocol as to how to cope with safely
   346 %% adding and removing members. When membership changes do occur, the
   347 %% protocol stipulates that every member must perform communication
   348 %% with every other member of the group, to ensure all outstanding
   349 %% deliveries complete, before the entire group transitions to the new
   350 %% view. This, in total, requires two sets of all-to-all synchronous
   351 %% communications.
   352 %%
   353 %% This is not only rather inefficient, but also does not explain what
   354 %% happens upon the failure of a member during this process. It does
   355 %% though entirely avoid the need for inheritance of responsibility of
   356 %% dead members that our protocol incorporates.
   357 %%
   358 %% In [Marandi et al 2010], a Paxos-based protocol is described. This
   359 %% work explicitly focuses on the efficiency of communication. LCR
   360 %% (and our protocol too) are more efficient, but at the cost of
   361 %% higher latency. The Ring-Paxos protocol is itself built on top of
   362 %% IP-multicast, which rules it out for many applications where
   363 %% point-to-point communication is all that can be required. They also
   364 %% have an excellent related work section which I really ought to
   365 %% read...
   366 %%
   367 %%
   368 %% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008.
   369 %% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast
   370 %% Protocol
   371 
   372 
   373 -behaviour(gen_server2).
   374 
   375 -export([create_tables/0, start_link/3, leave/1, broadcast/2,
   376          confirmed_broadcast/2, group_members/1]).
   377 
   378 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
   379          code_change/3, prioritise_info/2]).
   380 
   381 -ifndef(use_specs).
   382 -export([behaviour_info/1]).
   383 -endif.
   384 
   385 -export([table_definitions/0]).
   386 
   387 -define(GROUP_TABLE, gm_group).
   388 -define(HIBERNATE_AFTER_MIN, 1000).
   389 -define(DESIRED_HIBERNATE, 10000).
   390 -define(BROADCAST_TIMER, 25).
   391 -define(VERSION_START, 0).
   392 -define(SETS, ordsets).
   393 -define(DICT, orddict).
   394 
   395 -record(state,
   396         { self,
   397           left,
   398           right,
   399           group_name,
   400           module,
   401           view,
   402           pub_count,
   403           members_state,
   404           callback_args,
   405           confirms,
   406           broadcast_buffer,
   407           broadcast_timer
   408         }).
   409 
   410 -record(gm_group, { name, version, members }).
   411 
   412 -record(view_member, { id, aliases, left, right }).
   413 
   414 -record(member, { pending_ack, last_pub, last_ack }).
   415 
   416 -define(TABLE, {?GROUP_TABLE, [{record_name, gm_group},
   417                                {attributes, record_info(fields, gm_group)}]}).
   418 -define(TABLE_MATCH, {match, #gm_group { _ = '_' }}).
   419 
   420 -define(TAG, '$gm').
   421 
   422 -ifdef(use_specs).
   423 
   424 -export_type([group_name/0]).
   425 
   426 -type(group_name() :: any()).
   427 
   428 -spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}).
   429 -spec(start_link/3 :: (group_name(), atom(), any()) ->
   430                            rabbit_types:ok_pid_or_error()).
   431 -spec(leave/1 :: (pid()) -> 'ok').
   432 -spec(broadcast/2 :: (pid(), any()) -> 'ok').
   433 -spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
   434 -spec(group_members/1 :: (pid()) -> [pid()]).
   435 
   436 %% The joined, members_changed and handle_msg callbacks can all
   437 %% return any of the following terms:
   438 %%
   439 %% 'ok' - the callback function returns normally
   440 %%
   441 %% {'stop', Reason} - the callback indicates the member should
   442 %% stop with reason Reason and should leave the group.
   443 %%
   444 %% {'become', Module, Args} - the callback indicates that the
   445 %% callback module should be changed to Module and that the
   446 %% callback functions should now be passed the arguments
   447 %% Args. This allows the callback module to be dynamically
   448 %% changed.
   449 
   450 %% Called when we've successfully joined the group. Supplied with
   451 %% Args provided in start_link, plus current group members.
   452 -callback joined(Args :: term(), Members :: [pid()]) ->
   453     ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
   454 
   455 %% Supplied with Args provided in start_link, the list of new
   456 %% members and the list of members previously known to us that
   457 %% have since died. Note that if a member joins and dies very
   458 %% quickly, it's possible that we will never see that member
   459 %% appear in either births or deaths. However we are guaranteed
   460 %% that (1) we will see a member joining either in the births
   461 %% here, or in the members passed to joined/2 before receiving
   462 %% any messages from it; and (2) we will not see members die that
   463 %% we have not seen born (or supplied in the members to
   464 %% joined/2).
   465 -callback members_changed(Args :: term(), Births :: [pid()],
   466                           Deaths :: [pid()]) ->
   467     ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
   468 
   469 %% Supplied with Args provided in start_link, the sender, and the
   470 %% message. This does get called for messages injected by this
   471 %% member, however, in such cases, there is no special
   472 %% significance of this invocation: it does not indicate that the
   473 %% message has made it to any other members, let alone all other
   474 %% members.
   475 -callback handle_msg(Args :: term(), From :: pid(), Message :: term()) ->
   476     ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
   477 
   478 %% Called on gm member termination as per rules in gen_server,
   479 %% with the Args provided in start_link plus the termination
   480 %% Reason.
   481 -callback terminate(Args :: term(), Reason :: term()) ->
   482     ok | term().
   483 
   484 -else.
   485 
   486 behaviour_info(callbacks) ->
   487     [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {terminate, 2}];
   488 behaviour_info(_Other) ->
   489     undefined.
   490 
   491 -endif.
   492 
   493 create_tables() ->
   494     create_tables([?TABLE]).
   495 
   496 create_tables([]) ->
   497     ok;
   498 create_tables([{Table, Attributes} | Tables]) ->
   499     case mnesia:create_table(Table, Attributes) of
   500         {atomic, ok}                          -> create_tables(Tables);
   501         {aborted, {already_exists, gm_group}} -> create_tables(Tables);
   502         Err                                   -> Err
   503     end.
   504 
   505 table_definitions() ->
   506     {Name, Attributes} = ?TABLE,
   507     [{Name, [?TABLE_MATCH | Attributes]}].
   508 
   509 start_link(GroupName, Module, Args) ->
   510     gen_server2:start_link(?MODULE, [GroupName, Module, Args], []).
   511 
   512 leave(Server) ->
   513     gen_server2:cast(Server, leave).
   514 
   515 broadcast(Server, Msg) ->
   516     gen_server2:cast(Server, {broadcast, Msg}).
   517 
   518 confirmed_broadcast(Server, Msg) ->
   519     gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
   520 
   521 group_members(Server) ->
   522     gen_server2:call(Server, group_members, infinity).
   523 
   524 
   525 init([GroupName, Module, Args]) ->
   526     {MegaSecs, Secs, MicroSecs} = now(),
   527     random:seed(MegaSecs, Secs, MicroSecs),
   528     Self = make_member(GroupName),
   529     gen_server2:cast(self(), join),
   530     {ok, #state { self             = Self,
   531                   left             = {Self, undefined},
   532                   right            = {Self, undefined},
   533                   group_name       = GroupName,
   534                   module           = Module,
   535                   view             = undefined,
   536                   pub_count        = 0,
   537                   members_state    = undefined,
   538                   callback_args    = Args,
   539                   confirms         = queue:new(),
   540                   broadcast_buffer = [],
   541                   broadcast_timer  = undefined }, hibernate,
   542      {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
   543 
   544 
   545 handle_call({confirmed_broadcast, _Msg}, _From,
   546             State = #state { members_state = undefined }) ->
   547     reply(not_joined, State);
   548 
   549 handle_call({confirmed_broadcast, Msg}, _From,
   550             State = #state { self          = Self,
   551                              right         = {Self, undefined},
   552                              module        = Module,
   553                              callback_args = Args }) ->
   554     handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
   555                            ok, State});
   556 
   557 handle_call({confirmed_broadcast, Msg}, From, State) ->
   558     internal_broadcast(Msg, From, State);
   559 
   560 handle_call(group_members, _From,
   561             State = #state { members_state = undefined }) ->
   562     reply(not_joined, State);
   563 
   564 handle_call(group_members, _From, State = #state { view = View }) ->
   565     reply(alive_view_members(View), State);
   566 
   567 handle_call({add_on_right, _NewMember}, _From,
   568             State = #state { members_state = undefined }) ->
   569     reply(not_ready, State);
   570 
   571 handle_call({add_on_right, NewMember}, _From,
   572             State = #state { self          = Self,
   573                              group_name    = GroupName,
   574                              view          = View,
   575                              members_state = MembersState,
   576                              module        = Module,
   577                              callback_args = Args }) ->
   578     Group = record_new_member_in_group(
   579               GroupName, Self, NewMember,
   580               fun (Group1) ->
   581                       View1 = group_to_view(Group1),
   582                       ok = send_right(NewMember, View1,
   583                                       {catchup, Self, prepare_members_state(
   584                                                         MembersState)})
   585               end),
   586     View2 = group_to_view(Group),
   587     State1 = check_neighbours(State #state { view = View2 }),
   588     Result = callback_view_changed(Args, Module, View, View2),
   589     handle_callback_result({Result, {ok, Group}, State1}).
   590 
   591 
   592 handle_cast({?TAG, ReqVer, Msg},
   593             State = #state { view          = View,
   594                              group_name    = GroupName,
   595                              module        = Module,
   596                              callback_args = Args }) ->
   597     {Result, State1} =
   598         case needs_view_update(ReqVer, View) of
   599             true ->
   600                 View1 = group_to_view(read_group(GroupName)),
   601                 {callback_view_changed(Args, Module, View, View1),
   602                  check_neighbours(State #state { view = View1 })};
   603             false ->
   604                 {ok, State}
   605         end,
   606     handle_callback_result(
   607       if_callback_success(
   608         Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
   609 
   610 handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) ->
   611     noreply(State);
   612 
   613 handle_cast({broadcast, Msg},
   614             State = #state { self          = Self,
   615                              right         = {Self, undefined},
   616                              module        = Module,
   617                              callback_args = Args }) ->
   618     handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
   619                             State});
   620 
   621 handle_cast({broadcast, Msg}, State) ->
   622     internal_broadcast(Msg, none, State);
   623 
   624 handle_cast(join, State = #state { self          = Self,
   625                                    group_name    = GroupName,
   626                                    members_state = undefined,
   627                                    module        = Module,
   628                                    callback_args = Args }) ->
   629     View = join_group(Self, GroupName),
   630     MembersState =
   631         case alive_view_members(View) of
   632             [Self] -> blank_member_state();
   633             _      -> undefined
   634         end,
   635     State1 = check_neighbours(State #state { view          = View,
   636                                              members_state = MembersState }),
   637     handle_callback_result(
   638       {Module:joined(Args, get_pids(all_known_members(View))), State1});
   639 
   640 handle_cast(leave, State) ->
   641     {stop, normal, State}.
   642 
   643 
   644 handle_info(flush, State) ->
   645     noreply(
   646       flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
   647 
   648 handle_info({'DOWN', MRef, process, _Pid, _Reason},
   649             State = #state { self          = Self,
   650                              left          = Left,
   651                              right         = Right,
   652                              group_name    = GroupName,
   653                              view          = View,
   654                              module        = Module,
   655                              callback_args = Args,
   656                              confirms      = Confirms }) ->
   657     Member = case {Left, Right} of
   658                  {{Member1, MRef}, _} -> Member1;
   659                  {_, {Member1, MRef}} -> Member1;
   660                  _                    -> undefined
   661              end,
   662     case Member of
   663         undefined ->
   664             noreply(State);
   665         _ ->
   666             View1 =
   667                 group_to_view(record_dead_member_in_group(Member, GroupName)),
   668             State1 = State #state { view = View1 },
   669             {Result, State2} =
   670                 case alive_view_members(View1) of
   671                     [Self] ->
   672                         maybe_erase_aliases(
   673                           State1 #state {
   674                             members_state = blank_member_state(),
   675                             confirms      = purge_confirms(Confirms) });
   676                     _ ->
   677                         %% here we won't be pointing out any deaths:
   678                         %% the concern is that there maybe births
   679                         %% which we'd otherwise miss.
   680                         {callback_view_changed(Args, Module, View, View1),
   681                          State1}
   682                 end,
   683             handle_callback_result({Result, check_neighbours(State2)})
   684     end.
   685 
   686 
   687 terminate(Reason, State = #state { module        = Module,
   688                                    callback_args = Args }) ->
   689     flush_broadcast_buffer(State),
   690     Module:terminate(Args, Reason).
   691 
   692 
   693 code_change(_OldVsn, State, _Extra) ->
   694     {ok, State}.
   695 
   696 prioritise_info(flush,                                   _State) -> 1;
   697 prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
   698 prioritise_info(_                                      , _State) -> 0.
   699 
   700 
%% Processes the payload of a tagged group message. Every clause returns
%% either {ok, State} or the result of if_callback_success/5.
%%
%% Payloads handled here:
%%   check_neighbours               - nothing left to do
%%   {catchup, Sender, MembersList} - a members-state snapshot
%%   {activity, Sender, Activity}   - a list of {Id, Pubs, Acks} entries
%% catchup and activity are only acted upon when Sender is our current
%% left-hand neighbour; otherwise they are ignored.
handle_msg(check_neighbours, State) ->
    %% no-op - it's already been done by the calling handle_cast
    {ok, State};

%% catchup from our left while we have no members state yet: pass the
%% snapshot on to our right (naming ourselves as the sender) and adopt
%% it as our own members state.
handle_msg({catchup, Left, MembersStateLeft},
           State = #state { self          = Self,
                            left          = {Left, _MRefL},
                            right         = {Right, _MRefR},
                            view          = View,
                            members_state = undefined }) ->
    ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
    MembersStateLeft1 = build_members_state(MembersStateLeft),
    {ok, State #state { members_state = MembersStateLeft1 }};

%% catchup from our left when we already have a members state: compare
%% the two states member by member, turn the differences into an
%% activity list and process that list as if it were an activity message
%% from Left.
handle_msg({catchup, Left, MembersStateLeft},
           State = #state { self = Self,
                            left = {Left, _MRefL},
                            view = View,
                            members_state = MembersState })
  when MembersState =/= undefined ->
    MembersStateLeft1 = build_members_state(MembersStateLeft),
    %% every member id known to either side, sorted and de-duplicated
    AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++
                                 ?DICT:fetch_keys(MembersStateLeft1)),
    {MembersState1, Activity} =
        lists:foldl(
          fun (Id, MembersStateActivity) ->
                  %% Left's record for Id (a blank member if Left has none)
                  #member { pending_ack = PALeft, last_ack = LA } =
                      find_member_or_blank(Id, MembersStateLeft1),
                  with_member_acc(
                    fun (#member { pending_ack = PA } = Member, Activity1) ->
                            case is_member_alias(Id, Self, View) of
                                true ->
                                    %% Id is us or one of our aliases:
                                    %% take over Left's last_ack and
                                    %% emit the common part of the two
                                    %% pending queues as pubs, with no
                                    %% acks
                                    {_AcksInFlight, Pubs, _PA1} =
                                        find_prefix_common_suffix(PALeft, PA),
                                    {Member #member { last_ack = LA },
                                     activity_cons(Id, pubs_from_queue(Pubs),
                                                   [], Activity1)};
                                false ->
                                    %% any other member: the part of our
                                    %% queue that precedes Left's queue
                                    %% becomes acks, and what remains of
                                    %% Left's queue after the common
                                    %% part becomes pubs; our own record
                                    %% is left unchanged at this point
                                    {Acks, _Common, Pubs} =
                                        find_prefix_common_suffix(PA, PALeft),
                                    {Member,
                                     activity_cons(Id, pubs_from_queue(Pubs),
                                                   acks_from_queue(Acks),
                                                   Activity1)}
                            end
                    end, Id, MembersStateActivity)
          end, {MembersState, activity_nil()}, AllMembers),
    %% re-enter as an activity message from Left
    handle_msg({activity, Left, activity_finalise(Activity)},
               State #state { members_state = MembersState1 });

%% catchup from anyone other than our current left neighbour: ignored.
handle_msg({catchup, _NotLeft, _MembersState}, State) ->
    {ok, State};

%% activity from our left: fold every {Id, Pubs, Acks} entry into the
%% members state, accumulating updated confirms and the activity to
%% pass on.
handle_msg({activity, Left, Activity},
           State = #state { self          = Self,
                            left          = {Left, _MRefL},
                            view          = View,
                            members_state = MembersState,
                            confirms      = Confirms })
  when MembersState =/= undefined ->
    {MembersState1, {Confirms1, Activity1}} =
        lists:foldl(
          fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
                  with_member_acc(
                    fun (Member = #member { pending_ack = PA,
                                            last_pub    = LP,
                                            last_ack    = LA },
                         {Confirms2, Activity2}) ->
                            case is_member_alias(Id, Self, View) of
                                true ->
                                    %% entry for us or one of our
                                    %% aliases: the incoming pubs must
                                    %% be a prefix of our pending queue
                                    %% (find_common/3 crashes
                                    %% otherwise); that prefix leaves
                                    %% the queue and is forwarded as
                                    %% acks only
                                    {ToAck, PA1} =
                                        find_common(queue_from_pubs(Pubs), PA,
                                                    queue:new()),
                                    LA1 = last_ack(Acks, LA),
                                    AckNums = acks_from_queue(ToAck),
                                    Confirms3 = maybe_confirm(
                                                  Self, Id, Confirms2, AckNums),
                                    {Member #member { pending_ack = PA1,
                                                      last_ack    = LA1 },
                                     {Confirms3,
                                      activity_cons(
                                        Id, [], AckNums, Activity2)}};
                                false ->
                                    %% entry for another member: record
                                    %% the new pubs and acks and forward
                                    %% the entry unchanged
                                    PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
                                    LA1 = last_ack(Acks, LA),
                                    LP1 = last_pub(Pubs, LP),
                                    {Member #member { pending_ack = PA1,
                                                      last_pub    = LP1,
                                                      last_ack    = LA1 },
                                     {Confirms2,
                                      activity_cons(Id, Pubs, Acks, Activity2)}}
                            end
                    end, Id, MembersStateConfirmsActivity)
          end, {MembersState, {Confirms, activity_nil()}}, Activity),
    State1 = State #state { members_state = MembersState1,
                            confirms      = Confirms1 },
    Activity3 = activity_finalise(Activity1),
    %% erasing aliases may change the view, so it happens before the
    %% activity is sent on with State2
    {Result, State2} = maybe_erase_aliases(State1),
    ok = maybe_send_activity(Activity3, State2),
    if_callback_success(
      Result, fun activity_true/3, fun activity_false/3, Activity3, State2);

%% activity that is not from our current left neighbour, or that arrives
%% while members_state is still undefined: ignored.
handle_msg({activity, _NotLeft, _Activity}, State) ->
    {ok, State}.
   805 
   806 
   807 noreply(State) ->
   808     {noreply, ensure_broadcast_timer(State), hibernate}.
   809 
   810 reply(Reply, State) ->
   811     {reply, Reply, ensure_broadcast_timer(State), hibernate}.
   812 
   813 ensure_broadcast_timer(State = #state { broadcast_buffer = [],
   814                                         broadcast_timer  = undefined }) ->
   815     State;
   816 ensure_broadcast_timer(State = #state { broadcast_buffer = [],
   817                                         broadcast_timer  = TRef }) ->
   818     erlang:cancel_timer(TRef),
   819     State #state { broadcast_timer = undefined };
   820 ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
   821     TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush),
   822     State #state { broadcast_timer = TRef };
   823 ensure_broadcast_timer(State) ->
   824     State.
   825 
   826 internal_broadcast(Msg, From, State = #state { self             = Self,
   827                                                pub_count        = PubCount,
   828                                                module           = Module,
   829                                                confirms         = Confirms,
   830                                                callback_args    = Args,
   831                                                broadcast_buffer = Buffer }) ->
   832     Result = Module:handle_msg(Args, get_pid(Self), Msg),
   833     Buffer1 = [{PubCount, Msg} | Buffer],
   834     Confirms1 = case From of
   835                     none -> Confirms;
   836                     _    -> queue:in({PubCount, From}, Confirms)
   837                 end,
   838     State1 = State #state { pub_count        = PubCount + 1,
   839                             confirms         = Confirms1,
   840                             broadcast_buffer = Buffer1 },
   841     case From =/= none of
   842         true ->
   843             handle_callback_result({Result, flush_broadcast_buffer(State1)});
   844         false ->
   845             handle_callback_result(
   846               {Result, State1 #state { broadcast_buffer = Buffer1 }})
   847     end.
   848 
   849 flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
   850     State;
   851 flush_broadcast_buffer(State = #state { self             = Self,
   852                                         members_state    = MembersState,
   853                                         broadcast_buffer = Buffer }) ->
   854     Pubs = lists:reverse(Buffer),
   855     Activity = activity_cons(Self, Pubs, [], activity_nil()),
   856     ok = maybe_send_activity(activity_finalise(Activity), State),
   857     MembersState1 = with_member(
   858                       fun (Member = #member { pending_ack = PA }) ->
   859                               PA1 = queue:join(PA, queue:from_list(Pubs)),
   860                               Member #member { pending_ack = PA1 }
   861                       end, Self, MembersState),
   862     State #state { members_state    = MembersState1,
   863                    broadcast_buffer = [] }.
   864 
   865 
   866 %% ---------------------------------------------------------------------------
   867 %% View construction and inspection
   868 %% ---------------------------------------------------------------------------
   869 
   870 needs_view_update(ReqVer, {Ver, _View}) ->
   871     Ver < ReqVer.
   872 
   873 view_version({Ver, _View}) ->
   874     Ver.
   875 
   876 is_member_alive({dead, _Member}) -> false;
   877 is_member_alive(_)               -> true.
   878 
   879 is_member_alias(Self, Self, _View) ->
   880     true;
   881 is_member_alias(Member, Self, View) ->
   882     ?SETS:is_element(Member,
   883                      ((fetch_view_member(Self, View)) #view_member.aliases)).
   884 
   885 dead_member_id({dead, Member}) -> Member.
   886 
   887 store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
   888     {Ver, ?DICT:store(Id, VMember, View)}.
   889 
   890 with_view_member(Fun, View, Id) ->
   891     store_view_member(Fun(fetch_view_member(Id, View)), View).
   892 
   893 fetch_view_member(Id, {_Ver, View}) ->
   894     ?DICT:fetch(Id, View).
   895 
   896 find_view_member(Id, {_Ver, View}) ->
   897     ?DICT:find(Id, View).
   898 
   899 blank_view(Ver) ->
   900     {Ver, ?DICT:new()}.
   901 
   902 alive_view_members({_Ver, View}) ->
   903     ?DICT:fetch_keys(View).
   904 
   905 all_known_members({_Ver, View}) ->
   906     ?DICT:fold(
   907        fun (Member, #view_member { aliases = Aliases }, Acc) ->
   908                ?SETS:to_list(Aliases) ++ [Member | Acc]
   909        end, [], View).
   910 
   911 group_to_view(#gm_group { members = Members, version = Ver }) ->
   912     Alive = lists:filter(fun is_member_alive/1, Members),
   913     [_|_] = Alive, %% ASSERTION - can't have all dead members
   914     add_aliases(link_view(Alive ++ Alive ++ Alive, blank_view(Ver)), Members).
   915 
   916 link_view([Left, Middle, Right | Rest], View) ->
   917     case find_view_member(Middle, View) of
   918         error ->
   919             link_view(
   920               [Middle, Right | Rest],
   921               store_view_member(#view_member { id      = Middle,
   922                                                aliases = ?SETS:new(),
   923                                                left    = Left,
   924                                                right   = Right }, View));
   925         {ok, _} ->
   926             View
   927     end;
   928 link_view(_, View) ->
   929     View.
   930 
   931 add_aliases(View, Members) ->
   932     Members1 = ensure_alive_suffix(Members),
   933     {EmptyDeadSet, View1} =
   934         lists:foldl(
   935           fun (Member, {DeadAcc, ViewAcc}) ->
   936                   case is_member_alive(Member) of
   937                       true ->
   938                           {?SETS:new(),
   939                            with_view_member(
   940                              fun (VMember =
   941                                       #view_member { aliases = Aliases }) ->
   942                                      VMember #view_member {
   943                                        aliases = ?SETS:union(Aliases, DeadAcc) }
   944                              end, ViewAcc, Member)};
   945                       false ->
   946                           {?SETS:add_element(dead_member_id(Member), DeadAcc),
   947                            ViewAcc}
   948                   end
   949           end, {?SETS:new(), View}, Members1),
   950     0 = ?SETS:size(EmptyDeadSet), %% ASSERTION
   951     View1.
   952 
   953 ensure_alive_suffix(Members) ->
   954     queue:to_list(ensure_alive_suffix1(queue:from_list(Members))).
   955 
   956 ensure_alive_suffix1(MembersQ) ->
   957     {{value, Member}, MembersQ1} = queue:out_r(MembersQ),
   958     case is_member_alive(Member) of
   959         true  -> MembersQ;
   960         false -> ensure_alive_suffix1(queue:in_r(Member, MembersQ1))
   961     end.
   962 
   963 
   964 %% ---------------------------------------------------------------------------
   965 %% View modification
   966 %% ---------------------------------------------------------------------------
   967 
   968 join_group(Self, GroupName) ->
   969     join_group(Self, GroupName, read_group(GroupName)).
   970 
   971 join_group(Self, GroupName, {error, not_found}) ->
   972     join_group(Self, GroupName, prune_or_create_group(Self, GroupName));
   973 join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) ->
   974     group_to_view(Group);
   975 join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
   976     case lists:member(Self, Members) of
   977         true ->
   978             group_to_view(Group);
   979         false ->
   980             case lists:filter(fun is_member_alive/1, Members) of
   981                 [] ->
   982                     join_group(Self, GroupName,
   983                                prune_or_create_group(Self, GroupName));
   984                 Alive ->
   985                     Left = lists:nth(random:uniform(length(Alive)), Alive),
   986                     Handler =
   987                         fun () ->
   988                                 join_group(
   989                                   Self, GroupName,
   990                                   record_dead_member_in_group(Left, GroupName))
   991                         end,
   992                     try
   993                         case gen_server2:call(
   994                                get_pid(Left), {add_on_right, Self}, infinity) of
   995                             {ok, Group1} -> group_to_view(Group1);
   996                             not_ready    -> join_group(Self, GroupName)
   997                         end
   998                     catch
   999                         exit:{R, _}
  1000                           when R =:= noproc; R =:= normal; R =:= shutdown ->
  1001                             Handler();
  1002                         exit:{{R, _}, _}
  1003                           when R =:= nodedown; R =:= shutdown ->
  1004                             Handler()
  1005                     end
  1006             end
  1007     end.
  1008 
  1009 read_group(GroupName) ->
  1010     case mnesia:dirty_read(?GROUP_TABLE, GroupName) of
  1011         []      -> {error, not_found};
  1012         [Group] -> Group
  1013     end.
  1014 
  1015 prune_or_create_group(Self, GroupName) ->
  1016     {atomic, Group} =
  1017         mnesia:sync_transaction(
  1018           fun () -> GroupNew = #gm_group { name    = GroupName,
  1019                                            members = [Self],
  1020                                            version = ?VERSION_START },
  1021                     case mnesia:read({?GROUP_TABLE, GroupName}) of
  1022                         [] ->
  1023                             mnesia:write(GroupNew),
  1024                             GroupNew;
  1025                         [Group1 = #gm_group { members = Members }] ->
  1026                             case lists:any(fun is_member_alive/1, Members) of
  1027                                 true  -> Group1;
  1028                                 false -> mnesia:write(GroupNew),
  1029                                          GroupNew
  1030                             end
  1031                     end
  1032           end),
  1033     Group.
  1034 
  1035 record_dead_member_in_group(Member, GroupName) ->
  1036     {atomic, Group} =
  1037         mnesia:sync_transaction(
  1038           fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
  1039                         mnesia:read({?GROUP_TABLE, GroupName}),
  1040                     case lists:splitwith(
  1041                            fun (Member1) -> Member1 =/= Member end, Members) of
  1042                         {_Members1, []} -> %% not found - already recorded dead
  1043                             Group1;
  1044                         {Members1, [Member | Members2]} ->
  1045                             Members3 = Members1 ++ [{dead, Member} | Members2],
  1046                             Group2 = Group1 #gm_group { members = Members3,
  1047                                                         version = Ver + 1 },
  1048                             mnesia:write(Group2),
  1049                             Group2
  1050                     end
  1051           end),
  1052     Group.
  1053 
  1054 record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
  1055     {atomic, Group} =
  1056         mnesia:sync_transaction(
  1057           fun () ->
  1058                   [#gm_group { members = Members, version = Ver } = Group1] =
  1059                       mnesia:read({?GROUP_TABLE, GroupName}),
  1060                   {Prefix, [Left | Suffix]} =
  1061                       lists:splitwith(fun (M) -> M =/= Left end, Members),
  1062                   Members1 = Prefix ++ [Left, NewMember | Suffix],
  1063                   Group2 = Group1 #gm_group { members = Members1,
  1064                                               version = Ver + 1 },
  1065                   ok = Fun(Group2),
  1066                   mnesia:write(Group2),
  1067                   Group2
  1068           end),
  1069     Group.
  1070 
  1071 erase_members_in_group(Members, GroupName) ->
  1072     DeadMembers = [{dead, Id} || Id <- Members],
  1073     {atomic, Group} =
  1074         mnesia:sync_transaction(
  1075           fun () ->
  1076                   [Group1 = #gm_group { members = [_|_] = Members1,
  1077                                         version = Ver }] =
  1078                       mnesia:read({?GROUP_TABLE, GroupName}),
  1079                   case Members1 -- DeadMembers of
  1080                       Members1 -> Group1;
  1081                       Members2 -> Group2 =
  1082                                       Group1 #gm_group { members = Members2,
  1083                                                          version = Ver + 1 },
  1084                                   mnesia:write(Group2),
  1085                                   Group2
  1086                   end
  1087           end),
  1088     Group.
  1089 
  1090 maybe_erase_aliases(State = #state { self          = Self,
  1091                                      group_name    = GroupName,
  1092                                      view          = View,
  1093                                      members_state = MembersState,
  1094                                      module        = Module,
  1095                                      callback_args = Args }) ->
  1096     #view_member { aliases = Aliases } = fetch_view_member(Self, View),
  1097     {Erasable, MembersState1}
  1098         = ?SETS:fold(
  1099              fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
  1100                      #member { last_pub = LP, last_ack = LA } =
  1101                          find_member_or_blank(Id, MembersState),
  1102                      case can_erase_view_member(Self, Id, LA, LP) of
  1103                          true  -> {[Id | ErasableAcc],
  1104                                    erase_member(Id, MembersStateAcc)};
  1105                          false -> Acc
  1106                      end
  1107              end, {[], MembersState}, Aliases),
  1108     State1 = State #state { members_state = MembersState1 },
  1109     case Erasable of
  1110         [] -> {ok, State1};
  1111         _  -> View1 = group_to_view(
  1112                         erase_members_in_group(Erasable, GroupName)),
  1113               {callback_view_changed(Args, Module, View, View1),
  1114                State1 #state { view = View1 }}
  1115     end.
  1116 
  1117 can_erase_view_member(Self, Self, _LA, _LP) -> false;
  1118 can_erase_view_member(_Self, _Id,   N,   N) -> true;
  1119 can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
  1120 
  1121 
  1122 %% ---------------------------------------------------------------------------
%% View monitoring and maintenance
  1124 %% ---------------------------------------------------------------------------
  1125 
  1126 ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
  1127     {Self, undefined};
  1128 ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
  1129     ok = gen_server2:cast(get_pid(RealNeighbour),
  1130                           {?TAG, Ver, check_neighbours}),
  1131     {RealNeighbour, maybe_monitor(RealNeighbour, Self)};
  1132 ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
  1133     {RealNeighbour, MRef};
  1134 ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
  1135     true = erlang:demonitor(MRef),
  1136     Msg = {?TAG, Ver, check_neighbours},
  1137     ok = gen_server2:cast(get_pid(RealNeighbour), Msg),
  1138     ok = case Neighbour of
  1139              Self -> ok;
  1140              _    -> gen_server2:cast(get_pid(Neighbour), Msg)
  1141          end,
  1142     {Neighbour, maybe_monitor(Neighbour, Self)}.
  1143 
  1144 maybe_monitor(Self, Self) ->
  1145     undefined;
  1146 maybe_monitor(Other, _Self) ->
  1147     erlang:monitor(process, get_pid(Other)).
  1148 
  1149 check_neighbours(State = #state { self             = Self,
  1150                                   left             = Left,
  1151                                   right            = Right,
  1152                                   view             = View,
  1153                                   broadcast_buffer = Buffer }) ->
  1154     #view_member { left = VLeft, right = VRight }
  1155         = fetch_view_member(Self, View),
  1156     Ver = view_version(View),
  1157     Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
  1158     Right1 = ensure_neighbour(Ver, Self, Right, VRight),
  1159     Buffer1 = case Right1 of
  1160                   {Self, undefined} -> [];
  1161                   _                 -> Buffer
  1162               end,
  1163     State1 = State #state { left = Left1, right = Right1,
  1164                             broadcast_buffer = Buffer1 },
  1165     ok = maybe_send_catchup(Right, State1),
  1166     State1.
  1167 
  1168 maybe_send_catchup(Right, #state { right = Right }) ->
  1169     ok;
  1170 maybe_send_catchup(_Right, #state { self  = Self,
  1171                                     right = {Self, undefined} }) ->
  1172     ok;
  1173 maybe_send_catchup(_Right, #state { members_state = undefined }) ->
  1174     ok;
  1175 maybe_send_catchup(_Right, #state { self          = Self,
  1176                                     right         = {Right, _MRef},
  1177                                     view          = View,
  1178                                     members_state = MembersState }) ->
  1179     send_right(Right, View,
  1180                {catchup, Self, prepare_members_state(MembersState)}).
  1181 
  1182 
  1183 %% ---------------------------------------------------------------------------
  1184 %% Catch_up delta detection
  1185 %% ---------------------------------------------------------------------------
  1186 
  1187 find_prefix_common_suffix(A, B) ->
  1188     {Prefix, A1} = find_prefix(A, B, queue:new()),
  1189     {Common, Suffix} = find_common(A1, B, queue:new()),
  1190     {Prefix, Common, Suffix}.
  1191 
  1192 %% Returns the elements of A that occur before the first element of B,
  1193 %% plus the remainder of A.
  1194 find_prefix(A, B, Prefix) ->
  1195     case {queue:out(A), queue:out(B)} of
  1196         {{{value, Val}, _A1}, {{value, Val}, _B1}} ->
  1197             {Prefix, A};
  1198         {{empty, A1}, {{value, _A}, _B1}} ->
  1199             {Prefix, A1};
  1200         {{{value, {NumA, _MsgA} = Val}, A1},
  1201          {{value, {NumB, _MsgB}}, _B1}} when NumA < NumB ->
  1202             find_prefix(A1, B, queue:in(Val, Prefix));
  1203         {_, {empty, _B1}} ->
  1204             {A, Prefix} %% Prefix well be empty here
  1205     end.
  1206 
  1207 %% A should be a prefix of B. Returns the commonality plus the
  1208 %% remainder of B.
  1209 find_common(A, B, Common) ->
  1210     case {queue:out(A), queue:out(B)} of
  1211         {{{value, Val}, A1}, {{value, Val}, B1}} ->
  1212             find_common(A1, B1, queue:in(Val, Common));
  1213         {{empty, _A}, _} ->
  1214             {Common, B}
  1215     end.
  1216 
  1217 
  1218 %% ---------------------------------------------------------------------------
  1219 %% Members helpers
  1220 %% ---------------------------------------------------------------------------
  1221 
  1222 with_member(Fun, Id, MembersState) ->
  1223     store_member(
  1224       Id, Fun(find_member_or_blank(Id, MembersState)), MembersState).
  1225 
  1226 with_member_acc(Fun, Id, {MembersState, Acc}) ->
  1227     {MemberState, Acc1} = Fun(find_member_or_blank(Id, MembersState), Acc),
  1228     {store_member(Id, MemberState, MembersState), Acc1}.
  1229 
  1230 find_member_or_blank(Id, MembersState) ->
  1231     case ?DICT:find(Id, MembersState) of
  1232         {ok, Result} -> Result;
  1233         error        -> blank_member()
  1234     end.
  1235 
  1236 erase_member(Id, MembersState) ->
  1237     ?DICT:erase(Id, MembersState).
  1238 
  1239 blank_member() ->
  1240     #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
  1241 
  1242 blank_member_state() ->
  1243     ?DICT:new().
  1244 
  1245 store_member(Id, MemberState, MembersState) ->
  1246     ?DICT:store(Id, MemberState, MembersState).
  1247 
  1248 prepare_members_state(MembersState) ->
  1249     ?DICT:to_list(MembersState).
  1250 
  1251 build_members_state(MembersStateList) ->
  1252     ?DICT:from_list(MembersStateList).
  1253 
  1254 make_member(GroupName) ->
  1255    {case read_group(GroupName) of
  1256         #gm_group { version = Version } -> Version;
  1257         {error, not_found}              -> ?VERSION_START
  1258     end, self()}.
  1259 
  1260 get_pid({_Version, Pid}) -> Pid.
  1261 
  1262 get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
  1263 
  1264 %% ---------------------------------------------------------------------------
  1265 %% Activity assembly
  1266 %% ---------------------------------------------------------------------------
  1267 
  1268 activity_nil() ->
  1269     queue:new().
  1270 
  1271 activity_cons(_Id, [], [], Tail) ->
  1272     Tail;
  1273 activity_cons(Sender, Pubs, Acks, Tail) ->
  1274     queue:in({Sender, Pubs, Acks}, Tail).
  1275 
  1276 activity_finalise(Activity) ->
  1277     queue:to_list(Activity).
  1278 
  1279 maybe_send_activity([], _State) ->
  1280     ok;
  1281 maybe_send_activity(Activity, #state { self  = Self,
  1282                                        right = {Right, _MRefR},
  1283                                        view  = View }) ->
  1284     send_right(Right, View, {activity, Self, Activity}).
  1285 
  1286 send_right(Right, View, Msg) ->
  1287     ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
  1288 
  1289 callback(Args, Module, Activity) ->
  1290     lists:foldl(
  1291       fun ({Id, Pubs, _Acks}, ok) ->
  1292               lists:foldl(fun ({_PubNum, Pub}, ok) ->
  1293                                   Module:handle_msg(Args, get_pid(Id), Pub);
  1294                               (_, Error) ->
  1295                                   Error
  1296                           end, ok, Pubs);
  1297           (_, Error) ->
  1298               Error
  1299       end, ok, Activity).
  1300 
  1301 callback_view_changed(Args, Module, OldView, NewView) ->
  1302     OldMembers = all_known_members(OldView),
  1303     NewMembers = all_known_members(NewView),
  1304     Births = NewMembers -- OldMembers,
  1305     Deaths = OldMembers -- NewMembers,
  1306     case {Births, Deaths} of
  1307         {[], []} -> ok;
  1308         _        -> Module:members_changed(Args, get_pids(Births),
  1309                                                  get_pids(Deaths))
  1310     end.
  1311 
  1312 handle_callback_result({Result, State}) ->
  1313     if_callback_success(
  1314       Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State);
  1315 handle_callback_result({Result, Reply, State}) ->
  1316     if_callback_success(
  1317       Result, fun reply_true/3, fun reply_false/3, Reply, State).
  1318 
  1319 no_reply_true (_Result,        _Undefined, State) -> noreply(State).
  1320 no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}.
  1321 
  1322 reply_true (_Result,        Reply, State) -> reply(Reply, State).
  1323 reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}.
  1324 
  1325 handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State).
  1326 handle_msg_false(Result, _Msg, State) -> {Result, State}.
  1327 
  1328 activity_true(_Result, Activity, State = #state { module        = Module,
  1329                                                   callback_args = Args }) ->
  1330     {callback(Args, Module, Activity), State}.
  1331 activity_false(Result, _Activity, State) ->
  1332     {Result, State}.
  1333 
  1334 if_callback_success(ok, True, _False, Arg, State) ->
  1335     True(ok, Arg, State);
  1336 if_callback_success(
  1337   {become, Module, Args} = Result, True, _False, Arg, State) ->
  1338     True(Result, Arg, State #state { module        = Module,
  1339                                      callback_args = Args });
  1340 if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) ->
  1341     False(Result, Arg, State).
  1342 
  1343 maybe_confirm(_Self, _Id, Confirms, []) ->
  1344     Confirms;
  1345 maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) ->
  1346     case queue:out(Confirms) of
  1347         {empty, _Confirms} ->
  1348             Confirms;
  1349         {{value, {PubNum, From}}, Confirms1} ->
  1350             gen_server2:reply(From, ok),
  1351             maybe_confirm(Self, Self, Confirms1, PubNums);
  1352         {{value, {PubNum1, _From}}, _Confirms} when PubNum1 > PubNum ->
  1353             maybe_confirm(Self, Self, Confirms, PubNums)
  1354     end;
  1355 maybe_confirm(_Self, _Id, Confirms, _PubNums) ->
  1356     Confirms.
  1357 
  1358 purge_confirms(Confirms) ->
  1359     [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)],
  1360     queue:new().
  1361 
  1362 
  1363 %% ---------------------------------------------------------------------------
  1364 %% Msg transformation
  1365 %% ---------------------------------------------------------------------------
  1366 
  1367 acks_from_queue(Q) ->
  1368     [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
  1369 
  1370 pubs_from_queue(Q) ->
  1371     queue:to_list(Q).
  1372 
  1373 queue_from_pubs(Pubs) ->
  1374     queue:from_list(Pubs).
  1375 
  1376 apply_acks([], Pubs) ->
  1377     Pubs;
  1378 apply_acks(List, Pubs) ->
  1379     {_, Pubs1} = queue:split(length(List), Pubs),
  1380     Pubs1.
  1381 
  1382 join_pubs(Q, [])   -> Q;
  1383 join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
  1384 
  1385 last_ack([], LA) ->
  1386     LA;
  1387 last_ack(List, LA) ->
  1388     LA1 = lists:last(List),
  1389     true = LA1 > LA, %% ASSERTION
  1390     LA1.
  1391 
  1392 last_pub([], LP) ->
  1393     LP;
  1394 last_pub(List, LP) ->
  1395     {PubNum, _Msg} = lists:last(List),
  1396     true = PubNum > LP, %% ASSERTION
  1397     PubNum.