Committing this only in case we ever change our mind. In this fix, the connection sends a fake channel.close on behalf of the dead channel and then waits for channel.close_ok, or times out.
1.1 --- a/src/amqp_channel.erl Fri Sep 25 13:46:28 2009 +0100
1.2 +++ b/src/amqp_channel.erl Mon Sep 28 13:04:52 2009 +0100
1.3 @@ -497,7 +497,7 @@
1.4 State = #channel_state{number = ChannelNumber,
1.5 writer_pid = WriterPid}) ->
1.6 ?LOG_WARN("Channel ~p closing: received exit signal from writer. "
1.7 - "Reason: ~p~n", [ChannelNumber, Reason]),
1.8 + "Reason: ~p~n", [ChannelNumber, Reason]),
1.9 {stop, {writer_died, WriterPid, Reason}, State};
1.10
1.11 %% Handle reader exit
1.12 @@ -506,7 +506,7 @@
1.13 State = #channel_state{number = ChannelNumber,
1.14 reader_pid = ReaderPid}) ->
1.15 ?LOG_WARN("Channel ~p closing: received exit signal from reader. "
1.16 - "Reason: ~p~n", [ChannelNumber, Reason]),
1.17 + "Reason: ~p~n", [ChannelNumber, Reason]),
1.18 {stop, {reader_died, ReaderPid, Reason}, State};
1.19
1.20 %% Handle other exit
1.21 @@ -514,7 +514,7 @@
1.22 handle_info({'EXIT', Pid, Reason},
1.23 State = #channel_state{number = ChannelNumber}) ->
1.24 ?LOG_WARN("Channel ~p closing: received unexpected exit signal from (~p). "
1.25 - "Reason: ~p~n", [ChannelNumber, Pid, Reason]),
1.26 + "Reason: ~p~n", [ChannelNumber, Pid, Reason]),
1.27 {stop, {unexpected_exit, Pid, Reason}, State};
1.28
1.29 %% This is for a channel exception that is sent by the direct
2.1 --- a/src/amqp_connection.erl Fri Sep 25 13:46:28 2009 +0100
2.2 +++ b/src/amqp_connection.erl Mon Sep 28 13:04:52 2009 +0100
2.3 @@ -265,6 +265,11 @@
2.4 is_registered_channel(Channel, #connection_state{channels = Channels}) ->
2.5 dict:is_key(Channel, Channels).
2.6
2.7 +%% Looks up {channel, ChannelNumber} or {chpid, ChannelPid} in the channels dict; badmatch if absent
2.8 +resolve_channel(Channel, #connection_state{channels = Channels}) ->
2.9 + {ok, Val} = dict:find(Channel, Channels),
2.10 + Val.
2.11 +
2.12
2.13 %%---------------------------------------------------------------------------
2.14 %% gen_server callbacks
2.15 @@ -342,7 +347,8 @@
2.16
2.17 %% Handle exit from other pid
2.18 %% @private
2.19 -handle_info({'EXIT', Pid, Reason}, State = #connection_state{closing = false}) ->
2.20 +handle_info({'EXIT', Pid, Reason}, State = #connection_state{closing = false,
2.21 + driver = Driver}) ->
2.22 case {is_registered_channel({chpid, Pid}, State), Reason} of
2.23 %% Normal amqp_channel shutdown
2.24 {true, normal} ->
2.25 @@ -360,7 +366,21 @@
2.26 {true, _} ->
2.27 ?LOG_WARN("Connection: Handling exit from channel (~p). "
2.28 "Reason: ~p~n", [Pid, Reason]),
2.29 - {noreply, unregister_channel({chpid, Pid}, State)};
2.30 + {channel, Number} = resolve_channel({chpid, Pid}, State),
2.31 + case catch Driver:handle_channel_death(Number, Reason, State) of
2.32 + ok ->
2.33 + {noreply, unregister_channel({chpid, Pid}, State)};
2.34 + waiting_for_close_ok_timed_out ->
2.35 + ?LOG_WARN("Timed out waiting for channel.close_ok after "
2.36 + "sending channel.close on behalf of dead channel "
2.37 + " (~p)~n", [Pid]),
2.38 + {noreply, unregister_channel({chpid, Pid}, State)};
2.39 + {'EXIT', Error} ->
2.40 + ?LOG_WARN("Failed sending 'channel.close' on behalf of "
2.41 + "channel (~p) that died. Reason: ~p~n",
2.42 + [Pid, Error]),
2.43 + {stop, {failed_sending_channel_close, Error}, State}
2.44 + end;
2.45 %% Exit signal from unknown pid
2.46 {false, _} ->
2.47 ?LOG_WARN("Connection (~p) closing: received unexpected exit signal "
3.1 --- a/src/amqp_direct_driver.erl Fri Sep 25 13:46:28 2009 +0100
3.2 +++ b/src/amqp_direct_driver.erl Mon Sep 28 13:04:52 2009 +0100
3.3 @@ -31,6 +31,7 @@
3.4 -export([handshake/1, open_channel/2, close_channel/3, close_connection/3]).
3.5 -export([do/2, do/3]).
3.6 -export([handle_broker_close/1]).
3.7 +-export([handle_channel_death/3]).
3.8
3.9 %---------------------------------------------------------------------------
3.10 % Driver API Methods
3.11 @@ -69,3 +70,6 @@
3.12
3.13 handle_broker_close(_State) ->
3.14 ok.
3.15 +
3.16 +handle_channel_death(_Number, _Reason, _ConnectionState) ->
3.17 + ok. % No-op in this driver: all three arguments are ignored and ok is always returned.
4.1 --- a/src/amqp_network_driver.erl Fri Sep 25 13:46:28 2009 +0100
4.2 +++ b/src/amqp_network_driver.erl Mon Sep 28 13:04:52 2009 +0100
4.3 @@ -34,10 +34,12 @@
4.4 -export([start_main_reader/2, start_writer/2]).
4.5 -export([do/2, do/3]).
4.6 -export([handle_broker_close/1]).
4.7 +-export([handle_channel_death/3]).
4.8
4.9 -define(SOCKET_CLOSING_TIMEOUT, 1000).
4.10 -define(CLIENT_CLOSE_TIMEOUT, 5000).
4.11 -define(HANDSHAKE_RECEIVE_TIMEOUT, 60000).
4.12 +-define(CHANNEL_CLOSE_TIMEOUT, 5000).
4.13
4.14 %---------------------------------------------------------------------------
4.15 % Driver API Methods
4.16 @@ -74,7 +76,6 @@
4.17 end.
4.18
4.19
4.20 -
4.21 %% The reader runs unaware of the channel number that it is bound to
4.22 %% because this will be parsed out of the frames received off the socket.
4.23 %% Hence, you have tell the main reader which Pids are intended to
4.24 @@ -128,13 +129,61 @@
4.25 erlang:send_after(?SOCKET_CLOSING_TIMEOUT, MainReader,
4.26 socket_closing_timeout).
4.27
4.28 +%% Handles the death of a channel caused by an internal error in the client
4.29 +%% by 'faking' a 'channel.close', so that the server can unregister the
4.30 +%% channel. Registers a fresh framing channel for ChannelNumber, sends the
4.31 +%% 'channel.close', then waits up to ?CHANNEL_CLOSE_TIMEOUT for
4.32 +%% 'channel.close_ok'. Returns ok | waiting_for_close_ok_timed_out; failures
4.33 +%% use exit/1 so that a caller's `catch` sees them as {'EXIT', Reason}.
4.34 +handle_channel_death(ChannelNumber, _Reason,
4.35 +                     #connection_state{sock = Sock,
4.36 +                                       main_reader_pid = MainReaderPid}) ->
4.37 +    %% Start and register a framing channel for the channel that died
4.38 +    FramingPid = start_framing_channel(),
4.39 +    MainReaderPid ! {register_framing_channel_and_signal_back, ChannelNumber,
4.40 +                     FramingPid, self()},
4.41 +    receive
4.42 +        {registered_framing_channel, FramingPid} -> ok;
4.43 +        {'EXIT', MainReaderPid, DeathReason} -> exit({main_reader_died,
4.44 +                                                      DeathReason})
4.45 +    end,
4.46 +    %% Send a 'channel.close' on behalf of the dead channel
4.47 +    rabbit_writer:internal_send_command(
4.48 +      Sock, ChannelNumber,
4.49 +      #'channel.close'{reply_text = <<"Internal error in channel process">>,
4.50 +                       reply_code = 550,
4.51 +                       class_id = 0,
4.52 +                       method_id = 0}),
4.53 +    %% Wait for 'channel.close_ok' and kill the framing channel
4.54 +    receive
4.55 +        {'$gen_cast', {method, #'channel.close_ok'{}, none}} ->
4.56 +            FramingPid ! {'EXIT', self(), normal},
4.57 +            receive {'EXIT', FramingPid, _} -> ok end,
4.58 +            ok;
4.59 +        {'EXIT', FramingPid, Reason} ->
4.60 +            exit({reader_died_while_waiting_for_close_ok, Reason})
4.61 +    after ?CHANNEL_CLOSE_TIMEOUT ->
4.62 +        FramingPid ! {'EXIT', self(), normal},
4.63 +        receive {'EXIT', FramingPid, _} -> ok end,
4.64 +        %% Clear 'channel.close_ok' message if one did come up in between
4.65 +        %% timeout and kill
4.66 +        receive {'$gen_cast', {method, #'channel.close_ok'{}, none}} -> ok
4.67 +        after 0 -> waiting_for_close_ok_timed_out
4.68 +        end
4.69 +    end.
4.70 +
4.71 %---------------------------------------------------------------------------
4.72 -% AMQP message sending and receiving
4.73 +% AMQP message receiving
4.74 %---------------------------------------------------------------------------
4.75
4.76 -send_frame(Channel, Frame) ->
4.77 - {framing_pid, FramingPid} = resolve_framing_channel({channel, Channel}),
4.78 - rabbit_framing_channel:process(FramingPid, Frame).
4.79 +pass_frame_to_framing_channel(Channel, Frame) ->
4.80 +    case resolve_framing_channel({channel, Channel}) of
4.81 +        undefined -> % lookup found no framing channel: log the frame and drop it
4.82 +            ?LOG_INFO("Dropping frame for channel ~p, which is dead or does "
4.83 +                      "not exist: ~p~n", [Channel, Frame]);
4.84 +        {framing_pid, Pid} ->
4.85 +            rabbit_framing_channel:process(Pid, Frame)
4.86 +    end.
4.87
4.88 recv(#connection_state{main_reader_pid = MainReaderPid}) ->
4.89 receive
4.90 @@ -243,6 +292,11 @@
4.91 {register_framing_channel, ChannelNumber, FramingPid} ->
4.92 register_framing_channel(ChannelNumber, FramingPid),
4.93 main_reader_loop(Sock, Type, Channel, Length);
4.94 + {register_framing_channel_and_signal_back, ChannelNumber, FramingPid,
4.95 + Caller} ->
4.96 + register_framing_channel(ChannelNumber, FramingPid),
4.97 + Caller ! {registered_framing_channel, FramingPid},
4.98 + main_reader_loop(Sock, Type, Channel, Length);
4.99 timeout ->
4.100 ?LOG_WARN("Reader (~p) received timeout from heartbeat, "
4.101 "exiting ~n", [self()]),
4.102 @@ -282,10 +336,10 @@
4.103 trace ->
4.104 trace;
4.105 {method, Method = 'connection.close_ok', none} ->
4.106 - send_frame(Channel, {method, Method}),
4.107 + pass_frame_to_framing_channel(Channel, {method, Method}),
4.108 closed_ok;
4.109 AnalyzedFrame ->
4.110 - send_frame(Channel, AnalyzedFrame)
4.111 + pass_frame_to_framing_channel(Channel, AnalyzedFrame)
4.112 end.
4.113
4.114 start_framing_channel() ->
5.1 --- a/test/negative_test_util.erl Fri Sep 25 13:46:28 2009 +0100
5.2 +++ b/test/negative_test_util.erl Mon Sep 28 13:04:52 2009 +0100
5.3 @@ -80,13 +80,13 @@
5.4 C2 = amqp_connection:open_channel(Connection),
5.5 Publish = #'basic.publish'{routing_key = <<>>, exchange = <<>>},
5.6 Message = #amqp_msg{props = <<>>, payload = <<>>},
5.7 - %ok = amqp_channel:call(C2, Publish, Message),
5.8 - %?assertNot(is_process_alive(C2)),
5.9 - %?assert(is_process_alive(Connection)),
5.10 - ok.
5.11 - %C3 = amqp_connection:open_channel(Connection),
5.12 - %?assert(is_process_alive(C3)),
5.13 - %test_util:teardown(Connection, C3).
5.14 + ok = amqp_channel:call(C2, Publish, Message),
5.15 + timer:sleep(1000),
5.16 + ?assertNot(is_process_alive(C2)),
5.17 + ?assert(is_process_alive(Connection)),
5.18 + C3 = amqp_connection:open_channel(Connection),
5.19 + ?assert(is_process_alive(C3)),
5.20 + test_util:teardown(Connection, C3).
5.21
5.22
5.23 non_existent_user_test() ->