|
matthias@3268
|
1 |
%% The contents of this file are subject to the Mozilla Public License
|
|
matthias@3268
|
2 |
%% Version 1.1 (the "License"); you may not use this file except in
|
|
matthias@3268
|
3 |
%% compliance with the License. You may obtain a copy of the License at
|
|
matthias@3268
|
4 |
%% http://www.mozilla.org/MPL/
|
|
matthias@3268
|
5 |
%%
|
|
matthias@3268
|
6 |
%% Software distributed under the License is distributed on an "AS IS"
|
|
matthias@3268
|
7 |
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
|
|
matthias@3268
|
8 |
%% License for the specific language governing rights and limitations
|
|
matthias@3268
|
9 |
%% under the License.
|
|
matthias@3268
|
10 |
%%
|
|
matthias@3268
|
11 |
%% The Original Code is RabbitMQ.
|
|
matthias@3268
|
12 |
%%
|
|
matthias@3268
|
13 |
%% The Initial Developers of the Original Code are LShift Ltd,
|
|
matthias@3268
|
14 |
%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
|
|
matthias@3268
|
15 |
%%
|
|
matthias@3268
|
16 |
%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
|
|
matthias@3268
|
17 |
%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
|
|
matthias@3268
|
18 |
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
|
|
matthias@3268
|
19 |
%% Technologies LLC, and Rabbit Technologies Ltd.
|
|
matthias@3268
|
20 |
%%
|
|
matthias@3268
|
21 |
%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
|
|
matthias@3268
|
22 |
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
|
|
matthias@3268
|
23 |
%% Copyright (C) 2007-2010 Cohesive Financial Technologies
|
|
matthias@3268
|
24 |
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
|
|
matthias@3268
|
25 |
%% (C) 2007-2010 Rabbit Technologies Ltd.
|
|
matthias@3268
|
26 |
%%
|
|
matthias@3268
|
27 |
%% All Rights Reserved.
|
|
matthias@3268
|
28 |
%%
|
|
matthias@3268
|
29 |
%% Contributor(s): ______________________________________.
|
|
matthias@3268
|
30 |
%%
|
|
matthias@3268
|
31 |
|
|
matthias@3268
|
32 |
-module(file_handle_cache).
|
|
matthias@3268
|
33 |
|
|
matthias@3268
|
34 |
%% A File Handle Cache
|
|
matthias@3268
|
35 |
%%
|
|
matthias@3268
|
36 |
%% This extends a subset of the functionality of the Erlang file
|
|
matthias@3268
|
37 |
%% module.
|
|
matthias@3268
|
38 |
%%
|
|
matthias@3268
|
39 |
%% Some constraints
|
|
matthias@3268
|
40 |
%% 1) This supports one writer, multiple readers per file. Nothing
|
|
matthias@3268
|
41 |
%% else.
|
|
matthias@3268
|
42 |
%% 2) Do not open the same file from different processes. Bad things
|
|
matthias@3268
|
43 |
%% may happen.
|
|
matthias@3268
|
44 |
%% 3) Writes are all appends. You cannot write to the middle of a
|
|
matthias@3268
|
45 |
%% file, although you can truncate and then append if you want.
|
|
matthias@3268
|
46 |
%% 4) Although there is a write buffer, there is no read buffer. Feel
|
|
matthias@3268
|
47 |
%% free to use the read_ahead mode, but beware of the interaction
|
|
matthias@3268
|
48 |
%% between that buffer and the write buffer.
|
|
matthias@3268
|
49 |
%%
|
|
matthias@3268
|
50 |
%% Some benefits
|
|
matthias@3268
|
51 |
%% 1) You do not have to remember to call sync before close
|
|
matthias@3268
|
52 |
%% 2) Buffering is much more flexible than with plain file module, and
|
|
matthias@3268
|
53 |
%% you can control when the buffer gets flushed out. This means that
|
|
matthias@3268
|
54 |
%% you can rely on reads-after-writes working, without having to call
|
|
matthias@3268
|
55 |
%% the expensive sync.
|
|
matthias@3268
|
56 |
%% 3) Unnecessary calls to position and sync get optimised out.
|
|
matthias@3268
|
57 |
%% 4) You can find out what your 'real' offset is, and what your
|
|
matthias@3268
|
58 |
%% 'virtual' offset is (i.e. where the hdl really is, and where it
|
|
matthias@3268
|
59 |
%% would be after the write buffer is written out).
|
|
matthias@3268
|
60 |
%% 5) You can find out what the offset was when you last sync'd.
|
|
matthias@3268
|
61 |
%%
|
|
matthias@3268
|
62 |
%% There is also a server component which serves to limit the number
|
|
matthias@3268
|
63 |
%% of open file handles in a "soft" way - the server will never
|
|
matthias@3268
|
64 |
%% prevent a client from opening a handle, but may immediately tell it
|
|
matthias@3268
|
65 |
%% to close the handle. Thus you can set the limit to zero and it will
|
|
matthias@3268
|
66 |
%% still all work correctly, it is just that effectively no caching
|
|
matthias@3268
|
67 |
%% will take place. The operation of limiting is as follows:
|
|
matthias@3268
|
68 |
%%
|
|
matthias@3268
|
69 |
%% On open and close, the client sends messages to the server
|
|
matthias@3268
|
70 |
%% informing it of opens and closes. This allows the server to keep
|
|
matthias@3268
|
71 |
%% track of the number of open handles. The client also keeps a
|
|
matthias@3268
|
72 |
%% gb_tree which is updated on every use of a file handle, mapping the
|
|
matthias@3268
|
73 |
%% time at which the file handle was last used (timestamp) to the
|
|
matthias@3268
|
74 |
%% handle. Thus the smallest key in this tree maps to the file handle
|
|
matthias@3268
|
75 |
%% that has not been used for the longest amount of time. This
|
|
matthias@3268
|
76 |
%% smallest key is included in the messages to the server. As such,
|
|
matthias@3268
|
77 |
%% the server keeps track of when the least recently used file handle
|
|
matthias@3268
|
78 |
%% was used *at the point of the most recent open or close* by each
|
|
matthias@3268
|
79 |
%% client.
|
|
matthias@3268
|
80 |
%%
|
|
matthias@3268
|
81 |
%% Note that this data can go very out of date, by the client using
|
|
matthias@3268
|
82 |
%% the least recently used handle.
|
|
matthias@3268
|
83 |
%%
|
|
matthias@3268
|
84 |
%% When the limit is reached, the server calculates the average age of
|
|
matthias@3268
|
85 |
%% the last reported least recently used file handle of all the
|
|
matthias@3268
|
86 |
%% clients. It then tells all the clients to close any handles not
|
|
matthias@3268
|
87 |
%% used for longer than this average, by invoking the callback the
|
|
matthias@3268
|
88 |
%% client registered. The client should receive this message and pass
|
|
matthias@3268
|
89 |
%% it into set_maximum_since_use/1. However, it is highly possible
|
|
matthias@3268
|
90 |
%% this age will be greater than the ages of all the handles the
|
|
matthias@3268
|
91 |
%% client knows of because the client has used its file handles in the
|
|
matthias@3268
|
92 |
%% meantime. Thus at this point the client reports to the server the
|
|
matthias@3268
|
93 |
%% current timestamp at which its least recently used file handle was
|
|
matthias@3268
|
94 |
%% last used. The server will check two seconds later that either it
|
|
matthias@3268
|
95 |
%% is back under the limit, in which case all is well again, or if
|
|
matthias@3268
|
96 |
%% not, it will calculate a new average age. Its data will be much
|
|
matthias@3268
|
97 |
%% more recent now, and so it is very likely that when this is
|
|
matthias@3268
|
98 |
%% communicated to the clients, the clients will close file handles.
|
|
matthias@3268
|
99 |
%%
|
|
matthias@3268
|
100 |
%% The advantage of this scheme is that there is only communication
|
|
matthias@3268
|
101 |
%% from the client to the server on open, close, and when in the
|
|
matthias@3268
|
102 |
%% process of trying to reduce file handle usage. There is no
|
|
matthias@3268
|
103 |
%% communication from the client to the server on normal file handle
|
|
matthias@3268
|
104 |
%% operations. This scheme forms a feed-back loop - the server does
|
|
matthias@3268
|
105 |
%% not care which file handles are closed, just that some are, and it
|
|
matthias@3268
|
106 |
%% checks this repeatedly when over the limit. Given the guarantees of
|
|
matthias@3268
|
107 |
%% now(), even if there is just one file handle open, a limit of 1,
|
|
matthias@3268
|
108 |
%% and one client, it is certain that when the client calculates the
|
|
matthias@3268
|
109 |
%% age of the handle, it will be greater than when the server
|
|
matthias@3268
|
110 |
%% calculated it, hence it should be closed.
|
|
matthias@3268
|
111 |
%%
|
|
matthias@3268
|
112 |
%% Handles which are closed as a result of the server are put into a
|
|
matthias@3268
|
113 |
%% "soft-closed" state in which the handle is closed (data flushed out
|
|
matthias@3268
|
114 |
%% and sync'd first) but the state is maintained. The handle will be
|
|
matthias@3268
|
115 |
%% fully reopened again as soon as needed, thus users of this library
|
|
matthias@3268
|
116 |
%% do not need to worry about their handles being closed by the server
|
|
matthias@3268
|
117 |
%% - reopening them when necessary is handled transparently.
|
|
matthias@3268
|
118 |
%%
|
|
matthias@3268
|
119 |
%% The server also supports obtain and release_on_death. obtain/0
|
|
matthias@3268
|
120 |
%% blocks until a file descriptor is available. release_on_death/1
|
|
matthias@3268
|
121 |
%% takes a pid and monitors the pid, reducing the count by 1 when the
|
|
matthias@3268
|
122 |
%% pid dies. Thus the assumption is that obtain/0 is called first, and
|
|
matthias@3268
|
123 |
%% when that returns, release_on_death/1 is called with the pid that
|
|
matthias@3268
|
124 |
%% "owns" the file descriptor. This is, for example, used to track the
|
|
matthias@3268
|
125 |
%% use of file descriptors through network sockets.
|
|
matthias@3268
|
126 |
|
|
matthias@3268
|
127 |
-behaviour(gen_server).
|
|
matthias@3268
|
128 |
|
|
matthias@3268
|
129 |
-export([register_callback/3]).
|
|
matthias@3268
|
130 |
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
|
|
matthias@3268
|
131 |
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
|
|
matthias@3268
|
132 |
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
|
|
matthias@3268
|
133 |
-export([release_on_death/1, obtain/0]).
|
|
matthias@3268
|
134 |
|
|
matthias@3268
|
135 |
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
matthias@3268
|
136 |
terminate/2, code_change/3]).
|
|
matthias@3268
|
137 |
|
|
matthias@3268
|
138 |
-define(SERVER, ?MODULE).
|
|
matthias@3268
|
139 |
-define(RESERVED_FOR_OTHERS, 100).
|
|
matthias@3268
|
140 |
-define(FILE_HANDLES_LIMIT_WINDOWS, 10000000).
|
|
matthias@3268
|
141 |
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
|
|
matthias@3268
|
142 |
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
|
|
matthias@3268
|
143 |
|
|
matthias@3268
|
144 |
%%----------------------------------------------------------------------------
|
|
matthias@3268
|
145 |
|
|
matthias@3268
|
146 |
%% Per-path bookkeeping, stored in the process dictionary under
%% {Path, fhc_file} by open/3 and maintained by hard_close/1.
-record(file,
        { reader_count, %% number of open handles on this path whose mode
                        %% includes 'read'
          has_writer    %% true while a handle opened for writing exists
        }).
