Merging bug23117 to default
authorMatthew Sackman <matthew@rabbitmq.com>
Wed, 27 Jul 2011 16:03:16 +0100
changeset 127568a8204c89ba
parent 1266 d0f7d93d22f2
parent 1274 668abefd149d
child 1276 dddea10e006c
child 1277 576e15cca8c6
Merging bug23117 to default
     1.1 --- a/include/amqp_client.hrl	Mon Jul 18 17:40:41 2011 +0100
     1.2 +++ b/include/amqp_client.hrl	Wed Jul 27 16:03:16 2011 +0100
     1.3 @@ -14,6 +14,9 @@
     1.4  %% Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
     1.5  %%
     1.6  
     1.7 +-ifndef(AMQP_CLIENT_HRL).
     1.8 +-define(AMQP_CLIENT_HRL, true).
     1.9 +
    1.10  -include_lib("rabbit_common/include/rabbit.hrl").
    1.11  -include_lib("rabbit_common/include/rabbit_framing.hrl").
    1.12  
    1.13 @@ -23,6 +26,7 @@
    1.14  -define(PROTOCOL, rabbit_framing_amqp_0_9_1).
    1.15  
    1.16  -define(MAX_CHANNEL_NUMBER, 65535).
    1.17 +-define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}).
    1.18  
    1.19  -define(PROTOCOL_SSL_PORT, (?PROTOCOL_PORT - 1)).
    1.20  
    1.21 @@ -59,7 +63,10 @@
    1.22  -define(LOG_DEBUG(Format), error_logger:info_msg(Format)).
    1.23  -define(LOG_INFO(Format, Args), error_logger:info_msg(Format, Args)).
    1.24  -define(LOG_WARN(Format, Args), error_logger:warning_msg(Format, Args)).
    1.25 +
    1.26  -define(CLIENT_CAPABILITIES, [{<<"publisher_confirms">>,         bool, true},
    1.27                                {<<"exchange_exchange_bindings">>, bool, true},
    1.28                                {<<"basic.nack">>,                 bool, true},
    1.29                                {<<"consumer_cancel_notify">>,     bool, true}]).
    1.30 +
    1.31 +-endif.
     2.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     2.2 +++ b/include/amqp_gen_consumer_spec.hrl	Wed Jul 27 16:03:16 2011 +0100
     2.3 @@ -0,0 +1,40 @@
     2.4 +%% The contents of this file are subject to the Mozilla Public License
     2.5 +%% Version 1.1 (the "License"); you may not use this file except in
     2.6 +%% compliance with the License. You may obtain a copy of the License at
     2.7 +%% http://www.mozilla.org/MPL/
     2.8 +%%
     2.9 +%% Software distributed under the License is distributed on an "AS IS"
    2.10 +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
    2.11 +%% License for the specific language governing rights and limitations
    2.12 +%% under the License.
    2.13 +%%
    2.14 +%% The Original Code is RabbitMQ.
    2.15 +%%
    2.16 +%% The Initial Developer of the Original Code is VMware, Inc.
    2.17 +%% Copyright (c) 2011-2011 VMware, Inc.  All rights reserved.
    2.18 +%%
    2.19 +
    2.20 +-include("amqp_client.hrl").
    2.21 +
    2.22 +-type(state() :: any()).
    2.23 +-type(consume() :: #'basic.consume'{}).
    2.24 +-type(consume_ok() :: #'basic.consume_ok'{}).
    2.25 +-type(cancel() :: #'basic.cancel'{}).
    2.26 +-type(cancel_ok() :: #'basic.cancel_ok'{}).
    2.27 +-type(deliver() :: #'basic.deliver'{}).
    2.28 +-type(from() :: any()).
    2.29 +-type(reason() :: any()).
    2.30 +-type(ok_error() :: {ok, state()} | {error, reason(), state()}).
    2.31 +
    2.32 +-spec(init/1 :: ([any()]) -> {ok, state()}).
    2.33 +-spec(handle_consume/3 :: (consume(), pid(), state()) -> ok_error()).
    2.34 +-spec(handle_consume_ok/3 :: (consume_ok(), consume(), state()) ->
    2.35 +                                  ok_error()).
    2.36 +-spec(handle_cancel/2 :: (cancel(), state()) -> ok_error()).
    2.37 +-spec(handle_cancel_ok/3 :: (cancel_ok(), cancel(), state()) -> ok_error()).
    2.38 +-spec(handle_deliver/3 :: (deliver(), #amqp_msg{}, state()) -> ok_error()).
    2.39 +-spec(handle_info/2 :: (any(), state()) -> ok_error()).
    2.40 +-spec(handle_call/3 :: (any(), from(), state()) ->
    2.41 +                           {reply, any(), state()} | {noreply, state()} |
    2.42 +                            {error, reason(), state()}).
    2.43 +-spec(terminate/2 :: (any(), state()) -> state()).
     3.1 --- a/rabbit_common.app.in	Mon Jul 18 17:40:41 2011 +0100
     3.2 +++ b/rabbit_common.app.in	Wed Jul 27 16:03:16 2011 +0100
     3.3 @@ -3,6 +3,7 @@
     3.4    {vsn, "%%VSN%%"},
     3.5    {modules, [
     3.6               gen_server2,
     3.7 +             priority_queue,
     3.8               rabbit_backing_queue,
     3.9               rabbit_basic,
    3.10               rabbit_binary_generator,
     4.1 --- a/src/amqp_auth_mechanisms.erl	Mon Jul 18 17:40:41 2011 +0100
     4.2 +++ b/src/amqp_auth_mechanisms.erl	Wed Jul 27 16:03:16 2011 +0100
     4.3 @@ -14,6 +14,7 @@
     4.4  %% Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
     4.5  %%
     4.6  
     4.7 +%% @private
     4.8  -module(amqp_auth_mechanisms).
     4.9  
    4.10  -include("amqp_client.hrl").
     5.1 --- a/src/amqp_channel.erl	Mon Jul 18 17:40:41 2011 +0100
     5.2 +++ b/src/amqp_channel.erl	Wed Jul 27 16:03:16 2011 +0100
     5.3 @@ -66,27 +66,26 @@
     5.4  
     5.5  -behaviour(gen_server).
     5.6  
     5.7 --export([start_link/4, connection_closing/3, open/1]).
     5.8 --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
     5.9 -         handle_info/2]).
    5.10  -export([call/2, call/3, cast/2, cast/3]).
    5.11 --export([subscribe/3]).
    5.12  -export([close/1, close/3]).
    5.13  -export([register_return_handler/2, register_flow_handler/2,
    5.14           register_confirm_handler/2]).
    5.15 +-export([call_consumer/2, subscribe/3]).
    5.16  -export([next_publish_seqno/1, wait_for_confirms/1,
    5.17           wait_for_confirms_or_die/1]).
    5.18 --export([register_default_consumer/2]).
    5.19 +-export([start_link/5, connection_closing/3, open/1]).
    5.20 +
    5.21 +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
    5.22 +         handle_info/2]).
    5.23  
    5.24  -define(TIMEOUT_FLUSH, 60000).
    5.25  -define(TIMEOUT_CLOSE_OK, 3000).
    5.26  
    5.27  -record(state, {number,
    5.28                  connection,
    5.29 +                consumer,
    5.30                  driver,
    5.31                  rpc_requests        = queue:new(),
    5.32 -                anon_sub_requests   = queue:new(),
    5.33 -                tagged_sub_requests = dict:new(),
    5.34                  closing             = false, %% false |
    5.35                                               %%   {just_channel, Reason} |
    5.36                                               %%   {connection, Reason}
    5.37 @@ -96,8 +95,6 @@
    5.38                  next_pub_seqno      = 0,
    5.39                  flow_active         = true,
    5.40                  flow_handler_pid    = none,
    5.41 -                consumers           = dict:new(),
    5.42 -                default_consumer    = none,
    5.43                  start_writer_fun,
    5.44                  unconfirmed_set     = gb_sets:new(),
    5.45                  waiting_set         = gb_trees:empty(),
    5.46 @@ -134,7 +131,7 @@
    5.47  %% @spec (Channel, Method) -> Result
    5.48  %% @doc This is equivalent to amqp_channel:call(Channel, Method, none).
    5.49  call(Channel, Method) ->
    5.50 -    gen_server:call(Channel, {call, Method, none}, infinity).
    5.51 +    gen_server:call(Channel, {call, Method, none, self()}, infinity).
    5.52  
    5.53  %% @spec (Channel, Method, Content) -> Result
    5.54  %% where
    5.55 @@ -157,12 +154,12 @@
    5.56  %% the broker. It does not necessarily imply that the broker has
    5.57  %% accepted responsibility for the message.
    5.58  call(Channel, Method, Content) ->
    5.59 -    gen_server:call(Channel, {call, Method, Content}, infinity).
    5.60 +    gen_server:call(Channel, {call, Method, Content, self()}, infinity).
    5.61  
    5.62  %% @spec (Channel, Method) -> ok
    5.63  %% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
    5.64  cast(Channel, Method) ->
    5.65 -    gen_server:cast(Channel, {cast, Method, none}).
    5.66 +    gen_server:cast(Channel, {cast, Method, none, self()}).
    5.67  
    5.68  %% @spec (Channel, Method, Content) -> ok
    5.69  %% where
    5.70 @@ -174,7 +171,7 @@
    5.71  %% This function is not recommended with synchronous methods, since there is no
    5.72  %% way to verify that the server has received the method.
    5.73  cast(Channel, Method, Content) ->
    5.74 -    gen_server:cast(Channel, {cast, Method, Content}).
    5.75 +    gen_server:cast(Channel, {cast, Method, Content, self()}).
    5.76  
    5.77  %% @spec (Channel) -> ok | closing
    5.78  %% where
    5.79 @@ -220,25 +217,6 @@
    5.80  wait_for_confirms_or_die(Channel) ->
    5.81      gen_server:call(Channel, {wait_for_confirms_or_die, self()}, infinity).
    5.82  
    5.83 -%%---------------------------------------------------------------------------
    5.84 -%% Consumer registration (API)
    5.85 -%%---------------------------------------------------------------------------
    5.86 -
    5.87 -%% @type consume() = #'basic.consume'{}.
    5.88 -%% The AMQP method that is used to  subscribe a consumer to a queue.
    5.89 -%% @spec (Channel, consume(), Consumer) -> amqp_method()
    5.90 -%% where
    5.91 -%%      Channel = pid()
    5.92 -%%      Consumer = pid()
    5.93 -%% @doc Creates a subscription to a queue. This subscribes a consumer pid to
    5.94 -%% the queue defined in the #'basic.consume'{} method record. Note that
    5.95 -%% both the process invoking this method and the supplied consumer process
    5.96 -%% receive an acknowledgement of the subscription. The calling process will
    5.97 -%% receive the acknowledgement as the return value of this function, whereas
    5.98 -%% the consumer process will receive the notification asynchronously.
    5.99 -subscribe(Channel, BasicConsume = #'basic.consume'{}, Consumer) ->
   5.100 -    gen_server:call(Channel, {subscribe, BasicConsume, Consumer}, infinity).
   5.101 -
   5.102  %% @spec (Channel, ReturnHandler) -> ok
   5.103  %% where
   5.104  %%      Channel = pid()
   5.105 @@ -268,40 +246,34 @@
   5.106  register_flow_handler(Channel, FlowHandler) ->
   5.107      gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
   5.108  
   5.109 -%% @spec (Channel, Consumer) -> ok
   5.110 +%% @spec (Channel, Msg) -> ok
   5.111  %% where
   5.112  %%      Channel = pid()
   5.113 -%%      Consumer = pid()
   5.114 -%% @doc Set the current default consumer.
   5.115 -%% Under certain circumstances it is possible for a channel to receive a
   5.116 -%% message delivery which does not match any consumer which is currently
   5.117 -%% set up via basic.consume. This will occur after the following sequence
   5.118 -%% of events:<br/>
   5.119 -%% <br/>
   5.120 -%% basic.consume with explicit acks<br/>
   5.121 -%% %% some deliveries take place but are not acked<br/>
   5.122 -%% basic.cancel<br/>
   5.123 -%% basic.recover{requeue = false}<br/>
   5.124 -%% <br/>
   5.125 -%% Since requeue is specified to be false in the basic.recover, the spec
   5.126 -%% states that the message must be redelivered to "the original recipient"
   5.127 -%% - i.e. the same channel / consumer-tag. But the consumer is no longer
   5.128 -%% active.<br/>
   5.129 -%% In these circumstances, you can register a default consumer to handle
   5.130 -%% such deliveries. If no default consumer is registered then the channel
   5.131 -%% will exit on receiving such a delivery.<br/>
   5.132 -%% Most people will not need to use this.
   5.133 -register_default_consumer(Channel, Consumer) ->
   5.134 -    gen_server:cast(Channel, {register_default_consumer, Consumer}).
   5.135 +%%      Msg    = any()
   5.136 +%% @doc This causes the channel to invoke Consumer:handle_call/2,
   5.137 +%% where Consumer is the amqp_gen_consumer implementation registered with
   5.138 +%% the channel.
   5.139 +call_consumer(Channel, Msg) ->
   5.140 +    gen_server:call(Channel, {call_consumer, Msg}, infinity).
   5.141 +
   5.142 +%% @spec (Channel, BasicConsume, Subscriber) -> ok
   5.143 +%% where
   5.144 +%%      Channel = pid()
   5.145 +%%      BasicConsume = amqp_method()
   5.146 +%%      Subscriber = pid()
   5.147 +%% @doc Subscribe the given pid to a queue using the specified
   5.148 +%% basic.consume method.
   5.149 +subscribe(Channel, BasicConsume = #'basic.consume'{}, Subscriber) ->
   5.150 +    gen_server:call(Channel, {subscribe, BasicConsume, Subscriber}, infinity).
   5.151  
   5.152  %%---------------------------------------------------------------------------
   5.153  %% Internal interface
   5.154  %%---------------------------------------------------------------------------
   5.155  
   5.156  %% @private
   5.157 -start_link(Driver, Connection, ChannelNumber, SWF) ->
   5.158 -    gen_server:start_link(?MODULE,
   5.159 -                          [Driver, Connection, ChannelNumber, SWF], []).
   5.160 +start_link(Driver, Connection, ChannelNumber, Consumer, SWF) ->
   5.161 +    gen_server:start_link(
   5.162 +        ?MODULE, [Driver, Connection, ChannelNumber, Consumer, SWF], []).
   5.163  
   5.164  %% @private
   5.165  connection_closing(Pid, ChannelCloseType, Reason) ->
   5.166 @@ -316,24 +288,22 @@
   5.167  %%---------------------------------------------------------------------------
   5.168  
   5.169  %% @private
   5.170 -init([Driver, Connection, ChannelNumber, SWF]) ->
   5.171 +init([Driver, Connection, ChannelNumber, Consumer, SWF]) ->
   5.172      {ok, #state{connection       = Connection,
   5.173                  driver           = Driver,
   5.174                  number           = ChannelNumber,
   5.175 +                consumer         = Consumer,
   5.176                  start_writer_fun = SWF}}.
   5.177  
   5.178  %% @private
   5.179  handle_call(open, From, State) ->
   5.180 -    {noreply, rpc_top_half(#'channel.open'{}, none, From, State)};
   5.181 +    {noreply, rpc_top_half(#'channel.open'{}, none, From, none, State)};
   5.182  %% @private
   5.183  handle_call({close, Code, Text}, From, State) ->
   5.184      handle_close(Code, Text, From, State);
   5.185  %% @private
   5.186 -handle_call({call, Method, AmqpMsg}, From, State) ->
   5.187 -    handle_method_to_server(Method, AmqpMsg, From, State);
   5.188 -%% @private
   5.189 -handle_call({subscribe, Method, Consumer}, From, State) ->
   5.190 -    handle_subscribe(Method, Consumer, From, State);
   5.191 +handle_call({call, Method, AmqpMsg, Sender}, From, State) ->
   5.192 +    handle_method_to_server(Method, AmqpMsg, From, Sender, State);
   5.193  %% Handles the delivery of messages from a direct channel
   5.194  %% @private
   5.195  handle_call({send_command_sync, Method, Content}, From, State) ->
   5.196 @@ -346,8 +316,6 @@
   5.197      Ret = handle_method_from_server(Method, none, State),
   5.198      gen_server:reply(From, ok),
   5.199      Ret;
   5.200 -%% When in confirm mode, returns the sequence number of the next
   5.201 -%% message to be published.
   5.202  %% @private
   5.203  handle_call(next_publish_seqno, _From,
   5.204              State = #state{next_pub_seqno = SeqNo}) ->
   5.205 @@ -359,31 +327,30 @@
   5.206  %% Lets the channel know that the process should be sent an exit
   5.207  %% signal if a nack is received.
   5.208  handle_call({wait_for_confirms_or_die, Pid}, From, State) ->
   5.209 -    handle_wait_for_confirms(From, Pid, ok, State).
   5.210 +    handle_wait_for_confirms(From, Pid, ok, State);
   5.211 +%% @private
   5.212 +handle_call({call_consumer, Msg}, _From,
   5.213 +            State = #state{consumer = Consumer}) ->
   5.214 +    {reply, amqp_gen_consumer:call_consumer(Consumer, Msg), State};
   5.215 +%% @private
   5.216 +handle_call({subscribe, BasicConsume, Subscriber}, From, State) ->
   5.217 +    handle_method_to_server(BasicConsume, none, From, Subscriber, State).
   5.218  
   5.219  %% @private
   5.220 -handle_cast({cast, Method, AmqpMsg}, State) ->
   5.221 -    handle_method_to_server(Method, AmqpMsg, none, State);
   5.222 -%% Registers a handler to process return messages
   5.223 +handle_cast({cast, Method, AmqpMsg, Sender}, State) ->
   5.224 +    handle_method_to_server(Method, AmqpMsg, none, Sender, State);
   5.225  %% @private
   5.226  handle_cast({register_return_handler, ReturnHandler}, State) ->
   5.227      erlang:monitor(process, ReturnHandler),
   5.228      {noreply, State#state{return_handler_pid = ReturnHandler}};
   5.229 -%% Registers a handler to process ack and nack messages
   5.230  %% @private
   5.231  handle_cast({register_confirm_handler, ConfirmHandler}, State) ->
   5.232      erlang:monitor(process, ConfirmHandler),
   5.233      {noreply, State#state{confirm_handler_pid = ConfirmHandler}};
   5.234 -%% Registers a handler to process flow control messages
   5.235  %% @private
   5.236  handle_cast({register_flow_handler, FlowHandler}, State) ->
   5.237      erlang:monitor(process, FlowHandler),
   5.238      {noreply, State#state{flow_handler_pid = FlowHandler}};
   5.239 -%% Registers a handler to process unexpected deliveries
   5.240 -%% @private
   5.241 -handle_cast({register_default_consumer, Consumer}, State) ->
   5.242 -    erlang:monitor(process, Consumer),
   5.243 -    {noreply, State#state{default_consumer = Consumer}};
   5.244  %% Received from channels manager
   5.245  %% @private
   5.246  handle_cast({method, Method, Content}, State) ->
   5.247 @@ -447,17 +414,11 @@
   5.248              State = #state{flow_handler_pid = FlowHandler}) ->
   5.249      ?LOG_WARN("Channel (~p): Unregistering flow handler ~p because it died. "
   5.250                "Reason: ~p~n", [self(), FlowHandler, Reason]),
   5.251 -    {noreply, State#state{flow_handler_pid = none}};
   5.252 -%% @private
   5.253 -handle_info({'DOWN', _, process, DefaultConsumer, Reason},
   5.254 -            State = #state{default_consumer = DefaultConsumer}) ->
   5.255 -    ?LOG_WARN("Channel (~p): Unregistering default consumer ~p because it died."
   5.256 -              "Reason: ~p~n", [self(), DefaultConsumer, Reason]),
   5.257 -    {noreply, State#state{default_consumer = none}}.
   5.258 +    {noreply, State#state{flow_handler_pid = none}}.
   5.259  
   5.260  %% @private
   5.261 -terminate(_Reason, _State) ->
   5.262 -    ok.
   5.263 +terminate(_Reason, State) ->
   5.264 +    State.
   5.265  
   5.266  %% @private
   5.267  code_change(_OldVsn, State, _Extra) ->
   5.268 @@ -467,7 +428,7 @@
   5.269  %% RPC mechanism
   5.270  %%---------------------------------------------------------------------------
   5.271  
   5.272 -handle_method_to_server(Method, AmqpMsg, From,
   5.273 +handle_method_to_server(Method, AmqpMsg, From, Sender,
   5.274                          State = #state{unconfirmed_set = USet}) ->
   5.275      case {check_invalid_method(Method), From,
   5.276            check_block(Method, AmqpMsg, State)} of
   5.277 @@ -484,8 +445,8 @@
   5.278                           _ ->
   5.279                               State
   5.280                       end,
   5.281 -            {noreply,
   5.282 -             rpc_top_half(Method, build_content(AmqpMsg), From, State1)};
   5.283 +            {noreply, rpc_top_half(Method, build_content(AmqpMsg),
   5.284 +                                   From, Sender, State1)};
   5.285          {ok, none, BlockReply} ->
   5.286              ?LOG_WARN("Channel (~p): discarding method ~p in cast.~n"
   5.287                        "Reason: ~p~n", [self(), Method, BlockReply]),
   5.288 @@ -506,53 +467,22 @@
   5.289                               class_id   = 0,
   5.290                               method_id  = 0},
   5.291      case check_block(Close, none, State) of
   5.292 -        ok         -> {noreply, rpc_top_half(Close, none, From, State)};
   5.293 +        ok         -> {noreply, rpc_top_half(Close, none, From, none, State)};
   5.294          BlockReply -> {reply, BlockReply, State}
   5.295      end.
   5.296  
   5.297 -handle_subscribe(#'basic.consume'{consumer_tag = Tag, nowait = NoWait} = Method,
   5.298 -                 Consumer,
   5.299 -                 From, State = #state{tagged_sub_requests = Tagged,
   5.300 -                                      anon_sub_requests   = Anon,
   5.301 -                                      consumers           = Consumers}) ->
   5.302 -    case check_block(Method, none, State) of
   5.303 -        ok when Tag =:= undefined orelse size(Tag) == 0 ->
   5.304 -            case NoWait of
   5.305 -                true ->
   5.306 -                    {reply, {error, command_invalid}, State};
   5.307 -                false ->
   5.308 -                    NewMethod = Method#'basic.consume'{consumer_tag = <<"">>},
   5.309 -                    NewState = State#state{anon_sub_requests =
   5.310 -                                               queue:in(Consumer, Anon)},
   5.311 -                    {noreply, rpc_top_half(NewMethod, none, From, NewState)}
   5.312 -            end;
   5.313 -        ok when is_binary(Tag) andalso size(Tag) >= 0 ->
   5.314 -            case dict:is_key(Tag, Tagged) orelse dict:is_key(Tag, Consumers) of
   5.315 -                true ->
   5.316 -                    {reply, {error, consumer_tag_already_in_use}, State};
   5.317 -                false when NoWait ->
   5.318 -                    NewState = register_consumer(Tag, Consumer, State),
   5.319 -                    {reply, ok, rpc_top_half(Method, none, none, NewState)};
   5.320 -                false ->
   5.321 -                    NewState = State#state{tagged_sub_requests =
   5.322 -                                             dict:store(Tag, Consumer, Tagged)},
   5.323 -                    {noreply, rpc_top_half(Method, none, From, NewState)}
   5.324 -            end;
   5.325 -        BlockReply ->
   5.326 -            {reply, BlockReply, State}
   5.327 -    end.
   5.328 -
   5.329 -rpc_top_half(Method, Content, From,
   5.330 +rpc_top_half(Method, Content, From, Sender,
   5.331               State0 = #state{rpc_requests = RequestQueue}) ->
   5.332      State1 = State0#state{
   5.333 -        rpc_requests = queue:in({From, Method, Content}, RequestQueue)},
   5.334 +        rpc_requests =
   5.335 +                   queue:in({From, Sender, Method, Content}, RequestQueue)},
   5.336      IsFirstElement = queue:is_empty(RequestQueue),
   5.337      if IsFirstElement -> do_rpc(State1);
   5.338         true           -> State1
   5.339      end.
   5.340  
   5.341  rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
   5.342 -    {{value, {From, _Method, _Content}}, RequestQueue1} =
   5.343 +    {{value, {From, _Sender, _Method, _Content}}, RequestQueue1} =
   5.344          queue:out(RequestQueue),
   5.345      case From of
   5.346          none -> ok;
   5.347 @@ -563,8 +493,8 @@
   5.348  do_rpc(State = #state{rpc_requests = Q,
   5.349                        closing      = Closing}) ->
   5.350      case queue:out(Q) of
   5.351 -        {{value, {From, Method, Content}}, NewQ} ->
   5.352 -            State1 = pre_do(Method, Content, State),
   5.353 +        {{value, {From, Sender, Method, Content}}, NewQ} ->
   5.354 +            State1 = pre_do(Method, Content, Sender, State),
   5.355              DoRet = do(Method, Content, State1),
   5.356              case ?PROTOCOL:is_method_synchronous(Method) of
   5.357                  true  -> State1;
   5.358 @@ -588,12 +518,19 @@
   5.359              State#state{rpc_requests = NewQ}
   5.360      end.
   5.361  
   5.362 -pre_do(#'channel.open'{}, none, State) ->
   5.363 +pending_rpc_method(#state{rpc_requests = Q}) ->
   5.364 +    {value, {_From, _Sender, Method, _Content}} = queue:peek(Q),
   5.365 +    Method.
   5.366 +
   5.367 +pre_do(#'channel.open'{}, none, _Sender, State) ->
   5.368      start_writer(State);
   5.369  pre_do(#'channel.close'{reply_code = Code, reply_text = Text}, none,
   5.370 -       State) ->
   5.371 +       _Sender, State) ->
   5.372      State#state{closing = {just_channel, {app_initiated_close, Code, Text}}};
   5.373 -pre_do(_, _, State) ->
   5.374 +pre_do(#'basic.consume'{} = Method, none, Sender, State) ->
   5.375 +    ok = call_to_consumer(Method, Sender, State),
   5.376 +    State;
   5.377 +pre_do(_, _, _, State) ->
   5.378      State.
   5.379  
   5.380  %%---------------------------------------------------------------------------
   5.381 @@ -651,29 +588,20 @@
   5.382          {connection, Reason} ->
   5.383              handle_shutdown({connection_closing, Reason}, State)
   5.384      end;
   5.385 -handle_method_from_server1(
   5.386 -        #'basic.consume_ok'{consumer_tag = ConsumerTag} = ConsumeOk,
   5.387 -        none, State = #state{tagged_sub_requests = Tagged,
   5.388 -                             anon_sub_requests = Anon}) ->
   5.389 -    {Consumer, State0} =
   5.390 -        case dict:find(ConsumerTag, Tagged) of
   5.391 -            {ok, C} ->
   5.392 -                NewTagged = dict:erase(ConsumerTag, Tagged),
   5.393 -                {C, State#state{tagged_sub_requests = NewTagged}};
   5.394 -            error ->
   5.395 -                {{value, C}, NewAnon} = queue:out(Anon),
   5.396 -                {C, State#state{anon_sub_requests = NewAnon}}
   5.397 -        end,
   5.398 -    Consumer ! ConsumeOk,
   5.399 -    State1 = register_consumer(ConsumerTag, Consumer, State0),
   5.400 -    {noreply, rpc_bottom_half(ConsumeOk, State1)};
   5.401 -handle_method_from_server1(
   5.402 -        #'basic.cancel_ok'{consumer_tag = ConsumerTag} = CancelOk, none,
   5.403 -        State) ->
   5.404 -    Consumer = resolve_consumer(ConsumerTag, State),
   5.405 -    Consumer ! CancelOk,
   5.406 -    NewState = unregister_consumer(ConsumerTag, State),
   5.407 -    {noreply, rpc_bottom_half(CancelOk, NewState)};
   5.408 +handle_method_from_server1(#'basic.consume_ok'{} = ConsumeOk, none, State) ->
   5.409 +    Consume = #'basic.consume'{} = pending_rpc_method(State),
   5.410 +    ok = call_to_consumer(ConsumeOk, Consume, State),
   5.411 +    {noreply, rpc_bottom_half(ConsumeOk, State)};
   5.412 +handle_method_from_server1(#'basic.cancel_ok'{} = CancelOk, none, State) ->
   5.413 +    Cancel = #'basic.cancel'{} = pending_rpc_method(State),
   5.414 +    ok = call_to_consumer(CancelOk, Cancel, State),
   5.415 +    {noreply, rpc_bottom_half(CancelOk, State)};
   5.416 +handle_method_from_server1(#'basic.cancel'{} = Cancel, none, State) ->
   5.417 +    ok = call_to_consumer(Cancel, none, State),
   5.418 +    {noreply, State};
   5.419 +handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, State) ->
   5.420 +    ok = call_to_consumer(Deliver, AmqpMsg, State),
   5.421 +    {noreply, State};
   5.422  handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
   5.423                             State = #state{flow_handler_pid = FlowHandler}) ->
   5.424      case FlowHandler of none -> ok;
   5.425 @@ -683,13 +611,7 @@
   5.426      %% flushed beforehand. Methods that made it to the queue are not
   5.427      %% blocked in any circumstance.
   5.428      {noreply, rpc_top_half(#'channel.flow_ok'{active = Active}, none, none,
   5.429 -                           State#state{flow_active = Active})};
   5.430 -handle_method_from_server1(
   5.431 -        #'basic.deliver'{consumer_tag = ConsumerTag} = Deliver, AmqpMsg,
   5.432 -        State) ->
   5.433 -    Consumer = resolve_consumer(ConsumerTag, State),
   5.434 -    Consumer ! {Deliver, AmqpMsg},
   5.435 -    {noreply, State};
   5.436 +                           none, State#state{flow_active = Active})};
   5.437  handle_method_from_server1(
   5.438          #'basic.return'{} = BasicReturn, AmqpMsg,
   5.439          State = #state{return_handler_pid = ReturnHandler}) ->
   5.440 @@ -700,19 +622,14 @@
   5.441          _    -> ReturnHandler ! {BasicReturn, AmqpMsg}
   5.442      end,
   5.443      {noreply, State};
   5.444 -handle_method_from_server1(#'basic.cancel'{consumer_tag = ConsumerTag} = Death,
   5.445 -                           none, State) ->
   5.446 -    Consumer = resolve_consumer(ConsumerTag, State),
   5.447 -    Consumer ! Death,
   5.448 -    NewState = unregister_consumer(ConsumerTag, State),
   5.449 -    {noreply, NewState};
   5.450  handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
   5.451                             #state{confirm_handler_pid = none} = State) ->
   5.452      ?LOG_WARN("Channel (~p): received ~p but there is no "
   5.453                "confirm handler registered~n", [self(), BasicAck]),
   5.454      {noreply, update_confirm_set(BasicAck, State)};
   5.455 -handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
   5.456 -                           #state{confirm_handler_pid = ConfirmHandler} = State) ->
   5.457 +handle_method_from_server1(
   5.458 +        #'basic.ack'{} = BasicAck, none,
   5.459 +        #state{confirm_handler_pid = ConfirmHandler} = State) ->
   5.460      ConfirmHandler ! BasicAck,
   5.461      {noreply, update_confirm_set(BasicAck, State)};
   5.462  handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
   5.463 @@ -720,8 +637,9 @@
   5.464      ?LOG_WARN("Channel (~p): received ~p but there is no "
   5.465                "confirm handler registered~n", [self(), BasicNack]),
   5.466      {noreply, update_confirm_set(BasicNack, handle_nack(State))};
   5.467 -handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
   5.468 -                           #state{confirm_handler_pid = ConfirmHandler} = State) ->
   5.469 +handle_method_from_server1(
   5.470 +        #'basic.nack'{} = BasicNack, none,
   5.471 +        #state{confirm_handler_pid = ConfirmHandler} = State) ->
   5.472      ConfirmHandler ! BasicNack,
   5.473      {noreply, update_confirm_set(BasicNack, handle_nack(State))};
   5.474  
   5.475 @@ -798,30 +716,6 @@
   5.476      {ok, Writer} = SWF(),
   5.477      State#state{writer = Writer}.
   5.478  
   5.479 -resolve_consumer(_ConsumerTag, #state{consumers = []}) ->
   5.480 -    exit(no_consumers_registered);
   5.481 -resolve_consumer(ConsumerTag, #state{consumers = Consumers,
   5.482 -                                     default_consumer = DefaultConsumer}) ->
   5.483 -    case dict:find(ConsumerTag, Consumers) of
   5.484 -        {ok, Value} ->
   5.485 -            Value;
   5.486 -        error ->
   5.487 -            case is_pid(DefaultConsumer) of
   5.488 -                true  -> DefaultConsumer;
   5.489 -                false -> exit(unexpected_delivery_and_no_default_consumer)
   5.490 -            end
   5.491 -    end.
   5.492 -
   5.493 -register_consumer(ConsumerTag, Consumer,
   5.494 -                  State = #state{consumers = Consumers0}) ->
   5.495 -    Consumers1 = dict:store(ConsumerTag, Consumer, Consumers0),
   5.496 -    State#state{consumers = Consumers1}.
   5.497 -
   5.498 -unregister_consumer(ConsumerTag,
   5.499 -                    State = #state{consumers = Consumers0}) ->
   5.500 -    Consumers1 = dict:erase(ConsumerTag, Consumers0),
   5.501 -    State#state{consumers = Consumers1}.
   5.502 -
   5.503  amqp_msg(none) ->
   5.504      none;
   5.505  amqp_msg(Content) ->
   5.506 @@ -849,8 +743,6 @@
   5.507       "Use amqp_connection:open_channel/{1,2} instead"};
   5.508  check_invalid_method(#'channel.close'{}) ->
   5.509      {use_close_function, "Use close/{1,3} instead"};
   5.510 -check_invalid_method(#'basic.consume'{}) ->
   5.511 -    {use_subscribe_function, "Use subscribe/3 instead"};
   5.512  check_invalid_method(Method) ->
   5.513      case is_connection_method(Method) of
   5.514          true  -> {connection_methods_not_allowed,
   5.515 @@ -912,3 +804,6 @@
   5.516          false -> {noreply, State#state{waiting_set =
   5.517                                             gb_trees:insert(From, Notify, WSet)}}
   5.518      end.
   5.519 +
   5.520 +call_to_consumer(Method, Args, #state{consumer = Consumer}) ->
   5.521 +    amqp_gen_consumer:call_consumer(Consumer, Method, Args).
     6.1 --- a/src/amqp_channel_sup.erl	Mon Jul 18 17:40:41 2011 +0100
     6.2 +++ b/src/amqp_channel_sup.erl	Wed Jul 27 16:03:16 2011 +0100
     6.3 @@ -21,21 +21,22 @@
     6.4  
     6.5  -behaviour(supervisor2).
     6.6  
     6.7 --export([start_link/4]).
     6.8 +-export([start_link/5]).
     6.9  -export([init/1]).
    6.10  
    6.11  %%---------------------------------------------------------------------------
    6.12  %% Interface
    6.13  %%---------------------------------------------------------------------------
    6.14  
    6.15 -start_link(Type, Connection, InfraArgs, ChNumber) ->
    6.16 -    {ok, Sup} = supervisor2:start_link(?MODULE, []),
    6.17 +start_link(Type, Connection, InfraArgs, ChNumber, Consumer = {_, _}) ->
    6.18 +    {ok, Sup} = supervisor2:start_link(?MODULE, [Consumer]),
    6.19 +    [{gen_consumer, ConsumerPid, _, _}] = supervisor2:which_children(Sup),
    6.20      {ok, ChPid} = supervisor2:start_child(
    6.21                      Sup, {channel, {amqp_channel, start_link,
    6.22 -                                    [Type, Connection, ChNumber,
    6.23 +                                    [Type, Connection, ChNumber, ConsumerPid,
    6.24                                       start_writer_fun(Sup, Type, InfraArgs,
    6.25                                                        ChNumber)]},
    6.26 -                          intrinsic, brutal_kill, worker, [amqp_channel]}),
    6.27 +                          intrinsic, ?MAX_WAIT, worker, [amqp_channel]}),
    6.28      {ok, AState} = init_command_assembler(Type),
    6.29      {ok, Sup, {ChPid, AState}}.
    6.30  
    6.31 @@ -60,7 +61,7 @@
    6.32                          {writer, {rabbit_writer, start_link,
    6.33                                    [Sock, ChNumber, ?FRAME_MIN_SIZE, ?PROTOCOL,
    6.34                                     self()]},
    6.35 -                         transient, ?MAX_WAIT, worker, [rabbit_writer]})
    6.36 +                         intrinsic, ?MAX_WAIT, worker, [rabbit_writer]})
    6.37      end.
    6.38  
    6.39  init_command_assembler(direct)  -> {ok, none};
    6.40 @@ -70,5 +71,8 @@
    6.41  %% supervisor2 callbacks
    6.42  %%---------------------------------------------------------------------------
    6.43  
    6.44 -init([]) ->
    6.45 -    {ok, {{one_for_all, 0, 1}, []}}.
    6.46 +init([{ConsumerModule, ConsumerArgs}]) ->
    6.47 +    {ok, {{one_for_all, 0, 1},
    6.48 +          [{gen_consumer, {amqp_gen_consumer, start_link,
    6.49 +                           [ConsumerModule, ConsumerArgs]},
    6.50 +           intrinsic, ?MAX_WAIT, worker, [amqp_gen_consumer]}]}}.
     7.1 --- a/src/amqp_channel_sup_sup.erl	Mon Jul 18 17:40:41 2011 +0100
     7.2 +++ b/src/amqp_channel_sup_sup.erl	Wed Jul 27 16:03:16 2011 +0100
     7.3 @@ -21,7 +21,7 @@
     7.4  
     7.5  -behaviour(supervisor2).
     7.6  
     7.7 --export([start_link/2, start_channel_sup/3]).
     7.8 +-export([start_link/2, start_channel_sup/4]).
     7.9  -export([init/1]).
    7.10  
    7.11  %%---------------------------------------------------------------------------
    7.12 @@ -31,8 +31,8 @@
    7.13  start_link(Type, Connection) ->
    7.14      supervisor2:start_link(?MODULE, [Type, Connection]).
    7.15  
    7.16 -start_channel_sup(Sup, InfraArgs, ChannelNumber) ->
    7.17 -    supervisor2:start_child(Sup, [InfraArgs, ChannelNumber]).
    7.18 +start_channel_sup(Sup, InfraArgs, ChannelNumber, Consumer) ->
    7.19 +    supervisor2:start_child(Sup, [InfraArgs, ChannelNumber, Consumer]).
    7.20  
    7.21  %%---------------------------------------------------------------------------
    7.22  %% supervisor2 callbacks
     8.1 --- a/src/amqp_channels_manager.erl	Mon Jul 18 17:40:41 2011 +0100
     8.2 +++ b/src/amqp_channels_manager.erl	Wed Jul 27 16:03:16 2011 +0100
     8.3 @@ -21,7 +21,7 @@
     8.4  
     8.5  -behaviour(gen_server).
     8.6  
     8.7 --export([start_link/2, open_channel/3, set_channel_max/2, is_empty/1,
     8.8 +-export([start_link/2, open_channel/4, set_channel_max/2, is_empty/1,
     8.9           num_channels/1, pass_frame/3, signal_connection_closing/3]).
    8.10  -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
    8.11           handle_info/2]).
    8.12 @@ -40,8 +40,9 @@
    8.13  start_link(Connection, ChSupSup) ->
    8.14      gen_server:start_link(?MODULE, [Connection, ChSupSup], []).
    8.15  
    8.16 -open_channel(ChMgr, ProposedNumber, InfraArgs) ->
    8.17 -    gen_server:call(ChMgr, {open_channel, ProposedNumber, InfraArgs}, infinity).
    8.18 +open_channel(ChMgr, ProposedNumber, Consumer, InfraArgs) ->
    8.19 +    gen_server:call(ChMgr, {open_channel, ProposedNumber, Consumer, InfraArgs},
    8.20 +                    infinity).
    8.21  
    8.22  set_channel_max(ChMgr, ChannelMax) ->
    8.23      gen_server:cast(ChMgr, {set_channel_max, ChannelMax}).
    8.24 @@ -71,9 +72,9 @@
    8.25  code_change(_OldVsn, State, _Extra) ->
    8.26      State.
    8.27  
    8.28 -handle_call({open_channel, ProposedNumber, InfraArgs}, _,
    8.29 +handle_call({open_channel, ProposedNumber, Consumer, InfraArgs}, _,
    8.30              State = #state{closing = false}) ->
    8.31 -    handle_open_channel(ProposedNumber, InfraArgs, State);
    8.32 +    handle_open_channel(ProposedNumber, Consumer, InfraArgs, State);
    8.33  handle_call(is_empty, _, State) ->
    8.34      {reply, internal_is_empty(State), State};
    8.35  handle_call(num_channels, _, State) ->
    8.36 @@ -93,13 +94,13 @@
    8.37  %% Internal plumbing
    8.38  %%---------------------------------------------------------------------------
    8.39  
    8.40 -handle_open_channel(ProposedNumber, InfraArgs,
    8.41 +handle_open_channel(ProposedNumber, Consumer, InfraArgs,
    8.42                      State = #state{channel_sup_sup = ChSupSup}) ->
    8.43      case new_number(ProposedNumber, State) of
    8.44          {ok, Number} ->
    8.45              {ok, _ChSup, {Ch, AState}} =
    8.46                  amqp_channel_sup_sup:start_channel_sup(ChSupSup, InfraArgs,
    8.47 -                                                       Number),
    8.48 +                                                       Number, Consumer),
    8.49              NewState = internal_register(Number, Ch, AState, State),
    8.50              erlang:monitor(process, Ch),
    8.51              {reply, {ok, Ch}, NewState};
     9.1 --- a/src/amqp_connection.erl	Mon Jul 18 17:40:41 2011 +0100
     9.2 +++ b/src/amqp_connection.erl	Wed Jul 27 16:03:16 2011 +0100
     9.3 @@ -69,7 +69,7 @@
     9.4  
     9.5  -include("amqp_client.hrl").
     9.6  
     9.7 --export([open_channel/1, open_channel/2]).
     9.8 +-export([open_channel/1, open_channel/2, open_channel/3]).
     9.9  -export([start/1]).
    9.10  -export([close/1, close/3]).
    9.11  -export([info/2, info_keys/1, info_keys/0]).
    9.12 @@ -155,25 +155,46 @@
    9.13  %% Commands
    9.14  %%---------------------------------------------------------------------------
    9.15  
    9.16 -%% @doc Invokes open_channel(ConnectionPid, none).
    9.17 +%% @doc Invokes open_channel(ConnectionPid, none, ?DEFAULT_CONSUMER).
    9.18 +%% Opens a channel without having to specify a channel number. This uses the
    9.19 +%% default consumer implementation.
    9.20 +open_channel(ConnectionPid) ->
    9.21 +    open_channel(ConnectionPid, none, ?DEFAULT_CONSUMER).
    9.22 +
    9.23 +%% @doc Invokes open_channel(ConnectionPid, none, Consumer).
    9.24  %% Opens a channel without having to specify a channel number.
    9.25 -open_channel(ConnectionPid) ->
    9.26 -    open_channel(ConnectionPid, none).
    9.27 +open_channel(ConnectionPid, {_, _} = Consumer) ->
    9.28 +    open_channel(ConnectionPid, none, Consumer);
    9.29  
    9.30 -%% @spec (ConnectionPid, ChannelNumber) -> {ok, ChannelPid} | {error, Error}
    9.31 +%% @doc Invokes open_channel(ConnectionPid, ChannelNumber, ?DEFAULT_CONSUMER).
    9.32 +%% Opens a channel, using the default consumer implementation.
    9.33 +open_channel(ConnectionPid, ChannelNumber)
    9.34 +        when is_number(ChannelNumber) orelse ChannelNumber =:= none ->
    9.35 +    open_channel(ConnectionPid, ChannelNumber, ?DEFAULT_CONSUMER).
    9.36 +
    9.37 +%% @spec (ConnectionPid, ChannelNumber, Consumer) -> Result
    9.38  %% where
    9.39 +%%      ConnectionPid = pid()
    9.40  %%      ChannelNumber = pos_integer() | 'none'
    9.41 -%%      ConnectionPid = pid()
    9.42 +%%      Consumer = {ConsumerModule, ConsumerArgs}
    9.43 +%%      ConsumerModule = atom()
    9.44 +%%      ConsumerArgs = [any()]
    9.45 +%%      Result = {ok, ChannelPid} | {error, Error}
    9.46  %%      ChannelPid = pid()
    9.47  %% @doc Opens an AMQP channel.<br/>
    9.48 +%% Opens a channel, using a proposed channel number and a specific consumer
    9.49 +%% implementation.<br/>
    9.50 +%% ConsumerModule must implement the amqp_gen_consumer behaviour. ConsumerArgs
    9.51 +%% is passed as parameter to ConsumerModule:init/1.<br/>
    9.52  %% This function assumes that an AMQP connection (networked or direct)
    9.53  %% has already been successfully established.<br/>
    9.54  %% ChannelNumber must be less than or equal to the negotiated max_channel value,
    9.55  %% or less than or equal to ?MAX_CHANNEL_NUMBER if the negotiated max_channel
    9.56  %% value is 0.<br/>
    9.57  %% In the direct connection, max_channel is always 0.
    9.58 -open_channel(ConnectionPid, ChannelNumber) ->
    9.59 -    amqp_gen_connection:open_channel(ConnectionPid, ChannelNumber).
    9.60 +open_channel(ConnectionPid, ChannelNumber,
    9.61 +             {_ConsumerModule, _ConsumerArgs} = Consumer) ->
    9.62 +    amqp_gen_connection:open_channel(ConnectionPid, ChannelNumber, Consumer).
    9.63  
    9.64  %% @spec (ConnectionPid) -> ok | Error
    9.65  %% where
    10.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
    10.2 +++ b/src/amqp_direct_consumer.erl	Wed Jul 27 16:03:16 2011 +0100
    10.3 @@ -0,0 +1,99 @@
    10.4 +%% The contents of this file are subject to the Mozilla Public License
    10.5 +%% Version 1.1 (the "License"); you may not use this file except in
    10.6 +%% compliance with the License. You may obtain a copy of the License at
    10.7 +%% http://www.mozilla.org/MPL/
    10.8 +%%
    10.9 +%% Software distributed under the License is distributed on an "AS IS"
   10.10 +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
   10.11 +%% License for the specific language governing rights and limitations
   10.12 +%% under the License.
   10.13 +%%
   10.14 +%% The Original Code is RabbitMQ.
   10.15 +%%
   10.16 +%% The Initial Developer of the Original Code is VMware, Inc.
   10.17 +%% Copyright (c) 2011-2011 VMware, Inc.  All rights reserved.
   10.18 +%%
   10.19 +
   10.20 +%% @doc This module is an implementation of the amqp_gen_consumer
   10.21 +%% behaviour and can be used as part of the Consumer parameter when
   10.22 +%% opening AMQP channels.
   10.23 +%% <br/>
   10.24 +%% <br/>
   10.25 +%% The Consumer parameter for this implementation is {{@module},
   10.26 +%% [ConsumerPid]@}, where ConsumerPid is a process that will receive
   10.27 +%% queue subscription-related messages.<br/>
   10.28 +%% <br/>
   10.29 +%% This consumer implementation causes the channel to send to the
   10.30 +%% ConsumerPid all basic.consume, basic.consume_ok, basic.cancel,
   10.31 +%% basic.cancel_ok and basic.deliver messages received from the
   10.32 +%% server.
   10.33 +%% <br/>
   10.34 +%% <br/>
   10.35 +%% In addition, this consumer implementation monitors the ConsumerPid
   10.36 +%% and exits with the same shutdown reason when it dies.  'DOWN'
   10.37 +%% messages from other sources are passed to ConsumerPid.
   10.38 +%% <br/>
   10.39 +%% Warning! It is not recommended to rely on a consumer on killing off the
   10.40 +%% channel (through the exit signal). That may cause messages to get lost.
   10.41 +%% Always use amqp_channel:close/{1,3} for a clean shut down.<br/>
   10.42 +%% <br/>
   10.43 +%% This module has no public functions.
   10.44 +-module(amqp_direct_consumer).
   10.45 +
   10.46 +-include("amqp_gen_consumer_spec.hrl").
   10.47 +
   10.48 +-behaviour(amqp_gen_consumer).
   10.49 +
   10.50 +-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
   10.51 +         handle_cancel/2, handle_deliver/3, handle_info/2, handle_call/3,
   10.52 +         terminate/2]).
   10.53 +
   10.54 +%%---------------------------------------------------------------------------
   10.55 +%% amqp_gen_consumer callbacks
   10.56 +%%---------------------------------------------------------------------------
   10.57 +
   10.58 +%% @private
   10.59 +init([ConsumerPid]) ->
   10.60 +    monitor(process, ConsumerPid),
   10.61 +    {ok, ConsumerPid}.
   10.62 +
   10.63 +%% @private
   10.64 +handle_consume(M, A, C) ->
   10.65 +    C ! {M, A},
   10.66 +    {ok, C}.
   10.67 +
   10.68 +%% @private
   10.69 +handle_consume_ok(M, _, C) ->
   10.70 +    C ! M,
   10.71 +    {ok, C}.
   10.72 +
   10.73 +%% @private
   10.74 +handle_cancel(M, C) ->
   10.75 +    C ! M,
   10.76 +    {ok, C}.
   10.77 +
   10.78 +%% @private
   10.79 +handle_cancel_ok(M, _, C) ->
   10.80 +    C ! M,
   10.81 +    {ok, C}.
   10.82 +
   10.83 +%% @private
   10.84 +handle_deliver(M, A, C) ->
   10.85 +    C ! {M, A},
   10.86 +    {ok, C}.
   10.87 +
   10.88 +%% @private
   10.89 +handle_info({'DOWN', _MRef, process, C, Info}, C) ->
   10.90 +    {error, {consumer_died, Info}, C};
   10.91 +handle_info({'DOWN', MRef, process, Pid, Info}, C) ->
   10.92 +    C ! {'DOWN', MRef, process, Pid, Info},
   10.93 +    {ok, C}.
   10.94 +
   10.95 +%% @private
   10.96 +handle_call(M, A, C) ->
   10.97 +    C ! {M, A},
   10.98 +    {reply, ok, C}.
   10.99 +
  10.100 +%% @private
  10.101 +terminate(_Reason, C) ->
  10.102 +    C.
    11.1 --- a/src/amqp_gen_connection.erl	Mon Jul 18 17:40:41 2011 +0100
    11.2 +++ b/src/amqp_gen_connection.erl	Wed Jul 27 16:03:16 2011 +0100
    11.3 @@ -21,7 +21,7 @@
    11.4  
    11.5  -behaviour(gen_server).
    11.6  
    11.7 --export([start_link/5, connect/1, open_channel/2, hard_error_in_channel/3,
    11.8 +-export([start_link/5, connect/1, open_channel/3, hard_error_in_channel/3,
    11.9           channel_internal_error/3, server_misbehaved/2, channels_terminated/1,
   11.10           close/2, info/2, info_keys/0, info_keys/1]).
   11.11  -export([behaviour_info/1]).
   11.12 @@ -61,8 +61,9 @@
   11.13  connect(Pid) ->
   11.14      gen_server:call(Pid, connect, infinity).
   11.15  
   11.16 -open_channel(Pid, ProposedNumber) ->
   11.17 -    case gen_server:call(Pid, {command, {open_channel, ProposedNumber}},
   11.18 +open_channel(Pid, ProposedNumber, Consumer) ->
   11.19 +    case gen_server:call(Pid,
   11.20 +                         {command, {open_channel, ProposedNumber, Consumer}},
   11.21                           infinity) of
   11.22          {ok, ChannelPid} -> ok = amqp_channel:open(ChannelPid),
   11.23                              {ok, ChannelPid};
   11.24 @@ -234,11 +235,11 @@
   11.25  %% Command handling
   11.26  %%---------------------------------------------------------------------------
   11.27  
   11.28 -handle_command({open_channel, ProposedNumber}, _From,
   11.29 +handle_command({open_channel, ProposedNumber, Consumer}, _From,
   11.30                 State = #state{channels_manager = ChMgr,
   11.31                                module = Mod,
   11.32                                module_state = MState}) ->
   11.33 -    {reply, amqp_channels_manager:open_channel(ChMgr, ProposedNumber,
   11.34 +    {reply, amqp_channels_manager:open_channel(ChMgr, ProposedNumber, Consumer,
   11.35                                                 Mod:open_channel_args(MState)),
   11.36       State};
   11.37   handle_command({close, #'connection.close'{} = Close}, From, State) ->
    12.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
    12.2 +++ b/src/amqp_gen_consumer.erl	Wed Jul 27 16:03:16 2011 +0100
    12.3 @@ -0,0 +1,252 @@
    12.4 +%% The contents of this file are subject to the Mozilla Public License
    12.5 +%% Version 1.1 (the "License"); you may not use this file except in
    12.6 +%% compliance with the License. You may obtain a copy of the License at
    12.7 +%% http://www.mozilla.org/MPL/
    12.8 +%%
    12.9 +%% Software distributed under the License is distributed on an "AS IS"
   12.10 +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
   12.11 +%% License for the specific language governing rights and limitations
   12.12 +%% under the License.
   12.13 +%%
   12.14 +%% The Original Code is RabbitMQ.
   12.15 +%%
   12.16 +%% The Initial Developer of the Original Code is VMware, Inc.
   12.17 +%% Copyright (c) 2011-2011 VMware, Inc.  All rights reserved.
   12.18 +%%
   12.19 +
   12.20 +%% @doc A behaviour module for implementing consumers for
   12.21 +%% amqp_channel. To specify a consumer implementation for a channel,
   12.22 +%% use amqp_connection:open_channel/{2,3}.
   12.23 +%% <br/>
   12.24 +%% All callbacks are called within the gen_consumer process. <br/>
   12.25 +%% <br/>
   12.26 +%% See comments in amqp_gen_consumer.erl source file for documentation
   12.27 +%% on the callback functions.
   12.28 +%% <br/>
   12.29 +%% Note that making calls to the channel from the callback module will
   12.30 +%% result in deadlock.
   12.31 +-module(amqp_gen_consumer).
   12.32 +
   12.33 +-include("amqp_client.hrl").
   12.34 +
   12.35 +-behaviour(gen_server2).
   12.36 +
   12.37 +-export([start_link/2, call_consumer/2, call_consumer/3]).
   12.38 +-export([behaviour_info/1]).
   12.39 +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
   12.40 +         handle_info/2, prioritise_info/2]).
   12.41 +
   12.42 +-record(state, {module,
   12.43 +                module_state}).
   12.44 +
   12.45 +%%---------------------------------------------------------------------------
   12.46 +%% Interface
   12.47 +%%---------------------------------------------------------------------------
   12.48 +
   12.49 +%% @type ok_error() = {ok, state()} | {error, reason(), state()}
   12.50 +%% Denotes a successful or an error return from a consumer module call.
   12.51 +
   12.52 +start_link(ConsumerModule, ExtraParams) ->
   12.53 +    gen_server2:start_link(?MODULE, [ConsumerModule, ExtraParams], []).
   12.54 +
   12.55 +%% @spec (Consumer, Msg) -> ok
   12.56 +%% where
   12.57 +%%      Consumer = pid()
   12.58 +%%      Msg = any()
   12.59 +%%
   12.60 +%% @doc This function is used to perform arbitrary calls into the
   12.61 +%% consumer module.
   12.62 +call_consumer(Pid, Msg) ->
   12.63 +    gen_server2:call(Pid, {consumer_call, Msg}, infinity).
   12.64 +
   12.65 +%% @spec (Consumer, Method, Args) -> ok
   12.66 +%% where
   12.67 +%%      Consumer = pid()
   12.68 +%%      Method = amqp_method()
   12.69 +%%      Args = any()
   12.70 +%%
   12.71 +%% @doc This function is used by amqp_channel to forward received
   12.72 +%% methods and deliveries to the consumer module.
   12.73 +call_consumer(Pid, Method, Args) ->
   12.74 +    gen_server2:call(Pid, {consumer_call, Method, Args}, infinity).
   12.75 +
   12.76 +%%---------------------------------------------------------------------------
   12.77 +%% Behaviour
   12.78 +%%---------------------------------------------------------------------------
   12.79 +
   12.80 +%% @private
   12.81 +behaviour_info(callbacks) ->
   12.82 +    [
   12.83 +     %% init(Args) -> {ok, InitialState} | {stop, Reason} | ignore
   12.84 +     %% where
   12.85 +     %%      Args = [any()]
   12.86 +     %%      InitialState = state()
   12.87 +     %%      Reason = term()
   12.88 +     %%
   12.89 +     %% This callback is invoked by the channel, when it starts
   12.90 +     %% up. Use it to initialize the state of the consumer. In case of
   12.91 +     %% an error, return {stop, Reason} or ignore.
   12.92 +     {init, 1},
   12.93 +
   12.94 +     %% handle_consume(Consume, Sender, State) -> ok_error()
   12.95 +     %% where
   12.96 +     %%      Consume = #'basic.consume'{}
   12.97 +     %%      Sender = pid()
   12.98 +     %%      State = state()
   12.99 +     %%
  12.100 +     %% This callback is invoked by the channel before a basic.consume
  12.101 +     %% is sent to the server.
  12.102 +     {handle_consume, 3},
  12.103 +
  12.104 +     %% handle_consume_ok(ConsumeOk, Consume, State) -> ok_error()
  12.105 +     %% where
  12.106 +     %%      ConsumeOk = #'basic.consume_ok'{}
  12.107 +     %%      Consume = #'basic.consume'{}
  12.108 +     %%      State = state()
  12.109 +     %%
  12.110 +     %% This callback is invoked by the channel every time a
  12.111 +     %% basic.consume_ok is received from the server. Consume is the original
  12.112 +     %% method sent out to the server - it can be used to associate the
  12.113 +     %% call with the response.
  12.114 +     {handle_consume_ok, 3},
  12.115 +
  12.116 +     %% handle_cancel(Cancel, State) -> ok_error()
  12.117 +     %% where
  12.118 +     %%      Cancel = #'basic.cancel'{}
  12.119 +     %%      State = state()
  12.120 +     %%
  12.121 +     %% This callback is invoked by the channel every time a basic.cancel
  12.122 +     %% is received from the server.
  12.123 +     {handle_cancel, 2},
  12.124 +
  12.125 +     %% handle_cancel_ok(CancelOk, Cancel, State) -> ok_error()
  12.126 +     %% where
  12.127 +     %%      CancelOk = #'basic.cancel_ok'{}
  12.128 +     %%      Cancel = #'basic.cancel'{}
  12.129 +     %%      State = state()
  12.130 +     %%
  12.131 +     %% This callback is invoked by the channel every time a basic.cancel_ok
  12.132 +     %% is received from the server.
  12.133 +     {handle_cancel_ok, 3},
  12.134 +
  12.135 +     %% handle_deliver(Deliver, Message, State) -> ok_error()
  12.136 +     %% where
  12.137 +     %%      Deliver = #'basic.deliver'{}
  12.138 +     %%      Message = #amqp_msg{}
  12.139 +     %%      State = state()
  12.140 +     %%
  12.141 +     %% This callback is invoked by the channel every time a basic.deliver
  12.142 +     %% is received from the server.
  12.143 +     {handle_deliver, 3},
  12.144 +
  12.145 +     %% handle_info(Info, State) -> ok_error()
  12.146 +     %% where
  12.147 +     %%      Info = any()
  12.148 +     %%      State = state()
  12.149 +     %%
  12.150 +     %% This callback is invoked the consumer process receives a
  12.151 +     %% message.
  12.152 +     {handle_info, 2},
  12.153 +
  12.154 +     %% handle_call(Msg, From, State) -> {reply, Reply, NewState} |
  12.155 +     %%                                  {noreply, NewState} |
  12.156 +     %%                                  {error, Reason, NewState}
  12.157 +     %% where
  12.158 +     %%      Msg = any()
  12.159 +     %%      From = any()
  12.160 +     %%      Reply = any()
  12.161 +     %%      State = state()
  12.162 +     %%      NewState = state()
  12.163 +     %%
  12.164 +     %% This callback is invoked by the channel when calling
  12.165 +     %% amqp_channel:call_consumer/2. Reply is the term that
  12.166 +     %% amqp_channel:call_consumer/2 will return. If the callback
  12.167 +     %% returns {noreply, _}, then the caller to
  12.168 +     %% amqp_channel:call_consumer/2 and the channel remain blocked
  12.169 +     %% until gen_server2:reply/2 is used with the provided From as
  12.170 +     %% the first argument.
  12.171 +     {handle_call, 3},
  12.172 +
  12.173 +     %% terminate(Reason, State) -> any()
  12.174 +     %% where
  12.175 +     %%      Reason = any()
  12.176 +     %%      State = state()
  12.177 +     %%
  12.178 +     %% This callback is invoked by the channel after it has shut down and
  12.179 +     %% just before its process exits.
  12.180 +     {terminate, 2}
  12.181 +    ];
  12.182 +behaviour_info(_Other) ->
  12.183 +    undefined.
  12.184 +
  12.185 +%%---------------------------------------------------------------------------
  12.186 +%% gen_server2 callbacks
  12.187 +%%---------------------------------------------------------------------------
  12.188 +
  12.189 +init([ConsumerModule, ExtraParams]) ->
  12.190 +    case ConsumerModule:init(ExtraParams) of
  12.191 +        {ok, MState} ->
  12.192 +            {ok, #state{module = ConsumerModule, module_state = MState}};
  12.193 +        {stop, Reason} ->
  12.194 +            {stop, Reason};
  12.195 +        ignore ->
  12.196 +            ignore
  12.197 +    end.
  12.198 +
  12.199 +prioritise_info({'DOWN', _MRef, process, _Pid, _Info}, _State) -> 1;
  12.200 +prioritise_info(_, _State)                                     -> 0.
  12.201 +
  12.202 +handle_call({consumer_call, Msg}, From,
  12.203 +            State = #state{module       = ConsumerModule,
  12.204 +                           module_state = MState}) ->
  12.205 +    case ConsumerModule:handle_call(Msg, From, MState) of
  12.206 +        {noreply, NewMState} ->
  12.207 +            {noreply, State#state{module_state = NewMState}};
  12.208 +        {reply, Reply, NewMState} ->
  12.209 +            {reply, Reply, State#state{module_state = NewMState}};
  12.210 +        {error, Reason, NewMState} ->
  12.211 +            {stop, {error, Reason}, {error, Reason},
  12.212 +             State#state{module_state = NewMState}}
  12.213 +    end;
  12.214 +handle_call({consumer_call, Method, Args}, _From,
  12.215 +            State = #state{module       = ConsumerModule,
  12.216 +                           module_state = MState}) ->
  12.217 +    Return =
  12.218 +        case Method of
  12.219 +            #'basic.consume'{} ->
  12.220 +                ConsumerModule:handle_consume(Method, Args, MState);
  12.221 +            #'basic.consume_ok'{} ->
  12.222 +                ConsumerModule:handle_consume_ok(Method, Args, MState);
  12.223 +            #'basic.cancel'{} ->
  12.224 +                ConsumerModule:handle_cancel(Method, MState);
  12.225 +            #'basic.cancel_ok'{} ->
  12.226 +                ConsumerModule:handle_cancel_ok(Method, Args, MState);
  12.227 +            #'basic.deliver'{} ->
  12.228 +                ConsumerModule:handle_deliver(Method, Args, MState)
  12.229 +        end,
  12.230 +    case Return of
  12.231 +        {ok, NewMState} ->
  12.232 +            {reply, ok, State#state{module_state = NewMState}};
  12.233 +        {error, Reason, NewMState} ->
  12.234 +            {stop, {error, Reason}, {error, Reason},
  12.235 +             State#state{module_state = NewMState}}
  12.236 +    end.
  12.237 +
  12.238 +handle_cast(_What, State) ->
  12.239 +    {noreply, State}.
  12.240 +
  12.241 +handle_info(Info, State = #state{module_state = MState,
  12.242 +                                 module       = ConsumerModule}) ->
  12.243 +    case ConsumerModule:handle_info(Info, MState) of
  12.244 +        {ok, NewMState} ->
  12.245 +            {noreply, State#state{module_state = NewMState}};
  12.246 +        {error, Reason, NewMState} ->
  12.247 +            {stop, {error, Reason}, {error, Reason},
  12.248 +             State#state{module_state = NewMState}}
  12.249 +    end.
  12.250 +
  12.251 +terminate(Reason, #state{module = ConsumerModule, module_state = MState}) ->
  12.252 +    ConsumerModule:terminate(Reason, MState).
  12.253 +
  12.254 +code_change(_OldVsn, State, _Extra) ->
  12.255 +    State.
    13.1 --- a/src/amqp_rpc_client.erl	Mon Jul 18 17:40:41 2011 +0100
    13.2 +++ b/src/amqp_rpc_client.erl	Wed Jul 27 16:03:16 2011 +0100
    13.3 @@ -81,7 +81,7 @@
    13.4  
    13.5  %% Registers this RPC client instance as a consumer to handle rpc responses
    13.6  setup_consumer(#state{channel = Channel, reply_queue = Q}) ->
    13.7 -    amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, self()).
    13.8 +    amqp_channel:call(Channel, #'basic.consume'{queue = Q}).
    13.9  
   13.10  %% Publishes to the broker, stores the From address against
   13.11  %% the correlation id and increments the correlationid for
   13.12 @@ -111,7 +111,8 @@
   13.13  %% Sets up a reply queue and consumer within an existing channel
   13.14  %% @private
   13.15  init([Connection, RoutingKey]) ->
   13.16 -    {ok, Channel} = amqp_connection:open_channel(Connection),
   13.17 +    {ok, Channel} = amqp_connection:open_channel(
   13.18 +                        Connection, {amqp_direct_consumer, [self()]}),
   13.19      InitialState = #state{channel     = Channel,
   13.20                            exchange    = <<>>,
   13.21                            routing_key = RoutingKey},
   13.22 @@ -140,10 +141,18 @@
   13.23      {noreply, State}.
   13.24  
   13.25  %% @private
   13.26 +handle_info({#'basic.consume'{}, _Pid}, State) ->
   13.27 +    {noreply, State};
   13.28 +
   13.29 +%% @private
   13.30  handle_info(#'basic.consume_ok'{}, State) ->
   13.31      {noreply, State};
   13.32  
   13.33  %% @private
   13.34 +handle_info(#'basic.cancel'{}, State) ->
   13.35 +    {noreply, State};
   13.36 +
   13.37 +%% @private
   13.38  handle_info(#'basic.cancel_ok'{}, State) ->
   13.39      {stop, normal, State};
   13.40  
    14.1 --- a/src/amqp_rpc_server.erl	Mon Jul 18 17:40:41 2011 +0100
    14.2 +++ b/src/amqp_rpc_server.erl	Wed Jul 27 16:03:16 2011 +0100
    14.3 @@ -64,9 +64,10 @@
    14.4  
    14.5  %% @private
    14.6  init([Connection, Q, Fun]) ->
    14.7 -    {ok, Channel} = amqp_connection:open_channel(Connection),
    14.8 +    {ok, Channel} = amqp_connection:open_channel(
    14.9 +                        Connection, {amqp_direct_consumer, [self()]}),
   14.10      amqp_channel:call(Channel, #'queue.declare'{queue = Q}),
   14.11 -    amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, self()),
   14.12 +    amqp_channel:call(Channel, #'basic.consume'{queue = Q}),
   14.13      {ok, #state{channel = Channel, handler = Fun} }.
   14.14  
   14.15  %% @private
   14.16 @@ -74,10 +75,18 @@
   14.17      {stop, normal, State};
   14.18  
   14.19  %% @private
   14.20 +handle_info({#'basic.consume'{}, _}, State) ->
   14.21 +    {noreply, State};
   14.22 +
   14.23 +%% @private
   14.24  handle_info(#'basic.consume_ok'{}, State) ->
   14.25      {noreply, State};
   14.26  
   14.27  %% @private
   14.28 +handle_info(#'basic.cancel'{}, State) ->
   14.29 +    {noreply, State};
   14.30 +
   14.31 +%% @private
   14.32  handle_info(#'basic.cancel_ok'{}, State) ->
   14.33      {stop, normal, State};
   14.34  
   14.35 @@ -95,6 +104,10 @@
   14.36      amqp_channel:call(Channel, Publish, #amqp_msg{props = Properties,
   14.37                                                    payload = Response}),
   14.38      amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
   14.39 +    {noreply, State};
   14.40 +
   14.41 +%% @private
   14.42 +handle_info({'DOWN', _MRef, process, _Pid, _Info}, State) ->
   14.43      {noreply, State}.
   14.44  
   14.45  %% @private
    15.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
    15.2 +++ b/src/amqp_selective_consumer.erl	Wed Jul 27 16:03:16 2011 +0100
    15.3 @@ -0,0 +1,233 @@
    15.4 +%% The contents of this file are subject to the Mozilla Public Licensbe
    15.5 +%% Version 1.1 (the "License"); you may not use this file except in
    15.6 +%% compliance with the License. You may obtain a copy of the License at
    15.7 +%% http://www.mozilla.org/MPL/
    15.8 +%%
    15.9 +%% Software distributed under the License is distributed on an "AS IS"
   15.10 +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
   15.11 +%% License for the specific language governing rights and limitations
   15.12 +%% under the License.
   15.13 +%%
   15.14 +%% The Original Code is RabbitMQ.
   15.15 +%%
   15.16 +%% The Initial Developer of the Original Code is VMware, Inc.
   15.17 +%% Copyright (c) 2011-2011 VMware, Inc.  All rights reserved.
   15.18 +%%
   15.19 +
   15.20 +%% @doc This module is an implementation of the amqp_gen_consumer
   15.21 +%% behaviour and can be used as part of the Consumer parameter when
   15.22 +%% opening AMQP channels. This is the default implementation selected
   15.23 +%% by channel. <br/>
   15.24 +%% <br/>
   15.25 +%% The Consumer parameter for this implementation is {{@module}, []@}<br/>
   15.26 +%% This consumer implementation keeps track of consumer tags and sends
   15.27 +%% the subscription-relevant messages to the registered consumers, according
   15.28 +%% to an internal tag dictionary.<br/>
   15.29 +%% <br/>
   15.30 +%% Send a #basic.consume{} message to the channel to subscribe a
   15.31 +%% consumer to a queue and send a #basic.cancel{} message to cancel a
   15.32 +%% subscription.<br/>
   15.33 +%% <br/>
   15.34 +%% The channel will send to the relevant registered consumers the
   15.35 +%% basic.consume_ok, basic.cancel_ok, basic.cancel and basic.deliver messages
   15.36 +%% received from the server.<br/>
   15.37 +%% <br/>
   15.38 +%% If a consumer is not registered for a given consumer tag, the message
   15.39 +%% is sent to the default consumer registered with
   15.40 +%% {@module}:register_default_consumer. If there is no default consumer
   15.41 +%% registered in this case, an exception occurs and the channel is abruptly
   15.42 +%% terminated.<br/>
   15.43 +-module(amqp_selective_consumer).
   15.44 +
   15.45 +-include("amqp_client.hrl").
   15.46 +-include("amqp_gen_consumer_spec.hrl").
   15.47 +
   15.48 +-behaviour(amqp_gen_consumer).
   15.49 +
   15.50 +-export([register_default_consumer/2]).
   15.51 +-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
   15.52 +         handle_cancel/2, handle_deliver/3, handle_info/2, handle_call/3,
   15.53 +         terminate/2]).
   15.54 +
   15.55 +-record(state, {consumers             = dict:new(), %% Tag -> ConsumerPid
   15.56 +                unassigned            = undefined,  %% Pid
   15.57 +                monitors              = dict:new(), %% Pid -> MRef
   15.58 +                default_consumer      = none}).
   15.59 +
   15.60 +%%---------------------------------------------------------------------------
   15.61 +%% Interface
   15.62 +%%---------------------------------------------------------------------------
   15.63 +
   15.64 +%% @spec (ChannelPid, ConsumerPid) -> ok
   15.65 +%% where
   15.66 +%%      ChannelPid = pid()
   15.67 +%%      ConsumerPid = pid()
   15.68 +%% @doc This function registers a default consumer with the channel. A
   15.69 +%% default consumer is used when a subscription is made via
   15.70 +%% amqp_channel:call(ChannelPid, #'basic.consume'{}) (rather than
   15.71 +%% {@module}:subscribe/3) and hence there is no consumer pid
   15.72 +%% registered with the consumer tag. In this case, the relevant
   15.73 +%% deliveries will be sent to the default consumer.
   15.74 +register_default_consumer(ChannelPid, ConsumerPid) ->
   15.75 +    amqp_channel:call_consumer(ChannelPid,
   15.76 +                               {register_default_consumer, ConsumerPid}).
   15.77 +
   15.78 +%%---------------------------------------------------------------------------
   15.79 +%% amqp_gen_consumer callbacks
   15.80 +%%---------------------------------------------------------------------------
   15.81 +
   15.82 +%% @private
   15.83 +init([]) ->
   15.84 +    {ok, #state{}}.
   15.85 +
   15.86 +%% @private
   15.87 +handle_consume(BasicConsume, Pid, State = #state{consumers = Consumers,
   15.88 +                                                 monitors = Monitors}) ->
   15.89 +    Tag = tag(BasicConsume),
   15.90 +    Ok =
   15.91 +        case BasicConsume of
   15.92 +            #'basic.consume'{nowait = true}
   15.93 +                    when Tag =:= undefined orelse size(Tag) == 0 ->
   15.94 +                false; %% Async and undefined tag
   15.95 +            _ when is_binary(Tag) andalso size(Tag) >= 0 ->
   15.96 +                case resolve_consumer(Tag, State) of
   15.97 +                    {consumer, _} -> false; %% Tag already in use
   15.98 +                    _             -> true
   15.99 +                end;
  15.100 +           _ ->
  15.101 +               true
  15.102 +        end,
  15.103 +    case {Ok, BasicConsume} of
  15.104 +        {true, #'basic.consume'{nowait = true}} ->
  15.105 +            {ok, State#state
  15.106 +             {consumers = dict:store(Tag, Pid, Consumers),
  15.107 +              monitors  = dict:store(Pid, monitor(process, Pid), Monitors)}};
  15.108 +        {true, #'basic.consume'{nowait = false}} ->
  15.109 +            {ok, State#state{unassigned = Pid}};
  15.110 +        {false, #'basic.consume'{nowait = true}} ->
  15.111 +            {error, 'no_consumer_tag_specified', State};
  15.112 +        {false, #'basic.consume'{nowait = false}} ->
  15.113 +            %% Don't do anything (don't override existing
  15.114 +            %% consumers), the server will close the channel with an error.
  15.115 +            {ok, State}
  15.116 +    end.
  15.117 +
  15.118 +%% @private
  15.119 +handle_consume_ok(BasicConsumeOk, _BasicConsume,
  15.120 +                  State = #state{unassigned = Pid,
  15.121 +                                 consumers  = Consumers,
  15.122 +                                 monitors   = Monitors})
  15.123 +  when is_pid(Pid) ->
  15.124 +    State1 = State#state{consumers  =
  15.125 +                             dict:store(tag(BasicConsumeOk), Pid, Consumers),
  15.126 +                         monitors   =
  15.127 +                             dict:store(Pid, monitor(process, Pid), Monitors),
  15.128 +                         unassigned = undefined},
  15.129 +    deliver(BasicConsumeOk, State1),
  15.130 +    {ok, State1}.
  15.131 +
  15.132 +%% @private
  15.133 +%% The server sent a basic.cancel.
  15.134 +handle_cancel(Cancel, State) ->
  15.135 +    State1 = do_cancel(Cancel, State),
  15.136 +    %% Use old state
  15.137 +    deliver(Cancel, State),
  15.138 +    {ok, State1}.
  15.139 +
  15.140 +%% @private
  15.141 +%% We sent a basic.cancel and now receive the ok.
  15.142 +handle_cancel_ok(CancelOk, _Cancel, State) ->
  15.143 +    State1 = do_cancel(CancelOk, State),
  15.144 +    %% Use old state
  15.145 +    deliver(CancelOk, State),
  15.146 +    {ok, State1}.
  15.147 +
  15.148 +%% @private
  15.149 +handle_deliver(Deliver, Message, State) ->
  15.150 +    deliver(Deliver, Message, State),
  15.151 +    {ok, State}.
  15.152 +
  15.153 +%% @private
  15.154 +handle_info({'DOWN', _MRef, process, Pid, _Info},
  15.155 +            State = #state{monitors         = Monitors,
  15.156 +                           consumers        = Consumers,
  15.157 +                           default_consumer = DConsumer }) ->
  15.158 +    case dict:find(Pid, Monitors) of
  15.159 +        {ok, _Tag} ->
  15.160 +            {ok, State#state{monitors = dict:erase(Pid, Monitors),
  15.161 +                             consumers =
  15.162 +                                 dict:filter(
  15.163 +                                   fun (_, Pid1) when Pid1 =:= Pid -> false;
  15.164 +                                       (_, _)                      -> true
  15.165 +                                   end, Consumers)}};
  15.166 +        error ->
  15.167 +            case Pid of
  15.168 +                DConsumer -> {ok, State#state{
  15.169 +                                    monitors = dict:erase(Pid, Monitors),
  15.170 +                                    default_consumer = none}};
  15.171 +                _         -> {ok, State} %% unnamed consumer went down
  15.172 +                                         %% before receiving consume_ok
  15.173 +            end
  15.174 +    end.
  15.175 +
  15.176 +%% @private
  15.177 +handle_call({register_default_consumer, Pid}, _From,
  15.178 +            State = #state{default_consumer = PrevPid,
  15.179 +                           monitors         = Monitors}) ->
  15.180 +    case PrevPid of
  15.181 +        none -> ok;
  15.182 +        _    -> demonitor(dict:fetch(PrevPid, Monitors)),
  15.183 +                dict:erase(PrevPid, Monitors)
  15.184 +    end,
  15.185 +    {reply, ok,
  15.186 +     State#state{default_consumer = Pid,
  15.187 +                 monitors = dict:store(Pid, monitor(process, Pid),
  15.188 +                                       Monitors)}}.
  15.189 +
  15.190 +%% @private
  15.191 +terminate(_Reason, State) ->
  15.192 +    State.
  15.193 +
  15.194 +%%---------------------------------------------------------------------------
  15.195 +%% Internal plumbing
  15.196 +%%---------------------------------------------------------------------------
  15.197 +
  15.198 +deliver(Msg, State) ->
  15.199 +    deliver(Msg, undefined, State).
  15.200 +deliver(Msg, Message, State) ->
  15.201 +    Combined = if Message =:= undefined -> Msg;
  15.202 +                  true                  -> {Msg, Message}
  15.203 +               end,
  15.204 +    case resolve_consumer(tag(Msg), State) of
  15.205 +        {consumer, Pid} -> Pid ! Combined;
  15.206 +        {default, Pid}  -> Pid ! Combined;
  15.207 +        error           -> exit(unexpected_delivery_and_no_default_consumer)
  15.208 +    end.
  15.209 +
  15.210 +do_cancel(Cancel, State = #state{consumers = Consumers,
  15.211 +                                 monitors  = Monitors}) ->
  15.212 +    Tag = tag(Cancel),
  15.213 +    case dict:find(Tag, Consumers) of
  15.214 +        {ok, Pid} -> MRef = dict:fetch(Pid, Monitors),
  15.215 +                     demonitor(MRef),
  15.216 +                     State#state{consumers = dict:erase(Tag, Consumers),
  15.217 +                                 monitors  = dict:erase(Pid, Monitors)};
  15.218 +        error     -> %% Untracked consumer. Do nothing.
  15.219 +                     State
  15.220 +    end.
  15.221 +
  15.222 +resolve_consumer(Tag, #state{consumers = Consumers,
  15.223 +                             default_consumer = DefaultConsumer}) ->
  15.224 +    case dict:find(Tag, Consumers) of
  15.225 +        {ok, ConsumerPid} -> {consumer, ConsumerPid};
  15.226 +        error             -> case DefaultConsumer of
  15.227 +                                 none -> error;
  15.228 +                                 _    -> {default, DefaultConsumer}
  15.229 +                             end
  15.230 +    end.
  15.231 +
  15.232 +tag(#'basic.consume'{consumer_tag = Tag})         -> Tag;
  15.233 +tag(#'basic.consume_ok'{consumer_tag = Tag})      -> Tag;
  15.234 +tag(#'basic.cancel'{consumer_tag = Tag})          -> Tag;
  15.235 +tag(#'basic.cancel_ok'{consumer_tag = Tag})       -> Tag;
  15.236 +tag(#'basic.deliver'{consumer_tag = Tag})         -> Tag.
    16.1 --- a/test/direct_client_SUITE.erl	Mon Jul 18 17:40:41 2011 +0100
    16.2 +++ b/test/direct_client_SUITE.erl	Wed Jul 27 16:03:16 2011 +0100
    16.3 @@ -103,6 +103,9 @@
    16.4  confirm_barrier_nop_test() ->
    16.5      test_util:confirm_barrier_nop_test(new_connection()).
    16.6  
    16.7 +default_consumer_test() ->
    16.8 +    test_util:default_consumer_test(new_connection()).
    16.9 +
   16.10  subscribe_nowait_test() ->
   16.11      test_util:subscribe_nowait_test(new_connection()).
   16.12  
    17.1 --- a/test/negative_test_util.erl	Mon Jul 18 17:40:41 2011 +0100
    17.2 +++ b/test/negative_test_util.erl	Wed Jul 27 16:03:16 2011 +0100
    17.3 @@ -126,10 +126,10 @@
    17.4      SentString = << <<"k">> || _ <- lists:seq(1, 340)>>,
    17.5      Q = test_util:uuid(), X = test_util:uuid(), Key = test_util:uuid(),
    17.6      test_util:setup_exchange(Channel, Q, X, Key),
    17.7 -    ?assertExit(_, amqp_channel:subscribe(
    17.8 -                       Channel, #'basic.consume'{queue = Q, no_ack = true,
    17.9 -                                                 consumer_tag = SentString},
   17.10 -                       self())),
   17.11 +    ?assertExit(_, amqp_channel:call(
   17.12 +                       Channel, #'basic.consume'{queue = Q,
   17.13 +                                                 no_ack = true,
   17.14 +                                                 consumer_tag = SentString})),
   17.15      test_util:wait_for_death(Channel),
   17.16      test_util:wait_for_death(Connection),
   17.17      ok.
    18.1 --- a/test/network_client_SUITE.erl	Mon Jul 18 17:40:41 2011 +0100
    18.2 +++ b/test/network_client_SUITE.erl	Wed Jul 27 16:03:16 2011 +0100
    18.3 @@ -117,6 +117,9 @@
    18.4  confirm_barrier_nop_test() ->
    18.5      test_util:confirm_barrier_nop_test(new_connection()).
    18.6  
    18.7 +default_consumer_test() ->
    18.8 +    test_util:default_consumer_test(new_connection()).
    18.9 +
   18.10  subscribe_nowait_test() ->
   18.11      test_util:subscribe_nowait_test(new_connection()).
   18.12  
    19.1 --- a/test/test_util.erl	Mon Jul 18 17:40:41 2011 +0100
    19.2 +++ b/test/test_util.erl	Wed Jul 27 16:03:16 2011 +0100
    19.3 @@ -332,9 +332,8 @@
    19.4                                                   exchange = X,
    19.5                                                   routing_key = RoutingKey}),
    19.6      #'basic.consume_ok'{} =
    19.7 -        amqp_channel:subscribe(Channel,
    19.8 -                               #'basic.consume'{queue = Q, consumer_tag = Tag},
    19.9 -                               self()),
   19.10 +        amqp_channel:call(Channel,
   19.11 +                          #'basic.consume'{queue = Q, consumer_tag = Tag}),
   19.12      receive #'basic.consume_ok'{consumer_tag = Tag} -> ok end,
   19.13      receive {#'basic.deliver'{}, _} -> ok end,
   19.14      #'basic.cancel_ok'{} =
   19.15 @@ -348,7 +347,7 @@
   19.16      #'queue.declare_ok'{} =
   19.17          amqp_channel:call(Channel, #'queue.declare'{queue = Q}),
   19.18      #'basic.consume_ok'{consumer_tag = CTag} = ConsumeOk =
   19.19 -        amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, self()),
   19.20 +        amqp_channel:call(Channel, #'basic.consume'{queue = Q}),
   19.21      receive ConsumeOk -> ok end,
   19.22      #'queue.delete_ok'{} =
   19.23          amqp_channel:call(Channel, #'queue.delete'{queue = Q}),
   19.24 @@ -357,34 +356,27 @@
   19.25      ok.
   19.26  
   19.27  basic_recover_test(Connection) ->
   19.28 -    {ok, Channel} = amqp_connection:open_channel(Connection),
   19.29 +    {ok, Channel} = amqp_connection:open_channel(
   19.30 +                        Connection, {amqp_direct_consumer, [self()]}),
   19.31      #'queue.declare_ok'{queue = Q} =
   19.32          amqp_channel:call(Channel, #'queue.declare'{}),
   19.33      #'basic.consume_ok'{consumer_tag = Tag} =
   19.34 -        amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q},
   19.35 -                                 self()),
   19.36 -    receive
   19.37 -        #'basic.consume_ok'{consumer_tag = Tag} -> ok
   19.38 -    after 2000 ->
   19.39 -        exit(did_not_receive_subscription_message)
   19.40 -    end,
   19.41 +        amqp_channel:call(Channel, #'basic.consume'{queue = Q}),
   19.42 +    receive #'basic.consume_ok'{consumer_tag = Tag} -> ok end,
   19.43      Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
   19.44      amqp_channel:call(Channel, Publish, #amqp_msg{payload = <<"foobar">>}),
   19.45      receive
   19.46 -        {#'basic.deliver'{}, _} ->
   19.47 +        {#'basic.deliver'{consumer_tag = Tag}, _} ->
   19.48              %% no_ack set to false, but don't send ack
   19.49              ok
   19.50 -    after 2000 ->
   19.51 -        exit(did_not_receive_first_message)
   19.52      end,
   19.53      BasicRecover = #'basic.recover'{requeue = true},
   19.54      amqp_channel:cast(Channel, BasicRecover),
   19.55      receive
   19.56 -        {#'basic.deliver'{delivery_tag = DeliveryTag2}, _} ->
   19.57 +        {#'basic.deliver'{consumer_tag = Tag,
   19.58 +                          delivery_tag = DeliveryTag2}, _} ->
   19.59              amqp_channel:cast(Channel,
   19.60                                #'basic.ack'{delivery_tag = DeliveryTag2})
   19.61 -    after 2000 ->
   19.62 -        exit(did_not_receive_second_message)
   19.63      end,
   19.64      teardown(Connection, Channel).
   19.65  
   19.66 @@ -433,8 +425,8 @@
   19.67                  {ok, Channel} = amqp_connection:open_channel(Connection),
   19.68                  amqp_channel:call(Channel,
   19.69                                    #'basic.qos'{prefetch_count = Prefetch}),
   19.70 -                amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q},
   19.71 -                                       self()),
   19.72 +                amqp_channel:call(Channel,
   19.73 +                                  #'basic.consume'{queue = Q}),
   19.74                  Parent ! finished,
   19.75                  sleeping_consumer(Channel, Sleep, Parent)
   19.76              end) || Sleep <- Workers],
   19.77 @@ -456,7 +448,7 @@
   19.78          #'basic.consume_ok'{} ->
   19.79              sleeping_consumer(Channel, Sleep, Parent);
   19.80          #'basic.cancel_ok'{}  ->
   19.81 -            ok;
   19.82 +            exit(unexpected_cancel_ok);
   19.83          {#'basic.deliver'{delivery_tag = DeliveryTag}, _Content} ->
   19.84              Parent ! finished,
   19.85              receive stop -> do_stop(Channel, Parent)
   19.86 @@ -515,13 +507,47 @@
   19.87      true = amqp_channel:wait_for_confirms(Channel),
   19.88      teardown(Connection, Channel).
   19.89  
   19.90 +default_consumer_test(Connection) ->
   19.91 +    {ok, Channel} = amqp_connection:open_channel(Connection),
   19.92 +    amqp_selective_consumer:register_default_consumer(Channel, self()),
   19.93 +
   19.94 +    #'queue.declare_ok'{queue = Q}
   19.95 +        = amqp_channel:call(Channel, #'queue.declare'{}),
   19.96 +    Pid = spawn(fun () -> receive
   19.97 +                          after 10000 -> ok
   19.98 +                          end
   19.99 +                end),
  19.100 +    #'basic.consume_ok'{} =
  19.101 +        amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, Pid),
  19.102 +    monitor(process, Pid),
  19.103 +    exit(Pid, shutdown),
  19.104 +    receive
  19.105 +        {'DOWN', _, process, _, _} ->
  19.106 +            io:format("little consumer died out~n")
  19.107 +    end,
  19.108 +    Payload = <<"for the default consumer">>,
  19.109 +    amqp_channel:call(Channel,
  19.110 +                      #'basic.publish'{exchange = <<>>, routing_key = Q},
  19.111 +                      #amqp_msg{payload = Payload}),
  19.112 +
  19.113 +    receive
  19.114 +        {#'basic.deliver'{}, #'amqp_msg'{payload = Payload}} ->
  19.115 +            ok
  19.116 +    after 1000 ->
  19.117 +            exit('default_consumer_didnt_work')
  19.118 +    end,
  19.119 +    teardown(Connection, Channel).
  19.120 +
  19.121  subscribe_nowait_test(Connection) ->
  19.122      {ok, Channel} = amqp_connection:open_channel(Connection),
  19.123      {ok, Q} = setup_publish(Channel),
  19.124 -    ok = amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q,
  19.125 -                                                          consumer_tag = uuid(),
  19.126 -                                                          nowait = true},
  19.127 -                                self()),
  19.128 +    ok = amqp_channel:call(Channel,
  19.129 +                           #'basic.consume'{queue = Q,
  19.130 +                                            consumer_tag = uuid(),
  19.131 +                                            nowait = true}),
  19.132 +    receive #'basic.consume_ok'{} -> exit(unexpected_consume_ok)
  19.133 +    after 0 -> ok
  19.134 +    end,
  19.135      receive
  19.136          {#'basic.deliver'{delivery_tag = DTag}, _Content} ->
  19.137              amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DTag})
  19.138 @@ -585,10 +611,10 @@
  19.139      %% Close connection without closing channels
  19.140      amqp_connection:close(Connection1),
  19.141      %% Get sent messages back and count them
  19.142 -    {ok, Channel2} = amqp_connection:open_channel(Connection2),
  19.143 -    amqp_channel:subscribe(Channel2,
  19.144 -                           #'basic.consume'{queue = Q, no_ack = true},
  19.145 -                           self()),
  19.146 +    {ok, Channel2} = amqp_connection:open_channel(
  19.147 +                         Connection2, {amqp_direct_consumer, [self()]}),
  19.148 +    amqp_channel:call(Channel2, #'basic.consume'{queue = Q, no_ack = true}),
  19.149 +    receive #'basic.consume_ok'{} -> ok end,
  19.150      ?assert(pc_consumer_loop(Channel2, Payload, 0) == NMessages),
  19.151      %% Make sure queue is empty
  19.152      #'queue.declare_ok'{queue = Q, message_count = NRemaining} =