committing this only in case we ever change our mind. In this fix, the connection sends a fake channel.close on behalf of the channel and waits for channel.close_ok, or times out (branch: bug21172v1)
author Vlad Ionescu <vlad@lshift.net>
Mon Sep 28 13:04:52 2009 +0100 (2009-09-28)
branch bug21172v1
changeset 568 fd3edf6f1238
parent 567 ba2870ab5783
child 570 9e2f437e26cc
committing this only in case we ever change our mind. In this fix, the connection sends a fake channel.close on behalf of the channel and waits for channel.close_ok, or times out
src/amqp_channel.erl
src/amqp_connection.erl
src/amqp_direct_driver.erl
src/amqp_network_driver.erl
test/negative_test_util.erl
     1.1 --- a/src/amqp_channel.erl	Fri Sep 25 13:46:28 2009 +0100
     1.2 +++ b/src/amqp_channel.erl	Mon Sep 28 13:04:52 2009 +0100
     1.3 @@ -497,7 +497,7 @@
     1.4              State = #channel_state{number = ChannelNumber,
     1.5                                     writer_pid = WriterPid}) ->
     1.6      ?LOG_WARN("Channel ~p closing: received exit signal from writer. "
     1.7 -             "Reason: ~p~n", [ChannelNumber, Reason]),
     1.8 +              "Reason: ~p~n", [ChannelNumber, Reason]),
     1.9      {stop, {writer_died, WriterPid, Reason}, State};
    1.10  
    1.11  %% Handle reader exit
    1.12 @@ -506,7 +506,7 @@
    1.13              State = #channel_state{number = ChannelNumber,
    1.14                                     reader_pid = ReaderPid}) ->
    1.15      ?LOG_WARN("Channel ~p closing: received exit signal from reader. "
    1.16 -             "Reason: ~p~n", [ChannelNumber, Reason]),
    1.17 +              "Reason: ~p~n", [ChannelNumber, Reason]),
    1.18      {stop, {reader_died, ReaderPid, Reason}, State};
    1.19  
    1.20  %% Handle other exit
    1.21 @@ -514,7 +514,7 @@
    1.22  handle_info({'EXIT', Pid, Reason},
    1.23              State = #channel_state{number = ChannelNumber}) ->
    1.24      ?LOG_WARN("Channel ~p closing: received unexpected exit signal from (~p). "
    1.25 -             "Reason: ~p~n", [ChannelNumber, Pid, Reason]),
    1.26 +              "Reason: ~p~n", [ChannelNumber, Pid, Reason]),
    1.27      {stop, {unexpected_exit, Pid, Reason}, State};
    1.28  
    1.29  %% This is for a channel exception that is sent by the direct
     2.1 --- a/src/amqp_connection.erl	Fri Sep 25 13:46:28 2009 +0100
     2.2 +++ b/src/amqp_connection.erl	Mon Sep 28 13:04:52 2009 +0100
     2.3 @@ -265,6 +265,11 @@
     2.4  is_registered_channel(Channel, #connection_state{channels = Channels}) ->
     2.5      dict:is_key(Channel, Channels).
     2.6  
     2.7 +%% Resolve channel by passing {channel, ChannelNumber} or {chpid, ChannelPid}
     2.8 +resolve_channel(Channel, #connection_state{channels = Channels}) ->
     2.9 +    {ok, Val} = dict:find(Channel, Channels),
    2.10 +    Val.
    2.11 +
    2.12  
    2.13  %%---------------------------------------------------------------------------
    2.14  %% gen_server callbacks
    2.15 @@ -342,7 +347,8 @@
    2.16  
    2.17  %% Handle exit from other pid
    2.18  %% @private
    2.19 -handle_info({'EXIT', Pid, Reason}, State = #connection_state{closing = false}) ->
    2.20 +handle_info({'EXIT', Pid, Reason}, State = #connection_state{closing = false,
    2.21 +                                                             driver = Driver}) ->
    2.22      case {is_registered_channel({chpid, Pid}, State), Reason} of
    2.23          %% Normal amqp_channel shutdown
    2.24          {true, normal} ->
    2.25 @@ -360,7 +366,21 @@
    2.26          {true, _} ->
    2.27              ?LOG_WARN("Connection: Handling exit from channel (~p). "
    2.28                        "Reason: ~p~n", [Pid, Reason]),
    2.29 -            {noreply, unregister_channel({chpid, Pid}, State)};
    2.30 +            {channel, Number} = resolve_channel({chpid, Pid}, State),
    2.31 +            case catch Driver:handle_channel_death(Number, Reason, State) of
    2.32 +                ok ->
    2.33 +                    {noreply, unregister_channel({chpid, Pid}, State)};
    2.34 +                waiting_for_close_ok_timed_out ->
    2.35 +                    ?LOG_WARN("Timed out waiting for channel.close_ok after "
    2.36 +                              "sending channel.close on behalf of dead channel "
    2.37 +                              " (~p)~n", [Pid]),
    2.38 +                    {noreply, unregister_channel({chpid, Pid}, State)};
    2.39 +                {'EXIT', Error} ->
    2.40 +                    ?LOG_WARN("Failed sending 'channel.close' on behalf of "
    2.41 +                              "channel (~p) that died. Reason: ",
    2.42 +                              [Pid, Error]),
    2.43 +                    {stop, {failed_sending_channel_close, Error}, State}
    2.44 +            end;
    2.45          %% Exit signal from unknown pid
    2.46          {false, _} ->
    2.47              ?LOG_WARN("Connection (~p) closing: received unexpected exit signal "
     3.1 --- a/src/amqp_direct_driver.erl	Fri Sep 25 13:46:28 2009 +0100
     3.2 +++ b/src/amqp_direct_driver.erl	Mon Sep 28 13:04:52 2009 +0100
     3.3 @@ -31,6 +31,7 @@
     3.4  -export([handshake/1, open_channel/2, close_channel/3, close_connection/3]).
     3.5  -export([do/2, do/3]).
     3.6  -export([handle_broker_close/1]).
     3.7 +-export([handle_channel_death/3]).
     3.8  
     3.9  %---------------------------------------------------------------------------
    3.10  % Driver API Methods
    3.11 @@ -69,3 +70,6 @@
    3.12  
    3.13  handle_broker_close(_State) ->
    3.14      ok.
    3.15 +
    3.16 +handle_channel_death(_Number, _Reason, _ConnectionState) ->
    3.17 +    ok.
     4.1 --- a/src/amqp_network_driver.erl	Fri Sep 25 13:46:28 2009 +0100
     4.2 +++ b/src/amqp_network_driver.erl	Mon Sep 28 13:04:52 2009 +0100
     4.3 @@ -34,10 +34,12 @@
     4.4  -export([start_main_reader/2, start_writer/2]).
     4.5  -export([do/2, do/3]).
     4.6  -export([handle_broker_close/1]).
     4.7 +-export([handle_channel_death/3]).
     4.8  
     4.9  -define(SOCKET_CLOSING_TIMEOUT, 1000).
    4.10  -define(CLIENT_CLOSE_TIMEOUT, 5000).
    4.11  -define(HANDSHAKE_RECEIVE_TIMEOUT, 60000).
    4.12 +-define(CHANNEL_CLOSE_TIMEOUT, 5000).
    4.13  
    4.14  %---------------------------------------------------------------------------
    4.15  % Driver API Methods
    4.16 @@ -74,7 +76,6 @@
    4.17      end.
    4.18  
    4.19  
    4.20 -
    4.21  %% The reader runs unaware of the channel number that it is bound to
    4.22  %% because this will be parsed out of the frames received off the socket.
    4.23  %% Hence, you have tell the main reader which Pids are intended to
    4.24 @@ -128,13 +129,61 @@
    4.25      erlang:send_after(?SOCKET_CLOSING_TIMEOUT, MainReader,
    4.26                        socket_closing_timeout).
    4.27  
    4.28 +%% Handles the death of a channel because of an internal error in the client,
    4.29 +%% by 'faking' a 'channel.close' to let the server know so it can unregister
    4.30 +%% it.
    4.31 +%% Starts the framing channel for the channel that died, sends a 'channel.close'
    4.32 +%% on behalf of the channel, gets 'channel.close_ok' from the framing
    4.33 +%% channel and shuts the framing channel down.
    4.34 +handle_channel_death(ChannelNumber, _Reason,
    4.35 +                     #connection_state{sock = Sock,
    4.36 +                                       main_reader_pid = MainReaderPid}) ->
    4.37 +    %% Start and register a framing channel for the channel that died
    4.38 +    FramingPid = start_framing_channel(),
    4.39 +    MainReaderPid ! {register_framing_channel_and_signal_back, ChannelNumber,
    4.40 +                     FramingPid, self()},
    4.41 +    receive
    4.42 +        {registered_framing_channel, FramingPid} -> ok;
    4.43 +        {'EXIT', MainReaderPid, DeathReason}     -> throw({main_reader_died,
    4.44 +                                                           DeathReason})
    4.45 +    end,
    4.46 +    %% Send a 'channel.close' on behalf of the dead channel
    4.47 +    rabbit_writer:internal_send_command(
    4.48 +        Sock, ChannelNumber,
    4.49 +        #'channel.close'{reply_text = <<"Internal error in channel process">>,
    4.50 +                         reply_code = 550,
    4.51 +                         class_id = 0,
    4.52 +                         method_id = 0}),
    4.53 +    %% Wait for 'channel.close_ok' and kill the framing channel
    4.54 +    receive
    4.55 +        {'$gen_cast', {method, #'channel.close_ok'{}, none}} ->
    4.56 +            FramingPid ! {'EXIT', self(), normal},
    4.57 +            receive {'EXIT', FramingPid, _}  -> ok end,
    4.58 +            ok;
    4.59 +        {'EXIT', FramingPid, Reason} ->
    4.60 +            throw({reader_died_while_waiting_for_close_ok, Reason})
    4.61 +    after ?CHANNEL_CLOSE_TIMEOUT ->
    4.62 +        FramingPid ! {'EXIT', self(), normal},
    4.63 +        receive {'EXIT', FramingPid, _}  -> ok end,
    4.64 +        %% Clear 'channel.close_ok' message if one did come up in between
    4.65 +        %% timeout and kill
    4.66 +        receive {'$gen_cast', {method, #'channel.close_ok'{}, none}} -> ok
    4.67 +        after 0 -> waiting_for_close_ok_timed_out
    4.68 +        end
    4.69 +    end.
    4.70 +
    4.71  %---------------------------------------------------------------------------
    4.72 -% AMQP message sending and receiving
    4.73 +% AMQP message receiving
    4.74  %---------------------------------------------------------------------------
    4.75  
    4.76 -send_frame(Channel, Frame) ->
    4.77 -    {framing_pid, FramingPid} = resolve_framing_channel({channel, Channel}),
    4.78 -    rabbit_framing_channel:process(FramingPid, Frame).
    4.79 +pass_frame_to_framing_channel(Channel, Frame) ->
    4.80 +    case resolve_framing_channel({channel, Channel}) of
    4.81 +        {framing_pid, FramingPid} ->
    4.82 +            rabbit_framing_channel:process(FramingPid, Frame);
    4.83 +        undefined ->
    4.84 +            ?LOG_INFO("Dropping frame for channel ~p, which is dead or does "
    4.85 +                      "not exist: ~p~n", [Channel, Frame])
    4.86 +    end.
    4.87  
    4.88  recv(#connection_state{main_reader_pid = MainReaderPid}) ->
    4.89      receive
    4.90 @@ -243,6 +292,11 @@
    4.91          {register_framing_channel, ChannelNumber, FramingPid} ->
    4.92              register_framing_channel(ChannelNumber, FramingPid),
    4.93              main_reader_loop(Sock, Type, Channel, Length);
    4.94 +        {register_framing_channel_and_signal_back, ChannelNumber, FramingPid,
    4.95 +         Caller} ->
    4.96 +            register_framing_channel(ChannelNumber, FramingPid),
    4.97 +            Caller ! {registered_framing_channel, FramingPid},
    4.98 +            main_reader_loop(Sock, Type, Channel, Length);
    4.99          timeout ->
   4.100              ?LOG_WARN("Reader (~p) received timeout from heartbeat, "
   4.101                        "exiting ~n", [self()]),
   4.102 @@ -282,10 +336,10 @@
   4.103          trace ->
   4.104              trace;
   4.105          {method, Method = 'connection.close_ok', none} ->
   4.106 -            send_frame(Channel, {method, Method}),
   4.107 +            pass_frame_to_framing_channel(Channel, {method, Method}),
   4.108              closed_ok;
   4.109          AnalyzedFrame ->
   4.110 -            send_frame(Channel, AnalyzedFrame)
   4.111 +            pass_frame_to_framing_channel(Channel, AnalyzedFrame)
   4.112      end.
   4.113  
   4.114  start_framing_channel() ->
     5.1 --- a/test/negative_test_util.erl	Fri Sep 25 13:46:28 2009 +0100
     5.2 +++ b/test/negative_test_util.erl	Mon Sep 28 13:04:52 2009 +0100
     5.3 @@ -80,13 +80,13 @@
     5.4      C2 = amqp_connection:open_channel(Connection),
     5.5      Publish = #'basic.publish'{routing_key = <<>>, exchange = <<>>},
     5.6      Message = #amqp_msg{props = <<>>, payload = <<>>},
     5.7 -    %ok = amqp_channel:call(C2, Publish, Message),
     5.8 -    %?assertNot(is_process_alive(C2)),
     5.9 -    %?assert(is_process_alive(Connection)),
    5.10 -    ok.
    5.11 -    %C3 = amqp_connection:open_channel(Connection),
    5.12 -    %?assert(is_process_alive(C3)),
    5.13 -    %test_util:teardown(Connection, C3).
    5.14 +    ok = amqp_channel:call(C2, Publish, Message),
    5.15 +    timer:sleep(1000),
    5.16 +    ?assertNot(is_process_alive(C2)),
    5.17 +    ?assert(is_process_alive(Connection)),
    5.18 +    C3 = amqp_connection:open_channel(Connection),
    5.19 +    ?assert(is_process_alive(C3)),
    5.20 +    test_util:teardown(Connection, C3).
    5.21      
    5.22  
    5.23  non_existent_user_test() ->