src/file_handle_cache.erl
author Matthias Radestock <matthias@rabbitmq.com>
Thu Jul 22 15:14:07 2010 +0100 (22 months ago)
branchbug21673
changeset 4146 e3aa3c965c9b
parent 3269 23950ce9888b
parent 3933 54476bfe3634
child 4392 6991323b486f
permissions -rw-r--r--
replace use of dict with orddict for per-msg_store partitions
which is much faster for small dicts
matthias@3268
     1
%%   The contents of this file are subject to the Mozilla Public License
matthias@3268
     2
%%   Version 1.1 (the "License"); you may not use this file except in
matthias@3268
     3
%%   compliance with the License. You may obtain a copy of the License at
matthias@3268
     4
%%   http://www.mozilla.org/MPL/
matthias@3268
     5
%%
matthias@3268
     6
%%   Software distributed under the License is distributed on an "AS IS"
matthias@3268
     7
%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
matthias@3268
     8
%%   License for the specific language governing rights and limitations
matthias@3268
     9
%%   under the License.
matthias@3268
    10
%%
matthias@3268
    11
%%   The Original Code is RabbitMQ.
matthias@3268
    12
%%
matthias@3268
    13
%%   The Initial Developers of the Original Code are LShift Ltd,
matthias@3268
    14
%%   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
matthias@3268
    15
%%
matthias@3268
    16
%%   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
matthias@3268
    17
%%   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
matthias@3268
    18
%%   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
matthias@3268
    19
%%   Technologies LLC, and Rabbit Technologies Ltd.
matthias@3268
    20
%%
matthias@3268
    21
%%   Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
matthias@3268
    22
%%   Ltd. Portions created by Cohesive Financial Technologies LLC are
matthias@3268
    23
%%   Copyright (C) 2007-2010 Cohesive Financial Technologies
matthias@3268
    24
%%   LLC. Portions created by Rabbit Technologies Ltd are Copyright
matthias@3268
    25
%%   (C) 2007-2010 Rabbit Technologies Ltd.
matthias@3268
    26
%%
matthias@3268
    27
%%   All Rights Reserved.
matthias@3268
    28
%%
matthias@3268
    29
%%   Contributor(s): ______________________________________.
matthias@3268
    30
%%
matthias@3268
    31
matthias@3268
    32
-module(file_handle_cache).
matthias@3268
    33
matthias@3268
    34
%% A File Handle Cache
matthias@3268
    35
%%
matthias@3268
    36
%% This extends a subset of the functionality of the Erlang file
matthias@3268
    37
%% module.
matthias@3268
    38
%%
matthias@3268
    39
%% Some constraints
matthias@3268
    40
%% 1) This supports one writer, multiple readers per file. Nothing
matthias@3268
    41
%% else.
matthias@3268
    42
%% 2) Do not open the same file from different processes. Bad things
matthias@3268
    43
%% may happen.
matthias@3268
    44
%% 3) Writes are all appends. You cannot write to the middle of a
matthias@3268
    45
%% file, although you can truncate and then append if you want.
matthias@3268
    46
%% 4) Although there is a write buffer, there is no read buffer. Feel
matthias@3268
    47
%% free to use the read_ahead mode, but beware of the interaction
matthias@3268
    48
%% between that buffer and the write buffer.
matthias@3268
    49
%%
matthias@3268
    50
%% Some benefits
matthias@3268
    51
%% 1) You do not have to remember to call sync before close
matthias@3268
    52
%% 2) Buffering is much more flexible than with plain file module, and
matthias@3268
    53
%% you can control when the buffer gets flushed out. This means that
matthias@3268
    54
%% you can rely on reads-after-writes working, without having to call
matthias@3268
    55
%% the expensive sync.
matthias@3268
    56
%% 3) Unnecessary calls to position and sync get optimised out.
matthias@3268
    57
%% 4) You can find out what your 'real' offset is, and what your
matthias@3268
    58
%% 'virtual' offset is (i.e. where the hdl really is, and where it
matthias@3268
    59
%% would be after the write buffer is written out).
matthias@3268
    60
%% 5) You can find out what the offset was when you last sync'd.
matthias@3268
    61
%%
matthias@3268
    62
%% There is also a server component which serves to limit the number
matthias@3268
    63
%% of open file handles in a "soft" way - the server will never
matthias@3268
    64
%% prevent a client from opening a handle, but may immediately tell it
matthias@3268
    65
%% to close the handle. Thus you can set the limit to zero and it will
matthias@3268
    66
%% still all work correctly, it is just that effectively no caching
matthias@3268
    67
%% will take place. The operation of limiting is as follows:
matthias@3268
    68
%%
matthias@3268
    69
%% On open and close, the client sends messages to the server
matthias@3268
    70
%% informing it of opens and closes. This allows the server to keep
matthias@3268
    71
%% track of the number of open handles. The client also keeps a
matthias@3268
    72
%% gb_tree which is updated on every use of a file handle, mapping the
matthias@3268
    73
%% time at which the file handle was last used (timestamp) to the
matthias@3268
    74
%% handle. Thus the smallest key in this tree maps to the file handle
matthias@3268
    75
%% that has not been used for the longest amount of time. This
matthias@3268
    76
%% smallest key is included in the messages to the server. As such,
matthias@3268
    77
%% the server keeps track of when the least recently used file handle
matthias@3268
    78
%% was used *at the point of the most recent open or close* by each
matthias@3268
    79
%% client.
matthias@3268
    80
%%
matthias@3268
    81
%% Note that this data can go very out of date, by the client using
matthias@3268
    82
%% the least recently used handle.
matthias@3268
    83
%%
matthias@3268
    84
%% When the limit is reached, the server calculates the average age of
matthias@3268
    85
%% the last reported least recently used file handle of all the
matthias@3268
    86
%% clients. It then tells all the clients to close any handles not
matthias@3268
    87
%% used for longer than this average, by invoking the callback the
matthias@3268
    88
%% client registered. The client should receive this message and pass
matthias@3268
    89
%% it into set_maximum_since_use/1. However, it is highly possible
matthias@3268
    90
%% this age will be greater than the ages of all the handles the
matthias@3268
    91
%% client knows of because the client has used its file handles in the
matthias@3268
    92
%% mean time. Thus at this point the client reports to the server the
matthias@3268
    93
%% current timestamp at which its least recently used file handle was
matthias@3268
    94
%% last used. The server will check two seconds later that either it
matthias@3268
    95
%% is back under the limit, in which case all is well again, or if
matthias@3268
    96
%% not, it will calculate a new average age. Its data will be much
matthias@3268
    97
%% more recent now, and so it is very likely that when this is
matthias@3268
    98
%% communicated to the clients, the clients will close file handles.
matthias@3268
    99
%%
matthias@3268
   100
%% The advantage of this scheme is that there is only communication
matthias@3268
   101
%% from the client to the server on open, close, and when in the
matthias@3268
   102
%% process of trying to reduce file handle usage. There is no
matthias@3268
   103
%% communication from the client to the server on normal file handle
matthias@3268
   104
