src/rabbit_msg_store.erl
author Matthew Sackman <matthew@rabbitmq.com>
Wed, 12 Oct 2011 16:24:35 +0100
branch bug24455
changeset 8386 59fa7d144fe1
parent 7985 df3aaaa8d665
child 8350 4dcae0039b1d
child 8380 617415f5d5f9
permissions -rw-r--r--
Update docs
     1 %% The contents of this file are subject to the Mozilla Public License
     2 %% Version 1.1 (the "License"); you may not use this file except in
     3 %% compliance with the License. You may obtain a copy of the License
     4 %% at http://www.mozilla.org/MPL/
     5 %%
     6 %% Software distributed under the License is distributed on an "AS IS"
     7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
     8 %% the License for the specific language governing rights and
     9 %% limitations under the License.
    10 %%
    11 %% The Original Code is RabbitMQ.
    12 %%
    13 %% The Initial Developer of the Original Code is VMware, Inc.
    14 %% Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
    15 %%
    16 
-module(rabbit_msg_store).

-behaviour(gen_server2).

%% Public client API.
-export([start_link/4, successfully_recovered_state/1,
         client_init/4, client_terminate/1, client_delete_and_terminate/1,
         client_ref/1, close_all_indicated/1,
         write/3, read/2, contains/2, remove/2]).

-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
         delete_file/2]). %% internal

-export([transform_dir/3, force_recovery/2]). %% upgrade

%% gen_server2 behaviour callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3, prioritise_call/3, prioritise_cast/2,
         prioritise_info/2, format_message_queue/2]).

%%----------------------------------------------------------------------------

-include("rabbit_msg_store.hrl").

-define(SYNC_INTERVAL,  25).   %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
-define(TRANSFORM_TMP, "transform_tmp").

%% File open mode option lists (combined when opening message files).
-define(BINARY_MODE,     [raw, binary]).
-define(READ_MODE,       [read]).
-define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
-define(WRITE_MODE,      [write]).

%% On-disk extensions: ".rdq" for message files, ".rdt" for temporary
%% files (per the _TMP name -- used during transforms/compaction).
-define(FILE_EXTENSION,        ".rdq").
-define(FILE_EXTENSION_TMP,    ".rdt").

-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
    54 %%----------------------------------------------------------------------------
    55 
%% Server-side (gen_server2) state of the message store process.
-record(msstate,
        { dir,                    %% store directory
          index_module,           %% the module for index ops
          index_state,            %% where are messages?
          current_file,           %% current file name as number
          current_file_handle,    %% current file handle since the last fsync?
          file_handle_cache,      %% file handle cache
          sync_timer_ref,         %% TRef for our interval timer
          sum_valid_data,         %% sum of valid data in all files
          sum_file_size,          %% sum of file sizes
          pending_gc_completion,  %% things to do once GC completes
          gc_pid,                 %% pid of our GC
          file_handles_ets,       %% tid of the shared file handles table
          file_summary_ets,       %% tid of the file summary table
          cur_file_cache_ets,     %% tid of current file cache table
          dying_clients,          %% set of dying clients
          clients,                %% map of references of all registered clients
                                  %% to callbacks
          successfully_recovered, %% boolean: did we recover state?
          file_size_limit,        %% how big are our files allowed to get?
          cref_to_msg_ids         %% client ref to synced messages mapping
        }).

%% Client-side state returned by client_init/4. The ets/index/gc fields
%% mirror the corresponding msstate fields, allowing clients to read
%% directly without calling the server.
-record(client_msstate,
        { server,                 %% the store process (registered name or pid)
          client_ref,             %% this client's unique binary reference
          file_handle_cache,      %% client-local cache of open handles (dict)
          index_state,
          index_module,
          dir,
          gc_pid,
          file_handles_ets,
          file_summary_ets,
          cur_file_cache_ets
        }).

%% One row per message file in the file_summary ets table; see the
%% component notes further down in this module.
-record(file_summary,
        {file, valid_total_size, left, right, file_size, locked, readers}).

%% State handed to the GC operations (combine_files/3, delete_file/2).
-record(gc_state,
        { dir,
          index_module,
          index_state,
          file_summary_ets,
          file_handles_ets,
          msg_store
        }).
   103 
   104 %%----------------------------------------------------------------------------
   105 
%% Dialyzer types and specs; only compiled when the use_specs macro is
%% defined.
-ifdef(use_specs).

-export_type([gc_state/0, file_num/0]).

-type(gc_state() :: #gc_state { dir              :: file:filename(),
                                index_module     :: atom(),
                                index_state      :: any(),
                                file_summary_ets :: ets:tid(),
                                file_handles_ets :: ets:tid(),
                                msg_store        :: server()
                              }).

-type(server() :: pid() | atom()).
-type(client_ref() :: binary()).
-type(file_num() :: non_neg_integer()).
-type(client_msstate() :: #client_msstate {
                      server             :: server(),
                      client_ref         :: client_ref(),
                      file_handle_cache  :: dict(),
                      index_state        :: any(),
                      index_module       :: atom(),
                      dir                :: file:filename(),
                      gc_pid             :: pid(),
                      file_handles_ets   :: ets:tid(),
                      file_summary_ets   :: ets:tid(),
                      cur_file_cache_ets :: ets:tid()}).
%% Generator of ref count deltas for recovered messages (used on
%% non-clean startup); yields 'finished' when exhausted.
-type(msg_ref_delta_gen(A) ::
        fun ((A) -> 'finished' |
                    {rabbit_types:msg_id(), non_neg_integer(), A})).
%% Optional callback invoked with a gb_set of msg ids as they are
%% 'written' or 'removed'.
-type(maybe_msg_id_fun() ::
        'undefined' | fun ((gb_set(), 'written' | 'removed') -> any())).
-type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')).
%% Deferred deletion action returning a boolean; produced by the GC
%% file operations below.
-type(deletion_thunk() :: fun (() -> boolean())).

