channels_manager (and channel_sup_sup) is started last in a connection bug23343
authorAlexandru Scvortov <alexandru@rabbitmq.com>
Thu Aug 04 14:37:59 2011 +0100 (9 months ago)
branchbug23343
changeset 1298fe12e6742ead
parent 1297 f57cfcaf8ad3
child 1299 71e1c42ac5e9
channels_manager (and channel_sup_sup) is started last in a connection

Don't start the channels manager before the infrastructure (and hence
don't thread its pid through 3 different processes). Just have
main_reader request the channel manager's pid on demand.
src/amqp_connection_sup.erl
src/amqp_connection_type_sup.erl
src/amqp_direct_connection.erl
src/amqp_gen_connection.erl
src/amqp_main_reader.erl
src/amqp_network_connection.erl
     1.1 --- a/src/amqp_connection_sup.erl	Thu Aug 04 12:13:08 2011 +0100
     1.2 +++ b/src/amqp_connection_sup.erl	Thu Aug 04 14:37:59 2011 +0100
     1.3 @@ -50,14 +50,14 @@
     1.4  %%---------------------------------------------------------------------------
     1.5  
     1.6  start_infrastructure_fun(Sup, network) ->
     1.7 -    fun (Sock, ChMgr) ->
     1.8 +    fun (Sock) ->
     1.9              Connection = self(),
    1.10              {ok, CTSup, {MainReader, AState, Writer}} =
    1.11                  supervisor2:start_child(
    1.12                    Sup,
    1.13                    {connection_type_sup, {amqp_connection_type_sup,
    1.14                                           start_link_network,
    1.15 -                                         [Sock, Connection, ChMgr]},
    1.16 +                                         [Sock, Connection]},
    1.17                     transient, infinity, supervisor,
    1.18                     [amqp_connection_type_sup]}),
    1.19              {ok, {MainReader, AState, Writer,
     2.1 --- a/src/amqp_connection_type_sup.erl	Thu Aug 04 12:13:08 2011 +0100
     2.2 +++ b/src/amqp_connection_type_sup.erl	Thu Aug 04 14:37:59 2011 +0100
     2.3 @@ -21,7 +21,7 @@
     2.4  
     2.5  -behaviour(supervisor2).
     2.6  
     2.7 --export([start_link_direct/0, start_link_network/3, start_heartbeat_fun/1]).
     2.8 +-export([start_link_direct/0, start_link_network/2, start_heartbeat_fun/1]).
     2.9  -export([init/1]).
    2.10  
    2.11  %%---------------------------------------------------------------------------
    2.12 @@ -37,7 +37,7 @@
    2.13             transient, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
    2.14      {ok, Sup, Collector}.
    2.15  
    2.16 -start_link_network(Sock, Connection, ChMgr) ->
    2.17 +start_link_network(Sock, Connection) ->
    2.18      {ok, Sup} = supervisor2:start_link(?MODULE, []),
    2.19      {ok, AState} = rabbit_command_assembler:init(?PROTOCOL),
    2.20      {ok, Writer} =
    2.21 @@ -50,7 +50,7 @@
    2.22          supervisor2:start_child(
    2.23            Sup,
    2.24            {main_reader, {amqp_main_reader, start_link,
    2.25 -                         [Sock, Connection, ChMgr, AState]},
    2.26 +                         [Sock, Connection, AState]},
    2.27             transient, ?MAX_WAIT, worker, [amqp_main_reader]}),
    2.28      {ok, Sup, {MainReader, AState, Writer}}.
    2.29  
     3.1 --- a/src/amqp_direct_connection.erl	Thu Aug 04 12:13:08 2011 +0100
     3.2 +++ b/src/amqp_direct_connection.erl	Thu Aug 04 14:37:59 2011 +0100
     3.3 @@ -21,7 +21,7 @@
     3.4  
     3.5  -behaviour(amqp_gen_connection).
     3.6  
     3.7 --export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
     3.8 +-export([init/1, terminate/2, connect/3, do/2, open_channel_args/1, i/2,
     3.9           info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
    3.10  
    3.11  -record(state, {node,
    3.12 @@ -97,7 +97,7 @@
    3.13                                       node         = Node,
    3.14                                       adapter_info = Info,
    3.15                                       virtual_host = VHost},
    3.16 -        SIF, _ChMgr, State) ->
    3.17 +        SIF, State) ->
    3.18      State1 = State#state{node         = Node,
    3.19                           vhost        = VHost,
    3.20                           params       = Params,
     4.1 --- a/src/amqp_gen_connection.erl	Thu Aug 04 12:13:08 2011 +0100
     4.2 +++ b/src/amqp_gen_connection.erl	Thu Aug 04 14:37:59 2011 +0100
     4.3 @@ -23,7 +23,7 @@
     4.4  
     4.5  -export([start_link/5, connect/1, open_channel/3, hard_error_in_channel/3,
     4.6           channel_internal_error/3, server_misbehaved/2, channels_terminated/1,
     4.7 -         close/2, info/2, info_keys/0, info_keys/1]).
     4.8 +         close/2, info/2, info_keys/0, info_keys/1, get_channel_manager/1]).
     4.9  -export([behaviour_info/1]).
    4.10  -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
    4.11           handle_info/2]).
    4.12 @@ -85,6 +85,9 @@
    4.13  close(Pid, Close) ->
    4.14      gen_server:call(Pid, {command, {close, Close}}, infinity).
    4.15  
    4.16 +get_channel_manager(Pid) ->
    4.17 +    gen_server:call(Pid, get_channel_manager, infinity).
    4.18 +
    4.19  info(Pid, Items) ->
    4.20      gen_server:call(Pid, {info, Items}, infinity).
    4.21  
    4.22 @@ -106,12 +109,12 @@
    4.23       %% terminate(Reason, FinalState) -> Ignored
    4.24       {terminate, 2},
    4.25  
    4.26 -     %% connect(AmqpParams, SIF, ChMgr, State) ->
    4.27 +     %% connect(AmqpParams, SIF, State) ->
    4.28       %%     {ok, ConnectParams} | {closing, ConnectParams, AmqpError, Reply} |
    4.29       %%         {error, Error}
    4.30       %% where
    4.31       %%     ConnectParams = {ServerProperties, ChannelMax, NewState}
    4.32 -     {connect, 4},
    4.33 +     {connect, 3},
    4.34  
    4.35       %% do(Method, State) -> Ignored
    4.36       {do, 2},
    4.37 @@ -167,12 +170,14 @@
    4.38                              amqp_params = AmqpParams,
    4.39                              start_infrastructure_fun = SIF,
    4.40                              start_channels_manager_fun = SChMF}) ->
    4.41 -    {ok, ChMgr} = SChMF(),
    4.42 -    State1 = State0#state{channels_manager = ChMgr},
    4.43 -    case Mod:connect(AmqpParams, SIF, ChMgr, MState) of
    4.44 +    case Mod:connect(AmqpParams, SIF, MState) of
    4.45          {ok, Params} ->
    4.46 +            {ok, ChMgr} = SChMF(),
    4.47 +            State1 = State0#state{channels_manager = ChMgr},
    4.48              {reply, {ok, self()}, after_connect(Params, State1)};
    4.49          {closing, Params, #amqp_error{} = AmqpError, Error} ->
    4.50 +            {ok, ChMgr} = SChMF(),
    4.51 +            State1 = State0#state{channels_manager = ChMgr},
    4.52              server_misbehaved(self(), AmqpError),
    4.53              {reply, Error, after_connect(Params, State1)};
    4.54          {error, _} = Error ->
    4.55 @@ -182,6 +187,9 @@
    4.56      case Closing of false -> handle_command(Command, From, State);
    4.57                      _     -> {reply, closing, State}
    4.58      end;
    4.59 +handle_call(get_channel_manager, _From,
    4.60 +            State = #state{channels_manager = ChMgr}) ->
    4.61 +    {reply, ChMgr, State};
    4.62  handle_call({info, Items}, _From, State) ->
    4.63      {reply, [{Item, i(Item, State)} || Item <- Items], State};
    4.64  handle_call(info_keys, _From, State = #state{module = Mod}) ->
     5.1 --- a/src/amqp_main_reader.erl	Thu Aug 04 12:13:08 2011 +0100
     5.2 +++ b/src/amqp_main_reader.erl	Thu Aug 04 14:37:59 2011 +0100
     5.3 @@ -21,7 +21,7 @@
     5.4  
     5.5  -behaviour(gen_server).
     5.6  
     5.7 --export([start_link/4]).
     5.8 +-export([start_link/3]).
     5.9  -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
    5.10           handle_info/2]).
    5.11  
    5.12 @@ -36,17 +36,16 @@
    5.13  %% Interface
    5.14  %%---------------------------------------------------------------------------
    5.15  
    5.16 -start_link(Sock, Connection, ChMgr, AState) ->
    5.17 -    gen_server:start_link(?MODULE, [Sock, Connection, ChMgr, AState], []).
    5.18 +start_link(Sock, Connection, AState) ->
    5.19 +    gen_server:start_link(?MODULE, [Sock, Connection, AState], []).
    5.20  
    5.21  %%---------------------------------------------------------------------------
    5.22  %% gen_server callbacks
    5.23  %%---------------------------------------------------------------------------
    5.24  
    5.25 -init([Sock, Connection, ChMgr, AState]) ->
    5.26 +init([Sock, Connection, AState]) ->
    5.27      {ok, _Ref} = rabbit_net:async_recv(Sock, 7, infinity),
    5.28 -    {ok, #state{sock = Sock, connection = Connection,
    5.29 -                channels_manager = ChMgr, astate = AState}}.
    5.30 +    {ok, #state{sock = Sock, connection = Connection, astate = AState}}.
    5.31  
    5.32  terminate(_Reason, _State) ->
    5.33      ok.
    5.34 @@ -106,6 +105,10 @@
    5.35  pass_frame(0, Frame, State = #state{connection = Conn, astate = AState}) ->
    5.36      State#state{astate = rabbit_reader:process_channel_frame(Frame, Conn,
    5.37                                                               0, Conn, AState)};
    5.38 +pass_frame(Number, Frame, State = #state{channels_manager = undefined,
    5.39 +                                         connection       = Connection}) ->
    5.40 +    ChMgr = amqp_gen_connection:get_channel_manager(Connection),
    5.41 +    pass_frame(Number, Frame, State#state{channels_manager = ChMgr});
    5.42  pass_frame(Number, Frame, State = #state{channels_manager = ChMgr}) ->
    5.43      amqp_channels_manager:pass_frame(ChMgr, Number, Frame),
    5.44      State.
     6.1 --- a/src/amqp_network_connection.erl	Thu Aug 04 12:13:08 2011 +0100
     6.2 +++ b/src/amqp_network_connection.erl	Thu Aug 04 14:37:59 2011 +0100
     6.3 @@ -21,7 +21,7 @@
     6.4  
     6.5  -behaviour(amqp_gen_connection).
     6.6  
     6.7 --export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
     6.8 +-export([init/1, terminate/2, connect/3, do/2, open_channel_args/1, i/2,
     6.9           info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
    6.10  
    6.11  -define(RABBIT_TCP_OPTS, [binary, {packet, 0}, {active,false}, {nodelay, true}]).
    6.12 @@ -101,23 +101,23 @@
    6.13  connect(AmqpParams = #amqp_params_network{ssl_options = none,
    6.14                                            host        = Host,
    6.15                                            port        = Port},
    6.16 -        SIF, ChMgr, State) ->
    6.17 +        SIF, State) ->
    6.18      case gen_tcp:connect(Host, Port, ?RABBIT_TCP_OPTS) of
    6.19 -        {ok, Sock}     -> try_handshake(AmqpParams, SIF, ChMgr,
    6.20 +        {ok, Sock}     -> try_handshake(AmqpParams, SIF,
    6.21                                          State#state{sock = Sock});
    6.22          {error, _} = E -> E
    6.23      end;
    6.24  connect(AmqpParams = #amqp_params_network{ssl_options = SslOpts,
    6.25                                            host        = Host,
    6.26                                            port        = Port},
    6.27 -        SIF, ChMgr, State) ->
    6.28 +        SIF, State) ->
    6.29      rabbit_misc:start_applications([crypto, public_key, ssl]),
    6.30      case gen_tcp:connect(Host, Port, ?RABBIT_TCP_OPTS) of
    6.31          {ok, Sock} ->
    6.32              case ssl:connect(Sock, SslOpts) of
    6.33                  {ok, SslSock} ->
    6.34                      RabbitSslSock = #ssl_socket{ssl = SslSock, tcp = Sock},
    6.35 -                    try_handshake(AmqpParams, SIF, ChMgr,
    6.36 +                    try_handshake(AmqpParams, SIF,
    6.37                                    State#state{sock = RabbitSslSock});
    6.38                  {error, _} = E ->
    6.39                      E
    6.40 @@ -126,19 +126,19 @@
    6.41              E
    6.42      end.
    6.43  
    6.44 -try_handshake(AmqpParams, SIF, ChMgr, State) ->
    6.45 -    try handshake(AmqpParams, SIF, ChMgr, State) of
    6.46 +try_handshake(AmqpParams, SIF, State) ->
    6.47 +    try handshake(AmqpParams, SIF, State) of
    6.48          Return -> Return
    6.49      catch exit:Reason -> {error, Reason}
    6.50      end.
    6.51  
    6.52 -handshake(AmqpParams, SIF, ChMgr, State0 = #state{sock = Sock}) ->
    6.53 +handshake(AmqpParams, SIF, State0 = #state{sock = Sock}) ->
    6.54      ok = rabbit_net:send(Sock, ?PROTOCOL_HEADER),
    6.55 -    {SHF, State1} = start_infrastructure(SIF, ChMgr, State0),
    6.56 +    {SHF, State1} = start_infrastructure(SIF, State0),
    6.57      network_handshake(AmqpParams, SHF, State1).
    6.58  
    6.59 -start_infrastructure(SIF, ChMgr, State = #state{sock = Sock}) ->
    6.60 -    {ok, {_MainReader, _AState, Writer, SHF}} = SIF(Sock, ChMgr),
    6.61 +start_infrastructure(SIF, State = #state{sock = Sock}) ->
    6.62 +    {ok, {_MainReader, _AState, Writer, SHF}} = SIF(Sock),
    6.63      {SHF, State#state{writer0 = Writer}}.
    6.64  
    6.65  network_handshake(AmqpParams = #amqp_params_network{virtual_host = VHost},