1.1 --- a/src/amqp_connection_sup.erl Thu Aug 04 12:13:08 2011 +0100
1.2 +++ b/src/amqp_connection_sup.erl Thu Aug 04 14:37:59 2011 +0100
1.3 @@ -50,14 +50,14 @@
1.4 %%---------------------------------------------------------------------------
1.5
1.6 start_infrastructure_fun(Sup, network) ->
1.7 - fun (Sock, ChMgr) ->
1.8 + fun (Sock) ->
1.9 Connection = self(),
1.10 {ok, CTSup, {MainReader, AState, Writer}} =
1.11 supervisor2:start_child(
1.12 Sup,
1.13 {connection_type_sup, {amqp_connection_type_sup,
1.14 start_link_network,
1.15 - [Sock, Connection, ChMgr]},
1.16 + [Sock, Connection]},
1.17 transient, infinity, supervisor,
1.18 [amqp_connection_type_sup]}),
1.19 {ok, {MainReader, AState, Writer,
2.1 --- a/src/amqp_connection_type_sup.erl Thu Aug 04 12:13:08 2011 +0100
2.2 +++ b/src/amqp_connection_type_sup.erl Thu Aug 04 14:37:59 2011 +0100
2.3 @@ -21,7 +21,7 @@
2.4
2.5 -behaviour(supervisor2).
2.6
2.7 --export([start_link_direct/0, start_link_network/3, start_heartbeat_fun/1]).
2.8 +-export([start_link_direct/0, start_link_network/2, start_heartbeat_fun/1]).
2.9 -export([init/1]).
2.10
2.11 %%---------------------------------------------------------------------------
2.12 @@ -37,7 +37,7 @@
2.13 transient, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
2.14 {ok, Sup, Collector}.
2.15
2.16 -start_link_network(Sock, Connection, ChMgr) ->
2.17 +start_link_network(Sock, Connection) ->
2.18 {ok, Sup} = supervisor2:start_link(?MODULE, []),
2.19 {ok, AState} = rabbit_command_assembler:init(?PROTOCOL),
2.20 {ok, Writer} =
2.21 @@ -50,7 +50,7 @@
2.22 supervisor2:start_child(
2.23 Sup,
2.24 {main_reader, {amqp_main_reader, start_link,
2.25 - [Sock, Connection, ChMgr, AState]},
2.26 + [Sock, Connection, AState]},
2.27 transient, ?MAX_WAIT, worker, [amqp_main_reader]}),
2.28 {ok, Sup, {MainReader, AState, Writer}}.
2.29
3.1 --- a/src/amqp_direct_connection.erl Thu Aug 04 12:13:08 2011 +0100
3.2 +++ b/src/amqp_direct_connection.erl Thu Aug 04 14:37:59 2011 +0100
3.3 @@ -21,7 +21,7 @@
3.4
3.5 -behaviour(amqp_gen_connection).
3.6
3.7 --export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
3.8 +-export([init/1, terminate/2, connect/3, do/2, open_channel_args/1, i/2,
3.9 info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
3.10
3.11 -record(state, {node,
3.12 @@ -97,7 +97,7 @@
3.13 node = Node,
3.14 adapter_info = Info,
3.15 virtual_host = VHost},
3.16 - SIF, _ChMgr, State) ->
3.17 + SIF, State) ->
3.18 State1 = State#state{node = Node,
3.19 vhost = VHost,
3.20 params = Params,
4.1 --- a/src/amqp_gen_connection.erl Thu Aug 04 12:13:08 2011 +0100
4.2 +++ b/src/amqp_gen_connection.erl Thu Aug 04 14:37:59 2011 +0100
4.3 @@ -23,7 +23,7 @@
4.4
4.5 -export([start_link/5, connect/1, open_channel/3, hard_error_in_channel/3,
4.6 channel_internal_error/3, server_misbehaved/2, channels_terminated/1,
4.7 - close/2, info/2, info_keys/0, info_keys/1]).
4.8 + close/2, info/2, info_keys/0, info_keys/1, get_channel_manager/1]).
4.9 -export([behaviour_info/1]).
4.10 -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
4.11 handle_info/2]).
4.12 @@ -85,6 +85,9 @@
4.13 close(Pid, Close) ->
4.14 gen_server:call(Pid, {command, {close, Close}}, infinity).
4.15
4.16 +get_channel_manager(Pid) ->
4.17 + gen_server:call(Pid, get_channel_manager, infinity).
4.18 +
4.19 info(Pid, Items) ->
4.20 gen_server:call(Pid, {info, Items}, infinity).
4.21
4.22 @@ -106,12 +109,12 @@
4.23 %% terminate(Reason, FinalState) -> Ignored
4.24 {terminate, 2},
4.25
4.26 - %% connect(AmqpParams, SIF, ChMgr, State) ->
4.27 + %% connect(AmqpParams, SIF, State) ->
4.28 %% {ok, ConnectParams} | {closing, ConnectParams, AmqpError, Reply} |
4.29 %% {error, Error}
4.30 %% where
4.31 %% ConnectParams = {ServerProperties, ChannelMax, NewState}
4.32 - {connect, 4},
4.33 + {connect, 3},
4.34
4.35 %% do(Method, State) -> Ignored
4.36 {do, 2},
4.37 @@ -167,12 +170,14 @@
4.38 amqp_params = AmqpParams,
4.39 start_infrastructure_fun = SIF,
4.40 start_channels_manager_fun = SChMF}) ->
4.41 - {ok, ChMgr} = SChMF(),
4.42 - State1 = State0#state{channels_manager = ChMgr},
4.43 - case Mod:connect(AmqpParams, SIF, ChMgr, MState) of
4.44 + case Mod:connect(AmqpParams, SIF, MState) of
4.45 {ok, Params} ->
4.46 + {ok, ChMgr} = SChMF(),
4.47 + State1 = State0#state{channels_manager = ChMgr},
4.48 {reply, {ok, self()}, after_connect(Params, State1)};
4.49 {closing, Params, #amqp_error{} = AmqpError, Error} ->
4.50 + {ok, ChMgr} = SChMF(),
4.51 + State1 = State0#state{channels_manager = ChMgr},
4.52 server_misbehaved(self(), AmqpError),
4.53 {reply, Error, after_connect(Params, State1)};
4.54 {error, _} = Error ->
4.55 @@ -182,6 +187,9 @@
4.56 case Closing of false -> handle_command(Command, From, State);
4.57 _ -> {reply, closing, State}
4.58 end;
4.59 +handle_call(get_channel_manager, _From,
4.60 + State = #state{channels_manager = ChMgr}) ->
4.61 + {reply, ChMgr, State};
4.62 handle_call({info, Items}, _From, State) ->
4.63 {reply, [{Item, i(Item, State)} || Item <- Items], State};
4.64 handle_call(info_keys, _From, State = #state{module = Mod}) ->
5.1 --- a/src/amqp_main_reader.erl Thu Aug 04 12:13:08 2011 +0100
5.2 +++ b/src/amqp_main_reader.erl Thu Aug 04 14:37:59 2011 +0100
5.3 @@ -21,7 +21,7 @@
5.4
5.5 -behaviour(gen_server).
5.6
5.7 --export([start_link/4]).
5.8 +-export([start_link/3]).
5.9 -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
5.10 handle_info/2]).
5.11
5.12 @@ -36,17 +36,16 @@
5.13 %% Interface
5.14 %%---------------------------------------------------------------------------
5.15
5.16 -start_link(Sock, Connection, ChMgr, AState) ->
5.17 - gen_server:start_link(?MODULE, [Sock, Connection, ChMgr, AState], []).
5.18 +start_link(Sock, Connection, AState) ->
5.19 + gen_server:start_link(?MODULE, [Sock, Connection, AState], []).
5.20
5.21 %%---------------------------------------------------------------------------
5.22 %% gen_server callbacks
5.23 %%---------------------------------------------------------------------------
5.24
5.25 -init([Sock, Connection, ChMgr, AState]) ->
5.26 +init([Sock, Connection, AState]) ->
5.27 {ok, _Ref} = rabbit_net:async_recv(Sock, 7, infinity),
5.28 - {ok, #state{sock = Sock, connection = Connection,
5.29 - channels_manager = ChMgr, astate = AState}}.
5.30 + {ok, #state{sock = Sock, connection = Connection, astate = AState}}.
5.31
5.32 terminate(_Reason, _State) ->
5.33 ok.
5.34 @@ -106,6 +105,10 @@
5.35 pass_frame(0, Frame, State = #state{connection = Conn, astate = AState}) ->
5.36 State#state{astate = rabbit_reader:process_channel_frame(Frame, Conn,
5.37 0, Conn, AState)};
5.38 +pass_frame(Number, Frame, State = #state{channels_manager = undefined,
5.39 + connection = Connection}) ->
5.40 + ChMgr = amqp_gen_connection:get_channel_manager(Connection),
5.41 + pass_frame(Number, Frame, State#state{channels_manager = ChMgr});
5.42 pass_frame(Number, Frame, State = #state{channels_manager = ChMgr}) ->
5.43 amqp_channels_manager:pass_frame(ChMgr, Number, Frame),
5.44 State.
6.1 --- a/src/amqp_network_connection.erl Thu Aug 04 12:13:08 2011 +0100
6.2 +++ b/src/amqp_network_connection.erl Thu Aug 04 14:37:59 2011 +0100
6.3 @@ -21,7 +21,7 @@
6.4
6.5 -behaviour(amqp_gen_connection).
6.6
6.7 --export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
6.8 +-export([init/1, terminate/2, connect/3, do/2, open_channel_args/1, i/2,
6.9 info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
6.10
6.11 -define(RABBIT_TCP_OPTS, [binary, {packet, 0}, {active,false}, {nodelay, true}]).
6.12 @@ -101,23 +101,23 @@
6.13 connect(AmqpParams = #amqp_params_network{ssl_options = none,
6.14 host = Host,
6.15 port = Port},
6.16 - SIF, ChMgr, State) ->
6.17 + SIF, State) ->
6.18 case gen_tcp:connect(Host, Port, ?RABBIT_TCP_OPTS) of
6.19 - {ok, Sock} -> try_handshake(AmqpParams, SIF, ChMgr,
6.20 + {ok, Sock} -> try_handshake(AmqpParams, SIF,
6.21 State#state{sock = Sock});
6.22 {error, _} = E -> E
6.23 end;
6.24 connect(AmqpParams = #amqp_params_network{ssl_options = SslOpts,
6.25 host = Host,
6.26 port = Port},
6.27 - SIF, ChMgr, State) ->
6.28 + SIF, State) ->
6.29 rabbit_misc:start_applications([crypto, public_key, ssl]),
6.30 case gen_tcp:connect(Host, Port, ?RABBIT_TCP_OPTS) of
6.31 {ok, Sock} ->
6.32 case ssl:connect(Sock, SslOpts) of
6.33 {ok, SslSock} ->
6.34 RabbitSslSock = #ssl_socket{ssl = SslSock, tcp = Sock},
6.35 - try_handshake(AmqpParams, SIF, ChMgr,
6.36 + try_handshake(AmqpParams, SIF,
6.37 State#state{sock = RabbitSslSock});
6.38 {error, _} = E ->
6.39 E
6.40 @@ -126,19 +126,19 @@
6.41 E
6.42 end.
6.43
6.44 -try_handshake(AmqpParams, SIF, ChMgr, State) ->
6.45 - try handshake(AmqpParams, SIF, ChMgr, State) of
6.46 +try_handshake(AmqpParams, SIF, State) ->
6.47 + try handshake(AmqpParams, SIF, State) of
6.48 Return -> Return
6.49 catch exit:Reason -> {error, Reason}
6.50 end.
6.51
6.52 -handshake(AmqpParams, SIF, ChMgr, State0 = #state{sock = Sock}) ->
6.53 +handshake(AmqpParams, SIF, State0 = #state{sock = Sock}) ->
6.54 ok = rabbit_net:send(Sock, ?PROTOCOL_HEADER),
6.55 - {SHF, State1} = start_infrastructure(SIF, ChMgr, State0),
6.56 + {SHF, State1} = start_infrastructure(SIF, State0),
6.57 network_handshake(AmqpParams, SHF, State1).
6.58
6.59 -start_infrastructure(SIF, ChMgr, State = #state{sock = Sock}) ->
6.60 - {ok, {_MainReader, _AState, Writer, SHF}} = SIF(Sock, ChMgr),
6.61 +start_infrastructure(SIF, State = #state{sock = Sock}) ->
6.62 + {ok, {_MainReader, _AState, Writer, SHF}} = SIF(Sock),
6.63 {SHF, State#state{writer0 = Writer}}.
6.64
6.65 network_handshake(AmqpParams = #amqp_params_network{virtual_host = VHost},