1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License at
4 %% http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8 %% License for the specific language governing rights and limitations
11 %% The Original Code is RabbitMQ.
13 %% The Initial Developers of the Original Code are LShift Ltd,
14 %% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
16 %% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17 %% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18 %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19 %% Technologies LLC, and Rabbit Technologies Ltd.
21 %% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22 %% Ltd. Portions created by Cohesive Financial Technologies LLC are
23 %% Copyright (C) 2007-2010 Cohesive Financial Technologies
24 %% LLC. Portions created by Rabbit Technologies Ltd are Copyright
25 %% (C) 2007-2010 Rabbit Technologies Ltd.
27 %% All Rights Reserved.
29 %% Contributor(s): ______________________________________.
32 -module(file_handle_cache).
34 %% A File Handle Cache
36 %% This extends a subset of the functionality of the Erlang file
40 %% 1) This supports one writer, multiple readers per file. Nothing
42 %% 2) Do not open the same file from different processes. Bad things
44 %% 3) Writes are all appends. You cannot write to the middle of a
45 %% file, although you can truncate and then append if you want.
46 %% 4) Although there is a write buffer, there is no read buffer. Feel
47 %% free to use the read_ahead mode, but beware of the interaction
48 %% between that buffer and the write buffer.
51 %% 1) You do not have to remember to call sync before close
52 %% 2) Buffering is much more flexible than with plain file module, and
53 %% you can control when the buffer gets flushed out. This means that
54 %% you can rely on reads-after-writes working, without having to call
55 %% the expensive sync.
56 %% 3) Unnecessary calls to position and sync get optimised out.
57 %% 4) You can find out what your 'real' offset is, and what your
58 %% 'virtual' offset is (i.e. where the hdl really is, and where it
59 %% would be after the write buffer is written out).
60 %% 5) You can find out what the offset was when you last sync'd.
62 %% There is also a server component which serves to limit the number
63 %% of open file handles in a "soft" way - the server will never
64 %% prevent a client from opening a handle, but may immediately tell it
65 %% to close the handle. Thus you can set the limit to zero and it will
66 %% still all work correctly, it is just that effectively no caching
67 %% will take place. The operation of limiting is as follows:
69 %% On open and close, the client sends messages to the server
70 %% informing it of opens and closes. This allows the server to keep
71 %% track of the number of open handles. The client also keeps a
72 %% gb_tree which is updated on every use of a file handle, mapping the
73 %% time at which the file handle was last used (timestamp) to the
74 %% handle. Thus the smallest key in this tree maps to the file handle
75 %% that has not been used for the longest amount of time. This
76 %% smallest key is included in the messages to the server. As such,
77 %% the server keeps track of when the least recently used file handle
78 %% was used *at the point of the most recent open or close* by each
81 %% Note that this data can become very out of date if the client
82 %% subsequently uses its least recently used handle.
84 %% When the limit is reached, the server calculates the average age of
85 %% the last reported least recently used file handle of all the
86 %% clients. It then tells all the clients to close any handles not
87 %% used for longer than this average, by invoking the callback the
88 %% client registered. The client should receive this message and pass
89 %% it into set_maximum_since_use/1. However, it is highly possible
90 %% this age will be greater than the ages of all the handles the
91 %% client knows of because the client has used its file handles in the
92 %% mean time. Thus at this point the client reports to the server the
93 %% current timestamp at which its least recently used file handle was
94 %% last used. The server will check two seconds later that either it
95 %% is back under the limit, in which case all is well again, or if
96 %% not, it will calculate a new average age. Its data will be much
97 %% more recent now, and so it is very likely that when this is
98 %% communicated to the clients, the clients will close file handles.
100 %% The advantage of this scheme is that there is only communication
101 %% from the client to the server on open, close, and when in the
102 %% process of trying to reduce file handle usage. There is no
103 %% communication from the client to the server on normal file handle
104 %% operations. This scheme forms a feed-back loop - the server does
105 %% not care which file handles are closed, just that some are, and it
106 %% checks this repeatedly when over the limit. Given the guarantees of
107 %% now(), even if there is just one file handle open, a limit of 1,
108 %% and one client, it is certain that when the client calculates the
109 %% age of the handle, it will be greater than when the server
110 %% calculated it, hence it should be closed.
112 %% Handles which are closed as a result of the server are put into a
113 %% "soft-closed" state in which the handle is closed (data flushed out
114 %% and sync'd first) but the state is maintained. The handle will be
115 %% fully reopened again as soon as needed, thus users of this library
116 %% do not need to worry about their handles being closed by the server
117 %% - reopening them when necessary is handled transparently.
119 %% The server also supports obtain and release_on_death. obtain/0
120 %% blocks until a file descriptor is available. release_on_death/1
121 %% takes a pid and monitors the pid, reducing the count by 1 when the
122 %% pid dies. Thus the assumption is that obtain/0 is called first, and
123 %% when that returns, release_on_death/1 is called with the pid who
124 %% "owns" the file descriptor. This is, for example, used to track the
125 %% use of file descriptors through network sockets.
127 -behaviour(gen_server).
%% Client registration: the callback the server applies when it wants
%% a client to close handles that have not been used recently.
129 -export([register_callback/3]).
%% File handle API, operating on the refs returned by open/3.
130 -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
131 last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
132 flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
%% Accounting for descriptors that are not opened through this module
%% (the module header gives network sockets as an example).
133 -export([release_on_death/1, obtain/0]).
%% Server start-up and gen_server callbacks.
135 -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
136 terminate/2, code_change/3]).
%% Locally registered name of the server process; all casts and calls
%% made by the client-side functions are addressed to it.
138 -define(SERVER, ?MODULE).
%% Number of descriptors subtracted from the detected (or fallback)
%% limit before it is used as the handle limit.
139 -define(RESERVED_FOR_OTHERS, 100).
%% Limit used on the branch the surrounding comments describe as
%% Windows, where "ulimit -n" is not consulted.
140 -define(FILE_HANDLES_LIMIT_WINDOWS, 10000000).
%% Fallback limit used when the output of "ulimit -n" cannot be parsed
%% as a number (?RESERVED_FOR_OTHERS is still subtracted from it).
141 -define(FILE_HANDLES_LIMIT_OTHER, 1024).
%% Delay, as passed to timer:apply_after/4 (milliseconds), before the
%% server re-checks its counts after asking clients to close handles.
142 -define(FILE_HANDLES_CHECK_INTERVAL, 2000).
144 %%----------------------------------------------------------------------------
157 write_buffer_size_limit,
178 %%----------------------------------------------------------------------------
180 %%----------------------------------------------------------------------------
%% Reference identifying an open handle; it is what open/3 returns on
%% success and what every other handle operation takes.
184 -type(ref() :: any()).
%% Result shapes shared by the API specs. Both are aliases for types
%% in rabbit_types, which is defined outside this file
%% NOTE(review): presumably 'ok' | {'error', any()} and
%% {'ok', T} | {'error', any()} respectively - confirm there.
185 -type(ok_or_error() :: rabbit_types:ok_or_error(any())).
186 -type(val_or_error(T) :: rabbit_types:ok_or_error2(T, any())).
%% Seek targets accepted by position/2: an absolute offset, an offset
%% relative to the beginning or end of the file, or an offset relative
%% to the current position.
187 -type(position() :: ('bof' | 'eof' | non_neg_integer() |
188 {('bof' |'eof'), non_neg_integer()} |
189 {'cur', integer()})).
%% An offset within a file.
190 -type(offset() :: non_neg_integer()).
192 -spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
195 [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
196 -> val_or_error(ref())).
197 -spec(close/1 :: (ref()) -> ok_or_error()).
198 -spec(read/2 :: (ref(), non_neg_integer()) ->
199 val_or_error([char()] | binary()) | 'eof').
200 -spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
201 -spec(sync/1 :: (ref()) -> ok_or_error()).
202 -spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
203 -spec(truncate/1 :: (ref()) -> ok_or_error()).
204 -spec(last_sync_offset/1 :: (ref()) -> val_or_error(offset())).
205 -spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())).
206 -spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())).
207 -spec(flush/1 :: (ref()) -> ok_or_error()).
208 -spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
209 val_or_error(non_neg_integer())).
210 -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
211 -spec(delete/1 :: (ref()) -> ok_or_error()).
212 -spec(clear/1 :: (ref()) -> ok_or_error()).
213 -spec(release_on_death/1 :: (pid()) -> 'ok').
214 -spec(obtain/0 :: () -> 'ok').
218 %%----------------------------------------------------------------------------
220 %%----------------------------------------------------------------------------
223 gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
%% Registers {Mod, Fun, Args} with the file handle cache server as the
%% callback for the calling process. When the server decides that
%% clients should close handles it applies the callback with the age
%% threshold appended to Args. The registration is sent as a cast, so
%% this returns 'ok' without waiting for the server.
register_callback(Mod, Fun, Args)
  when is_atom(Mod), is_atom(Fun), is_list(Args) ->
    Callback = {Mod, Fun, Args},
    Request = {register_callback, self(), Callback},
    gen_server:cast(?SERVER, Request).
