1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
11 %% The Original Code is RabbitMQ.
13 %% The Initial Developer of the Original Code is VMware, Inc.
14 %% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
17 -module(rabbit_variable_queue).
19 -export([init/3, terminate/2, delete_and_terminate/2, purge/1,
20 publish/4, publish_delivered/5, drain_confirmed/1,
21 dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
22 set_ram_duration_target/2, ram_duration/1, needs_timeout/1,
23 timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
24 is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
26 -export([start/1, stop/0]).
28 %% exported for testing only
29 -export([start_msg_store/2, stop_msg_store/0, init/5]).
31 %%----------------------------------------------------------------------------
34 %% alpha: this is a message where both the message itself, and its
35 %% position within the queue are held in RAM
37 %% beta: this is a message where the message itself is only held on
38 %% disk, but its position within the queue is held in RAM.
40 %% gamma: this is a message where the message itself is only held on
41 %% disk, but its position is both in RAM and on disk.
43 %% delta: this is a collection of messages, represented by a single
44 %% term, where the messages and their position are only held on
47 %% Note that for persistent messages, the message and its position
48 %% within the queue are always held on disk, *in addition* to being in
49 %% one of the above classifications.
51 %% Also note that within this code, the term gamma seldom
52 %% appears. It's frequently the case that gammas are defined by betas
53 %% that have had their queue position recorded on disk.
55 %% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though
56 %% many of these steps are frequently skipped. q1 and q4 only hold
57 %% alphas, q2 and q3 hold both betas and gammas. When a message
58 %% arrives, its classification is determined. It is then added to the
59 %% rightmost appropriate queue.
61 %% If a new message is determined to be a beta or gamma, q1 is
62 %% empty. If a new message is determined to be a delta, q1 and q2 are
63 %% empty (and actually q4 too).
65 %% When removing messages from a queue, if q4 is empty then q3 is read
66 %% directly. If q3 becomes empty then the next segment's worth of
67 %% messages from delta are read into q3, reducing the size of
68 %% delta. If the queue is non-empty, either q4 or q3 contains
69 %% entries. It is never permitted for delta to hold all the messages
72 %% The duration indicated to us by the memory_monitor is used to
73 %% calculate, given our current ingress and egress rates, how many
74 %% messages we should hold in RAM (i.e. as alphas). We track the
75 %% ingress and egress rates for both messages and pending acks and
76 %% rates for both are considered when calculating the number of
77 %% messages to hold in RAM. When we need to push alphas to betas or
78 %% betas to gammas, we favour writing out messages that are further
79 %% from the head of the queue. This minimises writes to disk, as the
80 %% messages closer to the tail of the queue stay in the queue for
81 %% longer, thus do not need to be replaced as quickly by sending other
84 %% Whilst messages are pushed to disk and forgotten from RAM as soon
85 %% as requested by a new setting of the queue RAM duration, the
86 %% inverse is not true: we only load messages back into RAM as
87 %% demanded as the queue is read from. Thus only publishes to the
88 %% queue will take up available spare capacity.
90 %% When we report our duration to the memory monitor, we calculate
91 %% average ingress and egress rates over the last two samples, and
92 %% then calculate our duration based on the sum of the ingress and
93 %% egress rates. More than two samples could be used, but it's a
94 %% balance between responding quickly enough to changes in
95 %% producers/consumers versus ignoring temporary blips. The problem
96 %% with temporary blips is that with just a few queues, they can have
97 %% substantial impact on the calculation of the average duration and
98 %% hence cause unnecessary I/O. Another alternative is to increase the
99 %% amqqueue_process:RAM_DURATION_UPDATE_PERIOD to beyond 5
100 %% seconds. However, that then runs the risk of being too slow to
101 %% inform the memory monitor of changes. Thus a 5 second interval,
102 %% plus a rolling average over the last two samples seems to work
105 %% The sum of the ingress and egress rates is used because the egress
106 %% rate alone is not sufficient. Adding in the ingress rate means that
107 %% queues which are being flooded by messages are given more memory,
108 %% resulting in them being able to process the messages faster (by
109 %% doing less I/O, or at least deferring it) and thus helping keep
110 %% their mailboxes empty and thus the queue as a whole is more
111 %% responsive. If such a queue also has fast but previously idle
112 %% consumers, the consumer can then start to be driven as fast as it
113 %% can go, whereas if only egress rate was being used, the incoming
114 %% messages may have to be written to disk and then read back in,
115 %% resulting in the hard disk being a bottleneck in driving the
116 %% consumers. Generally, we want to give Rabbit every chance of
117 %% getting rid of messages as fast as possible and remaining
118 %% responsive, and using only the egress rate impacts that goal.
120 %% Once the queue has more alphas than the target_ram_count, the
121 %% surplus must be converted to betas, if not gammas, if not rolled
122 %% into delta. The conditions under which these transitions occur
123 %% reflect the conflicting goals of minimising RAM cost per msg, and
124 %% minimising CPU cost per msg. Once the msg has become a beta, its
125 %% payload is no longer in RAM, thus a read from the msg_store must
126 %% occur before the msg can be delivered, but the RAM cost of a beta
127 %% is the same as a gamma, so converting a beta to gamma will not free
128 %% up any further RAM. To reduce the RAM cost further, the gamma must
129 %% be rolled into delta. Whilst recovering a beta or a gamma to an
130 %% alpha requires only one disk read (from the msg_store), recovering
131 %% a msg from within delta will require two reads (queue_index and
132 %% then msg_store). But delta has a near-0 per-msg RAM cost. So the
133 %% conflict is between using delta more, which will free up more
134 %% memory, but require additional CPU and disk ops, versus using delta
135 %% less and gammas and betas more, which will cost more memory, but
136 %% require fewer disk ops and less CPU overhead.
138 %% In the case of a persistent msg published to a durable queue, the
139 %% msg is immediately written to the msg_store and queue_index. If
140 %% then additionally converted from an alpha, it'll immediately go to
141 %% a gamma (as it's already in queue_index), and cannot exist as a
142 %% beta. Thus a durable queue with a mixture of persistent and
143 %% transient msgs in it which has more messages than permitted by the
144 %% target_ram_count may contain an interspersed mixture of betas and
145 %% gammas in q2 and q3.
147 %% There is then a ratio that controls how many betas and gammas there
148 %% can be. This is based on the target_ram_count and thus expresses
149 %% the fact that as the number of permitted alphas in the queue falls,
150 %% so should the number of betas and gammas fall (i.e. delta
151 %% grows). If q2 and q3 contain more than the permitted number of
152 %% betas and gammas, then the surplus are forcibly converted to gammas
153 %% (as necessary) and then rolled into delta. The ratio is that
154 %% delta/(betas+gammas+delta) equals
155 %% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as
156 %% the target_ram_count shrinks to 0, so must betas and gammas.
158 %% The conversion of betas to gammas is done in batches of exactly
159 %% ?IO_BATCH_SIZE. This value should not be too small, otherwise the
160 %% frequent operations on the queues of q2 and q3 will not be
161 %% effectively amortised (switching the direction of queue access
162 %% defeats amortisation), nor should it be too big, otherwise
163 %% converting a batch stalls the queue for too long. Therefore, it
164 %% must be just right.
166 %% The conversion from alphas to betas is also chunked, but only to
167 %% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at
168 %% any one time. This further smooths the effects of changes to the
169 %% target_ram_count and ensures the queue remains responsive
170 %% even when there is a large amount of IO work to do. The
171 %% timeout callback is utilised to ensure that conversions are
172 %% done as promptly as possible whilst ensuring the queue remains
175 %% In the queue we keep track of both messages that are pending
176 %% delivery and messages that are pending acks. In the event of a
177 %% queue purge, we only need to load qi segments if the queue has
178 %% elements in deltas (i.e. it came under significant memory
179 %% pressure). In the event of a queue deletion, in addition to the
180 %% preceding, by keeping track of pending acks in RAM, we do not need
181 %% to search through qi segments looking for messages that are yet to
184 %% Pending acks are recorded in memory by storing the message itself.
185 %% If the message has been sent to disk, we do not store the message
186 %% content. During memory reduction, pending acks containing message
187 %% content have that content removed and the corresponding messages
188 %% are pushed out to disk.
190 %% Messages from pending acks are returned to q4, q3 and delta during
191 %% requeue, based on the limits of seq_id contained in each. Requeued
192 %% messages retain their original seq_id, maintaining order
195 %% The order in which alphas are pushed to betas and pending acks
196 %% are pushed to disk is determined dynamically. We always prefer to
197 %% push messages for the source (alphas or acks) that is growing the
198 %% fastest (with growth measured as avg. ingress - avg. egress). In
199 %% each round of memory reduction a chunk of messages at most
200 %% ?IO_BATCH_SIZE in size is allocated to be pushed to disk. The
201 %% fastest growing source will be reduced by as much of this chunk as
202 %% possible. If there is any remaining allocation in the chunk after
203 %% the first source has been reduced to zero, the second source will
204 %% be reduced by as much of the remaining chunk as possible.
206 %% Notes on Clean Shutdown
207 %% (This documents behaviour in variable_queue, queue_index and
210 %% In order to try to achieve as fast a start-up as possible, if a
211 %% clean shutdown occurs, we try to save our state to disk to reduce
212 %% work on startup. In the msg_store this takes the form of the
213 %% index_module's state, plus the file_summary ets table, and client
214 %% refs. In the VQ, this takes the form of the count of persistent
215 %% messages in the queue and references into the msg_stores. The
216 %% queue_index adds to these terms the details of its segments and
217 %% stores the terms in the queue directory.
219 %% Two message stores are used. One is created for persistent messages
220 %% to durable queues that must survive restarts, and the other is used
221 %% for all other messages that just happen to need to be written to
222 %% disk. On start up we can therefore nuke the transient message
223 %% store, and be sure that the messages in the persistent store are
226 %% The references to the msg_stores are there so that the msg_store
227 %% knows to only trust its saved state if all of the queues it was
228 %% previously talking to come up cleanly. Likewise, the queues
229 %% themselves (especially queue_index) skip work in init if all the queues
230 %% and msg_store were shutdown cleanly. This gives both good speed
231 %% improvements and also robustness so that if anything possibly went
232 %% wrong in shutdown (or there was subsequent manual tampering), all
233 %% messages and queues that can be recovered are recovered, safely.
235 %% To delete transient messages lazily, the variable_queue, on
236 %% startup, stores the next_seq_id reported by the queue_index as the
237 %% transient_threshold. From that point on, whenever it's reading a
238 %% message off disk via the queue_index, if the seq_id is below this
239 %% threshold and the message is transient then it drops the message
240 %% (the message itself won't exist on disk because it would have been
241 %% stored in the transient msg_store which would have had its saved
242 %% state nuked on startup). This avoids the expensive operation of
243 %% scanning the entire queue on startup in order to delete transient
244 %% messages that were only pushed to disk to save memory.
246 %%----------------------------------------------------------------------------
248 -behaviour(rabbit_backing_queue).
%% Rate-tracking state. The vqstate holds one of these for messages
%% (rates) and one for pending acks (ack_rates); field types are given
%% by the rates() type.
-record(rates, { egress,
                 ingress,
                 avg_egress,
                 avg_ingress,
                 timestamp }).
300 { start_seq_id, %% start_seq_id is inclusive
302 end_seq_id %% end_seq_id is exclusive
305 %% When we discover, on publish, that we should write some indices to
306 %% disk for some betas, the IO_BATCH_SIZE sets the number of betas
307 %% that we must be due to write indices for before we do any work at
308 %% all. This is both a minimum and a maximum - we don't write fewer
309 %% than IO_BATCH_SIZE indices out in one go, and we don't write more -
310 %% we can always come back on the next publish to do more.
-define(IO_BATCH_SIZE, 64).

%% Names of the two message stores; used as the supervisor child ids and
%% as the store argument to rabbit_msg_store (see start_msg_store/2).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE,  msg_store_transient).

%% Queue implementation used for q1, q2, q3 and q4.
-define(QUEUE, lqueue).
316 -include("rabbit.hrl").
318 %%----------------------------------------------------------------------------
320 -rabbit_upgrade({multiple_routing_keys, local, []}).
-type(timestamp() :: {non_neg_integer(),
                      non_neg_integer(),
                      non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).

%% egress / ingress hold {Timestamp, Count} pairs; the avg_* fields hold
%% the derived average rates.
-type(rates() :: #rates { egress      :: {timestamp(), non_neg_integer()},
                          ingress     :: {timestamp(), non_neg_integer()},
                          avg_egress  :: float(),
                          avg_ingress :: float(),
                          timestamp   :: timestamp() }).

-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
                          count        :: non_neg_integer(),
                          end_seq_id   :: non_neg_integer() }).
337 %% The compiler (rightfully) complains that ack() and state() are
338 %% unused. For this reason we duplicate a -spec from
339 %% rabbit_backing_queue with the only intent being to remove
340 %% warnings. The problem here is that we can't parameterise the BQ
341 %% behaviour by these two types as we would like to. We still leave
342 %% these here for documentation purposes.
%% An ack tag is simply the seq_id of the message awaiting its ack.
-type(ack() :: seq_id()).
344 -type(state() :: #vqstate {
345 q1 :: ?QUEUE:?QUEUE(),
346 q2 :: ?QUEUE:?QUEUE(),
348 q3 :: ?QUEUE:?QUEUE(),
349 q4 :: ?QUEUE:?QUEUE(),
350 next_seq_id :: seq_id(),
351 pending_ack :: gb_tree(),
352 ram_ack_index :: gb_tree(),
353 index_state :: any(),
354 msg_store_clients :: 'undefined' | {{any(), binary()},
356 durable :: boolean(),
357 transient_threshold :: non_neg_integer(),
359 async_callback :: rabbit_backing_queue:async_callback(),
361 len :: non_neg_integer(),
362 persistent_count :: non_neg_integer(),
364 target_ram_count :: non_neg_integer() | 'infinity',
365 ram_msg_count :: non_neg_integer(),
366 ram_msg_count_prev :: non_neg_integer(),
367 out_counter :: non_neg_integer(),
368 in_counter :: non_neg_integer(),
370 msgs_on_disk :: gb_set(),
371 msg_indices_on_disk :: gb_set(),
372 unconfirmed :: gb_set(),
373 confirmed :: gb_set(),
374 ack_out_counter :: non_neg_integer(),
375 ack_in_counter :: non_neg_integer(),
376 ack_rates :: rates() }).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 ::
        ([ack()], state()) -> {[rabbit_guid:guid()], state()}).

-spec(multiple_routing_keys/0 :: () -> 'ok').
384 -define(BLANK_DELTA, #delta { start_seq_id = undefined,
386 end_seq_id = undefined }).
387 -define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
391 %%----------------------------------------------------------------------------
393 %%----------------------------------------------------------------------------
395 start(DurableQueues) ->
396 {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues),
398 [Ref || Terms <- AllTerms,
400 Ref = proplists:get_value(persistent_ref, Terms),
%% Shut down the message stores started by start/1.
stop() ->
    stop_msg_store().
%% Start the transient and then the persistent message store as children
%% of rabbit_sup, both rooted at rabbit_mnesia:dir().
%%
%% The transient store gets no client refs (undefined) and a start fun
%% that returns 'finished' on its first call, so presumably it recovers
%% nothing. The persistent store gets the client Refs and the start-fun
%% state (StartFunState) produced by rabbit_queue_index:recover/1 in
%% start/1. Crashes with badmatch if either child fails to start.
start_msg_store(Refs, StartFunState) ->
    TransientArgs = [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined,
                     {fun (ok) -> finished end, ok}],
    ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
                                TransientArgs),
    PersistentArgs = [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), Refs,
                      StartFunState],
    ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
                                PersistentArgs).
