1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License at
4 %% http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8 %% License for the specific language governing rights and limitations
11 %% The Original Code is the RabbitMQ Erlang Client.
13 %% The Initial Developers of the Original Code are LShift Ltd.,
14 %% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
16 %% Portions created by LShift Ltd., Cohesive Financial
17 %% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
18 %% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
21 %% All Rights Reserved.
23 %% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
28 -include_lib("rabbitmq_server/include/rabbit.hrl").
29 -include_lib("rabbitmq_server/include/rabbit_framing.hrl").
30 -include_lib("eunit/include/eunit.hrl").
31 -include("amqp_client.hrl").
33 -compile([export_all]).
35 -record(publish, {q, x, routing_key, bind_key, payload,
36 mandatory = false, immediate = false}).
38 %% The latch constant defines how many processes are spawned in order
39 %% to run certain functionality in parallel. It follows the standard
40 %% countdown latch pattern.
43 %% The wait constant defines how long a consumer waits before it
49 %% This is an example of how the client interaction should work
51 %% Connection = amqp_connection:start(User, Password, Host),
52 %% Channel = amqp_connection:open_channel(Connection),
53 %% %%...do something useful
54 %% ChannelClose = #'channel.close'{ %% set the appropriate fields },
55 %% amqp_channel:call(Channel, ChannelClose),
56 %% ConnectionClose = #'connection.close'{ %% set the appropriate fields },
57 %% amqp_connection:close(Connection, ConnectionClose).
%% Exercises exchange and queue lifecycle operations on a single channel.
%% Opens a channel on Connection, declares X as a topic exchange, evaluates
%% queue_exchange_binding/4 for every Tag in 1..?Latch, then deletes the
%% exchange and tears down the channel and the connection.
%% NOTE(review): presumably each per-Tag call runs in its own spawned
%% process with Parent being the test process -- TODO confirm.
60 lifecycle_test(Connection) ->
62 Channel = lib_amqp:start_channel(Connection),
63 lib_amqp:declare_exchange(Channel, X, <<"topic">>),
67 queue_exchange_binding(Channel, X, Parent, Tag) end)
68 || Tag <- lists:seq(1, ?Latch)],
70 lib_amqp:delete_exchange(Channel, X),
71 lib_amqp:teardown(Connection, Channel),
%% Declares, binds and deletes one queue for the given Tag.
%% The queue name is the bytes "a.b.c" followed by Tag encoded as a 32-bit
%% integer; the queue is bound to exchange X with the key "a.b.c.*".
%% NOTE(review): Parent is presumably the process to notify once the work
%% is done -- TODO confirm.
74 queue_exchange_binding(Channel, X, Parent, Tag) ->
%% `rem` binds tighter than `-`, so this timeout evaluates to
%% (?Latch - (Tag rem 7)) * 10 and therefore varies with Tag.
77 after (?Latch - Tag rem 7) * 10 ->
80 Q = <<"a.b.c", Tag:32>>,
81 Binding = <<"a.b.c.*">>,
82 Q1 = lib_amqp:declare_queue(Channel, Q),
84 lib_amqp:bind_queue(Channel, X, Q, Binding),
85 lib_amqp:delete_queue(Channel, Q),
%% Checks that a channel can be closed and another one opened on the same
%% connection: starts a channel, closes it, starts a second channel, then
%% tears down the connection together with the second channel.
88 channel_lifecycle_test(Connection) ->
89 Channel = lib_amqp:start_channel(Connection),
90 lib_amqp:close_channel(Channel),
91 Channel2 = lib_amqp:start_channel(Connection),
92 lib_amqp:teardown(Connection, Channel2),
95 %% This is designed to exercise the internal queuing mechanism
96 %% to ensure that commands are properly serialized.
%% Opens a channel on Connection, declares queue Q on it, repeats a
%% fun-based step ?Latch times via a list comprehension, and finally tears
%% down the channel and the connection.
%% NOTE(review): presumably the repeated step spawns processes that issue
%% commands on the shared channel concurrently -- TODO confirm.
97 command_serialization_test(Connection) ->
98 Channel = lib_amqp:start_channel(Connection),
102 Q1 = lib_amqp:declare_queue(Channel, Q),
105 end) || _ <- lists:seq(1, ?Latch)],
107 lib_amqp:teardown(Connection, Channel).
%% Verifies that unbinding a queue stops messages from being routed to it.
%% A message published while the binding exists must be retrievable from
%% the queue; the same publish after the unbind must leave the queue empty.
queue_unbind_test(Connection) ->
    Exchange = <<"eggs">>,
    Queue = <<"foobar">>,
    BindingKey = <<"quay">>,
    Body = <<"foobar">>,
    Chan = lib_amqp:start_channel(Connection),
    %% The same publish is issued before and after the unbind.
    PublishOnce =
        fun() -> lib_amqp:publish(Chan, Exchange, BindingKey, Body) end,
    lib_amqp:declare_exchange(Chan, Exchange),
    lib_amqp:declare_queue(Chan, Queue),
    lib_amqp:bind_queue(Chan, Exchange, Queue, BindingKey),
    PublishOnce(),
    get_and_assert_equals(Chan, Queue, Body),
    lib_amqp:unbind_queue(Chan, Exchange, Queue, BindingKey),
    PublishOnce(),
    get_and_assert_empty(Chan, Queue),
    lib_amqp:teardown(Connection, Chan).
