src/file_handle_cache.erl
author Matthias Radestock <matthias@rabbitmq.com>
Thu Jul 22 15:14:07 2010 +0100 (2010-07-22)
branch bug21673
changeset 4146 e3aa3c965c9b
parent 3269 23950ce9888b
parent 3933 54476bfe3634
child 4392 6991323b486f
permissions -rw-r--r--
replace use of dict with orddict for per-msg_store partitions
which is much faster for small dicts
     1 %%   The contents of this file are subject to the Mozilla Public License
     2 %%   Version 1.1 (the "License"); you may not use this file except in
     3 %%   compliance with the License. You may obtain a copy of the License at
     4 %%   http://www.mozilla.org/MPL/
     5 %%
     6 %%   Software distributed under the License is distributed on an "AS IS"
     7 %%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
     8 %%   License for the specific language governing rights and limitations
     9 %%   under the License.
    10 %%
    11 %%   The Original Code is RabbitMQ.
    12 %%
    13 %%   The Initial Developers of the Original Code are LShift Ltd,
    14 %%   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
    15 %%
    16 %%   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
    17 %%   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
    18 %%   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
    19 %%   Technologies LLC, and Rabbit Technologies Ltd.
    20 %%
    21 %%   Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
    22 %%   Ltd. Portions created by Cohesive Financial Technologies LLC are
    23 %%   Copyright (C) 2007-2010 Cohesive Financial Technologies
    24 %%   LLC. Portions created by Rabbit Technologies Ltd are Copyright
    25 %%   (C) 2007-2010 Rabbit Technologies Ltd.
    26 %%
    27 %%   All Rights Reserved.
    28 %%
    29 %%   Contributor(s): ______________________________________.
    30 %%
    31 
    32 -module(file_handle_cache).
    33 
    34 %% A File Handle Cache
    35 %%
    36 %% This extends a subset of the functionality of the Erlang file
    37 %% module.
    38 %%
    39 %% Some constraints
    40 %% 1) This supports one writer, multiple readers per file. Nothing
    41 %% else.
    42 %% 2) Do not open the same file from different processes. Bad things
    43 %% may happen.
    44 %% 3) Writes are all appends. You cannot write to the middle of a
    45 %% file, although you can truncate and then append if you want.
    46 %% 4) Although there is a write buffer, there is no read buffer. Feel
    47 %% free to use the read_ahead mode, but beware of the interaction
    48 %% between that buffer and the write buffer.
    49 %%
    50 %% Some benefits
    51 %% 1) You do not have to remember to call sync before close
    52 %% 2) Buffering is much more flexible than with the plain file module, and
    53 %% you can control when the buffer gets flushed out. This means that
    54 %% you can rely on reads-after-writes working, without having to call
    55 %% the expensive sync.
    56 %% 3) Unnecessary calls to position and sync get optimised out.
    57 %% 4) You can find out what your 'real' offset is, and what your
    58 %% 'virtual' offset is (i.e. where the hdl really is, and where it
    59 %% would be after the write buffer is written out).
    60 %% 5) You can find out what the offset was when you last sync'd.
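       %%
       %% Example (an illustrative sketch, not part of the original
       %% source). It assumes the hypothetical path below names a new or
       %% empty file, and shows how the raw and virtual offsets diverge
       %% while data sits in the write buffer:
       %%
       %%   {ok, Ref} = file_handle_cache:open("/tmp/fhc_example.dat",
       %%                                      [read, write, binary],
       %%                                      [{write_buffer, infinity}]),
       %%   ok = file_handle_cache:append(Ref, <<"hello">>),
       %%   {ok, 0} = file_handle_cache:current_raw_offset(Ref),
       %%   {ok, 5} = file_handle_cache:current_virtual_offset(Ref),
       %%   ok = file_handle_cache:flush(Ref),
       %%   {ok, 5} = file_handle_cache:current_raw_offset(Ref),
       %%   {ok, 0} = file_handle_cache:position(Ref, bof),
       %%   {ok, <<"hello">>} = file_handle_cache:read(Ref, 5),
       %%   ok = file_handle_cache:close(Ref).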
    61 %%
    62 %% There is also a server component which serves to limit the number
    63 %% of open file handles in a "soft" way - the server will never
    64 %% prevent a client from opening a handle, but may immediately tell it
    65 %% to close the handle. Thus you can set the limit to zero and it will
    66 %% still all work correctly, it is just that effectively no caching
    67 %% will take place. The operation of limiting is as follows:
    68 %%
    69 %% On open and close, the client sends messages to the server
    70 %% informing it of opens and closes. This allows the server to keep
    71 %% track of the number of open handles. The client also keeps a
    72 %% gb_tree which is updated on every use of a file handle, mapping the
    73 %% time at which the file handle was last used (timestamp) to the
    74 %% handle. Thus the smallest key in this tree maps to the file handle
    75 %% that has not been used for the longest amount of time. This
    76 %% smallest key is included in the messages to the server. As such,
    77 %% the server keeps track of when the least recently used file handle
    78 %% was used *at the point of the most recent open or close* by each
    79 %% client.
    80 %%
    81 %% Note that this data can become very out of date if the client
    82 %% subsequently uses its least recently used handle.
    83 %%
    84 %% When the limit is reached, the server calculates the average age of
    85 %% the last reported least recently used file handle of all the
    86 %% clients. It then tells all the clients to close any handles not
    87 %% used for longer than this average, by invoking the callback the
    88 %% client registered. The client should receive this message and pass
    89 %% it into set_maximum_since_use/1. However, it is quite possible that
    90 %% this age will be greater than the ages of all the handles the
    91 %% client knows of, because the client has used its file handles in
    92 %% the meantime. In that case the client reports to the server the
    93 %% current timestamp at which its least recently used file handle was
    94 %% last used. The server will check two seconds later that either it
    95 %% is back under the limit, in which case all is well again, or if
    96 %% not, it will calculate a new average age. Its data will be much
    97 %% more recent now, and so it is very likely that when this is
    98 %% communicated to the clients, the clients will close file handles.
    99 %%
   100 %% The advantage of this scheme is that there is only communication
   101 %% from the client to the server on open, close, and when in the
   102 %% process of trying to reduce file handle usage. There is no
   103 %% communication from the client to the server on normal file handle
   104 %% operations. This scheme forms a feed-back loop - the server does
   105 %% not care which file handles are closed, just that some are, and it
   106 %% checks this repeatedly when over the limit. Given the guarantees of
   107 %% now(), even if there is just one file handle open, a limit of 1,
   108 %% and one client, it is certain that when the client calculates the
   109 %% age of the handle, it will be greater than when the server
   110 %% calculated it, hence it should be closed.
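       %%
       %% Example (an illustrative sketch, not part of the original
       %% source): one way a gen_server client might take part in this
       %% scheme. It assumes the server invokes the registered callback
       %% with the age appended to the registered argument list, i.e. as
       %% my_client:set_maximum_since_use(Pid, Age); my_client is a
       %% hypothetical module. The call to set_maximum_since_use/1 must
       %% run in the client process itself, because the handles live in
       %% that process's dictionary.
       %%
       %%   init(Args) ->
       %%       ok = file_handle_cache:register_callback(
       %%              my_client, set_maximum_since_use, [self()]),
       %%       {ok, Args}.
       %%
       %%   set_maximum_since_use(Pid, Age) ->
       %%       gen_server:cast(Pid, {set_maximum_since_use, Age}).
       %%
       %%   handle_cast({set_maximum_since_use, Age}, State) ->
       %%       ok = file_handle_cache:set_maximum_since_use(Age),
       %%       {noreply, State}.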
   111 %%
   112 %% Handles which are closed as a result of the server are put into a
   113 %% "soft-closed" state in which the handle is closed (data flushed out
   114 %% and sync'd first) but the state is maintained. The handle will be
   115 %% fully reopened again as soon as needed, thus users of this library
   116 %% do not need to worry about their handles being closed by the server
   117 %% - reopening them when necessary is handled transparently.
   118 %%
   119 %% The server also supports obtain and release_on_death. obtain/0
   120 %% blocks until a file descriptor is available. release_on_death/1
   121 %% takes a pid and monitors the pid, reducing the count by 1 when the
   122 %% pid dies. Thus the assumption is that obtain/0 is called first, and
   123 %% when that returns, release_on_death/1 is called with the pid that
   124 %% "owns" the file descriptor. This is, for example, used to track the
   125 %% use of file descriptors through network sockets.
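       %%
       %% Example (an illustrative sketch, not part of the original
       %% source; LSock is a hypothetical listening socket and the
       %% calling process is assumed to own the accepted socket):
       %%
       %%   ok = file_handle_cache:obtain(),
       %%   {ok, Sock} = gen_tcp:accept(LSock),
       %%   ok = file_handle_cache:release_on_death(self()).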
   126 
   127 -behaviour(gen_server).
   128 
   129 -export([register_callback/3]).
   130 -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
   131          last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
   132          flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
   133 -export([release_on_death/1, obtain/0]).
   134 
   135 -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
   136          terminate/2, code_change/3]).
   137 
   138 -define(SERVER, ?MODULE).
   139 -define(RESERVED_FOR_OTHERS, 100).
   140 -define(FILE_HANDLES_LIMIT_WINDOWS, 10000000).
   141 -define(FILE_HANDLES_LIMIT_OTHER, 1024).
   142 -define(FILE_HANDLES_CHECK_INTERVAL, 2000). %% milliseconds
   143 
   144 %%----------------------------------------------------------------------------
   145 
   146 -record(file,
   147         { reader_count,
   148           has_writer
   149         }).
   150 
   151 -record(handle,
   152         { hdl,
   153           offset,
   154           trusted_offset,
   155           is_dirty,
   156           write_buffer_size,
   157           write_buffer_size_limit,
   158           write_buffer,
   159           at_eof,
   160           path,
   161           mode,
   162           options,
   163           is_write,
   164           is_read,
   165           last_used_at
   166         }).
   167 
   168 -record(fhc_state,
   169         { elders,
   170           limit,
   171           count,
   172           obtains,
   173           callbacks,
   174           client_mrefs,
   175           timer_ref
   176         }).
   177 
   178 %%----------------------------------------------------------------------------
   179 %% Specs
   180 %%----------------------------------------------------------------------------
   181 
   182 -ifdef(use_specs).
   183 
   184 -type(ref() :: any()).
   185 -type(ok_or_error() :: rabbit_types:ok_or_error(any())).
   186 -type(val_or_error(T) :: rabbit_types:ok_or_error2(T, any())).
   187 -type(position() :: ('bof' | 'eof' | non_neg_integer() |
   188                      {('bof' |'eof'), non_neg_integer()} |
   189                      {'cur', integer()})).
   190 -type(offset() :: non_neg_integer()).
   191 
   192 -spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
   193 -spec(open/3 ::
   194         (string(), [any()],
   195          [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
   196         -> val_or_error(ref())).
   197 -spec(close/1 :: (ref()) -> ok_or_error()).
   198 -spec(read/2 :: (ref(), non_neg_integer()) ->
   199              val_or_error([char()] | binary()) | 'eof').
   200 -spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
   201 -spec(sync/1 :: (ref()) ->  ok_or_error()).
   202 -spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
   203 -spec(truncate/1 :: (ref()) -> ok_or_error()).
   204 -spec(last_sync_offset/1       :: (ref()) -> val_or_error(offset())).
   205 -spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())).
   206 -spec(current_raw_offset/1     :: (ref()) -> val_or_error(offset())).
   207 -spec(flush/1 :: (ref()) -> ok_or_error()).
   208 -spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
   209              val_or_error(non_neg_integer())).
   210 -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
   211 -spec(delete/1 :: (ref()) -> ok_or_error()).
   212 -spec(clear/1 :: (ref()) -> ok_or_error()).
   213 -spec(release_on_death/1 :: (pid()) -> 'ok').
   214 -spec(obtain/0 :: () -> 'ok').
   215 
   216 -endif.
   217 
   218 %%----------------------------------------------------------------------------
   219 %% Public API
   220 %%----------------------------------------------------------------------------
   221 
   222 start_link() ->
   223     gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
   224 
   225 register_callback(M, F, A)
   226   when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
   227     gen_server:cast(?SERVER, {register_callback, self(), {M, F, A}}).
   228 
   229 open(Path, Mode, Options) ->
   230     Path1 = filename:absname(Path),
   231     File1 = #file { reader_count = RCount, has_writer = HasWriter } =
   232         case get({Path1, fhc_file}) of
   233             File = #file {} -> File;
   234             undefined       -> #file { reader_count = 0,
   235                                        has_writer = false }
   236         end,
   237     Mode1 = append_to_write(Mode),
   238     IsWriter = is_writer(Mode1),
   239     case IsWriter andalso HasWriter of
   240         true  -> {error, writer_exists};
   241         false -> Ref = make_ref(),
   242                  case open1(Path1, Mode1, Options, Ref, bof, new) of
   243                      {ok, _Handle} ->
   244                          RCount1 = case is_reader(Mode1) of
   245                                        true  -> RCount + 1;
   246                                        false -> RCount
   247                                    end,
   248                          HasWriter1 = HasWriter orelse IsWriter,
   249                          put({Path1, fhc_file},
   250                              File1 #file { reader_count = RCount1,
   251                                            has_writer = HasWriter1 }),
   252                          {ok, Ref};
   253                      Error ->
   254                          Error
   255                  end
   256     end.
   257 
   258 close(Ref) ->
   259     case erase({Ref, fhc_handle}) of
   260         undefined -> ok;
   261         Handle    -> case hard_close(Handle) of
   262                          ok               -> ok;
   263                          {Error, Handle1} -> put_handle(Ref, Handle1),
   264                                              Error
   265                      end
   266     end.
   267 
   268 read(Ref, Count) ->
   269     with_flushed_handles(
   270       [Ref],
   271       fun ([#handle { is_read = false }]) ->
   272               {error, not_open_for_reading};
   273           ([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
   274               case file:read(Hdl, Count) of
   275                   {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
   276                                       {Obj,
   277                                        [Handle #handle { offset = Offset1 }]};
   278                   eof              -> {eof, [Handle #handle { at_eof = true }]};
   279                   Error            -> {Error, [Handle]}
   280               end
   281       end).
   282 
   283 append(Ref, Data) ->
   284     with_handles(
   285       [Ref],
   286       fun ([#handle { is_write = false }]) ->
   287               {error, not_open_for_writing};
   288           ([Handle]) ->
   289               case maybe_seek(eof, Handle) of
   290                   {{ok, _Offset}, #handle { hdl = Hdl, offset = Offset,
   291                                             write_buffer_size_limit = 0,
   292                                             at_eof = true } = Handle1} ->
   293                       Offset1 = Offset + iolist_size(Data),
   294                       {file:write(Hdl, Data),
   295                        [Handle1 #handle { is_dirty = true, offset = Offset1 }]};
   296                   {{ok, _Offset}, #handle { write_buffer = WriteBuffer,
   297                                             write_buffer_size = Size,
   298                                             write_buffer_size_limit = Limit,
   299                                             at_eof = true } = Handle1} ->
   300                       WriteBuffer1 = [Data | WriteBuffer],
   301                       Size1 = Size + iolist_size(Data),
   302                       Handle2 = Handle1 #handle { write_buffer = WriteBuffer1,
   303                                                   write_buffer_size = Size1 },
   304                       case Limit /= infinity andalso Size1 > Limit of
   305                           true  -> {Result, Handle3} = write_buffer(Handle2),
   306                                    {Result, [Handle3]};
   307                           false -> {ok, [Handle2]}
   308                       end;
   309                   {{error, _} = Error, Handle1} ->
   310                       {Error, [Handle1]}
   311               end
   312       end).
   313 
   314 sync(Ref) ->
   315     with_flushed_handles(
   316       [Ref],
   317       fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
   318               ok;
   319           ([Handle = #handle { hdl = Hdl, offset = Offset,
   320                                is_dirty = true, write_buffer = [] }]) ->
   321               case file:sync(Hdl) of
   322                   ok    -> {ok, [Handle #handle { trusted_offset = Offset,
   323                                                   is_dirty = false }]};
   324                   Error -> {Error, [Handle]}
   325               end
   326       end).
   327 
   328 position(Ref, NewOffset) ->
   329     with_flushed_handles(
   330       [Ref],
   331       fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle),
   332                         {Result, [Handle1]}
   333       end).
   334 
   335 truncate(Ref) ->
   336     with_flushed_handles(
   337       [Ref],
   338       fun ([Handle1 = #handle { hdl = Hdl, offset = Offset,
   339                                 trusted_offset = TOffset }]) ->
   340               case file:truncate(Hdl) of
   341                   ok    -> TOffset1 = lists:min([Offset, TOffset]),
   342                            {ok, [Handle1 #handle { trusted_offset = TOffset1,
   343                                                    at_eof = true }]};
   344                   Error -> {Error, [Handle1]}
   345               end
   346       end).
   347 
   348 last_sync_offset(Ref) ->
   349     with_handles([Ref], fun ([#handle { trusted_offset = TOffset }]) ->
   350                                 {ok, TOffset}
   351                         end).
   352 
   353 current_virtual_offset(Ref) ->
   354     with_handles([Ref], fun ([#handle { at_eof = true, is_write = true,
   355                                         offset = Offset,
   356                                         write_buffer_size = Size }]) ->
   357                                 {ok, Offset + Size};
   358                             ([#handle { offset = Offset }]) ->
   359                                 {ok, Offset}
   360                         end).
   361 
   362 current_raw_offset(Ref) ->
   363     with_handles([Ref], fun ([Handle]) -> {ok, Handle #handle.offset} end).
   364 
   365 flush(Ref) ->
   366     with_flushed_handles([Ref], fun ([Handle]) -> {ok, [Handle]} end).
   367 
   368 copy(Src, Dest, Count) ->
   369     with_flushed_handles(
   370       [Src, Dest],
   371       fun ([SHandle = #handle { is_read  = true, hdl = SHdl, offset = SOffset },
   372             DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
   373           ) ->
   374               case file:copy(SHdl, DHdl, Count) of
   375                   {ok, Count1} = Result1 ->
   376                       {Result1,
   377                        [SHandle #handle { offset = SOffset + Count1 },
   378                         DHandle #handle { offset = DOffset + Count1 }]};
   379                   Error ->
   380                       {Error, [SHandle, DHandle]}
   381               end;
   382           (_Handles) ->
   383               {error, incorrect_handle_modes}
   384       end).
   385 
   386 delete(Ref) ->
   387     case erase({Ref, fhc_handle}) of
   388         undefined ->
   389             ok;
   390         Handle = #handle { path = Path } ->
   391             case hard_close(Handle #handle { is_dirty = false,
   392                                              write_buffer = [] }) of
   393                 ok               -> file:delete(Path);
   394                 {Error, Handle1} -> put_handle(Ref, Handle1),
   395                                     Error
   396             end
   397     end.
   398 
   399 clear(Ref) ->
   400     with_handles(
   401       [Ref],
   402       fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
   403               ok;
   404           ([Handle]) ->
   405               case maybe_seek(bof, Handle #handle { write_buffer = [],
   406                                                     write_buffer_size = 0 }) of
   407                   {{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
   408                       case file:truncate(Hdl) of
   409                           ok    -> {ok, [Handle1 #handle {trusted_offset = 0,
   410                                                           at_eof = true }]};
   411                           Error -> {Error, [Handle1]}
   412                       end;
   413                   {{error, _} = Error, Handle1} ->
   414                       {Error, [Handle1]}
   415               end
   416       end).
   417 
   418 set_maximum_since_use(MaximumAge) ->
   419     Now = now(),
   420     case lists:foldl(
   421            fun ({{Ref, fhc_handle},
   422                  Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
   423                    Age = timer:now_diff(Now, Then),
   424                    case Hdl /= closed andalso Age >= MaximumAge of
   425                        true  -> {Res, Handle1} = soft_close(Handle),
   426                                 case Res of
   427                                     ok -> put({Ref, fhc_handle}, Handle1),
   428                                           false;
   429                                     _  -> put_handle(Ref, Handle1),
   430                                           Rep
   431                                 end;
   432                        false -> Rep
   433                    end;
   434                (_KeyValuePair, Rep) ->
   435                    Rep
   436            end, true, get()) of
   437         true  -> age_tree_change(), ok;
   438         false -> ok
   439     end.
   440 
   441 release_on_death(Pid) when is_pid(Pid) ->
   442     gen_server:cast(?SERVER, {release_on_death, Pid}).
   443 
   444 obtain() ->
   445     gen_server:call(?SERVER, obtain, infinity).
   446 
   447 %%----------------------------------------------------------------------------
   448 %% Internal functions
   449 %%----------------------------------------------------------------------------
   450 
   451 is_reader(Mode) -> lists:member(read, Mode).
   452 
   453 is_writer(Mode) -> lists:member(write, Mode).
   454 
   455 append_to_write(Mode) ->
   456     case lists:member(append, Mode) of
   457         true  -> [write | Mode -- [append, write]];
   458         false -> Mode
   459     end.
   460 
   461 with_handles(Refs, Fun) ->
   462     ResHandles = lists:foldl(
   463                    fun (Ref, {ok, HandlesAcc}) ->
   464                            case get_or_reopen(Ref) of
   465                                {ok, Handle} -> {ok, [Handle | HandlesAcc]};
   466                                Error        -> Error
   467                            end;
   468                        (_Ref, Error) ->
   469                            Error
   470                    end, {ok, []}, Refs),
   471     case ResHandles of
   472         {ok, Handles} ->
   473             case Fun(lists:reverse(Handles)) of
   474                 {Result, Handles1} when is_list(Handles1) ->
   475                     lists:zipwith(fun put_handle/2, Refs, Handles1),
   476                     Result;
   477                 Result ->
   478                     Result
   479             end;
   480         Error ->
   481             Error
   482     end.
   483 
   484 with_flushed_handles(Refs, Fun) ->
   485     with_handles(
   486       Refs,
   487       fun (Handles) ->
   488               case lists:foldl(
   489                      fun (Handle, {ok, HandlesAcc}) ->
   490                              {Res, Handle1} = write_buffer(Handle),
   491                              {Res, [Handle1 | HandlesAcc]};
   492                          (Handle, {Error, HandlesAcc}) ->
   493                              {Error, [Handle | HandlesAcc]}
   494                      end, {ok, []}, Handles) of
   495                   {ok, Handles1} ->
   496                       Fun(lists:reverse(Handles1));
   497                   {Error, Handles1} ->
   498                       {Error, lists:reverse(Handles1)}
   499               end
   500       end).
   501 
   502 get_or_reopen(Ref) ->
   503     case get({Ref, fhc_handle}) of
   504         undefined ->
   505             {error, not_open, Ref};
   506         #handle { hdl = closed, offset = Offset,
   507                   path = Path, mode = Mode, options = Options } ->
   508             open1(Path, Mode, Options, Ref, Offset, reopen);
   509         Handle ->
   510             {ok, Handle}
   511     end.
   512 
   513 put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
   514     Now = now(),
   515     age_tree_update(Then, Now, Ref),
   516     put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
   517 
   518 with_age_tree(Fun) ->
   519     put(fhc_age_tree, Fun(case get(fhc_age_tree) of
   520                               undefined -> gb_trees:empty();
   521                               AgeTree   -> AgeTree
   522                           end)).
   523 
   524 age_tree_insert(Now, Ref) ->
   525     with_age_tree(
   526       fun (Tree) ->
   527               Tree1 = gb_trees:insert(Now, Ref, Tree),
   528               {Oldest, _Ref} = gb_trees:smallest(Tree1),
   529               gen_server:cast(?SERVER, {open, self(), Oldest}),
   530               Tree1
   531       end).
   532 
   533 age_tree_update(Then, Now, Ref) ->
   534     with_age_tree(
   535       fun (Tree) ->
   536               gb_trees:insert(Now, Ref, gb_trees:delete_any(Then, Tree))
   537       end).
   538 
   539 age_tree_delete(Then) ->
   540     with_age_tree(
   541       fun (Tree) ->
   542               Tree1 = gb_trees:delete_any(Then, Tree),
   543               Oldest = case gb_trees:is_empty(Tree1) of
   544                            true ->
   545                                undefined;
   546                            false ->
   547                                {Oldest1, _Ref} = gb_trees:smallest(Tree1),
   548                                Oldest1
   549                        end,
   550               gen_server:cast(?SERVER, {close, self(), Oldest}),
   551               Tree1
   552       end).
   553 
   554 age_tree_change() ->
   555     with_age_tree(
   556       fun (Tree) ->
   557               case gb_trees:is_empty(Tree) of
   558                   true  -> Tree;
   559                   false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
   560                            gen_server:cast(?SERVER, {update, self(), Oldest})
   561               end,
   562               Tree
   563       end).
   564 
   565 open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
   566     Mode1 = case NewOrReopen of
   567                 new    -> Mode;
   568                 reopen -> [read | Mode] %% write without read truncates
   569             end,
   570     case file:open(Path, Mode1) of
   571         {ok, Hdl} ->
   572             WriteBufferSize =
   573                 case proplists:get_value(write_buffer, Options, unbuffered) of
   574                     unbuffered           -> 0;
   575                     infinity             -> infinity;
   576                     N when is_integer(N) -> N
   577                 end,
   578             Now = now(),
   579             Handle = #handle { hdl                     = Hdl,
   580                                offset                  = 0,
   581                                trusted_offset          = 0,
   582                                is_dirty                = false,
   583                                write_buffer_size       = 0,
   584                                write_buffer_size_limit = WriteBufferSize,
   585                                write_buffer            = [],
   586                                at_eof                  = false,
   587                                path                    = Path,
   588                                mode                    = Mode,
   589                                options                 = Options,
   590                                is_write                = is_writer(Mode),
   591                                is_read                 = is_reader(Mode),
   592                                last_used_at            = Now },
   593             {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle),
   594             Handle2 = Handle1 #handle { trusted_offset = Offset1 },
   595             put({Ref, fhc_handle}, Handle2),
   596             age_tree_insert(Now, Ref),
   597             {ok, Handle2};
   598         {error, Reason} ->
   599             {error, Reason}
   600     end.
   601 
   602 soft_close(Handle = #handle { hdl = closed }) ->
   603     {ok, Handle};
   604 soft_close(Handle) ->
   605     case write_buffer(Handle) of
   606         {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty,
   607                        last_used_at = Then } = Handle1 } ->
   608             ok = case IsDirty of
   609                      true  -> file:sync(Hdl);
   610                      false -> ok
   611                  end,
   612             ok = file:close(Hdl),
   613             age_tree_delete(Then),
   614             {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset,
   615                                    is_dirty = false }};
   616         {_Error, _Handle} = Result ->
   617             Result
   618     end.
   619 
   620 hard_close(Handle) ->
   621     case soft_close(Handle) of
   622         {ok, #handle { path = Path,
   623                        is_read = IsReader, is_write = IsWriter }} ->
   624             #file { reader_count = RCount, has_writer = HasWriter } = File =
   625                 get({Path, fhc_file}),
   626             RCount1 = case IsReader of
   627                           true  -> RCount - 1;
   628                           false -> RCount
   629                       end,
   630             HasWriter1 = HasWriter andalso not IsWriter,
   631             case RCount1 =:= 0 andalso not HasWriter1 of
   632                 true  -> erase({Path, fhc_file});
   633                 false -> put({Path, fhc_file},
   634                              File #file { reader_count = RCount1,
   635                                           has_writer = HasWriter1 })
   636             end,
   637             ok;
   638         {_Error, _Handle} = Result ->
   639             Result
   640     end.
   641 
   642 maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
   643                                          at_eof = AtEoF }) ->
   644     {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
   645     case (case NeedsSeek of
   646               true  -> file:position(Hdl, NewOffset);
   647               false -> {ok, Offset}
   648           end) of
   649         {ok, Offset1} = Result ->
   650             {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }};
   651         {error, _} = Error ->
   652             {Error, Handle}
   653     end.
   654 
%% needs_seek(AtEoF, CurOffset, DesiredOffset) -> {AtEoF1, NeedsSeek}
%% AtEoF1 says whether the handle is known to be at EoF afterwards;
%% NeedsSeek says whether file:position/2 has to be called at all.
needs_seek( AtEoF, _CurOffset,  cur     ) -> {AtEoF, false};
needs_seek( AtEoF, _CurOffset,  {cur, 0}) -> {AtEoF, false};
needs_seek(  true, _CurOffset,  eof     ) -> {true , false};
needs_seek(  true, _CurOffset,  {eof, 0}) -> {true , false};
needs_seek( false, _CurOffset,  eof     ) -> {true , true };
needs_seek( false, _CurOffset,  {eof, 0}) -> {true , true };
needs_seek( AtEoF,          0,  bof     ) -> {AtEoF, false};
needs_seek( AtEoF,          0,  {bof, 0}) -> {AtEoF, false};
needs_seek( AtEoF,  CurOffset, CurOffset) -> {AtEoF, false};
needs_seek(  true,  CurOffset, {bof, DesiredOffset})
  when DesiredOffset >= CurOffset ->
    {true, true};
needs_seek(  true, _CurOffset, {cur, DesiredOffset})
  when DesiredOffset > 0 ->
    {true, true};
needs_seek(  true,  CurOffset, DesiredOffset) %% same as {bof, DO}
  when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset ->
    {true, true};
%% because we can't really track size, we could well end up at EoF and not
%% know it
needs_seek(_AtEoF, _CurOffset, _DesiredOffset) ->
    {false, true}.

write_buffer(Handle = #handle { write_buffer = [] }) ->
    {ok, Handle};
write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
                                write_buffer = WriteBuffer,
                                write_buffer_size = DataSize,
                                at_eof = true }) ->
    case file:write(Hdl, lists:reverse(WriteBuffer)) of
        ok ->
            Offset1 = Offset + DataSize,
            {ok, Handle #handle { offset = Offset1, is_dirty = true,
                                  write_buffer = [], write_buffer_size = 0 }};
        {error, _} = Error ->
            {Error, Handle}
    end.

%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------

init([]) ->
    Limit = case application:get_env(file_handles_high_watermark) of
                {ok, Watermark} when (is_integer(Watermark) andalso
                                      Watermark > 0) ->
                    Watermark;
                _ ->
                    ulimit()
            end,
    error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
    {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
                      obtains = [], callbacks = dict:new(),
                      client_mrefs = dict:new(), timer_ref = undefined }}.

handle_call(obtain, From, State = #fhc_state { count = Count }) ->
    State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
        maybe_reduce(State #fhc_state { count = Count + 1 }),
    case Limit /= infinity andalso Count1 >= Limit of
        true  -> {noreply, State1 #fhc_state { obtains = [From | Obtains],
                                               count = Count1 - 1 }};
        false -> {reply, ok, State1}
    end.

handle_cast({register_callback, Pid, MFA},
            State = #fhc_state { callbacks = Callbacks }) ->
    {noreply, ensure_mref(
                Pid, State #fhc_state {
                       callbacks = dict:store(Pid, MFA, Callbacks) })};

handle_cast({open, Pid, EldestUnusedSince}, State =
            #fhc_state { elders = Elders, count = Count }) ->
    Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
    {noreply, maybe_reduce(
                ensure_mref(Pid, State #fhc_state { elders = Elders1,
                                                    count = Count + 1 }))};

handle_cast({update, Pid, EldestUnusedSince}, State =
            #fhc_state { elders = Elders }) ->
    Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
    %% don't call maybe_reduce from here otherwise we can create a
    %% storm of messages
    {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};

handle_cast({close, Pid, EldestUnusedSince}, State =
            #fhc_state { elders = Elders, count = Count }) ->
    Elders1 = case EldestUnusedSince of
                  undefined -> dict:erase(Pid, Elders);
                  _         -> dict:store(Pid, EldestUnusedSince, Elders)
              end,
    {noreply, process_obtains(
                ensure_mref(Pid, State #fhc_state { elders = Elders1,
                                                    count = Count - 1 }))};

handle_cast(check_counts, State) ->
    {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};

handle_cast({release_on_death, Pid}, State) ->
    _MRef = erlang:monitor(process, Pid),
    {noreply, State}.

%% MRef is already bound by the function head, so the {ok, MRef} clause
%% only matches the monitor recorded by ensure_mref/2. Any other 'DOWN'
%% (e.g. from a release_on_death monitor) just decrements the count.
handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
                #fhc_state { count = Count, callbacks = Callbacks,
                             client_mrefs = ClientMRefs, elders = Elders }) ->
    {noreply, process_obtains(
                case dict:find(Pid, ClientMRefs) of
                    {ok, MRef} -> State #fhc_state {
                                    elders       = dict:erase(Pid, Elders),
                                    client_mrefs = dict:erase(Pid, ClientMRefs),
                                    callbacks    = dict:erase(Pid, Callbacks) };
                    _          -> State #fhc_state { count = Count - 1 }
                end)}.

terminate(_Reason, State) ->
    State.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%----------------------------------------------------------------------------
%% server helpers
%%----------------------------------------------------------------------------

%% obtains holds the newest pending request first, so the tail of the
%% list (the oldest requests) is what gets replied to.
process_obtains(State = #fhc_state { obtains = [] }) ->
    State;
process_obtains(State = #fhc_state { limit = Limit, count = Count })
  when Limit /= infinity andalso Count >= Limit ->
    State;
process_obtains(State = #fhc_state { limit = Limit, count = Count,
                                     obtains = Obtains }) ->
    ObtainsLen = length(Obtains),
    ObtainableLen = lists:min([ObtainsLen, Limit - Count]),
    Take = ObtainsLen - ObtainableLen,
    {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains),
    [gen_server:reply(From, ok) || From <- ObtainableRev],
    State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.

maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
                                  callbacks = Callbacks, timer_ref = TRef })
  when Limit /= infinity andalso Count >= Limit ->
    Now = now(),
    {Pids, Sum, ClientCount} =
        dict:fold(fun (_Pid, undefined, Accs) ->
                          Accs;
                      (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) ->
                          {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest),
                           CountAcc + 1}
                  end, {[], 0, 0}, Elders),
    case Pids of
        [] -> ok;
        _  -> AverageAge = Sum / ClientCount,
              lists:foreach(
                fun (Pid) ->
                        case dict:find(Pid, Callbacks) of
                            error           -> ok;
                            {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge])
                        end
                end, Pids)
    end,
    case TRef of
        undefined -> {ok, TRef1} = timer:apply_after(
                                     ?FILE_HANDLES_CHECK_INTERVAL,
                                     gen_server, cast, [?SERVER, check_counts]),
                     State #fhc_state { timer_ref = TRef1 };
        _         -> State
    end;
maybe_reduce(State) ->
    State.

%% Googling around suggests that Windows has a limit somewhere around
%% 16M, e.g.
%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
%% For everything else, assume ulimit exists. Further googling
%% suggests that the BSDs (incl. OS X), Solaris and Linux all agree that
%% ulimit -n is file handles
ulimit() ->
    case os:type() of
        {win32, _OsName} ->
            ?FILE_HANDLES_LIMIT_WINDOWS;
        {unix, _OsName} ->
            %% Under Linux, Solaris and FreeBSD, ulimit is a shell
            %% builtin, not a command. In OS X, it's a command.
            %% Fortunately, os:cmd invokes the cmd in a shell env, so
            %% we're safe in all cases.
            case os:cmd("ulimit -n") of
                %% os:cmd/1 returns the output including its trailing
                %% newline, so match on the prefix only
                "unlimited" ++ _ ->
                    infinity;
                String = [C|_] when $0 =< C andalso C =< $9 ->
                    Num = list_to_integer(
                            lists:takewhile(
                              fun (D) -> $0 =< D andalso D =< $9 end, String)) -
                        ?RESERVED_FOR_OTHERS,
                    lists:max([1, Num]);
                _ ->
                    %% probably a variant of
                    %% "/bin/sh: line 1: ulimit: command not found\n"
                    ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
            end;
        _ ->
            ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
    end.

ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
    case dict:find(Pid, ClientMRefs) of
        {ok, _MRef} -> State;
        error       -> MRef = erlang:monitor(process, Pid),
                       State #fhc_state {
                         client_mrefs = dict:store(Pid, MRef, ClientMRefs) }
    end.