1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
11 %% The Original Code is RabbitMQ.
13 %% The Initial Developer of the Original Code is VMware, Inc.
14 %% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
17 -module(rabbit_msg_store).
19 -behaviour(gen_server2).
21 -export([start_link/4, successfully_recovered_state/1,
22 client_init/4, client_terminate/1, client_delete_and_terminate/1,
23 client_ref/1, close_all_indicated/1,
24 write/3, read/2, contains/2, remove/2]).
26 -export([set_maximum_since_use/2, has_readers/2, combine_files/3,
27 delete_file/2]). %% internal
29 -export([transform_dir/3, force_recovery/2]). %% upgrade
31 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
32 code_change/3, prioritise_call/3, prioritise_cast/2,
33 prioritise_info/2, format_message_queue/2]).
35 %%----------------------------------------------------------------------------
37 -include("rabbit_msg_store.hrl").
%% Delay passed to erlang:send_after/3 when the periodic `sync' timer is
%% armed (see start_sync_timer/1).
39 -define(SYNC_INTERVAL, 25). %% milliseconds
%% File names used inside the store directory. Their readers/writers are
%% not visible here — presumably recovery artefacts (TODO confirm).
40 -define(CLEAN_FILENAME, "clean.dot").
41 -define(FILE_SUMMARY_FILENAME, "file_summary.ets").
42 -define(TRANSFORM_TMP, "transform_tmp").
%% File open modes. init/1 opens the current file with
%% [read | ?WRITE_MODE] so that it can be positioned and truncated.
44 -define(BINARY_MODE, [raw, binary]).
45 -define(READ_MODE, [read]).
46 -define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
47 -define(WRITE_MODE, [write]).
%% Extensions of store data files and of their temporary counterparts
%% (the temporary ones are presumably used during compaction — confirm).
49 -define(FILE_EXTENSION, ".rdq").
50 -define(FILE_EXTENSION_TMP, ".rdt").
%% Buffer size in bytes; presumably handed to file_handle_cache when
%% files are opened (usage not visible here — confirm).
52 -define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
54 %%----------------------------------------------------------------------------
57 { dir, %% store directory
58 index_module, %% the module for index ops
59 index_state, %% where are messages?
60 current_file, %% current file name as number
61 current_file_handle, %% current file handle since the last fsync?
62 file_handle_cache, %% file handle cache
63 sync_timer_ref, %% TRef for our interval timer
64 sum_valid_data, %% sum of valid data in all files
65 sum_file_size, %% sum of file sizes
66 pending_gc_completion, %% things to do once GC completes
67 gc_pid, %% pid of our GC
68 file_handles_ets, %% tid of the shared file handles table
69 file_summary_ets, %% tid of the file summary table
70 cur_file_cache_ets, %% tid of current file cache table
71 dying_clients, %% set of dying clients
72 clients, %% map of references of all registered clients
74 successfully_recovered, %% boolean: did we recover state?
75 file_size_limit, %% how big are our files allowed to get?
76 cref_to_msg_ids %% client ref to synced messages mapping
79 -record(client_msstate,
93 {file, valid_total_size, left, right, file_size, locked, readers}).
104 %%----------------------------------------------------------------------------
108 -export_type([gc_state/0, file_num/0]).
110 -type(gc_state() :: #gc_state { dir :: file:filename(),
111 index_module :: atom(),
112 index_state :: any(),
113 file_summary_ets :: ets:tid(),
114 file_handles_ets :: ets:tid(),
115 msg_store :: server()
118 -type(server() :: pid() | atom()).
119 -type(client_ref() :: binary()).
120 -type(file_num() :: non_neg_integer()).
121 -type(client_msstate() :: #client_msstate {
123 client_ref :: client_ref(),
124 file_handle_cache :: dict(),
125 index_state :: any(),
126 index_module :: atom(),
127 dir :: file:filename(),
129 file_handles_ets :: ets:tid(),
130 file_summary_ets :: ets:tid(),
131 cur_file_cache_ets :: ets:tid()}).
132 -type(msg_ref_delta_gen(A) ::
133 fun ((A) -> 'finished' |
134 {rabbit_types:msg_id(), non_neg_integer(), A})).
135 -type(maybe_msg_id_fun() ::
136 'undefined' | fun ((gb_set(), 'written' | 'removed') -> any())).
137 -type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')).
138 -type(deletion_thunk() :: fun (() -> boolean())).
140 -spec(start_link/4 ::
141 (atom(), file:filename(), [binary()] | 'undefined',
142 {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error()).
143 -spec(successfully_recovered_state/1 :: (server()) -> boolean()).
144 -spec(client_init/4 :: (server(), client_ref(), maybe_msg_id_fun(),
145 maybe_close_fds_fun()) -> client_msstate()).
146 -spec(client_terminate/1 :: (client_msstate()) -> 'ok').
147 -spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
148 -spec(client_ref/1 :: (client_msstate()) -> client_ref()).
149 -spec(close_all_indicated/1 ::
150 (client_msstate()) -> rabbit_types:ok(client_msstate())).
151 -spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
152 -spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
153 {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
154 -spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
155 -spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
157 -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
158 -spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
159 -spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
161 -spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()).
162 -spec(force_recovery/2 :: (file:filename(), server()) -> 'ok').
163 -spec(transform_dir/3 :: (file:filename(), server(),
164 fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok').
168 %%----------------------------------------------------------------------------
170 %% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION
171 %% It is not recommended to set this to < 0.5
172 -define(GARBAGE_FRACTION, 0.5).
176 %% Index: this is a mapping from MsgId to #msg_location{}:
177 %% {MsgId, RefCount, File, Offset, TotalSize}
178 %% By default, it's in ets, but it's also pluggable.
179 %% FileSummary: this is an ets table which maps File to #file_summary{}:
180 %% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers}
182 %% The basic idea is that messages are appended to the current file up
183 %% until that file becomes too big (> file_size_limit). At that point,
184 %% the file is closed and a new file is created on the _right_ of the
185 %% old file which is used for new messages. Files are named
186 %% numerically ascending, thus the file with the lowest name is the
189 %% We need to keep track of which messages are in which files (this is
190 %% the Index); how much useful data is in each file and which files
191 %% are on the left and right of each other. This is the purpose of the
192 %% FileSummary ets table.
194 %% As messages are removed from files, holes appear in these
195 %% files. The field ValidTotalSize contains the total amount of useful
196 %% data left in the file. This is needed for garbage collection.
198 %% When we discover that a file is now empty, we delete it. When we
199 %% discover that it can be combined with the useful data in either its
200 %% left or right neighbour, and overall, across all the files, we have
201 %% ((the amount of garbage) / (the sum of all file sizes)) >
202 %% ?GARBAGE_FRACTION, we start a garbage collection run concurrently,
203 %% which will compact the two files together. This keeps disk
204 %% utilisation high and aids performance. We deliberately do this
205 %% lazily in order to prevent doing GC on files which are soon to be
206 %% emptied (and hence deleted).
208 %% Given the compaction between two files, the left file (i.e. elder
209 %% file) is considered the ultimate destination for the good data in
210 %% the right file. If necessary, the good data in the left file which
211 %% is fragmented throughout the file is written out to a temporary
212 %% file, then read back in to form a contiguous chunk of good data at
213 %% the start of the left file. Thus the left file is garbage collected
214 %% and compacted. Then the good data from the right file is copied
215 %% onto the end of the left file. Index and FileSummary tables are
218 %% On non-clean startup, we scan the files we discover, dealing with
219 %% the possibilities of a crash having occurred during a compaction
220 %% (this consists of tidyup - the compaction is deliberately designed
221 %% such that data is duplicated on disk rather than risking it being
222 %% lost), and rebuild the FileSummary ets table and Index.
224 %% So, with this design, messages move to the left. Eventually, they
225 %% should end up in a contiguous block on the left and are then never
226 %% rewritten. But this isn't quite the case. If in a file there is one
227 %% message that is being ignored, for some reason, and messages in the
228 %% file to the right and in the current block are being read all the
229 %% time then it will repeatedly be the case that the good data from
230 %% both files can be combined and will be written out to a new
231 %% file. Whenever this happens, our shunned message will be rewritten.
233 %% So, provided that we combine messages in the right order,
234 %% (i.e. left file, bottom to top, right file, bottom to top),
235 %% eventually our shunned message will end up at the bottom of the
236 %% left file. The compaction/combining algorithm is smart enough to
237 %% read in good data from the left file that is scattered throughout
238 %% (i.e. C and D in the below diagram), then truncate the file to just
239 %% above B (i.e. truncate to the limit of the good contiguous region
240 %% at the start of the file), then write C and D on top and then write
241 %% E, F and G from the right file on top. Thus contiguous blocks of
242 %% good data at the bottom of files are not rewritten.
244 %% +-------+ +-------+ +-------+
246 %% +-------+ +-------+ +-------+
248 %% +-------+ +-------+ +-------+
250 %% +-------+ +-------+ +-------+
251 %% | C | | F | ===> | D |
252 %% +-------+ +-------+ +-------+
254 %% +-------+ +-------+ +-------+
256 %% +-------+ +-------+ +-------+
258 %% +-------+ +-------+ +-------+
261 %% From this reasoning, we do have a bound on the number of times the
262 %% message is rewritten. From when it is inserted, there can be no
263 %% files inserted between it and the head of the queue, and the worst
264 %% case is that every time it is rewritten, it moves one position lower
265 %% in the file (for it to stay at the same position requires that
266 %% there are no holes beneath it, which means truncate would be used
267 %% and so it would not be rewritten at all). Thus this seems to
268 %% suggest the limit is the number of messages ahead of it in the
269 %% queue, though it's likely that that's pessimistic, given the
270 %% requirements for compaction/combination of files.
272 %% The other property that we have is the bound on the lowest
273 %% utilisation, which should be 50% - worst case is that all files are
274 %% fractionally over half full and can't be combined (equivalent is
275 %% alternating full files and files with only one tiny message in
278 %% Messages are reference-counted. When a message with the same msg id
279 %% is written several times we only store it once, and only remove it
280 %% from the store when it has been removed the same number of times.
282 %% The reference counts do not persist. Therefore the initialisation
283 %% function must be provided with a generator that produces ref count
284 %% deltas for all recovered messages. This is only used on startup
285 %% when the shutdown was non-clean.
287 %% Read messages with a reference count greater than one are entered
288 %% into a message cache. The purpose of the cache is not especially
289 %% performance, though it can help there too, but prevention of memory
290 %% explosion. It ensures that as messages with a high reference count
291 %% are read from several processes they are read back as the same
292 %% binary object rather than multiples of identical binary
295 %% Reads can be performed directly by clients without calling to the
296 %% server. This is safe because multiple file handles can be used to
297 %% read files. However, locking is used by the concurrent GC to make
298 %% sure that reads are not attempted from files which are in the
299 %% process of being garbage collected.
301 %% When a message is removed, its reference count is decremented. Even
302 %% if the reference count becomes 0, its entry is not removed. This is
303 %% because in the event of the same message being sent to several
304 %% different queues, there is the possibility of one queue writing and
305 %% removing the message before other queues write it at all. Thus
306 %% accommodating 0-reference counts allows us to avoid unnecessary
307 %% writes here. Of course, there are complications: the file to which
308 %% the message has already been written could be locked pending
309 %% deletion or GC, which means we have to rewrite the message as the
310 %% original copy will now be lost.
312 %% The server automatically defers reads, removes and contains calls
313 %% that refer to files which are currently being
314 %% GC'd. Contains calls are only deferred in order to ensure they do
315 %% not overtake removes.
317 %% The current file to which messages are being written has a
318 %% write-back cache. This is written to immediately by clients and can
319 %% be read from by clients too. This means that there are only ever
320 %% writes made to the current file, thus eliminating delays due to
321 %% flushing write buffers in order to be able to safely read from the
322 %% current file. The one exception to this is that on start up, the
323 %% cache is not populated with msgs found in the current file, and
324 %% thus in this case only, reads may have to come from the file
325 %% itself. The effect of this is that even if the msg_store process is
326 %% heavily overloaded, clients can still write and read messages with
327 %% very low latency and not block at all.
329 %% Clients of the msg_store are required to register before using the
330 %% msg_store. This provides them with the necessary client-side state
331 %% to allow them to directly access the various caches and files. When
332 %% they terminate, they should deregister. They can do this by calling
333 %% either client_terminate/1 or client_delete_and_terminate/1. The
334 %% differences are: (a) client_terminate is synchronous. As a result,
335 %% if the msg_store is badly overloaded and has lots of in-flight
336 %% writes and removes to process, this will take some time to
337 %% return. However, once it does return, you can be sure that all the
338 %% actions you've issued to the msg_store have been processed. (b) Not
339 %% only is client_delete_and_terminate/1 asynchronous, but it also
340 %% permits writes and subsequent removes from the current
341 %% (terminating) client which are still in flight to be safely
342 %% ignored. Thus from the point of view of the msg_store itself, and
343 %% all from the same client:
345 %% (T) = termination; (WN) = write of msg N; (RN) = remove of msg N
346 %% --> W1, W2, W1, R1, T, W3, R2, W2, R1, R2, R3, W4 -->
348 %% The client obviously sent T after all the other messages (up to
349 %% W4), but because the msg_store prioritises messages, the T can be
350 %% promoted and thus received early.
352 %% Thus at the point of the msg_store receiving T, we have messages 1
353 %% and 2 with a refcount of 1. After T, W3 will be ignored because
354 %% it's an unknown message, as will R3, and W4. W2, R1 and R2 won't be
355 %% ignored because the messages that they refer to were already known
356 %% to the msg_store prior to T. However, it can be a little more
357 %% complex: after the first R2, the refcount of msg 2 is 0. At that
358 %% point, if a GC occurs or file deletion, msg 2 could vanish, which
359 %% would then mean that the subsequent W2 and R2 are then ignored.
361 %% The use case then for client_delete_and_terminate/1 is if the
362 %% client wishes to remove everything it's written to the msg_store:
363 %% it issues removes for all messages it's written and not removed,
364 %% and then calls client_delete_and_terminate/1. At that point, any
365 %% in-flight writes (and subsequent removes) can be ignored, but
366 %% removes and writes for messages the msg_store already knows about
367 %% will continue to be processed normally (which will normally just
368 %% involve modifying the reference count, which is fast). Thus we save
369 %% disk bandwidth for writes which are going to be immediately removed
370 %% again by the terminating client.
372 %% We use a separate set to keep track of the dying clients in order
373 %% to keep that set, which is inspected on every write and remove, as
374 %% small as possible. Inspecting the set of all clients would degrade
375 %% performance with many healthy clients and few, if any, dying
376 %% clients, which is the typical case.
378 %% For notes on Clean Shutdown and startup, see documentation in
381 %%----------------------------------------------------------------------------
383 %%----------------------------------------------------------------------------
%% Start the message store server and register it locally under Server.
%% No start-up timeout is applied, because recovery can take arbitrarily
%% long.
start_link(Server, Dir, ClientRefs, StartupFunState) ->
    InitArgs = [Server, Dir, ClientRefs, StartupFunState],
    StartOpts = [{timeout, infinity}],
    gen_server2:start_link({local, Server}, ?MODULE, InitArgs, StartOpts).
%% Ask the server whether its state was recovered on start-up.
%% The server replies with the successfully_recovered field of its state.
successfully_recovered_state(Server) ->
    Query = successfully_recovered_state,
    gen_server2:call(Server, Query, infinity).
393 client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
394 {IState, IModule, Dir, GCPid,
395 FileHandlesEts, FileSummaryEts, CurFileCacheEts} =
397 Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
398 #client_msstate { server = Server,
400 file_handle_cache = dict:new(),
401 index_state = IState,
402 index_module = IModule,
405 file_handles_ets = FileHandlesEts,
406 file_summary_ets = FileSummaryEts,
407 cur_file_cache_ets = CurFileCacheEts }.
%% Deregister a client synchronously.
%% The client's cached file handles are closed first. The call returns
%% only after the server has handled {client_terminate, Ref}, so every
%% earlier request from this client has been processed by then.
client_terminate(#client_msstate { client_ref = ClientRef } = CState) ->
    close_all_handles(CState),
    Request = {client_terminate, ClientRef},
    ok = server_call(CState, Request).
%% Deregister a client asynchronously.
%% The client's file handles are closed, then the server is sent
%% {client_dying, Ref} followed by {client_delete, Ref}, in that order.
%% Each cast is asserted to return ok.
client_delete_and_terminate(#client_msstate { client_ref = ClientRef } = CState) ->
    close_all_handles(CState),
    lists:foreach(fun (Notification) -> ok = server_cast(CState, Notification) end,
                  [{client_dying, ClientRef}, {client_delete, ClientRef}]).
%% Return the reference identifying this client to the server.
client_ref(CState) when is_record(CState, client_msstate) ->
    CState #client_msstate.client_ref.
421 CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
422 client_ref = CRef }) ->
423 ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
424 ok = server_cast(CState, {write, CRef, MsgId}).
427 CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
428 %% Check the cur file cache
429 case ets:lookup(CurFileCacheEts, MsgId) of
431 Defer = fun() -> {server_call(CState, {read, MsgId}), CState} end,
432 case index_lookup_positive_ref_count(MsgId, CState) of
433 not_found -> Defer();
434 MsgLocation -> client_read1(MsgLocation, Defer, CState)
436 [{MsgId, Msg, _CacheRefCount}] ->
%% Synchronously ask the server whether it knows MsgId.
%% This always goes through the server, so the answer cannot overtake
%% removes this client has already issued.
contains(MsgId, CState) ->
    Query = {contains, MsgId},
    server_call(CState, Query).
%% Asynchronously remove a batch of messages on behalf of this client.
%% An empty batch is a no-op and sends nothing to the server.
remove(MsgIds, _CState) when MsgIds =:= [] ->
    ok;
remove(MsgIds, #client_msstate { client_ref = ClientRef } = CState) ->
    Request = {remove, ClientRef, MsgIds},
    server_cast(CState, Request).
%% Callback registered with file_handle_cache in init/1: forward the new
%% maximum handle age to the server asynchronously.
set_maximum_since_use(Server, Age) ->
    Notification = {set_maximum_since_use, Age},
    gen_server2:cast(Server, Notification).
448 %%----------------------------------------------------------------------------
449 %% Client-side-only helpers
450 %%----------------------------------------------------------------------------
%% Synchronous request to the server that owns this client state.
%% Waits indefinitely for the reply.
server_call(CState, Msg) when is_record(CState, client_msstate) ->
    Server = CState #client_msstate.server,
    gen_server2:call(Server, Msg, infinity).
%% Fire-and-forget message to the server that owns this client state.
server_cast(CState, Msg) when is_record(CState, client_msstate) ->
    Server = CState #client_msstate.server,
    gen_server2:cast(Server, Msg).
458 client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer,
459 CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
460 case ets:lookup(FileSummaryEts, File) of
461 [] -> %% File has been GC'd and no longer exists. Go around again.
463 [#file_summary { locked = Locked, right = Right }] ->
464 client_read2(Locked, Right, MsgLocation, Defer, CState)
467 client_read2(false, undefined, _MsgLocation, Defer, _CState) ->
468 %% Although we've already checked both caches and not found the
469 %% message there, the message is apparently in the
470 %% current_file. We can only arrive here if we are trying to read
471 %% a message which we have not written, which is very odd, so just
474 %% OR, on startup, the cur_file_cache is not populated with the
475 %% contents of the current file, thus reads from the current file
476 %% will end up here and will need to be deferred.
478 client_read2(true, _Right, _MsgLocation, Defer, _CState) ->
479 %% Of course, in the meantime, the GC could have run and our msg
480 %% is actually in a different file, unlocked. However, deferring is
481 %% the safest and simplest thing to do.
483 client_read2(false, _Right,
484 MsgLocation = #msg_location { msg_id = MsgId, file = File },
486 CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
487 %% It's entirely possible that everything we're doing from here on
488 %% is for the wrong file, or a non-existent file, as a GC may have
490 safe_ets_update_counter(
491 FileSummaryEts, File, {#file_summary.readers, +1},
492 fun (_) -> client_read3(MsgLocation, Defer, CState) end,
493 fun () -> read(MsgId, CState) end).
495 client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
496 CState = #client_msstate { file_handles_ets = FileHandlesEts,
497 file_summary_ets = FileSummaryEts,
499 client_ref = Ref }) ->
501 fun() -> ok = case ets:update_counter(FileSummaryEts, File,
502 {#file_summary.readers, -1}) of
503 0 -> case ets:lookup(FileSummaryEts, File) of
504 [#file_summary { locked = true }] ->
505 rabbit_msg_store_gc:no_readers(
512 %% If a GC involving the file hasn't already started, it won't
513 %% start now. Need to check again to see if we've been locked in
514 %% the meantime, between lookup and update_counter (thus GC
515 %% started before our +1. In fact, it could have finished by now
517 case ets:lookup(FileSummaryEts, File) of
518 [] -> %% GC has deleted our file, just go round again.
520 [#file_summary { locked = true }] ->
521 %% If we get a badarg here, then the GC has finished and
522 %% deleted our file. Try going around again. Otherwise,
525 %% badarg scenario: we lookup, msg_store locks, GC starts,
526 %% GC ends, we +1 readers, msg_store ets:deletes (and
530 catch error:badarg -> read(MsgId, CState)
532 [#file_summary { locked = false }] ->
533 %% Ok, we're definitely safe to continue - a GC involving
534 %% the file cannot start up now, and isn't running, so
535 %% nothing will tell us from now on to close the handle if
536 %% it's already open.
538 %% Finally, we need to recheck that the msg is still at
539 %% the same place - it's possible an entire GC ran between
540 %% us doing the lookup and the +1 on the readers. (Same as
541 %% badarg scenario above, but we don't have a missing file
542 %% - we just have the /wrong/ file).
543 case index_lookup(MsgId, CState) of
544 #msg_location { file = File } = MsgLocation ->
545 %% Still the same file.
546 {ok, CState1} = close_all_indicated(CState),
547 %% We are now guaranteed that the mark_handle_open
548 %% call will either insert_new correctly, or will
549 %% fail, but find the value is open, not close.
550 mark_handle_open(FileHandlesEts, File, Ref),
551 %% Could the msg_store now mark the file to be
552 %% closed? No: marks for closing are issued only
553 %% when the msg_store has locked the file.
554 %% This will never be the current file
555 {Msg, CState2} = read_from_disk(MsgLocation, CState1),
556 Release(), %% this MUST NOT fail with badarg
557 {{ok, Msg}, CState2};
558 #msg_location {} = MsgLocation -> %% different file!
559 Release(), %% this MUST NOT fail with badarg
560 client_read1(MsgLocation, Defer, CState);
561 not_found -> %% it seems not to exist. Defer, just to be sure.
562 try Release() %% this can badarg, same as locked case, above
563 catch error:badarg -> ok
%% Forget all server-side bookkeeping for client CRef.
%% This drops its pending-confirm entry (cref_to_msg_ids) and removes it
%% from the set of dying clients.
clear_client(CRef, #msstate { cref_to_msg_ids = PendingByClient,
                              dying_clients   = Dying } = State) ->
    PendingByClient1 = dict:erase(CRef, PendingByClient),
    Dying1 = sets:del_element(CRef, Dying),
    State #msstate { cref_to_msg_ids = PendingByClient1,
                     dying_clients   = Dying1 }.
575 %%----------------------------------------------------------------------------
576 %% gen_server callbacks
577 %%----------------------------------------------------------------------------
579 init([Server, BaseDir, ClientRefs, StartupFunState]) ->
580 process_flag(trap_exit, true),
582 ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
585 Dir = filename:join(BaseDir, atom_to_list(Server)),
587 {ok, IndexModule} = application:get_env(msg_store_index_module),
588 rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
590 AttemptFileSummaryRecovery =
592 undefined -> ok = rabbit_file:recursive_delete([Dir]),
593 ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
595 _ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
596 recover_crashed_compactions(Dir)
599 %% if we found crashed compactions we trust neither the
600 %% file_summary nor the location index. Note the file_summary is
601 %% left empty here if it can't be recovered.
602 {FileSummaryRecovered, FileSummaryEts} =
603 recover_file_summary(AttemptFileSummaryRecovery, Dir),
605 {CleanShutdown, IndexState, ClientRefs1} =
606 recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
607 ClientRefs, Dir, Server),
608 Clients = dict:from_list(
609 [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]),
610 %% CleanShutdown => msg location index and file_summary both
611 %% recovered correctly.
612 true = case {FileSummaryRecovered, CleanShutdown} of
613 {true, false} -> ets:delete_all_objects(FileSummaryEts);
616 %% CleanShutdown <=> msg location index and file_summary both
617 %% recovered correctly.
619 FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
620 [ordered_set, public]),
621 CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
623 {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
625 {ok, GCPid} = rabbit_msg_store_gc:start_link(
626 #gc_state { dir = Dir,
627 index_module = IndexModule,
628 index_state = IndexState,
629 file_summary_ets = FileSummaryEts,
630 file_handles_ets = FileHandlesEts,
634 State = #msstate { dir = Dir,
635 index_module = IndexModule,
636 index_state = IndexState,
638 current_file_handle = undefined,
639 file_handle_cache = dict:new(),
640 sync_timer_ref = undefined,
643 pending_gc_completion = orddict:new(),
645 file_handles_ets = FileHandlesEts,
646 file_summary_ets = FileSummaryEts,
647 cur_file_cache_ets = CurFileCacheEts,
648 dying_clients = sets:new(),
650 successfully_recovered = CleanShutdown,
651 file_size_limit = FileSizeLimit,
652 cref_to_msg_ids = dict:new()
655 %% If we didn't recover the msg location index then we need to
657 {Offset, State1 = #msstate { current_file = CurFile }} =
658 build_index(CleanShutdown, StartupFunState, State),
660 %% read is only needed so that we can seek
661 {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
662 [read | ?WRITE_MODE]),
663 {ok, Offset} = file_handle_cache:position(CurHdl, Offset),
664 ok = file_handle_cache:truncate(CurHdl),
666 {ok, maybe_compact(State1 #msstate { current_file_handle = CurHdl }),
668 {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
670 prioritise_call(Msg, _From, _State) ->
672 successfully_recovered_state -> 7;
673 {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7;
678 prioritise_cast(Msg, _State) ->
680 {combine_files, _Source, _Destination, _Reclaimed} -> 8;
681 {delete_file, _File, _Reclaimed} -> 8;
682 {set_maximum_since_use, _Age} -> 8;
683 {client_dying, _Pid} -> 7;
687 prioritise_info(Msg, _State) ->
693 handle_call(successfully_recovered_state, _From, State) ->
694 reply(State #msstate.successfully_recovered, State);
696 handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
697 State = #msstate { dir = Dir,
698 index_state = IndexState,
699 index_module = IndexModule,
700 file_handles_ets = FileHandlesEts,
701 file_summary_ets = FileSummaryEts,
702 cur_file_cache_ets = CurFileCacheEts,
705 Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
706 reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
707 CurFileCacheEts}, State #msstate { clients = Clients1 });
709 handle_call({client_terminate, CRef}, _From, State) ->
710 reply(ok, clear_client(CRef, State));
712 handle_call({read, MsgId}, From, State) ->
713 State1 = read_message(MsgId, From, State),
716 handle_call({contains, MsgId}, From, State) ->
717 State1 = contains_message(MsgId, From, State),
720 handle_cast({client_dying, CRef},
721 State = #msstate { dying_clients = DyingClients }) ->
722 DyingClients1 = sets:add_element(CRef, DyingClients),
723 noreply(write_message(CRef, <<>>,
724 State #msstate { dying_clients = DyingClients1 }));
726 handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
727 State1 = State #msstate { clients = dict:erase(CRef, Clients) },
728 noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
730 handle_cast({write, CRef, MsgId},
731 State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
732 true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
733 [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId),
735 case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
737 write_message(CRef, MsgId, Msg, State1);
738 {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
740 {ignore, _File, State1} ->
741 true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
743 {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
744 record_pending_confirm(CRef, MsgId, State1);
745 {confirm, _File, State1} ->
746 true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
747 update_pending_confirms(
748 fun (MsgOnDiskFun, CTM) ->
749 MsgOnDiskFun(gb_sets:singleton(MsgId), written),
754 handle_cast({remove, CRef, MsgIds}, State) ->
755 State1 = lists:foldl(
756 fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end,
758 noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
761 handle_cast({combine_files, Source, Destination, Reclaimed},
762 State = #msstate { sum_file_size = SumFileSize,
763 file_handles_ets = FileHandlesEts,
764 file_summary_ets = FileSummaryEts,
765 clients = Clients }) ->
766 ok = cleanup_after_file_deletion(Source, State),
767 %% see comment in cleanup_after_file_deletion, and client_read3
768 true = mark_handle_to_close(Clients, FileHandlesEts, Destination, false),
769 true = ets:update_element(FileSummaryEts, Destination,
770 {#file_summary.locked, false}),
771 State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
772 noreply(maybe_compact(run_pending([Source, Destination], State1)));
774 handle_cast({delete_file, File, Reclaimed},
775 State = #msstate { sum_file_size = SumFileSize }) ->
776 ok = cleanup_after_file_deletion(File, State),
777 State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
778 noreply(maybe_compact(run_pending([File], State1)));
780 handle_cast({set_maximum_since_use, Age}, State) ->
781 ok = file_handle_cache:set_maximum_since_use(Age),
%% Raw messages.
%% Both the sync timer message (`sync') and the gen_server inactivity
%% timeout (`timeout', requested by next_state/1) trigger a sync.
%% Any linked process exiting (trap_exit is set in init/1) stops the
%% server with the same reason.
handle_info(Msg, State) when Msg =:= sync; Msg =:= timeout ->
    noreply(internal_sync(State));
handle_info({'EXIT', _Pid, Reason}, State) ->
    {stop, Reason, State}.
793 terminate(_Reason, State = #msstate { index_state = IndexState,
794 index_module = IndexModule,
795 current_file_handle = CurHdl,
797 file_handles_ets = FileHandlesEts,
798 file_summary_ets = FileSummaryEts,
799 cur_file_cache_ets = CurFileCacheEts,
802 %% stop the gc first, otherwise it could be working and we pull
803 %% out the ets tables from under it.
804 ok = rabbit_msg_store_gc:stop(GCPid),
805 State1 = case CurHdl of
807 _ -> State2 = internal_sync(State),
808 ok = file_handle_cache:close(CurHdl),
811 State3 = close_all_handles(State1),
812 ok = store_file_summary(FileSummaryEts, Dir),
813 [true = ets:delete(T) ||
814 T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
815 IndexModule:terminate(IndexState),
816 ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
817 {index_module, IndexModule}], Dir),
818 State3 #msstate { index_state = undefined,
819 current_file_handle = undefined }.
821 code_change(_OldVsn, State, _Extra) ->
%% gen_server2 callback (exported with the other callbacks): mailbox
%% formatting is delegated unchanged to the shared rabbit_misc helper.
format_message_queue(Opt, MQ) ->
    rabbit_misc:format_message_queue(Opt, MQ).
826 %%----------------------------------------------------------------------------
827 %% general helper functions
828 %%----------------------------------------------------------------------------
831 {State1, Timeout} = next_state(State),
832 {noreply, State1, Timeout}.
%% Build a gen_server2 reply tuple. next_state/1 decides whether the
%% sync timer must be (re)armed or stopped, and which timeout to hand
%% back (e.g. 0 or hibernate).
reply(Reply, State) ->
    {NextState, NextTimeout} = next_state(State),
    {reply, Reply, NextState, NextTimeout}.
838 next_state(State = #msstate { sync_timer_ref = undefined,
839 cref_to_msg_ids = CTM }) ->
840 case dict:size(CTM) of
841 0 -> {State, hibernate};
842 _ -> {start_sync_timer(State), 0}
844 next_state(State = #msstate { cref_to_msg_ids = CTM }) ->
845 case dict:size(CTM) of
846 0 -> {stop_sync_timer(State), hibernate};
%% Arm the sync timer. This is only legal when no timer is running, as
%% the head match asserts. After ?SYNC_INTERVAL ms a `sync' message is
%% delivered to this process and is handled in handle_info/2.
start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
    State #msstate {
      sync_timer_ref = erlang:send_after(?SYNC_INTERVAL, self(), sync) }.