|
|
matthias@3268
|
150 |
|
|
matthias@3268
|
151 |
%% Client-side state of one cached file handle, stored in the process
%% dictionary under {Ref, fhc_handle}.
-record(handle,
        { hdl,                     %% result of file:open/2, or the atom
                                   %% 'closed' while soft-closed
          offset,                  %% position tracked for hdl (buffered,
                                   %% not yet written data is not included)
          trusted_offset,          %% offset recorded at the last sync
          is_dirty,                %% true when data written to hdl has not
                                   %% been sync'd yet
          write_buffer_size,       %% number of bytes held in write_buffer
          write_buffer_size_limit, %% 0 (unbuffered) | infinity | integer
          write_buffer,            %% buffered iodata, newest append first
          at_eof,                  %% true when hdl is known to be
                                   %% positioned at the end of the file
          path,                    %% path handed to file:open/2
          mode,                    %% mode list used on the first open
          options,                 %% options as passed to open/3
          is_write,                %% is_writer(mode)
          is_read,                 %% is_reader(mode)
          last_used_at             %% now() of the last use; also the key
                                   %% of this handle in the age tree
        }).
|
|
matthias@3268
|
167 |
|
|
matthias@3268
|
168 |
%% State of the file handle cache server process.
%% NOTE(review): the server callbacks are not visible in this part of the
%% file; the field notes below are inferred from the module header and
%% should be confirmed against init/1 and the handle_* callbacks.
-record(fhc_state,
        { elders,       %% presumably client pid -> timestamp of its least
                        %% recently used handle - TODO confirm
          limit,        %% presumably the soft limit on open handles
          count,        %% presumably the number of handles currently open
          obtains,      %% presumably callers blocked in obtain/0
          callbacks,    %% presumably client pid -> {M, F, A} registered
                        %% through register_callback/3
          client_mrefs, %% presumably monitor references of known clients
          timer_ref     %% presumably the pending limit re-check timer
        }).