416 ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
417 ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
419 init(Queue, Recover, AsyncCallback) ->
420 init(Queue, Recover, AsyncCallback,
421 fun (MsgIds, ActionTaken) ->
422 msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
424 fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
426 init(#amqqueue { name = QueueName, durable = IsDurable }, false,
427 AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
428 IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
429 init(IsDurable, IndexState, 0, [], AsyncCallback,
431 true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
432 MsgOnDiskFun, AsyncCallback);
435 msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
437 init(#amqqueue { name = QueueName, durable = true }, true,
438 AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
439 Terms = rabbit_queue_index:shutdown_terms(QueueName),
441 case proplists:get_value(persistent_ref, Terms) of
442 undefined -> {rabbit_guid:gen(), []};
443 PRef1 -> {PRef1, Terms}
445 PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
446 MsgOnDiskFun, AsyncCallback),
447 TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
448 undefined, AsyncCallback),
449 {DeltaCount, IndexState} =
450 rabbit_queue_index:recover(
452 rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
454 rabbit_msg_store:contains(MsgId, PersistentClient)
457 init(true, IndexState, DeltaCount, Terms1, AsyncCallback,
458 PersistentClient, TransientClient).
460 terminate(_Reason, State) ->
461 State1 = #vqstate { persistent_count = PCount,
462 index_state = IndexState,
463 msg_store_clients = {MSCStateP, MSCStateT} } =
464 purge_pending_ack(true, State),
465 PRef = case MSCStateP of
466 undefined -> undefined;
467 _ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
468 rabbit_msg_store:client_ref(MSCStateP)
470 ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
471 Terms = [{persistent_ref, PRef}, {persistent_count, PCount}],
472 a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
474 msg_store_clients = undefined }).
476 %% the only difference between purge and delete is that delete also
477 %% needs to delete everything that's been delivered and not ack'd.
478 delete_and_terminate(_Reason, State) ->
479 %% TODO: there is no need to interact with qi at all - which we do
480 %% as part of 'purge' and 'purge_pending_ack', other than
482 {_PurgeCount, State1} = purge(State),
483 State2 = #vqstate { index_state = IndexState,
484 msg_store_clients = {MSCStateP, MSCStateT} } =
485 purge_pending_ack(false, State1),
486 IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
489 _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
491 rabbit_msg_store:client_delete_and_terminate(MSCStateT),
492 a(State2 #vqstate { index_state = IndexState1,
493 msg_store_clients = undefined }).
495 purge(State = #vqstate { q4 = Q4,
496 index_state = IndexState,
497 msg_store_clients = MSCState,
499 persistent_count = PCount }) ->
500 %% TODO: when there are no pending acks, which is a common case,
501 %% we could simply wipe the qi instead of issuing delivers and
502 %% acks for all the messages.
503 {LensByStore, IndexState1} = remove_queue_entries(
504 fun ?QUEUE:foldl/3, Q4,
505 orddict:new(), IndexState, MSCState),
506 {LensByStore1, State1 = #vqstate { q1 = Q1,
507 index_state = IndexState2,
508 msg_store_clients = MSCState1 }} =
509 purge_betas_and_deltas(LensByStore,
510 State #vqstate { q4 = ?QUEUE:new(),
511 index_state = IndexState1 }),
512 {LensByStore2, IndexState3} = remove_queue_entries(
513 fun ?QUEUE:foldl/3, Q1,
514 LensByStore1, IndexState2, MSCState1),
515 PCount1 = PCount - find_persistent_count(LensByStore2),
516 {Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
517 index_state = IndexState3,
520 persistent_count = PCount1 })}.
522 publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
523 MsgProps = #message_properties { needs_confirming = NeedsConfirming },
524 _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
527 in_counter = InCount,
528 persistent_count = PCount,
530 ram_msg_count = RamMsgCount,
531 unconfirmed = UC }) ->
532 IsPersistent1 = IsDurable andalso IsPersistent,
533 MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
534 {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
535 State2 = case ?QUEUE:is_empty(Q3) of
536 false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
537 true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
539 PCount1 = PCount + one_if(IsPersistent1),
540 UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
541 a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1,
543 in_counter = InCount + 1,
544 persistent_count = PCount1,
545 ram_msg_count = RamMsgCount + 1,
546 unconfirmed = UC1 })).
548 publish_delivered(false, #basic_message { id = MsgId },
549 #message_properties { needs_confirming = NeedsConfirming },
550 _ChPid, State = #vqstate { async_callback = Callback,
552 case NeedsConfirming of
553 true -> blind_confirm(Callback, gb_sets:singleton(MsgId));
556 {undefined, a(State)};
557 publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
559 MsgProps = #message_properties {
560 needs_confirming = NeedsConfirming },
561 _ChPid, State = #vqstate { len = 0,
563 out_counter = OutCount,
564 in_counter = InCount,
565 persistent_count = PCount,
567 unconfirmed = UC }) ->
568 IsPersistent1 = IsDurable andalso IsPersistent,
569 MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
570 #msg_status { is_delivered = true },
571 {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
572 State2 = record_pending_ack(m(MsgStatus1), State1),
573 PCount1 = PCount + one_if(IsPersistent1),
574 UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
575 {SeqId, a(reduce_memory_use(
576 State2 #vqstate { next_seq_id = SeqId + 1,
577 out_counter = OutCount + 1,
578 in_counter = InCount + 1,
579 persistent_count = PCount1,
580 unconfirmed = UC1 }))}.
582 drain_confirmed(State = #vqstate { confirmed = C }) ->
583 case gb_sets:is_empty(C) of
584 true -> {[], State}; %% common case
585 false -> {gb_sets:to_list(C), State #vqstate {
586 confirmed = gb_sets:new() }}
%% Entry point: run the dropping loop with an empty accumulator of
%% dropped {Msg, AckTag} pairs.
dropwhile(Pred, AckRequired, State) ->
    dropwhile(Pred, AckRequired, State, []).
