src/rabbit_variable_queue.erl
author Matthias Radestock <matthias@rabbitmq.com>
Wed May 16 17:39:07 2012 +0100 (11 hours ago)
changeset 9570 2fb4318aa9ee
parent 9472 ae93574c2f74
permissions -rw-r--r--
start supervisor children as, er, supervisors, not workers
     1 %% The contents of this file are subject to the Mozilla Public License
     2 %% Version 1.1 (the "License"); you may not use this file except in
     3 %% compliance with the License. You may obtain a copy of the License
     4 %% at http://www.mozilla.org/MPL/
     5 %%
     6 %% Software distributed under the License is distributed on an "AS IS"
     7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
     8 %% the License for the specific language governing rights and
     9 %% limitations under the License.
    10 %%
    11 %% The Original Code is RabbitMQ.
    12 %%
    13 %% The Initial Developer of the Original Code is VMware, Inc.
    14 %% Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
    15 %%
    16 
    17 -module(rabbit_variable_queue).
    18 
    19 -export([init/3, terminate/2, delete_and_terminate/2, purge/1,
    20          publish/4, publish_delivered/5, drain_confirmed/1,
    21          dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
    22          set_ram_duration_target/2, ram_duration/1, needs_timeout/1,
    23          timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
    24          is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
    25 
    26 -export([start/1, stop/0]).
    27 
    28 %% exported for testing only
    29 -export([start_msg_store/2, stop_msg_store/0, init/5]).
    30 
    31 %%----------------------------------------------------------------------------
    32 %% Definitions:
    33 
    34 %% alpha: this is a message where both the message itself, and its
    35 %%        position within the queue are held in RAM
    36 %%
    37 %% beta: this is a message where the message itself is only held on
    38 %%        disk, but its position within the queue is held in RAM.
    39 %%
    40 %% gamma: this is a message where the message itself is only held on
    41 %%        disk, but its position is both in RAM and on disk.
    42 %%
    43 %% delta: this is a collection of messages, represented by a single
    44 %%        term, where the messages and their position are only held on
    45 %%        disk.
    46 %%
    47 %% Note that for persistent messages, the message and its position
    48 %% within the queue are always held on disk, *in addition* to being in
    49 %% one of the above classifications.
    50 %%
    51 %% Also note that within this code, the term gamma seldom
    52 %% appears. It's frequently the case that gammas are defined by betas
    53 %% who have had their queue position recorded on disk.
    54 %%
    55 %% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though
    56 %% many of these steps are frequently skipped. q1 and q4 only hold
    57 %% alphas, q2 and q3 hold both betas and gammas. When a message
    58 %% arrives, its classification is determined. It is then added to the
    59 %% rightmost appropriate queue.
    60 %%
    61 %% If a new message is determined to be a beta or gamma, q1 is
    62 %% empty. If a new message is determined to be a delta, q1 and q2 are
    63 %% empty (and actually q4 too).
    64 %%
    65 %% When removing messages from a queue, if q4 is empty then q3 is read
    66 %% directly. If q3 becomes empty then the next segment's worth of
    67 %% messages from delta are read into q3, reducing the size of
    68 %% delta. If the queue is non-empty, either q4 or q3 contains
    69 %% entries. It is never permitted for delta to hold all the messages
    70 %% in the queue.
    71 %%
    72 %% The duration indicated to us by the memory_monitor is used to
    73 %% calculate, given our current ingress and egress rates, how many
    74 %% messages we should hold in RAM (i.e. as alphas). We track the
    75 %% ingress and egress rates for both messages and pending acks and
    76 %% rates for both are considered when calculating the number of
    77 %% messages to hold in RAM. When we need to push alphas to betas or
    78 %% betas to gammas, we favour writing out messages that are further
    79 %% from the head of the queue. This minimises writes to disk, as the
    80 %% messages closer to the tail of the queue stay in the queue for
    81 %% longer, thus do not need to be replaced as quickly by sending other
    82 %% messages to disk.
    83 %%
    84 %% Whilst messages are pushed to disk and forgotten from RAM as soon
    85 %% as requested by a new setting of the queue RAM duration, the
    86 %% inverse is not true: we only load messages back into RAM as
    87 %% demanded as the queue is read from. Thus only publishes to the
    88 %% queue will take up available spare capacity.
    89 %%
    90 %% When we report our duration to the memory monitor, we calculate
    91 %% average ingress and egress rates over the last two samples, and
    92 %% then calculate our duration based on the sum of the ingress and
    93 %% egress rates. More than two samples could be used, but it's a
    94 %% balance between responding quickly enough to changes in
    95 %% producers/consumers versus ignoring temporary blips. The problem
    96 %% with temporary blips is that with just a few queues, they can have
    97 %% substantial impact on the calculation of the average duration and
    98 %% hence cause unnecessary I/O. Another alternative is to increase the
    99 %% amqqueue_process:RAM_DURATION_UPDATE_PERIOD to beyond 5
   100 %% seconds. However, that then runs the risk of being too slow to
   101 %% inform the memory monitor of changes. Thus a 5 second interval,
   102 %% plus a rolling average over the last two samples seems to work
   103 %% well in practice.
   104 %%
   105 %% The sum of the ingress and egress rates is used because the egress
   106 %% rate alone is not sufficient. Adding in the ingress rate means that
   107 %% queues which are being flooded by messages are given more memory,
   108 %% resulting in them being able to process the messages faster (by
   109 %% doing less I/O, or at least deferring it) and thus helping keep
   110 %% their mailboxes empty and thus the queue as a whole is more
   111 %% responsive. If such a queue also has fast but previously idle
   112 %% consumers, the consumer can then start to be driven as fast as it
   113 %% can go, whereas if only egress rate was being used, the incoming
   114 %% messages may have to be written to disk and then read back in,
   115 %% resulting in the hard disk being a bottleneck in driving the
   116 %% consumers. Generally, we want to give Rabbit every chance of
   117 %% getting rid of messages as fast as possible and remaining
   118 %% responsive, and using only the egress rate impacts that goal.
   119 %%
   120 %% Once the queue has more alphas than the target_ram_count, the
   121 %% surplus must be converted to betas, if not gammas, if not rolled
   122 %% into delta. The conditions under which these transitions occur
   123 %% reflect the conflicting goals of minimising RAM cost per msg, and
   124 %% minimising CPU cost per msg. Once the msg has become a beta, its
   125 %% payload is no longer in RAM, thus a read from the msg_store must
   126 %% occur before the msg can be delivered, but the RAM cost of a beta
   127 %% is the same as a gamma, so converting a beta to gamma will not free
   128 %% up any further RAM. To reduce the RAM cost further, the gamma must
   129 %% be rolled into delta. Whilst recovering a beta or a gamma to an
   130 %% alpha requires only one disk read (from the msg_store), recovering
   131 %% a msg from within delta will require two reads (queue_index and
   132 %% then msg_store). But delta has a near-0 per-msg RAM cost. So the
   133 %% conflict is between using delta more, which will free up more
   134 %% memory, but require additional CPU and disk ops, versus using delta
   135 %% less and gammas and betas more, which will cost more memory, but
   136 %% require fewer disk ops and less CPU overhead.
   137 %%
   138 %% In the case of a persistent msg published to a durable queue, the
   139 %% msg is immediately written to the msg_store and queue_index. If
   140 %% then additionally converted from an alpha, it'll immediately go to
   141 %% a gamma (as it's already in queue_index), and cannot exist as a
   142 %% beta. Thus a durable queue with a mixture of persistent and
   143 %% transient msgs in it which has more messages than permitted by the
   144 %% target_ram_count may contain an interspersed mixture of betas and
   145 %% gammas in q2 and q3.
   146 %%
   147 %% There is then a ratio that controls how many betas and gammas there
   148 %% can be. This is based on the target_ram_count and thus expresses
   149 %% the fact that as the number of permitted alphas in the queue falls,
   150 %% so should the number of betas and gammas fall (i.e. delta
   151 %% grows). If q2 and q3 contain more than the permitted number of
   152 %% betas and gammas, then the surplus is forcibly converted to gammas
   153 %% (as necessary) and then rolled into delta. The ratio is that
   154 %% delta/(betas+gammas+delta) equals
   155 %% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as
   156 %% the target_ram_count shrinks to 0, so must betas and gammas.
   157 %%
   158 %% The conversion of betas to gammas is done in batches of exactly
   159 %% ?IO_BATCH_SIZE. This value should not be too small, otherwise the
   160 %% frequent operations on the queues of q2 and q3 will not be
   161 %% effectively amortised (switching the direction of queue access
   162 %% defeats amortisation), nor should it be too big, otherwise
   163 %% converting a batch stalls the queue for too long. Therefore, it
   164 %% must be just right.
   165 %%
   166 %% The conversion from alphas to betas is also chunked, but only to
   167 %% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at
   168 %% any one time. This further smooths the effects of changes to the
   169 %% target_ram_count and ensures the queue remains responsive
   170 %% even when there is a large amount of IO work to do. The
   171 %% timeout callback is utilised to ensure that conversions are
   172 %% done as promptly as possible whilst ensuring the queue remains
   173 %% responsive.
   174 %%
   175 %% In the queue we keep track of both messages that are pending
   176 %% delivery and messages that are pending acks. In the event of a
   177 %% queue purge, we only need to load qi segments if the queue has
   178 %% elements in deltas (i.e. it came under significant memory
   179 %% pressure). In the event of a queue deletion, in addition to the
   180 %% preceding, by keeping track of pending acks in RAM, we do not need
   181 %% to search through qi segments looking for messages that are yet to
   182 %% be acknowledged.
   183 %%
   184 %% Pending acks are recorded in memory by storing the message itself.
   185 %% If the message has been sent to disk, we do not store the message
   186 %% content. During memory reduction, pending acks containing message
   187 %% content have that content removed and the corresponding messages
   188 %% are pushed out to disk.
   189 %%
   190 %% Messages from pending acks are returned to q4, q3 and delta during
   191 %% requeue, based on the limits of seq_id contained in each. Requeued
   192 %% messages retain their original seq_id, maintaining order
   193 %% when requeued.
   194 %%
   195 %% The order in which alphas are pushed to betas and pending acks
   196 %% are pushed to disk is determined dynamically. We always prefer to
   197 %% push messages for the source (alphas or acks) that is growing the
   198 %% fastest (with growth measured as avg. ingress - avg. egress). In
   199 %% each round of memory reduction a chunk of messages at most
   200 %% ?IO_BATCH_SIZE in size is allocated to be pushed to disk. The
   201 %% fastest growing source will be reduced by as much of this chunk as
   202 %% possible. If there is any remaining allocation in the chunk after
   203 %% the first source has been reduced to zero, the second source will
   204 %% be reduced by as much of the remaining chunk as possible.
   205 %%
   206 %% Notes on Clean Shutdown
   207 %% (This documents behaviour in variable_queue, queue_index and
   208 %% msg_store.)
   209 %%
   210 %% In order to try to achieve as fast a start-up as possible, if a
   211 %% clean shutdown occurs, we try to save our state to disk to reduce
   212 %% work on startup. In the msg_store this takes the form of the
   213 %% index_module's state, plus the file_summary ets table, and client
   214 %% refs. In the VQ, this takes the form of the count of persistent
   215 %% messages in the queue and references into the msg_stores. The
   216 %% queue_index adds to these terms the details of its segments and
   217 %% stores the terms in the queue directory.
   218 %%
   219 %% Two message stores are used. One is created for persistent messages
   220 %% to durable queues that must survive restarts, and the other is used
   221 %% for all other messages that just happen to need to be written to
   222 %% disk. On start up we can therefore nuke the transient message
   223 %% store, and be sure that the messages in the persistent store are
   224 %% all that we need.
   225 %%
   226 %% The references to the msg_stores are there so that the msg_store
   227 %% knows to only trust its saved state if all of the queues it was
   228 %% previously talking to come up cleanly. Likewise, the queues
   229 %% themselves (esp. queue_index) skip work in init if all the queues
   230 %% and msg_store were shut down cleanly. This gives both good speed
   231 %% improvements and also robustness so that if anything possibly went
   232 %% wrong in shutdown (or there was subsequent manual tampering), all
   233 %% messages and queues that can be recovered are recovered, safely.
   234 %%
   235 %% To delete transient messages lazily, the variable_queue, on
   236 %% startup, stores the next_seq_id reported by the queue_index as the
   237 %% transient_threshold. From that point on, whenever it's reading a
   238 %% message off disk via the queue_index, if the seq_id is below this
   239 %% threshold and the message is transient then it drops the message
   240 %% (the message itself won't exist on disk because it would have been
   241 %% stored in the transient msg_store which would have had its saved
   242 %% state nuked on startup). This avoids the expensive operation of
   243 %% scanning the entire queue on startup in order to delete transient
   244 %% messages that were only pushed to disk to save memory.
   245 %%
   246 %%----------------------------------------------------------------------------
   247 
-behaviour(rabbit_backing_queue).

%% Backing-queue state. Records are tuples at runtime and fields are
%% positional, so the field order below must not be changed.
-record(vqstate,
        { q1,                  %% alphas (see the Definitions comment)
          q2,                  %% betas and gammas
          delta,               %% #delta{} for messages held only on disk
          q3,                  %% betas and gammas
          q4,                  %% alphas
          next_seq_id,         %% seq_id given to the next published message
          pending_ack,
          pending_ack_index,
          ram_ack_index,
          index_state,         %% rabbit_queue_index state
          msg_store_clients,   %% {PersistentClient | undefined, TransientClient},
                               %% or undefined once terminated/deleted
          durable,             %% whether the owning queue is durable
          transient_threshold, %% see "Notes on Clean Shutdown" above

          async_callback,

          len,                 %% messages in the queue (not counting pending acks)
          persistent_count,    %% how many of those are persistent

          target_ram_count,    %% non_neg_integer() | infinity
          ram_msg_count,       %% incremented per publish, zeroed on purge
          ram_msg_count_prev,
          ram_ack_count_prev,
          out_counter,         %% incremented by publish_delivered
          in_counter,          %% incremented by publish and publish_delivered
          rates,
          msgs_on_disk,
          msg_indices_on_disk,
          unconfirmed,         %% gb_set of msg ids still awaiting confirmation
          confirmed,           %% gb_set of msg ids returned by drain_confirmed/1
          ack_out_counter,
          ack_in_counter,
          ack_rates
        }).

%% Rate tracking; see the rates() type below for the field types.
-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).

%% Per-message bookkeeping while a message is in q1..q4 or pending ack.
-record(msg_status,
        { seq_id,
          msg_id,
          msg,
          is_persistent,       %% NOTE(review): publish passes
                               %% IsDurable andalso IsPersistent to msg_status/4
          is_delivered,        %% set to true by publish_delivered
          msg_on_disk,
          index_on_disk,
          msg_props
        }).

%% A contiguous range of seq ids whose messages live only on disk.
-record(delta,
        { start_seq_id, %% start_seq_id is inclusive
          count,
          end_seq_id    %% end_seq_id is exclusive
        }).
   304 
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
%% that we must be due to write indices for before we do any work at
%% all. This is both a minimum and a maximum - we don't write fewer
%% than IO_BATCH_SIZE indices out in one go, and we don't write more -
%% we can always come back on the next publish to do more.
-define(IO_BATCH_SIZE, 64).
%% Names under which the two message stores are started by
%% start_msg_store/2 and then addressed (e.g. by rabbit_msg_store calls).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE,  msg_store_transient).
%% Queue module used for q1..q4 (see the state() type).
-define(QUEUE, lqueue).

-include("rabbit.hrl").
   317 
   318 %%----------------------------------------------------------------------------
   319 
%% Registers multiple_routing_keys/0 as a 'local' upgrade step.
%% NOTE(review): the trailing [] is presumably the list of upgrade
%% steps it depends on - confirm against rabbit_upgrade.
-rabbit_upgrade({multiple_routing_keys, local, []}).

-ifdef(use_specs).

%% NOTE(review): presumably an erlang:now()/os:timestamp() style triple.
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id()  :: non_neg_integer()).