|
|
matthias@3268
|
177 |
|
|
matthias@3268
|
178 |
%%----------------------------------------------------------------------------
|
|
matthias@3268
|
179 |
%% Specs
|
|
matthias@3268
|
180 |
%%----------------------------------------------------------------------------
|
|
matthias@3268
|
181 |
|
|
matthias@3268
|
182 |
-ifdef(use_specs).
|
|
matthias@3268
|
183 |
|
|
matthias@3268
|
184 |
-type(ref() :: any()).
|
|
alexandru@3905
|
185 |
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
|
|
matthew@3933
|
186 |
-type(val_or_error(T) :: rabbit_types:ok_or_error2(T, any())).
|
|
matthias@3268
|
187 |
-type(position() :: ('bof' | 'eof' | non_neg_integer() |
|
|
alexandru@3910
|
188 |
{('bof' |'eof'), non_neg_integer()} |
|
|
alexandru@3910
|
189 |
{'cur', integer()})).
|
|
matthias@3268
|
190 |
-type(offset() :: non_neg_integer()).
|
|
matthias@3268
|
191 |
|
|
matthias@3268
|
192 |
-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
|
|
matthias@3268
|
193 |
-spec(open/3 ::
|
|
alexandru@3910
|
194 |
(string(), [any()],
|
|
alexandru@3910
|
195 |
[{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
|
|
alexandru@3910
|
196 |
-> val_or_error(ref())).
|
|
matthias@3268
|
197 |
-spec(close/1 :: (ref()) -> ok_or_error()).
|
|
matthias@3268
|
198 |
-spec(read/2 :: (ref(), non_neg_integer()) ->
|
|
matthias@3268
|
199 |
val_or_error([char()] | binary()) | 'eof').
|
|
matthias@3268
|
200 |
-spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
|
|
matthias@3268
|
201 |
-spec(sync/1 :: (ref()) -> ok_or_error()).
|
|
matthias@3268
|
202 |
-spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
|
|
matthias@3268
|
203 |
-spec(truncate/1 :: (ref()) -> ok_or_error()).
|
|
matthias@3268
|
204 |
-spec(last_sync_offset/1 :: (ref()) -> val_or_error(offset())).
|
|
matthias@3268
|
205 |
-spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())).
|
|
matthias@3268
|
206 |
-spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())).
|
|
matthias@3268
|
207 |
-spec(flush/1 :: (ref()) -> ok_or_error()).
|
|
matthias@3268
|
208 |
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
|
|
matthias@3268
|
209 |
val_or_error(non_neg_integer())).
|
|
matthias@3268
|
210 |
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
|
|
matthias@3268
|
211 |
-spec(delete/1 :: (ref()) -> ok_or_error()).
|
|
matthias@3268
|
212 |
-spec(clear/1 :: (ref()) -> ok_or_error()).
|
|
matthias@3268
|
213 |
-spec(release_on_death/1 :: (pid()) -> 'ok').
|
|
matthias@3268
|
214 |
-spec(obtain/0 :: () -> 'ok').
|
|
matthias@3268
|
215 |
|
|
matthias@3268
|
216 |
-endif.
|
|
matthias@3268
|
217 |
|
|
matthias@3268
|
218 |
%%----------------------------------------------------------------------------
|
|
matthias@3268
|
219 |
%% Public API
|
|
matthias@3268
|
220 |
%%----------------------------------------------------------------------------
|
|
matthias@3268
|
221 |
|
|
matthias@3268
|
222 |
%% Starts the file handle cache server as a gen_server registered locally
%% under ?SERVER and linked to the calling process. No start timeout is
%% applied.
start_link() ->
    ServerName = {local, ?SERVER},
    StartOpts  = [{timeout, infinity}],
    gen_server:start_link(ServerName, ?MODULE, [], StartOpts).
|
|
matthias@3268
|
224 |
|
|
matthias@3268
|
225 |
%% Registers {M, F, A} with the server as the callback of the calling
%% process. The request is an asynchronous cast, so 'ok' is returned
%% without waiting for the server.
register_callback(M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
    Callback = {M, F, A},
    gen_server:cast(?SERVER, {register_callback, self(), Callback}).
|
|
matthias@3268
|
228 |
|
|
matthias@3268
|
229 |
%% Opens Path (made absolute first) with the given Mode and Options and
%% returns {ok, Ref}, where Ref is a fresh reference identifying the new
%% handle in this process.
%%
%% An 'append' in Mode is rewritten to 'write' by append_to_write/1. Only
%% one writer per path is allowed within the calling process: a second
%% open for writing yields {error, writer_exists}. Any error from open1/6
%% is returned unchanged and leaves the per-path bookkeeping untouched.
open(Path, Mode, Options) ->
    AbsPath = filename:absname(Path),
    FileKey = {AbsPath, fhc_file},
    FileRec = case get(FileKey) of
                  undefined        -> #file { reader_count = 0,
                                              has_writer   = false };
                  Known = #file {} -> Known
              end,
    #file { reader_count = Readers, has_writer = HadWriter } = FileRec,
    Mode1      = append_to_write(Mode),
    WantsWrite = is_writer(Mode1),
    case WantsWrite andalso HadWriter of
        false ->
            Ref = make_ref(),
            case open1(AbsPath, Mode1, Options, Ref, bof, new) of
                {ok, _Handle} ->
                    Readers1 = case is_reader(Mode1) of
                                   false -> Readers;
                                   true  -> Readers + 1
                               end,
                    %% Record the new reader/writer for this path.
                    put(FileKey,
                        FileRec #file { reader_count = Readers1,
                                        has_writer   = HadWriter orelse
                                                       WantsWrite }),
                    {ok, Ref};
                OpenError ->
                    OpenError
            end;
        true ->
            {error, writer_exists}
    end.
|
|
matthias@3268
|
257 |
|
|
matthias@3268
|
258 |
%% Closes the handle identified by Ref and forgets it. Closing an unknown
%% Ref is a no-op returning 'ok'. If hard_close/1 fails, the handle is put
%% back under Ref (via put_handle/2) and the error is returned.
close(Ref) ->
    HandleKey = {Ref, fhc_handle},
    case erase(HandleKey) of
        undefined ->
            ok;
        Handle ->
            case hard_close(Handle) of
                {Error, Handle1} -> put_handle(Ref, Handle1),
                                    Error;
                ok               -> ok
            end
    end.