%% operations. This scheme forms a feed-back loop - the server does
matthias@3268
   105
%% not care which file handles are closed, just that some are, and it
matthias@3268
   106
%% checks this repeatedly when over the limit. Given the guarantees of
matthias@3268
   107
%% now(), even if there is just one file handle open, a limit of 1,
matthias@3268
   108
%% and one client, it is certain that when the client calculates the
matthias@3268
   109
%% age of the handle, it will be greater than when the server
matthias@3268
   110
%% calculated it, hence it should be closed.
matthias@3268
   111
%%
matthias@3268
   112
%% Handles which are closed as a result of the server are put into a
matthias@3268
   113
%% "soft-closed" state in which the handle is closed (data flushed out
matthias@3268
   114
%% and sync'd first) but the state is maintained. The handle will be
matthias@3268
   115
%% fully reopened again as soon as needed, thus users of this library
matthias@3268
   116
%% do not need to worry about their handles being closed by the server
matthias@3268
   117
%% - reopening them when necessary is handled transparently.
matthias@3268
   118
%%
matthias@3268
   119
%% The server also supports obtain and release_on_death. obtain/0
matthias@3268
   120
%% blocks until a file descriptor is available. release_on_death/1
matthias@3268
   121
%% takes a pid and monitors the pid, reducing the count by 1 when the
matthias@3268
   122
%% pid dies. Thus the assumption is that obtain/0 is called first, and
matthias@3268
   123
%% when that returns, release_on_death/1 is called with the pid who
matthias@3268
   124
%% "owns" the file descriptor. This is, for example, used to track the
matthias@3268
   125
%% use of file descriptors through network sockets.
matthias@3268
   126
matthias@3268
   127
-behaviour(gen_server).
matthias@3268
   128
matthias@3268
   129
-export([register_callback/3]).
matthias@3268
   130
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
matthias@3268
   131
         last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
matthias@3268
   132
         flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
matthias@3268
   133
-export([release_on_death/1, obtain/0]).
matthias@3268
   134
matthias@3268
   135
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
matthias@3268
   136
         terminate/2, code_change/3]).
matthias@3268
   137
matthias@3268
   138
-define(SERVER, ?MODULE).
matthias@3268
   139
-define(RESERVED_FOR_OTHERS, 100).
matthias@3268
   140
-define(FILE_HANDLES_LIMIT_WINDOWS, 10000000).
matthias@3268
   141
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
matthias@3268
   142
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
matthias@3268
   143
matthias@3268
   144
%%----------------------------------------------------------------------------
matthias@3268
   145
matthias@3268
   146
%% Per-path bookkeeping, kept in the process dictionary under
%% {AbsolutePath, fhc_file} (see open/3).
-record(file, {
          %% incremented by open/3 for every handle opened with 'read'
          reader_count,
          %% true once a handle has been opened with 'write' (or
          %% 'append'); open/3 refuses a second writer
          has_writer
         }).
matthias@3268
   150
matthias@3268
   151
%% State of one cached file handle, kept in the process dictionary
%% under {Ref, fhc_handle}.
-record(handle, {
          %% underlying file handle, or the atom 'closed' while the
          %% handle is soft-closed
          hdl,
          %% current offset of hdl within the file
          offset,
          %% value of offset at the last successful sync/1
          trusted_offset,
          %% true when data has been written since the last sync/1
          is_dirty,
          %% number of bytes currently held in write_buffer
          write_buffer_size,
          %% 0 => writes bypass the buffer; 'infinity' => never
          %% flushed on size; otherwise the buffer is flushed once
          %% write_buffer_size exceeds this value
          write_buffer_size_limit,
          %% pending iodata, most recent append first
          write_buffer,
          %% true when hdl is known to be positioned at end of file
          at_eof,
          %% path, mode and options are passed back to open1/6 when a
          %% soft-closed handle is reopened
          path,
          mode,
          options,
          %% false => append/2 and copy/3 (as destination) are refused
          is_write,
          %% false => read/2 and copy/3 (as source) are refused
          is_read,
          %% now() timestamp set by put_handle/2; key in the age tree
          last_used_at
         }).
matthias@3268
   167
matthias@3268
   168
%% State record of the file_handle_cache gen_server. None of these
%% fields is read or written in the client-side part of the module
%% that precedes the gen_server callbacks.
%% NOTE(review): presumably limit/count track the file descriptor
%% budget described in the module header - confirm against init/1.
-record(fhc_state, {
          elders,
          limit,
          count,
          obtains,
          callbacks,
          client_mrefs,
          timer_ref
         }).
matthias@3268
   177
matthias@3268
   178
%%----------------------------------------------------------------------------
matthias@3268
   179
%% Specs
matthias@3268
   180
%%----------------------------------------------------------------------------
matthias@3268
   181
matthias@3268
   182
-ifdef(use_specs).
matthias@3268
   183
matthias@3268
   184
-type(ref() :: any()).
alexandru@3905
   185
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
matthew@3933
   186
-type(val_or_error(T) :: rabbit_types:ok_or_error2(T, any())).
matthias@3268
   187
-type(position() :: ('bof' | 'eof' | non_neg_integer() |
alexandru@3910
   188
                     {('bof' |'eof'), non_neg_integer()} |
alexandru@3910
   189
                     {'cur', integer()})).
matthias@3268
   190
-type(offset() :: non_neg_integer()).
matthias@3268
   191
matthias@3268
   192
-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
matthias@3268
   193
-spec(open/3 ::
alexandru@3910
   194
        (string(), [any()],
alexandru@3910
   195
         [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
alexandru@3910
   196
        -> val_or_error(ref())).
matthias@3268
   197
-spec(close/1 :: (ref()) -> ok_or_error()).
matthias@3268
   198
-spec(read/2 :: (ref(), non_neg_integer()) ->
matthias@3268
   199
             val_or_error([char()] | binary()) | 'eof').
matthias@3268
   200
-spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
matthias@3268
   201
-spec(sync/1 :: (ref()) ->  ok_or_error()).
matthias@3268
   202
-spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
matthias@3268
   203
-spec(truncate/1 :: (ref()) -> ok_or_error()).
matthias@3268
   204
-spec(last_sync_offset/1       :: (ref()) -> val_or_error(offset())).
matthias@3268
   205
-spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())).
matthias@3268
   206
-spec(current_raw_offset/1     :: (ref()) -> val_or_error(offset())).
matthias@3268
   207
-spec(flush/1 :: (ref()) -> ok_or_error()).
matthias@3268
   208
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
matthias@3268
   209
             val_or_error(non_neg_integer())).
matthias@3268
   210
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
matthias@3268
   211
-spec(delete/1 :: (ref()) -> ok_or_error()).
matthias@3268
   212
-spec(clear/1 :: (ref()) -> ok_or_error()).
matthias@3268
   213
-spec(release_on_death/1 :: (pid()) -> 'ok').
matthias@3268
   214
-spec(obtain/0 :: () -> 'ok').
matthias@3268
   215
matthias@3268
   216
-endif.
matthias@3268
   217
matthias@3268
   218