591 dropwhile(Pred, AckRequired, State, Msgs) ->
592 End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
593 (S) -> {undefined, S}
595 case queue_out(State) of
598 {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
599 case {Pred(MsgProps), AckRequired} of
601 {MsgStatus1, State2} = read_msg(MsgStatus, State1),
602 {{Msg, _, AckTag, _}, State3} =
603 internal_fetch(true, MsgStatus1, State2),
604 dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
606 {_, State2} = internal_fetch(false, MsgStatus, State1),
607 dropwhile(Pred, AckRequired, State2, undefined);
609 End(a(in_r(MsgStatus, State1)))
613 fetch(AckRequired, State) ->
614 case queue_out(State) of
617 {{value, MsgStatus}, State1} ->
618 %% it is possible that the message wasn't read from disk
619 %% at this point, so read it in.
620 {MsgStatus1, State2} = read_msg(MsgStatus, State1),
621 {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2),
627 ack(AckTags, State) ->
628 {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
629 State1 = #vqstate { index_state = IndexState,
630 msg_store_clients = MSCState,
631 persistent_count = PCount,
632 ack_out_counter = AckOutCount }} =
634 fun (SeqId, {Acc, State2}) ->
635 {MsgStatus, State3} = remove_pending_ack(SeqId, State2),
636 {accumulate_ack(MsgStatus, Acc), State3}
637 end, {accumulate_ack_init(), State}, AckTags),
638 IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
639 [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
640 || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
641 PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
642 orddict:new(), MsgIdsByStore)),
643 {lists:reverse(AllMsgIds),
644 a(State1 #vqstate { index_state = IndexState1,
645 persistent_count = PCount1,
646 ack_out_counter = AckOutCount + length(AckTags) })}.
648 fold(undefined, State, _AckTags) ->
650 fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
652 fun(SeqId, State1) ->
653 {MsgStatus, State2} =
654 read_msg(gb_trees:get(SeqId, PA), State1),
655 MsgFun(MsgStatus#msg_status.msg, SeqId),
657 end, State, AckTags).
659 requeue(AckTags, #vqstate { delta = Delta,
662 in_counter = InCounter,
663 len = Len } = State) ->
664 {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
666 fun publish_alpha/2, State),
667 {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
669 fun publish_beta/2, State1),
670 {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
672 MsgCount = length(MsgIds2),
673 {MsgIds2, a(reduce_memory_use(
674 State3 #vqstate { delta = Delta1,
677 in_counter = InCounter + MsgCount,
678 len = Len + MsgCount }))}.
%% Return the queue length cached in the state's len field.
len(#vqstate { len = Length }) ->
    Length.
%% The queue is empty exactly when its cached length is zero.
is_empty(State) ->
    len(State) == 0.
684 set_ram_duration_target(
685 DurationTarget, State = #vqstate {
686 rates = #rates { avg_egress = AvgEgressRate,
687 avg_ingress = AvgIngressRate },
688 ack_rates = #rates { avg_egress = AvgAckEgressRate,
689 avg_ingress = AvgAckIngressRate },
690 target_ram_count = TargetRamCount }) ->
692 AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
694 case DurationTarget of
695 infinity -> infinity;
696 _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
698 State1 = State #vqstate { target_ram_count = TargetRamCount1 },
699 a(case TargetRamCount1 == infinity orelse
700 (TargetRamCount =/= infinity andalso
701 TargetRamCount1 >= TargetRamCount) of
703 false -> reduce_memory_use(State1)
706 ram_duration(State = #vqstate {
707 rates = #rates { timestamp = Timestamp,
709 ingress = Ingress } = Rates,
710 ack_rates = #rates { timestamp = AckTimestamp,
712 ingress = AckIngress } = ARates,
713 in_counter = InCount,
714 out_counter = OutCount,
715 ack_in_counter = AckInCount,
716 ack_out_counter = AckOutCount,
717 ram_msg_count = RamMsgCount,
718 ram_msg_count_prev = RamMsgCountPrev,
719 ram_ack_index = RamAckIndex,
720 ram_ack_count_prev = RamAckCountPrev }) ->
722 {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
723 {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
725 {AvgAckEgressRate, AckEgress1} =
726 update_rate(Now, AckTimestamp, AckOutCount, AckEgress),
727 {AvgAckIngressRate, AckIngress1} =
728 update_rate(Now, AckTimestamp, AckInCount, AckIngress),
730 RamAckCount = gb_trees:size(RamAckIndex),
732 Duration = %% msgs+acks / (msgs+acks/sec) == sec
733 case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
734 AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of
736 false -> (RamMsgCountPrev + RamMsgCount +
737 RamAckCount + RamAckCountPrev) /
738 (4 * (AvgEgressRate + AvgIngressRate +
739 AvgAckEgressRate + AvgAckIngressRate))
742 {Duration, State #vqstate {
743 rates = Rates #rates {
746 avg_egress = AvgEgressRate,
747 avg_ingress = AvgIngressRate,
749 ack_rates = ARates #rates {
751 ingress = AckIngress1,
752 avg_egress = AvgAckEgressRate,
753 avg_ingress = AvgAckIngressRate,
759 ram_msg_count_prev = RamMsgCount,
760 ram_ack_count_prev = RamAckCount }}.
762 needs_timeout(State = #vqstate { index_state = IndexState }) ->
763 case must_sync_index(State) of
766 case rabbit_queue_index:needs_sync(IndexState) of
768 false -> case reduce_memory_use(
769 fun (_Quota, State1) -> {0, State1} end,
770 fun (_Quota, State1) -> State1 end,
771 fun (_Quota, State1) -> {0, State1} end,
773 {true, _State} -> idle;
774 {false, _State} -> false
%% Timeout callback: sync the queue index first, then carry out any
%% pending memory reduction on the updated state.
timeout(State = #vqstate { index_state = IndexState }) ->
    Synced = State #vqstate {
               index_state = rabbit_queue_index:sync(IndexState) },
    a(reduce_memory_use(Synced)).
%% Before the queue process hibernates, flush the queue index and keep
%% the resulting index state.
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
    FlushedIndexState = rabbit_queue_index:flush(IndexState),
    State #vqstate { index_state = FlushedIndexState }.
788 q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
792 target_ram_count = TargetRamCount,
793 ram_msg_count = RamMsgCount,
794 next_seq_id = NextSeqId,
795 persistent_count = PersistentCount,
796 rates = #rates { avg_egress = AvgEgressRate,
797 avg_ingress = AvgIngressRate },
798 ack_rates = #rates { avg_egress = AvgAckEgressRate,
799 avg_ingress = AvgAckIngressRate } }) ->
800 [ {q1 , ?QUEUE:len(Q1)},
801 {q2 , ?QUEUE:len(Q2)},
803 {q3 , ?QUEUE:len(Q3)},
804 {q4 , ?QUEUE:len(Q4)},
806 {pending_acks , gb_trees:size(PA)},
807 {target_ram_count , TargetRamCount},
808 {ram_msg_count , RamMsgCount},
809 {ram_ack_count , gb_trees:size(RAI)},
810 {next_seq_id , NextSeqId},
811 {persistent_count , PersistentCount},
812 {avg_ingress_rate , AvgIngressRate},
813 {avg_egress_rate , AvgEgressRate},
814 {avg_ack_ingress_rate, AvgAckIngressRate},
815 {avg_ack_egress_rate , AvgAckEgressRate} ].
%% Run Fun against this backing queue's state. Only invocations addressed
%% to this module are accepted; any other target fails with
%% function_clause.
invoke(Mod, Fun, State) when Mod =:= ?MODULE ->
    Fun(?MODULE, State).
%% This backing queue never reports a message as a duplicate; the state
%% is passed back untouched.
is_duplicate(_Msg, State) ->
    {false, State}.
%% Discarding requires no work here: the state is returned unchanged.
discard(_Msg, _ChPid, State) ->
    State.
823 %%----------------------------------------------------------------------------
825 %%----------------------------------------------------------------------------
827 a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
829 persistent_count = PersistentCount,
830 ram_msg_count = RamMsgCount }) ->
831 E1 = ?QUEUE:is_empty(Q1),
832 E2 = ?QUEUE:is_empty(Q2),
833 ED = Delta#delta.count == 0,
834 E3 = ?QUEUE:is_empty(Q3),
835 E4 = ?QUEUE:is_empty(Q4),
841 true = LZ == (E3 and E4),
844 true = PersistentCount >= 0,
845 true = RamMsgCount >= 0,
849 d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
850 when Start + Count =< End ->
853 m(MsgStatus = #msg_status { msg = Msg,
854 is_persistent = IsPersistent,
855 msg_on_disk = MsgOnDisk,
856 index_on_disk = IndexOnDisk }) ->
857 true = (not IsPersistent) or IndexOnDisk,
858 true = (not IndexOnDisk) or MsgOnDisk,
859 true = (Msg =/= undefined) or MsgOnDisk,
%% Prepend E to L only when the flag is true; otherwise return L as-is.
cons_if(false, _E, L) -> L;
cons_if(true,  E,  L) -> [E | L].
%% Add Val to Set only when the flag is true. Callers pass e.g. a
%% needs_confirming flag to track unconfirmed msg_ids; when requeueing,
%% a msg_id is re-added to the unconfirmed set this way.
gb_sets_maybe_insert(true, Val, Set) ->
    gb_sets:add(Val, Set);