|
|
matthias@3268
|
267 |
|
|
matthias@3268
|
268 |
%% Reads up to Count bytes from the handle Ref, after flushing its write
%% buffer. Returns whatever file:read/2 returns ({ok, Data}, 'eof' or an
%% error), or {error, not_open_for_reading} if the handle was not opened
%% with 'read'. On success the tracked offset advances by the size of
%% Data; on 'eof' the handle is marked as being at the end of the file.
read(Ref, Count) ->
    ReadFun =
        fun ([#handle { is_read = false }]) ->
                {error, not_open_for_reading};
            ([#handle { hdl = Fd, offset = Pos } = Handle]) ->
                case file:read(Fd, Count) of
                    eof ->
                        {eof, [Handle #handle { at_eof = true }]};
                    {ok, Bytes} = Reply ->
                        NewPos = Pos + iolist_size(Bytes),
                        {Reply, [Handle #handle { offset = NewPos }]};
                    Failure ->
                        {Failure, [Handle]}
                end
        end,
    with_flushed_handles([Ref], ReadFun).
|
|
matthias@3268
|
282 |
|
|
matthias@3268
|
283 |
%% Appends Data (iodata) to the file behind Ref. The handle is first moved
%% to the end of the file. With a write buffer limit of 0 the data is
%% written straight through with file:write/2; otherwise it is added to
%% the write buffer, which is flushed with write_buffer/1 once its size
%% exceeds a finite limit. Returns {error, not_open_for_writing} if the
%% handle was not opened for writing.
append(Ref, Data) ->
    AppendFun =
        fun ([#handle { is_write = false }]) ->
                {error, not_open_for_writing};
            ([Handle]) ->
                case maybe_seek(eof, Handle) of
                    {{error, _} = SeekError, Handle1} ->
                        {SeekError, [Handle1]};
                    %% Unbuffered: write through immediately.
                    {{ok, _}, Handle1 = #handle { write_buffer_size_limit = 0,
                                                  at_eof = true,
                                                  hdl    = Fd,
                                                  offset = Pos }} ->
                        NewPos = Pos + iolist_size(Data),
                        {file:write(Fd, Data),
                         [Handle1 #handle { offset   = NewPos,
                                            is_dirty = true }]};
                    %% Buffered: newest data goes to the front of the buffer.
                    {{ok, _}, Handle1 = #handle { write_buffer_size_limit = Limit,
                                                  at_eof = true,
                                                  write_buffer      = Buffer,
                                                  write_buffer_size = Size }} ->
                        NewSize  = Size + iolist_size(Data),
                        Buffered = Handle1 #handle {
                                     write_buffer      = [Data | Buffer],
                                     write_buffer_size = NewSize },
                        case Limit == infinity orelse NewSize =< Limit of
                            true ->
                                {ok, [Buffered]};
                            false ->
                                {Result, Flushed} = write_buffer(Buffered),
                                {Result, [Flushed]}
                        end
                end
        end,
    with_handles([Ref], AppendFun).
|
|
matthias@3268
|
313 |
|
|
matthias@3268
|
314 |
%% Flushes the write buffer of Ref and, if the handle is dirty, calls
%% file:sync/1 on it. After a successful sync the handle is marked clean
%% and its trusted offset becomes the current offset. A clean handle is
%% left alone and 'ok' is returned.
sync(Ref) ->
    SyncFun =
        fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
                ok;
            ([#handle { is_dirty = true, write_buffer = [],
                        hdl = Fd, offset = Pos } = Handle]) ->
                case file:sync(Fd) of
                    ok ->
                        Synced = Handle #handle { is_dirty       = false,
                                                  trusted_offset = Pos },
                        {ok, [Synced]};
                    Failure ->
                        {Failure, [Handle]}
                end
        end,
    with_flushed_handles([Ref], SyncFun).
|
|
matthias@3268
|
327 |
|
|
matthias@3268
|
328 |
%% Flushes the write buffer of Ref and then moves the handle to NewOffset
%% using maybe_seek/2, whose result ({ok, Offset} or an error) is
%% returned.
position(Ref, NewOffset) ->
    SeekFun = fun ([Handle]) ->
                      {Result, Moved} = maybe_seek(NewOffset, Handle),
                      {Result, [Moved]}
              end,
    with_flushed_handles([Ref], SeekFun).
|
|
matthias@3268
|
334 |
|
|
matthias@3268
|
335 |
%% Flushes the write buffer of Ref and truncates the file at the current
%% position with file:truncate/1. On success the handle is marked as
%% being at end of file and the trusted offset is lowered to the current
%% offset if it was beyond it.
truncate(Ref) ->
    TruncateFun =
        fun ([#handle { hdl = Fd, offset = Pos,
                        trusted_offset = Trusted } = Handle]) ->
                case file:truncate(Fd) of
                    ok ->
                        Trusted1 = erlang:min(Pos, Trusted),
                        {ok, [Handle #handle { at_eof         = true,
                                               trusted_offset = Trusted1 }]};
                    Failure ->
                        {Failure, [Handle]}
                end
        end,
    with_flushed_handles([Ref], TruncateFun).
|
|
matthias@3268
|
347 |
|
|
matthias@3268
|
348 |
%% Returns {ok, Offset}, where Offset is the trusted offset recorded for
%% the handle Ref.
last_sync_offset(Ref) ->
    Lookup = fun ([#handle { trusted_offset = Trusted }]) -> {ok, Trusted} end,
    with_handles([Ref], Lookup).
|
|
matthias@3268
|
352 |
|
|
matthias@3268
|
353 |
%% Returns {ok, Offset} with the offset the handle would have once its
%% write buffer has been written out: for a writable handle positioned at
%% end of file this is the raw offset plus the buffered byte count, for
%% any other handle it is just the raw offset.
current_virtual_offset(Ref) ->
    Lookup =
        fun ([#handle { is_write = true, at_eof = true,
                        offset = Pos, write_buffer_size = Buffered }]) ->
                {ok, Pos + Buffered};
            ([#handle { offset = Pos }]) ->
                {ok, Pos}
        end,
    with_handles([Ref], Lookup).
|
|
matthias@3268
|
361 |
|
|
matthias@3268
|
362 |
%% Returns {ok, Offset} with the offset tracked for the underlying file
%% handle of Ref, ignoring any data still held in the write buffer.
current_raw_offset(Ref) ->
    Lookup = fun ([Handle]) ->
                     RawOffset = Handle #handle.offset,
                     {ok, RawOffset}
             end,
    with_handles([Ref], Lookup).
|
|
matthias@3268
|
364 |
|
|
matthias@3268
|
365 |
%% Writes out the write buffer of Ref. All the work is done by
%% with_flushed_handles/2; the callback only hands the flushed handle
%% back so that it is stored again.
flush(Ref) ->
    KeepHandle = fun ([Flushed]) -> {ok, [Flushed]} end,
    with_flushed_handles([Ref], KeepHandle).
|
|
matthias@3268
|
367 |
|
|
matthias@3268
|
368 |
%% Copies up to Count bytes from the handle Src to the handle Dest using
%% file:copy/3, after flushing the write buffers of both handles.
%%
%% Src must have been opened for reading and Dest for writing, otherwise
%% {error, incorrect_handle_modes} is returned. On success the result is
%% {ok, BytesCopied} and the tracked offsets of both handles advance by
%% BytesCopied. Any other result of file:copy/3 is returned unchanged and
%% both handles are kept as they were.
copy(Src, Dest, Count) ->
    with_flushed_handles(
      [Src, Dest],
      fun ([SHandle = #handle { is_read  = true, hdl = SHdl, offset = SOffset },
            DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
          ) ->
              case file:copy(SHdl, DHdl, Count) of
                  {ok, Count1} = Result1 ->
                      %% The copy wrote to DHdl behind the back of the
                      %% write buffer, so Dest now holds data that has not
                      %% been sync'd. Mark it dirty, otherwise a later
                      %% sync/1 or soft close would skip file:sync/1.
                      {Result1,
                       [SHandle #handle { offset = SOffset + Count1 },
                        DHandle #handle { offset   = DOffset + Count1,
                                          is_dirty = true }]};
                  Error ->
                      {Error, [SHandle, DHandle]}
              end;
          (_Handles) ->
              {error, incorrect_handle_modes}
      end).
|
|
matthias@3268
|
385 |
|
|
matthias@3268
|
386 |
%% Closes the handle Ref without writing out or syncing pending data and
%% then deletes the underlying file with file:delete/1, whose result is
%% returned. An unknown Ref is a no-op returning 'ok'.
%%
%% If hard_close/1 fails, the handle is stored again under Ref and the
%% error is returned; the file is not deleted in that case.
delete(Ref) ->
    case erase({Ref, fhc_handle}) of
        undefined ->
            ok;
        Handle = #handle { path = Path } ->
            %% The file is about to go away, so buffered data is dropped
            %% and the sync is skipped. write_buffer_size is reset along
            %% with write_buffer so that a handle restored on the error
            %% path below does not claim bytes it no longer holds.
            Discarded = Handle #handle { is_dirty          = false,
                                         write_buffer      = [],
                                         write_buffer_size = 0 },
            case hard_close(Discarded) of
                ok               -> file:delete(Path);
                {Error, Handle1} -> put_handle(Ref, Handle1),
                                    Error
            end
    end.
|
|
matthias@3268
|
398 |
|
|
matthias@3268
|
399 |
%% Empties the file behind Ref: buffered writes are discarded, the handle
%% is moved to the beginning of the file and the file is truncated there.
%% A handle that is already at offset 0, at end of file and has an empty
%% write buffer is left untouched. On success the trusted offset is reset
%% to 0.
clear(Ref) ->
    ClearFun =
        fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
                ok;
            ([Handle]) ->
                Emptied = Handle #handle { write_buffer      = [],
                                           write_buffer_size = 0 },
                case maybe_seek(bof, Emptied) of
                    {{error, _} = SeekError, Handle1} ->
                        {SeekError, [Handle1]};
                    {{ok, 0}, Handle1 = #handle { hdl = Fd }} ->
                        case file:truncate(Fd) of
                            ok ->
                                {ok, [Handle1 #handle { at_eof         = true,
                                                        trusted_offset = 0 }]};
                            TruncateError ->
                                {TruncateError, [Handle1]}
                        end
                end
        end,
    with_handles([Ref], ClearFun).
|
|
matthias@3268
|
417 |
|
|
matthias@3268
|
418 |
%% Soft-closes every open handle of the calling process that has not been
%% used for at least MaximumAge (compared against timer:now_diff/2, i.e.
%% microseconds). If no handle could be closed, the server is told the
%% age of the least recently used handle through age_tree_change/0.
%% Always returns 'ok'.
set_maximum_since_use(MaximumAge) ->
    Now = now(),
    %% The accumulator stays 'true' for as long as nothing was closed.
    CloseIfStale =
        fun ({{Ref, fhc_handle},
              #handle { hdl = Hdl, last_used_at = Then } = Handle},
             NoneClosed) ->
                Age = timer:now_diff(Now, Then),
                case Hdl =:= closed orelse Age < MaximumAge of
                    true ->
                        NoneClosed;
                    false ->
                        {Res, Handle1} = soft_close(Handle),
                        case Res of
                            ok ->
                                %% Stored directly: the handle was not
                                %% used, so its timestamp stays as is.
                                put({Ref, fhc_handle}, Handle1),
                                false;
                            _ ->
                                put_handle(Ref, Handle1),
                                NoneClosed
                        end
                end;
            (_OtherEntry, NoneClosed) ->
                NoneClosed
        end,
    case lists:foldl(CloseIfStale, true, get()) of
        false -> ok;
        true  -> age_tree_change(),
                 ok
    end.
|
|
matthias@3268
|
440 |
|
|
matthias@3268
|
441 |
%% Asks the server (asynchronously) to release a file descriptor when Pid
%% dies; see the module header for the intended obtain/0 then
%% release_on_death/1 protocol.
release_on_death(Pid) when is_pid(Pid) ->
    Request = {release_on_death, Pid},
    gen_server:cast(?SERVER, Request).
|
|
matthias@3268
|
443 |
|
|
matthias@3268
|
444 |
%% Synchronously asks the server for a file descriptor. The call has no
%% timeout, so it blocks until the server replies.
obtain() ->
    Timeout = infinity,
    gen_server:call(?SERVER, obtain, Timeout).
|
|
matthias@3268
|
446 |
|
|
matthias@3268
|
447 |
%%----------------------------------------------------------------------------
|
|
matthias@3268
|
448 |
%% Internal functions
|
|
matthias@3268
|
449 |
%%----------------------------------------------------------------------------
|
|
matthias@3268
|
450 |
|
|
matthias@3268
|
451 |
%% True if the mode list contains the atom 'read'.
is_reader(ModeList) ->
    lists:member(read, ModeList).
|
|
matthias@3268
|
452 |
|
|
matthias@3268
|
453 |
%% True if the mode list contains the atom 'write'. An 'append' entry
%% does not count here; open/3 rewrites it to 'write' beforehand.
is_writer(ModeList) ->
    lists:member(write, ModeList).
|
|
matthias@3268
|
454 |
|
|
matthias@3268
|
455 |
%% Normalises a mode list: if it contains 'append', the first 'append'
%% and the first 'write' are removed and a single 'write' is put at the
%% front. A mode list without 'append' is returned unchanged.
append_to_write(Mode) ->
    HasAppend = lists:member(append, Mode),
    if
        HasAppend -> Remaining = Mode -- [append, write],
                     [write | Remaining];
        true      -> Mode
    end.
|
|
matthias@3268
|
460 |
|
|
matthias@3268
|
461 |
%% Looks up (reopening where necessary) the handles for Refs and applies
%% Fun to them, in the same order as Refs.
%%
%% Fun may return {Result, Handles}, where Handles is a list of updated
%% handles in Refs order: these are stored again with put_handle/2 and
%% Result is returned. Any other return value of Fun is returned as is
%% and nothing is stored. Note that every 2-tuple whose second element is
%% a list is treated as the first form.
%%
%% If a handle cannot be obtained, the error from get_or_reopen/1 is
%% returned, Fun is not called and the remaining Refs are not looked up.
with_handles(Refs, Fun) ->
    case fetch_handles_in_order(Refs, []) of
        {ok, Handles} ->
            case Fun(Handles) of
                {Result, Updated} when is_list(Updated) ->
                    lists:zipwith(fun put_handle/2, Refs, Updated),
                    Result;
                Result ->
                    Result
            end;
        Error ->
            Error
    end.

%% Collects the handles for the given refs, stopping at the first one
%% that get_or_reopen/1 cannot provide.
fetch_handles_in_order([], Acc) ->
    {ok, lists:reverse(Acc)};
fetch_handles_in_order([Ref | Rest], Acc) ->
    case get_or_reopen(Ref) of
        {ok, Handle} -> fetch_handles_in_order(Rest, [Handle | Acc]);
        Error        -> Error
    end.
|
|
matthias@3268
|
483 |
|
|
matthias@3268
|
484 |
%% Like with_handles/2, but the write buffer of every handle is written
%% out with write_buffer/1 before Fun is applied. If flushing a handle
%% fails, the remaining handles are not flushed, Fun is not called, and
%% the error is returned together with the handles so that they are
%% stored again.
with_flushed_handles(Refs, Fun) ->
    FlushThenApply =
        fun (Handles) ->
                case flush_handles_in_order(Handles, ok, []) of
                    {ok, Flushed}               -> Fun(Flushed);
                    {_Error, _Handles} = Failed -> Failed
                end
        end,
    with_handles(Refs, FlushThenApply).

%% Flushes handles one by one while the status is 'ok'; after the first
%% failure the remaining handles are passed through untouched.
flush_handles_in_order([], Status, Acc) ->
    {Status, lists:reverse(Acc)};
flush_handles_in_order([Handle | Rest], ok, Acc) ->
    {Status, Handle1} = write_buffer(Handle),
    flush_handles_in_order(Rest, Status, [Handle1 | Acc]);
flush_handles_in_order([Handle | Rest], Error, Acc) ->
    flush_handles_in_order(Rest, Error, [Handle | Acc]).
|
|
matthias@3268
|
501 |
|
|
matthias@3268
|
502 |
%% Returns {ok, Handle} for Ref. A soft-closed handle (hdl = closed) is
%% reopened through open1/6 at the offset it had when it was closed.
%%
%% NOTE(review): an unknown Ref yields the 3-tuple {error, not_open, Ref}
%% rather than {error, Reason}; this propagates unchanged out of the
%% public API through with_handles/2. Confirm callers expect that shape.
get_or_reopen(Ref) ->
    case get({Ref, fhc_handle}) of
        #handle { hdl = closed } = SoftClosed ->
            #handle { path = Path, mode = Mode, options = Options,
                      offset = Offset } = SoftClosed,
            open1(Path, Mode, Options, Ref, Offset, reopen);
        undefined ->
            {error, not_open, Ref};
        OpenHandle ->
            {ok, OpenHandle}
    end.
|
|
matthias@3268
|
512 |
|
|
matthias@3268
|
513 |
%% Stores Handle under Ref and marks it as used right now: the age tree
%% entry is moved from the previous timestamp to the current one and the
%% handle's last_used_at is updated to match.
put_handle(Ref, #handle { last_used_at = PreviousUse } = Handle) ->
    ThisUse = now(),
    age_tree_update(PreviousUse, ThisUse, Ref),
    Touched = Handle #handle { last_used_at = ThisUse },
    put({Ref, fhc_handle}, Touched).
|
|
matthias@3268
|
517 |
|
|
matthias@3268
|
518 |
%% Applies Fun to the age tree of the calling process (a gb_tree kept in
%% the process dictionary under fhc_age_tree, created empty on first use)
%% and stores the tree returned by Fun. The return value is that of
%% put/2, i.e. the previously stored value.
with_age_tree(Fun) ->
    Current = case get(fhc_age_tree) of
                  undefined -> gb_trees:empty();
                  Stored    -> Stored
              end,
    Updated = Fun(Current),
    put(fhc_age_tree, Updated).
|
|
matthias@3268
|
523 |
|
|
matthias@3268
|
524 |
%% Adds Ref to the age tree under the timestamp Now and casts an 'open'
%% message to the server carrying the smallest (oldest) timestamp in the
%% tree.
age_tree_insert(Now, Ref) ->
    InsertAndReport =
        fun (Tree) ->
                Grown = gb_trees:insert(Now, Ref, Tree),
                {OldestUse, _OldestRef} = gb_trees:smallest(Grown),
                gen_server:cast(?SERVER, {open, self(), OldestUse}),
                Grown
        end,
    with_age_tree(InsertAndReport).
|
|
matthias@3268
|
532 |
|
|
matthias@3268
|
533 |
%% Moves Ref in the age tree from the timestamp Then (if present) to the
%% timestamp Now. The server is not informed.
age_tree_update(Then, Now, Ref) ->
    Rekey = fun (Tree) ->
                    Pruned = gb_trees:delete_any(Then, Tree),
                    gb_trees:insert(Now, Ref, Pruned)
            end,
    with_age_tree(Rekey).
|
|
matthias@3268
|
538 |
|
|
matthias@3268
|
539 |
%% Removes the entry with timestamp Then from the age tree (if present)
%% and casts a 'close' message to the server carrying the smallest
%% remaining timestamp, or 'undefined' when the tree is now empty.
age_tree_delete(Then) ->
    DeleteAndReport =
        fun (Tree) ->
                Shrunk = gb_trees:delete_any(Then, Tree),
                OldestUse = case gb_trees:is_empty(Shrunk) of
                                false -> {Smallest, _Ref} =
                                             gb_trees:smallest(Shrunk),
                                         Smallest;
                                true  -> undefined
                            end,
                gen_server:cast(?SERVER, {close, self(), OldestUse}),
                Shrunk
        end,
    with_age_tree(DeleteAndReport).
|
|
matthias@3268
|
553 |
|
|
matthias@3268
|
554 |
%% Casts an 'update' message to the server with the smallest (oldest)
%% timestamp in the age tree. Nothing is sent when the tree is empty. The
%% tree itself is left unchanged.
age_tree_change() ->
    ReportOldest =
        fun (Tree) ->
                case gb_trees:is_empty(Tree) of
                    false ->
                        {OldestUse, _Ref} = gb_trees:smallest(Tree),
                        gen_server:cast(?SERVER,
                                        {update, self(), OldestUse});
                    true ->
                        Tree
                end,
                Tree
        end,
    with_age_tree(ReportOldest).
|
|
matthias@3268
|
564 |
|
|
matthias@3268
|
565 |
%% Opens Path with file:open/2, builds a fresh #handle{} for it, moves it
%% to Offset, stores it under Ref and registers it in the age tree (which
%% also informs the server of the open). Returns {ok, Handle} or the
%% {error, Reason} from file:open/2.
%%
%% NewOrReopen is 'new' for a first open and 'reopen' when a soft-closed
%% handle is brought back. On reopen 'read' is added to the mode given to
%% file:open/2 - presumably so that a write-only mode does not truncate
%% the existing file; the handle itself still records the original Mode.
%%
%% The write buffer limit comes from the write_buffer option: 'unbuffered'
%% (the default) means 0, otherwise 'infinity' or an integer.
open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
    FileMode = case NewOrReopen of
                   reopen -> [read | Mode];
                   new    -> Mode
               end,
    case file:open(Path, FileMode) of
        {error, Reason} ->
            {error, Reason};
        {ok, Hdl} ->
            BufferLimit =
                case proplists:get_value(write_buffer, Options, unbuffered) of
                    infinity             -> infinity;
                    unbuffered           -> 0;
                    N when is_integer(N) -> N
                end,
            OpenedAt = now(),
            Fresh = #handle { hdl                     = Hdl,
                              offset                  = 0,
                              trusted_offset          = 0,
                              is_dirty                = false,
                              write_buffer_size       = 0,
                              write_buffer_size_limit = BufferLimit,
                              write_buffer            = [],
                              at_eof                  = false,
                              path                    = Path,
                              mode                    = Mode,
                              options                 = Options,
                              is_write                = is_writer(Mode),
                              is_read                 = is_reader(Mode),
                              last_used_at            = OpenedAt },
            %% A failing seek is not expected here and crashes the caller.
            {{ok, StartOffset}, Positioned} = maybe_seek(Offset, Fresh),
            Ready = Positioned #handle { trusted_offset = StartOffset },
            put({Ref, fhc_handle}, Ready),
            age_tree_insert(OpenedAt, Ref),
            {ok, Ready}
    end.
|
|
matthias@3268
|
601 |
|
|
matthias@3268
|
602 |
%% Puts a handle into the "soft-closed" state: the write buffer is
%% written out, the file is sync'd if the handle is dirty, the underlying
%% file handle is closed and the handle is removed from the age tree
%% (which informs the server of the close). The #handle{} itself is kept,
%% with hdl = closed, so that get_or_reopen/1 can reopen it later.
%%
%% Returns {ok, Handle1} on success and {Error, Handle1} if flushing the
%% buffer or syncing fails; in the error case the file handle is still
%% open and the handle is still registered in the age tree. Sync failures
%% are returned rather than raised because every caller already handles
%% an {Error, Handle} result. A handle that is already soft-closed is
%% returned unchanged.
soft_close(Handle = #handle { hdl = closed }) ->
    {ok, Handle};
soft_close(Handle) ->
    case write_buffer(Handle) of
        {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty,
                       last_used_at = Then } = Handle1 } ->
            SyncResult = case IsDirty of
                             true  -> file:sync(Hdl);
                             false -> ok
                         end,
            case SyncResult of
                ok ->
                    ok = file:close(Hdl),
                    age_tree_delete(Then),
                    {ok, Handle1 #handle { hdl            = closed,
                                           trusted_offset = Offset,
                                           is_dirty       = false }};
                {error, _} = Error ->
                    {Error, Handle1}
            end;
        {_Error, _Handle} = Result ->
            Result
    end.
|
|
matthias@3268
|
619 |
|
|
matthias@3268
|
620 |
%% Soft-closes Handle and then removes it from the per-path bookkeeping:
%% the reader count is decremented for a read handle and the writer flag
%% is cleared for a write handle. When no reader and no writer remain the
%% {Path, fhc_file} entry is erased. Returns 'ok', or the {Error, Handle}
%% result of soft_close/1 without touching the bookkeeping.
hard_close(Handle) ->
    case soft_close(Handle) of
        {ok, #handle { path = Path,
                       is_read = IsReader, is_write = IsWriter }} ->
            FileKey = {Path, fhc_file},
            File = #file { reader_count = Readers,
                           has_writer   = HadWriter } = get(FileKey),
            Readers1 = case IsReader of
                           false -> Readers;
                           true  -> Readers - 1
                       end,
            StillHasWriter = HadWriter andalso not IsWriter,
            case Readers1 =:= 0 andalso not StillHasWriter of
                false -> put(FileKey,
                             File #file { reader_count = Readers1,
                                          has_writer   = StillHasWriter });
                true  -> erase(FileKey)
            end,
            ok;
        {_Error, _Handle} = Failure ->
            Failure
    end.
