Introduce a message limit flag, and make it so that if there are no consumers and a named queue we declare the queue anyway. No more qos1/tx2 trick!
1.1 --- a/test/src/com/rabbitmq/examples/MulticastMain.java Tue Feb 07 15:36:52 2012 +0000
1.2 +++ b/test/src/com/rabbitmq/examples/MulticastMain.java Wed Feb 08 13:40:10 2012 +0000
1.3 @@ -59,6 +59,7 @@
1.4 int prefetchCount = intArg(cmd, 'q', 0);
1.5 int minMsgSize = intArg(cmd, 's', 0);
1.6 int timeLimit = intArg(cmd, 'z', 0);
1.7 + int msgLimit = intArg(cmd, 'C', 0);
1.8 List<?> flags = lstArg(cmd, 'f');
1.9 int frameMax = intArg(cmd, 'M', 0);
1.10 int heartbeat = intArg(cmd, 'b', 0);
1.11 @@ -91,6 +92,7 @@
1.12 p.setExclusive( exclusive);
1.13 p.setFlags( flags);
1.14 p.setMinMsgSize( minMsgSize);
1.15 + p.setMsgLimit( msgLimit);
1.16 p.setPrefetchCount( prefetchCount);
1.17 p.setProducerCount( producerCount);
1.18 p.setProducerTxSize( producerTxSize);
1.19 @@ -136,6 +138,7 @@
1.20 options.addOption(new Option("q", "qos", true, "qos prefetch count"));
1.21 options.addOption(new Option("s", "size", true, "message size"));
1.22 options.addOption(new Option("z", "time", true, "time limit"));
1.23 + options.addOption(new Option("C", "messages", true, "message limit"));
1.24 Option flag = new Option("f", "flag", true, "message flag");
1.25 flag.setArgs(Option.UNLIMITED_VALUES);
1.26 options.addOption(flag);
2.1 --- a/test/src/com/rabbitmq/examples/perf/Consumer.java Tue Feb 07 15:36:52 2012 +0000
2.2 +++ b/test/src/com/rabbitmq/examples/perf/Consumer.java Wed Feb 08 13:40:10 2012 +0000
2.3 @@ -35,11 +35,12 @@
2.4 private int txSize;
2.5 private boolean autoAck;
2.6 private Stats stats;
2.7 + private int msgLimit;
2.8 private long timeLimit;
2.9
2.10 public Consumer(Channel channel, String id,
2.11 String queueName, int txSize, boolean autoAck,
2.12 - Stats stats, int timeLimit) {
2.13 + Stats stats, int msgLimit, int timeLimit) {
2.14
2.15 this.channel = channel;
2.16 this.id = id;
2.17 @@ -47,6 +48,7 @@
2.18 this.txSize = txSize;
2.19 this.autoAck = autoAck;
2.20 this.stats = stats;
2.21 + this.msgLimit = msgLimit;
2.22 this.timeLimit = 1000L * timeLimit;
2.23 }
2.24
2.25 @@ -60,7 +62,8 @@
2.26 q = new QueueingConsumer(channel);
2.27 channel.basicConsume(queueName, autoAck, q);
2.28
2.29 - while (timeLimit == 0 || now < startTime + timeLimit) {
2.30 + while ((timeLimit == 0 || now < startTime + timeLimit) &&
2.31 + totalMsgCount < msgLimit) {
2.32 QueueingConsumer.Delivery delivery;
2.33 try {
2.34 if (timeLimit == 0) {
3.1 --- a/test/src/com/rabbitmq/examples/perf/MulticastParams.java Tue Feb 07 15:36:52 2012 +0000
3.2 +++ b/test/src/com/rabbitmq/examples/perf/MulticastParams.java Wed Feb 08 13:40:10 2012 +0000
3.3 @@ -30,6 +30,7 @@
3.4
3.5 protected int timeLimit = 10;
3.6 protected int rateLimit = 0;
3.7 + protected int msgLimit = 0;
3.8
3.9 protected String exchangeName = "direct";
3.10 protected String exchangeType = "direct";
3.11 @@ -93,6 +94,10 @@
3.12 this.timeLimit = timeLimit;
3.13 }
3.14
3.15 + public void setMsgLimit(int msgLimit) {
3.16 + this.msgLimit = msgLimit;
3.17 + }
3.18 +
3.19 public void setFlags(List<?> flags) {
3.20 this.flags = flags;
3.21 }
4.1 --- a/test/src/com/rabbitmq/examples/perf/MulticastSet.java Tue Feb 07 15:36:52 2012 +0000
4.2 +++ b/test/src/com/rabbitmq/examples/perf/MulticastSet.java Wed Feb 08 13:40:10 2012 +0000
4.3 @@ -63,10 +63,21 @@
4.4 Thread t =
4.5 new Thread(new Consumer(channel, id, qName,
4.6 p.consumerTxSize, p.autoAck,
4.7 - stats, p.timeLimit));
4.8 + stats, p.msgLimit, p.timeLimit));
4.9 consumerThreads[i] = t;
4.10 }
4.11
4.12 + if (p.consumerCount == 0 && !p.queueName.equals("")) {
4.13 + Connection conn = factory.newConnection();
4.14 + Channel channel = conn.createChannel();
4.15 + channel.queueDeclare(p.queueName,
4.16 + p.flags.contains("persistent"),
4.17 + p.exclusive, p.autoDelete,
4.18 + null).getQueue();
4.19 + channel.queueBind(p.queueName, p.exchangeName, id);
4.20 + conn.close();
4.21 + }
4.22 +
4.23 Thread[] producerThreads = new Thread[p.producerCount];
4.24 Connection[] producerConnections = new Connection[p.producerCount];
4.25 Channel[] producerChannels = new Channel[p.producerCount];
4.26 @@ -83,7 +94,8 @@
4.27 channel.exchangeDeclare(p.exchangeName, p.exchangeType);
4.28 final Producer producer = new Producer(channel, p.exchangeName, id,
4.29 p.flags, p.producerTxSize,
4.30 - p.rateLimit, p.minMsgSize, p.timeLimit,
4.31 + p.rateLimit, p.msgLimit,
4.32 + p.minMsgSize, p.timeLimit,
4.33 p.confirm, stats);
4.34 channel.addReturnListener(producer);
4.35 channel.addConfirmListener(producer);
5.1 --- a/test/src/com/rabbitmq/examples/perf/Producer.java Tue Feb 07 15:36:52 2012 +0000
5.2 +++ b/test/src/com/rabbitmq/examples/perf/Producer.java Wed Feb 08 13:40:10 2012 +0000
5.3 @@ -42,6 +42,7 @@
5.4 private boolean persistent;
5.5 private int txSize;
5.6 private int rateLimit;
5.7 + private int msgLimit;
5.8 private long timeLimit;
5.9
5.10 private Stats stats;
5.11 @@ -58,7 +59,7 @@
5.12
5.13 public Producer(Channel channel, String exchangeName, String id,
5.14 List<?> flags, int txSize,
5.15 - int rateLimit, int minMsgSize, int timeLimit,
5.16 + int rateLimit, int msgLimit, int minMsgSize, int timeLimit,
5.17 long confirm, Stats stats)
5.18 throws IOException {
5.19
5.20 @@ -70,6 +71,7 @@
5.21 this.persistent = flags.contains("persistent");
5.22 this.txSize = txSize;
5.23 this.rateLimit = rateLimit;
5.24 + this.msgLimit = msgLimit;
5.25 this.timeLimit = 1000L * timeLimit;
5.26 this.message = new byte[minMsgSize];
5.27 if (confirm > 0) {
5.28 @@ -128,7 +130,8 @@
5.29
5.30 try {
5.31
5.32 - while (timeLimit == 0 || now < startTime + timeLimit) {
5.33 + while ((timeLimit == 0 || now < startTime + timeLimit) &&
5.34 + msgCount < msgLimit) {
5.35 if (confirmPool != null) {
5.36 confirmPool.acquire();
5.37 }