src/rabbit_multi.erl
author Matthew Sackman <matthew@lshift.net>
Thu Nov 19 16:43:17 2009 +0000 (2009-11-19)
branchbug21673
changeset 2271 9efe4a2ff6a5
parent 1996 87fe0f4d75d5
child 2325 197fe38f8d78
child 2427 dd1adccc4579
permissions -rw-r--r--
Finished the file handle cache. It works as follows:
1) Every client keeps a gb_tree of timestamp-when-fd-was-last-used => fd_ref. This is updated for each action.
2) When a client opens or closes a file, it sends a suitable message to the server, including its smallest timestamp-when-fd-was-last-used (i.e. that of its least recently used fd).
3) The server counts how many fds have been used
4) When too many fds are in use, it computes the average age of the clients' least recently used fds and tells all clients to close any fd older than that.
5) This is likely to have no effect, because the clients may have since used the fds, thus the ages will be wrong. Regardless of whether any fds have been closed at this point, all the clients send back to the server their current smallest timestamp-when-fd-was-last-used
6) Two seconds later, the server checks whether the situation has improved. If not, it uses the now-updated information (so the average age will be lower) and may again ask all clients to close file handles. This repeats, albeit slowly, until enough fds have been closed.
     1 %%   The contents of this file are subject to the Mozilla Public License
     2 %%   Version 1.1 (the "License"); you may not use this file except in
     3 %%   compliance with the License. You may obtain a copy of the License at
     4 %%   http://www.mozilla.org/MPL/
     5 %%
     6 %%   Software distributed under the License is distributed on an "AS IS"
     7 %%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
     8 %%   License for the specific language governing rights and limitations
     9 %%   under the License.
    10 %%
    11 %%   The Original Code is RabbitMQ.
    12 %%
    13 %%   The Initial Developers of the Original Code are LShift Ltd,
    14 %%   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
    15 %%
    16 %%   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
    17 %%   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
    18 %%   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
    19 %%   Technologies LLC, and Rabbit Technologies Ltd.
    20 %%
    21 %%   Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
    22 %%   Ltd. Portions created by Cohesive Financial Technologies LLC are
    23 %%   Copyright (C) 2007-2009 Cohesive Financial Technologies
    24 %%   LLC. Portions created by Rabbit Technologies Ltd are Copyright
    25 %%   (C) 2007-2009 Rabbit Technologies Ltd.
    26 %%
    27 %%   All Rights Reserved.
    28 %%
    29 %%   Contributor(s): ______________________________________.
    30 %%
    31 
    32 -module(rabbit_multi).
    33 -include("rabbit.hrl").
    34 
    35 -export([start/0, stop/0]).
    36 
    37 -define(RPC_SLEEP, 500).
    38 
    39 %%----------------------------------------------------------------------------
    40 
    41 -ifdef(use_specs).
    42 
    43 -spec(start/0 :: () -> no_return()).
    44 -spec(stop/0 :: () -> 'ok').
    45 
    46 -endif.
    47 
    48 %%----------------------------------------------------------------------------
    49 
    50 start() ->
    51     RpcTimeout =
    52         case init:get_argument(maxwait) of
    53             {ok,[[N1]]} -> 1000 * list_to_integer(N1);
    54             _ -> 30000
    55         end,
    56     case init:get_plain_arguments() of
    57         [] ->
    58             usage();
    59         FullCommand ->
    60             {Command, Args} = parse_args(FullCommand),
    61             case catch action(Command, Args, RpcTimeout) of
    62                 ok ->
    63                     io:format("done.~n"),
    64                     halt();
    65                 {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
    66                     error("invalid command '~s'",
    67                           [lists:flatten(
    68                              rabbit_misc:intersperse(" ", FullCommand))]),
    69                     usage();
    70                 timeout ->
    71                     error("timeout starting some nodes.", []),
    72                     halt(1);
    73                 Other ->
    74                     error("~p", [Other]),
    75                     halt(2)
    76             end
    77     end.
    78 
    79 error(Format, Args) ->
    80     rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
    81 
    82 parse_args([Command | Args]) ->
    83     {list_to_atom(Command), Args}.
    84 
    85 stop() ->
    86     ok.
    87 
%% Prints the command-line help text to standard output and halts the
%% emulator with exit status 3; it never returns to the caller.
usage() ->
    io:format("Usage: rabbitmq-multi <command>

Available commands:

  start_all <NodeCount> - start a local cluster of RabbitMQ nodes.
  status                - print status of all running nodes
  stop_all              - stops all local RabbitMQ nodes.
  rotate_logs [Suffix]  - rotate logs for all local and running RabbitMQ nodes.
"),
    halt(3).
    99 
   100 action(start_all, [NodeCount], RpcTimeout) ->
   101     io:format("Starting all nodes...~n", []),
   102     N = list_to_integer(NodeCount),
   103     {NodePids, Running} =
   104         start_nodes(N, N, [], true,
   105                     rabbit_misc:nodeparts(
   106                       getenv("RABBITMQ_NODENAME")),
   107                     list_to_integer(getenv("RABBITMQ_NODE_PORT")),
   108                     RpcTimeout),
   109     write_pids_file(NodePids),
   110     case Running of
   111         true  -> ok;
   112         false -> timeout
   113     end;
   114 
   115 action(status, [], RpcTimeout) ->
   116     io:format("Status of all running nodes...~n", []),
   117     call_all_nodes(
   118       fun({Node, Pid}) ->
   119               RabbitRunning =
   120                   case is_rabbit_running(Node, RpcTimeout) of
   121                       false -> not_running;
   122                       true  -> running
   123                   end,
   124               io:format("Node '~p' with Pid ~p: ~p~n",
   125                         [Node, Pid, RabbitRunning])
   126       end);
   127 
   128 action(stop_all, [], RpcTimeout) ->
   129     io:format("Stopping all nodes...~n", []),
   130     call_all_nodes(fun({Node, Pid}) ->
   131                            io:format("Stopping node ~p~n", [Node]),
   132                            rpc:call(Node, rabbit, stop_and_halt, []),
   133                            case kill_wait(Pid, RpcTimeout, false) of
   134                                false -> kill_wait(Pid, RpcTimeout, true);
   135                                true  -> ok
   136                            end,
   137                            io:format("OK~n", [])
   138                    end),
   139     delete_pids_file();
   140 
   141 action(rotate_logs, [], RpcTimeout) ->
   142     action(rotate_logs, [""], RpcTimeout);
   143 
   144 action(rotate_logs, [Suffix], RpcTimeout) ->
   145     io:format("Rotating logs for all nodes...~n", []),
   146     BinarySuffix = list_to_binary(Suffix),
   147     call_all_nodes(
   148       fun ({Node, _}) ->
   149               io:format("Rotating logs for node ~p", [Node]),
   150               case rpc:call(Node, rabbit, rotate_logs,
   151                             [BinarySuffix], RpcTimeout) of
   152                   {badrpc, Error} -> io:format(": ~p.~n", [Error]);
   153                   ok              -> io:format(": ok.~n", [])
   154               end
   155       end).
   156 
   157 %% PNodePid is the list of PIDs
   158 %% Running is a boolean exhibiting success at some moment
   159 start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running};
   160 
   161 start_nodes(N, Total, PNodePid, Running,
   162             NodeNameBase, NodePortBase, RpcTimeout) ->
   163     {NodePre, NodeSuff} = NodeNameBase,
   164     NodeNumber = Total - N,
   165     NodePre1 = if NodeNumber == 0 ->
   166                        %% For compatibility with running a single node
   167                        NodePre;
   168                   true ->           
   169                        NodePre ++ "_" ++ integer_to_list(NodeNumber)
   170                end,
   171     {NodePid, Started} = start_node(rabbit_misc:makenode({NodePre1, NodeSuff}),
   172                                     NodePortBase + NodeNumber,
   173                                     RpcTimeout),
   174     start_nodes(N - 1, Total, [NodePid | PNodePid],
   175                 Started and Running,
   176                 NodeNameBase, NodePortBase, RpcTimeout).
   177 
   178 start_node(Node, NodePort, RpcTimeout) ->
   179     os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)),
   180     os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)),
   181     io:format("Starting node ~s...~n", [Node]),
   182     case rpc:call(Node, os, getpid, []) of
   183         {badrpc, _} ->
   184             Port = run_cmd(script_filename()),
   185             Started = wait_for_rabbit_to_start(Node, RpcTimeout, Port),
   186             Pid = case rpc:call(Node, os, getpid, []) of
   187                       {badrpc, _} -> throw(cannot_get_pid);
   188                       PidS -> list_to_integer(PidS)
   189                   end,
   190             io:format("~s~n", [case Started of
   191                                    true  -> "OK";
   192                                    false -> "timeout"
   193                                end]),
   194             {{Node, Pid}, Started};
   195         PidS ->
   196             Pid = list_to_integer(PidS),
   197             throw({node_already_running, Node, Pid})
   198     end.
   199 
   200 wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 ->
   201     false;
   202 wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
   203     case is_rabbit_running(Node, RpcTimeout) of
   204         true  -> true;
   205         false -> receive
   206                      {'EXIT', Port, PosixCode} ->
   207                          throw({node_start_failed, PosixCode})
   208                  after ?RPC_SLEEP ->
   209                          wait_for_rabbit_to_start(
   210                            Node, RpcTimeout - ?RPC_SLEEP, Port)
   211                  end
   212     end.
   213 
   214 run_cmd(FullPath) ->
   215     erlang:open_port({spawn, FullPath}, [nouse_stdio]).
   216 
   217 is_rabbit_running(Node, RpcTimeout) ->
   218     case rpc:call(Node, rabbit, status, [], RpcTimeout) of
   219         {badrpc, _} -> false;
   220         Status      -> case proplists:get_value(running_applications, Status) of
   221                            undefined -> false;
   222                            Apps      -> lists:keymember(rabbit, 1, Apps)
   223                        end
   224     end.
   225 
   226 with_os(Handlers) ->
   227     {OsFamily, _} = os:type(),
   228     case proplists:get_value(OsFamily, Handlers) of
   229         undefined -> throw({unsupported_os, OsFamily});
   230         Handler   -> Handler()
   231     end.
   232 
   233 script_filename() ->
   234     ScriptHome = getenv("RABBITMQ_SCRIPT_HOME"),
   235     ScriptName = with_os(
   236                    [{unix , fun () -> "rabbitmq-server" end},
   237                     {win32, fun () -> "rabbitmq-server.bat" end}]),
   238     ScriptHome ++ "/" ++ ScriptName ++ " -noinput".
   239 
   240 pids_file() -> getenv("RABBITMQ_PIDS_FILE").
   241 
   242 write_pids_file(Pids) ->
   243     FileName = pids_file(),
   244     Handle = case file:open(FileName, [write]) of
   245                  {ok, Device} ->
   246                      Device;
   247                  {error, Reason} ->
   248                      throw({cannot_create_pids_file, FileName, Reason})
   249              end,
   250     try
   251         ok = io:write(Handle, Pids),
   252         ok = io:put_chars(Handle, [$.])
   253     after
   254         case file:close(Handle) of
   255             ok -> ok;
   256             {error, Reason1} ->
   257                 throw({cannot_create_pids_file, FileName, Reason1})
   258         end
   259     end,
   260     ok.
   261 
   262 delete_pids_file() ->
   263     FileName = pids_file(),
   264     case file:delete(FileName) of
   265         ok              -> ok;
   266         {error, enoent} -> ok;
   267         {error, Reason} -> throw({cannot_delete_pids_file, FileName, Reason})
   268     end.
   269 
   270 read_pids_file() ->
   271     FileName = pids_file(),
   272     case file:consult(FileName) of
   273         {ok, [Pids]}    -> Pids;
   274         {error, enoent} -> [];
   275         {error, Reason} -> throw({cannot_read_pids_file, FileName, Reason})
   276     end.
   277 
   278 kill_wait(Pid, TimeLeft, Forceful) when TimeLeft < 0 ->
   279     Cmd = with_os([{unix, fun () -> if Forceful -> "kill -9";
   280                                        true     -> "kill"
   281                                     end
   282                           end},
   283                    %% Kill forcefully always on Windows, since erl.exe
   284                    %% seems to completely ignore non-forceful killing
   285                    %% even when everything is working
   286                    {win32, fun () -> "taskkill /f /pid" end}]),
   287     os:cmd(Cmd ++ " " ++ integer_to_list(Pid)),
   288     false; % Don't assume what we did just worked!
   289 
   290 % Returns true if the process is dead, false otherwise.
   291 kill_wait(Pid, TimeLeft, Forceful) ->
   292     timer:sleep(?RPC_SLEEP),
   293     io:format(".", []),
   294     is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful).
   295 
   296 % Test using some OS clunkiness since we shouldn't trust 
   297 % rpc:call(os, getpid, []) at this point
   298 is_dead(Pid) ->
   299     PidS = integer_to_list(Pid),
   300     with_os([{unix, fun () ->
   301                             Res = os:cmd("ps --no-headers --pid " ++ PidS),
   302                             Res == ""
   303                     end},
   304              {win32, fun () ->
   305                              Res = os:cmd("tasklist /nh /fi \"pid eq " ++
   306                                           PidS ++ "\""),
   307                              case regexp:first_match(Res, "erl.exe") of
   308                                  {match, _, _} -> false;
   309                                  _             -> true
   310                              end
   311                      end}]).
   312 
   313 call_all_nodes(Func) ->
   314     case read_pids_file() of
   315         []       -> throw(no_nodes_running);
   316         NodePids -> lists:foreach(Func, NodePids)
   317     end.
   318 
   319 getenv(Var) ->
   320     case os:getenv(Var) of
   321         false -> throw({missing_env_var, Var});
   322         Value -> Value
   323     end.