|
|
matthias@3268
|
641 |
|
|
matthias@3268
|
642 |
%% Moves the file position to NewOffset, but only when needs_seek/3
%% says a real file:position/2 call is required. Returns
%% {{ok, Offset}, Handle1} with the handle's offset and at_eof fields
%% updated, or {{error, Reason}, Handle} with the handle unchanged
%% when the seek fails.
maybe_seek(NewOffset, Handle = #handle { hdl = Fd, offset = Current,
                                         at_eof = WasAtEoF }) ->
    case needs_seek(WasAtEoF, Current, NewOffset) of
        {NowAtEoF, false} ->
            %% already in the right place: report the tracked offset
            {{ok, Current}, Handle #handle { offset = Current,
                                             at_eof = NowAtEoF }};
        {NowAtEoF, true} ->
            case file:position(Fd, NewOffset) of
                {ok, Pos} = Ok ->
                    {Ok, Handle #handle { offset = Pos, at_eof = NowAtEoF }};
                {error, _} = Failure ->
                    {Failure, Handle}
            end
    end.
|
|
matthias@3268
|
654 |
|
|
matthias@3268
|
655 |
%% needs_seek(AtEoF, CurOffset, DesiredOffset) -> {AtEoF1, NeedsSeek}
%%
%% Decides whether moving from the tracked position CurOffset to
%% DesiredOffset (any location accepted by file:position/2) requires
%% an actual seek, and whether we will know ourselves to be at the end
%% of the file afterwards.
%%
%% An absolute target equal to the current offset never needs a seek,
%% whether it is written as a bare integer or as {bof, Offset}.
needs_seek( AtEoF, _CurOffset,  cur     ) -> {AtEoF, false};
needs_seek( AtEoF, _CurOffset,  {cur, 0}) -> {AtEoF, false};
needs_seek(  true, _CurOffset,  eof     ) -> {true , false};
needs_seek(  true, _CurOffset,  {eof, 0}) -> {true , false};
needs_seek( false, _CurOffset,  eof     ) -> {true , true };
needs_seek( false, _CurOffset,  {eof, 0}) -> {true , true };
needs_seek( AtEoF,          0,  bof     ) -> {AtEoF, false};
needs_seek( AtEoF,  CurOffset, CurOffset) -> {AtEoF, false};
%% {bof, N} is the same absolute position as the bare integer N, so it
%% gets the same "already there" treatment (this covers {bof, 0} at 0)
needs_seek( AtEoF,  CurOffset, {bof, CurOffset}) -> {AtEoF, false};
%% seeking forwards from the end of the file leaves us at/after the end
needs_seek(  true,  CurOffset, {bof, DesiredOffset})
  when DesiredOffset >= CurOffset ->
    {true, true};
needs_seek(  true, _CurOffset, {cur, DesiredOffset})
  when DesiredOffset > 0 ->
    {true, true};
needs_seek(  true,  CurOffset, DesiredOffset) %% same as {bof, DO}
  when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset ->
    {true, true};
%% because we can't really track size, we could well end up at EoF and not know
needs_seek(_AtEoF, _CurOffset, _DesiredOffset) ->
    {false, true}.
|
|
matthias@3268
|
676 |
|
|
matthias@3268
|
677 |
%% Writes the pending write buffer to the file. The buffer is reversed
%% before being handed to file:write/2. On success the offset is
%% advanced by write_buffer_size, the handle is marked dirty and the
%% buffer is emptied. On failure {{error, Reason}, Handle} is returned
%% with the handle unchanged. An empty buffer is a no-op; a non-empty
%% buffer is only accepted when the handle has at_eof = true.
write_buffer(Unbuffered = #handle { write_buffer = [] }) ->
    {ok, Unbuffered};
write_buffer(Buffered = #handle { hdl = Fd, offset = Pos, at_eof = true,
                                  write_buffer      = Pending,
                                  write_buffer_size = PendingSize }) ->
    case file:write(Fd, lists:reverse(Pending)) of
        {error, _} = Failure ->
            {Failure, Buffered};
        ok ->
            {ok, Buffered #handle { offset            = Pos + PendingSize,
                                    is_dirty          = true,
                                    write_buffer      = [],
                                    write_buffer_size = 0 }}
    end.