-type(rates() :: #rates { egress      :: {timestamp(), non_neg_integer()},
                          ingress     :: {timestamp(), non_neg_integer()},
                          avg_egress  :: float(),
                          avg_ingress :: float(),
                          timestamp   :: timestamp() }).

-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
                          count        :: non_neg_integer(),
                          end_seq_id   :: non_neg_integer() }).

%% The compiler (rightfully) complains that ack() and state() are
%% unused. For this reason we duplicate a -spec from
%% rabbit_backing_queue with the only intent being to remove
%% warnings. The problem here is that we can't parameterise the BQ
%% behaviour by these two types as we would like to. We still leave
%% these here for documentation purposes.
-type(ack() :: seq_id()).
%% NOTE(review): the #vqstate fields pending_ack_index and
%% ram_ack_count_prev are not listed here, so they carry no type in
%% state(); consider adding them once their types are confirmed.
-type(state() :: #vqstate {
             q1                    :: ?QUEUE:?QUEUE(),
             q2                    :: ?QUEUE:?QUEUE(),
             delta                 :: delta(),
             q3                    :: ?QUEUE:?QUEUE(),
             q4                    :: ?QUEUE:?QUEUE(),
             next_seq_id           :: seq_id(),
             pending_ack           :: gb_tree(),
             ram_ack_index         :: gb_tree(),
             index_state           :: any(),
             msg_store_clients     :: 'undefined' | {{any(), binary()},
                                                    {any(), binary()}},
             durable               :: boolean(),
             transient_threshold   :: non_neg_integer(),

             async_callback        :: rabbit_backing_queue:async_callback(),

             len                   :: non_neg_integer(),
             persistent_count      :: non_neg_integer(),

             target_ram_count      :: non_neg_integer() | 'infinity',
             ram_msg_count         :: non_neg_integer(),
             ram_msg_count_prev    :: non_neg_integer(),
             out_counter           :: non_neg_integer(),
             in_counter            :: non_neg_integer(),
             rates                 :: rates(),
             msgs_on_disk          :: gb_set(),
             msg_indices_on_disk   :: gb_set(),
             unconfirmed           :: gb_set(),
             confirmed             :: gb_set(),
             ack_out_counter       :: non_neg_integer(),
             ack_in_counter        :: non_neg_integer(),
             ack_rates             :: rates() }).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).

-spec(multiple_routing_keys/0 :: () -> 'ok').