229 open(Path, Mode, Options) ->
230 Path1 = filename:absname(Path),
231 File1 = #file { reader_count = RCount, has_writer = HasWriter } =
232 case get({Path1, fhc_file}) of
233 File = #file {} -> File;
234 undefined -> #file { reader_count = 0,
237 Mode1 = append_to_write(Mode),
238 IsWriter = is_writer(Mode1),
239 case IsWriter andalso HasWriter of
240 true -> {error, writer_exists};
241 false -> Ref = make_ref(),
242 case open1(Path1, Mode1, Options, Ref, bof, new) of
244 RCount1 = case is_reader(Mode1) of
248 HasWriter1 = HasWriter orelse IsWriter,
249 put({Path1, fhc_file},
250 File1 #file { reader_count = RCount1,
251 has_writer = HasWriter1 }),
259 case erase({Ref, fhc_handle}) of
261 Handle -> case hard_close(Handle) of
263 {Error, Handle1} -> put_handle(Ref, Handle1),
269 with_flushed_handles(
271 fun ([#handle { is_read = false }]) ->
272 {error, not_open_for_reading};
273 ([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
274 case file:read(Hdl, Count) of
275 {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
277 [Handle #handle { offset = Offset1 }]};
278 eof -> {eof, [Handle #handle { at_eof = true }]};
279 Error -> {Error, [Handle]}
286 fun ([#handle { is_write = false }]) ->
287 {error, not_open_for_writing};
289 case maybe_seek(eof, Handle) of
290 {{ok, _Offset}, #handle { hdl = Hdl, offset = Offset,
291 write_buffer_size_limit = 0,
292 at_eof = true } = Handle1} ->
293 Offset1 = Offset + iolist_size(Data),
294 {file:write(Hdl, Data),
295 [Handle1 #handle { is_dirty = true, offset = Offset1 }]};
296 {{ok, _Offset}, #handle { write_buffer = WriteBuffer,
297 write_buffer_size = Size,
298 write_buffer_size_limit = Limit,
299 at_eof = true } = Handle1} ->
300 WriteBuffer1 = [Data | WriteBuffer],
301 Size1 = Size + iolist_size(Data),
302 Handle2 = Handle1 #handle { write_buffer = WriteBuffer1,
303 write_buffer_size = Size1 },
304 case Limit /= infinity andalso Size1 > Limit of
305 true -> {Result, Handle3} = write_buffer(Handle2),
307 false -> {ok, [Handle2]}
309 {{error, _} = Error, Handle1} ->
315 with_flushed_handles(
317 fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
319 ([Handle = #handle { hdl = Hdl, offset = Offset,
320 is_dirty = true, write_buffer = [] }]) ->
321 case file:sync(Hdl) of
322 ok -> {ok, [Handle #handle { trusted_offset = Offset,
323 is_dirty = false }]};
324 Error -> {Error, [Handle]}
328 position(Ref, NewOffset) ->
329 with_flushed_handles(
331 fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle),
336 with_flushed_handles(
338 fun ([Handle1 = #handle { hdl = Hdl, offset = Offset,
339 trusted_offset = TOffset }]) ->
340 case file:truncate(Hdl) of
341 ok -> TOffset1 = lists:min([Offset, TOffset]),
342 {ok, [Handle1 #handle { trusted_offset = TOffset1,
344 Error -> {Error, [Handle1]}
348 last_sync_offset(Ref) ->
349 with_handles([Ref], fun ([#handle { trusted_offset = TOffset }]) ->
353 current_virtual_offset(Ref) ->
354 with_handles([Ref], fun ([#handle { at_eof = true, is_write = true,
356 write_buffer_size = Size }]) ->
358 ([#handle { offset = Offset }]) ->
%% Reports the offset currently recorded for the handle identified by
%% Ref, as {ok, Offset}. This is the 'real' offset in the sense of the
%% module header: where the underlying file handle is, as opposed to
%% where it would be once the write buffer has been written out.
%% Failures to obtain the handle are returned as with_handles/2
%% produces them.
current_raw_offset(Ref) ->
    RawOffset = fun ([Handle]) ->
                        Offset = Handle #handle.offset,
                        {ok, Offset}
                end,
    with_handles([Ref], RawOffset).
366 with_flushed_handles([Ref], fun ([Handle]) -> {ok, [Handle]} end).
368 copy(Src, Dest, Count) ->
369 with_flushed_handles(
371 fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset },
372 DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
374 case file:copy(SHdl, DHdl, Count) of
375 {ok, Count1} = Result1 ->
377 [SHandle #handle { offset = SOffset + Count1 },
378 DHandle #handle { offset = DOffset + Count1 }]};
380 {Error, [SHandle, DHandle]}
383 {error, incorrect_handle_modes}
387 case erase({Ref, fhc_handle}) of
390 Handle = #handle { path = Path } ->
391 case hard_close(Handle #handle { is_dirty = false,
392 write_buffer = [] }) of
393 ok -> file:delete(Path);
394 {Error, Handle1} -> put_handle(Ref, Handle1),
402 fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
405 case maybe_seek(bof, Handle #handle { write_buffer = [],
406 write_buffer_size = 0 }) of
407 {{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
408 case file:truncate(Hdl) of
409 ok -> {ok, [Handle1 #handle {trusted_offset = 0,
411 Error -> {Error, [Handle1]}
413 {{error, _} = Error, Handle1} ->
418 set_maximum_since_use(MaximumAge) ->
421 fun ({{Ref, fhc_handle},
422 Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
423 Age = timer:now_diff(Now, Then),
424 case Hdl /= closed andalso Age >= MaximumAge of
425 true -> {Res, Handle1} = soft_close(Handle),
427 ok -> put({Ref, fhc_handle}, Handle1),
429 _ -> put_handle(Ref, Handle1),
434 (_KeyValuePair, Rep) ->
437 true -> age_tree_change(), ok;
%% Asks the server to monitor Pid so that the descriptor it holds is
%% released (the server's count is reduced by one) when Pid dies.
%% Intended to be called after obtain/0 has returned. The request is a
%% cast, so this returns 'ok' immediately.
release_on_death(Pid) when is_pid(Pid) ->
    Request = {release_on_death, Pid},
    gen_server:cast(?SERVER, Request).
445 gen_server:call(?SERVER, obtain, infinity).
447 %%----------------------------------------------------------------------------
448 %% Internal functions
449 %%----------------------------------------------------------------------------
%% True iff the atom 'read' is one of the entries in the list of open
%% modes.
is_reader(Modes) ->
    lists:member(read, Modes).
%% True iff the atom 'write' is one of the entries in the list of open
%% modes. 'append' is not recognised here; callers are expected to
%% have normalised it to 'write' beforehand.
is_writer(Modes) ->
    lists:member(write, Modes).
455 append_to_write(Mode) ->
456 case lists:member(append, Mode) of
457 true -> [write | Mode -- [append, write]];
461 with_handles(Refs, Fun) ->
462 ResHandles = lists:foldl(
463 fun (Ref, {ok, HandlesAcc}) ->
464 case get_or_reopen(Ref) of
465 {ok, Handle} -> {ok, [Handle | HandlesAcc]};
470 end, {ok, []}, Refs),
473 case Fun(lists:reverse(Handles)) of
474 {Result, Handles1} when is_list(Handles1) ->
475 lists:zipwith(fun put_handle/2, Refs, Handles1),
484 with_flushed_handles(Refs, Fun) ->
489 fun (Handle, {ok, HandlesAcc}) ->
490 {Res, Handle1} = write_buffer(Handle),
491 {Res, [Handle1 | HandlesAcc]};
492 (Handle, {Error, HandlesAcc}) ->
493 {Error, [Handle | HandlesAcc]}
494 end, {ok, []}, Handles) of
496 Fun(lists:reverse(Handles1));
498 {Error, lists:reverse(Handles1)}
502 get_or_reopen(Ref) ->
503 case get({Ref, fhc_handle}) of
505 {error, not_open, Ref};
506 #handle { hdl = closed, offset = Offset,
507 path = Path, mode = Mode, options = Options } ->
508 open1(Path, Mode, Options, Ref, Offset, reopen);
513 put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
515 age_tree_update(Then, Now, Ref),
516 put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
518 with_age_tree(Fun) ->
519 put(fhc_age_tree, Fun(case get(fhc_age_tree) of
520 undefined -> gb_trees:empty();
524 age_tree_insert(Now, Ref) ->
527 Tree1 = gb_trees:insert(Now, Ref, Tree),
528 {Oldest, _Ref} = gb_trees:smallest(Tree1),
529 gen_server:cast(?SERVER, {open, self(), Oldest}),
533 age_tree_update(Then, Now, Ref) ->
536 gb_trees:insert(Now, Ref, gb_trees:delete_any(Then, Tree))
539 age_tree_delete(Then) ->
542 Tree1 = gb_trees:delete_any(Then, Tree),
543 Oldest = case gb_trees:is_empty(Tree1) of
547 {Oldest1, _Ref} = gb_trees:smallest(Tree1),
550 gen_server:cast(?SERVER, {close, self(), Oldest}),
557 case gb_trees:is_empty(Tree) of
559 false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
560 gen_server:cast(?SERVER, {update, self(), Oldest})
565 open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
566 Mode1 = case NewOrReopen of
568 reopen -> [read | Mode]
570 case file:open(Path, Mode1) of
573 case proplists:get_value(write_buffer, Options, unbuffered) of
575 infinity -> infinity;
576 N when is_integer(N) -> N
579 Handle = #handle { hdl = Hdl,
583 write_buffer_size = 0,
584 write_buffer_size_limit = WriteBufferSize,
590 is_write = is_writer(Mode),
591 is_read = is_reader(Mode),
592 last_used_at = Now },
593 {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle),
594 Handle2 = Handle1 #handle { trusted_offset = Offset1 },
595 put({Ref, fhc_handle}, Handle2),
596 age_tree_insert(Now, Ref),
602 soft_close(Handle = #handle { hdl = closed }) ->
604 soft_close(Handle) ->
605 case write_buffer(Handle) of
606 {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty,
607 last_used_at = Then } = Handle1 } ->
609 true -> file:sync(Hdl);
612 ok = file:close(Hdl),
613 age_tree_delete(Then),
614 {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset,
616 {_Error, _Handle} = Result ->
620 hard_close(Handle) ->
621 case soft_close(Handle) of
622 {ok, #handle { path = Path,
623 is_read = IsReader, is_write = IsWriter }} ->
624 #file { reader_count = RCount, has_writer = HasWriter } = File =
625 get({Path, fhc_file}),
626 RCount1 = case IsReader of
630 HasWriter1 = HasWriter andalso not IsWriter,
631 case RCount1 =:= 0 andalso not HasWriter1 of
632 true -> erase({Path, fhc_file});
633 false -> put({Path, fhc_file},
634 File #file { reader_count = RCount1,
635 has_writer = HasWriter1 })
638 {_Error, _Handle} = Result ->
642 maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
644 {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
645 case (case NeedsSeek of
646 true -> file:position(Hdl, NewOffset);
647 false -> {ok, Offset}
649 {ok, Offset1} = Result ->
650 {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }};
651 {error, _} = Error ->
655 needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false};
656 needs_seek( AtEoF, _CurOffset, {cur, 0}) -> {AtEoF, false};
657 needs_seek( true, _CurOffset, eof ) -> {true , false};
658 needs_seek( true, _CurOffset, {eof, 0}) -> {true , false};
659 needs_seek( false, _CurOffset, eof ) -> {true , true };
660 needs_seek( false, _CurOffset, {eof, 0}) -> {true , true };
661 needs_seek( AtEoF, 0, bof ) -> {AtEoF, false};
662 needs_seek( AtEoF, 0, {bof, 0}) -> {AtEoF, false};
663 needs_seek( AtEoF, CurOffset, CurOffset) -> {AtEoF, false};
664 needs_seek( true, CurOffset, {bof, DesiredOffset})
665 when DesiredOffset >= CurOffset ->
667 needs_seek( true, _CurOffset, {cur, DesiredOffset})
668 when DesiredOffset > 0 ->
670 needs_seek( true, CurOffset, DesiredOffset) %% same as {bof, DO}
671 when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset ->
673 %% because we can't really track size, we could well end up at EoF and not know
674 needs_seek(_AtEoF, _CurOffset, _DesiredOffset) ->
677 write_buffer(Handle = #handle { write_buffer = [] }) ->
679 write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
680 write_buffer = WriteBuffer,
681 write_buffer_size = DataSize,
683 case file:write(Hdl, lists:reverse(WriteBuffer)) of
685 Offset1 = Offset + DataSize,
686 {ok, Handle #handle { offset = Offset1, is_dirty = true,
687 write_buffer = [], write_buffer_size = 0 }};
688 {error, _} = Error ->
692 %%----------------------------------------------------------------------------
693 %% gen_server callbacks
694 %%----------------------------------------------------------------------------
697 Limit = case application:get_env(file_handles_high_watermark) of
698 {ok, Watermark} when (is_integer(Watermark) andalso
704 error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
705 {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
706 obtains = [], callbacks = dict:new(),
707 client_mrefs = dict:new(), timer_ref = undefined }}.
709 handle_call(obtain, From, State = #fhc_state { count = Count }) ->
710 State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
711 maybe_reduce(State #fhc_state { count = Count + 1 }),
712 case Limit /= infinity andalso Count1 >= Limit of
713 true -> {noreply, State1 #fhc_state { obtains = [From | Obtains],
714 count = Count1 - 1 }};
715 false -> {reply, ok, State1}
718 handle_cast({register_callback, Pid, MFA},
719 State = #fhc_state { callbacks = Callbacks }) ->
720 {noreply, ensure_mref(
721 Pid, State #fhc_state {
722 callbacks = dict:store(Pid, MFA, Callbacks) })};
724 handle_cast({open, Pid, EldestUnusedSince}, State =
725 #fhc_state { elders = Elders, count = Count }) ->
726 Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
727 {noreply, maybe_reduce(
728 ensure_mref(Pid, State #fhc_state { elders = Elders1,
729 count = Count + 1 }))};
731 handle_cast({update, Pid, EldestUnusedSince}, State =
732 #fhc_state { elders = Elders }) ->
733 Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
734 %% don't call maybe_reduce from here otherwise we can create a
736 {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
738 handle_cast({close, Pid, EldestUnusedSince}, State =
739 #fhc_state { elders = Elders, count = Count }) ->
740 Elders1 = case EldestUnusedSince of
741 undefined -> dict:erase(Pid, Elders);
742 _ -> dict:store(Pid, EldestUnusedSince, Elders)
744 {noreply, process_obtains(
745 ensure_mref(Pid, State #fhc_state { elders = Elders1,
746 count = Count - 1 }))};
748 handle_cast(check_counts, State) ->
749 {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
751 handle_cast({release_on_death, Pid}, State) ->
752 _MRef = erlang:monitor(process, Pid),
755 handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
756 #fhc_state { count = Count, callbacks = Callbacks,
757 client_mrefs = ClientMRefs, elders = Elders }) ->
758 {noreply, process_obtains(
759 case dict:find(Pid, ClientMRefs) of
760 {ok, MRef} -> State #fhc_state {
761 elders = dict:erase(Pid, Elders),
762 client_mrefs = dict:erase(Pid, ClientMRefs),
763 callbacks = dict:erase(Pid, Callbacks) };
764 _ -> State #fhc_state { count = Count - 1 }
767 terminate(_Reason, State) ->
770 code_change(_OldVsn, State, _Extra) ->
773 %%----------------------------------------------------------------------------
775 %%----------------------------------------------------------------------------
777 process_obtains(State = #fhc_state { obtains = [] }) ->
779 process_obtains(State = #fhc_state { limit = Limit, count = Count })
780 when Limit /= infinity andalso Count >= Limit ->
782 process_obtains(State = #fhc_state { limit = Limit, count = Count,
783 obtains = Obtains }) ->
784 ObtainsLen = length(Obtains),
785 ObtainableLen = lists:min([ObtainsLen, Limit - Count]),
786 Take = ObtainsLen - ObtainableLen,
787 {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains),
788 [gen_server:reply(From, ok) || From <- ObtainableRev],
789 State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.
791 maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
792 callbacks = Callbacks, timer_ref = TRef })
793 when Limit /= infinity andalso Count >= Limit ->
795 {Pids, Sum, ClientCount} =
796 dict:fold(fun (_Pid, undefined, Accs) ->
798 (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) ->
799 {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest),
801 end, {[], 0, 0}, Elders),
804 _ -> AverageAge = Sum / ClientCount,
807 case dict:find(Pid, Callbacks) of
809 {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge])
814 undefined -> {ok, TRef1} = timer:apply_after(
815 ?FILE_HANDLES_CHECK_INTERVAL,
816 gen_server, cast, [?SERVER, check_counts]),
817 State #fhc_state { timer_ref = TRef1 };
820 maybe_reduce(State) ->
823 %% Googling around suggests that Windows has a limit somewhere around
825 %% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
826 %% For everything else, assume ulimit exists. Further googling
827 %% suggests that BSDs (incl OS X), solaris and linux all agree that
828 %% ulimit -n is file handles
832 ?FILE_HANDLES_LIMIT_WINDOWS;
834 %% Under Linux, Solaris and FreeBSD, ulimit is a shell
835 %% builtin, not a command. In OS X, it's a command.
836 %% Fortunately, os:cmd invokes the cmd in a shell env, so
837 %% we're safe in all cases.
838 case os:cmd("ulimit -n") of
841 String = [C|_] when $0 =< C andalso C =< $9 ->
842 Num = list_to_integer(
844 fun (D) -> $0 =< D andalso D =< $9 end, String)) -
845 ?RESERVED_FOR_OTHERS,
848 %% probably a variant of
849 %% "/bin/sh: line 1: ulimit: command not found\n"
850 ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
853 ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
856 ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
857 case dict:find(Pid, ClientMRefs) of
858 {ok, _MRef} -> State;
859 error -> MRef = erlang:monitor(process, Pid),
861 client_mrefs = dict:store(Pid, MRef, ClientMRefs) }