1.1 --- a/.hgtags Thu Apr 07 10:36:27 2011 -0500
1.2 +++ b/.hgtags Thu Jul 28 13:46:07 2011 +0100
1.3 @@ -10,3 +10,5 @@
1.4 553b07d6267a608673d1bc7603b6b3a13ce99b16 rabbitmq_v2_3_0
1.5 a1f84f26c26ca04c4baac06f79d0f7a0d66c1dbb rabbitmq_v2_3_1
1.6 69d96cc497ce4686377e41ced1d92a993ce6b748 rabbitmq_v2_4_0
1.7 +c9fc28db6777acf24db72f00efb59d91d3172acf rabbitmq_v2_4_1
1.8 +0b6b1c20a6e829c3477dd11e4b6a085a0189fd9c rabbitmq_v2_5_0
2.1 --- a/Makefile Thu Apr 07 10:36:27 2011 -0500
2.2 +++ b/Makefile Thu Jul 28 13:46:07 2011 +0100
2.3 @@ -37,8 +37,9 @@
2.4
2.5 distribution: documentation source_tarball package
2.6
2.7 -%.app: %.app.in
2.8 - sed -e 's:%%VSN%%:$(VERSION):g' < $< > $@
2.9 +%.app: %.app.in $(SOURCES) $(BROKER_DIR)/generate_app
2.10 + escript $(BROKER_DIR)/generate_app $< $@ $(SOURCE_DIR)
2.11 + sed -i.save 's/%%VSN%%/$(VERSION)/' $@ && rm $@.save
2.12
2.13 ###############################################################################
2.14 ## Dialyzer
3.1 --- a/common.mk Thu Apr 07 10:36:27 2011 -0500
3.2 +++ b/common.mk Thu Jul 28 13:46:07 2011 +0100
3.3 @@ -56,10 +56,10 @@
3.4 ERL_PATH ?=
3.5
3.6 PACKAGE=amqp_client
3.7 -PACKAGE_DIR=$(PACKAGE)$(if $(APPEND_VERSION),-$(VERSION),)
3.8 +PACKAGE_DIR=$(PACKAGE)-$(VERSION)
3.9 PACKAGE_NAME_EZ=$(PACKAGE_DIR).ez
3.10 COMMON_PACKAGE=rabbit_common
3.11 -export COMMON_PACKAGE_DIR=$(COMMON_PACKAGE)$(if $(APPEND_VERSION),-$(VERSION),)
3.12 +export COMMON_PACKAGE_DIR=$(COMMON_PACKAGE)-$(VERSION)
3.13 COMMON_PACKAGE_EZ=$(COMMON_PACKAGE_DIR).ez
3.14
3.15 DEPS=$(shell erl -noshell -eval '{ok,[{_,_,[_,_,{modules, Mods},_,_,_]}]} = \
3.16 @@ -109,13 +109,14 @@
3.17 ALL_SSL := { $(MAKE) test_ssl || OK=false; }
3.18 ALL_SSL_COVERAGE := { $(MAKE) test_ssl_coverage || OK=false; }
3.19 SSL_BROKER_ARGS := -rabbit ssl_listeners [{\\\"0.0.0.0\\\",5671}] \
3.20 - -rabbit ssl_options [{cacertfile,\\\"$(SSL_CERTS_DIR)/testca/cacert.pem\\\"},{certfile,\\\"$(SSL_CERTS_DIR)/server/cert.pem\\\"},{keyfile,\\\"$(SSL_CERTS_DIR)/server/key.pem\\\"},{verify,verify_peer},{fail_if_no_peer_cert,true}] \
3.21 - -erlang_client_ssl_dir \"$(SSL_CERTS_DIR)\"
3.22 + -rabbit ssl_options [{cacertfile,\\\"$(SSL_CERTS_DIR)/testca/cacert.pem\\\"},{certfile,\\\"$(SSL_CERTS_DIR)/server/cert.pem\\\"},{keyfile,\\\"$(SSL_CERTS_DIR)/server/key.pem\\\"},{verify,verify_peer},{fail_if_no_peer_cert,true}]
3.23 +SSL_CLIENT_ARGS := -erlang_client_ssl_dir $(SSL_CERTS_DIR)
3.24 else
3.25 SSL := @echo No SSL_CERTS_DIR defined. && false
3.26 ALL_SSL := true
3.27 ALL_SSL_COVERAGE := true
3.28 SSL_BROKER_ARGS :=
3.29 +SSL_CLIENT_ARGS :=
3.30 endif
3.31
3.32 # Versions prior to this are not supported
4.1 --- a/ebin/amqp_client.app.in Thu Apr 07 10:36:27 2011 -0500
4.2 +++ b/ebin/amqp_client.app.in Thu Jul 28 13:46:07 2011 +0100
4.3 @@ -1,22 +1,7 @@
4.4 {application, amqp_client,
4.5 [{description, "RabbitMQ AMQP Client"},
4.6 {vsn, "%%VSN%%"},
4.7 - {modules, [amqp_auth_mechanisms,
4.8 - amqp_channel,
4.9 - amqp_channel_sup,
4.10 - amqp_channel_sup_sup,
4.11 - amqp_channels_manager,
4.12 - amqp_client,
4.13 - amqp_connection,
4.14 - amqp_connection_sup,
4.15 - amqp_connection_type_sup,
4.16 - amqp_direct_connection,
4.17 - amqp_gen_connection,
4.18 - amqp_main_reader,
4.19 - amqp_network_connection,
4.20 - amqp_rpc_client,
4.21 - amqp_rpc_server,
4.22 - amqp_sup]},
4.23 + {modules, []},
4.24 {registered, [amqp_sup]},
4.25 {env, []},
4.26 {mod, {amqp_client, []}},
5.1 --- a/include/amqp_client.hrl Thu Apr 07 10:36:27 2011 -0500
5.2 +++ b/include/amqp_client.hrl Thu Jul 28 13:46:07 2011 +0100
5.3 @@ -14,6 +14,9 @@
5.4 %% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
5.5 %%
5.6
5.7 +-ifndef(AMQP_CLIENT_HRL).
5.8 +-define(AMQP_CLIENT_HRL, true).
5.9 +
5.10 -include_lib("rabbit_common/include/rabbit.hrl").
5.11 -include_lib("rabbit_common/include/rabbit_framing.hrl").
5.12
5.13 @@ -25,21 +28,37 @@
5.14 -define(MAX_CHANNEL_NUMBER, 65535).
5.15 -define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}).
5.16
5.17 +-define(PROTOCOL_SSL_PORT, (?PROTOCOL_PORT - 1)).
5.18 +
5.19 -record(amqp_msg, {props = #'P_basic'{}, payload = <<>>}).
5.20
5.21 --record(amqp_params, {username = <<"guest">>,
5.22 - password = <<"guest">>,
5.23 - virtual_host = <<"/">>,
5.24 - host = "localhost",
5.25 - port = ?PROTOCOL_PORT,
5.26 - node = node(),
5.27 - channel_max = 0,
5.28 - frame_max = 0,
5.29 - heartbeat = 0,
5.30 - ssl_options = none,
5.31 - auth_mechanisms = [fun amqp_auth_mechanisms:plain/3,
5.32 - fun amqp_auth_mechanisms:amqplain/3],
5.33 - client_properties = []}).
5.34 +-record(amqp_params_network, {username = <<"guest">>,
5.35 + password = <<"guest">>,
5.36 + virtual_host = <<"/">>,
5.37 + host = "localhost",
5.38 + port = undefined,
5.39 + channel_max = 0,
5.40 + frame_max = 0,
5.41 + heartbeat = 0,
5.42 + ssl_options = none,
5.43 + auth_mechanisms =
5.44 + [fun amqp_auth_mechanisms:plain/3,
5.45 + fun amqp_auth_mechanisms:amqplain/3],
5.46 + client_properties = []}).
5.47 +
5.48 +-record(amqp_params_direct, {username = <<"guest">>,
5.49 + virtual_host = <<"/">>,
5.50 + node = node(),
5.51 + adapter_info = none,
5.52 + client_properties = []}).
5.53 +
5.54 +-record(adapter_info, {address = unknown,
5.55 + port = unknown,
5.56 + peer_address = unknown,
5.57 + peer_port = unknown,
5.58 + name = unknown,
5.59 + protocol = unknown,
5.60 + additional_info = []}).
5.61
5.62 -define(LOG_DEBUG(Format), error_logger:info_msg(Format)).
5.63 -define(LOG_INFO(Format, Args), error_logger:info_msg(Format, Args)).
5.64 @@ -49,3 +68,5 @@
5.65 {<<"exchange_exchange_bindings">>, bool, true},
5.66 {<<"basic.nack">>, bool, true},
5.67 {<<"consumer_cancel_notify">>, bool, true}]).
5.68 +
5.69 +-endif.
6.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
6.2 +++ b/include/amqp_gen_consumer_spec.hrl Thu Jul 28 13:46:07 2011 +0100
6.3 @@ -0,0 +1,40 @@
6.4 +%% The contents of this file are subject to the Mozilla Public License
6.5 +%% Version 1.1 (the "License"); you may not use this file except in
6.6 +%% compliance with the License. You may obtain a copy of the License at
6.7 +%% http://www.mozilla.org/MPL/
6.8 +%%
6.9 +%% Software distributed under the License is distributed on an "AS IS"
6.10 +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
6.11 +%% License for the specific language governing rights and limitations
6.12 +%% under the License.
6.13 +%%
6.14 +%% The Original Code is RabbitMQ.
6.15 +%%
6.16 +%% The Initial Developer of the Original Code is VMware, Inc.
6.17 +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
6.18 +%%
6.19 +
6.20 +-include("amqp_client.hrl").
6.21 +
6.22 +-type(state() :: any()).
6.23 +-type(consume() :: #'basic.consume'{}).
6.24 +-type(consume_ok() :: #'basic.consume_ok'{}).
6.25 +-type(cancel() :: #'basic.cancel'{}).
6.26 +-type(cancel_ok() :: #'basic.cancel_ok'{}).
6.27 +-type(deliver() :: #'basic.deliver'{}).
6.28 +-type(from() :: any()).
6.29 +-type(reason() :: any()).
6.30 +-type(ok_error() :: {ok, state()} | {error, reason(), state()}).
6.31 +
6.32 +-spec(init/1 :: ([any()]) -> {ok, state()}).
6.33 +-spec(handle_consume/3 :: (consume(), pid(), state()) -> ok_error()).
6.34 +-spec(handle_consume_ok/3 :: (consume_ok(), consume(), state()) ->
6.35 + ok_error()).
6.36 +-spec(handle_cancel/2 :: (cancel(), state()) -> ok_error()).
6.37 +-spec(handle_cancel_ok/3 :: (cancel_ok(), cancel(), state()) -> ok_error()).
6.38 +-spec(handle_deliver/3 :: (deliver(), #amqp_msg{}, state()) -> ok_error()).
6.39 +-spec(handle_info/2 :: (any(), state()) -> ok_error()).
6.40 +-spec(handle_call/3 :: (any(), from(), state()) ->
6.41 + {reply, any(), state()} | {noreply, state()} |
6.42 + {error, reason(), state()}).
6.43 +-spec(terminate/2 :: (any(), state()) -> state()).
7.1 --- a/rabbit_common.app.in Thu Apr 07 10:36:27 2011 -0500
7.2 +++ b/rabbit_common.app.in Thu Jul 28 13:46:07 2011 +0100
7.3 @@ -3,6 +3,7 @@
7.4 {vsn, "%%VSN%%"},
7.5 {modules, [
7.6 gen_server2,
7.7 + priority_queue,
7.8 rabbit_backing_queue,
7.9 rabbit_basic,
7.10 rabbit_binary_generator,
8.1 --- a/src/amqp_auth_mechanisms.erl Thu Apr 07 10:36:27 2011 -0500
8.2 +++ b/src/amqp_auth_mechanisms.erl Thu Jul 28 13:46:07 2011 +0100
8.3 @@ -25,14 +25,14 @@
8.4
8.5 plain(none, _, init) ->
8.6 {<<"PLAIN">>, []};
8.7 -plain(none, #amqp_params{username = Username,
8.8 - password = Password}, _State) ->
8.9 +plain(none, #amqp_params_network{username = Username,
8.10 + password = Password}, _State) ->
8.11 {<<0, Username/binary, 0, Password/binary>>, _State}.
8.12
8.13 amqplain(none, _, init) ->
8.14 {<<"AMQPLAIN">>, []};
8.15 -amqplain(none, #amqp_params{username = Username,
8.16 - password = Password}, _State) ->
8.17 +amqplain(none, #amqp_params_network{username = Username,
8.18 + password = Password}, _State) ->
8.19 LoginTable = [{<<"LOGIN">>, longstr, Username},
8.20 {<<"PASSWORD">>, longstr, Password}],
8.21 {rabbit_binary_generator:generate_table(LoginTable), _State}.
9.1 --- a/src/amqp_channel.erl Thu Apr 07 10:36:27 2011 -0500
9.2 +++ b/src/amqp_channel.erl Thu Jul 28 13:46:07 2011 +0100
9.3 @@ -68,11 +68,11 @@
9.4
9.5 -export([call/2, call/3, cast/2, cast/3]).
9.6 -export([close/1, close/3]).
9.7 --export([next_publish_seqno/1]).
9.8 -export([register_return_handler/2, register_flow_handler/2,
9.9 register_confirm_handler/2]).
9.10 --export([call_consumer/2]).
9.11 -
9.12 +-export([call_consumer/2, subscribe/3]).
9.13 +-export([next_publish_seqno/1, wait_for_confirms/1,
9.14 + wait_for_confirms_or_die/1]).
9.15 -export([start_link/5, connection_closing/3, open/1]).
9.16
9.17 -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
9.18 @@ -83,6 +83,7 @@
9.19
9.20 -record(state, {number,
9.21 connection,
9.22 + consumer,
9.23 driver,
9.24 rpc_requests = queue:new(),
9.25 closing = false, %% false |
9.26 @@ -94,9 +95,10 @@
9.27 next_pub_seqno = 0,
9.28 flow_active = true,
9.29 flow_handler_pid = none,
9.30 - consumer_module,
9.31 - consumer_state,
9.32 - start_writer_fun
9.33 + start_writer_fun,
9.34 + unconfirmed_set = gb_sets:new(),
9.35 + waiting_set = gb_trees:empty(),
9.36 + only_acks_received = true
9.37 }).
9.38
9.39 %%---------------------------------------------------------------------------
9.40 @@ -129,7 +131,7 @@
9.41 %% @spec (Channel, Method) -> Result
9.42 %% @doc This is equivalent to amqp_channel:call(Channel, Method, none).
9.43 call(Channel, Method) ->
9.44 - gen_server:call(Channel, {call, Method, none}, infinity).
9.45 + gen_server:call(Channel, {call, Method, none, self()}, infinity).
9.46
9.47 %% @spec (Channel, Method, Content) -> Result
9.48 %% where
9.49 @@ -152,12 +154,12 @@
9.50 %% the broker. It does not necessarily imply that the broker has
9.51 %% accepted responsibility for the message.
9.52 call(Channel, Method, Content) ->
9.53 - gen_server:call(Channel, {call, Method, Content}, infinity).
9.54 + gen_server:call(Channel, {call, Method, Content, self()}, infinity).
9.55
9.56 %% @spec (Channel, Method) -> ok
9.57 %% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
9.58 cast(Channel, Method) ->
9.59 - gen_server:cast(Channel, {cast, Method, none}).
9.60 + gen_server:cast(Channel, {cast, Method, none, self()}).
9.61
9.62 %% @spec (Channel, Method, Content) -> ok
9.63 %% where
9.64 @@ -169,7 +171,7 @@
9.65 %% This function is not recommended with synchronous methods, since there is no
9.66 %% way to verify that the server has received the method.
9.67 cast(Channel, Method, Content) ->
9.68 - gen_server:cast(Channel, {cast, Method, Content}).
9.69 + gen_server:cast(Channel, {cast, Method, Content, self()}).
9.70
9.71 %% @spec (Channel) -> ok | closing
9.72 %% where
9.73 @@ -197,6 +199,24 @@
9.74 next_publish_seqno(Channel) ->
9.75 gen_server:call(Channel, next_publish_seqno, infinity).
9.76
9.77 +%% @spec (Channel) -> boolean()
9.78 +%% where
9.79 +%% Channel = pid()
9.80 +%% @doc Wait until all messages published since the last call have
9.81 +%% been either ack'd or nack'd by the broker. Note, when called on a
9.82 +%% non-Confirm channel, waitForConfirms returns true immediately.
9.83 +wait_for_confirms(Channel) ->
9.84 + gen_server:call(Channel, wait_for_confirms, infinity).
9.85 +
9.86 +%% @spec (Channel) -> true
9.87 +%% where
9.88 +%% Channel = pid()
9.89 +%% @doc Behaves the same as wait_for_confirms/1, but if a nack is
9.90 +%% received, the calling process is immediately sent an
9.91 +%% exit(nack_received).
9.92 +wait_for_confirms_or_die(Channel) ->
9.93 + gen_server:call(Channel, {wait_for_confirms_or_die, self()}, infinity).
9.94 +
9.95 %% @spec (Channel, ReturnHandler) -> ok
9.96 %% where
9.97 %% Channel = pid()
9.98 @@ -226,15 +246,25 @@
9.99 register_flow_handler(Channel, FlowHandler) ->
9.100 gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
9.101
9.102 -%% @spec (Channel, Message) -> ok
9.103 +%% @spec (Channel, Msg) -> ok
9.104 %% where
9.105 %% Channel = pid()
9.106 -%% Message = any()
9.107 +%% Msg = any()
9.108 %% @doc This causes the channel to invoke Consumer:handle_call/2,
9.109 %% where Consumer is the amqp_gen_consumer implementation registered with
9.110 %% the channel.
9.111 -call_consumer(Channel, Call) ->
9.112 - gen_server:call(Channel, {call_consumer, Call}, infinity).
9.113 +call_consumer(Channel, Msg) ->
9.114 + gen_server:call(Channel, {call_consumer, Msg}, infinity).
9.115 +
9.116 +%% @spec (Channel, BasicConsume, Subscriber) -> ok
9.117 +%% where
9.118 +%% Channel = pid()
9.119 +%% BasicConsume = amqp_method()
9.120 +%% Subscriber = pid()
9.121 +%% @doc Subscribe the given pid to a queue using the specified
9.122 +%% basic.consume method.
9.123 +subscribe(Channel, BasicConsume = #'basic.consume'{}, Subscriber) ->
9.124 + gen_server:call(Channel, {subscribe, BasicConsume, Subscriber}, infinity).
9.125
9.126 %%---------------------------------------------------------------------------
9.127 %% Internal interface
9.128 @@ -258,24 +288,22 @@
9.129 %%---------------------------------------------------------------------------
9.130
9.131 %% @private
9.132 -init([Driver, Connection, ChannelNumber, {ConsumerModule, ConsumerArgs},
9.133 - SWF]) ->
9.134 - State0 = #state{connection = Connection,
9.135 - driver = Driver,
9.136 - number = ChannelNumber,
9.137 - consumer_module = ConsumerModule,
9.138 - start_writer_fun = SWF},
9.139 - {ok, consumer_callback(init, [ConsumerArgs], State0)}.
9.140 +init([Driver, Connection, ChannelNumber, Consumer, SWF]) ->
9.141 + {ok, #state{connection = Connection,
9.142 + driver = Driver,
9.143 + number = ChannelNumber,
9.144 + consumer = Consumer,
9.145 + start_writer_fun = SWF}}.
9.146
9.147 %% @private
9.148 handle_call(open, From, State) ->
9.149 - {noreply, rpc_top_half(#'channel.open'{}, none, From, State)};
9.150 + {noreply, rpc_top_half(#'channel.open'{}, none, From, none, State)};
9.151 %% @private
9.152 handle_call({close, Code, Text}, From, State) ->
9.153 handle_close(Code, Text, From, State);
9.154 %% @private
9.155 -handle_call({call, Method, AmqpMsg}, From, State) ->
9.156 - handle_method_to_server(Method, AmqpMsg, From, State);
9.157 +handle_call({call, Method, AmqpMsg, Sender}, From, State) ->
9.158 + handle_method_to_server(Method, AmqpMsg, From, Sender, State);
9.159 %% Handles the delivery of messages from a direct channel
9.160 %% @private
9.161 handle_call({send_command_sync, Method, Content}, From, State) ->
9.162 @@ -292,13 +320,25 @@
9.163 handle_call(next_publish_seqno, _From,
9.164 State = #state{next_pub_seqno = SeqNo}) ->
9.165 {reply, SeqNo, State};
9.166 +
9.167 +handle_call(wait_for_confirms, From, State) ->
9.168 + handle_wait_for_confirms(From, none, true, State);
9.169 +
9.170 +%% Lets the channel know that the process should be sent an exit
9.171 +%% signal if a nack is received.
9.172 +handle_call({wait_for_confirms_or_die, Pid}, From, State) ->
9.173 + handle_wait_for_confirms(From, Pid, ok, State);
9.174 %% @private
9.175 -handle_call({call_consumer, Call}, _From, State) ->
9.176 - handle_consumer_callback(handle_call, [Call], State).
9.177 +handle_call({call_consumer, Msg}, _From,
9.178 + State = #state{consumer = Consumer}) ->
9.179 + {reply, amqp_gen_consumer:call_consumer(Consumer, Msg), State};
9.180 +%% @private
9.181 +handle_call({subscribe, BasicConsume, Subscriber}, From, State) ->
9.182 + handle_method_to_server(BasicConsume, none, From, Subscriber, State).
9.183
9.184 %% @private
9.185 -handle_cast({cast, Method, AmqpMsg}, State) ->
9.186 - handle_method_to_server(Method, AmqpMsg, none, State);
9.187 +handle_cast({cast, Method, AmqpMsg, Sender}, State) ->
9.188 + handle_method_to_server(Method, AmqpMsg, none, Sender, State);
9.189 %% @private
9.190 handle_cast({register_return_handler, ReturnHandler}, State) ->
9.191 erlang:monitor(process, ReturnHandler),
9.192 @@ -377,8 +417,8 @@
9.193 {noreply, State#state{flow_handler_pid = none}}.
9.194
9.195 %% @private
9.196 -terminate(Reason, State) ->
9.197 - consumer_callback(terminate, [Reason], State).
9.198 +terminate(_Reason, State) ->
9.199 + State.
9.200
9.201 %% @private
9.202 code_change(_OldVsn, State, _Extra) ->
9.203 @@ -388,7 +428,8 @@
9.204 %% RPC mechanism
9.205 %%---------------------------------------------------------------------------
9.206
9.207 -handle_method_to_server(Method, AmqpMsg, From, State) ->
9.208 +handle_method_to_server(Method, AmqpMsg, From, Sender,
9.209 + State = #state{unconfirmed_set = USet}) ->
9.210 case {check_invalid_method(Method), From,
9.211 check_block(Method, AmqpMsg, State)} of
9.212 {ok, _, ok} ->
9.213 @@ -398,12 +439,14 @@
9.214 {#'basic.publish'{}, 0} ->
9.215 State;
9.216 {#'basic.publish'{}, SeqNo} ->
9.217 - State#state{next_pub_seqno = SeqNo + 1};
9.218 + State#state{unconfirmed_set =
9.219 + gb_sets:add(SeqNo, USet),
9.220 + next_pub_seqno = SeqNo + 1};
9.221 _ ->
9.222 State
9.223 end,
9.224 - {noreply,
9.225 - rpc_top_half(Method, build_content(AmqpMsg), From, State1)};
9.226 + {noreply, rpc_top_half(Method, build_content(AmqpMsg),
9.227 + From, Sender, State1)};
9.228 {ok, none, BlockReply} ->
9.229 ?LOG_WARN("Channel (~p): discarding method ~p in cast.~n"
9.230 "Reason: ~p~n", [self(), Method, BlockReply]),
9.231 @@ -424,21 +467,22 @@
9.232 class_id = 0,
9.233 method_id = 0},
9.234 case check_block(Close, none, State) of
9.235 - ok -> {noreply, rpc_top_half(Close, none, From, State)};
9.236 + ok -> {noreply, rpc_top_half(Close, none, From, none, State)};
9.237 BlockReply -> {reply, BlockReply, State}
9.238 end.
9.239
9.240 -rpc_top_half(Method, Content, From,
9.241 +rpc_top_half(Method, Content, From, Sender,
9.242 State0 = #state{rpc_requests = RequestQueue}) ->
9.243 State1 = State0#state{
9.244 - rpc_requests = queue:in({From, Method, Content}, RequestQueue)},
9.245 + rpc_requests =
9.246 + queue:in({From, Sender, Method, Content}, RequestQueue)},
9.247 IsFirstElement = queue:is_empty(RequestQueue),
9.248 if IsFirstElement -> do_rpc(State1);
9.249 true -> State1
9.250 end.
9.251
9.252 rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
9.253 - {{value, {From, _Method, _Content}}, RequestQueue1} =
9.254 + {{value, {From, _Sender, _Method, _Content}}, RequestQueue1} =
9.255 queue:out(RequestQueue),
9.256 case From of
9.257 none -> ok;
9.258 @@ -449,8 +493,8 @@
9.259 do_rpc(State = #state{rpc_requests = Q,
9.260 closing = Closing}) ->
9.261 case queue:out(Q) of
9.262 - {{value, {From, Method, Content}}, NewQ} ->
9.263 - State1 = pre_do(Method, Content, State),
9.264 + {{value, {From, Sender, Method, Content}}, NewQ} ->
9.265 + State1 = pre_do(Method, Content, Sender, State),
9.266 DoRet = do(Method, Content, State1),
9.267 case ?PROTOCOL:is_method_synchronous(Method) of
9.268 true -> State1;
9.269 @@ -475,15 +519,18 @@
9.270 end.
9.271
9.272 pending_rpc_method(#state{rpc_requests = Q}) ->
9.273 - {value, {_From, Method, _Content}} = queue:peek(Q),
9.274 + {value, {_From, _Sender, Method, _Content}} = queue:peek(Q),
9.275 Method.
9.276
9.277 -pre_do(#'channel.open'{}, none, State) ->
9.278 +pre_do(#'channel.open'{}, none, _Sender, State) ->
9.279 start_writer(State);
9.280 pre_do(#'channel.close'{reply_code = Code, reply_text = Text}, none,
9.281 - State) ->
9.282 + _Sender, State) ->
9.283 State#state{closing = {just_channel, {app_initiated_close, Code, Text}}};
9.284 -pre_do(_, _, State) ->
9.285 +pre_do(#'basic.consume'{} = Method, none, Sender, State) ->
9.286 + ok = call_to_consumer(Method, Sender, State),
9.287 + State;
9.288 +pre_do(_, _, _, State) ->
9.289 State.
9.290
9.291 %%---------------------------------------------------------------------------
9.292 @@ -543,12 +590,18 @@
9.293 end;
9.294 handle_method_from_server1(#'basic.consume_ok'{} = ConsumeOk, none, State) ->
9.295 Consume = #'basic.consume'{} = pending_rpc_method(State),
9.296 - State1 = consumer_callback(handle_consume_ok, [ConsumeOk, Consume], State),
9.297 - {noreply, rpc_bottom_half(ConsumeOk, State1)};
9.298 + ok = call_to_consumer(ConsumeOk, Consume, State),
9.299 + {noreply, rpc_bottom_half(ConsumeOk, State)};
9.300 handle_method_from_server1(#'basic.cancel_ok'{} = CancelOk, none, State) ->
9.301 Cancel = #'basic.cancel'{} = pending_rpc_method(State),
9.302 - State1 = consumer_callback(handle_cancel_ok, [CancelOk, Cancel], State),
9.303 - {noreply, rpc_bottom_half(CancelOk, State1)};
9.304 + ok = call_to_consumer(CancelOk, Cancel, State),
9.305 + {noreply, rpc_bottom_half(CancelOk, State)};
9.306 +handle_method_from_server1(#'basic.cancel'{} = Cancel, none, State) ->
9.307 + ok = call_to_consumer(Cancel, none, State),
9.308 + {noreply, State};
9.309 +handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, State) ->
9.310 + ok = call_to_consumer(Deliver, AmqpMsg, State),
9.311 + {noreply, State};
9.312 handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
9.313 State = #state{flow_handler_pid = FlowHandler}) ->
9.314 case FlowHandler of none -> ok;
9.315 @@ -558,9 +611,7 @@
9.316 %% flushed beforehand. Methods that made it to the queue are not
9.317 %% blocked in any circumstance.
9.318 {noreply, rpc_top_half(#'channel.flow_ok'{active = Active}, none, none,
9.319 - State#state{flow_active = Active})};
9.320 -handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, State) ->
9.321 - handle_consumer_callback(handle_deliver, [{Deliver, AmqpMsg}], State);
9.322 + none, State#state{flow_active = Active})};
9.323 handle_method_from_server1(
9.324 #'basic.return'{} = BasicReturn, AmqpMsg,
9.325 State = #state{return_handler_pid = ReturnHandler}) ->
9.326 @@ -571,28 +622,26 @@
9.327 _ -> ReturnHandler ! {BasicReturn, AmqpMsg}
9.328 end,
9.329 {noreply, State};
9.330 -handle_method_from_server1(#'basic.cancel'{} = Cancel, none, State) ->
9.331 - handle_consumer_callback(handle_cancel, [Cancel], State);
9.332 handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
9.333 #state{confirm_handler_pid = none} = State) ->
9.334 ?LOG_WARN("Channel (~p): received ~p but there is no "
9.335 "confirm handler registered~n", [self(), BasicAck]),
9.336 - {noreply, State};
9.337 + {noreply, update_confirm_set(BasicAck, State)};
9.338 handle_method_from_server1(
9.339 #'basic.ack'{} = BasicAck, none,
9.340 #state{confirm_handler_pid = ConfirmHandler} = State) ->
9.341 ConfirmHandler ! BasicAck,
9.342 - {noreply, State};
9.343 + {noreply, update_confirm_set(BasicAck, State)};
9.344 handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
9.345 #state{confirm_handler_pid = none} = State) ->
9.346 ?LOG_WARN("Channel (~p): received ~p but there is no "
9.347 "confirm handler registered~n", [self(), BasicNack]),
9.348 - {noreply, State};
9.349 + {noreply, update_confirm_set(BasicNack, handle_nack(State))};
9.350 handle_method_from_server1(
9.351 #'basic.nack'{} = BasicNack, none,
9.352 #state{confirm_handler_pid = ConfirmHandler} = State) ->
9.353 ConfirmHandler ! BasicNack,
9.354 - {noreply, State};
9.355 + {noreply, update_confirm_set(BasicNack, handle_nack(State))};
9.356
9.357 handle_method_from_server1(Method, none, State) ->
9.358 {noreply, rpc_bottom_half(Method, State)};
9.359 @@ -717,25 +766,44 @@
9.360 {noreply, State}
9.361 end.
9.362
9.363 -handle_consumer_callback(handle_call, Args,
9.364 - State = #state{consumer_state = CState,
9.365 - consumer_module = CModule}) ->
9.366 - {reply, Reply, NewCState} =
9.367 - erlang:apply(CModule, handle_call, Args ++ [CState]),
9.368 - {reply, Reply, State#state{consumer_state = NewCState}};
9.369 -handle_consumer_callback(Function, Args, State) ->
9.370 - {noreply, consumer_callback(Function, Args, State)}.
9.371 +handle_nack(State = #state{waiting_set = WSet}) ->
9.372 + DyingPids = [Pid || {_, Pid} <- gb_trees:to_list(WSet), Pid =/= none],
9.373 + case DyingPids of
9.374 + [] -> State;
9.375 + _ -> [exit(Pid, nack_received) || Pid <- DyingPids],
9.376 + close(self(), 200, <<"Nacks Received">>)
9.377 + end.
9.378
9.379 -consumer_callback(init, Args, State = #state{}) ->
9.380 - consumer_callback_basic(init, Args, State);
9.381 -consumer_callback(terminate, Args, State = #state{consumer_state = CState,
9.382 - consumer_module = CModule}) ->
9.383 - erlang:apply(CModule, terminate, Args ++ [CState]),
9.384 - State;
9.385 -consumer_callback(Function, Args, State = #state{consumer_state = CState}) ->
9.386 - consumer_callback_basic(Function, Args ++ [CState], State).
9.387 +update_confirm_set(#'basic.ack'{delivery_tag = SeqNo},
9.388 + State = #state{unconfirmed_set = USet}) ->
9.389 + maybe_notify_waiters(
9.390 + State#state{unconfirmed_set = gb_sets:del_element(SeqNo, USet)});
9.391 +update_confirm_set(#'basic.nack'{delivery_tag = SeqNo},
9.392 + State = #state{unconfirmed_set = USet}) ->
9.393 + maybe_notify_waiters(
9.394 + State#state{unconfirmed_set = gb_sets:del_element(SeqNo, USet),
9.395 + only_acks_received = false}).
9.396
9.397 -consumer_callback_basic(Function,
9.398 - Args, State = #state{consumer_module = CModule}) ->
9.399 - {ok, NewCState} = erlang:apply(CModule, Function, Args),
9.400 - State#state{consumer_state = NewCState}.
9.401 +maybe_notify_waiters(State = #state{unconfirmed_set = USet}) ->
9.402 + case gb_sets:is_empty(USet) of
9.403 + false -> State;
9.404 + true -> notify_confirm_waiters(State)
9.405 + end.
9.406 +
9.407 +notify_confirm_waiters(State = #state{waiting_set = WSet,
9.408 + only_acks_received = OAR}) ->
9.409 + [gen_server:reply(From, OAR) || {From, _} <- gb_trees:to_list(WSet)],
9.410 + State#state{waiting_set = gb_trees:empty(),
9.411 + only_acks_received = true}.
9.412 +
9.413 +handle_wait_for_confirms(From, Notify, EmptyReply,
9.414 + State = #state{unconfirmed_set = USet,
9.415 + waiting_set = WSet}) ->
9.416 + case gb_sets:is_empty(USet) of
9.417 + true -> {reply, EmptyReply, State};
9.418 + false -> {noreply, State#state{waiting_set =
9.419 + gb_trees:insert(From, Notify, WSet)}}
9.420 + end.
9.421 +
9.422 +call_to_consumer(Method, Args, #state{consumer = Consumer}) ->
9.423 + amqp_gen_consumer:call_consumer(Consumer, Method, Args).
10.1 --- a/src/amqp_channel_sup.erl Thu Apr 07 10:36:27 2011 -0500
10.2 +++ b/src/amqp_channel_sup.erl Thu Jul 28 13:46:07 2011 +0100
10.3 @@ -28,14 +28,15 @@
10.4 %% Interface
10.5 %%---------------------------------------------------------------------------
10.6
10.7 -start_link(Type, Connection, InfraArgs, ChNumber, Consumer) ->
10.8 - {ok, Sup} = supervisor2:start_link(?MODULE, []),
10.9 +start_link(Type, Connection, InfraArgs, ChNumber, Consumer = {_, _}) ->
10.10 + {ok, Sup} = supervisor2:start_link(?MODULE, [Consumer]),
10.11 + [{gen_consumer, ConsumerPid, _, _}] = supervisor2:which_children(Sup),
10.12 {ok, ChPid} = supervisor2:start_child(
10.13 Sup, {channel, {amqp_channel, start_link,
10.14 - [Type, Connection, ChNumber, Consumer,
10.15 + [Type, Connection, ChNumber, ConsumerPid,
10.16 start_writer_fun(Sup, Type, InfraArgs,
10.17 ChNumber)]},
10.18 - intrinsic, brutal_kill, worker, [amqp_channel]}),
10.19 + intrinsic, ?MAX_WAIT, worker, [amqp_channel]}),
10.20 {ok, AState} = init_command_assembler(Type),
10.21 {ok, Sup, {ChPid, AState}}.
10.22
10.23 @@ -60,7 +61,7 @@
10.24 {writer, {rabbit_writer, start_link,
10.25 [Sock, ChNumber, ?FRAME_MIN_SIZE, ?PROTOCOL,
10.26 self()]},
10.27 - transient, ?MAX_WAIT, worker, [rabbit_writer]})
10.28 + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]})
10.29 end.
10.30
10.31 init_command_assembler(direct) -> {ok, none};
10.32 @@ -70,5 +71,8 @@
10.33 %% supervisor2 callbacks
10.34 %%---------------------------------------------------------------------------
10.35
10.36 -init([]) ->
10.37 - {ok, {{one_for_all, 0, 1}, []}}.
10.38 +init([{ConsumerModule, ConsumerArgs}]) ->
10.39 + {ok, {{one_for_all, 0, 1},
10.40 + [{gen_consumer, {amqp_gen_consumer, start_link,
10.41 + [ConsumerModule, ConsumerArgs]},
10.42 + intrinsic, ?MAX_WAIT, worker, [amqp_gen_consumer]}]}}.
11.1 --- a/src/amqp_connection.erl Thu Apr 07 10:36:27 2011 -0500
11.2 +++ b/src/amqp_connection.erl Thu Jul 28 13:46:07 2011 +0100
11.3 @@ -70,7 +70,7 @@
11.4 -include("amqp_client.hrl").
11.5
11.6 -export([open_channel/1, open_channel/2, open_channel/3]).
11.7 --export([start/1, start/2]).
11.8 +-export([start/1]).
11.9 -export([close/1, close/3]).
11.10 -export([info/2, info_keys/1, info_keys/0]).
11.11
11.12 @@ -78,7 +78,22 @@
11.13 %% Type Definitions
11.14 %%---------------------------------------------------------------------------
11.15
11.16 -%% @type amqp_params() = #amqp_params{}.
11.17 +%% @type adapter_info() = #adapter_info{}.
11.18 +%% @type amqp_params_direct() = #amqp_params_direct{}.
11.19 +%% As defined in amqp_client.hrl. It contains the following fields:
11.20 +%% <ul>
11.21 +%% <li>username :: binary() - The name of a user registered with the broker,
11.22 +%% defaults to <<guest">></li>
11.23 +%% <li>virtual_host :: binary() - The name of a virtual host in the broker,
11.24 +%% defaults to <<"/">></li>
11.25 +%% <li>node :: atom() - The node the broker runs on (direct only)</li>
11.26 +%% <li>adapter_info :: adapter_info() - Extra management information for if
11.27 +%% this connection represents a non-AMQP network connection.</li>
11.28 +%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
11.29 +%% client properties to be sent to the server, defaults to []</li>
11.30 +%% </ul>
11.31 +%%
11.32 +%% @type amqp_params_network() = #amqp_params_network{}.
11.33 %% As defined in amqp_client.hrl. It contains the following fields:
11.34 %% <ul>
11.35 %% <li>username :: binary() - The name of a user registered with the broker,
11.36 @@ -91,7 +106,6 @@
11.37 %% defaults to "localhost" (network only)</li>
11.38 %% <li>port :: integer() - The port the broker is listening on,
11.39 %% defaults to 5672 (network only)</li>
11.40 -%% <li>node :: atom() - The node the broker runs on (direct only)</li>
11.41 %% <li>channel_max :: non_neg_integer() - The channel_max handshake parameter,
11.42 %% defaults to 0</li>
11.43 %% <li>frame_max :: non_neg_integer() - The frame_max handshake parameter,
11.44 @@ -104,43 +118,38 @@
11.45 %% client properties to be sent to the server, defaults to []</li>
11.46 %% </ul>
11.47
11.48 +
11.49 %%---------------------------------------------------------------------------
11.50 %% Starting a connection
11.51 %%---------------------------------------------------------------------------
11.52
11.53 -%% @spec (Type) -> {ok, Connection} | {error, Error}
11.54 +%% @spec (Params) -> {ok, Connection} | {error, Error}
11.55 %% where
11.56 -%% Type = network | direct
11.57 -%% Connection = pid()
11.58 -%% @doc Starts a connection to an AMQP server. Use network type to connect
11.59 -%% to a remote AMQP server - default connection settings are used, meaning that
11.60 -%% the server is expected to be at localhost:5672, with a vhost of "/"
11.61 -%% authorising a user guest/guest. Use direct type for a direct connection to
11.62 -%% a RabbitMQ server, assuming that the server is running in the same process
11.63 -%% space, and with a default set of amqp_params. If a different host, port,
11.64 -%% vhost or credential set is required, start/2 should be used.
11.65 -start(Type) ->
11.66 - start(Type, #amqp_params{}).
11.67 -
11.68 -%% @spec (Type, amqp_params()) -> {ok, Connection} | {error, Error}
11.69 -%% where
11.70 -%% Type = network | direct
11.71 +%% Params = amqp_params_network() | amqp_params_direct()
11.72 %% Connection = pid()
11.73 -%% @doc Starts a connection to an AMQP server. Use network type to connect
11.74 -%% to a remote AMQP server or direct type for a direct connection to
11.75 -%% a RabbitMQ server, assuming that the server is running in the same process
11.76 -%% space.
11.77 -start(Type, AmqpParams) ->
11.78 +%% @doc Starts a connection to an AMQP server. Use network params to
11.79 +%% connect to a remote AMQP server or direct params for a direct
11.80 +%% connection to a RabbitMQ server, assuming that the server is
11.81 +%% running in the same process space. If the port is set to 'undefined',
11.82 +%% the default ports will be selected depending on whether this is a
11.83 +%% normal or an SSL connection.
11.84 +start(AmqpParams) ->
11.85 + io:format("starting connection: ~p~n", [AmqpParams]),
11.86 case amqp_client:start() of
11.87 ok -> ok;
11.88 {error, {already_started, amqp_client}} -> ok;
11.89 {error, _} = E -> throw(E)
11.90 end,
11.91 - {ok, _Sup, Connection} =
11.92 - amqp_sup:start_connection_sup(
11.93 - Type, case Type of direct -> amqp_direct_connection;
11.94 - network -> amqp_network_connection
11.95 - end, AmqpParams),
11.96 + AmqpParams1 =
11.97 + case AmqpParams of
11.98 + #amqp_params_network{port = undefined, ssl_options = none} ->
11.99 + AmqpParams#amqp_params_network{port = ?PROTOCOL_PORT};
11.100 + #amqp_params_network{port = undefined, ssl_options = _} ->
11.101 + AmqpParams#amqp_params_network{port = ?PROTOCOL_SSL_PORT};
11.102 + _ ->
11.103 + AmqpParams
11.104 + end,
11.105 + {ok, _Sup, Connection} = amqp_sup:start_connection_sup(AmqpParams1),
11.106 amqp_gen_connection:connect(Connection).
11.107
11.108 %%---------------------------------------------------------------------------
12.1 --- a/src/amqp_connection_sup.erl Thu Apr 07 10:36:27 2011 -0500
12.2 +++ b/src/amqp_connection_sup.erl Thu Jul 28 13:46:07 2011 +0100
12.3 @@ -21,15 +21,20 @@
12.4
12.5 -behaviour(supervisor2).
12.6
12.7 --export([start_link/3]).
12.8 +-export([start_link/1]).
12.9 -export([init/1]).
12.10
12.11 %%---------------------------------------------------------------------------
12.12 %% Interface
12.13 %%---------------------------------------------------------------------------
12.14
12.15 -start_link(Type, Module, AmqpParams) ->
12.16 +start_link(AmqpParams) ->
12.17 {ok, Sup} = supervisor2:start_link(?MODULE, []),
12.18 + {Type, Module} =
12.19 + case AmqpParams of
12.20 + #amqp_params_direct{} -> {direct, amqp_direct_connection};
12.21 + #amqp_params_network{} -> {network, amqp_network_connection}
12.22 + end,
12.23 SChMF = start_channels_manager_fun(Sup, Type),
12.24 SIF = start_infrastructure_fun(Sup, Type),
12.25 {ok, Connection} = supervisor2:start_child(
13.1 --- a/src/amqp_direct_connection.erl Thu Apr 07 10:36:27 2011 -0500
13.2 +++ b/src/amqp_direct_connection.erl Thu Jul 28 13:46:07 2011 +0100
13.3 @@ -27,12 +27,18 @@
13.4 -record(state, {node,
13.5 user,
13.6 vhost,
13.7 + params,
13.8 + adapter_info,
13.9 collector,
13.10 closing_reason %% undefined | Reason
13.11 }).
13.12
13.13 -define(INFO_KEYS, [type]).
13.14
13.15 +-define(CREATION_EVENT_KEYS, [pid, protocol, address, port, name,
13.16 + peer_address, peer_port,
13.17 + user, vhost, client_properties, type]).
13.18 +
13.19 %%---------------------------------------------------------------------------
13.20
13.21 init([]) ->
13.22 @@ -58,29 +64,70 @@
13.23 rabbit_queue_collector:delete_all(Collector),
13.24 {stop, {shutdown, Reason}, State}.
13.25
13.26 -terminate(_Reason, _State) ->
13.27 +terminate(_Reason, #state{node = Node}) ->
13.28 + rpc:call(Node, rabbit_direct, disconnect, [[{pid, self()}]]),
13.29 ok.
13.30
13.31 i(type, _State) -> direct;
13.32 +i(pid, _State) -> self();
13.33 +%% AMQP Params
13.34 +i(user, #state{params = P}) -> P#amqp_params_direct.username;
13.35 +i(vhost, #state{params = P}) -> P#amqp_params_direct.virtual_host;
13.36 +i(client_properties, #state{params = P}) ->
13.37 + P#amqp_params_direct.client_properties;
13.38 +%% Optional adapter info
13.39 +i(protocol, #state{adapter_info = I}) -> I#adapter_info.protocol;
13.40 +i(address, #state{adapter_info = I}) -> I#adapter_info.address;
13.41 +i(port, #state{adapter_info = I}) -> I#adapter_info.port;
13.42 +i(peer_address, #state{adapter_info = I}) -> I#adapter_info.peer_address;
13.43 +i(peer_port, #state{adapter_info = I}) -> I#adapter_info.peer_port;
13.44 +i(name, #state{adapter_info = I}) -> I#adapter_info.name;
13.45 +
13.46 i(Item, _State) -> throw({bad_argument, Item}).
13.47
13.48 info_keys() ->
13.49 ?INFO_KEYS.
13.50
13.51 -connect(#amqp_params{username = Username,
13.52 - password = Pass,
13.53 - node = Node,
13.54 - virtual_host = VHost}, SIF, _ChMgr, State) ->
13.55 +infos(Items, State) ->
13.56 + [{Item, i(Item, State)} || Item <- Items].
13.57 +
13.58 +additional_info(#state{adapter_info = I}) -> I#adapter_info.additional_info.
13.59 +
13.60 +connect(Params = #amqp_params_direct{username = Username,
13.61 + node = Node,
13.62 + adapter_info = Info,
13.63 + virtual_host = VHost},
13.64 + SIF, _ChMgr, State) ->
13.65 + State1 = State#state{node = Node,
13.66 + vhost = VHost,
13.67 + params = Params,
13.68 + adapter_info = ensure_adapter_info(Info)},
13.69 case rpc:call(Node, rabbit_direct, connect,
13.70 - [Username, Pass, VHost, ?PROTOCOL]) of
13.71 + [Username, VHost, ?PROTOCOL,
13.72 + infos(?CREATION_EVENT_KEYS, State1) ++
13.73 + additional_info(State1)]) of
13.74 {ok, {User, ServerProperties}} ->
13.75 {ok, Collector} = SIF(),
13.76 - {ok, {ServerProperties, 0, State#state{node = Node,
13.77 - user = User,
13.78 - vhost = VHost,
13.79 - collector = Collector}}};
13.80 + State2 = State1#state{user = User,
13.81 + collector = Collector},
13.82 + {ok, {ServerProperties, 0, State2}};
13.83 {error, _} = E ->
13.84 E;
13.85 {badrpc, nodedown} ->
13.86 {error, {nodedown, Node}}
13.87 end.
13.88 +
13.89 +ensure_adapter_info(none) ->
13.90 + ensure_adapter_info(#adapter_info{});
13.91 +
13.92 +ensure_adapter_info(A = #adapter_info{protocol = unknown}) ->
13.93 + ensure_adapter_info(A#adapter_info{protocol =
13.94 + {'Direct', ?PROTOCOL:version()}});
13.95 +
13.96 +ensure_adapter_info(A = #adapter_info{name = unknown,
13.97 + peer_address = unknown,
13.98 + peer_port = unknown}) ->
13.99 + Name = list_to_binary(rabbit_misc:pid_to_string(self())),
13.100 + ensure_adapter_info(A#adapter_info{name = Name});
13.101 +
13.102 +ensure_adapter_info(Info) -> Info.
14.1 --- a/src/amqp_direct_consumer.erl Thu Apr 07 10:36:27 2011 -0500
14.2 +++ b/src/amqp_direct_consumer.erl Thu Jul 28 13:46:07 2011 +0100
14.3 @@ -11,23 +11,28 @@
14.4 %% The Original Code is RabbitMQ.
14.5 %%
14.6 %% The Initial Developer of the Original Code is VMware, Inc.
14.7 -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
14.8 +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
14.9 %%
14.10
14.11 -%% @doc This module is an implementation of the amqp_gen_consumer behaviour and
14.12 -%% can be used as part of the Consumer parameter when opening AMQP
14.13 -%% channels.<br/>
14.14 +%% @doc This module is an implementation of the amqp_gen_consumer
14.15 +%% behaviour and can be used as part of the Consumer parameter when
14.16 +%% opening AMQP channels.
14.17 %% <br/>
14.18 -%% The Consumer parameter for this implementation is
14.19 -%% {{@module}, [ConsumerPid]@}, where ConsumerPid is a process that
14.20 -%% will receive queue subscription-related messages.<br/>
14.21 %% <br/>
14.22 -%% This consumer implementation causes the channel to send to the ConsumerPid
14.23 -%% all basic.consume_ok, basic.cancel_ok, basic.cancel and basic.deliver
14.24 -%% messages received from the server.<br/>
14.25 +%% The Consumer parameter for this implementation is {{@module},
14.26 +%% [ConsumerPid]@}, where ConsumerPid is a process that will receive
14.27 +%% queue subscription-related messages.<br/>
14.28 %% <br/>
14.29 -%% In addition, this consumer implementation creates a link between the channel
14.30 -%% and the provided ConsumerPid.<br/>
14.31 +%% This consumer implementation causes the channel to send to the
14.32 +%% ConsumerPid all basic.consume, basic.consume_ok, basic.cancel,
14.33 +%% basic.cancel_ok and basic.deliver messages received from the
14.34 +%% server.
14.35 +%% <br/>
14.36 +%% <br/>
14.37 +%% In addition, this consumer implementation monitors the ConsumerPid
14.38 +%% and exits with the same shutdown reason when it dies. 'DOWN'
14.39 +%% messages from other sources are passed to ConsumerPid.
14.40 +%% <br/>
14.41 %% Warning! It is not recommended to rely on a consumer on killing off the
14.42 %% channel (through the exit signal). That may cause messages to get lost.
14.43 %% Always use amqp_channel:close/{1,3} for a clean shut down.<br/>
14.44 @@ -35,10 +40,13 @@
14.45 %% This module has no public functions.
14.46 -module(amqp_direct_consumer).
14.47
14.48 +-include("amqp_gen_consumer_spec.hrl").
14.49 +
14.50 -behaviour(amqp_gen_consumer).
14.51
14.52 --export([init/1, handle_consume_ok/3, handle_cancel_ok/3, handle_cancel/2,
14.53 - handle_deliver/2, handle_call/2, terminate/2]).
14.54 +-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
14.55 + handle_cancel/2, handle_deliver/3, handle_info/2, handle_call/3,
14.56 + terminate/2]).
14.57
14.58 %%---------------------------------------------------------------------------
14.59 %% amqp_gen_consumer callbacks
14.60 @@ -46,34 +54,46 @@
14.61
14.62 %% @private
14.63 init([ConsumerPid]) ->
14.64 - link(ConsumerPid),
14.65 + monitor(process, ConsumerPid),
14.66 {ok, ConsumerPid}.
14.67
14.68 %% @private
14.69 +handle_consume(M, A, C) ->
14.70 + C ! {M, A},
14.71 + {ok, C}.
14.72 +
14.73 +%% @private
14.74 handle_consume_ok(M, _, C) ->
14.75 C ! M,
14.76 {ok, C}.
14.77
14.78 %% @private
14.79 +handle_cancel(M, C) ->
14.80 + C ! M,
14.81 + {ok, C}.
14.82 +
14.83 +%% @private
14.84 handle_cancel_ok(M, _, C) ->
14.85 C ! M,
14.86 {ok, C}.
14.87
14.88 %% @private
14.89 -handle_cancel(M, C) ->
14.90 - C ! M,
14.91 +handle_deliver(M, A, C) ->
14.92 + C ! {M, A},
14.93 {ok, C}.
14.94
14.95 %% @private
14.96 -handle_deliver(M, C) ->
14.97 - C ! M,
14.98 +handle_info({'DOWN', _MRef, process, C, Info}, C) ->
14.99 + {error, {consumer_died, Info}, C};
14.100 +handle_info({'DOWN', MRef, process, Pid, Info}, C) ->
14.101 + C ! {'DOWN', MRef, process, Pid, Info},
14.102 {ok, C}.
14.103
14.104 %% @private
14.105 -handle_call(M, C) ->
14.106 - C ! M,
14.107 +handle_call(M, A, C) ->
14.108 + C ! {M, A},
14.109 {reply, ok, C}.
14.110
14.111 %% @private
14.112 -terminate(_Reason, _C) ->
14.113 - ok.
14.114 +terminate(_Reason, C) ->
14.115 + C.
15.1 --- a/src/amqp_gen_consumer.erl Thu Apr 07 10:36:27 2011 -0500
15.2 +++ b/src/amqp_gen_consumer.erl Thu Jul 28 13:46:07 2011 +0100
15.3 @@ -11,16 +11,64 @@
15.4 %% The Original Code is RabbitMQ.
15.5 %%
15.6 %% The Initial Developer of the Original Code is VMware, Inc.
15.7 -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
15.8 +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
15.9 %%
15.10
15.11 -%% @doc A behaviour module for implementing consumers for amqp_channel. To
15.12 -%% specify a consumer implementation for a channel, use
15.13 -%% amqp_connection:open_channel/{2,3}.<br/>
15.14 -%% All callbacks are called withing the channel process.
15.15 +%% @doc A behaviour module for implementing consumers for
15.16 +%% amqp_channel. To specify a consumer implementation for a channel,
15.17 +%% use amqp_connection:open_channel/{2,3}.
15.18 +%% <br/>
15.19 +%% All callbacks are called within the gen_consumer process. <br/>
15.20 +%% <br/>
15.21 +%% See comments in amqp_gen_consumer.erl source file for documentation
15.22 +%% on the callback functions.
15.23 +%% <br/>
15.24 +%% Note that making calls to the channel from the callback module will
15.25 +%% result in deadlock.
15.26 -module(amqp_gen_consumer).
15.27
15.28 +-include("amqp_client.hrl").
15.29 +
15.30 +-behaviour(gen_server2).
15.31 +
15.32 +-export([start_link/2, call_consumer/2, call_consumer/3]).
15.33 -export([behaviour_info/1]).
15.34 +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
15.35 + handle_info/2, prioritise_info/2]).
15.36 +
15.37 +-record(state, {module,
15.38 + module_state}).
15.39 +
15.40 +%%---------------------------------------------------------------------------
15.41 +%% Interface
15.42 +%%---------------------------------------------------------------------------
15.43 +
15.44 +%% @type ok_error() = {ok, state()} | {error, reason(), state()}
15.45 +%% Denotes a successful or an error return from a consumer module call.
15.46 +
15.47 +start_link(ConsumerModule, ExtraParams) ->
15.48 + gen_server2:start_link(?MODULE, [ConsumerModule, ExtraParams], []).
15.49 +
15.50 +%% @spec (Consumer, Msg) -> ok
15.51 +%% where
15.52 +%% Consumer = pid()
15.53 +%% Msg = any()
15.54 +%%
15.55 +%% @doc This function is used to perform arbitrary calls into the
15.56 +%% consumer module.
15.57 +call_consumer(Pid, Msg) ->
15.58 + gen_server2:call(Pid, {consumer_call, Msg}, infinity).
15.59 +
15.60 +%% @spec (Consumer, Method, Args) -> ok
15.61 +%% where
15.62 +%% Consumer = pid()
15.63 +%% Method = amqp_method()
15.64 +%% Args = any()
15.65 +%%
15.66 +%% @doc This function is used by amqp_channel to forward received
15.67 +%% methods and deliveries to the consumer module.
15.68 +call_consumer(Pid, Method, Args) ->
15.69 + gen_server2:call(Pid, {consumer_call, Method, Args}, infinity).
15.70
15.71 %%---------------------------------------------------------------------------
15.72 %% Behaviour
15.73 @@ -29,77 +77,176 @@
15.74 %% @private
15.75 behaviour_info(callbacks) ->
15.76 [
15.77 - %% @spec Module:init(Args) -> {ok, InitialState}
15.78 + %% init(Args) -> {ok, InitialState} | {stop, Reason} | ignore
15.79 %% where
15.80 %% Args = [any()]
15.81 - %% InitialState = any()
15.82 - %% @doc This function is called by the channel, when it starts up.
15.83 + %% InitialState = state()
15.84 + %% Reason = term()
15.85 + %%
15.86 + %% This callback is invoked by the channel, when it starts
15.87 + %% up. Use it to initialize the state of the consumer. In case of
15.88 + %% an error, return {stop, Reason} or ignore.
15.89 {init, 1},
15.90
15.91 - %% @type consume() = #'basic.consume'{}.
15.92 - %% The AMQP method that is used to subscribe a consumer to a queue.
15.93 - %% @type consume_ok() = #'basic.consume_ok'{}.
15.94 - %% The AMQP method returned in response to basic.consume.
15.95 - %% @spec Module:handle_consume(ConsumeOk, Consume, State) -> {ok, NewState}
15.96 + %% handle_consume(Consume, Sender, State) -> ok_error()
15.97 %% where
15.98 - %% ConsumeOk = consume_ok()
15.99 - %% Consume = consume()
15.100 - %% State = NewState = any()
15.101 - %% @doc This function is called by the channel every time a
15.102 + %% Consume = #'basic.consume'{}
15.103 + %% Sender = pid()
15.104 + %% State = state()
15.105 + %%
15.106 + %% This callback is invoked by the channel before a basic.consume
15.107 + %% is sent to the server.
15.108 + {handle_consume, 3},
15.109 +
15.110 + %% handle_consume_ok(ConsumeOk, Consume, State) -> ok_error()
15.111 + %% where
15.112 + %% ConsumeOk = #'basic.consume_ok'{}
15.113 + %% Consume = #'basic.consume'{}
15.114 + %% State = state()
15.115 + %%
15.116 + %% This callback is invoked by the channel every time a
15.117 %% basic.consume_ok is received from the server. Consume is the original
15.118 %% method sent out to the server - it can be used to associate the
15.119 %% call with the response.
15.120 {handle_consume_ok, 3},
15.121
15.122 - %% @type cancel() = #'basic.cancel'{}.
15.123 - %% The AMQP method used to cancel a subscription.
15.124 - %% @type cancel_ok() = #'basic.cancel_ok'{}.
15.125 - %% The AMQP method returned as reply to basicl.cancel.
15.126 - %% @spec Module:handle_cancel_ok(CancelOk, Cancel, State) -> {ok, NewState}
15.127 + %% handle_cancel(Cancel, State) -> ok_error()
15.128 %% where
15.129 - %% CancelOk = cancel_ok()
15.130 - %% Cancel = cancel()
15.131 - %% State = NewState = any()
15.132 - %% @doc This function is called by the channel every time a basic.cancel_ok
15.133 + %% Cancel = #'basic.cancel'{}
15.134 + %% State = state()
15.135 + %%
15.136 + %% This callback is invoked by the channel every time a basic.cancel
15.137 + %% is received from the server.
15.138 + {handle_cancel, 2},
15.139 +
15.140 + %% handle_cancel_ok(CancelOk, Cancel, State) -> ok_error()
15.141 + %% where
15.142 + %% CancelOk = #'basic.cancel_ok'{}
15.143 + %% Cancel = #'basic.cancel'{}
15.144 + %% State = state()
15.145 + %%
15.146 + %% This callback is invoked by the channel every time a basic.cancel_ok
15.147 %% is received from the server.
15.148 {handle_cancel_ok, 3},
15.149
15.150 - %% @type cancel() = #'basic.cancel'{}.
15.151 - %% The AMQP method used to cancel a subscription.
15.152 - %% @spec Module:handle_cancel(cancel(), State) -> {ok, NewState}
15.153 + %% handle_deliver(Deliver, Message, State) -> ok_error()
15.154 %% where
15.155 - %% State = NewState = any()
15.156 - %% @doc This function is called by the channel every time a basic.cancel
15.157 + %% Deliver = #'basic.deliver'{}
15.158 + %% Message = #amqp_msg{}
15.159 + %% State = state()
15.160 + %%
15.161 + %% This callback is invoked by the channel every time a basic.deliver
15.162 %% is received from the server.
15.163 - {handle_cancel, 2},
15.164 + {handle_deliver, 3},
15.165
15.166 - %% @type deliver() = #'basic.deliver'{}.
15.167 - %% The AMQP method sent when a message is delivered from a subscribed
15.168 - %% queue.
15.169 - %% @spec Module:handle_deliver({deliver(), #amqp_msg{}}, State} ->
15.170 - %% {ok, NewState}
15.171 + %% handle_info(Info, State) -> ok_error()
15.172 %% where
15.173 - %% State = NewState = any()
15.174 - %% @doc This function is called by the channel every time a basic.deliver
15.175 - %% is received from the server.
15.176 - {handle_deliver, 2},
15.177 + %% Info = any()
15.178 + %% State = state()
15.179 + %%
15.180 + %% This callback is invoked the consumer process receives a
15.181 + %% message.
15.182 + {handle_info, 2},
15.183
15.184 - %% @spec Module:handle_message(Message, State) -> {reply, Reply, NewState}
15.185 + %% handle_call(Msg, From, State) -> {reply, Reply, NewState} |
15.186 + %% {noreply, NewState} |
15.187 + %% {error, Reason, NewState}
15.188 %% where
15.189 + %% Msg = any()
15.190 + %% From = any()
15.191 %% Reply = any()
15.192 - %% State = NewState = any()
15.193 - %% @doc This function is called by the channel when calling
15.194 - %% amqp_channel:call_consumer/2. Reply is the term that will be returned
15.195 - %% when amqp_channel:call_consumer/2 returns.
15.196 - {handle_call, 2},
15.197 + %% State = state()
15.198 + %% NewState = state()
15.199 + %%
15.200 + %% This callback is invoked by the channel when calling
15.201 + %% amqp_channel:call_consumer/2. Reply is the term that
15.202 + %% amqp_channel:call_consumer/2 will return. If the callback
15.203 + %% returns {noreply, _}, then the caller to
15.204 + %% amqp_channel:call_consumer/2 and the channel remain blocked
15.205 + %% until gen_server2:reply/2 is used with the provided From as
15.206 + %% the first argument.
15.207 + {handle_call, 3},
15.208
15.209 - %% @spec Module:terminate(Reason, State) -> _
15.210 + %% terminate(Reason, State) -> any()
15.211 %% where
15.212 - %% State = any()
15.213 %% Reason = any()
15.214 - %% @doc This function is called by the channel after it has shut down and
15.215 + %% State = state()
15.216 + %%
15.217 + %% This callback is invoked by the channel after it has shut down and
15.218 %% just before its process exits.
15.219 {terminate, 2}
15.220 ];
15.221 behaviour_info(_Other) ->
15.222 undefined.
15.223 +
15.224 +%%---------------------------------------------------------------------------
15.225 +%% gen_server2 callbacks
15.226 +%%---------------------------------------------------------------------------
15.227 +
15.228 +init([ConsumerModule, ExtraParams]) ->
15.229 + case ConsumerModule:init(ExtraParams) of
15.230 + {ok, MState} ->
15.231 + {ok, #state{module = ConsumerModule, module_state = MState}};
15.232 + {stop, Reason} ->
15.233 + {stop, Reason};
15.234 + ignore ->
15.235 + ignore
15.236 + end.
15.237 +
15.238 +prioritise_info({'DOWN', _MRef, process, _Pid, _Info}, _State) -> 1;
15.239 +prioritise_info(_, _State) -> 0.
15.240 +
15.241 +handle_call({consumer_call, Msg}, From,
15.242 + State = #state{module = ConsumerModule,
15.243 + module_state = MState}) ->
15.244 + case ConsumerModule:handle_call(Msg, From, MState) of
15.245 + {noreply, NewMState} ->
15.246 + {noreply, State#state{module_state = NewMState}};
15.247 + {reply, Reply, NewMState} ->
15.248 + {reply, Reply, State#state{module_state = NewMState}};
15.249 + {error, Reason, NewMState} ->
15.250 + {stop, {error, Reason}, {error, Reason},
15.251 + State#state{module_state = NewMState}}
15.252 + end;
15.253 +handle_call({consumer_call, Method, Args}, _From,
15.254 + State = #state{module = ConsumerModule,
15.255 + module_state = MState}) ->
15.256 + Return =
15.257 + case Method of
15.258 + #'basic.consume'{} ->
15.259 + ConsumerModule:handle_consume(Method, Args, MState);
15.260 + #'basic.consume_ok'{} ->
15.261 + ConsumerModule:handle_consume_ok(Method, Args, MState);
15.262 + #'basic.cancel'{} ->
15.263 + ConsumerModule:handle_cancel(Method, MState);
15.264 + #'basic.cancel_ok'{} ->
15.265 + ConsumerModule:handle_cancel_ok(Method, Args, MState);
15.266 + #'basic.deliver'{} ->
15.267 + ConsumerModule:handle_deliver(Method, Args, MState)
15.268 + end,
15.269 + case Return of
15.270 + {ok, NewMState} ->
15.271 + {reply, ok, State#state{module_state = NewMState}};
15.272 + {error, Reason, NewMState} ->
15.273 + {stop, {error, Reason}, {error, Reason},
15.274 + State#state{module_state = NewMState}}
15.275 + end.
15.276 +
15.277 +handle_cast(_What, State) ->
15.278 + {noreply, State}.
15.279 +
15.280 +handle_info(Info, State = #state{module_state = MState,
15.281 + module = ConsumerModule}) ->
15.282 + case ConsumerModule:handle_info(Info, MState) of
15.283 + {ok, NewMState} ->
15.284 + {noreply, State#state{module_state = NewMState}};
15.285 + {error, Reason, NewMState} ->
15.286 + {stop, {error, Reason}, {error, Reason},
15.287 + State#state{module_state = NewMState}}
15.288 + end.
15.289 +
15.290 +terminate(Reason, #state{module = ConsumerModule, module_state = MState}) ->
15.291 + ConsumerModule:terminate(Reason, MState).
15.292 +
15.293 +code_change(_OldVsn, State, _Extra) ->
15.294 + State.
16.1 --- a/src/amqp_network_connection.erl Thu Apr 07 10:36:27 2011 -0500
16.2 +++ b/src/amqp_network_connection.erl Thu Jul 28 13:46:07 2011 +0100
16.3 @@ -98,17 +98,19 @@
16.4 %% Handshake
16.5 %%---------------------------------------------------------------------------
16.6
16.7 -connect(AmqpParams = #amqp_params{ssl_options = none,
16.8 - host = Host,
16.9 - port = Port}, SIF, ChMgr, State) ->
16.10 +connect(AmqpParams = #amqp_params_network{ssl_options = none,
16.11 + host = Host,
16.12 + port = Port},
16.13 + SIF, ChMgr, State) ->
16.14 case gen_tcp:connect(Host, Port, ?RABBIT_TCP_OPTS) of
16.15 {ok, Sock} -> try_handshake(AmqpParams, SIF, ChMgr,
16.16 State#state{sock = Sock});
16.17 {error, _} = E -> E
16.18 end;
16.19 -connect(AmqpParams = #amqp_params{ssl_options = SslOpts,
16.20 - host = Host,
16.21 - port = Port}, SIF, ChMgr, State) ->
16.22 +connect(AmqpParams = #amqp_params_network{ssl_options = SslOpts,
16.23 + host = Host,
16.24 + port = Port},
16.25 + SIF, ChMgr, State) ->
16.26 rabbit_misc:start_applications([crypto, public_key, ssl]),
16.27 case gen_tcp:connect(Host, Port, ?RABBIT_TCP_OPTS) of
16.28 {ok, Sock} ->
16.29 @@ -139,7 +141,8 @@
16.30 {ok, {_MainReader, _AState, Writer, SHF}} = SIF(Sock, ChMgr),
16.31 {SHF, State#state{writer0 = Writer}}.
16.32
16.33 -network_handshake(AmqpParams, SHF, State0) ->
16.34 +network_handshake(AmqpParams = #amqp_params_network{virtual_host = VHost},
16.35 + SHF, State0) ->
16.36 Start = #'connection.start'{server_properties = ServerProperties,
16.37 mechanisms = Mechanisms} =
16.38 handshake_recv('connection.start'),
16.39 @@ -147,8 +150,7 @@
16.40 Tune = login(AmqpParams, Mechanisms, State0),
16.41 {TuneOk, ChannelMax, State1} = tune(Tune, AmqpParams, SHF, State0),
16.42 do2(TuneOk, State1),
16.43 - do2(#'connection.open'{virtual_host = AmqpParams#amqp_params.virtual_host},
16.44 - State1),
16.45 + do2(#'connection.open'{virtual_host = VHost}, State1),
16.46 Params = {ServerProperties, ChannelMax, State1},
16.47 case handshake_recv('connection.open_ok') of
16.48 #'connection.open_ok'{} -> {ok, Params};
16.49 @@ -169,9 +171,9 @@
16.50 tune(#'connection.tune'{channel_max = ServerChannelMax,
16.51 frame_max = ServerFrameMax,
16.52 heartbeat = ServerHeartbeat},
16.53 - #amqp_params{channel_max = ClientChannelMax,
16.54 - frame_max = ClientFrameMax,
16.55 - heartbeat = ClientHeartbeat}, SHF, State) ->
16.56 + #amqp_params_network{channel_max = ClientChannelMax,
16.57 + frame_max = ClientFrameMax,
16.58 + heartbeat = ClientHeartbeat}, SHF, State) ->
16.59 [ChannelMax, Heartbeat, FrameMax] =
16.60 lists:zipwith(fun (Client, Server) when Client =:= 0; Server =:= 0 ->
16.61 lists:max([Client, Server]);
16.62 @@ -192,8 +194,8 @@
16.63 ReceiveFun = fun () -> Connection ! heartbeat_timeout end,
16.64 SHF(Sock, Heartbeat, SendFun, Heartbeat, ReceiveFun).
16.65
16.66 -login(Params = #amqp_params{auth_mechanisms = ClientMechanisms,
16.67 - client_properties = UserProps},
16.68 +login(Params = #amqp_params_network{auth_mechanisms = ClientMechanisms,
16.69 + client_properties = UserProps},
16.70 ServerMechanismsStr, State) ->
16.71 ServerMechanisms = string:tokens(binary_to_list(ServerMechanismsStr), " "),
16.72 case [{N, S, F} || F <- ClientMechanisms,
17.1 --- a/src/amqp_rpc_client.erl Thu Apr 07 10:36:27 2011 -0500
17.2 +++ b/src/amqp_rpc_client.erl Thu Jul 28 13:46:07 2011 +0100
17.3 @@ -141,10 +141,18 @@
17.4 {noreply, State}.
17.5
17.6 %% @private
17.7 +handle_info({#'basic.consume'{}, _Pid}, State) ->
17.8 + {noreply, State};
17.9 +
17.10 +%% @private
17.11 handle_info(#'basic.consume_ok'{}, State) ->
17.12 {noreply, State};
17.13
17.14 %% @private
17.15 +handle_info(#'basic.cancel'{}, State) ->
17.16 + {noreply, State};
17.17 +
17.18 +%% @private
17.19 handle_info(#'basic.cancel_ok'{}, State) ->
17.20 {stop, normal, State};
17.21
18.1 --- a/src/amqp_rpc_server.erl Thu Apr 07 10:36:27 2011 -0500
18.2 +++ b/src/amqp_rpc_server.erl Thu Jul 28 13:46:07 2011 +0100
18.3 @@ -75,10 +75,18 @@
18.4 {stop, normal, State};
18.5
18.6 %% @private
18.7 +handle_info({#'basic.consume'{}, _}, State) ->
18.8 + {noreply, State};
18.9 +
18.10 +%% @private
18.11 handle_info(#'basic.consume_ok'{}, State) ->
18.12 {noreply, State};
18.13
18.14 %% @private
18.15 +handle_info(#'basic.cancel'{}, State) ->
18.16 + {noreply, State};
18.17 +
18.18 +%% @private
18.19 handle_info(#'basic.cancel_ok'{}, State) ->
18.20 {stop, normal, State};
18.21
18.22 @@ -96,6 +104,10 @@
18.23 amqp_channel:call(Channel, Publish, #amqp_msg{props = Properties,
18.24 payload = Response}),
18.25 amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
18.26 + {noreply, State};
18.27 +
18.28 +%% @private
18.29 +handle_info({'DOWN', _MRef, process, _Pid, _Info}, State) ->
18.30 {noreply, State}.
18.31
18.32 %% @private
19.1 --- a/src/amqp_selective_consumer.erl Thu Apr 07 10:36:27 2011 -0500
19.2 +++ b/src/amqp_selective_consumer.erl Thu Jul 28 13:46:07 2011 +0100
19.3 @@ -1,4 +1,4 @@
19.4 -%% The contents of this file are subject to the Mozilla Public License
19.5 +%% The contents of this file are subject to the Mozilla Public Licensbe
19.6 %% Version 1.1 (the "License"); you may not use this file except in
19.7 %% compliance with the License. You may obtain a copy of the License at
19.8 %% http://www.mozilla.org/MPL/
19.9 @@ -11,20 +11,22 @@
19.10 %% The Original Code is RabbitMQ.
19.11 %%
19.12 %% The Initial Developer of the Original Code is VMware, Inc.
19.13 -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
19.14 +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
19.15 %%
19.16
19.17 -%% @doc This module is an implementation of the amqp_gen_consumer behaviour and
19.18 -%% can be used as part of the Consumer parameter when opening AMQP
19.19 -%% channels.<br/>
19.20 +%% @doc This module is an implementation of the amqp_gen_consumer
19.21 +%% behaviour and can be used as part of the Consumer parameter when
19.22 +%% opening AMQP channels. This is the default implementation selected
19.23 +%% by channel. <br/>
19.24 %% <br/>
19.25 %% The Consumer parameter for this implementation is {{@module}, []@}<br/>
19.26 %% This consumer implementation keeps track of consumer tags and sends
19.27 %% the subscription-relevant messages to the registered consumers, according
19.28 %% to an internal tag dictionary.<br/>
19.29 %% <br/>
19.30 -%% Use {@module}:subscribe/3 to subscribe a consumer to a queue and
19.31 -%% {@module}:cancel/2 to cancel a subscription.<br/>
19.32 +%% Send a #basic.consume{} message to the channel to subscribe a
19.33 +%% consumer to a queue and send a #basic.cancel{} message to cancel a
19.34 +%% subscription.<br/>
19.35 %% <br/>
19.36 %% The channel will send to the relevant registered consumers the
19.37 %% basic.consume_ok, basic.cancel_ok, basic.cancel and basic.deliver messages
19.38 @@ -33,104 +35,39 @@
19.39 %% If a consumer is not registered for a given consumer tag, the message
19.40 %% is sent to the default consumer registered with
19.41 %% {@module}:register_default_consumer. If there is no default consumer
19.42 -%% registered in this case, an exception occurs and the channel is abrubptly
19.43 +%% registered in this case, an exception occurs and the channel is abruptly
19.44 %% terminated.<br/>
19.45 -%% <br/>
19.46 -%% amqp_channel:call(ChannelPid, #'basic.consume'{}) can also be used to
19.47 -%% subscribe to a queue, but one must register a default consumer for messages
19.48 -%% to be delivered to, beforehand. Failing to do so generates the
19.49 -%% above-mentioned exception.<br/>
19.50 -%% <br/>
19.51 -%% This consumer implementation creates a link between the channel and the
19.52 -%% registered consumers (either through register_default_consumer/2 or
19.53 -%% through subscribe/3). A cancel (either issued by the user application or the
19.54 -%% server) causes the link to be removed. In addition, registering another
19.55 -%% default consumer causes the old one to be unlinked.<br/>
19.56 -%% Warning! It is not recommended to rely on a consumer on killing off the
19.57 -%% channel (through the exit signal). That may cause messages to get lost.
19.58 -%% Always use amqp_channel:close/{1,3} for a clean shut down.
19.59 -module(amqp_selective_consumer).
19.60
19.61 -include("amqp_client.hrl").
19.62 +-include("amqp_gen_consumer_spec.hrl").
19.63
19.64 -behaviour(amqp_gen_consumer).
19.65
19.66 --export([subscribe/3, cancel/2, register_default_consumer/2]).
19.67 --export([init/1, handle_consume_ok/3, handle_cancel_ok/3, handle_cancel/2,
19.68 - handle_deliver/2, handle_call/2, terminate/2]).
19.69 +-export([register_default_consumer/2]).
19.70 +-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
19.71 + handle_cancel/2, handle_deliver/3, handle_info/2, handle_call/3,
19.72 + terminate/2]).
19.73
19.74 --record(state, {consumers = dict:new(), %% Tag -> ConsumerPid
19.75 - unassigned = dict:new(), %% BasicConsume -> [ConsumerPid]
19.76 - default_consumer = none}).
19.77 +-record(state, {consumers = dict:new(), %% Tag -> ConsumerPid
19.78 + unassigned = undefined, %% Pid
19.79 + monitors = dict:new(), %% Pid -> MRef
19.80 + default_consumer = none}).
19.81
19.82 %%---------------------------------------------------------------------------
19.83 %% Interface
19.84 %%---------------------------------------------------------------------------
19.85
19.86 -%% @type consume() = #'basic.consume'{}.
19.87 -%% The AMQP method that is used to subscribe a consumer to a queue.
19.88 -%% @type consume_ok() = #'basic.consume_ok'{}.
19.89 -%% The AMQP method returned in response to basic.consume.
19.90 -%% @spec (ChannelPid, consume(), ConsumerPid) -> Result
19.91 -%% where
19.92 -%% ChannelPid = pid()
19.93 -%% ConsumerPid = pid()
19.94 -%% Result = consume_ok() | ok | error
19.95 -%% @doc Creates a subscription to a queue. This subscribes a consumer pid to
19.96 -%% the queue defined in the #'basic.consume'{} method record. Note that
19.97 -%% both the process invoking this method and the supplied consumer process
19.98 -%% receive an acknowledgement of the subscription. The calling process will
19.99 -%% receive the acknowledgement as the return value of this function, whereas
19.100 -%% the consumer process will receive the notification as a message,
19.101 -%% asynchronously.<br/>
19.102 -%% <br/>
19.103 -%% Attempting to subscribe with a consumer_tag that is already in use or
19.104 -%% to subscribe with nowait true and not specifying a consumer_tag will
19.105 -%% cause an exception and the channel will terminate, causing this function
19.106 -%% to throw. If nowait is set to true the function will return 'error'
19.107 -%% immediately, and the channel will be terminated by the server.
19.108 -subscribe(ChannelPid, BasicConsume, ConsumerPid) ->
19.109 - ok = amqp_channel:call_consumer(ChannelPid,
19.110 - {subscribe, BasicConsume, ConsumerPid}),
19.111 - amqp_channel:call(ChannelPid, BasicConsume).
19.112 -
19.113 -%% @type cancel() = #'basic.cancel'{}.
19.114 -%% The AMQP method used to cancel a subscription.
19.115 -%% @spec (ChannelPid, Cancel) -> amqp_method() | ok
19.116 -%% where
19.117 -%% ChannelPid = pid()
19.118 -%% Cancel = cancel()
19.119 -%% @doc This function is the same as calling
19.120 -%% amqp_channel:call(ChannelPid, Cancel) and is only provided for completeness.
19.121 -cancel(ChannelPid, #'basic.cancel'{} = Cancel) ->
19.122 - amqp_channel:call(ChannelPid, Cancel).
19.123 -
19.124 %% @spec (ChannelPid, ConsumerPid) -> ok
19.125 %% where
19.126 %% ChannelPid = pid()
19.127 %% ConsumerPid = pid()
19.128 -%% @doc This function registers a default consumer with the channel. A default
19.129 -%% consumer is used in two situations:<br/>
19.130 -%% <br/>
19.131 -%% 1) A subscription was made via
19.132 +%% @doc This function registers a default consumer with the channel. A
19.133 +%% default consumer is used when a subscription is made via
19.134 %% amqp_channel:call(ChannelPid, #'basic.consume'{}) (rather than
19.135 -%% {@module}:subscribe/3) and hence there is no consumer pid registered with the
19.136 -%% consumer tag.<br/>
19.137 -%% <br/>
19.138 -%% 2) The following sequence of events occurs:<br/>
19.139 -%% <br/>
19.140 -%% - subscribe is used with basic.consume with explicit acks<br/>
19.141 -%% - some deliveries take place but are not acked<br/>
19.142 -%% - a basic.cancel is issued<br/>
19.143 -%% - a basic.recover{requeue = false} is issued<br/>
19.144 -%% <br/>
19.145 -%% Since requeue is specified to be false in the basic.recover, the spec
19.146 -%% states that the message must be redelivered to "the original recipient"
19.147 -%% - i.e. the same channel / consumer-tag. But the consumer is no longer
19.148 -%% active. <br/>
19.149 -%% <br/>
19.150 -%% In these two cases, the relevant deliveries will be sent to the default
19.151 -%% consumer.
19.152 +%% {@module}:subscribe/3) and hence there is no consumer pid
19.153 +%% registered with the consumer tag. In this case, the relevant
19.154 +%% deliveries will be sent to the default consumer.
19.155 register_default_consumer(ChannelPid, ConsumerPid) ->
19.156 amqp_channel:call_consumer(ChannelPid,
19.157 {register_default_consumer, ConsumerPid}).
19.158 @@ -144,35 +81,8 @@
19.159 {ok, #state{}}.
19.160
19.161 %% @private
19.162 -handle_consume_ok(BasicConsumeOk, BasicConsume, State) ->
19.163 - State1 = assign_consumer(BasicConsume, tag(BasicConsumeOk), State),
19.164 - deliver(BasicConsumeOk, State1),
19.165 - {ok, State1}.
19.166 -
19.167 -%% @private
19.168 -handle_cancel_ok(CancelOk, _Cancel, State) ->
19.169 - %% Unlink first!
19.170 - State1 = do_cancel(CancelOk, State),
19.171 - %% Use old state
19.172 - deliver(CancelOk, State),
19.173 - {ok, State1}.
19.174 -
19.175 -%% @private
19.176 -handle_cancel(Cancel, State) ->
19.177 - %% Unlink first!
19.178 - State1 = do_cancel(Cancel, State),
19.179 - %% Use old state
19.180 - deliver(Cancel, State),
19.181 - {ok, State1}.
19.182 -
19.183 -%% @private
19.184 -handle_deliver(Deliver, State) ->
19.185 - deliver(Deliver, State),
19.186 - {ok, State}.
19.187 -
19.188 -%% @private
19.189 -handle_call({subscribe, BasicConsume, Pid},
19.190 - State = #state{consumers = Consumers, unassigned = Unassigned}) ->
19.191 +handle_consume(BasicConsume, Pid, State = #state{consumers = Consumers,
19.192 + monitors = Monitors}) ->
19.193 Tag = tag(BasicConsume),
19.194 Ok =
19.195 case BasicConsume of
19.196 @@ -187,66 +97,121 @@
19.197 _ ->
19.198 true
19.199 end,
19.200 - if Ok ->
19.201 - case BasicConsume of
19.202 - #'basic.consume'{nowait = true} ->
19.203 - {reply, ok,
19.204 - State#state{consumers = dict:store(Tag, Pid, Consumers)}};
19.205 - #'basic.consume'{nowait = false} ->
19.206 - NewUnassigned =
19.207 - dict:update(BasicConsume, fun (Pids) -> [Pid | Pids] end,
19.208 - [Pid], Unassigned),
19.209 - {reply, ok, State#state{unassigned = NewUnassigned}}
19.210 - end;
19.211 - true ->
19.212 - %% There is an error. Don't do anything (don't override existing
19.213 - %% consumers), the server will close the channel with an error.
19.214 - {reply, error, State}
19.215 - end;
19.216 -%% @private
19.217 -handle_call({register_default_consumer, Pid},
19.218 - State = #state{default_consumer = PrevPid}) ->
19.219 - case PrevPid of none -> ok;
19.220 - _ -> unlink(PrevPid)
19.221 - end,
19.222 - link(Pid),
19.223 - {reply, ok, State#state{default_consumer = Pid}}.
19.224 + case {Ok, BasicConsume} of
19.225 + {true, #'basic.consume'{nowait = true}} ->
19.226 + {ok, State#state
19.227 + {consumers = dict:store(Tag, Pid, Consumers),
19.228 + monitors = dict:store(Pid, monitor(process, Pid), Monitors)}};
19.229 + {true, #'basic.consume'{nowait = false}} ->
19.230 + {ok, State#state{unassigned = Pid}};
19.231 + {false, #'basic.consume'{nowait = true}} ->
19.232 + {error, 'no_consumer_tag_specified', State};
19.233 + {false, #'basic.consume'{nowait = false}} ->
19.234 + %% Don't do anything (don't override existing
19.235 + %% consumers), the server will close the channel with an error.
19.236 + {ok, State}
19.237 + end.
19.238
19.239 %% @private
19.240 -terminate(_Reason, _State) ->
19.241 - ok.
19.242 +handle_consume_ok(BasicConsumeOk, _BasicConsume,
19.243 + State = #state{unassigned = Pid,
19.244 + consumers = Consumers,
19.245 + monitors = Monitors})
19.246 + when is_pid(Pid) ->
19.247 + State1 = State#state{consumers =
19.248 + dict:store(tag(BasicConsumeOk), Pid, Consumers),
19.249 + monitors =
19.250 + dict:store(Pid, monitor(process, Pid), Monitors),
19.251 + unassigned = undefined},
19.252 + deliver(BasicConsumeOk, State1),
19.253 + {ok, State1}.
19.254 +
19.255 +%% @private
19.256 +%% The server sent a basic.cancel.
19.257 +handle_cancel(Cancel, State) ->
19.258 + State1 = do_cancel(Cancel, State),
19.259 + %% Use old state
19.260 + deliver(Cancel, State),
19.261 + {ok, State1}.
19.262 +
19.263 +%% @private
19.264 +%% We sent a basic.cancel and now receive the ok.
19.265 +handle_cancel_ok(CancelOk, _Cancel, State) ->
19.266 + State1 = do_cancel(CancelOk, State),
19.267 + %% Use old state
19.268 + deliver(CancelOk, State),
19.269 + {ok, State1}.
19.270 +
19.271 +%% @private
19.272 +handle_deliver(Deliver, Message, State) ->
19.273 + deliver(Deliver, Message, State),
19.274 + {ok, State}.
19.275 +
19.276 +%% @private
19.277 +handle_info({'DOWN', _MRef, process, Pid, _Info},
19.278 + State = #state{monitors = Monitors,
19.279 + consumers = Consumers,
19.280 + default_consumer = DConsumer }) ->
19.281 + case dict:find(Pid, Monitors) of
19.282 + {ok, _Tag} ->
19.283 + {ok, State#state{monitors = dict:erase(Pid, Monitors),
19.284 + consumers =
19.285 + dict:filter(
19.286 + fun (_, Pid1) when Pid1 =:= Pid -> false;
19.287 + (_, _) -> true
19.288 + end, Consumers)}};
19.289 + error ->
19.290 + case Pid of
19.291 + DConsumer -> {ok, State#state{
19.292 + monitors = dict:erase(Pid, Monitors),
19.293 + default_consumer = none}};
19.294 + _ -> {ok, State} %% unnamed consumer went down
19.295 + %% before receiving consume_ok
19.296 + end
19.297 + end.
19.298 +
19.299 +%% @private
19.300 +handle_call({register_default_consumer, Pid}, _From,
19.301 + State = #state{default_consumer = PrevPid,
19.302 + monitors = Monitors}) ->
19.303 + case PrevPid of
19.304 + none -> ok;
19.305 + _ -> demonitor(dict:fetch(PrevPid, Monitors)),
19.306 + dict:erase(PrevPid, Monitors)
19.307 + end,
19.308 + {reply, ok,
19.309 + State#state{default_consumer = Pid,
19.310 + monitors = dict:store(Pid, monitor(process, Pid),
19.311 + Monitors)}}.
19.312 +
19.313 +%% @private
19.314 +terminate(_Reason, State) ->
19.315 + State.
19.316
19.317 %%---------------------------------------------------------------------------
19.318 %% Internal plumbing
19.319 %%---------------------------------------------------------------------------
19.320
19.321 -assign_consumer(BasicConsume, Tag, State = #state{consumers = Consumers,
19.322 - unassigned = Unassigned}) ->
19.323 - case dict:find(BasicConsume, Unassigned) of
19.324 - {ok, [Pid]} ->
19.325 - State#state{unassigned = dict:erase(BasicConsume, Unassigned),
19.326 - consumers = dict:store(Tag, Pid, Consumers)};
19.327 - {ok, [Pid | RestPids]} ->
19.328 - State#state{unassigned = dict:store(BasicConsume, RestPids,
19.329 - Unassigned),
19.330 - consumers = dict:store(Tag, Pid, Consumers)};
19.331 - error ->
19.332 - %% Untracked consumer (subscribed with amqp_channel:call/2)
19.333 - State
19.334 - end.
19.335 -
19.336 deliver(Msg, State) ->
19.337 + deliver(Msg, undefined, State).
19.338 +deliver(Msg, Message, State) ->
19.339 + Combined = if Message =:= undefined -> Msg;
19.340 + true -> {Msg, Message}
19.341 + end,
19.342 case resolve_consumer(tag(Msg), State) of
19.343 - {consumer, Pid} -> Pid ! Msg;
19.344 - {default, Pid} -> Pid ! Msg;
19.345 + {consumer, Pid} -> Pid ! Combined;
19.346 + {default, Pid} -> Pid ! Combined;
19.347 error -> exit(unexpected_delivery_and_no_default_consumer)
19.348 end.
19.349
19.350 -do_cancel(Cancel, State = #state{consumers = Consumers}) ->
19.351 +do_cancel(Cancel, State = #state{consumers = Consumers,
19.352 + monitors = Monitors}) ->
19.353 Tag = tag(Cancel),
19.354 case dict:find(Tag, Consumers) of
19.355 - {ok, Pid} -> unlink(Pid),
19.356 - State#state{consumers = dict:erase(Tag, Consumers)};
19.357 + {ok, Pid} -> MRef = dict:fetch(Pid, Monitors),
19.358 + demonitor(MRef),
19.359 + State#state{consumers = dict:erase(Tag, Consumers),
19.360 + monitors = dict:erase(Pid, Monitors)};
19.361 error -> %% Untracked consumer. Do nothing.
19.362 State
19.363 end.
19.364 @@ -265,4 +230,4 @@
19.365 tag(#'basic.consume_ok'{consumer_tag = Tag}) -> Tag;
19.366 tag(#'basic.cancel'{consumer_tag = Tag}) -> Tag;
19.367 tag(#'basic.cancel_ok'{consumer_tag = Tag}) -> Tag;
19.368 -tag({#'basic.deliver'{consumer_tag = Tag}, _}) -> Tag.
19.369 +tag(#'basic.deliver'{consumer_tag = Tag}) -> Tag.
20.1 --- a/src/amqp_sup.erl Thu Apr 07 10:36:27 2011 -0500
20.2 +++ b/src/amqp_sup.erl Thu Jul 28 13:46:07 2011 +0100
20.3 @@ -21,7 +21,7 @@
20.4
20.5 -behaviour(supervisor2).
20.6
20.7 --export([start_link/0, start_connection_sup/3]).
20.8 +-export([start_link/0, start_connection_sup/1]).
20.9 -export([init/1]).
20.10
20.11 %%---------------------------------------------------------------------------
20.12 @@ -31,8 +31,8 @@
20.13 start_link() ->
20.14 supervisor2:start_link({local, amqp_sup}, ?MODULE, []).
20.15
20.16 -start_connection_sup(Type, Module, AmqpParams) ->
20.17 - supervisor2:start_child(amqp_sup, [Type, Module, AmqpParams]).
20.18 +start_connection_sup(AmqpParams) ->
20.19 + supervisor2:start_child(amqp_sup, [AmqpParams]).
20.20
20.21 %%---------------------------------------------------------------------------
20.22 %% supervisor2 callbacks
21.1 --- a/test.mk Thu Apr 07 10:36:27 2011 -0500
21.2 +++ b/test.mk Thu Jul 28 13:46:07 2011 +0100
21.3 @@ -60,7 +60,8 @@
21.4 run_test_detached: start_test_broker_node
21.5 OK=true && \
21.6 TMPFILE=$(MKTEMP) && \
21.7 - { $(RUN) -noinput $(TESTING_MESSAGE) $(RUN_TEST_ARGS) \
21.8 + { $(RUN) -noinput $(TESTING_MESSAGE) \
21.9 + $(SSL_CLIENT_ARGS) $(RUN_TEST_ARGS) \
21.10 -s init stop 2>&1 | tee $$TMPFILE || OK=false; } && \
21.11 { $(IS_SUCCESS) $$TMPFILE || OK=false; } && \
21.12 rm $$TMPFILE && \
21.13 @@ -79,7 +80,7 @@
21.14 $(MAKE) unboot_broker
21.15
21.16 boot_broker:
21.17 - $(MAKE) -C $(BROKER_DIR) start-background-node
21.18 + $(MAKE) -C $(BROKER_DIR) start-background-node RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) $(SSL_BROKER_ARGS)"
21.19 $(MAKE) -C $(BROKER_DIR) start-rabbit-on-node
21.20
21.21 unboot_broker:
22.1 --- a/test/amqp_client_SUITE.erl Thu Apr 07 10:36:27 2011 -0500
22.2 +++ b/test/amqp_client_SUITE.erl Thu Jul 28 13:46:07 2011 +0100
22.3 @@ -58,6 +58,9 @@
22.4 pub_and_close_test_() -> ?RUN([{timeout, 60}]).
22.5 channel_tune_negotiation_test_() -> ?RUN([]).
22.6 confirm_test_() -> ?RUN([]).
22.7 +confirm_barrier_test_() -> ?RUN([]).
22.8 +confirm_barrier_nop_test_() -> ?RUN([]).
22.9 +default_consumer_test() -> ?RUN([]).
22.10 subscribe_nowait_test_() -> ?RUN([]).
22.11
22.12 non_existent_exchange_test_() -> ?RUN([negative]).
23.1 --- a/test/negative_test_util.erl Thu Apr 07 10:36:27 2011 -0500
23.2 +++ b/test/negative_test_util.erl Thu Jul 28 13:46:07 2011 +0100
23.3 @@ -24,12 +24,12 @@
23.4 non_existent_exchange_test() ->
23.5 Connection = test_util:new_connection(),
23.6 X = test_util:uuid(),
23.7 - RoutingKey = <<"a">>,
23.8 + RoutingKey = <<"a">>,
23.9 Payload = <<"foobar">>,
23.10 {ok, Channel} = amqp_connection:open_channel(Connection),
23.11 {ok, OtherChannel} = amqp_connection:open_channel(Connection),
23.12 amqp_channel:call(Channel, #'exchange.declare'{exchange = X}),
23.13 -
23.14 +
23.15 %% Deliberately mix up the routingkey and exchange arguments
23.16 Publish = #'basic.publish'{exchange = RoutingKey, routing_key = X},
23.17 amqp_channel:call(Channel, Publish, #amqp_msg{payload = Payload}),
23.18 @@ -177,20 +177,18 @@
23.19 end.
23.20
23.21 non_existent_user_test() ->
23.22 - Params = #amqp_params{username = test_util:uuid(),
23.23 - password = test_util:uuid()},
23.24 + Params = [{username, test_util:uuid()}, {password, test_util:uuid()}],
23.25 ?assertMatch({error, auth_failure}, test_util:new_connection(Params)).
23.26
23.27 invalid_password_test() ->
23.28 - Params = #amqp_params{username = <<"guest">>,
23.29 - password = test_util:uuid()},
23.30 + Params = [{username, <<"guest">>}, {password, test_util:uuid()}],
23.31 ?assertMatch({error, auth_failure}, test_util:new_connection(Params)).
23.32
23.33 non_existent_vhost_test() ->
23.34 - Params = #amqp_params{virtual_host = test_util:uuid()},
23.35 + Params = [{virtual_host, test_util:uuid()}],
23.36 ?assertMatch({error, access_refused}, test_util:new_connection(Params)).
23.37
23.38 no_permission_test() ->
23.39 - Params = #amqp_params{username = <<"test_user_no_perm">>,
23.40 - password = <<"test_user_no_perm">>},
23.41 + Params = [{username, <<"test_user_no_perm">>},
23.42 + {password, <<"test_user_no_perm">>}],
23.43 ?assertMatch({error, access_refused}, test_util:new_connection(Params)).
24.1 --- a/test/test_util.erl Thu Apr 07 10:36:27 2011 -0500
24.2 +++ b/test/test_util.erl Thu Jul 28 13:46:07 2011 +0100
24.3 @@ -337,13 +337,12 @@
24.4 exchange = X,
24.5 routing_key = RoutingKey}),
24.6 #'basic.consume_ok'{} =
24.7 - amqp_selective_consumer:subscribe(
24.8 - Channel, #'basic.consume'{queue = Q, consumer_tag = Tag}, self()),
24.9 + amqp_channel:call(Channel,
24.10 + #'basic.consume'{queue = Q, consumer_tag = Tag}),
24.11 receive #'basic.consume_ok'{consumer_tag = Tag} -> ok end,
24.12 receive {#'basic.deliver'{}, _} -> ok end,
24.13 #'basic.cancel_ok'{} =
24.14 - amqp_selective_consumer:cancel(
24.15 - Channel, #'basic.cancel'{consumer_tag = Tag}),
24.16 + amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag}),
24.17 receive #'basic.cancel_ok'{consumer_tag = Tag} -> ok end,
24.18 Parent ! finished.
24.19
24.20 @@ -354,8 +353,7 @@
24.21 #'queue.declare_ok'{} =
24.22 amqp_channel:call(Channel, #'queue.declare'{queue = Q}),
24.23 #'basic.consume_ok'{consumer_tag = CTag} = ConsumeOk =
24.24 - amqp_selective_consumer:subscribe(
24.25 - Channel, #'basic.consume'{queue = Q}, self()),
24.26 + amqp_channel:call(Channel, #'basic.consume'{queue = Q}),
24.27 receive ConsumeOk -> ok end,
24.28 #'queue.delete_ok'{} =
24.29 amqp_channel:call(Channel, #'queue.delete'{queue = Q}),
24.30 @@ -413,10 +411,10 @@
24.31 #'exchange.declare_ok'{} =
24.32 amqp_channel:call(Channel2, #'exchange.declare'{exchange = uuid()}),
24.33
24.34 - teardown(Connection, Channel2).
24.35 + teardown(Connection, Channel2).
24.36
24.37 channel_tune_negotiation_test() ->
24.38 - amqp_connection:close(new_connection(#amqp_params{channel_max = 10})).
24.39 + amqp_connection:close(new_connection([{channel_max, 10}])).
24.40
24.41 basic_qos_test() ->
24.42 [NoQos, Qos] = [basic_qos_test(Prefetch) || Prefetch <- [0,1]],
24.43 @@ -437,8 +435,8 @@
24.44 {ok, Channel} = amqp_connection:open_channel(Connection),
24.45 amqp_channel:call(Channel,
24.46 #'basic.qos'{prefetch_count = Prefetch}),
24.47 - amqp_selective_consumer:subscribe(
24.48 - Channel, #'basic.consume'{queue = Q}, self()),
24.49 + amqp_channel:call(Channel,
24.50 + #'basic.consume'{queue = Q}),
24.51 Parent ! finished,
24.52 sleeping_consumer(Channel, Sleep, Parent)
24.53 end) || Sleep <- Workers],
24.54 @@ -491,7 +489,6 @@
24.55 {ok, Channel} = amqp_connection:open_channel(Connection),
24.56 #'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}),
24.57 amqp_channel:register_confirm_handler(Channel, self()),
24.58 - io:format("Registered ~p~n", [self()]),
24.59 {ok, Q} = setup_publish(Channel),
24.60 {#'basic.get_ok'{}, _}
24.61 = amqp_channel:call(Channel, #'basic.get'{queue = Q, no_ack = false}),
24.62 @@ -503,15 +500,65 @@
24.63 end,
24.64 teardown(Connection, Channel).
24.65
24.66 +confirm_barrier_test() ->
24.67 + Connection = new_connection(),
24.68 + {ok, Channel} = amqp_connection:open_channel(Connection),
24.69 + #'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}),
24.70 + [amqp_channel:call(Channel, #'basic.publish'{routing_key = <<"whoosh">>},
24.71 + #amqp_msg{payload = <<"foo">>})
24.72 + || _ <- lists:seq(1, 10)],
24.73 + true = amqp_channel:wait_for_confirms(Channel),
24.74 + teardown(Connection, Channel).
24.75 +
24.76 +confirm_barrier_nop_test() ->
24.77 + Connection = new_connection(),
24.78 + {ok, Channel} = amqp_connection:open_channel(Connection),
24.79 + true = amqp_channel:wait_for_confirms(Channel),
24.80 + amqp_channel:call(Channel, #'basic.publish'{routing_key = <<"whoosh">>},
24.81 + #amqp_msg{payload = <<"foo">>}),
24.82 + true = amqp_channel:wait_for_confirms(Channel),
24.83 + teardown(Connection, Channel).
24.84 +
24.85 +default_consumer_test() ->
24.86 + Connection = new_connection(),
24.87 + {ok, Channel} = amqp_connection:open_channel(Connection),
24.88 + amqp_selective_consumer:register_default_consumer(Channel, self()),
24.89 +
24.90 + #'queue.declare_ok'{queue = Q}
24.91 + = amqp_channel:call(Channel, #'queue.declare'{}),
24.92 + Pid = spawn(fun () -> receive
24.93 + after 10000 -> ok
24.94 + end
24.95 + end),
24.96 + #'basic.consume_ok'{} =
24.97 + amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, Pid),
24.98 + monitor(process, Pid),
24.99 + exit(Pid, shutdown),
24.100 + receive
24.101 + {'DOWN', _, process, _, _} ->
24.102 + io:format("little consumer died out~n")
24.103 + end,
24.104 + Payload = <<"for the default consumer">>,
24.105 + amqp_channel:call(Channel,
24.106 + #'basic.publish'{exchange = <<>>, routing_key = Q},
24.107 + #amqp_msg{payload = Payload}),
24.108 +
24.109 + receive
24.110 + {#'basic.deliver'{}, #'amqp_msg'{payload = Payload}} ->
24.111 + ok
24.112 + after 1000 ->
24.113 + exit('default_consumer_didnt_work')
24.114 + end,
24.115 + teardown(Connection, Channel).
24.116 +
24.117 subscribe_nowait_test() ->
24.118 Connection = new_connection(),
24.119 {ok, Channel} = amqp_connection:open_channel(Connection),
24.120 {ok, Q} = setup_publish(Channel),
24.121 - ok = amqp_selective_consumer:subscribe(
24.122 - Channel, #'basic.consume'{queue = Q,
24.123 - consumer_tag = uuid(),
24.124 - nowait = true},
24.125 - self()),
24.126 + ok = amqp_channel:call(Channel,
24.127 + #'basic.consume'{queue = Q,
24.128 + consumer_tag = uuid(),
24.129 + nowait = true}),
24.130 receive #'basic.consume_ok'{} -> exit(unexpected_consume_ok)
24.131 after 0 -> ok
24.132 end,
24.133 @@ -750,15 +797,15 @@
24.134 <<A:32, B:32, C:32>>.
24.135
24.136 new_connection() ->
24.137 - new_connection(both, #amqp_params{}).
24.138 + new_connection(both, []).
24.139
24.140 new_connection(AllowedConnectionTypes) when is_atom(AllowedConnectionTypes) ->
24.141 - new_connection(AllowedConnectionTypes, #amqp_params{});
24.142 -new_connection(#amqp_params{} = AmqpParams) ->
24.143 - new_connection(both, AmqpParams).
24.144 + new_connection(AllowedConnectionTypes, []);
24.145 +new_connection(Params) when is_list(Params) ->
24.146 + new_connection(both, Params).
24.147
24.148 -new_connection(AllowedConnectionTypes, AmqpParams) ->
24.149 - {Type, Params} =
24.150 +new_connection(AllowedConnectionTypes, Params) ->
24.151 + Params1 =
24.152 case {AllowedConnectionTypes,
24.153 os:getenv("AMQP_CLIENT_TEST_CONNECTION_TYPE")} of
24.154 {just_direct, "network"} ->
24.155 @@ -768,24 +815,41 @@
24.156 {just_network, "direct"} ->
24.157 exit(normal);
24.158 {_, "network"} ->
24.159 - {network, AmqpParams};
24.160 + make_network_params(Params);
24.161 {_, "network_ssl"} ->
24.162 {ok, [[CertsDir]]} = init:get_argument(erlang_client_ssl_dir),
24.163 - {network,
24.164 - AmqpParams#amqp_params{
24.165 - port = 5671,
24.166 - ssl_options = [{cacertfile,
24.167 - CertsDir ++ "/testca/cacert.pem"},
24.168 - {certfile, CertsDir ++ "/client/cert.pem"},
24.169 - {keyfile, CertsDir ++ "/client/key.pem"},
24.170 - {verify, verify_peer},
24.171 - {fail_if_no_peer_cert, true}]}};
24.172 + make_network_params(
24.173 + [{ssl_options, [{cacertfile,
24.174 + CertsDir ++ "/testca/cacert.pem"},
24.175 + {certfile, CertsDir ++ "/client/cert.pem"},
24.176 + {keyfile, CertsDir ++ "/client/key.pem"},
24.177 + {verify, verify_peer},
24.178 + {fail_if_no_peer_cert, true}]}] ++ Params);
24.179 {_, "direct"} ->
24.180 - {direct,
24.181 - AmqpParams#amqp_params{node = rabbit_misc:makenode(rabbit)}}
24.182 + make_direct_params([node, rabbit_misc:makenode(rabbit)] ++
24.183 + Params)
24.184 end,
24.185 - case amqp_connection:start(Type, Params) of
24.186 + case amqp_connection:start(Params1) of
24.187 {ok, Conn} -> Conn;
24.188 {error, _} = E -> E
24.189 end.
24.190
24.191 +%% Note: not all amqp_params_network fields supported.
24.192 +make_network_params(Props) ->
24.193 + Pgv = fun (Key, Default) ->
24.194 + proplists:get_value(Key, Props, Default)
24.195 + end,
24.196 + #amqp_params_network{username = Pgv(username, <<"guest">>),
24.197 + password = Pgv(password, <<"guest">>),
24.198 + virtual_host = Pgv(virtual_host, <<"/">>),
24.199 + channel_max = Pgv(channel_max, 0),
24.200 + ssl_options = Pgv(ssl_options, none)}.
24.201 +
24.202 +%% Note: not all amqp_params_direct fields supported.
24.203 +make_direct_params(Props) ->
24.204 + Pgv = fun (Key, Default) ->
24.205 + proplists:get_value(Key, Props, Default)
24.206 + end,
24.207 + #amqp_params_direct{username = Pgv(username, <<"guest">>),
24.208 + virtual_host = Pgv(virtual_host, <<"/">>),
24.209 + node = Pgv(node, node())}.