2 * ***** BEGIN LICENSE BLOCK *****
3 * Version: MPL 1.1/GPL 2.0
5 * The contents of this file are subject to the Mozilla Public License
6 * Version 1.1 (the "License"); you may not use this file except in
7 * compliance with the License. You may obtain a copy of the License at
8 * http://www.mozilla.org/MPL/
10 * Software distributed under the License is distributed on an "AS IS"
11 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
12 * the License for the specific language governing rights and
13 * limitations under the License.
15 * The Original Code is librabbitmq.
17 * The Initial Developers of the Original Code are LShift Ltd, Cohesive
18 * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
19 * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
20 * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
21 * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
22 * Rabbit Technologies Ltd.
24 * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
25 * Ltd. Portions created by Cohesive Financial Technologies LLC are
26 * Copyright (C) 2007-2009 Cohesive Financial Technologies
27 * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
28 * 2007-2009 Rabbit Technologies Ltd.
30 * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
31 * LShift Ltd and Tony Garnock-Jones.
33 * All Rights Reserved.
35 * Contributor(s): ______________________________________.
37 * Alternatively, the contents of this file may be used under the terms
38 * of the GNU General Public License Version 2 or later (the "GPL"), in
39 * which case the provisions of the GPL are applicable instead of those
40 * above. If you wish to allow use of your version of this file only
41 * under the terms of the GPL, and not to allow others to use your
42 * version of this file under the terms of the MPL, indicate your
43 * decision by deleting the provisions above and replace them with the
44 * notice and other provisions required by the GPL. If you do not
45 * delete the provisions above, a recipient may use your version of
46 * this file under the terms of any one of the MPL or the GPL.
48 * ***** END LICENSE BLOCK *****
60 /* Convert an amqp_bytes_t to an escaped string form for printing. We
61 use the same escaping conventions as rabbitmqctl. */
/*
 * Parameters:
 *   bytes - buffer to render; bytes.len bytes are read from bytes.bytes.
 *
 * Returns a buffer obtained from malloc().
 * NOTE(review): the terminating/return lines are not visible in this
 * view; presumably the string is NUL-terminated and the caller is
 * expected to free() it - confirm.
 *
 * A byte is handled by the first branch when its value is >= 32 and
 * not 127. Because data is uint8_t, values 128..255 also satisfy that
 * test. Any other byte reaches the three stores below, which write its
 * value as three ASCII octal digits, most significant first.
 * NOTE(review): the branch bodies are only partly visible here;
 * presumably the first branch copies the byte unchanged and the other
 * writes a backslash before the digits, which would account for the
 * "4 chars per byte" sizing - confirm.
 * NOTE(review): no NULL check on the malloc() result is visible.
 */
62 static char *stringify_bytes(amqp_bytes_t bytes)
64 /* We will need up to 4 chars per byte, plus the terminating 0 */
65 char *res = malloc(bytes.len * 4 + 1);
66 uint8_t *data = bytes.bytes;
70 for (i = 0; i < bytes.len; i++) {
71 if (data[i] >= 32 && data[i] != 127) {
/* Octal digits: bits 7-6, then bits 5-3, then bits 2-0. */
76 *p++ = '0' + (data[i] >> 6);
77 *p++ = '0' + (data[i] >> 3 & 0x7);
78 *p++ = '0' + (data[i] & 0x7);
/*
 * Declare the queue to consume from and, if requested, declare an
 * exchange and bind the queue to it.
 *
 * Parameters:
 *   conn          - connection on which the AMQP methods are issued.
 *   queue         - queue name, converted with cstring_bytes().
 *                   NOTE(review): the "server provided queue name"
 *                   path suggests this may be NULL - confirm.
 *   exchange      - exchange to bind to; its absence is an error when
 *                   routing_key or exchange_type is given (see the
 *                   message printed below).
 *   exchange_type - when present, the exchange is declared with this
 *                   type before binding.
 *   routing_key   - routing key for the binding, converted with
 *                   cstring_bytes().
 *
 * Returns an amqp_bytes_t.
 * NOTE(review): the return statement is not visible in this view;
 * presumably it returns queue_bytes - confirm.
 *
 * A failed queue.declare, exchange.declare or queue.bind is reported
 * through die_rpc() with the connection's RPC reply. Every call
 * passes 1 as its second argument.
 * NOTE(review): presumably that argument is the channel number -
 * confirm against the rabbitmq-c API.
 */
86 static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
87 char *queue, char *exchange,
88 char *exchange_type, char *routing_key)
90 amqp_bytes_t queue_bytes;
91 amqp_queue_declare_ok_t *res;
93 /* if an exchange name wasn't provided, check that we don't
94 have options that require it. */
98 opt = "--routing-key";
99 else if (exchange_type)
100 opt = "--exchange-type";
104 "%s option requires an exchange name to be "
105 "provided with --exchange\n", opt);
110 /* Declare the queue as auto-delete. If the queue already
111 exists, this won't have any effect. */
112 queue_bytes = cstring_bytes(queue);
113 res = amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 0, 1,
116 die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
119 /* the server should have provided a queue name */
/* Duplicate the name from the declare reply, then print an escaped
   rendering of it on stderr. */