%%----------------------------------------------------------------------------
matthias@3268
   219
%% Public API
matthias@3268
   220
%%----------------------------------------------------------------------------
matthias@3268
   221
matthias@3268
   222
%% Starts the file handle cache server linked to the caller and
%% registers it locally under ?SERVER. The start-up timeout is
%% disabled (infinity).
start_link() ->
    ServerName = {local, ?SERVER},
    StartOpts  = [{timeout, infinity}],
    gen_server:start_link(ServerName, ?MODULE, [], StartOpts).
matthias@3268
   224
matthias@3268
   225
%% Registers {M, F, A} with the server as the callback of the calling
%% process. The request is an asynchronous cast that carries self().
%% Fails with function_clause unless M and F are atoms and A is a list.
register_callback(M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
    Callback = {M, F, A},
    gen_server:cast(?SERVER, {register_callback, self(), Callback}).
matthias@3268
   228
matthias@3268
   229
%% Opens Path with the given Mode and Options and returns {ok, Ref},
%% where Ref is a fresh reference identifying the handle in all other
%% API calls.
%%
%% 'append' in Mode is rewritten to 'write' (see append_to_write/1).
%% Only one writer per absolute path is allowed within the calling
%% process: a second one yields {error, writer_exists}. Any error
%% from open1/6 is returned unchanged and leaves the per-path
%% bookkeeping untouched.
open(Path, Mode, Options) ->
    AbsPath = filename:absname(Path),
    FileKey = {AbsPath, fhc_file},
    FileRec = case get(FileKey) of
                  undefined        -> #file { reader_count = 0,
                                              has_writer   = false };
                  Known = #file {} -> Known
              end,
    #file { reader_count = Readers, has_writer = HasWriter } = FileRec,
    Mode1 = append_to_write(Mode),
    IsWriter = is_writer(Mode1),
    case IsWriter andalso HasWriter of
        true ->
            {error, writer_exists};
        false ->
            Ref = make_ref(),
            case open1(AbsPath, Mode1, Options, Ref, bof, new) of
                {ok, _Handle} ->
                    %% only now record the new reader and/or writer
                    Readers1 = case is_reader(Mode1) of
                                   true  -> Readers + 1;
                                   false -> Readers
                               end,
                    HasWriter1 = HasWriter orelse IsWriter,
                    put(FileKey, FileRec #file { reader_count = Readers1,
                                                 has_writer   = HasWriter1 }),
                    {ok, Ref};
                OpenError ->
                    OpenError
            end
    end.
matthias@3268
   257
matthias@3268
   258
%% Closes the handle identified by Ref. Closing an unknown (or already
%% closed) Ref is a no-op that returns ok.
%%
%% The handle is removed from the process dictionary before
%% hard_close/1 is attempted; if that fails, the handle it returns is
%% stored back and the error is returned to the caller.
close(Ref) ->
    HandleKey = {Ref, fhc_handle},
    case erase(HandleKey) of
        undefined ->
            ok;
        Handle ->
            case hard_close(Handle) of
                ok ->
                    ok;
                {Error, Handle1} ->
                    put_handle(Ref, Handle1),
                    Error
            end
    end.
matthias@3268
   267
matthias@3268
   268
