src/file_handle_cache.erl
author Francesco Mazzoli <francesco@rabbitmq.com>
Wed May 16 18:04:59 2012 +0100 (10 hours ago)
branchbug24919
changeset 9571 1b5f4944b13e
parent 8957 686c4565ee7b
parent 8945 6333f2c2663c
permissions -rw-r--r--
Added checks to `rabbit_misc:get_options/3'.

See the comments on top of the new definition (its arity is now 4).
I still need to update the tests.
     1 %% The contents of this file are subject to the Mozilla Public License
     2 %% Version 1.1 (the "License"); you may not use this file except in
     3 %% compliance with the License. You may obtain a copy of the License
     4 %% at http://www.mozilla.org/MPL/
     5 %%
     6 %% Software distributed under the License is distributed on an "AS IS"
     7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
     8 %% the License for the specific language governing rights and
     9 %% limitations under the License.
    10 %%
    11 %% The Original Code is RabbitMQ.
    12 %%
    13 %% The Initial Developer of the Original Code is VMware, Inc.
    14 %% Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
    15 %%
    16 
    17 -module(file_handle_cache).
    18 
    19 %% A File Handle Cache
    20 %%
    21 %% This extends a subset of the functionality of the Erlang file
    22 %% module. In the below, we use "file handle" to specifically refer to
    23 %% file handles, and "file descriptor" to refer to descriptors which
    24 %% are not file handles, e.g. sockets.
    25 %%
    26 %% Some constraints
    27 %% 1) This supports one writer, multiple readers per file. Nothing
    28 %% else.
    29 %% 2) Do not open the same file from different processes. Bad things
    30 %% may happen, especially for writes.
    31 %% 3) Writes are all appends. You cannot write to the middle of a
    32 %% file, although you can truncate and then append if you want.
    33 %% 4) Although there is a write buffer, there is no read buffer. Feel
    34 %% free to use the read_ahead mode, but beware of the interaction
    35 %% between that buffer and the write buffer.
    36 %%
    37 %% Some benefits
    38 %% 1) You do not have to remember to call sync before close
    39 %% 2) Buffering is much more flexible than with the plain file module,
    40 %% and you can control when the buffer gets flushed out. This means
    41 %% that you can rely on reads-after-writes working, without having to
    42 %% call the expensive sync.
    43 %% 3) Unnecessary calls to position and sync get optimised out.
    44 %% 4) You can find out what your 'real' offset is, and what your
    45 %% 'virtual' offset is (i.e. where the hdl really is, and where it
    46 %% would be after the write buffer is written out).
    47 %%
    48 %% There is also a server component which serves to limit the number
    49 %% of open file descriptors. This is a hard limit: the server
    50 %% component will ensure that clients do not have more file
    51 %% descriptors open than it's configured to allow.
    52 %%
    53 %% On open, the client requests permission from the server to open the
    54 %% required number of file handles. The server may ask the client to
    55 %% close other file handles that it has open, or it may queue the
    56 %% request and ask other clients to close file handles they have open
    57 %% in order to satisfy the request. Requests are always satisfied in
    58 %% the order they arrive, even if a later request (for a small number
    59 %% of file handles) can be satisfied before an earlier request (for a
    60 %% larger number of file handles). On close, the client sends a
    61 %% message to the server. These messages allow the server to keep
    62 %% track of the number of open handles. The client also keeps a
    63 %% gb_tree which is updated on every use of a file handle, mapping the
    64 %% time at which the file handle was last used (timestamp) to the
    65 %% handle. Thus the smallest key in this tree maps to the file handle
    66 %% that has not been used for the longest amount of time. This
    67 %% smallest key is included in the messages to the server. As such,
    68 %% the server keeps track of when the least recently used file handle
    69 %% was used *at the point of the most recent open or close* by each
    70 %% client.
    71 %%
    72 %% Note that this data can go very out of date if the client uses
    73 %% its least recently used handle.
    74 %%
    75 %% When the limit is exceeded (i.e. the number of open file handles is
    76 %% at the limit and there are pending 'open' requests), the server
    77 %% calculates the average age of the last reported least recently used
    78 %% file handle of all the clients. It then tells all the clients to
    79 %% close any handles not used for longer than this average, by
    80 %% invoking the callback the client registered. The client should
    81 %% receive this message and pass it into
    82 %% set_maximum_since_use/1. However, it is highly possible this age
    83 %% will be greater than the ages of all the handles the client knows
    84 %% of because the client has used its file handles in the
    85 %% meantime. Thus at this point the client reports to the server the
    86 %% current timestamp at which its least recently used file handle was
    87 %% last used. The server will check two seconds later that either it
    88 %% is back under the limit, in which case all is well again, or if
    89 %% not, it will calculate a new average age. Its data will be much
    90 %% more recent now, and so it is very likely that when this is
    91 %% communicated to the clients, the clients will close file handles.
    92 %% (In extreme cases, where it's very likely that all clients have
    93 %% used their open handles since they last sent in an update, which
    94 %% would mean that the average will never cause any file handles to
    95 %% be closed, the server can send out an average age of 0, resulting
    96 %% in all available clients closing all their file handles.)
    97 %%
    98 %% Care is taken to ensure that (a) processes which are blocked
    99 %% waiting for file descriptors to become available are not sent
   100 %% requests to close file handles; and (b) given it is known how many
   101 %% file handles a process has open, when the average age is forced to
   102 %% 0, close messages are only sent to enough processes to release the
   103 %% correct number of file handles and the list of processes is
   104 %% randomly shuffled. This ensures we don't cause processes to
   105 %% needlessly close file handles, and ensures that we don't always
   106 %% make such requests of the same processes.
   107 %%
   108 %% The advantage of this scheme is that there is only communication
   109 %% from the client to the server on open, close, and when in the
   110 %% process of trying to reduce file handle usage. There is no
   111 %% communication from the client to the server on normal file handle
   112 %% operations. This scheme forms a feed-back loop - the server does
   113 %% not care which file handles are closed, just that some are, and it
   114 %% checks this repeatedly when over the limit.
   115 %%
   116 %% Handles which are closed as a result of the server are put into a
   117 %% "soft-closed" state in which the handle is closed (data flushed out
   118 %% and sync'd first) but the state is maintained. The handle will be
   119 %% fully reopened again as soon as needed, thus users of this library
   120 %% do not need to worry about their handles being closed by the server
   121 %% - reopening them when necessary is handled transparently.
   122 %%
   123 %% The server also supports obtain, release and transfer. obtain/0
   124 %% blocks until a file descriptor is available, at which point the
   125 %% requesting process is considered to 'own' one more
   126 %% descriptor. release/0 is the inverse operation and releases a
   127 %% previously obtained descriptor. transfer/1 transfers ownership of a
   128 %% file descriptor between processes. It is non-blocking. Obtain has a
   129 %% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
   130 %% the entire limit, but will be evicted by obtain calls up to the
   131 %% point at which no more obtain calls can be satisfied by the obtains
   132 %% limit. Thus there will always be some capacity available for file
   133 %% handles. Processes that use obtain are never asked to return them,
   134 %% and they are not managed in any way by the server. It is simply a
   135 %% mechanism to ensure that processes that need file descriptors such
   136 %% as sockets can do so in such a way that the overall number of open
   137 %% file descriptors is managed.
   138 %%
   139 %% The callers of register_callback/3, obtain/0, and the argument of
   140 %% transfer/1 are monitored, reducing the count of handles in use
   141 %% appropriately when the processes terminate.
   142 
   143 -behaviour(gen_server2).
   144 
   145 -export([register_callback/3]).
   146 -export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2,
   147          truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1,
   148          copy/3, set_maximum_since_use/1, delete/1, clear/1]).
   149 -export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
   150          info/0, info/1]).
   151 -export([ulimit/0]).
   152 
   153 -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
   154          terminate/2, code_change/3, prioritise_cast/2]).
   155 
%% Locally registered name of the file handle cache server.
-define(SERVER, ?MODULE).
%% NOTE(review): presumably the number of descriptors kept back for
%% other uses when the limit is derived from the OS limit -- its use
%% site is not visible in this chunk; confirm.
-define(RESERVED_FOR_OTHERS, 100).

%% NOTE(review): presumably the default descriptor limit used when the
%% OS limit cannot be determined -- confirm at its use site.
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
%% Delay, in milliseconds, before the server re-checks whether it is
%% back under the limit (the module header describes this as "two
%% seconds later").
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).

%% Lower limit for descriptors handed out through obtain/0: 90% of
%% LIMIT minus 2, truncated to an integer. File handles may still use
%% the entire limit.
-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)).
%% Names of ETS tables, presumably owned by the server (their use is
%% not visible in this chunk).
-define(CLIENT_ETS_TABLE, file_handle_cache_client).
-define(ELDERS_ETS_TABLE, file_handle_cache_elders).
   165 
   166 %%----------------------------------------------------------------------------
   167 
%% Per-path bookkeeping of the calling process, stored in the process
%% dictionary under {AbsolutePath, fhc_file} by open/3.
-record(file,
        { reader_count,            %% handles opened with `read' (open/3 increments it)
          has_writer               %% true once a handle was opened for writing
        }).