-endif.
   383 
   384 -define(BLANK_DELTA, #delta { start_seq_id = undefined,
   385                               count        = 0,
   386                               end_seq_id   = undefined }).
   387 -define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
   388                                          count        = 0,
   389                                          end_seq_id   = Z }).
   390 
   391 %%----------------------------------------------------------------------------
   392 %% Public API
   393 %%----------------------------------------------------------------------------
   394 
   395 start(DurableQueues) ->
   396     {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues),
   397     start_msg_store(
   398       [Ref || Terms <- AllTerms,
   399               begin
   400                   Ref = proplists:get_value(persistent_ref, Terms),
   401                   Ref =/= undefined
   402               end],
   403       StartFunState).
   404 
   405 stop() -> stop_msg_store().
   406 
   407 start_msg_store(Refs, StartFunState) ->
   408     ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
   409                                 [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
   410                                  undefined,  {fun (ok) -> finished end, ok}]),
   411     ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
   412                                 [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
   413                                  Refs, StartFunState]).
   414 
   415 stop_msg_store() ->
   416     ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
   417     ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
   418 
   419 init(Queue, Recover, AsyncCallback) ->
   420     init(Queue, Recover, AsyncCallback,
   421          fun (MsgIds, ActionTaken) ->
   422                  msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
   423          end,
   424          fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
   425 
%% init/5 builds the initial #vqstate for a queue. It is exported for
%% testing so that the on-disk callbacks can be supplied directly; init/3
%% supplies callbacks that go through AsyncCallback.
%%
%% Clause 1 (Recover = false): start a fresh queue index. Delta count 0
%% and empty terms are passed on. A persistent msg store client is only
%% created for durable queues (undefined otherwise).
%%
%% Clause 2 (Recover = true, durable queues only): reuse the
%% persistent_ref from the index's shutdown terms if present. Otherwise
%% generate a fresh ref and discard the terms. Then recover the index
%% against the persistent msg store.
%%
%% NOTE(review): a non-durable queue with Recover = true matches neither
%% clause and fails with function_clause - presumably intentional.
init(#amqqueue { name = QueueName, durable = IsDurable }, false,
     AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
    IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
    init(IsDurable, IndexState, 0, [], AsyncCallback,
         case IsDurable of
             true  -> msg_store_client_init(?PERSISTENT_MSG_STORE,
                                            MsgOnDiskFun, AsyncCallback);
             false -> undefined
         end,
         %% transient client: passes undefined where the persistent
         %% client passes MsgOnDiskFun
         msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));

init(#amqqueue { name = QueueName, durable = true }, true,
     AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
    Terms = rabbit_queue_index:shutdown_terms(QueueName),
    %% Without a saved persistent_ref the saved terms cannot be trusted,
    %% so a new ref is generated and the terms are dropped.
    {PRef, Terms1} =
        case proplists:get_value(persistent_ref, Terms) of
            undefined -> {rabbit_guid:gen(), []};
            PRef1     -> {PRef1, Terms}
        end,
    PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
                                             MsgOnDiskFun, AsyncCallback),
    TransientClient  = msg_store_client_init(?TRANSIENT_MSG_STORE,
                                             undefined, AsyncCallback),
    %% The fun lets the index check whether a message id is present in
    %% the persistent store.
    {DeltaCount, IndexState} =
        rabbit_queue_index:recover(
          QueueName, Terms1,
          rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
          fun (MsgId) ->
                  rabbit_msg_store:contains(MsgId, PersistentClient)
          end,
          MsgIdxOnDiskFun),
    init(true, IndexState, DeltaCount, Terms1, AsyncCallback,
         PersistentClient, TransientClient).
   459 
   460 terminate(_Reason, State) ->
   461     State1 = #vqstate { persistent_count  = PCount,
   462                         index_state       = IndexState,
   463                         msg_store_clients = {MSCStateP, MSCStateT} } =
   464         purge_pending_ack(true, State),
   465     PRef = case MSCStateP of
   466                undefined -> undefined;
   467                _         -> ok = rabbit_msg_store:client_terminate(MSCStateP),
   468                             rabbit_msg_store:client_ref(MSCStateP)
   469            end,
   470     ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
   471     Terms = [{persistent_ref, PRef}, {persistent_count, PCount}],
   472     a(State1 #vqstate { index_state       = rabbit_queue_index:terminate(
   473                                               Terms, IndexState),
   474                         msg_store_clients = undefined }).
   475 
   476 %% the only difference between purge and delete is that delete also
   477 %% needs to delete everything that's been delivered and not ack'd.
   478 delete_and_terminate(_Reason, State) ->
   479     %% TODO: there is no need to interact with qi at all - which we do
   480     %% as part of 'purge' and 'purge_pending_ack', other than
   481     %% deleting it.
   482     {_PurgeCount, State1} = purge(State),
   483     State2 = #vqstate { index_state         = IndexState,
   484                         msg_store_clients   = {MSCStateP, MSCStateT} } =
   485         purge_pending_ack(false, State1),
   486     IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
   487     case MSCStateP of
   488         undefined -> ok;
   489         _         -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
   490     end,
   491     rabbit_msg_store:client_delete_and_terminate(MSCStateT),
   492     a(State2 #vqstate { index_state       = IndexState1,
   493                         msg_store_clients = undefined }).
   494 
   495 purge(State = #vqstate { q4                = Q4,
   496                          index_state       = IndexState,
   497                          msg_store_clients = MSCState,
   498                          len               = Len,
   499                          persistent_count  = PCount }) ->
   500     %% TODO: when there are no pending acks, which is a common case,
   501     %% we could simply wipe the qi instead of issuing delivers and
   502     %% acks for all the messages.
   503     {LensByStore, IndexState1} = remove_queue_entries(
   504                                    fun ?QUEUE:foldl/3, Q4,
   505                                    orddict:new(), IndexState, MSCState),
   506     {LensByStore1, State1 = #vqstate { q1                = Q1,
   507                                        index_state       = IndexState2,
   508                                        msg_store_clients = MSCState1 }} =
   509         purge_betas_and_deltas(LensByStore,
   510                                State #vqstate { q4          = ?QUEUE:new(),
   511                                                 index_state = IndexState1 }),
   512     {LensByStore2, IndexState3} = remove_queue_entries(
   513                                     fun ?QUEUE:foldl/3, Q1,
   514                                     LensByStore1, IndexState2, MSCState1),
   515     PCount1 = PCount - find_persistent_count(LensByStore2),
   516     {Len, a(State1 #vqstate { q1                = ?QUEUE:new(),
   517                               index_state       = IndexState3,
   518                               len               = 0,
   519                               ram_msg_count     = 0,
   520                               persistent_count  = PCount1 })}.
   521 
   522 publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
   523         MsgProps = #message_properties { needs_confirming = NeedsConfirming },
   524         _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
   525                                    next_seq_id      = SeqId,
   526                                    len              = Len,
   527                                    in_counter       = InCount,
   528                                    persistent_count = PCount,
   529                                    durable          = IsDurable,
   530                                    ram_msg_count    = RamMsgCount,
   531                                    unconfirmed      = UC }) ->
   532     IsPersistent1 = IsDurable andalso IsPersistent,
   533     MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
   534     {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
   535     State2 = case ?QUEUE:is_empty(Q3) of
   536                  false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
   537                  true  -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
   538              end,
   539     PCount1 = PCount + one_if(IsPersistent1),
   540     UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
   541     a(reduce_memory_use(State2 #vqstate { next_seq_id      = SeqId   + 1,
   542                                           len              = Len     + 1,
   543                                           in_counter       = InCount + 1,
   544                                           persistent_count = PCount1,
   545                                           ram_msg_count    = RamMsgCount + 1,
   546                                           unconfirmed      = UC1 })).
   547 