|
|
matthias@3268
|
691 |
|
|
matthias@3268
|
692 |
%%----------------------------------------------------------------------------
|
|
matthias@3268
|
693 |
%% gen_server callbacks
|
|
matthias@3268
|
694 |
%%----------------------------------------------------------------------------
|
|
matthias@3268
|
695 |
|
|
matthias@3268
|
696 |
%% gen_server init callback. The file handle limit is taken from the
%% file_handles_high_watermark application environment setting when
%% that is a positive integer, and from ulimit/0 otherwise. The chosen
%% limit is logged and the server starts with a count of zero, no
%% pending obtains and no timer.
init([]) ->
    Limit = case application:get_env(file_handles_high_watermark) of
                {ok, Configured} when is_integer(Configured),
                                      Configured > 0 -> Configured;
                _                                    -> ulimit()
            end,
    error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
    Initial = #fhc_state { limit        = Limit,
                           count        = 0,
                           obtains      = [],
                           elders       = dict:new(),
                           callbacks    = dict:new(),
                           client_mrefs = dict:new(),
                           timer_ref    = undefined },
    {ok, Initial}.
|
|
matthias@3268
|
708 |
|
|
matthias@3268
|
709 |
%% obtain: tentatively counts one more handle and runs maybe_reduce/1.
%% If the resulting count is below the limit (or the limit is
%% infinity) the caller gets ok straight away. Otherwise the increment
%% is undone and the caller is queued in obtains, to be replied to
%% later.
handle_call(obtain, From, State = #fhc_state { count = Count }) ->
    Bumped  = State #fhc_state { count = Count + 1 },
    Reduced = maybe_reduce(Bumped),
    #fhc_state { count = NewCount, limit = Limit, obtains = Waiting } =
        Reduced,
    case Limit == infinity orelse NewCount < Limit of
        true ->
            {reply, ok, Reduced};
        false ->
            {noreply, Reduced #fhc_state { obtains = [From | Waiting],
                                           count   = NewCount - 1 }}
    end.
