merge from default bug24018
authorAlexandru Scvortov <alexandru@rabbitmq.com>
Thu Jul 28 13:46:07 2011 +0100 (2011-07-28)
branchbug24018
changeset 1277576e15cca8c6
parent 1149 583e4b619937
parent 1275 68a8204c89ba
child 1278 7b2ca497657f
merge from default

Direct tests still fail. There might also be a test or two missing.
Makefile
src/amqp_connection.erl
test.mk
test/amqp_client_SUITE.erl
test/negative_test_util.erl
test/test_util.erl
     1.1 --- a/.hgtags	Thu Apr 07 10:36:27 2011 -0500
     1.2 +++ b/.hgtags	Thu Jul 28 13:46:07 2011 +0100
     1.3 @@ -10,3 +10,5 @@
     1.4  553b07d6267a608673d1bc7603b6b3a13ce99b16 rabbitmq_v2_3_0
     1.5  a1f84f26c26ca04c4baac06f79d0f7a0d66c1dbb rabbitmq_v2_3_1
     1.6  69d96cc497ce4686377e41ced1d92a993ce6b748 rabbitmq_v2_4_0
     1.7 +c9fc28db6777acf24db72f00efb59d91d3172acf rabbitmq_v2_4_1
     1.8 +0b6b1c20a6e829c3477dd11e4b6a085a0189fd9c rabbitmq_v2_5_0
     2.1 --- a/Makefile	Thu Apr 07 10:36:27 2011 -0500
     2.2 +++ b/Makefile	Thu Jul 28 13:46:07 2011 +0100
     2.3 @@ -37,8 +37,9 @@
     2.4  
     2.5  distribution: documentation source_tarball package
     2.6  
     2.7 -%.app: %.app.in
     2.8 -	sed -e 's:%%VSN%%:$(VERSION):g' < $< > $@
     2.9 +%.app: %.app.in $(SOURCES) $(BROKER_DIR)/generate_app
    2.10 +	escript  $(BROKER_DIR)/generate_app $< $@ $(SOURCE_DIR)
    2.11 +	sed -i.save 's/%%VSN%%/$(VERSION)/' $@ && rm $@.save
    2.12  
    2.13  ###############################################################################
    2.14  ##  Dialyzer
     3.1 --- a/common.mk	Thu Apr 07 10:36:27 2011 -0500
     3.2 +++ b/common.mk	Thu Jul 28 13:46:07 2011 +0100
     3.3 @@ -56,10 +56,10 @@
     3.4  ERL_PATH ?=
     3.5  
     3.6  PACKAGE=amqp_client
     3.7 -PACKAGE_DIR=$(PACKAGE)$(if $(APPEND_VERSION),-$(VERSION),)
     3.8 +PACKAGE_DIR=$(PACKAGE)-$(VERSION)
     3.9  PACKAGE_NAME_EZ=$(PACKAGE_DIR).ez
    3.10  COMMON_PACKAGE=rabbit_common
    3.11 -export COMMON_PACKAGE_DIR=$(COMMON_PACKAGE)$(if $(APPEND_VERSION),-$(VERSION),)
    3.12 +export COMMON_PACKAGE_DIR=$(COMMON_PACKAGE)-$(VERSION)
    3.13  COMMON_PACKAGE_EZ=$(COMMON_PACKAGE_DIR).ez
    3.14  
    3.15  DEPS=$(shell erl -noshell -eval '{ok,[{_,_,[_,_,{modules, Mods},_,_,_]}]} = \
    3.16 @@ -109,13 +109,14 @@
    3.17  ALL_SSL := { $(MAKE) test_ssl || OK=false; }
    3.18  ALL_SSL_COVERAGE := { $(MAKE) test_ssl_coverage || OK=false; }
    3.19  SSL_BROKER_ARGS := -rabbit ssl_listeners [{\\\"0.0.0.0\\\",5671}] \
    3.20 -	-rabbit ssl_options [{cacertfile,\\\"$(SSL_CERTS_DIR)/testca/cacert.pem\\\"},{certfile,\\\"$(SSL_CERTS_DIR)/server/cert.pem\\\"},{keyfile,\\\"$(SSL_CERTS_DIR)/server/key.pem\\\"},{verify,verify_peer},{fail_if_no_peer_cert,true}] \
    3.21 -	-erlang_client_ssl_dir \"$(SSL_CERTS_DIR)\"
    3.22 +	-rabbit ssl_options [{cacertfile,\\\"$(SSL_CERTS_DIR)/testca/cacert.pem\\\"},{certfile,\\\"$(SSL_CERTS_DIR)/server/cert.pem\\\"},{keyfile,\\\"$(SSL_CERTS_DIR)/server/key.pem\\\"},{verify,verify_peer},{fail_if_no_peer_cert,true}]
    3.23 +SSL_CLIENT_ARGS := -erlang_client_ssl_dir $(SSL_CERTS_DIR)
    3.24  else
    3.25  SSL := @echo No SSL_CERTS_DIR defined. && false
    3.26  ALL_SSL := true
    3.27  ALL_SSL_COVERAGE := true
    3.28  SSL_BROKER_ARGS :=
    3.29 +SSL_CLIENT_ARGS :=
    3.30  endif
    3.31  
    3.32  # Versions prior to this are not supported
     4.1 --- a/ebin/amqp_client.app.in	Thu Apr 07 10:36:27 2011 -0500
     4.2 +++ b/ebin/amqp_client.app.in	Thu Jul 28 13:46:07 2011 +0100
     4.3 @@ -1,22 +1,7 @@
     4.4  {application, amqp_client,
     4.5   [{description, "RabbitMQ AMQP Client"},
     4.6    {vsn, "%%VSN%%"},
     4.7 -  {modules, [amqp_auth_mechanisms,
     4.8 -             amqp_channel,
     4.9 -             amqp_channel_sup,
    4.10 -             amqp_channel_sup_sup,
    4.11 -             amqp_channels_manager,
    4.12 -             amqp_client,
    4.13 -             amqp_connection,
    4.14 -             amqp_connection_sup,
    4.15 -             amqp_connection_type_sup,
    4.16 -             amqp_direct_connection,
    4.17 -             amqp_gen_connection,
    4.18 -             amqp_main_reader,
    4.19 -             amqp_network_connection,
    4.20 -             amqp_rpc_client,
    4.21 -             amqp_rpc_server,
    4.22 -             amqp_sup]},
    4.23 +  {modules, []},
    4.24    {registered, [amqp_sup]},
    4.25    {env, []},
    4.26    {mod, {amqp_client, []}},
     5.1 --- a/include/amqp_client.hrl	Thu Apr 07 10:36:27 2011 -0500
     5.2 +++ b/include/amqp_client.hrl	Thu Jul 28 13:46:07 2011 +0100
     5.3 @@ -14,6 +14,9 @@
     5.4  %% Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
     5.5  %%
     5.6  
     5.7 +-ifndef(AMQP_CLIENT_HRL).
     5.8 +-define(AMQP_CLIENT_HRL, true).
     5.9 +
    5.10  -include_lib("rabbit_common/include/rabbit.hrl").
    5.11  -include_lib("rabbit_common/include/rabbit_framing.hrl").
    5.12  
    5.13 @@ -25,21 +28,37 @@
    5.14  -define(MAX_CHANNEL_NUMBER, 65535).
    5.15  -define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}).
    5.16  
    5.17 +-define(PROTOCOL_SSL_PORT, (?PROTOCOL_PORT - 1)).
    5.18 +
    5.19  -record(amqp_msg, {props = #'P_basic'{}, payload = <<>>}).
    5.20  
    5.21 --record(amqp_params, {username          = <<"guest">>,
    5.22 -                      password          = <<"guest">>,
    5.23 -                      virtual_host      = <<"/">>,
    5.24 -                      host              = "localhost",
    5.25 -                      port              = ?PROTOCOL_PORT,
    5.26 -                      node              = node(),
    5.27 -                      channel_max       = 0,
    5.28 -                      frame_max         = 0,
    5.29 -                      heartbeat         = 0,
    5.30 -                      ssl_options       = none,
    5.31 -                      auth_mechanisms   = [fun amqp_auth_mechanisms:plain/3,
    5.32 -                                           fun amqp_auth_mechanisms:amqplain/3],
    5.33 -                      client_properties = []}).
    5.34 +-record(amqp_params_network, {username          = <<"guest">>,
    5.35 +                              password          = <<"guest">>,
    5.36 +                              virtual_host      = <<"/">>,
    5.37 +                              host              = "localhost",
    5.38 +                              port              = undefined,
    5.39 +                              channel_max       = 0,
    5.40 +                              frame_max         = 0,
    5.41 +                              heartbeat         = 0,
    5.42 +                              ssl_options       = none,
    5.43 +                              auth_mechanisms   =
    5.44 +                                  [fun amqp_auth_mechanisms:plain/3,
    5.45 +                                   fun amqp_auth_mechanisms:amqplain/3],
    5.46 +                              client_properties = []}).
    5.47 +
    5.48 +-record(amqp_params_direct, {username          = <<"guest">>,
    5.49 +                             virtual_host      = <<"/">>,
    5.50 +                             node              = node(),
    5.51 +                             adapter_info      = none,
    5.52 +                             client_properties = []}).
    5.53 +
    5.54 +-record(adapter_info, {address         = unknown,
    5.55 +                       port            = unknown,
    5.56 +                       peer_address    = unknown,
    5.57 +                       peer_port       = unknown,
    5.58 +                       name            = unknown,
    5.59 +                       protocol        = unknown,
    5.60 +                       additional_info = []}).
    5.61  
    5.62  -define(LOG_DEBUG(Format), error_logger:info_msg(Format)).
    5.63  -define(LOG_INFO(Format, Args), error_logger:info_msg(Format, Args)).
    5.64 @@ -49,3 +68,5 @@
    5.65                                {<<"exchange_exchange_bindings">>, bool, true},
    5.66                                {<<"basic.nack">>,                 bool, true},
    5.67                                {<<"consumer_cancel_notify">>,     bool, true}]).
    5.68 +
    5.69 +-endif.
     6.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     6.2 +++ b/include/amqp_gen_consumer_spec.hrl	Thu Jul 28 13:46:07 2011 +0100
     6.3 @@ -0,0 +1,40 @@
     6.4 +%% The contents of this file are subject to the Mozilla Public License
     6.5 +%% Version 1.1 (the "License"); you may not use this file except in
     6.6 +%% compliance with the License. You may obtain a copy of the License at
     6.7 +%% http://www.mozilla.org/MPL/
     6.8 +%%
     6.9 +%% Software distributed under the License is distributed on an "AS IS"
    6.10 +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
    6.11 +%% License for the specific language governing rights and limitations
    6.12 +%% under the License.
    6.13 +%%
    6.14 +%% The Original Code is RabbitMQ.
    6.15 +%%
    6.16 +%% The Initial Developer of the Original Code is VMware, Inc.
    6.17 +%% Copyright (c) 2011-2011 VMware, Inc.  All rights reserved.
    6.18 +%%
    6.19 +
    6.20 +-include("amqp_client.hrl").
    6.21 +
    6.22 +-type(state() :: any()).
    6.23 +-type(consume() :: #'basic.consume'{}).
    6.24 +-type(consume_ok() :: #'basic.consume_ok'{}).
    6.25 +-type(cancel() :: #'basic.cancel'{}).
    6.26 +-type(cancel_ok() :: #'basic.cancel_ok'{}).
    6.27 +-type(deliver() :: #'basic.deliver'{}).
    6.28 +-type(from() :: any()).
    6.29 +-type(reason() :: any()).
    6.30 +-type(ok_error() :: {ok, state()} | {error, reason(), state()}).
    6.31 +
    6.32 +-spec(init/1 :: ([any()]) -> {ok, state()}).
    6.33 +-spec(handle_consume/3 :: (consume(), pid(), state()) -> ok_error()).
    6.34 +-spec(handle_consume_ok/3 :: (consume_ok(), consume(), state()) ->
    6.35 +                                  ok_error()).
    6.36 +-spec(handle_cancel/2 :: (cancel(), state()) -> ok_error()).
    6.37 +-spec(handle_cancel_ok/3 :: (cancel_ok(), cancel(), state()) -> ok_error()).
    6.38 +-spec(handle_deliver/3 :: (deliver(), #amqp_msg{}, state()) -> ok_error()).
    6.39 +-spec(handle_info/2 :: (any(), state()) -> ok_error()).
    6.40 +-spec(handle_call/3 :: (any(), from(), state()) ->
    6.41 +                           {reply, any(), state()} | {noreply, state()} |
    6.42 +                            {error, reason(), state()}).
    6.43 +-spec(terminate/2 :: (any(), state()) -> state()).
     7.1 --- a/rabbit_common.app.in	Thu Apr 07 10:36:27 2011 -0500
     7.2 +++ b/rabbit_common.app.in	Thu Jul 28 13:46:07 2011 +0100
     7.3 @@ -3,6 +3,7 @@
     7.4    {vsn, "%%VSN%%"},
     7.5    {modules, [
     7.6               gen_server2,
     7.7 +             priority_queue,
     7.8               rabbit_backing_queue,
     7.9               rabbit_basic,
    7.10               rabbit_binary_generator,
     8.1 --- a/src/amqp_auth_mechanisms.erl	Thu Apr 07 10:36:27 2011 -0500
     8.2 +++ b/src/amqp_auth_mechanisms.erl	Thu Jul 28 13:46:07 2011 +0100
     8.3 @@ -25,14 +25,14 @@
     8.4  
     8.5  plain(none, _, init) ->
     8.6      {<<"PLAIN">>, []};
     8.7 -plain(none, #amqp_params{username = Username,
     8.8 -                         password = Password}, _State) ->
     8.9 +plain(none, #amqp_params_network{username = Username,
    8.10 +                                 password = Password}, _State) ->
    8.11      {<<0, Username/binary, 0, Password/binary>>, _State}.
    8.12  
    8.13  amqplain(none, _, init) ->
    8.14      {<<"AMQPLAIN">>, []};
    8.15 -amqplain(none, #amqp_params{username = Username,
    8.16 -                            password = Password}, _State) ->
    8.17 +amqplain(none, #amqp_params_network{username = Username,
    8.18 +                                    password = Password}, _State) ->
    8.19      LoginTable = [{<<"LOGIN">>,    longstr, Username},
    8.20                    {<<"PASSWORD">>, longstr, Password}],
    8.21      {rabbit_binary_generator:generate_table(LoginTable), _State}.
     9.1 --- a/src/amqp_channel.erl	Thu Apr 07 10:36:27 2011 -0500
     9.2 +++ b/src/amqp_channel.erl	Thu Jul 28 13:46:07 2011 +0100
     9.3 @@ -68,11 +68,11 @@
     9.4  
     9.5  -export([call/2, call/3, cast/2, cast/3]).
     9.6  -export([close/1, close/3]).
     9.7 --export([next_publish_seqno/1]).
     9.8  -export([register_return_handler/2, register_flow_handler/2,
     9.9           register_confirm_handler/2]).
    9.10 --export([call_consumer/2]).
    9.11 -
    9.12 +-export([call_consumer/2, subscribe/3]).
    9.13 +-export([next_publish_seqno/1, wait_for_confirms/1,
    9.14 +         wait_for_confirms_or_die/1]).
    9.15  -export([start_link/5, connection_closing/3, open/1]).
    9.16  
    9.17  -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
    9.18 @@ -83,6 +83,7 @@
    9.19  
    9.20  -record(state, {number,
    9.21                  connection,
    9.22 +                consumer,
    9.23                  driver,
    9.24                  rpc_requests        = queue:new(),
    9.25                  closing             = false, %% false |
    9.26 @@ -94,9 +95,10 @@
    9.27                  next_pub_seqno      = 0,
    9.28                  flow_active         = true,
    9.29                  flow_handler_pid    = none,
    9.30 -                consumer_module,
    9.31 -                consumer_state,
    9.32 -                start_writer_fun
    9.33 +                start_writer_fun,
    9.34 +                unconfirmed_set     = gb_sets:new(),
    9.35 +                waiting_set         = gb_trees:empty(),
    9.36 +                only_acks_received  = true
    9.37                 }).
    9.38  
    9.39  %%---------------------------------------------------------------------------
    9.40 @@ -129,7 +131,7 @@
    9.41  %% @spec (Channel, Method) -> Result
    9.42  %% @doc This is equivalent to amqp_channel:call(Channel, Method, none).
    9.43  call(Channel, Method) ->
    9.44 -    gen_server:call(Channel, {call, Method, none}, infinity).
    9.45 +    gen_server:call(Channel, {call, Method, none, self()}, infinity).
    9.46  
    9.47  %% @spec (Channel, Method, Content) -> Result
    9.48  %% where
    9.49 @@ -152,12 +154,12 @@
    9.50  %% the broker. It does not necessarily imply that the broker has
    9.51  %% accepted responsibility for the message.
    9.52  call(Channel, Method, Content) ->
    9.53 -    gen_server:call(Channel, {call, Method, Content}, infinity).
    9.54 +    gen_server:call(Channel, {call, Method, Content, self()}, infinity).
    9.55  
    9.56  %% @spec (Channel, Method) -> ok
    9.57  %% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
    9.58  cast(Channel, Method) ->
    9.59 -    gen_server:cast(Channel, {cast, Method, none}).
    9.60 +    gen_server:cast(Channel, {cast, Method, none, self()}).
    9.61  
    9.62  %% @spec (Channel, Method, Content) -> ok
    9.63  %% where
    9.64 @@ -169,7 +171,7 @@
    9.65  %% This function is not recommended with synchronous methods, since there is no
    9.66  %% way to verify that the server has received the method.
    9.67  cast(Channel, Method, Content) ->
    9.68 -    gen_server:cast(Channel, {cast, Method, Content}).
    9.69 +    gen_server:cast(Channel, {cast, Method, Content, self()}).
    9.70  
    9.71  %% @spec (Channel) -> ok | closing
    9.72  %% where
    9.73 @@ -197,6 +199,24 @@
    9.74  next_publish_seqno(Channel) ->
    9.75      gen_server:call(Channel, next_publish_seqno, infinity).
    9.76  
    9.77 +%% @spec (Channel) -> boolean()
    9.78 +%% where
    9.79 +%%      Channel = pid()
    9.80 +%% @doc Wait until all messages published since the last call have
    9.81 +%% been either ack'd or nack'd by the broker.  Note, when called on a
    9.82 +%% non-Confirm channel, waitForConfirms returns true immediately.
    9.83 +wait_for_confirms(Channel) ->
    9.84 +    gen_server:call(Channel, wait_for_confirms, infinity).
    9.85 +
    9.86 +%% @spec (Channel) -> true
    9.87 +%% where
    9.88 +%%      Channel = pid()
    9.89 +%% @doc Behaves the same as wait_for_confirms/1, but if a nack is
    9.90 +%% received, the calling process is immediately sent an
    9.91 +%% exit(nack_received).
    9.92 +wait_for_confirms_or_die(Channel) ->
    9.93 +    gen_server:call(Channel, {wait_for_confirms_or_die, self()}, infinity).
    9.94 +
    9.95  %% @spec (Channel, ReturnHandler) -> ok
    9.96  %% where
    9.97  %%      Channel = pid()
    9.98 @@ -226,15 +246,25 @@
    9.99  register_flow_handler(Channel, FlowHandler) ->
   9.100      gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
   9.101  
   9.102 -%% @spec (Channel, Message) -> ok
   9.103 +%% @spec (Channel, Msg) -> ok
   9.104  %% where
   9.105  %%      Channel = pid()
   9.106 -%%      Message = any()
   9.107 +%%      Msg    = any()
   9.108  %% @doc This causes the channel to invoke Consumer:handle_call/2,
   9.109  %% where Consumer is the amqp_gen_consumer implementation registered with
   9.110  %% the channel.
   9.111 -call_consumer(Channel, Call) ->
   9.112 -    gen_server:call(Channel, {call_consumer, Call}, infinity).
   9.113 +call_consumer(Channel, Msg) ->
   9.114 +    gen_server:call(Channel, {call_consumer, Msg}, infinity).
   9.115 +
   9.116 +%% @spec (Channel, BasicConsume, Subscriber) -> ok
   9.117 +%% where
   9.118 +%%      Channel = pid()
   9.119 +%%      BasicConsume = amqp_method()
   9.120 +%%      Subscriber = pid()
   9.121 +%% @doc Subscribe the given pid to a queue using the specified
   9.122 +%% basic.consume method.
   9.123 +subscribe(Channel, BasicConsume = #'basic.consume'{}, Subscriber) ->
   9.124 +    gen_server:call(Channel, {subscribe, BasicConsume, Subscriber}, infinity).
   9.125  
   9.126  %%---------------------------------------------------------------------------
   9.127  %% Internal interface
   9.128 @@ -258,24 +288,22 @@
   9.129  %%---------------------------------------------------------------------------
   9.130  
   9.131  %% @private
   9.132 -init([Driver, Connection, ChannelNumber, {ConsumerModule, ConsumerArgs},
   9.133 -      SWF]) ->
   9.134 -    State0 = #state{connection       = Connection,
   9.135 -                    driver           = Driver,
   9.136 -                    number           = ChannelNumber,
   9.137 -                    consumer_module  = ConsumerModule,
   9.138 -                    start_writer_fun = SWF},
   9.139 -    {ok, consumer_callback(init, [ConsumerArgs], State0)}.
   9.140 +init([Driver, Connection, ChannelNumber, Consumer, SWF]) ->
   9.141 +    {ok, #state{connection       = Connection,
   9.142 +                driver           = Driver,
   9.143 +                number           = ChannelNumber,
   9.144 +                consumer         = Consumer,
   9.145 +                start_writer_fun = SWF}}.
   9.146  
   9.147  %% @private
   9.148  handle_call(open, From, State) ->
   9.149 -    {noreply, rpc_top_half(#'channel.open'{}, none, From, State)};
   9.150 +    {noreply, rpc_top_half(#'channel.open'{}, none, From, none, State)};
   9.151  %% @private
   9.152  handle_call({close, Code, Text}, From, State) ->
   9.153      handle_close(Code, Text, From, State);
   9.154  %% @private
   9.155 -handle_call({call, Method, AmqpMsg}, From, State) ->
   9.156 -    handle_method_to_server(Method, AmqpMsg, From, State);
   9.157 +handle_call({call, Method, AmqpMsg, Sender}, From, State) ->
   9.158 +    handle_method_to_server(Method, AmqpMsg, From, Sender, State);
   9.159  %% Handles the delivery of messages from a direct channel
   9.160  %% @private
   9.161  handle_call({send_command_sync, Method, Content}, From, State) ->
   9.162 @@ -292,13 +320,25 @@
   9.163  handle_call(next_publish_seqno, _From,
   9.164              State = #state{next_pub_seqno = SeqNo}) ->
   9.165      {reply, SeqNo, State};
   9.166 +
   9.167 +handle_call(wait_for_confirms, From, State) ->
   9.168 +    handle_wait_for_confirms(From, none, true, State);
   9.169 +
   9.170 +%% Lets the channel know that the process should be sent an exit
   9.171 +%% signal if a nack is received.
   9.172 +handle_call({wait_for_confirms_or_die, Pid}, From, State) ->
   9.173 +    handle_wait_for_confirms(From, Pid, ok, State);
   9.174  %% @private
   9.175 -handle_call({call_consumer, Call}, _From, State) ->
   9.176 -    handle_consumer_callback(handle_call, [Call], State).
   9.177 +handle_call({call_consumer, Msg}, _From,
   9.178 +            State = #state{consumer = Consumer}) ->
   9.179 +    {reply, amqp_gen_consumer:call_consumer(Consumer, Msg), State};
   9.180 +%% @private
   9.181 +handle_call({subscribe, BasicConsume, Subscriber}, From, State) ->
   9.182 +    handle_method_to_server(BasicConsume, none, From, Subscriber, State).
   9.183  
   9.184  %% @private
   9.185 -handle_cast({cast, Method, AmqpMsg}, State) ->
   9.186 -    handle_method_to_server(Method, AmqpMsg, none, State);
   9.187 +handle_cast({cast, Method, AmqpMsg, Sender}, State) ->
   9.188 +    handle_method_to_server(Method, AmqpMsg, none, Sender, State);
   9.189  %% @private
   9.190  handle_cast({register_return_handler, ReturnHandler}, State) ->
   9.191      erlang:monitor(process, ReturnHandler),
   9.192 @@ -377,8 +417,8 @@
   9.193      {noreply, State#state{flow_handler_pid = none}}.
   9.194  
   9.195  %% @private
   9.196 -terminate(Reason, State) ->
   9.197 -    consumer_callback(terminate, [Reason], State).
   9.198 +terminate(_Reason, State) ->
   9.199 +    State.
   9.200  
   9.201  %% @private
   9.202  code_change(_OldVsn, State, _Extra) ->
   9.203 @@ -388,7 +428,8 @@
   9.204  %% RPC mechanism
   9.205  %%---------------------------------------------------------------------------
   9.206  
   9.207 -handle_method_to_server(Method, AmqpMsg, From, State) ->
   9.208 +handle_method_to_server(Method, AmqpMsg, From, Sender,
   9.209 +                        State = #state{unconfirmed_set = USet}) ->
   9.210      case {check_invalid_method(Method), From,
   9.211            check_block(Method, AmqpMsg, State)} of
   9.212          {ok, _, ok} ->
   9.213 @@ -398,12 +439,14 @@
   9.214                           {#'basic.publish'{}, 0} ->
   9.215                               State;
   9.216                           {#'basic.publish'{}, SeqNo} ->
   9.217 -                             State#state{next_pub_seqno = SeqNo + 1};
   9.218 +                             State#state{unconfirmed_set =
   9.219 +                                             gb_sets:add(SeqNo, USet),
   9.220 +                                         next_pub_seqno = SeqNo + 1};
   9.221                           _ ->
   9.222                               State
   9.223                       end,
   9.224 -            {noreply,
   9.225 -             rpc_top_half(Method, build_content(AmqpMsg), From, State1)};
   9.226 +            {noreply, rpc_top_half(Method, build_content(AmqpMsg),
   9.227 +                                   From, Sender, State1)};
   9.228          {ok, none, BlockReply} ->
   9.229              ?LOG_WARN("Channel (~p): discarding method ~p in cast.~n"
   9.230                        "Reason: ~p~n", [self(), Method, BlockReply]),
   9.231 @@ -424,21 +467,22 @@
   9.232                               class_id   = 0,
   9.233                               method_id  = 0},
   9.234      case check_block(Close, none, State) of
   9.235 -        ok         -> {noreply, rpc_top_half(Close, none, From, State)};
   9.236 +        ok         -> {noreply, rpc_top_half(Close, none, From, none, State)};
   9.237          BlockReply -> {reply, BlockReply, State}
   9.238      end.
   9.239  
   9.240 -rpc_top_half(Method, Content, From,
   9.241 +rpc_top_half(Method, Content, From, Sender,
   9.242               State0 = #state{rpc_requests = RequestQueue}) ->
   9.243      State1 = State0#state{
   9.244 -        rpc_requests = queue:in({From, Method, Content}, RequestQueue)},
   9.245 +        rpc_requests =
   9.246 +                   queue:in({From, Sender, Method, Content}, RequestQueue)},
   9.247      IsFirstElement = queue:is_empty(RequestQueue),
   9.248      if IsFirstElement -> do_rpc(State1);
   9.249         true           -> State1
   9.250      end.
   9.251  
   9.252  rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
   9.253 -    {{value, {From, _Method, _Content}}, RequestQueue1} =
   9.254 +    {{value, {From, _Sender, _Method, _Content}}, RequestQueue1} =
   9.255          queue:out(RequestQueue),
   9.256      case From of
   9.257          none -> ok;
   9.258 @@ -449,8 +493,8 @@
   9.259  do_rpc(State = #state{rpc_requests = Q,
   9.260                        closing      = Closing}) ->
   9.261      case queue:out(Q) of
   9.262 -        {{value, {From, Method, Content}}, NewQ} ->
   9.263 -            State1 = pre_do(Method, Content, State),
   9.264 +        {{value, {From, Sender, Method, Content}}, NewQ} ->
   9.265 +            State1 = pre_do(Method, Content, Sender, State),
   9.266              DoRet = do(Method, Content, State1),
   9.267              case ?PROTOCOL:is_method_synchronous(Method) of
   9.268                  true  -> State1;
   9.269 @@ -475,15 +519,18 @@
   9.270      end.
   9.271  
   9.272  pending_rpc_method(#state{rpc_requests = Q}) ->
   9.273 -    {value, {_From, Method, _Content}} = queue:peek(Q),
   9.274 +    {value, {_From, _Sender, Method, _Content}} = queue:peek(Q),
   9.275      Method.
   9.276  
   9.277 -pre_do(#'channel.open'{}, none, State) ->
   9.278 +pre_do(#'channel.open'{}, none, _Sender, State) ->
   9.279      start_writer(State);
   9.280  pre_do(#'channel.close'{reply_code = Code, reply_text = Text}, none,
   9.281 -       State) ->
   9.282 +       _Sender, State) ->
   9.283      State#state{closing = {just_channel, {app_initiated_close, Code, Text}}};
   9.284 -pre_do(_, _, State) ->
   9.285 +pre_do(#'basic.consume'{} = Method, none, Sender, State) ->
   9.286 +    ok = call_to_consumer(Method, Sender, State),
   9.287 +    State;
   9.288 +pre_do(_, _, _, State) ->
   9.289      State.
   9.290  
   9.291  %%---------------------------------------------------------------------------
   9.292 @@ -543,12 +590,18 @@
   9.293      end;
   9.294  handle_method_from_server1(#'basic.consume_ok'{} = ConsumeOk, none, State) ->
   9.295      Consume = #'basic.consume'{} = pending_rpc_method(State),
   9.296 -    State1 = consumer_callback(handle_consume_ok, [ConsumeOk, Consume], State),
   9.297 -    {noreply, rpc_bottom_half(ConsumeOk, State1)};
   9.298 +    ok = call_to_consumer(ConsumeOk, Consume, State),
   9.299 +    {noreply, rpc_bottom_half(ConsumeOk, State)};
   9.300  handle_method_from_server1(#'basic.cancel_ok'{} = CancelOk, none, State) ->
   9.301      Cancel = #'basic.cancel'{} = pending_rpc_method(State),
   9.302 -    State1 = consumer_callback(handle_cancel_ok, [CancelOk, Cancel], State),
   9.303 -    {noreply, rpc_bottom_half(CancelOk, State1)};
   9.304 +    ok = call_to_consumer(CancelOk, Cancel, State),
   9.305 +    {noreply, rpc_bottom_half(CancelOk, State)};
   9.306 +handle_method_from_server1(#'basic.cancel'{} = Cancel, none, State) ->
   9.307 +    ok = call_to_consumer(Cancel, none, State),
   9.308 +    {noreply, State};
   9.309 +handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, State) ->
   9.310 +    ok = call_to_consumer(Deliver, AmqpMsg, State),
   9.311 +    {noreply, State};
   9.312  handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
   9.313                             State = #state{flow_handler_pid = FlowHandler}) ->
   9.314      case FlowHandler of none -> ok;
   9.315 @@ -558,9 +611,7 @@
   9.316      %% flushed beforehand. Methods that made it to the queue are not
   9.317      %% blocked in any circumstance.
   9.318      {noreply, rpc_top_half(#'channel.flow_ok'{active = Active}, none, none,
   9.319 -                           State#state{flow_active = Active})};
   9.320 -handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, State) ->
   9.321 -    handle_consumer_callback(handle_deliver, [{Deliver, AmqpMsg}], State);
   9.322 +                           none, State#state{flow_active = Active})};
   9.323  handle_method_from_server1(
   9.324          #'basic.return'{} = BasicReturn, AmqpMsg,
   9.325          State = #state{return_handler_pid = ReturnHandler}) ->
   9.326 @@ -571,28 +622,26 @@
   9.327          _    -> ReturnHandler ! {BasicReturn, AmqpMsg}
   9.328      end,
   9.329      {noreply, State};
   9.330 -handle_method_from_server1(#'basic.cancel'{} = Cancel, none, State) ->
   9.331 -    handle_consumer_callback(handle_cancel, [Cancel], State);
   9.332  handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
   9.333                             #state{confirm_handler_pid = none} = State) ->
   9.334      ?LOG_WARN("Channel (~p): received ~p but there is no "
   9.335                "confirm handler registered~n", [self(), BasicAck]),
   9.336 -    {noreply, State};
   9.337 +    {noreply, update_confirm_set(BasicAck, State)};
   9.338  handle_method_from_server1(
   9.339          #'basic.ack'{} = BasicAck, none,
   9.340          #state{confirm_handler_pid = ConfirmHandler} = State) ->
   9.341      ConfirmHandler ! BasicAck,
   9.342 -    {noreply, State};
   9.343 +    {noreply, update_confirm_set(BasicAck, State)};
   9.344  handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
   9.345                             #state{confirm_handler_pid = none} = State) ->
   9.346      ?LOG_WARN("Channel (~p): received ~p but there is no "
   9.347                "confirm handler registered~n", [self(), BasicNack]),
   9.348 -    {noreply, State};
   9.349 +    {noreply, update_confirm_set(BasicNack, handle_nack(State))};
   9.350  handle_method_from_server1(
   9.351          #'basic.nack'{} = BasicNack, none,
   9.352          #state{confirm_handler_pid = ConfirmHandler} = State) ->
   9.353      ConfirmHandler ! BasicNack,
   9.354 -    {noreply, State};
   9.355 +    {noreply, update_confirm_set(BasicNack, handle_nack(State))};
   9.356  
   9.357  handle_method_from_server1(Method, none, State) ->
   9.358      {noreply, rpc_bottom_half(Method, State)};
   9.359 @@ -717,25 +766,44 @@
   9.360              {noreply, State}
   9.361      end.
   9.362  
   9.363 -handle_consumer_callback(handle_call, Args,
   9.364 -                         State = #state{consumer_state = CState,
   9.365 -                                        consumer_module = CModule}) ->
   9.366 -    {reply, Reply, NewCState} =
   9.367 -        erlang:apply(CModule, handle_call, Args ++ [CState]),
   9.368 -    {reply, Reply, State#state{consumer_state = NewCState}};
   9.369 -handle_consumer_callback(Function, Args, State) ->
   9.370 -    {noreply, consumer_callback(Function, Args, State)}.
   9.371 +handle_nack(State = #state{waiting_set = WSet}) ->
   9.372 +    DyingPids = [Pid || {_, Pid} <- gb_trees:to_list(WSet), Pid =/= none],
   9.373 +    case DyingPids of
   9.374 +        [] -> State;
   9.375 +        _  -> [exit(Pid, nack_received) || Pid <- DyingPids],
   9.376 +              close(self(), 200, <<"Nacks Received">>)
   9.377 +    end.
   9.378  
   9.379 -consumer_callback(init, Args, State = #state{}) ->
   9.380 -    consumer_callback_basic(init, Args, State);
   9.381 -consumer_callback(terminate, Args, State = #state{consumer_state = CState,
   9.382 -                                                  consumer_module = CModule}) ->
   9.383 -    erlang:apply(CModule, terminate, Args ++ [CState]),
   9.384 -    State;
   9.385 -consumer_callback(Function, Args, State = #state{consumer_state = CState}) ->
   9.386 -    consumer_callback_basic(Function, Args ++ [CState], State).
   9.387 +update_confirm_set(#'basic.ack'{delivery_tag = SeqNo},
   9.388 +                   State = #state{unconfirmed_set = USet}) ->
   9.389 +    maybe_notify_waiters(
   9.390 +      State#state{unconfirmed_set = gb_sets:del_element(SeqNo, USet)});
   9.391 +update_confirm_set(#'basic.nack'{delivery_tag = SeqNo},
   9.392 +                   State = #state{unconfirmed_set = USet}) ->
   9.393 +    maybe_notify_waiters(
   9.394 +      State#state{unconfirmed_set = gb_sets:del_element(SeqNo, USet),
   9.395 +                  only_acks_received = false}).
   9.396  
   9.397 -consumer_callback_basic(Function,
   9.398 -                        Args, State = #state{consumer_module = CModule}) ->
   9.399 -    {ok, NewCState} = erlang:apply(CModule, Function, Args),
   9.400 -    State#state{consumer_state = NewCState}.
   9.401 +maybe_notify_waiters(State = #state{unconfirmed_set = USet}) ->
   9.402 +    case gb_sets:is_empty(USet) of
   9.403 +        false -> State;
   9.404 +        true  -> notify_confirm_waiters(State)
   9.405 +    end.
   9.406 +
   9.407 +notify_confirm_waiters(State = #state{waiting_set = WSet,
   9.408 +                                      only_acks_received = OAR}) ->
   9.409 +    [gen_server:reply(From, OAR) || {From, _} <- gb_trees:to_list(WSet)],
   9.410 +    State#state{waiting_set = gb_trees:empty(),
   9.411 +                only_acks_received = true}.
   9.412 +
   9.413 +handle_wait_for_confirms(From, Notify, EmptyReply,
   9.414 +                         State = #state{unconfirmed_set = USet,
   9.415 +                                        waiting_set     = WSet}) ->
   9.416 +    case gb_sets:is_empty(USet) of
   9.417 +        true  -> {reply, EmptyReply, State};
   9.418 +        false -> {noreply, State#state{waiting_set =
   9.419 +                                           gb_trees:insert(From, Notify, WSet)}}
   9.420 +    end.
   9.421 +
   9.422 +call_to_consumer(Method, Args, #state{consumer = Consumer}) ->
   9.423 +    amqp_gen_consumer:call_consumer(Consumer, Method, Args).
    10.1 --- a/src/amqp_channel_sup.erl	Thu Apr 07 10:36:27 2011 -0500
    10.2 +++ b/src/amqp_channel_sup.erl	Thu Jul 28 13:46:07 2011 +0100
    10.3 @@ -28,14 +28,15 @@
    10.4  %% Interface
    10.5  %%---------------------------------------------------------------------------
    10.6  
    10.7 -start_link(Type, Connection, InfraArgs, ChNumber, Consumer) ->
    10.8 -    {ok, Sup} = supervisor2:start_link(?MODULE, []),
    10.9 +start_link(Type, Connection, InfraArgs, ChNumber, Consumer = {_, _}) ->
   10.10 +    {ok, Sup} = supervisor2:start_link(?MODULE, [Consumer]),
   10.11 +    [{gen_consumer, ConsumerPid, _, _}] = supervisor2:which_children(Sup),
   10.12      {ok, ChPid} = supervisor2:start_child(
   10.13                      Sup, {channel, {amqp_channel, start_link,
   10.14 -                                    [Type, Connection, ChNumber, Consumer,
   10.15 +                                    [Type, Connection, ChNumber, ConsumerPid,
   10.16                                       start_writer_fun(Sup, Type, InfraArgs,
   10.17                                                        ChNumber)]},
   10.18 -                          intrinsic, brutal_kill, worker, [amqp_channel]}),
   10.19 +                          intrinsic, ?MAX_WAIT, worker, [amqp_channel]}),
   10.20      {ok, AState} = init_command_assembler(Type),
   10.21      {ok, Sup, {ChPid, AState}}.
   10.22  
   10.23 @@ -60,7 +61,7 @@
   10.24                          {writer, {rabbit_writer, start_link,
   10.25                                    [Sock, ChNumber, ?FRAME_MIN_SIZE, ?PROTOCOL,
   10.26                                     self()]},
   10.27 -                         transient, ?MAX_WAIT, worker, [rabbit_writer]})
   10.28 +                         intrinsic, ?MAX_WAIT, worker, [rabbit_writer]})
   10.29      end.
   10.30  
   10.31  init_command_assembler(direct)  -> {ok, none};
   10.32 @@ -70,5 +71,8 @@
   10.33  %% supervisor2 callbacks
   10.34  %%---------------------------------------------------------------------------
   10.35  
   10.36 -init([]) ->
   10.37 -    {ok, {{one_for_all, 0, 1}, []}}.
   10.38 +init([{ConsumerModule, ConsumerArgs}]) ->
   10.39 +    {ok, {{one_for_all, 0, 1},
   10.40 +          [{gen_consumer, {amqp_gen_consumer, start_link,
   10.41 +                           [ConsumerModule, ConsumerArgs]},
   10.42 +           intrinsic, ?MAX_WAIT, worker, [amqp_gen_consumer]}]}}.
    11.1 --- a/src/amqp_connection.erl	Thu Apr 07 10:36:27 2011 -0500
    11.2 +++ b/src/amqp_connection.erl	Thu Jul 28 13:46:07 2011 +0100
    11.3 @@ -70,7 +70,7 @@
    11.4  -include("amqp_client.hrl").
    11.5  
    11.6  -export([open_channel/1, open_channel/2, open_channel/3]).
    11.7 --export([start/1, start/2]).
    11.8 +-export([start/1]).
    11.9  -export([close/1, close/3]).
   11.10  -export([info/2, info_keys/1, info_keys/0]).
   11.11  
   11.12 @@ -78,7 +78,22 @@
   11.13  %% Type Definitions
   11.14  %%---------------------------------------------------------------------------
   11.15  
   11.16 -%% @type amqp_params() = #amqp_params{}.
   11.17 +%% @type adapter_info() = #adapter_info{}.
   11.18 +%% @type amqp_params_direct() = #amqp_params_direct{}.
   11.19 +%% As defined in amqp_client.hrl. It contains the following fields:
   11.20 +%% <ul>
   11.21 +%% <li>username :: binary() - The name of a user registered with the broker, 
   11.22 +%%     defaults to &lt;&lt;guest"&gt;&gt;</li>
   11.23 +%% <li>virtual_host :: binary() - The name of a virtual host in the broker,
   11.24 +%%     defaults to &lt;&lt;"/"&gt;&gt;</li>
   11.25 +%% <li>node :: atom() - The node the broker runs on (direct only)</li>
   11.26 +%% <li>adapter_info :: adapter_info() - Extra management information for if
   11.27 +%%     this connection represents a non-AMQP network connection.</li>
   11.28 +%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
   11.29 +%%     client properties to be sent to the server, defaults to []</li>
   11.30 +%% </ul>
   11.31 +%%
   11.32 +%% @type amqp_params_network() = #amqp_params_network{}.
   11.33  %% As defined in amqp_client.hrl. It contains the following fields:
   11.34  %% <ul>
   11.35  %% <li>username :: binary() - The name of a user registered with the broker, 
   11.36 @@ -91,7 +106,6 @@
   11.37  %%     defaults to "localhost" (network only)</li>
   11.38  %% <li>port :: integer() - The port the broker is listening on,
   11.39  %%     defaults to 5672 (network only)</li>
   11.40 -%% <li>node :: atom() - The node the broker runs on (direct only)</li>
   11.41  %% <li>channel_max :: non_neg_integer() - The channel_max handshake parameter,
   11.42  %%     defaults to 0</li>
   11.43  %% <li>frame_max :: non_neg_integer() - The frame_max handshake parameter,
   11.44 @@ -104,43 +118,38 @@
   11.45  %%     client properties to be sent to the server, defaults to []</li>
   11.46  %% </ul>
   11.47  
   11.48 +
   11.49  %%---------------------------------------------------------------------------
   11.50  %% Starting a connection
   11.51  %%---------------------------------------------------------------------------
   11.52  
   11.53 -%% @spec (Type) -> {ok, Connection} | {error, Error}
   11.54 +%% @spec (Params) -> {ok, Connection} | {error, Error}
   11.55  %% where
   11.56 -%%     Type = network | direct
   11.57 -%%     Connection = pid()
   11.58 -%% @doc Starts a connection to an AMQP server. Use network type to connect
   11.59 -%% to a remote AMQP server - default connection settings are used, meaning that
   11.60 -%% the server is expected to be at localhost:5672, with a vhost of "/"
   11.61 -%% authorising a user guest/guest. Use direct type for a direct connection to
   11.62 -%% a RabbitMQ server, assuming that the server is running in the same process
   11.63 -%% space, and with a default set of amqp_params. If a different host, port,
   11.64 -%% vhost or credential set is required, start/2 should be used.
   11.65 -start(Type) ->
   11.66 -    start(Type, #amqp_params{}).
   11.67 -
   11.68 -%% @spec (Type, amqp_params()) -> {ok, Connection} | {error, Error}
   11.69 -%% where
   11.70 -%%      Type = network | direct
   11.71 +%%      Params = amqp_params_network() | amqp_params_direct()
   11.72  %%      Connection = pid()
   11.73 -%% @doc Starts a connection to an AMQP server. Use network type to connect
   11.74 -%% to a remote AMQP server or direct type for a direct connection to
   11.75 -%% a RabbitMQ server, assuming that the server is running in the same process
   11.76 -%% space.
   11.77 -start(Type, AmqpParams) ->
   11.78 +%% @doc Starts a connection to an AMQP server. Use network params to
   11.79 +%% connect to a remote AMQP server or direct params for a direct
   11.80 +%% connection to a RabbitMQ server, assuming that the server is
   11.81 +%% running in the same process space.  If the port is set to 'undefined',
   11.82 +%% the default ports will be selected depending on whether this is a
   11.83 +%% normal or an SSL connection.
   11.84 +start(AmqpParams) ->
   11.85 +    io:format("starting connection: ~p~n", [AmqpParams]),
   11.86      case amqp_client:start() of
   11.87          ok                                      -> ok;
   11.88          {error, {already_started, amqp_client}} -> ok;
   11.89          {error, _} = E                          -> throw(E)
   11.90      end,
   11.91 -    {ok, _Sup, Connection} =
   11.92 -        amqp_sup:start_connection_sup(
   11.93 -            Type, case Type of direct  -> amqp_direct_connection;
   11.94 -                               network -> amqp_network_connection
   11.95 -                  end, AmqpParams),
   11.96 +    AmqpParams1 =
   11.97 +        case AmqpParams of
   11.98 +            #amqp_params_network{port = undefined, ssl_options = none} ->
   11.99 +                AmqpParams#amqp_params_network{port = ?PROTOCOL_PORT};
  11.100 +            #amqp_params_network{port = undefined, ssl_options = _} ->
  11.101 +                AmqpParams#amqp_params_network{port = ?PROTOCOL_SSL_PORT};
  11.102 +            _ ->
  11.103 +                AmqpParams
  11.104 +        end,
  11.105 +    {ok, _Sup, Connection} = amqp_sup:start_connection_sup(AmqpParams1),
  11.106      amqp_gen_connection:connect(Connection).
  11.107  
  11.108  %%---------------------------------------------------------------------------
    12.1 --- a/src/amqp_connection_sup.erl	Thu Apr 07 10:36:27 2011 -0500
    12.2 +++ b/src/amqp_connection_sup.erl	Thu Jul 28 13:46:07 2011 +0100
    12.3 @@ -21,15 +21,20 @@
    12.4  
    12.5  -behaviour(supervisor2).
    12.6  
    12.7 --export([start_link/3]).
    12.8 +-export([start_link/1]).
    12.9  -export([init/1]).
   12.10  
   12.11  %%---------------------------------------------------------------------------
   12.12  %% Interface
   12.13  %%---------------------------------------------------------------------------
   12.14  
   12.15 -start_link(Type, Module, AmqpParams) ->
   12.16 +start_link(AmqpParams) ->
   12.17      {ok, Sup} = supervisor2:start_link(?MODULE, []),
   12.18 +    {Type, Module} =
   12.19 +        case AmqpParams of
   12.20 +            #amqp_params_direct{}  -> {direct,  amqp_direct_connection};
   12.21 +            #amqp_params_network{} -> {network, amqp_network_connection}
   12.22 +        end,
   12.23      SChMF = start_channels_manager_fun(Sup, Type),
   12.24      SIF = start_infrastructure_fun(Sup, Type),
   12.25      {ok, Connection} = supervisor2:start_child(
    13.1 --- a/src/amqp_direct_connection.erl	Thu Apr 07 10:36:27 2011 -0500
    13.2 +++ b/src/amqp_direct_connection.erl	Thu Jul 28 13:46:07 2011 +0100
    13.3 @@ -27,12 +27,18 @@
    13.4  -record(state, {node,
    13.5                  user,
    13.6                  vhost,
    13.7 +                params,
    13.8 +                adapter_info,
    13.9                  collector,
   13.10                  closing_reason %% undefined | Reason
   13.11                 }).
   13.12  
   13.13  -define(INFO_KEYS, [type]).
   13.14  
   13.15 +-define(CREATION_EVENT_KEYS, [pid, protocol, address, port, name,
   13.16 +                              peer_address, peer_port,
   13.17 +                              user, vhost, client_properties, type]).
   13.18 +
   13.19  %%---------------------------------------------------------------------------
   13.20  
   13.21  init([]) ->
   13.22 @@ -58,29 +64,70 @@
   13.23      rabbit_queue_collector:delete_all(Collector),
   13.24      {stop, {shutdown, Reason}, State}.
   13.25  
   13.26 -terminate(_Reason, _State) ->
   13.27 +terminate(_Reason, #state{node = Node}) ->
   13.28 +    rpc:call(Node, rabbit_direct, disconnect, [[{pid, self()}]]),
   13.29      ok.
   13.30  
   13.31  i(type, _State) -> direct;
   13.32 +i(pid,  _State) -> self();
   13.33 +%% AMQP Params
   13.34 +i(user,              #state{params = P}) -> P#amqp_params_direct.username;
   13.35 +i(vhost,             #state{params = P}) -> P#amqp_params_direct.virtual_host;
   13.36 +i(client_properties, #state{params = P}) ->
   13.37 +    P#amqp_params_direct.client_properties;
   13.38 +%% Optional adapter info
   13.39 +i(protocol,     #state{adapter_info = I}) -> I#adapter_info.protocol;
   13.40 +i(address,      #state{adapter_info = I}) -> I#adapter_info.address;
   13.41 +i(port,         #state{adapter_info = I}) -> I#adapter_info.port;
   13.42 +i(peer_address, #state{adapter_info = I}) -> I#adapter_info.peer_address;
   13.43 +i(peer_port,    #state{adapter_info = I}) -> I#adapter_info.peer_port;
   13.44 +i(name,         #state{adapter_info = I}) -> I#adapter_info.name;
   13.45 +
   13.46  i(Item, _State) -> throw({bad_argument, Item}).
   13.47  
   13.48  info_keys() ->
   13.49      ?INFO_KEYS.
   13.50  
   13.51 -connect(#amqp_params{username = Username,
   13.52 -                     password = Pass,
   13.53 -                     node = Node,
   13.54 -                     virtual_host = VHost}, SIF, _ChMgr, State) ->
   13.55 +infos(Items, State) ->
   13.56 +    [{Item, i(Item, State)} || Item <- Items].
   13.57 +
   13.58 +additional_info(#state{adapter_info = I}) -> I#adapter_info.additional_info.
   13.59 +
   13.60 +connect(Params = #amqp_params_direct{username     = Username,
   13.61 +                                     node         = Node,
   13.62 +                                     adapter_info = Info,
   13.63 +                                     virtual_host = VHost},
   13.64 +        SIF, _ChMgr, State) ->
   13.65 +    State1 = State#state{node         = Node,
   13.66 +                         vhost        = VHost,
   13.67 +                         params       = Params,
   13.68 +                         adapter_info = ensure_adapter_info(Info)},
   13.69      case rpc:call(Node, rabbit_direct, connect,
   13.70 -                  [Username, Pass, VHost, ?PROTOCOL]) of
   13.71 +                  [Username, VHost, ?PROTOCOL,
   13.72 +                   infos(?CREATION_EVENT_KEYS, State1) ++
   13.73 +                       additional_info(State1)]) of
   13.74          {ok, {User, ServerProperties}} ->
   13.75              {ok, Collector} = SIF(),
   13.76 -            {ok, {ServerProperties, 0, State#state{node = Node,
   13.77 -                                                   user = User,
   13.78 -                                                   vhost = VHost,
   13.79 -                                                   collector = Collector}}};
   13.80 +            State2 = State1#state{user      = User,
   13.81 +                                  collector = Collector},
   13.82 +            {ok, {ServerProperties, 0, State2}};
   13.83          {error, _} = E ->
   13.84              E;
   13.85          {badrpc, nodedown} ->
   13.86              {error, {nodedown, Node}}
   13.87      end.
   13.88 +
   13.89 +ensure_adapter_info(none) ->
   13.90 +    ensure_adapter_info(#adapter_info{});
   13.91 +
   13.92 +ensure_adapter_info(A = #adapter_info{protocol = unknown}) ->
   13.93 +    ensure_adapter_info(A#adapter_info{protocol =
   13.94 +                                           {'Direct', ?PROTOCOL:version()}});
   13.95 +
   13.96 +ensure_adapter_info(A = #adapter_info{name         = unknown,
   13.97 +                                      peer_address = unknown,
   13.98 +                                      peer_port    = unknown}) ->
   13.99 +    Name = list_to_binary(rabbit_misc:pid_to_string(self())),
  13.100 +    ensure_adapter_info(A#adapter_info{name = Name});
  13.101 +
  13.102 +ensure_adapter_info(Info) -> Info.
    14.1 --- a/src/amqp_direct_consumer.erl	Thu Apr 07 10:36:27 2011 -0500
    14.2 +++ b/src/amqp_direct_consumer.erl	Thu Jul 28 13:46:07 2011 +0100
    14.3 @@ -11,23 +11,28 @@
    14.4  %% The Original Code is RabbitMQ.
    14.5  %%
    14.6  %% The Initial Developer of the Original Code is VMware, Inc.
    14.7 -%% Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
    14.8 +%% Copyright (c) 2011-2011 VMware, Inc.  All rights reserved.
    14.9  %%
   14.10  
   14.11 -%% @doc This module is an implementation of the amqp_gen_consumer behaviour and
   14.12 -%% can be used as part of the Consumer parameter when opening AMQP
   14.13 -%% channels.<br/>
   14.14 +%% @doc This module is an implementation of the amqp_gen_consumer
   14.15 +%% behaviour and can be used as part of the Consumer parameter when
   14.16 +%% opening AMQP channels.
   14.17  %% <br/>
   14.18 -%% The Consumer parameter for this implementation is
   14.19 -%% {{@module}, [ConsumerPid]@}, where ConsumerPid is a process that
   14.20 -%% will receive queue subscription-related messages.<br/>
   14.21  %% <br/>
   14.22 -%% This consumer implementation causes the channel to send to the ConsumerPid
   14.23 -%% all basic.consume_ok, basic.cancel_ok, basic.cancel and basic.deliver
   14.24 -%% messages received from the server.<br/>
   14.25 +%% The Consumer parameter for this implementation is {{@module},
   14.26 +%% [ConsumerPid]@}, where ConsumerPid is a process that will receive
   14.27 +%% queue subscription-related messages.<br/>
   14.28  %% <br/>
   14.29 -%% In addition, this consumer implementation creates a link between the channel
   14.30 -%% and the provided ConsumerPid.<br/>
   14.31 +%% This consumer implementation causes the channel to send to the
   14.32 +%% ConsumerPid all basic.consume, basic.consume_ok, basic.cancel,
   14.33 +%% basic.cancel_ok and basic.deliver messages received from the
   14.34 +%% server.
   14.35 +%% <br/>
   14.36 +%% <br/>
   14.37 +%% In addition, this consumer implementation monitors the ConsumerPid
   14.38 +%% and exits with the same shutdown reason when it dies.  'DOWN'
   14.39 +%% messages from other sources are passed to ConsumerPid.
   14.40 +%% <br/>
   14.41  %% Warning! It is not recommended to rely on a consumer on killing off the
   14.42  %% channel (through the exit signal). That may cause messages to get lost.
   14.43  %% Always use amqp_channel:close/{1,3} for a clean shut down.<br/>
   14.44 @@ -35,10 +40,13 @@
   14.45  %% This module has no public functions.
   14.46  -module(amqp_direct_consumer).
   14.47  
   14.48 +-include("amqp_gen_consumer_spec.hrl").
   14.49 +
   14.50  -behaviour(amqp_gen_consumer).
   14.51  
   14.52 --export([init/1, handle_consume_ok/3, handle_cancel_ok/3, handle_cancel/2,
   14.53 -         handle_deliver/2, handle_call/2, terminate/2]).
   14.54 +-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
   14.55 +         handle_cancel/2, handle_deliver/3, handle_info/2, handle_call/3,
   14.56 +         terminate/2]).
   14.57  
   14.58  %%---------------------------------------------------------------------------
   14.59  %% amqp_gen_consumer callbacks
   14.60 @@ -46,34 +54,46 @@
   14.61  
   14.62  %% @private
   14.63  init([ConsumerPid]) ->
   14.64 -    link(ConsumerPid),
   14.65 +    monitor(process, ConsumerPid),
   14.66      {ok, ConsumerPid}.
   14.67  
   14.68  %% @private
   14.69 +handle_consume(M, A, C) ->
   14.70 +    C ! {M, A},
   14.71 +    {ok, C}.
   14.72 +
   14.73 +%% @private
   14.74  handle_consume_ok(M, _, C) ->
   14.75      C ! M,
   14.76      {ok, C}.
   14.77  
   14.78  %% @private
   14.79 +handle_cancel(M, C) ->
   14.80 +    C ! M,
   14.81 +    {ok, C}.
   14.82 +
   14.83 +%% @private
   14.84  handle_cancel_ok(M, _, C) ->
   14.85      C ! M,
   14.86      {ok, C}.
   14.87  
   14.88  %% @private
   14.89 -handle_cancel(M, C) ->
   14.90 -    C ! M,
   14.91 +handle_deliver(M, A, C) ->
   14.92 +    C ! {M, A},
   14.93      {ok, C}.
   14.94  
   14.95  %% @private
   14.96 -handle_deliver(M, C) ->
   14.97 -    C ! M,
   14.98 +handle_info({'DOWN', _MRef, process, C, Info}, C) ->
   14.99 +    {error, {consumer_died, Info}, C};
  14.100 +handle_info({'DOWN', MRef, process, Pid, Info}, C) ->
  14.101 +    C ! {'DOWN', MRef, process, Pid, Info},
  14.102      {ok, C}.
  14.103  
  14.104  %% @private
  14.105 -handle_call(M, C) ->
  14.106 -    C ! M,
  14.107 +handle_call(M, A, C) ->
  14.108 +    C ! {M, A},
  14.109      {reply, ok, C}.
  14.110  
  14.111  %% @private
  14.112 -terminate(_Reason, _C) ->
  14.113 -    ok.
  14.114 +terminate(_Reason, C) ->
  14.115 +    C.
    15.1 --- a/src/amqp_gen_consumer.erl	Thu Apr 07 10:36:27 2011 -0500
    15.2 +++ b/src/amqp_gen_consumer.erl	Thu Jul 28 13:46:07 2011 +0100
    15.3 @@ -11,16 +11,64 @@
    15.4  %% The Original Code is RabbitMQ.
    15.5  %%
    15.6  %% The Initial Developer of the Original Code is VMware, Inc.
    15.7 -%% Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
    15.8 +%% Copyright (c) 2011-2011 VMware, Inc.  All rights reserved.
    15.9  %%
   15.10  
   15.11 -%% @doc A behaviour module for implementing consumers for amqp_channel. To
   15.12 -%% specify a consumer implementation for a channel, use
   15.13 -%% amqp_connection:open_channel/{2,3}.<br/>
   15.14 -%% All callbacks are called withing the channel process.
   15.15 +%% @doc A behaviour module for implementing consumers for
   15.16 +%% amqp_channel. To specify a consumer implementation for a channel,
   15.17 +%% use amqp_connection:open_channel/{2,3}.
   15.18 +%% <br/>
   15.19 +%% All callbacks are called within the gen_consumer process. <br/>
   15.20 +%% <br/>
   15.21 +%% See comments in amqp_gen_consumer.erl source file for documentation
   15.22 +%% on the callback functions.
   15.23 +%% <br/>
   15.24 +%% Note that making calls to the channel from the callback module will
   15.25 +%% result in deadlock.
   15.26  -module(amqp_gen_consumer).
   15.27  
   15.28 +-include("amqp_client.hrl").
   15.29 +
   15.30 +-behaviour(gen_server2).
   15.31 +
   15.32 +-export([start_link/2, call_consumer/2, call_consumer/3]).
   15.33  -export([behaviour_info/1]).
   15.34 +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
   15.35 +         handle_info/2, prioritise_info/2]).
   15.36 +
   15.37 +-record(state, {module,
   15.38 +                module_state}).
   15.39 +
   15.40 +%%---------------------------------------------------------------------------
   15.41 +%% Interface
   15.42 +%%---------------------------------------------------------------------------
   15.43 +
   15.44 +%% @type ok_error() = {ok, state()} | {error, reason(), state()}
   15.45 +%% Denotes a successful or an error return from a consumer module call.
   15.46 +
   15.47 +start_link(ConsumerModule, ExtraParams) ->
   15.48 +    gen_server2:start_link(?MODULE, [ConsumerModule, ExtraParams], []).
   15.49 +
   15.50 +%% @spec (Consumer, Msg) -> ok
   15.51 +%% where
   15.52 +%%      Consumer = pid()
   15.53 +%%      Msg = any()
   15.54 +%%
   15.55 +%% @doc This function is used to perform arbitrary calls into the
   15.56 +%% consumer module.
   15.57 +call_consumer(Pid, Msg) ->
   15.58 +    gen_server2:call(Pid, {consumer_call, Msg}, infinity).
   15.59 +
   15.60 +%% @spec (Consumer, Method, Args) -> ok
   15.61 +%% where
   15.62 +%%      Consumer = pid()
   15.63 +%%      Method = amqp_method()
   15.64 +%%      Args = any()
   15.65 +%%
   15.66 +%% @doc This function is used by amqp_channel to forward received
   15.67 +%% methods and deliveries to the consumer module.
   15.68 +call_consumer(Pid, Method, Args) ->
   15.69 +    gen_server2:call(Pid, {consumer_call, Method, Args}, infinity).
   15.70  
   15.71  %%---------------------------------------------------------------------------
   15.72  %% Behaviour
   15.73 @@ -29,77 +77,176 @@
   15.74  %% @private
   15.75  behaviour_info(callbacks) ->
   15.76      [
   15.77 -     %% @spec Module:init(Args) -> {ok, InitialState}
   15.78 +     %% init(Args) -> {ok, InitialState} | {stop, Reason} | ignore
   15.79       %% where
   15.80       %%      Args = [any()]
   15.81 -     %%      InitialState = any()
   15.82 -     %% @doc This function is called by the channel, when it starts up.
   15.83 +     %%      InitialState = state()
   15.84 +     %%      Reason = term()
   15.85 +     %%
   15.86 +     %% This callback is invoked by the channel, when it starts
   15.87 +     %% up. Use it to initialize the state of the consumer. In case of
   15.88 +     %% an error, return {stop, Reason} or ignore.
   15.89       {init, 1},
   15.90  
   15.91 -     %% @type consume() = #'basic.consume'{}.
   15.92 -     %% The AMQP method that is used to  subscribe a consumer to a queue.
   15.93 -     %% @type consume_ok() = #'basic.consume_ok'{}.
   15.94 -     %% The AMQP method returned in response to basic.consume.
   15.95 -     %% @spec Module:handle_consume(ConsumeOk, Consume, State) -> {ok, NewState}
   15.96 +     %% handle_consume(Consume, Sender, State) -> ok_error()
   15.97       %% where
   15.98 -     %%      ConsumeOk = consume_ok()
   15.99 -     %%      Consume = consume()
  15.100 -     %%      State = NewState = any()
  15.101 -     %% @doc This function is called by the channel every time a
  15.102 +     %%      Consume = #'basic.consume'{}
  15.103 +     %%      Sender = pid()
  15.104 +     %%      State = state()
  15.105 +     %%
  15.106 +     %% This callback is invoked by the channel before a basic.consume
  15.107 +     %% is sent to the server.
  15.108 +     {handle_consume, 3},
  15.109 +
  15.110 +     %% handle_consume_ok(ConsumeOk, Consume, State) -> ok_error()
  15.111 +     %% where
  15.112 +     %%      ConsumeOk = #'basic.consume_ok'{}
  15.113 +     %%      Consume = #'basic.consume'{}
  15.114 +     %%      State = state()
  15.115 +     %%
  15.116 +     %% This callback is invoked by the channel every time a
  15.117       %% basic.consume_ok is received from the server. Consume is the original
  15.118       %% method sent out to the server - it can be used to associate the
  15.119       %% call with the response.
  15.120       {handle_consume_ok, 3},
  15.121  
  15.122 -     %% @type cancel() = #'basic.cancel'{}.
  15.123 -     %% The AMQP method used to cancel a subscription.
  15.124 -     %% @type cancel_ok() = #'basic.cancel_ok'{}.
  15.125 -     %% The AMQP method returned as reply to basicl.cancel.
  15.126 -     %% @spec Module:handle_cancel_ok(CancelOk, Cancel, State) -> {ok, NewState}
  15.127 +     %% handle_cancel(Cancel, State) -> ok_error()
  15.128       %% where
  15.129 -     %%      CancelOk = cancel_ok()
  15.130 -     %%      Cancel = cancel()
  15.131 -     %%      State = NewState = any()
  15.132 -     %% @doc This function is called by the channel every time a basic.cancel_ok
  15.133 +     %%      Cancel = #'basic.cancel'{}
  15.134 +     %%      State = state()
  15.135 +     %%
  15.136 +     %% This callback is invoked by the channel every time a basic.cancel
  15.137 +     %% is received from the server.
  15.138 +     {handle_cancel, 2},
  15.139 +
  15.140 +     %% handle_cancel_ok(CancelOk, Cancel, State) -> ok_error()
  15.141 +     %% where
  15.142 +     %%      CancelOk = #'basic.cancel_ok'{}
  15.143 +     %%      Cancel = #'basic.cancel'{}
  15.144 +     %%      State = state()
  15.145 +     %%
  15.146 +     %% This callback is invoked by the channel every time a basic.cancel_ok
  15.147       %% is received from the server.
  15.148       {handle_cancel_ok, 3},
  15.149  
  15.150 -     %% @type cancel() = #'basic.cancel'{}.
  15.151 -     %% The AMQP method used to cancel a subscription.
  15.152 -     %% @spec Module:handle_cancel(cancel(), State) -> {ok, NewState}
  15.153 +     %% handle_deliver(Deliver, Message, State) -> ok_error()
  15.154       %% where
  15.155 -     %%      State = NewState = any()
  15.156 -     %% @doc This function is called by the channel every time a basic.cancel
  15.157 +     %%      Deliver = #'basic.deliver'{}
  15.158 +     %%      Message = #amqp_msg{}
  15.159 +     %%      State = state()
  15.160 +     %%
  15.161 +     %% This callback is invoked by the channel every time a basic.deliver
  15.162       %% is received from the server.
  15.163 -     {handle_cancel, 2},
  15.164 +     {handle_deliver, 3},
  15.165  
  15.166 -     %% @type deliver() = #'basic.deliver'{}.
  15.167 -     %% The AMQP method sent when a message is delivered from a subscribed
  15.168 -     %% queue.
  15.169 -     %% @spec Module:handle_deliver({deliver(), #amqp_msg{}}, State} ->
  15.170 -     %%           {ok, NewState}
  15.171 +     %% handle_info(Info, State) -> ok_error()
  15.172       %% where
  15.173 -     %%      State = NewState = any()
  15.174 -     %% @doc This function is called by the channel every time a basic.deliver
  15.175 -     %% is received from the server.
  15.176 -     {handle_deliver, 2},
  15.177 +     %%      Info = any()
  15.178 +     %%      State = state()
  15.179 +     %%
  15.180 +     %% This callback is invoked the consumer process receives a
  15.181 +     %% message.
  15.182 +     {handle_info, 2},
  15.183  
  15.184 -     %% @spec Module:handle_message(Message, State) -> {reply, Reply, NewState}
  15.185 +     %% handle_call(Msg, From, State) -> {reply, Reply, NewState} |
  15.186 +     %%                                  {noreply, NewState} |
  15.187 +     %%                                  {error, Reason, NewState}
  15.188       %% where
  15.189 +     %%      Msg = any()
  15.190 +     %%      From = any()
  15.191       %%      Reply = any()
  15.192 -     %%      State = NewState = any()
  15.193 -     %% @doc This function is called by the channel when calling
  15.194 -     %% amqp_channel:call_consumer/2. Reply is the term that will be returned
  15.195 -     %% when amqp_channel:call_consumer/2 returns.
  15.196 -     {handle_call, 2},
  15.197 +     %%      State = state()
  15.198 +     %%      NewState = state()
  15.199 +     %%
  15.200 +     %% This callback is invoked by the channel when calling
  15.201 +     %% amqp_channel:call_consumer/2. Reply is the term that
  15.202 +     %% amqp_channel:call_consumer/2 will return. If the callback
  15.203 +     %% returns {noreply, _}, then the caller to
  15.204 +     %% amqp_channel:call_consumer/2 and the channel remain blocked
  15.205 +     %% until gen_server2:reply/2 is used with the provided From as
  15.206 +     %% the first argument.
  15.207 +     {handle_call, 3},
  15.208  
  15.209 -     %% @spec Module:terminate(Reason, State) -> _
  15.210 +     %% terminate(Reason, State) -> any()
  15.211       %% where
  15.212 -     %%      State = any()
  15.213       %%      Reason = any()
  15.214 -     %% @doc This function is called by the channel after it has shut down and
  15.215 +     %%      State = state()
  15.216 +     %%
  15.217 +     %% This callback is invoked by the channel after it has shut down and
  15.218       %% just before its process exits.
  15.219       {terminate, 2}
  15.220      ];
  15.221  behaviour_info(_Other) ->
  15.222      undefined.
  15.223 +
  15.224 +%%---------------------------------------------------------------------------
  15.225 +%% gen_server2 callbacks
  15.226 +%%---------------------------------------------------------------------------
  15.227 +
  15.228 +init([ConsumerModule, ExtraParams]) ->
  15.229 +    case ConsumerModule:init(ExtraParams) of
  15.230 +        {ok, MState} ->
  15.231 +            {ok, #state{module = ConsumerModule, module_state = MState}};
  15.232 +        {stop, Reason} ->
  15.233 +            {stop, Reason};
  15.234 +        ignore ->
  15.235 +            ignore
  15.236 +    end.
  15.237 +
  15.238 +prioritise_info({'DOWN', _MRef, process, _Pid, _Info}, _State) -> 1;
  15.239 +prioritise_info(_, _State)                                     -> 0.
  15.240 +
  15.241 +handle_call({consumer_call, Msg}, From,
  15.242 +            State = #state{module       = ConsumerModule,
  15.243 +                           module_state = MState}) ->
  15.244 +    case ConsumerModule:handle_call(Msg, From, MState) of
  15.245 +        {noreply, NewMState} ->
  15.246 +            {noreply, State#state{module_state = NewMState}};
  15.247 +        {reply, Reply, NewMState} ->
  15.248 +            {reply, Reply, State#state{module_state = NewMState}};
  15.249 +        {error, Reason, NewMState} ->
  15.250 +            {stop, {error, Reason}, {error, Reason},
  15.251 +             State#state{module_state = NewMState}}
  15.252 +    end;
  15.253 +handle_call({consumer_call, Method, Args}, _From,
  15.254 +            State = #state{module       = ConsumerModule,
  15.255 +                           module_state = MState}) ->
  15.256 +    Return =
  15.257 +        case Method of
  15.258 +            #'basic.consume'{} ->
  15.259 +                ConsumerModule:handle_consume(Method, Args, MState);
  15.260 +            #'basic.consume_ok'{} ->
  15.261 +                ConsumerModule:handle_consume_ok(Method, Args, MState);
  15.262 +            #'basic.cancel'{} ->
  15.263 +                ConsumerModule:handle_cancel(Method, MState);
  15.264 +            #'basic.cancel_ok'{} ->
  15.265 +                ConsumerModule:handle_cancel_ok(Method, Args, MState);
  15.266 +            #'basic.deliver'{} ->
  15.267 +                ConsumerModule:handle_deliver(Method, Args, MState)
  15.268 +        end,
  15.269 +    case Return of
  15.270 +        {ok, NewMState} ->
  15.271 +            {reply, ok, State#state{module_state = NewMState}};
  15.272 +        {error, Reason, NewMState} ->
  15.273 +            {stop, {error, Reason}, {error, Reason},
  15.274 +             State#state{module_state = NewMState}}
  15.275 +    end.
  15.276 +
  15.277 +handle_cast(_What, State) ->
  15.278 +    {noreply, State}.
  15.279 +
  15.280 +handle_info(Info, State = #state{module_state = MState,
  15.281 +                                 module       = ConsumerModule}) ->
  15.282 +    case ConsumerModule:handle_info(Info, MState) of
  15.283 +        {ok, NewMState} ->
  15.284 +            {noreply, State#state{module_state = NewMState}};
  15.285 +        {error, Reason, NewMState} ->
  15.286 +            {stop, {error, Reason}, {error, Reason},
  15.287 +             State#state{module_state = NewMState}}
  15.288 +    end.
  15.289 +
  15.290 +terminate(Reason, #state{module = ConsumerModule, module_state = MState}) ->
  15.291 +    ConsumerModule:terminate(Reason, MState).
  15.292 +
  15.293 +code_change(_OldVsn, State, _Extra) ->
  15.294 +    State.
    16.1 --- a/src/amqp_network_connection.erl	Thu Apr 07 10:36:27 2011 -0500
    16.2 +++ b/src/amqp_network_connection.erl	Thu Jul 28 13:46:07 2011 +0100
    16.3 @@ -98,17 +98,19 @@
    16.4  %% Handshake
    16.5  %%---------------------------------------------------------------------------
    16.6  
    16.7 -connect(AmqpParams = #amqp_params{ssl_options = none,
    16.8 -                                  host        = Host,
    16.9 -                                  port        = Port}, SIF, ChMgr, State) ->
   16.10 +connect(AmqpParams = #amqp_params_network{ssl_options = none,
   16.11 +                                          host        = Host,
   16.12 +                                          port        = Port},
   16.13 +        SIF, ChMgr, State) ->
   16.14      case gen_tcp:connect(Host, Port, ?RABBIT_TCP_OPTS) of
   16.15          {ok, Sock}     -> try_handshake(AmqpParams, SIF, ChMgr,
   16.16                                          State#state{sock = Sock});
   16.17          {error, _} = E -> E
   16.18      end;
   16.19 -connect(AmqpParams = #amqp_params{ssl_options = SslOpts,
   16.20 -                                  host        = Host,
   16.21 -                                  port        = Port}, SIF, ChMgr, State) ->
   16.22 +connect(AmqpParams = #amqp_params_network{ssl_options = SslOpts,
   16.23 +                                          host        = Host,
   16.24 +                                          port        = Port},
   16.25 +        SIF, ChMgr, State) ->
   16.26      rabbit_misc:start_applications([crypto, public_key, ssl]),
   16.27      case gen_tcp:connect(Host, Port, ?RABBIT_TCP_OPTS) of
   16.28          {ok, Sock} ->
   16.29 @@ -139,7 +141,8 @@
   16.30      {ok, {_MainReader, _AState, Writer, SHF}} = SIF(Sock, ChMgr),
   16.31      {SHF, State#state{writer0 = Writer}}.
   16.32  
   16.33 -network_handshake(AmqpParams, SHF, State0) ->
   16.34 +network_handshake(AmqpParams = #amqp_params_network{virtual_host = VHost},
   16.35 +                  SHF, State0) ->
   16.36      Start = #'connection.start'{server_properties = ServerProperties,
   16.37                                  mechanisms = Mechanisms} =
   16.38          handshake_recv('connection.start'),
   16.39 @@ -147,8 +150,7 @@
   16.40      Tune = login(AmqpParams, Mechanisms, State0),
   16.41      {TuneOk, ChannelMax, State1} = tune(Tune, AmqpParams, SHF, State0),
   16.42      do2(TuneOk, State1),
   16.43 -    do2(#'connection.open'{virtual_host = AmqpParams#amqp_params.virtual_host},
   16.44 -        State1),
   16.45 +    do2(#'connection.open'{virtual_host = VHost}, State1),
   16.46      Params = {ServerProperties, ChannelMax, State1},
   16.47      case handshake_recv('connection.open_ok') of
   16.48          #'connection.open_ok'{}                     -> {ok, Params};
   16.49 @@ -169,9 +171,9 @@
   16.50  tune(#'connection.tune'{channel_max = ServerChannelMax,
   16.51                          frame_max   = ServerFrameMax,
   16.52                          heartbeat   = ServerHeartbeat},
   16.53 -     #amqp_params{channel_max = ClientChannelMax,
   16.54 -                  frame_max   = ClientFrameMax,
   16.55 -                  heartbeat   = ClientHeartbeat}, SHF, State) ->
   16.56 +     #amqp_params_network{channel_max = ClientChannelMax,
   16.57 +                          frame_max   = ClientFrameMax,
   16.58 +                          heartbeat   = ClientHeartbeat}, SHF, State) ->
   16.59      [ChannelMax, Heartbeat, FrameMax] =
   16.60          lists:zipwith(fun (Client, Server) when Client =:= 0; Server =:= 0 ->
   16.61                                lists:max([Client, Server]);
   16.62 @@ -192,8 +194,8 @@
   16.63      ReceiveFun = fun () -> Connection ! heartbeat_timeout end,
   16.64      SHF(Sock, Heartbeat, SendFun, Heartbeat, ReceiveFun).
   16.65  
   16.66 -login(Params = #amqp_params{auth_mechanisms = ClientMechanisms,
   16.67 -                            client_properties = UserProps},
   16.68 +login(Params = #amqp_params_network{auth_mechanisms = ClientMechanisms,
   16.69 +                                    client_properties = UserProps},
   16.70        ServerMechanismsStr, State) ->
   16.71      ServerMechanisms = string:tokens(binary_to_list(ServerMechanismsStr), " "),
   16.72      case [{N, S, F} || F <- ClientMechanisms,
    17.1 --- a/src/amqp_rpc_client.erl	Thu Apr 07 10:36:27 2011 -0500
    17.2 +++ b/src/amqp_rpc_client.erl	Thu Jul 28 13:46:07 2011 +0100
    17.3 @@ -141,10 +141,18 @@
    17.4      {noreply, State}.
    17.5  
    17.6  %% @private
    17.7 +handle_info({#'basic.consume'{}, _Pid}, State) ->
    17.8 +    {noreply, State};
    17.9 +
   17.10 +%% @private
   17.11  handle_info(#'basic.consume_ok'{}, State) ->
   17.12      {noreply, State};
   17.13  
   17.14  %% @private
   17.15 +handle_info(#'basic.cancel'{}, State) ->
   17.16 +    {noreply, State};
   17.17 +
   17.18 +%% @private
   17.19  handle_info(#'basic.cancel_ok'{}, State) ->
   17.20      {stop, normal, State};
   17.21  
    18.1 --- a/src/amqp_rpc_server.erl	Thu Apr 07 10:36:27 2011 -0500
    18.2 +++ b/src/amqp_rpc_server.erl	Thu Jul 28 13:46:07 2011 +0100
    18.3 @@ -75,10 +75,18 @@
    18.4      {stop, normal, State};
    18.5  
    18.6  %% @private
    18.7 +handle_info({#'basic.consume'{}, _}, State) ->
    18.8 +    {noreply, State};
    18.9 +
   18.10 +%% @private
   18.11  handle_info(#'basic.consume_ok'{}, State) ->
   18.12      {noreply, State};
   18.13  
   18.14  %% @private
   18.15 +handle_info(#'basic.cancel'{}, State) ->
   18.16 +    {noreply, State};
   18.17 +
   18.18 +%% @private
   18.19  handle_info(#'basic.cancel_ok'{}, State) ->
   18.20      {stop, normal, State};
   18.21  
   18.22 @@ -96,6 +104,10 @@
   18.23      amqp_channel:call(Channel, Publish, #amqp_msg{props = Properties,
   18.24                                                    payload = Response}),
   18.25      amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
   18.26 +    {noreply, State};
   18.27 +
   18.28 +%% @private
   18.29 +handle_info({'DOWN', _MRef, process, _Pid, _Info}, State) ->
   18.30      {noreply, State}.
   18.31  
   18.32  %% @private
    19.1 --- a/src/amqp_selective_consumer.erl	Thu Apr 07 10:36:27 2011 -0500
    19.2 +++ b/src/amqp_selective_consumer.erl	Thu Jul 28 13:46:07 2011 +0100
    19.3 @@ -1,4 +1,4 @@
    19.4 -%% The contents of this file are subject to the Mozilla Public License
    19.5 +%% The contents of this file are subject to the Mozilla Public Licensbe
    19.6  %% Version 1.1 (the "License"); you may not use this file except in
    19.7  %% compliance with the License. You may obtain a copy of the License at
    19.8  %% http://www.mozilla.org/MPL/
    19.9 @@ -11,20 +11,22 @@
   19.10  %% The Original Code is RabbitMQ.
   19.11  %%
   19.12  %% The Initial Developer of the Original Code is VMware, Inc.
   19.13 -%% Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
   19.14 +%% Copyright (c) 2011-2011 VMware, Inc.  All rights reserved.
   19.15  %%
   19.16  
   19.17 -%% @doc This module is an implementation of the amqp_gen_consumer behaviour and
   19.18 -%% can be used as part of the Consumer parameter when opening AMQP
   19.19 -%% channels.<br/>
   19.20 +%% @doc This module is an implementation of the amqp_gen_consumer
   19.21 +%% behaviour and can be used as part of the Consumer parameter when
   19.22 +%% opening AMQP channels. This is the default implementation selected
   19.23 +%% by channel. <br/>
   19.24  %% <br/>
   19.25  %% The Consumer parameter for this implementation is {{@module}, []@}<br/>
   19.26  %% This consumer implementation keeps track of consumer tags and sends
   19.27  %% the subscription-relevant messages to the registered consumers, according
   19.28  %% to an internal tag dictionary.<br/>
   19.29  %% <br/>
   19.30 -%% Use {@module}:subscribe/3 to subscribe a consumer to a queue and
   19.31 -%% {@module}:cancel/2 to cancel a subscription.<br/>
   19.32 +%% Send a #basic.consume{} message to the channel to subscribe a
   19.33 +%% consumer to a queue and send a #basic.cancel{} message to cancel a
   19.34 +%% subscription.<br/>
   19.35  %% <br/>
   19.36  %% The channel will send to the relevant registered consumers the
   19.37  %% basic.consume_ok, basic.cancel_ok, basic.cancel and basic.deliver messages
   19.38 @@ -33,104 +35,39 @@
   19.39  %% If a consumer is not registered for a given consumer tag, the message
   19.40  %% is sent to the default consumer registered with
   19.41  %% {@module}:register_default_consumer. If there is no default consumer
   19.42 -%% registered in this case, an exception occurs and the channel is abrubptly
   19.43 +%% registered in this case, an exception occurs and the channel is abruptly
   19.44  %% terminated.<br/>
   19.45 -%% <br/>
   19.46 -%% amqp_channel:call(ChannelPid, #'basic.consume'{}) can also be used to
   19.47 -%% subscribe to a queue, but one must register a default consumer for messages
   19.48 -%% to be delivered to, beforehand. Failing to do so generates the
   19.49 -%% above-mentioned exception.<br/>
   19.50 -%% <br/>
   19.51 -%% This consumer implementation creates a link between the channel and the
   19.52 -%% registered consumers (either through register_default_consumer/2 or
   19.53 -%% through subscribe/3). A cancel (either issued by the user application or the
   19.54 -%% server) causes the link to be removed. In addition, registering another
   19.55 -%% default consumer causes the old one to be unlinked.<br/>
   19.56 -%% Warning! It is not recommended to rely on a consumer on killing off the
   19.57 -%% channel (through the exit signal). That may cause messages to get lost.
   19.58 -%% Always use amqp_channel:close/{1,3} for a clean shut down.
   19.59  -module(amqp_selective_consumer).
   19.60  
   19.61  -include("amqp_client.hrl").
   19.62 +-include("amqp_gen_consumer_spec.hrl").
   19.63  
   19.64  -behaviour(amqp_gen_consumer).
   19.65  
   19.66 --export([subscribe/3, cancel/2, register_default_consumer/2]).
   19.67 --export([init/1, handle_consume_ok/3, handle_cancel_ok/3, handle_cancel/2,
   19.68 -         handle_deliver/2, handle_call/2, terminate/2]).
   19.69 +-export([register_default_consumer/2]).
   19.70 +-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
   19.71 +         handle_cancel/2, handle_deliver/3, handle_info/2, handle_call/3,
   19.72 +         terminate/2]).
   19.73  
   19.74 --record(state, {consumers        = dict:new(), %% Tag -> ConsumerPid
   19.75 -                unassigned       = dict:new(), %% BasicConsume -> [ConsumerPid]
   19.76 -                default_consumer = none}).
   19.77 +-record(state, {consumers             = dict:new(), %% Tag -> ConsumerPid
   19.78 +                unassigned            = undefined,  %% Pid
   19.79 +                monitors              = dict:new(), %% Pid -> MRef
   19.80 +                default_consumer      = none}).
   19.81  
   19.82  %%---------------------------------------------------------------------------
   19.83  %% Interface
   19.84  %%---------------------------------------------------------------------------
   19.85  
   19.86 -%% @type consume() = #'basic.consume'{}.
   19.87 -%% The AMQP method that is used to  subscribe a consumer to a queue.
   19.88 -%% @type consume_ok() = #'basic.consume_ok'{}.
   19.89 -%% The AMQP method returned in response to basic.consume.
   19.90 -%% @spec (ChannelPid, consume(), ConsumerPid) -> Result
   19.91 -%% where
   19.92 -%%      ChannelPid = pid()
   19.93 -%%      ConsumerPid = pid()
   19.94 -%%      Result = consume_ok() | ok | error
   19.95 -%% @doc Creates a subscription to a queue. This subscribes a consumer pid to
   19.96 -%% the queue defined in the #'basic.consume'{} method record. Note that
   19.97 -%% both the process invoking this method and the supplied consumer process
   19.98 -%% receive an acknowledgement of the subscription. The calling process will
   19.99 -%% receive the acknowledgement as the return value of this function, whereas
  19.100 -%% the consumer process will receive the notification as a message,
  19.101 -%% asynchronously.<br/>
  19.102 -%% <br/>
  19.103 -%% Attempting to subscribe with a consumer_tag that is already in use or
  19.104 -%% to subscribe with nowait true and not specifying a consumer_tag will
  19.105 -%% cause an exception and the channel will terminate, causing this function
  19.106 -%% to throw. If nowait is set to true the function will return 'error'
  19.107 -%% immediately, and the channel will be terminated by the server.
  19.108 -subscribe(ChannelPid, BasicConsume, ConsumerPid) ->
  19.109 -    ok = amqp_channel:call_consumer(ChannelPid,
  19.110 -                                    {subscribe, BasicConsume, ConsumerPid}),
  19.111 -    amqp_channel:call(ChannelPid, BasicConsume).
  19.112 -
  19.113 -%% @type cancel() = #'basic.cancel'{}.
  19.114 -%% The AMQP method used to cancel a subscription.
  19.115 -%% @spec (ChannelPid, Cancel) -> amqp_method() | ok
  19.116 -%% where
  19.117 -%%      ChannelPid = pid()
  19.118 -%%      Cancel = cancel()
  19.119 -%% @doc This function is the same as calling
  19.120 -%% amqp_channel:call(ChannelPid, Cancel) and is only provided for completeness.
  19.121 -cancel(ChannelPid, #'basic.cancel'{} = Cancel) ->
  19.122 -    amqp_channel:call(ChannelPid, Cancel).
  19.123 -
  19.124  %% @spec (ChannelPid, ConsumerPid) -> ok
  19.125  %% where
  19.126  %%      ChannelPid = pid()
  19.127  %%      ConsumerPid = pid()
  19.128 -%% @doc This function registers a default consumer with the channel. A default
  19.129 -%% consumer is used in two situations:<br/>
  19.130 -%% <br/>
  19.131 -%% 1) A subscription was made via
  19.132 +%% @doc This function registers a default consumer with the channel. A
  19.133 +%% default consumer is used when a subscription is made via
  19.134  %% amqp_channel:call(ChannelPid, #'basic.consume'{}) (rather than
  19.135 -%% {@module}:subscribe/3) and hence there is no consumer pid registered with the
  19.136 -%% consumer tag.<br/>
  19.137 -%% <br/>
  19.138 -%% 2) The following sequence of events occurs:<br/>
  19.139 -%% <br/>
  19.140 -%% - subscribe is used with basic.consume with explicit acks<br/>
  19.141 -%% - some deliveries take place but are not acked<br/>
  19.142 -%% - a basic.cancel is issued<br/>
  19.143 -%% - a basic.recover{requeue = false} is issued<br/>
  19.144 -%% <br/>
  19.145 -%% Since requeue is specified to be false in the basic.recover, the spec
  19.146 -%% states that the message must be redelivered to "the original recipient"
  19.147 -%% - i.e. the same channel / consumer-tag. But the consumer is no longer
  19.148 -%% active. <br/>
  19.149 -%% <br/>
  19.150 -%% In these two cases, the relevant deliveries will be sent to the default
  19.151 -%% consumer.
  19.152 +%% {@module}:subscribe/3) and hence there is no consumer pid
  19.153 +%% registered with the consumer tag. In this case, the relevant
  19.154 +%% deliveries will be sent to the default consumer.
  19.155  register_default_consumer(ChannelPid, ConsumerPid) ->
  19.156      amqp_channel:call_consumer(ChannelPid,
  19.157                                 {register_default_consumer, ConsumerPid}).
  19.158 @@ -144,35 +81,8 @@
  19.159      {ok, #state{}}.
  19.160  
  19.161  %% @private
  19.162 -handle_consume_ok(BasicConsumeOk, BasicConsume, State) ->
  19.163 -    State1 = assign_consumer(BasicConsume, tag(BasicConsumeOk), State),
  19.164 -    deliver(BasicConsumeOk, State1),
  19.165 -    {ok, State1}.
  19.166 -
  19.167 -%% @private
  19.168 -handle_cancel_ok(CancelOk, _Cancel, State) ->
  19.169 -    %% Unlink first!
  19.170 -    State1 = do_cancel(CancelOk, State),
  19.171 -    %% Use old state
  19.172 -    deliver(CancelOk, State),
  19.173 -    {ok, State1}.
  19.174 -
  19.175 -%% @private
  19.176 -handle_cancel(Cancel, State) ->
  19.177 -    %% Unlink first!
  19.178 -    State1 = do_cancel(Cancel, State),
  19.179 -    %% Use old state
  19.180 -    deliver(Cancel, State),
  19.181 -    {ok, State1}.
  19.182 -
  19.183 -%% @private
  19.184 -handle_deliver(Deliver, State) ->
  19.185 -    deliver(Deliver, State),
  19.186 -    {ok, State}.
  19.187 -
  19.188 -%% @private
  19.189 -handle_call({subscribe, BasicConsume, Pid},
  19.190 -            State = #state{consumers = Consumers, unassigned = Unassigned}) ->
  19.191 +handle_consume(BasicConsume, Pid, State = #state{consumers = Consumers,
  19.192 +                                                 monitors = Monitors}) ->
  19.193      Tag = tag(BasicConsume),
  19.194      Ok =
  19.195          case BasicConsume of
  19.196 @@ -187,66 +97,121 @@
  19.197             _ ->
  19.198                 true
  19.199          end,
  19.200 -    if Ok ->
  19.201 -           case BasicConsume of
  19.202 -               #'basic.consume'{nowait = true} ->
  19.203 -                   {reply, ok,
  19.204 -                    State#state{consumers = dict:store(Tag, Pid, Consumers)}};
  19.205 -               #'basic.consume'{nowait = false} ->
  19.206 -                   NewUnassigned =
  19.207 -                       dict:update(BasicConsume, fun (Pids) -> [Pid | Pids] end,
  19.208 -                                   [Pid], Unassigned),
  19.209 -                   {reply, ok, State#state{unassigned = NewUnassigned}}
  19.210 -           end;
  19.211 -       true ->
  19.212 -           %% There is an error. Don't do anything (don't override existing
  19.213 -           %% consumers), the server will close the channel with an error.
  19.214 -           {reply, error, State}
  19.215 -    end;
  19.216 -%% @private
  19.217 -handle_call({register_default_consumer, Pid},
  19.218 -            State = #state{default_consumer = PrevPid}) ->
  19.219 -    case PrevPid of none -> ok;
  19.220 -                    _    -> unlink(PrevPid)
  19.221 -    end,
  19.222 -    link(Pid),
  19.223 -    {reply, ok, State#state{default_consumer = Pid}}.
  19.224 +    case {Ok, BasicConsume} of
  19.225 +        {true, #'basic.consume'{nowait = true}} ->
  19.226 +            {ok, State#state
  19.227 +             {consumers = dict:store(Tag, Pid, Consumers),
  19.228 +              monitors  = dict:store(Pid, monitor(process, Pid), Monitors)}};
  19.229 +        {true, #'basic.consume'{nowait = false}} ->
  19.230 +            {ok, State#state{unassigned = Pid}};
  19.231 +        {false, #'basic.consume'{nowait = true}} ->
  19.232 +            {error, 'no_consumer_tag_specified', State};
  19.233 +        {false, #'basic.consume'{nowait = false}} ->
  19.234 +            %% Don't do anything (don't override existing
  19.235 +            %% consumers), the server will close the channel with an error.
  19.236 +            {ok, State}
  19.237 +    end.
  19.238  
  19.239  %% @private
  19.240 -terminate(_Reason, _State) ->
  19.241 -    ok.
  19.242 +handle_consume_ok(BasicConsumeOk, _BasicConsume,
  19.243 +                  State = #state{unassigned = Pid,
  19.244 +                                 consumers  = Consumers,
  19.245 +                                 monitors   = Monitors})
  19.246 +  when is_pid(Pid) ->
  19.247 +    State1 = State#state{consumers  =
  19.248 +                             dict:store(tag(BasicConsumeOk), Pid, Consumers),
  19.249 +                         monitors   =
  19.250 +                             dict:store(Pid, monitor(process, Pid), Monitors),
  19.251 +                         unassigned = undefined},
  19.252 +    deliver(BasicConsumeOk, State1),
  19.253 +    {ok, State1}.
  19.254 +
  19.255 +%% @private
  19.256 +%% The server sent a basic.cancel.
  19.257 +handle_cancel(Cancel, State) ->
  19.258 +    State1 = do_cancel(Cancel, State),
  19.259 +    %% Use old state
  19.260 +    deliver(Cancel, State),
  19.261 +    {ok, State1}.
  19.262 +
  19.263 +%% @private
  19.264 +%% We sent a basic.cancel and now receive the ok.
  19.265 +handle_cancel_ok(CancelOk, _Cancel, State) ->
  19.266 +    State1 = do_cancel(CancelOk, State),
  19.267 +    %% Use old state
  19.268 +    deliver(CancelOk, State),
  19.269 +    {ok, State1}.
  19.270 +
  19.271 +%% @private
  19.272 +handle_deliver(Deliver, Message, State) ->
  19.273 +    deliver(Deliver, Message, State),
  19.274 +    {ok, State}.
  19.275 +
  19.276 +%% @private
  19.277 +handle_info({'DOWN', _MRef, process, Pid, _Info},
  19.278 +            State = #state{monitors         = Monitors,
  19.279 +                           consumers        = Consumers,
  19.280 +                           default_consumer = DConsumer }) ->
  19.281 +    case dict:find(Pid, Monitors) of
  19.282 +        {ok, _Tag} ->
  19.283 +            {ok, State#state{monitors = dict:erase(Pid, Monitors),
  19.284 +                             consumers =
  19.285 +                                 dict:filter(
  19.286 +                                   fun (_, Pid1) when Pid1 =:= Pid -> false;
  19.287 +                                       (_, _)                      -> true
  19.288 +                                   end, Consumers)}};
  19.289 +        error ->
  19.290 +            case Pid of
  19.291 +                DConsumer -> {ok, State#state{
  19.292 +                                    monitors = dict:erase(Pid, Monitors),
  19.293 +                                    default_consumer = none}};
  19.294 +                _         -> {ok, State} %% unnamed consumer went down
  19.295 +                                         %% before receiving consume_ok
  19.296 +            end
  19.297 +    end.
  19.298 +
  19.299 +%% @private
  19.300 +handle_call({register_default_consumer, Pid}, _From,
  19.301 +            State = #state{default_consumer = PrevPid,
  19.302 +                           monitors         = Monitors}) ->
  19.303 +    case PrevPid of
  19.304 +        none -> ok;
  19.305 +        _    -> demonitor(dict:fetch(PrevPid, Monitors)),
  19.306 +                dict:erase(PrevPid, Monitors)
  19.307 +    end,
  19.308 +    {reply, ok,
  19.309 +     State#state{default_consumer = Pid,
  19.310 +                 monitors = dict:store(Pid, monitor(process, Pid),
  19.311 +                                       Monitors)}}.
  19.312 +
  19.313 +%% @private
  19.314 +terminate(_Reason, State) ->
  19.315 +    State.
  19.316  
  19.317  %%---------------------------------------------------------------------------
  19.318  %% Internal plumbing
  19.319  %%---------------------------------------------------------------------------
  19.320  
  19.321 -assign_consumer(BasicConsume, Tag, State = #state{consumers = Consumers,
  19.322 -                                                  unassigned = Unassigned}) ->
  19.323 -    case dict:find(BasicConsume, Unassigned) of
  19.324 -        {ok, [Pid]} ->
  19.325 -            State#state{unassigned = dict:erase(BasicConsume, Unassigned),
  19.326 -                        consumers = dict:store(Tag, Pid, Consumers)};
  19.327 -        {ok, [Pid | RestPids]} ->
  19.328 -            State#state{unassigned = dict:store(BasicConsume, RestPids,
  19.329 -                                                Unassigned),
  19.330 -                        consumers = dict:store(Tag, Pid, Consumers)};
  19.331 -        error ->
  19.332 -            %% Untracked consumer (subscribed with amqp_channel:call/2)
  19.333 -            State
  19.334 -    end.
  19.335 -
  19.336  deliver(Msg, State) ->
  19.337 +    deliver(Msg, undefined, State).
  19.338 +deliver(Msg, Message, State) ->
  19.339 +    Combined = if Message =:= undefined -> Msg;
  19.340 +                  true                  -> {Msg, Message}
  19.341 +               end,
  19.342      case resolve_consumer(tag(Msg), State) of
  19.343 -        {consumer, Pid} -> Pid ! Msg;
  19.344 -        {default, Pid}  -> Pid ! Msg;
  19.345 +        {consumer, Pid} -> Pid ! Combined;
  19.346 +        {default, Pid}  -> Pid ! Combined;
  19.347          error           -> exit(unexpected_delivery_and_no_default_consumer)
  19.348      end.
  19.349  
  19.350 -do_cancel(Cancel, State = #state{consumers = Consumers}) ->
  19.351 +do_cancel(Cancel, State = #state{consumers = Consumers,
  19.352 +                                 monitors  = Monitors}) ->
  19.353      Tag = tag(Cancel),
  19.354      case dict:find(Tag, Consumers) of
  19.355 -        {ok, Pid} -> unlink(Pid),
  19.356 -                     State#state{consumers = dict:erase(Tag, Consumers)};
  19.357 +        {ok, Pid} -> MRef = dict:fetch(Pid, Monitors),
  19.358 +                     demonitor(MRef),
  19.359 +                     State#state{consumers = dict:erase(Tag, Consumers),
  19.360 +                                 monitors  = dict:erase(Pid, Monitors)};
  19.361          error     -> %% Untracked consumer. Do nothing.
  19.362                       State
  19.363      end.
  19.364 @@ -265,4 +230,4 @@
  19.365  tag(#'basic.consume_ok'{consumer_tag = Tag})      -> Tag;
  19.366  tag(#'basic.cancel'{consumer_tag = Tag})          -> Tag;
  19.367  tag(#'basic.cancel_ok'{consumer_tag = Tag})       -> Tag;
  19.368 -tag({#'basic.deliver'{consumer_tag = Tag}, _})    -> Tag.
  19.369 +tag(#'basic.deliver'{consumer_tag = Tag})         -> Tag.
    20.1 --- a/src/amqp_sup.erl	Thu Apr 07 10:36:27 2011 -0500
    20.2 +++ b/src/amqp_sup.erl	Thu Jul 28 13:46:07 2011 +0100
    20.3 @@ -21,7 +21,7 @@
    20.4  
    20.5  -behaviour(supervisor2).
    20.6  
    20.7 --export([start_link/0, start_connection_sup/3]).
    20.8 +-export([start_link/0, start_connection_sup/1]).
    20.9  -export([init/1]).
   20.10  
   20.11  %%---------------------------------------------------------------------------
   20.12 @@ -31,8 +31,8 @@
   20.13  start_link() ->
   20.14      supervisor2:start_link({local, amqp_sup}, ?MODULE, []).
   20.15  
   20.16 -start_connection_sup(Type, Module, AmqpParams) ->
   20.17 -    supervisor2:start_child(amqp_sup, [Type, Module, AmqpParams]).
   20.18 +start_connection_sup(AmqpParams) ->
   20.19 +    supervisor2:start_child(amqp_sup, [AmqpParams]).
   20.20  
   20.21  %%---------------------------------------------------------------------------
   20.22  %% supervisor2 callbacks
    21.1 --- a/test.mk	Thu Apr 07 10:36:27 2011 -0500
    21.2 +++ b/test.mk	Thu Jul 28 13:46:07 2011 +0100
    21.3 @@ -60,7 +60,8 @@
    21.4  run_test_detached: start_test_broker_node
    21.5  	OK=true && \
    21.6  	TMPFILE=$(MKTEMP) && \
    21.7 -	{ $(RUN) -noinput $(TESTING_MESSAGE) $(RUN_TEST_ARGS) \
    21.8 +	{ $(RUN) -noinput $(TESTING_MESSAGE) \
    21.9 +	   $(SSL_CLIENT_ARGS) $(RUN_TEST_ARGS) \
   21.10  	    -s init stop 2>&1 | tee $$TMPFILE || OK=false; } && \
   21.11  	{ $(IS_SUCCESS) $$TMPFILE || OK=false; } && \
   21.12  	rm $$TMPFILE && \
   21.13 @@ -79,7 +80,7 @@
   21.14  	$(MAKE) unboot_broker
   21.15  
   21.16  boot_broker:
   21.17 -	$(MAKE) -C $(BROKER_DIR) start-background-node
   21.18 +	$(MAKE) -C $(BROKER_DIR) start-background-node RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) $(SSL_BROKER_ARGS)"
   21.19  	$(MAKE) -C $(BROKER_DIR) start-rabbit-on-node
   21.20  
   21.21  unboot_broker:
    22.1 --- a/test/amqp_client_SUITE.erl	Thu Apr 07 10:36:27 2011 -0500
    22.2 +++ b/test/amqp_client_SUITE.erl	Thu Jul 28 13:46:07 2011 +0100
    22.3 @@ -58,6 +58,9 @@
    22.4  pub_and_close_test_()                   -> ?RUN([{timeout, 60}]).
    22.5  channel_tune_negotiation_test_()        -> ?RUN([]).
    22.6  confirm_test_()                         -> ?RUN([]).
    22.7 +confirm_barrier_test_()                 -> ?RUN([]).
    22.8 +confirm_barrier_nop_test_()             -> ?RUN([]).
    22.9 +default_consumer_test()                 -> ?RUN([]).
   22.10  subscribe_nowait_test_()                -> ?RUN([]).
   22.11  
   22.12  non_existent_exchange_test_()           -> ?RUN([negative]).
    23.1 --- a/test/negative_test_util.erl	Thu Apr 07 10:36:27 2011 -0500
    23.2 +++ b/test/negative_test_util.erl	Thu Jul 28 13:46:07 2011 +0100
    23.3 @@ -24,12 +24,12 @@
    23.4  non_existent_exchange_test() ->
    23.5      Connection = test_util:new_connection(),
    23.6      X = test_util:uuid(),
    23.7 -    RoutingKey = <<"a">>, 
    23.8 +    RoutingKey = <<"a">>,
    23.9      Payload = <<"foobar">>,
   23.10      {ok, Channel} = amqp_connection:open_channel(Connection),
   23.11      {ok, OtherChannel} = amqp_connection:open_channel(Connection),
   23.12      amqp_channel:call(Channel, #'exchange.declare'{exchange = X}),
   23.13 -    
   23.14 +
   23.15      %% Deliberately mix up the routingkey and exchange arguments
   23.16      Publish = #'basic.publish'{exchange = RoutingKey, routing_key = X},
   23.17      amqp_channel:call(Channel, Publish, #amqp_msg{payload = Payload}),
   23.18 @@ -177,20 +177,18 @@
   23.19      end.
   23.20  
   23.21  non_existent_user_test() ->
   23.22 -    Params = #amqp_params{username = test_util:uuid(),
   23.23 -                          password = test_util:uuid()},
   23.24 +    Params = [{username, test_util:uuid()}, {password, test_util:uuid()}],
   23.25      ?assertMatch({error, auth_failure}, test_util:new_connection(Params)).
   23.26  
   23.27  invalid_password_test() ->
   23.28 -    Params = #amqp_params{username = <<"guest">>,
   23.29 -                          password = test_util:uuid()},
   23.30 +    Params = [{username, <<"guest">>}, {password, test_util:uuid()}],
   23.31      ?assertMatch({error, auth_failure}, test_util:new_connection(Params)).
   23.32  
   23.33  non_existent_vhost_test() ->
   23.34 -    Params = #amqp_params{virtual_host = test_util:uuid()},
   23.35 +    Params = [{virtual_host, test_util:uuid()}],
   23.36      ?assertMatch({error, access_refused}, test_util:new_connection(Params)).
   23.37  
   23.38  no_permission_test() ->
   23.39 -    Params = #amqp_params{username = <<"test_user_no_perm">>,
   23.40 -                          password = <<"test_user_no_perm">>},
   23.41 +    Params = [{username, <<"test_user_no_perm">>},
   23.42 +              {password, <<"test_user_no_perm">>}],
   23.43      ?assertMatch({error, access_refused}, test_util:new_connection(Params)).
    24.1 --- a/test/test_util.erl	Thu Apr 07 10:36:27 2011 -0500
    24.2 +++ b/test/test_util.erl	Thu Jul 28 13:46:07 2011 +0100
    24.3 @@ -337,13 +337,12 @@
    24.4                                                   exchange = X,
    24.5                                                   routing_key = RoutingKey}),
    24.6      #'basic.consume_ok'{} =
    24.7 -        amqp_selective_consumer:subscribe(
    24.8 -            Channel, #'basic.consume'{queue = Q, consumer_tag = Tag}, self()),
    24.9 +        amqp_channel:call(Channel,
   24.10 +                          #'basic.consume'{queue = Q, consumer_tag = Tag}),
   24.11      receive #'basic.consume_ok'{consumer_tag = Tag} -> ok end,
   24.12      receive {#'basic.deliver'{}, _} -> ok end,
   24.13      #'basic.cancel_ok'{} =
   24.14 -        amqp_selective_consumer:cancel(
   24.15 -            Channel, #'basic.cancel'{consumer_tag = Tag}),
   24.16 +        amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag}),
   24.17      receive #'basic.cancel_ok'{consumer_tag = Tag} -> ok end,
   24.18      Parent ! finished.
   24.19  
   24.20 @@ -354,8 +353,7 @@
   24.21      #'queue.declare_ok'{} =
   24.22          amqp_channel:call(Channel, #'queue.declare'{queue = Q}),
   24.23      #'basic.consume_ok'{consumer_tag = CTag} = ConsumeOk =
   24.24 -        amqp_selective_consumer:subscribe(
   24.25 -            Channel, #'basic.consume'{queue = Q}, self()),
   24.26 +        amqp_channel:call(Channel, #'basic.consume'{queue = Q}),
   24.27      receive ConsumeOk -> ok end,
   24.28      #'queue.delete_ok'{} =
   24.29          amqp_channel:call(Channel, #'queue.delete'{queue = Q}),
   24.30 @@ -413,10 +411,10 @@
   24.31      #'exchange.declare_ok'{} =
   24.32          amqp_channel:call(Channel2, #'exchange.declare'{exchange = uuid()}),
   24.33  
   24.34 -    teardown(Connection, Channel2).    
   24.35 +    teardown(Connection, Channel2).
   24.36  
   24.37  channel_tune_negotiation_test() ->
   24.38 -    amqp_connection:close(new_connection(#amqp_params{channel_max = 10})).
   24.39 +    amqp_connection:close(new_connection([{channel_max, 10}])).
   24.40  
   24.41  basic_qos_test() ->
   24.42      [NoQos, Qos] = [basic_qos_test(Prefetch) || Prefetch <- [0,1]],
   24.43 @@ -437,8 +435,8 @@
   24.44                  {ok, Channel} = amqp_connection:open_channel(Connection),
   24.45                  amqp_channel:call(Channel,
   24.46                                    #'basic.qos'{prefetch_count = Prefetch}),
   24.47 -                amqp_selective_consumer:subscribe(
   24.48 -                    Channel, #'basic.consume'{queue = Q}, self()),
   24.49 +                amqp_channel:call(Channel,
   24.50 +                                  #'basic.consume'{queue = Q}),
   24.51                  Parent ! finished,
   24.52                  sleeping_consumer(Channel, Sleep, Parent)
   24.53              end) || Sleep <- Workers],
   24.54 @@ -491,7 +489,6 @@
   24.55      {ok, Channel} = amqp_connection:open_channel(Connection),
   24.56      #'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}),
   24.57      amqp_channel:register_confirm_handler(Channel, self()),
   24.58 -    io:format("Registered ~p~n", [self()]),
   24.59      {ok, Q} = setup_publish(Channel),
   24.60      {#'basic.get_ok'{}, _}
   24.61          = amqp_channel:call(Channel, #'basic.get'{queue = Q, no_ack = false}),
   24.62 @@ -503,15 +500,65 @@
   24.63           end,
   24.64      teardown(Connection, Channel).
   24.65  
   24.66 +confirm_barrier_test() ->
   24.67 +    Connection = new_connection(),
   24.68 +    {ok, Channel} = amqp_connection:open_channel(Connection),
   24.69 +    #'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}),
   24.70 +    [amqp_channel:call(Channel, #'basic.publish'{routing_key = <<"whoosh">>},
   24.71 +                       #amqp_msg{payload = <<"foo">>})
   24.72 +     || _ <- lists:seq(1, 10)],
   24.73 +    true = amqp_channel:wait_for_confirms(Channel),
   24.74 +    teardown(Connection, Channel).
   24.75 +
   24.76 +confirm_barrier_nop_test() ->
   24.77 +    Connection = new_connection(),
   24.78 +    {ok, Channel} = amqp_connection:open_channel(Connection),
   24.79 +    true = amqp_channel:wait_for_confirms(Channel),
   24.80 +    amqp_channel:call(Channel, #'basic.publish'{routing_key = <<"whoosh">>},
   24.81 +                      #amqp_msg{payload = <<"foo">>}),
   24.82 +    true = amqp_channel:wait_for_confirms(Channel),
   24.83 +    teardown(Connection, Channel).
   24.84 +
   24.85 +default_consumer_test() ->
   24.86 +    Connection = new_connection(),
   24.87 +    {ok, Channel} = amqp_connection:open_channel(Connection),
   24.88 +    amqp_selective_consumer:register_default_consumer(Channel, self()),
   24.89 +
   24.90 +    #'queue.declare_ok'{queue = Q}
   24.91 +        = amqp_channel:call(Channel, #'queue.declare'{}),
   24.92 +    Pid = spawn(fun () -> receive
   24.93 +                          after 10000 -> ok
   24.94 +                          end
   24.95 +                end),
   24.96 +    #'basic.consume_ok'{} =
   24.97 +        amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, Pid),
   24.98 +    monitor(process, Pid),
   24.99 +    exit(Pid, shutdown),
  24.100 +    receive
  24.101 +        {'DOWN', _, process, _, _} ->
  24.102 +            io:format("little consumer died out~n")
  24.103 +    end,
  24.104 +    Payload = <<"for the default consumer">>,
  24.105 +    amqp_channel:call(Channel,
  24.106 +                      #'basic.publish'{exchange = <<>>, routing_key = Q},
  24.107 +                      #amqp_msg{payload = Payload}),
  24.108 +
  24.109 +    receive
  24.110 +        {#'basic.deliver'{}, #'amqp_msg'{payload = Payload}} ->
  24.111 +            ok
  24.112 +    after 1000 ->
  24.113 +            exit('default_consumer_didnt_work')
  24.114 +    end,
  24.115 +    teardown(Connection, Channel).
  24.116 +
  24.117  subscribe_nowait_test() ->
  24.118      Connection = new_connection(),
  24.119      {ok, Channel} = amqp_connection:open_channel(Connection),
  24.120      {ok, Q} = setup_publish(Channel),
  24.121 -    ok = amqp_selective_consumer:subscribe(
  24.122 -             Channel, #'basic.consume'{queue = Q,
  24.123 -                                       consumer_tag = uuid(),
  24.124 -                                       nowait = true},
  24.125 -             self()),
  24.126 +    ok = amqp_channel:call(Channel,
  24.127 +                           #'basic.consume'{queue = Q,
  24.128 +                                            consumer_tag = uuid(),
  24.129 +                                            nowait = true}),
  24.130      receive #'basic.consume_ok'{} -> exit(unexpected_consume_ok)
  24.131      after 0 -> ok
  24.132      end,
  24.133 @@ -750,15 +797,15 @@
  24.134      <<A:32, B:32, C:32>>.
  24.135  
  24.136  new_connection() ->
  24.137 -    new_connection(both, #amqp_params{}).
  24.138 +    new_connection(both, []).
  24.139  
  24.140  new_connection(AllowedConnectionTypes) when is_atom(AllowedConnectionTypes) ->
  24.141 -    new_connection(AllowedConnectionTypes, #amqp_params{});
  24.142 -new_connection(#amqp_params{} = AmqpParams) ->
  24.143 -    new_connection(both, AmqpParams).
  24.144 +    new_connection(AllowedConnectionTypes, []);
  24.145 +new_connection(Params) when is_list(Params) ->
  24.146 +    new_connection(both, Params).
  24.147  
  24.148 -new_connection(AllowedConnectionTypes, AmqpParams) ->
  24.149 -    {Type, Params} =
  24.150 +new_connection(AllowedConnectionTypes, Params) ->
  24.151 +    Params1 =
  24.152          case {AllowedConnectionTypes,
  24.153                os:getenv("AMQP_CLIENT_TEST_CONNECTION_TYPE")} of
  24.154              {just_direct, "network"} ->
  24.155 @@ -768,24 +815,41 @@
  24.156              {just_network, "direct"} ->
  24.157                  exit(normal);
  24.158              {_, "network"} ->
  24.159 -                {network, AmqpParams};
  24.160 +                make_network_params(Params);
  24.161              {_, "network_ssl"} ->
  24.162                  {ok, [[CertsDir]]} = init:get_argument(erlang_client_ssl_dir),
  24.163 -                {network,
  24.164 -                 AmqpParams#amqp_params{
  24.165 -                     port = 5671,
  24.166 -                     ssl_options = [{cacertfile,
  24.167 -                                     CertsDir ++ "/testca/cacert.pem"},
  24.168 -                                    {certfile, CertsDir ++ "/client/cert.pem"},
  24.169 -                                    {keyfile, CertsDir ++ "/client/key.pem"},
  24.170 -                                    {verify, verify_peer},
  24.171 -                                    {fail_if_no_peer_cert, true}]}};
  24.172 +                make_network_params(
  24.173 +                  [{ssl_options, [{cacertfile,
  24.174 +                                   CertsDir ++ "/testca/cacert.pem"},
  24.175 +                                  {certfile, CertsDir ++ "/client/cert.pem"},
  24.176 +                                  {keyfile, CertsDir ++ "/client/key.pem"},
  24.177 +                                  {verify, verify_peer},
  24.178 +                                  {fail_if_no_peer_cert, true}]}] ++ Params);
  24.179              {_, "direct"} ->
  24.180 -                {direct,
  24.181 -                 AmqpParams#amqp_params{node = rabbit_misc:makenode(rabbit)}}
  24.182 +                make_direct_params([node, rabbit_misc:makenode(rabbit)] ++
  24.183 +                                       Params)
  24.184          end,
  24.185 -    case amqp_connection:start(Type, Params) of
  24.186 +    case amqp_connection:start(Params1) of
  24.187          {ok, Conn}     -> Conn;
  24.188          {error, _} = E -> E
  24.189      end.
  24.190  
  24.191 +%% Note: not all amqp_params_network fields supported.
  24.192 +make_network_params(Props) ->
  24.193 +    Pgv = fun (Key, Default) ->
  24.194 +                  proplists:get_value(Key, Props, Default)
  24.195 +          end,
  24.196 +    #amqp_params_network{username     = Pgv(username, <<"guest">>),
  24.197 +                         password     = Pgv(password, <<"guest">>),
  24.198 +                         virtual_host = Pgv(virtual_host, <<"/">>),
  24.199 +                         channel_max  = Pgv(channel_max, 0),
  24.200 +                         ssl_options  = Pgv(ssl_options, none)}.
  24.201 +
  24.202 +%% Note: not all amqp_params_direct fields supported.
  24.203 +make_direct_params(Props) ->
  24.204 +    Pgv = fun (Key, Default) ->
  24.205 +                  proplists:get_value(Key, Props, Default)
  24.206 +          end,
  24.207 +    #amqp_params_direct{username     = Pgv(username, <<"guest">>),
  24.208 +                        virtual_host = Pgv(virtual_host, <<"/">>),
  24.209 +                        node         = Pgv(node, node())}.