%% Publish a message that is handed to a consumer immediately. Both clauses
%% only match `len = 0' (presumably callers only use this on an empty queue).
%%
%% AckRequired = false: nothing is stored. If the publisher asked for a
%% confirm, the msg id is passed to blind_confirm/2 straight away.
%% Returns {undefined, State}.
%%
%% AckRequired = true: the message is recorded as a pending ack, marked
%% delivered. Its body and index entry are written to disk only if it is
%% persistent, and it only counts as persistent in a durable queue. It is
%% counted as both in and out, and when a confirm is needed its msg id joins
%% the unconfirmed set. Returns {SeqId, State}; SeqId is the ack tag.
publish_delivered(false, #basic_message { id = MsgId },
                  #message_properties { needs_confirming = NeedsConfirming },
                  _ChPid, State = #vqstate { async_callback = Callback,
                                             len = 0 }) ->
    case NeedsConfirming of
        true  -> blind_confirm(Callback, gb_sets:singleton(MsgId));
        false -> ok
    end,
    {undefined, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
                                               id = MsgId },
                  MsgProps = #message_properties {
                    needs_confirming = NeedsConfirming },
                  _ChPid, State = #vqstate { len              = 0,
                                             next_seq_id      = SeqId,
                                             out_counter      = OutCount,
                                             in_counter       = InCount,
                                             persistent_count = PCount,
                                             durable          = IsDurable,
                                             unconfirmed      = UC }) ->
    %% a message is only treated as persistent in a durable queue
    IsPersistent1 = IsDurable andalso IsPersistent,
    MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
        #msg_status { is_delivered = true },
    %% neither write is forced: only persistent messages hit the disk here
    {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
    State2 = record_pending_ack(m(MsgStatus1), State1),
    PCount1 = PCount + one_if(IsPersistent1),
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    {SeqId, a(reduce_memory_use(
                State2 #vqstate { next_seq_id      = SeqId    + 1,
                                  out_counter      = OutCount + 1,
                                  in_counter       = InCount  + 1,
                                  persistent_count = PCount1,
                                  unconfirmed      = UC1 }))}.
   581 
   582 drain_confirmed(State = #vqstate { confirmed = C }) ->
   583     case gb_sets:is_empty(C) of
   584         true  -> {[], State}; %% common case
   585         false -> {gb_sets:to_list(C), State #vqstate {
   586                                         confirmed = gb_sets:new() }}
   587     end.
   588 
   589 dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
   590 
%% Fetch messages from the head of the queue while Pred(MsgProps) holds.
%% The first message for which Pred is false is pushed back with in_r/2.
%%
%% With AckRequired, each dropped message is read into RAM and fetched as a
%% pending ack, and {Msg, AckTag} is prepended to Msgs. The result is then
%% {lists:reverse(Msgs), State}. Without AckRequired the result is
%% {undefined, State} and Msgs is never inspected, which is why the
%% {true, false} branch can pass `undefined' for it.
dropwhile(Pred, AckRequired, State, Msgs) ->
    End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
             (S)                  -> {undefined, S}
          end,
    case queue_out(State) of
        {empty, State1} ->
            End(a(State1));
        {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
            case {Pred(MsgProps), AckRequired} of
                {true, true} ->
                    %% the body is part of the result, so load it first
                    {MsgStatus1, State2} = read_msg(MsgStatus, State1),
                    {{Msg, _, AckTag, _}, State3} =
                         internal_fetch(true, MsgStatus1, State2),
                    dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
                {true, false} ->
                    %% the body is discarded, so it is not read from the store
                    {_, State2} = internal_fetch(false, MsgStatus, State1),
                    dropwhile(Pred, AckRequired, State2, undefined);
                {false, _} ->
                    End(a(in_r(MsgStatus, State1)))
            end
    end.
   612 
   613 fetch(AckRequired, State) ->
   614     case queue_out(State) of
   615         {empty, State1} ->
   616             {empty, a(State1)};
   617         {{value, MsgStatus}, State1} ->
   618             %% it is possible that the message wasn't read from disk
   619             %% at this point, so read it in.
   620             {MsgStatus1, State2} = read_msg(MsgStatus, State1),
   621             {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2),
   622             {Res, a(State3)}
   623     end.
   624 
%% Acknowledge the pending acks identified by the seq ids in AckTags.
%% Each one is taken out of the pending-ack state and folded into an
%% accumulator. Then, in bulk:
%%   - the accumulated on-disk seq ids are acked in the queue index;
%%   - the accumulated msg ids are removed from the persistent/transient
%%     message store they live in;
%%   - persistent_count and ack_out_counter are updated.
%% Returns {MsgIds, State}. NOTE(review): the final lists:reverse/1
%% presumably restores AckTags order, assuming accumulate_ack/2 prepends.
ack([], State) ->
    {[], State};
ack(AckTags, State) ->
    {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
     State1 = #vqstate { index_state       = IndexState,
                         msg_store_clients = MSCState,
                         persistent_count  = PCount,
                         ack_out_counter   = AckOutCount }} =
        lists:foldl(
          fun (SeqId, {Acc, State2}) ->
                  {MsgStatus, State3} = remove_pending_ack(SeqId, State2),
                  {accumulate_ack(MsgStatus, Acc), State3}
          end, {accumulate_ack_init(), State}, AckTags),
    IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
    %% every store removal must return ok
    [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
     || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
    PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
                                               orddict:new(), MsgIdsByStore)),
    {lists:reverse(AllMsgIds),
     a(State1 #vqstate { index_state      = IndexState1,
                         persistent_count = PCount1,
                         ack_out_counter  = AckOutCount + length(AckTags) })}.
   647 
   648 fold(undefined, State, _AckTags) ->
   649     State;
   650 fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
   651     lists:foldl(
   652       fun(SeqId, State1) ->
   653               {MsgStatus, State2} =
   654                   read_msg(gb_trees:get(SeqId, PA), State1),
   655               MsgFun(MsgStatus#msg_status.msg, SeqId),
   656               State2
   657       end, State, AckTags).
   658 
%% Return the pending acks in AckTags to the queue, in seq-id order.
%% The sorted seq ids are first merged into q4 as alphas, with
%% beta_limit(Q3) as the limit. The ones left over are merged into q3 as
%% betas, with delta_limit(Delta) as the limit. Anything still left goes
%% into delta via delta_merge/4.
%% Returns {MsgIds, State}; len and in_counter grow by the number of
%% requeued messages.
requeue(AckTags, #vqstate { delta      = Delta,
                            q3         = Q3,
                            q4         = Q4,
                            in_counter = InCounter,
                            len        = Len } = State) ->
    {SeqIds,  Q4a, MsgIds,  State1} = queue_merge(lists:sort(AckTags), Q4, [],
                                                  beta_limit(Q3),
                                                  fun publish_alpha/2, State),
    {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
                                                  delta_limit(Delta),
                                                  fun publish_beta/2, State1),
    {Delta1, MsgIds2, State3}       = delta_merge(SeqIds1, Delta, MsgIds1,
                                                  State2),
    MsgCount = length(MsgIds2),
    {MsgIds2, a(reduce_memory_use(
                  State3 #vqstate { delta      = Delta1,
                                    q3         = Q3a,
                                    q4         = Q4a,
                                    in_counter = InCounter + MsgCount,
                                    len        = Len + MsgCount }))}.
   679 
   680 len(#vqstate { len = Len }) -> Len.
   681 
   682 is_empty(State) -> 0 == len(State).
   683 
   684 set_ram_duration_target(
   685   DurationTarget, State = #vqstate {
   686                     rates     = #rates { avg_egress  = AvgEgressRate,
   687                                          avg_ingress = AvgIngressRate },
   688                     ack_rates = #rates { avg_egress  = AvgAckEgressRate,
   689                                          avg_ingress = AvgAckIngressRate },
   690                     target_ram_count = TargetRamCount }) ->
   691     Rate =
   692         AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
   693     TargetRamCount1 =
   694         case DurationTarget of
   695             infinity  -> infinity;
   696             _         -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
   697         end,
   698     State1 = State #vqstate { target_ram_count = TargetRamCount1 },
   699     a(case TargetRamCount1 == infinity orelse
   700           (TargetRamCount =/= infinity andalso
   701            TargetRamCount1 >= TargetRamCount) of
   702           true  -> State1;
   703           false -> reduce_memory_use(State1)
   704       end).
   705 
