src/gm.erl
author Simon MacMullen <simon@rabbitmq.com>
Wed, 22 Oct 2014 11:26:43 +0100
changeset 14076 b0801a68881f
     1 %% The contents of this file are subject to the Mozilla Public License
     2 %% Version 1.1 (the "License"); you may not use this file except in
     3 %% compliance with the License. You may obtain a copy of the License at
     4 %% http://www.mozilla.org/MPL/
     5 %%
     6 %% Software distributed under the License is distributed on an "AS IS"
     7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
     8 %% License for the specific language governing rights and limitations
     9 %% under the License.
    10 %%
    11 %% The Original Code is RabbitMQ.
    12 %%
    13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
    14 %% Copyright (c) 2007-2014 GoPivotal, Inc.  All rights reserved.
    15 %%
    16 
    17 -module(gm).
    18 
    19 %% Guaranteed Multicast
    20 %% ====================
    21 %%
    22 %% This module provides the ability to create named groups of
    23 %% processes to which members can be dynamically added and removed,
    24 %% and for messages to be broadcast within the group that are
    25 %% guaranteed to reach all members of the group during the lifetime of
    26 %% the message. The lifetime of a message is defined as being, at a
    27 %% minimum, the time from which the message is first sent to any
    28 %% member of the group, up until the time at which it is known by the
    29 %% member who published the message that the message has reached all
    30 %% group members.
    31 %%
    32 %% The guarantee given is that provided a message, once sent, makes it
    33 %% to members who do not all leave the group, the message will
    34 %% continue to propagate to all group members.
    35 %%
    36 %% Another way of stating the guarantee is that if member P publishes
    37 %% messages m and m', then for all members P', if P' is a member of
    38 %% the group prior to the publication of m, and P' receives m', then
    39 %% P' will receive m.
    40 %%
    41 %% Note that only local-ordering is enforced: i.e. if member P sends
    42 %% message m and then message m', then for all members P', if P'
    43 %% receives m and m', then P' will receive m' after m. Causality
    44 %% ordering is _not_ enforced. I.e. if member P receives message m
    45 %% and as a result publishes message m', there is no guarantee that
    46 %% other members P' will receive m before m'.
    47 %%
    48 %%
    49 %% API Use
    50 %% -------
    51 %%
    52 %% Mnesia must be started. Use the idempotent create_tables/0 function
    53 %% to create the tables required.
    54 %%
    55 %% start_link/4
    56 %% Provide the group name, the callback module name, any arguments
    57 %% you wish to be passed into the callback module's functions, and a
    58 %% transaction executor fun. The joined/2 callback will be called
    59 %% when we have joined the group, with the arguments passed to
    60 %% start_link and a list of the current group members. See the callback
    61 %% specs and comments below, and the usage sketch at the end of this section.
    62 %%
    63 %% leave/1
    64 %% Provide the Pid. Removes the Pid from the group. The callback
    65 %% handle_terminate/2 function will be called.
    66 %%
    67 %% broadcast/2
    68 %% Provide the Pid and a Message. The message will be sent to all
    69 %% members of the group as per the guarantees given above. This is a
    70 %% cast and the function call will return immediately. There is no
    71 %% guarantee that the message will reach any member of the group.
    72 %%
    73 %% confirmed_broadcast/2
    74 %% Provide the Pid and a Message. As per broadcast/2 except that this
    75 %% is a call, not a cast, and only returns 'ok' once the Message has
    76 %% reached every member of the group. Do not call
    77 %% confirmed_broadcast/2 directly from the callback module otherwise
    78 %% you will deadlock the entire group.
    79 %%
    80 %% info/1
    81 %% Provide the Pid. Returns a proplist with various facts, including
    82 %% the group name and the current group members.
    83 %%
    84 %% validate_members/2
    85 %% Check whether a given member list agrees with the chosen member's
    86 %% view. Any differences will be communicated via the members_changed
    87 %% callback. If there are no differences then there will be no reply.
    88 %% Note that members will not necessarily share the same view.
    89 %%
    90 %% forget_group/1
    91 %% Provide the group name. Removes its mnesia record. Makes no attempt
    92 %% to ensure the group is empty.
    93 %%
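%% A minimal usage sketch. The callback module name my_gm_cb, the group
%% name my_group and the shape of the transaction executor fun are all
%% illustrative assumptions, not part of this module's API:
%%
%%   -module(my_gm_cb).
%%   -behaviour(gm).
%%   -export([joined/2, members_changed/3, handle_msg/3,
%%            handle_terminate/2]).
%%
%%   joined(_Args, _Members)                  -> ok.
%%   members_changed(_Args, _Births, _Deaths) -> ok.
%%   handle_msg(_Args, _From, _Msg)           -> ok.
%%   handle_terminate(_Args, _Reason)         -> ok.
%%
%% and then, with mnesia already running:
%%
%%   ok = gm:create_tables(),
%%   TxnFun = fun (F) -> {atomic, Res} = mnesia:sync_transaction(F), Res end,
%%   {ok, Pid} = gm:start_link(my_group, my_gm_cb, [], TxnFun),
%%   ok = gm:broadcast(Pid, {hello, node()}),
%%   ok = gm:confirmed_broadcast(Pid, important_msg),
%%   ok = gm:leave(Pid).
%%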
    94 %% Implementation Overview
    95 %% -----------------------
    96 %%
    97 %% One possible means of implementation would be a fan-out from the
    98 %% sender to every member of the group. This would require that the
    99 %% group is fully connected, and, in the event that the original
   100 %% sender of the message disappears from the group before the message
   101 %% has made it to every member of the group, raises questions as to
   102 %% who is responsible for sending on the message to new group members.
   103 %% In particular, the issue is with [ Pid ! Msg || Pid <- Members ] -
   104 %% if the sender dies part way through, who is responsible for
   105 %% ensuring that the remaining Members receive the Msg? In the event
   106 %% that within the group, messages sent are broadcast from a subset of
   107 %% the members, the fan-out arrangement has the potential to
   108 %% substantially impact the CPU and network workload of such members,
   109 %% as such members would have to accommodate the cost of sending each
   110 %% message to every group member.
   111 %%
   112 %% Instead, if the members of the group are arranged in a chain, then
   113 %% it becomes easier to reason about who within the group has received
   114 %% each message and who has not. It eases issues of responsibility: in
   115 %% the event of a group member disappearing, the nearest upstream
   116 %% member of the chain is responsible for ensuring that messages
   117 %% continue to propagate down the chain. It also results in equal
   118 %% distribution of sending and receiving workload, even if all
   119 %% messages are being sent from just a single group member. This
   120 %% configuration has the further advantage that it is not necessary
   121 %% for every group member to know of every other group member, and
   122 %% that a group member need not even be accessible from all
   123 %% other group members.
   124 %%
   125 %% Performance is kept high by permitting pipelining, and all
   126 %% communication between joined group members is asynchronous. In the
   127 %% chain A -> B -> C -> D, if A sends a message to the group, it will
   128 %% not directly contact C or D. However, it must know that D receives
   129 %% the message (in addition to B and C) before it can consider the
   130 %% message fully sent. A simplistic implementation would require that
   131 %% D replies to C, C replies to B and B then replies to A. This would
   132 %% result in a propagation delay of twice the length of the chain. It
   133 %% would also require, in the event of the failure of C, that D knows
   134 %% to directly contact B and issue the necessary replies. Instead, the
   135 %% chain forms a ring: D sends the message on to A: D does not
   136 %% distinguish A as the sender, merely as the next member (downstream)
   137 %% within the chain (which has now become a ring). When A receives
   138 %% from D messages that A sent, it knows that all members have
   139 %% received the message. However, the message is not dead yet: if C
   140 %% died as B was sending to C, then B would need to detect the death
   141 %% of C and forward the message on to D instead: thus every node has
   142 %% to remember every message published until it is told that it can
   143 %% forget about the message. This is essential not just for dealing
   144 %% with failure of members, but also for the addition of new members.
   145 %%
   146 %% Thus once A receives the message back again, it then sends to B an
   147 %% acknowledgement for the message, indicating that B can now forget
   148 %% about the message. B does so, and forwards the ack to C. C forgets
   149 %% the message, and forwards the ack to D, which forgets the message
   150 %% and finally forwards the ack back to A. At this point, A takes no
   151 %% further action: the message and its acknowledgement have made it to
   152 %% every member of the group. The message is now dead, and any new
   153 %% member joining the group at this point will not receive the
   154 %% message.
   155 %%
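%% As a worked example (a sketch of the behaviour just described, not
%% the literal message terms used by the implementation), a single
%% publication by A traverses the ring A -> B -> C -> D -> A twice:
%%
%%   A: publish m -> B      (A now holds m as pending)
%%   B: forward m -> C      (B now holds m as pending)
%%   C: forward m -> D      (C now holds m as pending)
%%   D: forward m -> A      (D holds m; A sees its own m back)
%%   A: ack m     -> B      (B forgets m)
%%   B: ack m     -> C      (C forgets m)
%%   C: ack m     -> D      (D forgets m)
%%   D: ack m     -> A      (A takes no further action; m is now dead)
%%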
   156 %% We therefore have two roles:
   157 %%
   158 %% 1. The sender, who upon receiving their own messages back, must
   159 %% then send out acknowledgements, and upon receiving their own
   160 %% acknowledgements back perform no further action.
   161 %%
   162 %% 2. The other group members who upon receiving messages and
   163 %% acknowledgements must update their own internal state accordingly
   164 %% (the sending member must also do this in order to be able to
   165 %% accommodate failures), and forward messages on to their downstream
   166 %% neighbours.
   167 %%
   168 %%
   169 %% Implementation: It gets trickier
   170 %% --------------------------------
   171 %%
   172 %% Chain A -> B -> C -> D
   173 %%
   174 %% A publishes a message which B receives. A now dies. B and D will
   175 %% detect the death of A, and will link up, thus the chain is now B ->
   176 %% C -> D. B forwards A's message on to C, who forwards it to D, who
   177 %% forwards it to B. Thus B is now responsible for A's messages - both
   178 %% publications and acknowledgements that were in flight at the point
   179 %% at which A died. Even worse is that this is transitive: after B
   180 %% forwards A's message to C, B dies as well. Now C is not only
   181 %% responsible for B's in-flight messages, but is also responsible for
   182 %% A's in-flight messages.
   183 %%
   184 %% Lemma 1: A member can only determine which dead members they have
   185 %% inherited responsibility for if there is a total ordering on the
   186 %% conflicting additions and subtractions of members from the group.
   187 %%
   188 %% Consider the simultaneous death of B and addition of B' that
   189 %% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or
   190 %% C is responsible for in-flight messages from B. It is easy to
   191 %% ensure that at least one of them thinks they have inherited B, but
   192 %% if we do not ensure that exactly one of them inherits B, then we
   193 %% could have B' converting publishes to acks, which then will crash C
   194 %% as C does not believe it has issued acks for those messages.
   195 %%
   196 %% More complex scenarios are easy to concoct: A -> B -> C -> D -> E
   197 %% becoming A -> C' -> E. Who has inherited which of B, C and D?
   198 %%
   199 %% However, for non-conflicting membership changes, only a partial
   200 %% ordering is required. For example, A -> B -> C becoming A -> A' ->
   201 %% B. The addition of A' between A and B can have no conflicts with
   202 %% the death of C: it is clear that A has inherited C's messages.
   203 %%
   204 %% For ease of implementation, we adopt the simple solution of
   205 %% imposing a total order on all membership changes.
   206 %%
   207 %% On the death of a member, it is ensured that the dead member's
   208 %% neighbours become aware of the death, and the upstream neighbour
   209 %% now sends to its new downstream neighbour its state, including the
   210 %% messages pending acknowledgement. The downstream neighbour can then
   211 %% use this to calculate which publishes and acknowledgements it has
   212 %% missed out on, due to the death of its old upstream. Thus the
   213 %% downstream can catch up, and continue the propagation of messages
   214 %% through the group.
   215 %%
   216 %% Lemma 2: When a member is joining, it must synchronously
   217 %% communicate with its upstream member in order to receive its
   218 %% starting state atomically with its addition to the group.
   219 %%
   220 %% New members must start with the same state as their nearest
   221 %% upstream neighbour. This ensures that they are not surprised by
   222 %% acknowledgements they are sent, and that should their downstream
   223 %% neighbour die, they are able to send the correct state to their new
   224 %% downstream neighbour to ensure it can catch up. Thus in the
   225 %% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' ->
   226 %% C, A' must start with the state of A, so that it can send C the
   227 %% correct state when B dies, allowing C to detect any missed
   228 %% messages.
   229 %%
   230 %% If A' starts by adding itself to the group membership, A could then
   231 %% die, without A' having received the necessary state from A. This
   232 %% would leave A' responsible for in-flight messages from A, but
   233 %% having the least knowledge of all about those messages. Thus A' must
   234 %% start by synchronously calling A, which then immediately sends A'
   235 %% back its state. A then adds A' to the group. If A dies at this
   236 %% point then A' will be able to see this (as A' will fail to appear
   237 %% in the group membership), and thus A' will ignore the state it
   238 %% receives from A, and will simply repeat the process, trying to now
   239 %% join downstream from some other member. This ensures that should
   240 %% the upstream die as soon as the new member has been joined, the new
   241 %% member is guaranteed to receive the correct state, allowing it to
   242 %% correctly process messages inherited due to the death of its
   243 %% upstream neighbour.
   244 %%
   245 %% The canonical definition of the group membership is held by a
   246 %% distributed database. Whilst this allows the total ordering of
   247 %% changes to be achieved, it is nevertheless undesirable to have to
   248 %% query this database for the current view, upon receiving each
   249 %% message. Instead, we wish for members to be able to cache a view of
   250 %% the group membership, which then requires a cache invalidation
   251 %% mechanism. Each member maintains its own view of the group
   252 %% membership. Thus when the group's membership changes, members may
   253 %% need to become aware of such changes in order to be able to
   254 %% accurately process messages they receive. Because of the
   255 %% requirement of a total ordering of conflicting membership changes,
   256 %% it is not possible to use the guaranteed broadcast mechanism to
   257 %% communicate these changes: to achieve the necessary ordering, it
   258 %% would be necessary for such messages to be published by exactly one
   259 %% member, which cannot be guaranteed given that such a member could
   260 %% die.
   261 %%
   262 %% The total ordering we enforce on membership changes gives rise to a
   263 %% view version number: every change to the membership creates a
   264 %% different view, and the total ordering permits a simple
   265 %% monotonically increasing view version number.
   266 %%
   267 %% Lemma 3: If a message is sent from a member that holds view version
   268 %% N, it can be correctly processed by any member receiving the
   269 %% message with a view version >= N.
   270 %%
   271 %% Initially, let us suppose that each view contains the ordering of
   272 %% every member that was ever part of the group. Dead members are
   273 %% marked as such. Thus we have a ring of members, some of which are
   274 %% dead, and are thus inherited by the nearest alive downstream
   275 %% member.
   276 %%
   277 %% In the chain A -> B -> C, all three members initially have view
   278 %% version 1, which reflects reality. B publishes a message, which is
   279 %% forward by C to A. B now dies, which A notices very quickly. Thus A
   280 %% updates the view, creating version 2. It now forwards B's
   281 %% publication, sending that message to its new downstream neighbour,
   282 %% C. This happens before C is aware of the death of B. C must become
   283 %% aware of the view change before it interprets the message it has
   284 %% received, otherwise it will fail to learn of the death of B, and
   285 %% thus will not realise it has inherited B's messages (and will
   286 %% likely crash).
   287 %%
   288 %% Thus very simply, we have that each subsequent view contains more
   289 %% information than the preceding view.
   290 %%
   291 %% However, to avoid the views growing indefinitely, we need to be
   292 %% able to delete members which have died _and_ for which no messages
   293 %% are in-flight. This requires that upon inheriting a dead member, we
   294 %% know the last publication sent by the dead member (this is easy: we
   295 %% inherit a member because we are the nearest downstream member which
   296 %% implies that we know at least as much as everyone else about the
   297 %% publications of the dead member), and we know the earliest message
   298 %% for which the acknowledgement is still in flight.
   299 %%
   300 %% In the chain A -> B -> C, when B dies, A will send to C its state
   301 %% (as C is the new downstream from A), allowing C to calculate which
   302 %% messages it has missed out on (described above). At this point, C
   303 %% also inherits B's messages. If that state from A also includes the
   304 %% last message published by B for which an acknowledgement has been
   305 %% seen, then C knows exactly which further acknowledgements it must
   306 %% receive (also including issuing acknowledgements for publications
   307 %% still in-flight that it receives), after which it is known there
   308 %% are no more messages in flight for B, thus all evidence that B was
   309 %% ever part of the group can be safely removed from the canonical
   310 %% group membership.
   311 %%
   312 %% Thus, for every message that a member sends, it includes with that
   313 %% message its view version. When a member receives a message it will
   314 %% update its view from the canonical copy, should its view be older
   315 %% than the view version included in the message it has received.
   316 %%
   317 %% The state held by each member therefore includes the messages from
   318 %% each publisher pending acknowledgement, the last publication seen
   319 %% from that publisher, and the last acknowledgement from that
   320 %% publisher. In the case of the member's own publications or
   321 %% inherited members, this last acknowledgement seen state indicates
   322 %% the last acknowledgement retired, rather than sent.
   323 %%
   324 %%
   325 %% Proof sketch
   326 %% ------------
   327 %%
   328 %% We need to prove that with the provided operational semantics, we
   329 %% can never reach a state that is not well formed from a well-formed
   330 %% starting state.
   331 %%
   332 %% Operational semantics (small step): straight-forward message
   333 %% sending, process monitoring, state updates.
   334 %%
   335 %% Well formed state: dead members inherited by exactly one non-dead
   336 %% member; for every entry in anyone's pending-acks, either (the
   337 %% publication of the message is in-flight downstream from the member
   338 %% and upstream from the publisher) or (the acknowledgement of the
   339 %% message is in-flight downstream from the publisher and upstream
   340 %% from the member).
   341 %%
   342 %% Proof by induction on the applicable operational semantics.
   343 %%
   344 %%
   345 %% Related work
   346 %% ------------
   347 %%
   348 %% The ring configuration and double traversal of messages around the
   349 %% ring is similar (though developed independently) to the LCR
   350 %% protocol by [Levy 2008]. However, LCR differs in several
   351 %% ways. Firstly, by using vector clocks, it enforces a total order of
   352 %% message delivery, which is unnecessary for our purposes. More
   353 %% significantly, it is built on top of a "group communication system"
   354 %% which performs the group management functions, taking
   355 %% responsibility away from the protocol as to how to cope with safely
   356 %% adding and removing members. When membership changes do occur, the
   357 %% protocol stipulates that every member must perform communication
   358 %% with every other member of the group, to ensure all outstanding
   359 %% deliveries complete, before the entire group transitions to the new
   360 %% view. This, in total, requires two sets of all-to-all synchronous
   361 %% communications.
   362 %%
   363 %% This is not only rather inefficient, but also does not explain what
   364 %% happens upon the failure of a member during this process. It does
   365 %% though entirely avoid the need for inheritance of responsibility of
   366 %% dead members that our protocol incorporates.
   367 %%
   368 %% In [Marandi et al 2010], a Paxos-based protocol is described. This
   369 %% work explicitly focuses on the efficiency of communication. LCR
   370 %% (and our protocol too) are more efficient, but at the cost of
   371 %% higher latency. The Ring-Paxos protocol is itself built on top of
   372 %% IP-multicast, which rules it out for many applications where
   373 %% point-to-point communication is all that can be relied upon. They also
   374 %% have an excellent related work section which I really ought to
   375 %% read...
   376 %%
   377 %%
   378 %% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008.
   379 %% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast
   380 %% Protocol
   381 
   382 
   383 -behaviour(gen_server2).
   384 
   385 -export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3,
   386          confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).
   387 
   388 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
   389          code_change/3, prioritise_info/3]).
   390 
   391 %% For INSTR_MOD callbacks
   392 -export([call/3, cast/2, monitor/1, demonitor/1]).
   393 
   394 -ifndef(use_specs).
   395 -export([behaviour_info/1]).
   396 -endif.
   397 
   398 -export([table_definitions/0]).
   399 
   400 -define(GROUP_TABLE, gm_group).
   401 -define(MAX_BUFFER_SIZE, 100000000). %% 100MB
   402 -define(HIBERNATE_AFTER_MIN, 1000).
   403 -define(DESIRED_HIBERNATE, 10000).
   404 -define(BROADCAST_TIMER, 25).
   405 -define(VERSION_START, 0).
   406 -define(SETS, ordsets).
   407 -define(DICT, orddict).
   408 
   409 -record(state,
   410         { self,                %% this member's id, as returned by make_member/1
   411           left,                %% left (upstream) neighbour, as {Member, MonitorRef}
   412           right,               %% right (downstream) neighbour, as {Member, MonitorRef}
   413           group_name,          %% the group's name (key in the gm_group table)
   414           module,              %% the callback module
   415           view,                %% cached group view: {Version, Dict} of #view_member{}
   416           pub_count,           %% count of our publications; -1 before the first
   417           members_state,       %% per-member #member{} dict; undefined until catchup
   418           callback_args,       %% Args passed to the callback module's functions
   419           confirms,            %% queue of {PubCount, From} awaiting confirmation
   420           broadcast_buffer,    %% buffered {PubCount, Msg} publications, newest first
   421           broadcast_buffer_sz, %% running total of the callers' size hints
   422           broadcast_timer,     %% timer ref for the periodic buffer flush
   423           txn_executor         %% TxnFun used to run mnesia transactions
   424         }).
   425 
   426 -record(gm_group, { name, version, members }).        %% canonical group, stored in mnesia
   427 
   428 -record(view_member, { id, aliases, left, right }).    %% an entry in a member's cached view
   429 
   430 -record(member, { pending_ack, last_pub, last_ack }).  %% per-publisher message state
   431 
   432 -define(TABLE, {?GROUP_TABLE, [{record_name, gm_group},
   433                                {attributes, record_info(fields, gm_group)}]}).
   434 -define(TABLE_MATCH, {match, #gm_group { _ = '_' }}).
   435 
   436 -define(TAG, '$gm').
   437 
   438 -ifdef(use_specs).
   439 
   440 -export_type([group_name/0]).
   441 
   442 -type(group_name() :: any()).
   443 -type(txn_fun() :: fun((fun(() -> any())) -> any())).
   444 
   445 -spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}).
   446 -spec(start_link/4 :: (group_name(), atom(), any(), txn_fun()) ->
   447                            rabbit_types:ok_pid_or_error()).
   448 -spec(leave/1 :: (pid()) -> 'ok').
   449 -spec(broadcast/2 :: (pid(), any()) -> 'ok').
   450 -spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
   451 -spec(info/1 :: (pid()) -> rabbit_types:infos()).
   452 -spec(validate_members/2 :: (pid(), [pid()]) -> 'ok').
   453 -spec(forget_group/1 :: (group_name()) -> 'ok').
   454 
   455 %% The joined, members_changed and handle_msg callbacks can all return
   456 %% any of the following terms:
   457 %%
   458 %% 'ok' - the callback function returns normally
   459 %%
   460 %% {'stop', Reason} - the callback indicates the member should stop
   461 %% with reason Reason and should leave the group.
   462 %%
   463 %% {'become', Module, Args} - the callback indicates that the callback
   464 %% module should be changed to Module and that the callback functions
   465 %% should now be passed the arguments Args. This allows the callback
   466 %% module to be dynamically changed.
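%%
%% For example, a handle_msg/3 clause in the callback module might hand
%% the member over to a new callback module (the module name and message
%% shape below are illustrative, not defined by gm):
%%
%%   handle_msg(_Args, _From, {upgrade, NewArgs}) ->
%%       {become, my_gm_cb_v2, NewArgs};
%%   handle_msg(_Args, _From, _Msg) ->
%%       ok.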
   467 
   468 %% Called when we've successfully joined the group. Supplied with Args
   469 %% provided in start_link, plus current group members.
   470 -callback joined(Args :: term(), Members :: [pid()]) ->
   471     ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
   472 
   473 %% Supplied with Args provided in start_link, the list of new members
   474 %% and the list of members previously known to us that have since
   475 %% died. Note that if a member joins and dies very quickly, it's
   476 %% possible that we will never see that member appear in either births
   477 %% or deaths. However we are guaranteed that (1) we will see a member
   478 %% joining either in the births here, or in the members passed to
   479 %% joined/2 before receiving any messages from it; and (2) we will not
   480 %% see members die that we have not seen born (or supplied in the
   481 %% members to joined/2).
   482 -callback members_changed(Args :: term(),
   483                           Births :: [pid()], Deaths :: [pid()]) ->
   484     ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
   485 
   486 %% Supplied with Args provided in start_link, the sender, and the
   487 %% message. This does get called for messages injected by this member,
   488 %% however, in such cases, there is no special significance of this
   489 %% invocation: it does not indicate that the message has made it to
   490 %% any other members, let alone all other members.
   491 -callback handle_msg(Args :: term(), From :: pid(), Message :: term()) ->
   492     ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
   493 
   494 %% Called on gm member termination as per rules in gen_server, with
   495 %% the Args provided in start_link plus the termination Reason.
   496 -callback handle_terminate(Args :: term(), Reason :: term()) ->
   497     ok | term().
   498 
   499 -else.
   500 
   501 behaviour_info(callbacks) ->
   502     [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {handle_terminate, 2}];
   503 behaviour_info(_Other) ->
   504     undefined.
   505 
   506 -endif.
   507 
   508 create_tables() ->
   509     create_tables([?TABLE]).
   510 
   511 create_tables([]) ->
   512     ok;
   513 create_tables([{Table, Attributes} | Tables]) ->
   514     case mnesia:create_table(Table, Attributes) of
   515         {atomic, ok}                       -> create_tables(Tables);
   516         {aborted, {already_exists, Table}} -> create_tables(Tables);
   517         Err                                -> Err
   518     end.
   519 
   520 table_definitions() ->
   521     {Name, Attributes} = ?TABLE,
   522     [{Name, [?TABLE_MATCH | Attributes]}].
   523 
   524 start_link(GroupName, Module, Args, TxnFun) ->
   525     gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
   526 
   527 leave(Server) ->
   528     gen_server2:cast(Server, leave).
   529 
   530 broadcast(Server, Msg) -> broadcast(Server, Msg, 0).
   531 
   532 broadcast(Server, Msg, SizeHint) ->
   533     gen_server2:cast(Server, {broadcast, Msg, SizeHint}).
   534 
   535 confirmed_broadcast(Server, Msg) ->
   536     gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
   537 
   538 info(Server) ->
   539     gen_server2:call(Server, info, infinity).
   540 
   541 validate_members(Server, Members) ->
   542     gen_server2:cast(Server, {validate_members, Members}).
   543 
   544 forget_group(GroupName) ->
   545     {atomic, ok} = mnesia:sync_transaction(
   546                      fun () ->
   547                              mnesia:delete({?GROUP_TABLE, GroupName})
   548                      end),
   549     ok.
   550 
   551 init([GroupName, Module, Args, TxnFun]) ->
   552     put(process_name, {?MODULE, GroupName}),
   553     {MegaSecs, Secs, MicroSecs} = now(),
   554     random:seed(MegaSecs, Secs, MicroSecs),
   555     Self = make_member(GroupName),
   556     gen_server2:cast(self(), join),
   557     {ok, #state { self                = Self,
   558                   left                = {Self, undefined},
   559                   right               = {Self, undefined},
   560                   group_name          = GroupName,
   561                   module              = Module,
   562                   view                = undefined,
   563                   pub_count           = -1,
   564                   members_state       = undefined,
   565                   callback_args       = Args,
   566                   confirms            = queue:new(),
   567                   broadcast_buffer    = [],
   568                   broadcast_buffer_sz = 0,
   569                   broadcast_timer     = undefined,
   570                   txn_executor        = TxnFun }, hibernate,
   571      {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
   572 
   573 
   574 handle_call({confirmed_broadcast, _Msg}, _From,
   575             State = #state { members_state = undefined }) ->
   576     reply(not_joined, State);
   577 
   578 handle_call({confirmed_broadcast, Msg}, _From,
   579             State = #state { self          = Self,
   580                              right         = {Self, undefined},
   581                              module        = Module,
   582                              callback_args = Args }) ->
   583     handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
   584                            ok, State});
   585 
   586 handle_call({confirmed_broadcast, Msg}, From, State) ->
   587     {Result, State1 = #state { pub_count = PubCount, confirms = Confirms }} =
   588         internal_broadcast(Msg, 0, State),
   589     Confirms1 = queue:in({PubCount, From}, Confirms),
   590     handle_callback_result({Result, flush_broadcast_buffer(
   591                                       State1 #state { confirms = Confirms1 })});
   592 
   593 handle_call(info, _From,
   594             State = #state { members_state = undefined }) ->
   595     reply(not_joined, State);
   596 
   597 handle_call(info, _From, State = #state { group_name = GroupName,
   598                                           module     = Module,
   599                                           view       = View }) ->
   600     reply([{group_name,    GroupName},
   601            {module,        Module},
   602            {group_members, get_pids(alive_view_members(View))}], State);
   603 
   604 handle_call({add_on_right, _NewMember}, _From,
   605             State = #state { members_state = undefined }) ->
   606     reply(not_ready, State);
   607 
   608 handle_call({add_on_right, NewMember}, _From,
   609             State = #state { self          = Self,
   610                              group_name    = GroupName,
   611                              members_state = MembersState,
   612                              txn_executor  = TxnFun }) ->
   613     Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun),
   614     View1 = group_to_view(Group),
   615     MembersState1 = remove_erased_members(MembersState, View1),
   616     ok = send_right(NewMember, View1,
   617                     {catchup, Self, prepare_members_state(MembersState1)}),
   618     {Result, State1} = change_view(View1, State #state {
   619                                             members_state = MembersState1 }),
   620     handle_callback_result({Result, {ok, Group}, State1}).
   621 
   622 %% add_on_right causes a catchup to be sent immediately from the left,
   623 %% so we can never see this from the left neighbour. However, it's
   624 %% possible for the right neighbour to send us a check_neighbours
   625 %% immediately before that. We can't possibly handle it, but if we're
   626 %% in this state we know a catchup is coming imminently anyway. So
   627 %% just ignore it.
   628 handle_cast({?TAG, _ReqVer, check_neighbours},
   629             State = #state { members_state = undefined }) ->
   630     noreply(State);
   631 
   632 handle_cast({?TAG, ReqVer, Msg},
   633             State = #state { view          = View,
   634                              members_state = MembersState,
   635                              group_name    = GroupName }) ->
   636     {Result, State1} =
   637         case needs_view_update(ReqVer, View) of
   638             true  -> View1 = group_to_view(dirty_read_group(GroupName)),
   639                      MemberState1 = remove_erased_members(MembersState, View1),
   640                      change_view(View1, State #state {
   641                                           members_state = MemberState1 });
   642             false -> {ok, State}
   643         end,
   644     handle_callback_result(
   645       if_callback_success(
   646         Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
   647 
   648 handle_cast({broadcast, _Msg, _SizeHint},
   649             State = #state { members_state = undefined }) ->
   650     noreply(State);
   651 
   652 handle_cast({broadcast, Msg, _SizeHint},
   653             State = #state { self          = Self,
   654                              right         = {Self, undefined},
   655                              module        = Module,
   656                              callback_args = Args }) ->
   657     handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
   658                             State});
   659 
   660 handle_cast({broadcast, Msg, SizeHint}, State) ->
   661     {Result, State1} = internal_broadcast(Msg, SizeHint, State),
   662     handle_callback_result({Result, maybe_flush_broadcast_buffer(State1)});
   663 
   664 handle_cast(join, State = #state { self          = Self,
   665                                    group_name    = GroupName,
   666                                    members_state = undefined,
   667                                    module        = Module,
   668                                    callback_args = Args,
   669                                    txn_executor  = TxnFun }) ->
   670     View = join_group(Self, GroupName, TxnFun),
   671     MembersState =
   672         case alive_view_members(View) of
   673             [Self] -> blank_member_state();
   674             _      -> undefined
   675         end,
   676     State1 = check_neighbours(State #state { view          = View,
   677                                              members_state = MembersState }),
   678     handle_callback_result(
   679       {Module:joined(Args, get_pids(all_known_members(View))), State1});
   680 
   681 handle_cast({validate_members, OldMembers},
   682             State = #state { view          = View,
   683                              module        = Module,
   684                              callback_args = Args }) ->
   685     NewMembers = get_pids(all_known_members(View)),
   686     Births = NewMembers -- OldMembers,
   687     Deaths = OldMembers -- NewMembers,
   688     case {Births, Deaths} of
   689         {[], []}  -> noreply(State);
   690         _         -> Result = Module:members_changed(Args, Births, Deaths),
   691                      handle_callback_result({Result, State})
   692     end;
   693 
   694 handle_cast(leave, State) ->
   695     {stop, normal, State}.
   696 
   697 
   698 handle_info(flush, State) ->
   699     noreply(
   700       flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
   701 
   702 handle_info(timeout, State) ->
   703     noreply(flush_broadcast_buffer(State));
   704 
   705 handle_info({'DOWN', MRef, process, _Pid, Reason},
   706             State = #state { self          = Self,
   707                              left          = Left,
   708                              right         = Right,
   709                              group_name    = GroupName,
   710                              confirms      = Confirms,
   711                              txn_executor  = TxnFun }) ->
   712     Member = case {Left, Right} of
   713                  {{Member1, MRef}, _} -> Member1;
   714                  {_, {Member1, MRef}} -> Member1;
   715                  _                    -> undefined
   716              end,
   717     case {Member, Reason} of
   718         {undefined, _} ->
   719             noreply(State);
   720         {_, {shutdown, ring_shutdown}} ->
   721             noreply(State);
   722         _ ->
   723             %% In the event of a partial partition we could see another member
   724             %% go down and then remove them from Mnesia. While they can
   725             %% recover from this they'd have to restart the queue - not
   726             %% ideal. So let's sleep here briefly just in case this was caused
   727             %% by a partial partition; in which case by the time we record the
   728             %% member death in Mnesia we will probably be in a full
   729             %% partition and will not be assassinating another member.
   730             timer:sleep(100),
   731             View1 = group_to_view(record_dead_member_in_group(
   732                                     Member, GroupName, TxnFun)),
   733             handle_callback_result(
   734               case alive_view_members(View1) of
   735                   [Self] -> maybe_erase_aliases(
   736                               State #state {
   737                                 members_state = blank_member_state(),
   738                                 confirms      = purge_confirms(Confirms) },
   739                               View1);
   740                   _      -> change_view(View1, State)
   741               end)
   742     end.
   743 
   744 
   745 terminate(Reason, State = #state { module        = Module,
   746                                    callback_args = Args }) ->
   747     flush_broadcast_buffer(State),
   748     Module:handle_terminate(Args, Reason).
   749 
   750 
   751 code_change(_OldVsn, State, _Extra) ->
   752     {ok, State}.
   753 
   754 prioritise_info(flush, _Len, _State) ->
   755     1;
   756 %% DOWN messages should not overtake initial catchups; if they do we
   757 %% will receive a DOWN we do not know what to do with.
   758 prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len,
   759                 #state { members_state = undefined }) ->
   760     0;
   761 %% We should not prioritise DOWN messages from our left since
   762 %% otherwise the DOWN can overtake any last activity from the left,
   763 %% causing that activity to be lost.
   764 prioritise_info({'DOWN', _MRef, process, LeftPid, _Reason}, _Len,
   765                 #state { left = {{_LeftVer, LeftPid}, _MRef2} }) ->
   766     0;
   767 %% But prioritise all other DOWNs - we want to make sure we are not
   768 %% sending activity into the void for too long because our right is
   769 %% down but we don't know it.
   770 prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len, _State) ->
   771     1;
   772 prioritise_info(_, _Len, _State) ->
   773     0.
   774 
   775 
   776 handle_msg(check_neighbours, State) ->
   777     %% no-op - it's already been done by the calling handle_cast
   778     {ok, State};
   779 
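%% A catchup from our left when we have no members_state yet (we have
%% just joined): forward the catchup on to our right unchanged, then
%% adopt the left neighbour's members_state as our own starting state.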
   780 handle_msg({catchup, Left, MembersStateLeft},
   781            State = #state { self          = Self,
   782                             left          = {Left, _MRefL},
   783                             right         = {Right, _MRefR},
   784                             view          = View,
   785                             members_state = undefined }) ->
   786     ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
   787     MembersStateLeft1 = build_members_state(MembersStateLeft),
   788     {ok, State #state { members_state = MembersStateLeft1 }};
   789 
   790 handle_msg({catchup, Left, MembersStateLeft},
   791            State = #state { self = Self,
   792                             left = {Left, _MRefL},
   793                             view = View,
   794                             members_state = MembersState })
   795   when MembersState =/= undefined ->
   796     MembersStateLeft1 = build_members_state(MembersStateLeft),
   797     AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++
   798                                  ?DICT:fetch_keys(MembersStateLeft1)),
   799     {MembersState1, Activity} =
   800         lists:foldl(
   801           fun (Id, MembersStateActivity) ->
   802                   #member { pending_ack = PALeft, last_ack = LA } =
   803                       find_member_or_blank(Id, MembersStateLeft1),
   804                   with_member_acc(
   805                     fun (#member { pending_ack = PA } = Member, Activity1) ->
   806                             case is_member_alias(Id, Self, View) of
   807                                 true ->
   808                                     {_AcksInFlight, Pubs, _PA1} =
   809                                         find_prefix_common_suffix(PALeft, PA),
   810                                     {Member #member { last_ack = LA },
   811                                      activity_cons(Id, pubs_from_queue(Pubs),
   812                                                    [], Activity1)};
   813                                 false ->
   814                                     {Acks, _Common, Pubs} =
   815                                         find_prefix_common_suffix(PA, PALeft),
   816                                     {Member,
   817                                      activity_cons(Id, pubs_from_queue(Pubs),
   818                                                    acks_from_queue(Acks),
   819                                                    Activity1)}
   820                             end
   821                     end, Id, MembersStateActivity)
   822           end, {MembersState, activity_nil()}, AllMembers),
   823     handle_msg({activity, Left, activity_finalise(Activity)},
   824                State #state { members_state = MembersState1 });
   825 
   826 handle_msg({catchup, _NotLeft, _MembersState}, State) ->
   827     {ok, State};
   828 
   829 handle_msg({activity, Left, Activity},
   830            State = #state { self          = Self,
   831                             left          = {Left, _MRefL},
   832                             view          = View,
   833                             members_state = MembersState,
   834                             confirms      = Confirms })
   835   when MembersState =/= undefined ->
   836     {MembersState1, {Confirms1, Activity1}} =
   837         lists:foldl(
   838           fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
   839                   with_member_acc(
   840                     fun (Member = #member { pending_ack = PA,
   841                                             last_pub    = LP,
   842                                             last_ack    = LA },
   843                          {Confirms2, Activity2}) ->
   844                             case is_member_alias(Id, Self, View) of
   845                                 true ->
   846                                     {ToAck, PA1} =
   847                                         find_common(queue_from_pubs(Pubs), PA,
   848                                                     queue:new()),
   849                                     LA1 = last_ack(Acks, LA),
   850                                     AckNums = acks_from_queue(ToAck),
   851                                     Confirms3 = maybe_confirm(
   852                                                   Self, Id, Confirms2, AckNums),
   853                                     {Member #member { pending_ack = PA1,
   854                                                       last_ack    = LA1 },
   855                                      {Confirms3,
   856                                       activity_cons(
   857                                         Id, [], AckNums, Activity2)}};
   858                                 false ->
   859                                     PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
   860                                     LA1 = last_ack(Acks, LA),
   861                                     LP1 = last_pub(Pubs, LP),
   862                                     {Member #member { pending_ack = PA1,
   863                                                       last_pub    = LP1,
   864                                                       last_ack    = LA1 },
   865                                      {Confirms2,
   866                                       activity_cons(Id, Pubs, Acks, Activity2)}}
   867                             end
   868                     end, Id, MembersStateConfirmsActivity)
   869           end, {MembersState, {Confirms, activity_nil()}}, Activity),
   870     State1 = State #state { members_state = MembersState1,
   871                             confirms      = Confirms1 },
   872     Activity3 = activity_finalise(Activity1),
   873     ok = maybe_send_activity(Activity3, State1),
   874     {Result, State2} = maybe_erase_aliases(State1, View),
   875     if_callback_success(
   876       Result, fun activity_true/3, fun activity_false/3, Activity3, State2);
   877 
   878 handle_msg({activity, _NotLeft, _Activity}, State) ->
   879     {ok, State}.
   880 
   881 
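%% noreply/1 and reply/2 always re-examine the broadcast buffer: with a
%% non-empty buffer they arm the ?BROADCAST_TIMER flush timer (if not
%% already armed) and return a zero timeout so the buffer is flushed
%% promptly; with an empty buffer they cancel the timer and allow the
%% process to hibernate.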
   882 noreply(State) ->
   883     {noreply, ensure_broadcast_timer(State), flush_timeout(State)}.
   884 
   885 reply(Reply, State) ->
   886     {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}.
   887 
   888 flush_timeout(#state{broadcast_buffer = []}) -> hibernate;
   889 flush_timeout(_)                             -> 0.
   890 
   891 ensure_broadcast_timer(State = #state { broadcast_buffer = [],
   892                                         broadcast_timer  = undefined }) ->
   893     State;
   894 ensure_broadcast_timer(State = #state { broadcast_buffer = [],
   895                                         broadcast_timer  = TRef }) ->
   896     erlang:cancel_timer(TRef),
   897     State #state { broadcast_timer = undefined };
   898 ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
   899     TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush),
   900     State #state { broadcast_timer = TRef };
   901 ensure_broadcast_timer(State) ->
   902     State.
   903 
   904 internal_broadcast(Msg, SizeHint,
   905                    State = #state { self                = Self,
   906                                     pub_count           = PubCount,
   907                                     module              = Module,
   908                                     callback_args       = Args,
   909                                     broadcast_buffer    = Buffer,
   910                                     broadcast_buffer_sz = BufferSize }) ->
   911     PubCount1 = PubCount + 1,
   912     {Module:handle_msg(Args, get_pid(Self), Msg),
   913      State #state { pub_count           = PubCount1,
   914                     broadcast_buffer    = [{PubCount1, Msg} | Buffer],
   915                     broadcast_buffer_sz = BufferSize + SizeHint}}.
   916 
   917 %% The Erlang distribution mechanism has an interesting quirk - it
   918 %% will kill the VM cold with "Absurdly large distribution output data
   919 %% buffer" if you attempt to send a message which serialises out to
   920 %% more than 2^31 bytes in size. It's therefore a very good idea to
   921 %% make sure that we don't exceed that size!
   922 %%
   923 %% Now, we could figure out the size of messages as they come in using
   924 %% size(term_to_binary(Msg)) or similar. The trouble is, that requires
   925 %% us to serialise the message only to throw the serialised form
   926 %% away. Hard to believe that's a sensible thing to do. So instead we
   927 %% accept a size hint from the application, via broadcast/3. This size
   928 %% hint can be the size of anything in the message which we expect
   929 %% could be large, and we just ignore the size of any small bits of
   930 %% the message term. Therefore MAX_BUFFER_SIZE is set somewhat
   931 %% conservatively at 100MB - but the buffer is only to allow us to
   932 %% buffer tiny messages anyway, so 100MB is plenty.
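%%
%% For example, a caller that already has a large binary to hand could
%% pass its size as the hint (the names below are illustrative):
%%
%%   Payload = term_to_binary(BigTerm),
%%   ok = gm:broadcast(GmPid, {event, Payload}, byte_size(Payload)).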
   933 
   934 maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) ->
   935     case Size > ?MAX_BUFFER_SIZE of
   936         true  -> flush_broadcast_buffer(State);
   937         false -> State
   938     end.
   939 
   940 flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
   941     State;
   942 flush_broadcast_buffer(State = #state { self             = Self,
   943                                         members_state    = MembersState,
   944                                         broadcast_buffer = Buffer,
   945                                         pub_count        = PubCount }) ->
   946     [{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount
   947     Pubs = lists:reverse(Buffer),
   948     Activity = activity_cons(Self, Pubs, [], activity_nil()),
   949     ok = maybe_send_activity(activity_finalise(Activity), State),
   950     MembersState1 = with_member(
   951                       fun (Member = #member { pending_ack = PA }) ->
   952                               PA1 = queue:join(PA, queue:from_list(Pubs)),
   953                               Member #member { pending_ack = PA1,
   954                                                last_pub = PubCount }
   955                       end, Self, MembersState),
   956     State #state { members_state       = MembersState1,
   957                    broadcast_buffer    = [],
   958                    broadcast_buffer_sz = 0}.
   959 
   960 
   961 %% ---------------------------------------------------------------------------
   962 %% View construction and inspection
   963 %% ---------------------------------------------------------------------------
   964 
   965 needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.
   966 
   967 view_version({Ver, _View}) -> Ver.
   968 
   969 is_member_alive({dead, _Member}) -> false;
   970 is_member_alive(_)               -> true.
   971 
   972 is_member_alias(Self, Self, _View) ->
   973     true;
   974 is_member_alias(Member, Self, View) ->
   975     ?SETS:is_element(Member,
   976                      ((fetch_view_member(Self, View)) #view_member.aliases)).
   977 
   978 dead_member_id({dead, Member}) -> Member.
   979 
   980 store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
   981     {Ver, ?DICT:store(Id, VMember, View)}.
   982 
   983 with_view_member(Fun, View, Id) ->
   984     store_view_member(Fun(fetch_view_member(Id, View)), View).
   985 
   986 fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View).
   987 
   988 find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View).
   989 
   990 blank_view(Ver) -> {Ver, ?DICT:new()}.
   991 
   992 alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View).
   993 
   994 all_known_members({_Ver, View}) ->
   995     ?DICT:fold(
   996        fun (Member, #view_member { aliases = Aliases }, Acc) ->
   997                ?SETS:to_list(Aliases) ++ [Member | Acc]
   998        end, [], View).
   999 
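%% group_to_view/1 triples the list of alive members so that, as
%% link_view/2 slides its [Left, Middle, Right] window along it, every
%% alive member (including the first and last) is stored with both of
%% its ring neighbours; the tripling also covers groups of one or two
%% members, where a single copy of the list has no window of three.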
  1000 group_to_view(#gm_group { members = Members, version = Ver }) ->
  1001     Alive = lists:filter(fun is_member_alive/1, Members),
  1002     [_|_] = Alive, %% ASSERTION - can't have all dead members
  1003     add_aliases(link_view(Alive ++ Alive ++ Alive, blank_view(Ver)), Members).
  1004 
  1005 link_view([Left, Middle, Right | Rest], View) ->
  1006     case find_view_member(Middle, View) of
  1007         error ->
  1008             link_view(
  1009               [Middle, Right | Rest],
  1010               store_view_member(#view_member { id      = Middle,
  1011                                                aliases = ?SETS:new(),
  1012                                                left    = Left,
  1013                                                right   = Right }, View));
  1014         {ok, _} ->
  1015             View
  1016     end;
  1017 link_view(_, View) ->
  1018     View.
  1019 
  1020 add_aliases(View, Members) ->
  1021     Members1 = ensure_alive_suffix(Members),
  1022     {EmptyDeadSet, View1} =
  1023         lists:foldl(
  1024           fun (Member, {DeadAcc, ViewAcc}) ->
  1025                   case is_member_alive(Member) of
  1026                       true ->
  1027                           {?SETS:new(),
  1028                            with_view_member(
  1029                              fun (VMember =
  1030                                       #view_member { aliases = Aliases }) ->
  1031                                      VMember #view_member {
  1032                                        aliases = ?SETS:union(Aliases, DeadAcc) }
  1033                              end, ViewAcc, Member)};
  1034                       false ->
  1035                           {?SETS:add_element(dead_member_id(Member), DeadAcc),
  1036                            ViewAcc}
  1037                   end
  1038           end, {?SETS:new(), View}, Members1),
  1039     0 = ?SETS:size(EmptyDeadSet), %% ASSERTION
  1040     View1.
  1041 
  1042 ensure_alive_suffix(Members) ->
  1043     queue:to_list(ensure_alive_suffix1(queue:from_list(Members))).
  1044 
  1045 ensure_alive_suffix1(MembersQ) ->
  1046     {{value, Member}, MembersQ1} = queue:out_r(MembersQ),
  1047     case is_member_alive(Member) of
  1048         true  -> MembersQ;
  1049         false -> ensure_alive_suffix1(queue:in_r(Member, MembersQ1))
  1050     end.
  1051 
  1052 
  1053 %% ---------------------------------------------------------------------------
  1054 %% View modification
  1055 %% ---------------------------------------------------------------------------
  1056 
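%% Joining: pick a random alive member of the canonical group record and
%% synchronously ask it to add us on its right (add_on_right). If that
%% call fails because the chosen member is dead or unreachable, record
%% its death and retry with another; if the member is not yet ready (it
%% has no members_state), retry from the top.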
  1057 join_group(Self, GroupName, TxnFun) ->
  1058     join_group(Self, GroupName, dirty_read_group(GroupName), TxnFun).
  1059 
  1060 join_group(Self, GroupName, {error, not_found}, TxnFun) ->
  1061     join_group(Self, GroupName,
  1062                prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
  1063 join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
  1064     group_to_view(Group);
  1065 join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
  1066     case lists:member(Self, Members) of
  1067         true ->
  1068             group_to_view(Group);
  1069         false ->
  1070             case lists:filter(fun is_member_alive/1, Members) of
  1071                 [] ->
  1072                     join_group(Self, GroupName,
  1073                                prune_or_create_group(Self, GroupName, TxnFun),
  1074                                TxnFun);
  1075                 Alive ->
  1076                     Left = lists:nth(random:uniform(length(Alive)), Alive),
  1077                     Handler =
  1078                         fun () ->
  1079                                 join_group(
  1080                                   Self, GroupName,
  1081                                   record_dead_member_in_group(
  1082                                     Left, GroupName, TxnFun),
  1083                                   TxnFun)
  1084                         end,
  1085                     try
  1086                         case neighbour_call(Left, {add_on_right, Self}) of
  1087                             {ok, Group1} -> group_to_view(Group1);
  1088                             not_ready    -> join_group(Self, GroupName, TxnFun)
  1089                         end
  1090                     catch
  1091                         exit:{R, _}
  1092                           when R =:= noproc; R =:= normal; R =:= shutdown ->
  1093                             Handler();
  1094                         exit:{{R, _}, _}
  1095                           when R =:= nodedown; R =:= shutdown ->
  1096                             Handler()
  1097                     end
  1098             end
  1099     end.
  1100 
  1101 dirty_read_group(GroupName) ->
  1102     case mnesia:dirty_read(?GROUP_TABLE, GroupName) of
  1103         []      -> {error, not_found};
  1104         [Group] -> Group
  1105     end.
  1106 
  1107 read_group(GroupName) ->
  1108     case mnesia:read({?GROUP_TABLE, GroupName}) of
  1109         []      -> {error, not_found};
  1110         [Group] -> Group
  1111     end.
  1112 
  1113 write_group(Group) -> mnesia:write(?GROUP_TABLE, Group, write), Group.
  1114 
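       %% Create the group record with ourselves as the only member, unless a
       %% record with at least one alive member already exists, in which case
       %% that record is returned unchanged.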
  1115 prune_or_create_group(Self, GroupName, TxnFun) ->
  1116     TxnFun(
  1117       fun () ->
  1118               GroupNew = #gm_group { name    = GroupName,
  1119                                      members = [Self],
  1120                                      version = get_version(Self) },
  1121               case read_group(GroupName) of
  1122                   {error, not_found} ->
  1123                       write_group(GroupNew);
  1124                   Group = #gm_group { members = Members } ->
  1125                       case lists:any(fun is_member_alive/1, Members) of
  1126                           true  -> Group;
  1127                           false -> write_group(GroupNew)
  1128                       end
  1129               end
  1130       end).
  1131 
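       %% Mark Member as {dead, Member} in the group record, bumping the
       %% version; a no-op if it is no longer listed under its live id.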
  1132 record_dead_member_in_group(Member, GroupName, TxnFun) ->
  1133     TxnFun(
  1134       fun () ->
  1135               Group = #gm_group { members = Members, version = Ver } =
  1136                   read_group(GroupName),
  1137               case lists:splitwith(
  1138                      fun (Member1) -> Member1 =/= Member end, Members) of
  1139                   {_Members1, []} -> %% not found - already recorded dead
  1140                       Group;
  1141                   {Members1, [Member | Members2]} ->
  1142                       Members3 = Members1 ++ [{dead, Member} | Members2],
  1143                       write_group(Group #gm_group { members = Members3,
  1144                                                     version = Ver + 1 })
  1145               end
  1146       end).
  1147 
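       %% Insert NewMember immediately to the right of Left in the group
       %% record, bumping the version.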
  1148 record_new_member_in_group(NewMember, Left, GroupName, TxnFun) ->
  1149     TxnFun(
  1150       fun () ->
  1151               Group = #gm_group { members = Members, version = Ver } =
  1152                   read_group(GroupName),
  1153               {Prefix, [Left | Suffix]} =
  1154                   lists:splitwith(fun (M) -> M =/= Left end, Members),
  1155               write_group(Group #gm_group {
  1156                             members = Prefix ++ [Left, NewMember | Suffix],
  1157                             version = Ver + 1 })
  1158       end).
  1159 
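       %% Remove the given dead member ids from the group record entirely,
       %% bumping the version if anything was actually removed.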
  1160 erase_members_in_group(Members, GroupName, TxnFun) ->
  1161     DeadMembers = [{dead, Id} || Id <- Members],
  1162     TxnFun(
  1163       fun () ->
  1164               Group = #gm_group { members = [_|_] = Members1, version = Ver } =
  1165                   read_group(GroupName),
  1166               case Members1 -- DeadMembers of
  1167                   Members1 -> Group;
  1168                   Members2 -> write_group(
  1169                                 Group #gm_group { members = Members2,
  1170                                                   version = Ver + 1 })
  1171               end
  1172       end).
  1173 
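       %% Erase any of our aliases (dead members we have inherited) whose
       %% publications have all been acknowledged (last_ack =:= last_pub):
       %% drop them from the members state and from the group record, then
       %% recompute the view.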
  1174 maybe_erase_aliases(State = #state { self          = Self,
  1175                                      group_name    = GroupName,
  1176                                      members_state = MembersState,
  1177                                      txn_executor  = TxnFun }, View) ->
  1178     #view_member { aliases = Aliases } = fetch_view_member(Self, View),
  1179     {Erasable, MembersState1}
  1180         = ?SETS:fold(
  1181              fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
  1182                      #member { last_pub = LP, last_ack = LA } =
  1183                          find_member_or_blank(Id, MembersState),
  1184                      case can_erase_view_member(Self, Id, LA, LP) of
  1185                          true  -> {[Id | ErasableAcc],
  1186                                    erase_member(Id, MembersStateAcc)};
  1187                          false -> Acc
  1188                      end
  1189              end, {[], MembersState}, Aliases),
  1190     View1 = case Erasable of
  1191                 [] -> View;
  1192                 _  -> group_to_view(
  1193                         erase_members_in_group(Erasable, GroupName, TxnFun))
  1194             end,
  1195     change_view(View1, State #state { members_state = MembersState1 }).
  1196 
  1197 can_erase_view_member(Self, Self, _LA, _LP) -> false;
  1198 can_erase_view_member(_Self, _Id,   N,   N) -> true;
  1199 can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
  1200 
  1201 neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg).
  1202 neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity).
  1203 
  1204 %% ---------------------------------------------------------------------------
  1205 %% View monitoring and maintenance
  1206 %% ---------------------------------------------------------------------------
  1207 
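       %% Reconcile the neighbour we are currently monitoring with the
       %% neighbour dictated by the view: demonitor the old one, monitor the
       %% new one, and prompt the affected members to re-check their own
       %% neighbours.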
  1208 ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
  1209     {Self, undefined};
  1210 ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
  1211     ok = neighbour_cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
  1212     {RealNeighbour, maybe_monitor(RealNeighbour, Self)};
  1213 ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
  1214     {RealNeighbour, MRef};
  1215 ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
  1216     true = ?INSTR_MOD:demonitor(MRef),
  1217     Msg = {?TAG, Ver, check_neighbours},
  1218     ok = neighbour_cast(RealNeighbour, Msg),
  1219     ok = case Neighbour of
  1220              Self -> ok;
  1221              _    -> neighbour_cast(Neighbour, Msg)
  1222          end,
  1223     {Neighbour, maybe_monitor(Neighbour, Self)}.
  1224 
  1225 maybe_monitor( Self,  Self) -> undefined;
  1226 maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)).
  1227 
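       %% Bring our left and right neighbours (and their monitors) into line
       %% with the current view, dropping the broadcast buffer if we have
       %% become our own right neighbour, and sending a catchup to a new
       %% right neighbour where necessary.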
  1228 check_neighbours(State = #state { self             = Self,
  1229                                   left             = Left,
  1230                                   right            = Right,
  1231                                   view             = View,
  1232                                   broadcast_buffer = Buffer }) ->
  1233     #view_member { left = VLeft, right = VRight }
  1234         = fetch_view_member(Self, View),
  1235     Ver = view_version(View),
  1236     Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
  1237     Right1 = ensure_neighbour(Ver, Self, Right, VRight),
  1238     Buffer1 = case Right1 of
  1239                   {Self, undefined} -> [];
  1240                   _                 -> Buffer
  1241               end,
  1242     State1 = State #state { left = Left1, right = Right1,
  1243                             broadcast_buffer = Buffer1 },
  1244     ok = maybe_send_catchup(Right, State1),
  1245     State1.
  1246 
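       %% Only send a catchup if the right neighbour has actually changed, is
       %% not ourselves, and we have a members state to send.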
  1247 maybe_send_catchup(Right, #state { right = Right }) ->
  1248     ok;
  1249 maybe_send_catchup(_Right, #state { self  = Self,
  1250                                     right = {Self, undefined} }) ->
  1251     ok;
  1252 maybe_send_catchup(_Right, #state { members_state = undefined }) ->
  1253     ok;
  1254 maybe_send_catchup(_Right, #state { self          = Self,
  1255                                     right         = {Right, _MRef},
  1256                                     view          = View,
  1257                                     members_state = MembersState }) ->
  1258     send_right(Right, View,
  1259                {catchup, Self, prepare_members_state(MembersState)}).
  1260 
  1261 
  1262 %% ---------------------------------------------------------------------------
  1263 %% Catchup delta detection
  1264 %% ---------------------------------------------------------------------------
  1265 
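       %% Split queue A against queue B into the part only A has at the
       %% front, the part they share, and the part only B has at the back;
       %% used when processing a catchup to work out the delta between two
       %% members' knowledge of pending publications.
       %%
       %% Illustrative example (not from the original source): with
       %%   A = queue:from_list([{1, a}, {2, b}, {3, c}]) and
       %%   B = queue:from_list([{2, b}, {3, c}, {4, d}]),
       %% find_prefix_common_suffix(A, B) returns {Prefix, Common, Suffix}
       %% whose list forms are [{1, a}], [{2, b}, {3, c}] and [{4, d}].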
  1266 find_prefix_common_suffix(A, B) ->
  1267     {Prefix, A1} = find_prefix(A, B, queue:new()),
  1268     {Common, Suffix} = find_common(A1, B, queue:new()),
  1269     {Prefix, Common, Suffix}.
  1270 
  1271 %% Returns the elements of A that occur before the first element of B,
  1272 %% plus the remainder of A.
  1273 find_prefix(A, B, Prefix) ->
  1274     case {queue:out(A), queue:out(B)} of
  1275         {{{value, Val}, _A1}, {{value, Val}, _B1}} ->
  1276             {Prefix, A};
  1277         {{empty, A1}, {{value, _A}, _B1}} ->
  1278             {Prefix, A1};
  1279         {{{value, {NumA, _MsgA} = Val}, A1},
  1280          {{value, {NumB, _MsgB}}, _B1}} when NumA < NumB ->
  1281             find_prefix(A1, B, queue:in(Val, Prefix));
  1282         {_, {empty, _B1}} ->
  1283             {A, Prefix} %% Prefix will be empty here
  1284     end.
  1285 
  1286 %% A should be a prefix of B. Returns the commonality plus the
  1287 %% remainder of B.
  1288 find_common(A, B, Common) ->
  1289     case {queue:out(A), queue:out(B)} of
  1290         {{{value, Val}, A1}, {{value, Val}, B1}} ->
  1291             find_common(A1, B1, queue:in(Val, Common));
  1292         {{empty, _A}, _} ->
  1293             {Common, B}
  1294     end.
  1295 
  1296 
  1297 %% ---------------------------------------------------------------------------
  1298 %% Members helpers
  1299 %% ---------------------------------------------------------------------------
  1300 
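       %% The members state maps member ids to #member{} records holding each
       %% member's queue of not-yet-acknowledged publications and the last
       %% pub/ack numbers seen from it.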
  1301 with_member(Fun, Id, MembersState) ->
  1302     store_member(
  1303       Id, Fun(find_member_or_blank(Id, MembersState)), MembersState).
  1304 
  1305 with_member_acc(Fun, Id, {MembersState, Acc}) ->
  1306     {MemberState, Acc1} = Fun(find_member_or_blank(Id, MembersState), Acc),
  1307     {store_member(Id, MemberState, MembersState), Acc1}.
  1308 
  1309 find_member_or_blank(Id, MembersState) ->
  1310     case ?DICT:find(Id, MembersState) of
  1311         {ok, Result} -> Result;
  1312         error        -> blank_member()
  1313     end.
  1314 
  1315 erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState).
  1316 
  1317 blank_member() ->
  1318     #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
  1319 
  1320 blank_member_state() -> ?DICT:new().
  1321 
  1322 store_member(Id, MemberState, MembersState) ->
  1323     ?DICT:store(Id, MemberState, MembersState).
  1324 
  1325 prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).
  1326 
  1327 build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).
  1328 
  1329 make_member(GroupName) ->
  1330    {case dirty_read_group(GroupName) of
  1331         #gm_group { version = Version } -> Version;
  1332         {error, not_found}              -> ?VERSION_START
  1333     end, self()}.
  1334 
  1335 remove_erased_members(MembersState, View) ->
  1336     lists:foldl(fun (Id, MembersState1) ->
  1337                     store_member(Id, find_member_or_blank(Id, MembersState),
  1338                                  MembersState1)
  1339                 end, blank_member_state(), all_known_members(View)).
  1340 
  1341 get_version({Version, _Pid}) -> Version.
  1342 
  1343 get_pid({_Version, Pid}) -> Pid.
  1344 
  1345 get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
  1346 
  1347 %% ---------------------------------------------------------------------------
  1348 %% Activity assembly
  1349 %% ---------------------------------------------------------------------------
  1350 
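       %% An activity is a queue of {Sender, Pubs, Acks} triples; empty
       %% contributions are dropped rather than enqueued.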
  1351 activity_nil() -> queue:new().
  1352 
  1353 activity_cons(   _Id,   [],   [], Tail) -> Tail;
  1354 activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).
  1355 
  1356 activity_finalise(Activity) -> queue:to_list(Activity).
  1357 
  1358 maybe_send_activity([], _State) ->
  1359     ok;
  1360 maybe_send_activity(Activity, #state { self  = Self,
  1361                                        right = {Right, _MRefR},
  1362                                        view  = View }) ->
  1363     send_right(Right, View, {activity, Self, Activity}).
  1364 
  1365 send_right(Right, View, Msg) ->
  1366     ok = neighbour_cast(Right, {?TAG, view_version(View), Msg}).
  1367 
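       %% Deliver every publication in the activity to the callback module's
       %% handle_msg/3, threading through any {become, Module, Args} result
       %% and stopping early on {stop, Reason}.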
  1368 callback(Args, Module, Activity) ->
  1369     Result =
  1370       lists:foldl(
  1371         fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) ->
  1372                 lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) ->
  1373                                     case Module2:handle_msg(
  1374                                            Args2, get_pid(Id), Pub) of
  1375                                         ok ->
  1376                                             Acc;
  1377                                         {become, Module3, Args3} ->
  1378                                             {Args3, Module3, ok};
  1379                                         {stop, _Reason} = Error ->
  1380                                             Error
  1381                                     end;
  1382                                 (_, Error = {stop, _Reason}) ->
  1383                                     Error
  1384                             end, {Args1, Module1, ok}, Pubs);
  1385             (_, Error = {stop, _Reason}) ->
  1386                 Error
  1387         end, {Args, Module, ok}, Activity),
  1388     case Result of
  1389         {Args, Module, ok}      -> ok;
  1390         {Args1, Module1, ok}    -> {become, Module1, Args1};
  1391         {stop, _Reason} = Error -> Error
  1392     end.
  1393 
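       %% Compute births and deaths relative to the old view, notify the
       %% callback module via members_changed/3 if anything changed, and
       %% re-check our neighbours against the new view.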
  1394 change_view(View, State = #state { view          = View0,
  1395                                    module        = Module,
  1396                                    callback_args = Args }) ->
  1397     OldMembers = all_known_members(View0),
  1398     NewMembers = all_known_members(View),
  1399     Births = NewMembers -- OldMembers,
  1400     Deaths = OldMembers -- NewMembers,
  1401     Result = case {Births, Deaths} of
  1402                  {[], []} -> ok;
  1403                  _        -> Module:members_changed(
  1404                                Args, get_pids(Births), get_pids(Deaths))
  1405              end,
  1406     {Result, check_neighbours(State #state { view = View })}.
  1407 
  1408 handle_callback_result({Result, State}) ->
  1409     if_callback_success(
  1410       Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State);
  1411 handle_callback_result({Result, Reply, State}) ->
  1412     if_callback_success(
  1413       Result, fun reply_true/3, fun reply_false/3, Reply, State).
  1414 
  1415 no_reply_true (_Result,        _Undefined, State) -> noreply(State).
  1416 no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}.
  1417 
  1418 reply_true (_Result,        Reply, State) -> reply(Reply, State).
  1419 reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}.
  1420 
  1421 handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State).
  1422 handle_msg_false(Result, _Msg, State) -> {Result, State}.
  1423 
  1424 activity_true(_Result, Activity, State = #state { module        = Module,
  1425                                                   callback_args = Args }) ->
  1426     {callback(Args, Module, Activity), State}.
  1427 activity_false(Result, _Activity, State) ->
  1428     {Result, State}.
  1429 
  1430 if_callback_success(ok, True, _False, Arg, State) ->
  1431     True(ok, Arg, State);
  1432 if_callback_success(
  1433   {become, Module, Args} = Result, True, _False, Arg, State) ->
  1434     True(Result, Arg, State #state { module        = Module,
  1435                                      callback_args = Args });
  1436 if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) ->
  1437     False(Result, Arg, State).
  1438 
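       %% As acks for our own publications arrive, reply 'ok' to the callers
       %% waiting for those publications to be confirmed.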
  1439 maybe_confirm(_Self, _Id, Confirms, []) ->
  1440     Confirms;
  1441 maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) ->
  1442     case queue:out(Confirms) of
  1443         {empty, _Confirms} ->
  1444             Confirms;
  1445         {{value, {PubNum, From}}, Confirms1} ->
  1446             gen_server2:reply(From, ok),
  1447             maybe_confirm(Self, Self, Confirms1, PubNums);
  1448         {{value, {PubNum1, _From}}, _Confirms} when PubNum1 > PubNum ->
  1449             maybe_confirm(Self, Self, Confirms, PubNums)
  1450     end;
  1451 maybe_confirm(_Self, _Id, Confirms, _PubNums) ->
  1452     Confirms.
  1453 
  1454 purge_confirms(Confirms) ->
  1455     [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)],
  1456     queue:new().
  1457 
  1458 
  1459 %% ---------------------------------------------------------------------------
  1460 %% Msg transformation
  1461 %% ---------------------------------------------------------------------------
  1462 
  1463 acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
  1464 
  1465 pubs_from_queue(Q) -> queue:to_list(Q).
  1466 
  1467 queue_from_pubs(Pubs) -> queue:from_list(Pubs).
  1468 
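       %% Each ack acknowledges the oldest pending publication, so applying N
       %% acks simply drops the first N entries from the pending queue.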
  1469 apply_acks(  [], Pubs) -> Pubs;
  1470 apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),
  1471                           Pubs1.
  1472 
  1473 join_pubs(Q, [])   -> Q;
  1474 join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
  1475 
  1476 last_ack(  [], LA) -> LA;
  1477 last_ack(List, LA) -> LA1 = lists:last(List),
  1478                       true = LA1 > LA, %% ASSERTION
  1479                       LA1.
  1480 
  1481 last_pub(  [], LP) -> LP;
  1482 last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
  1483                       true = PubNum > LP, %% ASSERTION
  1484                       PubNum.
  1485 
  1486 %% ---------------------------------------------------------------------------
  1487 
  1488 %% Uninstrumented versions
  1489 
  1490 call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout).
  1491 cast(Pid, Msg)          -> gen_server2:cast(Pid, Msg).
  1492 monitor(Pid)            -> erlang:monitor(process, Pid).
  1493 demonitor(MRef)         -> erlang:demonitor(MRef).