src/test_util.erl
author Ben Hood <0x6e6562@gmail.com>
Fri, 09 Jan 2009 18:28:35 +0000
branchbug20103
changeset 206 437d0e4d66c8
parent 201 a4b1c7d1109b
child 209 a9c530d8f641
permissions -rw-r--r--
Switched over to gen_server2 handling
     1 %%   The contents of this file are subject to the Mozilla Public License
     2 %%   Version 1.1 (the "License"); you may not use this file except in
     3 %%   compliance with the License. You may obtain a copy of the License at
     4 %%   http://www.mozilla.org/MPL/
     5 %%
     6 %%   Software distributed under the License is distributed on an "AS IS"
     7 %%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
     8 %%   License for the specific language governing rights and limitations
     9 %%   under the License.
    10 %%
    11 %%   The Original Code is the RabbitMQ Erlang Client.
    12 %%
    13 %%   The Initial Developers of the Original Code are LShift Ltd.,
    14 %%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
    15 %%
    16 %%   Portions created by LShift Ltd., Cohesive Financial
    17 %%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
    18 %%   2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
    19 %%   Technologies Ltd.;
    20 %%
    21 %%   All Rights Reserved.
    22 %%
    23 %%   Contributor(s): Ben Hood <0x6e6562@gmail.com>.
    24 %%
    25 
    26 -module(test_util).
    27 
    28 -include_lib("rabbitmq_server/include/rabbit.hrl").
    29 -include_lib("rabbitmq_server/include/rabbit_framing.hrl").
    30 -include_lib("eunit/include/eunit.hrl").
    31 -include("amqp_client.hrl").
    32 
    33 -compile([export_all]).
    34 
    35 -record(publish, {q, x, routing_key, bind_key, payload,
    36                  mandatory = false, immediate = false}).
    37 
    38 %% The latch constant defines how many processes are spawned in order
    39 %% to run certain functionality in parallel. It follows the standard
    40 %% countdown latch pattern.
    41 -define(Latch, 100).
    42 
    43 %% The wait constant defines how long a consumer waits before it
    44 %% unsubscribes
    45 -define(Wait, 200).
    46 
    47 %%%%
    48 %%
    49 %% This is an example of how the client interaction should work
    50 %%
    51 %%   Connection = amqp_connection:start(User, Password, Host),
    52 %%   Channel = amqp_connection:open_channel(Connection),
    53 %%   %%...do something useful
    54 %%   ChannelClose = #'channel.close'{ %% set the appropriate fields },
    55 %%   amqp_channel:call(Channel, ChannelClose),
    56 %%   ConnectionClose = #'connection.close'{ %% set the appropriate fields },
    57 %%   amqp_connection:close(Connection, ConnectionClose).
    58 %%
    59 
    %% Declares the topic exchange <<"x">> and spawns ?Latch workers that each
    %% run queue_exchange_binding/4 against it over one shared channel. Once
    %% every worker has reported `finished' the exchange is deleted and the
    %% channel and connection are torn down.
    lifecycle_test(Connection) ->
        Exchange = <<"x">>,
        Channel = lib_amqp:start_channel(Connection),
        lib_amqp:declare_exchange(Channel, Exchange, <<"topic">>),
        Self = self(),
        StartWorker =
            fun(Tag) ->
                spawn(fun() ->
                          queue_exchange_binding(Channel, Exchange, Self, Tag)
                      end)
            end,
        lists:foreach(StartWorker, lists:seq(1, ?Latch)),
        latch_loop(?Latch),
        lib_amqp:delete_exchange(Channel, Exchange),
        lib_amqp:teardown(Connection, Channel),
        ok.
    73 
    %% Worker body used by lifecycle_test/1: pauses for a Tag-dependent
    %% interval, declares a queue whose name embeds Tag, binds it to X, deletes
    %% it again and finally sends `finished' to Parent.
    queue_exchange_binding(Channel, X, Parent, Tag) ->
        %% `rem' binds tighter than `-', so this is
        %% (?Latch - (Tag rem 7)) * 10 milliseconds.
        Delay = (?Latch - Tag rem 7) * 10,
        receive
            nothing -> ok
        after Delay ->
            ok
        end,
        QueueName = <<"a.b.c", Tag:32>>,
        BindingKey = <<"a.b.c.*">>,
        Declared = lib_amqp:declare_queue(Channel, QueueName),
        ?assertMatch(QueueName, Declared),
        lib_amqp:bind_queue(Channel, X, QueueName, BindingKey),
        lib_amqp:delete_queue(Channel, QueueName),
        Parent ! finished.
    87 
    %% Opens a channel and closes it straight away, then opens a second channel
    %% on the same connection and tears that channel and the connection down.
    channel_lifecycle_test(Connection) ->
        FirstChannel = lib_amqp:start_channel(Connection),
        lib_amqp:close_channel(FirstChannel),
        SecondChannel = lib_amqp:start_channel(Connection),
        lib_amqp:teardown(Connection, SecondChannel),
        ok.
    94 
    %% This is designed to exercise the internal queuing mechanism
    %% to ensure that commands are properly serialized: ?Latch processes each
    %% declare a uniquely named queue over a single shared channel and report
    %% `finished' to the test process.
    command_serialization_test(Connection) ->
        Channel = lib_amqp:start_channel(Connection),
        Self = self(),
        DeclareAndReport =
            fun() ->
                Name = uuid(),
                Declared = lib_amqp:declare_queue(Channel, Name),
                ?assertMatch(Name, Declared),
                Self ! finished
            end,
        lists:foreach(fun(_) -> spawn(DeclareAndReport) end,
                      lists:seq(1, ?Latch)),
        latch_loop(?Latch),
        lib_amqp:teardown(Connection, Channel).
   108 
   %% Checks that unbinding stops routing: binds a queue, publishes and fetches
   %% one message, unbinds, publishes again with the same key and then expects
   %% the queue to be empty.
   queue_unbind_test(Connection) ->
       Exchange = <<"eggs">>,
       Queue = <<"foobar">>,
       BindingKey = <<"quay">>,
       Body = <<"foobar">>,
       Channel = lib_amqp:start_channel(Connection),
       lib_amqp:declare_exchange(Channel, Exchange),
       lib_amqp:declare_queue(Channel, Queue),
       lib_amqp:bind_queue(Channel, Exchange, Queue, BindingKey),
       %% While bound, the message must arrive in the queue.
       lib_amqp:publish(Channel, Exchange, BindingKey, Body),
       get_and_assert_equals(Channel, Queue, Body),
       %% Once unbound, the same publish must leave the queue empty.
       lib_amqp:unbind_queue(Channel, Exchange, Queue, BindingKey),
       lib_amqp:publish(Channel, Exchange, BindingKey, Body),
       get_and_assert_empty(Channel, Queue),
       lib_amqp:teardown(Connection, Channel).
   122 
   %% Calls lib_amqp:get/3 on Q (third argument `false') and asserts that the
   %% result is the atom 'basic.get_empty'.
   get_and_assert_empty(Channel, Q) ->
       Result = lib_amqp:get(Channel, Q, false),
       ?assertMatch('basic.get_empty', Result).
   126 
   %% Fetches one message from Q with lib_amqp:get/2 and asserts that its
   %% payload fragments are exactly [Payload].
   get_and_assert_equals(Channel, Q, Payload) ->
       #content{payload_fragments_rev = Fragments} = lib_amqp:get(Channel, Q),
       ?assertMatch([Payload], Fragments).
   131 
   %% Publishes the default test message via setup_publish/1, fetches it back
   %% and checks its payload, then verifies that a second fetch finds the
   %% queue empty.
   %%
   %% The checks are delegated to get_and_assert_equals/3 and
   %% get_and_assert_empty/2, which perform exactly the get-and-assert steps
   %% that used to be duplicated inline here.
   basic_get_test(Connection) ->
       Channel = lib_amqp:start_channel(Connection),
       {ok, Q} = setup_publish(Channel),
       %% <<"foobar">> is the payload published by setup_publish/1.
       get_and_assert_equals(Channel, Q, <<"foobar">>),
       get_and_assert_empty(Channel, Q),
       lib_amqp:teardown(Connection, Channel).
   143 
   %% Publishes a message to an exchange that has no binding for the routing
   %% key (the queue is declared but never bound), with `true' as the fifth
   %% argument of lib_amqp:publish/5 (presumably the mandatory flag -- confirm
   %% against lib_amqp), and expects the registered return handler (this
   %% process) to receive a 'basic.return' carrying the original payload.
   %%
   %% Exits with no_return_received if no 'basic.return' shows up.
   basic_return_test(Connection) ->
       X = uuid(),
       Q = uuid(),
       Key = uuid(),
       Payload = <<"qwerty">>,
       Channel = lib_amqp:start_channel(Connection),
       amqp_channel:register_return_handler(Channel, self()),
       lib_amqp:declare_exchange(Channel, X),
       lib_amqp:declare_queue(Channel, Q),
       lib_amqp:publish(Channel, X, Key, Payload, true),
       timer:sleep(200),
       ok = await_basic_return(X, Payload),
       lib_amqp:teardown(Connection, Channel).

   %% Waits for a 'basic.return' and asserts its reply text, exchange and
   %% payload. Any other message is logged and skipped rather than accepted
   %% as the outcome, so a stray message can no longer make the test pass
   %% without the return having been checked. The 2 second timeout applies
   %% to each wait, i.e. it restarts after every skipped message.
   await_basic_return(X, Payload) ->
       receive
           {BasicReturn = #'basic.return'{}, Content} ->
               #'basic.return'{reply_text = ReplyText,
                               exchange = X} = BasicReturn,
               ?assertMatch(<<"unroutable">>, ReplyText),
               #content{payload_fragments_rev = Payload2} = Content,
               ?assertMatch([Payload], Payload2),
               ok;
           WhatsThis ->
               %% TODO investigate where this comes from
               io:format("Spurious message ~p~n", [WhatsThis]),
               await_basic_return(X, Payload)
       after 2000 ->
           exit(no_return_received)
       end.
   169 
   %% Publishes the default test message, fetches it with lib_amqp:get/3
   %% (third argument `false') and acknowledges the returned delivery tag.
   basic_ack_test(Connection) ->
       Channel = lib_amqp:start_channel(Connection),
       {ok, Queue} = setup_publish(Channel),
       {Tag, _Content} = lib_amqp:get(Channel, Queue, false),
       lib_amqp:ack(Channel, Tag),
       lib_amqp:teardown(Connection, Channel).
   176 
   %% Starts ?Latch consumers (see consume_loop/5), each with its own queue
   %% bound to a fresh exchange under the same routing key, gives them
   %% ?Latch * 20 ms to subscribe, publishes a single message and waits until
   %% every consumer has reported `finished'.
   basic_consume_test(Connection) ->
       Channel = lib_amqp:start_channel(Connection),
       Exchange = uuid(),
       lib_amqp:declare_exchange(Channel, Exchange),
       Key = uuid(),
       Self = self(),
       StartConsumer =
           fun(Tag) ->
               spawn(fun() ->
                         consume_loop(Channel, Exchange, Key, Self, <<Tag:32>>)
                     end)
           end,
       lists:foreach(StartConsumer, lists:seq(1, ?Latch)),
       timer:sleep(?Latch * 20),
       lib_amqp:publish(Channel, Exchange, Key, <<"foobar">>),
       latch_loop(?Latch),
       lib_amqp:teardown(Connection, Channel).
   191 
   %% Consumer body used by basic_consume_test/1: declares a queue, binds it
   %% to X under RoutingKey and subscribes with consumer tag Tag. It then
   %% waits, in order, for the consume confirmation, one delivery and (after
   %% unsubscribing) the cancel confirmation before sending `finished' to
   %% Parent. None of the waits has a timeout.
   consume_loop(Channel, X, RoutingKey, Parent, Tag) ->
       Queue = lib_amqp:declare_queue(Channel),
       lib_amqp:bind_queue(Channel, X, Queue, RoutingKey),
       lib_amqp:subscribe(Channel, Queue, self(), Tag),
       ok = receive #'basic.consume_ok'{consumer_tag = Tag} -> ok end,
       ok = receive {#'basic.deliver'{}, _Content} -> ok end,
       lib_amqp:unsubscribe(Channel, Tag),
       ok = receive #'basic.cancel_ok'{consumer_tag = Tag} -> ok end,
       Parent ! finished.
   207 
   %% Subscribes to a fresh queue, publishes one message through the default
   %% exchange (<<>>) and deliberately leaves the first delivery
   %% unacknowledged. A 'basic.recover' with requeue = true is then cast and
   %% the test expects a second delivery, which it acknowledges. Each wait
   %% gives up after 2 seconds with a dedicated exit reason.
   basic_recover_test(Connection) ->
       Queue = uuid(),
       Channel = lib_amqp:start_channel(Connection),
       lib_amqp:declare_queue(Channel, Queue),
       ConsumerTag = lib_amqp:subscribe(Channel, Queue, self(), false),
       ok = receive
                #'basic.consume_ok'{consumer_tag = ConsumerTag} -> ok
            after 2000 ->
                exit(did_not_receive_subscription_message)
            end,
       lib_amqp:publish(Channel, <<>>, Queue, <<"foobar">>),
       %% no_ack set to false, but don't send ack
       ok = receive
                {#'basic.deliver'{}, _} -> ok
            after 2000 ->
                exit(did_not_receive_first_message)
            end,
       amqp_channel:cast(Channel, #'basic.recover'{requeue = true}),
       receive
           {#'basic.deliver'{delivery_tag = RedeliveredTag}, _} ->
               lib_amqp:ack(Channel, RedeliveredTag)
       after 2000 ->
           exit(did_not_receive_second_message)
       end,
       lib_amqp:teardown(Connection, Channel).
   235 
   %% Placeholder: QOS is not yet implemented in RabbitMQ, so this test only
   %% closes the connection it was given.
   basic_qos_test(Connection) ->
       lib_amqp:close_connection(Connection).
   239 
   %% Placeholder: Reject is not yet implemented in RabbitMQ, so this test only
   %% closes the connection it was given.
   basic_reject_test(Connection) ->
       lib_amqp:close_connection(Connection).
   243 
   %%----------------------------------------------------------------------------
   %% Unit test for the direct client.
   %% It relies on the fact that a fresh Rabbit VM must consume more than
   %% 0.1 pc of the system memory:
   %% 0. (Before running) wait 1 minute to let memsup do stuff
   %% 1. Set the high watermark to a high value (0.99)
   %% 2. Start a process to receive the pause and resume commands from the
   %%    broker
   %% 3. Register that process as the flow control notification handler
   %% 4. Let the system settle for a second
   %% 5. Set the watermark to a very low value (0.001)
   %% 6. When the flow handler receives the pause command it checks that a
   %%    publish reports `blocked', then sets the watermark back to a high
   %%    value in order to get the broker to send the resume command
   %% 7. Allow 10 secs to receive the pause and resume, otherwise exit with
   %%    did_not_receive_channel_flow
   channel_flow_test(Connection) ->
       Exchange = <<"amq.direct">>,
       Key = Body = <<"x">>,
       memsup:set_sysmem_high_watermark(0.99),
       timer:sleep(1000),
       Channel = lib_amqp:start_channel(Connection),
       Self = self(),
       FlowHandler =
           fun() ->
               %% Pause command from the broker.
               receive
                   #'channel.flow'{active = false} -> ok
               end,
               blocked = lib_amqp:publish(Channel, Exchange, Key, Body),
               memsup:set_sysmem_high_watermark(0.99),
               %% Resume command from the broker.
               receive
                   #'channel.flow'{active = true} -> ok
               end,
               Self ! ok
           end,
       HandlerPid = spawn_link(FlowHandler),
       amqp_channel:register_flow_handler(Channel, HandlerPid),
       timer:sleep(1000),
       memsup:set_sysmem_high_watermark(0.001),
       receive
           ok -> ok
       after 10000 ->
           io:format("Are you sure that you have waited 1 minute?~n"),
           exit(did_not_receive_channel_flow)
       end.
   286 
   287 %%----------------------------------------------------------------------------
   288 %% This is a test, albeit not a unit test, to see if the producer
   289 %% handles the effect of being throttled.
   290 
   %% Starts the throttling scenario of start_channel_flow/2 with
   %% lib_amqp:publish/4 as the publishing function.
   channel_flow_sync(Connection) ->
       Publish = fun lib_amqp:publish/4,
       start_channel_flow(Connection, Publish).
   293 
   %% Starts the throttling scenario of start_channel_flow/2 with
   %% lib_amqp:async_publish/4 as the publishing function.
   channel_flow_async(Connection) ->
       Publish = fun lib_amqp:async_publish/4,
       start_channel_flow(Connection, Publish).
   296 
   %% Spawns a linked producer and a linked consumer, each on its own channel
   %% of Connection, exchanging messages through <<"amq.direct">> under a
   %% fresh routing key. The producer registers a flow handler process
   %% (cf_handler_loop/1) and then publishes with PublishFun in
   %% cf_producer_loop/6; the consumer subscribes to its own queue and runs
   %% cf_consumer_loop/2.
   %%
   %% Returns {ProducerPid, ConsumerPid}.
   start_channel_flow(Connection, PublishFun) ->
       Exchange = <<"amq.direct">>,
       RoutingKey = uuid(),
       ProducerBody =
           fun() ->
               Channel = lib_amqp:start_channel(Connection),
               %% self() here is the producer process itself.
               Me = self(),
               Handler = spawn_link(fun() -> cf_handler_loop(Me) end),
               amqp_channel:register_flow_handler(Channel, Handler),
               Body = << <<B:8>> || B <- lists:seq(1, 10000) >>,
               cf_producer_loop(Channel, Exchange, RoutingKey,
                                PublishFun, Body, 0)
           end,
       ConsumerBody =
           fun() ->
               Channel = lib_amqp:start_channel(Connection),
               Queue = lib_amqp:declare_queue(Channel),
               lib_amqp:bind_queue(Channel, Exchange, Queue, RoutingKey),
               ConsumerTag = lib_amqp:subscribe(Channel, Queue, self()),
               cf_consumer_loop(Channel, ConsumerTag)
           end,
       Producer = spawn_link(ProducerBody),
       Consumer = spawn_link(ConsumerBody),
       {Producer, Consumer}.
   318 
   %% Consumer loop for the channel flow scenario. Acknowledges every
   %% delivery and keeps looping; ignores the consume confirmation; returns
   %% `ok' on a cancel confirmation, or on `stop' after unsubscribing Tag.
   cf_consumer_loop(Channel, Tag) ->
       receive
           stop ->
               lib_amqp:unsubscribe(Channel, Tag),
               ok;
           {#'basic.deliver'{delivery_tag = DeliveryTag}, _Content} ->
               lib_amqp:ack(Channel, DeliveryTag),
               cf_consumer_loop(Channel, Tag);
           #'basic.consume_ok'{} ->
               cf_consumer_loop(Channel, Tag);
           #'basic.cancel_ok'{} ->
               ok
       end.
   330 
   %% Endless producer loop for the channel flow scenario; N counts loop
   %% iterations. Whenever N is a multiple of 5000 the iteration only prints
   %% a progress line (nothing is published on that pass). Otherwise one
   %% message is published through PublishFun; if that returns `blocked' the
   %% producer sleeps until it receives `resume'.
   cf_producer_loop(Channel, X, Key, PublishFun, Payload, N)
           when N rem 5000 =:= 0 ->
       Me = self(),
       io:format("Producer (~p) has sent about ~p messages since it started~n",
                 [Me, N]),
       cf_producer_loop(Channel, X, Key, PublishFun, Payload, N + 1);

   cf_producer_loop(Channel, X, Key, PublishFun, Payload, N) ->
       Outcome = PublishFun(Channel, X, Key, Payload),
       case Outcome of
           blocked ->
               io:format("Producer (~p) is blocked, will go to sleep.....ZZZ~n",
                         [self()]),
               receive
                   resume ->
                       io:format("Producer (~p) has woken up :-)~n", [self()])
               end;
           _ ->
               ok
       end,
       cf_producer_loop(Channel, X, Key, PublishFun, Payload, N + 1).
   351 
   %% Flow handler loop for the channel flow scenario. Logs every
   %% 'channel.flow' notification; when flow becomes active again it also
   %% sends `resume' to Producer. Terminates with `ok' on `stop'.
   cf_handler_loop(Producer) ->
       receive
           stop ->
               ok;
           #'channel.flow'{active = true} ->
               io:format("Producer throttling OFF, waking up producer (~p)~n",
                         [Producer]),
               Producer ! resume,
               cf_handler_loop(Producer);
           #'channel.flow'{active = false} ->
               io:format("Producer throttling ON~n"),
               cf_handler_loop(Producer)
       end.
   364 
   %%---------------------------------------------------------------------------
   %% Checks that an RPC over AMQP yields the same result as applying the
   %% underlying function directly: the server handler decodes the request
   %% term, increments it and encodes the result, and the decoded reply must
   %% match the local computation.
   rpc_test(Connection) ->
       Queue = uuid(),
       Increment = fun(X) -> X + 1 end,
       Handler = fun(Request) ->
                     term_to_binary(Increment(binary_to_term(Request)))
                 end,
       Server = amqp_rpc_server:start(Connection, Queue, Handler),
       Client = amqp_rpc_client:start(Connection, Queue),
       Input = 1,
       Reply = amqp_rpc_client:call(Client, term_to_binary(Input)),
       Expected = Increment(Input),
       Actual = binary_to_term(Reply),
       ?assertMatch(Expected, Actual),
       amqp_rpc_client:stop(Client),
       amqp_rpc_server:stop(Server),
       ok.
   382 
   383 %%---------------------------------------------------------------------------
   384 
   %% Runs setup_publish/2 with the default fixture: queue <<"a.b.c">> bound
   %% to exchange <<"x">> under <<"a.b.c.*">>, and the payload <<"foobar">>
   %% published with routing key <<"a.b.c.d">>.
   setup_publish(Channel) ->
       setup_publish(Channel, #publish{q = <<"a.b.c">>,
                                       x = <<"x">>,
                                       routing_key = <<"a.b.c.d">>,
                                       bind_key = <<"a.b.c.*">>,
                                       payload = <<"foobar">>}).
   393 
   %% Sets up the exchange, queue and binding described by the #publish{}
   %% record (via setup_exchange/4), publishes its payload under its routing
   %% key and returns {ok, QueueName}.
   setup_publish(Channel, #publish{} = Publish) ->
       Queue = Publish#publish.q,
       Exchange = Publish#publish.x,
       ok = setup_exchange(Channel, Queue, Exchange, Publish#publish.bind_key),
       lib_amqp:publish(Channel, Exchange,
                        Publish#publish.routing_key,
                        Publish#publish.payload),
       {ok, Queue}.
   401 
   %% Asserts that both the channel and the connection processes are alive
   %% before lib_amqp:teardown/2 and that neither is alive once it returns.
   teardown_test(Connection) ->
       Channel = lib_amqp:start_channel(Connection),
       Pids = [Channel, Connection],
       lists:foreach(fun(Pid) ->
                         ?assertMatch(true, is_process_alive(Pid))
                     end, Pids),
       lib_amqp:teardown(Connection, Channel),
       lists:foreach(fun(Pid) ->
                         ?assertMatch(false, is_process_alive(Pid))
                     end, Pids).
   409 
   %% Declares X as a topic exchange, declares queue Q and binds Q to X under
   %% the key Binding. Always returns `ok'.
   setup_exchange(Channel, Q, X, Binding) ->
       ExchangeType = <<"topic">>,
       lib_amqp:declare_exchange(Channel, X, ExchangeType),
       lib_amqp:declare_queue(Channel, Q),
       lib_amqp:bind_queue(Channel, X, Q, Binding),
       ok.
   415 
   %% Countdown latch: consumes one `finished' message per remaining count and
   %% returns `ok' once the count reaches 0. Exits with waited_too_long if no
   %% `finished' message arrives within ?Latch * ?Wait milliseconds of the
   %% previous one.
   latch_loop(0) ->
       ok;
   latch_loop(Remaining) ->
       Timeout = ?Latch * ?Wait,
       receive
           finished -> latch_loop(Remaining - 1)
       after Timeout ->
           exit(waited_too_long)
       end.
   426 
   %% Builds a 12-byte binary identifier from the three components of now/0,
   %% each packed as a 32-bit integer.
   uuid() ->
       {MegaSecs, Secs, MicroSecs} = now(),
       <<MegaSecs:32, Secs:32, MicroSecs:32>>.
   430