%% Refresh the message and ack rate estimates and return {Duration, State}.
%% Duration is the estimated number of seconds that the messages and acks
%% held in RAM (this call's and the previous call's counts) represent at
%% those rates; it is infinity when all four average rates are zero.
%% The in/out counters are reset, and the current RAM counts are stored as
%% the "previous" values for the next call.
ram_duration(State = #vqstate {
               rates              = #rates { timestamp = Timestamp,
                                             egress    = Egress,
                                             ingress   = Ingress } = Rates,
               ack_rates          = #rates { timestamp = AckTimestamp,
                                             egress    = AckEgress,
                                             ingress   = AckIngress } = ARates,
               in_counter         = InCount,
               out_counter        = OutCount,
               ack_in_counter     = AckInCount,
               ack_out_counter    = AckOutCount,
               ram_msg_count      = RamMsgCount,
               ram_msg_count_prev = RamMsgCountPrev,
               ram_ack_index      = RamAckIndex,
               ram_ack_count_prev = RamAckCountPrev }) ->
    Now = now(),
    %% message rates
    {AvgEgressRate,   Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
    {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),

    %% ack rates
    {AvgAckEgressRate,   AckEgress1} =
        update_rate(Now, AckTimestamp, AckOutCount, AckEgress),
    {AvgAckIngressRate, AckIngress1} =
        update_rate(Now, AckTimestamp, AckInCount, AckIngress),

    RamAckCount = gb_trees:size(RamAckIndex),

    Duration = %% msgs+acks / (msgs+acks/sec) == sec
        case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
              AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of
            true  -> infinity;
            false -> (RamMsgCountPrev + RamMsgCount +
                          RamAckCount + RamAckCountPrev) /
                         (4 * (AvgEgressRate + AvgIngressRate +
                                   AvgAckEgressRate + AvgAckIngressRate))
        end,

    %% start a new sampling period: reset counters, remember current counts
    {Duration, State #vqstate {
                 rates              = Rates #rates {
                                        egress      = Egress1,
                                        ingress     = Ingress1,
                                        avg_egress  = AvgEgressRate,
                                        avg_ingress = AvgIngressRate,
                                        timestamp   = Now },
                 ack_rates          = ARates #rates {
                                        egress      = AckEgress1,
                                        ingress     = AckIngress1,
                                        avg_egress  = AvgAckEgressRate,
                                        avg_ingress = AvgAckIngressRate,
                                        timestamp   = Now },
                 in_counter         = 0,
                 out_counter        = 0,
                 ack_in_counter     = 0,
                 ack_out_counter    = 0,
                 ram_msg_count_prev = RamMsgCount,
                 ram_ack_count_prev = RamAckCount }}.
   761 
   762 needs_timeout(State = #vqstate { index_state = IndexState }) ->
   763     case must_sync_index(State) of
   764         true  -> timed;
   765         false ->
   766             case rabbit_queue_index:needs_sync(IndexState) of
   767                 true  -> idle;
   768                 false -> case reduce_memory_use(
   769                                 fun (_Quota, State1) -> {0, State1} end,
   770                                 fun (_Quota, State1) -> State1 end,
   771                                 fun (_Quota, State1) -> {0, State1} end,
   772                                 State) of
   773                              {true,  _State} -> idle;
   774                              {false, _State} -> false
   775                          end
   776             end
   777     end.
   778 
   779 timeout(State = #vqstate { index_state = IndexState }) ->
   780     IndexState1 = rabbit_queue_index:sync(IndexState),
   781     State1 = State #vqstate { index_state = IndexState1 },
   782     a(reduce_memory_use(State1)).
   783 
   784 handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
   785     State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
   786 
   787 status(#vqstate {
   788           q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
   789           len              = Len,
   790           pending_ack      = PA,
   791           ram_ack_index    = RAI,
   792           target_ram_count = TargetRamCount,
   793           ram_msg_count    = RamMsgCount,
   794           next_seq_id      = NextSeqId,
   795           persistent_count = PersistentCount,
   796           rates            = #rates { avg_egress  = AvgEgressRate,
   797                                       avg_ingress = AvgIngressRate },
   798           ack_rates        = #rates { avg_egress  = AvgAckEgressRate,
   799                                       avg_ingress = AvgAckIngressRate } }) ->
   800     [ {q1                  , ?QUEUE:len(Q1)},
   801       {q2                  , ?QUEUE:len(Q2)},
   802       {delta               , Delta},
   803       {q3                  , ?QUEUE:len(Q3)},
   804       {q4                  , ?QUEUE:len(Q4)},
   805       {len                 , Len},
   806       {pending_acks        , gb_trees:size(PA)},
   807       {target_ram_count    , TargetRamCount},
   808       {ram_msg_count       , RamMsgCount},
   809       {ram_ack_count       , gb_trees:size(RAI)},
   810       {next_seq_id         , NextSeqId},
   811       {persistent_count    , PersistentCount},
   812       {avg_ingress_rate    , AvgIngressRate},
   813       {avg_egress_rate     , AvgEgressRate},
   814       {avg_ack_ingress_rate, AvgAckIngressRate},
   815       {avg_ack_egress_rate , AvgAckEgressRate} ].
   816 
   817 invoke(?MODULE, Fun, State) -> Fun(?MODULE, State).
   818 
   819 is_duplicate(_Msg, State) -> {false, State}.
   820 
   821 discard(_Msg, _ChPid, State) -> State.
   822 
   823 %%----------------------------------------------------------------------------
   824 %% Minor helpers
   825 %%----------------------------------------------------------------------------
   826 
   827 a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
   828                      len              = Len,
   829                      persistent_count = PersistentCount,
   830                      ram_msg_count    = RamMsgCount }) ->
   831     E1 = ?QUEUE:is_empty(Q1),
   832     E2 = ?QUEUE:is_empty(Q2),
   833     ED = Delta#delta.count == 0,
   834     E3 = ?QUEUE:is_empty(Q3),
   835     E4 = ?QUEUE:is_empty(Q4),
   836     LZ = Len == 0,
   837 
   838     true = E1 or not E3,
   839     true = E2 or not ED,
   840     true = ED or not E3,
   841     true = LZ == (E3 and E4),
   842 
   843     true = Len             >= 0,
   844     true = PersistentCount >= 0,
   845     true = RamMsgCount     >= 0,
   846 
   847     State.
   848 
   849 d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
   850   when Start + Count =< End ->
   851     Delta.
   852 
   853 m(MsgStatus = #msg_status { msg           = Msg,
   854                             is_persistent = IsPersistent,
   855                             msg_on_disk   = MsgOnDisk,
   856                             index_on_disk = IndexOnDisk }) ->
   857     true = (not IsPersistent) or IndexOnDisk,
   858     true = (not IndexOnDisk) or MsgOnDisk,
   859     true = (Msg =/= undefined) or MsgOnDisk,
   860 
   861     MsgStatus.
   862 
   863 one_if(true ) -> 1;
   864 one_if(false) -> 0.
   865 
   866 cons_if(true,   E, L) -> [E | L];
   867 cons_if(false, _E, L) -> L.
   868 
   869 gb_sets_maybe_insert(false, _Val, Set) -> Set;
   870 %% when requeueing, we re-add a msg_id to the unconfirmed set
   871 gb_sets_maybe_insert(true,  Val,  Set) -> gb_sets:add(Val, Set).
   872 
   873 msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
   874            MsgProps) ->
   875     #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
   876                   is_persistent = IsPersistent, is_delivered = false,
   877                   msg_on_disk = false, index_on_disk = false,
   878                   msg_props = MsgProps }.
   879 
   880 trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
   881 
   882 with_msg_store_state({MSCStateP, MSCStateT},  true, Fun) ->
   883     {Result, MSCStateP1} = Fun(MSCStateP),
   884     {Result, {MSCStateP1, MSCStateT}};
   885 with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) ->
   886     {Result, MSCStateT1} = Fun(MSCStateT),
   887     {Result, {MSCStateP, MSCStateT1}}.
   888 
   889 with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
   890     {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent,
   891                                            fun (MSCState1) ->
   892                                                    {Fun(MSCState1), MSCState1}
   893                                            end),
   894     Res.
   895 
   896 msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
   897     msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
   898                           Callback).
   899 
   900 msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
   901     CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
   902     rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
   903                                  fun () -> Callback(?MODULE, CloseFDsFun) end).
   904 
   905 msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
   906     with_immutable_msg_store_state(
   907       MSCState, IsPersistent,
   908       fun (MSCState1) ->
   909               rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
   910       end).
   911 
   912 msg_store_read(MSCState, IsPersistent, MsgId) ->
   913     with_msg_store_state(
   914       MSCState, IsPersistent,
   915       fun (MSCState1) ->
   916               rabbit_msg_store:read(MsgId, MSCState1)
   917       end).
   918 
   919 msg_store_remove(MSCState, IsPersistent, MsgIds) ->
   920     with_immutable_msg_store_state(
   921       MSCState, IsPersistent,
   922       fun (MCSState1) ->
   923               rabbit_msg_store:remove(MsgIds, MCSState1)
   924       end).
   925 
   926 msg_store_close_fds(MSCState, IsPersistent) ->
   927     with_msg_store_state(
   928       MSCState, IsPersistent,
   929       fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
   930 
   931 msg_store_close_fds_fun(IsPersistent) ->
   932     fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
   933             {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
   934             State #vqstate { msg_store_clients = MSCState1 }
   935     end.
   936 
   937 maybe_write_delivered(false, _SeqId, IndexState) ->
   938     IndexState;
   939 maybe_write_delivered(true, SeqId, IndexState) ->
   940     rabbit_queue_index:deliver([SeqId], IndexState).
   941 
%% Turn queue index entries {MsgId, SeqId, MsgProps, IsPersistent,
%% IsDelivered} into a queue of betas: msg_status records with no body in
%% RAM and both body and index entry on disk. The order of List is kept
%% (foldr + in_r).
%%
%% Non-persistent entries with SeqId < TransientThreshold are not kept.
%% They are marked delivered (if not already) and acked in the index
%% instead. NOTE(review): presumably these are transient messages left from
%% before a restart; init/7 sets transient_threshold to next_seq_id.
%% Entries whose SeqId is already a pending ack in PA are skipped.
%%
%% Returns {BetaQueue, IndexState1}.
betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
    {Filtered, Delivers, Acks} =
        lists:foldr(
          fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
               {Filtered1, Delivers1, Acks1} = Acc) ->
                  case SeqId < TransientThreshold andalso not IsPersistent of
                      true  -> {Filtered1,
                                cons_if(not IsDelivered, SeqId, Delivers1),
                                [SeqId | Acks1]};
                      false -> case gb_trees:is_defined(SeqId, PA) of
                                   false ->
                                       {?QUEUE:in_r(
                                           m(#msg_status {
                                                seq_id        = SeqId,
                                                msg_id        = MsgId,
                                                msg           = undefined,
                                                is_persistent = IsPersistent,
                                                is_delivered  = IsDelivered,
                                                msg_on_disk   = true,
                                                index_on_disk = true,
                                                msg_props     = MsgProps
                                               }), Filtered1),
                                        Delivers1, Acks1};
                                   true ->
                                       Acc
                           end
                  end
          end, {?QUEUE:new(), [], []}, List),
    %% deliveries are recorded before the acks
    {Filtered, rabbit_queue_index:ack(
                 Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
   972 
%% Grow a delta by one message with seq id SeqId:
%%   - a delta matching ?BLANK_DELTA_PATTERN (presumably an empty delta)
%%     becomes start = SeqId, end = SeqId + 1, count = 1;
%%   - SeqId before the start moves the start back to SeqId;
%%   - SeqId at or past the end moves the end to SeqId + 1;
%%   - otherwise only the count goes up.
%% Clause order matters. d/1 re-checks every result.
expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
    d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 });
expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
                             count        = Count } = Delta)
  when SeqId < StartSeqId ->
    d(Delta #delta { start_seq_id = SeqId, count = Count + 1 });
expand_delta(SeqId, #delta { count        = Count,
                             end_seq_id   = EndSeqId } = Delta)
  when SeqId >= EndSeqId ->
    d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 });