gb_sets_maybe_insert(false, _Val, Set) ->
    Set.
%% Build an initial #msg_status for message Msg at sequence id SeqId.
%% The message starts undelivered, with neither its body nor its index
%% entry on disk, and the body is kept in RAM. msg_id is taken from the
%% #basic_message.
%% NOTE(review): line 874 (the remaining parameters, which bind MsgProps)
%% is not shown in this listing.
873 msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
875 #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
876 is_persistent = IsPersistent, is_delivered = false,
877 msg_on_disk = false, index_on_disk = false,
878 msg_props = MsgProps }.
%% Drop the in-RAM message body (msg = undefined) from a #msg_status and
%% leave every other field as it was.
trim_msg_status(Status) ->
    Status#msg_status{msg = undefined}.
%% Run Fun against either the persistent (IsPersistent = true) or the
%% transient (false) message-store client in the {Persistent, Transient}
%% pair. Fun must return {Result, NewClient}. The new client is written
%% back into its slot and {Result, NewPair} is returned.
with_msg_store_state({PersistentCS, TransientCS}, IsPersistent, Fun) ->
    case IsPersistent of
        true ->
            {Result, PersistentCS1} = Fun(PersistentCS),
            {Result, {PersistentCS1, TransientCS}};
        false ->
            {Result, TransientCS1} = Fun(TransientCS),
            {Result, {PersistentCS, TransientCS1}}
    end.
%% Variant of with_msg_store_state/3 for operations that must not change
%% the message-store client state. The wrapper hands back MSCState1
%% untouched, and the pattern `{Res, MSCState}` reuses the already-bound
%% MSCState, so the call asserts (badmatch otherwise) that the client
%% pair comes back unchanged.
%% NOTE(review): lines 891 and 893-894 (the wrapper fun's head/end and,
%% presumably, the returned Res) are not shown in this listing.
889 with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
890 {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent,
892 {Fun(MSCState1), MSCState1}
%% Open a message-store client under a freshly generated reference
%% (rabbit_guid:gen/0) by delegating to msg_store_client_init/4.
%% NOTE(review): the continuation line 898 (the remaining arguments,
%% presumably Callback) is not shown in this listing.
896 msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
897 msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
%% Open a client on MsgStore identified by Ref. The zero-arity callback
%% given to rabbit_msg_store:client_init/4 runs
%% Callback(?MODULE, CloseFDsFun). CloseFDsFun targets the persistent
%% client slot exactly when MsgStore is the persistent message store,
%% and the transient slot otherwise.
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
    IsPersistentStore = MsgStore =:= ?PERSISTENT_MSG_STORE,
    CloseFDsFun = msg_store_close_fds_fun(IsPersistentStore),
    CloseFDsCallback = fun () -> Callback(?MODULE, CloseFDsFun) end,
    rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
                                 CloseFDsCallback).