121 queue_bytes = amqp_bytes_malloc_dup(res->queue);
122 sq = stringify_bytes(queue_bytes);
123 fprintf(stderr, "Server provided queue name: %s\n", sq);
127 /* Bind to an exchange if requested */
129 amqp_bytes_t eb = amqp_cstring_bytes(exchange);
132 /* we should create the exchange */
133 if (!amqp_exchange_declare(conn, 1, eb,
134 amqp_cstring_bytes(exchange_type),
135 0, 0, 1, AMQP_EMPTY_TABLE))
136 die_rpc(amqp_get_rpc_reply(conn), "exchange.declare");
139 if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
140 cstring_bytes(routing_key),
142 die_rpc(amqp_get_rpc_reply(conn), "queue.bind");
/*
 * Start consuming from a queue and hand each delivered message body to
 * a pipeline.
 *
 * Parameters:
 *   conn   - connection to consume on.
 *   queue  - queue to consume from.
 *   no_ack - passed through to amqp_basic_consume(); when non-zero no
 *            basic.ack is sent for a delivery.
 *   argv   - NOTE(review): not referenced in the visible lines;
 *            presumably the command used to start the pipeline 'pl' -
 *            confirm.
 *
 * A failed basic.consume is reported through die_rpc(). A negative
 * return from amqp_simple_wait_frame() is presumably what triggers the
 * die_errno() call (the guarding condition is not visible here).
 * NOTE(review): the loop header is not visible in this view;
 * presumably the frame handling below repeats for every delivery -
 * confirm.
 */
148 static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
149 int no_ack, const char * const *argv)
151 if (!amqp_basic_consume(conn, 1, queue, AMQP_EMPTY_BYTES, 0, no_ack,
153 die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
158 uint64_t delivery_tag;
159 int res = amqp_simple_wait_frame(conn, &frame);
161 die_errno(-res, "waiting for header frame");
/* Test for frames that are not a basic.deliver method frame.
   NOTE(review): the statement this guards is not visible; presumably
   such frames are skipped - confirm. */
163 if (frame.frame_type != AMQP_FRAME_METHOD
164 || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
/* Keep the delivery tag so the message can be acknowledged later. */
167 amqp_basic_deliver_t *deliver
168 = (amqp_basic_deliver_t *)frame.payload.method.decoded;
169 delivery_tag = deliver->delivery_tag;
172 copy_body(conn, pl.infd);
/* Acknowledge only when finish_pipeline() returns non-zero and acks
   are enabled. die_errno() receives the negated return value of
   amqp_basic_ack().
   NOTE(review): presumably die_errno() does nothing for zero -
   confirm. */
174 if (finish_pipeline(&pl) && !no_ack)
175 die_errno(-amqp_basic_ack(conn, 1, delivery_tag, 0),
178 amqp_maybe_release_buffers(conn);
/*
 * Entry point: parse the command line, connect, set up the queue and
 * consume from it.
 *
 * Options handled here (in addition to connect_options): --queue/-q,
 * --exchange/-e, --exchange-type/-t, --routing-key/-r and --no-ack/-A.
 * The remaining arguments, as returned by poptGetArgs(), are the
 * command handed to do_consume(). If no command is given, a message
 * and the usage text are printed to stderr.
 *
 * NOTE(review): the declarations of opts, queue and no_ack are not
 * visible in this view; confirm that no_ack is initialized, since
 * popt only writes it when --no-ack is given.
 */
182 int main(int argc, const char **argv)
186 amqp_connection_state_t conn;
187 const char * const *cmd_argv;
189 char *exchange = NULL;
190 char *exchange_type = NULL;
191 char *routing_key = NULL;
192 amqp_bytes_t queue_bytes;
194 struct poptOption options[] = {
195 INCLUDE_OPTIONS(connect_options),
196 {"queue", 'q', POPT_ARG_STRING, &queue, 0,
197 "the queue to consume from", "queue"},
198 {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
199 "bind the queue to this exchange", "exchange"},
200 {"exchange-type", 't', POPT_ARG_STRING, &exchange_type, 0,
201 "create auto-delete exchange of this type for binding",
203 {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
204 "the routing key to bind with", "routing key"},
205 {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
206 "consume in no-ack mode", NULL},
/* All-zero entry terminating the option table. */
208 { NULL, 0, 0, NULL, 0 }
211 opts = process_options(argc, argv, options,
212 "[OPTIONS]... <command> <args>");
/* The leftover (non-option) arguments form the consuming command. */
214 cmd_argv = poptGetArgs(opts);
215 if (!cmd_argv || !cmd_argv[0]) {
216 fprintf(stderr, "consuming command not specified\n");
217 poptPrintUsage(opts, stderr, 0);
221 conn = make_connection();
222 queue_bytes = setup_queue(conn, queue, exchange, exchange_type,
224 do_consume(conn, queue_bytes, no_ack, cmd_argv);
225 close_connection(conn);
229 poptFreeContext(opts);