expand_delta(_SeqId, #delta { count       = Count } = Delta) ->
    d(Delta #delta { count = Count + 1 }).
   985 
   986 update_rate(Now, Then, Count, {OThen, OCount}) ->
   987     %% avg over the current period and the previous
   988     {1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}.
   989 
   990 %%----------------------------------------------------------------------------
   991 %% Internal major helpers for Public API
   992 %%----------------------------------------------------------------------------
   993 
%% Build the initial queue state on top of a (possibly recovered) queue
%% index and the two message store clients.
%%
%% A persistent_count saved in Terms takes precedence over the caller's
%% DeltaCount. Unless that count is 0 (with DeltaCount not undefined), every
%% message between the index's lower bound and next_seq_id starts out in
%% delta, and len and persistent_count are set to that count.
%% NOTE(review): if both DeltaCount and the saved count are undefined, d/1's
%% guard fails; presumably callers never pass that combination.
%% maybe_deltas_to_betas/1 is then given the chance to load from delta.
init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
     PersistentClient, TransientClient) ->
    {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),

    DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
    Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
                true  -> ?BLANK_DELTA;
                false -> d(#delta { start_seq_id = LowSeqId,
                                    count        = DeltaCount1,
                                    end_seq_id   = NextSeqId })
            end,
    Now = now(),
    State = #vqstate {
      q1                  = ?QUEUE:new(),
      q2                  = ?QUEUE:new(),
      delta               = Delta,
      q3                  = ?QUEUE:new(),
      q4                  = ?QUEUE:new(),
      next_seq_id         = NextSeqId,
      pending_ack         = gb_trees:empty(),
      ram_ack_index       = gb_trees:empty(),
      index_state         = IndexState1,
      msg_store_clients   = {PersistentClient, TransientClient},
      durable             = IsDurable,
      %% index entries below this seq id predate this incarnation
      transient_threshold = NextSeqId,

      async_callback      = AsyncCallback,

      len                 = DeltaCount1,
      persistent_count    = DeltaCount1,

      target_ram_count    = infinity,
      ram_msg_count       = 0,
      ram_msg_count_prev  = 0,
      ram_ack_count_prev  = 0,
      out_counter         = 0,
      in_counter          = 0,
      %% the ingress sample is seeded with the recovered message count
      rates               = blank_rate(Now, DeltaCount1),
      msgs_on_disk        = gb_sets:new(),
      msg_indices_on_disk = gb_sets:new(),
      unconfirmed         = gb_sets:new(),
      confirmed           = gb_sets:new(),
      ack_out_counter     = 0,
      ack_in_counter      = 0,
      ack_rates           = blank_rate(Now, 0) },
    a(maybe_deltas_to_betas(State)).
  1040 
  1041 blank_rate(Timestamp, IngressLength) ->
  1042     #rates { egress      = {Timestamp, 0},
  1043              ingress     = {Timestamp, IngressLength},
  1044              avg_egress  = 0.0,
  1045              avg_ingress = 0.0,
  1046              timestamp   = Timestamp }.
  1047 
  1048 in_r(MsgStatus = #msg_status { msg = undefined },
  1049      State = #vqstate { q3 = Q3, q4 = Q4 }) ->
  1050     case ?QUEUE:is_empty(Q4) of
  1051         true  -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
  1052         false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
  1053                      read_msg(MsgStatus, State),
  1054                  State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
  1055     end;
  1056 in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
  1057     State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
  1058 
  1059 queue_out(State = #vqstate { q4 = Q4 }) ->
  1060     case ?QUEUE:out(Q4) of
  1061         {empty, _Q4} ->
  1062             case fetch_from_q3(State) of
  1063                 {empty, _State1} = Result     -> Result;
  1064                 {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
  1065             end;
  1066         {{value, MsgStatus}, Q4a} ->
  1067             {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
  1068     end.
  1069 
  1070 read_msg(MsgStatus = #msg_status { msg           = undefined,
  1071                                    msg_id        = MsgId,
  1072                                    is_persistent = IsPersistent },
  1073          State = #vqstate { ram_msg_count     = RamMsgCount,
  1074                             msg_store_clients = MSCState}) ->
  1075     {{ok, Msg = #basic_message {}}, MSCState1} =
  1076         msg_store_read(MSCState, IsPersistent, MsgId),
  1077     {MsgStatus #msg_status { msg = Msg },
  1078      State #vqstate { ram_msg_count     = RamMsgCount + 1,
  1079                       msg_store_clients = MSCState1 }};
  1080 read_msg(MsgStatus, State) ->
  1081     {MsgStatus, State}.
  1082 
%% Account for MsgStatus having been taken off the queue.
%% Returns {{Msg, IsDelivered, AckTag, Len1}, State}, where Len1 is the new
%% length. Msg may be undefined if the body was not read into RAM.
%%
%% Without AckRequired the message leaves for good: a body on disk is
%% removed from its store, and an on-disk index entry is also acked.
%% With AckRequired it becomes a pending ack and AckTag is its SeqId.
%% Counters: len - 1 and out_counter + 1. ram_msg_count - 1 if the body was
%% in RAM. persistent_count - 1 only for a persistent message without
%% AckRequired.
internal_fetch(AckRequired, MsgStatus = #msg_status {
                              seq_id        = SeqId,
                              msg_id        = MsgId,
                              msg           = Msg,
                              is_persistent = IsPersistent,
                              is_delivered  = IsDelivered,
                              msg_on_disk   = MsgOnDisk,
                              index_on_disk = IndexOnDisk },
               State = #vqstate {ram_msg_count     = RamMsgCount,
                                 out_counter       = OutCount,
                                 index_state       = IndexState,
                                 msg_store_clients = MSCState,
                                 len               = Len,
                                 persistent_count  = PCount }) ->
    %% 1. Mark it delivered if necessary
    IndexState1 = maybe_write_delivered(
                    IndexOnDisk andalso not IsDelivered,
                    SeqId, IndexState),

    %% 2. Remove from msg_store and queue index, if necessary
    %% (index on disk without the body on disk is ruled out by m/1)
    Rem = fun () ->
                  ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
          end,
    Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
    IndexState2 =
        case {AckRequired, MsgOnDisk, IndexOnDisk} of
            {false, true, false} -> Rem(), IndexState1;
            {false, true,  true} -> Rem(), Ack();
            _                    -> IndexState1
        end,

    %% 3. If an ack is required, add something sensible to PA
    {AckTag, State1} = case AckRequired of
                           true  -> StateN = record_pending_ack(
                                               MsgStatus #msg_status {
                                                 is_delivered = true }, State),
                                    {SeqId, StateN};
                           false -> {undefined, State}
                       end,

    PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
    Len1 = Len - 1,
    RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),

    {{Msg, IsDelivered, AckTag, Len1},
     State1 #vqstate { ram_msg_count    = RamMsgCount1,
                       out_counter      = OutCount + 1,
                       index_state      = IndexState2,
                       len              = Len1,
                       persistent_count = PCount1 }}.
  1133 
  1134 purge_betas_and_deltas(LensByStore,
  1135                        State = #vqstate { q3                = Q3,
  1136                                           index_state       = IndexState,
  1137                                           msg_store_clients = MSCState }) ->
  1138     case ?QUEUE:is_empty(Q3) of
  1139         true  -> {LensByStore, State};
  1140         false -> {LensByStore1, IndexState1} =
  1141                      remove_queue_entries(fun ?QUEUE:foldl/3, Q3,
  1142                                           LensByStore, IndexState, MSCState),
  1143                  purge_betas_and_deltas(LensByStore1,
  1144                                         maybe_deltas_to_betas(
  1145                                           State #vqstate {
  1146                                             q3          = ?QUEUE:new(),
  1147                                             index_state = IndexState1 }))
  1148     end.
  1149 
%% Remove every message in Q, traversing it with Fold. Bodies on disk are
%% removed from their message store (each removal must return ok). Index
%% entries on disk are marked delivered if needed and then acked.
%% Returns {LensByStore1, IndexState1}, where LensByStore1 adds the
%% per-store count of removed bodies to LensByStore.
remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) ->
    {MsgIdsByStore, Delivers, Acks} =
        Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
    ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
                              msg_store_remove(MSCState, IsPersistent, MsgIds)
                      end, ok, MsgIdsByStore),
    {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore),
     rabbit_queue_index:ack(Acks,
                            rabbit_queue_index:deliver(Delivers, IndexState))}.
  1159 
  1160 remove_queue_entries1(
  1161   #msg_status { msg_id = MsgId, seq_id = SeqId,
  1162                 is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
  1163                 index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
  1164   {MsgIdsByStore, Delivers, Acks}) ->
  1165     {case MsgOnDisk of
  1166          true  -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
  1167          false -> MsgIdsByStore
  1168      end,
  1169      cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
  1170      cons_if(IndexOnDisk, SeqId, Acks)}.
  1171 
  1172 sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) ->
  1173     orddict:fold(
  1174       fun (IsPersistent, MsgIds, LensByStore1) ->
  1175               orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1)
  1176       end, LensByStore, MsgIdsByStore).
  1177 
  1178 %%----------------------------------------------------------------------------
  1179 %% Internal gubbins for publishing
  1180 %%----------------------------------------------------------------------------
  1181 