%% Write Msg under MsgId through the persistent or transient client
%% (selected by IsPersistent) using rabbit_msg_store:write_flow/3. It goes
%% through with_immutable_msg_store_state/3, so the client state is
%% asserted to be unchanged. Callers match the result against ok.
%% NOTE(review): lines 908 and 910 (the wrapper fun's head and end) are
%% not shown in this listing.
905 msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
906 with_immutable_msg_store_state(
907 MSCState, IsPersistent,
909 rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
%% Read the message stored under MsgId from the persistent or transient
%% client (selected by IsPersistent) with rabbit_msg_store:read/2. Unlike
%% the write/remove helpers it goes through with_msg_store_state/3, so the
%% possibly updated client pair is returned alongside the result.
%% read_msg/2 matches the result as {{ok, Msg}, MSCState1}.
%% NOTE(review): lines 915 and 917 (the wrapper fun's head and end) are
%% not shown in this listing.
912 msg_store_read(MSCState, IsPersistent, MsgId) ->
913 with_msg_store_state(
914 MSCState, IsPersistent,
916 rabbit_msg_store:read(MsgId, MSCState1)
%% Remove the messages MsgIds through the persistent or transient client
%% (selected by IsPersistent) with rabbit_msg_store:remove/2, without
%% changing the client state. Callers match the result against ok.
%% NOTE(review): the wrapper fun's head (line 922) and end (line 924) are
%% not shown. The body below names its argument MCSState1, not MSCState1
%% as the sibling helpers do. Presumably the hidden head uses the same
%% spelling - TODO confirm.
919 msg_store_remove(MSCState, IsPersistent, MsgIds) ->
920 with_immutable_msg_store_state(
921 MSCState, IsPersistent,
923 rabbit_msg_store:remove(MsgIds, MCSState1)
%% Have the selected (persistent or transient) message-store client run
%% rabbit_msg_store:close_all_indicated/1. Returns what
%% with_msg_store_state/3 returns: {Result, UpdatedClientPair}.
msg_store_close_fds(MSCState, IsPersistent) ->
    CloseAll = fun rabbit_msg_store:close_all_indicated/1,
    with_msg_store_state(MSCState, IsPersistent, CloseAll).
%% Build a fun of (?MODULE, #vqstate{}) that closes the indicated file
%% handles on the persistent (IsPersistent = true) or transient client
%% and stores the updated client pair back in the state. A result from
%% msg_store_close_fds/2 that is not {ok, _} crashes with badmatch.
%% NOTE(review): the closing line 935 is not shown in this listing.
931 msg_store_close_fds_fun(IsPersistent) ->
932 fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
933 {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
934 State #vqstate { msg_store_clients = MSCState1 }
%% When the flag is true, record SeqId as delivered in the queue index.
%% NOTE(review): the body of the `false` clause (line 938) is not shown.
%% Presumably it returns IndexState unchanged - TODO confirm.
937 maybe_write_delivered(false, _SeqId, IndexState) ->
939 maybe_write_delivered(true, SeqId, IndexState) ->
940 rabbit_queue_index:deliver([SeqId], IndexState).
%% Convert queue-index entries {MsgId, SeqId, MsgProps, IsPersistent,
%% IsDelivered} into a ?QUEUE of #msg_status records with
%% index_on_disk = true. The fold accumulator is {Filtered, Delivers, Acks}.
%% For a transient entry with SeqId below TransientThreshold, its SeqId is
%% consed onto Delivers when not yet delivered. The rest of that branch is
%% not shown; presumably the SeqId is also added to Acks and the entry is
%% left out of Filtered. Other entries are first looked up in the
%% pending-ack tree PA.
%% Finally Delivers is recorded in the index, then Acks.
%% Returns {Filtered, UpdatedIndexState}.
%% NOTE(review): lines 944, 948, 950, 952-957, 960 and 962-968 are not
%% shown in this listing, including the fold call itself and the branches
%% taken for pending-ack entries.
942 betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
943 {Filtered, Delivers, Acks} =
945 fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
946 {Filtered1, Delivers1, Acks1} = Acc) ->
947 case SeqId < TransientThreshold andalso not IsPersistent of
949 cons_if(not IsDelivered, SeqId, Delivers1),
951 false -> case gb_trees:is_defined(SeqId, PA) of
958 is_persistent = IsPersistent,
959 is_delivered = IsDelivered,
961 index_on_disk = true,
969 end, {?QUEUE:new(), [], []}, List),
%% Deliveries are recorded before the acks.
970 {Filtered, rabbit_queue_index:ack(
971 Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
%% Grow a delta by one message with sequence id NewSeqId. The bounds are
%% widened when NewSeqId lies outside [start_seq_id, end_seq_id). Every
%% result is checked by d/1.
expand_delta(NewSeqId, ?BLANK_DELTA_PATTERN(X)) ->
    %% blank delta: becomes a one-message range starting at NewSeqId
    d(#delta { start_seq_id = NewSeqId, count = 1,
               end_seq_id = NewSeqId + 1 });
expand_delta(NewSeqId, #delta { start_seq_id = Lo, count = N } = Delta)
  when NewSeqId < Lo ->
    %% below the current range: move the lower bound down
    d(Delta #delta { start_seq_id = NewSeqId, count = N + 1 });
expand_delta(NewSeqId, #delta { count = N, end_seq_id = Hi } = Delta)
  when NewSeqId >= Hi ->
    %% at or past the current end: move the upper bound up
    d(Delta #delta { count = N + 1, end_seq_id = NewSeqId + 1 });
expand_delta(_NewSeqId, #delta { count = N } = Delta) ->
    %% already inside the range: only the count changes
    d(Delta #delta { count = N + 1 }).
%% Average rate, in events per second, over the current and previous
%% sampling periods combined. {OThen, OCount} describes the previous
%% period. The returned {Then, Count} becomes the "previous" pair for the
%% next call. timer:now_diff/2 works in microseconds, hence the 1.0e6
%% factor.
update_rate(Now, Then, Count, {OThen, OCount}) ->
    ElapsedMicros = timer:now_diff(Now, OThen),
    Rate = 1000000.0 * (Count + OCount) / ElapsedMicros,
    {Rate, {Then, Count}}.
990 %%----------------------------------------------------------------------------
991 %% Internal major helpers for Public API
992 %%----------------------------------------------------------------------------
%% init/7: build the initial #vqstate from a queue index and the two
%% message-store clients.
%%  - The index bounds give the lowest and next sequence ids.
%%  - persistent_count is read from Terms and falls back to DeltaCount.
%%  - delta spans LowSeqId..NextSeqId with that count. The exception is a
%%    count of 0 with DeltaCount not undefined, which gives a blank delta.
%%  - transient_threshold is NextSeqId. betas_from_index_entries/4 treats
%%    transient index entries below it specially.
%%  - Pending-ack trees and confirm sets start empty, the visible counters
%%    start at 0, and target_ram_count starts at infinity.
%% The state is then refilled from delta (maybe_deltas_to_betas/1) and
%% checked with a/1.
%% NOTE(review): lines 997, 1004-1011, 1019, 1021-1022, 1024, 1026,
%% 1029-1030 and 1037 are not shown in this listing, including the binding
%% of Now and the start of the #vqstate construction.
994 init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
995 PersistentClient, TransientClient) ->
996 {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
998 DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
999 Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
1000 true -> ?BLANK_DELTA;
1001 false -> d(#delta { start_seq_id = LowSeqId,
1002 count = DeltaCount1,
1003 end_seq_id = NextSeqId })
1012 next_seq_id = NextSeqId,
1013 pending_ack = gb_trees:empty(),
1014 ram_ack_index = gb_trees:empty(),
1015 index_state = IndexState1,
1016 msg_store_clients = {PersistentClient, TransientClient},
1017 durable = IsDurable,
1018 transient_threshold = NextSeqId,
1020 async_callback = AsyncCallback,
1023 persistent_count = DeltaCount1,
1025 target_ram_count = infinity,
1027 ram_msg_count_prev = 0,
1028 ram_ack_count_prev = 0,
1031 rates = blank_rate(Now, DeltaCount1),
1032 msgs_on_disk = gb_sets:new(),
1033 msg_indices_on_disk = gb_sets:new(),
1034 unconfirmed = gb_sets:new(),
1035 confirmed = gb_sets:new(),
1036 ack_out_counter = 0,
1038 ack_rates = blank_rate(Now, 0) },
1039 a(maybe_deltas_to_betas(State)).
%% Fresh #rates record anchored at Timestamp. egress starts as
%% {Timestamp, 0} and ingress as {Timestamp, IngressLength}. init/7 passes
%% the persistent count for message rates and 0 for ack rates.
%% NOTE(review): lines 1044-1045 (presumably the remaining fields) are
%% not shown in this listing.
1041 blank_rate(Timestamp, IngressLength) ->
1042 #rates { egress = {Timestamp, 0},
1043 ingress = {Timestamp, IngressLength},
1046 timestamp = Timestamp }.
%% Put MsgStatus back at the front of the queue (?QUEUE:in_r/2).
%%  - Body not in RAM (msg = undefined): if q4 is empty, it goes to the
%%    front of q3 as is. Otherwise the body is first read back with
%%    read_msg/2 and the message goes to the front of q4.
%%  - Body in RAM: it goes to the front of q4.
%% NOTE(review): line 1055 (the end of the case) is not shown in this
%% listing.
1048 in_r(MsgStatus = #msg_status { msg = undefined },
1049 State = #vqstate { q3 = Q3, q4 = Q4 }) ->
1050 case ?QUEUE:is_empty(Q4) of
1051 true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
1052 false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
1053 read_msg(MsgStatus, State),
1054 State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
1056 in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
1057 State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
%% Take the next message status off the queue: from q4 when it has one,
%% presumably otherwise through fetch_from_q3/1.
%% Returns {{value, MsgStatus}, NewState} or {empty, NewState}.
%% NOTE(review): lines 1061 and 1065 (the outer case's `empty` pattern and
%% the end of the inner case) and 1068-1069 are not shown in this listing.
1059 queue_out(State = #vqstate { q4 = Q4 }) ->
1060 case ?QUEUE:out(Q4) of
1062 case fetch_from_q3(State) of
1063 {empty, _State1} = Result -> Result;
1064 {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
1066 {{value, MsgStatus}, Q4a} ->
1067 {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
%% Make sure MsgStatus holds its message body in RAM. When msg is
%% undefined:
%%  - the body is read from the persistent or transient store client,
%%    asserting {ok, #basic_message{}};
%%  - the body is stored in the status;
%%  - ram_msg_count is incremented;
%%  - the updated client pair is kept.
%% NOTE(review): line 1071 (which binds MsgId) and the body of the second
%% clause (1081-1082, presumably returning {MsgStatus, State} unchanged)
%% are not shown in this listing.
1070 read_msg(MsgStatus = #msg_status { msg = undefined,
1072 is_persistent = IsPersistent },
1073 State = #vqstate { ram_msg_count = RamMsgCount,
1074 msg_store_clients = MSCState}) ->
1075 {{ok, Msg = #basic_message {}}, MSCState1} =
1076 msg_store_read(MSCState, IsPersistent, MsgId),
1077 {MsgStatus #msg_status { msg = Msg },
1078 State #vqstate { ram_msg_count = RamMsgCount + 1,
1079 msg_store_clients = MSCState1 }};
1080 read_msg(MsgStatus, State) ->
%% internal_fetch/3: hand out MsgStatus as the result of a fetch and update
%% the bookkeeping:
%%  1. mark it delivered in the queue index if its index entry is on disk
%%     and it was not delivered before;
%%  2. depending on AckRequired and what is on disk, remove its body from
%%     the message store (Rem) and/or ack it in the index (Ack). The
%%     visible branches cover AckRequired = false with the body on disk;
%%  3. when an ack is required, record a pending ack (with is_delivered =
%%     true) and return its tag; otherwise the tag is undefined.
%% persistent_count drops by one only for a persistent message fetched
%% without ack. ram_msg_count drops by one if the body was in RAM.
%% Returns {{Msg, IsDelivered, AckTag, Len1}, NewState}.
%% NOTE(review): lines 1084-1086, 1095, 1100-1101, 1103, 1105, 1107,
%% 1111-1113, 1119, 1121-1122, 1124, 1126 and 1131 are not shown in this
%% listing. They include the Rem fun head, the remaining case branches and
%% the computation of Len1 and IndexState2.
1083 internal_fetch(AckRequired, MsgStatus = #msg_status {
1087 is_persistent = IsPersistent,
1088 is_delivered = IsDelivered,
1089 msg_on_disk = MsgOnDisk,
1090 index_on_disk = IndexOnDisk },
1091 State = #vqstate {ram_msg_count = RamMsgCount,
1092 out_counter = OutCount,
1093 index_state = IndexState,
1094 msg_store_clients = MSCState,
1096 persistent_count = PCount }) ->
1097 %% 1. Mark it delivered if necessary
1098 IndexState1 = maybe_write_delivered(
1099 IndexOnDisk andalso not IsDelivered,
1102 %% 2. Remove from msg_store and queue index, if necessary
1104 ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
1106 Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
1108 case {AckRequired, MsgOnDisk, IndexOnDisk} of
1109 {false, true, false} -> Rem(), IndexState1;
1110 {false, true, true} -> Rem(), Ack();
1114 %% 3. If an ack is required, add something sensible to PA
1115 {AckTag, State1} = case AckRequired of
1116 true -> StateN = record_pending_ack(
1117 MsgStatus #msg_status {
1118 is_delivered = true }, State),
1120 false -> {undefined, State}
%% A persistent message leaves persistent_count here only when no ack
%% will follow.
1123 PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
1125 RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
1127 {{Msg, IsDelivered, AckTag, Len1},
1128 State1 #vqstate { ram_msg_count = RamMsgCount1,
1129 out_counter = OutCount + 1,
1130 index_state = IndexState2,
1132 persistent_count = PCount1 }}.
%% Drop every message held in q3, then refill q3 from delta
%% (maybe_deltas_to_betas/1) and drop those too. This repeats until q3
%% stays empty. Per-store removal counts are accumulated in LensByStore
%% (see remove_queue_entries/5).
%% Returns {LensByStore, State}.
%% NOTE(review): lines 1145-1146 (presumably resetting q3 in the state
%% passed to maybe_deltas_to_betas/1) and 1148-1149 are not shown.
1134 purge_betas_and_deltas(LensByStore,
1135 State = #vqstate { q3 = Q3,
1136 index_state = IndexState,
1137 msg_store_clients = MSCState }) ->
1138 case ?QUEUE:is_empty(Q3) of
1139 true -> {LensByStore, State};
1140 false -> {LensByStore1, IndexState1} =
1141 remove_queue_entries(fun ?QUEUE:foldl/3, Q3,
1142 LensByStore, IndexState, MSCState),
1143 purge_betas_and_deltas(LensByStore1,
1144 maybe_deltas_to_betas(
1147 index_state = IndexState1 }))
%% Remove every message in Q (walked with Fold, e.g. ?QUEUE:foldl/3) from
%% the message stores and the queue index.
%% Returns {NewLensByStore, NewIndexState}:
%%  - NewLensByStore adds each store's removed-id count to LensByStore;
%%  - NewIndexState first records the collected deliveries, then the acks.
remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) ->
    {MsgIdsByStore, Delivers, Acks} =
        Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
    %% one remove call per store (persistent / transient), in key order
    [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
     || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
    IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState),
    {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore),
     rabbit_queue_index:ack(Acks, IndexState1)}.
%% Fold step for remove_queue_entries/5. The accumulator is
%% {MsgIdsByStore, Delivers, Acks}, where MsgIdsByStore is an orddict keyed
%% by IsPersistent.
%%  - SeqId is added to Delivers when the index entry is on disk but the
%%    message is not yet delivered.
%%  - SeqId is added to Acks whenever the index entry is on disk.
%% NOTE(review): lines 1165 and 1168 are not shown. MsgId is added to
%% MsgIdsByStore under a condition on the hidden line, presumably
%% MsgOnDisk (the only binding otherwise unused) - TODO confirm.
1160 remove_queue_entries1(
1161 #msg_status { msg_id = MsgId, seq_id = SeqId,
1162 is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
1163 index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
1164 {MsgIdsByStore, Delivers, Acks}) ->
1166 true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
1167 false -> MsgIdsByStore
1169 cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
1170 cons_if(IndexOnDisk, SeqId, Acks)}.
%% For each store key (IsPersistent) in MsgIdsByStore, add the number of
%% msg ids listed under it to that key's counter in LensByStore. Both are
%% orddicts; orddict:update_counter/3 creates a missing counter with the
%% added length.
%% NOTE(review): line 1173 (presumably the orddict:fold( call head) is not
%% shown in this listing.
1172 sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) ->
1174 fun (IsPersistent, MsgIds, LensByStore1) ->
1175 orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1)
1176 end, LensByStore, MsgIdsByStore).
1178 %%----------------------------------------------------------------------------
1179 %% Internal gubbins for publishing
1180 %%----------------------------------------------------------------------------
%% Write the message body to its message store when it is not on disk yet
%% and either Force is true or the message is persistent. The status is
%% then flagged msg_on_disk = true. Decoded content is cleared before the
%% write, and the write must return ok.
%% NOTE(review): the bodies of the first and last clauses (lines 1184 and
%% 1196, presumably returning MsgStatus unchanged) are not shown.
1182 maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
1183 msg_on_disk = true }, _MSCState) ->
1185 maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
1186 msg = Msg, msg_id = MsgId,
1187 is_persistent = IsPersistent }, MSCState)
1188 when Force orelse IsPersistent ->
1189 Msg1 = Msg #basic_message {
1190 %% don't persist any recoverable decoded properties
1191 content = rabbit_binary_parser:clear_decoded_content(
1192 Msg #basic_message.content)},
1193 ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
1194 MsgStatus #msg_status { msg_on_disk = true };
1195 maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
%% Publish the message's queue-index entry when Force is true or the
%% message is persistent. If the message is already delivered, that is
%% recorded too. Both on-disk paths assert that the body is already on
%% disk (the `true = ...msg_on_disk` lines). Messages that are already
%% indexed or do not qualify are returned untouched.
%% Returns {MsgStatus1, IndexState1}.
%% NOTE(review): lines 1203-1204 (which bind MsgId and SeqId) are not
%% shown in this listing.
1198 maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
1199 index_on_disk = true }, IndexState) ->
1200 true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
1201 {MsgStatus, IndexState};
1202 maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
1205 is_persistent = IsPersistent,
1206 is_delivered = IsDelivered,
1207 msg_props = MsgProps}, IndexState)
1208 when Force orelse IsPersistent ->
1209 true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
1210 IndexState1 = rabbit_queue_index:publish(
1211 MsgId, SeqId, MsgProps, IsPersistent, IndexState),
1212 {MsgStatus #msg_status { index_on_disk = true },
1213 maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
1214 maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
1215 {MsgStatus, IndexState}.
%% Persist MsgStatus as needed, in two steps:
%%  1. the message body, forced when ForceMsg is true and otherwise
%%     written only for persistent messages;
%%  2. then its queue-index entry, forced by ForceIndex in the same way.
%% Returns the updated status and a state carrying the new index state.
maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
                    State = #vqstate { index_state = IndexState,
                                       msg_store_clients = MSCState }) ->
    {MsgStatus2, IndexState1} =
        maybe_write_index_to_disk(
          ForceIndex,
          maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState),
          IndexState),
    {MsgStatus2, State #vqstate { index_state = IndexState1 }}.
1225 %%----------------------------------------------------------------------------
1226 %% Internal gubbins for acks
1227 %%----------------------------------------------------------------------------
%% Store MsgStatus in the pending-ack tree under its SeqId and bump
%% ack_in_counter. gb_trees:insert/3 crashes if SeqId is already present.
%% There are two branches:
%%  - one stores the entry with its body trimmed (checked by m/1) and
%%    leaves the RAM-ack index alone;
%%  - the other stores the full status and adds SeqId -> MsgId to
%%    ram_ack_index.
%% NOTE(review): lines 1230 (binding MsgId), 1235-1236 (the case head,
%% presumably on MsgOnDisk) and 1239 are not shown in this listing.
1229 record_pending_ack(#msg_status { seq_id = SeqId,
1231 msg_on_disk = MsgOnDisk } = MsgStatus,
1232 State = #vqstate { pending_ack = PA,
1233 ram_ack_index = RAI,
1234 ack_in_counter = AckInCount}) ->
1237 true -> {m(trim_msg_status(MsgStatus)), RAI};
1238 false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
1240 State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
1241 ram_ack_index = RAI1,
1242 ack_in_counter = AckInCount + 1}.
%% Take SeqId's entry out of the pending acks. Returns
%% {Entry, StateWithoutIt}. The entry must exist in pending_ack
%% (gb_trees:get/2 crashes otherwise); it may or may not be present in
%% ram_ack_index.
remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
                                             ram_ack_index = RAI }) ->
    Entry = gb_trees:get(SeqId, PA),
    PA1 = gb_trees:delete(SeqId, PA),
    RAI1 = gb_trees:delete_any(SeqId, RAI),
    {Entry, State #vqstate { pending_ack = PA1, ram_ack_index = RAI1 }}.
%% Forget all pending acks. Every pending entry is folded with
%% accumulate_ack/2, then pending_ack and ram_ack_index are reset.
%%  - KeepPersistent = true: only the transient store's msg ids (key false)
%%    are removed from the message store.
%%  - KeepPersistent = false: index entries on disk are acked in the queue
%%    index, and the msg ids are removed from both stores.
%% NOTE(review): lines 1262, 1264-1266 and 1272-1273 are not shown in this
%% listing, including the `error` branch and what the true branch returns.
1250 purge_pending_ack(KeepPersistent,
1251 State = #vqstate { pending_ack = PA,
1252 index_state = IndexState,
1253 msg_store_clients = MSCState }) ->
1254 {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
1255 rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) ->
1256 accumulate_ack(MsgStatus, Acc)
1257 end, accumulate_ack_init(), PA),
1258 State1 = State #vqstate { pending_ack = gb_trees:empty(),
1259 ram_ack_index = gb_trees:empty() },
1260 case KeepPersistent of
1261 true -> case orddict:find(false, MsgIdsByStore) of
1263 {ok, MsgIds} -> ok = msg_store_remove(MSCState, false,
1267 false -> IndexState1 =
1268 rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
1269 [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
1270 || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
1271 State1 #vqstate { index_state = IndexState1 }
%% Starting accumulator for accumulate_ack/2:
%% {IndexOnDiskSeqIds, MsgIdsByStore (an empty orddict), AllMsgIds}.
accumulate_ack_init() ->
    NoMsgIdsByStore = orddict:new(),
    {[], NoMsgIdsByStore, []}.
%% Fold step over pending-ack entries; the accumulator starts as
%% accumulate_ack_init(). It collects:
%%  - the SeqIds whose index entry is on disk;
%%  - msg ids grouped by store (an orddict keyed by IsPersistent), under a
%%    condition on a hidden line (presumably MsgOnDisk);
%%  - every MsgId.
%% NOTE(review): lines 1277 (binding MsgId), 1283 and 1286 are not shown.
1276 accumulate_ack(#msg_status { seq_id = SeqId,
1278 is_persistent = IsPersistent,
1279 msg_on_disk = MsgOnDisk,
1280 index_on_disk = IndexOnDisk },
1281 {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
1282 {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
1284 true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
1285 false -> MsgIdsByStore
1287 [MsgId | AllMsgIds]}.
%% Look up the count stored under key true (the persistent store) in the
%% orddict LensByStore.
%% NOTE(review): the rest of the function (lines 1291-1294, presumably
%% mapping {ok, Len} / error to a count) is not shown in this listing.
1289 find_persistent_count(LensByStore) ->
1290 case orddict:find(true, LensByStore) of
1295 %%----------------------------------------------------------------------------
1296 %% Internal plumbing for confirms (aka publisher acks)
1297 %%----------------------------------------------------------------------------
%% Mark every msg id in MsgIdSet as confirmed. Each one is removed from
%% msgs_on_disk, msg_indices_on_disk and unconfirmed, and added to
%% confirmed.
%% NOTE(review): lines 1301-1303 (binding UC and C and starting the
%% returned #vqstate update) are not shown in this listing.
1299 record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
1300 msg_indices_on_disk = MIOD,
1304 msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet),
1305 msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet),
1306 unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
1307 confirmed = gb_sets:union(C, MsgIdSet) }.
%% True when some unconfirmed message is still waiting on a queue-index
%% sync. It is false in two cases:
%%  - nothing is unconfirmed;
%%  - every unconfirmed msg id is already in msg_indices_on_disk, so
%%    whatever is outstanding waits on the message store, not the index.
%% The difference UC - MIOD can be expensive to compute, so the
%% short-circuiting subset test gb_sets:is_subset/2 is used instead.
must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
                           unconfirmed = UC }) ->
    case gb_sets:is_empty(UC) of
        true  -> false;
        false -> not gb_sets:is_subset(UC, MIOD)
    end.