%% Reads up to Count bytes from the handle identified by Ref, after
%% its write buffer has been flushed.
%%
%% Returns {ok, Data}, eof, or an error. On success the tracked offset
%% is advanced by the size of Data; on eof the handle is marked as
%% being at end of file. A handle not opened for reading yields
%% {error, not_open_for_reading}.
read(Ref, Count) ->
    ReadFun =
        fun ([#handle { is_read = false }]) ->
                {error, not_open_for_reading};
            ([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
                case file:read(Hdl, Count) of
                    {ok, Data} = Reply ->
                        NewOffset = Offset + iolist_size(Data),
                        {Reply, [Handle #handle { offset = NewOffset }]};
                    eof ->
                        {eof, [Handle #handle { at_eof = true }]};
                    Error ->
                        {Error, [Handle]}
                end
        end,
    with_flushed_handles([Ref], ReadFun).
matthias@3268
   282
matthias@3268
   283
%% Appends Data (iodata) to the file behind Ref.
%%
%% The handle is first moved to end of file via maybe_seek/2. With a
%% write buffer limit of 0 the data is written straight to the file;
%% otherwise it is prepended to the write buffer, which is flushed
%% with write_buffer/1 as soon as its size exceeds a finite limit.
%% A handle not opened for writing yields {error, not_open_for_writing}.
append(Ref, Data) ->
    AppendFun =
        fun ([#handle { is_write = false }]) ->
                {error, not_open_for_writing};
            ([Handle]) ->
                case maybe_seek(eof, Handle) of
                    {{error, _} = Error, Handle1} ->
                        {Error, [Handle1]};
                    %% unbuffered: write through immediately
                    {{ok, _Offset},
                     Handle1 = #handle { hdl = Hdl, offset = Offset,
                                         write_buffer_size_limit = 0,
                                         at_eof = true }} ->
                        NewOffset = Offset + iolist_size(Data),
                        WriteRes = file:write(Hdl, Data),
                        {WriteRes, [Handle1 #handle { offset   = NewOffset,
                                                      is_dirty = true }]};
                    %% buffered: accumulate, flush when over the limit
                    {{ok, _Offset},
                     Handle1 = #handle { write_buffer = Buffer,
                                         write_buffer_size = Size,
                                         write_buffer_size_limit = Limit,
                                         at_eof = true }} ->
                        Buffer1 = [Data | Buffer],
                        NewSize = Size + iolist_size(Data),
                        Buffered = Handle1 #handle {
                                     write_buffer      = Buffer1,
                                     write_buffer_size = NewSize },
                        OverLimit = Limit /= infinity andalso NewSize > Limit,
                        case OverLimit of
                            false ->
                                {ok, [Buffered]};
                            true ->
                                {Result, Flushed} = write_buffer(Buffered),
                                {Result, [Flushed]}
                        end
                end
        end,
    with_handles([Ref], AppendFun).
matthias@3268
   313
matthias@3268
   314
%% Flushes the write buffer of Ref and, if the handle is dirty, calls
%% file:sync/1 on it. After a successful sync the handle is clean and
%% its trusted offset equals its current offset. A handle that is not
%% dirty is left alone and ok is returned without touching the file.
sync(Ref) ->
    SyncFun =
        fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
                ok;
            ([Handle = #handle { hdl = Hdl, offset = Offset,
                                 is_dirty = true, write_buffer = [] }]) ->
                case file:sync(Hdl) of
                    ok ->
                        Synced = Handle #handle { is_dirty       = false,
                                                  trusted_offset = Offset },
                        {ok, [Synced]};
                    Error ->
                        {Error, [Handle]}
                end
        end,
    with_flushed_handles([Ref], SyncFun).
matthias@3268
   327
matthias@3268
   328
%% Flushes the write buffer of Ref and then repositions the handle to
%% NewOffset by means of maybe_seek/2, whose result is returned.
position(Ref, NewOffset) ->
    SeekFun = fun ([Handle]) ->
                      {Result, Moved} = maybe_seek(NewOffset, Handle),
                      {Result, [Moved]}
              end,
    with_flushed_handles([Ref], SeekFun).
matthias@3268
   334
matthias@3268
   335
%% Flushes the write buffer of Ref and truncates the file at the
%% handle's current position. On success the handle is at end of file
%% and its trusted offset is capped at the current offset, since
%% nothing beyond that point exists any more.
truncate(Ref) ->
    TruncateFun =
        fun ([Handle = #handle { hdl = Hdl, offset = Offset,
                                 trusted_offset = TOffset }]) ->
                case file:truncate(Hdl) of
                    ok ->
                        %% smaller of the two; Offset wins on a tie
                        TOffset1 = case TOffset < Offset of
                                       true  -> TOffset;
                                       false -> Offset
                                   end,
                        {ok, [Handle #handle { at_eof         = true,
                                               trusted_offset = TOffset1 }]};
                    Error ->
                        {Error, [Handle]}
                end
        end,
    with_flushed_handles([Ref], TruncateFun).
matthias@3268
   347
matthias@3268
   348
%% Returns {ok, Offset}, where Offset is the trusted offset of Ref,
%% i.e. the offset recorded by the last successful sync/1.
last_sync_offset(Ref) ->
    TrustedFun = fun ([#handle { trusted_offset = Trusted }]) ->
                         {ok, Trusted}
                 end,
    with_handles([Ref], TrustedFun).
matthias@3268
   352
matthias@3268
   353
%% Returns {ok, Offset}, the offset the handle would have once its
%% write buffer has been written out. For a writable handle at end of
%% file this is the raw offset plus the buffered size; in every other
%% case it is simply the raw offset.
current_virtual_offset(Ref) ->
    VirtualFun =
        fun ([#handle { at_eof = true, is_write = true,
                        offset = Raw, write_buffer_size = Pending }]) ->
                {ok, Raw + Pending};
            ([#handle { offset = Raw }]) ->
                {ok, Raw}
        end,
    with_handles([Ref], VirtualFun).
matthias@3268
   361
matthias@3268
   362
%% Returns {ok, Offset}, the offset currently tracked for the handle,
%% not counting anything still held in the write buffer.
current_raw_offset(Ref) ->
    RawFun = fun ([Handle]) ->
                     Raw = Handle #handle.offset,
                     {ok, Raw}
             end,
    with_handles([Ref], RawFun).
matthias@3268
   364
matthias@3268
   365
%% Writes out the write buffer of Ref. All the work is done by
%% with_flushed_handles/2; the fun merely hands the handle back.
flush(Ref) ->
    KeepFun = fun ([Flushed]) -> {ok, [Flushed]} end,
    with_flushed_handles([Ref], KeepFun).
matthias@3268
   367
matthias@3268
   368
%% Copies up to Count bytes from the handle Src to the handle Dest,
%% after flushing the write buffers of both.
%%
%% Src must be open for reading and Dest for writing, otherwise
%% {error, incorrect_handle_modes} is returned. On success the result
%% is {ok, BytesCopied} and both tracked offsets are advanced by that
%% amount.
%%
%% The destination is also marked dirty: file:copy/3 has written data
%% to it, so a later sync/1 must really call file:sync/1. Without the
%% mark, sync/1 would see a clean handle and return ok without
%% syncing the copied data.
copy(Src, Dest, Count) ->
    with_flushed_handles(
      [Src, Dest],
      fun ([SHandle = #handle { is_read  = true, hdl = SHdl, offset = SOffset },
            DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
          ) ->
              case file:copy(SHdl, DHdl, Count) of
                  {ok, Count1} = Result1 ->
                      {Result1,
                       [SHandle #handle { offset = SOffset + Count1 },
                        DHandle #handle { offset = DOffset + Count1,
                                          is_dirty = true }]};
                  Error ->
                      {Error, [SHandle, DHandle]}
              end;
          (_Handles) ->
              {error, incorrect_handle_modes}
      end).
matthias@3268
   385
matthias@3268
   386
%% Closes the handle identified by Ref and deletes its file. Deleting
%% an unknown Ref is a no-op that returns ok.
%%
%% Buffered data is discarded rather than written, since the file is
%% about to disappear: the handle is marked clean and its write buffer
%% is emptied before hard_close/1 is attempted. write_buffer_size is
%% reset together with write_buffer (as clear/1 does), so that a
%% handle stored back after a failed close does not report buffered
%% bytes that no longer exist.
%%
%% Returns the result of file:delete/1, or the error from
%% hard_close/1, in which case the handle is stored back.
delete(Ref) ->
    case erase({Ref, fhc_handle}) of
        undefined ->
            ok;
        Handle = #handle { path = Path } ->
            case hard_close(Handle #handle { is_dirty = false,
                                             write_buffer = [],
                                             write_buffer_size = 0 }) of
                ok               -> file:delete(Path);
                {Error, Handle1} -> put_handle(Ref, Handle1),
                                    Error
            end
    end.
matthias@3268
   398
matthias@3268
   399
%% Empties the file behind Ref: buffered data is dropped, the handle
%% is moved to the beginning of the file and the file is truncated
%% there. On success the trusted offset is 0 and the handle is at end
%% of file. A handle that is already at offset 0, at end of file and
%% with nothing buffered is left untouched.
clear(Ref) ->
    ClearFun =
        fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
                ok;
            ([Handle]) ->
                %% pending writes are discarded, not flushed
                Emptied = Handle #handle { write_buffer      = [],
                                           write_buffer_size = 0 },
                case maybe_seek(bof, Emptied) of
                    {{error, _} = Error, Handle1} ->
                        {Error, [Handle1]};
                    {{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
                        case file:truncate(Hdl) of
                            ok ->
                                {ok, [Handle1 #handle { at_eof         = true,
                                                        trusted_offset = 0 }]};
                            Error ->
                                {Error, [Handle1]}
                        end
                end
        end,
    with_handles([Ref], ClearFun).
matthias@3268
   417
matthias@3268
   418
%% Soft-closes every open handle of the calling process that has not
%% been used for at least MaximumAge microseconds (ages are computed
%% with timer:now_diff/2). Always returns ok.
%%
%% The fold walks the whole process dictionary and only acts on
%% {Ref, fhc_handle} entries. Its accumulator starts as true and
%% becomes false as soon as one handle has been soft-closed
%% successfully. If it is still true at the end, i.e. nothing was
%% closed, age_tree_change/0 is called.
%% NOTE(review): going by the module header, age_tree_change/0
%% presumably reports the age of the least recently used handle to
%% the server - confirm against its definition.
set_maximum_since_use(MaximumAge) ->
    Now = now(),
    CloseIfStale =
        fun ({{Ref, fhc_handle},
              Handle = #handle { hdl = Hdl, last_used_at = Then }}, Report) ->
                Age = timer:now_diff(Now, Then),
                case Hdl /= closed andalso Age >= MaximumAge of
                    false ->
                        Report;
                    true ->
                        case soft_close(Handle) of
                            {ok, Closed} ->
                                %% stored directly, without put_handle/2,
                                %% so last_used_at is not refreshed
                                put({Ref, fhc_handle}, Closed),
                                false;
                            {_Error, Handle1} ->
                                put_handle(Ref, Handle1),
                                Report
                        end
                end;
            (_OtherEntry, Report) ->
                Report
        end,
    NothingClosed = lists:foldl(CloseIfStale, true, get()),
    case NothingClosed of
        true  -> age_tree_change();
        false -> ok
    end,
    ok.
matthias@3268
   440
matthias@3268
   441
%% Asks the server, by asynchronous cast, to release a file
%% descriptor when Pid dies. Fails with function_clause if Pid is not
%% a pid.
release_on_death(Pid) when is_pid(Pid) ->
    Request = {release_on_death, Pid},
    gen_server:cast(?SERVER, Request).
matthias@3268
   443
matthias@3268
   444
%% Synchronously asks the server for a file descriptor. The call has
%% no timeout, so it blocks until the server replies.
obtain() ->
    Timeout = infinity,
    gen_server:call(?SERVER, obtain, Timeout).
matthias@3268
   446
matthias@3268
   447
%%----------------------------------------------------------------------------
matthias@3268
   448
%% Internal functions
matthias@3268
   449
%%----------------------------------------------------------------------------
matthias@3268
   450
matthias@3268
   451
%% True if the mode list contains the atom 'read'.
is_reader(Mode) ->
    lists:member(read, Mode).
matthias@3268
   452
matthias@3268
   453
%% True if the mode list contains the atom 'write'. 'append' does not
%% count here; callers normalise the mode with append_to_write/1 first.
is_writer(Mode) ->
    lists:member(write, Mode).
matthias@3268
   454
matthias@3268
   455
%% Normalises a mode list: if it contains 'append', one 'append' and
%% one 'write' (if present) are removed and a single 'write' is put at
%% the front. A mode list without 'append' is returned unchanged.
append_to_write(Mode) ->
    case lists:member(append, Mode) of
        false ->
            Mode;
        true ->
            Stripped = Mode -- [append, write],
            [write | Stripped]
    end.
matthias@3268
   460
matthias@3268
   461
%% Looks up (reopening where necessary) the handle for every Ref in
%% Refs and applies Fun to the list of handles, in the order of Refs.
%%
%% If any lookup fails, Fun is not invoked and the first error is
%% returned. Fun may return {Result, Handles}, with Handles a list
%% matching Refs one to one: the handles are then stored back with
%% put_handle/2 and Result is returned. Any other return value of Fun
%% is passed through unchanged and nothing is stored.
with_handles(Refs, Fun) ->
    Collect =
        fun (Ref, {ok, Acc}) ->
                case get_or_reopen(Ref) of
                    {ok, Handle} -> {ok, [Handle | Acc]};
                    LookupError  -> LookupError
                end;
            (_Ref, EarlierError) ->
                %% an earlier ref already failed; skip the rest
                EarlierError
        end,
    case lists:foldl(Collect, {ok, []}, Refs) of
        {ok, Reversed} ->
            case Fun(lists:reverse(Reversed)) of
                {Result, Updated} when is_list(Updated) ->
                    lists:zipwith(fun put_handle/2, Refs, Updated),
                    Result;
                Result ->
                    Result
            end;
        Error ->
            Error
    end.
matthias@3268
   483
matthias@3268
   484
%% Like with_handles/2, but write_buffer/1 is applied to every handle
%% before Fun sees the handles.
%%
%% If write_buffer/1 fails for a handle, Fun is not invoked: the first
%% error is returned and all handles (those already processed and the
%% untouched remainder) are handed back to be stored.
%%
%% Fun may return {Result, Handles} or a bare Result. In the latter
%% case the flushed handles are still handed back to with_handles/2 so
%% that they are stored. Dropping them would leave the pre-flush
%% handle in the process dictionary, and its buffer would be written
%% out a second time by the next flush. This matters for the error
%% paths of read/2 (handle not open for reading) and copy/3 (incorrect
%% handle modes).
%% NOTE(review): this relies on write_buffer/1 returning a handle
%% whose buffer has been written out and emptied - confirm against
%% its definition.
with_flushed_handles(Refs, Fun) ->
    with_handles(
      Refs,
      fun (Handles) ->
              case lists:foldl(
                     fun (Handle, {ok, HandlesAcc}) ->
                             {Res, Handle1} = write_buffer(Handle),
                             {Res, [Handle1 | HandlesAcc]};
                         (Handle, {Error, HandlesAcc}) ->
                             {Error, [Handle | HandlesAcc]}
                     end, {ok, []}, Handles) of
                  {ok, Handles1} ->
                      Flushed = lists:reverse(Handles1),
                      case Fun(Flushed) of
                          {_Result, Handles2} = Reply
                            when is_list(Handles2) ->
                              Reply;
                          Result ->
                              %% keep the effect of the flush
                              {Result, Flushed}
                      end;
                  {Error, Handles1} ->
                      {Error, lists:reverse(Handles1)}
              end
      end).
matthias@3268
   501
matthias@3268
   502
%% Fetches the handle stored for Ref. A soft-closed handle (hdl is
%% 'closed') is reopened through open1/6 with its recorded path, mode,
%% options and offset; an open handle is returned as {ok, Handle}.
%%
%% NOTE(review): an unknown Ref yields the 3-tuple
%% {error, not_open, Ref}, not an {error, Reason} pair, and
%% with_handles/2 returns it to API callers unchanged. Kept as is
%% because callers may match on this shape.
get_or_reopen(Ref) ->
    HandleKey = {Ref, fhc_handle},
    case get(HandleKey) of
        #handle { hdl = closed, path = Path, mode = Mode,
                  options = Options, offset = Offset } ->
            open1(Path, Mode, Options, Ref, Offset, reopen);
        undefined ->
            {error, not_open, Ref};
        OpenHandle ->
            {ok, OpenHandle}
    end.
matthias@3268
   512
matthias@3268
   513
%% Stores Handle under {Ref, fhc_handle} and marks it as used just
%% now: its entry in the age tree is moved from the previous
%% last_used_at timestamp to the current now() value, which also
%% becomes the handle's new last_used_at.
put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
    Now = now(),
    age_tree_update(Then, Now, Ref),
    Touched = Handle #handle { last_used_at = Now },
    put({Ref, fhc_handle}, Touched).
matthias@3268
   517
matthias@3268
   518
%% Applies Fun to this process's age tree (kept in the process
%% dictionary under fhc_age_tree; an empty gb_tree is used when none
%% has been stored yet) and stores Fun's result as the new tree.
%% Returns the result of put/2, i.e. the previously stored value.
with_age_tree(Fun) ->
    Current = case get(fhc_age_tree) of
                  undefined -> gb_trees:empty();
                  Existing  -> Existing
              end,
    put(fhc_age_tree, Fun(Current)).
matthias@3268
   523
matthias@3268
   524
%% Adds Now -> Ref to the age tree and casts {open, self(), Eldest} to
%% ?SERVER, where Eldest is the smallest key in the tree after the
%% insertion.
age_tree_insert(Now, Ref) ->
    Insert = fun (Tree0) ->
                     Tree1 = gb_trees:insert(Now, Ref, Tree0),
                     {EldestAt, _} = gb_trees:smallest(Tree1),
                     gen_server:cast(?SERVER, {open, self(), EldestAt}),
                     Tree1
             end,
    with_age_tree(Insert).
matthias@3268
   532
matthias@3268
   533
%% Re-keys an age tree entry: drops the entry stored under Then (if
%% there is one) and inserts Now -> Ref. The server is not notified.
age_tree_update(Then, Now, Ref) ->
    Rekey = fun (Tree) ->
                    Pruned = gb_trees:delete_any(Then, Tree),
                    gb_trees:insert(Now, Ref, Pruned)
            end,
    with_age_tree(Rekey).
matthias@3268
   538
matthias@3268
   539
%% Removes the entry keyed by Then from the age tree (if present) and
%% casts {close, self(), Eldest} to ?SERVER. Eldest is the smallest
%% key left in the tree, or undefined when the tree is now empty.
age_tree_delete(Then) ->
    Remove =
        fun (Tree) ->
                Remaining = gb_trees:delete_any(Then, Tree),
                EldestAt =
                    case gb_trees:is_empty(Remaining) of
                        false -> element(1, gb_trees:smallest(Remaining));
                        true  -> undefined
                    end,
                gen_server:cast(?SERVER, {close, self(), EldestAt}),
                Remaining
        end,
    with_age_tree(Remove).
matthias@3268
   553
matthias@3268
   554
%% Tells ?SERVER about the current eldest entry of the age tree by
%% casting {update, self(), Eldest}. Nothing is sent when the tree is
%% empty. The tree itself is left unchanged.
age_tree_change() ->
    Report = fun (Tree) ->
                     case gb_trees:is_empty(Tree) of
                         false ->
                             {EldestAt, _} = gb_trees:smallest(Tree),
                             gen_server:cast(?SERVER,
                                             {update, self(), EldestAt});
                         true ->
                             ok
                     end,
                     Tree
             end,
    with_age_tree(Report).
matthias@3268
   564
matthias@3268
   565
%% Opens Path and registers a fresh #handle{} for Ref.
%%
%% NewOrReopen is new or reopen. On reopen, read is prepended to the
%% mode given to file:open/2 -- presumably so that reopening a
%% write-mode file does not truncate it (file:open/2 truncates on
%% write unless combined with read); confirm against callers. The
%% handle always records the caller's original Mode.
%%
%% After opening, the handle is positioned at Offset with maybe_seek/2
%% (a failed seek crashes with badmatch), stored in the process
%% dictionary under {Ref, fhc_handle} and entered into the age tree.
%%
%% Returns {ok, Handle} or {error, Reason} from file:open/2.
open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
    OpenMode = case NewOrReopen of
                   new    -> Mode;
                   reopen -> [read | Mode]
               end,
    case file:open(Path, OpenMode) of
        {error, Reason} ->
            {error, Reason};
        {ok, Hdl} ->
            BufferLimit =
                case proplists:get_value(write_buffer, Options, unbuffered) of
                    unbuffered           -> 0;
                    infinity             -> infinity;
                    N when is_integer(N) -> N
                end,
            OpenedAt = now(),
            Fresh = #handle { hdl                     = Hdl,
                              offset                  = 0,
                              trusted_offset          = 0,
                              is_dirty                = false,
                              write_buffer_size       = 0,
                              write_buffer_size_limit = BufferLimit,
                              write_buffer            = [],
                              at_eof                  = false,
                              path                    = Path,
                              mode                    = Mode,
                              options                 = Options,
                              is_write                = is_writer(Mode),
                              is_read                 = is_reader(Mode),
                              last_used_at            = OpenedAt },
            {{ok, SoughtTo}, Positioned} = maybe_seek(Offset, Fresh),
            Registered = Positioned #handle { trusted_offset = SoughtTo },
            put({Ref, fhc_handle}, Registered),
            age_tree_insert(OpenedAt, Ref),
            {ok, Registered}
    end.
matthias@3268
   601
matthias@3268
   602
%% Closes the underlying file but keeps the #handle{} so that it can be
%% reopened later. An already closed handle is returned as is.
%% Otherwise the write buffer is flushed, the file is synced if the
%% handle is dirty, the file is closed and the handle's entry is
%% removed from the age tree. A failing sync or close crashes with
%% badmatch. If the flush fails, its {Error, Handle} is returned
%% untouched.
soft_close(#handle { hdl = closed } = Handle) ->
    {ok, Handle};
soft_close(Handle) ->
    case write_buffer(Handle) of
        {ok, #handle {} = Flushed} ->
            #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty,
                      last_used_at = Then } = Flushed,
            case IsDirty of
                true  -> ok = file:sync(Hdl);
                false -> ok
            end,
            ok = file:close(Hdl),
            age_tree_delete(Then),
            {ok, Flushed #handle { hdl            = closed,
                                   trusted_offset = Offset,
                                   is_dirty       = false }};
        {_Error, _Handle} = Failed ->
            Failed
    end.
matthias@3268
   619
matthias@3268
   620
%% Soft-closes Handle and then updates the per-path #file{} record
%% kept in the process dictionary under {Path, fhc_file}: a reader
%% decrements reader_count, a writer clears has_writer. When no
%% readers and no writer remain the record is erased altogether.
%% Returns ok, or the {Error, Handle} produced by soft_close/1.
hard_close(Handle) ->
    case soft_close(Handle) of
        {ok, #handle { path = Path, is_read = IsReader,
                       is_write = IsWriter }} ->
            Key = {Path, fhc_file},
            File = #file { reader_count = Readers, has_writer = HadWriter } =
                get(Key),
            Readers1 = case IsReader of
                           true  -> Readers - 1;
                           false -> Readers
                       end,
            StillHasWriter = HadWriter andalso not IsWriter,
            case {Readers1, StillHasWriter} of
                {0, false} ->
                    erase(Key);
                _ ->
                    put(Key, File #file { reader_count = Readers1,
                                          has_writer   = StillHasWriter })
            end,
            ok;
        {_Error, _Handle} = Failed ->
            Failed
    end.
matthias@3268
   641
matthias@3268
   642
%% Positions the handle at NewOffset, skipping the file:position/2
%% call when needs_seek/3 says the handle is already there.
%% Returns {{ok, Offset}, Handle1} with offset and at_eof updated, or
%% {{error, Reason}, Handle} with the handle unchanged.
maybe_seek(NewOffset, #handle { hdl = Hdl, offset = Offset,
                                at_eof = AtEoF } = Handle) ->
    {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
    Positioned = case NeedsSeek of
                     true  -> file:position(Hdl, NewOffset);
                     false -> {ok, Offset}
                 end,
    case Positioned of
        {ok, Offset1} ->
            {Positioned, Handle #handle { offset = Offset1,
                                          at_eof = AtEoF1 }};
        {error, _} ->
            {Positioned, Handle}
    end.
