Added checks to `rabbit_misc:get_options/3'.
See the comments above the new definition (its arity is now 4).
The tests still need to be updated.
1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
11 %% The Original Code is RabbitMQ.
13 %% The Initial Developer of the Original Code is VMware, Inc.
14 %% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
17 -module(file_handle_cache).
19 %% A File Handle Cache
21 %% This extends a subset of the functionality of the Erlang file
22 %% module. In the below, we use "file handle" to specifically refer to
23 %% file handles, and "file descriptor" to refer to descriptors which
24 %% are not file handles, e.g. sockets.
27 %% 1) This supports one writer, multiple readers per file. Nothing
29 %% 2) Do not open the same file from different processes. Bad things
30 %% may happen, especially for writes.
31 %% 3) Writes are all appends. You cannot write to the middle of a
32 %% file, although you can truncate and then append if you want.
33 %% 4) Although there is a write buffer, there is no read buffer. Feel
34 %% free to use the read_ahead mode, but beware of the interaction
35 %% between that buffer and the write buffer.
38 %% 1) You do not have to remember to call sync before close
39 %% 2) Buffering is much more flexible than with the plain file module,
40 %% and you can control when the buffer gets flushed out. This means
41 %% that you can rely on reads-after-writes working, without having to
42 %% call the expensive sync.
43 %% 3) Unnecessary calls to position and sync get optimised out.
44 %% 4) You can find out what your 'real' offset is, and what your
45 %% 'virtual' offset is (i.e. where the hdl really is, and where it
46 %% would be after the write buffer is written out).
48 %% There is also a server component which serves to limit the number
49 %% of open file descriptors. This is a hard limit: the server
50 %% component will ensure that clients do not have more file
51 %% descriptors open than it's configured to allow.
53 %% On open, the client requests permission from the server to open the
54 %% required number of file handles. The server may ask the client to
55 %% close other file handles that it has open, or it may queue the
56 %% request and ask other clients to close file handles they have open
57 %% in order to satisfy the request. Requests are always satisfied in
58 %% the order they arrive, even if a later request (for a smaller number
59 %% of file handles) can be satisfied before an earlier request (for a
60 %% larger number of file handles). On close, the client sends a
61 %% message to the server. These messages allow the server to keep
62 %% track of the number of open handles. The client also keeps a
63 %% gb_tree which is updated on every use of a file handle, mapping the
64 %% time at which the file handle was last used (timestamp) to the
65 %% handle. Thus the smallest key in this tree maps to the file handle
66 %% that has not been used for the longest amount of time. This
67 %% smallest key is included in the messages to the server. As such,
68 %% the server keeps track of when the least recently used file handle
69 %% was used *at the point of the most recent open or close* by each
72 %% Note that this data can go very out of date, by the client using
73 %% the least recently used handle.
75 %% When the limit is exceeded (i.e. the number of open file handles is
76 %% at the limit and there are pending 'open' requests), the server
77 %% calculates the average age of the last reported least recently used
78 %% file handle of all the clients. It then tells all the clients to
79 %% close any handles not used for longer than this average, by
80 %% invoking the callback the client registered. The client should
81 %% receive this message and pass it into
82 %% set_maximum_since_use/1. However, it is quite likely that this age
83 %% will be greater than the ages of all the handles the client knows
84 %% of because the client has used its file handles in the
85 %% meantime. Thus at this point the client reports to the server the
86 %% current timestamp at which its least recently used file handle was
87 %% last used. The server will check two seconds later that either it
88 %% is back under the limit, in which case all is well again, or if
89 %% not, it will calculate a new average age. Its data will be much
90 %% more recent now, and so it is very likely that when this is
91 %% communicated to the clients, the clients will close file handles.
92 %% (In extreme cases, where it's very likely that all clients have
93 %% used their open handles since they last sent in an update, which
94 %% would mean that the average will never cause any file handles to
95 %% be closed, the server can send out an average age of 0, resulting
96 %% in all available clients closing all their file handles.)
98 %% Care is taken to ensure that (a) processes which are blocked
99 %% waiting for file descriptors to become available are not sent
100 %% requests to close file handles; and (b) given it is known how many
101 %% file handles a process has open, when the average age is forced to
102 %% 0, close messages are only sent to enough processes to release the
103 %% correct number of file handles and the list of processes is
104 %% randomly shuffled. This ensures we don't cause processes to
105 %% needlessly close file handles, and ensures that we don't always
106 %% make such requests of the same processes.
108 %% The advantage of this scheme is that there is only communication
109 %% from the client to the server on open, close, and when in the
110 %% process of trying to reduce file handle usage. There is no
111 %% communication from the client to the server on normal file handle
112 %% operations. This scheme forms a feedback loop - the server does
113 %% not care which file handles are closed, just that some are, and it
114 %% checks this repeatedly when over the limit.
116 %% Handles which are closed at the server's request are put into a
117 %% "soft-closed" state in which the handle is closed (data flushed out
118 %% and sync'd first) but the state is maintained. The handle will be
119 %% fully reopened again as soon as needed, thus users of this library
120 %% do not need to worry about their handles being closed by the server
121 %% - reopening them when necessary is handled transparently.
123 %% The server also supports obtain, release and transfer. obtain/0
124 %% blocks until a file descriptor is available, at which point the
125 %% requesting process is considered to 'own' one more
126 %% descriptor. release/0 is the inverse operation and releases a
127 %% previously obtained descriptor. transfer/1 transfers ownership of a
128 %% file descriptor between processes. It is non-blocking. Obtain has a
129 %% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
130 %% the entire limit, but will be evicted by obtain calls up to the
131 %% point at which no more obtain calls can be satisfied by the obtains
132 %% limit. Thus there will always be some capacity available for file
133 %% handles. Processes that use obtain are never asked to return them,
134 %% and they are not managed in any way by the server. It is simply a
135 %% mechanism to ensure that processes that need file descriptors such
136 %% as sockets can do so in such a way that the overall number of open
137 %% file descriptors is managed.
139 %% The callers of register_callback/3, obtain/0, and the argument of
140 %% transfer/1 are monitored, reducing the count of handles in use
141 %% appropriately when the processes terminate.
143 -behaviour(gen_server2).
145 -export([register_callback/3]).
146 -export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2,
147 truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1,
148 copy/3, set_maximum_since_use/1, delete/1, clear/1]).
149 -export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
153 -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
154 terminate/2, code_change/3, prioritise_cast/2]).
156 -define(SERVER, ?MODULE).
157 -define(RESERVED_FOR_OTHERS, 100).
159 -define(FILE_HANDLES_LIMIT_OTHER, 1024).
160 -define(FILE_HANDLES_CHECK_INTERVAL, 2000).
162 -define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)).
163 -define(CLIENT_ETS_TABLE, file_handle_cache_client).
164 -define(ELDERS_ETS_TABLE, file_handle_cache_elders).
166 %%----------------------------------------------------------------------------
178 write_buffer_size_limit,
217 %%----------------------------------------------------------------------------
219 %%----------------------------------------------------------------------------
223 -type(ref() :: any()).
224 -type(ok_or_error() :: 'ok' | {'error', any()}).
225 -type(val_or_error(T) :: {'ok', T} | {'error', any()}).
226 -type(position() :: ('bof' | 'eof' | non_neg_integer() |
227 {('bof' |'eof'), non_neg_integer()} |
228 {'cur', integer()})).
229 -type(offset() :: non_neg_integer()).
231 -spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
233 (file:filename(), [any()],
234 [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
235 -> val_or_error(ref())).
236 -spec(close/1 :: (ref()) -> ok_or_error()).
237 -spec(read/2 :: (ref(), non_neg_integer()) ->
238 val_or_error([char()] | binary()) | 'eof').
239 -spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
240 -spec(sync/1 :: (ref()) -> ok_or_error()).
241 -spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
242 -spec(truncate/1 :: (ref()) -> ok_or_error()).
243 -spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())).
244 -spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())).
245 -spec(flush/1 :: (ref()) -> ok_or_error()).
246 -spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
247 val_or_error(non_neg_integer())).
248 -spec(delete/1 :: (ref()) -> ok_or_error()).
249 -spec(clear/1 :: (ref()) -> ok_or_error()).
250 -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
251 -spec(obtain/0 :: () -> 'ok').
252 -spec(release/0 :: () -> 'ok').
253 -spec(transfer/1 :: (pid()) -> 'ok').
254 -spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
255 -spec(get_limit/0 :: () -> non_neg_integer()).
256 -spec(info_keys/0 :: () -> rabbit_types:info_keys()).
257 -spec(info/0 :: () -> rabbit_types:infos()).
258 -spec(info/1 :: ([atom()]) -> rabbit_types:infos()).
259 -spec(ulimit/0 :: () -> 'unknown' | non_neg_integer()).
263 %%----------------------------------------------------------------------------
264 -define(INFO_KEYS, [total_limit, total_used, sockets_limit, sockets_used]).
266 %%----------------------------------------------------------------------------
268 %%----------------------------------------------------------------------------
271 gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
%% Registers the calling process with the file handle cache server and
%% records {M, F, A} as its callback. When the server wants this process
%% to give up file handles, it applies the callback with the age threshold
%% appended as the last argument. The call is an asynchronous cast.
register_callback(M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
    Callback = {M, F, A},
    gen_server2:cast(?SERVER, {register_callback, self(), Callback}).
277 open(Path, Mode, Options) ->
278 Path1 = filename:absname(Path),
279 File1 = #file { reader_count = RCount, has_writer = HasWriter } =
280 case get({Path1, fhc_file}) of
281 File = #file {} -> File;
282 undefined -> #file { reader_count = 0,
285 Mode1 = append_to_write(Mode),
286 IsWriter = is_writer(Mode1),
287 case IsWriter andalso HasWriter of
288 true -> {error, writer_exists};
289 false -> {ok, Ref} = new_closed_handle(Path1, Mode1, Options),
290 case get_or_reopen([{Ref, new}]) of
292 RCount1 = case is_reader(Mode1) of
296 HasWriter1 = HasWriter orelse IsWriter,
297 put({Path1, fhc_file},
298 File1 #file { reader_count = RCount1,
299 has_writer = HasWriter1 }),
302 erase({Ref, fhc_handle}),
308 case erase({Ref, fhc_handle}) of
310 Handle -> case hard_close(Handle) of
312 {Error, Handle1} -> put_handle(Ref, Handle1),
318 with_flushed_handles(
320 fun ([#handle { is_read = false }]) ->
321 {error, not_open_for_reading};
322 ([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
323 case prim_file:read(Hdl, Count) of
324 {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
326 [Handle #handle { offset = Offset1 }]};
327 eof -> {eof, [Handle #handle { at_eof = true }]};
328 Error -> {Error, [Handle]}
335 fun ([#handle { is_write = false }]) ->
336 {error, not_open_for_writing};
338 case maybe_seek(eof, Handle) of
339 {{ok, _Offset}, #handle { hdl = Hdl, offset = Offset,
340 write_buffer_size_limit = 0,
341 at_eof = true } = Handle1} ->
342 Offset1 = Offset + iolist_size(Data),
343 {prim_file:write(Hdl, Data),
344 [Handle1 #handle { is_dirty = true, offset = Offset1 }]};
345 {{ok, _Offset}, #handle { write_buffer = WriteBuffer,
346 write_buffer_size = Size,
347 write_buffer_size_limit = Limit,
348 at_eof = true } = Handle1} ->
349 WriteBuffer1 = [Data | WriteBuffer],
350 Size1 = Size + iolist_size(Data),
351 Handle2 = Handle1 #handle { write_buffer = WriteBuffer1,
352 write_buffer_size = Size1 },
353 case Limit =/= infinity andalso Size1 > Limit of
354 true -> {Result, Handle3} = write_buffer(Handle2),
356 false -> {ok, [Handle2]}
358 {{error, _} = Error, Handle1} ->
364 with_flushed_handles(
366 fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
368 ([Handle = #handle { hdl = Hdl,
369 is_dirty = true, write_buffer = [] }]) ->
370 case prim_file:sync(Hdl) of
371 ok -> {ok, [Handle #handle { is_dirty = false }]};
372 Error -> {Error, [Handle]}
379 fun ([#handle { is_dirty = false, write_buffer = [] }]) -> false;
383 position(Ref, NewOffset) ->
384 with_flushed_handles(
386 fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle),
391 with_flushed_handles(
393 fun ([Handle1 = #handle { hdl = Hdl }]) ->
394 case prim_file:truncate(Hdl) of
395 ok -> {ok, [Handle1 #handle { at_eof = true }]};
396 Error -> {Error, [Handle1]}
400 current_virtual_offset(Ref) ->
401 with_handles([Ref], fun ([#handle { at_eof = true, is_write = true,
403 write_buffer_size = Size }]) ->
405 ([#handle { offset = Offset }]) ->
%% Returns {ok, Offset}, where Offset is the position recorded for the
%% underlying file handle. Data still sitting in the write buffer is not
%% counted (compare current_virtual_offset/1). The handle goes through
%% with_handles/2, which reopens it first if it had been soft-closed.
current_raw_offset(Ref) ->
    with_handles([Ref],
                 fun ([#handle { offset = RawOffset }]) -> {ok, RawOffset} end).
413 with_flushed_handles([Ref], fun ([Handle]) -> {ok, [Handle]} end).
415 copy(Src, Dest, Count) ->
416 with_flushed_handles(
418 fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset },
419 DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
421 case prim_file:copy(SHdl, DHdl, Count) of
422 {ok, Count1} = Result1 ->
424 [SHandle #handle { offset = SOffset + Count1 },
425 DHandle #handle { offset = DOffset + Count1,
428 {Error, [SHandle, DHandle]}
431 {error, incorrect_handle_modes}
435 case erase({Ref, fhc_handle}) of
438 Handle = #handle { path = Path } ->
439 case hard_close(Handle #handle { is_dirty = false,
440 write_buffer = [] }) of
441 ok -> prim_file:delete(Path);
442 {Error, Handle1} -> put_handle(Ref, Handle1),
450 fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
453 case maybe_seek(bof, Handle #handle { write_buffer = [],
454 write_buffer_size = 0 }) of
455 {{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
456 case prim_file:truncate(Hdl) of
457 ok -> {ok, [Handle1 #handle { at_eof = true }]};
458 Error -> {Error, [Handle1]}
460 {{error, _} = Error, Handle1} ->
465 set_maximum_since_use(MaximumAge) ->
468 fun ({{Ref, fhc_handle},
469 Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
470 case Hdl =/= closed andalso
471 timer:now_diff(Now, Then) >= MaximumAge of
472 true -> soft_close(Ref, Handle) orelse Rep;
475 (_KeyValuePair, Rep) ->
477 end, false, get()) of
478 false -> age_tree_change(), ok;
483 %% If the FHC isn't running, obtains succeed immediately.
484 case whereis(?SERVER) of
486 _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity)
490 gen_server2:cast(?SERVER, {release, self()}).
493 gen_server2:cast(?SERVER, {transfer, self(), Pid}).
496 gen_server2:call(?SERVER, {set_limit, Limit}, infinity).
499 gen_server2:call(?SERVER, get_limit, infinity).
%% The info items understood by info/1.
info_keys() -> ?INFO_KEYS.

%% Fetches every known info item from the server.
info() -> info(info_keys()).

%% Synchronously asks the server for the requested info items and returns
%% them as a [{Key, Value}] list. The server rejects unknown keys (its i/2
%% throws {bad_argument, Key}).
info(Items) ->
    Request = {info, Items},
    gen_server2:call(?SERVER, Request, infinity).
506 %%----------------------------------------------------------------------------
507 %% Internal functions
508 %%----------------------------------------------------------------------------
%% True when the open-mode list requests read access.
is_reader(Mode) -> has_mode_flag(read, Mode).

%% True when the open-mode list requests write access.
is_writer(Mode) -> has_mode_flag(write, Mode).

%% True when Flag appears (exact match) in the open-mode list Mode.
has_mode_flag(Flag, Mode) -> lists:member(Flag, Mode).
514 append_to_write(Mode) ->
515 case lists:member(append, Mode) of
516 true -> [write | Mode -- [append, write]];
520 with_handles(Refs, Fun) ->
521 case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of
524 {Result, Handles1} when is_list(Handles1) ->
525 lists:zipwith(fun put_handle/2, Refs, Handles1),
534 with_flushed_handles(Refs, Fun) ->
539 fun (Handle, {ok, HandlesAcc}) ->
540 {Res, Handle1} = write_buffer(Handle),
541 {Res, [Handle1 | HandlesAcc]};
542 (Handle, {Error, HandlesAcc}) ->
543 {Error, [Handle | HandlesAcc]}
544 end, {ok, []}, Handles) of
546 Fun(lists:reverse(Handles1));
548 {Error, lists:reverse(Handles1)}
552 get_or_reopen(RefNewOrReopens) ->
553 case partition_handles(RefNewOrReopens) of
555 {ok, [Handle || {_Ref, Handle} <- OpenHdls]};
556 {OpenHdls, ClosedHdls} ->
557 Oldest = oldest(get_age_tree(), fun () -> now() end),
558 case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls),
559 Oldest}, infinity) of
561 case reopen(ClosedHdls) of
562 {ok, RefHdls} -> sort_handles(RefNewOrReopens,
563 OpenHdls, RefHdls, []);
567 [soft_close(Ref, Handle) ||
568 {{Ref, fhc_handle}, Handle = #handle { hdl = Hdl }} <-
571 get_or_reopen(RefNewOrReopens)
%% Reopens the soft-closed handles in ClosedHdls, a list of
%% {Ref, NewOrReopen, Handle} as built by partition_handles/1. The work is
%% done by reopen/3, which starts from the process's current age tree and
%% an empty accumulator.
reopen(ClosedHdls) ->
    AgeTree = get_age_tree(),
    reopen(ClosedHdls, AgeTree, []).
577 reopen([], Tree, RefHdls) ->
579 {ok, lists:reverse(RefHdls)};
580 reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
584 last_used_at = undefined }} |
585 RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
586 case prim_file:open(Path, case NewOrReopen of
588 reopen -> [read | Mode]
592 {{ok, _Offset}, Handle1} =
593 maybe_seek(Offset, Handle #handle { hdl = Hdl,
595 last_used_at = Now }),
596 put({Ref, fhc_handle}, Handle1),
597 reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
598 [{Ref, Handle1} | RefHdls]);
600 %% NB: none of the handles in ToOpen are in the age tree
601 Oldest = oldest(Tree, fun () -> undefined end),
602 [gen_server2:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen],
607 partition_handles(RefNewOrReopens) ->
609 fun ({Ref, NewOrReopen}, {Open, Closed}) ->
610 case get({Ref, fhc_handle}) of
611 #handle { hdl = closed } = Handle ->
612 {Open, [{Ref, NewOrReopen, Handle} | Closed]};
613 #handle {} = Handle ->
614 {[{Ref, Handle} | Open], Closed}
616 end, {[], []}, RefNewOrReopens).
%% Merges the already-open handles and the freshly reopened handles (both
%% lists of {Ref, Handle}) back into the order of the original
%% [{Ref, NewOrReopen}] request list. Returns {ok, [Handle]}.
%% Both handle lists must already follow request order, because only their
%% heads are examined. Any mismatch crashes with function_clause.
sort_handles([], [], [], Sorted) ->
    {ok, lists:reverse(Sorted)};
sort_handles([{Ref, _Kind} | Requests],
             [{Ref, Hdl} | OpenRest], Reopened, Sorted) ->
    sort_handles(Requests, OpenRest, Reopened, [Hdl | Sorted]);
sort_handles([{Ref, _Kind} | Requests],
             Open, [{Ref, Hdl} | ReopenedRest], Sorted) ->
    sort_handles(Requests, Open, ReopenedRest, [Hdl | Sorted]).
625 put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
627 age_tree_update(Then, Now, Ref),
628 put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
%% Applies Fun to the process's age tree and stores the result back,
%% returning whatever put_age_tree/1 returns.
with_age_tree(Fun) ->
    Tree = get_age_tree(),
    put_age_tree(Fun(Tree)).
633 case get(fhc_age_tree) of
634 undefined -> gb_trees:empty();
%% Stores Tree in the process dictionary under fhc_age_tree. Like put/2,
%% it returns the previously stored value (undefined if there was none).
put_age_tree(Tree) -> erlang:put(fhc_age_tree, Tree).
640 age_tree_update(Then, Now, Ref) ->
643 gb_trees:insert(Now, Ref, gb_trees:delete_any(Then, Tree))
646 age_tree_delete(Then) ->
649 Tree1 = gb_trees:delete_any(Then, Tree),
650 Oldest = oldest(Tree1, fun () -> undefined end),
651 gen_server2:cast(?SERVER, {close, self(), Oldest}),
658 case gb_trees:is_empty(Tree) of
660 false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
661 gen_server2:cast(?SERVER, {update, self(), Oldest})
666 oldest(Tree, DefaultFun) ->
667 case gb_trees:is_empty(Tree) of
668 true -> DefaultFun();
669 false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
673 new_closed_handle(Path, Mode, Options) ->
675 case proplists:get_value(write_buffer, Options, unbuffered) of
677 infinity -> infinity;
678 N when is_integer(N) -> N
681 put({Ref, fhc_handle}, #handle { hdl = closed,
684 write_buffer_size = 0,
685 write_buffer_size_limit = WriteBufferSize,
691 is_write = is_writer(Mode),
692 is_read = is_reader(Mode),
693 last_used_at = undefined }),
696 soft_close(Ref, Handle) ->
697 {Res, Handle1} = soft_close(Handle),
699 ok -> put({Ref, fhc_handle}, Handle1),
701 _ -> put_handle(Ref, Handle1),
705 soft_close(Handle = #handle { hdl = closed }) ->
707 soft_close(Handle) ->
708 case write_buffer(Handle) of
709 {ok, #handle { hdl = Hdl,
711 last_used_at = Then } = Handle1 } ->
713 true -> prim_file:sync(Hdl);
716 ok = prim_file:close(Hdl),
717 age_tree_delete(Then),
718 {ok, Handle1 #handle { hdl = closed,
720 last_used_at = undefined }};
721 {_Error, _Handle} = Result ->
725 hard_close(Handle) ->
726 case soft_close(Handle) of
727 {ok, #handle { path = Path,
728 is_read = IsReader, is_write = IsWriter }} ->
729 #file { reader_count = RCount, has_writer = HasWriter } = File =
730 get({Path, fhc_file}),
731 RCount1 = case IsReader of
735 HasWriter1 = HasWriter andalso not IsWriter,
736 case RCount1 =:= 0 andalso not HasWriter1 of
737 true -> erase({Path, fhc_file});
738 false -> put({Path, fhc_file},
739 File #file { reader_count = RCount1,
740 has_writer = HasWriter1 })
743 {_Error, _Handle} = Result ->
747 maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
749 {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
750 case (case NeedsSeek of
751 true -> prim_file:position(Hdl, NewOffset);
752 false -> {ok, Offset}
754 {ok, Offset1} = Result ->
755 {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }};
756 {error, _} = Error ->
760 needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false};
761 needs_seek( AtEoF, _CurOffset, {cur, 0}) -> {AtEoF, false};
762 needs_seek( true, _CurOffset, eof ) -> {true , false};
763 needs_seek( true, _CurOffset, {eof, 0}) -> {true , false};
764 needs_seek( false, _CurOffset, eof ) -> {true , true };
765 needs_seek( false, _CurOffset, {eof, 0}) -> {true , true };
766 needs_seek( AtEoF, 0, bof ) -> {AtEoF, false};
767 needs_seek( AtEoF, 0, {bof, 0}) -> {AtEoF, false};
768 needs_seek( AtEoF, CurOffset, CurOffset) -> {AtEoF, false};
769 needs_seek( true, CurOffset, {bof, DesiredOffset})
770 when DesiredOffset >= CurOffset ->
772 needs_seek( true, _CurOffset, {cur, DesiredOffset})
773 when DesiredOffset > 0 ->
775 needs_seek( true, CurOffset, DesiredOffset) %% same as {bof, DO}
776 when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset ->
778 %% because we can't really track size, we could well end up at EoF and not know
779 needs_seek(_AtEoF, _CurOffset, _DesiredOffset) ->
782 write_buffer(Handle = #handle { write_buffer = [] }) ->
784 write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
785 write_buffer = WriteBuffer,
786 write_buffer_size = DataSize,
788 case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of
790 Offset1 = Offset + DataSize,
791 {ok, Handle #handle { offset = Offset1, is_dirty = true,
792 write_buffer = [], write_buffer_size = 0 }};
793 {error, _} = Error ->
%% Builds the [{Item, Value}] reply for an info request by evaluating
%% i/2 for each requested item, in order.
infos(Items, State) ->
    lists:map(fun (Item) -> {Item, i(Item, State)} end, Items).
%% Computes one info item from the server state:
%%   total_limit   - overall descriptor limit
%%   total_used    - open handles plus obtained descriptors
%%   sockets_limit - limit applying to obtain/0
%%   sockets_used  - descriptors currently obtained
%% Any other item is rejected with throw({bad_argument, Item}).
i(total_limit, #fhc_state { limit = TotalLimit }) ->
    TotalLimit;
i(total_used, #fhc_state { open_count = Opened, obtain_count = Obtained }) ->
    Opened + Obtained;
i(sockets_limit, #fhc_state { obtain_limit = ObtainLimit }) ->
    ObtainLimit;
i(sockets_used, #fhc_state { obtain_count = Obtained }) ->
    Obtained;
i(Item, _State) ->
    throw({bad_argument, Item}).
805 %%----------------------------------------------------------------------------
806 %% gen_server2 callbacks
807 %%----------------------------------------------------------------------------
810 Limit = case application:get_env(file_handles_high_watermark) of
811 {ok, Watermark} when (is_integer(Watermark) andalso
816 unknown -> ?FILE_HANDLES_LIMIT_OTHER;
817 Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS])
820 ObtainLimit = obtain_limit(Limit),
821 error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
822 [Limit, ObtainLimit]),
823 Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
824 Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]),
825 {ok, #fhc_state { elders = Elders,
828 open_pending = pending_new(),
829 obtain_limit = ObtainLimit,
831 obtain_pending = pending_new(),
833 timer_ref = undefined }}.
835 prioritise_cast(Msg, _State) ->
841 handle_call({open, Pid, Requested, EldestUnusedSince}, From,
842 State = #fhc_state { open_count = Count,
843 open_pending = Pending,
846 when EldestUnusedSince =/= undefined ->
847 true = ets:insert(Elders, {Pid, EldestUnusedSince}),
848 Item = #pending { kind = open,
850 requested = Requested,
852 ok = track_client(Pid, Clients),
853 case needs_reduce(State #fhc_state { open_count = Count + Requested }) of
854 true -> case ets:lookup(Clients, Pid) of
855 [#cstate { opened = 0 }] ->
856 true = ets:update_element(
857 Clients, Pid, {#cstate.blocked, true}),
859 reduce(State #fhc_state {
860 open_pending = pending_in(Item, Pending) })};
861 [#cstate { opened = Opened }] ->
862 true = ets:update_element(
864 {#cstate.pending_closes, Opened}),
865 {reply, close, State}
867 false -> {noreply, run_pending_item(Item, State)}
870 handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
871 obtain_pending = Pending,
872 clients = Clients }) ->
873 ok = track_client(Pid, Clients),
874 Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
876 true = ets:update_element(Clients, Pid,
877 {#cstate.blocked, true}),
879 obtain_pending = pending_in(Item, Pending) }
882 case obtain_limit_reached(State) of
884 false -> case needs_reduce(State #fhc_state {
885 obtain_count = Count + 1 }) of
886 true -> reduce(Enqueue());
887 false -> adjust_alarm(
888 State, run_pending_item(Item, State))
892 handle_call({set_limit, Limit}, _From, State) ->
893 {reply, ok, adjust_alarm(
898 obtain_limit = obtain_limit(Limit) })))};
900 handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
901 {reply, Limit, State};
903 handle_call({info, Items}, _From, State) ->
904 {reply, infos(Items, State), State}.
906 handle_cast({register_callback, Pid, MFA},
907 State = #fhc_state { clients = Clients }) ->
908 ok = track_client(Pid, Clients),
909 true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}),
912 handle_cast({update, Pid, EldestUnusedSince},
913 State = #fhc_state { elders = Elders })
914 when EldestUnusedSince =/= undefined ->
915 true = ets:insert(Elders, {Pid, EldestUnusedSince}),
916 %% don't call maybe_reduce from here otherwise we can create a
920 handle_cast({release, Pid}, State) ->
921 {noreply, adjust_alarm(State, process_pending(
922 update_counts(obtain, Pid, -1, State)))};
924 handle_cast({close, Pid, EldestUnusedSince},
925 State = #fhc_state { elders = Elders, clients = Clients }) ->
926 true = case EldestUnusedSince of
927 undefined -> ets:delete(Elders, Pid);
928 _ -> ets:insert(Elders, {Pid, EldestUnusedSince})
930 ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
931 {noreply, adjust_alarm(State, process_pending(
932 update_counts(open, Pid, -1, State)))};
934 handle_cast({transfer, FromPid, ToPid}, State) ->
935 ok = track_client(ToPid, State#fhc_state.clients),
936 {noreply, process_pending(
937 update_counts(obtain, ToPid, +1,
938 update_counts(obtain, FromPid, -1, State)))}.
940 handle_info(check_counts, State) ->
941 {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
943 handle_info({'DOWN', _MRef, process, Pid, _Reason},
944 State = #fhc_state { elders = Elders,
945 open_count = OpenCount,
946 open_pending = OpenPending,
947 obtain_count = ObtainCount,
948 obtain_pending = ObtainPending,
949 clients = Clients }) ->
950 [#cstate { opened = Opened, obtained = Obtained }] =
951 ets:lookup(Clients, Pid),
952 true = ets:delete(Clients, Pid),
953 true = ets:delete(Elders, Pid),
954 FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
955 {noreply, adjust_alarm(
959 open_count = OpenCount - Opened,
960 open_pending = filter_pending(FilterFun, OpenPending),
961 obtain_count = ObtainCount - Obtained,
962 obtain_pending = filter_pending(FilterFun, ObtainPending) }))}.
964 terminate(_Reason, State = #fhc_state { clients = Clients,
965 elders = Elders }) ->
970 code_change(_OldVsn, State, _Extra) ->
973 %%----------------------------------------------------------------------------
974 %% pending queue abstraction helpers
975 %%----------------------------------------------------------------------------
977 queue_fold(Fun, Init, Q) ->
980 {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
983 filter_pending(Fun, {Count, Queue}) ->
986 fun (Item = #pending { requested = Requested }, {DeltaN, QueueN}) ->
988 true -> {DeltaN, queue:in(Item, QueueN)};
989 false -> {DeltaN - Requested, QueueN}
991 end, {0, queue:new()}, Queue),
992 {Count + Delta, Queue1}.
%% Enqueues a pending request. The running total kept next to the queue is
%% the sum of the `requested' counts of all queued items, so Item's count
%% is added to it.
pending_in(#pending { requested = Requested } = Item, {Count, Queue}) ->
    NewCount = Count + Requested,
    {NewCount, queue:in(Item, Queue)}.
1000 pending_out({0, _Queue} = Pending) ->
1002 pending_out({N, Queue}) ->
1003 {{value, #pending { requested = Requested }} = Result, Queue1} =
1005 {Result, {N - Requested, Queue1}}.
1007 pending_count({Count, _Queue}) ->
1010 pending_is_empty({0, _Queue}) ->
1012 pending_is_empty({_N, _Queue}) ->
1015 %%----------------------------------------------------------------------------
1017 %%----------------------------------------------------------------------------
1019 obtain_limit(infinity) -> infinity;
1020 obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of
1021 OLimit when OLimit < 0 -> 0;
%% True when a finite obtain limit is configured and the number of
%% obtained descriptors has reached it. An infinite limit is never reached.
obtain_limit_reached(#fhc_state { obtain_limit = infinity }) ->
    false;
obtain_limit_reached(#fhc_state { obtain_limit = Limit,
                                  obtain_count = Count }) ->
    Count >= Limit.
1029 adjust_alarm(OldState, NewState) ->
1030 case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of
1031 {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []});
1032 {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit);
1037 process_pending(State = #fhc_state { limit = infinity }) ->
1039 process_pending(State) ->
1040 process_open(process_obtain(State)).
%% Grants queued open requests for as long as they fit in the headroom the
%% overall limit leaves after currently open and obtained descriptors, then
%% stores the remaining queue back into the state.
process_open(State = #fhc_state { limit        = Limit,
                                  open_pending = Pending,
                                  open_count   = OpenCount,
                                  obtain_count = ObtainCount }) ->
    Headroom = Limit - (OpenCount + ObtainCount),
    {Remaining, State1} = process_pending(Pending, Headroom, State),
    State1 #fhc_state { open_pending = Remaining }.
%% Grants queued obtain requests. The quota is the tighter of two bounds:
%% the room left under the obtain limit, and the room left under the
%% overall limit after open and obtained descriptors. The remaining queue
%% is stored back into the state.
process_obtain(State = #fhc_state { limit          = Limit,
                                    obtain_pending = Pending,
                                    obtain_limit   = ObtainLimit,
                                    obtain_count   = ObtainCount,
                                    open_count     = OpenCount }) ->
    ObtainHeadroom = ObtainLimit - ObtainCount,
    TotalHeadroom = Limit - (ObtainCount + OpenCount),
    Quota = lists:min([ObtainHeadroom, TotalHeadroom]),
    {Remaining, State1} = process_pending(Pending, Quota, State),
    State1 #fhc_state { obtain_pending = Remaining }.
1060 process_pending(Pending, Quota, State) when Quota =< 0 ->
1062 process_pending(Pending, Quota, State) ->
1063 case pending_out(Pending) of
1064 {empty, _Pending} ->
1066 {{value, #pending { requested = Requested }}, _Pending1}
1067 when Requested > Quota ->
1069 {{value, #pending { requested = Requested } = Item}, Pending1} ->
1070 process_pending(Pending1, Quota - Requested,
1071 run_pending_item(Item, State))
1074 run_pending_item(#pending { kind = Kind,
1076 requested = Requested,
1078 State = #fhc_state { clients = Clients }) ->
1079 gen_server2:reply(From, ok),
1080 true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
1081 update_counts(Kind, Pid, Requested, State).
%% Adds Delta to Pid's per-client opened or obtained counter (selected by
%% Kind, via update_counts1/4) and applies the same change to the matching
%% server-wide total in State.
update_counts(Kind, Pid, Delta,
              State = #fhc_state { clients      = Clients,
                                   open_count   = Opened0,
                                   obtain_count = Obtained0 }) ->
    {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients),
    Opened = Opened0 + OpenDelta,
    Obtained = Obtained0 + ObtainDelta,
    State #fhc_state { open_count = Opened, obtain_count = Obtained }.
1091 update_counts1(open, Pid, Delta, Clients) ->
1092 ets:update_counter(Clients, Pid, {#cstate.opened, Delta}),
1094 update_counts1(obtain, Pid, Delta, Clients) ->
1095 ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}),
1098 maybe_reduce(State) ->
1099 case needs_reduce(State) of
1100 true -> reduce(State);
1104 needs_reduce(#fhc_state { limit = Limit,
1105 open_count = OpenCount,
1106 open_pending = OpenPending,
1107 obtain_count = ObtainCount,
1108 obtain_limit = ObtainLimit,
1109 obtain_pending = ObtainPending }) ->
1111 andalso ((OpenCount + ObtainCount > Limit)
1112 orelse (not pending_is_empty(OpenPending))
1113 orelse (ObtainCount < ObtainLimit
1114 andalso not pending_is_empty(ObtainPending))).
1116 reduce(State = #fhc_state { open_pending = OpenPending,
1117 obtain_pending = ObtainPending,
1120 timer_ref = TRef }) ->
1122 {CStates, Sum, ClientCount} =
1123 ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
1124 [#cstate { pending_closes = PendingCloses,
1126 blocked = Blocked } = CState] =
1127 ets:lookup(Clients, Pid),
1128 case Blocked orelse PendingCloses =:= Opened of
1130 false -> {[CState | CStatesAcc],
1131 SumAcc + timer:now_diff(Now, Eldest),
1134 end, {[], 0, 0}, Elders),
1137 _ -> case (Sum / ClientCount) -
1138 (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
1139 AverageAge when AverageAge > 0 ->
1140 notify_age(CStates, AverageAge);
1142 notify_age0(Clients, CStates,
1143 pending_count(OpenPending) +
1144 pending_count(ObtainPending))
1148 undefined -> TRef1 = erlang:send_after(
1149 ?FILE_HANDLES_CHECK_INTERVAL, ?SERVER,
1151 State #fhc_state { timer_ref = TRef1 };
1155 notify_age(CStates, AverageAge) ->
1157 fun (#cstate { callback = undefined }) -> ok;
1158 (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge])
1161 notify_age0(Clients, CStates, Required) ->
1162 case [CState || CState <- CStates, CState#cstate.callback =/= undefined] of
1164 Notifications -> S = random:uniform(length(Notifications)),
1165 {L1, L2} = lists:split(S, Notifications),
1166 notify(Clients, Required, L2 ++ L1)
1169 notify(_Clients, _Required, []) ->
1171 notify(_Clients, Required, _Notifications) when Required =< 0 ->
1173 notify(Clients, Required, [#cstate{ pid = Pid,
1174 callback = {M, F, A},
1175 opened = Opened } | Notifications]) ->
1176 apply(M, F, A ++ [0]),
1177 ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}),
1178 notify(Clients, Required - Opened, Notifications).
1180 track_client(Pid, Clients) ->
1181 case ets:insert_new(Clients, #cstate { pid = Pid,
1182 callback = undefined,
1186 pending_closes = 0 }) of
1187 true -> _MRef = erlang:monitor(process, Pid),
1193 %% To increase the number of file descriptors: on Windows set the ERL_MAX_PORTS
1194 %% environment variable, on Linux set `ulimit -n`.
1196 case proplists:get_value(max_fds, erlang:system_info(check_io)) of
1197 MaxFds when is_integer(MaxFds) andalso MaxFds > 1 ->
1200 %% On Windows max_fds is twice the number of open files:
1201 %% https://github.com/yrashk/erlang/blob/e1282325ed75e52a98d5/erts/emulator/sys/win32/sys.c#L2459-2466
1204 %% For other operating systems trust Erlang.