%% Fetches from queue Q on Channel (third argument `false`) and asserts
%% that the reply is the atom 'basic.get_empty'.
get_and_assert_empty(Channel, Q) ->
    ?assertMatch('basic.get_empty', lib_amqp:get(Channel, Q, false)).
%% Fetches one message from queue Q on Channel and asserts that its
%% payload fragments are exactly [Payload].
get_and_assert_equals(Channel, Q, Payload) ->
    #content{payload_fragments_rev = Fragments} = lib_amqp:get(Channel, Q),
    %% Payload is already bound, so the pattern compares against its value.
    ?assertMatch([Payload], Fragments).
%% Verifies basic.get against a queue holding exactly one message.
%% setup_publish/1 publishes <<"foobar">> and returns the queue name; the
%% first get must yield that payload and the second must report an empty
%% queue. The checks reuse get_and_assert_equals/3 and
%% get_and_assert_empty/2 rather than repeating their bodies inline.
basic_get_test(Connection) ->
    Channel = lib_amqp:start_channel(Connection),
    {ok, Q} = setup_publish(Channel),
    get_and_assert_equals(Channel, Q, <<"foobar">>),
    get_and_assert_empty(Channel, Q),
    lib_amqp:teardown(Connection, Channel).
%% Checks that an unroutable publish is handed back through basic.return.
%% Registers the calling process as the channel's return handler, declares
%% exchange X and queue Q, and publishes Payload with the fifth argument
%% set to true (presumably the mandatory flag -- TODO confirm). The
%% returned message must carry reply_text <<"unroutable">>, the same
%% exchange X and the original payload. The test exits with
%% no_return_received when no such message arrives (presumably after a
%% timeout -- TODO confirm).
144 basic_return_test(Connection) ->
148 Payload = <<"qwerty">>,
149 Channel = lib_amqp:start_channel(Connection),
150 amqp_channel:register_return_handler(Channel, self()),
151 lib_amqp:declare_exchange(Channel, X),
152 lib_amqp:declare_queue(Channel, Q),
153 lib_amqp:publish(Channel, X, Key, Payload, true),
156 {BasicReturn = #'basic.return'{}, Content} ->
%% X is already bound, so this match also asserts the exchange name.
157 #'basic.return'{reply_text = ReplyText,
158 exchange = X} = BasicReturn,
159 ?assertMatch(<<"unroutable">>, ReplyText),
160 #content{payload_fragments_rev = Payload2} = Content,
161 ?assertMatch([Payload], Payload2);
163 %% TODO investigate where this comes from
164 io:format("Spurious message ~p~n", [WhatsThis])
166 exit(no_return_received)
168 lib_amqp:teardown(Connection, Channel).
%% Verifies that a message fetched with basic.get can be acknowledged.
%% Publishes one message via setup_publish/1, fetches it (third argument
%% `false`), acks the delivery tag from the reply and tears everything down.
basic_ack_test(Connection) ->
    Chan = lib_amqp:start_channel(Connection),
    {ok, Queue} = setup_publish(Chan),
    Fetched = lib_amqp:get(Chan, Queue, false),
    %% The reply is a pair whose first element is the delivery tag.
    {Tag, _Content} = Fetched,
    lib_amqp:ack(Chan, Tag),
    lib_amqp:teardown(Connection, Chan).