matthias@3268
   654
matthias@3268
   655
%% needs_seek(AtEoF, CurOffset, DesiredOffset) -> {AtEoF1, NeedsSeek}
%%
%% Decides whether a seek to DesiredOffset is required given the
%% current offset and whether we believe we are at end of file, and
%% what the at-EoF belief should be afterwards.

%% staying where we are never needs a seek
needs_seek(AtEoF, _Cur, Where) when Where =:= cur; Where =:= {cur, 0} ->
    {AtEoF, false};
%% going to EoF: only seek if we are not already there
needs_seek(AtEoF, _Cur, Where)
  when is_boolean(AtEoF), (Where =:= eof orelse Where =:= {eof, 0}) ->
    {true, not AtEoF};
%% going to the start while already at offset 0
needs_seek(AtEoF, 0, Where) when Where =:= bof; Where =:= {bof, 0} ->
    {AtEoF, false};
%% absolute offset equal to the current one
needs_seek(AtEoF, Same, Same) ->
    {AtEoF, false};
%% at EoF and moving: we stay at EoF only if we do not move backwards
needs_seek(true, Cur, Where) ->
    StillAtEoF =
        case Where of
            {bof, D} when D >= Cur                    -> true;
            {cur, D} when D > 0                       -> true;
            D when is_integer(D) andalso D >= Cur     -> true; %% as {bof, D}
            _                                         -> false
        end,
    {StillAtEoF, true};
