1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License at
4 %% http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8 %% License for the specific language governing rights and limitations
11 %% The Original Code is RabbitMQ.
13 %% The Initial Developers of the Original Code are LShift Ltd,
14 %% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
16 %% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17 %% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18 %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19 %% Technologies LLC, and Rabbit Technologies Ltd.
21 %% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22 %% Ltd. Portions created by Cohesive Financial Technologies LLC are
23 %% Copyright (C) 2007-2010 Cohesive Financial Technologies
24 %% LLC. Portions created by Rabbit Technologies Ltd are Copyright
25 %% (C) 2007-2010 Rabbit Technologies Ltd.
27 %% All Rights Reserved.
29 %% Contributor(s): ______________________________________.
32 -module(rabbit_msg_store).
34 -behaviour(gen_server2).
36 -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
37 sync/3, client_init/2, client_terminate/1,
38 client_delete_and_terminate/3, successfully_recovered_state/1]).
40 -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
42 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
43 terminate/2, code_change/3]).
45 %%----------------------------------------------------------------------------
47 -include("rabbit_msg_store.hrl").
49 -define(SYNC_INTERVAL, 5). %% milliseconds
50 -define(CLEAN_FILENAME, "clean.dot").
51 -define(FILE_SUMMARY_FILENAME, "file_summary.ets").
53 -define(BINARY_MODE, [raw, binary]).
54 -define(READ_MODE, [read]).
55 -define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
56 -define(WRITE_MODE, [write]).
58 -define(FILE_EXTENSION, ".rdq").
59 -define(FILE_EXTENSION_TMP, ".rdt").
61 -define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
63 %%----------------------------------------------------------------------------
66 { dir, %% store directory
67 index_module, %% the module for index ops
68 index_state, %% where are messages?
69 current_file, %% current file name as number
70 current_file_handle, %% current file handle since the last fsync?
71 file_handle_cache, %% file handle cache
72 on_sync, %% pending sync requests
73 sync_timer_ref, %% TRef for our interval timer
74 sum_valid_data, %% sum of valid data in all files
75 sum_file_size, %% sum of file sizes
76 pending_gc_completion, %% things to do once GC completes
77 gc_active, %% is the GC currently working?
78 gc_pid, %% pid of our GC
79 file_handles_ets, %% tid of the shared file handles table
80 file_summary_ets, %% tid of the file summary table
81 dedup_cache_ets, %% tid of dedup cache table
82 cur_file_cache_ets, %% tid of current file cache table
83 client_refs, %% set of references of all registered clients
84 successfully_recovered, %% boolean: did we recover state?
85 file_size_limit %% how big are our files allowed to get?
88 -record(client_msstate,
100 -record(file_summary,
101 {file, valid_total_size, contiguous_top, left, right, file_size,
104 %%----------------------------------------------------------------------------
108 -type(server() :: pid() | atom()).
109 -type(file_num() :: non_neg_integer()).
110 -type(client_msstate() :: #client_msstate {
111 file_handle_cache :: dict:dictionary(),
112 index_state :: any(),
113 index_module :: atom(),
114 dir :: file:filename(),
116 file_handles_ets :: ets:tid(),
117 file_summary_ets :: ets:tid(),
118 dedup_cache_ets :: ets:tid(),
119 cur_file_cache_ets :: ets:tid() }).
120 -type(startup_fun_state() ::
121 {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
124 -spec(start_link/4 ::
125 (atom(), file:filename(), [binary()] | 'undefined',
126 startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
127 -spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) ->
128 rabbit_types:ok(client_msstate())).
129 -spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) ->
130 {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
131 -spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()).
132 -spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
133 -spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
134 -spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok').
135 -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
137 -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
138 -spec(client_init/2 :: (server(), binary()) -> client_msstate()).
139 -spec(client_terminate/1 :: (client_msstate()) -> 'ok').
140 -spec(client_delete_and_terminate/3 ::
141 (client_msstate(), server(), binary()) -> 'ok').
142 -spec(successfully_recovered_state/1 :: (server()) -> boolean()).
144 -spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
145 {ets:tid(), file:filename(), atom(), any()}) ->
146 'concurrent_readers' | non_neg_integer()).
150 %%----------------------------------------------------------------------------
152 %% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION
153 %% It is not recommended to set this to < 0.5
154 -define(GARBAGE_FRACTION, 0.5).
158 %% Index: this is a mapping from Guid to #msg_location{}:
159 %% {Guid, RefCount, File, Offset, TotalSize}
160 %% By default, it's in ets, but it's also pluggable.
161 %% FileSummary: this is an ets table which maps File to #file_summary{}:
162 %% {File, ValidTotalSize, ContiguousTop, Left, Right,
163 %% FileSize, Locked, Readers}
165 %% The basic idea is that messages are appended to the current file up
166 %% until that file becomes too big (> file_size_limit). At that point,
167 %% the file is closed and a new file is created on the _right_ of the
168 %% old file which is used for new messages. Files are named
169 %% numerically ascending, thus the file with the lowest name is the
172 %% We need to keep track of which messages are in which files (this is
173 %% the Index); how much useful data is in each file and which files
174 %% are on the left and right of each other. This is the purpose of the
175 %% FileSummary ets table.
177 %% As messages are removed from files, holes appear in these
178 %% files. The field ValidTotalSize contains the total amount of useful
179 %% data left in the file, whilst ContiguousTop contains the amount of
180 %% valid data right at the start of each file. These are needed for
181 %% garbage collection.
183 %% When we discover that a file is now empty, we delete it. When we
184 %% discover that it can be combined with the useful data in either its
185 %% left or right neighbour, and overall, across all the files, we have
186 %% ((the amount of garbage) / (the sum of all file sizes)) >
187 %% ?GARBAGE_FRACTION, we start a garbage collection run concurrently,
188 %% which will compact the two files together. This keeps disk
189 %% utilisation high and aids performance. We deliberately do this
190 %% lazily in order to prevent doing GC on files which are soon to be
191 %% emptied (and hence deleted).
193 %% Given the compaction between two files, the left file (i.e. elder
194 %% file) is considered the ultimate destination for the good data in
195 %% the right file. If necessary, the good data in the left file which
196 %% is fragmented throughout the file is written out to a temporary
197 %% file, then read back in to form a contiguous chunk of good data at
198 %% the start of the left file. Thus the left file is garbage collected
199 %% and compacted. Then the good data from the right file is copied
200 %% onto the end of the left file. Index and FileSummary tables are
203 %% On non-clean startup, we scan the files we discover, dealing with
204 %% the possibilities of a crash having occurred during a compaction
205 %% (this consists of tidyup - the compaction is deliberately designed
206 %% such that data is duplicated on disk rather than risking it being
207 %% lost), and rebuild the FileSummary ets table and Index.
209 %% So, with this design, messages move to the left. Eventually, they
210 %% should end up in a contiguous block on the left and are then never
211 %% rewritten. But this isn't quite the case. If in a file there is one
212 %% message that is being ignored, for some reason, and messages in the
213 %% file to the right and in the current block are being read all the
214 %% time then it will repeatedly be the case that the good data from
215 %% both files can be combined and will be written out to a new
216 %% file. Whenever this happens, our shunned message will be rewritten.
218 %% So, provided that we combine messages in the right order,
219 %% (i.e. left file, bottom to top, right file, bottom to top),
220 %% eventually our shunned message will end up at the bottom of the
221 %% left file. The compaction/combining algorithm is smart enough to
222 %% read in good data from the left file that is scattered throughout
223 %% (i.e. C and D in the below diagram), then truncate the file to just
224 %% above B (i.e. truncate to the limit of the good contiguous region
225 %% at the start of the file), then write C and D on top and then write
226 %% E, F and G from the right file on top. Thus contiguous blocks of
227 %% good data at the bottom of files are not rewritten (yes, this is
228 %% the data the size of which is tracked by the ContiguousTop
229 %% variable. Judicious use of a mirror is required).
231 %% +-------+ +-------+ +-------+
233 %% +-------+ +-------+ +-------+
235 %% +-------+ +-------+ +-------+
237 %% +-------+ +-------+ +-------+
238 %% | C | | F | ===> | D |
239 %% +-------+ +-------+ +-------+
241 %% +-------+ +-------+ +-------+
243 %% +-------+ +-------+ +-------+
245 %% +-------+ +-------+ +-------+
248 %% From this reasoning, we do have a bound on the number of times the
249 %% message is rewritten. From when it is inserted, there can be no
250 %% files inserted between it and the head of the queue, and the worst
251 %% case is that every time it is rewritten, it moves one position lower
252 %% in the file (for it to stay at the same position requires that
253 %% there are no holes beneath it, which means truncate would be used
254 %% and so it would not be rewritten at all). Thus this seems to
255 %% suggest the limit is the number of messages ahead of it in the
256 %% queue, though it's likely that that's pessimistic, given the
257 %% requirements for compaction/combination of files.
259 %% The other property that we have is the bound on the lowest
260 %% utilisation, which should be 50% - worst case is that all files are
261 %% fractionally over half full and can't be combined (equivalent is
262 %% alternating full files and files with only one tiny message in
265 %% Messages are reference-counted. When a message with the same guid
266 %% is written several times we only store it once, and only remove it
267 %% from the store when it has been removed the same number of times.
269 %% The reference counts do not persist. Therefore the initialisation
270 %% function must be provided with a generator that produces ref count
271 %% deltas for all recovered messages. This is only used on startup
272 %% when the shutdown was non-clean.
274 %% Read messages with a reference count greater than one are entered
275 %% into a message cache. The purpose of the cache is not especially
276 %% performance, though it can help there too, but prevention of memory
277 %% explosion. It ensures that as messages with a high reference count
278 %% are read from several processes they are read back as the same
279 %% binary object rather than multiples of identical binary
282 %% Reads can be performed directly by clients without calling to the
283 %% server. This is safe because multiple file handles can be used to
284 %% read files. However, locking is used by the concurrent GC to make
285 %% sure that reads are not attempted from files which are in the
286 %% process of being garbage collected.
288 %% The server automatically defers reads, removes and contains calls
289 %% that refer to files which are currently being
290 %% GC'd. Contains calls are only deferred in order to ensure they do
291 %% not overtake removes.
293 %% The current file to which messages are being written has a
294 %% write-back cache. This is written to immediately by clients and can
295 %% be read from by clients too. This means that there are only ever
296 %% writes made to the current file, thus eliminating delays due to
297 %% flushing write buffers in order to be able to safely read from the
298 %% current file. The one exception to this is that on start up, the
299 %% cache is not populated with msgs found in the current file, and
300 %% thus in this case only, reads may have to come from the file
301 %% itself. The effect of this is that even if the msg_store process is
302 %% heavily overloaded, clients can still write and read messages with
303 %% very low latency and not block at all.
305 %% For notes on Clean Shutdown and startup, see documentation in
308 %%----------------------------------------------------------------------------
310 %%----------------------------------------------------------------------------
%% Start the message store as a gen_server2 registered locally under
%% the name Server. The four arguments are passed unchanged to init/1,
%% and startup is given an infinite timeout (init/1 may have to perform
%% recovery before it returns).
start_link(Server, Dir, ClientRefs, StartupFunState) ->
    InitArgs = [Server, Dir, ClientRefs, StartupFunState],
    StartOpts = [{timeout, infinity}],
    gen_server2:start_link({local, Server}, ?MODULE, InitArgs, StartOpts).
%% Client-side write. The message is first put into the shared
%% current-file cache, so that clients can read it back straight away.
%% The server is then told, asynchronously, to append it to the current
%% file. Returns {CastResult, CState}: CastResult is whatever
%% gen_server2:cast/2 returns, and the client state is unchanged.
write(Server, Guid, Msg,
      #client_msstate { cur_file_cache_ets = CurFileCacheEts } = CState) ->
    ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
    CastResult = gen_server2:cast(Server, {write, Guid}),
    {CastResult, CState}.
323 CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
324 cur_file_cache_ets = CurFileCacheEts }) ->
325 %% 1. Check the dedup cache
326 case fetch_and_increment_cache(DedupCacheEts, Guid) of
328 %% 2. Check the cur file cache
329 case ets:lookup(CurFileCacheEts, Guid) of
331 Defer = fun() -> {gen_server2:pcall(
332 Server, 2, {read, Guid}, infinity),
334 case index_lookup(Guid, CState) of
335 not_found -> Defer();
336 MsgLocation -> client_read1(Server, MsgLocation, Defer,
339 [{Guid, Msg, _CacheRefCount}] ->
340 %% Although we've found it, we don't know the
341 %% refcount, so can't insert into dedup cache
%% Synchronously ask the server whether Guid is in the store. The
%% server may hold back the answer while the message's file is being
%% GC'd, so that a contains can never overtake a pending remove.
contains(Server, Guid) ->
    Request = {contains, Guid},
    gen_server2:call(Server, Request, infinity).
%% Asynchronously remove each of Guids from the store. Messages are
%% reference-counted, so a message is only really deleted once it has
%% been removed as many times as it was written. An empty list is a
%% no-op and sends nothing to the server.
remove(Server, Guids) ->
    case Guids of
        [] -> ok;
        _  -> gen_server2:cast(Server, {remove, Guids})
    end.
%% Asynchronously tell the server that this client is finished with
%% Guids. The server then decrements their dedup-cache entries. An
%% empty list is a no-op and sends nothing to the server.
release(Server, Guids) ->
    case Guids of
        [] -> ok;
        _  -> gen_server2:cast(Server, {release, Guids})
    end.
%% Asynchronously ask the server to run the continuation K once the
%% writes of Guids are known to have been synced to disk. If some of
%% them are still unsynced, the server queues K until its next sync.
sync(Server, Guids, K) ->
    Request = {sync, Guids, K},
    gen_server2:cast(Server, Request).
%% Internal. This is the function the sync timer fires (see
%% start_sync_timer/1). It prompts the server to sync its current file
%% and run the queued sync continuations. The message is sent with
%% priority 8, presumably so it is handled ahead of ordinary casts.
sync(Server) ->
    Priority = 8,
    gen_server2:pcast(Server, Priority, sync).
%% Internal notification, presumably sent by the GC process, that the
%% data in file Source has been compacted into file Destination.
%% Reclaimed is the amount subtracted from the server's running total
%% of file sizes. Sent at priority 8.
gc_done(Server, Reclaimed, Source, Destination) ->
    Notification = {gc_done, Reclaimed, Source, Destination},
    gen_server2:pcast(Server, 8, Notification).
%% Internal. init/1 registers this function with file_handle_cache as a
%% callback. It forwards Age to the server process, which passes it on
%% to file_handle_cache:set_maximum_since_use/1 for its own handles.
%% Sent at priority 8.
set_maximum_since_use(Server, Age) ->
    Request = {set_maximum_since_use, Age},
    gen_server2:pcast(Server, 8, Request).
362 client_init(Server, Ref) ->
363 {IState, IModule, Dir, GCPid,
364 FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
365 gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity),
366 #client_msstate { file_handle_cache = dict:new(),
367 index_state = IState,
368 index_module = IModule,
371 file_handles_ets = FileHandlesEts,
372 file_summary_ets = FileSummaryEts,
373 dedup_cache_ets = DedupCacheEts,
374 cur_file_cache_ets = CurFileCacheEts }.
376 client_terminate(CState) ->
377 close_all_handles(CState),
%% Close this client's file handles, then synchronously remove Ref from
%% the server's set of registered clients. A removed ref is no longer
%% among the client refs the server writes to its recovery terms on
%% shutdown. Returns ok, and crashes on any other reply.
client_delete_and_terminate(CState, Server, Ref) ->
    ok = client_terminate(CState),
    Request = {delete_client, Ref},
    ok = gen_server2:call(Server, Request, infinity).
%% Ask the server whether it started from a clean shutdown. The answer
%% is true only if both the message location index and the file summary
%% were recovered intact. Sent as a priority-7 call with no timeout.
successfully_recovered_state(Server) ->
    Request = successfully_recovered_state,
    gen_server2:pcall(Server, 7, Request, infinity).
387 %%----------------------------------------------------------------------------
388 %% Client-side-only helpers
389 %%----------------------------------------------------------------------------
392 #msg_location { guid = Guid, file = File } = MsgLocation,
394 CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
395 case ets:lookup(FileSummaryEts, File) of
396 [] -> %% File has been GC'd and no longer exists. Go around again.
397 read(Server, Guid, CState);
398 [#file_summary { locked = Locked, right = Right }] ->
399 client_read2(Server, Locked, Right, MsgLocation, Defer, CState)
402 client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) ->
403 %% Although we've already checked both caches and not found the
404 %% message there, the message is apparently in the
405 %% current_file. We can only arrive here if we are trying to read
406 %% a message which we have not written, which is very odd, so just
409 %% OR, on startup, the cur_file_cache is not populated with the
410 %% contents of the current file, thus reads from the current file
411 %% will end up here and will need to be deferred.
413 client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) ->
414 %% Of course, in the mean time, the GC could have run and our msg
415 %% is actually in a different file, unlocked. However, defering is
416 %% the safest and simplest thing to do.
418 client_read2(Server, false, _Right,
419 MsgLocation = #msg_location { guid = Guid, file = File },
421 CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
422 %% It's entirely possible that everything we're doing from here on
423 %% is for the wrong file, or a non-existent file, as a GC may have
425 safe_ets_update_counter(
426 FileSummaryEts, File, {#file_summary.readers, +1},
427 fun (_) -> client_read3(Server, MsgLocation, Defer, CState) end,
428 fun () -> read(Server, Guid, CState) end).
430 client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
431 CState = #client_msstate { file_handles_ets = FileHandlesEts,
432 file_summary_ets = FileSummaryEts,
433 dedup_cache_ets = DedupCacheEts,
436 fun() -> ok = case ets:update_counter(FileSummaryEts, File,
437 {#file_summary.readers, -1}) of
438 0 -> case ets:lookup(FileSummaryEts, File) of
439 [#file_summary { locked = true }] ->
440 rabbit_msg_store_gc:no_readers(
447 %% If a GC involving the file hasn't already started, it won't
448 %% start now. Need to check again to see if we've been locked in
449 %% the meantime, between lookup and update_counter (thus GC
450 %% started before our +1. In fact, it could have finished by now
452 case ets:lookup(FileSummaryEts, File) of
453 [] -> %% GC has deleted our file, just go round again.
454 read(Server, Guid, CState);
455 [#file_summary { locked = true }] ->
456 %% If we get a badarg here, then the GC has finished and
457 %% deleted our file. Try going around again. Otherwise,
460 %% badarg scenario: we lookup, msg_store locks, GC starts,
461 %% GC ends, we +1 readers, msg_store ets:deletes (and
465 catch error:badarg -> read(Server, Guid, CState)
467 [#file_summary { locked = false }] ->
468 %% Ok, we're definitely safe to continue - a GC involving
469 %% the file cannot start up now, and isn't running, so
470 %% nothing will tell us from now on to close the handle if
471 %% it's already open.
473 %% Finally, we need to recheck that the msg is still at
474 %% the same place - it's possible an entire GC ran between
475 %% us doing the lookup and the +1 on the readers. (Same as
476 %% badarg scenario above, but we don't have a missing file
477 %% - we just have the /wrong/ file).
478 case index_lookup(Guid, CState) of
479 #msg_location { file = File } = MsgLocation ->
480 %% Still the same file.
481 mark_handle_open(FileHandlesEts, File),
483 CState1 = close_all_indicated(CState),
484 {Msg, CState2} = %% This will never be the current file
485 read_from_disk(MsgLocation, CState1, DedupCacheEts),
486 Release(), %% this MUST NOT fail with badarg
487 {{ok, Msg}, CState2};
488 MsgLocation -> %% different file!
489 Release(), %% this MUST NOT fail with badarg
490 client_read1(Server, MsgLocation, Defer, CState)
494 %%----------------------------------------------------------------------------
495 %% gen_server callbacks
496 %%----------------------------------------------------------------------------
498 init([Server, BaseDir, ClientRefs, StartupFunState]) ->
499 process_flag(trap_exit, true),
501 ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
504 Dir = filename:join(BaseDir, atom_to_list(Server)),
506 {ok, IndexModule} = application:get_env(msg_store_index_module),
507 rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
509 AttemptFileSummaryRecovery =
511 undefined -> ok = rabbit_misc:recursive_delete([Dir]),
512 ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
514 _ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
515 recover_crashed_compactions(Dir)
518 %% if we found crashed compactions we trust neither the
519 %% file_summary nor the location index. Note the file_summary is
520 %% left empty here if it can't be recovered.
521 {FileSummaryRecovered, FileSummaryEts} =
522 recover_file_summary(AttemptFileSummaryRecovery, Dir),
524 {CleanShutdown, IndexState, ClientRefs1} =
525 recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
526 ClientRefs, Dir, Server),
527 %% CleanShutdown => msg location index and file_summary both
528 %% recovered correctly.
529 true = case {FileSummaryRecovered, CleanShutdown} of
530 {true, false} -> ets:delete_all_objects(FileSummaryEts);
533 %% CleanShutdown <=> msg location index and file_summary both
534 %% recovered correctly.
536 DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
537 FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
538 [ordered_set, public]),
539 CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
541 {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
543 State = #msstate { dir = Dir,
544 index_module = IndexModule,
545 index_state = IndexState,
547 current_file_handle = undefined,
548 file_handle_cache = dict:new(),
550 sync_timer_ref = undefined,
553 pending_gc_completion = [],
556 file_handles_ets = FileHandlesEts,
557 file_summary_ets = FileSummaryEts,
558 dedup_cache_ets = DedupCacheEts,
559 cur_file_cache_ets = CurFileCacheEts,
560 client_refs = ClientRefs1,
561 successfully_recovered = CleanShutdown,
562 file_size_limit = FileSizeLimit
565 %% If we didn't recover the msg location index then we need to
567 {Offset, State1 = #msstate { current_file = CurFile }} =
568 build_index(CleanShutdown, StartupFunState, State),
570 %% read is only needed so that we can seek
571 {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
572 [read | ?WRITE_MODE]),
573 {ok, Offset} = file_handle_cache:position(CurHdl, Offset),
574 ok = file_handle_cache:truncate(CurHdl),
576 {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule,
580 State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }),
582 {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
584 handle_call({read, Guid}, From, State) ->
585 State1 = read_message(Guid, From, State),
588 handle_call({contains, Guid}, From, State) ->
589 State1 = contains_message(Guid, From, State),
592 handle_call({new_client_state, CRef}, _From,
593 State = #msstate { dir = Dir,
594 index_state = IndexState,
595 index_module = IndexModule,
596 file_handles_ets = FileHandlesEts,
597 file_summary_ets = FileSummaryEts,
598 dedup_cache_ets = DedupCacheEts,
599 cur_file_cache_ets = CurFileCacheEts,
600 client_refs = ClientRefs,
602 reply({IndexState, IndexModule, Dir, GCPid,
603 FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
604 State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
606 handle_call(successfully_recovered_state, _From, State) ->
607 reply(State #msstate.successfully_recovered, State);
609 handle_call({delete_client, CRef}, _From,
610 State = #msstate { client_refs = ClientRefs }) ->
612 State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
614 handle_cast({write, Guid},
615 State = #msstate { current_file_handle = CurHdl,
616 current_file = CurFile,
617 sum_valid_data = SumValid,
618 sum_file_size = SumFileSize,
619 file_summary_ets = FileSummaryEts,
620 cur_file_cache_ets = CurFileCacheEts }) ->
621 true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
622 [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
623 case index_lookup(Guid, State) of
625 %% New message, lots to do
626 {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
627 {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg),
628 ok = index_insert(#msg_location {
629 guid = Guid, ref_count = 1, file = CurFile,
630 offset = CurOffset, total_size = TotalSize },
632 [#file_summary { valid_total_size = ValidTotalSize,
633 contiguous_top = ContiguousTop,
636 file_size = FileSize }] =
637 ets:lookup(FileSummaryEts, CurFile),
638 ValidTotalSize1 = ValidTotalSize + TotalSize,
639 ContiguousTop1 = case CurOffset =:= ContiguousTop of
640 true -> ValidTotalSize1;
641 false -> ContiguousTop
643 true = ets:update_element(
644 FileSummaryEts, CurFile,
645 [{#file_summary.valid_total_size, ValidTotalSize1},
646 {#file_summary.contiguous_top, ContiguousTop1},
647 {#file_summary.file_size, FileSize + TotalSize}]),
648 NextOffset = CurOffset + TotalSize,
650 maybe_roll_to_new_file(
651 NextOffset, State #msstate {
652 sum_valid_data = SumValid + TotalSize,
653 sum_file_size = SumFileSize + TotalSize }));
654 #msg_location { ref_count = RefCount } ->
655 %% We already know about it, just update counter. Only
656 %% update field otherwise bad interaction with concurrent GC
657 ok = index_update_fields(Guid,
658 {#msg_location.ref_count, RefCount + 1},
663 handle_cast({remove, Guids}, State) ->
664 State1 = lists:foldl(
665 fun (Guid, State2) -> remove_message(Guid, State2) end,
667 noreply(maybe_compact(State1));
669 handle_cast({release, Guids}, State =
670 #msstate { dedup_cache_ets = DedupCacheEts }) ->
672 fun (Guid) -> decrement_cache(DedupCacheEts, Guid) end, Guids),
675 handle_cast({sync, Guids, K},
676 State = #msstate { current_file = CurFile,
677 current_file_handle = CurHdl,
678 on_sync = Syncs }) ->
679 {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
680 case lists:any(fun (Guid) ->
681 #msg_location { file = File, offset = Offset } =
682 index_lookup(Guid, State),
683 File =:= CurFile andalso Offset >= SyncOffset
687 true -> noreply(State #msstate { on_sync = [K | Syncs] })
690 handle_cast(sync, State) ->
691 noreply(internal_sync(State));
693 handle_cast({gc_done, Reclaimed, Src, Dst},
694 State = #msstate { sum_file_size = SumFileSize,
695 gc_active = {Src, Dst},
696 file_handles_ets = FileHandlesEts,
697 file_summary_ets = FileSummaryEts }) ->
698 %% GC done, so now ensure that any clients that have open fhs to
699 %% those files close them before using them again. This has to be
700 %% done here (given it's done in the msg_store, and not the gc),
701 %% and not when starting up the GC, because if done when starting
702 %% up the GC, the client could find the close, and close and
703 %% reopen the fh, whilst the GC is waiting for readers to
704 %% disappear, before it's actually done the GC.
705 true = mark_handle_to_close(FileHandlesEts, Src),
706 true = mark_handle_to_close(FileHandlesEts, Dst),
707 %% we always move data left, so Src has gone and was on the
708 %% right, so need to make dest = source.right.left, and also
709 %% dest.right = source.right
710 [#file_summary { left = Dst,
713 readers = 0 }] = ets:lookup(FileSummaryEts, Src),
714 %% this could fail if SrcRight =:= undefined
715 ets:update_element(FileSummaryEts, SrcRight, {#file_summary.left, Dst}),
716 true = ets:update_element(FileSummaryEts, Dst,
717 [{#file_summary.locked, false},
718 {#file_summary.right, SrcRight}]),
719 true = ets:delete(FileSummaryEts, Src),
721 maybe_compact(run_pending(
722 State #msstate { sum_file_size = SumFileSize - Reclaimed,
723 gc_active = false })));
725 handle_cast({set_maximum_since_use, Age}, State) ->
726 ok = file_handle_cache:set_maximum_since_use(Age),
729 handle_info(timeout, State) ->
730 noreply(internal_sync(State));
732 handle_info({'EXIT', _Pid, Reason}, State) ->
733 {stop, Reason, State}.
735 terminate(_Reason, State = #msstate { index_state = IndexState,
736 index_module = IndexModule,
737 current_file_handle = CurHdl,
739 file_handles_ets = FileHandlesEts,
740 file_summary_ets = FileSummaryEts,
741 dedup_cache_ets = DedupCacheEts,
742 cur_file_cache_ets = CurFileCacheEts,
743 client_refs = ClientRefs,
745 %% stop the gc first, otherwise it could be working and we pull
746 %% out the ets tables from under it.
747 ok = rabbit_msg_store_gc:stop(GCPid),
748 State1 = case CurHdl of
750 _ -> State2 = internal_sync(State),
751 file_handle_cache:close(CurHdl),
754 State3 = close_all_handles(State1),
755 store_file_summary(FileSummaryEts, Dir),
757 T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
758 IndexModule:terminate(IndexState),
759 store_recovery_terms([{client_refs, sets:to_list(ClientRefs)},
760 {index_module, IndexModule}], Dir),
761 State3 #msstate { index_state = undefined,
762 current_file_handle = undefined }.
764 code_change(_OldVsn, State, _Extra) ->
767 %%----------------------------------------------------------------------------
768 %% general helper functions
769 %%----------------------------------------------------------------------------
772 {State1, Timeout} = next_state(State),
773 {noreply, State1, Timeout}.
%% Build a gen_server2 reply tuple. next_state/1 adjusts the sync timer
%% and chooses the fourth element: 0 or hibernate.
reply(Reply, State) ->
    {NextState, TimeoutOrHibernate} = next_state(State),
    {reply, Reply, NextState, TimeoutOrHibernate}.
779 next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) ->
781 next_state(State = #msstate { sync_timer_ref = undefined }) ->
782 {start_sync_timer(State), 0};
783 next_state(State = #msstate { on_sync = [] }) ->
784 {stop_sync_timer(State), hibernate};
%% Arm a one-shot timer that calls ?MODULE:sync(self()) after
%% ?SYNC_INTERVAL milliseconds, and store its reference in the state.
%% Only matches when no timer is armed; otherwise it fails with
%% function_clause.
start_sync_timer(#msstate { sync_timer_ref = undefined } = State) ->
    SyncArgs = [self()],
    {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, SyncArgs),
    State #msstate { sync_timer_ref = TRef }.
792 stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
794 stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
795 {ok, cancel} = timer:cancel(TRef),
796 State #msstate { sync_timer_ref = undefined }.
798 internal_sync(State = #msstate { current_file_handle = CurHdl,
799 on_sync = Syncs }) ->
800 State1 = stop_sync_timer(State),
803 _ -> ok = file_handle_cache:sync(CurHdl),
804 lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
805 State1 #msstate { on_sync = [] }
808 read_message(Guid, From,
809 State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
810 case index_lookup(Guid, State) of
812 gen_server2:reply(From, not_found),
815 case fetch_and_increment_cache(DedupCacheEts, Guid) of
816 not_found -> read_message1(From, MsgLocation, State);
817 Msg -> gen_server2:reply(From, {ok, Msg}),
%% Reads the message at MsgLoc and replies {ok, Msg} to From.
%%  - Message in the current file: it is looked up in the cur-file cache.
%%    A hit is also offered to the dedup cache via
%%    maybe_insert_into_cache/4. A miss ([]) means it is read from disk
%%    with read_from_disk/3. If its Offset is at or beyond
%%    file_handle_cache:current_raw_offset/1 of the current handle, the
%%    handle is flushed first (presumably so buffered writes are visible to
%%    the read).
%%  - Message in another file: if that file's summary has locked = true,
%%    the read is deferred with add_to_pending_gc_completion/2; otherwise
%%    it is read from disk with read_from_disk/3.
%% NOTE(review): several lines (the RawOffSet binding, some case heads and
%% the returned states) are not present in this listing.
822 read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
823 file = File, offset = Offset } = MsgLoc,
824 State = #msstate { current_file = CurFile,
825 current_file_handle = CurHdl,
826 file_summary_ets = FileSummaryEts,
827 dedup_cache_ets = DedupCacheEts,
828 cur_file_cache_ets = CurFileCacheEts }) ->
829 case File =:= CurFile of
830 true -> {Msg, State1} =
831 %% can return [] if msg in file existed on startup
832 case ets:lookup(CurFileCacheEts, Guid) of
835 file_handle_cache:current_raw_offset(CurHdl),
836 ok = case Offset >= RawOffSet of
837 true -> file_handle_cache:flush(CurHdl);
840 read_from_disk(MsgLoc, State, DedupCacheEts);
841 [{Guid, Msg1, _CacheRefCount}] ->
842 ok = maybe_insert_into_cache(
843 DedupCacheEts, RefCount, Guid, Msg1),
846 gen_server2:reply(From, {ok, Msg}),
848 false -> [#file_summary { locked = Locked }] =
849 ets:lookup(FileSummaryEts, File),
851 true -> add_to_pending_gc_completion({read, Guid, From},
853 false -> {Msg, State1} =
854 read_from_disk(MsgLoc, State, DedupCacheEts),
855 gen_server2:reply(From, {ok, Msg}),
%% Reads the message described by a #msg_location{} from its file. It
%% obtains a read handle via get_read_handle/2, seeks to Offset and reads
%% TotalSize bytes with rabbit_msg_file:read/2.
%%  - When the read yields the expected Guid, the message is offered to
%%    the dedup cache (maybe_insert_into_cache/4).
%%  - Otherwise an {error, {misread, ...}} term carrying the old state is
%%    built.
%% NOTE(review): how the misread term is raised, and the return-value
%% lines, are not present in this listing; callers match {Msg, State1}.
860 read_from_disk(#msg_location { guid = Guid, ref_count = RefCount,
861 file = File, offset = Offset,
862 total_size = TotalSize },
863 State, DedupCacheEts) ->
864 {Hdl, State1} = get_read_handle(File, State),
865 {ok, Offset} = file_handle_cache:position(Hdl, Offset),
867 case rabbit_msg_file:read(Hdl, TotalSize) of
868 {ok, {Guid, _}} = Obj ->
871 {error, {misread, [{old_state, State},
879 ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg),
%% Replies to From with whether Guid is stored. Visible branches:
%%  - false is sent (presumably on an index miss);
%%  - if the message's File is either file of an {A, B} pair (presumably
%%    the gc_active value), the query is deferred with
%%    add_to_pending_gc_completion/2;
%%  - otherwise true is sent.
%% NOTE(review): the case patterns and returned states are not present in
%% this listing.
882 contains_message(Guid, From, State = #msstate { gc_active = GCActive }) ->
883 case index_lookup(Guid, State) of
885 gen_server2:reply(From, false),
887 #msg_location { file = File } ->
889 {A, B} when File =:= A orelse File =:= B ->
890 add_to_pending_gc_completion(
891 {contains, Guid, From}, State);
893 gen_server2:reply(From, true),
%% Drops one reference to Guid (its index entry must exist: the lookup is
%% matched against #msg_location{}).
%%  - Last reference (branch pattern not present in this listing): the
%%    dedup-cache entry is removed. Then, depending on the file summary
%%    (presumably its locked flag), the removal is either deferred with
%%    add_to_pending_gc_completion/2 or carried out:
%%      - the index entry is deleted;
%%      - the file's valid_total_size shrinks by TotalSize;
%%      - contiguous_top is lowered to at most Offset;
%%      - the file is deleted if it is now empty;
%%      - sum_valid_data shrinks by TotalSize.
%%  - RefCount > 1: the dedup-cache counter and the index ref_count are
%%    each decremented by one.
898 remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
899 file_summary_ets = FileSummaryEts,
900 dedup_cache_ets = DedupCacheEts }) ->
901 #msg_location { ref_count = RefCount, file = File,
902 offset = Offset, total_size = TotalSize } =
903 index_lookup(Guid, State),
906 %% don't remove from CUR_FILE_CACHE_ETS_NAME here because
907 %% there may be further writes in the mailbox for the same
909 ok = remove_cache_entry(DedupCacheEts, Guid),
910 [#file_summary { valid_total_size = ValidTotalSize,
911 contiguous_top = ContiguousTop,
913 ets:lookup(FileSummaryEts, File),
916 add_to_pending_gc_completion({remove, Guid}, State);
918 ok = index_delete(Guid, State),
919 ContiguousTop1 = lists:min([ContiguousTop, Offset]),
920 ValidTotalSize1 = ValidTotalSize - TotalSize,
921 true = ets:update_element(
922 FileSummaryEts, File,
923 [{#file_summary.valid_total_size, ValidTotalSize1},
924 {#file_summary.contiguous_top, ContiguousTop1}]),
925 State1 = delete_file_if_empty(File, State),
926 State1 #msstate { sum_valid_data = SumValid - TotalSize }
928 _ when 1 < RefCount ->
929 ok = decrement_cache(DedupCacheEts, Guid),
930 %% only update field, otherwise bad interaction with concurrent GC
931 ok = index_update_fields(Guid,
932 {#msg_location.ref_count, RefCount - 1},
%% Defers Op by prepending it to pending_gc_completion. run_pending/1
%% later replays the list reversed, i.e. in the order ops were deferred.
add_to_pending_gc_completion(
  Op, #msstate { pending_gc_completion = Deferred } = State) ->
    NewDeferred = [Op | Deferred],
    State #msstate { pending_gc_completion = NewDeferred }.
%% Replays every operation deferred with add_to_pending_gc_completion/2.
%% - The pending list is emptied before the replay, so any operation that
%%   is deferred again lands in the fresh list.
%% - Operations are replayed via run_pending/2 in reverse list order. Since
%%   add_to_pending_gc_completion/2 prepends, this is the order in which
%%   they were queued.
%% NOTE(review): the body of the empty-list clause is not present in this
%% listing.
941 run_pending(State = #msstate { pending_gc_completion = [] }) ->
943 run_pending(State = #msstate { pending_gc_completion = Pending }) ->
944 State1 = State #msstate { pending_gc_completion = [] },
945 lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)).
%% Replays one operation that was deferred with
%% add_to_pending_gc_completion/2 against the current state, dispatching
%% on the operation tag.
run_pending({read, MsgId, Client}, State) ->
    read_message(MsgId, Client, State);
run_pending({contains, MsgId, Client}, State) ->
    contains_message(MsgId, Client, State);
run_pending({remove, MsgId}, State) ->
    remove_message(MsgId, State).
%% Applies ets:update_counter(Tab, Key, UpdateOp) and passes the new value
%% to SuccessFun, returning SuccessFun's result. If error:badarg is raised
%% (e.g. Key is not in Tab), the result of FailThunk() is returned instead.
%% NOTE(review): the try/end lines are not present in this listing. Since
%% the SuccessFun call appears inside the protected body, a badarg raised
%% by SuccessFun itself would also trigger FailThunk - confirm intended.
954 safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
956 SuccessFun(ets:update_counter(Tab, Key, UpdateOp))
957 catch error:badarg -> FailThunk()
%% Variant of safe_ets_update_counter/5 that discards the updated counter
%% value and yields ok on success. FailThunk still runs if the update
%% raises badarg.
safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
    Discard = fun (_NewValue) -> ok end,
    safe_ets_update_counter(Tab, Key, UpdateOp, Discard, FailThunk).
963 %%----------------------------------------------------------------------------
964 %% file helper functions
965 %%----------------------------------------------------------------------------
%% Opens Dir/FileName through file_handle_cache. It uses ?BINARY_MODE plus
%% the caller-supplied Mode flags and a ?HANDLE_CACHE_BUFFER_SIZE write
%% buffer. Returns file_handle_cache:open/3's result; callers match
%% {ok, Hdl} or handle {error, Reason}.
open_file(Dir, FileName, Mode) ->
    Path = form_filename(Dir, FileName),
    Options = [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}],
    file_handle_cache:open(Path, ?BINARY_MODE ++ Mode, Options).
%% Closes the handle cached under Key, if any, and drops it from the handle
%% dict. Accepts a #client_msstate{}, a #msstate{} (updating its
%% file_handle_cache field) or the dict itself.
%% NOTE(review): the not-found branch of the dict clause and its closing
%% lines are not present in this listing.
971 close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
972 CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };
974 close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
975 State #msstate { file_handle_cache = close_handle(Key, FHC) };
977 close_handle(Key, FHC) ->
978 case dict:find(Key, FHC) of
979 {ok, Hdl} -> ok = file_handle_cache:close(Hdl),
980 dict:erase(Key, FHC);
%% Records {{self(), File}, open} in FileHandlesEts, noting that this
%% process holds a handle on File. The insert_new/2 result is ignored.
%% NOTE(review): the function's final line is not present in this listing.
984 mark_handle_open(FileHandlesEts, File) ->
985 %% This is fine to fail (already exists)
986 ets:insert_new(FileHandlesEts, {{self(), File}, open}),
%% Flags every process's open handle on File for closing. Each matching
%% {{_, File}, open} entry has its second element rewritten to close.
%% NOTE(review): the function's final line is not present in this listing;
%% delete_file_if_empty/2 matches the result against true.
989 mark_handle_to_close(FileHandlesEts, File) ->
990 [ ets:update_element(FileHandlesEts, Key, {2, close})
991 || {Key, open} <- ets:match_object(FileHandlesEts, {{'_', File}, open}) ],
%% Closes this client's handles that have been flagged with
%% mark_handle_to_close/2. For each {{self(), File}, close} entry, it
%% deletes the entry from FileHandlesEts and closes the cached handle for
%% File.
%% NOTE(review): the rest of the head and the fold's trailing arguments are
%% not present in this listing.
994 close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } =
996 Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}),
997 lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) ->
998 true = ets:delete(FileHandlesEts, Key),
999 close_handle(File, CStateM)
%% Closes every cached file handle and resets file_handle_cache to an empty
%% dict. The #client_msstate{} clause also deletes the {Self, File} entry
%% for each closed file from FileHandlesEts.
%% NOTE(review): Self is bound on a line not present in this listing
%% (presumably self()).
1002 close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts,
1003 file_handle_cache = FHC }) ->
1005 ok = dict:fold(fun (File, Hdl, ok) ->
1006 true = ets:delete(FileHandlesEts, {Self, File}),
1007 file_handle_cache:close(Hdl)
1009 CState #client_msstate { file_handle_cache = dict:new() };
1011 close_all_handles(State = #msstate { file_handle_cache = FHC }) ->
1012 ok = dict:fold(fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end,
1014 State #msstate { file_handle_cache = dict:new() }.
%% Returns {Hdl, NewState}, where Hdl is a read handle for FileNum taken
%% from (or newly added to) the state's handle cache via
%% get_read_handle/3. Works for both #client_msstate{} and #msstate{}.
%% NOTE(review): the lines binding Dir are not present in this listing.
1016 get_read_handle(FileNum, CState = #client_msstate { file_handle_cache = FHC,
1018 {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir),
1019 {Hdl, CState #client_msstate { file_handle_cache = FHC2 }};
1021 get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC,
1023 {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir),
1024 {Hdl, State #msstate { file_handle_cache = FHC2 }}.
%% Returns {Hdl, FHC1}. Hdl is the handle cached for FileNum in the dict
%% FHC or, on a miss, a handle freshly opened with open_file/3 on
%% filenum_to_name(FileNum) and stored in the dict.
%% NOTE(review): the open-mode argument and closing lines are not present
%% in this listing.
1026 get_read_handle(FileNum, FHC, Dir) ->
1027 case dict:find(FileNum, FHC) of
1028 {ok, Hdl} -> {Hdl, FHC};
1029 error -> {ok, Hdl} = open_file(Dir, filenum_to_name(FileNum),
1031 {Hdl, dict:store(FileNum, Hdl, FHC)}
%% Positions the handle at FileSizeLimit and truncates there, then
%% repositions it at FinalPos. Every step asserts success by matching.
%% NOTE(review): the function's final line is not present in this listing;
%% truncate_and_extend_file/3 matches the result against ok.
1034 preallocate(Hdl, FileSizeLimit, FinalPos) ->
1035 {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
1036 ok = file_handle_cache:truncate(Hdl),
1037 {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos),
%% Truncates the file behind Hdl at Low, then sets its end to High via
%% preallocate/3, which leaves the handle positioned at Low. Every step
%% asserts success by matching.
truncate_and_extend_file(Hdl, Low, High) ->
    %% chop everything from Low onwards
    {ok, Low} = file_handle_cache:position(Hdl, Low),
    ok = file_handle_cache:truncate(Hdl),
    %% move the end of file to High, returning the position to Low
    ok = preallocate(Hdl, High, Low).
%% Builds the path of Name inside directory Dir.
form_filename(Dir, Name) ->
    filename:join([Dir, Name]).
%% Maps a numeric file id to its on-disk name: the decimal digits of the
%% id followed by ?FILE_EXTENSION.
filenum_to_name(File) ->
    Digits = integer_to_list(File),
    Digits ++ ?FILE_EXTENSION.
%% Recovers the numeric file id from a file name. The last extension is
%% stripped with filename:rootname/1 and the rest is parsed as a decimal
%% integer; list_to_integer/1 raises badarg if it is not one.
filename_to_num(FileName) ->
    Stem = filename:rootname(FileName),
    list_to_integer(Stem).
%% Lists the files in Dir whose names end in Ext, ordered by the numeric
%% file id each name encodes (as parsed by filename_to_num/1).
list_sorted_file_names(Dir, Ext) ->
    Matching = filelib:wildcard("*" ++ Ext, Dir),
    ByFileNum = fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
    lists:sort(ByFileNum, Matching).
1055 %%----------------------------------------------------------------------------
1056 %% message cache helper functions
1057 %%----------------------------------------------------------------------------
%% Caches Msg in the dedup cache, via update_msg_cache/3, only when more
%% than one reference to it exists (RefCount > 1).
%% NOTE(review): the body of the fall-through clause is not present in
%% this listing; callers match the result against ok.
1059 maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg)
1060 when RefCount > 1 ->
1061 update_msg_cache(DedupCacheEts, Guid, Msg);
1062 maybe_insert_into_cache(_DedupCacheEts, _RefCount, _Guid, _Msg) ->
%% Adds {Guid, Msg, 1} to CacheEts. If an entry for Guid already exists,
%% its counter (element 3) is incremented instead. Should that entry
%% disappear between the failed insert_new/2 and the increment, the whole
%% operation is retried.
%% NOTE(review): the insert-succeeded branch and closing lines are not
%% present in this listing.
1065 update_msg_cache(CacheEts, Guid, Msg) ->
1066 case ets:insert_new(CacheEts, {Guid, Msg, 1}) of
1068 false -> safe_ets_update_counter_ok(
1069 CacheEts, Guid, {3, +1},
1070 fun () -> update_msg_cache(CacheEts, Guid, Msg) end)
%% Deletes Guid's entry from the dedup cache, asserting that ets:delete/2
%% returns true.
%% NOTE(review): the function's final line is not present in this listing;
%% callers match the result against ok.
1073 remove_cache_entry(DedupCacheEts, Guid) ->
1074 true = ets:delete(DedupCacheEts, Guid),
%% Looks Guid up in the dedup cache. On a hit, it increments the entry's
%% counter (element 3); if the entry was deleted concurrently, it is
%% re-inserted via update_msg_cache/3.
%% NOTE(review): the miss branch and the returned value are not present in
%% this listing. read_message/3 treats not_found as a miss and any other
%% result as the message.
1077 fetch_and_increment_cache(DedupCacheEts, Guid) ->
1078 case ets:lookup(DedupCacheEts, Guid) of
1081 [{_Guid, Msg, _RefCount}] ->
1082 safe_ets_update_counter_ok(
1083 DedupCacheEts, Guid, {3, +1},
1084 %% someone has deleted us in the meantime, insert us
1085 fun () -> ok = update_msg_cache(DedupCacheEts, Guid, Msg) end),
%% Decrements Guid's counter (element 3) in the dedup cache and deletes the
%% entry once the counter is zero or below. A missing entry is tolerated:
%% the fail thunk just returns true.
%% NOTE(review): the other success-fun clause and the closing lines are
%% not present in this listing; callers match the result against ok.
1089 decrement_cache(DedupCacheEts, Guid) ->
1090 true = safe_ets_update_counter(
1091 DedupCacheEts, Guid, {3, -1},
1092 fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, Guid);
1095 %% Guid is not in there because although it's been
1096 %% delivered, it's never actually been read (think:
1097 %% persistent message held in RAM)
1098 fun () -> true end),
1101 %%----------------------------------------------------------------------------
%% index helper functions
1103 %%----------------------------------------------------------------------------
%% Looks Key up through the configured index module, using the index state
%% held by either a client state or the server state.
index_lookup(Key, #client_msstate { index_module = Mod,
                                    index_state  = IdxState }) ->
    Mod:lookup(Key, IdxState);
index_lookup(Key, #msstate { index_module = Mod, index_state = IdxState }) ->
    Mod:lookup(Key, IdxState).
%% Inserts Obj (a #msg_location{}) through the configured index module.
index_insert(Obj, #msstate { index_module = Mod, index_state = IdxState }) ->
    Mod:insert(Obj, IdxState).
%% Passes Obj to the configured index module's update/2.
index_update(Obj, #msstate { index_module = Mod, index_state = IdxState }) ->
    Mod:update(Obj, IdxState).
%% Updates selected fields of Key's index entry through the index module's
%% update_fields/3. Callers pass either one {FieldPos, Value} pair or a
%% list of them.
index_update_fields(Key, Updates, #msstate { index_module = Mod,
                                             index_state  = IdxState }) ->
    Mod:update_fields(Key, Updates, IdxState).
%% Deletes Key's entry through the configured index module.
index_delete(Key, #msstate { index_module = Mod, index_state = IdxState }) ->
    Mod:delete(Key, IdxState).
%% Deletes all index entries recorded against File through the configured
%% index module.
index_delete_by_file(File, #msstate { index_module = Mod,
                                      index_state  = IdxState }) ->
    Mod:delete_by_file(File, IdxState).
1129 %%----------------------------------------------------------------------------
1130 %% shutdown and recovery
1131 %%----------------------------------------------------------------------------
%% Decides whether the message index can be recovered.
%% Falls back to a fresh index (IndexModule:new(Dir)) with an empty
%% client-ref set, returned as {false, Index, sets:new()}, when any of the
%% following holds:
%%  - ClientRefs is undefined;
%%  - Recover is false (a warning is logged);
%%  - the recovery terms cannot be read;
%%  - the recorded client refs (compared order-insensitively) or the index
%%    module differ from the current ones;
%%  - IndexModule:recover(Dir) fails.
%% The last three cases log a warning via Fresh. On successful recovery the
%% result includes the recovered index state and ClientRefs as a set.
%% NOTE(review): some lines (including the head of the success tuple) are
%% not present in this listing.
1133 recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
1134 {false, IndexModule:new(Dir), sets:new()};
1135 recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
1136 rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
1137 {false, IndexModule:new(Dir), sets:new()};
1138 recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
1139 Fresh = fun (ErrorMsg, ErrorArgs) ->
1140 rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
1141 "rebuilding indices from scratch~n",
1142 [Server | ErrorArgs]),
1143 {false, IndexModule:new(Dir), sets:new()}
1145 case read_recovery_terms(Dir) of
1147 Fresh("failed to read recovery terms: ~p", [Error]);
1149 RecClientRefs = proplists:get_value(client_refs, Terms, []),
1150 RecIndexModule = proplists:get_value(index_module, Terms),
1151 case (lists:sort(ClientRefs) =:= lists:sort(RecClientRefs)
1152 andalso IndexModule =:= RecIndexModule) of
1153 true -> case IndexModule:recover(Dir) of
1154 {ok, IndexState1} ->
1156 sets:from_list(ClientRefs)};
1158 Fresh("failed to recover index: ~p", [Error])
1160 false -> Fresh("recovery terms differ from present", [])
%% Writes Terms to Dir/?CLEAN_FILENAME via rabbit_misc:write_term_file/2,
%% returning its result.
store_recovery_terms(Terms, Dir) ->
    CleanFile = filename:join(Dir, ?CLEAN_FILENAME),
    rabbit_misc:write_term_file(CleanFile, Terms).
%% Reads the terms written by store_recovery_terms/2 (Dir/?CLEAN_FILENAME)
%% and then deletes that file. Returns {true, Terms} on success, or
%% {false, Error} if either the read or the delete fails.
1167 read_recovery_terms(Dir) ->
1168 Path = filename:join(Dir, ?CLEAN_FILENAME),
1169 case rabbit_misc:read_term_file(Path) of
1170 {ok, Terms} -> case file:delete(Path) of
1171 ok -> {true, Terms};
1172 {error, Error} -> {false, Error}
1174 {error, Error} -> {false, Error}
%% Dumps the file summary table Tid to Dir/?FILE_SUMMARY_FILENAME,
%% including object_count extended info, and asserts the dump succeeded.
store_file_summary(Tid, Dir) ->
    SummaryPath = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
    ok = ets:tab2file(Tid, SummaryPath, [{extended_info, [object_count]}]).
%% Returns {Recovered, Tid} for the file summary ETS table.
%%  - false: creates a fresh public ordered_set keyed on #file_summary.file.
%%  - true: loads the table dumped by store_file_summary/2 and deletes the
%%    dump, ignoring the delete result. If ets:file2tab/1 fails, it falls
%%    back to a fresh table as in the false case.
%% NOTE(review): the success-branch result line is not present in this
%% listing.
1181 recover_file_summary(false, _Dir) ->
1182 %% TODO: the only reason for this to be an *ordered*_set is so
1183 %% that a) maybe_compact can start a traversal from the eldest
1184 %% file, and b) build_index in fast recovery mode can easily
1185 %% identify the current file. It's awkward to have both that
1186 %% ordering and the left/right pointers in the entries - replacing
1187 %% the former with some additional bit of state would be easy, but
1188 %% ditching the latter would be neater.
1189 {false, ets:new(rabbit_msg_store_file_summary,
1190 [ordered_set, public, {keypos, #file_summary.file}])};
1191 recover_file_summary(true, Dir) ->
1192 Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
1193 case ets:file2tab(Path) of
1194 {ok, Tid} -> file:delete(Path),
1196 {error, _Error} -> recover_file_summary(false, Dir)
%% Applies reference-count deltas produced by the generator Gen (starting
%% from Seed) to the index, then recurses with Next. For each
%% {Guid, Delta, Next}:
%%  - a Guid with no index entry (presumably the lookup-miss branch) is
%%    inserted with ref_count = Delta;
%%  - an existing entry has Delta added to its ref_count; it is deleted if
%%    the new count is 0 and updated otherwise.
%% NOTE(review): the generator call, its termination branch and the
%% pattern of the branch that only recurses are not present in this
%% listing.
1199 count_msg_refs(Gen, Seed, State) ->
1204 count_msg_refs(Gen, Next, State);
1205 {Guid, Delta, Next} ->
1206 ok = case index_lookup(Guid, State) of
1208 index_insert(#msg_location { guid = Guid,
1210 ref_count = Delta },
1212 #msg_location { ref_count = RefCount } = StoreEntry ->
1213 NewRefCount = RefCount + Delta,
1215 0 -> index_delete(Guid, State);
1216 _ -> index_update(StoreEntry #msg_location {
1217 ref_count = NewRefCount },
1221 count_msg_refs(Gen, Next, State)
%% Recovers from compactions interrupted by a crash. For each tmp file
%% (?FILE_EXTENSION_TMP) in Dir, it asserts that the main file with the
%% same root name exists and merges the pair with
%% recover_crashed_compaction/3.
%% NOTE(review): the iteration call and closing lines are not present in
%% this listing.
1224 recover_crashed_compactions(Dir) ->
1225 FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION),
1226 TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP),
1228 fun (TmpFileName) ->
1229 NonTmpRelatedFileName =
1230 filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
1231 true = lists:member(NonTmpRelatedFileName, FileNames),
1232 ok = recover_crashed_compaction(
1233 Dir, TmpFileName, NonTmpRelatedFileName)
%% Appends the full contents of TmpFileName to the end of
%% NonTmpRelatedFileName. It then closes the main file and removes the tmp
%% file via file_handle_cache:delete/1. Each step asserts success by
%% matching.
1237 recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
1238 %% Because a msg can legitimately appear multiple times in the
1239 %% same file, identifying the contents of the tmp file and where
1240 %% they came from is non-trivial. If we are recovering a crashed
1241 %% compaction then we will be rebuilding the index, which can cope
1242 %% with duplicates appearing. Thus the simplest and safest thing
1243 %% to do is to append the contents of the tmp file to its main
1245 {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE),
1246 {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
1247 ?READ_MODE ++ ?WRITE_MODE),
1248 {ok, _End} = file_handle_cache:position(MainHdl, eof),
1249 Size = filelib:file_size(form_filename(Dir, TmpFileName)),
1250 {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size),
1251 ok = file_handle_cache:close(MainHdl),
1252 ok = file_handle_cache:delete(TmpHdl),
%% Scans Dir/FileName for valid messages with rabbit_msg_file:scan/2 over
%% the file's full size, then closes the handle (ignoring any close error).
%%  - A missing file yields {ok, [], 0}.
%%  - Any other open error yields
%%    {error, {unable_to_scan_file, FileName, Reason}}.
%% Callers match {ok, Messages, FileSize}, with Messages in
%% end-of-file-first order.
%% NOTE(review): the success-branch result line is not present in this
%% listing.
1255 scan_file_for_valid_messages(Dir, FileName) ->
1256 case open_file(Dir, FileName, ?READ_MODE) of
1257 {ok, Hdl} -> Valid = rabbit_msg_file:scan(
1258 Hdl, filelib:file_size(
1259 form_filename(Dir, FileName))),
1260 %% if something really bad has happened,
1261 %% the close could fail, but ignore
1262 file_handle_cache:close(Hdl),
1264 {error, enoent} -> {ok, [], 0};
1265 {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
%% Finds the contiguous prefix of a file's messages.
%% Input: {Guid, TotalSize, Offset} triples in *ascending* offset order
%% (eldest message first), i.e. the opposite of what
%% scan_file_for_valid_messages produces.
%% Starting from offset 0, it walks the list for as long as each message
%% begins exactly where the previous one ended.
%% Returns {Top, Guids}:
%%  - Top is the offset just past the contiguous prefix;
%%  - Guids holds the prefix's message ids, youngest first.
find_contiguous_block_prefix(L) ->
    find_contiguous_block_prefix(L, 0, []).

find_contiguous_block_prefix([{Guid, TotalSize, Offset} | Rest], Top, Acc)
  when Offset =:= Top ->
    find_contiguous_block_prefix(Rest, Top + TotalSize, [Guid | Acc]);
find_contiguous_block_prefix([_MsgAfterGap | _], Top, Acc) ->
    {Top, Acc};
find_contiguous_block_prefix([], Top, Acc) ->
    {Top, Acc}.
%% Rebuilds the store's totals and current-file pointer at startup.
%%  - true (file summary recovered): folds over FileSummaryEts, adding
%%    each entry's valid_total_size to sum_valid_data and file_size to
%%    sum_file_size. current_file ends up as the last entry visited (the
%%    table is an ordered_set, so presumably the highest-numbered file),
%%    and the returned offset is that entry's file_size.
%%  - false: applies ref-count deltas with count_msg_refs/3, starts a
%%    gatherer, and rebuilds from every on-disk file via build_index/4 (or
%%    from just the current file when none exist). It then folds
%%    delete_file_if_empty/2 over the result.
%% NOTE(review): several lines (the fold call, the File binding and parts
%% of the false branch) are not present in this listing.
1282 build_index(true, _StartupFunState,
1283 State = #msstate { file_summary_ets = FileSummaryEts }) ->
1285 fun (#file_summary { valid_total_size = ValidTotalSize,
1286 file_size = FileSize,
1288 {_Offset, State1 = #msstate { sum_valid_data = SumValid,
1289 sum_file_size = SumFileSize }}) ->
1290 {FileSize, State1 #msstate {
1291 sum_valid_data = SumValid + ValidTotalSize,
1292 sum_file_size = SumFileSize + FileSize,
1293 current_file = File }}
1294 end, {0, State}, FileSummaryEts);
1295 build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
1296 State = #msstate { dir = Dir }) ->
1297 ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
1298 {ok, Pid} = gatherer:start_link(),
1299 case [filename_to_num(FileName) ||
1300 FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of
1301 [] -> build_index(Pid, undefined, [State #msstate.current_file],
1303 Files -> {Offset, State1} = build_index(Pid, undefined, Files, State),
1304 {Offset, lists:foldl(fun delete_file_if_empty/2,
%% Parallel index rebuild driven by a gatherer.
%%  - [File | Files]: forks the gatherer, submits a build_index_worker/5
%%    job for File to the worker pool, and continues with File as the new
%%    Left.
%%  - []: drains the gatherer. Each #file_summary{} received is inserted
%%    into FileSummaryEts and added to sum_valid_data / sum_file_size.
%%    When the gatherer has nothing left:
%%      - the gatherer is stopped and unlinked;
%%      - index entries still recorded against file undefined are deleted;
%%      - the result is {Offset, State} with current_file = Left, where
%%        Offset is Left's file_size from the summary.
%% NOTE(review): some lines (the empty-gatherer pattern, the offset used
%% when Left has no summary, and the worker's trailing arguments) are not
%% present in this listing.
1308 build_index(Gatherer, Left, [],
1309 State = #msstate { file_summary_ets = FileSummaryEts,
1310 sum_valid_data = SumValid,
1311 sum_file_size = SumFileSize }) ->
1312 case gatherer:out(Gatherer) of
1314 ok = gatherer:stop(Gatherer),
1315 ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
1316 ok = index_delete_by_file(undefined, State),
1317 Offset = case ets:lookup(FileSummaryEts, Left) of
1319 [#file_summary { file_size = FileSize }] -> FileSize
1321 {Offset, State #msstate { current_file = Left }};
1322 {value, #file_summary { valid_total_size = ValidTotalSize,
1323 file_size = FileSize } = FileSummary} ->
1324 true = ets:insert_new(FileSummaryEts, FileSummary),
1325 build_index(Gatherer, Left, [],
1327 sum_valid_data = SumValid + ValidTotalSize,
1328 sum_file_size = SumFileSize + FileSize })
1330 build_index(Gatherer, Left, [File|Files], State) ->
1331 ok = gatherer:fork(Gatherer),
1332 ok = worker_pool:submit_async(
1333 fun () -> build_index_worker(Gatherer, State,
1336 build_index(Gatherer, File, Files, State).
%% Worker job for one File, run via build_index/4.
%% 1. Scans the file. For each message whose index entry still has
%%    file = undefined, it records this File, Offset and TotalSize in the
%%    index and counts the message as valid.
%% 2. From the valid messages it derives contiguous_top
%%    (find_contiguous_block_prefix/1) and the Right neighbour:
%%     - last file: Right is undefined and the recorded size comes from
%%       the last valid message (presumably Offset + TotalSize);
%%     - otherwise: Right is the next file and the scanned FileSize is
%%       kept.
%% 3. Hands the resulting #file_summary{} to the gatherer, then signals
%%    finish.
%% NOTE(review): several lines (other lookup branches and some summary
%% fields) are not present in this listing.
1338 build_index_worker(Gatherer, State = #msstate { dir = Dir },
1339 Left, File, Files) ->
1340 {ok, Messages, FileSize} =
1341 scan_file_for_valid_messages(Dir, filenum_to_name(File)),
1342 {ValidMessages, ValidTotalSize} =
1344 fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
1345 case index_lookup(Guid, State) of
1346 #msg_location { file = undefined } = StoreEntry ->
1347 ok = index_update(StoreEntry #msg_location {
1348 file = File, offset = Offset,
1349 total_size = TotalSize },
1351 {[Obj | VMAcc], VTSAcc + TotalSize};
1355 end, {[], 0}, Messages),
1356 %% foldl reverses lists, find_contiguous_block_prefix needs
1357 %% msgs eldest first, so, ValidMessages is the right way round
1358 {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages),
1359 {Right, FileSize1} =
1361 %% if it's the last file, we'll truncate to remove any
1362 %% rubbish above the last valid message. This affects the
1364 [] -> {undefined, case ValidMessages of
1366 _ -> {_Guid, TotalSize, Offset} =
1367 lists:last(ValidMessages),
1370 [F|_] -> {F, FileSize}
1372 ok = gatherer:in(Gatherer, #file_summary {
1374 valid_total_size = ValidTotalSize,
1375 contiguous_top = ContiguousTop,
1378 file_size = FileSize1,
1381 ok = gatherer:finish(Gatherer).
1383 %%----------------------------------------------------------------------------
1384 %% garbage collection / compaction / aggregation -- internal
1385 %%----------------------------------------------------------------------------
%% Rolls over to a new file once the write offset reaches file_size_limit.
%% The offset is the first argument, on a line not present in this
%% listing. The roll-over:
%%  - syncs, then closes the current handle;
%%  - opens file CurFile + 1 for writing;
%%  - adds a #file_summary{} for it with valid_total_size 0;
%%  - points the old file's right at it;
%%  - drops cur-file cache entries whose third element is 0;
%%  - calls maybe_compact/1 on the updated state.
%% Below the limit the other clause applies (its body is not present in
%% this listing).
1387 maybe_roll_to_new_file(
1389 State = #msstate { dir = Dir,
1390 current_file_handle = CurHdl,
1391 current_file = CurFile,
1392 file_summary_ets = FileSummaryEts,
1393 cur_file_cache_ets = CurFileCacheEts,
1394 file_size_limit = FileSizeLimit })
1395 when Offset >= FileSizeLimit ->
1396 State1 = internal_sync(State),
1397 ok = file_handle_cache:close(CurHdl),
1398 NextFile = CurFile + 1,
1399 {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE),
1400 true = ets:insert_new(FileSummaryEts, #file_summary {
1402 valid_total_size = 0,
1409 true = ets:update_element(FileSummaryEts, CurFile,
1410 {#file_summary.right, NextFile}),
1411 true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
1412 maybe_compact(State1 #msstate { current_file_handle = NextHdl,
1413 current_file = NextFile });
1414 maybe_roll_to_new_file(_, State) ->
%% Starts a GC run when both of these hold:
%%  - the store's files total more than twice file_size_limit;
%%  - the garbage fraction (SumFileSize - SumValid) / SumFileSize exceeds
%%    ?GARBAGE_FRACTION.
%% The head also matches fields on lines not present in this listing.
%% It searches from the first (eldest) file for a Src/Dst pair with
%% find_files_to_gc/3. When a pair is found, it:
%%  - closes this process's handles on both files;
%%  - marks both locked in FileSummaryEts;
%%  - asks rabbit_msg_store_gc to combine them;
%%  - records gc_active = {Src, Dst}.
%% NOTE(review): the not-found/empty-table branches and the fall-through
%% clause body are not present in this listing.
1417 maybe_compact(State = #msstate { sum_valid_data = SumValid,
1418 sum_file_size = SumFileSize,
1421 file_summary_ets = FileSummaryEts,
1422 file_size_limit = FileSizeLimit })
1423 when (SumFileSize > 2 * FileSizeLimit andalso
1424 (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) ->
1425 %% TODO: the algorithm here is sub-optimal - it may result in a
1426 %% complete traversal of FileSummaryEts.
1427 case ets:first(FileSummaryEts) of
1431 case find_files_to_gc(FileSummaryEts, FileSizeLimit,
1432 ets:lookup(FileSummaryEts, First)) of
1436 State1 = close_handle(Src, close_handle(Dst, State)),
1437 true = ets:update_element(FileSummaryEts, Src,
1438 {#file_summary.locked, true}),
1439 true = ets:update_element(FileSummaryEts, Dst,
1440 {#file_summary.locked, true}),
1441 ok = rabbit_msg_store_gc:gc(GCPid, Src, Dst),
1442 State1 #msstate { gc_active = {Src, Dst} }
1445 maybe_compact(State) ->
%% Searches rightwards from the given file summary (Dst) for a pair of
%% files whose combined valid data fits in FileSizeLimit. Src is looked up
%% in FileSummaryEts; it is presumably Dst's right neighbour, but the
%% binding is on a line not present in this listing.
%% Visible branches:
%%  - an undefined value (presumably SrcRight, i.e. Src is the last file)
%%    yields not_found;
%%  - if DstValid + SrcValid exceeds FileSizeLimit, the search continues
%%    from Src's summary.
%% NOTE(review): the success result is not present in this listing.
1448 find_files_to_gc(FileSummaryEts, FileSizeLimit,
1449 [#file_summary { file = Dst,
1450 valid_total_size = DstValid,
1456 [#file_summary { file = Src,
1457 valid_total_size = SrcValid,
1459 right = SrcRight }] = Next =
1460 ets:lookup(FileSummaryEts, Src),
1462 undefined -> not_found;
1463 _ -> case DstValid + SrcValid =< FileSizeLimit of
1465 false -> find_files_to_gc(
1466 FileSummaryEts, FileSizeLimit, Next)
%% Removes File once it holds no valid data. The current file is never
%% removed (first clause, whose body is not present in this listing).
%% When valid_total_size is 0 (presumably the case on ValidData):
%%  - the neighbours' left/right pointers are relinked around File;
%%  - every process's handle on File is flagged for closing;
%%  - File's summary entry and this process's handle are dropped;
%%  - the file is deleted from disk;
%%  - sum_file_size shrinks by File's size.
%% NOTE(review): the case head, the remaining {Left, Right} branch and the
%% non-empty branch are not present in this listing.
1471 delete_file_if_empty(File, State = #msstate { current_file = File }) ->
1473 delete_file_if_empty(File, State = #msstate {
1475 sum_file_size = SumFileSize,
1476 file_handles_ets = FileHandlesEts,
1477 file_summary_ets = FileSummaryEts }) ->
1478 [#file_summary { valid_total_size = ValidData,
1481 file_size = FileSize,
1483 ets:lookup(FileSummaryEts, File),
1485 %% we should NEVER find the current file in here hence right
1486 %% should always be a file, not undefined
1487 0 -> case {Left, Right} of
1488 {undefined, _} when Right =/= undefined ->
1489 %% the eldest file is empty.
1490 true = ets:update_element(
1491 FileSummaryEts, Right,
1492 {#file_summary.left, undefined});
1493 {_, _} when Right =/= undefined ->
1494 true = ets:update_element(FileSummaryEts, Right,
1495 {#file_summary.left, Left}),
1496 true = ets:update_element(FileSummaryEts, Left,
1497 {#file_summary.right, Right})
1499 true = mark_handle_to_close(FileHandlesEts, File),
1500 true = ets:delete(FileSummaryEts, File),
1501 State1 = close_handle(File, State),
1502 ok = file:delete(form_filename(Dir, filenum_to_name(File))),
1503 State1 #msstate { sum_file_size = SumFileSize - FileSize };
1507 %%----------------------------------------------------------------------------
1508 %% garbage collection / compaction / aggregation -- external
1509 %%----------------------------------------------------------------------------
%% Compacts SrcFile into DstFile. State is a
%% {FileSummaryEts, Dir, Index, IndexState} tuple. Both files must already
%% be locked in FileSummaryEts; the lookups assert locked = true.
%%  - If neither file has readers, it combines them with combine_files/3.
%%    DstFile's valid_total_size, contiguous_top and file_size are all set
%%    to the combined valid size; its right pointer is deliberately left
%%    alone. It returns SrcFileSize + DstFileSize - TotalValidData, i.e.
%%    the reduction in total file size given that combine_files/3 deletes
%%    the source.
%%  - Otherwise it returns concurrent_readers without touching either file.
%% NOTE(review): some head fields and the case closing lines are not
%% present in this listing.
1511 gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
1512 [SrcObj = #file_summary {
1513 readers = SrcReaders,
1515 file_size = SrcFileSize,
1516 locked = true }] = ets:lookup(FileSummaryEts, SrcFile),
1517 [DstObj = #file_summary {
1518 readers = DstReaders,
1520 file_size = DstFileSize,
1521 locked = true }] = ets:lookup(FileSummaryEts, DstFile),
1523 case SrcReaders =:= 0 andalso DstReaders =:= 0 of
1524 true -> TotalValidData = combine_files(SrcObj, DstObj, State),
1525 %% don't update dest.right, because it could be
1526 %% changing at the same time
1527 true = ets:update_element(
1528 FileSummaryEts, DstFile,
1529 [{#file_summary.valid_total_size, TotalValidData},
1530 {#file_summary.contiguous_top, TotalValidData},
1531 {#file_summary.file_size, TotalValidData}]),
1532 SrcFileSize + DstFileSize - TotalValidData;
1533 false -> concurrent_readers
%% Merges the live messages of Source into Destination, where Source's
%% summary has left = Destination.
%%  - If Destination is fully contiguous (contiguous_top =:=
%%    valid_total_size), it is simply truncated at its contiguous top and
%%    extended to ExpectedSize.
%%  - Otherwise the live messages above the contiguous top are first copied
%%    to a tmp file (presumably via copy_messages/7; the call's first line
%%    is not present). Destination is then truncated and extended, and the
%%    tmp contents are copied back after the contiguous top.
%% Source's live messages are then appended after Destination's valid
%% data. Finally Destination is closed and Source is removed via
%% file_handle_cache:delete/1.
%% NOTE(review): several lines, including the return value, are not
%% present in this listing. gc/3 uses the result as the combined valid
%% size (presumably ExpectedSize).
1536 combine_files(#file_summary { file = Source,
1537 valid_total_size = SourceValid,
1538 left = Destination },
1539 #file_summary { file = Destination,
1540 valid_total_size = DestinationValid,
1541 contiguous_top = DestinationContiguousTop,
1543 State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
1544 SourceName = filenum_to_name(Source),
1545 DestinationName = filenum_to_name(Destination),
1546 {ok, SourceHdl} = open_file(Dir, SourceName,
1548 {ok, DestinationHdl} = open_file(Dir, DestinationName,
1549 ?READ_AHEAD_MODE ++ ?WRITE_MODE),
1550 ExpectedSize = SourceValid + DestinationValid,
1551 %% if DestinationValid =:= DestinationContiguousTop then we don't
1553 %% if they're not equal, then we need to write out everything past
1554 %% the DestinationContiguousTop to a tmp file then truncate,
1555 %% copy back in, and then copy over from Source
1556 %% otherwise we just truncate straight away and copy over from Source
1557 case DestinationContiguousTop =:= DestinationValid of
1559 ok = truncate_and_extend_file(
1560 DestinationHdl, DestinationContiguousTop, ExpectedSize);
1562 {DestinationWorkList, DestinationValid} =
1563 find_unremoved_messages_in_file(Destination, State),
1566 fun (#msg_location { offset = Offset })
1567 when Offset =/= DestinationContiguousTop ->
1568 %% it cannot be that Offset =:=
1569 %% DestinationContiguousTop because if it
1570 %% was then DestinationContiguousTop would
1571 %% have been extended by TotalSize
1572 Offset < DestinationContiguousTop
1573 end, DestinationWorkList),
1574 Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
1575 {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
1577 Worklist, DestinationContiguousTop, DestinationValid,
1578 DestinationHdl, TmpHdl, Destination, State),
1579 TmpSize = DestinationValid - DestinationContiguousTop,
1580 %% so now Tmp contains everything we need to salvage from
1581 %% Destination, and index_state has been updated to
1582 %% reflect the compaction of Destination so truncate
1583 %% Destination and copy from Tmp back to the end
1584 {ok, 0} = file_handle_cache:position(TmpHdl, 0),
1585 ok = truncate_and_extend_file(
1586 DestinationHdl, DestinationContiguousTop, ExpectedSize),
1588 file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
1589 %% position in DestinationHdl should now be DestinationValid
1590 ok = file_handle_cache:sync(DestinationHdl),
1591 ok = file_handle_cache:delete(TmpHdl)
1593 {SourceWorkList, SourceValid} =
1594 find_unremoved_messages_in_file(Source, State),
1595 ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
1596 SourceHdl, DestinationHdl, Destination, State),
1598 ok = file_handle_cache:close(DestinationHdl),
1599 ok = file_handle_cache:delete(SourceHdl),
%% Scans File and keeps only the messages that the index still places in
%% File at the same Offset with the same TotalSize. Returns {Entries, Size}:
%% the matching #msg_location{} entries in ascending offset order, and the
%% sum of their sizes.
%% NOTE(review): the non-matching branch is not present in this listing
%% (presumably it leaves Acc unchanged).
1602 find_unremoved_messages_in_file(File,
1603 {_FileSummaryEts, Dir, Index, IndexState}) ->
1604 %% Messages here will be end-of-file at start-of-list
1605 {ok, Messages, _FileSize} =
1606 scan_file_for_valid_messages(Dir, filenum_to_name(File)),
1607 %% foldl will reverse so will end up with msgs in ascending offset order
1608 lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
1609 case Index:lookup(Guid, IndexState) of
1610 #msg_location { file = File, total_size = TotalSize,
1611 offset = Offset } = Entry ->
1612 {[ Entry | List ], TotalSize + Size};
1616 end, {[], 0}, Messages).
1618 copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
1619 Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->
1620 Copy = fun ({BlockStart, BlockEnd}) ->
1621 BSize = BlockEnd - BlockStart,
1623 file_handle_cache:position(SourceHdl, BlockStart),
1625 file_handle_cache:copy(SourceHdl, DestinationHdl, BSize)
1629 fun (#msg_location { guid = Guid, offset = Offset,
1630 total_size = TotalSize },
1631 {CurOffset, Block = {BlockStart, BlockEnd}}) ->
1632 %% CurOffset is in the DestinationFile.
1633 %% Offset, BlockStart and BlockEnd are in the SourceFile
1634 %% update MsgLocation to reflect change of file and offset
1635 ok = Index:update_fields(Guid,
1636 [{#msg_location.file, Destination},
1637 {#msg_location.offset, CurOffset}],
1639 {CurOffset + TotalSize,
1642 %% base case, called only for the first list elem
1643 {Offset, Offset + TotalSize};
1645 %% extend the current block because the
1646 %% next msg follows straight on
1647 {BlockStart, BlockEnd + TotalSize};
1649 %% found a gap, so actually do the work for
1650 %% the previous block
1652 {Offset, Offset + TotalSize}
1654 end, {InitOffset, {undefined, undefined}}, WorkList) of
1655 {FinalOffset, Block} ->
1658 _ -> Copy(Block), %% do the last remaining block
1659 ok = file_handle_cache:sync(DestinationHdl)
1661 {FinalOffsetZ, _Block} ->
1662 {gc_error, [{expected, FinalOffset},
1663 {got, FinalOffsetZ},
1664 {destination, Destination}]}