%% Client-side state of one file handle, stored in the process
%% dictionary under {Ref, fhc_handle}.
-record(handle,
        { hdl,                     %% prim_file handle, or the atom `closed'
          offset,                  %% raw file position, excluding buffered data
          is_dirty,                %% true after writes not yet sync'd
          write_buffer_size,       %% total iolist_size/1 of write_buffer
          write_buffer_size_limit, %% 0 (unbuffered), infinity, or a byte count
          write_buffer,            %% pending appends, newest first
          at_eof,                  %% true when the position is known to be eof
          path,                    %% absolute path (see open/3)
          mode,                    %% open mode, with `append' rewritten to `write'
          options,                 %% options as passed to open/3
          is_write,                %% whether mode contains `write'
          is_read,                 %% whether mode contains `read'
          last_used_at             %% now() of last use (age tree key);
                                   %% undefined until the handle is (re)opened
        }).

%% State of the server process. The callbacks that use it are not in
%% this chunk. NOTE(review): the per-field meanings are inferred from
%% their names -- confirm against handle_call/3 and handle_cast/2.
-record(fhc_state,
        { elders,
          limit,
          open_count,
          open_pending,
          obtain_limit,
          obtain_count,
          obtain_pending,
          clients,
          timer_ref
        }).

%% Per-client bookkeeping kept by the server. NOTE(review): presumably
%% `opened'/`obtained' count descriptors held by `pid' and `callback'
%% is the {M, F, A} from register_callback/3 -- confirm.
-record(cstate,
        { pid,
          callback,
          opened,
          obtained,
          blocked,
          pending_closes
        }).

%% A queued request waiting for descriptors. NOTE(review): `kind' is
%% presumably open | obtain and `from' the gen_server2 caller -- confirm.
-record(pending,
        { kind,
          pid,
          requested,
          from
        }).
   217 %%----------------------------------------------------------------------------
   218 %% Specs
   219 %%----------------------------------------------------------------------------
   220 
   221 -ifdef(use_specs).
   222 
   223 -type(ref() :: any()).
   224 -type(ok_or_error() :: 'ok' | {'error', any()}).
   225 -type(val_or_error(T) :: {'ok', T} | {'error', any()}).
   226 -type(position() :: ('bof' | 'eof' | non_neg_integer() |
   227                      {('bof' |'eof'), non_neg_integer()} |
   228                      {'cur', integer()})).
   229 -type(offset() :: non_neg_integer()).
   230 
   231 -spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
   232 -spec(open/3 ::
   233         (file:filename(), [any()],
   234          [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
   235         -> val_or_error(ref())).
   236 -spec(close/1 :: (ref()) -> ok_or_error()).
   237 -spec(read/2 :: (ref(), non_neg_integer()) ->
   238                      val_or_error([char()] | binary()) | 'eof').
   239 -spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
   240 -spec(sync/1 :: (ref()) ->  ok_or_error()).
   241 -spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
   242 -spec(truncate/1 :: (ref()) -> ok_or_error()).
   243 -spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())).
   244 -spec(current_raw_offset/1     :: (ref()) -> val_or_error(offset())).
   245 -spec(flush/1 :: (ref()) -> ok_or_error()).
   246 -spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
   247                      val_or_error(non_neg_integer())).
   248 -spec(delete/1 :: (ref()) -> ok_or_error()).
   249 -spec(clear/1 :: (ref()) -> ok_or_error()).
   250 -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
   251 -spec(obtain/0 :: () -> 'ok').
   252 -spec(release/0 :: () -> 'ok').
   253 -spec(transfer/1 :: (pid()) -> 'ok').
   254 -spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
   255 -spec(get_limit/0 :: () -> non_neg_integer()).
   256 -spec(info_keys/0 :: () -> rabbit_types:info_keys()).
   257 -spec(info/0 :: () -> rabbit_types:infos()).
   258 -spec(info/1 :: ([atom()]) -> rabbit_types:infos()).
   259 -spec(ulimit/0 :: () -> 'unknown' | non_neg_integer()).
   260 
   261 -endif.
   262 
   263 %%----------------------------------------------------------------------------
   264 -define(INFO_KEYS, [total_limit, total_used, sockets_limit, sockets_used]).
   265 
   266 %%----------------------------------------------------------------------------
   267 %% Public API
   268 %%----------------------------------------------------------------------------
   269 
   270 start_link() ->
   271     gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
   272 
   273 register_callback(M, F, A)
   274   when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
   275     gen_server2:cast(?SERVER, {register_callback, self(), {M, F, A}}).
   276 
   277 open(Path, Mode, Options) ->
   278     Path1 = filename:absname(Path),
   279     File1 = #file { reader_count = RCount, has_writer = HasWriter } =
   280         case get({Path1, fhc_file}) of
   281             File = #file {} -> File;
   282             undefined       -> #file { reader_count = 0,
   283                                        has_writer = false }
   284         end,
   285     Mode1 = append_to_write(Mode),
   286     IsWriter = is_writer(Mode1),
   287     case IsWriter andalso HasWriter of
   288         true  -> {error, writer_exists};
   289         false -> {ok, Ref} = new_closed_handle(Path1, Mode1, Options),
   290                  case get_or_reopen([{Ref, new}]) of
   291                      {ok, [_Handle1]} ->
   292                          RCount1 = case is_reader(Mode1) of
   293                                        true  -> RCount + 1;
   294                                        false -> RCount
   295                                    end,
   296                          HasWriter1 = HasWriter orelse IsWriter,
   297                          put({Path1, fhc_file},
   298                              File1 #file { reader_count = RCount1,
   299                                            has_writer = HasWriter1 }),
   300                          {ok, Ref};
   301                      Error ->
   302                          erase({Ref, fhc_handle}),
   303                          Error
   304                  end
   305     end.
   306 
   307 close(Ref) ->
   308     case erase({Ref, fhc_handle}) of
   309         undefined -> ok;
   310         Handle    -> case hard_close(Handle) of
   311                          ok               -> ok;
   312                          {Error, Handle1} -> put_handle(Ref, Handle1),
   313                                              Error
   314                      end
   315     end.
   316 
   317 read(Ref, Count) ->
   318     with_flushed_handles(
   319       [Ref],
   320       fun ([#handle { is_read = false }]) ->
   321               {error, not_open_for_reading};
   322           ([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
   323               case prim_file:read(Hdl, Count) of
   324                   {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
   325                                       {Obj,
   326                                        [Handle #handle { offset = Offset1 }]};
   327                   eof              -> {eof, [Handle #handle { at_eof = true }]};
   328                   Error            -> {Error, [Handle]}
   329               end
   330       end).
   331 
   332 append(Ref, Data) ->
   333     with_handles(
   334       [Ref],
   335       fun ([#handle { is_write = false }]) ->
   336               {error, not_open_for_writing};
   337           ([Handle]) ->
   338               case maybe_seek(eof, Handle) of
   339                   {{ok, _Offset}, #handle { hdl = Hdl, offset = Offset,
   340                                             write_buffer_size_limit = 0,
   341                                             at_eof = true } = Handle1} ->
   342                       Offset1 = Offset + iolist_size(Data),
   343                       {prim_file:write(Hdl, Data),
   344                        [Handle1 #handle { is_dirty = true, offset = Offset1 }]};
   345                   {{ok, _Offset}, #handle { write_buffer = WriteBuffer,
   346                                             write_buffer_size = Size,
   347                                             write_buffer_size_limit = Limit,
   348                                             at_eof = true } = Handle1} ->
   349                       WriteBuffer1 = [Data | WriteBuffer],
   350                       Size1 = Size + iolist_size(Data),
   351                       Handle2 = Handle1 #handle { write_buffer = WriteBuffer1,
   352                                                   write_buffer_size = Size1 },
   353                       case Limit =/= infinity andalso Size1 > Limit of
   354                           true  -> {Result, Handle3} = write_buffer(Handle2),
   355                                    {Result, [Handle3]};
   356                           false -> {ok, [Handle2]}
   357                       end;
   358                   {{error, _} = Error, Handle1} ->
   359                       {Error, [Handle1]}
   360               end
   361       end).
   362 
   363 sync(Ref) ->
   364     with_flushed_handles(
   365       [Ref],
   366       fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
   367               ok;
   368           ([Handle = #handle { hdl = Hdl,
   369                                is_dirty = true, write_buffer = [] }]) ->
   370               case prim_file:sync(Hdl) of
   371                   ok    -> {ok, [Handle #handle { is_dirty = false }]};
   372                   Error -> {Error, [Handle]}
   373               end
   374       end).
   375 
   376 needs_sync(Ref) ->
   377     with_handles(
   378       [Ref],
   379       fun ([#handle { is_dirty = false, write_buffer = [] }]) -> false;
   380           ([_Handle])                                         -> true
   381       end).
   382 
   383 position(Ref, NewOffset) ->
   384     with_flushed_handles(
   385       [Ref],
   386       fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle),
   387                         {Result, [Handle1]}
   388       end).
   389 
   390 truncate(Ref) ->
   391     with_flushed_handles(
   392       [Ref],
   393       fun ([Handle1 = #handle { hdl = Hdl }]) ->
   394               case prim_file:truncate(Hdl) of
   395                   ok    -> {ok, [Handle1 #handle { at_eof = true }]};
   396                   Error -> {Error, [Handle1]}
   397               end
   398       end).
   399 
   400 current_virtual_offset(Ref) ->
   401     with_handles([Ref], fun ([#handle { at_eof = true, is_write = true,
   402                                         offset = Offset,
   403                                         write_buffer_size = Size }]) ->
   404                                 {ok, Offset + Size};
   405                             ([#handle { offset = Offset }]) ->
   406                                 {ok, Offset}
   407                         end).
   408 
   409 current_raw_offset(Ref) ->
   410     with_handles([Ref], fun ([Handle]) -> {ok, Handle #handle.offset} end).
   411 
   412 flush(Ref) ->
   413     with_flushed_handles([Ref], fun ([Handle]) -> {ok, [Handle]} end).
   414 
   415 copy(Src, Dest, Count) ->
   416     with_flushed_handles(
   417       [Src, Dest],
   418       fun ([SHandle = #handle { is_read  = true, hdl = SHdl, offset = SOffset },
   419             DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
   420           ) ->
   421               case prim_file:copy(SHdl, DHdl, Count) of
   422                   {ok, Count1} = Result1 ->
   423                       {Result1,
   424                        [SHandle #handle { offset = SOffset + Count1 },
   425                         DHandle #handle { offset = DOffset + Count1,
   426                                           is_dirty = true }]};
   427                   Error ->
   428                       {Error, [SHandle, DHandle]}
   429               end;
   430           (_Handles) ->
   431               {error, incorrect_handle_modes}
   432       end).
   433 
   434 delete(Ref) ->
   435     case erase({Ref, fhc_handle}) of
   436         undefined ->
   437             ok;
   438         Handle = #handle { path = Path } ->
   439             case hard_close(Handle #handle { is_dirty = false,
   440                                              write_buffer = [] }) of
   441                 ok               -> prim_file:delete(Path);
   442                 {Error, Handle1} -> put_handle(Ref, Handle1),
   443                                     Error
   444             end
   445     end.
   446 
   447 clear(Ref) ->
   448     with_handles(
   449       [Ref],
   450       fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
   451               ok;
   452           ([Handle]) ->
   453               case maybe_seek(bof, Handle #handle { write_buffer = [],
   454                                                     write_buffer_size = 0 }) of
   455                   {{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
   456                       case prim_file:truncate(Hdl) of
   457                           ok    -> {ok, [Handle1 #handle { at_eof = true }]};
   458                           Error -> {Error, [Handle1]}
   459                       end;
   460                   {{error, _} = Error, Handle1} ->
   461                       {Error, [Handle1]}
   462               end
   463       end).
   464 
   465 set_maximum_since_use(MaximumAge) ->
   466     Now = now(),
   467     case lists:foldl(
   468            fun ({{Ref, fhc_handle},
   469                  Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
   470                    case Hdl =/= closed andalso
   471                        timer:now_diff(Now, Then) >= MaximumAge of
   472                        true  -> soft_close(Ref, Handle) orelse Rep;
   473                        false -> Rep
   474                    end;
   475                (_KeyValuePair, Rep) ->
   476                    Rep
   477            end, false, get()) of
   478         false -> age_tree_change(), ok;
   479         true  -> ok
   480     end.
   481 
   482 obtain() ->
   483     %% If the FHC isn't running, obtains succeed immediately.
   484     case whereis(?SERVER) of
   485         undefined -> ok;
   486         _         -> gen_server2:call(?SERVER, {obtain, self()}, infinity)
   487     end.
   488 
   489 release() ->
   490     gen_server2:cast(?SERVER, {release, self()}).
   491 
   492 transfer(Pid) ->
   493     gen_server2:cast(?SERVER, {transfer, self(), Pid}).
   494 
   495 set_limit(Limit) ->
   496     gen_server2:call(?SERVER, {set_limit, Limit}, infinity).
   497 
   498 get_limit() ->
   499     gen_server2:call(?SERVER, get_limit, infinity).
   500 
   501 info_keys() -> ?INFO_KEYS.
   502 
   503 info() -> info(?INFO_KEYS).
   504 info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity).
   505 
   506 %%----------------------------------------------------------------------------
   507 %% Internal functions
   508 %%----------------------------------------------------------------------------
   509 
   510 is_reader(Mode) -> lists:member(read, Mode).
   511 
   512 is_writer(Mode) -> lists:member(write, Mode).
   513 
   514 append_to_write(Mode) ->
   515     case lists:member(append, Mode) of
   516         true  -> [write | Mode -- [append, write]];
   517         false -> Mode
   518     end.
   519 
   520 with_handles(Refs, Fun) ->
   521     case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of
   522         {ok, Handles} ->
   523             case Fun(Handles) of
   524                 {Result, Handles1} when is_list(Handles1) ->
   525                     lists:zipwith(fun put_handle/2, Refs, Handles1),
   526                     Result;
   527                 Result ->
   528                     Result
   529             end;
   530         Error ->
   531             Error
   532     end.
   533 
   534 with_flushed_handles(Refs, Fun) ->
   535     with_handles(
   536       Refs,
   537       fun (Handles) ->
   538               case lists:foldl(
   539                      fun (Handle, {ok, HandlesAcc}) ->
   540                              {Res, Handle1} = write_buffer(Handle),
   541                              {Res, [Handle1 | HandlesAcc]};
   542                          (Handle, {Error, HandlesAcc}) ->
   543                              {Error, [Handle | HandlesAcc]}
   544                      end, {ok, []}, Handles) of
   545                   {ok, Handles1} ->
   546                       Fun(lists:reverse(Handles1));
   547                   {Error, Handles1} ->
   548                       {Error, lists:reverse(Handles1)}
   549               end
   550       end).
   551 
   552 get_or_reopen(RefNewOrReopens) ->
   553     case partition_handles(RefNewOrReopens) of
   554         {OpenHdls, []} ->
   555             {ok, [Handle || {_Ref, Handle} <- OpenHdls]};
   556         {OpenHdls, ClosedHdls} ->
   557             Oldest = oldest(get_age_tree(), fun () -> now() end),
   558             case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls),
   559                                             Oldest}, infinity) of
   560                 ok ->
   561                     case reopen(ClosedHdls) of
   562                         {ok, RefHdls}  -> sort_handles(RefNewOrReopens,
   563                                                        OpenHdls, RefHdls, []);
   564                         Error          -> Error
   565                     end;
   566                 close ->
   567                     [soft_close(Ref, Handle) ||
   568                         {{Ref, fhc_handle}, Handle = #handle { hdl = Hdl }} <-
   569                             get(),
   570                         Hdl =/= closed],
   571                     get_or_reopen(RefNewOrReopens)
   572             end
   573     end.
   574 
%% Opens the closed handles in ClosedHdls, a list of
%% {Ref, new | reopen, Handle} triples. Each opened handle is stored
%% in the process dictionary and inserted into the age tree.
%% Returns {ok, [{Ref, Handle}]} in input order, or the first
%% prim_file:open/2 error.
reopen(ClosedHdls) -> reopen(ClosedHdls, get_age_tree(), []).

reopen([], Tree, RefHdls) ->
    %% Everything opened: persist the age tree accumulated so far.
    put_age_tree(Tree),
    {ok, lists:reverse(RefHdls)};
reopen([{Ref, NewOrReopen, Handle = #handle { hdl          = closed,
                                              path         = Path,
                                              mode         = Mode,
                                              offset       = Offset,
                                              last_used_at = undefined }} |
        RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
    %% A new handle is opened with exactly its mode. A handle being
    %% reopened also gets `read', because `write' without `read'
    %% truncates an existing file on open.
    case prim_file:open(Path, case NewOrReopen of
                                  new    -> Mode;
                                  reopen -> [read | Mode]
                              end) of
        {ok, Hdl} ->
            Now = now(),
            %% Restore the previous offset. A failing seek crashes
            %% here (badmatch).
            {{ok, _Offset}, Handle1} =
                maybe_seek(Offset, Handle #handle { hdl          = Hdl,
                                                    offset       = 0,
                                                    last_used_at = Now }),
            put({Ref, fhc_handle}, Handle1),
            reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
                   [{Ref, Handle1} | RefHdls]);
        Error ->
            %% NB: none of the handles in ToOpen are in the age tree
            %% Send one close notification per handle that did not get
            %% opened (this one and all after it), so the server's count
            %% of open handles matches. Handles opened earlier in the
            %% list stay open and are kept in the saved tree.
            Oldest = oldest(Tree, fun () -> undefined end),
            [gen_server2:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen],
            put_age_tree(Tree),
            Error
    end.
   606 
   607 partition_handles(RefNewOrReopens) ->
   608     lists:foldr(
   609       fun ({Ref, NewOrReopen}, {Open, Closed}) ->
   610               case get({Ref, fhc_handle}) of
   611                   #handle { hdl = closed } = Handle ->
   612                       {Open, [{Ref, NewOrReopen, Handle} | Closed]};
   613                   #handle {} = Handle ->
   614                       {[{Ref, Handle} | Open], Closed}
   615               end
   616       end, {[], []}, RefNewOrReopens).
   617 
   618 sort_handles([], [], [], Acc) ->
   619     {ok, lists:reverse(Acc)};
   620 sort_handles([{Ref, _} | RefHdls], [{Ref, Handle} | RefHdlsA], RefHdlsB, Acc) ->
   621     sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]);
   622 sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) ->
   623     sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]).
   624 
   625 put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
   626     Now = now(),
   627     age_tree_update(Then, Now, Ref),
   628     put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
   629 
   630 with_age_tree(Fun) -> put_age_tree(Fun(get_age_tree())).
   631 
   632 get_age_tree() ->
   633     case get(fhc_age_tree) of
   634         undefined -> gb_trees:empty();
   635         AgeTree   -> AgeTree
   636     end.
   637 
   638 put_age_tree(Tree) -> put(fhc_age_tree, Tree).
   639 
   640 age_tree_update(Then, Now, Ref) ->
   641     with_age_tree(
   642       fun (Tree) ->
   643               gb_trees:insert(Now, Ref, gb_trees:delete_any(Then, Tree))
   644       end).
   645 
   646 age_tree_delete(Then) ->
   647     with_age_tree(
   648       fun (Tree) ->
   649               Tree1 = gb_trees:delete_any(Then, Tree),
   650               Oldest = oldest(Tree1, fun () -> undefined end),
   651               gen_server2:cast(?SERVER, {close, self(), Oldest}),
   652               Tree1
   653       end).
   654 
   655 age_tree_change() ->
   656     with_age_tree(
   657       fun (Tree) ->
   658               case gb_trees:is_empty(Tree) of
   659                   true  -> Tree;
   660                   false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
   661                            gen_server2:cast(?SERVER, {update, self(), Oldest})
   662               end,
   663               Tree
   664       end).
   665 
   666 oldest(Tree, DefaultFun) ->
   667     case gb_trees:is_empty(Tree) of
   668         true  -> DefaultFun();
   669         false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
   670                  Oldest
   671     end.
   672 
   673 new_closed_handle(Path, Mode, Options) ->
   674     WriteBufferSize =
   675         case proplists:get_value(write_buffer, Options, unbuffered) of
   676             unbuffered           -> 0;
   677             infinity             -> infinity;
   678             N when is_integer(N) -> N
   679         end,
   680     Ref = make_ref(),
   681     put({Ref, fhc_handle}, #handle { hdl                     = closed,
   682                                      offset                  = 0,
   683                                      is_dirty                = false,
   684                                      write_buffer_size       = 0,
   685                                      write_buffer_size_limit = WriteBufferSize,
   686                                      write_buffer            = [],
   687                                      at_eof                  = false,
   688                                      path                    = Path,
   689                                      mode                    = Mode,
   690                                      options                 = Options,
   691                                      is_write                = is_writer(Mode),
   692                                      is_read                 = is_reader(Mode),
   693                                      last_used_at            = undefined }),
   694     {ok, Ref}.
   695 
   696 soft_close(Ref, Handle) ->
   697     {Res, Handle1} = soft_close(Handle),
   698     case Res of
   699         ok -> put({Ref, fhc_handle}, Handle1),
   700               true;
   701         _  -> put_handle(Ref, Handle1),
   702               false
   703     end.
   704 
   705 soft_close(Handle = #handle { hdl = closed }) ->
   706     {ok, Handle};
   707 soft_close(Handle) ->
   708     case write_buffer(Handle) of
   709         {ok, #handle { hdl         = Hdl,
   710                        is_dirty    = IsDirty,
   711                        last_used_at = Then } = Handle1 } ->
   712             ok = case IsDirty of
   713                      true  -> prim_file:sync(Hdl);
   714                      false -> ok
   715                  end,
   716             ok = prim_file:close(Hdl),
   717             age_tree_delete(Then),
   718             {ok, Handle1 #handle { hdl            = closed,
   719                                    is_dirty       = false,
   720                                    last_used_at   = undefined }};
   721         {_Error, _Handle} = Result ->
   722             Result
   723     end.
   724 
   725 hard_close(Handle) ->
   726     case soft_close(Handle) of
   727         {ok, #handle { path = Path,
   728                        is_read = IsReader, is_write = IsWriter }} ->
   729             #file { reader_count = RCount, has_writer = HasWriter } = File =
   730                 get({Path, fhc_file}),
   731             RCount1 = case IsReader of
   732                           true  -> RCount - 1;
   733                           false -> RCount
   734                       end,
   735             HasWriter1 = HasWriter andalso not IsWriter,
   736             case RCount1 =:= 0 andalso not HasWriter1 of
   737                 true  -> erase({Path, fhc_file});
   738                 false -> put({Path, fhc_file},
   739                              File #file { reader_count = RCount1,
   740                                           has_writer = HasWriter1 })
   741             end,
   742             ok;
   743         {_Error, _Handle} = Result ->
   744             Result
   745     end.
   746 
   747 maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
   748                                          at_eof = AtEoF }) ->
   749     {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
   750     case (case NeedsSeek of
   751               true  -> prim_file:position(Hdl, NewOffset);
   752               false -> {ok, Offset}
   753           end) of
   754         {ok, Offset1} = Result ->
   755             {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }};
   756         {error, _} = Error ->
   757             {Error, Handle}
   758     end.
   759 
   760 needs_seek( AtEoF, _CurOffset,  cur     ) -> {AtEoF, false};
   761 needs_seek( AtEoF, _CurOffset,  {cur, 0}) -> {AtEoF, false};
   762 needs_seek(  true, _CurOffset,  eof     ) -> {true , false};
   763 needs_seek(  true, _CurOffset,  {eof, 0}) -> {true , false};
   764 needs_seek( false, _CurOffset,  eof     ) -> {true , true };
   765 needs_seek( false, _CurOffset,  {eof, 0}) -> {true , true };
   766 needs_seek( AtEoF,          0,  bof     ) -> {AtEoF, false};
   767 needs_seek( AtEoF,          0,  {bof, 0}) -> {AtEoF, false};
   768 needs_seek( AtEoF,  CurOffset, CurOffset) -> {AtEoF, false};
   769 needs_seek(  true,  CurOffset, {bof, DesiredOffset})
   770   when DesiredOffset >= CurOffset ->
   771     {true, true};
   772 needs_seek(  true, _CurOffset, {cur, DesiredOffset})
   773   when DesiredOffset > 0 ->
   774     {true, true};
   775 needs_seek(  true,  CurOffset, DesiredOffset) %% same as {bof, DO}
   776   when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset ->
   777     {true, true};
   778 %% because we can't really track size, we could well end up at EoF and not know
   779 needs_seek(_AtEoF, _CurOffset, _DesiredOffset) ->
   780     {false, true}.
   781 
   782 write_buffer(Handle = #handle { write_buffer = [] }) ->
   783     {ok, Handle};
   784 write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
   785                                 write_buffer = WriteBuffer,
   786                                 write_buffer_size = DataSize,
   787                                 at_eof = true }) ->
   788     case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of
   789         ok ->
   790             Offset1 = Offset + DataSize,
   791             {ok, Handle #handle { offset = Offset1, is_dirty = true,
   792                                   write_buffer = [], write_buffer_size = 0 }};
   793         {error, _} = Error ->
   794             {Error, Handle}
   795     end.
   796 
   797 infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
   798 
   799 i(total_limit,   #fhc_state{limit        = Limit})               -> Limit;
   800 i(total_used,    #fhc_state{open_count = C1, obtain_count = C2}) -> C1 + C2;
   801 i(sockets_limit, #fhc_state{obtain_limit = Limit})               -> Limit;
   802 i(sockets_used,  #fhc_state{obtain_count = Count})               -> Count;
   803 i(Item, _) -> throw({bad_argument, Item}).
   804 
   805 %%----------------------------------------------------------------------------
   806 %% gen_server2 callbacks
   807 %%----------------------------------------------------------------------------
   808 
   809 init([]) ->
   810     Limit = case application:get_env(file_handles_high_watermark) of
   811                 {ok, Watermark} when (is_integer(Watermark) andalso
   812                                       Watermark > 0) ->
   813                     Watermark;
   814                 _ ->
   815                     case ulimit() of
   816                         unknown  -> ?FILE_HANDLES_LIMIT_OTHER;
   817                         Lim      -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS])
   818                     end
   819             end,
   820     ObtainLimit = obtain_limit(Limit),
   821     error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
   822                           [Limit, ObtainLimit]),
   823     Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
   824     Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]),
   825     {ok, #fhc_state { elders         = Elders,
   826                       limit          = Limit,
   827                       open_count     = 0,
   828                       open_pending   = pending_new(),
   829                       obtain_limit   = ObtainLimit,
   830                       obtain_count   = 0,
   831                       obtain_pending = pending_new(),
   832                       clients        = Clients,
   833                       timer_ref      = undefined }}.
   834 
   835 prioritise_cast(Msg, _State) ->
   836     case Msg of
   837         {release, _}                 -> 5;
   838         _                            -> 0
   839     end.
   840 
%% gen_server2 call handler.
%%
%% {open, Pid, Requested, EldestUnusedSince}: client Pid wants Requested
%% more file handles.
%%   - EldestUnusedSince must not be undefined; no clause accepts that.
%%   - It is recorded in the elders table; reduce/1 averages these values.
%%   - If granting the request would require a reduction (needs_reduce/1):
%%       - a client with no open handles is marked blocked, its request is
%%         queued and a reduction starts. The reply is deferred until
%%         run_pending_item/2 grants the request.
%%       - a client that already has handles open is not queued. It is
%%         replied `close`, and its pending_closes is set to all of its
%%         opened handles.
%%   - Otherwise the request is granted immediately.
handle_call({open, Pid, Requested, EldestUnusedSince}, From,
            State = #fhc_state { open_count   = Count,
                                 open_pending = Pending,
                                 elders       = Elders,
                                 clients      = Clients })
  when EldestUnusedSince =/= undefined ->
    true = ets:insert(Elders, {Pid, EldestUnusedSince}),
    Item = #pending { kind      = open,
                      pid       = Pid,
                      requested = Requested,
                      from      = From },
    ok = track_client(Pid, Clients),
    case needs_reduce(State #fhc_state { open_count = Count + Requested }) of
        true  -> case ets:lookup(Clients, Pid) of
                     [#cstate { opened = 0 }] ->
                         true = ets:update_element(
                                  Clients, Pid, {#cstate.blocked, true}),
                         {noreply,
                          reduce(State #fhc_state {
                                   open_pending = pending_in(Item, Pending) })};
                     [#cstate { opened = Opened }] ->
                         %% ask the client to give back everything it holds
                         %% instead of queueing it
                         true = ets:update_element(
                                  Clients, Pid,
                                  {#cstate.pending_closes, Opened}),
                         {reply, close, State}
                 end;
        false -> {noreply, run_pending_item(Item, State)}
    end;

%% {obtain, Pid}: client Pid wants one descriptor that is not a file handle;
%% obtain_count is what i/2 reports as sockets_used.
%%   - If the obtain limit is already reached, the request is queued (client
%%     marked blocked) without starting a reduction.
%%   - If granting it would need a reduction, it is queued and a reduction
%%     starts.
%%   - Otherwise it is granted immediately, and the file_descriptor_limit
%%     alarm is raised if this reaches the obtain limit.
%% In every case the reply is sent via run_pending_item/2 (gen_server2:reply).
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count   = Count,
                                                      obtain_pending = Pending,
                                                      clients = Clients }) ->
    ok = track_client(Pid, Clients),
    Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
    Enqueue = fun () ->
                      true = ets:update_element(Clients, Pid,
                                                {#cstate.blocked, true}),
                      State #fhc_state {
                        obtain_pending = pending_in(Item, Pending) }
              end,
    {noreply,
        case obtain_limit_reached(State) of
            true  -> Enqueue();
            false -> case needs_reduce(State #fhc_state {
                                      obtain_count = Count + 1 }) of
                         true  -> reduce(Enqueue());
                         false -> adjust_alarm(
                                      State, run_pending_item(Item, State))
                     end
        end};

%% {set_limit, Limit}: change the overall limit and the derived obtain
%% limit. Then serve any queued requests that now fit, reduce if still
%% needed, and update the file_descriptor_limit alarm.
handle_call({set_limit, Limit}, _From, State) ->
    {reply, ok, adjust_alarm(
                  State, maybe_reduce(
                           process_pending(
                             State #fhc_state {
                               limit        = Limit,
                               obtain_limit = obtain_limit(Limit) })))};

%% get_limit: reply with the current overall limit.
handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
    {reply, Limit, State};

%% {info, Items}: reply with [{Item, Value}] for the requested items.
%% NOTE(review): i/2 throws {bad_argument, Item} for an unknown item and
%% nothing here catches it. An unknown item presumably takes down the server
%% rather than returning an error to the caller; confirm against gen_server2's
%% handling of thrown values.
handle_call({info, Items}, _From, State) ->
    {reply, infos(Items, State), State}.
   905 
%% gen_server2 cast handler.
%%
%% {register_callback, Pid, MFA}: start tracking Pid and remember its
%% callback. The server later invokes it as apply(M, F, A ++ [Age]) when it
%% wants handles closed.
handle_cast({register_callback, Pid, MFA},
            State = #fhc_state { clients = Clients }) ->
    ok = track_client(Pid, Clients),
    true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}),
    {noreply, State};

%% {update, Pid, EldestUnusedSince}: refresh the client's eldest timestamp
%% only; undefined is not accepted here.
handle_cast({update, Pid, EldestUnusedSince},
            State = #fhc_state { elders = Elders })
  when EldestUnusedSince =/= undefined ->
    true = ets:insert(Elders, {Pid, EldestUnusedSince}),
    %% don't call maybe_reduce from here otherwise we can create a
    %% storm of messages
    {noreply, State};

%% {release, Pid}: the client gave back one obtained descriptor. Serve any
%% queued requests that now fit and update the alarm.
handle_cast({release, Pid}, State) ->
    {noreply, adjust_alarm(State, process_pending(
                                    update_counts(obtain, Pid, -1, State)))};

%% {close, Pid, EldestUnusedSince}: the client closed one file handle.
%% EldestUnusedSince is the timestamp of its oldest remaining handle, or
%% undefined when it has none left (the elders row is then removed).
%% pending_closes is decremented but clamped at 0 (update_counter threshold
%% form). The open count then drops by one, queued requests that now fit
%% are served, and the alarm is updated.
handle_cast({close, Pid, EldestUnusedSince},
            State = #fhc_state { elders = Elders, clients = Clients }) ->
    true = case EldestUnusedSince of
               undefined -> ets:delete(Elders, Pid);
               _         -> ets:insert(Elders, {Pid, EldestUnusedSince})
           end,
    ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
    {noreply, adjust_alarm(State, process_pending(
                update_counts(open, Pid, -1, State)))};

%% {transfer, FromPid, ToPid}: move the accounting for one obtained
%% descriptor from FromPid to ToPid. The total obtain count is unchanged,
%% so the alarm is not re-evaluated.
handle_cast({transfer, FromPid, ToPid}, State) ->
    ok = track_client(ToPid, State#fhc_state.clients),
    {noreply, process_pending(
                update_counts(obtain, ToPid, +1,
                              update_counts(obtain, FromPid, -1, State)))}.
   939 
%% gen_server2 info handler.
%%
%% check_counts: the timer armed by reduce/1 has fired. Forget the timer
%% reference so that a later reduce/1 arms a new one. Then reduce again if
%% needs_reduce/1 still says so.
handle_info(check_counts, State) ->
    {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};

%% 'DOWN': a client monitored by track_client/2 has died. Cleanup steps:
%%   - drop its rows from both ETS tables;
%%   - subtract everything it had opened or obtained from the totals;
%%   - discard its queued requests (filter_pending/2 also lowers the
%%     pending totals);
%%   - serve whatever queued requests now fit and update the alarm.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
            State = #fhc_state { elders         = Elders,
                                 open_count     = OpenCount,
                                 open_pending   = OpenPending,
                                 obtain_count   = ObtainCount,
                                 obtain_pending = ObtainPending,
                                 clients        = Clients }) ->
    [#cstate { opened = Opened, obtained = Obtained }] =
        ets:lookup(Clients, Pid),
    true = ets:delete(Clients, Pid),
    true = ets:delete(Elders, Pid),
    FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
    {noreply, adjust_alarm(
                State,
                process_pending(
                  State #fhc_state {
                    open_count     = OpenCount - Opened,
                    open_pending   = filter_pending(FilterFun, OpenPending),
                    obtain_count   = ObtainCount - Obtained,
                    obtain_pending = filter_pending(FilterFun, ObtainPending) }))}.
   963 
   964 terminate(_Reason, State = #fhc_state { clients = Clients,
   965                                         elders  = Elders }) ->
   966     ets:delete(Clients),
   967     ets:delete(Elders),
   968     State.
   969 
   970 code_change(_OldVsn, State, _Extra) ->
   971     {ok, State}.
   972 
   973 %%----------------------------------------------------------------------------
   974 %% pending queue abstraction helpers
   975 %%----------------------------------------------------------------------------
   976 
   977 queue_fold(Fun, Init, Q) ->
   978     case queue:out(Q) of
   979         {empty, _Q}      -> Init;
   980         {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
   981     end.
   982 
   983 filter_pending(Fun, {Count, Queue}) ->
   984     {Delta, Queue1} =
   985         queue_fold(
   986           fun (Item = #pending { requested = Requested }, {DeltaN, QueueN}) ->
   987                   case Fun(Item) of
   988                       true  -> {DeltaN, queue:in(Item, QueueN)};
   989                       false -> {DeltaN - Requested, QueueN}
   990                   end
   991           end, {0, queue:new()}, Queue),
   992     {Count + Delta, Queue1}.
   993 
   994 pending_new() ->
   995     {0, queue:new()}.
   996 
   997 pending_in(Item = #pending { requested = Requested }, {Count, Queue}) ->
   998     {Count + Requested, queue:in(Item, Queue)}.
   999 
  1000 pending_out({0, _Queue} = Pending) ->
  1001     {empty, Pending};
  1002 pending_out({N, Queue}) ->
  1003     {{value, #pending { requested = Requested }} = Result, Queue1} =
  1004         queue:out(Queue),
  1005     {Result, {N - Requested, Queue1}}.
  1006 
  1007 pending_count({Count, _Queue}) ->
  1008     Count.
  1009 
  1010 pending_is_empty({0, _Queue}) ->
  1011     true;
  1012 pending_is_empty({_N, _Queue}) ->
  1013     false.
  1014 
  1015 %%----------------------------------------------------------------------------
  1016 %% server helpers
  1017 %%----------------------------------------------------------------------------
  1018 
  1019 obtain_limit(infinity) -> infinity;
  1020 obtain_limit(Limit)    -> case ?OBTAIN_LIMIT(Limit) of
  1021                               OLimit when OLimit < 0 -> 0;
  1022                               OLimit                 -> OLimit
  1023                           end.
  1024 
  1025 obtain_limit_reached(#fhc_state { obtain_limit = Limit,
  1026                                   obtain_count = Count}) ->
  1027     Limit =/= infinity andalso Count >= Limit.
  1028 
  1029 adjust_alarm(OldState, NewState) ->
  1030     case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of
  1031         {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []});
  1032         {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit);
  1033         _             -> ok
  1034     end,
  1035     NewState.
  1036 
  1037 process_pending(State = #fhc_state { limit = infinity }) ->
  1038     State;
  1039 process_pending(State) ->
  1040     process_open(process_obtain(State)).
  1041 
  1042 process_open(State = #fhc_state { limit        = Limit,
  1043                                   open_pending = Pending,
  1044                                   open_count   = OpenCount,
  1045                                   obtain_count = ObtainCount }) ->
  1046     {Pending1, State1} =
  1047         process_pending(Pending, Limit - (ObtainCount + OpenCount), State),
  1048     State1 #fhc_state { open_pending = Pending1 }.
  1049 
  1050 process_obtain(State = #fhc_state { limit          = Limit,
  1051                                     obtain_pending = Pending,
  1052                                     obtain_limit   = ObtainLimit,
  1053                                     obtain_count   = ObtainCount,
  1054                                     open_count     = OpenCount }) ->
  1055     Quota = lists:min([ObtainLimit - ObtainCount,
  1056                        Limit - (ObtainCount + OpenCount)]),
  1057     {Pending1, State1} = process_pending(Pending, Quota, State),
  1058     State1 #fhc_state { obtain_pending = Pending1 }.
  1059 
  1060 process_pending(Pending, Quota, State) when Quota =< 0 ->
  1061     {Pending, State};
  1062 process_pending(Pending, Quota, State) ->
  1063     case pending_out(Pending) of
  1064         {empty, _Pending} ->
  1065             {Pending, State};
  1066         {{value, #pending { requested = Requested }}, _Pending1}
  1067           when Requested > Quota ->
  1068             {Pending, State};
  1069         {{value, #pending { requested = Requested } = Item}, Pending1} ->
  1070             process_pending(Pending1, Quota - Requested,
  1071                             run_pending_item(Item, State))
  1072     end.
  1073 
  1074 run_pending_item(#pending { kind      = Kind,
  1075                             pid       = Pid,
  1076                             requested = Requested,
  1077                             from      = From },
  1078                  State = #fhc_state { clients = Clients }) ->
  1079     gen_server2:reply(From, ok),
  1080     true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
  1081     update_counts(Kind, Pid, Requested, State).
  1082 
  1083 update_counts(Kind, Pid, Delta,
  1084               State = #fhc_state { open_count   = OpenCount,
  1085                                    obtain_count = ObtainCount,
  1086                                    clients      = Clients }) ->
  1087     {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients),
  1088     State #fhc_state { open_count   = OpenCount   + OpenDelta,
  1089                        obtain_count = ObtainCount + ObtainDelta }.
  1090 
  1091 update_counts1(open, Pid, Delta, Clients) ->
  1092     ets:update_counter(Clients, Pid, {#cstate.opened, Delta}),
  1093     {Delta, 0};
  1094 update_counts1(obtain, Pid, Delta, Clients) ->
  1095     ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}),
  1096     {0, Delta}.
  1097 
  1098 maybe_reduce(State) ->
  1099     case needs_reduce(State) of
  1100         true  -> reduce(State);
  1101         false -> State
  1102     end.
  1103 
  1104 needs_reduce(#fhc_state { limit          = Limit,
  1105                           open_count     = OpenCount,
  1106                           open_pending   = OpenPending,
  1107                           obtain_count   = ObtainCount,
  1108                           obtain_limit   = ObtainLimit,
  1109                           obtain_pending = ObtainPending }) ->
  1110     Limit =/= infinity
  1111         andalso ((OpenCount + ObtainCount > Limit)
  1112                  orelse (not pending_is_empty(OpenPending))
  1113                  orelse (ObtainCount < ObtainLimit
  1114                          andalso not pending_is_empty(ObtainPending))).
  1115 
%% Try to bring usage back under the limit by asking clients to close
%% handles.
%%
%% Candidates are clients listed in the elders table that are not blocked
%% and whose pending_closes differs from their opened count, i.e. they have
%% not already been asked to close everything.
%%
%% Across the candidates we average the age of their eldest timestamp
%% (timer:now_diff/2 yields microseconds). From that we subtract the check
%% interval: ?FILE_HANDLES_CHECK_INTERVAL is passed to erlang:send_after/3
%% below as milliseconds, hence the factor of 1000. Then:
%%   - if the result is positive, every candidate's callback is called with
%%     it (notify_age/2);
%%   - otherwise callback-registered candidates are asked to close all their
%%     handles (notify_age0/3) until the total pending demand is covered.
%% Finally a check_counts timer is armed unless one is already outstanding.
reduce(State = #fhc_state { open_pending   = OpenPending,
                            obtain_pending = ObtainPending,
                            elders         = Elders,
                            clients        = Clients,
                            timer_ref      = TRef }) ->
    Now = now(),
    %% collect candidate client states plus the sum/count of their ages
    {CStates, Sum, ClientCount} =
        ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
                          [#cstate { pending_closes = PendingCloses,
                                     opened         = Opened,
                                     blocked        = Blocked } = CState] =
                              ets:lookup(Clients, Pid),
                          case Blocked orelse PendingCloses =:= Opened of
                              true  -> Accs;
                              false -> {[CState | CStatesAcc],
                                        SumAcc + timer:now_diff(Now, Eldest),
                                        CountAcc + 1}
                          end
                  end, {[], 0, 0}, Elders),
    case CStates of
        [] -> ok;
        %% CStates non-empty implies ClientCount > 0, so no division by zero
        _  -> case (Sum / ClientCount) -
                  (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
                  AverageAge when AverageAge > 0 ->
                      notify_age(CStates, AverageAge);
                  _ ->
                      notify_age0(Clients, CStates,
                                  pending_count(OpenPending) +
                                      pending_count(ObtainPending))
              end
    end,
    case TRef of
        undefined -> TRef1 = erlang:send_after(
                               ?FILE_HANDLES_CHECK_INTERVAL, ?SERVER,
                               check_counts),
                     State #fhc_state { timer_ref = TRef1 };
        _         -> State
    end.
  1154 
  1155 notify_age(CStates, AverageAge) ->
  1156     lists:foreach(
  1157       fun (#cstate { callback = undefined }) -> ok;
  1158           (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge])
  1159       end, CStates).
  1160 
  1161 notify_age0(Clients, CStates, Required) ->
  1162     case [CState || CState <- CStates, CState#cstate.callback =/= undefined] of
  1163         []            -> ok;
  1164         Notifications -> S = random:uniform(length(Notifications)),
  1165                          {L1, L2} = lists:split(S, Notifications),
  1166                          notify(Clients, Required, L2 ++ L1)
  1167     end.
  1168 
  1169 notify(_Clients, _Required, []) ->
  1170     ok;
  1171 notify(_Clients, Required, _Notifications) when Required =< 0 ->
  1172     ok;
  1173 notify(Clients, Required, [#cstate{ pid      = Pid,
  1174                                     callback = {M, F, A},
  1175                                     opened   = Opened } | Notifications]) ->
  1176     apply(M, F, A ++ [0]),
  1177     ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}),
  1178     notify(Clients, Required - Opened, Notifications).
  1179 
  1180 track_client(Pid, Clients) ->
  1181     case ets:insert_new(Clients, #cstate { pid            = Pid,
  1182                                            callback       = undefined,
  1183                                            opened         = 0,
  1184                                            obtained       = 0,
  1185                                            blocked        = false,
  1186                                            pending_closes = 0 }) of
  1187         true  -> _MRef = erlang:monitor(process, Pid),
  1188                  ok;
  1189         false -> ok
  1190     end.
  1191 
  1192 
  1193 %% To increase the number of file descriptors: on Windows set ERL_MAX_PORTS
  1194 %% environment variable, on Linux set `ulimit -n`.
  1195 ulimit() ->
  1196     case proplists:get_value(max_fds, erlang:system_info(check_io)) of
  1197         MaxFds when is_integer(MaxFds) andalso MaxFds > 1 ->
  1198             case os:type() of
  1199                 {win32, _OsName} ->
  1200                     %% On Windows max_fds is twice the number of open files:
  1201                     %%   https://github.com/yrashk/erlang/blob/e1282325ed75e52a98d5/erts/emulator/sys/win32/sys.c#L2459-2466
  1202                     MaxFds div 2;
  1203                 _Any ->
  1204                     %% For other operating systems trust Erlang.
  1205                     MaxFds
  1206             end;
  1207         _ ->
  1208             unknown
  1209     end.