%% because we can't really track size, we could well end up at EoF and not know
needs_seek(_AtEoF, _Cur, _Where) ->
    {false, true}.
matthias@3268
   676
matthias@3268
   677
%% Flushes the handle's pending writes to the file.
%% An empty buffer is a no-op. Otherwise the handle must have
%% at_eof = true (there is no clause for a non-empty buffer away from
%% EoF); the buffered data, which is accumulated in reverse, is written
%% in its original order, the offset advances by write_buffer_size and
%% the handle is marked dirty.
%% Returns {ok, Handle1}, or {{error, Reason}, Handle} with the buffer
%% left intact.
write_buffer(#handle { write_buffer = [] } = Handle) ->
    {ok, Handle};
write_buffer(#handle { at_eof = true, hdl = Hdl, offset = Offset,
                       write_buffer = Pending,
                       write_buffer_size = PendingBytes } = Handle) ->
    InOrder = lists:reverse(Pending),
    case file:write(Hdl, InOrder) of
        {error, _} = Error ->
            {Error, Handle};
        ok ->
            {ok, Handle #handle { offset            = Offset + PendingBytes,
                                  is_dirty          = true,
                                  write_buffer      = [],
                                  write_buffer_size = 0 }}
    end.
matthias@3268
   691
matthias@3268
   692
