src/rabbit_msg_store.erl
author Matthias Radestock <matthias@rabbitmq.com>
Thu Aug 26 21:51:56 2010 +0100 (20 months ago)
branchbug23192
changeset 4624 5061e6041732
parent 4535 a1b91b84df7d
child 4591 d549b37025df
child 4604 09161eee598c
permissions -rw-r--r--
tweak
     1 %%   The contents of this file are subject to the Mozilla Public License
     2 %%   Version 1.1 (the "License"); you may not use this file except in
     3 %%   compliance with the License. You may obtain a copy of the License at
     4 %%   http://www.mozilla.org/MPL/
     5 %%
     6 %%   Software distributed under the License is distributed on an "AS IS"
     7 %%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
     8 %%   License for the specific language governing rights and limitations
     9 %%   under the License.
    10 %%
    11 %%   The Original Code is RabbitMQ.
    12 %%
    13 %%   The Initial Developers of the Original Code are LShift Ltd,
    14 %%   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
    15 %%
    16 %%   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
    17 %%   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
    18 %%   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
    19 %%   Technologies LLC, and Rabbit Technologies Ltd.
    20 %%
    21 %%   Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
    22 %%   Ltd. Portions created by Cohesive Financial Technologies LLC are
    23 %%   Copyright (C) 2007-2010 Cohesive Financial Technologies
    24 %%   LLC. Portions created by Rabbit Technologies Ltd are Copyright
    25 %%   (C) 2007-2010 Rabbit Technologies Ltd.
    26 %%
    27 %%   All Rights Reserved.
    28 %%
    29 %%   Contributor(s): ______________________________________.
    30 %%
    31 
    32 -module(rabbit_msg_store).
    33 
    34 -behaviour(gen_server2).
    35 
    36 -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
    37          sync/3, client_init/2, client_terminate/1,
    38          client_delete_and_terminate/3, successfully_recovered_state/1]).
    39 
    40 -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
    41 
    42 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
    43          terminate/2, code_change/3]).
    44 
    45 %%----------------------------------------------------------------------------
    46 
    47 -include("rabbit_msg_store.hrl").
    48 
    49 -define(SYNC_INTERVAL,  5).   %% milliseconds
    50 -define(CLEAN_FILENAME, "clean.dot").
    51 -define(FILE_SUMMARY_FILENAME, "file_summary.ets").
    52 
    53 -define(BINARY_MODE,     [raw, binary]).
    54 -define(READ_MODE,       [read]).
    55 -define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
    56 -define(WRITE_MODE,      [write]).
    57 
    58 -define(FILE_EXTENSION,        ".rdq").
    59 -define(FILE_EXTENSION_TMP,    ".rdt").
    60 
    61 -define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
    62 
    63 %%----------------------------------------------------------------------------
    64 
    65 -record(msstate,
    66         { dir,                    %% store directory
    67           index_module,           %% the module for index ops
    68           index_state,            %% where are messages?
    69           current_file,           %% current file name as number
    70           current_file_handle,    %% current file handle since the last fsync?
    71           file_handle_cache,      %% file handle cache
    72           on_sync,                %% pending sync requests
    73           sync_timer_ref,         %% TRef for our interval timer
    74           sum_valid_data,         %% sum of valid data in all files
    75           sum_file_size,          %% sum of file sizes
    76           pending_gc_completion,  %% things to do once GC completes
    77           gc_active,              %% is the GC currently working?
    78           gc_pid,                 %% pid of our GC
    79           file_handles_ets,       %% tid of the shared file handles table
    80           file_summary_ets,       %% tid of the file summary table
    81           dedup_cache_ets,        %% tid of dedup cache table
    82           cur_file_cache_ets,     %% tid of current file cache table
    83           client_refs,            %% set of references of all registered clients
    84           successfully_recovered, %% boolean: did we recover state?
    85           file_size_limit         %% how big are our files allowed to get?
    86          }).
    87 
    88 -record(client_msstate,
    89         { file_handle_cache,
    90           index_state,
    91           index_module,
    92           dir,
    93           gc_pid,
    94           file_handles_ets,
    95           file_summary_ets,
    96           dedup_cache_ets,
    97           cur_file_cache_ets
    98          }).
    99 
   100 -record(file_summary,
   101         {file, valid_total_size, contiguous_top, left, right, file_size,
   102          locked, readers}).
   103 
   104 %%----------------------------------------------------------------------------
   105 
   106 -ifdef(use_specs).
   107 
   108 -type(server() :: pid() | atom()).
   109 -type(file_num() :: non_neg_integer()).
   110 -type(client_msstate() :: #client_msstate {
   111                       file_handle_cache  :: dict:dictionary(),
   112                       index_state        :: any(),
   113                       index_module       :: atom(),
   114                       dir                :: file:filename(),
   115                       gc_pid             :: pid(),
   116                       file_handles_ets   :: ets:tid(),
   117                       file_summary_ets   :: ets:tid(),
   118                       dedup_cache_ets    :: ets:tid(),
   119                       cur_file_cache_ets :: ets:tid() }).
   120 -type(startup_fun_state() ::
   121         {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
   122          A}).
   123 
   124 -spec(start_link/4 ::
   125         (atom(), file:filename(), [binary()] | 'undefined',
   126          startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
   127 -spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) ->
   128                       rabbit_types:ok(client_msstate())).
   129 -spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) ->
   130                      {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
   131 -spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()).
   132 -spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
   133 -spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
   134 -spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok').
   135 -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
   136                         'ok').
   137 -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
   138 -spec(client_init/2 :: (server(), binary()) -> client_msstate()).
   139 -spec(client_terminate/1 :: (client_msstate()) -> 'ok').
   140 -spec(client_delete_and_terminate/3 ::
   141         (client_msstate(), server(), binary()) -> 'ok').
   142 -spec(successfully_recovered_state/1 :: (server()) -> boolean()).
   143 
   144 -spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
   145                {ets:tid(), file:filename(), atom(), any()}) ->
   146                    'concurrent_readers' | non_neg_integer()).
   147 
   148 -endif.
   149 
   150 %%----------------------------------------------------------------------------
   151 
   152 %% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION
   153 %% It is not recommended to set this to < 0.5
   154 -define(GARBAGE_FRACTION,      0.5).
   155 
   156 %% The components:
   157 %%
   158 %% Index: this is a mapping from Guid to #msg_location{}:
   159 %%        {Guid, RefCount, File, Offset, TotalSize}
   160 %%        By default, it's in ets, but it's also pluggable.
   161 %% FileSummary: this is an ets table which maps File to #file_summary{}:
   162 %%        {File, ValidTotalSize, ContiguousTop, Left, Right,
   163 %%         FileSize, Locked, Readers}
   164 %%
   165 %% The basic idea is that messages are appended to the current file up
   166 %% until that file becomes too big (> file_size_limit). At that point,
   167 %% the file is closed and a new file is created on the _right_ of the
   168 %% old file which is used for new messages. Files are named
   169 %% numerically ascending, thus the file with the lowest name is the
   170 %% eldest file.
   171 %%
   172 %% We need to keep track of which messages are in which files (this is
   173 %% the Index); how much useful data is in each file and which files
   174 %% are on the left and right of each other. This is the purpose of the
   175 %% FileSummary ets table.
   176 %%
   177 %% As messages are removed from files, holes appear in these
   178 %% files. The field ValidTotalSize contains the total amount of useful
   179 %% data left in the file, whilst ContiguousTop contains the amount of
   180 %% valid data right at the start of each file. These are needed for
   181 %% garbage collection.
   182 %%
   183 %% When we discover that a file is now empty, we delete it. When we
   184 %% discover that it can be combined with the useful data in either its
   185 %% left or right neighbour, and overall, across all the files, we have
   186 %% ((the amount of garbage) / (the sum of all file sizes)) >
   187 %% ?GARBAGE_FRACTION, we start a garbage collection run concurrently,
   188 %% which will compact the two files together. This keeps disk
   189 %% utilisation high and aids performance. We deliberately do this
   190 %% lazily in order to prevent doing GC on files which are soon to be
   191 %% emptied (and hence deleted).
   192 %%
   193 %% Given the compaction between two files, the left file (i.e. elder
   194 %% file) is considered the ultimate destination for the good data in
   195 %% the right file. If necessary, the good data in the left file which
   196 %% is fragmented throughout the file is written out to a temporary
   197 %% file, then read back in to form a contiguous chunk of good data at
   198 %% the start of the left file. Thus the left file is garbage collected
   199 %% and compacted. Then the good data from the right file is copied
   200 %% onto the end of the left file. Index and FileSummary tables are
   201 %% updated.
   202 %%
   203 %% On non-clean startup, we scan the files we discover, dealing with
   204 %% the possibilities of a crash having occurred during a compaction
   205 %% (this consists of tidyup - the compaction is deliberately designed
   206 %% such that data is duplicated on disk rather than risking it being
   207 %% lost), and rebuild the FileSummary ets table and Index.
   208 %%
   209 %% So, with this design, messages move to the left. Eventually, they
   210 %% should end up in a contiguous block on the left and are then never
   211 %% rewritten. But this isn't quite the case. If in a file there is one
   212 %% message that is being ignored, for some reason, and messages in the
   213 %% file to the right and in the current block are being read all the
   214 %% time then it will repeatedly be the case that the good data from
   215 %% both files can be combined and will be written out to a new
   216 %% file. Whenever this happens, our shunned message will be rewritten.
   217 %%
   218 %% So, provided that we combine messages in the right order,
   219 %% (i.e. left file, bottom to top, right file, bottom to top),
   220 %% eventually our shunned message will end up at the bottom of the
   221 %% left file. The compaction/combining algorithm is smart enough to
   222 %% read in good data from the left file that is scattered throughout
   223 %% (i.e. C and D in the below diagram), then truncate the file to just
   224 %% above B (i.e. truncate to the limit of the good contiguous region
   225 %% at the start of the file), then write C and D on top and then write
   226 %% E, F and G from the right file on top. Thus contiguous blocks of
   227 %% good data at the bottom of files are not rewritten (yes, this is
   228 %% the data the size of which is tracked by the ContiguousTop
   229 %% variable. Judicious use of a mirror is required).
   230 %%
   231 %% +-------+    +-------+         +-------+
   232 %% |   X   |    |   G   |         |   G   |
   233 %% +-------+    +-------+         +-------+
   234 %% |   D   |    |   X   |         |   F   |
   235 %% +-------+    +-------+         +-------+
   236 %% |   X   |    |   X   |         |   E   |
   237 %% +-------+    +-------+         +-------+
   238 %% |   C   |    |   F   |   ===>  |   D   |
   239 %% +-------+    +-------+         +-------+
   240 %% |   X   |    |   X   |         |   C   |
   241 %% +-------+    +-------+         +-------+
   242 %% |   B   |    |   X   |         |   B   |
   243 %% +-------+    +-------+         +-------+
   244 %% |   A   |    |   E   |         |   A   |
   245 %% +-------+    +-------+         +-------+
   246 %%   left         right             left
   247 %%
   248 %% From this reasoning, we do have a bound on the number of times the
   249 %% message is rewritten. From when it is inserted, there can be no
   250 %% files inserted between it and the head of the queue, and the worst
   251 %% case is that every time it is rewritten, it moves one position lower
   252 %% in the file (for it to stay at the same position requires that
   253 %% there are no holes beneath it, which means truncate would be used
   254 %% and so it would not be rewritten at all). Thus this seems to
   255 %% suggest the limit is the number of messages ahead of it in the
   256 %% queue, though it's likely that that's pessimistic, given the
   257 %% requirements for compaction/combination of files.
   258 %%
   259 %% The other property that we have is the bound on the lowest
   260 %% utilisation, which should be 50% - worst case is that all files are
   261 %% fractionally over half full and can't be combined (equivalent is
   262 %% alternating full files and files with only one tiny message in
   263 %% them).
   264 %%
   265 %% Messages are reference-counted. When a message with the same guid
   266 %% is written several times we only store it once, and only remove it
   267 %% from the store when it has been removed the same number of times.
   268 %%
   269 %% The reference counts do not persist. Therefore the initialisation
   270 %% function must be provided with a generator that produces ref count
   271 %% deltas for all recovered messages. This is only used on startup
   272 %% when the shutdown was non-clean.
   273 %%
   274 %% Read messages with a reference count greater than one are entered
   275 %% into a message cache. The purpose of the cache is not especially
   276 %% performance, though it can help there too, but prevention of memory
   277 %% explosion. It ensures that as messages with a high reference count
   278 %% are read from several processes they are read back as the same
   279 %% binary object rather than multiples of identical binary
   280 %% objects.
   281 %%
   282 %% Reads can be performed directly by clients without calling to the
   283 %% server. This is safe because multiple file handles can be used to
   284 %% read files. However, locking is used by the concurrent GC to make
   285 %% sure that reads are not attempted from files which are in the
   286 %% process of being garbage collected.
   287 %%
   288 %% The server automatically defers reads, removes and contains calls
   289 %% that occur which refer to files which are currently being
   290 %% GC'd. Contains calls are only deferred in order to ensure they do
   291 %% not overtake removes.
   292 %%
   293 %% The current file to which messages are being written has a
   294 %% write-back cache. This is written to immediately by clients and can
   295 %% be read from by clients too. This means that there are only ever
   296 %% writes made to the current file, thus eliminating delays due to
   297 %% flushing write buffers in order to be able to safely read from the
   298 %% current file. The one exception to this is that on start up, the
   299 %% cache is not populated with msgs found in the current file, and
   300 %% thus in this case only, reads may have to come from the file
   301 %% itself. The effect of this is that even if the msg_store process is
   302 %% heavily overloaded, clients can still write and read messages with
   303 %% very low latency and not block at all.
   304 %%
   305 %% For notes on Clean Shutdown and startup, see documentation in
   306 %% variable_queue.
   307 
   308 %%----------------------------------------------------------------------------
   309 %% public API
   310 %%----------------------------------------------------------------------------
   311 
   312 start_link(Server, Dir, ClientRefs, StartupFunState) ->
   313     gen_server2:start_link({local, Server}, ?MODULE,
   314                            [Server, Dir, ClientRefs, StartupFunState],
   315                            [{timeout, infinity}]).
   316 
   317 write(Server, Guid, Msg,
   318       CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
   319     ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
   320     {gen_server2:cast(Server, {write, Guid}), CState}.
   321 
   322 read(Server, Guid,
   323      CState = #client_msstate { dedup_cache_ets    = DedupCacheEts,
   324                                 cur_file_cache_ets = CurFileCacheEts }) ->
   325     %% 1. Check the dedup cache
   326     case fetch_and_increment_cache(DedupCacheEts, Guid) of
   327         not_found ->
   328             %% 2. Check the cur file cache
   329             case ets:lookup(CurFileCacheEts, Guid) of
   330                 [] ->
   331                     Defer = fun() -> {gen_server2:pcall(
   332                                         Server, 2, {read, Guid}, infinity),
   333                                       CState} end,
   334                     case index_lookup(Guid, CState) of
   335                         not_found   -> Defer();
   336                         MsgLocation -> client_read1(Server, MsgLocation, Defer,
   337                                                     CState)
   338                     end;
   339                 [{Guid, Msg, _CacheRefCount}] ->
   340                     %% Although we've found it, we don't know the
   341                     %% refcount, so can't insert into dedup cache
   342                     {{ok, Msg}, CState}
   343             end;
   344         Msg ->
   345             {{ok, Msg}, CState}
   346     end.
   347 
   348 contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity).
   349 remove(_Server, [])    -> ok;
   350 remove(Server, Guids)  -> gen_server2:cast(Server, {remove, Guids}).
   351 release(_Server, [])   -> ok;
   352 release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
   353 sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
   354 sync(Server)           -> gen_server2:pcast(Server, 8, sync). %% internal
   355 
   356 gc_done(Server, Reclaimed, Source, Destination) ->
   357     gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}).
   358 
   359 set_maximum_since_use(Server, Age) ->
   360     gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}).
   361 
   362 client_init(Server, Ref) ->
   363     {IState, IModule, Dir, GCPid,
   364      FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
   365         gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity),
   366     #client_msstate { file_handle_cache  = dict:new(),
   367                       index_state        = IState,
   368                       index_module       = IModule,
   369                       dir                = Dir,
   370                       gc_pid             = GCPid,
   371                       file_handles_ets   = FileHandlesEts,
   372                       file_summary_ets   = FileSummaryEts,
   373                       dedup_cache_ets    = DedupCacheEts,
   374                       cur_file_cache_ets = CurFileCacheEts }.
   375 
   376 client_terminate(CState) ->
   377     close_all_handles(CState),
   378     ok.
   379 
   380 client_delete_and_terminate(CState, Server, Ref) ->
   381     ok = client_terminate(CState),
   382     ok = gen_server2:call(Server, {delete_client, Ref}, infinity).
   383 
   384 successfully_recovered_state(Server) ->
   385     gen_server2:pcall(Server, 7, successfully_recovered_state, infinity).
   386 
   387 %%----------------------------------------------------------------------------
   388 %% Client-side-only helpers
   389 %%----------------------------------------------------------------------------
   390 
   391 client_read1(Server,
   392              #msg_location { guid = Guid, file = File } = MsgLocation,
   393              Defer,
   394              CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
   395     case ets:lookup(FileSummaryEts, File) of
   396         [] -> %% File has been GC'd and no longer exists. Go around again.
   397             read(Server, Guid, CState);
   398         [#file_summary { locked = Locked, right = Right }] ->
   399             client_read2(Server, Locked, Right, MsgLocation, Defer, CState)
   400     end.
   401 
   402 client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) ->
   403     %% Although we've already checked both caches and not found the
   404     %% message there, the message is apparently in the
   405     %% current_file. We can only arrive here if we are trying to read
   406     %% a message which we have not written, which is very odd, so just
   407     %% defer.
   408     %%
   409     %% OR, on startup, the cur_file_cache is not populated with the
   410     %% contents of the current file, thus reads from the current file
   411     %% will end up here and will need to be deferred.
   412     Defer();
   413 client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) ->
   414     %% Of course, in the mean time, the GC could have run and our msg
   415     %% is actually in a different file, unlocked. However, defering is
   416     %% the safest and simplest thing to do.
   417     Defer();
   418 client_read2(Server, false, _Right,
   419              MsgLocation = #msg_location { guid = Guid, file = File },
   420              Defer,
   421              CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
   422     %% It's entirely possible that everything we're doing from here on
   423     %% is for the wrong file, or a non-existent file, as a GC may have
   424     %% finished.
   425     safe_ets_update_counter(
   426       FileSummaryEts, File, {#file_summary.readers, +1},
   427       fun (_) -> client_read3(Server, MsgLocation, Defer, CState) end,
   428       fun () -> read(Server, Guid, CState) end).
   429 
   430 client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
   431              CState = #client_msstate { file_handles_ets = FileHandlesEts,
   432                                         file_summary_ets = FileSummaryEts,
   433                                         dedup_cache_ets  = DedupCacheEts,
   434                                         gc_pid           = GCPid }) ->
   435     Release =
   436         fun() -> ok = case ets:update_counter(FileSummaryEts, File,
   437                                               {#file_summary.readers, -1}) of
   438                           0 -> case ets:lookup(FileSummaryEts, File) of
   439                                    [#file_summary { locked = true }] ->
   440                                        rabbit_msg_store_gc:no_readers(
   441                                          GCPid, File);
   442                                    _ -> ok
   443                                end;
   444                           _ -> ok
   445                       end
   446         end,
   447     %% If a GC involving the file hasn't already started, it won't
   448     %% start now. Need to check again to see if we've been locked in
   449     %% the meantime, between lookup and update_counter (thus GC
   450     %% started before our +1. In fact, it could have finished by now
   451     %% too).
   452     case ets:lookup(FileSummaryEts, File) of
   453         [] -> %% GC has deleted our file, just go round again.
   454             read(Server, Guid, CState);
   455         [#file_summary { locked = true }] ->
   456             %% If we get a badarg here, then the GC has finished and
   457             %% deleted our file. Try going around again. Otherwise,
   458             %% just defer.
   459             %%
   460             %% badarg scenario: we lookup, msg_store locks, GC starts,
   461             %% GC ends, we +1 readers, msg_store ets:deletes (and
   462             %% unlocks the dest)
   463             try Release(),
   464                 Defer()
   465             catch error:badarg -> read(Server, Guid, CState)
   466             end;
   467         [#file_summary { locked = false }] ->
   468             %% Ok, we're definitely safe to continue - a GC involving
   469             %% the file cannot start up now, and isn't running, so
   470             %% nothing will tell us from now on to close the handle if
   471             %% it's already open.
   472             %%
   473             %% Finally, we need to recheck that the msg is still at
   474             %% the same place - it's possible an entire GC ran between
   475             %% us doing the lookup and the +1 on the readers. (Same as
   476             %% badarg scenario above, but we don't have a missing file
   477             %% - we just have the /wrong/ file).
   478             case index_lookup(Guid, CState) of
   479                 #msg_location { file = File } = MsgLocation ->
   480                     %% Still the same file.
   481                     mark_handle_open(FileHandlesEts, File),
   482 
   483                     CState1 = close_all_indicated(CState),
   484                     {Msg, CState2} = %% This will never be the current file
   485                         read_from_disk(MsgLocation, CState1, DedupCacheEts),
   486                     Release(), %% this MUST NOT fail with badarg
   487                     {{ok, Msg}, CState2};
   488                 MsgLocation -> %% different file!
   489                     Release(), %% this MUST NOT fail with badarg
   490                     client_read1(Server, MsgLocation, Defer, CState)
   491             end
   492     end.
   493 
   494 %%----------------------------------------------------------------------------
   495 %% gen_server callbacks
   496 %%----------------------------------------------------------------------------
   497 
   498 init([Server, BaseDir, ClientRefs, StartupFunState]) ->
   499     process_flag(trap_exit, true),
   500 
   501     ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
   502                                              [self()]),
   503 
   504     Dir = filename:join(BaseDir, atom_to_list(Server)),
   505 
   506     {ok, IndexModule} = application:get_env(msg_store_index_module),
   507     rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
   508 
   509     AttemptFileSummaryRecovery =
   510         case ClientRefs of
   511             undefined -> ok = rabbit_misc:recursive_delete([Dir]),
   512                          ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
   513                          false;
   514             _         -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
   515                          recover_crashed_compactions(Dir)
   516         end,
   517 
   518     %% if we found crashed compactions we trust neither the
   519     %% file_summary nor the location index. Note the file_summary is
   520     %% left empty here if it can't be recovered.
   521     {FileSummaryRecovered, FileSummaryEts} =
   522         recover_file_summary(AttemptFileSummaryRecovery, Dir),
   523 
   524     {CleanShutdown, IndexState, ClientRefs1} =
   525         recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
   526                                       ClientRefs, Dir, Server),
   527     %% CleanShutdown => msg location index and file_summary both
   528     %% recovered correctly.
   529     true = case {FileSummaryRecovered, CleanShutdown} of
   530                {true, false} -> ets:delete_all_objects(FileSummaryEts);
   531                _             -> true
   532            end,
   533     %% CleanShutdown <=> msg location index and file_summary both
   534     %% recovered correctly.
   535 
   536     DedupCacheEts   = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
   537     FileHandlesEts  = ets:new(rabbit_msg_store_shared_file_handles,
   538                               [ordered_set, public]),
   539     CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
   540 
   541     {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
   542 
   543     State = #msstate { dir                    = Dir,
   544                        index_module           = IndexModule,
   545                        index_state            = IndexState,
   546                        current_file           = 0,
   547                        current_file_handle    = undefined,
   548                        file_handle_cache      = dict:new(),
   549                        on_sync                = [],
   550                        sync_timer_ref         = undefined,
   551                        sum_valid_data         = 0,
   552                        sum_file_size          = 0,
   553                        pending_gc_completion  = [],
   554                        gc_active              = false,
   555                        gc_pid                 = undefined,
   556                        file_handles_ets       = FileHandlesEts,
   557                        file_summary_ets       = FileSummaryEts,
   558                        dedup_cache_ets        = DedupCacheEts,
   559                        cur_file_cache_ets     = CurFileCacheEts,
   560                        client_refs            = ClientRefs1,
   561                        successfully_recovered = CleanShutdown,
   562                        file_size_limit        = FileSizeLimit
   563                       },
   564 
   565     %% If we didn't recover the msg location index then we need to
   566     %% rebuild it now.
   567     {Offset, State1 = #msstate { current_file = CurFile }} =
   568         build_index(CleanShutdown, StartupFunState, State),
   569 
   570     %% read is only needed so that we can seek
   571     {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
   572                              [read | ?WRITE_MODE]),
   573     {ok, Offset} = file_handle_cache:position(CurHdl, Offset),
   574     ok = file_handle_cache:truncate(CurHdl),
   575 
   576     {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule,
   577                                                  FileSummaryEts),
   578 
   579     {ok, maybe_compact(
   580            State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }),
   581      hibernate,
   582      {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
   583 
   %% Synchronous requests served by the message store process.
   %%
   %% read and contains are answered from inside read_message/3 and
   %% contains_message/3 via gen_server2:reply/2 (immediately, or later
   %% once they have been queued behind a running GC), which is why
   %% those two clauses finish with noreply/1.
   handle_call({read, Guid}, From, State) ->
       noreply(read_message(Guid, From, State));

   handle_call({contains, Guid}, From, State) ->
       noreply(contains_message(Guid, From, State));

   %% Register the client reference and reply with the pieces of store
   %% state (index, directory, gc pid and ets tables) the client needs.
   handle_call({new_client_state, CRef}, _From,
               State = #msstate { dir                = Dir,
                                  index_state        = IndexState,
                                  index_module       = IndexModule,
                                  file_handles_ets   = FileHandlesEts,
                                  file_summary_ets   = FileSummaryEts,
                                  dedup_cache_ets    = DedupCacheEts,
                                  cur_file_cache_ets = CurFileCacheEts,
                                  client_refs        = ClientRefs,
                                  gc_pid             = GCPid }) ->
       ClientInit  = {IndexState, IndexModule, Dir, GCPid,
                      FileHandlesEts, FileSummaryEts, DedupCacheEts,
                      CurFileCacheEts},
       ClientRefs1 = sets:add_element(CRef, ClientRefs),
       reply(ClientInit, State #msstate { client_refs = ClientRefs1 });

   %% Report the successfully_recovered flag held in the state.
   handle_call(successfully_recovered_state, _From, State) ->
       Recovered = State #msstate.successfully_recovered,
       reply(Recovered, State);

   %% Forget a previously registered client reference.
   handle_call({delete_client, CRef}, _From,
               State = #msstate { client_refs = ClientRefs }) ->
       ClientRefs1 = sets:del_element(CRef, ClientRefs),
       reply(ok, State #msstate { client_refs = ClientRefs1 }).
   613 
   %% Asynchronous requests handled by the message store process. Every
   %% clause returns through noreply/1.
   %%
   %% {write, Guid}: the message body is taken from CurFileCacheEts. The
   %% counter in element 3 of that entry is decremented first and is
   %% asserted to stay non-negative.
   614 handle_cast({write, Guid},
   615             State = #msstate { current_file_handle = CurHdl,
   616                                current_file        = CurFile,
   617                                sum_valid_data      = SumValid,
   618                                sum_file_size       = SumFileSize,
   619                                file_summary_ets    = FileSummaryEts,
   620                                cur_file_cache_ets  = CurFileCacheEts }) ->
   621     true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
   622     [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
   623     case index_lookup(Guid, State) of
   624         not_found ->
   625             %% New message, lots to do
   626             {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
   627             {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg),
   628             ok = index_insert(#msg_location {
   629                                 guid = Guid, ref_count = 1, file = CurFile,
   630                                 offset = CurOffset, total_size = TotalSize },
   631                               State),
               %% The match below doubles as an assertion: the summary
               %% row of the current file must have right = undefined
               %% and locked = false.
   632             [#file_summary { valid_total_size = ValidTotalSize,
   633                              contiguous_top   = ContiguousTop,
   634                              right            = undefined,
   635                              locked           = false,
   636                              file_size        = FileSize }] =
   637                 ets:lookup(FileSummaryEts, CurFile),
   638             ValidTotalSize1 = ValidTotalSize + TotalSize,
               %% contiguous_top only moves when the message was
               %% appended exactly at the previous contiguous top.
   639             ContiguousTop1 = case CurOffset =:= ContiguousTop of
   640                                  true  -> ValidTotalSize1;
   641                                  false -> ContiguousTop
   642                              end,
   643             true = ets:update_element(
   644                      FileSummaryEts, CurFile,
   645                      [{#file_summary.valid_total_size, ValidTotalSize1},
   646                       {#file_summary.contiguous_top,   ContiguousTop1},
   647                       {#file_summary.file_size,        FileSize + TotalSize}]),
   648             NextOffset = CurOffset + TotalSize,
   649             noreply(
   650               maybe_roll_to_new_file(
   651                 NextOffset, State #msstate {
   652                               sum_valid_data = SumValid + TotalSize,
   653                               sum_file_size  = SumFileSize + TotalSize }));
   654         #msg_location { ref_count = RefCount } ->
   655             %% We already know about it, just update counter. Only
   656             %% update field otherwise bad interaction with concurrent GC
   657             ok = index_update_fields(Guid,
   658                                      {#msg_location.ref_count, RefCount + 1},
   659                                      State),
   660             noreply(State)
   661     end;
   662 
   %% {remove, Guids}: drop one reference to each guid, in list order,
   %% then give maybe_compact/1 a chance to run.
   663 handle_cast({remove, Guids}, State) ->
   664     State1 = lists:foldl(
   665                fun (Guid, State2) -> remove_message(Guid, State2) end,
   666                State, Guids),
   667     noreply(maybe_compact(State1));
   668 
   %% {release, Guids}: decrement the dedup cache counter of each guid.
   669 handle_cast({release, Guids}, State =
   670                 #msstate { dedup_cache_ets = DedupCacheEts }) ->
   671     lists:foreach(
   672       fun (Guid) -> decrement_cache(DedupCacheEts, Guid) end, Guids),
   673     noreply(State);
   674 
   %% {sync, Guids, K}: K is called straight away unless at least one
   %% of Guids is located in the current file at an offset at or beyond
   %% the handle's last sync offset; in that case K is queued in
   %% on_sync. The index lookup is matched against #msg_location{}, so
   %% a guid missing from the index crashes with badmatch.
   675 handle_cast({sync, Guids, K},
   676             State = #msstate { current_file        = CurFile,
   677                                current_file_handle = CurHdl,
   678                                on_sync             = Syncs }) ->
   679     {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
   680     case lists:any(fun (Guid) ->
   681                            #msg_location { file = File, offset = Offset } =
   682                                index_lookup(Guid, State),
   683                            File =:= CurFile andalso Offset >= SyncOffset
   684                    end, Guids) of
   685         false -> K(),
   686                  noreply(State);
   687         true  -> noreply(State #msstate { on_sync = [K | Syncs] })
   688     end;
   689 
   %% sync: run internal_sync/1 now.
   690 handle_cast(sync, State) ->
   691     noreply(internal_sync(State));
   692 
   %% {gc_done, Reclaimed, Src, Dst}: only matches while gc_active is
   %% exactly {Src, Dst}. Relinks the file summary entries around the
   %% removed Src file, unlocks Dst, subtracts Reclaimed from
   %% sum_file_size and replays the operations queued in
   %% pending_gc_completion.
   693 handle_cast({gc_done, Reclaimed, Src, Dst},
   694             State = #msstate { sum_file_size    = SumFileSize,
   695                                gc_active        = {Src, Dst},
   696                                file_handles_ets = FileHandlesEts,
   697                                file_summary_ets = FileSummaryEts }) ->
   698     %% GC done, so now ensure that any clients that have open fhs to
   699     %% those files close them before using them again. This has to be
   700     %% done here (given it's done in the msg_store, and not the gc),
   701     %% and not when starting up the GC, because if done when starting
   702     %% up the GC, the client could find the close, and close and
   703     %% reopen the fh, whilst the GC is waiting for readers to
   704     %% disappear, before it's actually done the GC.
   705     true = mark_handle_to_close(FileHandlesEts, Src),
   706     true = mark_handle_to_close(FileHandlesEts, Dst),
   707     %% we always move data left, so Src has gone and was on the
   708     %% right, so need to make dest = source.right.left, and also
   709     %% dest.right = source.right
   710     [#file_summary { left    = Dst,
   711                      right   = SrcRight,
   712                      locked  = true,
   713                      readers = 0 }] = ets:lookup(FileSummaryEts, Src),
   714     %% this could fail if SrcRight =:= undefined
       %% (ets:update_element/3 returns false when no row has the given
       %% key, which is why its result is not asserted here)
   715     ets:update_element(FileSummaryEts, SrcRight, {#file_summary.left, Dst}),
   716     true = ets:update_element(FileSummaryEts, Dst,
   717                               [{#file_summary.locked, false},
   718                                {#file_summary.right,  SrcRight}]),
   719     true = ets:delete(FileSummaryEts, Src),
   720     noreply(
   721       maybe_compact(run_pending(
   722                       State #msstate { sum_file_size = SumFileSize - Reclaimed,
   723                                        gc_active     = false })));
   724 
   %% {set_maximum_since_use, Age}: forwarded to file_handle_cache.
   725 handle_cast({set_maximum_since_use, Age}, State) ->
   726     ok = file_handle_cache:set_maximum_since_use(Age),
   727     noreply(State).
   728 
   %% Out-of-band messages.
   %%
   %% timeout - the timeout requested through next_state/1 expired, so
   %%           run a sync now.
   %% 'EXIT'  - an exit signal arrived as a message; stop with the
   %%           same reason.
   handle_info(timeout, State) ->
       Synced = internal_sync(State),
       noreply(Synced);

   handle_info({'EXIT', _From, Why}, State) ->
       {stop, Why, State}.
   734 
   %% Shut the message store down in an orderly fashion.
   %%
   %% Steps, in order: stop the GC process, sync and close the current
   %% file (if one is open), close all cached read handles, persist the
   %% file summary table, delete the ets tables, terminate the index
   %% and finally write the recovery terms (client refs and index
   %% module) that mark the shutdown as clean.
   %%
   %% Returns the final state with index_state and current_file_handle
   %% reset to undefined.
   terminate(_Reason, State = #msstate { index_state         = IndexState,
                                         index_module        = IndexModule,
                                         current_file_handle = CurHdl,
                                         gc_pid              = GCPid,
                                         file_handles_ets    = FileHandlesEts,
                                         file_summary_ets    = FileSummaryEts,
                                         dedup_cache_ets     = DedupCacheEts,
                                         cur_file_cache_ets  = CurFileCacheEts,
                                         client_refs         = ClientRefs,
                                         dir                 = Dir }) ->
       %% stop the gc first, otherwise it could be working and we pull
       %% out the ets tables from under it.
       ok = rabbit_msg_store_gc:stop(GCPid),
       State1 = case CurHdl of
                    undefined -> State;
                    _         -> State2 = internal_sync(State),
                                 %% internal_sync/1 only syncs when there
                                 %% are pending on_sync callbacks, so the
                                 %% close is presumably what pushes any
                                 %% remaining buffered writes out. Assert
                                 %% it, as every other close in this
                                 %% module does: if it fails we must not
                                 %% go on to write the clean-shutdown
                                 %% recovery terms below.
                                 ok = file_handle_cache:close(CurHdl),
                                 State2
                end,
       State3 = close_all_handles(State1),
       store_file_summary(FileSummaryEts, Dir),
       [ets:delete(T) ||
           T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
       IndexModule:terminate(IndexState),
       store_recovery_terms([{client_refs, sets:to_list(ClientRefs)},
                             {index_module, IndexModule}], Dir),
       State3 #msstate { index_state         = undefined,
                         current_file_handle = undefined }.
   763 
   %% Hot code upgrade callback: the state is carried over unchanged.
   code_change(_FromVsn, CurrentState, _UpgradeData) ->
       {ok, CurrentState}.
   766 
   767 %%----------------------------------------------------------------------------
   768 %% general helper functions
   769 %%----------------------------------------------------------------------------
   770 
   %% Build a {noreply, ...} callback result, letting next_state/1
   %% decide on the sync timer and the timeout / hibernate value.
   noreply(State) ->
       {NextState, TimeoutOrHibernate} = next_state(State),
       {noreply, NextState, TimeoutOrHibernate}.
   774 
   %% Build a {reply, ...} callback result, letting next_state/1 decide
   %% on the sync timer and the timeout / hibernate value.
   reply(Reply, State) ->
       {NextState, TimeoutOrHibernate} = next_state(State),
       {reply, Reply, NextState, TimeoutOrHibernate}.
   778 
   %% Work out the third element of the callback result and keep the
   %% sync timer consistent with the on_sync queue:
   %%
   %%   nothing queued, no timer -> hibernate
   %%   work queued,    no timer -> start the timer, timeout 0
   %%   nothing queued, timer    -> stop the timer, hibernate
   %%   work queued,    timer    -> timeout 0
   next_state(State = #msstate { on_sync        = OnSync,
                                 sync_timer_ref = TRef }) ->
       case {OnSync, TRef} of
           {[], undefined} -> {State, hibernate};
           {_,  undefined} -> {start_sync_timer(State), 0};
           {[], _}         -> {stop_sync_timer(State), hibernate};
           {_,  _}         -> {State, 0}
       end.
   787 
   %% Arrange for ?MODULE:sync(self()) to be called after ?SYNC_INTERVAL
   %% and remember the timer reference. Only callable while no timer is
   %% recorded in the state.
   start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
       SyncArgs = [self()],
       {ok, TimerRef} =
           timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, SyncArgs),
       State #msstate { sync_timer_ref = TimerRef }.
   791 
   %% Cancel the sync timer, if there is one, and clear the reference.
   stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
       case TRef of
           undefined ->
               State;
           _ ->
               {ok, cancel} = timer:cancel(TRef),
               State #msstate { sync_timer_ref = undefined }
       end.
   797 
   %% Stop the sync timer and, if any callbacks are waiting in on_sync,
   %% sync the current file and run them oldest-first (on_sync is
   %% built by prepending, hence the reverse).
   internal_sync(State = #msstate { on_sync = [] }) ->
       stop_sync_timer(State);
   internal_sync(State = #msstate { current_file_handle = CurHdl,
                                    on_sync             = Waiting }) ->
       Stopped = stop_sync_timer(State),
       ok = file_handle_cache:sync(CurHdl),
       RunCallback = fun (K) -> K() end,
       lists:foreach(RunCallback, lists:reverse(Waiting)),
       Stopped #msstate { on_sync = [] }.
   807 
   %% Serve a read request for Guid on behalf of From.
   %%
   %% Replies not_found when the index has no entry. Otherwise the
   %% dedup cache is tried first (bumping its counter on a hit); on a
   %% miss the work is handed to read_message1/3, which replies itself
   %% or defers the request. Returns the possibly updated state.
   read_message(Guid, From,
                State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
       case index_lookup(Guid, State) of
           not_found ->
               gen_server2:reply(From, not_found),
               State;
           Location ->
               Cached = fetch_and_increment_cache(DedupCacheEts, Guid),
               case Cached of
                   not_found ->
                       read_message1(From, Location, State);
                   Msg ->
                       gen_server2:reply(From, {ok, Msg}),
                       State
               end
       end.
   821 
   %% Read a message that is known to the index but was not found in
   %% the dedup cache, and reply to From (now, or later if deferred).
   %%
   %% Message in the current file: use the CurFileCacheEts entry if
   %% there is one (also offering it to the dedup cache); otherwise
   %% read it from disk, flushing the write handle first when the
   %% message's offset is at or beyond the handle's current raw offset
   %% (presumably because it may still sit in the write buffer).
   %%
   %% Message in any other file: if that file's summary row is locked
   %% the request is queued in pending_gc_completion and answered once
   %% the GC is done; otherwise it is read from disk straight away.
   %%
   %% Returns the updated state.
   822 read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
   823                                     file = File, offset = Offset } = MsgLoc,
   824               State = #msstate { current_file        = CurFile,
   825                                  current_file_handle = CurHdl,
   826                                  file_summary_ets    = FileSummaryEts,
   827                                  dedup_cache_ets     = DedupCacheEts,
   828                                  cur_file_cache_ets  = CurFileCacheEts }) ->
   829     case File =:= CurFile of
   830         true  -> {Msg, State1} =
   831                      %% can return [] if msg in file existed on startup
   832                      case ets:lookup(CurFileCacheEts, Guid) of
   833                          [] ->
   834                              {ok, RawOffSet} =
   835                                  file_handle_cache:current_raw_offset(CurHdl),
   836                              ok = case Offset >= RawOffSet of
   837                                       true  -> file_handle_cache:flush(CurHdl);
   838                                       false -> ok
   839                                   end,
   840                              read_from_disk(MsgLoc, State, DedupCacheEts);
   841                          [{Guid, Msg1, _CacheRefCount}] ->
   842                              ok = maybe_insert_into_cache(
   843                                     DedupCacheEts, RefCount, Guid, Msg1),
   844                              {Msg1, State}
   845                      end,
   846                  gen_server2:reply(From, {ok, Msg}),
   847                  State1;
   848         false -> [#file_summary { locked = Locked }] =
   849                      ets:lookup(FileSummaryEts, File),
   850                  case Locked of
   851                      true  -> add_to_pending_gc_completion({read, Guid, From},
   852                                                            State);
   853                      false -> {Msg, State1} =
   854                                   read_from_disk(MsgLoc, State, DedupCacheEts),
   855                               gen_server2:reply(From, {ok, Msg}),
   856                               State1
   857                  end
   858     end.
   859 
   %% Read the message described by the #msg_location{} from its file.
   %%
   %% Positions a (possibly freshly opened) read handle at the recorded
   %% offset and reads total_size bytes. If what comes back is not
   %% {ok, {Guid, _}} for the expected guid, the function crashes with
   %% a badmatch whose value is {error, {misread, Diagnostics}}. On
   %% success the message is offered to the dedup cache and
   %% {Msg, State1} is returned, State1 carrying the read handle.
   read_from_disk(#msg_location { guid = Guid, ref_count = RefCount,
                                  file = File, offset = Offset,
                                  total_size = TotalSize },
                  State, DedupCacheEts) ->
       {Hdl, State1} = get_read_handle(File, State),
       {ok, Offset} = file_handle_cache:position(Hdl, Offset),
       Outcome = case rabbit_msg_file:read(Hdl, TotalSize) of
                     {ok, {Guid, _Payload}} = Good ->
                         Good;
                     Unexpected ->
                         {error, {misread, [{old_state, State},
                                            {file_num,  File},
                                            {offset,    Offset},
                                            {guid,      Guid},
                                            {read,      Unexpected},
                                            {proc_dict, get()}
                                           ]}}
                 end,
       %% deliberately crashes here on a misread
       {ok, {Guid, Msg}} = Outcome,
       ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg),
       {Msg, State1}.
   881 
   %% Answer whether the store knows Guid.
   %%
   %% Replies false when the index has no entry. If the entry lives in
   %% one of the two files named by gc_active, the request is queued in
   %% pending_gc_completion instead of being answered now; otherwise
   %% the reply is true. Returns the possibly updated state.
   contains_message(Guid, From, State = #msstate { gc_active = GCActive }) ->
       case index_lookup(Guid, State) of
           not_found ->
               gen_server2:reply(From, false),
               State;
           #msg_location { file = File } ->
               %% File is already bound, so these patterns test equality
               BeingCollected = case GCActive of
                                    {File, _Other} -> true;
                                    {_Other, File} -> true;
                                    _              -> false
                                end,
               case BeingCollected of
                   true ->
                       add_to_pending_gc_completion(
                         {contains, Guid, From}, State);
                   false ->
                       gen_server2:reply(From, true),
                       State
               end
       end.
   897 
   %% Drop one reference to Guid and return the updated state.
   %%
   %% The index lookup is matched against #msg_location{}, so a guid
   %% that is not in the index crashes with badmatch.
   %%
   %% ref_count =:= 1 (last reference): the dedup cache entry is removed
   %% first. If the file holding the message is locked, the removal is
   %% queued in pending_gc_completion and retried after the GC (the
   %% cache entry has already gone by then). Otherwise the index entry
   %% is deleted, the file summary is adjusted (contiguous_top becomes
   %% the smaller of its old value and the message's offset), the file
   %% is handed to delete_file_if_empty/2 and sum_valid_data shrinks by
   %% the message's total size.
   %%
   %% ref_count > 1: only the dedup cache counter and the ref_count
   %% field in the index are decremented.
   898 remove_message(Guid, State = #msstate { sum_valid_data   = SumValid,
   899                                         file_summary_ets = FileSummaryEts,
   900                                         dedup_cache_ets  = DedupCacheEts }) ->
   901     #msg_location { ref_count = RefCount, file = File,
   902                     offset = Offset, total_size = TotalSize } =
   903         index_lookup(Guid, State),
   904     case RefCount of
   905         1 ->
   906             %% don't remove from CUR_FILE_CACHE_ETS_NAME here because
   907             %% there may be further writes in the mailbox for the same
   908             %% msg.
   909             ok = remove_cache_entry(DedupCacheEts, Guid),
   910             [#file_summary { valid_total_size = ValidTotalSize,
   911                              contiguous_top   = ContiguousTop,
   912                              locked           = Locked }] =
   913                 ets:lookup(FileSummaryEts, File),
   914             case Locked of
   915                 true ->
   916                     add_to_pending_gc_completion({remove, Guid}, State);
   917                 false ->
   918                     ok = index_delete(Guid, State),
   919                     ContiguousTop1 = lists:min([ContiguousTop, Offset]),
   920                     ValidTotalSize1 = ValidTotalSize - TotalSize,
   921                     true = ets:update_element(
   922                              FileSummaryEts, File,
   923                              [{#file_summary.valid_total_size, ValidTotalSize1},
   924                               {#file_summary.contiguous_top, ContiguousTop1}]),
   925                     State1 = delete_file_if_empty(File, State),
   926                     State1 #msstate { sum_valid_data = SumValid - TotalSize }
   927             end;
   928         _ when 1 < RefCount ->
   929             ok = decrement_cache(DedupCacheEts, Guid),
   930             %% only update field, otherwise bad interaction with concurrent GC
   931             ok = index_update_fields(Guid,
   932                                      {#msg_location.ref_count, RefCount - 1},
   933                                      State),
   934             State
   935     end.
   936 
   %% Queue Op to be replayed by run_pending/1. Operations are
   %% prepended, so the list is held newest-first.
   add_to_pending_gc_completion(
     Op, State = #msstate { pending_gc_completion = Queued }) ->
       Queued1 = [Op | Queued],
       State #msstate { pending_gc_completion = Queued1 }.
   940 
   %% Replay every queued operation, oldest first, against a state
   %% whose queue has been emptied beforehand (so operations that need
   %% to be deferred again end up in a fresh queue).
   run_pending(State = #msstate { pending_gc_completion = Queued }) ->
       case Queued of
           [] ->
               State;
           _ ->
               Emptied   = State #msstate { pending_gc_completion = [] },
               OldestFirst = lists:reverse(Queued),
               lists:foldl(fun run_pending/2, Emptied, OldestFirst)
       end.
   946 
   %% Replay a single deferred operation by calling the function that
   %% deferred it in the first place.
   run_pending({read, Guid, ReplyTo}, State) ->
       read_message(Guid, ReplyTo, State);
   run_pending({contains, Guid, ReplyTo}, State) ->
       contains_message(Guid, ReplyTo, State);
   run_pending({remove, Guid}, State) ->
       remove_message(Guid, State).
   953 
   %% Apply ets:update_counter/3 and pass the new counter value to
   %% SuccessFun, returning its result. If a badarg error is raised
   %% (for instance because Key is not in Tab) FailThunk is called
   %% instead and its result returned.
   %%
   %% Note that SuccessFun runs inside the protected section too, so a
   %% badarg raised by SuccessFun also ends up in FailThunk.
   safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
       try
           NewValue = ets:update_counter(Tab, Key, UpdateOp),
           SuccessFun(NewValue)
       catch
           error:badarg -> FailThunk()
       end.
   959 
   %% As safe_ets_update_counter/5, for callers that do not care about
   %% the new counter value: a successful update simply yields ok.
   safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
       IgnoreValue = fun (_NewValue) -> ok end,
       safe_ets_update_counter(Tab, Key, UpdateOp, IgnoreValue, FailThunk).
   962 
   963 %%----------------------------------------------------------------------------
   964 %% file helper functions
   965 %%----------------------------------------------------------------------------
   966 
   %% Open FileName inside Dir through the file handle cache. Mode is
   %% appended to ?BINARY_MODE, and the handle is given a write buffer
   %% of ?HANDLE_CACHE_BUFFER_SIZE.
   open_file(Dir, FileName, Mode) ->
       Path    = form_filename(Dir, FileName),
       Modes   = ?BINARY_MODE ++ Mode,
       Options = [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}],
       file_handle_cache:open(Path, Modes, Options).
   970 
   %% Close the cached handle stored under Key, if any, and drop it
   %% from the cache. Accepts a client state, a server state, or the
   %% bare dict of handles; returns the same kind of term it was given.
   close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
       FHC1 = close_handle(Key, FHC),
       CState #client_msstate { file_handle_cache = FHC1 };

   close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
       FHC1 = close_handle(Key, FHC),
       State #msstate { file_handle_cache = FHC1 };

   close_handle(Key, FHC) ->
       case dict:is_key(Key, FHC) of
           false ->
               FHC;
           true ->
               ok = file_handle_cache:close(dict:fetch(Key, FHC)),
               dict:erase(Key, FHC)
       end.
   983 
   %% Record that the calling process has File open. An existing entry
   %% for {self(), File} is left untouched; the result is always true.
   mark_handle_open(FileHandlesEts, File) ->
       Entry = {{self(), File}, open},
       %% insert_new returning false just means the entry already exists
       _Inserted = ets:insert_new(FileHandlesEts, Entry),
       true.
   988 
   %% Flip every 'open' entry for File, whichever process owns it, to
   %% 'close'. Always returns true.
   mark_handle_to_close(FileHandlesEts, File) ->
       OpenEntries = ets:match_object(FileHandlesEts, {{'_', File}, open}),
       OpenKeys    = [Key || {Key, open} <- OpenEntries],
       lists:foreach(
         fun (Key) -> ets:update_element(FileHandlesEts, Key, {2, close}) end,
         OpenKeys),
       true.
   993 
   %% Close every handle of the calling process that has been marked
   %% 'close' in the file handles table, removing the table entries as
   %% we go. Returns the updated client state.
   close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } =
                       CState) ->
       Marked = ets:match_object(FileHandlesEts, {{self(), '_'}, close}),
       CloseOne = fun ({Key = {_Self, File}, close}, CStateAcc) ->
                          true = ets:delete(FileHandlesEts, Key),
                          close_handle(File, CStateAcc)
                  end,
       lists:foldl(CloseOne, CState, Marked).
  1001 
  %% Close every cached file handle and return the state with an empty
  %% handle cache. For a client state the matching {self(), File}
  %% entries in the file handles table are deleted as well.
  close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts,
                                               file_handle_cache = FHC }) ->
      Owner = self(),
      ForgetAndClose = fun (File, Hdl, ok) ->
                               true = ets:delete(FileHandlesEts, {Owner, File}),
                               file_handle_cache:close(Hdl)
                       end,
      ok = dict:fold(ForgetAndClose, ok, FHC),
      CState #client_msstate { file_handle_cache = dict:new() };

  close_all_handles(State = #msstate { file_handle_cache = FHC }) ->
      JustClose = fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end,
      ok = dict:fold(JustClose, ok, FHC),
      State #msstate { file_handle_cache = dict:new() }.
  1015 
  %% Fetch a read handle for FileNum from the state's handle cache,
  %% opening and caching one if necessary. Works on both client and
  %% server states; returns {Handle, UpdatedState}.
  get_read_handle(FileNum, CState = #client_msstate { file_handle_cache = FHC,
                                                      dir = Dir }) ->
      {Hdl, FHC1} = get_read_handle(FileNum, FHC, Dir),
      CState1 = CState #client_msstate { file_handle_cache = FHC1 },
      {Hdl, CState1};

  get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC,
                                              dir = Dir }) ->
      {Hdl, FHC1} = get_read_handle(FileNum, FHC, Dir),
      State1 = State #msstate { file_handle_cache = FHC1 },
      {Hdl, State1}.
  1025 
  %% Look FileNum up in the dict of cached handles. On a miss the file
  %% is opened in ?READ_MODE and the new handle is stored in the dict.
  %% Returns {Handle, UpdatedDict}.
  get_read_handle(FileNum, FHC, Dir) ->
      case dict:is_key(FileNum, FHC) of
          true ->
              {dict:fetch(FileNum, FHC), FHC};
          false ->
              FileName = filenum_to_name(FileNum),
              {ok, Hdl} = open_file(Dir, FileName, ?READ_MODE),
              {Hdl, dict:store(FileNum, Hdl, FHC)}
      end.
  1033 
  %% Seek to FileSizeLimit, truncate the file at that position, then
  %% leave the handle positioned at FinalPos. Each step is asserted;
  %% returns ok.
  preallocate(Hdl, FileSizeLimit, FinalPos) ->
      AtLimit = file_handle_cache:position(Hdl, FileSizeLimit),
      {ok, FileSizeLimit} = AtLimit,
      ok = file_handle_cache:truncate(Hdl),
      AtFinal = file_handle_cache:position(Hdl, FinalPos),
      {ok, FinalPos} = AtFinal,
      ok.
  1039 
  %% Cut the file off at Lowpoint, then run preallocate/3 with
  %% Highpoint as the size limit, which leaves the handle positioned
  %% back at Lowpoint. Returns ok.
  truncate_and_extend_file(Hdl, Lowpoint, Highpoint) ->
      Sought = file_handle_cache:position(Hdl, Lowpoint),
      {ok, Lowpoint} = Sought,
      ok = file_handle_cache:truncate(Hdl),
      ok = preallocate(Hdl, Highpoint, Lowpoint).
  1044 
  %% Full path of the file called Name inside directory Dir.
  form_filename(Dir, Name) ->
      filename:join(Dir, Name).
  1046 
  %% File name for file number File: its decimal representation
  %% followed by ?FILE_EXTENSION.
  filenum_to_name(File) ->
      lists:append(integer_to_list(File), ?FILE_EXTENSION).
  1048 
  %% File number encoded in FileName: the name with its extension
  %% stripped, parsed as an integer.
  filename_to_num(FileName) ->
      Stem = filename:rootname(FileName),
      list_to_integer(Stem).
  1050 
  %% Names of the files in Dir that end in Ext, ordered by the number
  %% encoded in each name (numerically, not lexically).
  list_sorted_file_names(Dir, Ext) ->
      Names    = filelib:wildcard("*" ++ Ext, Dir),
      ByNumber = fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
      lists:sort(ByNumber, Names).
  1054 
  1055 %%----------------------------------------------------------------------------
  1056 %% message cache helper functions
  1057 %%----------------------------------------------------------------------------
  1058 
  %% Only messages referenced more than once are worth caching: for
  %% RefCount > 1 the message goes through update_msg_cache/3,
  %% otherwise nothing happens. Returns ok.
  maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg) ->
      case RefCount > 1 of
          true  -> update_msg_cache(DedupCacheEts, Guid, Msg);
          false -> ok
      end.
  1064 
  %% Insert {Guid, Msg, 1} into the cache, or bump the counter of the
  %% existing entry. If the entry vanishes between the failed insert
  %% and the counter update, the whole operation is retried.
  update_msg_cache(CacheEts, Guid, Msg) ->
      Retry = fun () -> update_msg_cache(CacheEts, Guid, Msg) end,
      case ets:insert_new(CacheEts, {Guid, Msg, 1}) of
          true  -> ok;
          false -> safe_ets_update_counter_ok(CacheEts, Guid, {3, +1}, Retry)
      end.
  1072 
  %% Delete the cache entry for Guid regardless of its counter.
  %% Returns ok whether or not an entry existed.
  remove_cache_entry(DedupCacheEts, Guid) ->
      Deleted = ets:delete(DedupCacheEts, Guid),
      true = Deleted,
      ok.
  1076 
  %% Look Guid up in the dedup cache. Returns not_found on a miss; on
  %% a hit the entry's counter is incremented and the message is
  %% returned. Should the entry have been deleted between the lookup
  %% and the increment, it is put back via update_msg_cache/3.
  fetch_and_increment_cache(DedupCacheEts, Guid) ->
      case ets:lookup(DedupCacheEts, Guid) of
          [{_Guid, Msg, _RefCount}] ->
              %% someone has deleted us in the meantime, insert us
              Reinsert = fun () ->
                                 ok = update_msg_cache(DedupCacheEts, Guid, Msg)
                         end,
              safe_ets_update_counter_ok(
                DedupCacheEts, Guid, {3, +1}, Reinsert),
              Msg;
          [] ->
              not_found
      end.
  1088 
  %% Decrement the cache counter for Guid, deleting the entry once the
  %% counter reaches zero or below. A missing entry is not an error.
  %% Returns ok.
  decrement_cache(DedupCacheEts, Guid) ->
      OnDecremented = fun (N) when N =< 0 ->
                              true = ets:delete(DedupCacheEts, Guid);
                          (_N) ->
                              true
                      end,
      %% Guid is not in there because although it's been delivered,
      %% it's never actually been read (think: persistent message
      %% held in RAM)
      OnMissing = fun () -> true end,
      true = safe_ets_update_counter(
               DedupCacheEts, Guid, {3, -1}, OnDecremented, OnMissing),
      ok.
  1100 
  1101 %%----------------------------------------------------------------------------
  1102 %% index
  1103 %%----------------------------------------------------------------------------
  1104 
  %% Look Key up through the index module held in a client or server
  %% state; the result is whatever the index module's lookup/2 returns.
  index_lookup(Key, #client_msstate { index_module = IndexModule,
                                      index_state  = IndexState }) ->
      IndexModule:lookup(Key, IndexState);

  index_lookup(Key, #msstate { index_module = IndexModule,
                               index_state  = IndexState }) ->
      IndexModule:lookup(Key, IndexState).
  1111 
  %% Delegate to insert/2 of the configured index module.
  index_insert(Obj, #msstate { index_module = IndexModule,
                               index_state  = IndexState }) ->
      IndexModule:insert(Obj, IndexState).
  1114 
  %% Delegate to update/2 of the configured index module.
  index_update(Obj, #msstate { index_module = IndexModule,
                               index_state  = IndexState }) ->
      IndexModule:update(Obj, IndexState).
  1117 
  %% Delegate to update_fields/3 of the configured index module, which
  %% changes only the given fields of the entry stored under Key.
  index_update_fields(Key, Updates, #msstate { index_module = IndexModule,
                                               index_state  = IndexState }) ->
      IndexModule:update_fields(Key, Updates, IndexState).
  1121 
  %% Delegate to delete/2 of the configured index module.
  index_delete(Key, #msstate { index_module = IndexModule,
                               index_state  = IndexState }) ->
      IndexModule:delete(Key, IndexState).
  1124 
  %% Delegate to delete_by_file/2 of the configured index module.
  index_delete_by_file(File, #msstate { index_module = IndexModule,
                                        index_state  = IndexState }) ->
      IndexModule:delete_by_file(File, IndexState).
  1128 
  1129 %%----------------------------------------------------------------------------
  1130 %% shutdown and recovery
  1131 %%----------------------------------------------------------------------------
  1132 
  %% Try to recover the message location index and the set of client
  %% references from a previous clean shutdown.
  %%
  %% Returns {Recovered, IndexState, ClientRefSet}. Recovered is true
  %% only when recovery was requested, the recovery terms could be
  %% read, they name the same client refs and index module as we were
  %% given, and the index module managed to recover its state. In
  %% every other case a brand new index and an empty set are returned.
  recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
      {false, IndexModule:new(Dir), sets:new()};
  recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
      rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
      {false, IndexModule:new(Dir), sets:new()};
  recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
      %% log why recovery failed, then start afresh
      StartAfresh =
          fun (ErrorMsg, ErrorArgs) ->
                  rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
                                     "rebuilding indices from scratch~n",
                                     [Server | ErrorArgs]),
                  {false, IndexModule:new(Dir), sets:new()}
          end,
      case read_recovery_terms(Dir) of
          {false, ReadError} ->
              StartAfresh("failed to read recovery terms: ~p", [ReadError]);
          {true, Terms} ->
              RecClientRefs  = proplists:get_value(client_refs, Terms, []),
              RecIndexModule = proplists:get_value(index_module, Terms),
              TermsAgree =
                  lists:sort(ClientRefs) =:= lists:sort(RecClientRefs)
                  andalso IndexModule =:= RecIndexModule,
              case TermsAgree of
                  false ->
                      StartAfresh("recovery terms differ from present", []);
                  true ->
                      case IndexModule:recover(Dir) of
                          {ok, RecoveredIndex} ->
                              {true, RecoveredIndex,
                               sets:from_list(ClientRefs)};
                          {error, IndexError} ->
                              StartAfresh("failed to recover index: ~p",
                                          [IndexError])
                      end
              end
      end.
  1163 
  %% Write Terms to the ?CLEAN_FILENAME file in Dir. Returns whatever
  %% rabbit_misc:write_term_file/2 returns.
  store_recovery_terms(Terms, Dir) ->
      Path = filename:join(Dir, ?CLEAN_FILENAME),
      rabbit_misc:write_term_file(Path, Terms).
  1166 
  %% Read the recovery terms from ?CLEAN_FILENAME in Dir and delete
  %% the file. Returns {true, Terms} only if both the read and the
  %% delete succeed, otherwise {false, Reason}.
  read_recovery_terms(Dir) ->
      Path = filename:join(Dir, ?CLEAN_FILENAME),
      case rabbit_misc:read_term_file(Path) of
          {error, ReadError} ->
              {false, ReadError};
          {ok, Terms} ->
              case file:delete(Path) of
                  {error, DeleteError} -> {false, DeleteError};
                  ok                   -> {true,  Terms}
              end
      end.
  1176 
  %% Dump the file summary table to ?FILE_SUMMARY_FILENAME in Dir,
  %% including the object count as extended info. Crashes unless the
  %% dump succeeds.
  store_file_summary(Tid, Dir) ->
      Path    = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
      Options = [{extended_info, [object_count]}],
      ok = ets:tab2file(Tid, Path, Options).
  1180 
  %% Obtain the file summary table.
  %%
  %% With false as first argument a fresh, empty table is created and
  %% {false, Tid} returned. With true the table is loaded from
  %% ?FILE_SUMMARY_FILENAME in Dir; on success the file is deleted and
  %% {true, Tid} returned, on failure we fall back to a fresh table.
  recover_file_summary(false, _Dir) ->
      %% TODO: the only reason for this to be an *ordered*_set is so
      %% that a) maybe_compact can start a traversal from the eldest
      %% file, and b) build_index in fast recovery mode can easily
      %% identify the current file. It's awkward to have both that
      %% ordering and the left/right pointers in the entries -
      %% replacing the former with some additional bit of state would
      %% be easy, but ditching the latter would be neater.
      Options = [ordered_set, public, {keypos, #file_summary.file}],
      {false, ets:new(rabbit_msg_store_file_summary, Options)};
  recover_file_summary(true, Dir) ->
      Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
      case ets:file2tab(Path) of
          {error, _Error} ->
              recover_file_summary(false, Dir);
          {ok, Tid} ->
              file:delete(Path),
              {true, Tid}
      end.
  1198 
  %% Drive the generator Gen from Seed until it returns finished,
  %% applying each {Guid, Delta, Next} it yields to the index:
  %%
  %%   Delta =:= 0      - nothing to do
  %%   Guid unknown     - insert an entry with no file and ref_count
  %%                      Delta
  %%   Guid known       - add Delta to its ref_count, deleting the
  %%                      entry if the result is exactly 0
  %%
  %% Returns ok.
  count_msg_refs(Gen, Seed, State) ->
      case Gen(Seed) of
          finished ->
              ok;
          {_Guid, 0, Next} ->
              count_msg_refs(Gen, Next, State);
          {Guid, Delta, Next} ->
              ok = case index_lookup(Guid, State) of
                       not_found ->
                           NewEntry = #msg_location { guid      = Guid,
                                                      file      = undefined,
                                                      ref_count = Delta },
                           index_insert(NewEntry, State);
                       #msg_location { ref_count = RefCount }
                         when RefCount + Delta =:= 0 ->
                           index_delete(Guid, State);
                       #msg_location { ref_count = RefCount } = Entry ->
                           Updated = Entry #msg_location {
                                       ref_count = RefCount + Delta },
                           index_update(Updated, State)
                   end,
              count_msg_refs(Gen, Next, State)
      end.
  1223 
  1224 recover_crashed_compactions(Dir) ->
  1225     FileNames =    list_sorted_file_names(Dir, ?FILE_EXTENSION),
  1226     TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP),
  1227     lists:foreach(
  1228       fun (TmpFileName) ->
  1229               NonTmpRelatedFileName =
  1230                   filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
  1231               true = lists:member(NonTmpRelatedFileName, FileNames),
  1232               ok = recover_crashed_compaction(
  1233                      Dir, TmpFileName, NonTmpRelatedFileName)
  1234       end, TmpFileNames),
  1235     TmpFileNames == [].
  1236 
  1237 recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
  1238     %% Because a msg can legitimately appear multiple times in the
  1239     %% same file, identifying the contents of the tmp file and where
  1240     %% they came from is non-trivial. If we are recovering a crashed
  1241     %% compaction then we will be rebuilding the index, which can cope
  1242     %% with duplicates appearing. Thus the simplest and safest thing
  1243     %% to do is to append the contents of the tmp file to its main
  1244     %% file.
  1245     {ok, TmpHdl}  = open_file(Dir, TmpFileName, ?READ_MODE),
  1246     {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
  1247                               ?READ_MODE ++ ?WRITE_MODE),
  1248     {ok, _End} = file_handle_cache:position(MainHdl, eof),
  1249     Size = filelib:file_size(form_filename(Dir, TmpFileName)),
  1250     {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size),
  1251     ok = file_handle_cache:close(MainHdl),
  1252     ok = file_handle_cache:delete(TmpHdl),
  1253     ok.
  1254 
  1255 scan_file_for_valid_messages(Dir, FileName) ->
  1256     case open_file(Dir, FileName, ?READ_MODE) of
  1257         {ok, Hdl}       -> Valid = rabbit_msg_file:scan(
  1258                                      Hdl, filelib:file_size(
  1259                                             form_filename(Dir, FileName))),
  1260                            %% if something really bad has happened,
  1261                            %% the close could fail, but ignore
  1262                            file_handle_cache:close(Hdl),
  1263                            Valid;
  1264         {error, enoent} -> {ok, [], 0};
  1265         {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
  1266     end.
  1267 
  1268 %% Takes the list in *ascending* order (i.e. eldest message
  1269 %% first). This is the opposite of what scan_file_for_valid_messages
  1270 %% produces. The list of msgs that is produced is youngest first.
  1271 find_contiguous_block_prefix(L) -> find_contiguous_block_prefix(L, 0, []).
  1272 
  1273 find_contiguous_block_prefix([], ExpectedOffset, Guids) ->
  1274     {ExpectedOffset, Guids};
  1275 find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail],
  1276                              ExpectedOffset, Guids) ->
  1277     ExpectedOffset1 = ExpectedOffset + TotalSize,
  1278     find_contiguous_block_prefix(Tail, ExpectedOffset1, [Guid | Guids]);
  1279 find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) ->
  1280     {ExpectedOffset, Guids}.
  1281 
  1282 build_index(true, _StartupFunState,
  1283             State = #msstate { file_summary_ets = FileSummaryEts }) ->
  1284     ets:foldl(
  1285       fun (#file_summary { valid_total_size = ValidTotalSize,
  1286                            file_size        = FileSize,
  1287                            file             = File },
  1288            {_Offset, State1 = #msstate { sum_valid_data = SumValid,
  1289                                          sum_file_size  = SumFileSize }}) ->
  1290               {FileSize, State1 #msstate {
  1291                            sum_valid_data = SumValid + ValidTotalSize,
  1292                            sum_file_size  = SumFileSize + FileSize,
  1293                            current_file   = File }}
  1294       end, {0, State}, FileSummaryEts);
  1295 build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
  1296             State = #msstate { dir = Dir }) ->
  1297     ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
  1298     {ok, Pid} = gatherer:start_link(),
  1299     case [filename_to_num(FileName) ||
  1300              FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of
  1301         []     -> build_index(Pid, undefined, [State #msstate.current_file],
  1302                               State);
  1303         Files  -> {Offset, State1} = build_index(Pid, undefined, Files, State),
  1304                   {Offset, lists:foldl(fun delete_file_if_empty/2,
  1305                                        State1, Files)}
  1306     end.
  1307 
  1308 build_index(Gatherer, Left, [],
  1309             State = #msstate { file_summary_ets = FileSummaryEts,
  1310                                sum_valid_data   = SumValid,
  1311                                sum_file_size    = SumFileSize }) ->
  1312     case gatherer:out(Gatherer) of
  1313         empty ->
  1314             ok = gatherer:stop(Gatherer),
  1315             ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
  1316             ok = index_delete_by_file(undefined, State),
  1317             Offset = case ets:lookup(FileSummaryEts, Left) of
  1318                          []                                       -> 0;
  1319                          [#file_summary { file_size = FileSize }] -> FileSize
  1320                      end,
  1321             {Offset, State #msstate { current_file = Left }};
  1322         {value, #file_summary { valid_total_size = ValidTotalSize,
  1323                                 file_size = FileSize } = FileSummary} ->
  1324             true = ets:insert_new(FileSummaryEts, FileSummary),
  1325             build_index(Gatherer, Left, [],
  1326                         State #msstate {
  1327                           sum_valid_data = SumValid + ValidTotalSize,
  1328                           sum_file_size  = SumFileSize + FileSize })
  1329     end;
  1330 build_index(Gatherer, Left, [File|Files], State) ->
  1331     ok = gatherer:fork(Gatherer),
  1332     ok = worker_pool:submit_async(
  1333            fun () -> build_index_worker(Gatherer, State,
  1334                                         Left, File, Files)
  1335            end),
  1336     build_index(Gatherer, File, Files, State).
  1337 
  1338 build_index_worker(Gatherer, State = #msstate { dir = Dir },
  1339                    Left, File, Files) ->
  1340     {ok, Messages, FileSize} =
  1341         scan_file_for_valid_messages(Dir, filenum_to_name(File)),
  1342     {ValidMessages, ValidTotalSize} =
  1343         lists:foldl(
  1344           fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
  1345                   case index_lookup(Guid, State) of
  1346                       #msg_location { file = undefined } = StoreEntry ->
  1347                           ok = index_update(StoreEntry #msg_location {
  1348                                               file = File, offset = Offset,
  1349                                               total_size = TotalSize },
  1350                                             State),
  1351                           {[Obj | VMAcc], VTSAcc + TotalSize};
  1352                       _ ->
  1353                           {VMAcc, VTSAcc}
  1354                   end
  1355           end, {[], 0}, Messages),
  1356     %% foldl reverses lists, find_contiguous_block_prefix needs
  1357     %% msgs eldest first, so, ValidMessages is the right way round
  1358     {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages),
  1359     {Right, FileSize1} =
  1360         case Files of
  1361             %% if it's the last file, we'll truncate to remove any
  1362             %% rubbish above the last valid message. This affects the
  1363             %% file size.
  1364             []    -> {undefined, case ValidMessages of
  1365                                      [] -> 0;
  1366                                      _  -> {_Guid, TotalSize, Offset} =
  1367                                                lists:last(ValidMessages),
  1368                                            Offset + TotalSize
  1369                                  end};
  1370             [F|_] -> {F, FileSize}
  1371         end,
  1372     ok = gatherer:in(Gatherer, #file_summary {
  1373                        file             = File,
  1374                        valid_total_size = ValidTotalSize,
  1375                        contiguous_top   = ContiguousTop,
  1376                        left             = Left,
  1377                        right            = Right,
  1378                        file_size        = FileSize1,
  1379                        locked           = false,
  1380                        readers          = 0 }),
  1381     ok = gatherer:finish(Gatherer).
  1382 
  1383 %%----------------------------------------------------------------------------
  1384 %% garbage collection / compaction / aggregation -- internal
  1385 %%----------------------------------------------------------------------------
  1386 
  1387 maybe_roll_to_new_file(
  1388   Offset,
  1389   State = #msstate { dir                 = Dir,
  1390                      current_file_handle = CurHdl,
  1391                      current_file        = CurFile,
  1392                      file_summary_ets    = FileSummaryEts,
  1393                      cur_file_cache_ets  = CurFileCacheEts,
  1394                      file_size_limit     = FileSizeLimit })
  1395   when Offset >= FileSizeLimit ->
  1396     State1 = internal_sync(State),
  1397     ok = file_handle_cache:close(CurHdl),
  1398     NextFile = CurFile + 1,
  1399     {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE),
  1400     true = ets:insert_new(FileSummaryEts, #file_summary {
  1401                             file             = NextFile,
  1402                             valid_total_size = 0,
  1403                             contiguous_top   = 0,
  1404                             left             = CurFile,
  1405                             right            = undefined,
  1406                             file_size        = 0,
  1407                             locked           = false,
  1408                             readers          = 0 }),
  1409     true = ets:update_element(FileSummaryEts, CurFile,
  1410                               {#file_summary.right, NextFile}),
  1411     true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
  1412     maybe_compact(State1 #msstate { current_file_handle = NextHdl,
  1413                                     current_file        = NextFile });
  1414 maybe_roll_to_new_file(_, State) ->
  1415     State.
  1416 
  1417 maybe_compact(State = #msstate { sum_valid_data   = SumValid,
  1418                                  sum_file_size    = SumFileSize,
  1419                                  gc_active        = false,
  1420                                  gc_pid           = GCPid,
  1421                                  file_summary_ets = FileSummaryEts,
  1422                                  file_size_limit  = FileSizeLimit })
  1423   when (SumFileSize > 2 * FileSizeLimit andalso
  1424         (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) ->
  1425     %% TODO: the algorithm here is sub-optimal - it may result in a
  1426     %% complete traversal of FileSummaryEts.
  1427     case ets:first(FileSummaryEts) of
  1428         '$end_of_table' ->
  1429             State;
  1430         First ->
  1431             case find_files_to_gc(FileSummaryEts, FileSizeLimit,
  1432                                   ets:lookup(FileSummaryEts, First)) of
  1433                 not_found ->
  1434                     State;
  1435                 {Src, Dst} ->
  1436                     State1 = close_handle(Src, close_handle(Dst, State)),
  1437                     true = ets:update_element(FileSummaryEts, Src,
  1438                                               {#file_summary.locked, true}),
  1439                     true = ets:update_element(FileSummaryEts, Dst,
  1440                                               {#file_summary.locked, true}),
  1441                     ok = rabbit_msg_store_gc:gc(GCPid, Src, Dst),
  1442                     State1 #msstate { gc_active = {Src, Dst} }
  1443             end
  1444     end;
  1445 maybe_compact(State) ->
  1446     State.
  1447 
  1448 find_files_to_gc(FileSummaryEts, FileSizeLimit,
  1449                  [#file_summary { file             = Dst,
  1450                                   valid_total_size = DstValid,
  1451                                   right            = Src }]) ->
  1452     case Src of
  1453         undefined ->
  1454             not_found;
  1455         _   ->
  1456             [#file_summary { file             = Src,
  1457                              valid_total_size = SrcValid,
  1458                              left             = Dst,
  1459                              right            = SrcRight }] = Next =
  1460                 ets:lookup(FileSummaryEts, Src),
  1461             case SrcRight of
  1462                 undefined -> not_found;
  1463                 _         -> case DstValid + SrcValid =< FileSizeLimit of
  1464                                  true  -> {Src, Dst};
  1465                                  false -> find_files_to_gc(
  1466                                             FileSummaryEts, FileSizeLimit, Next)
  1467                              end
  1468             end
  1469     end.
  1470 
  1471 delete_file_if_empty(File, State = #msstate { current_file = File }) ->
  1472     State;
  1473 delete_file_if_empty(File, State = #msstate {
  1474                              dir              = Dir,
  1475                              sum_file_size    = SumFileSize,
  1476                              file_handles_ets = FileHandlesEts,
  1477                              file_summary_ets = FileSummaryEts }) ->
  1478     [#file_summary { valid_total_size = ValidData,
  1479                      left             = Left,
  1480                      right            = Right,
  1481                      file_size        = FileSize,
  1482                      locked           = false }] =
  1483         ets:lookup(FileSummaryEts, File),
  1484     case ValidData of
  1485         %% we should NEVER find the current file in here hence right
  1486         %% should always be a file, not undefined
  1487         0 -> case {Left, Right} of
  1488                  {undefined, _} when Right =/= undefined ->
  1489                      %% the eldest file is empty.
  1490                      true = ets:update_element(
  1491                               FileSummaryEts, Right,
  1492                               {#file_summary.left, undefined});
  1493                  {_, _} when Right =/= undefined ->
  1494                      true = ets:update_element(FileSummaryEts, Right,
  1495                                                {#file_summary.left, Left}),
  1496                      true = ets:update_element(FileSummaryEts, Left,
  1497                                                {#file_summary.right, Right})
  1498              end,
  1499              true = mark_handle_to_close(FileHandlesEts, File),
  1500              true = ets:delete(FileSummaryEts, File),
  1501              State1 = close_handle(File, State),
  1502              ok = file:delete(form_filename(Dir, filenum_to_name(File))),
  1503              State1 #msstate { sum_file_size = SumFileSize - FileSize };
  1504         _ -> State
  1505     end.
  1506 
  1507 %%----------------------------------------------------------------------------
  1508 %% garbage collection / compaction / aggregation -- external
  1509 %%----------------------------------------------------------------------------
  1510 
  1511 gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
  1512     [SrcObj = #file_summary {
  1513        readers          = SrcReaders,
  1514        left             = DstFile,
  1515        file_size        = SrcFileSize,
  1516        locked           = true }] = ets:lookup(FileSummaryEts, SrcFile),
  1517     [DstObj = #file_summary {
  1518        readers          = DstReaders,
  1519        right            = SrcFile,
  1520        file_size        = DstFileSize,
  1521        locked           = true }] = ets:lookup(FileSummaryEts, DstFile),
  1522 
  1523     case SrcReaders =:= 0 andalso DstReaders =:= 0 of
  1524         true  -> TotalValidData = combine_files(SrcObj, DstObj, State),
  1525                  %% don't update dest.right, because it could be
  1526                  %% changing at the same time
  1527                  true = ets:update_element(
  1528                           FileSummaryEts, DstFile,
  1529                           [{#file_summary.valid_total_size, TotalValidData},
  1530                            {#file_summary.contiguous_top,   TotalValidData},
  1531                            {#file_summary.file_size,        TotalValidData}]),
  1532                  SrcFileSize + DstFileSize - TotalValidData;
  1533         false -> concurrent_readers
  1534     end.
  1535 
  1536 combine_files(#file_summary { file             = Source,
  1537                               valid_total_size = SourceValid,
  1538                               left             = Destination },
  1539               #file_summary { file             = Destination,
  1540                               valid_total_size = DestinationValid,
  1541                               contiguous_top   = DestinationContiguousTop,
  1542                               right            = Source },
  1543               State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
  1544     SourceName      = filenum_to_name(Source),
  1545     DestinationName = filenum_to_name(Destination),
  1546     {ok, SourceHdl}      = open_file(Dir, SourceName,
  1547                                      ?READ_AHEAD_MODE),
  1548     {ok, DestinationHdl} = open_file(Dir, DestinationName,
  1549                                      ?READ_AHEAD_MODE ++ ?WRITE_MODE),
  1550     ExpectedSize = SourceValid + DestinationValid,
  1551     %% if DestinationValid =:= DestinationContiguousTop then we don't
  1552     %% need a tmp file
  1553     %% if they're not equal, then we need to write out everything past
  1554     %%   the DestinationContiguousTop to a tmp file then truncate,
  1555     %%   copy back in, and then copy over from Source
  1556     %% otherwise we just truncate straight away and copy over from Source
  1557     case DestinationContiguousTop =:= DestinationValid of
  1558         true ->
  1559             ok = truncate_and_extend_file(
  1560                    DestinationHdl, DestinationContiguousTop, ExpectedSize);
  1561         false ->
  1562             {DestinationWorkList, DestinationValid} =
  1563                 find_unremoved_messages_in_file(Destination, State),
  1564             Worklist =
  1565                 lists:dropwhile(
  1566                   fun (#msg_location { offset = Offset })
  1567                       when Offset =/= DestinationContiguousTop ->
  1568                           %% it cannot be that Offset =:=
  1569                           %% DestinationContiguousTop because if it
  1570                           %% was then DestinationContiguousTop would
  1571                           %% have been extended by TotalSize
  1572                           Offset < DestinationContiguousTop
  1573                   end, DestinationWorkList),
  1574             Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
  1575             {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
  1576             ok = copy_messages(
  1577                    Worklist, DestinationContiguousTop, DestinationValid,
  1578                    DestinationHdl, TmpHdl, Destination, State),
  1579             TmpSize = DestinationValid - DestinationContiguousTop,
  1580             %% so now Tmp contains everything we need to salvage from
  1581             %% Destination, and index_state has been updated to
  1582             %% reflect the compaction of Destination so truncate
  1583             %% Destination and copy from Tmp back to the end
  1584             {ok, 0} = file_handle_cache:position(TmpHdl, 0),
  1585             ok = truncate_and_extend_file(
  1586                    DestinationHdl, DestinationContiguousTop, ExpectedSize),
  1587             {ok, TmpSize} =
  1588                 file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
  1589             %% position in DestinationHdl should now be DestinationValid
  1590             ok = file_handle_cache:sync(DestinationHdl),
  1591             ok = file_handle_cache:delete(TmpHdl)
  1592     end,
  1593     {SourceWorkList, SourceValid} =
  1594         find_unremoved_messages_in_file(Source, State),
  1595     ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
  1596                        SourceHdl, DestinationHdl, Destination, State),
  1597     %% tidy up
  1598     ok = file_handle_cache:close(DestinationHdl),
  1599     ok = file_handle_cache:delete(SourceHdl),
  1600     ExpectedSize.
  1601 
  1602 find_unremoved_messages_in_file(File,
  1603                                 {_FileSummaryEts, Dir, Index, IndexState}) ->
  1604     %% Messages here will be end-of-file at start-of-list
  1605     {ok, Messages, _FileSize} =
  1606         scan_file_for_valid_messages(Dir, filenum_to_name(File)),
  1607     %% foldl will reverse so will end up with msgs in ascending offset order
  1608     lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
  1609                         case Index:lookup(Guid, IndexState) of
  1610                             #msg_location { file = File, total_size = TotalSize,
  1611                                             offset = Offset } = Entry ->
  1612                                 {[ Entry | List ], TotalSize + Size};
  1613                             _ ->
  1614                                 Acc
  1615                         end
  1616                 end, {[], 0}, Messages).
  1617 
  1618 copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
  1619               Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->
  1620     Copy = fun ({BlockStart, BlockEnd}) ->
  1621                    BSize = BlockEnd - BlockStart,
  1622                    {ok, BlockStart} =
  1623                        file_handle_cache:position(SourceHdl, BlockStart),
  1624                    {ok, BSize} =
  1625                        file_handle_cache:copy(SourceHdl, DestinationHdl, BSize)
  1626            end,
  1627     case
  1628         lists:foldl(
  1629           fun (#msg_location { guid = Guid, offset = Offset,
  1630                                total_size = TotalSize },
  1631                {CurOffset, Block = {BlockStart, BlockEnd}}) ->
  1632                   %% CurOffset is in the DestinationFile.
  1633                   %% Offset, BlockStart and BlockEnd are in the SourceFile
  1634                   %% update MsgLocation to reflect change of file and offset
  1635                   ok = Index:update_fields(Guid,
  1636                                            [{#msg_location.file, Destination},
  1637                                             {#msg_location.offset, CurOffset}],
  1638                                            IndexState),
  1639                   {CurOffset + TotalSize,
  1640                    case BlockEnd of
  1641                        undefined ->
  1642                            %% base case, called only for the first list elem
  1643                            {Offset, Offset + TotalSize};
  1644                        Offset ->
  1645                            %% extend the current block because the
  1646                            %% next msg follows straight on
  1647                            {BlockStart, BlockEnd + TotalSize};
  1648                        _ ->
  1649                            %% found a gap, so actually do the work for
  1650                            %% the previous block
  1651                            Copy(Block),
  1652                            {Offset, Offset + TotalSize}
  1653                    end}
  1654           end, {InitOffset, {undefined, undefined}}, WorkList) of
  1655         {FinalOffset, Block} ->
  1656             case WorkList of
  1657                 [] -> ok;
  1658                 _  -> Copy(Block), %% do the last remaining block
  1659                       ok = file_handle_cache:sync(DestinationHdl)
  1660             end;
  1661         {FinalOffsetZ, _Block} ->
  1662             {gc_error, [{expected, FinalOffset},
  1663                         {got, FinalOffsetZ},
  1664                         {destination, Destination}]}
  1665     end.