Finished the file handle cache. It works as follows:
1) Every client keeps a gb_tree of timestamp-when-fd-was-last-used => fd_ref. This is updated for each action.
2) When a client opens or closes a file, it sends a suitable message to the server, including its smallest timestamp-when-fd-was-last-used (i.e. that of its least recently used fd).
3) The server counts how many fds have been used
4) When too many fds have been used, it finds the average age of the least-recently-used-fds and tells all clients to close anything older than that
5) This is likely to have no effect, because the clients may have used the fds since they last reported, so the ages the server is working from will be wrong. Regardless of whether any fds have been closed at this point, all the clients send back to the server their current smallest timestamp-when-fd-was-last-used.
6) 2 seconds later, the server checks to see if the situation has improved, and if not, using the now updated information (thus the average age will be lower), it may choose to ask all clients again to close fds. This will repeat, albeit not that fast, until enough fds have been closed.
1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License at
4 %% http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8 %% License for the specific language governing rights and limitations
11 %% The Original Code is RabbitMQ.
13 %% The Initial Developers of the Original Code are LShift Ltd,
14 %% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
16 %% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17 %% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18 %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19 %% Technologies LLC, and Rabbit Technologies Ltd.
21 %% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
22 %% Ltd. Portions created by Cohesive Financial Technologies LLC are
23 %% Copyright (C) 2007-2009 Cohesive Financial Technologies
24 %% LLC. Portions created by Rabbit Technologies Ltd are Copyright
25 %% (C) 2007-2009 Rabbit Technologies Ltd.
27 %% All Rights Reserved.
29 %% Contributor(s): ______________________________________.
32 -module(rabbit_multi).
33 -include("rabbit.hrl").
35 -export([start/0, stop/0]).
37 -define(RPC_SLEEP, 500).
39 %%----------------------------------------------------------------------------
43 -spec(start/0 :: () -> no_return()).
44 -spec(stop/0 :: () -> 'ok').
48 %%----------------------------------------------------------------------------
52 case init:get_argument(maxwait) of
53 {ok,[[N1]]} -> 1000 * list_to_integer(N1);
56 case init:get_plain_arguments() of
60 {Command, Args} = parse_args(FullCommand),
61 case catch action(Command, Args, RpcTimeout) of
65 {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
66 error("invalid command '~s'",
68 rabbit_misc:intersperse(" ", FullCommand))]),
71 error("timeout starting some nodes.", []),
%% Report an error to the user.
%% Format is an io:format-style control string and Args its argument list.
%% The control string is prefixed with "Error: " and terminated with a
%% newline before being handed, together with Args, to
%% rabbit_misc:format_stderr/2 (presumably printing on standard error --
%% confirm against rabbit_misc).
error(Format, Args) ->
    Template = "Error: " ++ Format ++ "~n",
    rabbit_misc:format_stderr(Template, Args).
%% Split a non-empty list of command-line argument strings into
%% {Command, Args}: the first string is converted to an atom, and the
%% remaining strings are returned unchanged.
%% An empty list matches no clause and raises function_clause.
parse_args([CommandName | Rest]) ->
    Command = list_to_atom(CommandName),
    {Command, Rest}.