%%----------------------------------------------------------------------------
matthias@3268
   693
%% gen_server callbacks
matthias@3268
   694
%%----------------------------------------------------------------------------
matthias@3268
   695
matthias@3268
   696
%% gen_server init. The handle limit is the application env value
%% file_handles_high_watermark when that is a positive integer, and
%% the result of ulimit/0 otherwise. Starts with no clients, no
%% pending obtains, a count of zero and no timer.
init([]) ->
    Limit = case application:get_env(file_handles_high_watermark) of
                {ok, Configured} when is_integer(Configured),
                                      Configured > 0 ->
                    Configured;
                _ ->
                    ulimit()
            end,
    error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
    Empty = dict:new(),
    {ok, #fhc_state { limit        = Limit,
                      count        = 0,
                      obtains      = [],
                      timer_ref    = undefined,
                      elders       = Empty,
                      callbacks    = Empty,
                      client_mrefs = Empty }}.
matthias@3268
   708
matthias@3268
   709
%% obtain: a caller asks for one file handle. The count is bumped
%% first and maybe_reduce/1 gets a chance to react. If that takes us
%% to or past the limit, the bump is undone and the caller is queued
%% in obtains without a reply (it is answered later by
%% process_obtains/1); otherwise it is answered with ok straight away.
handle_call(obtain, From, #fhc_state { count = Count } = State) ->
    Bumped = maybe_reduce(State #fhc_state { count = Count + 1 }),
    #fhc_state { count = Count1, limit = Limit, obtains = Waiting } = Bumped,
    if
        Limit /= infinity andalso Count1 >= Limit ->
            {noreply, Bumped #fhc_state { obtains = [From | Waiting],
                                          count   = Count1 - 1 }};
        true ->
            {reply, ok, Bumped}
    end.
matthias@3268
   717
matthias@3268
   718
%% Asynchronous requests handled by the server. Every clause that
%% names a client Pid makes sure that Pid is monitored (ensure_mref/2).

%% remember the {M, F, A} to invoke when Pid should give up handles
handle_cast({register_callback, Pid, MFA},
            #fhc_state { callbacks = Callbacks } = State) ->
    State1 = State #fhc_state { callbacks = dict:store(Pid, MFA, Callbacks) },
    {noreply, ensure_mref(Pid, State1)};

%% Pid opened a handle: record its eldest timestamp, count the handle
handle_cast({open, Pid, EldestUnusedSince},
            #fhc_state { elders = Elders, count = Count } = State) ->
    State1 = State #fhc_state {
               elders = dict:store(Pid, EldestUnusedSince, Elders),
               count  = Count + 1 },
    {noreply, maybe_reduce(ensure_mref(Pid, State1))};

%% Pid's eldest timestamp changed, handle count unaffected
handle_cast({update, Pid, EldestUnusedSince},
            #fhc_state { elders = Elders } = State) ->
    %% don't call maybe_reduce from here otherwise we can create a
    %% storm of messages
    State1 = State #fhc_state {
               elders = dict:store(Pid, EldestUnusedSince, Elders) },
    {noreply, ensure_mref(Pid, State1)};

%% Pid closed a handle: undefined means it has none left, so it is
%% dropped from elders; the freed slot may satisfy a pending obtain
handle_cast({close, Pid, EldestUnusedSince},
            #fhc_state { elders = Elders, count = Count } = State) ->
    Elders1 = if
                  EldestUnusedSince =:= undefined ->
                      dict:erase(Pid, Elders);
                  true ->
                      dict:store(Pid, EldestUnusedSince, Elders)
              end,
    State1 = State #fhc_state { elders = Elders1, count = Count - 1 },
    {noreply, process_obtains(ensure_mref(Pid, State1))};

%% periodic re-check scheduled by maybe_reduce/1
handle_cast(check_counts, State) ->
    Untimed = State #fhc_state { timer_ref = undefined },
    {noreply, maybe_reduce(Untimed)};

%% monitor Pid without recording the reference in client_mrefs; its
%% 'DOWN' message is then handled by the non-client branch of
%% handle_info/2
handle_cast({release_on_death, Pid}, State) ->
    _MRef = erlang:monitor(process, Pid),
    {noreply, State}.
matthias@3268
   754
matthias@3268
   755
%% A monitored process went down. If MRef is the reference recorded
%% for Pid in client_mrefs, all traces of that client (elders,
%% client_mrefs, callbacks) are removed. Any other 'DOWN' -- e.g. from
%% a release_on_death monitor -- releases one handle from the count.
%% Either way pending obtains are re-examined.
handle_info({'DOWN', MRef, process, Pid, _Reason},
            #fhc_state { count = Count, callbacks = Callbacks,
                         client_mrefs = ClientMRefs,
                         elders = Elders } = State) ->
    State1 =
        case dict:find(Pid, ClientMRefs) of
            {ok, MRef} ->
                State #fhc_state {
                  elders       = dict:erase(Pid, Elders),
                  client_mrefs = dict:erase(Pid, ClientMRefs),
                  callbacks    = dict:erase(Pid, Callbacks) };
            _ ->
                State #fhc_state { count = Count - 1 }
        end,
    {noreply, process_obtains(State1)}.
matthias@3268
   766
matthias@3268
   767
%% gen_server terminate callback: nothing to clean up, the state is
%% simply handed back.
terminate(_Why, FinalState) ->
    FinalState.
matthias@3268
   769
matthias@3268
   770
%% gen_server code_change callback: the state needs no conversion.
code_change(_FromVsn, Unchanged, _Extra) ->
    {ok, Unchanged}.
matthias@3268
   772
matthias@3268
   773
%%----------------------------------------------------------------------------
matthias@3268
   774
%% server helpers
matthias@3268
   775
%%----------------------------------------------------------------------------
matthias@3268
   776
matthias@3268
   777
%% Answers as many queued obtain callers as the limit allows.
%% Nothing happens when the queue is empty or the count is already at
%% or above the limit. Otherwise up to Limit - Count callers are taken
%% from the tail of obtains (new callers are consed onto the head, so
%% the tail holds the ones that have waited longest), each is replied
%% to with ok and the count grows by the number of replies.
process_obtains(#fhc_state { obtains = [] } = State) ->
    State;
process_obtains(#fhc_state { limit = Limit, count = Count } = State)
  when Limit /= infinity, Count >= Limit ->
    State;
process_obtains(#fhc_state { limit = Limit, count = Count,
                             obtains = Pending } = State) ->
    Waiting = length(Pending),
    Grantable = lists:min([Waiting, Limit - Count]),
    {StillPending, Granted} = lists:split(Waiting - Grantable, Pending),
    lists:foreach(fun (From) -> gen_server:reply(From, ok) end, Granted),
    State #fhc_state { count   = Count + Grantable,
                       obtains = StillPending }.
matthias@3268
   790
matthias@3268
   791
%% When the handle count has reached the limit, asks clients to give
%% handles back. The average age (in microseconds, per
%% timer:now_diff/2) of every client's eldest handle is computed over
%% the clients that have one, and each such client's registered
%% {M, F, A} callback is applied with that average appended to A.
%% Clients without a callback are skipped. A check_counts cast is then
%% scheduled after ?FILE_HANDLES_CHECK_INTERVAL unless a timer is
%% already pending. Below the limit (or with limit = infinity) the
%% state is returned unchanged.
maybe_reduce(#fhc_state { limit = Limit, count = Count, elders = Elders,
                          callbacks = Callbacks, timer_ref = TRef } = State)
  when Limit /= infinity andalso Count >= Limit ->
    Now = now(),
    Tally = fun (_Pid, undefined, Acc) ->
                    Acc;
                (Pid, Eldest, {PidsAcc, AgeAcc, N}) ->
                    {[Pid | PidsAcc],
                     AgeAcc + timer:now_diff(Now, Eldest),
                     N + 1}
            end,
    {Pids, TotalAge, ClientCount} = dict:fold(Tally, {[], 0, 0}, Elders),
    case Pids of
        [] ->
            ok;
        [_ | _] ->
            AverageAge = TotalAge / ClientCount,
            Notify = fun (Pid) ->
                             case dict:find(Pid, Callbacks) of
                                 {ok, {M, F, A}} ->
                                     apply(M, F, A ++ [AverageAge]);
                                 error ->
                                     ok
                             end
                     end,
            lists:foreach(Notify, Pids)
    end,
    case TRef of
        undefined ->
            {ok, TRef1} = timer:apply_after(
                            ?FILE_HANDLES_CHECK_INTERVAL,
                            gen_server, cast, [?SERVER, check_counts]),
            State #fhc_state { timer_ref = TRef1 };
        _ ->
            State
    end;
