1.1 --- a/include/amqp_client.hrl Mon Jul 18 17:40:41 2011 +0100
1.2 +++ b/include/amqp_client.hrl Wed Jul 27 16:03:16 2011 +0100
1.3 @@ -14,6 +14,9 @@
1.4 %% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
1.5 %%
1.6
1.7 +-ifndef(AMQP_CLIENT_HRL).
1.8 +-define(AMQP_CLIENT_HRL, true).
1.9 +
1.10 -include_lib("rabbit_common/include/rabbit.hrl").
1.11 -include_lib("rabbit_common/include/rabbit_framing.hrl").
1.12
1.13 @@ -23,6 +26,7 @@
1.14 -define(PROTOCOL, rabbit_framing_amqp_0_9_1).
1.15
1.16 -define(MAX_CHANNEL_NUMBER, 65535).
1.17 +-define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}).
1.18
1.19 -define(PROTOCOL_SSL_PORT, (?PROTOCOL_PORT - 1)).
1.20
1.21 @@ -59,7 +63,10 @@
1.22 -define(LOG_DEBUG(Format), error_logger:info_msg(Format)).
1.23 -define(LOG_INFO(Format, Args), error_logger:info_msg(Format, Args)).
1.24 -define(LOG_WARN(Format, Args), error_logger:warning_msg(Format, Args)).
1.25 +
1.26 -define(CLIENT_CAPABILITIES, [{<<"publisher_confirms">>, bool, true},
1.27 {<<"exchange_exchange_bindings">>, bool, true},
1.28 {<<"basic.nack">>, bool, true},
1.29 {<<"consumer_cancel_notify">>, bool, true}]).
1.30 +
1.31 +-endif.
2.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
2.2 +++ b/include/amqp_gen_consumer_spec.hrl Wed Jul 27 16:03:16 2011 +0100
2.3 @@ -0,0 +1,40 @@
2.4 +%% The contents of this file are subject to the Mozilla Public License
2.5 +%% Version 1.1 (the "License"); you may not use this file except in
2.6 +%% compliance with the License. You may obtain a copy of the License at
2.7 +%% http://www.mozilla.org/MPL/
2.8 +%%
2.9 +%% Software distributed under the License is distributed on an "AS IS"
2.10 +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
2.11 +%% License for the specific language governing rights and limitations
2.12 +%% under the License.
2.13 +%%
2.14 +%% The Original Code is RabbitMQ.
2.15 +%%
2.16 +%% The Initial Developer of the Original Code is VMware, Inc.
2.17 +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
2.18 +%%
2.19 +
2.20 +-include("amqp_client.hrl").
2.21 +
2.22 +-type(state() :: any()).
2.23 +-type(consume() :: #'basic.consume'{}).
2.24 +-type(consume_ok() :: #'basic.consume_ok'{}).
2.25 +-type(cancel() :: #'basic.cancel'{}).
2.26 +-type(cancel_ok() :: #'basic.cancel_ok'{}).
2.27 +-type(deliver() :: #'basic.deliver'{}).
2.28 +-type(from() :: any()).
2.29 +-type(reason() :: any()).
2.30 +-type(ok_error() :: {ok, state()} | {error, reason(), state()}).
2.31 +
2.32 +-spec(init/1 :: ([any()]) -> {ok, state()}).
2.33 +-spec(handle_consume/3 :: (consume(), pid(), state()) -> ok_error()).
2.34 +-spec(handle_consume_ok/3 :: (consume_ok(), consume(), state()) ->
2.35 + ok_error()).
2.36 +-spec(handle_cancel/2 :: (cancel(), state()) -> ok_error()).
2.37 +-spec(handle_cancel_ok/3 :: (cancel_ok(), cancel(), state()) -> ok_error()).
2.38 +-spec(handle_deliver/3 :: (deliver(), #amqp_msg{}, state()) -> ok_error()).
2.39 +-spec(handle_info/2 :: (any(), state()) -> ok_error()).
2.40 +-spec(handle_call/3 :: (any(), from(), state()) ->
2.41 + {reply, any(), state()} | {noreply, state()} |
2.42 + {error, reason(), state()}).
2.43 +-spec(terminate/2 :: (any(), state()) -> state()).
3.1 --- a/rabbit_common.app.in Mon Jul 18 17:40:41 2011 +0100
3.2 +++ b/rabbit_common.app.in Wed Jul 27 16:03:16 2011 +0100
3.3 @@ -3,6 +3,7 @@
3.4 {vsn, "%%VSN%%"},
3.5 {modules, [
3.6 gen_server2,
3.7 + priority_queue,
3.8 rabbit_backing_queue,
3.9 rabbit_basic,
3.10 rabbit_binary_generator,
4.1 --- a/src/amqp_auth_mechanisms.erl Mon Jul 18 17:40:41 2011 +0100
4.2 +++ b/src/amqp_auth_mechanisms.erl Wed Jul 27 16:03:16 2011 +0100
4.3 @@ -14,6 +14,7 @@
4.4 %% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
4.5 %%
4.6
4.7 +%% @private
4.8 -module(amqp_auth_mechanisms).
4.9
4.10 -include("amqp_client.hrl").
5.1 --- a/src/amqp_channel.erl Mon Jul 18 17:40:41 2011 +0100
5.2 +++ b/src/amqp_channel.erl Wed Jul 27 16:03:16 2011 +0100
5.3 @@ -66,27 +66,26 @@
5.4
5.5 -behaviour(gen_server).
5.6
5.7 --export([start_link/4, connection_closing/3, open/1]).
5.8 --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
5.9 - handle_info/2]).
5.10 -export([call/2, call/3, cast/2, cast/3]).
5.11 --export([subscribe/3]).
5.12 -export([close/1, close/3]).
5.13 -export([register_return_handler/2, register_flow_handler/2,
5.14 register_confirm_handler/2]).
5.15 +-export([call_consumer/2, subscribe/3]).
5.16 -export([next_publish_seqno/1, wait_for_confirms/1,
5.17 wait_for_confirms_or_die/1]).
5.18 --export([register_default_consumer/2]).
5.19 +-export([start_link/5, connection_closing/3, open/1]).
5.20 +
5.21 +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
5.22 + handle_info/2]).
5.23
5.24 -define(TIMEOUT_FLUSH, 60000).
5.25 -define(TIMEOUT_CLOSE_OK, 3000).
5.26
5.27 -record(state, {number,
5.28 connection,
5.29 + consumer,
5.30 driver,
5.31 rpc_requests = queue:new(),
5.32 - anon_sub_requests = queue:new(),
5.33 - tagged_sub_requests = dict:new(),
5.34 closing = false, %% false |
5.35 %% {just_channel, Reason} |
5.36 %% {connection, Reason}
5.37 @@ -96,8 +95,6 @@
5.38 next_pub_seqno = 0,
5.39 flow_active = true,
5.40 flow_handler_pid = none,
5.41 - consumers = dict:new(),
5.42 - default_consumer = none,
5.43 start_writer_fun,
5.44 unconfirmed_set = gb_sets:new(),
5.45 waiting_set = gb_trees:empty(),
5.46 @@ -134,7 +131,7 @@
5.47 %% @spec (Channel, Method) -> Result
5.48 %% @doc This is equivalent to amqp_channel:call(Channel, Method, none).
5.49 call(Channel, Method) ->
5.50 - gen_server:call(Channel, {call, Method, none}, infinity).
5.51 + gen_server:call(Channel, {call, Method, none, self()}, infinity).
5.52
5.53 %% @spec (Channel, Method, Content) -> Result
5.54 %% where
5.55 @@ -157,12 +154,12 @@
5.56 %% the broker. It does not necessarily imply that the broker has
5.57 %% accepted responsibility for the message.
5.58 call(Channel, Method, Content) ->
5.59 - gen_server:call(Channel, {call, Method, Content}, infinity).
5.60 + gen_server:call(Channel, {call, Method, Content, self()}, infinity).
5.61
5.62 %% @spec (Channel, Method) -> ok
5.63 %% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
5.64 cast(Channel, Method) ->
5.65 - gen_server:cast(Channel, {cast, Method, none}).
5.66 + gen_server:cast(Channel, {cast, Method, none, self()}).
5.67
5.68 %% @spec (Channel, Method, Content) -> ok
5.69 %% where
5.70 @@ -174,7 +171,7 @@
5.71 %% This function is not recommended with synchronous methods, since there is no
5.72 %% way to verify that the server has received the method.
5.73 cast(Channel, Method, Content) ->
5.74 - gen_server:cast(Channel, {cast, Method, Content}).
5.75 + gen_server:cast(Channel, {cast, Method, Content, self()}).
5.76
5.77 %% @spec (Channel) -> ok | closing
5.78 %% where
5.79 @@ -220,25 +217,6 @@
5.80 wait_for_confirms_or_die(Channel) ->
5.81 gen_server:call(Channel, {wait_for_confirms_or_die, self()}, infinity).
5.82
5.83 -%%---------------------------------------------------------------------------
5.84 -%% Consumer registration (API)
5.85 -%%---------------------------------------------------------------------------
5.86 -
5.87 -%% @type consume() = #'basic.consume'{}.
5.88 -%% The AMQP method that is used to subscribe a consumer to a queue.
5.89 -%% @spec (Channel, consume(), Consumer) -> amqp_method()
5.90 -%% where
5.91 -%% Channel = pid()
5.92 -%% Consumer = pid()
5.93 -%% @doc Creates a subscription to a queue. This subscribes a consumer pid to
5.94 -%% the queue defined in the #'basic.consume'{} method record. Note that
5.95 -%% both the process invoking this method and the supplied consumer process
5.96 -%% receive an acknowledgement of the subscription. The calling process will
5.97 -%% receive the acknowledgement as the return value of this function, whereas
5.98 -%% the consumer process will receive the notification asynchronously.
5.99 -subscribe(Channel, BasicConsume = #'basic.consume'{}, Consumer) ->
5.100 - gen_server:call(Channel, {subscribe, BasicConsume, Consumer}, infinity).
5.101 -
5.102 %% @spec (Channel, ReturnHandler) -> ok
5.103 %% where
5.104 %% Channel = pid()
5.105 @@ -268,40 +246,34 @@
5.106 register_flow_handler(Channel, FlowHandler) ->
5.107 gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
5.108
5.109 -%% @spec (Channel, Consumer) -> ok
5.110 +%% @spec (Channel, Msg) -> ok
5.111 %% where
5.112 %% Channel = pid()
5.113 -%% Consumer = pid()
5.114 -%% @doc Set the current default consumer.
5.115 -%% Under certain circumstances it is possible for a channel to receive a
5.116 -%% message delivery which does not match any consumer which is currently
5.117 -%% set up via basic.consume. This will occur after the following sequence
5.118 -%% of events:<br/>
5.119 -%% <br/>
5.120 -%% basic.consume with explicit acks<br/>
5.121 -%% %% some deliveries take place but are not acked<br/>
5.122 -%% basic.cancel<br/>
5.123 -%% basic.recover{requeue = false}<br/>
5.124 -%% <br/>
5.125 -%% Since requeue is specified to be false in the basic.recover, the spec
5.126 -%% states that the message must be redelivered to "the original recipient"
5.127 -%% - i.e. the same channel / consumer-tag. But the consumer is no longer
5.128 -%% active.<br/>
5.129 -%% In these circumstances, you can register a default consumer to handle
5.130 -%% such deliveries. If no default consumer is registered then the channel
5.131 -%% will exit on receiving such a delivery.<br/>
5.132 -%% Most people will not need to use this.
5.133 -register_default_consumer(Channel, Consumer) ->
5.134 - gen_server:cast(Channel, {register_default_consumer, Consumer}).
5.135 +%% Msg = any()
5.136 +%% @doc This causes the channel to invoke Consumer:handle_call/2,
5.137 +%% where Consumer is the amqp_gen_consumer implementation registered with
5.138 +%% the channel.
5.139 +call_consumer(Channel, Msg) ->
5.140 + gen_server:call(Channel, {call_consumer, Msg}, infinity).
5.141 +
5.142 +%% @spec (Channel, BasicConsume, Subscriber) -> ok
5.143 +%% where
5.144 +%% Channel = pid()
5.145 +%% BasicConsume = amqp_method()
5.146 +%% Subscriber = pid()
5.147 +%% @doc Subscribe the given pid to a queue using the specified
5.148 +%% basic.consume method.
5.149 +subscribe(Channel, BasicConsume = #'basic.consume'{}, Subscriber) ->
5.150 + gen_server:call(Channel, {subscribe, BasicConsume, Subscriber}, infinity).
5.151
5.152 %%---------------------------------------------------------------------------
5.153 %% Internal interface
5.154 %%---------------------------------------------------------------------------
5.155
5.156 %% @private
5.157 -start_link(Driver, Connection, ChannelNumber, SWF) ->
5.158 - gen_server:start_link(?MODULE,
5.159 - [Driver, Connection, ChannelNumber, SWF], []).
5.160 +start_link(Driver, Connection, ChannelNumber, Consumer, SWF) ->
5.161 + gen_server:start_link(
5.162 + ?MODULE, [Driver, Connection, ChannelNumber, Consumer, SWF], []).
5.163
5.164 %% @private
5.165 connection_closing(Pid, ChannelCloseType, Reason) ->
5.166 @@ -316,24 +288,22 @@
5.167 %%---------------------------------------------------------------------------
5.168
5.169 %% @private
5.170 -init([Driver, Connection, ChannelNumber, SWF]) ->
5.171 +init([Driver, Connection, ChannelNumber, Consumer, SWF]) ->
5.172 {ok, #state{connection = Connection,
5.173 driver = Driver,
5.174 number = ChannelNumber,
5.175 + consumer = Consumer,
5.176 start_writer_fun = SWF}}.
5.177
5.178 %% @private
5.179 handle_call(open, From, State) ->
5.180 - {noreply, rpc_top_half(#'channel.open'{}, none, From, State)};
5.181 + {noreply, rpc_top_half(#'channel.open'{}, none, From, none, State)};
5.182 %% @private
5.183 handle_call({close, Code, Text}, From, State) ->
5.184 handle_close(Code, Text, From, State);
5.185 %% @private
5.186 -handle_call({call, Method, AmqpMsg}, From, State) ->
5.187 - handle_method_to_server(Method, AmqpMsg, From, State);
5.188 -%% @private
5.189 -handle_call({subscribe, Method, Consumer}, From, State) ->
5.190 - handle_subscribe(Method, Consumer, From, State);
5.191 +handle_call({call, Method, AmqpMsg, Sender}, From, State) ->
5.192 + handle_method_to_server(Method, AmqpMsg, From, Sender, State);
5.193 %% Handles the delivery of messages from a direct channel
5.194 %% @private
5.195 handle_call({send_command_sync, Method, Content}, From, State) ->
5.196 @@ -346,8 +316,6 @@
5.197 Ret = handle_method_from_server(Method, none, State),
5.198 gen_server:reply(From, ok),
5.199 Ret;
5.200 -%% When in confirm mode, returns the sequence number of the next
5.201 -%% message to be published.
5.202 %% @private
5.203 handle_call(next_publish_seqno, _From,
5.204 State = #state{next_pub_seqno = SeqNo}) ->
5.205 @@ -359,31 +327,30 @@
5.206 %% Lets the channel know that the process should be sent an exit
5.207 %% signal if a nack is received.
5.208 handle_call({wait_for_confirms_or_die, Pid}, From, State) ->
5.209 - handle_wait_for_confirms(From, Pid, ok, State).
5.210 + handle_wait_for_confirms(From, Pid, ok, State);
5.211 +%% @private
5.212 +handle_call({call_consumer, Msg}, _From,
5.213 + State = #state{consumer = Consumer}) ->
5.214 + {reply, amqp_gen_consumer:call_consumer(Consumer, Msg), State};
5.215 +%% @private
5.216 +handle_call({subscribe, BasicConsume, Subscriber}, From, State) ->
5.217 + handle_method_to_server(BasicConsume, none, From, Subscriber, State).
5.218
5.219 %% @private
5.220 -handle_cast({cast, Method, AmqpMsg}, State) ->
5.221 - handle_method_to_server(Method, AmqpMsg, none, State);
5.222 -%% Registers a handler to process return messages
5.223 +handle_cast({cast, Method, AmqpMsg, Sender}, State) ->
5.224 + handle_method_to_server(Method, AmqpMsg, none, Sender, State);
5.225 %% @private
5.226 handle_cast({register_return_handler, ReturnHandler}, State) ->
5.227 erlang:monitor(process, ReturnHandler),
5.228 {noreply, State#state{return_handler_pid = ReturnHandler}};
5.229 -%% Registers a handler to process ack and nack messages
5.230 %% @private
5.231 handle_cast({register_confirm_handler, ConfirmHandler}, State) ->
5.232 erlang:monitor(process, ConfirmHandler),
5.233 {noreply, State#state{confirm_handler_pid = ConfirmHandler}};
5.234 -%% Registers a handler to process flow control messages
5.235 %% @private
5.236 handle_cast({register_flow_handler, FlowHandler}, State) ->
5.237 erlang:monitor(process, FlowHandler),
5.238 {noreply, State#state{flow_handler_pid = FlowHandler}};
5.239 -%% Registers a handler to process unexpected deliveries
5.240 -%% @private
5.241 -handle_cast({register_default_consumer, Consumer}, State) ->
5.242 - erlang:monitor(process, Consumer),
5.243 - {noreply, State#state{default_consumer = Consumer}};
5.244 %% Received from channels manager
5.245 %% @private
5.246 handle_cast({method, Method, Content}, State) ->
5.247 @@ -447,17 +414,11 @@
5.248 State = #state{flow_handler_pid = FlowHandler}) ->
5.249 ?LOG_WARN("Channel (~p): Unregistering flow handler ~p because it died. "
5.250 "Reason: ~p~n", [self(), FlowHandler, Reason]),
5.251 - {noreply, State#state{flow_handler_pid = none}};
5.252 -%% @private
5.253 -handle_info({'DOWN', _, process, DefaultConsumer, Reason},
5.254 - State = #state{default_consumer = DefaultConsumer}) ->
5.255 - ?LOG_WARN("Channel (~p): Unregistering default consumer ~p because it died."
5.256 - "Reason: ~p~n", [self(), DefaultConsumer, Reason]),
5.257 - {noreply, State#state{default_consumer = none}}.
5.258 + {noreply, State#state{flow_handler_pid = none}}.
5.259
5.260 %% @private
5.261 -terminate(_Reason, _State) ->
5.262 - ok.
5.263 +terminate(_Reason, State) ->
5.264 + State.
5.265
5.266 %% @private
5.267 code_change(_OldVsn, State, _Extra) ->
5.268 @@ -467,7 +428,7 @@
5.269 %% RPC mechanism
5.270 %%---------------------------------------------------------------------------
5.271
5.272 -handle_method_to_server(Method, AmqpMsg, From,
5.273 +handle_method_to_server(Method, AmqpMsg, From, Sender,
5.274 State = #state{unconfirmed_set = USet}) ->
5.275 case {check_invalid_method(Method), From,
5.276 check_block(Method, AmqpMsg, State)} of
5.277 @@ -484,8 +445,8 @@
5.278 _ ->
5.279 State
5.280 end,
5.281 - {noreply,
5.282 - rpc_top_half(Method, build_content(AmqpMsg), From, State1)};
5.283 + {noreply, rpc_top_half(Method, build_content(AmqpMsg),
5.284 + From, Sender, State1)};
5.285 {ok, none, BlockReply} ->
5.286 ?LOG_WARN("Channel (~p): discarding method ~p in cast.~n"
5.287 "Reason: ~p~n", [self(), Method, BlockReply]),
5.288 @@ -506,53 +467,22 @@
5.289 class_id = 0,
5.290 method_id = 0},
5.291 case check_block(Close, none, State) of
5.292 - ok -> {noreply, rpc_top_half(Close, none, From, State)};
5.293 + ok -> {noreply, rpc_top_half(Close, none, From, none, State)};
5.294 BlockReply -> {reply, BlockReply, State}
5.295 end.
5.296
5.297 -handle_subscribe(#'basic.consume'{consumer_tag = Tag, nowait = NoWait} = Method,
5.298 - Consumer,
5.299 - From, State = #state{tagged_sub_requests = Tagged,
5.300 - anon_sub_requests = Anon,
5.301 - consumers = Consumers}) ->
5.302 - case check_block(Method, none, State) of
5.303 - ok when Tag =:= undefined orelse size(Tag) == 0 ->
5.304 - case NoWait of
5.305 - true ->
5.306 - {reply, {error, command_invalid}, State};
5.307 - false ->
5.308 - NewMethod = Method#'basic.consume'{consumer_tag = <<"">>},
5.309 - NewState = State#state{anon_sub_requests =
5.310 - queue:in(Consumer, Anon)},
5.311 - {noreply, rpc_top_half(NewMethod, none, From, NewState)}
5.312 - end;
5.313 - ok when is_binary(Tag) andalso size(Tag) >= 0 ->
5.314 - case dict:is_key(Tag, Tagged) orelse dict:is_key(Tag, Consumers) of
5.315 - true ->
5.316 - {reply, {error, consumer_tag_already_in_use}, State};
5.317 - false when NoWait ->
5.318 - NewState = register_consumer(Tag, Consumer, State),
5.319 - {reply, ok, rpc_top_half(Method, none, none, NewState)};
5.320 - false ->
5.321 - NewState = State#state{tagged_sub_requests =
5.322 - dict:store(Tag, Consumer, Tagged)},
5.323 - {noreply, rpc_top_half(Method, none, From, NewState)}
5.324 - end;
5.325 - BlockReply ->
5.326 - {reply, BlockReply, State}
5.327 - end.
5.328 -
5.329 -rpc_top_half(Method, Content, From,
5.330 +rpc_top_half(Method, Content, From, Sender,
5.331 State0 = #state{rpc_requests = RequestQueue}) ->
5.332 State1 = State0#state{
5.333 - rpc_requests = queue:in({From, Method, Content}, RequestQueue)},
5.334 + rpc_requests =
5.335 + queue:in({From, Sender, Method, Content}, RequestQueue)},
5.336 IsFirstElement = queue:is_empty(RequestQueue),
5.337 if IsFirstElement -> do_rpc(State1);
5.338 true -> State1
5.339 end.
5.340
5.341 rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
5.342 - {{value, {From, _Method, _Content}}, RequestQueue1} =
5.343 + {{value, {From, _Sender, _Method, _Content}}, RequestQueue1} =
5.344 queue:out(RequestQueue),
5.345 case From of
5.346 none -> ok;
5.347 @@ -563,8 +493,8 @@
5.348 do_rpc(State = #state{rpc_requests = Q,
5.349 closing = Closing}) ->
5.350 case queue:out(Q) of
5.351 - {{value, {From, Method, Content}}, NewQ} ->
5.352 - State1 = pre_do(Method, Content, State),
5.353 + {{value, {From, Sender, Method, Content}}, NewQ} ->
5.354 + State1 = pre_do(Method, Content, Sender, State),
5.355 DoRet = do(Method, Content, State1),
5.356 case ?PROTOCOL:is_method_synchronous(Method) of
5.357 true -> State1;
5.358 @@ -588,12 +518,19 @@
5.359 State#state{rpc_requests = NewQ}
5.360 end.
5.361
5.362 -pre_do(#'channel.open'{}, none, State) ->
5.363 +pending_rpc_method(#state{rpc_requests = Q}) ->
5.364 + {value, {_From, _Sender, Method, _Content}} = queue:peek(Q),
5.365 + Method.
5.366 +
5.367 +pre_do(#'channel.open'{}, none, _Sender, State) ->
5.368 start_writer(State);
5.369 pre_do(#'channel.close'{reply_code = Code, reply_text = Text}, none,
5.370 - State) ->
5.371 + _Sender, State) ->
5.372 State#state{closing = {just_channel, {app_initiated_close, Code, Text}}};
5.373 -pre_do(_, _, State) ->
5.374 +pre_do(#'basic.consume'{} = Method, none, Sender, State) ->
5.375 + ok = call_to_consumer(Method, Sender, State),
5.376 + State;
5.377 +pre_do(_, _, _, State) ->
5.378 State.
5.379
5.380 %%---------------------------------------------------------------------------
5.381 @@ -651,29 +588,20 @@
5.382 {connection, Reason} ->
5.383 handle_shutdown({connection_closing, Reason}, State)
5.384 end;
5.385 -handle_method_from_server1(
5.386 - #'basic.consume_ok'{consumer_tag = ConsumerTag} = ConsumeOk,
5.387 - none, State = #state{tagged_sub_requests = Tagged,
5.388 - anon_sub_requests = Anon}) ->
5.389 - {Consumer, State0} =
5.390 - case dict:find(ConsumerTag, Tagged) of
5.391 - {ok, C} ->
5.392 - NewTagged = dict:erase(ConsumerTag, Tagged),
5.393 - {C, State#state{tagged_sub_requests = NewTagged}};
5.394 - error ->
5.395 - {{value, C}, NewAnon} = queue:out(Anon),
5.396 - {C, State#state{anon_sub_requests = NewAnon}}
5.397 - end,
5.398 - Consumer ! ConsumeOk,
5.399 - State1 = register_consumer(ConsumerTag, Consumer, State0),
5.400 - {noreply, rpc_bottom_half(ConsumeOk, State1)};
5.401 -handle_method_from_server1(
5.402 - #'basic.cancel_ok'{consumer_tag = ConsumerTag} = CancelOk, none,
5.403 - State) ->
5.404 - Consumer = resolve_consumer(ConsumerTag, State),
5.405 - Consumer ! CancelOk,
5.406 - NewState = unregister_consumer(ConsumerTag, State),
5.407 - {noreply, rpc_bottom_half(CancelOk, NewState)};
5.408 +handle_method_from_server1(#'basic.consume_ok'{} = ConsumeOk, none, State) ->
5.409 + Consume = #'basic.consume'{} = pending_rpc_method(State),
5.410 + ok = call_to_consumer(ConsumeOk, Consume, State),
5.411 + {noreply, rpc_bottom_half(ConsumeOk, State)};
5.412 +handle_method_from_server1(#'basic.cancel_ok'{} = CancelOk, none, State) ->
5.413 + Cancel = #'basic.cancel'{} = pending_rpc_method(State),
5.414 + ok = call_to_consumer(CancelOk, Cancel, State),
5.415 + {noreply, rpc_bottom_half(CancelOk, State)};
5.416 +handle_method_from_server1(#'basic.cancel'{} = Cancel, none, State) ->
5.417 + ok = call_to_consumer(Cancel, none, State),
5.418 + {noreply, State};
5.419 +handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, State) ->
5.420 + ok = call_to_consumer(Deliver, AmqpMsg, State),
5.421 + {noreply, State};
5.422 handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
5.423 State = #state{flow_handler_pid = FlowHandler}) ->
5.424 case FlowHandler of none -> ok;
5.425 @@ -683,13 +611,7 @@
5.426 %% flushed beforehand. Methods that made it to the queue are not
5.427 %% blocked in any circumstance.
5.428 {noreply, rpc_top_half(#'channel.flow_ok'{active = Active}, none, none,
5.429 - State#state{flow_active = Active})};
5.430 -handle_method_from_server1(
5.431 - #'basic.deliver'{consumer_tag = ConsumerTag} = Deliver, AmqpMsg,
5.432 - State) ->
5.433 - Consumer = resolve_consumer(ConsumerTag, State),
5.434 - Consumer ! {Deliver, AmqpMsg},
5.435 - {noreply, State};
5.436 + none, State#state{flow_active = Active})};
5.437 handle_method_from_server1(
5.438 #'basic.return'{} = BasicReturn, AmqpMsg,
5.439 State = #state{return_handler_pid = ReturnHandler}) ->
5.440 @@ -700,19 +622,14 @@
5.441 _ -> ReturnHandler ! {BasicReturn, AmqpMsg}
5.442 end,
5.443 {noreply, State};
5.444 -handle_method_from_server1(#'basic.cancel'{consumer_tag = ConsumerTag} = Death,
5.445 - none, State) ->
5.446 - Consumer = resolve_consumer(ConsumerTag, State),
5.447 - Consumer ! Death,
5.448 - NewState = unregister_consumer(ConsumerTag, State),
5.449 - {noreply, NewState};
5.450 handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
5.451 #state{confirm_handler_pid = none} = State) ->
5.452 ?LOG_WARN("Channel (~p): received ~p but there is no "
5.453 "confirm handler registered~n", [self(), BasicAck]),
5.454 {noreply, update_confirm_set(BasicAck, State)};
5.455 -handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
5.456 - #state{confirm_handler_pid = ConfirmHandler} = State) ->
5.457 +handle_method_from_server1(
5.458 + #'basic.ack'{} = BasicAck, none,
5.459 + #state{confirm_handler_pid = ConfirmHandler} = State) ->
5.460 ConfirmHandler ! BasicAck,
5.461 {noreply, update_confirm_set(BasicAck, State)};
5.462 handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
5.463 @@ -720,8 +637,9 @@
5.464 ?LOG_WARN("Channel (~p): received ~p but there is no "
5.465 "confirm handler registered~n", [self(), BasicNack]),
5.466 {noreply, update_confirm_set(BasicNack, handle_nack(State))};
5.467 -handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
5.468 - #state{confirm_handler_pid = ConfirmHandler} = State) ->
5.469 +handle_method_from_server1(
5.470 + #'basic.nack'{} = BasicNack, none,
5.471 + #state{confirm_handler_pid = ConfirmHandler} = State) ->
5.472 ConfirmHandler ! BasicNack,
5.473 {noreply, update_confirm_set(BasicNack, handle_nack(State))};
5.474
5.475 @@ -798,30 +716,6 @@
5.476 {ok, Writer} = SWF(),
5.477 State#state{writer = Writer}.
5.478
5.479 -resolve_consumer(_ConsumerTag, #state{consumers = []}) ->
5.480 - exit(no_consumers_registered);
5.481 -resolve_consumer(ConsumerTag, #state{consumers = Consumers,
5.482 - default_consumer = DefaultConsumer}) ->
5.483 - case dict:find(ConsumerTag, Consumers) of
5.484 - {ok, Value} ->
5.485 - Value;
5.486 - error ->
5.487 - case is_pid(DefaultConsumer) of
5.488 - true -> DefaultConsumer;
5.489 - false -> exit(unexpected_delivery_and_no_default_consumer)
5.490 - end
5.491 - end.
5.492 -
5.493 -register_consumer(ConsumerTag, Consumer,
5.494 - State = #state{consumers = Consumers0}) ->
5.495 - Consumers1 = dict:store(ConsumerTag, Consumer, Consumers0),
5.496 - State#state{consumers = Consumers1}.
5.497 -
5.498 -unregister_consumer(ConsumerTag,
5.499 - State = #state{consumers = Consumers0}) ->
5.500 - Consumers1 = dict:erase(ConsumerTag, Consumers0),
5.501 - State#state{consumers = Consumers1}.
5.502 -
5.503 amqp_msg(none) ->
5.504 none;
5.505 amqp_msg(Content) ->
5.506 @@ -849,8 +743,6 @@
5.507 "Use amqp_connection:open_channel/{1,2} instead"};
5.508 check_invalid_method(#'channel.close'{}) ->
5.509 {use_close_function, "Use close/{1,3} instead"};
5.510 -check_invalid_method(#'basic.consume'{}) ->
5.511 - {use_subscribe_function, "Use subscribe/3 instead"};
5.512 check_invalid_method(Method) ->
5.513 case is_connection_method(Method) of
5.514 true -> {connection_methods_not_allowed,
5.515 @@ -912,3 +804,6 @@
5.516 false -> {noreply, State#state{waiting_set =
5.517 gb_trees:insert(From, Notify, WSet)}}
5.518 end.
5.519 +
5.520 +call_to_consumer(Method, Args, #state{consumer = Consumer}) ->
5.521 + amqp_gen_consumer:call_consumer(Consumer, Method, Args).
6.1 --- a/src/amqp_channel_sup.erl Mon Jul 18 17:40:41 2011 +0100
6.2 +++ b/src/amqp_channel_sup.erl Wed Jul 27 16:03:16 2011 +0100
6.3 @@ -21,21 +21,22 @@
6.4
6.5 -behaviour(supervisor2).
6.6
6.7 --export([start_link/4]).
6.8 +-export([start_link/5]).
6.9 -export([init/1]).
6.10
6.11 %%---------------------------------------------------------------------------
6.12 %% Interface
6.13 %%---------------------------------------------------------------------------
6.14
6.15 -start_link(Type, Connection, InfraArgs, ChNumber) ->
6.16 - {ok, Sup} = supervisor2:start_link(?MODULE, []),
6.17 +start_link(Type, Connection, InfraArgs, ChNumber, Consumer = {_, _}) ->
6.18 + {ok, Sup} = supervisor2:start_link(?MODULE, [Consumer]),
6.19 + [{gen_consumer, ConsumerPid, _, _}] = supervisor2:which_children(Sup),
6.20 {ok, ChPid} = supervisor2:start_child(
6.21 Sup, {channel, {amqp_channel, start_link,
6.22 - [Type, Connection, ChNumber,
6.23 + [Type, Connection, ChNumber, ConsumerPid,
6.24 start_writer_fun(Sup, Type, InfraArgs,
6.25 ChNumber)]},
6.26 - intrinsic, brutal_kill, worker, [amqp_channel]}),
6.27 + intrinsic, ?MAX_WAIT, worker, [amqp_channel]}),
6.28 {ok, AState} = init_command_assembler(Type),
6.29 {ok, Sup, {ChPid, AState}}.
6.30
6.31 @@ -60,7 +61,7 @@
6.32 {writer, {rabbit_writer, start_link,
6.33 [Sock, ChNumber, ?FRAME_MIN_SIZE, ?PROTOCOL,
6.34 self()]},
6.35 - transient, ?MAX_WAIT, worker, [rabbit_writer]})
6.36 + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]})
6.37 end.
6.38
6.39 init_command_assembler(direct) -> {ok, none};
6.40 @@ -70,5 +71,8 @@
6.41 %% supervisor2 callbacks
6.42 %%---------------------------------------------------------------------------
6.43
6.44 -init([]) ->
6.45 - {ok, {{one_for_all, 0, 1}, []}}.
6.46 +init([{ConsumerModule, ConsumerArgs}]) ->
6.47 + {ok, {{one_for_all, 0, 1},
6.48 + [{gen_consumer, {amqp_gen_consumer, start_link,
6.49 + [ConsumerModule, ConsumerArgs]},
6.50 + intrinsic, ?MAX_WAIT, worker, [amqp_gen_consumer]}]}}.
7.1 --- a/src/amqp_channel_sup_sup.erl Mon Jul 18 17:40:41 2011 +0100
7.2 +++ b/src/amqp_channel_sup_sup.erl Wed Jul 27 16:03:16 2011 +0100
7.3 @@ -21,7 +21,7 @@
7.4
7.5 -behaviour(supervisor2).
7.6
7.7 --export([start_link/2, start_channel_sup/3]).
7.8 +-export([start_link/2, start_channel_sup/4]).
7.9 -export([init/1]).
7.10
7.11 %%---------------------------------------------------------------------------
7.12 @@ -31,8 +31,8 @@
7.13 start_link(Type, Connection) ->
7.14 supervisor2:start_link(?MODULE, [Type, Connection]).
7.15
7.16 -start_channel_sup(Sup, InfraArgs, ChannelNumber) ->
7.17 - supervisor2:start_child(Sup, [InfraArgs, ChannelNumber]).
7.18 +start_channel_sup(Sup, InfraArgs, ChannelNumber, Consumer) ->
7.19 + supervisor2:start_child(Sup, [InfraArgs, ChannelNumber, Consumer]).
7.20
7.21 %%---------------------------------------------------------------------------
7.22 %% supervisor2 callbacks
8.1 --- a/src/amqp_channels_manager.erl Mon Jul 18 17:40:41 2011 +0100
8.2 +++ b/src/amqp_channels_manager.erl Wed Jul 27 16:03:16 2011 +0100
8.3 @@ -21,7 +21,7 @@
8.4
8.5 -behaviour(gen_server).
8.6
8.7 --export([start_link/2, open_channel/3, set_channel_max/2, is_empty/1,
8.8 +-export([start_link/2, open_channel/4, set_channel_max/2, is_empty/1,
8.9 num_channels/1, pass_frame/3, signal_connection_closing/3]).
8.10 -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
8.11 handle_info/2]).
8.12 @@ -40,8 +40,9 @@
8.13 start_link(Connection, ChSupSup) ->
8.14 gen_server:start_link(?MODULE, [Connection, ChSupSup], []).
8.15
8.16 -open_channel(ChMgr, ProposedNumber, InfraArgs) ->
8.17 - gen_server:call(ChMgr, {open_channel, ProposedNumber, InfraArgs}, infinity).
8.18 +open_channel(ChMgr, ProposedNumber, Consumer, InfraArgs) ->
8.19 + gen_server:call(ChMgr, {open_channel, ProposedNumber, Consumer, InfraArgs},
8.20 + infinity).
8.21
8.22 set_channel_max(ChMgr, ChannelMax) ->
8.23 gen_server:cast(ChMgr, {set_channel_max, ChannelMax}).
8.24 @@ -71,9 +72,9 @@
8.25 code_change(_OldVsn, State, _Extra) ->
8.26 State.
8.27
8.28 -handle_call({open_channel, ProposedNumber, InfraArgs}, _,
8.29 +handle_call({open_channel, ProposedNumber, Consumer, InfraArgs}, _,
8.30 State = #state{closing = false}) ->
8.31 - handle_open_channel(ProposedNumber, InfraArgs, State);
8.32 + handle_open_channel(ProposedNumber, Consumer, InfraArgs, State);
8.33 handle_call(is_empty, _, State) ->
8.34 {reply, internal_is_empty(State), State};
8.35 handle_call(num_channels, _, State) ->
8.36 @@ -93,13 +94,13 @@
8.37 %% Internal plumbing
8.38 %%---------------------------------------------------------------------------
8.39
8.40 -handle_open_channel(ProposedNumber, InfraArgs,
8.41 +handle_open_channel(ProposedNumber, Consumer, InfraArgs,
8.42 State = #state{channel_sup_sup = ChSupSup}) ->
8.43 case new_number(ProposedNumber, State) of
8.44 {ok, Number} ->
8.45 {ok, _ChSup, {Ch, AState}} =
8.46 amqp_channel_sup_sup:start_channel_sup(ChSupSup, InfraArgs,
8.47 - Number),
8.48 + Number, Consumer),
8.49 NewState = internal_register(Number, Ch, AState, State),
8.50 erlang:monitor(process, Ch),
8.51 {reply, {ok, Ch}, NewState};
9.1 --- a/src/amqp_connection.erl Mon Jul 18 17:40:41 2011 +0100
9.2 +++ b/src/amqp_connection.erl Wed Jul 27 16:03:16 2011 +0100
9.3 @@ -69,7 +69,7 @@
9.4
9.5 -include("amqp_client.hrl").
9.6
9.7 --export([open_channel/1, open_channel/2]).
9.8 +-export([open_channel/1, open_channel/2, open_channel/3]).
9.9 -export([start/1]).
9.10 -export([close/1, close/3]).
9.11 -export([info/2, info_keys/1, info_keys/0]).
9.12 @@ -155,25 +155,46 @@
9.13 %% Commands
9.14 %%---------------------------------------------------------------------------
9.15
9.16 -%% @doc Invokes open_channel(ConnectionPid, none).
9.17 +%% @doc Invokes open_channel(ConnectionPid, none, ?DEFAULT_CONSUMER).
9.18 +%% Opens a channel without having to specify a channel number. This uses the
9.19 +%% default consumer implementation.
9.20 +open_channel(ConnectionPid) ->
9.21 + open_channel(ConnectionPid, none, ?DEFAULT_CONSUMER).
9.22 +
9.23 +%% @doc Invokes open_channel(ConnectionPid, none, Consumer).
9.24 %% Opens a channel without having to specify a channel number.
9.25 -open_channel(ConnectionPid) ->
9.26 - open_channel(ConnectionPid, none).
9.27 +open_channel(ConnectionPid, {_, _} = Consumer) ->
9.28 + open_channel(ConnectionPid, none, Consumer);
9.29
9.30 -%% @spec (ConnectionPid, ChannelNumber) -> {ok, ChannelPid} | {error, Error}
9.31 +%% @doc Invokes open_channel(ConnectionPid, ChannelNumber, ?DEFAULT_CONSUMER).
9.32 +%% Opens a channel, using the default consumer implementation.
9.33 +open_channel(ConnectionPid, ChannelNumber)
9.34 + when is_number(ChannelNumber) orelse ChannelNumber =:= none ->
9.35 + open_channel(ConnectionPid, ChannelNumber, ?DEFAULT_CONSUMER).
9.36 +
9.37 +%% @spec (ConnectionPid, ChannelNumber, Consumer) -> Result
9.38 %% where
9.39 +%% ConnectionPid = pid()
9.40 %% ChannelNumber = pos_integer() | 'none'
9.41 -%% ConnectionPid = pid()
9.42 +%% Consumer = {ConsumerModule, ConsumerArgs}
9.43 +%% ConsumerModule = atom()
9.44 +%% ConsumerArgs = [any()]
9.45 +%% Result = {ok, ChannelPid} | {error, Error}
9.46 %% ChannelPid = pid()
9.47 %% @doc Opens an AMQP channel.<br/>
9.48 +%% Opens a channel, using a proposed channel number and a specific consumer
9.49 +%% implementation.<br/>
9.50 +%% ConsumerModule must implement the amqp_gen_consumer behaviour. ConsumerArgs
9.51 +%% is passed as parameter to ConsumerModule:init/1.<br/>
9.52 %% This function assumes that an AMQP connection (networked or direct)
9.53 %% has already been successfully established.<br/>
9.54 %% ChannelNumber must be less than or equal to the negotiated max_channel value,
9.55 %% or less than or equal to ?MAX_CHANNEL_NUMBER if the negotiated max_channel
9.56 %% value is 0.<br/>
9.57 %% In the direct connection, max_channel is always 0.
9.58 -open_channel(ConnectionPid, ChannelNumber) ->
9.59 - amqp_gen_connection:open_channel(ConnectionPid, ChannelNumber).
9.60 +open_channel(ConnectionPid, ChannelNumber,
9.61 + {_ConsumerModule, _ConsumerArgs} = Consumer) ->
9.62 + amqp_gen_connection:open_channel(ConnectionPid, ChannelNumber, Consumer).
9.63
9.64 %% @spec (ConnectionPid) -> ok | Error
9.65 %% where
10.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
10.2 +++ b/src/amqp_direct_consumer.erl Wed Jul 27 16:03:16 2011 +0100
10.3 @@ -0,0 +1,99 @@
10.4 +%% The contents of this file are subject to the Mozilla Public License
10.5 +%% Version 1.1 (the "License"); you may not use this file except in
10.6 +%% compliance with the License. You may obtain a copy of the License at
10.7 +%% http://www.mozilla.org/MPL/
10.8 +%%
10.9 +%% Software distributed under the License is distributed on an "AS IS"
10.10 +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
10.11 +%% License for the specific language governing rights and limitations
10.12 +%% under the License.
10.13 +%%
10.14 +%% The Original Code is RabbitMQ.
10.15 +%%
10.16 +%% The Initial Developer of the Original Code is VMware, Inc.
10.17 +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
10.18 +%%
10.19 +
10.20 +%% @doc This module is an implementation of the amqp_gen_consumer
10.21 +%% behaviour and can be used as part of the Consumer parameter when
10.22 +%% opening AMQP channels.
10.23 +%% <br/>
10.24 +%% <br/>
10.25 +%% The Consumer parameter for this implementation is {{@module},
10.26 +%% [ConsumerPid]@}, where ConsumerPid is a process that will receive
10.27 +%% queue subscription-related messages.<br/>
10.28 +%% <br/>
10.29 +%% This consumer implementation causes the channel to send to the
10.30 +%% ConsumerPid all basic.consume, basic.consume_ok, basic.cancel,
10.31 +%% basic.cancel_ok and basic.deliver messages received from the
10.32 +%% server.
10.33 +%% <br/>
10.34 +%% <br/>
10.35 +%% In addition, this consumer implementation monitors the ConsumerPid
10.36 +%% and exits with the same shutdown reason when it dies. 'DOWN'
10.37 +%% messages from other sources are passed to ConsumerPid.
10.38 +%% <br/>
10.39 +%% Warning! It is not recommended to rely on a consumer on killing off the
10.40 +%% channel (through the exit signal). That may cause messages to get lost.
10.41 +%% Always use amqp_channel:close/{1,3} for a clean shut down.<br/>
10.42 +%% <br/>
10.43 +%% This module has no public functions.
10.44 +-module(amqp_direct_consumer).
10.45 +
10.46 +-include("amqp_gen_consumer_spec.hrl").
10.47 +
10.48 +-behaviour(amqp_gen_consumer).
10.49 +
10.50 +-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
10.51 + handle_cancel/2, handle_deliver/3, handle_info/2, handle_call/3,
10.52 + terminate/2]).
10.53 +
10.54 +%%---------------------------------------------------------------------------
10.55 +%% amqp_gen_consumer callbacks
10.56 +%%---------------------------------------------------------------------------
10.57 +
10.58 +%% @private
10.59 +init([ConsumerPid]) ->
10.60 + monitor(process, ConsumerPid),
10.61 + {ok, ConsumerPid}.
10.62 +
10.63 +%% @private
10.64 +handle_consume(M, A, C) ->
10.65 + C ! {M, A},
10.66 + {ok, C}.
10.67 +
10.68 +%% @private
10.69 +handle_consume_ok(M, _, C) ->
10.70 + C ! M,
10.71 + {ok, C}.
10.72 +
10.73 +%% @private
10.74 +handle_cancel(M, C) ->
10.75 + C ! M,
10.76 + {ok, C}.
10.77 +
10.78 +%% @private
10.79 +handle_cancel_ok(M, _, C) ->
10.80 + C ! M,
10.81 + {ok, C}.
10.82 +
10.83 +%% @private
10.84 +handle_deliver(M, A, C) ->
10.85 + C ! {M, A},
10.86 + {ok, C}.
10.87 +
10.88 +%% @private
10.89 +handle_info({'DOWN', _MRef, process, C, Info}, C) ->
10.90 + {error, {consumer_died, Info}, C};
10.91 +handle_info({'DOWN', MRef, process, Pid, Info}, C) ->
10.92 + C ! {'DOWN', MRef, process, Pid, Info},
10.93 + {ok, C}.
10.94 +
10.95 +%% @private
10.96 +handle_call(M, A, C) ->
10.97 + C ! {M, A},
10.98 + {reply, ok, C}.
10.99 +
10.100 +%% @private
10.101 +terminate(_Reason, C) ->
10.102 + C.
11.1 --- a/src/amqp_gen_connection.erl Mon Jul 18 17:40:41 2011 +0100
11.2 +++ b/src/amqp_gen_connection.erl Wed Jul 27 16:03:16 2011 +0100
11.3 @@ -21,7 +21,7 @@
11.4
11.5 -behaviour(gen_server).
11.6
11.7 --export([start_link/5, connect/1, open_channel/2, hard_error_in_channel/3,
11.8 +-export([start_link/5, connect/1, open_channel/3, hard_error_in_channel/3,
11.9 channel_internal_error/3, server_misbehaved/2, channels_terminated/1,
11.10 close/2, info/2, info_keys/0, info_keys/1]).
11.11 -export([behaviour_info/1]).
11.12 @@ -61,8 +61,9 @@
11.13 connect(Pid) ->
11.14 gen_server:call(Pid, connect, infinity).
11.15
11.16 -open_channel(Pid, ProposedNumber) ->
11.17 - case gen_server:call(Pid, {command, {open_channel, ProposedNumber}},
11.18 +open_channel(Pid, ProposedNumber, Consumer) ->
11.19 + case gen_server:call(Pid,
11.20 + {command, {open_channel, ProposedNumber, Consumer}},
11.21 infinity) of
11.22 {ok, ChannelPid} -> ok = amqp_channel:open(ChannelPid),
11.23 {ok, ChannelPid};
11.24 @@ -234,11 +235,11 @@
11.25 %% Command handling
11.26 %%---------------------------------------------------------------------------
11.27
11.28 -handle_command({open_channel, ProposedNumber}, _From,
11.29 +handle_command({open_channel, ProposedNumber, Consumer}, _From,
11.30 State = #state{channels_manager = ChMgr,
11.31 module = Mod,
11.32 module_state = MState}) ->
11.33 - {reply, amqp_channels_manager:open_channel(ChMgr, ProposedNumber,
11.34 + {reply, amqp_channels_manager:open_channel(ChMgr, ProposedNumber, Consumer,
11.35 Mod:open_channel_args(MState)),
11.36 State};
11.37 handle_command({close, #'connection.close'{} = Close}, From, State) ->
12.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
12.2 +++ b/src/amqp_gen_consumer.erl Wed Jul 27 16:03:16 2011 +0100
12.3 @@ -0,0 +1,252 @@
12.4 +%% The contents of this file are subject to the Mozilla Public License
12.5 +%% Version 1.1 (the "License"); you may not use this file except in
12.6 +%% compliance with the License. You may obtain a copy of the License at
12.7 +%% http://www.mozilla.org/MPL/
12.8 +%%
12.9 +%% Software distributed under the License is distributed on an "AS IS"
12.10 +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
12.11 +%% License for the specific language governing rights and limitations
12.12 +%% under the License.
12.13 +%%
12.14 +%% The Original Code is RabbitMQ.
12.15 +%%
12.16 +%% The Initial Developer of the Original Code is VMware, Inc.
12.17 +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
12.18 +%%
12.19 +
12.20 +%% @doc A behaviour module for implementing consumers for
12.21 +%% amqp_channel. To specify a consumer implementation for a channel,
12.22 +%% use amqp_connection:open_channel/{2,3}.
12.23 +%% <br/>
12.24 +%% All callbacks are called within the gen_consumer process. <br/>
12.25 +%% <br/>
12.26 +%% See comments in amqp_gen_consumer.erl source file for documentation
12.27 +%% on the callback functions.
12.28 +%% <br/>
12.29 +%% Note that making calls to the channel from the callback module will
12.30 +%% result in deadlock.
12.31 +-module(amqp_gen_consumer).
12.32 +
12.33 +-include("amqp_client.hrl").
12.34 +
12.35 +-behaviour(gen_server2).
12.36 +
12.37 +-export([start_link/2, call_consumer/2, call_consumer/3]).
12.38 +-export([behaviour_info/1]).
12.39 +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
12.40 + handle_info/2, prioritise_info/2]).
12.41 +
12.42 +-record(state, {module,
12.43 + module_state}).
12.44 +
12.45 +%%---------------------------------------------------------------------------
12.46 +%% Interface
12.47 +%%---------------------------------------------------------------------------
12.48 +
12.49 +%% @type ok_error() = {ok, state()} | {error, reason(), state()}
12.50 +%% Denotes a successful or an error return from a consumer module call.
12.51 +
12.52 +start_link(ConsumerModule, ExtraParams) ->
12.53 + gen_server2:start_link(?MODULE, [ConsumerModule, ExtraParams], []).
12.54 +
12.55 +%% @spec (Consumer, Msg) -> ok
12.56 +%% where
12.57 +%% Consumer = pid()
12.58 +%% Msg = any()
12.59 +%%
12.60 +%% @doc This function is used to perform arbitrary calls into the
12.61 +%% consumer module.
12.62 +call_consumer(Pid, Msg) ->
12.63 + gen_server2:call(Pid, {consumer_call, Msg}, infinity).
12.64 +
12.65 +%% @spec (Consumer, Method, Args) -> ok
12.66 +%% where
12.67 +%% Consumer = pid()
12.68 +%% Method = amqp_method()
12.69 +%% Args = any()
12.70 +%%
12.71 +%% @doc This function is used by amqp_channel to forward received
12.72 +%% methods and deliveries to the consumer module.
12.73 +call_consumer(Pid, Method, Args) ->
12.74 + gen_server2:call(Pid, {consumer_call, Method, Args}, infinity).
12.75 +
12.76 +%%---------------------------------------------------------------------------
12.77 +%% Behaviour
12.78 +%%---------------------------------------------------------------------------
12.79 +
12.80 +%% @private
12.81 +behaviour_info(callbacks) ->
12.82 + [
12.83 + %% init(Args) -> {ok, InitialState} | {stop, Reason} | ignore
12.84 + %% where
12.85 + %% Args = [any()]
12.86 + %% InitialState = state()
12.87 + %% Reason = term()
12.88 + %%
12.89 + %% This callback is invoked by the channel, when it starts
12.90 + %% up. Use it to initialize the state of the consumer. In case of
12.91 + %% an error, return {stop, Reason} or ignore.
12.92 + {init, 1},
12.93 +
12.94 + %% handle_consume(Consume, Sender, State) -> ok_error()
12.95 + %% where
12.96 + %% Consume = #'basic.consume'{}
12.97 + %% Sender = pid()
12.98 + %% State = state()
12.99 + %%
12.100 + %% This callback is invoked by the channel before a basic.consume
12.101 + %% is sent to the server.
12.102 + {handle_consume, 3},
12.103 +
12.104 + %% handle_consume_ok(ConsumeOk, Consume, State) -> ok_error()
12.105 + %% where
12.106 + %% ConsumeOk = #'basic.consume_ok'{}
12.107 + %% Consume = #'basic.consume'{}
12.108 + %% State = state()
12.109 + %%
12.110 + %% This callback is invoked by the channel every time a
12.111 + %% basic.consume_ok is received from the server. Consume is the original
12.112 + %% method sent out to the server - it can be used to associate the
12.113 + %% call with the response.
12.114 + {handle_consume_ok, 3},
12.115 +
12.116 + %% handle_cancel(Cancel, State) -> ok_error()
12.117 + %% where
12.118 + %% Cancel = #'basic.cancel'{}
12.119 + %% State = state()
12.120 + %%
12.121 + %% This callback is invoked by the channel every time a basic.cancel
12.122 + %% is received from the server.
12.123 + {handle_cancel, 2},
12.124 +
12.125 + %% handle_cancel_ok(CancelOk, Cancel, State) -> ok_error()
12.126 + %% where
12.127 + %% CancelOk = #'basic.cancel_ok'{}
12.128 + %% Cancel = #'basic.cancel'{}
12.129 + %% State = state()
12.130 + %%
12.131 + %% This callback is invoked by the channel every time a basic.cancel_ok
12.132 + %% is received from the server.
12.133 + {handle_cancel_ok, 3},
12.134 +
12.135 + %% handle_deliver(Deliver, Message, State) -> ok_error()
12.136 + %% where
12.137 + %% Deliver = #'basic.deliver'{}
12.138 + %% Message = #amqp_msg{}
12.139 + %% State = state()
12.140 + %%
12.141 + %% This callback is invoked by the channel every time a basic.deliver
12.142 + %% is received from the server.
12.143 + {handle_deliver, 3},
12.144 +
12.145 + %% handle_info(Info, State) -> ok_error()
12.146 + %% where
12.147 + %% Info = any()
12.148 + %% State = state()
12.149 + %%
12.150 + %% This callback is invoked the consumer process receives a
12.151 + %% message.
12.152 + {handle_info, 2},
12.153 +
12.154 + %% handle_call(Msg, From, State) -> {reply, Reply, NewState} |
12.155 + %% {noreply, NewState} |
12.156 + %% {error, Reason, NewState}
12.157 + %% where
12.158 + %% Msg = any()
12.159 + %% From = any()
12.160 + %% Reply = any()
12.161 + %% State = state()
12.162 + %% NewState = state()
12.163 + %%
12.164 + %% This callback is invoked by the channel when calling
12.165 + %% amqp_channel:call_consumer/2. Reply is the term that
12.166 + %% amqp_channel:call_consumer/2 will return. If the callback
12.167 + %% returns {noreply, _}, then the caller to
12.168 + %% amqp_channel:call_consumer/2 and the channel remain blocked
12.169 + %% until gen_server2:reply/2 is used with the provided From as
12.170 + %% the first argument.
12.171 + {handle_call, 3},
12.172 +
12.173 + %% terminate(Reason, State) -> any()
12.174 + %% where
12.175 + %% Reason = any()
12.176 + %% State = state()
12.177 + %%
12.178 + %% This callback is invoked by the channel after it has shut down and
12.179 + %% just before its process exits.
12.180 + {terminate, 2}
12.181 + ];
12.182 +behaviour_info(_Other) ->
12.183 + undefined.
12.184 +
12.185 +%%---------------------------------------------------------------------------
12.186 +%% gen_server2 callbacks
12.187 +%%---------------------------------------------------------------------------
12.188 +
12.189 +init([ConsumerModule, ExtraParams]) ->
12.190 + case ConsumerModule:init(ExtraParams) of
12.191 + {ok, MState} ->
12.192 + {ok, #state{module = ConsumerModule, module_state = MState}};
12.193 + {stop, Reason} ->
12.194 + {stop, Reason};
12.195 + ignore ->
12.196 + ignore
12.197 + end.
12.198 +
12.199 +prioritise_info({'DOWN', _MRef, process, _Pid, _Info}, _State) -> 1;
12.200 +prioritise_info(_, _State) -> 0.
12.201 +
12.202 +handle_call({consumer_call, Msg}, From,
12.203 + State = #state{module = ConsumerModule,
12.204 + module_state = MState}) ->
12.205 + case ConsumerModule:handle_call(Msg, From, MState) of
12.206 + {noreply, NewMState} ->
12.207 + {noreply, State#state{module_state = NewMState}};
12.208 + {reply, Reply, NewMState} ->
12.209 + {reply, Reply, State#state{module_state = NewMState}};
12.210 + {error, Reason, NewMState} ->
12.211 + {stop, {error, Reason}, {error, Reason},
12.212 + State#state{module_state = NewMState}}
12.213 + end;
12.214 +handle_call({consumer_call, Method, Args}, _From,
12.215 + State = #state{module = ConsumerModule,
12.216 + module_state = MState}) ->
12.217 + Return =
12.218 + case Method of
12.219 + #'basic.consume'{} ->
12.220 + ConsumerModule:handle_consume(Method, Args, MState);
12.221 + #'basic.consume_ok'{} ->
12.222 + ConsumerModule:handle_consume_ok(Method, Args, MState);
12.223 + #'basic.cancel'{} ->
12.224 + ConsumerModule:handle_cancel(Method, MState);
12.225 + #'basic.cancel_ok'{} ->
12.226 + ConsumerModule:handle_cancel_ok(Method, Args, MState);
12.227 + #'basic.deliver'{} ->
12.228 + ConsumerModule:handle_deliver(Method, Args, MState)
12.229 + end,
12.230 + case Return of
12.231 + {ok, NewMState} ->
12.232 + {reply, ok, State#state{module_state = NewMState}};
12.233 + {error, Reason, NewMState} ->
12.234 + {stop, {error, Reason}, {error, Reason},
12.235 + State#state{module_state = NewMState}}
12.236 + end.
12.237 +
12.238 +handle_cast(_What, State) ->
12.239 + {noreply, State}.
12.240 +
12.241 +handle_info(Info, State = #state{module_state = MState,
12.242 + module = ConsumerModule}) ->
12.243 + case ConsumerModule:handle_info(Info, MState) of
12.244 + {ok, NewMState} ->
12.245 + {noreply, State#state{module_state = NewMState}};
12.246 + {error, Reason, NewMState} ->
12.247 + {stop, {error, Reason}, {error, Reason},
12.248 + State#state{module_state = NewMState}}
12.249 + end.
12.250 +
12.251 +terminate(Reason, #state{module = ConsumerModule, module_state = MState}) ->
12.252 + ConsumerModule:terminate(Reason, MState).
12.253 +
12.254 +code_change(_OldVsn, State, _Extra) ->
12.255 + State.
13.1 --- a/src/amqp_rpc_client.erl Mon Jul 18 17:40:41 2011 +0100
13.2 +++ b/src/amqp_rpc_client.erl Wed Jul 27 16:03:16 2011 +0100
13.3 @@ -81,7 +81,7 @@
13.4
13.5 %% Registers this RPC client instance as a consumer to handle rpc responses
13.6 setup_consumer(#state{channel = Channel, reply_queue = Q}) ->
13.7 - amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, self()).
13.8 + amqp_channel:call(Channel, #'basic.consume'{queue = Q}).
13.9
13.10 %% Publishes to the broker, stores the From address against
13.11 %% the correlation id and increments the correlationid for
13.12 @@ -111,7 +111,8 @@
13.13 %% Sets up a reply queue and consumer within an existing channel
13.14 %% @private
13.15 init([Connection, RoutingKey]) ->
13.16 - {ok, Channel} = amqp_connection:open_channel(Connection),
13.17 + {ok, Channel} = amqp_connection:open_channel(
13.18 + Connection, {amqp_direct_consumer, [self()]}),
13.19 InitialState = #state{channel = Channel,
13.20 exchange = <<>>,
13.21 routing_key = RoutingKey},
13.22 @@ -140,10 +141,18 @@
13.23 {noreply, State}.
13.24
13.25 %% @private
13.26 +handle_info({#'basic.consume'{}, _Pid}, State) ->
13.27 + {noreply, State};
13.28 +
13.29 +%% @private
13.30 handle_info(#'basic.consume_ok'{}, State) ->
13.31 {noreply, State};
13.32
13.33 %% @private
13.34 +handle_info(#'basic.cancel'{}, State) ->
13.35 + {noreply, State};
13.36 +
13.37 +%% @private
13.38 handle_info(#'basic.cancel_ok'{}, State) ->
13.39 {stop, normal, State};
13.40
14.1 --- a/src/amqp_rpc_server.erl Mon Jul 18 17:40:41 2011 +0100
14.2 +++ b/src/amqp_rpc_server.erl Wed Jul 27 16:03:16 2011 +0100
14.3 @@ -64,9 +64,10 @@
14.4
14.5 %% @private
14.6 init([Connection, Q, Fun]) ->
14.7 - {ok, Channel} = amqp_connection:open_channel(Connection),
14.8 + {ok, Channel} = amqp_connection:open_channel(
14.9 + Connection, {amqp_direct_consumer, [self()]}),
14.10 amqp_channel:call(Channel, #'queue.declare'{queue = Q}),
14.11 - amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, self()),
14.12 + amqp_channel:call(Channel, #'basic.consume'{queue = Q}),
14.13 {ok, #state{channel = Channel, handler = Fun} }.
14.14
14.15 %% @private
14.16 @@ -74,10 +75,18 @@
14.17 {stop, normal, State};
14.18
14.19 %% @private
14.20 +handle_info({#'basic.consume'{}, _}, State) ->
14.21 + {noreply, State};
14.22 +
14.23 +%% @private
14.24 handle_info(#'basic.consume_ok'{}, State) ->
14.25 {noreply, State};
14.26
14.27 %% @private
14.28 +handle_info(#'basic.cancel'{}, State) ->
14.29 + {noreply, State};
14.30 +
14.31 +%% @private
14.32 handle_info(#'basic.cancel_ok'{}, State) ->
14.33 {stop, normal, State};
14.34
14.35 @@ -95,6 +104,10 @@
14.36 amqp_channel:call(Channel, Publish, #amqp_msg{props = Properties,
14.37 payload = Response}),
14.38 amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
14.39 + {noreply, State};
14.40 +
14.41 +%% @private
14.42 +handle_info({'DOWN', _MRef, process, _Pid, _Info}, State) ->
14.43 {noreply, State}.
14.44
14.45 %% @private
15.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
15.2 +++ b/src/amqp_selective_consumer.erl Wed Jul 27 16:03:16 2011 +0100
15.3 @@ -0,0 +1,233 @@
15.4 +%% The contents of this file are subject to the Mozilla Public Licensbe
15.5 +%% Version 1.1 (the "License"); you may not use this file except in
15.6 +%% compliance with the License. You may obtain a copy of the License at
15.7 +%% http://www.mozilla.org/MPL/
15.8 +%%
15.9 +%% Software distributed under the License is distributed on an "AS IS"
15.10 +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
15.11 +%% License for the specific language governing rights and limitations
15.12 +%% under the License.
15.13 +%%
15.14 +%% The Original Code is RabbitMQ.
15.15 +%%
15.16 +%% The Initial Developer of the Original Code is VMware, Inc.
15.17 +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
15.18 +%%
15.19 +
15.20 +%% @doc This module is an implementation of the amqp_gen_consumer
15.21 +%% behaviour and can be used as part of the Consumer parameter when
15.22 +%% opening AMQP channels. This is the default implementation selected
15.23 +%% by channel. <br/>
15.24 +%% <br/>
15.25 +%% The Consumer parameter for this implementation is {{@module}, []@}<br/>
15.26 +%% This consumer implementation keeps track of consumer tags and sends
15.27 +%% the subscription-relevant messages to the registered consumers, according
15.28 +%% to an internal tag dictionary.<br/>
15.29 +%% <br/>
15.30 +%% Send a #basic.consume{} message to the channel to subscribe a
15.31 +%% consumer to a queue and send a #basic.cancel{} message to cancel a
15.32 +%% subscription.<br/>
15.33 +%% <br/>
15.34 +%% The channel will send to the relevant registered consumers the
15.35 +%% basic.consume_ok, basic.cancel_ok, basic.cancel and basic.deliver messages
15.36 +%% received from the server.<br/>
15.37 +%% <br/>
15.38 +%% If a consumer is not registered for a given consumer tag, the message
15.39 +%% is sent to the default consumer registered with
15.40 +%% {@module}:register_default_consumer. If there is no default consumer
15.41 +%% registered in this case, an exception occurs and the channel is abruptly
15.42 +%% terminated.<br/>
15.43 +-module(amqp_selective_consumer).
15.44 +
15.45 +-include("amqp_client.hrl").
15.46 +-include("amqp_gen_consumer_spec.hrl").
15.47 +
15.48 +-behaviour(amqp_gen_consumer).
15.49 +
15.50 +-export([register_default_consumer/2]).
15.51 +-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
15.52 + handle_cancel/2, handle_deliver/3, handle_info/2, handle_call/3,
15.53 + terminate/2]).
15.54 +
15.55 +-record(state, {consumers = dict:new(), %% Tag -> ConsumerPid
15.56 + unassigned = undefined, %% Pid
15.57 + monitors = dict:new(), %% Pid -> MRef
15.58 + default_consumer = none}).
15.59 +
15.60 +%%---------------------------------------------------------------------------
15.61 +%% Interface
15.62 +%%---------------------------------------------------------------------------
15.63 +
15.64 +%% @spec (ChannelPid, ConsumerPid) -> ok
15.65 +%% where
15.66 +%% ChannelPid = pid()
15.67 +%% ConsumerPid = pid()
15.68 +%% @doc This function registers a default consumer with the channel. A
15.69 +%% default consumer is used when a subscription is made via
15.70 +%% amqp_channel:call(ChannelPid, #'basic.consume'{}) (rather than
15.71 +%% {@module}:subscribe/3) and hence there is no consumer pid
15.72 +%% registered with the consumer tag. In this case, the relevant
15.73 +%% deliveries will be sent to the default consumer.
15.74 +register_default_consumer(ChannelPid, ConsumerPid) ->
15.75 + amqp_channel:call_consumer(ChannelPid,
15.76 + {register_default_consumer, ConsumerPid}).
15.77 +
15.78 +%%---------------------------------------------------------------------------
15.79 +%% amqp_gen_consumer callbacks
15.80 +%%---------------------------------------------------------------------------
15.81 +
15.82 +%% @private
15.83 +init([]) ->
15.84 + {ok, #state{}}.
15.85 +
15.86 +%% @private
15.87 +handle_consume(BasicConsume, Pid, State = #state{consumers = Consumers,
15.88 + monitors = Monitors}) ->
15.89 + Tag = tag(BasicConsume),
15.90 + Ok =
15.91 + case BasicConsume of
15.92 + #'basic.consume'{nowait = true}
15.93 + when Tag =:= undefined orelse size(Tag) == 0 ->
15.94 + false; %% Async and undefined tag
15.95 + _ when is_binary(Tag) andalso size(Tag) >= 0 ->
15.96 + case resolve_consumer(Tag, State) of
15.97 + {consumer, _} -> false; %% Tag already in use
15.98 + _ -> true
15.99 + end;
15.100 + _ ->
15.101 + true
15.102 + end,
15.103 + case {Ok, BasicConsume} of
15.104 + {true, #'basic.consume'{nowait = true}} ->
15.105 + {ok, State#state
15.106 + {consumers = dict:store(Tag, Pid, Consumers),
15.107 + monitors = dict:store(Pid, monitor(process, Pid), Monitors)}};
15.108 + {true, #'basic.consume'{nowait = false}} ->
15.109 + {ok, State#state{unassigned = Pid}};
15.110 + {false, #'basic.consume'{nowait = true}} ->
15.111 + {error, 'no_consumer_tag_specified', State};
15.112 + {false, #'basic.consume'{nowait = false}} ->
15.113 + %% Don't do anything (don't override existing
15.114 + %% consumers), the server will close the channel with an error.
15.115 + {ok, State}
15.116 + end.
15.117 +
15.118 +%% @private
15.119 +handle_consume_ok(BasicConsumeOk, _BasicConsume,
15.120 + State = #state{unassigned = Pid,
15.121 + consumers = Consumers,
15.122 + monitors = Monitors})
15.123 + when is_pid(Pid) ->
15.124 + State1 = State#state{consumers =
15.125 + dict:store(tag(BasicConsumeOk), Pid, Consumers),
15.126 + monitors =
15.127 + dict:store(Pid, monitor(process, Pid), Monitors),
15.128 + unassigned = undefined},
15.129 + deliver(BasicConsumeOk, State1),
15.130 + {ok, State1}.
15.131 +
15.132 +%% @private
15.133 +%% The server sent a basic.cancel.
15.134 +handle_cancel(Cancel, State) ->
15.135 + State1 = do_cancel(Cancel, State),
15.136 + %% Use old state
15.137 + deliver(Cancel, State),
15.138 + {ok, State1}.
15.139 +
15.140 +%% @private
15.141 +%% We sent a basic.cancel and now receive the ok.
15.142 +handle_cancel_ok(CancelOk, _Cancel, State) ->
15.143 + State1 = do_cancel(CancelOk, State),
15.144 + %% Use old state
15.145 + deliver(CancelOk, State),
15.146 + {ok, State1}.
15.147 +
15.148 +%% @private
15.149 +handle_deliver(Deliver, Message, State) ->
15.150 + deliver(Deliver, Message, State),
15.151 + {ok, State}.
15.152 +
15.153 +%% @private
15.154 +handle_info({'DOWN', _MRef, process, Pid, _Info},
15.155 + State = #state{monitors = Monitors,
15.156 + consumers = Consumers,
15.157 + default_consumer = DConsumer }) ->
15.158 + case dict:find(Pid, Monitors) of
15.159 + {ok, _Tag} ->
15.160 + {ok, State#state{monitors = dict:erase(Pid, Monitors),
15.161 + consumers =
15.162 + dict:filter(
15.163 + fun (_, Pid1) when Pid1 =:= Pid -> false;
15.164 + (_, _) -> true
15.165 + end, Consumers)}};
15.166 + error ->
15.167 + case Pid of
15.168 + DConsumer -> {ok, State#state{
15.169 + monitors = dict:erase(Pid, Monitors),
15.170 + default_consumer = none}};
15.171 + _ -> {ok, State} %% unnamed consumer went down
15.172 + %% before receiving consume_ok
15.173 + end
15.174 + end.
15.175 +
15.176 +%% @private
15.177 +handle_call({register_default_consumer, Pid}, _From,
15.178 + State = #state{default_consumer = PrevPid,
15.179 + monitors = Monitors}) ->
15.180 + case PrevPid of
15.181 + none -> ok;
15.182 + _ -> demonitor(dict:fetch(PrevPid, Monitors)),
15.183 + dict:erase(PrevPid, Monitors)
15.184 + end,
15.185 + {reply, ok,
15.186 + State#state{default_consumer = Pid,
15.187 + monitors = dict:store(Pid, monitor(process, Pid),
15.188 + Monitors)}}.
15.189 +
15.190 +%% @private
15.191 +terminate(_Reason, State) ->
15.192 + State.
15.193 +
15.194 +%%---------------------------------------------------------------------------
15.195 +%% Internal plumbing
15.196 +%%---------------------------------------------------------------------------
15.197 +
15.198 +deliver(Msg, State) ->
15.199 + deliver(Msg, undefined, State).
15.200 +deliver(Msg, Message, State) ->
15.201 + Combined = if Message =:= undefined -> Msg;
15.202 + true -> {Msg, Message}
15.203 + end,
15.204 + case resolve_consumer(tag(Msg), State) of
15.205 + {consumer, Pid} -> Pid ! Combined;
15.206 + {default, Pid} -> Pid ! Combined;
15.207 + error -> exit(unexpected_delivery_and_no_default_consumer)
15.208 + end.
15.209 +
15.210 +do_cancel(Cancel, State = #state{consumers = Consumers,
15.211 + monitors = Monitors}) ->
15.212 + Tag = tag(Cancel),
15.213 + case dict:find(Tag, Consumers) of
15.214 + {ok, Pid} -> MRef = dict:fetch(Pid, Monitors),
15.215 + demonitor(MRef),
15.216 + State#state{consumers = dict:erase(Tag, Consumers),
15.217 + monitors = dict:erase(Pid, Monitors)};
15.218 + error -> %% Untracked consumer. Do nothing.
15.219 + State
15.220 + end.
15.221 +
15.222 +resolve_consumer(Tag, #state{consumers = Consumers,
15.223 + default_consumer = DefaultConsumer}) ->
15.224 + case dict:find(Tag, Consumers) of
15.225 + {ok, ConsumerPid} -> {consumer, ConsumerPid};
15.226 + error -> case DefaultConsumer of
15.227 + none -> error;
15.228 + _ -> {default, DefaultConsumer}
15.229 + end
15.230 + end.
15.231 +
15.232 +tag(#'basic.consume'{consumer_tag = Tag}) -> Tag;
15.233 +tag(#'basic.consume_ok'{consumer_tag = Tag}) -> Tag;
15.234 +tag(#'basic.cancel'{consumer_tag = Tag}) -> Tag;
15.235 +tag(#'basic.cancel_ok'{consumer_tag = Tag}) -> Tag;
15.236 +tag(#'basic.deliver'{consumer_tag = Tag}) -> Tag.
16.1 --- a/test/direct_client_SUITE.erl Mon Jul 18 17:40:41 2011 +0100
16.2 +++ b/test/direct_client_SUITE.erl Wed Jul 27 16:03:16 2011 +0100
16.3 @@ -103,6 +103,9 @@
16.4 confirm_barrier_nop_test() ->
16.5 test_util:confirm_barrier_nop_test(new_connection()).
16.6
16.7 +default_consumer_test() ->
16.8 + test_util:default_consumer_test(new_connection()).
16.9 +
16.10 subscribe_nowait_test() ->
16.11 test_util:subscribe_nowait_test(new_connection()).
16.12
17.1 --- a/test/negative_test_util.erl Mon Jul 18 17:40:41 2011 +0100
17.2 +++ b/test/negative_test_util.erl Wed Jul 27 16:03:16 2011 +0100
17.3 @@ -126,10 +126,10 @@
17.4 SentString = << <<"k">> || _ <- lists:seq(1, 340)>>,
17.5 Q = test_util:uuid(), X = test_util:uuid(), Key = test_util:uuid(),
17.6 test_util:setup_exchange(Channel, Q, X, Key),
17.7 - ?assertExit(_, amqp_channel:subscribe(
17.8 - Channel, #'basic.consume'{queue = Q, no_ack = true,
17.9 - consumer_tag = SentString},
17.10 - self())),
17.11 + ?assertExit(_, amqp_channel:call(
17.12 + Channel, #'basic.consume'{queue = Q,
17.13 + no_ack = true,
17.14 + consumer_tag = SentString})),
17.15 test_util:wait_for_death(Channel),
17.16 test_util:wait_for_death(Connection),
17.17 ok.
18.1 --- a/test/network_client_SUITE.erl Mon Jul 18 17:40:41 2011 +0100
18.2 +++ b/test/network_client_SUITE.erl Wed Jul 27 16:03:16 2011 +0100
18.3 @@ -117,6 +117,9 @@
18.4 confirm_barrier_nop_test() ->
18.5 test_util:confirm_barrier_nop_test(new_connection()).
18.6
18.7 +default_consumer_test() ->
18.8 + test_util:default_consumer_test(new_connection()).
18.9 +
18.10 subscribe_nowait_test() ->
18.11 test_util:subscribe_nowait_test(new_connection()).
18.12
19.1 --- a/test/test_util.erl Mon Jul 18 17:40:41 2011 +0100
19.2 +++ b/test/test_util.erl Wed Jul 27 16:03:16 2011 +0100
19.3 @@ -332,9 +332,8 @@
19.4 exchange = X,
19.5 routing_key = RoutingKey}),
19.6 #'basic.consume_ok'{} =
19.7 - amqp_channel:subscribe(Channel,
19.8 - #'basic.consume'{queue = Q, consumer_tag = Tag},
19.9 - self()),
19.10 + amqp_channel:call(Channel,
19.11 + #'basic.consume'{queue = Q, consumer_tag = Tag}),
19.12 receive #'basic.consume_ok'{consumer_tag = Tag} -> ok end,
19.13 receive {#'basic.deliver'{}, _} -> ok end,
19.14 #'basic.cancel_ok'{} =
19.15 @@ -348,7 +347,7 @@
19.16 #'queue.declare_ok'{} =
19.17 amqp_channel:call(Channel, #'queue.declare'{queue = Q}),
19.18 #'basic.consume_ok'{consumer_tag = CTag} = ConsumeOk =
19.19 - amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, self()),
19.20 + amqp_channel:call(Channel, #'basic.consume'{queue = Q}),
19.21 receive ConsumeOk -> ok end,
19.22 #'queue.delete_ok'{} =
19.23 amqp_channel:call(Channel, #'queue.delete'{queue = Q}),
19.24 @@ -357,34 +356,27 @@
19.25 ok.
19.26
19.27 basic_recover_test(Connection) ->
19.28 - {ok, Channel} = amqp_connection:open_channel(Connection),
19.29 + {ok, Channel} = amqp_connection:open_channel(
19.30 + Connection, {amqp_direct_consumer, [self()]}),
19.31 #'queue.declare_ok'{queue = Q} =
19.32 amqp_channel:call(Channel, #'queue.declare'{}),
19.33 #'basic.consume_ok'{consumer_tag = Tag} =
19.34 - amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q},
19.35 - self()),
19.36 - receive
19.37 - #'basic.consume_ok'{consumer_tag = Tag} -> ok
19.38 - after 2000 ->
19.39 - exit(did_not_receive_subscription_message)
19.40 - end,
19.41 + amqp_channel:call(Channel, #'basic.consume'{queue = Q}),
19.42 + receive #'basic.consume_ok'{consumer_tag = Tag} -> ok end,
19.43 Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
19.44 amqp_channel:call(Channel, Publish, #amqp_msg{payload = <<"foobar">>}),
19.45 receive
19.46 - {#'basic.deliver'{}, _} ->
19.47 + {#'basic.deliver'{consumer_tag = Tag}, _} ->
19.48 %% no_ack set to false, but don't send ack
19.49 ok
19.50 - after 2000 ->
19.51 - exit(did_not_receive_first_message)
19.52 end,
19.53 BasicRecover = #'basic.recover'{requeue = true},
19.54 amqp_channel:cast(Channel, BasicRecover),
19.55 receive
19.56 - {#'basic.deliver'{delivery_tag = DeliveryTag2}, _} ->
19.57 + {#'basic.deliver'{consumer_tag = Tag,
19.58 + delivery_tag = DeliveryTag2}, _} ->
19.59 amqp_channel:cast(Channel,
19.60 #'basic.ack'{delivery_tag = DeliveryTag2})
19.61 - after 2000 ->
19.62 - exit(did_not_receive_second_message)
19.63 end,
19.64 teardown(Connection, Channel).
19.65
19.66 @@ -433,8 +425,8 @@
19.67 {ok, Channel} = amqp_connection:open_channel(Connection),
19.68 amqp_channel:call(Channel,
19.69 #'basic.qos'{prefetch_count = Prefetch}),
19.70 - amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q},
19.71 - self()),
19.72 + amqp_channel:call(Channel,
19.73 + #'basic.consume'{queue = Q}),
19.74 Parent ! finished,
19.75 sleeping_consumer(Channel, Sleep, Parent)
19.76 end) || Sleep <- Workers],
19.77 @@ -456,7 +448,7 @@
19.78 #'basic.consume_ok'{} ->
19.79 sleeping_consumer(Channel, Sleep, Parent);
19.80 #'basic.cancel_ok'{} ->
19.81 - ok;
19.82 + exit(unexpected_cancel_ok);
19.83 {#'basic.deliver'{delivery_tag = DeliveryTag}, _Content} ->
19.84 Parent ! finished,
19.85 receive stop -> do_stop(Channel, Parent)
19.86 @@ -515,13 +507,47 @@
19.87 true = amqp_channel:wait_for_confirms(Channel),
19.88 teardown(Connection, Channel).
19.89
19.90 +default_consumer_test(Connection) ->
19.91 + {ok, Channel} = amqp_connection:open_channel(Connection),
19.92 + amqp_selective_consumer:register_default_consumer(Channel, self()),
19.93 +
19.94 + #'queue.declare_ok'{queue = Q}
19.95 + = amqp_channel:call(Channel, #'queue.declare'{}),
19.96 + Pid = spawn(fun () -> receive
19.97 + after 10000 -> ok
19.98 + end
19.99 + end),
19.100 + #'basic.consume_ok'{} =
19.101 + amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, Pid),
19.102 + monitor(process, Pid),
19.103 + exit(Pid, shutdown),
19.104 + receive
19.105 + {'DOWN', _, process, _, _} ->
19.106 + io:format("little consumer died out~n")
19.107 + end,
19.108 + Payload = <<"for the default consumer">>,
19.109 + amqp_channel:call(Channel,
19.110 + #'basic.publish'{exchange = <<>>, routing_key = Q},
19.111 + #amqp_msg{payload = Payload}),
19.112 +
19.113 + receive
19.114 + {#'basic.deliver'{}, #'amqp_msg'{payload = Payload}} ->
19.115 + ok
19.116 + after 1000 ->
19.117 + exit('default_consumer_didnt_work')
19.118 + end,
19.119 + teardown(Connection, Channel).
19.120 +
19.121 subscribe_nowait_test(Connection) ->
19.122 {ok, Channel} = amqp_connection:open_channel(Connection),
19.123 {ok, Q} = setup_publish(Channel),
19.124 - ok = amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q,
19.125 - consumer_tag = uuid(),
19.126 - nowait = true},
19.127 - self()),
19.128 + ok = amqp_channel:call(Channel,
19.129 + #'basic.consume'{queue = Q,
19.130 + consumer_tag = uuid(),
19.131 + nowait = true}),
19.132 + receive #'basic.consume_ok'{} -> exit(unexpected_consume_ok)
19.133 + after 0 -> ok
19.134 + end,
19.135 receive
19.136 {#'basic.deliver'{delivery_tag = DTag}, _Content} ->
19.137 amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DTag})
19.138 @@ -585,10 +611,10 @@
19.139 %% Close connection without closing channels
19.140 amqp_connection:close(Connection1),
19.141 %% Get sent messages back and count them
19.142 - {ok, Channel2} = amqp_connection:open_channel(Connection2),
19.143 - amqp_channel:subscribe(Channel2,
19.144 - #'basic.consume'{queue = Q, no_ack = true},
19.145 - self()),
19.146 + {ok, Channel2} = amqp_connection:open_channel(
19.147 + Connection2, {amqp_direct_consumer, [self()]}),
19.148 + amqp_channel:call(Channel2, #'basic.consume'{queue = Q, no_ack = true}),
19.149 + receive #'basic.consume_ok'{} -> ok end,
19.150 ?assert(pc_consumer_loop(Channel2, Payload, 0) == NMessages),
19.151 %% Make sure queue is empty
19.152 #'queue.declare_ok'{queue = Q, message_count = NRemaining} =