tools/consume.c
author Tony Garnock-Jones <tonygarnockjones@gmail.com>
Mon May 24 15:21:30 2010 +1200 (24 months ago)
changeset 110 494ea5a0c807
parent 80 449f6a6f351c
child 86 bd809e834eef
child 111 606efb1eb230
permissions -rw-r--r--
Compute dependencies properly
dpw@80
     1
/*
dpw@80
     2
 * ***** BEGIN LICENSE BLOCK *****
dpw@80
     3
 * Version: MPL 1.1/GPL 2.0
dpw@80
     4
 *
dpw@80
     5
 * The contents of this file are subject to the Mozilla Public License
dpw@80
     6
 * Version 1.1 (the "License"); you may not use this file except in
dpw@80
     7
 * compliance with the License. You may obtain a copy of the License at
dpw@80
     8
 * http://www.mozilla.org/MPL/
dpw@80
     9
 *
dpw@80
    10
 * Software distributed under the License is distributed on an "AS IS"
dpw@80
    11
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
dpw@80
    12
 * the License for the specific language governing rights and
dpw@80
    13
 * limitations under the License.
dpw@80
    14
 *
dpw@80
    15
 * The Original Code is librabbitmq.
dpw@80
    16
 *
dpw@80
    17
 * The Initial Developers of the Original Code are LShift Ltd, Cohesive
dpw@80
    18
 * Financial Technologies LLC, and Rabbit Technologies Ltd.  Portions
dpw@80
    19
 * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
dpw@80
    20
 * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
dpw@80
    21
 * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
dpw@80
    22
 * Rabbit Technologies Ltd.
dpw@80
    23
 *
dpw@80
    24
 * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
dpw@80
    25
 * Ltd. Portions created by Cohesive Financial Technologies LLC are
dpw@80
    26
 * Copyright (C) 2007-2009 Cohesive Financial Technologies
dpw@80
    27
 * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
dpw@80
    28
 * 2007-2009 Rabbit Technologies Ltd.
dpw@80
    29
 *
dpw@80
    30
 * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
dpw@80
    31
 * LShift Ltd and Tony Garnock-Jones.
dpw@80
    32
 *
dpw@80
    33
 * All Rights Reserved.
dpw@80
    34
 *
dpw@80
    35
 * Contributor(s): ______________________________________.
dpw@80
    36
 *
dpw@80
    37
 * Alternatively, the contents of this file may be used under the terms
dpw@80
    38
 * of the GNU General Public License Version 2 or later (the "GPL"), in
dpw@80
    39
 * which case the provisions of the GPL are applicable instead of those
dpw@80
    40
 * above. If you wish to allow use of your version of this file only
dpw@80
    41
 * under the terms of the GPL, and not to allow others to use your
dpw@80
    42
 * version of this file under the terms of the MPL, indicate your
dpw@80
    43
 * decision by deleting the provisions above and replace them with the
dpw@80
    44
 * notice and other provisions required by the GPL. If you do not
dpw@80
    45
 * delete the provisions above, a recipient may use your version of
dpw@80
    46
 * this file under the terms of any one of the MPL or the GPL.
dpw@80
    47
 *
dpw@80
    48
 * ***** END LICENSE BLOCK *****
dpw@80
    49
 */
dpw@80
    50
dpw@77
    51
#include "config.h"
dpw@77
    52
dpw@77
    53
#include <stdio.h>
dpw@77
    54
dpw@77
    55
#include <popt.h>
dpw@77
    56
dpw@77
    57
#include "common.h"
dpw@77
    58
#include "common_consume.h"
dpw@77
    59
dpw@77
    60
static void do_consume(amqp_connection_state_t conn, int no_ack,
dpw@77
    61
		       const char * const *argv)
dpw@77
    62
{
dpw@77
    63
	if (!amqp_basic_consume(conn, 1, setup_queue(conn),
dpw@77
    64
			       AMQP_EMPTY_BYTES, 0, no_ack, 0))
dpw@77
    65
		die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
dpw@77
    66
dpw@77
    67
	for (;;) {
dpw@77
    68
		amqp_frame_t frame;
dpw@77
    69
		struct pipeline pl;
dpw@77
    70
		uint64_t delivery_tag;
dpw@77
    71
		int res = amqp_simple_wait_frame(conn, &frame);
dpw@77
    72
		if (res < 0)
dpw@77
    73
			die_errno(-res, "waiting for header frame");
dpw@77
    74
dpw@77
    75
		if (frame.frame_type != AMQP_FRAME_METHOD
dpw@77
    76
		    || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
dpw@77
    77
			continue;
dpw@77
    78
dpw@77
    79
		amqp_basic_deliver_t *deliver
dpw@77
    80
			= (amqp_basic_deliver_t *)frame.payload.method.decoded;
dpw@77
    81
		delivery_tag = deliver->delivery_tag;
dpw@77
    82
		
dpw@77
    83
		pipeline(argv, &pl);
dpw@77
    84
		copy_body(conn, pl.infd);
dpw@77
    85
dpw@77
    86
		if (finish_pipeline(&pl) && !no_ack)
dpw@77
    87
			die_errno(-amqp_basic_ack(conn, 1, delivery_tag, 0),
dpw@77
    88
				  "basic.ack");
dpw@77
    89
dpw@77
    90
		amqp_maybe_release_buffers(conn);
dpw@77
    91
	}
dpw@77
    92
}
dpw@77
    93
dpw@77
    94
int main(int argc, const char **argv)
dpw@77
    95
{
dpw@77
    96
	poptContext opts;
dpw@77
    97
	int no_ack;
dpw@77
    98
	amqp_connection_state_t conn;
dpw@77
    99
	const char * const *cmd_argv;
dpw@77
   100
	
dpw@77
   101
	struct poptOption options[] = {
dpw@77
   102
		INCLUDE_OPTIONS(connect_options),
dpw@77
   103
		INCLUDE_OPTIONS(consume_queue_options),
dpw@77
   104
		{"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
dpw@77
   105
		 "consume in no-ack mode", NULL},
dpw@77
   106
		POPT_AUTOHELP
dpw@77
   107
		{ NULL, 0, 0, NULL, 0 }
dpw@77
   108
	};
dpw@77
   109
	
dpw@77
   110
	opts = process_options(argc, argv, options,
dpw@77
   111
			       "[OPTIONS]... <command> <args>");
dpw@77
   112
	
dpw@77
   113
	cmd_argv = poptGetArgs(opts);
dpw@82
   114
	if (!cmd_argv || !cmd_argv[0]) {
dpw@82
   115
		fprintf(stderr, "consuming command not specified\n");
dpw@82
   116
		poptPrintUsage(opts, stderr, 0);
dpw@77
   117
		goto error;
dpw@77
   118
	}
dpw@77
   119
dpw@77
   120
	conn = make_connection();
dpw@77
   121
	do_consume(conn, no_ack, cmd_argv);
dpw@77
   122
	close_connection(conn);
dpw@77
   123
	return 0;
dpw@77
   124
dpw@77
   125
error:
dpw@77
   126
	poptFreeContext(opts);
dpw@77
   127
	return 1;
dpw@77
   128
}