-spec(start_link/4 ::
        (atom(), file:filename(), [binary()] | 'undefined',
         {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error()).
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
-spec(client_init/4 :: (server(), client_ref(), maybe_msg_id_fun(),
                        maybe_close_fds_fun()) -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
-spec(close_all_indicated/1 ::
        (client_msstate()) -> rabbit_types:ok(client_msstate())).
-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
                     {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
-spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').

-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
                              deletion_thunk()).
-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()).
-spec(force_recovery/2 :: (file:filename(), server()) -> 'ok').
-spec(transform_dir/3 :: (file:filename(), server(),
        fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok').

-endif.
   167 
   168 %%----------------------------------------------------------------------------
   169 
   170 %% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION
   171 %% It is not recommended to set this to < 0.5
   172 -define(GARBAGE_FRACTION,      0.5).
   173 
   174 %% The components:
   175 %%
   176 %% Index: this is a mapping from MsgId to #msg_location{}:
   177 %%        {MsgId, RefCount, File, Offset, TotalSize}
   178 %%        By default, it's in ets, but it's also pluggable.
   179 %% FileSummary: this is an ets table which maps File to #file_summary{}:
   180 %%        {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers}
   181 %%
   182 %% The basic idea is that messages are appended to the current file up
   183 %% until that file becomes too big (> file_size_limit). At that point,
   184 %% the file is closed and a new file is created on the _right_ of the
   185 %% old file which is used for new messages. Files are named
   186 %% numerically ascending, thus the file with the lowest name is the
   187 %% eldest file.
   188 %%
   189 %% We need to keep track of which messages are in which files (this is
   190 %% the Index); how much useful data is in each file and which files
   191 %% are on the left and right of each other. This is the purpose of the
   192 %% FileSummary ets table.
   193 %%
   194 %% As messages are removed from files, holes appear in these
   195 %% files. The field ValidTotalSize contains the total amount of useful
   196 %% data left in the file. This is needed for garbage collection.
   197 %%
   198 %% When we discover that a file is now empty, we delete it. When we
   199 %% discover that it can be combined with the useful data in either its
   200 %% left or right neighbour, and overall, across all the files, we have
   201 %% ((the amount of garbage) / (the sum of all file sizes)) >
   202 %% ?GARBAGE_FRACTION, we start a garbage collection run concurrently,
   203 %% which will compact the two files together. This keeps disk
   204 %% utilisation high and aids performance. We deliberately do this
   205 %% lazily in order to prevent doing GC on files which are soon to be
    206 %% emptied (and hence deleted).
   207 %%
   208 %% Given the compaction between two files, the left file (i.e. elder
   209 %% file) is considered the ultimate destination for the good data in
   210 %% the right file. If necessary, the good data in the left file which
   211 %% is fragmented throughout the file is written out to a temporary
   212 %% file, then read back in to form a contiguous chunk of good data at
   213 %% the start of the left file. Thus the left file is garbage collected
   214 %% and compacted. Then the good data from the right file is copied
   215 %% onto the end of the left file. Index and FileSummary tables are
   216 %% updated.
   217 %%
   218 %% On non-clean startup, we scan the files we discover, dealing with
    219 %% the possibilities of a crash having occurred during a compaction
   220 %% (this consists of tidyup - the compaction is deliberately designed
   221 %% such that data is duplicated on disk rather than risking it being
   222 %% lost), and rebuild the FileSummary ets table and Index.
   223 %%
   224 %% So, with this design, messages move to the left. Eventually, they
   225 %% should end up in a contiguous block on the left and are then never
   226 %% rewritten. But this isn't quite the case. If in a file there is one
   227 %% message that is being ignored, for some reason, and messages in the
   228 %% file to the right and in the current block are being read all the
   229 %% time then it will repeatedly be the case that the good data from
   230 %% both files can be combined and will be written out to a new
   231 %% file. Whenever this happens, our shunned message will be rewritten.
   232 %%
   233 %% So, provided that we combine messages in the right order,
   234 %% (i.e. left file, bottom to top, right file, bottom to top),
   235 %% eventually our shunned message will end up at the bottom of the
   236 %% left file. The compaction/combining algorithm is smart enough to
   237 %% read in good data from the left file that is scattered throughout
   238 %% (i.e. C and D in the below diagram), then truncate the file to just
   239 %% above B (i.e. truncate to the limit of the good contiguous region
   240 %% at the start of the file), then write C and D on top and then write
   241 %% E, F and G from the right file on top. Thus contiguous blocks of
   242 %% good data at the bottom of files are not rewritten.
   243 %%
   244 %% +-------+    +-------+         +-------+
   245 %% |   X   |    |   G   |         |   G   |
   246 %% +-------+    +-------+         +-------+
   247 %% |   D   |    |   X   |         |   F   |
   248 %% +-------+    +-------+         +-------+
   249 %% |   X   |    |   X   |         |   E   |
   250 %% +-------+    +-------+         +-------+
   251 %% |   C   |    |   F   |   ===>  |   D   |
   252 %% +-------+    +-------+         +-------+
   253 %% |   X   |    |   X   |         |   C   |
   254 %% +-------+    +-------+         +-------+
   255 %% |   B   |    |   X   |         |   B   |
   256 %% +-------+    +-------+         +-------+
   257 %% |   A   |    |   E   |         |   A   |
   258 %% +-------+    +-------+         +-------+
   259 %%   left         right             left
   260 %%
   261 %% From this reasoning, we do have a bound on the number of times the
   262 %% message is rewritten. From when it is inserted, there can be no
   263 %% files inserted between it and the head of the queue, and the worst
    264 %% case is that every time it is rewritten, it moves one position lower
   265 %% in the file (for it to stay at the same position requires that
   266 %% there are no holes beneath it, which means truncate would be used
   267 %% and so it would not be rewritten at all). Thus this seems to
   268 %% suggest the limit is the number of messages ahead of it in the
   269 %% queue, though it's likely that that's pessimistic, given the
   270 %% requirements for compaction/combination of files.
   271 %%
    272 %% The other property that we have is the bound on the lowest
   273 %% utilisation, which should be 50% - worst case is that all files are
   274 %% fractionally over half full and can't be combined (equivalent is
   275 %% alternating full files and files with only one tiny message in
   276 %% them).
   277 %%
   278 %% Messages are reference-counted. When a message with the same msg id
   279 %% is written several times we only store it once, and only remove it
   280 %% from the store when it has been removed the same number of times.
   281 %%
   282 %% The reference counts do not persist. Therefore the initialisation
   283 %% function must be provided with a generator that produces ref count
   284 %% deltas for all recovered messages. This is only used on startup
   285 %% when the shutdown was non-clean.
   286 %%
   287 %% Read messages with a reference count greater than one are entered
   288 %% into a message cache. The purpose of the cache is not especially
   289 %% performance, though it can help there too, but prevention of memory
   290 %% explosion. It ensures that as messages with a high reference count
   291 %% are read from several processes they are read back as the same
   292 %% binary object rather than multiples of identical binary
   293 %% objects.
   294 %%
   295 %% Reads can be performed directly by clients without calling to the
   296 %% server. This is safe because multiple file handles can be used to
   297 %% read files. However, locking is used by the concurrent GC to make
   298 %% sure that reads are not attempted from files which are in the
   299 %% process of being garbage collected.
   300 %%
   301 %% When a message is removed, its reference count is decremented. Even
   302 %% if the reference count becomes 0, its entry is not removed. This is
   303 %% because in the event of the same message being sent to several
   304 %% different queues, there is the possibility of one queue writing and
   305 %% removing the message before other queues write it at all. Thus
    306 %% accommodating 0-reference counts allows us to avoid unnecessary
   307 %% writes here. Of course, there are complications: the file to which
   308 %% the message has already been written could be locked pending
   309 %% deletion or GC, which means we have to rewrite the message as the
   310 %% original copy will now be lost.
   311 %%
   312 %% The server automatically defers reads, removes and contains calls
   313 %% that occur which refer to files which are currently being
   314 %% GC'd. Contains calls are only deferred in order to ensure they do
   315 %% not overtake removes.
   316 %%
   317 %% The current file to which messages are being written has a
   318 %% write-back cache. This is written to immediately by clients and can
   319 %% be read from by clients too. This means that there are only ever
   320 %% writes made to the current file, thus eliminating delays due to
   321 %% flushing write buffers in order to be able to safely read from the
   322 %% current file. The one exception to this is that on start up, the
   323 %% cache is not populated with msgs found in the current file, and
   324 %% thus in this case only, reads may have to come from the file
   325 %% itself. The effect of this is that even if the msg_store process is
   326 %% heavily overloaded, clients can still write and read messages with
   327 %% very low latency and not block at all.
   328 %%
   329 %% Clients of the msg_store are required to register before using the
   330 %% msg_store. This provides them with the necessary client-side state
   331 %% to allow them to directly access the various caches and files. When
   332 %% they terminate, they should deregister. They can do this by calling
   333 %% either client_terminate/1 or client_delete_and_terminate/1. The
   334 %% differences are: (a) client_terminate is synchronous. As a result,
   335 %% if the msg_store is badly overloaded and has lots of in-flight
   336 %% writes and removes to process, this will take some time to
   337 %% return. However, once it does return, you can be sure that all the
   338 %% actions you've issued to the msg_store have been processed. (b) Not
   339 %% only is client_delete_and_terminate/1 asynchronous, but it also
   340 %% permits writes and subsequent removes from the current
   341 %% (terminating) client which are still in flight to be safely
   342 %% ignored. Thus from the point of view of the msg_store itself, and
   343 %% all from the same client:
   344 %%
   345 %% (T) = termination; (WN) = write of msg N; (RN) = remove of msg N
   346 %% --> W1, W2, W1, R1, T, W3, R2, W2, R1, R2, R3, W4 -->
   347 %%
   348 %% The client obviously sent T after all the other messages (up to
   349 %% W4), but because the msg_store prioritises messages, the T can be
   350 %% promoted and thus received early.
   351 %%
   352 %% Thus at the point of the msg_store receiving T, we have messages 1
   353 %% and 2 with a refcount of 1. After T, W3 will be ignored because
   354 %% it's an unknown message, as will R3, and W4. W2, R1 and R2 won't be
   355 %% ignored because the messages that they refer to were already known
   356 %% to the msg_store prior to T. However, it can be a little more
   357 %% complex: after the first R2, the refcount of msg 2 is 0. At that
   358 %% point, if a GC occurs or file deletion, msg 2 could vanish, which
   359 %% would then mean that the subsequent W2 and R2 are then ignored.
   360 %%
   361 %% The use case then for client_delete_and_terminate/1 is if the
   362 %% client wishes to remove everything it's written to the msg_store:
   363 %% it issues removes for all messages it's written and not removed,
   364 %% and then calls client_delete_and_terminate/1. At that point, any
   365 %% in-flight writes (and subsequent removes) can be ignored, but
   366 %% removes and writes for messages the msg_store already knows about
   367 %% will continue to be processed normally (which will normally just
   368 %% involve modifying the reference count, which is fast). Thus we save
   369 %% disk bandwidth for writes which are going to be immediately removed
    370 %% again by the terminating client.
   371 %%
   372 %% We use a separate set to keep track of the dying clients in order
   373 %% to keep that set, which is inspected on every write and remove, as
   374 %% small as possible. Inspecting the set of all clients would degrade
   375 %% performance with many healthy clients and few, if any, dying
   376 %% clients, which is the typical case.
   377 %%
   378 %% For notes on Clean Shutdown and startup, see documentation in
   379 %% variable_queue.
   380 
   381 %%----------------------------------------------------------------------------
   382 %% public API
   383 %%----------------------------------------------------------------------------
   384 
   385 start_link(Server, Dir, ClientRefs, StartupFunState) ->
   386     gen_server2:start_link({local, Server}, ?MODULE,
   387                            [Server, Dir, ClientRefs, StartupFunState],
   388                            [{timeout, infinity}]).
   389 
   390 successfully_recovered_state(Server) ->
   391     gen_server2:call(Server, successfully_recovered_state, infinity).
   392 
   393 client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
   394     {IState, IModule, Dir, GCPid,
   395      FileHandlesEts, FileSummaryEts, CurFileCacheEts} =
   396         gen_server2:call(
   397           Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
   398     #client_msstate { server             = Server,
   399                       client_ref         = Ref,
   400                       file_handle_cache  = dict:new(),
   401                       index_state        = IState,
   402                       index_module       = IModule,
   403                       dir                = Dir,
   404                       gc_pid             = GCPid,
   405                       file_handles_ets   = FileHandlesEts,
   406                       file_summary_ets   = FileSummaryEts,
   407                       cur_file_cache_ets = CurFileCacheEts }.
   408 
   409 client_terminate(CState = #client_msstate { client_ref = Ref }) ->
   410     close_all_handles(CState),
   411     ok = server_call(CState, {client_terminate, Ref}).
   412 
   413 client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
   414     close_all_handles(CState),
   415     ok = server_cast(CState, {client_dying, Ref}),
   416     ok = server_cast(CState, {client_delete, Ref}).
   417 
   418 client_ref(#client_msstate { client_ref = Ref }) -> Ref.
   419 
   420 write(MsgId, Msg,
   421       CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
   422                                  client_ref         = CRef }) ->
   423     ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
   424     ok = server_cast(CState, {write, CRef, MsgId}).
   425 
   426 read(MsgId,
   427      CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
   428     %% Check the cur file cache
   429     case ets:lookup(CurFileCacheEts, MsgId) of
   430         [] ->
   431             Defer = fun() -> {server_call(CState, {read, MsgId}), CState} end,
   432             case index_lookup_positive_ref_count(MsgId, CState) of
   433                 not_found   -> Defer();
   434                 MsgLocation -> client_read1(MsgLocation, Defer, CState)
   435             end;
   436         [{MsgId, Msg, _CacheRefCount}] ->
   437             {{ok, Msg}, CState}
   438     end.
   439 
   440 contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
   441 remove([],    _CState) -> ok;
   442 remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
   443     server_cast(CState, {remove, CRef, MsgIds}).
   444 
   445 set_maximum_since_use(Server, Age) ->
   446     gen_server2:cast(Server, {set_maximum_since_use, Age}).
   447 
   448 %%----------------------------------------------------------------------------
   449 %% Client-side-only helpers
   450 %%----------------------------------------------------------------------------
   451 
   452 server_call(#client_msstate { server = Server }, Msg) ->
   453     gen_server2:call(Server, Msg, infinity).
   454 
   455 server_cast(#client_msstate { server = Server }, Msg) ->
   456     gen_server2:cast(Server, Msg).
   457 
   458 client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer,
   459              CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
   460     case ets:lookup(FileSummaryEts, File) of
   461         [] -> %% File has been GC'd and no longer exists. Go around again.
   462             read(MsgId, CState);
   463         [#file_summary { locked = Locked, right = Right }] ->
   464             client_read2(Locked, Right, MsgLocation, Defer, CState)
   465     end.
   466 
   467 client_read2(false, undefined, _MsgLocation, Defer, _CState) ->
   468     %% Although we've already checked both caches and not found the
   469     %% message there, the message is apparently in the
   470     %% current_file. We can only arrive here if we are trying to read
   471     %% a message which we have not written, which is very odd, so just
   472     %% defer.
   473     %%
   474     %% OR, on startup, the cur_file_cache is not populated with the
   475     %% contents of the current file, thus reads from the current file
   476     %% will end up here and will need to be deferred.
   477     Defer();
   478 client_read2(true, _Right, _MsgLocation, Defer, _CState) ->
   479     %% Of course, in the mean time, the GC could have run and our msg
   480     %% is actually in a different file, unlocked. However, defering is
   481     %% the safest and simplest thing to do.
   482     Defer();
   483 client_read2(false, _Right,
   484              MsgLocation = #msg_location { msg_id = MsgId, file = File },
   485              Defer,
   486              CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
   487     %% It's entirely possible that everything we're doing from here on
   488     %% is for the wrong file, or a non-existent file, as a GC may have
   489     %% finished.
   490     safe_ets_update_counter(
   491       FileSummaryEts, File, {#file_summary.readers, +1},
   492       fun (_) -> client_read3(MsgLocation, Defer, CState) end,
   493       fun () -> read(MsgId, CState) end).
   494 
%% Final stage of a client-side read: the readers count on File has
%% been incremented by client_read2, so a GC of File cannot begin while
%% we hold that claim. Every exit path must release the claim via
%% Release(), and must cope with the file having been GC'd between the
%% original lookup and our +1 (the "badarg scenario" below).
client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
             CState = #client_msstate { file_handles_ets = FileHandlesEts,
                                        file_summary_ets = FileSummaryEts,
                                        gc_pid           = GCPid,
                                        client_ref       = Ref }) ->
    %% Decrement the readers count; if we were the last reader of a
    %% file that has since been locked, tell the GC it may proceed.
    Release =
        fun() -> ok = case ets:update_counter(FileSummaryEts, File,
                                              {#file_summary.readers, -1}) of
                          0 -> case ets:lookup(FileSummaryEts, File) of
                                   [#file_summary { locked = true }] ->
                                       rabbit_msg_store_gc:no_readers(
                                         GCPid, File);
                                   _ -> ok
                               end;
                          _ -> ok
                      end
        end,
    %% If a GC involving the file hasn't already started, it won't
    %% start now. Need to check again to see if we've been locked in
    %% the meantime, between lookup and update_counter (thus GC
    %% started before our +1. In fact, it could have finished by now
    %% too).
    case ets:lookup(FileSummaryEts, File) of
        [] -> %% GC has deleted our file, just go round again.
            read(MsgId, CState);
        [#file_summary { locked = true }] ->
            %% If we get a badarg here, then the GC has finished and
            %% deleted our file. Try going around again. Otherwise,
            %% just defer.
            %%
            %% badarg scenario: we lookup, msg_store locks, GC starts,
            %% GC ends, we +1 readers, msg_store ets:deletes (and
            %% unlocks the dest)
            try Release(),
                 Defer()
            catch error:badarg -> read(MsgId, CState)
            end;
        [#file_summary { locked = false }] ->
            %% Ok, we're definitely safe to continue - a GC involving
            %% the file cannot start up now, and isn't running, so
            %% nothing will tell us from now on to close the handle if
            %% it's already open.
            %%
            %% Finally, we need to recheck that the msg is still at
            %% the same place - it's possible an entire GC ran between
            %% us doing the lookup and the +1 on the readers. (Same as
            %% badarg scenario above, but we don't have a missing file
            %% - we just have the /wrong/ file).
            case index_lookup(MsgId, CState) of
                #msg_location { file = File } = MsgLocation ->
                    %% Still the same file.
                    {ok, CState1} = close_all_indicated(CState),
                    %% We are now guaranteed that the mark_handle_open
                    %% call will either insert_new correctly, or will
                    %% fail, but find the value is open, not close.
                    mark_handle_open(FileHandlesEts, File, Ref),
                    %% Could the msg_store now mark the file to be
                    %% closed? No: marks for closing are issued only
                    %% when the msg_store has locked the file.
                    %% This will never be the current file
                    {Msg, CState2} = read_from_disk(MsgLocation, CState1),
                    Release(), %% this MUST NOT fail with badarg
                    {{ok, Msg}, CState2};
                #msg_location {} = MsgLocation -> %% different file!
                    Release(), %% this MUST NOT fail with badarg
                    client_read1(MsgLocation, Defer, CState);
                not_found -> %% it seems not to exist. Defer, just to be sure.
                    try Release() %% this can badarg, same as locked case, above
                    catch error:badarg -> ok
                    end,
                    Defer()
            end
    end.
   568 
   569 clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
   570                                       dying_clients = DyingClients }) ->
   571     State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
   572                      dying_clients = sets:del_element(CRef, DyingClients) }.
   573 
   574 
   575 %%----------------------------------------------------------------------------
   576 %% gen_server callbacks
   577 %%----------------------------------------------------------------------------
   578 
%% gen_server2 init: set up the store's directory, recover (or
%% rebuild) the msg location index and file summary, create the
%% shared ETS tables, start the GC helper process, and open the
%% current file for appending.
init([Server, BaseDir, ClientRefs, StartupFunState]) ->
    %% Trap exits so terminate/2 runs on shutdown and so an 'EXIT'
    %% from a linked process (e.g. the GC) reaches handle_info.
    process_flag(trap_exit, true),

    ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
                                             [self()]),

    Dir = filename:join(BaseDir, atom_to_list(Server)),

    {ok, IndexModule} = application:get_env(msg_store_index_module),
    rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),

    %% ClientRefs = undefined means "do not recover": wipe the
    %% directory and start afresh. Otherwise ensure the directory
    %% exists and repair any partially-completed compaction.
    AttemptFileSummaryRecovery =
        case ClientRefs of
            undefined -> ok = rabbit_file:recursive_delete([Dir]),
                         ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
                         false;
            _         -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
                         recover_crashed_compactions(Dir)
        end,

    %% if we found crashed compactions we trust neither the
    %% file_summary nor the location index. Note the file_summary is
    %% left empty here if it can't be recovered.
    {FileSummaryRecovered, FileSummaryEts} =
        recover_file_summary(AttemptFileSummaryRecovery, Dir),

    {CleanShutdown, IndexState, ClientRefs1} =
        recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
                                      ClientRefs, Dir, Server),
    %% Clients start with no callbacks; the real MsgOnDiskFun /
    %% CloseFDsFun pair arrives via the new_client_state call.
    Clients = dict:from_list(
                [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]),
    %% CleanShutdown => msg location index and file_summary both
    %% recovered correctly.
    true = case {FileSummaryRecovered, CleanShutdown} of
               {true, false} -> ets:delete_all_objects(FileSummaryEts);
               _             -> true
           end,
    %% CleanShutdown <=> msg location index and file_summary both
    %% recovered correctly.

    FileHandlesEts  = ets:new(rabbit_msg_store_shared_file_handles,
                              [ordered_set, public]),
    CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),

    {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),

    {ok, GCPid} = rabbit_msg_store_gc:start_link(
                    #gc_state { dir              = Dir,
                                index_module     = IndexModule,
                                index_state      = IndexState,
                                file_summary_ets = FileSummaryEts,
                                file_handles_ets = FileHandlesEts,
                                msg_store        = self()
                              }),

    State = #msstate { dir                    = Dir,
                       index_module           = IndexModule,
                       index_state            = IndexState,
                       current_file           = 0,
                       current_file_handle    = undefined,
                       file_handle_cache      = dict:new(),
                       sync_timer_ref         = undefined,
                       sum_valid_data         = 0,
                       sum_file_size          = 0,
                       pending_gc_completion  = orddict:new(),
                       gc_pid                 = GCPid,
                       file_handles_ets       = FileHandlesEts,
                       file_summary_ets       = FileSummaryEts,
                       cur_file_cache_ets     = CurFileCacheEts,
                       dying_clients          = sets:new(),
                       clients                = Clients,
                       successfully_recovered = CleanShutdown,
                       file_size_limit        = FileSizeLimit,
                       cref_to_msg_ids        = dict:new()
                     },

    %% If we didn't recover the msg location index then we need to
    %% rebuild it now.
    {Offset, State1 = #msstate { current_file = CurFile }} =
        build_index(CleanShutdown, StartupFunState, State),

    %% read is only needed so that we can seek
    {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
                             [read | ?WRITE_MODE]),
    {ok, Offset} = file_handle_cache:position(CurHdl, Offset),
    %% Truncate at Offset - presumably discarding any trailing
    %% partial write left by a crash; build_index determined Offset.
    ok = file_handle_cache:truncate(CurHdl),

    {ok, maybe_compact(State1 #msstate { current_file_handle = CurHdl }),
     hibernate,
     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
   669 
   670 prioritise_call(Msg, _From, _State) ->
   671     case Msg of
   672         successfully_recovered_state                  -> 7;
   673         {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7;
   674         {read, _MsgId}                                -> 2;
   675         _                                             -> 0
   676     end.
   677 
   678 prioritise_cast(Msg, _State) ->
   679     case Msg of
   680         {combine_files, _Source, _Destination, _Reclaimed} -> 8;
   681         {delete_file, _File, _Reclaimed}                   -> 8;
   682         {set_maximum_since_use, _Age}                      -> 8;
   683         {client_dying, _Pid}                               -> 7;
   684         _                                                  -> 0
   685     end.
   686 
   687 prioritise_info(Msg, _State) ->
   688     case Msg of
   689         sync                                               -> 8;
   690         _                                                  -> 0
   691     end.
   692 
%% Synchronous API. Note that read and contains replies may be
%% deferred (noreply here, gen_server2:reply later) when the msg's
%% file is locked by an in-progress GC.
handle_call(successfully_recovered_state, _From, State) ->
    reply(State #msstate.successfully_recovered, State);

handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
            State = #msstate { dir                = Dir,
                               index_state        = IndexState,
                               index_module       = IndexModule,
                               file_handles_ets   = FileHandlesEts,
                               file_summary_ets   = FileSummaryEts,
                               cur_file_cache_ets = CurFileCacheEts,
                               clients            = Clients,
                               gc_pid             = GCPid }) ->
    %% Register the client's callbacks, then hand back the index
    %% state and shared ETS tables the client needs for its own
    %% (server-bypassing) operations.
    Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
    reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
           CurFileCacheEts}, State #msstate { clients = Clients1 });

handle_call({client_terminate, CRef}, _From, State) ->
    reply(ok, clear_client(CRef, State));

handle_call({read, MsgId}, From, State) ->
    %% read_message replies itself, possibly only after a pending GC
    %% on the containing file completes.
    State1 = read_message(MsgId, From, State),
    noreply(State1);

handle_call({contains, MsgId}, From, State) ->
    %% contains_message replies itself, possibly deferred (as above).
    State1 = contains_message(MsgId, From, State),
    noreply(State1).
   719 
%% Asynchronous API and GC-completion notifications.
handle_cast({client_dying, CRef},
            State = #msstate { dying_clients = DyingClients }) ->
    DyingClients1 = sets:add_element(CRef, DyingClients),
    %% Write an empty "death msg" under the client's own ref; its
    %% position on disk lets should_mask_action order the client's
    %% other msgs relative to its death.
    noreply(write_message(CRef, <<>>,
                          State #msstate { dying_clients = DyingClients1 }));

handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
    %% Drop the client's registration, then remove its death msg
    %% (written above under MsgId = CRef).
    State1 = State #msstate { clients = dict:erase(CRef, Clients) },
    noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));

handle_cast({write, CRef, MsgId},
            State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
    %% The client already put the msg in the cur-file cache; take our
    %% reference off the cache count (asserting it stays >= 0) and
    %% fetch the msg body from the cache.
    true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
    [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId),
    noreply(
      case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
          {write, State1} ->
              write_message(CRef, MsgId, Msg, State1);
          {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
              %% Msg is in the current file: leave the cache entry in
              %% place (there may be more writes for it in the mailbox).
              State1;
          {ignore, _File, State1} ->
              %% Msg lives in an older file; its zero-count cache
              %% entry can go.
              true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
              State1;
          {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
              %% Already on disk in the current file: confirm on the
              %% next sync along with everything else pending.
              record_pending_confirm(CRef, MsgId, State1);
          {confirm, _File, State1} ->
              %% Already on disk in an older (hence synced) file:
              %% confirm immediately.
              true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
              update_pending_confirms(
                fun (MsgOnDiskFun, CTM) ->
                        MsgOnDiskFun(gb_sets:singleton(MsgId), written),
                        CTM
                end, CRef, State1)
      end);

handle_cast({remove, CRef, MsgIds}, State) ->
    State1 = lists:foldl(
               fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end,
               State, MsgIds),
    %% Removal counts as a confirm outcome ('removed') for any of
    %% these msgs the client was still awaiting.
    noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
                                         removed, State1)));

handle_cast({combine_files, Source, Destination, Reclaimed},
            State = #msstate { sum_file_size    = SumFileSize,
                               file_handles_ets = FileHandlesEts,
                               file_summary_ets = FileSummaryEts,
                               clients          = Clients }) ->
    %% GC finished merging Source into Destination: Source is gone,
    %% Destination must be unlocked and clients' stale handles on it
    %% marked for closing.
    ok = cleanup_after_file_deletion(Source, State),
    %% see comment in cleanup_after_file_deletion, and client_read3
    true = mark_handle_to_close(Clients, FileHandlesEts, Destination, false),
    true = ets:update_element(FileSummaryEts, Destination,
                              {#file_summary.locked, false}),
    State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
    %% Re-run any reads/contains/removes deferred on either file.
    noreply(maybe_compact(run_pending([Source, Destination], State1)));

handle_cast({delete_file, File, Reclaimed},
            State = #msstate { sum_file_size = SumFileSize }) ->
    %% GC finished deleting an entirely-empty file.
    ok = cleanup_after_file_deletion(File, State),
    State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
    noreply(maybe_compact(run_pending([File], State1)));

handle_cast({set_maximum_since_use, Age}, State) ->
    %% Relayed from the file_handle_cache callback registered in init.
    ok = file_handle_cache:set_maximum_since_use(Age),
    noreply(State).
   783 
   784 handle_info(sync, State) ->
   785     noreply(internal_sync(State));
   786 
   787 handle_info(timeout, State) ->
   788     noreply(internal_sync(State));
   789 
   790 handle_info({'EXIT', _Pid, Reason}, State) ->
   791     {stop, Reason, State}.
   792 
%% Orderly shutdown: stop the GC, sync and close the current file,
%% close all cached handles, persist the file summary, index and
%% client refs so the next startup can recover cleanly, then drop the
%% ETS tables.
terminate(_Reason, State = #msstate { index_state         = IndexState,
                                      index_module        = IndexModule,
                                      current_file_handle = CurHdl,
                                      gc_pid              = GCPid,
                                      file_handles_ets    = FileHandlesEts,
                                      file_summary_ets    = FileSummaryEts,
                                      cur_file_cache_ets  = CurFileCacheEts,
                                      clients             = Clients,
                                      dir                 = Dir }) ->
    %% stop the gc first, otherwise it could be working and we pull
    %% out the ets tables from under it.
    ok = rabbit_msg_store_gc:stop(GCPid),
    %% Flush outstanding confirms and close the current file, if we
    %% got far enough through init to open one.
    State1 = case CurHdl of
                 undefined -> State;
                 _         -> State2 = internal_sync(State),
                              ok = file_handle_cache:close(CurHdl),
                              State2
             end,
    State3 = close_all_handles(State1),
    ok = store_file_summary(FileSummaryEts, Dir),
    [true = ets:delete(T) ||
        T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
    IndexModule:terminate(IndexState),
    %% Recovery terms record which clients existed and which index
    %% implementation wrote the on-disk index.
    ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
                               {index_module, IndexModule}], Dir),
    State3 #msstate { index_state         = undefined,
                      current_file_handle = undefined }.
   820 
%% No state transformation is needed across code upgrades.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
   823 
   824 format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
   825 
   826 %%----------------------------------------------------------------------------
   827 %% general helper functions
   828 %%----------------------------------------------------------------------------
   829 
   830 noreply(State) ->
   831     {State1, Timeout} = next_state(State),
   832     {noreply, State1, Timeout}.
   833 
   834 reply(Reply, State) ->
   835     {State1, Timeout} = next_state(State),
   836     {reply, Reply, State1, Timeout}.
   837 
   838 next_state(State = #msstate { sync_timer_ref  = undefined,
   839                               cref_to_msg_ids = CTM }) ->
   840     case dict:size(CTM) of
   841         0 -> {State, hibernate};
   842         _ -> {start_sync_timer(State), 0}
   843     end;
   844 next_state(State = #msstate { cref_to_msg_ids = CTM }) ->
   845     case dict:size(CTM) of
   846         0 -> {stop_sync_timer(State), hibernate};
   847         _ -> {State, 0}
   848     end.
   849 
   850 start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
   851     TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync),
   852     State #msstate { sync_timer_ref = TRef }.
   853 
   854 stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
   855     State;
   856 stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
   857     erlang:cancel_timer(TRef),
   858     State #msstate { sync_timer_ref = undefined }.
   859 
   860 internal_sync(State = #msstate { current_file_handle = CurHdl,
   861                                  cref_to_msg_ids     = CTM }) ->
   862     State1 = stop_sync_timer(State),
   863     CGs = dict:fold(fun (CRef, MsgIds, NS) ->
   864                             case gb_sets:is_empty(MsgIds) of
   865                                 true  -> NS;
   866                                 false -> [{CRef, MsgIds} | NS]
   867                             end
   868                     end, [], CTM),
   869     ok = case CGs of
   870              [] -> ok;
   871              _  -> file_handle_cache:sync(CurHdl)
   872          end,
   873     lists:foldl(fun ({CRef, MsgIds}, StateN) ->
   874                         client_confirm(CRef, MsgIds, written, StateN)
   875                 end, State1, CGs).
   876 
%% Decide what a write should do, given the masking verdict from
%% should_mask_action and the current index entry:
%%   {write, State}        - genuinely new: append to the current file
%%   {ignore, File, State} - drop the write (dying client / GC races)
%%   {confirm, File, State}- already on disk: just confirm to client
write_action({true, not_found}, _MsgId, State) ->
    {ignore, undefined, State};
write_action({true, #msg_location { file = File }}, _MsgId, State) ->
    {ignore, File, State};
write_action({false, not_found}, _MsgId, State) ->
    {write, State};
write_action({Mask, #msg_location { ref_count = 0, file = File,
                                    total_size = TotalSize }},
             MsgId, State = #msstate { file_summary_ets = FileSummaryEts }) ->
    %% Zero ref count: the entry is a leftover awaiting GC. What we
    %% do depends on whether its file is currently locked by the GC.
    case {Mask, ets:lookup(FileSummaryEts, File)} of
        {false, [#file_summary { locked = true }]} ->
            %% GC owns the file; drop the stale entry and write anew.
            ok = index_delete(MsgId, State),
            {write, State};
        {false_if_increment, [#file_summary { locked = true }]} ->
            %% The msg for MsgId is older than the client death
            %% message, but as it is being GC'd currently we'll have
            %% to write a new copy, which will then be younger, so
            %% ignore this write.
            {ignore, File, State};
        {_Mask, [#file_summary {}]} ->
            %% File not locked: resurrect the entry by restoring its
            %% ref count and its contribution to the file's valid size.
            ok = index_update_ref_count(MsgId, 1, State),
            State1 = adjust_valid_total_size(File, TotalSize, State),
            {confirm, File, State1}
    end;
write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
             MsgId, State) ->
    ok = index_update_ref_count(MsgId, RefCount + 1, State),
    %% We already know about it, just update counter. Only update
    %% field otherwise bad interaction with concurrent GC
    {confirm, File, State}.
   907 
%% Record the pending confirm for CRef, then append the msg to disk.
write_message(CRef, MsgId, Msg, State) ->
    write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)).
   910 
%% Append Msg to the current file, insert its location into the index
%% with an initial ref count of 1, bump the file's size counters, and
%% possibly roll over to a fresh current file.
write_message(MsgId, Msg,
              State = #msstate { current_file_handle = CurHdl,
                                 current_file        = CurFile,
                                 sum_valid_data      = SumValid,
                                 sum_file_size       = SumFileSize,
                                 file_summary_ets    = FileSummaryEts }) ->
    {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
    {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
    ok = index_insert(
           #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
                           offset = CurOffset, total_size = TotalSize }, State),
    %% ASSERTION: the current file is the right-most file and is
    %% never locked by the GC.
    [#file_summary { right = undefined, locked = false }] =
        ets:lookup(FileSummaryEts, CurFile),
    [_,_] = ets:update_counter(FileSummaryEts, CurFile,
                               [{#file_summary.valid_total_size, TotalSize},
                                {#file_summary.file_size,        TotalSize}]),
    %% Roll to a new current file if this append pushed us past the
    %% file size limit.
    maybe_roll_to_new_file(CurOffset + TotalSize,
                           State #msstate {
                             sum_valid_data = SumValid    + TotalSize,
                             sum_file_size  = SumFileSize + TotalSize }).
   931 
   932 read_message(MsgId, From, State) ->
   933     case index_lookup_positive_ref_count(MsgId, State) of
   934         not_found   -> gen_server2:reply(From, not_found),
   935                        State;
   936         MsgLocation -> read_message1(From, MsgLocation, State)
   937     end.
   938 
%% Fulfil a server-side read. Msgs in the current file are served
%% from the cur-file cache when possible; a cache miss may require
%% flushing the current handle first. Msgs in a locked (being-GC'd)
%% file have their read deferred until that GC completes.
read_message1(From, #msg_location { msg_id = MsgId, file = File,
                                    offset = Offset } = MsgLoc,
              State = #msstate { current_file        = CurFile,
                                 current_file_handle = CurHdl,
                                 file_summary_ets    = FileSummaryEts,
                                 cur_file_cache_ets  = CurFileCacheEts }) ->
    case File =:= CurFile of
        true  -> {Msg, State1} =
                     %% can return [] if msg in file existed on startup
                     case ets:lookup(CurFileCacheEts, MsgId) of
                         [] ->
                             {ok, RawOffSet} =
                                 file_handle_cache:current_raw_offset(CurHdl),
                             %% If the msg's offset is at or beyond the
                             %% raw offset it may not yet have reached
                             %% the file, so flush before reading.
                             ok = case Offset >= RawOffSet of
                                      true  -> file_handle_cache:flush(CurHdl);
                                      false -> ok
                                  end,
                             read_from_disk(MsgLoc, State);
                         [{MsgId, Msg1, _CacheRefCount}] ->
                             {Msg1, State}
                     end,
                 gen_server2:reply(From, {ok, Msg}),
                 State1;
        false -> [#file_summary { locked = Locked }] =
                     ets:lookup(FileSummaryEts, File),
                 case Locked of
                     true  -> add_to_pending_gc_completion({read, MsgId, From},
                                                           File, State);
                     false -> {Msg, State1} = read_from_disk(MsgLoc, State),
                              gen_server2:reply(From, {ok, Msg}),
                              State1
                 end
    end.
   972 
%% Read one msg off disk at its indexed location. The outer match
%% asserts that what we read back carries the expected msg id; on a
%% mismatch the badmatch carries a detailed misread diagnostic.
read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
                               total_size = TotalSize }, State) ->
    {Hdl, State1} = get_read_handle(File, State),
    {ok, Offset} = file_handle_cache:position(Hdl, Offset),
    {ok, {MsgId, Msg}} =
        case rabbit_msg_file:read(Hdl, TotalSize) of
            {ok, {MsgId, _}} = Obj ->
                Obj;
            Rest ->
                %% Deliberately not a valid {ok, _}: force a badmatch
                %% above with enough context to debug the misread.
                {error, {misread, [{old_state, State},
                                   {file_num,  File},
                                   {offset,    Offset},
                                   {msg_id,    MsgId},
                                   {read,      Rest},
                                   {proc_dict, get()}
                                  ]}}
        end,
    {Msg, State1}.
   991 
   992 contains_message(MsgId, From,
   993                  State = #msstate { pending_gc_completion = Pending }) ->
   994     case index_lookup_positive_ref_count(MsgId, State) of
   995         not_found ->
   996             gen_server2:reply(From, false),
   997             State;
   998         #msg_location { file = File } ->
   999             case orddict:is_key(File, Pending) of
  1000                 true  -> add_to_pending_gc_completion(
  1001                            {contains, MsgId, From}, File, State);
  1002                 false -> gen_server2:reply(From, true),
  1003                          State
  1004             end
  1005     end.
  1006 
%% Decrement MsgId's ref count on behalf of CRef. Dropping the last
%% reference shrinks the containing file's valid size and may delete
%% the file; if that file is locked (being GC'd) the removal is
%% deferred until the GC completes. Masked removals (dying clients)
%% are no-ops.
remove_message(MsgId, CRef,
               State = #msstate { file_summary_ets = FileSummaryEts }) ->
    case should_mask_action(CRef, MsgId, State) of
        {true, _Location} ->
            State;
        {false_if_increment, #msg_location { ref_count = 0 }} ->
            %% CRef has tried to both write and remove this msg
            %% whilst it's being GC'd. ASSERTION:
            %% [#file_summary { locked = true }] =
            %%    ets:lookup(FileSummaryEts, File),
            State;
        {_Mask, #msg_location { ref_count = RefCount, file = File,
                                total_size = TotalSize }} when RefCount > 0 ->
            %% only update field, otherwise bad interaction with
            %% concurrent GC
            Dec = fun () ->
                          index_update_ref_count(MsgId, RefCount - 1, State)
                  end,
            case RefCount of
                %% don't remove from CUR_FILE_CACHE_ETS_NAME here
                %% because there may be further writes in the mailbox
                %% for the same msg.
                1 -> case ets:lookup(FileSummaryEts, File) of
                         [#file_summary { locked = true }] ->
                             add_to_pending_gc_completion(
                               {remove, MsgId, CRef}, File, State);
                         [#file_summary {}] ->
                             ok = Dec(),
                             delete_file_if_empty(
                               File, adjust_valid_total_size(File, -TotalSize,
                                                             State))
                     end;
                _ -> ok = Dec(),
                     State
            end
    end.
  1043 
%% Queue Op ({read,..} | {contains,..} | {remove,..}) to be replayed
%% by run_pending/2 once the GC of File finishes.
add_to_pending_gc_completion(
  Op, File, State = #msstate { pending_gc_completion = Pending }) ->
    State #msstate { pending_gc_completion =
                         rabbit_misc:orddict_cons(File, Op, Pending) }.
  1048 
  1049 run_pending(Files, State) ->
  1050     lists:foldl(
  1051       fun (File, State1 = #msstate { pending_gc_completion = Pending }) ->
  1052               Pending1 = orddict:erase(File, Pending),
  1053               lists:foldl(
  1054                 fun run_pending_action/2,
  1055                 State1 #msstate { pending_gc_completion = Pending1 },
  1056                 lists:reverse(orddict:fetch(File, Pending)))
  1057       end, State, Files).
  1058 
%% Replay one deferred operation (queued by add_to_pending_gc_completion).
run_pending_action({read, MsgId, From}, State) ->
    read_message(MsgId, From, State);
run_pending_action({contains, MsgId, From}, State) ->
    contains_message(MsgId, From, State);
run_pending_action({remove, MsgId, CRef}, State) ->
    remove_message(MsgId, CRef, State).
  1065 
%% Run ets:update_counter(Tab, Key, UpdateOp) and feed the result to
%% SuccessFun; on badarg (e.g. the row or table is gone) evaluate
%% FailThunk instead. Note SuccessFun runs inside the try, so a
%% badarg raised by it is caught too.
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
    try
        SuccessFun(ets:update_counter(Tab, Key, UpdateOp))
    catch error:badarg -> FailThunk()
    end.
  1071 
%% As safe_ets_update_counter/5, discarding the counter result.
safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
    safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk).
  1074 
%% Apply Delta (may be negative) to File's valid_total_size in the
%% file summary and to the store-wide running total.
adjust_valid_total_size(File, Delta, State = #msstate {
                                       sum_valid_data   = SumValid,
                                       file_summary_ets = FileSummaryEts }) ->
    [_] = ets:update_counter(FileSummaryEts, File,
                             [{#file_summary.valid_total_size, Delta}]),
    State #msstate { sum_valid_data = SumValid + Delta }.
  1081 
%% orddict:store/3 that asserts Key was not already present.
orddict_store(Key, Val, Dict) ->
    false = orddict:is_key(Key, Dict),
    orddict:store(Key, Val, Dict).
  1085 
  1086 update_pending_confirms(Fun, CRef,
  1087                         State = #msstate { clients         = Clients,
  1088                                            cref_to_msg_ids = CTM }) ->
  1089     case dict:fetch(CRef, Clients) of
  1090         {undefined,    _CloseFDsFun} -> State;
  1091         {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
  1092                                         State #msstate {
  1093                                           cref_to_msg_ids = CTM1 }
  1094     end.
  1095 
%% Add MsgId to the set of msgs awaiting an on-disk confirm for CRef
%% (a no-op for clients without a MsgOnDiskFun, courtesy of
%% update_pending_confirms).
record_pending_confirm(CRef, MsgId, State) ->
    update_pending_confirms(
      fun (_MsgOnDiskFun, CTM) ->
              dict:update(CRef, fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
                          gb_sets:singleton(MsgId), CTM)
      end, CRef, State).
  1102 
%% Invoke CRef's MsgOnDiskFun for MsgIds with ActionTaken (written |
%% removed) and remove those ids from the client's pending set,
%% dropping the client's entry entirely once nothing remains pending.
client_confirm(CRef, MsgIds, ActionTaken, State) ->
    update_pending_confirms(
      fun (MsgOnDiskFun, CTM) ->
              MsgOnDiskFun(MsgIds, ActionTaken),
              case dict:find(CRef, CTM) of
                  {ok, Gs} -> MsgIds1 = gb_sets:difference(Gs, MsgIds),
                              case gb_sets:is_empty(MsgIds1) of
                                  true  -> dict:erase(CRef, CTM);
                                  false -> dict:store(CRef, MsgIds1, CTM)
                              end;
                  error    -> CTM
              end
      end, CRef, State).
  1116 
  1117 %% Detect whether the MsgId is older or younger than the client's death
  1118 %% msg (if there is one). If the msg is older than the client death
  1119 %% msg, and it has a 0 ref_count we must only alter the ref_count, not
  1120 %% rewrite the msg - rewriting it would make it younger than the death
  1121 %% msg and thus should be ignored. Note that this (correctly) returns
  1122 %% false when testing to remove the death msg itself.
should_mask_action(CRef, MsgId,
                   State = #msstate { dying_clients = DyingClients }) ->
    case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of
        {false, Location} ->
            %% Client is not dying: never mask.
            {false, Location};
        {true, not_found} ->
            {true, not_found};
        {true, #msg_location { file = File, offset = Offset,
                               ref_count = RefCount } = Location} ->
            %% The death msg was written under the client's own CRef
            %% (see handle_cast client_dying); its {file, offset}
            %% position orders the msg relative to the death.
            #msg_location { file = DeathFile, offset = DeathOffset } =
                index_lookup(CRef, State),
            {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
                 {true,  _} -> true;
                 {false, 0} -> false_if_increment;
                 {false, _} -> false
             end, Location}
    end.
  1140 
  1141 %%----------------------------------------------------------------------------
  1142 %% file helper functions
  1143 %%----------------------------------------------------------------------------
  1144 
%% Open Dir/FileName through the file_handle_cache, in binary mode
%% with the configured write buffer, plus the caller's Mode flags.
open_file(Dir, FileName, Mode) ->
    file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
                           [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
  1148 
%% Close and forget the cached read handle for Key (a file number),
%% if one is cached. Clauses for client state, server state, and the
%% underlying dict.
close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
    CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };

close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
    State #msstate { file_handle_cache = close_handle(Key, FHC) };

close_handle(Key, FHC) ->
    case dict:find(Key, FHC) of
        {ok, Hdl} -> ok = file_handle_cache:close(Hdl),
                     dict:erase(Key, FHC);
        error     -> FHC
    end.
  1161 
%% Record in the shared table that client Ref has File open.
mark_handle_open(FileHandlesEts, File, Ref) ->
    %% This is fine to fail (already exists). Note it could fail with
    %% the value being close, and not have it updated to open.
    ets:insert_new(FileHandlesEts, {{Ref, File}, open}),
    true.
  1167 
  1168 %% See comment in client_read3 - only call this when the file is locked
%% Flip every client's 'open' entry for File to 'close'. When Invoke
%% is true, also call each affected client's registered CloseFDsFun
%% (if it has one) so the client actually closes its fds.
mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
    [ begin
          case (ets:update_element(FileHandlesEts, Key, {2, close})
                andalso Invoke) of
              true  -> case dict:fetch(Ref, ClientRefs) of
                           {_MsgOnDiskFun, undefined}   -> ok;
                           {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun()
                       end;
              false -> ok
          end
      end || {{Ref, _File} = Key, open} <-
                 ets:match_object(FileHandlesEts, {{'_', File}, open}) ],
    true.
  1182 
%% Thunk wrapper around safe_file_delete/3 for deferred execution.
safe_file_delete_fun(File, Dir, FileHandlesEts) ->
    fun () -> safe_file_delete(File, Dir, FileHandlesEts) end.
  1185 
%% Delete File from disk only if no client still has a handle row for
%% it; returns true if the file was deleted, false otherwise.
safe_file_delete(File, Dir, FileHandlesEts) ->
    %% do not match on any value - it's the absence of the row that
    %% indicates the client has really closed the file.
    case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of
        {[_|_], _Cont} -> false;
        _              -> ok = file:delete(
                                 form_filename(Dir, filenum_to_name(File))),
                          true
    end.
  1195 
  1196 close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
  1197                                       client_ref       = Ref } =
  1198                         CState) ->
  1199     Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}),
  1200     {ok, lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) ->
  1201                              true = ets:delete(FileHandlesEts, Key),
  1202                              close_handle(File, CStateM)
  1203                      end, CState, Objs)}.
  1204 
  1205 close_all_handles(CState = #client_msstate { file_handles_ets  = FileHandlesEts,
  1206                                              file_handle_cache = FHC,
  1207                                              client_ref        = Ref }) ->
  1208     ok = dict:fold(fun (File, Hdl, ok) ->
  1209                            true = ets:delete(FileHandlesEts, {Ref, File}),
  1210                            file_handle_cache:close(Hdl)
  1211                    end, ok, FHC),
  1212     CState #client_msstate { file_handle_cache = dict:new() };
  1213 
  1214 close_all_handles(State = #msstate { file_handle_cache = FHC }) ->
  1215     ok = dict:fold(fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end,
  1216                    ok, FHC),
  1217     State #msstate { file_handle_cache = dict:new() }.
  1218 
  1219 get_read_handle(FileNum, CState = #client_msstate { file_handle_cache = FHC,
  1220                                                     dir = Dir }) ->
  1221     {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir),
  1222     {Hdl, CState #client_msstate { file_handle_cache = FHC2 }};
  1223 
  1224 get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC,
  1225                                             dir = Dir }) ->
  1226     {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir),
  1227     {Hdl, State #msstate { file_handle_cache = FHC2 }}.
  1228 
  1229 get_read_handle(FileNum, FHC, Dir) ->
  1230     case dict:find(FileNum, FHC) of
  1231         {ok, Hdl} -> {Hdl, FHC};
  1232         error     -> {ok, Hdl} = open_file(Dir, filenum_to_name(FileNum),
  1233                                            ?READ_MODE),
  1234                      {Hdl, dict:store(FileNum, Hdl, FHC)}
  1235     end.
  1236 
%% Set the file's size to FileSizeLimit (seek there, then truncate),
%% and leave the handle positioned at FinalPos. The {ok, ...} matches
%% on the position results are assertions.
preallocate(Hdl, FileSizeLimit, FinalPos) ->
    {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
    ok = file_handle_cache:truncate(Hdl),
    {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos),
    ok.
  1242 
%% Cut the file back to Lowpoint, then preallocate out to Highpoint,
%% leaving the handle positioned at Lowpoint ready for writing.
truncate_and_extend_file(Hdl, Lowpoint, Highpoint) ->
    {ok, Lowpoint} = file_handle_cache:position(Hdl, Lowpoint),
    ok = file_handle_cache:truncate(Hdl),
    ok = preallocate(Hdl, Highpoint, Lowpoint).
  1247 
  1248 form_filename(Dir, Name) -> filename:join(Dir, Name).
  1249 
  1250 filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
  1251 
  1252 filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
  1253 
  1254 list_sorted_file_names(Dir, Ext) ->
  1255     lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
  1256                filelib:wildcard("*" ++ Ext, Dir)).
  1257 
  1258 %%----------------------------------------------------------------------------
  1259 %% message cache helper functions
  1260 %%----------------------------------------------------------------------------
  1261 
  1262 update_msg_cache(CacheEts, MsgId, Msg) ->
  1263     case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
  1264         true  -> ok;
  1265         false -> safe_ets_update_counter_ok(
  1266                    CacheEts, MsgId, {3, +1},
  1267                    fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
  1268     end.
  1269 
  1270 %%----------------------------------------------------------------------------
  1271 %% index
  1272 %%----------------------------------------------------------------------------
  1273 
  1274 index_lookup_positive_ref_count(Key, State) ->
  1275     case index_lookup(Key, State) of
  1276         not_found                       -> not_found;
  1277         #msg_location { ref_count = 0 } -> not_found;
  1278         #msg_location {} = MsgLocation  -> MsgLocation
  1279     end.
  1280 
  1281 index_update_ref_count(Key, RefCount, State) ->
  1282     index_update_fields(Key, {#msg_location.ref_count, RefCount}, State).
  1283 
  1284 index_lookup(Key, #client_msstate { index_module = Index,
  1285                                     index_state  = State }) ->
  1286     Index:lookup(Key, State);
  1287 
  1288 index_lookup(Key, #msstate { index_module = Index, index_state = State }) ->
  1289     Index:lookup(Key, State).
  1290 
  1291 index_insert(Obj, #msstate { index_module = Index, index_state = State }) ->
  1292     Index:insert(Obj, State).
  1293 
  1294 index_update(Obj, #msstate { index_module = Index, index_state = State }) ->
  1295     Index:update(Obj, State).
  1296 
  1297 index_update_fields(Key, Updates, #msstate { index_module = Index,
  1298                                              index_state  = State }) ->
  1299     Index:update_fields(Key, Updates, State).
  1300 
  1301 index_delete(Key, #msstate { index_module = Index, index_state = State }) ->
  1302     Index:delete(Key, State).
  1303 
  1304 index_delete_by_file(File, #msstate { index_module = Index,
  1305                                       index_state  = State }) ->
  1306     Index:delete_by_file(File, State).
  1307 
  1308 %%----------------------------------------------------------------------------
  1309 %% shutdown and recovery
  1310 %%----------------------------------------------------------------------------
  1311 
%% Attempt to recover the message index and client refs from the
%% previous shutdown's recovery terms. Returns {Recovered, IndexState,
%% ClientRefs}; Recovered is false whenever we fell back to a fresh
%% (empty) index, in which case the index must be rebuilt.
recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
    %% no client refs supplied: nothing to recover against
    {false, IndexModule:new(Dir), []};
recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
    %% recovery explicitly not requested
    rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
    {false, IndexModule:new(Dir), []};
recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
    %% Fresh logs why recovery failed and falls back to an empty index
    Fresh = fun (ErrorMsg, ErrorArgs) ->
                    rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
                                       "rebuilding indices from scratch~n",
                                       [Server | ErrorArgs]),
                    {false, IndexModule:new(Dir), []}
            end,
    case read_recovery_terms(Dir) of
        {false, Error} ->
            Fresh("failed to read recovery terms: ~p", [Error]);
        {true, Terms} ->
            RecClientRefs  = proplists:get_value(client_refs, Terms, []),
            RecIndexModule = proplists:get_value(index_module, Terms),
            %% the stored terms are only usable if they name the same
            %% client refs (in any order) and the same index module
            case (lists:sort(ClientRefs) =:= lists:sort(RecClientRefs)
                  andalso IndexModule =:= RecIndexModule) of
                true  -> case IndexModule:recover(Dir) of
                             {ok, IndexState1} ->
                                 {true, IndexState1, ClientRefs};
                             {error, Error} ->
                                 Fresh("failed to recover index: ~p", [Error])
                         end;
                false -> Fresh("recovery terms differ from present", [])
            end
    end.
  1341 
  1342 store_recovery_terms(Terms, Dir) ->
  1343     rabbit_file:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
  1344 
  1345 read_recovery_terms(Dir) ->
  1346     Path = filename:join(Dir, ?CLEAN_FILENAME),
  1347     case rabbit_file:read_term_file(Path) of
  1348         {ok, Terms}    -> case file:delete(Path) of
  1349                               ok             -> {true,  Terms};
  1350                               {error, Error} -> {false, Error}
  1351                           end;
  1352         {error, Error} -> {false, Error}
  1353     end.
  1354 
  1355 store_file_summary(Tid, Dir) ->
  1356     ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
  1357                       [{extended_info, [object_count]}]).
  1358 
  1359 recover_file_summary(false, _Dir) ->
  1360     %% TODO: the only reason for this to be an *ordered*_set is so
  1361     %% that a) maybe_compact can start a traversal from the eldest
  1362     %% file, and b) build_index in fast recovery mode can easily
  1363     %% identify the current file. It's awkward to have both that
  1364     %% odering and the left/right pointers in the entries - replacing
  1365     %% the former with some additional bit of state would be easy, but
  1366     %% ditching the latter would be neater.
  1367     {false, ets:new(rabbit_msg_store_file_summary,
  1368                     [ordered_set, public, {keypos, #file_summary.file}])};
  1369 recover_file_summary(true, Dir) ->
  1370     Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
  1371     case ets:file2tab(Path) of
  1372         {ok, Tid}       -> ok = file:delete(Path),
  1373                            {true, Tid};
  1374         {error, _Error} -> recover_file_summary(false, Dir)
  1375     end.
  1376 
%% Drive the generator Gen (which yields {MsgId, RefCountDelta, NextSeed}
%% tuples until 'finished'), applying each delta to the index: unknown
%% msgs are inserted with file = undefined, and entries whose ref
%% count reaches zero are deleted.
count_msg_refs(Gen, Seed, State) ->
    case Gen(Seed) of
        finished ->
            ok;
        {_MsgId, 0, Next} ->
            %% a zero delta changes nothing - just advance
            count_msg_refs(Gen, Next, State);
        {MsgId, Delta, Next} ->
            ok = case index_lookup(MsgId, State) of
                     not_found ->
                         index_insert(#msg_location { msg_id = MsgId,
                                                      file = undefined,
                                                      ref_count = Delta },
                                      State);
                     #msg_location { ref_count = RefCount } = StoreEntry ->
                         NewRefCount = RefCount + Delta,
                         case NewRefCount of
                             0 -> index_delete(MsgId, State);
                             _ -> index_update(StoreEntry #msg_location {
                                                 ref_count = NewRefCount },
                                               State)
                         end
                 end,
            count_msg_refs(Gen, Next, State)
    end.
  1401 
  1402 recover_crashed_compactions(Dir) ->
  1403     FileNames =    list_sorted_file_names(Dir, ?FILE_EXTENSION),
  1404     TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP),
  1405     lists:foreach(
  1406       fun (TmpFileName) ->
  1407               NonTmpRelatedFileName =
  1408                   filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
  1409               true = lists:member(NonTmpRelatedFileName, FileNames),
  1410               ok = recover_crashed_compaction(
  1411                      Dir, TmpFileName, NonTmpRelatedFileName)
  1412       end, TmpFileNames),
  1413     TmpFileNames == [].
  1414 
recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
    %% Because a msg can legitimately appear multiple times in the
    %% same file, identifying the contents of the tmp file and where
    %% they came from is non-trivial. If we are recovering a crashed
    %% compaction then we will be rebuilding the index, which can cope
    %% with duplicates appearing. Thus the simplest and safest thing
    %% to do is to append the contents of the tmp file to its main
    %% file.
    {ok, TmpHdl}  = open_file(Dir, TmpFileName, ?READ_MODE),
    {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
                              ?READ_MODE ++ ?WRITE_MODE),
    %% seek to the end of the main file so the copy appends
    {ok, _End} = file_handle_cache:position(MainHdl, eof),
    Size = filelib:file_size(form_filename(Dir, TmpFileName)),
    %% the match on Size asserts the whole tmp file was copied
    {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size),
    ok = file_handle_cache:close(MainHdl),
    ok = file_handle_cache:delete(TmpHdl),
    ok.
  1432 
  1433 scan_file_for_valid_messages(Dir, FileName) ->
  1434     case open_file(Dir, FileName, ?READ_MODE) of
  1435         {ok, Hdl}       -> Valid = rabbit_msg_file:scan(
  1436                                      Hdl, filelib:file_size(
  1437                                             form_filename(Dir, FileName)),
  1438                                      fun scan_fun/2, []),
  1439                            ok = file_handle_cache:close(Hdl),
  1440                            Valid;
  1441         {error, enoent} -> {ok, [], 0};
  1442         {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
  1443     end.
  1444 
  1445 scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) ->
  1446     [{MsgId, TotalSize, Offset} | Acc].
  1447 
  1448 %% Takes the list in *ascending* order (i.e. eldest message
  1449 %% first). This is the opposite of what scan_file_for_valid_messages
  1450 %% produces. The list of msgs that is produced is youngest first.
  1451 drop_contiguous_block_prefix(L) -> drop_contiguous_block_prefix(L, 0).
  1452 
  1453 drop_contiguous_block_prefix([], ExpectedOffset) ->
  1454     {ExpectedOffset, []};
  1455 drop_contiguous_block_prefix([#msg_location { offset = ExpectedOffset,
  1456                                               total_size = TotalSize } | Tail],
  1457                              ExpectedOffset) ->
  1458     ExpectedOffset1 = ExpectedOffset + TotalSize,
  1459     drop_contiguous_block_prefix(Tail, ExpectedOffset1);
  1460 drop_contiguous_block_prefix(MsgsAfterGap, ExpectedOffset) ->
  1461     {ExpectedOffset, MsgsAfterGap}.
  1462 
%% Fast path (clean recovery): the file summary table survived, so
%% just total up the valid/file sizes from it and note the last file
%% seen as the current file; the fold's first tuple element ends up as
%% the size of that final file, i.e. the offset to resume writing at.
build_index(true, _StartupFunState,
            State = #msstate { file_summary_ets = FileSummaryEts }) ->
    ets:foldl(
      fun (#file_summary { valid_total_size = ValidTotalSize,
                           file_size        = FileSize,
                           file             = File },
           {_Offset, State1 = #msstate { sum_valid_data = SumValid,
                                         sum_file_size  = SumFileSize }}) ->
              {FileSize, State1 #msstate {
                           sum_valid_data = SumValid + ValidTotalSize,
                           sum_file_size  = SumFileSize + FileSize,
                           current_file   = File }}
      end, {0, State}, FileSummaryEts);
%% Slow path: replay the reference-count deltas from the generator
%% into the index, then scan every file on disk - via the worker pool
%% and a gatherer (see build_index/4) - to rebuild the file summary,
%% finally deleting any files left with no valid data.
build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
            State = #msstate { dir = Dir }) ->
    ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
    {ok, Pid} = gatherer:start_link(),
    case [filename_to_num(FileName) ||
             FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of
        []     -> build_index(Pid, undefined, [State #msstate.current_file],
                              State);
        Files  -> {Offset, State1} = build_index(Pid, undefined, Files, State),
                  {Offset, lists:foldl(fun delete_file_if_empty/2,
                                       State1, Files)}
    end.
  1488 
%% Submit one build_index_worker job per file (final clause), then
%% drain the gatherer (first clause): each received #file_summary is
%% inserted into the table and its sizes accumulated. Left tracks the
%% file to the left of the one being submitted; when the file list is
%% exhausted Left is the right-most file, which becomes current_file.
build_index(Gatherer, Left, [],
            State = #msstate { file_summary_ets = FileSummaryEts,
                               sum_valid_data   = SumValid,
                               sum_file_size    = SumFileSize }) ->
    case gatherer:out(Gatherer) of
        empty ->
            unlink(Gatherer),
            ok = gatherer:stop(Gatherer),
            %% drop index entries for msgs found in no file at all
            ok = index_delete_by_file(undefined, State),
            %% resume writing at the end of the right-most file
            Offset = case ets:lookup(FileSummaryEts, Left) of
                         []                                       -> 0;
                         [#file_summary { file_size = FileSize }] -> FileSize
                     end,
            {Offset, State #msstate { current_file = Left }};
        {value, #file_summary { valid_total_size = ValidTotalSize,
                                file_size = FileSize } = FileSummary} ->
            true = ets:insert_new(FileSummaryEts, FileSummary),
            build_index(Gatherer, Left, [],
                        State #msstate {
                          sum_valid_data = SumValid + ValidTotalSize,
                          sum_file_size  = SumFileSize + FileSize })
    end;
build_index(Gatherer, Left, [File|Files], State) ->
    ok = gatherer:fork(Gatherer),
    ok = worker_pool:submit_async(
           fun () -> build_index_worker(Gatherer, State,
                                        Left, File, Files)
           end),
    build_index(Gatherer, File, Files, State).
  1518 
%% Scan one file (runs on a worker pool process): every message found
%% that the index still has as file = undefined is claimed for this
%% file (its file/offset/total_size recorded), and a #file_summary
%% for the file is handed to the gatherer. Left/Files supply the
%% summary's left/right neighbour links.
build_index_worker(Gatherer, State = #msstate { dir = Dir },
                   Left, File, Files) ->
    {ok, Messages, FileSize} =
        scan_file_for_valid_messages(Dir, filenum_to_name(File)),
    {ValidMessages, ValidTotalSize} =
        lists:foldl(
          fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
                  case index_lookup(MsgId, State) of
                      #msg_location { file = undefined } = StoreEntry ->
                          ok = index_update(StoreEntry #msg_location {
                                              file = File, offset = Offset,
                                              total_size = TotalSize },
                                            State),
                          {[Obj | VMAcc], VTSAcc + TotalSize};
                      _ ->
                          %% unreferenced, or already claimed by an
                          %% earlier file - not valid data here
                          {VMAcc, VTSAcc}
                  end
          end, {[], 0}, Messages),
    {Right, FileSize1} =
        case Files of
            %% if it's the last file, we'll truncate to remove any
            %% rubbish above the last valid message. This affects the
            %% file size.
            []    -> {undefined, case ValidMessages of
                                     [] -> 0;
                                     _  -> {_MsgId, TotalSize, Offset} =
                                               lists:last(ValidMessages),
                                           Offset + TotalSize
                                 end};
            [F|_] -> {F, FileSize}
        end,
    ok = gatherer:in(Gatherer, #file_summary {
                       file             = File,
                       valid_total_size = ValidTotalSize,
                       left             = Left,
                       right            = Right,
                       file_size        = FileSize1,
                       locked           = false,
                       readers          = 0 }),
    ok = gatherer:finish(Gatherer).
  1559 
  1560 %%----------------------------------------------------------------------------
  1561 %% garbage collection / compaction / aggregation -- internal
  1562 %%----------------------------------------------------------------------------
  1563 
%% Once the write offset reaches the file size limit: sync and close
%% the current file, open file CurFile + 1 as the new current file,
%% link it into the file summary's left/right chain, and purge
%% current-file-cache entries whose count (third element) is zero.
%% Finally give compaction a chance to run. Below the limit the state
%% passes through untouched.
maybe_roll_to_new_file(
  Offset,
  State = #msstate { dir                 = Dir,
                     current_file_handle = CurHdl,
                     current_file        = CurFile,
                     file_summary_ets    = FileSummaryEts,
                     cur_file_cache_ets  = CurFileCacheEts,
                     file_size_limit     = FileSizeLimit })
  when Offset >= FileSizeLimit ->
    State1 = internal_sync(State),
    ok = file_handle_cache:close(CurHdl),
    NextFile = CurFile + 1,
    {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE),
    %% the new file starts empty and unlocked, with the old current
    %% file as its left neighbour and nothing to its right
    true = ets:insert_new(FileSummaryEts, #file_summary {
                            file             = NextFile,
                            valid_total_size = 0,
                            left             = CurFile,
                            right            = undefined,
                            file_size        = 0,
                            locked           = false,
                            readers          = 0 }),
    true = ets:update_element(FileSummaryEts, CurFile,
                              {#file_summary.right, NextFile}),
    true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
    maybe_compact(State1 #msstate { current_file_handle = NextHdl,
                                    current_file        = NextFile });
maybe_roll_to_new_file(_, State) ->
    State.
  1592 
%% Consider compaction: only when total file size exceeds twice the
%% per-file limit AND the fraction of garbage (invalid data over
%% total) exceeds ?GARBAGE_FRACTION. When a candidate (Src, Dst) pair
%% is found, both files are locked here - before the combine is
%% handed to the GC process - and recorded as pending completion.
maybe_compact(State = #msstate { sum_valid_data        = SumValid,
                                 sum_file_size         = SumFileSize,
                                 gc_pid                = GCPid,
                                 pending_gc_completion = Pending,
                                 file_summary_ets      = FileSummaryEts,
                                 file_size_limit       = FileSizeLimit })
  when SumFileSize > 2 * FileSizeLimit andalso
       (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
    %% TODO: the algorithm here is sub-optimal - it may result in a
    %% complete traversal of FileSummaryEts.
    case ets:first(FileSummaryEts) of
        '$end_of_table' ->
            State;
        First ->
            case find_files_to_combine(FileSummaryEts, FileSizeLimit,
                                       ets:lookup(FileSummaryEts, First)) of
                not_found ->
                    State;
                {Src, Dst} ->
                    Pending1 = orddict_store(Dst, [],
                                             orddict_store(Src, [], Pending)),
                    %% drop our own cached handles before the files
                    %% are rewritten underneath them
                    State1 = close_handle(Src, close_handle(Dst, State)),
                    true = ets:update_element(FileSummaryEts, Src,
                                              {#file_summary.locked, true}),
                    true = ets:update_element(FileSummaryEts, Dst,
                                              {#file_summary.locked, true}),
                    ok = rabbit_msg_store_gc:combine(GCPid, Src, Dst),
                    State1 #msstate { pending_gc_completion = Pending1 }
            end
    end;
maybe_compact(State) ->
    State.
  1625 
%% Walk the file summary chain rightwards looking for an adjacent
%% pair (Dst, Src = Dst's right neighbour) whose combined valid data
%% fits within FileSizeLimit. Both must hold some valid data, neither
%% may be locked, and Src must itself have a right neighbour (so the
%% right-most file is never chosen). Returns {Src, Dst} or not_found.
find_files_to_combine(FileSummaryEts, FileSizeLimit,
                      [#file_summary { file             = Dst,
                                       valid_total_size = DstValid,
                                       right            = Src,
                                       locked           = DstLocked }]) ->
    case Src of
        undefined ->
            not_found;
        _   ->
            [#file_summary { file             = Src,
                             valid_total_size = SrcValid,
                             left             = Dst,
                             right            = SrcRight,
                             locked           = SrcLocked }] = Next =
                ets:lookup(FileSummaryEts, Src),
            case SrcRight of
                undefined -> not_found;
                _         -> case (DstValid + SrcValid =< FileSizeLimit) andalso
                                 (DstValid > 0) andalso (SrcValid > 0) andalso
                                 not (DstLocked orelse SrcLocked) of
                                 true  -> {Src, Dst};
                                 false -> find_files_to_combine(
                                            FileSummaryEts, FileSizeLimit, Next)
                             end
            end
    end.
  1652 
%% If File (never the file currently being written to - first clause)
%% holds no valid data: lock it, hand its deletion to the GC process,
%% record it as pending GC completion, and drop our cached handle.
delete_file_if_empty(File, State = #msstate { current_file = File }) ->
    State;
delete_file_if_empty(File, State = #msstate {
                             gc_pid                = GCPid,
                             file_summary_ets      = FileSummaryEts,
                             pending_gc_completion = Pending }) ->
    %% assertion: a candidate for deletion must not already be locked
    [#file_summary { valid_total_size = ValidData,
                     locked           = false }] =
        ets:lookup(FileSummaryEts, File),
    case ValidData of
        %% don't delete the file_summary_ets entry for File here
        %% because we could have readers which need to be able to
        %% decrement the readers count.
        0 -> true = ets:update_element(FileSummaryEts, File,
                                       {#file_summary.locked, true}),
             ok = rabbit_msg_store_gc:delete(GCPid, File),
             Pending1 = orddict_store(File, [], Pending),
             close_handle(File,
                          State #msstate { pending_gc_completion = Pending1 });
        _ -> State
    end.
  1674 
%% Housekeeping once the GC has removed File: flag all client fds for
%% File as to-close (running the clients' close-fds callbacks), then
%% splice File out of the file summary's double-linked list and
%% delete its summary row. File must be locked with zero readers.
cleanup_after_file_deletion(File,
                            #msstate { file_handles_ets = FileHandlesEts,
                                       file_summary_ets = FileSummaryEts,
                                       clients          = Clients }) ->
    %% Ensure that any clients that have open fhs to the file close
    %% them before using them again. This has to be done here (given
    %% it's done in the msg_store, and not the gc), and not when
    %% starting up the GC, because if done when starting up the GC,
    %% the client could find the close, and close and reopen the fh,
    %% whilst the GC is waiting for readers to disappear, before it's
    %% actually done the GC.
    true = mark_handle_to_close(Clients, FileHandlesEts, File, true),
    [#file_summary { left    = Left,
                     right   = Right,
                     locked  = true,
                     readers = 0 }] = ets:lookup(FileSummaryEts, File),
    %% We'll never delete the current file, so right is never undefined
    true = Right =/= undefined, %% ASSERTION
    true = ets:update_element(FileSummaryEts, Right,
                              {#file_summary.left, Left}),
    %% ensure the double linked list is maintained
    true = case Left of
               undefined -> true; %% File is the eldest file (left-most)
               _         -> ets:update_element(FileSummaryEts, Left,
                                               {#file_summary.right, Right})
           end,
    true = ets:delete(FileSummaryEts, File),
    ok.
  1703 
  1704 %%----------------------------------------------------------------------------
  1705 %% garbage collection / compaction / aggregation -- external
  1706 %%----------------------------------------------------------------------------
  1707 
  1708 has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
  1709     [#file_summary { locked = true, readers = Count }] =
  1710         ets:lookup(FileSummaryEts, File),
  1711     Count /= 0.
  1712 
%% GC: merge all valid data from Source into Destination (which sits
%% immediately to Source's left), compacting Destination first if its
%% valid data is not already one contiguous block at the start of the
%% file. Both files must be locked with zero readers (asserted by the
%% lookups below). Updates Destination's summary sizes, notifies the
%% server of the bytes reclaimed, and returns a closure which deletes
%% Source once no client handles to it remain.
combine_files(Source, Destination,
              State = #gc_state { file_summary_ets = FileSummaryEts,
                                  file_handles_ets = FileHandlesEts,
                                  dir              = Dir,
                                  msg_store        = Server }) ->
    [#file_summary {
        readers          = 0,
        left             = Destination,
        valid_total_size = SourceValid,
        file_size        = SourceFileSize,
        locked           = true }] = ets:lookup(FileSummaryEts, Source),
    [#file_summary {
        readers          = 0,
        right            = Source,
        valid_total_size = DestinationValid,
        file_size        = DestinationFileSize,
        locked           = true }] = ets:lookup(FileSummaryEts, Destination),

    SourceName           = filenum_to_name(Source),
    DestinationName      = filenum_to_name(Destination),
    {ok, SourceHdl}      = open_file(Dir, SourceName,
                                     ?READ_AHEAD_MODE),
    {ok, DestinationHdl} = open_file(Dir, DestinationName,
                                     ?READ_AHEAD_MODE ++ ?WRITE_MODE),
    TotalValidData = SourceValid + DestinationValid,
    %% if DestinationValid =:= DestinationContiguousTop then we don't
    %% need a tmp file
    %% if they're not equal, then we need to write out everything past
    %%   the DestinationContiguousTop to a tmp file then truncate,
    %%   copy back in, and then copy over from Source
    %% otherwise we just truncate straight away and copy over from Source
    %%
    %% NB: the match on DestinationValid below is an assertion that
    %% the scan agrees with the file summary
    {DestinationWorkList, DestinationValid} =
        load_and_vacuum_message_file(Destination, State),
    {DestinationContiguousTop, DestinationWorkListTail} =
        drop_contiguous_block_prefix(DestinationWorkList),
    case DestinationWorkListTail of
        [] -> ok = truncate_and_extend_file(
                     DestinationHdl, DestinationContiguousTop, TotalValidData);
        _  -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
              {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE),
              ok = copy_messages(
                     DestinationWorkListTail, DestinationContiguousTop,
                     DestinationValid, DestinationHdl, TmpHdl, Destination,
                     State),
              TmpSize = DestinationValid - DestinationContiguousTop,
              %% so now Tmp contains everything we need to salvage
              %% from Destination, and index_state has been updated to
              %% reflect the compaction of Destination so truncate
              %% Destination and copy from Tmp back to the end
              {ok, 0} = file_handle_cache:position(TmpHdl, 0),
              ok = truncate_and_extend_file(
                     DestinationHdl, DestinationContiguousTop, TotalValidData),
              {ok, TmpSize} =
                  file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
              %% position in DestinationHdl should now be DestinationValid
              ok = file_handle_cache:sync(DestinationHdl),
              ok = file_handle_cache:delete(TmpHdl)
    end,
    %% append Source's valid messages after Destination's (the match
    %% on SourceValid is again an assertion against the summary)
    {SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State),
    ok = copy_messages(SourceWorkList, DestinationValid, TotalValidData,
                       SourceHdl, DestinationHdl, Destination, State),
    %% tidy up
    ok = file_handle_cache:close(DestinationHdl),
    ok = file_handle_cache:close(SourceHdl),

    %% don't update dest.right, because it could be changing at the
    %% same time
    true = ets:update_element(
             FileSummaryEts, Destination,
             [{#file_summary.valid_total_size, TotalValidData},
              {#file_summary.file_size,        TotalValidData}]),

    Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
    gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
    safe_file_delete_fun(Source, Dir, FileHandlesEts).
  1788 
  1789 delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
  1790                                       file_handles_ets = FileHandlesEts,
  1791                                       dir              = Dir,
  1792                                       msg_store        = Server }) ->
  1793     [#file_summary { valid_total_size = 0,
  1794                      locked           = true,
  1795                      file_size        = FileSize,
  1796                      readers          = 0 }] = ets:lookup(FileSummaryEts, File),
  1797     {[], 0} = load_and_vacuum_message_file(File, State),
  1798     gen_server2:cast(Server, {delete_file, File, FileSize}),
  1799     safe_file_delete_fun(File, Dir, FileHandlesEts).
  1800 
  1801 load_and_vacuum_message_file(File, #gc_state { dir          = Dir,
  1802                                                index_module = Index,
  1803                                                index_state  = IndexState }) ->
  1804     %% Messages here will be end-of-file at start-of-list
  1805     {ok, Messages, _FileSize} =
  1806         scan_file_for_valid_messages(Dir, filenum_to_name(File)),
  1807     %% foldl will reverse so will end up with msgs in ascending offset order
  1808     lists:foldl(
  1809       fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) ->
  1810               case Index:lookup(MsgId, IndexState) of
  1811                   #msg_location { file = File, total_size = TotalSize,
  1812                                   offset = Offset, ref_count = 0 } = Entry ->
  1813                       ok = Index:delete_object(Entry, IndexState),
  1814                       Acc;
  1815                   #msg_location { file = File, total_size = TotalSize,
  1816                                   offset = Offset } = Entry ->
  1817                       {[ Entry | List ], TotalSize + Size};
  1818                   _ ->
  1819                       Acc
  1820               end
  1821       end, {[], 0}, Messages).
  1822 
%% Copies the messages in WorkList (#msg_location records, expected in
%% ascending offset order within the source file) from SourceHdl into
%% DestinationHdl, appending from InitOffset up to exactly FinalOffset.
%% Adjacent messages are coalesced into contiguous {Start, End} blocks so
%% each run costs a single position+copy on the handle, rather than one
%% per message.  Returns ok, or {gc_error, ...} if the copied sizes do
%% not add up to FinalOffset.
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
              Destination, #gc_state { index_module = Index,
                                       index_state  = IndexState }) ->
    %% Copy one contiguous byte range of the source file to the current
    %% position of the destination handle.
    Copy = fun ({BlockStart, BlockEnd}) ->
                   BSize = BlockEnd - BlockStart,
                   {ok, BlockStart} =
                       file_handle_cache:position(SourceHdl, BlockStart),
                   {ok, BSize} =
                       file_handle_cache:copy(SourceHdl, DestinationHdl, BSize)
           end,
    case
        lists:foldl(
          fun (#msg_location { msg_id = MsgId, offset = Offset,
                               total_size = TotalSize },
               {CurOffset, Block = {BlockStart, BlockEnd}}) ->
                  %% CurOffset is in the DestinationFile.
                  %% Offset, BlockStart and BlockEnd are in the SourceFile
                  %% update MsgLocation to reflect change of file and offset
                  %% NOTE(review): the index is repointed before the bytes
                  %% for this block have actually been copied — presumably
                  %% safe because the files are locked against readers
                  %% during GC; confirm against the caller.
                  ok = Index:update_fields(MsgId,
                                           [{#msg_location.file, Destination},
                                            {#msg_location.offset, CurOffset}],
                                           IndexState),
                  {CurOffset + TotalSize,
                   case BlockEnd of
                       undefined ->
                           %% base case, called only for the first list elem
                           {Offset, Offset + TotalSize};
                       Offset ->
                           %% extend the current block because the
                           %% next msg follows straight on
                           {BlockStart, BlockEnd + TotalSize};
                       _ ->
                           %% found a gap, so actually do the work for
                           %% the previous block
                           Copy(Block),
                           {Offset, Offset + TotalSize}
                   end}
          end, {InitOffset, {undefined, undefined}}, WorkList) of
        {FinalOffset, Block} ->
            %% With an empty WorkList, Block is still {undefined,
            %% undefined} and there is nothing to copy or sync.
            case WorkList of
                [] -> ok;
                _  -> Copy(Block), %% do the last remaining block
                      ok = file_handle_cache:sync(DestinationHdl)
            end;
        {FinalOffsetZ, _Block} ->
            %% Sizes did not add up to the expected end offset: report
            %% rather than silently corrupt the destination.
            {gc_error, [{expected, FinalOffset},
                        {got, FinalOffsetZ},
                        {destination, Destination}]}
    end.
  1872 
  1873 force_recovery(BaseDir, Store) ->
  1874     Dir = filename:join(BaseDir, atom_to_list(Store)),
  1875     case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
  1876         ok              -> ok;
  1877         {error, enoent} -> ok
  1878     end,
  1879     recover_crashed_compactions(BaseDir),
  1880     ok.
  1881 
  1882 foreach_file(D, Fun, Files) ->
  1883     [ok = Fun(filename:join(D, File)) || File <- Files].
  1884 
  1885 foreach_file(D1, D2, Fun, Files) ->
  1886     [ok = Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
  1887 
  1888 transform_dir(BaseDir, Store, TransformFun) ->
  1889     Dir = filename:join(BaseDir, atom_to_list(Store)),
  1890     TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
  1891     TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end,
  1892     CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end,
  1893     case filelib:is_dir(TmpDir) of
  1894         true  -> throw({error, transform_failed_previously});
  1895         false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
  1896                  foreach_file(Dir, TmpDir, TransformFile,     FileList),
  1897                  foreach_file(Dir,         fun file:delete/1, FileList),
  1898                  foreach_file(TmpDir, Dir, CopyFile,          FileList),
  1899                  foreach_file(TmpDir,      fun file:delete/1, FileList),
  1900                  ok = file:del_dir(TmpDir)
  1901     end.
  1902 
  1903 transform_msg_file(FileOld, FileNew, TransformFun) ->
  1904     ok = rabbit_file:ensure_parent_dirs_exist(FileNew),
  1905     {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
  1906     {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
  1907                                           [{write_buffer,
  1908                                             ?HANDLE_CACHE_BUFFER_SIZE}]),
  1909     {ok, _Acc, _IgnoreSize} =
  1910         rabbit_msg_file:scan(
  1911           RefOld, filelib:file_size(FileOld),
  1912           fun({MsgId, _Size, _Offset, BinMsg}, ok) ->
  1913                   {ok, MsgNew} = case binary_to_term(BinMsg) of
  1914                                      <<>> -> {ok, <<>>};  %% dying client marker
  1915                                      Msg  -> TransformFun(Msg)
  1916                                  end,
  1917                   {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
  1918                   ok
  1919           end, ok),
  1920     ok = file_handle_cache:close(RefOld),
  1921     ok = file_handle_cache:close(RefNew),
  1922     ok.