|
|
matthias@3268
|
717 |
|
|
matthias@3268
|
718 |
%% register_callback: remember the {M, F, A} to invoke for Pid when
%% maybe_reduce/1 asks clients to act, and make sure Pid is monitored.
handle_cast({register_callback, Pid, MFA},
            State = #fhc_state { callbacks = Callbacks }) ->
    Registered = State #fhc_state {
                   callbacks = dict:store(Pid, MFA, Callbacks) },
    {noreply, ensure_mref(Pid, Registered)};

%% open: record the client's eldest-unused timestamp, count one more
%% handle, monitor the client and check whether we are over the limit.
handle_cast({open, Pid, EldestUnusedSince},
            State = #fhc_state { elders = Elders, count = Count }) ->
    Opened = State #fhc_state {
               elders = dict:store(Pid, EldestUnusedSince, Elders),
               count  = Count + 1 },
    {noreply, maybe_reduce(ensure_mref(Pid, Opened))};

%% update: only refresh the client's eldest-unused timestamp.
handle_cast({update, Pid, EldestUnusedSince},
            State = #fhc_state { elders = Elders }) ->
    Updated = State #fhc_state {
                elders = dict:store(Pid, EldestUnusedSince, Elders) },
    %% don't call maybe_reduce from here otherwise we can create a
    %% storm of messages
    {noreply, ensure_mref(Pid, Updated)};