%% Write the message body to its message store unless it is already there.
%% The write happens only when Force is set or the message is persistent.
%% Decoded properties are stripped from the content before writing.
%% NOTE(review): presumably so that only the encoded form is stored; this
%% depends on rabbit_binary_parser:clear_decoded_content/1.
%% Returns the msg_status with msg_on_disk set when written.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
                                  msg_on_disk = true }, _MSCState) ->
    MsgStatus;
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
                                 msg = Msg, msg_id = MsgId,
                                 is_persistent = IsPersistent }, MSCState)
  when Force orelse IsPersistent ->
    Msg1 = Msg #basic_message {
             %% don't persist any recoverable decoded properties
             content = rabbit_binary_parser:clear_decoded_content(
                         Msg #basic_message.content)},
    ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
    MsgStatus #msg_status { msg_on_disk = true };
maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
    MsgStatus.
  1197 
%% Publish the message's index entry to the queue index unless it is
%% already there. This happens only when Force is set or the message is
%% persistent; the delivered flag is written too if set. In every writing
%% or already-written case the body must already be on disk (asserted).
%% Returns {MsgStatus1, IndexState1}.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
                                    index_on_disk = true }, IndexState) ->
    true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
    {MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
                                   msg_id        = MsgId,
                                   seq_id        = SeqId,
                                   is_persistent = IsPersistent,
                                   is_delivered  = IsDelivered,
                                   msg_props     = MsgProps}, IndexState)
  when Force orelse IsPersistent ->
    true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
    IndexState1 = rabbit_queue_index:publish(
                    MsgId, SeqId, MsgProps, IsPersistent, IndexState),
    {MsgStatus #msg_status { index_on_disk = true },
     maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
    {MsgStatus, IndexState}.
  1216 
  1217 maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
  1218                     State = #vqstate { index_state       = IndexState,
  1219                                        msg_store_clients = MSCState }) ->
  1220     MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState),
  1221     {MsgStatus2, IndexState1} =
  1222         maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
  1223     {MsgStatus2, State #vqstate { index_state = IndexState1 }}.
  1224 
  1225 %%----------------------------------------------------------------------------
  1226 %% Internal gubbins for acks
  1227 %%----------------------------------------------------------------------------
  1228 
  1229 record_pending_ack(#msg_status { seq_id        = SeqId,
  1230                                  msg_id        = MsgId,
  1231                                  msg_on_disk   = MsgOnDisk } = MsgStatus,
  1232                    State = #vqstate { pending_ack     = PA,
  1233                                       ram_ack_index   = RAI,
  1234                                       ack_in_counter  = AckInCount}) ->
  1235     {AckEntry, RAI1} =
  1236         case MsgOnDisk of
  1237             true  -> {m(trim_msg_status(MsgStatus)), RAI};
  1238             false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
  1239         end,
  1240     State #vqstate { pending_ack    = gb_trees:insert(SeqId, AckEntry, PA),
  1241                      ram_ack_index  = RAI1,
  1242                      ack_in_counter = AckInCount + 1}.
  1243 
  1244 remove_pending_ack(SeqId, State = #vqstate { pending_ack   = PA,
  1245                                              ram_ack_index = RAI }) ->
  1246     {gb_trees:get(SeqId, PA),
  1247      State #vqstate { pending_ack   = gb_trees:delete(SeqId, PA),
  1248                       ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
  1249 
  1250 purge_pending_ack(KeepPersistent,
  1251                   State = #vqstate { pending_ack       = PA,
  1252                                      index_state       = IndexState,
  1253                                      msg_store_clients = MSCState }) ->
  1254     {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
  1255         rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) ->
  1256                                           accumulate_ack(MsgStatus, Acc)
  1257                                   end, accumulate_ack_init(), PA),
  1258     State1 = State #vqstate { pending_ack   = gb_trees:empty(),
  1259                               ram_ack_index = gb_trees:empty() },
  1260     case KeepPersistent of
  1261         true  -> case orddict:find(false, MsgIdsByStore) of
  1262                      error        -> State1;
  1263                      {ok, MsgIds} -> ok = msg_store_remove(MSCState, false,
  1264                                                            MsgIds),
  1265                                     State1
  1266                  end;
  1267         false -> IndexState1 =
  1268                      rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
  1269                  [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
  1270                   || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
  1271                  State1 #vqstate { index_state = IndexState1 }
  1272     end.
  1273 
  1274 accumulate_ack_init() -> {[], orddict:new(), []}.
  1275 
  1276 accumulate_ack(#msg_status { seq_id        = SeqId,
  1277                              msg_id        = MsgId,
  1278                              is_persistent = IsPersistent,
  1279                              msg_on_disk   = MsgOnDisk,
  1280                              index_on_disk = IndexOnDisk },
  1281                {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
  1282     {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
  1283      case MsgOnDisk of
  1284          true  -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
  1285          false -> MsgIdsByStore
  1286      end,
  1287      [MsgId | AllMsgIds]}.
  1288 
  1289 find_persistent_count(LensByStore) ->
  1290     case orddict:find(true, LensByStore) of
  1291         error     -> 0;
  1292         {ok, Len} -> Len
  1293     end.
  1294 
  1295 %%----------------------------------------------------------------------------
  1296 %% Internal plumbing for confirms (aka publisher acks)
  1297 %%----------------------------------------------------------------------------
  1298 
  1299 record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk        = MOD,
  1300                                              msg_indices_on_disk = MIOD,
  1301                                              unconfirmed         = UC,
  1302                                              confirmed           = C }) ->
  1303     State #vqstate {
  1304       msgs_on_disk        = rabbit_misc:gb_sets_difference(MOD,  MsgIdSet),
  1305       msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet),
  1306       unconfirmed         = rabbit_misc:gb_sets_difference(UC,   MsgIdSet),
  1307       confirmed           = gb_sets:union(C, MsgIdSet) }.
  1308 
  1309 must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
  1310                            unconfirmed = UC }) ->
  1311     %% If UC is empty then by definition, MIOD and MOD are also empty
  1312     %% and there's nothing that can be pending a sync.
  1313 
  1314     %% If UC is not empty, then we want to find is_empty(UC - MIOD),
  1315     %% but the subtraction can be expensive. Thus instead, we test to
  1316     %% see if UC is a subset of MIOD. This can only be the case if
  1317     %% MIOD == UC, which would indicate that every message in UC is
  1318     %% also in MIOD and is thus _all_ pending on a msg_store sync, not
  1319     %% on a qi sync. Thus the negation of this is sufficient. Because
  1320     %% is_subset is short circuiting, this is more efficient than the
  1321     %% subtraction.
  1322     not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
  1323 
  1324 blind_confirm(Callback, MsgIdSet) ->
  1325     Callback(?MODULE,
  1326              fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
  1327 
  1328 msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
  1329     blind_confirm(Callback, MsgIdSet);
  1330 msgs_written_to_disk(Callback, MsgIdSet, written) ->
  1331     Callback(?MODULE,
  1332              fun (?MODULE, State = #vqstate { msgs_on_disk        = MOD,
  1333                                               msg_indices_on_disk = MIOD,
  1334                                               unconfirmed         = UC }) ->
  1335                      Confirmed = gb_sets:intersection(UC, MsgIdSet),
  1336                      record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
  1337                                      State #vqstate {
  1338                                        msgs_on_disk =
  1339                                            gb_sets:union(MOD, Confirmed) })
  1340              end).
  1341 
  1342 msg_indices_written_to_disk(Callback, MsgIdSet) ->
  1343     Callback(?MODULE,
  1344              fun (?MODULE, State = #vqstate { msgs_on_disk        = MOD,
  1345                                               msg_indices_on_disk = MIOD,
  1346                                               unconfirmed         = UC }) ->
  1347                      Confirmed = gb_sets:intersection(UC, MsgIdSet),
  1348                      record_confirms(gb_sets:intersection(MsgIdSet, MOD),
  1349                                      State #vqstate {
  1350                                        msg_indices_on_disk =
  1351                                            gb_sets:union(MIOD, Confirmed) })
  1352              end).
  1353 
  1354 %%----------------------------------------------------------------------------
  1355 %% Internal plumbing for requeue
  1356 %%----------------------------------------------------------------------------
  1357 
  1358 publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
  1359     read_msg(MsgStatus, State);
  1360 publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
  1361     {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
  1362 
  1363 publish_beta(MsgStatus, State) ->
  1364     {#msg_status { msg = Msg} = MsgStatus1,
  1365      #vqstate { ram_msg_count = RamMsgCount } = State1} =
  1366         maybe_write_to_disk(true, false, MsgStatus, State),
  1367     {MsgStatus1, State1 #vqstate {
  1368                    ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}.
  1369 
  1370 %% Rebuild queue, inserting sequence ids to maintain ordering
  1371 queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
  1372     queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
  1373                 Limit, PubFun, State).
  1374 
  1375 queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
  1376             Limit, PubFun, State)
  1377   when Limit == undefined orelse SeqId < Limit ->
  1378     case ?QUEUE:out(Q) of
  1379         {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
  1380           when SeqIdQ < SeqId ->
  1381             %% enqueue from the remaining queue
  1382             queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
  1383                         Limit, PubFun, State);
  1384         {_, _Q1} ->
  1385             %% enqueue from the remaining list of sequence ids
  1386             {MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
  1387             {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
  1388                 PubFun(MsgStatus, State1),
  1389             queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
  1390                         Limit, PubFun, State2)
  1391     end;
  1392 queue_merge(SeqIds, Q, Front, MsgIds,
  1393             _Limit, _PubFun, State) ->
  1394     {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
  1395 
  1396 delta_merge([], Delta, MsgIds, State) ->
  1397     {Delta, MsgIds, State};
  1398 delta_merge(SeqIds, Delta, MsgIds, State) ->
  1399     lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
  1400                         {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
  1401                             msg_from_pending_ack(SeqId, State0),
  1402                         {_MsgStatus, State2} =
  1403                             maybe_write_to_disk(true, true, MsgStatus, State1),
  1404                         {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
  1405                 end, {Delta, MsgIds, State}, SeqIds).
  1406 
  1407 %% Mostly opposite of record_pending_ack/2
  1408 msg_from_pending_ack(SeqId, State) ->
  1409     {#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
  1410         remove_pending_ack(SeqId, State),
  1411     {MsgStatus #msg_status {
  1412        msg_props = MsgProps #message_properties { needs_confirming = false } },
  1413      State1}.
  1414 
  1415 beta_limit(Q) ->
  1416     case ?QUEUE:peek(Q) of
  1417         {value, #msg_status { seq_id = SeqId }} -> SeqId;
  1418         empty                                   -> undefined
  1419     end.
  1420 
  1421 delta_limit(?BLANK_DELTA_PATTERN(_X))             -> undefined;
  1422 delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
  1423 
  1424 %%----------------------------------------------------------------------------
  1425 %% Phase changes
  1426 %%----------------------------------------------------------------------------
  1427 
  1428 %% Determine whether a reduction in memory use is necessary, and call
  1429 %% functions to perform the required phase changes. The function can
  1430 %% also be used to just do the former, by passing in dummy phase
  1431 %% change functions.
  1432 %%
  1433 %% The function does not report on any needed beta->delta conversions,
  1434 %% though the conversion function for that is called as necessary. The
  1435 %% reason is twofold. Firstly, this is safe because the conversion is
  1436 %% only ever necessary just after a transition to a
  1437 %% target_ram_count of zero or after an incremental alpha->beta
  1438 %% conversion. In the former case the conversion is performed straight
  1439 %% away (i.e. any betas present at the time are converted to deltas),
  1440 %% and in the latter case the need for a conversion is flagged up
  1441 %% anyway. Secondly, this is necessary because we do not have a
  1442 %% precise and cheap predicate for determining whether a beta->delta
  1443 %% conversion is necessary - due to the complexities of retaining up
  1444 %% one segment's worth of messages in q3 - and thus would risk
  1445 %% perpetually reporting the need for a conversion when no such
  1446 %% conversion is needed. That in turn could cause an infinite loop.
  1447 reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
  1448                   State = #vqstate {target_ram_count = infinity}) ->
  1449     {false, State};
  1450 reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
  1451                   State = #vqstate {
  1452                     ram_ack_index    = RamAckIndex,
  1453                     ram_msg_count    = RamMsgCount,
  1454                     target_ram_count = TargetRamCount,
  1455                     rates            = #rates { avg_ingress = AvgIngress,
  1456                                                 avg_egress  = AvgEgress },
  1457                     ack_rates        = #rates { avg_ingress = AvgAckIngress,
  1458                                                 avg_egress  = AvgAckEgress }
  1459                    }) ->
  1460 
  1461     {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
  1462         case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
  1463                         TargetRamCount) of
  1464             0  -> {false, State};
  1465             %% Reduce memory of pending acks and alphas. The order is
  1466             %% determined based on which is growing faster. Whichever
  1467             %% comes second may very well get a quota of 0 if the
  1468             %% first manages to push out the max number of messages.
  1469             S1 -> {_, State2} =
  1470                       lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
  1471                                           ReduceFun(QuotaN, StateN)
  1472                                   end,
  1473                                   {S1, State},
  1474                                   case (AvgAckIngress - AvgAckEgress) >
  1475                                       (AvgIngress - AvgEgress) of
  1476                                       true  -> [AckFun, AlphaBetaFun];
  1477                                       false -> [AlphaBetaFun, AckFun]
  1478                                   end),
  1479                   {true, State2}
  1480         end,
  1481 
  1482     case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
  1483                     permitted_beta_count(State1)) of
  1484         ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
  1485         _                   -> {Reduce, State1}
  1486     end.
  1487 
  1488 limit_ram_acks(0, State) ->
  1489     {0, State};
  1490 limit_ram_acks(Quota, State = #vqstate { pending_ack   = PA,
  1491                                          ram_ack_index = RAI }) ->
  1492     case gb_trees:is_empty(RAI) of
  1493         true ->
  1494             {Quota, State};
  1495         false ->
  1496             {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
  1497             MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
  1498                 gb_trees:get(SeqId, PA),
  1499             {MsgStatus1, State1} =
  1500                 maybe_write_to_disk(true, false, MsgStatus, State),
  1501             PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA),
  1502             limit_ram_acks(Quota - 1,
  1503                            State1 #vqstate { pending_ack   = PA1,
  1504                                              ram_ack_index = RAI1 })
  1505     end.
  1506 
  1507 reduce_memory_use(State) ->
  1508     {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
  1509                                     fun push_betas_to_deltas/2,
  1510                                     fun limit_ram_acks/2,
  1511                                     State),
  1512     State1.
  1513 
  1514 permitted_beta_count(#vqstate { len = 0 }) ->
  1515     infinity;
  1516 permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) ->
  1517     lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]);
  1518 permitted_beta_count(#vqstate { q1               = Q1,
  1519                                 q4               = Q4,
  1520                                 target_ram_count = TargetRamCount,
  1521                                 len              = Len }) ->
  1522     BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
  1523     lists:max([rabbit_queue_index:next_segment_boundary(0),
  1524                BetaDelta - ((BetaDelta * BetaDelta) div
  1525                                 (BetaDelta + TargetRamCount))]).
  1526 
  1527 chunk_size(Current, Permitted)
  1528   when Permitted =:= infinity orelse Permitted >= Current ->
  1529     0;
  1530 chunk_size(Current, Permitted) ->
  1531     lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
  1532 
  1533 fetch_from_q3(State = #vqstate { q1    = Q1,
  1534                                  q2    = Q2,
  1535                                  delta = #delta { count = DeltaCount },
  1536                                  q3    = Q3,
  1537                                  q4    = Q4 }) ->
  1538     case ?QUEUE:out(Q3) of
  1539         {empty, _Q3} ->
  1540             {empty, State};
  1541         {{value, MsgStatus}, Q3a} ->
  1542             State1 = State #vqstate { q3 = Q3a },
  1543             State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
  1544                          {true, true} ->
  1545                              %% q3 is now empty, it wasn't before;
  1546                              %% delta is still empty. So q2 must be
  1547                              %% empty, and we know q4 is empty
  1548                              %% otherwise we wouldn't be loading from
  1549                              %% q3. As such, we can just set q4 to Q1.
  1550                              true = ?QUEUE:is_empty(Q2), %% ASSERTION
  1551                              true = ?QUEUE:is_empty(Q4), %% ASSERTION
  1552                              State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
  1553                          {true, false} ->
  1554                              maybe_deltas_to_betas(State1);
  1555                          {false, _} ->
  1556                              %% q3 still isn't empty, we've not
  1557                              %% touched delta, so the invariants
  1558                              %% between q1, q2, delta and q3 are
  1559                              %% maintained
  1560                              State1
  1561                      end,
  1562             {loaded, {MsgStatus, State2}}
  1563     end.
  1564 
  1565 maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
  1566     State;
  1567 maybe_deltas_to_betas(State = #vqstate {
  1568                         q2                   = Q2,
  1569                         delta                = Delta,
  1570                         q3                   = Q3,
  1571                         index_state          = IndexState,
  1572                         pending_ack          = PA,
  1573                         transient_threshold  = TransientThreshold }) ->
  1574     #delta { start_seq_id = DeltaSeqId,
  1575              count        = DeltaCount,
  1576              end_seq_id   = DeltaSeqIdEnd } = Delta,
  1577     DeltaSeqId1 =
  1578         lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
  1579                    DeltaSeqIdEnd]),
  1580     {List, IndexState1} =
  1581         rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
  1582     {Q3a, IndexState2} =
  1583         betas_from_index_entries(List, TransientThreshold, PA, IndexState1),
  1584     State1 = State #vqstate { index_state = IndexState2 },
  1585     case ?QUEUE:len(Q3a) of
  1586         0 ->
  1587             %% we ignored every message in the segment due to it being
  1588             %% transient and below the threshold
  1589             maybe_deltas_to_betas(
  1590               State1 #vqstate {
  1591                 delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
  1592         Q3aLen ->
  1593             Q3b = ?QUEUE:join(Q3, Q3a),
  1594             case DeltaCount - Q3aLen of
  1595                 0 ->
  1596                     %% delta is now empty, but it wasn't before, so
  1597                     %% can now join q2 onto q3
  1598                     State1 #vqstate { q2    = ?QUEUE:new(),
  1599                                       delta = ?BLANK_DELTA,
  1600                                       q3    = ?QUEUE:join(Q3b, Q2) };
  1601                 N when N > 0 ->
  1602                     Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
  1603                                         count        = N,
  1604                                         end_seq_id   = DeltaSeqIdEnd }),
  1605                     State1 #vqstate { delta = Delta1,
  1606                                       q3    = Q3b }
  1607             end
  1608     end.
  1609 
  1610 push_alphas_to_betas(Quota, State) ->
  1611     {Quota1, State1} =
  1612         push_alphas_to_betas(
  1613           fun ?QUEUE:out/1,
  1614           fun (MsgStatus, Q1a,
  1615                State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
  1616                   State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
  1617               (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
  1618                   State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
  1619           end, Quota, State #vqstate.q1, State),
  1620     {Quota2, State2} =
  1621         push_alphas_to_betas(
  1622           fun ?QUEUE:out_r/1,
  1623           fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
  1624                   State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
  1625           end, Quota1, State1 #vqstate.q4, State1),
  1626     {Quota2, State2}.
  1627 
  1628 push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
  1629                      State = #vqstate { ram_msg_count    = RamMsgCount,
  1630                                         target_ram_count = TargetRamCount })
  1631   when Quota =:= 0 orelse
  1632        TargetRamCount =:= infinity orelse
  1633        TargetRamCount >= RamMsgCount ->
  1634     {Quota, State};
  1635 push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
  1636     case Generator(Q) of
  1637         {empty, _Q} ->
  1638             {Quota, State};
  1639         {{value, MsgStatus}, Qa} ->
  1640             {MsgStatus1 = #msg_status { msg_on_disk = true },
  1641              State1 = #vqstate { ram_msg_count = RamMsgCount }} =
  1642                 maybe_write_to_disk(true, false, MsgStatus, State),
  1643             MsgStatus2 = m(trim_msg_status(MsgStatus1)),
  1644             State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 },
  1645             push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
  1646                                  Consumer(MsgStatus2, Qa, State2))
  1647     end.
  1648 
  1649 push_betas_to_deltas(Quota, State = #vqstate { q2          = Q2,
  1650                                                delta       = Delta,
  1651                                                q3          = Q3,
  1652                                                index_state = IndexState }) ->
  1653     PushState = {Quota, Delta, IndexState},
  1654     {Q3a, PushState1} = push_betas_to_deltas(
  1655                           fun ?QUEUE:out_r/1,
  1656                           fun rabbit_queue_index:next_segment_boundary/1,
  1657                           Q3, PushState),
  1658     {Q2a, PushState2} = push_betas_to_deltas(
  1659                           fun ?QUEUE:out/1,
  1660                           fun (Q2MinSeqId) -> Q2MinSeqId end,
  1661                           Q2, PushState1),
  1662     {_, Delta1, IndexState1} = PushState2,
  1663     State #vqstate { q2          = Q2a,
  1664                      delta       = Delta1,
  1665                      q3          = Q3a,
  1666                      index_state = IndexState1 }.
  1667 
  1668 push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
  1669     case ?QUEUE:is_empty(Q) of
  1670         true ->
  1671             {Q, PushState};
  1672         false ->
  1673             {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q),
  1674             {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q),
  1675             Limit = LimitFun(MinSeqId),
  1676             case MaxSeqId < Limit of
  1677                 true  -> {Q, PushState};
  1678                 false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
  1679             end
  1680     end.
  1681 
  1682 push_betas_to_deltas1(_Generator, _Limit, Q,
  1683                       {0, _Delta, _IndexState} = PushState) ->
  1684     {Q, PushState};
  1685 push_betas_to_deltas1(Generator, Limit, Q,
  1686                       {Quota, Delta, IndexState} = PushState) ->
  1687     case Generator(Q) of
  1688         {empty, _Q} ->
  1689             {Q, PushState};
  1690         {{value, #msg_status { seq_id = SeqId }}, _Qa}
  1691           when SeqId < Limit ->
  1692             {Q, PushState};
  1693         {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
  1694             {#msg_status { index_on_disk = true }, IndexState1} =
  1695                 maybe_write_index_to_disk(true, MsgStatus, IndexState),
  1696             Delta1 = expand_delta(SeqId, Delta),
  1697             push_betas_to_deltas1(Generator, Limit, Qa,
  1698                                   {Quota - 1, Delta1, IndexState1})
  1699     end.
  1700 
  1701 %%----------------------------------------------------------------------------
  1702 %% Upgrading
  1703 %%----------------------------------------------------------------------------
  1704 
  1705 multiple_routing_keys() ->
  1706     transform_storage(
  1707       fun ({basic_message, ExchangeName, Routing_Key, Content,
  1708             MsgId, Persistent}) ->
  1709               {ok, {basic_message, ExchangeName, [Routing_Key], Content,
  1710                     MsgId, Persistent}};
  1711           (_) -> {error, corrupt_message}
  1712       end),
  1713     ok.
  1714 
  1715 
  1716 %% Assumes message store is not running
  1717 transform_storage(TransformFun) ->
  1718     transform_store(?PERSISTENT_MSG_STORE, TransformFun),
  1719     transform_store(?TRANSIENT_MSG_STORE, TransformFun).
  1720 
  1721 transform_store(Store, TransformFun) ->
  1722     rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
  1723     rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).