maybe_reduce(State) ->
    State.
matthias@3268
   822
matthias@3268
   823
%% Googling around suggests that Windows has a limit somewhere around
matthias@3268
   824
%% 16M, eg
matthias@3268
   825
%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
matthias@3268
   826
%% For everything else, assume ulimit exists. Further googling
matthias@3268
   827
%% suggests that BSDs (incl OS X), solaris and linux all agree that
matthias@3268
   828
%% ulimit -n is file handles
matthias@3268
   829
%% Works out how many file handles this module may use.
%%   * win32: ?FILE_HANDLES_LIMIT_WINDOWS.
%%   * unix: parses the output of "ulimit -n". "unlimited" yields
%%     infinity; a number yields that number less ?RESERVED_FOR_OTHERS
%%     (but never less than 1); anything else falls back to
%%     ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS.
%%   * any other OS type: the same fallback.
ulimit() ->
    case os:type() of
        {win32, _OsName} ->
            ?FILE_HANDLES_LIMIT_WINDOWS;
        {unix, _OsName} ->
            %% Under Linux, Solaris and FreeBSD, ulimit is a shell
            %% builtin, not a command. In OS X, it's a command.
            %% Fortunately, os:cmd invokes the cmd in a shell env, so
            %% we're safe in all cases.
            case os:cmd("ulimit -n") of
                %% os:cmd/1 returns the output verbatim, i.e. with its
                %% trailing newline, so match on the prefix rather
                %% than on the exact string
                "unlimited" ++ _ ->
                    infinity;
                String = [C|_] when $0 =< C andalso C =< $9 ->
                    %% takewhile drops the trailing newline (and
                    %% anything else after the leading digits)
                    Num = list_to_integer(
                            lists:takewhile(
                              fun (D) -> $0 =< D andalso D =< $9 end, String)) -
                        ?RESERVED_FOR_OTHERS,
                    lists:max([1, Num]);
                _ ->
                    %% probably a variant of
                    %% "/bin/sh: line 1: ulimit: command not found\n"
                    ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
            end;
        _ ->
            ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
    end.
matthias@3268
   855
matthias@3268
   856
%% Makes sure the server monitors client Pid exactly once: if no
%% monitor reference is recorded for Pid in client_mrefs, a new
%% monitor is set up and its reference stored; otherwise the state is
%% returned as is.
ensure_mref(Pid, #fhc_state { client_mrefs = ClientMRefs } = State) ->
    case dict:is_key(Pid, ClientMRefs) of
        true ->
            State;
        false ->
            MRef = erlang:monitor(process, Pid),
            State #fhc_state {
              client_mrefs = dict:store(Pid, MRef, ClientMRefs) }
    end.