%% Exercises basic.consume with ?Latch consumers on one channel.
%% Declares exchange X, evaluates consume_loop/5 for every Tag in
%% 1..?Latch (the consumer tag is Tag encoded as a 32-bit binary), sleeps,
%% publishes a single <<"foobar">> message and tears everything down.
%% NOTE(review): presumably each consume_loop/5 call runs in its own
%% spawned process and the test waits for all of them -- TODO confirm.
177 basic_consume_test(Connection) ->
178 Channel = lib_amqp:start_channel(Connection),
180 lib_amqp:declare_exchange(Channel, X),
185 consume_loop(Channel, X, RoutingKey, Parent, <<Tag:32>>) end)
186 || Tag <- lists:seq(1, ?Latch)],
%% Presumably gives the consumers time to subscribe before the publish.
187 timer:sleep(?Latch * 20),
188 lib_amqp:publish(Channel, X, RoutingKey, <<"foobar">>),
190 lib_amqp:teardown(Connection, Channel).
%% Body of one consumer used by basic_consume_test/1.
%% Declares a queue with declare_queue/1, binds it to X with RoutingKey and
%% subscribes the calling process under consumer tag Tag. It then expects,
%% in order: basic.consume_ok for Tag, one basic.deliver, and (after
%% unsubscribing) basic.cancel_ok for Tag.
%% NOTE(review): Parent is presumably notified once the sequence completes
%% -- TODO confirm.
192 consume_loop(Channel, X, RoutingKey, Parent, Tag) ->
193 Q = lib_amqp:declare_queue(Channel),
194 lib_amqp:bind_queue(Channel, X, Q, RoutingKey),
195 lib_amqp:subscribe(Channel, Q, self(), Tag),
197 #'basic.consume_ok'{consumer_tag = Tag} -> ok
200 {#'basic.deliver'{}, _} -> ok
202 lib_amqp:unsubscribe(Channel, Tag),
204 #'basic.cancel_ok'{consumer_tag = Tag} -> ok
%% Verifies that basic.recover with requeue = true redelivers an
%% unacknowledged message.
%% Subscribes the calling process to queue Q (fourth argument `false`),
%% publishes <<"foobar">> to the exchange <<>> with Q as the routing key,
%% receives the first delivery without acking it, casts basic.recover and
%% then acks the second delivery. Each wait exits with a distinct reason
%% when the expected message does not arrive.
208 basic_recover_test(Connection) ->
210 Channel = lib_amqp:start_channel(Connection),
211 lib_amqp:declare_queue(Channel, Q),
212 Tag = lib_amqp:subscribe(Channel, Q, self(), false),
214 #'basic.consume_ok'{consumer_tag = Tag} -> ok
216 exit(did_not_receive_subscription_message)
218 lib_amqp:publish(Channel, <<>>, Q, <<"foobar">>),
220 {#'basic.deliver'{}, _} ->
221 %% no_ack set to false, but don't send ack
224 exit(did_not_receive_first_message)
226 BasicRecover = #'basic.recover'{requeue = true},
227 amqp_channel:cast(Channel, BasicRecover),
229 {#'basic.deliver'{delivery_tag = DeliveryTag2}, _} ->
230 lib_amqp:ack(Channel, DeliveryTag2)
232 exit(did_not_receive_second_message)
234 lib_amqp:teardown(Connection, Channel).
236 %% QOS is not yet implemented in RabbitMQ
%% Placeholder test: it performs no QOS checks and only closes Connection.
237 basic_qos_test(Connection) ->
238 lib_amqp:close_connection(Connection).
240 %% Reject is not yet implemented in RabbitMQ
%% Placeholder test: it performs no reject checks and only closes
%% Connection.
241 basic_reject_test(Connection) ->
242 lib_amqp:close_connection(Connection).
244 %%----------------------------------------------------------------------------
245 %% Unit test for the direct client
246 %% This just relies on the fact that a fresh Rabbit VM must consume more than
247 %% 0.1 pc of the system memory:
248 %% 0. Wait 1 minute to let memsup do stuff
249 %% 1. Make sure that the high watermark is set high
250 %% 2. Start a process to receive the pause and resume commands from the broker
251 %% 3. Register this as flow control notification handler
252 %% 4. Let the system settle for a little bit
253 %% 5. Set the threshold to the lowest possible value
254 %% 6. When the flow handler receives the pause command, it sets the watermark
255 %% to a high value in order to get the broker to send the resume command
256 %% 7. Allow 10 secs to receive the pause and resume, otherwise timeout and
%% Drives the broker into sending channel.flow pause and resume commands
%% by moving the memsup high watermark, following the numbered steps in
%% the comment above.
258 channel_flow_test(Connection) ->
259 X = <<"amq.direct">>,
260 K = Payload = <<"x">>,
%% Start from a high watermark.
261 memsup:set_sysmem_high_watermark(0.99),
263 Channel = lib_amqp:start_channel(Connection),
268 #'channel.flow'{active = false} -> ok
%% The publish is asserted to return the atom `blocked`.
270 blocked = lib_amqp:publish(Channel, X, K, Payload),
%% Raise the watermark again so the broker can send the resume command.
271 memsup:set_sysmem_high_watermark(0.99),
273 #'channel.flow'{active = true} -> ok
277 amqp_channel:register_flow_handler(Channel, Child),
%% Lower the watermark to trigger the pause command.
279 memsup:set_sysmem_high_watermark(0.001),
283 io:format("Are you sure that you have waited 1 minute?~n"),
284 exit(did_not_receive_channel_flow)
287 %%----------------------------------------------------------------------------
288 %% This is a test, albeit not a unit test, to see if the producer
289 %% handles the effect of being throttled.
%% Starts the flow-control producer/consumer pair with lib_amqp:publish/4
%% as the publish function. Returns whatever start_channel_flow/2 returns.
291 channel_flow_sync(Connection) ->
292 start_channel_flow(Connection, fun lib_amqp:publish/4).
%% Starts the flow-control producer/consumer pair with
%% lib_amqp:async_publish/4 as the publish function. Returns whatever
%% start_channel_flow/2 returns.
294 channel_flow_async(Connection) ->
295 start_channel_flow(Connection, fun lib_amqp:async_publish/4).
%% Spawns a linked producer and a linked consumer on Connection and returns
%% {Producer, Consumer}.
%% The producer opens its own channel, registers a linked flow handler
%% process (running cf_handler_loop/1) on it and enters cf_producer_loop/6
%% with PublishFun. The consumer opens its own channel, declares a queue,
%% binds it to X with Key, subscribes itself and enters cf_consumer_loop/2.
297 start_channel_flow(Connection, PublishFun) ->
298 X = <<"amq.direct">>,
300 Producer = spawn_link(
302 Channel = lib_amqp:start_channel(Connection),
304 FlowHandler = spawn_link(fun() -> cf_handler_loop(Parent) end),
305 amqp_channel:register_flow_handler(Channel, FlowHandler),
%% 10000-byte payload; each B:8 segment keeps only the low 8 bits of B.
306 Payload = << <<B:8>> || B <- lists:seq(1, 10000) >>,
307 cf_producer_loop(Channel, X, Key, PublishFun, Payload, 0)
309 Consumer = spawn_link(
311 Channel = lib_amqp:start_channel(Connection),
312 Q = lib_amqp:declare_queue(Channel),
313 lib_amqp:bind_queue(Channel, X, Q, Key),
314 Tag = lib_amqp:subscribe(Channel, Q, self()),
315 cf_consumer_loop(Channel, Tag)
317 {Producer, Consumer}.
%% Receive loop of the flow-control consumer.
%% basic.consume_ok keeps the loop running, basic.cancel_ok ends it with
%% `ok`, and every basic.deliver is acked by its delivery tag before
%% looping again. A further clause unsubscribes Tag.
%% NOTE(review): what triggers the unsubscribe clause (a stop message or a
%% timeout) cannot be told from these lines -- TODO confirm.
319 cf_consumer_loop(Channel, Tag) ->
321 #'basic.consume_ok'{} -> cf_consumer_loop(Channel, Tag);
322 #'basic.cancel_ok'{} -> ok;
323 {#'basic.deliver'{delivery_tag = DeliveryTag}, _Content} ->
324 lib_amqp:ack(Channel, DeliveryTag),
325 cf_consumer_loop(Channel, Tag);
327 lib_amqp:unsubscribe(Channel, Tag),
%% Publishing loop of the flow-control producer; N is the iteration count.
%% First clause: whenever N is a multiple of 5000 (including 0) a progress
%% line is printed and the loop continues with N + 1.
%% Second clause: calls PublishFun(Channel, X, Key, Payload). One result
%% branch logs that the producer is blocked, later logs that it has woken
%% up, and continues; the other branch continues immediately.
%% NOTE(review): presumably the blocked branch waits for a wake-up message
%% from the flow handler between the two log lines -- TODO confirm.
331 cf_producer_loop(Channel, X, Key, PublishFun, Payload, N)
332 when N rem 5000 =:= 0 ->
333 io:format("Producer (~p) has sent about ~p messages since it started~n",
335 cf_producer_loop(Channel, X, Key, PublishFun, Payload, N + 1);
337 cf_producer_loop(Channel, X, Key, PublishFun, Payload, N) ->
338 case PublishFun(Channel, X, Key, Payload) of
340 io:format("Producer (~p) is blocked, will go to sleep.....ZZZ~n",
344 io:format("Producer (~p) has woken up :-)~n", [self()]),
345 cf_producer_loop(Channel, X, Key,
346 PublishFun, Payload, N + 1)
349 cf_producer_loop(Channel, X, Key, PublishFun, Payload, N + 1)
%% Receive loop of the flow handler registered for the producer's channel.
%% channel.flow with active = false logs that throttling is on;
%% active = true logs that throttling is off and names the producer being
%% woken. Both clauses keep looping with the same Producer.
%% NOTE(review): presumably the active = true clause also sends a wake-up
%% message to Producer -- TODO confirm.
352 cf_handler_loop(Producer) ->
354 #'channel.flow'{active = false} ->
355 io:format("Producer throttling ON~n"),
356 cf_handler_loop(Producer);
357 #'channel.flow'{active = true} ->
358 io:format("Producer throttling OFF, waking up producer (~p)~n",
361 cf_handler_loop(Producer);
365 %%---------------------------------------------------------------------------
366 %% This tests whether RPC over AMQP produces the same result as invoking the
367 %% same argument against the same underlying gen_server instance.
%% Starts an RPC server and client on queue Q, sends Input through the
%% client and asserts that the decoded reply equals Fun(Input) computed
%% locally, then stops the client and the server.
368 rpc_test(Connection) ->
370 Fun = fun(X) -> X + 1 end,
%% Requests and replies travel as term_to_binary-encoded Erlang terms.
371 RPCHandler = fun(X) -> term_to_binary(Fun(binary_to_term(X))) end,
372 Server = amqp_rpc_server:start(Connection, Q, RPCHandler),
373 Client = amqp_rpc_client:start(Connection, Q),
375 Reply = amqp_rpc_client:call(Client, term_to_binary(Input)),
376 Expected = Fun(Input),
377 DecodedReply = binary_to_term(Reply),
%% Expected is already bound, so the pattern compares against its value.
378 ?assertMatch(Expected, DecodedReply),
379 amqp_rpc_client:stop(Client),
380 amqp_rpc_server:stop(Server),
383 %%---------------------------------------------------------------------------
%% Convenience wrapper: builds a #publish{} record with routing key
%% <<"a.b.c.d">>, bind key <<"a.b.c.*">> and payload <<"foobar">>, and
%% delegates to setup_publish/2. Callers in this file match the result
%% against {ok, Q}.
385 setup_publish(Channel) ->
386 Publish = #publish{routing_key = <<"a.b.c.d">>,
389 bind_key = <<"a.b.c.*">>,
390 payload = <<"foobar">>
392 setup_publish(Channel, Publish).
%% Sets up the exchange, queue and binding named in the #publish{} record
%% through setup_exchange/4 (asserted to return ok), then publishes Payload
%% to X with RoutingKey.
%% NOTE(review): presumably returns {ok, Q}, as the callers of
%% setup_publish/1 expect -- TODO confirm.
394 setup_publish(Channel, #publish{routing_key = RoutingKey,
397 payload = Payload}) ->
398 ok = setup_exchange(Channel, Q, X, BindKey),
399 lib_amqp:publish(Channel, X, RoutingKey, Payload),
%% Verifies that lib_amqp:teardown/2 terminates both processes: the
%% channel and the connection must be alive before the call and dead
%% after it. The channel is always checked before the connection.
teardown_test(Connection) ->
    Channel = lib_amqp:start_channel(Connection),
    Pids = [Channel, Connection],
    lists:foreach(
      fun(Pid) -> ?assertMatch(true, is_process_alive(Pid)) end,
      Pids),
    lib_amqp:teardown(Connection, Channel),
    lists:foreach(
      fun(Pid) -> ?assertMatch(false, is_process_alive(Pid)) end,
      Pids).
%% Declares X as a topic exchange, declares queue Q and binds Q to X with
%% the key Binding. setup_publish/2 matches the result against `ok`.
410 setup_exchange(Channel, Q, X, Binding) ->
411 lib_amqp:declare_exchange(Channel, X, <<"topic">>),
412 lib_amqp:declare_queue(Channel, Q),
413 lib_amqp:bind_queue(Channel, X, Q, Binding),
422 latch_loop(Latch - 1)
423 after ?Latch * ?Wait ->
424 exit(waited_too_long)
429 <<A:32, B:32, C:32>>.