89 io:format("Usage: rabbitmq-multi <command>
93 start_all <NodeCount> - start a local cluster of RabbitMQ nodes.
94 status - print status of all running nodes
95 stop_all - stops all local RabbitMQ nodes.
96 rotate_logs [Suffix] - rotate logs for all local and running RabbitMQ nodes.
100 action(start_all, [NodeCount], RpcTimeout) ->
101 io:format("Starting all nodes...~n", []),
102 N = list_to_integer(NodeCount),
103 {NodePids, Running} =
104 start_nodes(N, N, [], true,
105 rabbit_misc:nodeparts(
106 getenv("RABBITMQ_NODENAME")),
107 list_to_integer(getenv("RABBITMQ_NODE_PORT")),
109 write_pids_file(NodePids),
115 action(status, [], RpcTimeout) ->
116 io:format("Status of all running nodes...~n", []),
120 case is_rabbit_running(Node, RpcTimeout) of
121 false -> not_running;
124 io:format("Node '~p' with Pid ~p: ~p~n",
125 [Node, Pid, RabbitRunning])
128 action(stop_all, [], RpcTimeout) ->
129 io:format("Stopping all nodes...~n", []),
130 call_all_nodes(fun({Node, Pid}) ->
131 io:format("Stopping node ~p~n", [Node]),
132 rpc:call(Node, rabbit, stop_and_halt, []),
133 case kill_wait(Pid, RpcTimeout, false) of
134 false -> kill_wait(Pid, RpcTimeout, true);
137 io:format("OK~n", [])
141 action(rotate_logs, [], RpcTimeout) ->
142 action(rotate_logs, [""], RpcTimeout);
144 action(rotate_logs, [Suffix], RpcTimeout) ->
145 io:format("Rotating logs for all nodes...~n", []),
146 BinarySuffix = list_to_binary(Suffix),
149 io:format("Rotating logs for node ~p", [Node]),
150 case rpc:call(Node, rabbit, rotate_logs,
151 [BinarySuffix], RpcTimeout) of
152 {badrpc, Error} -> io:format(": ~p.~n", [Error]);
153 ok -> io:format(": ok.~n", [])
157 %% PNodePid is the accumulated list of {Node, Pid} pairs for the nodes started so far
158 %% Running is a boolean success flag (initially true) threaded through the recursion
159 start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running};
161 start_nodes(N, Total, PNodePid, Running,
162 NodeNameBase, NodePortBase, RpcTimeout) ->
163 {NodePre, NodeSuff} = NodeNameBase,
164 NodeNumber = Total - N,
165 NodePre1 = if NodeNumber == 0 ->
166 %% For compatibility with running a single node
169 NodePre ++ "_" ++ integer_to_list(NodeNumber)
171 {NodePid, Started} = start_node(rabbit_misc:makenode({NodePre1, NodeSuff}),
172 NodePortBase + NodeNumber,
174 start_nodes(N - 1, Total, [NodePid | PNodePid],
176 NodeNameBase, NodePortBase, RpcTimeout).
178 start_node(Node, NodePort, RpcTimeout) ->
179 os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)),
180 os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)),
181 io:format("Starting node ~s...~n", [Node]),
182 case rpc:call(Node, os, getpid, []) of
184 Port = run_cmd(script_filename()),
185 Started = wait_for_rabbit_to_start(Node, RpcTimeout, Port),
186 Pid = case rpc:call(Node, os, getpid, []) of
187 {badrpc, _} -> throw(cannot_get_pid);
188 PidS -> list_to_integer(PidS)
190 io:format("~s~n", [case Started of
194 {{Node, Pid}, Started};
196 Pid = list_to_integer(PidS),
197 throw({node_already_running, Node, Pid})
200 wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 ->
202 wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
203 case is_rabbit_running(Node, RpcTimeout) of
206 {'EXIT', Port, PosixCode} ->
207 throw({node_start_failed, PosixCode})
209 wait_for_rabbit_to_start(
210 Node, RpcTimeout - ?RPC_SLEEP, Port)
215 erlang:open_port({spawn, FullPath}, [nouse_stdio]).
217 is_rabbit_running(Node, RpcTimeout) ->
218 case rpc:call(Node, rabbit, status, [], RpcTimeout) of
219 {badrpc, _} -> false;
220 Status -> case proplists:get_value(running_applications, Status) of
222 Apps -> lists:keymember(rabbit, 1, Apps)
227 {OsFamily, _} = os:type(),
228 case proplists:get_value(OsFamily, Handlers) of
229 undefined -> throw({unsupported_os, OsFamily});
234 ScriptHome = getenv("RABBITMQ_SCRIPT_HOME"),
235 ScriptName = with_os(
236 [{unix , fun () -> "rabbitmq-server" end},
237 {win32, fun () -> "rabbitmq-server.bat" end}]),
238 ScriptHome ++ "/" ++ ScriptName ++ " -noinput".
%% Name of the pids file, taken from the RABBITMQ_PIDS_FILE environment
%% variable via getenv/1. This is the file name used by write_pids_file/1
%% and delete_pids_file/0.
pids_file() ->
    PidsFileVar = "RABBITMQ_PIDS_FILE",
    getenv(PidsFileVar).
242 write_pids_file(Pids) ->
243 FileName = pids_file(),
244 Handle = case file:open(FileName, [write]) of
248 throw({cannot_create_pids_file, FileName, Reason})
251 ok = io:write(Handle, Pids),
252 ok = io:put_chars(Handle, [$.])
254 case file:close(Handle) of
257 throw({cannot_create_pids_file, FileName, Reason1})
262 delete_pids_file() ->
263 FileName = pids_file(),
264 case file:delete(FileName) of
266 {error, enoent} -> ok;
267 {error, Reason} -> throw({cannot_delete_pids_file, FileName, Reason})
271 FileName = pids_file(),
272 case file:consult(FileName) of
273 {ok, [Pids]} -> Pids;
274 {error, enoent} -> [];
275 {error, Reason} -> throw({cannot_read_pids_file, FileName, Reason})
278 kill_wait(Pid, TimeLeft, Forceful) when TimeLeft < 0 ->
279 Cmd = with_os([{unix, fun () -> if Forceful -> "kill -9";
283 %% Kill forcefully always on Windows, since erl.exe
284 %% seems to completely ignore non-forceful killing
285 %% even when everything is working
286 {win32, fun () -> "taskkill /f /pid" end}]),
287 os:cmd(Cmd ++ " " ++ integer_to_list(Pid)),
288 false; % Don't assume what we did just worked!
290 % Returns true if the process is dead, false otherwise.
291 kill_wait(Pid, TimeLeft, Forceful) ->
292 timer:sleep(?RPC_SLEEP),
294 is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful).
296 % Test using some OS clunkiness since we shouldn't trust
297 % rpc:call(os, getpid, []) at this point
299 PidS = integer_to_list(Pid),
300 with_os([{unix, fun () ->
301 Res = os:cmd("ps --no-headers --pid " ++ PidS),
305 Res = os:cmd("tasklist /nh /fi \"pid eq " ++
307 case regexp:first_match(Res, "erl.exe") of
308 {match, _, _} -> false;
313 call_all_nodes(Func) ->
314 case read_pids_file() of
315 [] -> throw(no_nodes_running);
316 NodePids -> lists:foreach(Func, NodePids)
320 case os:getenv(Var) of
321 false -> throw({missing_env_var, Var});