854 stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
856 stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
857 erlang:cancel_timer(TRef),
858 State #msstate { sync_timer_ref = undefined }.
860 internal_sync(State = #msstate { current_file_handle = CurHdl,
861 cref_to_msg_ids = CTM }) ->
862 State1 = stop_sync_timer(State),
863 CGs = dict:fold(fun (CRef, MsgIds, NS) ->
864 case gb_sets:is_empty(MsgIds) of
866 false -> [{CRef, MsgIds} | NS]
871 _ -> file_handle_cache:sync(CurHdl)
873 lists:foldl(fun ({CRef, MsgIds}, StateN) ->
874 client_confirm(CRef, MsgIds, written, StateN)
877 write_action({true, not_found}, _MsgId, State) ->
878 {ignore, undefined, State};
879 write_action({true, #msg_location { file = File }}, _MsgId, State) ->
880 {ignore, File, State};
881 write_action({false, not_found}, _MsgId, State) ->
883 write_action({Mask, #msg_location { ref_count = 0, file = File,
884 total_size = TotalSize }},
885 MsgId, State = #msstate { file_summary_ets = FileSummaryEts }) ->
886 case {Mask, ets:lookup(FileSummaryEts, File)} of
887 {false, [#file_summary { locked = true }]} ->
888 ok = index_delete(MsgId, State),
890 {false_if_increment, [#file_summary { locked = true }]} ->
891 %% The msg for MsgId is older than the client death
892 %% message, but as it is being GC'd currently we'll have
893 %% to write a new copy, which will then be younger, so
894 %% ignore this write.
895 {ignore, File, State};
896 {_Mask, [#file_summary {}]} ->
897 ok = index_update_ref_count(MsgId, 1, State),
898 State1 = adjust_valid_total_size(File, TotalSize, State),
899 {confirm, File, State1}
901 write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
903 ok = index_update_ref_count(MsgId, RefCount + 1, State),
904 %% We already know about it, just update counter. Only update
905 %% field otherwise bad interaction with concurrent GC
906 {confirm, File, State}.
%% Note that MsgId is awaiting a confirm for client CRef, then append
%% the message to the current file via write_message/3.
write_message(CRef, MsgId, Msg, State) ->
    PendingState = record_pending_confirm(CRef, MsgId, State),
    write_message(MsgId, Msg, PendingState).
911 write_message(MsgId, Msg,
912 State = #msstate { current_file_handle = CurHdl,
913 current_file = CurFile,
914 sum_valid_data = SumValid,
915 sum_file_size = SumFileSize,
916 file_summary_ets = FileSummaryEts }) ->
917 {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
918 {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
920 #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
921 offset = CurOffset, total_size = TotalSize }, State),
922 [#file_summary { right = undefined, locked = false }] =
923 ets:lookup(FileSummaryEts, CurFile),
924 [_,_] = ets:update_counter(FileSummaryEts, CurFile,
925 [{#file_summary.valid_total_size, TotalSize},
926 {#file_summary.file_size, TotalSize}]),
927 maybe_roll_to_new_file(CurOffset + TotalSize,
929 sum_valid_data = SumValid + TotalSize,
930 sum_file_size = SumFileSize + TotalSize }).
932 read_message(MsgId, From, State) ->
933 case index_lookup_positive_ref_count(MsgId, State) of
934 not_found -> gen_server2:reply(From, not_found),
936 MsgLocation -> read_message1(From, MsgLocation, State)
939 read_message1(From, #msg_location { msg_id = MsgId, file = File,
940 offset = Offset } = MsgLoc,
941 State = #msstate { current_file = CurFile,
942 current_file_handle = CurHdl,
943 file_summary_ets = FileSummaryEts,
944 cur_file_cache_ets = CurFileCacheEts }) ->
945 case File =:= CurFile of
946 true -> {Msg, State1} =
947 %% can return [] if msg in file existed on startup
948 case ets:lookup(CurFileCacheEts, MsgId) of
951 file_handle_cache:current_raw_offset(CurHdl),
952 ok = case Offset >= RawOffSet of
953 true -> file_handle_cache:flush(CurHdl);
956 read_from_disk(MsgLoc, State);
957 [{MsgId, Msg1, _CacheRefCount}] ->
960 gen_server2:reply(From, {ok, Msg}),
962 false -> [#file_summary { locked = Locked }] =
963 ets:lookup(FileSummaryEts, File),
965 true -> add_to_pending_gc_completion({read, MsgId, From},
967 false -> {Msg, State1} = read_from_disk(MsgLoc, State),
968 gen_server2:reply(From, {ok, Msg}),
973 read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
974 total_size = TotalSize }, State) ->
975 {Hdl, State1} = get_read_handle(File, State),
976 {ok, Offset} = file_handle_cache:position(Hdl, Offset),
978 case rabbit_msg_file:read(Hdl, TotalSize) of
979 {ok, {MsgId, _}} = Obj ->
982 {error, {misread, [{old_state, State},
992 contains_message(MsgId, From,
993 State = #msstate { pending_gc_completion = Pending }) ->
994 case index_lookup_positive_ref_count(MsgId, State) of
996 gen_server2:reply(From, false),
998 #msg_location { file = File } ->
999 case orddict:is_key(File, Pending) of
1000 true -> add_to_pending_gc_completion(
1001 {contains, MsgId, From}, File, State);
1002 false -> gen_server2:reply(From, true),
1007 remove_message(MsgId, CRef,
1008 State = #msstate { file_summary_ets = FileSummaryEts }) ->
1009 case should_mask_action(CRef, MsgId, State) of
1010 {true, _Location} ->
1012 {false_if_increment, #msg_location { ref_count = 0 }} ->
1013 %% CRef has tried to both write and remove this msg
1014 %% whilst it's being GC'd. ASSERTION:
1015 %% [#file_summary { locked = true }] =
1016 %% ets:lookup(FileSummaryEts, File),
1018 {_Mask, #msg_location { ref_count = RefCount, file = File,
1019 total_size = TotalSize }} when RefCount > 0 ->
1020 %% only update field, otherwise bad interaction with
1023 index_update_ref_count(MsgId, RefCount - 1, State)
1026 %% don't remove from CUR_FILE_CACHE_ETS_NAME here
1027 %% because there may be further writes in the mailbox
1028 %% for the same msg.
1029 1 -> case ets:lookup(FileSummaryEts, File) of
1030 [#file_summary { locked = true }] ->
1031 add_to_pending_gc_completion(
1032 {remove, MsgId, CRef}, File, State);
1033 [#file_summary {}] ->
1035 delete_file_if_empty(
1036 File, adjust_valid_total_size(File, -TotalSize,
%% Defer Op until GC involving File has finished; run_pending/2 later
%% replays the deferred ops.
%% run_pending/2 reverses the stored list before replaying. Presumably
%% orddict_cons/3 prepends, so ops replay in arrival order — confirm.
add_to_pending_gc_completion(Op, File,
                             State = #msstate {
                               pending_gc_completion = Pending }) ->
    Pending1 = rabbit_misc:orddict_cons(File, Op, Pending),
    State #msstate { pending_gc_completion = Pending1 }.
1049 run_pending(Files, State) ->
1051 fun (File, State1 = #msstate { pending_gc_completion = Pending }) ->
1052 Pending1 = orddict:erase(File, Pending),
1054 fun run_pending_action/2,
1055 State1 #msstate { pending_gc_completion = Pending1 },
1056 lists:reverse(orddict:fetch(File, Pending)))
%% Re-execute one operation that add_to_pending_gc_completion/3 deferred
%% while its file was busy with GC. Each tag maps back to the function
%% that originally deferred it.
run_pending_action({remove, MsgId, CRef}, State) ->
    remove_message(MsgId, CRef, State);
run_pending_action({contains, MsgId, From}, State) ->
    contains_message(MsgId, From, State);
run_pending_action({read, MsgId, From}, State) ->
    read_message(MsgId, From, State).
1066 safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
1068 SuccessFun(ets:update_counter(Tab, Key, UpdateOp))
1069 catch error:badarg -> FailThunk()
%% Variant of safe_ets_update_counter/5 that discards the new counter
%% value(s) and returns ok on success. If the ETS update raises badarg,
%% the result of FailThunk() is returned instead.
safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
    IgnoreResult = fun (_NewValue) -> ok end,
    safe_ets_update_counter(Tab, Key, UpdateOp, IgnoreResult, FailThunk).
%% Shift the amount of valid data attributed to File by Delta, which
%% callers also pass negated on removal. Both the per-file
%% valid_total_size counter in the file summary table and the
%% store-wide sum_valid_data are updated.
adjust_valid_total_size(File, Delta,
                        State = #msstate {
                          sum_valid_data   = SumValid,
                          file_summary_ets = FileSummaryEts }) ->
    Increment = [{#file_summary.valid_total_size, Delta}],
    [_NewValid] = ets:update_counter(FileSummaryEts, File, Increment),
    State #msstate { sum_valid_data = SumValid + Delta }.
%% orddict:store/3 that insists Key is not already present. A duplicate
%% key crashes with {badmatch, true} instead of silently overwriting.
orddict_store(Key, Val, Dict) ->
    AlreadyPresent = orddict:is_key(Key, Dict),
    false = AlreadyPresent,
    orddict:store(Key, Val, Dict).
1086 update_pending_confirms(Fun, CRef,
1087 State = #msstate { clients = Clients,
1088 cref_to_msg_ids = CTM }) ->
1089 case dict:fetch(CRef, Clients) of
1090 {undefined, _CloseFDsFun} -> State;
1091 {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
1093 cref_to_msg_ids = CTM1 }
1096 record_pending_confirm(CRef, MsgId, State) ->
1097 update_pending_confirms(
1098 fun (_MsgOnDiskFun, CTM) ->
1099 dict:update(CRef, fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
1100 gb_sets:singleton(MsgId), CTM)
1103 client_confirm(CRef, MsgIds, ActionTaken, State) ->
1104 update_pending_confirms(
1105 fun (MsgOnDiskFun, CTM) ->
1106 MsgOnDiskFun(MsgIds, ActionTaken),
1107 case dict:find(CRef, CTM) of
1108 {ok, Gs} -> MsgIds1 = gb_sets:difference(Gs, MsgIds),
1109 case gb_sets:is_empty(MsgIds1) of
1110 true -> dict:erase(CRef, CTM);
1111 false -> dict:store(CRef, MsgIds1, CTM)
1117 %% Detect whether the MsgId is older or younger than the client's death
1118 %% msg (if there is one). If the msg is older than the client death
1119 %% msg, and it has a 0 ref_count we must only alter the ref_count, not
1120 %% rewrite the msg - rewriting it would make it younger than the death
1121 %% msg and thus should be ignored. Note that this (correctly) returns
1122 %% false when testing to remove the death msg itself.
1123 should_mask_action(CRef, MsgId,
1124 State = #msstate { dying_clients = DyingClients }) ->
1125 case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of
1126 {false, Location} ->
1128 {true, not_found} ->
1130 {true, #msg_location { file = File, offset = Offset,
1131 ref_count = RefCount } = Location} ->
1132 #msg_location { file = DeathFile, offset = DeathOffset } =
1133 index_lookup(CRef, State),
1134 {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
1136 {false, 0} -> false_if_increment;
1141 %%----------------------------------------------------------------------------
1142 %% file helper functions
1143 %%----------------------------------------------------------------------------
%% Open Dir/FileName through file_handle_cache. The caller's Mode flags
%% are added on top of ?BINARY_MODE, and the handle gets a write buffer
%% of ?HANDLE_CACHE_BUFFER_SIZE bytes. The open result is returned as-is.
open_file(Dir, FileName, Mode) ->
    Path    = form_filename(Dir, FileName),
    Flags   = ?BINARY_MODE ++ Mode,
    Options = [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}],
    file_handle_cache:open(Path, Flags, Options).
1149 close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
1150 CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };
1152 close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
1153 State #msstate { file_handle_cache = close_handle(Key, FHC) };
1155 close_handle(Key, FHC) ->
1156 case dict:find(Key, FHC) of
1157 {ok, Hdl} -> ok = file_handle_cache:close(Hdl),
1158 dict:erase(Key, FHC);
1162 mark_handle_open(FileHandlesEts, File, Ref) ->
1163 %% This is fine to fail (already exists). Note it could fail with
1164 %% the value being close, and not have it updated to open.
1165 ets:insert_new(FileHandlesEts, {{Ref, File}, open}),
1168 %% See comment in client_read3 - only call this when the file is locked
1169 mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
1171 case (ets:update_element(FileHandlesEts, Key, {2, close})
1173 true -> case dict:fetch(Ref, ClientRefs) of
1174 {_MsgOnDiskFun, undefined} -> ok;
1175 {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun()
1179 end || {{Ref, _File} = Key, open} <-
1180 ets:match_object(FileHandlesEts, {{'_', File}, open}) ],
%% Return a zero-arity closure. Calling it runs safe_file_delete/3 for
%% File; nothing is deleted when the closure is created.
%% combine_files/3 returns such a closure as its result.
safe_file_delete_fun(File, Dir, FileHandlesEts) ->
    DeleteThunk =
        fun () ->
                safe_file_delete(File, Dir, FileHandlesEts)
        end,
    DeleteThunk.
1186 safe_file_delete(File, Dir, FileHandlesEts) ->
1187 %% do not match on any value - it's the absence of the row that
1188 %% indicates the client has really closed the file.
1189 case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of
1190 {[_|_], _Cont} -> false;
1191 _ -> ok = file:delete(
1192 form_filename(Dir, filenum_to_name(File))),
1196 close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
1197 client_ref = Ref } =
1199 Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}),
1200 {ok, lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) ->
1201 true = ets:delete(FileHandlesEts, Key),
1202 close_handle(File, CStateM)
1203 end, CState, Objs)}.
1205 close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts,
1206 file_handle_cache = FHC,
1207 client_ref = Ref }) ->
1208 ok = dict:fold(fun (File, Hdl, ok) ->
1209 true = ets:delete(FileHandlesEts, {Ref, File}),
1210 file_handle_cache:close(Hdl)
1212 CState #client_msstate { file_handle_cache = dict:new() };
1214 close_all_handles(State = #msstate { file_handle_cache = FHC }) ->
1215 ok = dict:fold(fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end,
1217 State #msstate { file_handle_cache = dict:new() }.
1219 get_read_handle(FileNum, CState = #client_msstate { file_handle_cache = FHC,
1221 {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir),
1222 {Hdl, CState #client_msstate { file_handle_cache = FHC2 }};
1224 get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC,
1226 {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir),
1227 {Hdl, State #msstate { file_handle_cache = FHC2 }}.
1229 get_read_handle(FileNum, FHC, Dir) ->
1230 case dict:find(FileNum, FHC) of
1231 {ok, Hdl} -> {Hdl, FHC};
1232 error -> {ok, Hdl} = open_file(Dir, filenum_to_name(FileNum),
1234 {Hdl, dict:store(FileNum, Hdl, FHC)}
1237 preallocate(Hdl, FileSizeLimit, FinalPos) ->
1238 {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
1239 ok = file_handle_cache:truncate(Hdl),
1240 {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos),
%% Cut the file behind Hdl back to offset Low, then preallocate it out
%% to High, leaving the position at Low again (see preallocate/3).
%% Every step is asserted with a match, so any failure crashes.
truncate_and_extend_file(Hdl, Low, High) ->
    {ok, Low} = file_handle_cache:position(Hdl, Low),
    ok = file_handle_cache:truncate(Hdl),
    ok = preallocate(Hdl, High, Low).
%% Join a directory and a file name into a single path.
form_filename(Dir, Name) ->
    filename:join([Dir, Name]).
%% Map a data-file number to its on-disk name: the decimal number
%% followed by ?FILE_EXTENSION, e.g. 3 -> "3.rdq".
filenum_to_name(File) ->
    Base = integer_to_list(File),
    Base ++ ?FILE_EXTENSION.
%% Inverse of filenum_to_name/1: strip the extension and parse the rest
%% as an integer. Non-numeric stems raise badarg.
filename_to_num(FileName) ->
    Stem = filename:rootname(FileName),
    list_to_integer(Stem).
%% List the files in Dir whose names end in Ext, ordered by file number
%% (numerically, not lexically, so "10" sorts after "9").
list_sorted_file_names(Dir, Ext) ->
    Names = filelib:wildcard("*" ++ Ext, Dir),
    ByNumber = fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
    lists:sort(ByNumber, Names).
1258 %%----------------------------------------------------------------------------
1259 %% message cache helper functions
1260 %%----------------------------------------------------------------------------
1262 update_msg_cache(CacheEts, MsgId, Msg) ->
1263 case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
1265 false -> safe_ets_update_counter_ok(
1266 CacheEts, MsgId, {3, +1},
1267 fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
1270 %%----------------------------------------------------------------------------
1272 %%----------------------------------------------------------------------------
1274 index_lookup_positive_ref_count(Key, State) ->
1275 case index_lookup(Key, State) of
1276 not_found -> not_found;
1277 #msg_location { ref_count = 0 } -> not_found;
1278 #msg_location {} = MsgLocation -> MsgLocation
%% Overwrite only the ref_count field of Key's index entry.
index_update_ref_count(Key, RefCount, State) ->
    RefCountUpdate = {#msg_location.ref_count, RefCount},
    index_update_fields(Key, RefCountUpdate, State).
%% Look Key up in the message index, from either the server state
%% (#msstate) or a client state (#client_msstate). Callers elsewhere
%% treat the result as `not_found' or a #msg_location{}.
index_lookup(Key, #msstate { index_module = IndexModule,
                             index_state  = IndexState }) ->
    IndexModule:lookup(Key, IndexState);
index_lookup(Key, #client_msstate { index_module = IndexModule,
                                    index_state  = IndexState }) ->
    IndexModule:lookup(Key, IndexState).
%% Insert a new entry Obj into the message index held by the server.
index_insert(Obj, #msstate { index_module = IndexModule,
                             index_state  = IndexState }) ->
    IndexModule:insert(Obj, IndexState).
%% Replace an existing entry in the message index with Obj.
index_update(Obj, #msstate { index_module = IndexModule,
                             index_state  = IndexState }) ->
    IndexModule:update(Obj, IndexState).
%% Update selected fields of Key's index entry. Updates is a
%% {FieldPos, Value} pair, as passed by index_update_ref_count/3.
index_update_fields(Key, Updates, #msstate { index_module = IndexModule,
                                             index_state  = IndexState }) ->
    IndexModule:update_fields(Key, Updates, IndexState).
%% Remove Key's entry from the message index.
index_delete(Key, #msstate { index_module = IndexModule,
                             index_state  = IndexState }) ->
    IndexModule:delete(Key, IndexState).
%% Remove every index entry whose file field equals File. build_index
%% calls this with `undefined' to drop entries never located on disk.
index_delete_by_file(File, #msstate { index_module = IndexModule,
                                      index_state  = IndexState }) ->
    IndexModule:delete_by_file(File, IndexState).
1308 %%----------------------------------------------------------------------------
1309 %% shutdown and recovery
1310 %%----------------------------------------------------------------------------
1312 recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
1313 {false, IndexModule:new(Dir), []};
1314 recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
1315 rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
1316 {false, IndexModule:new(Dir), []};
1317 recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
1318 Fresh = fun (ErrorMsg, ErrorArgs) ->
1319 rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
1320 "rebuilding indices from scratch~n",
1321 [Server | ErrorArgs]),
1322 {false, IndexModule:new(Dir), []}
1324 case read_recovery_terms(Dir) of
1326 Fresh("failed to read recovery terms: ~p", [Error]);
1328 RecClientRefs = proplists:get_value(client_refs, Terms, []),
1329 RecIndexModule = proplists:get_value(index_module, Terms),
1330 case (lists:sort(ClientRefs) =:= lists:sort(RecClientRefs)
1331 andalso IndexModule =:= RecIndexModule) of
1332 true -> case IndexModule:recover(Dir) of
1333 {ok, IndexState1} ->
1334 {true, IndexState1, ClientRefs};
1336 Fresh("failed to recover index: ~p", [Error])
1338 false -> Fresh("recovery terms differ from present", [])
%% Persist the recovery Terms to Dir/?CLEAN_FILENAME. On the next
%% start, read_recovery_terms/1 reads that file back and then deletes it.
store_recovery_terms(Terms, Dir) ->
    Path = filename:join(Dir, ?CLEAN_FILENAME),
    rabbit_file:write_term_file(Path, Terms).
1345 read_recovery_terms(Dir) ->
1346 Path = filename:join(Dir, ?CLEAN_FILENAME),
1347 case rabbit_file:read_term_file(Path) of
1348 {ok, Terms} -> case file:delete(Path) of
1349 ok -> {true, Terms};
1350 {error, Error} -> {false, Error}
1352 {error, Error} -> {false, Error}
%% Dump the file-summary ETS table Tid to Dir/?FILE_SUMMARY_FILENAME so
%% recover_file_summary/2 can reload it with ets:file2tab/1. Writing
%% the object count (extended_info) lets truncation of the dump be
%% detected on load. Crashes unless tab2file returns ok.
store_file_summary(Tid, Dir) ->
    Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
    ok = ets:tab2file(Tid, Path, [{extended_info, [object_count]}]).
1359 recover_file_summary(false, _Dir) ->
1360 %% TODO: the only reason for this to be an *ordered*_set is so
1361 %% that a) maybe_compact can start a traversal from the eldest
1362 %% file, and b) build_index in fast recovery mode can easily
1363 %% identify the current file. It's awkward to have both that
1364 %% odering and the left/right pointers in the entries - replacing
1365 %% the former with some additional bit of state would be easy, but
1366 %% ditching the latter would be neater.
1367 {false, ets:new(rabbit_msg_store_file_summary,
1368 [ordered_set, public, {keypos, #file_summary.file}])};
1369 recover_file_summary(true, Dir) ->
1370 Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
1371 case ets:file2tab(Path) of
1372 {ok, Tid} -> ok = file:delete(Path),
1374 {error, _Error} -> recover_file_summary(false, Dir)
1377 count_msg_refs(Gen, Seed, State) ->
1381 {_MsgId, 0, Next} ->
1382 count_msg_refs(Gen, Next, State);
1383 {MsgId, Delta, Next} ->
1384 ok = case index_lookup(MsgId, State) of
1386 index_insert(#msg_location { msg_id = MsgId,
1388 ref_count = Delta },
1390 #msg_location { ref_count = RefCount } = StoreEntry ->
1391 NewRefCount = RefCount + Delta,
1393 0 -> index_delete(MsgId, State);
1394 _ -> index_update(StoreEntry #msg_location {
1395 ref_count = NewRefCount },
1399 count_msg_refs(Gen, Next, State)
1402 recover_crashed_compactions(Dir) ->
1403 FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION),
1404 TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP),
1406 fun (TmpFileName) ->
1407 NonTmpRelatedFileName =
1408 filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
1409 true = lists:member(NonTmpRelatedFileName, FileNames),
1410 ok = recover_crashed_compaction(
1411 Dir, TmpFileName, NonTmpRelatedFileName)
1415 recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
1416 %% Because a msg can legitimately appear multiple times in the
1417 %% same file, identifying the contents of the tmp file and where
1418 %% they came from is non-trivial. If we are recovering a crashed
1419 %% compaction then we will be rebuilding the index, which can cope
1420 %% with duplicates appearing. Thus the simplest and safest thing
1421 %% to do is to append the contents of the tmp file to its main
1423 {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE),
1424 {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
1425 ?READ_MODE ++ ?WRITE_MODE),
1426 {ok, _End} = file_handle_cache:position(MainHdl, eof),
1427 Size = filelib:file_size(form_filename(Dir, TmpFileName)),
1428 {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size),
1429 ok = file_handle_cache:close(MainHdl),
1430 ok = file_handle_cache:delete(TmpHdl),
1433 scan_file_for_valid_messages(Dir, FileName) ->
1434 case open_file(Dir, FileName, ?READ_MODE) of
1435 {ok, Hdl} -> Valid = rabbit_msg_file:scan(
1436 Hdl, filelib:file_size(
1437 form_filename(Dir, FileName)),
1438 fun scan_fun/2, []),
1439 ok = file_handle_cache:close(Hdl),
1441 {error, enoent} -> {ok, [], 0};
1442 {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
%% Callback for rabbit_msg_file:scan/4. It keeps each message's id,
%% total size and offset, drops the body, and prepends to the
%% accumulator. The scanned list therefore comes out youngest first.
scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) ->
    Entry = {MsgId, TotalSize, Offset},
    [Entry | Acc].
%% Expects #msg_location{} records in *ascending* offset order (eldest
%% message first). That is the reverse of the youngest-first list built
%% by scan_file_for_valid_messages/2.
%% Skips the leading run of messages packed back-to-back from offset 0.
%% Returns {EndOfRun, Rest}: the offset just past that run, and the
%% remaining messages, still in ascending order.
drop_contiguous_block_prefix(Msgs) ->
    drop_contiguous_block_prefix(Msgs, 0).
1453 drop_contiguous_block_prefix([], ExpectedOffset) ->
1454 {ExpectedOffset, []};
1455 drop_contiguous_block_prefix([#msg_location { offset = ExpectedOffset,
1456 total_size = TotalSize } | Tail],
1458 ExpectedOffset1 = ExpectedOffset + TotalSize,
1459 drop_contiguous_block_prefix(Tail, ExpectedOffset1);
1460 drop_contiguous_block_prefix(MsgsAfterGap, ExpectedOffset) ->
1461 {ExpectedOffset, MsgsAfterGap}.
1463 build_index(true, _StartupFunState,
1464 State = #msstate { file_summary_ets = FileSummaryEts }) ->
1466 fun (#file_summary { valid_total_size = ValidTotalSize,
1467 file_size = FileSize,
1469 {_Offset, State1 = #msstate { sum_valid_data = SumValid,
1470 sum_file_size = SumFileSize }}) ->
1471 {FileSize, State1 #msstate {
1472 sum_valid_data = SumValid + ValidTotalSize,
1473 sum_file_size = SumFileSize + FileSize,
1474 current_file = File }}
1475 end, {0, State}, FileSummaryEts);
1476 build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
1477 State = #msstate { dir = Dir }) ->
1478 ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
1479 {ok, Pid} = gatherer:start_link(),
1480 case [filename_to_num(FileName) ||
1481 FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of
1482 [] -> build_index(Pid, undefined, [State #msstate.current_file],
1484 Files -> {Offset, State1} = build_index(Pid, undefined, Files, State),
1485 {Offset, lists:foldl(fun delete_file_if_empty/2,
1489 build_index(Gatherer, Left, [],
1490 State = #msstate { file_summary_ets = FileSummaryEts,
1491 sum_valid_data = SumValid,
1492 sum_file_size = SumFileSize }) ->
1493 case gatherer:out(Gatherer) of
1496 ok = gatherer:stop(Gatherer),
1497 ok = index_delete_by_file(undefined, State),
1498 Offset = case ets:lookup(FileSummaryEts, Left) of
1500 [#file_summary { file_size = FileSize }] -> FileSize
1502 {Offset, State #msstate { current_file = Left }};
1503 {value, #file_summary { valid_total_size = ValidTotalSize,
1504 file_size = FileSize } = FileSummary} ->
1505 true = ets:insert_new(FileSummaryEts, FileSummary),
1506 build_index(Gatherer, Left, [],
1508 sum_valid_data = SumValid + ValidTotalSize,
1509 sum_file_size = SumFileSize + FileSize })
1511 build_index(Gatherer, Left, [File|Files], State) ->
1512 ok = gatherer:fork(Gatherer),
1513 ok = worker_pool:submit_async(
1514 fun () -> build_index_worker(Gatherer, State,
1517 build_index(Gatherer, File, Files, State).
1519 build_index_worker(Gatherer, State = #msstate { dir = Dir },
1520 Left, File, Files) ->
1521 {ok, Messages, FileSize} =
1522 scan_file_for_valid_messages(Dir, filenum_to_name(File)),
1523 {ValidMessages, ValidTotalSize} =
1525 fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
1526 case index_lookup(MsgId, State) of
1527 #msg_location { file = undefined } = StoreEntry ->
1528 ok = index_update(StoreEntry #msg_location {
1529 file = File, offset = Offset,
1530 total_size = TotalSize },
1532 {[Obj | VMAcc], VTSAcc + TotalSize};
1536 end, {[], 0}, Messages),
1537 {Right, FileSize1} =
1539 %% if it's the last file, we'll truncate to remove any
1540 %% rubbish above the last valid message. This affects the
1542 [] -> {undefined, case ValidMessages of
1544 _ -> {_MsgId, TotalSize, Offset} =
1545 lists:last(ValidMessages),
1548 [F|_] -> {F, FileSize}
1550 ok = gatherer:in(Gatherer, #file_summary {
1552 valid_total_size = ValidTotalSize,
1555 file_size = FileSize1,
1558 ok = gatherer:finish(Gatherer).
1560 %%----------------------------------------------------------------------------
1561 %% garbage collection / compaction / aggregation -- internal
1562 %%----------------------------------------------------------------------------
1564 maybe_roll_to_new_file(
1566 State = #msstate { dir = Dir,
1567 current_file_handle = CurHdl,
1568 current_file = CurFile,
1569 file_summary_ets = FileSummaryEts,
1570 cur_file_cache_ets = CurFileCacheEts,
1571 file_size_limit = FileSizeLimit })
1572 when Offset >= FileSizeLimit ->
1573 State1 = internal_sync(State),
1574 ok = file_handle_cache:close(CurHdl),
1575 NextFile = CurFile + 1,
1576 {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE),
1577 true = ets:insert_new(FileSummaryEts, #file_summary {
1579 valid_total_size = 0,
1585 true = ets:update_element(FileSummaryEts, CurFile,
1586 {#file_summary.right, NextFile}),
1587 true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
1588 maybe_compact(State1 #msstate { current_file_handle = NextHdl,
1589 current_file = NextFile });
1590 maybe_roll_to_new_file(_, State) ->
1593 maybe_compact(State = #msstate { sum_valid_data = SumValid,
1594 sum_file_size = SumFileSize,
1596 pending_gc_completion = Pending,
1597 file_summary_ets = FileSummaryEts,
1598 file_size_limit = FileSizeLimit })
1599 when SumFileSize > 2 * FileSizeLimit andalso
1600 (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
1601 %% TODO: the algorithm here is sub-optimal - it may result in a
1602 %% complete traversal of FileSummaryEts.
1603 case ets:first(FileSummaryEts) of
1607 case find_files_to_combine(FileSummaryEts, FileSizeLimit,
1608 ets:lookup(FileSummaryEts, First)) of
1612 Pending1 = orddict_store(Dst, [],
1613 orddict_store(Src, [], Pending)),
1614 State1 = close_handle(Src, close_handle(Dst, State)),
1615 true = ets:update_element(FileSummaryEts, Src,
1616 {#file_summary.locked, true}),
1617 true = ets:update_element(FileSummaryEts, Dst,
1618 {#file_summary.locked, true}),
1619 ok = rabbit_msg_store_gc:combine(GCPid, Src, Dst),
1620 State1 #msstate { pending_gc_completion = Pending1 }
1623 maybe_compact(State) ->
1626 find_files_to_combine(FileSummaryEts, FileSizeLimit,
1627 [#file_summary { file = Dst,
1628 valid_total_size = DstValid,
1630 locked = DstLocked }]) ->
1635 [#file_summary { file = Src,
1636 valid_total_size = SrcValid,
1639 locked = SrcLocked }] = Next =
1640 ets:lookup(FileSummaryEts, Src),
1642 undefined -> not_found;
1643 _ -> case (DstValid + SrcValid =< FileSizeLimit) andalso
1644 (DstValid > 0) andalso (SrcValid > 0) andalso
1645 not (DstLocked orelse SrcLocked) of
1647 false -> find_files_to_combine(
1648 FileSummaryEts, FileSizeLimit, Next)
1653 delete_file_if_empty(File, State = #msstate { current_file = File }) ->
1655 delete_file_if_empty(File, State = #msstate {
1657 file_summary_ets = FileSummaryEts,
1658 pending_gc_completion = Pending }) ->
1659 [#file_summary { valid_total_size = ValidData,
1661 ets:lookup(FileSummaryEts, File),
1663 %% don't delete the file_summary_ets entry for File here
1664 %% because we could have readers which need to be able to
1665 %% decrement the readers count.
1666 0 -> true = ets:update_element(FileSummaryEts, File,
1667 {#file_summary.locked, true}),
1668 ok = rabbit_msg_store_gc:delete(GCPid, File),
1669 Pending1 = orddict_store(File, [], Pending),
1671 State #msstate { pending_gc_completion = Pending1 });
1675 cleanup_after_file_deletion(File,
1676 #msstate { file_handles_ets = FileHandlesEts,
1677 file_summary_ets = FileSummaryEts,
1678 clients = Clients }) ->
1679 %% Ensure that any clients that have open fhs to the file close
1680 %% them before using them again. This has to be done here (given
1681 %% it's done in the msg_store, and not the gc), and not when
1682 %% starting up the GC, because if done when starting up the GC,
1683 %% the client could find the close, and close and reopen the fh,
1684 %% whilst the GC is waiting for readers to disappear, before it's
1685 %% actually done the GC.
1686 true = mark_handle_to_close(Clients, FileHandlesEts, File, true),
1687 [#file_summary { left = Left,
1690 readers = 0 }] = ets:lookup(FileSummaryEts, File),
1691 %% We'll never delete the current file, so right is never undefined
1692 true = Right =/= undefined, %% ASSERTION
1693 true = ets:update_element(FileSummaryEts, Right,
1694 {#file_summary.left, Left}),
1695 %% ensure the double linked list is maintained
1697 undefined -> true; %% File is the eldest file (left-most)
1698 _ -> ets:update_element(FileSummaryEts, Left,
1699 {#file_summary.right, Right})
1701 true = ets:delete(FileSummaryEts, File),
1704 %%----------------------------------------------------------------------------
1705 %% garbage collection / compaction / aggregation -- external
1706 %%----------------------------------------------------------------------------
1708 has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
1709 [#file_summary { locked = true, readers = Count }] =
1710 ets:lookup(FileSummaryEts, File),
1713 combine_files(Source, Destination,
1714 State = #gc_state { file_summary_ets = FileSummaryEts,
1715 file_handles_ets = FileHandlesEts,
1717 msg_store = Server }) ->
1721 valid_total_size = SourceValid,
1722 file_size = SourceFileSize,
1723 locked = true }] = ets:lookup(FileSummaryEts, Source),
1727 valid_total_size = DestinationValid,
1728 file_size = DestinationFileSize,
1729 locked = true }] = ets:lookup(FileSummaryEts, Destination),
1731 SourceName = filenum_to_name(Source),
1732 DestinationName = filenum_to_name(Destination),
1733 {ok, SourceHdl} = open_file(Dir, SourceName,
1735 {ok, DestinationHdl} = open_file(Dir, DestinationName,
1736 ?READ_AHEAD_MODE ++ ?WRITE_MODE),
1737 TotalValidData = SourceValid + DestinationValid,
1738 %% if DestinationValid =:= DestinationContiguousTop then we don't
1740 %% if they're not equal, then we need to write out everything past
1741 %% the DestinationContiguousTop to a tmp file then truncate,
1742 %% copy back in, and then copy over from Source
1743 %% otherwise we just truncate straight away and copy over from Source
1744 {DestinationWorkList, DestinationValid} =
1745 load_and_vacuum_message_file(Destination, State),
1746 {DestinationContiguousTop, DestinationWorkListTail} =
1747 drop_contiguous_block_prefix(DestinationWorkList),
1748 case DestinationWorkListTail of
1749 [] -> ok = truncate_and_extend_file(
1750 DestinationHdl, DestinationContiguousTop, TotalValidData);
1751 _ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
1752 {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE),
1754 DestinationWorkListTail, DestinationContiguousTop,
1755 DestinationValid, DestinationHdl, TmpHdl, Destination,
1757 TmpSize = DestinationValid - DestinationContiguousTop,
1758 %% so now Tmp contains everything we need to salvage
1759 %% from Destination, and index_state has been updated to
1760 %% reflect the compaction of Destination so truncate
1761 %% Destination and copy from Tmp back to the end
1762 {ok, 0} = file_handle_cache:position(TmpHdl, 0),
1763 ok = truncate_and_extend_file(
1764 DestinationHdl, DestinationContiguousTop, TotalValidData),
1766 file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
1767 %% position in DestinationHdl should now be DestinationValid
1768 ok = file_handle_cache:sync(DestinationHdl),
1769 ok = file_handle_cache:delete(TmpHdl)
1771 {SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State),
1772 ok = copy_messages(SourceWorkList, DestinationValid, TotalValidData,
1773 SourceHdl, DestinationHdl, Destination, State),
1775 ok = file_handle_cache:close(DestinationHdl),
1776 ok = file_handle_cache:close(SourceHdl),
1778 %% don't update dest.right, because it could be changing at the
1780 true = ets:update_element(
1781 FileSummaryEts, Destination,
1782 [{#file_summary.valid_total_size, TotalValidData},
1783 {#file_summary.file_size, TotalValidData}]),
1785 Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
1786 gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
1787 safe_file_delete_fun(Source, Dir, FileHandlesEts).
1789 delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
1790 file_handles_ets = FileHandlesEts,
1792 msg_store = Server }) ->
1793 [#file_summary { valid_total_size = 0,
1795 file_size = FileSize,
1796 readers = 0 }] = ets:lookup(FileSummaryEts, File),
1797 {[], 0} = load_and_vacuum_message_file(File, State),
1798 gen_server2:cast(Server, {delete_file, File, FileSize}),
1799 safe_file_delete_fun(File, Dir, FileHandlesEts).
1801 load_and_vacuum_message_file(File, #gc_state { dir = Dir,
1802 index_module = Index,
1803 index_state = IndexState }) ->
1804 %% Messages here will be end-of-file at start-of-list
1805 {ok, Messages, _FileSize} =
1806 scan_file_for_valid_messages(Dir, filenum_to_name(File)),
1807 %% foldl will reverse so will end up with msgs in ascending offset order
1809 fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) ->
1810 case Index:lookup(MsgId, IndexState) of
1811 #msg_location { file = File, total_size = TotalSize,
1812 offset = Offset, ref_count = 0 } = Entry ->
1813 ok = Index:delete_object(Entry, IndexState),
1815 #msg_location { file = File, total_size = TotalSize,
1816 offset = Offset } = Entry ->
1817 {[ Entry | List ], TotalSize + Size};
1821 end, {[], 0}, Messages).
1823 copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
1824 Destination, #gc_state { index_module = Index,
1825 index_state = IndexState }) ->
1826 Copy = fun ({BlockStart, BlockEnd}) ->
1827 BSize = BlockEnd - BlockStart,
1829 file_handle_cache:position(SourceHdl, BlockStart),
1831 file_handle_cache:copy(SourceHdl, DestinationHdl, BSize)
1835 fun (#msg_location { msg_id = MsgId, offset = Offset,
1836 total_size = TotalSize },
1837 {CurOffset, Block = {BlockStart, BlockEnd}}) ->
1838 %% CurOffset is in the DestinationFile.
1839 %% Offset, BlockStart and BlockEnd are in the SourceFile
1840 %% update MsgLocation to reflect change of file and offset
1841 ok = Index:update_fields(MsgId,
1842 [{#msg_location.file, Destination},
1843 {#msg_location.offset, CurOffset}],
1845 {CurOffset + TotalSize,
1848 %% base case, called only for the first list elem
1849 {Offset, Offset + TotalSize};
1851 %% extend the current block because the
1852 %% next msg follows straight on
1853 {BlockStart, BlockEnd + TotalSize};
1855 %% found a gap, so actually do the work for
1856 %% the previous block
1858 {Offset, Offset + TotalSize}
1860 end, {InitOffset, {undefined, undefined}}, WorkList) of
1861 {FinalOffset, Block} ->
1864 _ -> Copy(Block), %% do the last remaining block
1865 ok = file_handle_cache:sync(DestinationHdl)
1867 {FinalOffsetZ, _Block} ->
1868 {gc_error, [{expected, FinalOffset},
1869 {got, FinalOffsetZ},
1870 {destination, Destination}]}
1873 force_recovery(BaseDir, Store) ->
1874 Dir = filename:join(BaseDir, atom_to_list(Store)),
1875 case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
1877 {error, enoent} -> ok
1879 recover_crashed_compactions(BaseDir),
%% Apply Fun to the full path (D joined with each name) of every file in
%% Files, in list order. Each call must return 'ok'; any other result
%% raises a badmatch error. Returns one 'ok' per file.
foreach_file(D, Fun, Files) ->
    Paths = [filename:join(D, Name) || Name <- Files],
    [ok = Fun(Path) || Path <- Paths].
%% For every name in Files (in list order), call Fun with that name joined
%% onto D1 and onto D2 (e.g. a source and a destination path). Each call
%% must return 'ok'; any other result raises a badmatch error. Returns one
%% 'ok' per file.
foreach_file(D1, D2, Fun, Files) ->
    PathPairs = [{filename:join(D1, Name), filename:join(D2, Name)}
                 || Name <- Files],
    [ok = Fun(From, To) || {From, To} <- PathPairs].
1888 transform_dir(BaseDir, Store, TransformFun) ->
1889 Dir = filename:join(BaseDir, atom_to_list(Store)),
1890 TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
1891 TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end,
1892 CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end,
1893 case filelib:is_dir(TmpDir) of
1894 true -> throw({error, transform_failed_previously});
1895 false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
1896 foreach_file(Dir, TmpDir, TransformFile, FileList),
1897 foreach_file(Dir, fun file:delete/1, FileList),
1898 foreach_file(TmpDir, Dir, CopyFile, FileList),
1899 foreach_file(TmpDir, fun file:delete/1, FileList),
1900 ok = file:del_dir(TmpDir)
1903 transform_msg_file(FileOld, FileNew, TransformFun) ->
1904 ok = rabbit_file:ensure_parent_dirs_exist(FileNew),
1905 {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
1906 {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
1908 ?HANDLE_CACHE_BUFFER_SIZE}]),
1909 {ok, _Acc, _IgnoreSize} =
1910 rabbit_msg_file:scan(
1911 RefOld, filelib:file_size(FileOld),
1912 fun({MsgId, _Size, _Offset, BinMsg}, ok) ->
1913 {ok, MsgNew} = case binary_to_term(BinMsg) of
1914 <<>> -> {ok, <<>>}; %% dying client marker
1915 Msg -> TransformFun(Msg)
1917 {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
1920 ok = file_handle_cache:close(RefOld),
1921 ok = file_handle_cache:close(RefNew),