%% Confirm MsgIdSet unconditionally: a fun applying record_confirms/2 to
%% the queue state is built for Callback.
%% NOTE(review): line 1325 (how Callback is invoked with that fun) is not
%% shown in this listing.
1324 blind_confirm(Callback, MsgIdSet) ->
1326 fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
%% Handle a report (ignored or written) about the message bodies in
%% MsgIdSet.
%%  - ignored: confirm them outright via blind_confirm/2.
%%  - written: ids whose index entry is already on disk (MsgIdSet
%%    intersected with msg_indices_on_disk) are confirmed now. The
%%    still-unconfirmed ids of MsgIdSet are added to msgs_on_disk.
%% NOTE(review): lines 1331, 1337-1338 and 1340-1341 are not shown.
1328 msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
1329 blind_confirm(Callback, MsgIdSet);
1330 msgs_written_to_disk(Callback, MsgIdSet, written) ->
1332 fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
1333 msg_indices_on_disk = MIOD,
1334 unconfirmed = UC }) ->
1335 Confirmed = gb_sets:intersection(UC, MsgIdSet),
1336 record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
1339 gb_sets:union(MOD, Confirmed) })
%% Counterpart of msgs_written_to_disk/3 for queue-index entries.
%%  - Ids in MsgIdSet whose body is already on disk (MsgIdSet intersected
%%    with msgs_on_disk) are confirmed now.
%%  - The still-unconfirmed ids of MsgIdSet are added to
%%    msg_indices_on_disk.
%% NOTE(review): lines 1343, 1349 and 1352-1353 are not shown.
1342 msg_indices_written_to_disk(Callback, MsgIdSet) ->
1344 fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
1345 msg_indices_on_disk = MIOD,
1346 unconfirmed = UC }) ->
1347 Confirmed = gb_sets:intersection(UC, MsgIdSet),
1348 record_confirms(gb_sets:intersection(MsgIdSet, MOD),
1350 msg_indices_on_disk =
1351 gb_sets:union(MIOD, Confirmed) })
1354 %%----------------------------------------------------------------------------
1355 %% Internal plumbing for requeue
1356 %%----------------------------------------------------------------------------
%% Requeue helper: make sure the message body is in RAM and counted.
%%  - Body only on disk: load it with read_msg/2, which also bumps
%%    ram_msg_count.
%%  - Body already in RAM: keep the status and bump ram_msg_count here.
publish_alpha(MsgStatus, State)
  when MsgStatus#msg_status.msg =:= undefined ->
    read_msg(MsgStatus, State);