%% close: count one handle fewer, refresh (or, for undefined, drop) the
%% client's eldest-unused timestamp and serve any queued obtains.
handle_cast({close, Pid, EldestUnusedSince},
            State = #fhc_state { elders = Elders, count = Count }) ->
    NewElders = case EldestUnusedSince of
                    undefined -> dict:erase(Pid, Elders);
                    _         -> dict:store(Pid, EldestUnusedSince, Elders)
                end,
    Closed = State #fhc_state { elders = NewElders, count = Count - 1 },
    {noreply, process_obtains(ensure_mref(Pid, Closed))};

%% check_counts: the timer cast scheduled by maybe_reduce/1; clear the
%% timer reference and check the counts again.
handle_cast(check_counts, State) ->
    Cleared = State #fhc_state { timer_ref = undefined },
    {noreply, maybe_reduce(Cleared)};

%% release_on_death: monitor Pid. The monitor reference is not stored,
%% so handle_info/2 takes its count-decrementing branch for the
%% resulting 'DOWN' message.
handle_cast({release_on_death, Pid}, State) ->
    _MRef = erlang:monitor(process, Pid),
    {noreply, State}.
|
|
matthias@3268
|
754 |
|
|
matthias@3268
|
755 |
%% A monitored process died. If the monitor reference is the one
%% stored for that client in client_mrefs, all of the client's
%% bookkeeping (elders, client_mrefs, callbacks) is dropped. Any other
%% 'DOWN' message reduces the count by one. Either way queued obtains
%% are then reconsidered.
handle_info({'DOWN', MRef, process, Pid, _Reason},
            State = #fhc_state { count        = Count,
                                 callbacks    = Callbacks,
                                 client_mrefs = ClientMRefs,
                                 elders       = Elders }) ->
    Cleaned =
        case dict:find(Pid, ClientMRefs) of
            {ok, MRef} ->
                State #fhc_state {
                  elders       = dict:erase(Pid, Elders),
                  client_mrefs = dict:erase(Pid, ClientMRefs),
                  callbacks    = dict:erase(Pid, Callbacks) };
            _ ->
                State #fhc_state { count = Count - 1 }
        end,
    {noreply, process_obtains(Cleaned)}.
|
|
matthias@3268
|
766 |
|
|
matthias@3268
|
767 |
%% gen_server terminate callback: nothing to clean up, the state is
%% simply handed back.
terminate(_Reason, FinalState) ->
    FinalState.
|
|
matthias@3268
|
769 |
|
|
matthias@3268
|
770 |
%% gen_server code_change callback: the state is carried over as is.
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
|
|
matthias@3268
|
772 |
|
|
matthias@3268
|
773 |
%%----------------------------------------------------------------------------
|
|
matthias@3268
|
774 |
%% server helpers
|
|
matthias@3268
|
775 |
%%----------------------------------------------------------------------------
|
|
matthias@3268
|
776 |
|
|
matthias@3268
|
777 |
%% Replies ok to as many queued obtain callers as fit under the limit.
%% Nothing happens when the queue is empty or the count has already
%% reached the limit. New callers are consed onto the front of obtains
%% (see handle_call/3), so the callers served here are taken from the
%% tail of the list; the count grows by the number of replies sent.
process_obtains(State = #fhc_state { obtains = [] }) ->
    State;
process_obtains(State = #fhc_state { limit = Limit, count = Count })
  when Limit /= infinity, Count >= Limit ->
    State;
process_obtains(State = #fhc_state { limit = Limit, count = Count,
                                     obtains = Waiting }) ->
    WaitingLen = length(Waiting),
    Granted    = lists:min([WaitingLen, Limit - Count]),
    {StillWaiting, ToServe} = lists:split(WaitingLen - Granted, Waiting),
    _ = [gen_server:reply(Caller, ok) || Caller <- ToServe],
    State #fhc_state { obtains = StillWaiting, count = Count + Granted }.
|
|
matthias@3268
|
790 |
|
|
matthias@3268
|
791 |
%% When the count has reached the limit, works out the average age
%% (microseconds, via timer:now_diff/2) of the eldest-unused
%% timestamps of all clients that reported one, and invokes the
%% registered callback of each such client with that average appended
%% to its argument list. It also makes sure a check_counts cast is
%% scheduled after ?FILE_HANDLES_CHECK_INTERVAL unless a timer is
%% already pending. Below the limit (or with an infinite limit) the
%% state is returned unchanged.
maybe_reduce(State = #fhc_state { limit = Limit, count = Count,
                                  elders = Elders, callbacks = Callbacks,
                                  timer_ref = TRef })
  when Limit /= infinity, Count >= Limit ->
    Now = now(),
    Collect = fun (_Pid, undefined, Acc) ->
                      Acc;
                  (Pid, Eldest, {PidsAcc, AgeAcc, N}) ->
                      {[Pid | PidsAcc],
                       AgeAcc + timer:now_diff(Now, Eldest),
                       N + 1}
              end,
    {Pids, TotalAge, ClientCount} = dict:fold(Collect, {[], 0, 0}, Elders),
    case ClientCount of
        0 ->
            ok;
        _ ->
            AverageAge = TotalAge / ClientCount,
            Notify = fun (Pid) ->
                             case dict:find(Pid, Callbacks) of
                                 {ok, {M, F, A}} ->
                                     apply(M, F, A ++ [AverageAge]);
                                 error ->
                                     ok
                             end
                     end,
            lists:foreach(Notify, Pids)
    end,
    case TRef of
        undefined ->
            {ok, NewTRef} = timer:apply_after(
                              ?FILE_HANDLES_CHECK_INTERVAL,
                              gen_server, cast, [?SERVER, check_counts]),
            State #fhc_state { timer_ref = NewTRef };
        _ ->
            State
    end;
maybe_reduce(State) ->
    State.
|
|
matthias@3268
|
822 |
|
|
matthias@3268
|
823 |
%% Googling around suggests that Windows has a limit somewhere around
|
|
matthias@3268
|
824 |
%% 16M, eg
|
|
matthias@3268
|
825 |
%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
|
|
matthias@3268
|
826 |
%% For everything else, assume ulimit exists. Further googling
|
|
matthias@3268
|
827 |
%% suggests that BSDs (incl OS X), solaris and linux all agree that
|
|
matthias@3268
|
828 |
%% ulimit -n is file handles
|
|
matthias@3268
|
829 |
%% Works out how many file handles we may use.
%%
%% Returns ?FILE_HANDLES_LIMIT_WINDOWS on win32. On unix the output of
%% "ulimit -n" is inspected: "unlimited" gives infinity, a number gives
%% that number less ?RESERVED_FOR_OTHERS (but never less than 1), and
%% anything else falls back to
%% ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS, as does any other
%% OS type.
ulimit() ->
    case os:type() of
        {win32, _OsName} ->
            ?FILE_HANDLES_LIMIT_WINDOWS;
        {unix, _OsName} ->
            %% Under Linux, Solaris and FreeBSD, ulimit is a shell
            %% builtin, not a command. In OS X, it's a command.
            %% Fortunately, os:cmd invokes the cmd in a shell env, so
            %% we're safe in all cases.
            case os:cmd("ulimit -n") of
                %% os:cmd/1 returns the raw output, trailing newline
                %% included, so match on the prefix rather than on the
                %% exact string
                "unlimited" ++ _ ->
                    infinity;
                String = [C|_] when $0 =< C andalso C =< $9 ->
                    %% takewhile drops the trailing newline (and
                    %% anything else following the leading digits)
                    Num = list_to_integer(
                            lists:takewhile(
                              fun (D) -> $0 =< D andalso D =< $9 end,
                              String)) - ?RESERVED_FOR_OTHERS,
                    lists:max([1, Num]);
                _ ->
                    %% probably a variant of
                    %% "/bin/sh: line 1: ulimit: command not found\n"
                    ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
            end;
        _ ->
            ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
    end.
|
|
matthias@3268
|
855 |
|
|
matthias@3268
|
856 |
%% Makes sure the client Pid is monitored exactly once: if no monitor
%% reference is stored for it in client_mrefs yet, a monitor is set up
%% and its reference stored; otherwise the state is returned as is.
ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
    case dict:find(Pid, ClientMRefs) of
        error ->
            NewMRef = erlang:monitor(process, Pid),
            Monitored = dict:store(Pid, NewMRef, ClientMRefs),
            State #fhc_state { client_mrefs = Monitored };
        {ok, _Existing} ->
            State
    end.
|