publish_alpha(MsgStatus, State = #vqstate { ram_msg_count = InRam }) ->
    {MsgStatus, State #vqstate { ram_msg_count = InRam + 1 }}.
%% Requeue helper: force the message body to the message store (the index
%% write is not forced). If the body is still held in RAM afterwards, it
%% is counted in ram_msg_count.
publish_beta(MsgStatus, State) ->
    {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
    #msg_status { msg = Msg } = MsgStatus1,
    #vqstate { ram_msg_count = RamMsgCount } = State1,
    InRam = one_if(Msg =/= undefined),
    {MsgStatus1, State1 #vqstate { ram_msg_count = RamMsgCount + InRam }}.
%% Rebuild queue Q by merging in the pending-ack entries listed in SeqIds,
%% so that the result stays ordered by sequence id. The merge starts with
%% an empty front queue; queue_merge/7 does the actual work.
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
    EmptyFront = ?QUEUE:new(),
    queue_merge(SeqIds, Q, EmptyFront, MsgIds, Limit, PubFun, State).
%% Merge step. While the next SeqId is below Limit (or Limit is
%% undefined), append to Front whichever comes first:
%%  - the head of Q, if its seq id is smaller;
%%  - otherwise the pending-ack entry for SeqId, republished through
%%    PubFun, with its msg_id prepended to MsgIds.
%% Once SeqIds is exhausted or reaches Limit, the result is
%% {RemainingSeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
%% NOTE(review): lines 1384 (the catch-all case pattern) and 1391 (the end
%% of the case) are not shown in this listing.
1375 queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
1376 Limit, PubFun, State)
1377 when Limit == undefined orelse SeqId < Limit ->
1378 case ?QUEUE:out(Q) of
1379 {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
1380 when SeqIdQ < SeqId ->
1381 %% enqueue from the remaining queue
1382 queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
1383 Limit, PubFun, State);
1385 %% enqueue from the remaining list of sequence ids
1386 {MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
1387 {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
1388 PubFun(MsgStatus, State1),
1389 queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
1390 Limit, PubFun, State2)
1392 queue_merge(SeqIds, Q, Front, MsgIds,
1393 _Limit, _PubFun, State) ->
1394 {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
%% Requeue each pending-ack entry in SeqIds into Delta. For every entry:
%%  - it is taken out of the pending acks;
%%  - its body and index entry are force-written to disk;
%%  - Delta is widened to cover its seq id;
%%  - its msg_id is prepended to MsgIds.
%% An empty SeqIds returns {Delta, MsgIds, State} unchanged.
delta_merge(SeqIds, Delta, MsgIds, State) ->
    MergeOne =
        fun (SeqId, {DeltaAcc, MsgIdsAcc, StateAcc}) ->
                {MsgStatus = #msg_status { msg_id = MsgId }, StateAcc1} =
                    msg_from_pending_ack(SeqId, StateAcc),
                {_Written, StateAcc2} =
                    maybe_write_to_disk(true, true, MsgStatus, StateAcc1),
                {expand_delta(SeqId, DeltaAcc), [MsgId | MsgIdsAcc],
                 StateAcc2}
        end,
    lists:foldl(MergeOne, {Delta, MsgIds, State}, SeqIds).
1407 %% Mostly opposite of record_pending_ack/2
%% Remove SeqId's entry from the pending acks (remove_pending_ack/2) and
%% return it with needs_confirming = false in its message properties.
%% NOTE(review): line 1413 (presumably the returned state, State1) is not
%% shown in this listing.
1408 msg_from_pending_ack(SeqId, State) ->
1409 {#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
1410 remove_pending_ack(SeqId, State),
1411 {MsgStatus #msg_status {
1412 msg_props = MsgProps #message_properties { needs_confirming = false } },
%% Fragment: the enclosing function's head (around lines 1413-1415) and
%% lines 1418-1420 are not shown in this listing. The visible part peeks
%% at the front of Q and yields the seq_id of the first #msg_status.
1416 case ?QUEUE:peek(Q) of
1417 {value, #msg_status { seq_id = SeqId }} -> SeqId;
%% Start sequence id of Delta, or undefined when Delta is blank.
delta_limit(?BLANK_DELTA_PATTERN(_X)) ->
    undefined;
delta_limit(#delta { start_seq_id = Lowest }) ->
    Lowest.
1424 %%----------------------------------------------------------------------------
1426 %%----------------------------------------------------------------------------
1428 %% Determine whether a reduction in memory use is necessary, and call
1429 %% functions to perform the required phase changes. The function can
1430 %% also be used to just do the former, by passing in dummy phase
1431 %% change functions.
1433 %% The function does not report on any needed beta->delta conversions,
1434 %% though the conversion function for that is called as necessary. The
1435 %% reason is twofold. Firstly, this is safe because the conversion is
1436 %% only ever necessary just after a transition to a
1437 %% target_ram_count of zero or after an incremental alpha->beta
1438 %% conversion. In the former case the conversion is performed straight
1439 %% away (i.e. any betas present at the time are converted to deltas),
1440 %% and in the latter case the need for a conversion is flagged up
1441 %% anyway. Secondly, this is necessary because we do not have a
1442 %% precise and cheap predicate for determining whether a beta->delta
1443 %% conversion is necessary - due to the complexities of retaining up to
1444 %% one segment's worth of messages in q3 - and thus would risk
1445 %% perpetually reporting the need for a conversion when no such
1446 %% conversion is needed. That in turn could cause an infinite loop.
%%
%% Returns {Reduce, NewState}. Each phase function takes
%% (Quota, State) and returns {Quota, State}.
%% NOTE(review): lines 1432, 1449, 1451, 1459-1460, 1463, 1469, 1472-1473,
%% 1478-1481 and 1486-1487 are not shown in this listing, including the
%% first clause's return value and the second clause's state binding.
1447 reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
1448 State = #vqstate {target_ram_count = infinity}) ->
1450 reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
1452 ram_ack_index = RamAckIndex,
1453 ram_msg_count = RamMsgCount,
1454 target_ram_count = TargetRamCount,
1455 rates = #rates { avg_ingress = AvgIngress,
1456 avg_egress = AvgEgress },
1457 ack_rates = #rates { avg_ingress = AvgAckIngress,
1458 avg_egress = AvgAckEgress }
%% Phase 1: shed RAM messages and RAM acks if together they exceed the
%% target (chunk_size/2 returns 0 when they do not).
1461 {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
1462 case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
1464 0 -> {false, State};
1465 %% Reduce memory of pending acks and alphas. The order is
1466 %% determined based on which is growing faster. Whichever
1467 %% comes second may very well get a quota of 0 if the
1468 %% first manages to push out the max number of messages.
1470 lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
1471 ReduceFun(QuotaN, StateN)
1474 case (AvgAckIngress - AvgAckEgress) >
1475 (AvgIngress - AvgEgress) of
1476 true -> [AckFun, AlphaBetaFun];
1477 false -> [AlphaBetaFun, AckFun]
%% Phase 2: if q2 + q3 exceed the permitted beta count by at least a full
%% ?IO_BATCH_SIZE chunk, push that many betas to deltas and report true.
1482 case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
1483 permitted_beta_count(State1)) of
1484 ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
1485 _ -> {Reduce, State1}
%% Move up to Quota pending acks out of RAM, largest SeqId first. For
%% each one (asserted transient, with the MsgId recorded in the RAM index):
%%  - its body is force-written to the message store;
%%  - its pending-ack entry is replaced by the trimmed status;
%%  - its ram_ack_index entry is dropped.
%% It stops when Quota reaches 0, or presumably when ram_ack_index is
%% empty. Presumably it returns {RemainingQuota, State}, matching the
%% {Quota, State} accumulator of reduce_memory_use/4's fold.
%% NOTE(review): lines 1489, 1493-1495 and 1505-1506 are not shown.
1488 limit_ram_acks(0, State) ->
1490 limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
1491 ram_ack_index = RAI }) ->
1492 case gb_trees:is_empty(RAI) of
1496 {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
1497 MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
1498 gb_trees:get(SeqId, PA),
1499 {MsgStatus1, State1} =
1500 maybe_write_to_disk(true, false, MsgStatus, State),
1501 PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA),
1502 limit_ram_acks(Quota - 1,
1503 State1 #vqstate { pending_ack = PA1,
1504 ram_ack_index = RAI1 })
%% Do the actual memory reduction by running reduce_memory_use/4 with the
%% real phase-change functions (alpha->beta, beta->delta and RAM-ack
%% limiting). The Reduce flag is discarded.
%% NOTE(review): lines 1511-1513 (the remaining argument and the return,
%% presumably State1) are not shown.
1507 reduce_memory_use(State) ->
1508 {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
1509 fun push_betas_to_deltas/2,
1510 fun limit_ram_acks/2,
%% How many betas (q2 + q3) reduce_memory_use/4 tolerates before pushing
%% some to delta.
%%  - Empty queue: the result is on line 1515, not shown.
%%  - target_ram_count = 0: at most next_segment_boundary(0), presumably
%%    one queue-index segment's worth, and no more than q3's length.
%%  - Otherwise, with BetaDelta = messages not in q1/q4, the allowance is
%%    BetaDelta - BetaDelta^2 div (BetaDelta + TargetRamCount), and never
%%    less than next_segment_boundary(0).
%% NOTE(review): lines 1515, 1519 and 1521 are not shown; the latter two
%% presumably bind Q4 and Len.
1514 permitted_beta_count(#vqstate { len = 0 }) ->
1516 permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) ->
1517 lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]);
1518 permitted_beta_count(#vqstate { q1 = Q1,
1520 target_ram_count = TargetRamCount,
1522 BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
1523 lists:max([rabbit_queue_index:next_segment_boundary(0),
1524 BetaDelta - ((BetaDelta * BetaDelta) div
1525 (BetaDelta + TargetRamCount))]).
%% How many items to shed in one go: the excess of Current over Permitted,
%% capped at ?IO_BATCH_SIZE. The first clause applies when there is no
%% excess or Permitted is infinity.
%% NOTE(review): that clause's body (line 1529) is not shown. Callers
%% match its result against 0, so presumably it returns 0.
1527 chunk_size(Current, Permitted)
1528 when Permitted =:= infinity orelse Permitted >= Current ->
1530 chunk_size(Current, Permitted) ->
1531 lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
%% Take the next message status from q3; it is reached only when q4 is
%% empty. Returns {loaded, {MsgStatus, NewState}}. When q3 is empty it
%% presumably returns {empty, State}; that branch (lines 1539-1540) is not
%% shown.
%% After the removal:
%%  - q3 and delta both empty: q1 moves wholesale into q4;
%%  - presumably only q3 empty: q3 is refilled from delta with
%%    maybe_deltas_to_betas/1;
%%  - otherwise: the state is kept as is.
%% NOTE(review): lines 1534, 1536-1537, 1539-1540, 1544, 1553, 1555,
%% 1559-1561 and 1563-1564 are not shown in this listing.
1533 fetch_from_q3(State = #vqstate { q1 = Q1,
1535 delta = #delta { count = DeltaCount },
1538 case ?QUEUE:out(Q3) of
1541 {{value, MsgStatus}, Q3a} ->
1542 State1 = State #vqstate { q3 = Q3a },
1543 State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
1545 %% q3 is now empty, it wasn't before;
1546 %% delta is still empty. So q2 must be
1547 %% empty, and we know q4 is empty
1548 %% otherwise we wouldn't be loading from
1549 %% q3. As such, we can just set q4 to Q1.
1550 true = ?QUEUE:is_empty(Q2), %% ASSERTION
1551 true = ?QUEUE:is_empty(Q4), %% ASSERTION
1552 State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
1554 maybe_deltas_to_betas(State1);
1556 %% q3 still isn't empty, we've not
1557 %% touched delta, so the invariants
1558 %% between q1, q2, delta and q3 are
1562 {loaded, {MsgStatus, State2}}
%% Load betas from delta. The steps are:
%%  1. read the queue-index entries from delta's start up to the next
%%     segment boundary (further bounded by a value on line 1579, not
%%     shown, presumably DeltaSeqIdEnd);
%%  2. turn them into betas with betas_from_index_entries/4;
%%  3. append them to q3.
%% Outcomes:
%%  - Blank delta: handled by the first clause (body on line 1566, not
%%    shown).
%%  - Nothing loaded (every entry skipped): advance delta's start past the
%%    read range and try again.
%%  - Delta now exhausted: delta becomes blank and q2 is appended to q3.
%%  - Otherwise: delta shrinks to start at the end of the read range.
%% NOTE(review): lines 1566, 1568-1570, 1572, 1575, 1577, 1579, 1586, 1590,
%% 1592, 1595, 1601, 1603 and 1606-1609 are not shown.
1565 maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
1567 maybe_deltas_to_betas(State = #vqstate {
1571 index_state = IndexState,
1573 transient_threshold = TransientThreshold }) ->
1574 #delta { start_seq_id = DeltaSeqId,
1576 end_seq_id = DeltaSeqIdEnd } = Delta,
1578 lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
1580 {List, IndexState1} =
1581 rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
1582 {Q3a, IndexState2} =
1583 betas_from_index_entries(List, TransientThreshold, PA, IndexState1),
1584 State1 = State #vqstate { index_state = IndexState2 },
1585 case ?QUEUE:len(Q3a) of
1587 %% we ignored every message in the segment due to it being
1588 %% transient and below the threshold
1589 maybe_deltas_to_betas(
1591 delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
1593 Q3b = ?QUEUE:join(Q3, Q3a),
1594 case DeltaCount - Q3aLen of
1596 %% delta is now empty, but it wasn't before, so
1597 %% can now join q2 onto q3
1598 State1 #vqstate { q2 = ?QUEUE:new(),
1599 delta = ?BLANK_DELTA,
1600 q3 = ?QUEUE:join(Q3b, Q2) };
1602 Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
1604 end_seq_id = DeltaSeqIdEnd }),
1605 State1 #vqstate { delta = Delta1,
%% Turn up to Quota alphas into betas, in two passes: first from q1, then
%% from q4.
%%  - A message leaving q1 goes to the back of q3 when delta is empty, and
%%    to the back of q2 otherwise.
%%  - A message leaving q4 goes to the front of q3.
%% The q4 pass runs with Quota1, bound on a hidden line (presumably the
%% quota left after the q1 pass).
%% NOTE(review): lines 1611, 1613, 1620, 1622 and 1626-1627 are not shown,
%% including the generator funs and the bindings of Quota1/State1.
1610 push_alphas_to_betas(Quota, State) ->
1612 push_alphas_to_betas(
1614 fun (MsgStatus, Q1a,
1615 State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
1616 State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
1617 (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
1618 State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
1619 end, Quota, State #vqstate.q1, State),
1621 push_alphas_to_betas(
1623 fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
1624 State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
1625 end, Quota1, State1 #vqstate.q4, State1),
%% Pull alphas from Q with Generator, which returns {{value, MsgStatus}, Qa}
%% or presumably an empty marker. For each message:
%%  - its body is forced to the message store (asserted msg_on_disk);
%%  - the body is dropped from RAM and ram_msg_count is decremented;
%%  - the trimmed status is passed to Consumer(MsgStatus, Qa, State).
%% It stops when Quota reaches 0, target_ram_count is infinity, or
%% ram_msg_count is already within target_ram_count.
%% NOTE(review): lines 1634, 1637-1638 and 1647-1648 are not shown,
%% including the first clause's return value (presumably {Quota, State}).
1628 push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
1629 State = #vqstate { ram_msg_count = RamMsgCount,
1630 target_ram_count = TargetRamCount })
1631 when Quota =:= 0 orelse
1632 TargetRamCount =:= infinity orelse
1633 TargetRamCount >= RamMsgCount ->
1635 push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
1636 case Generator(Q) of
1639 {{value, MsgStatus}, Qa} ->
1640 {MsgStatus1 = #msg_status { msg_on_disk = true },
1641 State1 = #vqstate { ram_msg_count = RamMsgCount }} =
1642 maybe_write_to_disk(true, false, MsgStatus, State),
1643 MsgStatus2 = m(trim_msg_status(MsgStatus1)),
1644 State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 },
1645 push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
1646 Consumer(MsgStatus2, Qa, State2))
%% Move up to Quota betas into delta, writing their index entries to disk.
%%  - First pass, q3: the limit is the index segment boundary after q3's
%%    minimum seq id.
%%  - Second pass, q2: the limit is q2's own minimum seq id.
%% Quota, delta and index state are threaded through both passes as
%% PushState = {Quota, Delta, IndexState}.
%% NOTE(review): lines 1650-1651, 1655, 1657, 1659, 1661 and 1664-1665 are
%% not shown in this listing.
1649 push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
1652 index_state = IndexState }) ->
1653 PushState = {Quota, Delta, IndexState},
1654 {Q3a, PushState1} = push_betas_to_deltas(
1656 fun rabbit_queue_index:next_segment_boundary/1,
1658 {Q2a, PushState2} = push_betas_to_deltas(
1660 fun (Q2MinSeqId) -> Q2MinSeqId end,
1662 {_, Delta1, IndexState1} = PushState2,
1663 State #vqstate { q2 = Q2a,
1666 index_state = IndexState1 }.
%% One beta->delta pass over Q. Limit = LimitFun(seq id at the front of Q).
%%  - If even the seq id at the back of Q is below Limit, nothing is
%%    pushed.
%%  - Otherwise push_betas_to_deltas1/4 moves entries whose seq id is
%%    >= Limit.
%% Returns {RemainingQ, PushState}.
%% NOTE(review): lines 1670-1672 (presumably the empty-queue case) and
%% 1679-1681 are not shown.
1668 push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
1669 case ?QUEUE:is_empty(Q) of
1673 {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q),
1674 {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q),
1675 Limit = LimitFun(MinSeqId),
1676 case MaxSeqId < Limit of
1677 true -> {Q, PushState};
1678 false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
%% Repeatedly take an entry from Q with Generator. Each entry whose seq id
%% is >= Limit:
%%  - has its index entry force-written to disk (asserted);
%%  - is absorbed into delta via expand_delta/2;
%%  - uses one unit of Quota.
%% It stops when Quota is 0 or the next entry's seq id is below Limit.
%% NOTE(review): lines 1684, 1688-1689, 1692 and 1699-1700 are not shown,
%% including the return values (presumably {Q, PushState}).
1682 push_betas_to_deltas1(_Generator, _Limit, Q,
1683 {0, _Delta, _IndexState} = PushState) ->
1685 push_betas_to_deltas1(Generator, Limit, Q,
1686 {Quota, Delta, IndexState} = PushState) ->
1687 case Generator(Q) of
1690 {{value, #msg_status { seq_id = SeqId }}, _Qa}
1691 when SeqId < Limit ->
1693 {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
1694 {#msg_status { index_on_disk = true }, IndexState1} =
1695 maybe_write_index_to_disk(true, MsgStatus, IndexState),
1696 Delta1 = expand_delta(SeqId, Delta),
1697 push_betas_to_deltas1(Generator, Limit, Qa,
1698 {Quota - 1, Delta1, IndexState1})
1701 %%----------------------------------------------------------------------------
1703 %%----------------------------------------------------------------------------
%% Exported storage-transformation step. It rewrites a stored basic_message
%% tuple so that its single Routing_Key becomes a one-element list
%% [Routing_Key]. Any other term yields {error, corrupt_message}.
%% NOTE(review): lines 1706 and 1712-1715 are not shown. The fun is
%% presumably handed to transform_storage/1.
1705 multiple_routing_keys() ->
1707 fun ({basic_message, ExchangeName, Routing_Key, Content,
1708 MsgId, Persistent}) ->
1709 {ok, {basic_message, ExchangeName, [Routing_Key], Content,
1710 MsgId, Persistent}};
1711 (_) -> {error, corrupt_message}
%% Assumes message store is not running.
%% Run transform_store/2 with TransformFun over the persistent store and
%% then the transient store. Returns the transient store's result.
transform_storage(TransformFun) ->
    lists:foldl(fun (Store, _PrevResult) ->
                        transform_store(Store, TransformFun)
                end,
                undefined,
                [?PERSISTENT_MSG_STORE, ?TRANSIENT_MSG_STORE]).
%% Force recovery of Store under the mnesia directory, then run
%% rabbit_msg_store:transform_dir/3 with TransformFun over it. Returns
%% transform_dir/3's result.
transform_store(Store, TransformFun) ->
    Dir = rabbit_mnesia:dir(),
    rabbit_msg_store:force_recovery(Dir, Store),
    rabbit_msg_store:transform_dir(Dir, Store, TransformFun).