src/rabbit_stomp.erl
author Tony Garnock-Jones <tonyg@lshift.net>
Wed Apr 30 16:48:11 2008 +0100 (2008-04-30)
changeset 26 b37faa511709
parent 25 36fad000db32
child 28 392d8cc8449c
child 37 a1099f0d77e8
permissions -rw-r--r--
Avoid losing messages when the socket closes abruptly by calling
rabbit_channel:shutdown/1, which nicely processes all the pending work
before notifying us of channel closure.
tonyg@4
     1
%%   The contents of this file are subject to the Mozilla Public License
tonyg@4
     2
%%   Version 1.1 (the "License"); you may not use this file except in
tonyg@4
     3
%%   compliance with the License. You may obtain a copy of the License at
tonyg@4
     4
%%   http://www.mozilla.org/MPL/
tonyg@4
     5
%%
tonyg@4
     6
%%   Software distributed under the License is distributed on an "AS IS"
tonyg@4
     7
%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
tonyg@4
     8
%%   License for the specific language governing rights and limitations
tonyg@4
     9
%%   under the License.
tonyg@4
    10
%%
tonyg@4
    11
%%   The Original Code is RabbitMQ.
tonyg@4
    12
%%
tonyg@4
    13
%%   The Initial Developers of the Original Code are LShift Ltd.,
tonyg@4
    14
%%   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
tonyg@4
    15
%%
tonyg@4
    16
%%   Portions created by LShift Ltd., Cohesive Financial
tonyg@4
    17
%%   Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C) 
tonyg@4
    18
%%   2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit 
tonyg@4
    19
%%   Technologies Ltd.; 
tonyg@4
    20
%%
tonyg@4
    21
%%   All Rights Reserved.
tonyg@4
    22
%%
tonyg@4
    23
%%   Contributor(s): ______________________________________.
tonyg@4
    24
%%
tonyg@4
    25
tonyg@4
    26
%% rabbit_stomp implements STOMP messaging semantics, as per protocol
tonyg@4
    27
%% "version 1.0", at http://stomp.codehaus.org/Protocol
tonyg@4
    28
tonyg@1
    29
-module(rabbit_stomp).
tonyg@1
    30
tonyg@1
    31
-export([kickstart/0,
tonyg@1
    32
	 start/1,
tonyg@1
    33
	 listener_started/2, listener_stopped/2, start_client/1,
tonyg@1
    34
	 start_link/0, init/1, mainloop/1]).
tonyg@1
    35
tonyg@18
    36
-include("rabbit.hrl").
tonyg@18
    37
-include("rabbit_framing.hrl").
tonyg@1
    38
-include("stomp_frame.hrl").
tonyg@1
    39
tonyg@3
    40
-record(state, {socket, session_id, channel, parse_state, ticket}).
tonyg@1
    41
tonyg@1
    42
kickstart() ->
tonyg@1
    43
    {ok, StompListeners} = application:get_env(stomp_listeners),
tonyg@1
    44
    ok = start(StompListeners).
tonyg@1
    45
tonyg@1
    46
start(Listeners) ->
tonyg@1
    47
    {ok,_} = supervisor:start_child(
tonyg@1
    48
               rabbit_sup,
tonyg@1
    49
               {rabbit_stomp_client_sup,
tonyg@1
    50
                {tcp_client_sup, start_link,
tonyg@1
    51
                 [{local, rabbit_stomp_client_sup},
tonyg@1
    52
                  {rabbit_stomp,start_link,[]}]},
tonyg@1
    53
                transient, infinity, supervisor, [tcp_client_sup]}),
tonyg@1
    54
    ok = start_listeners(Listeners),
tonyg@1
    55
    ok.
tonyg@1
    56
tonyg@1
    57
start_listeners([]) ->
tonyg@1
    58
    ok;
tonyg@1
    59
start_listeners([{Host, Port} | More]) ->
tonyg@25
    60
    {IPAddress, Name} = rabbit_networking:check_tcp_listener_address(rabbit_stomp_listener_sup,
tonyg@25
    61
								     Host,
tonyg@25
    62
								     Port),
tonyg@1
    63
    {ok,_} = supervisor:start_child(
tonyg@1
    64
               rabbit_sup,
tonyg@1
    65
               {Name,
tonyg@1
    66
                {tcp_listener_sup, start_link,
tonyg@1
    67
		 [IPAddress, Port,
tonyg@1
    68
		  [{packet, raw},
tonyg@1
    69
		   {reuseaddr, true}],
tonyg@1
    70
		  {?MODULE, listener_started, []},
tonyg@1
    71
		  {?MODULE, listener_stopped, []},
tonyg@1
    72
		  {?MODULE, start_client, []}]},
tonyg@1
    73
                transient, infinity, supervisor, [tcp_listener_sup]}),
tonyg@1
    74
    start_listeners(More).
tonyg@1
    75
tonyg@1
    76
listener_started(_IPAddress, _Port) ->
tonyg@1
    77
    ok.
tonyg@1
    78
tonyg@1
    79
listener_stopped(_IPAddress, _Port) ->
tonyg@1
    80
    ok.
tonyg@1
    81
tonyg@1
    82
start_client(Sock) ->
tonyg@1
    83
    {ok, Child} = supervisor:start_child(rabbit_stomp_client_sup, []),
tonyg@1
    84
    ok = gen_tcp:controlling_process(Sock, Child),
tonyg@1
    85
    Child ! {go, Sock},
tonyg@1
    86
    Child.
tonyg@1
    87
tonyg@1
    88
start_link() ->
tonyg@1
    89
    {ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
tonyg@1
    90
tonyg@1
    91
init(_Parent) ->
tonyg@1
    92
    receive
tonyg@1
    93
        {go, Sock} ->
tonyg@1
    94
	    ok = inet:setopts(Sock, [{active, true}]),
tonyg@1
    95
	    process_flag(trap_exit, true),
tonyg@1
    96
	    ?MODULE:mainloop(#state{socket = Sock,
tonyg@1
    97
				    channel = none,
tonyg@1
    98
				    parse_state = stomp_frame:initial_state()})
tonyg@1
    99
    end.
tonyg@1
   100
tonyg@1
   101
mainloop(State) ->
tonyg@1
   102
    receive
tonyg@3
   103
	E = {'EXIT', _Pid, _Reason} ->
tonyg@3
   104
	    handle_exit(E, State);
tonyg@1
   105
	{tcp, _Sock, Bytes} ->
tonyg@1
   106
	    process_received_bytes(Bytes, State);
tonyg@1
   107
	{tcp_closed, _Sock} ->
tonyg@26
   108
	    case State#state.channel of
tonyg@26
   109
		none ->
tonyg@26
   110
		    done;
tonyg@26
   111
		ChPid ->
tonyg@26
   112
		    rabbit_channel:shutdown(ChPid),
tonyg@26
   113
		    ?MODULE:mainloop(State)
tonyg@26
   114
	    end;
tonyg@1
   115
	{send_command, Command} ->
tonyg@1
   116
	    ?MODULE:mainloop(send_reply(Command, State));
tonyg@1
   117
	{send_command_and_notify, QPid, TxPid, Method, Content} ->
tonyg@1
   118
	    State1 = send_reply(Method, Content, State),
tonyg@1
   119
	    rabbit_amqqueue:notify_sent(QPid, TxPid),
tonyg@1
   120
	    ?MODULE:mainloop(State1);
tonyg@1
   121
	{send_command_and_shutdown, Command} ->
tonyg@1
   122
	    send_reply(Command, State),
tonyg@1
   123
	    done;
tonyg@1
   124
	shutdown ->
tonyg@1
   125
	    done;
tonyg@1
   126
	Data ->
tonyg@11
   127
	    error_logger:error_msg("Internal error: unknown STOMP Data: ~p~n", [Data]),
tonyg@1
   128
	    ?MODULE:mainloop(State)
tonyg@1
   129
    end.
tonyg@1
   130
tonyg@3
   131
simple_method_sync_rpc(Method, State0) ->
tonyg@3
   132
    State = send_method(Method, State0),
tonyg@3
   133
    receive
tonyg@3
   134
	E = {'EXIT', _Pid, _Reason} ->
tonyg@3
   135
	    handle_exit(E, State);
tonyg@3
   136
	{send_command, Reply} ->
tonyg@3
   137
	    {ok, Reply, State}
tonyg@3
   138
    end.
tonyg@3
   139
tonyg@3
   140
handle_exit({'EXIT', _Pid, {amqp, Code, Method}}, State) ->
tonyg@3
   141
    explain_amqp_death(Code, Method, State),
tonyg@3
   142
    done;
tonyg@3
   143
handle_exit({'EXIT', Pid, Reason}, State) ->
tonyg@3
   144
    send_error("Error", "Process ~p exited with reason:~n~p~n", [Pid, Reason], State),
tonyg@3
   145
    done.
tonyg@3
   146
tonyg@1
   147
process_received_bytes([], State) ->
tonyg@1
   148
    ?MODULE:mainloop(State);
tonyg@1
   149
process_received_bytes(Bytes, State = #state{parse_state = ParseState}) ->
tonyg@1
   150
    case stomp_frame:parse(Bytes, ParseState) of
tonyg@1
   151
	{more, ParseState1} ->
tonyg@1
   152
	    ?MODULE:mainloop(State#state{parse_state = ParseState1});
tonyg@1
   153
	{ok, Frame = #stomp_frame{command = Command}, Rest} ->
tonyg@11
   154
	    %% io:format("Frame: ~p~n", [Frame]),
tonyg@1
   155
	    case catch process_frame(Command, Frame,
tonyg@1
   156
				     State#state{parse_state = stomp_frame:initial_state()}) of
tonyg@1
   157
		{'EXIT', {amqp, Code, Method}} ->
tonyg@1
   158
		    explain_amqp_death(Code, Method, State),
tonyg@1
   159
		    done;
tonyg@1
   160
		{'EXIT', Reason} ->
tonyg@1
   161
		    send_error("Processing error", "~p~n", [Reason], State),
tonyg@1
   162
		    done;
tonyg@1
   163
		{ok, NewState} ->
tonyg@1
   164
		    process_received_bytes(Rest, NewState);
tonyg@1
   165
		stop ->
tonyg@1
   166
		    done
tonyg@1
   167
	    end;
tonyg@1
   168
	{error, Reason} ->
tonyg@1
   169
	    send_error("Invalid frame", "Could not parse frame: ~p~n", [Reason], State),
tonyg@1
   170
	    done
tonyg@1
   171
    end.
tonyg@1
   172
tonyg@1
   173
explain_amqp_death(Code, Method, State) ->
tonyg@1
   174
    send_error(atom_to_list(Code), "Method was ~p~n", [Method], State).
tonyg@1
   175
tonyg@1
   176
send_reply(#'channel.close_ok'{}, State) ->
tonyg@1
   177
    State;
tonyg@1
   178
send_reply(Command, State) ->
tonyg@11
   179
    error_logger:error_msg("STOMP Reply command unhandled: ~p~n", [Command]),
tonyg@1
   180
    State.
tonyg@1
   181
tonyg@1
   182
send_reply(#'basic.deliver'{consumer_tag = ConsumerTag,
tonyg@1
   183
			    delivery_tag = DeliveryTag,
tonyg@1
   184
			    exchange = Exchange,
tonyg@1
   185
			    routing_key = RoutingKey},
tonyg@1
   186
	   #content{properties = #'P_basic'{headers = Headers},
tonyg@1
   187
		    payload_fragments_rev = BodyFragmentsRev},
tonyg@1
   188
	   State = #state{session_id = SessionId}) ->
tonyg@1
   189
    send_frame("MESSAGE",
tonyg@1
   190
	       [{"destination", binary_to_list(RoutingKey)},
tonyg@1
   191
		{"exchange", binary_to_list(Exchange)},
tonyg@1
   192
		{"message-id", SessionId ++ "_" ++ integer_to_list(DeliveryTag)}]
tonyg@1
   193
	       ++ case ConsumerTag of
tonyg@1
   194
		      <<"Q_", _/binary>> ->
tonyg@1
   195
			  [];
tonyg@1
   196
		      <<"T_", Id/binary>> ->
tonyg@1
   197
			  [{"subscription", binary_to_list(Id)}]
tonyg@1
   198
		  end
tonyg@1
   199
	       ++ adhoc_convert_headers(case Headers of
tonyg@1
   200
					    undefined -> [];
tonyg@1
   201
					    _ -> Headers
tonyg@1
   202
					end),
tonyg@1
   203
	       lists:concat(lists:reverse(lists:map(fun erlang:binary_to_list/1,
tonyg@1
   204
						    BodyFragmentsRev))),
tonyg@1
   205
	       State);
tonyg@1
   206
send_reply(Command, Content, State) ->
tonyg@11
   207
    error_logger:error_msg("STOMP Reply command unhandled: ~p~n~p~n", [Command, Content]),
tonyg@1
   208
    State.
tonyg@1
   209
tonyg@1
   210
adhoc_convert_headers([]) ->
tonyg@1
   211
    [];
tonyg@1
   212
adhoc_convert_headers([{K, longstr, V} | Rest]) ->
tonyg@9
   213
    [{"X-" ++ binary_to_list(K), binary_to_list(V)} | adhoc_convert_headers(Rest)];
tonyg@1
   214
adhoc_convert_headers([{K, signedint, V} | Rest]) ->
tonyg@9
   215
    [{"X-" ++ binary_to_list(K), integer_to_list(V)} | adhoc_convert_headers(Rest)];
tonyg@1
   216
adhoc_convert_headers([_ | Rest]) ->
tonyg@1
   217
    adhoc_convert_headers(Rest).
tonyg@1
   218
tonyg@1
   219
send_frame(Frame, State = #state{socket = Sock}) ->
tonyg@11
   220
    %% io:format("Sending ~p~n", [Frame]),
tonyg@1
   221
    ok = gen_tcp:send(Sock, stomp_frame:serialize(Frame)),
tonyg@1
   222
    State.
tonyg@1
   223
tonyg@1
   224
send_frame(Command, Headers, Body, State) ->
tonyg@1
   225
    send_frame(#stomp_frame{command = Command,
tonyg@1
   226
			    headers = Headers,
tonyg@1
   227
			    body = Body},
tonyg@1
   228
	       State).
tonyg@1
   229
tonyg@1
   230
send_error(Message, Detail, State) ->
tonyg@11
   231
    error_logger:error_msg("STOMP error frame sent:~nMessage: ~p~nDetail: ~p~n",
tonyg@11
   232
			   [Message, Detail]),
tonyg@1
   233
    send_frame("ERROR", [{"message", Message}], Detail, State).
tonyg@1
   234
tonyg@1
   235
send_error(Message, Format, Args, State) ->
tonyg@1
   236
    send_error(Message, lists:flatten(io_lib:format(Format, Args)), State).
tonyg@1
   237
tonyg@1
   238
process_frame("CONNECT", Frame, State = #state{channel = none}) ->
tonyg@1
   239
    {ok, DefaultVHost} = application:get_env(default_vhost),
tonyg@1
   240
    do_login(stomp_frame:header(Frame, "login"),
tonyg@1
   241
	     stomp_frame:header(Frame, "passcode"),
tonyg@1
   242
	     stomp_frame:header(Frame, "virtual-host", binary_to_list(DefaultVHost)),
tonyg@3
   243
	     stomp_frame:header(Frame, "realm", "/data"),
tonyg@1
   244
	     State);
tonyg@2
   245
process_frame("DISCONNECT", _Frame, _State = #state{channel = none}) ->
tonyg@2
   246
    stop;
tonyg@1
   247
process_frame(_Command, _Frame, State = #state{channel = none}) ->
tonyg@1
   248
    {ok, send_error("Illegal command",
tonyg@1
   249
		    "You must log in using CONNECT first\n",
tonyg@1
   250
		    State)};
tonyg@1
   251
process_frame(Command, Frame, State) ->
tonyg@1
   252
    case process_command(Command, Frame, State) of
tonyg@1
   253
	{ok, State1} ->
tonyg@1
   254
	    {ok, case stomp_frame:header(Frame, "receipt") of
tonyg@1
   255
		     {ok, Id} ->
tonyg@1
   256
			 send_frame("RECEIPT", [{"receipt-id", Id}], "", State1);
tonyg@1
   257
		     not_found ->
tonyg@1
   258
			 State1
tonyg@1
   259
		 end};
tonyg@1
   260
	stop ->
tonyg@1
   261
	    stop
tonyg@1
   262
    end.
tonyg@1
   263
tonyg@1
   264
send_method(Method, State = #state{channel = ChPid}) ->
tonyg@8
   265
    ok = rabbit_channel:do(ChPid, Method),
tonyg@1
   266
    State.
tonyg@1
   267
tonyg@1
   268
send_method(Method, Properties, Body, State = #state{channel = ChPid}) ->
tonyg@8
   269
    ok = rabbit_channel:do(ChPid,
tonyg@8
   270
			   Method,
tonyg@8
   271
			   #content{class_id = 60, %% basic
tonyg@8
   272
				    properties = Properties,
tonyg@8
   273
				    properties_bin = none,
tonyg@8
   274
				    payload_fragments_rev = [list_to_binary(Body)]}),
tonyg@1
   275
    State.
tonyg@1
   276
tonyg@3
   277
do_login({ok, Login}, {ok, Passcode}, VirtualHost, Realm, State) ->
tonyg@25
   278
    U = rabbit_access_control:user_pass_login(list_to_binary(Login),
tonyg@25
   279
					      list_to_binary(Passcode)),
tonyg@1
   280
    ok = rabbit_access_control:check_vhost_access(U, list_to_binary(VirtualHost)),
tonyg@1
   281
    ChPid = 
tonyg@6
   282
	rabbit_channel:start_link(self(), self(), U#user.username, list_to_binary(VirtualHost)),
tonyg@3
   283
    {ok, #'channel.open_ok'{}, State1} =
tonyg@3
   284
	simple_method_sync_rpc(#'channel.open'{out_of_band = <<"">>},
tonyg@3
   285
			       State#state{channel = ChPid}),
tonyg@23
   286
    SessionId = rabbit_misc:string_guid("session"),
tonyg@3
   287
    {ok, #'access.request_ok'{ticket = Ticket}, State2} =
tonyg@3
   288
	simple_method_sync_rpc(#'access.request'{realm = list_to_binary(Realm),
tonyg@3
   289
						 exclusive = false,
tonyg@3
   290
						 passive = true,
tonyg@3
   291
						 active = true,
tonyg@3
   292
						 write = true,
tonyg@3
   293
						 read = true},
tonyg@3
   294
			       send_frame("CONNECTED",
tonyg@3
   295
					  [{"session", SessionId}],
tonyg@3
   296
					  "",
tonyg@3
   297
					  State1#state{session_id = SessionId})),
tonyg@3
   298
    {ok, State2#state{ticket = Ticket}};
tonyg@3
   299
do_login(_, _, _, _, State) ->
tonyg@1
   300
    {ok, send_error("Bad CONNECT", "Missing login or passcode header(s)\n", State)}.
tonyg@1
   301
tonyg@9
   302
user_header_key("X-" ++ UserKey) -> UserKey;
tonyg@9
   303
user_header_key(_) -> false.
tonyg@1
   304
tonyg@1
   305
make_string_table(_KeyFilter, []) -> [];
tonyg@1
   306
make_string_table(KeyFilter, [{K, V} | Rest]) ->
tonyg@1
   307
    case KeyFilter(K) of
tonyg@9
   308
	false ->
tonyg@1
   309
	    make_string_table(KeyFilter, Rest);
tonyg@9
   310
	NewK ->
tonyg@9
   311
	    [{list_to_binary(NewK), longstr, list_to_binary(V)}
tonyg@9
   312
	     | make_string_table(KeyFilter, Rest)]
tonyg@1
   313
    end.
tonyg@1
   314
tonyg@10
   315
transactional(Frame) ->
tonyg@10
   316
    case stomp_frame:header(Frame, "transaction") of
tonyg@10
   317
	{ok, Transaction} ->
tonyg@10
   318
	    {yes, Transaction};
tonyg@10
   319
	not_found ->
tonyg@10
   320
	    no
tonyg@10
   321
    end.
tonyg@10
   322
tonyg@10
   323
transactional_action(Frame, Name, Fun, State) ->
tonyg@10
   324
    case transactional(Frame) of
tonyg@10
   325
	{yes, Transaction} ->
tonyg@10
   326
	    Fun(Transaction, State);
tonyg@10
   327
	no ->
tonyg@10
   328
	    {ok, send_error("Missing transaction",
tonyg@10
   329
			    Name ++ " must include a 'transaction' header\n",
tonyg@10
   330
			    State)}
tonyg@10
   331
    end.
tonyg@10
   332
tonyg@10
   333
with_transaction(Transaction, State, Fun) ->
tonyg@10
   334
    case get({transaction, Transaction}) of
tonyg@10
   335
	undefined ->
tonyg@10
   336
	    {ok, send_error("Bad transaction",
tonyg@10
   337
			    "Invalid transaction identifier: ~p~n", [Transaction],
tonyg@10
   338
			    State)};
tonyg@10
   339
	Actions ->
tonyg@10
   340
	    Fun(Actions, State)
tonyg@10
   341
    end.
tonyg@10
   342
tonyg@10
   343
begin_transaction(Transaction, State) ->
tonyg@10
   344
    put({transaction, Transaction}, []),
tonyg@10
   345
    {ok, State}.
tonyg@10
   346
tonyg@10
   347
extend_transaction(Transaction, Action, State0) ->
tonyg@10
   348
    with_transaction(Transaction, State0,
tonyg@10
   349
		     fun (Actions, State) ->
tonyg@10
   350
			     put({transaction, Transaction}, [Action | Actions]),
tonyg@10
   351
			     {ok, State}
tonyg@10
   352
		     end).
tonyg@10
   353
tonyg@10
   354
commit_transaction(Transaction, State0) ->
tonyg@10
   355
    with_transaction(Transaction, State0,
tonyg@10
   356
		     fun (Actions, State) ->
tonyg@10
   357
			     FinalState = lists:foldr(fun perform_transaction_action/2,
tonyg@10
   358
						      State,
tonyg@10
   359
						      Actions),
tonyg@10
   360
			     erase({transaction, Transaction}),
tonyg@10
   361
			     {ok, FinalState}
tonyg@10
   362
		     end).
tonyg@10
   363
tonyg@10
   364
abort_transaction(Transaction, State0) ->
tonyg@10
   365
    with_transaction(Transaction, State0,
tonyg@10
   366
		     fun (_Actions, State) ->
tonyg@10
   367
			     erase({transaction, Transaction}),
tonyg@10
   368
			     {ok, State}
tonyg@10
   369
		     end).
tonyg@10
   370
tonyg@10
   371
perform_transaction_action({Method}, State) ->
tonyg@10
   372
    send_method(Method, State);
tonyg@10
   373
perform_transaction_action({Method, Props, Body}, State) ->
tonyg@10
   374
    send_method(Method, Props, Body, State).
tonyg@10
   375
tonyg@10
   376
process_command("BEGIN", Frame, State) ->
tonyg@10
   377
    transactional_action(Frame, "BEGIN", fun begin_transaction/2, State);
tonyg@3
   378
process_command("SEND",
tonyg@3
   379
		Frame = #stomp_frame{headers = Headers, body = Body},
tonyg@3
   380
		State = #state{ticket = Ticket}) ->
tonyg@1
   381
    case stomp_frame:header(Frame, "destination") of
tonyg@1
   382
	{ok, RoutingKeyStr} ->
tonyg@1
   383
	    ExchangeStr = stomp_frame:header(Frame, "exchange", ""),
tonyg@1
   384
	    Props = #'P_basic'{
tonyg@1
   385
	      content_type = stomp_frame:binary_header(Frame, "content-type", <<"text/plain">>),
tonyg@9
   386
	      headers = make_string_table(fun user_header_key/1, Headers),
tonyg@1
   387
	      delivery_mode = stomp_frame:integer_header(Frame, "delivery-mode", undefined),
tonyg@1
   388
	      priority = stomp_frame:integer_header(Frame, "priority", undefined),
tonyg@1
   389
	      correlation_id = stomp_frame:binary_header(Frame, "correlation-id", undefined),
tonyg@1
   390
	      reply_to = stomp_frame:binary_header(Frame, "reply-to", undefined),
tonyg@1
   391
	      message_id = stomp_frame:binary_header(Frame, "message-id", undefined)
tonyg@1
   392
	     },
tonyg@10
   393
	    Method = #'basic.publish'{ticket = Ticket,
tonyg@10
   394
				      exchange = list_to_binary(ExchangeStr),
tonyg@10
   395
				      routing_key = list_to_binary(RoutingKeyStr),
tonyg@10
   396
				      mandatory = false,
tonyg@10
   397
				      immediate = false},
tonyg@10
   398
	    case transactional(Frame) of
tonyg@10
   399
		{yes, Transaction} ->
tonyg@10
   400
		    extend_transaction(Transaction, {Method, Props, Body}, State);
tonyg@10
   401
		no ->
tonyg@10
   402
		    {ok, send_method(Method, Props, Body, State)}
tonyg@10
   403
	    end;
tonyg@1
   404
	not_found ->
tonyg@1
   405
	    {ok, send_error("Missing destination",
tonyg@1
   406
			    "SEND must include a 'destination', and optional 'exchange' header\n",
tonyg@1
   407
			    State)}
tonyg@1
   408
    end;
tonyg@1
   409
process_command("ACK", Frame, State = #state{session_id = SessionId}) ->
tonyg@1
   410
    case stomp_frame:header(Frame, "message-id") of
tonyg@1
   411
	{ok, IdStr} ->
tonyg@1
   412
	    IdPrefix = SessionId ++ "_",
tonyg@1
   413
	    case string:substr(IdStr, 1, length(IdPrefix)) of
tonyg@1
   414
		IdPrefix ->
tonyg@1
   415
		    DeliveryTag = list_to_integer(string:substr(IdStr, length(IdPrefix) + 1)),
tonyg@10
   416
		    Method = #'basic.ack'{delivery_tag = DeliveryTag,
tonyg@10
   417
					  multiple = false},
tonyg@10
   418
		    case transactional(Frame) of
tonyg@10
   419
			{yes, Transaction} ->
tonyg@10
   420
			    extend_transaction(Transaction, {Method}, State);
tonyg@10
   421
			no ->
tonyg@10
   422
			    {ok, send_method(Method, State)}
tonyg@10
   423
		    end;
tonyg@1
   424
		_ ->
tonyg@1
   425
		    rabbit_misc:die(command_invalid, 'basic.ack')
tonyg@1
   426
	    end;
tonyg@1
   427
	not_found ->
tonyg@1
   428
	    {ok, send_error("Missing message-id",
tonyg@1
   429
			    "ACK must include a 'message-id' header\n",
tonyg@1
   430
			    State)}
tonyg@1
   431
    end;
tonyg@10
   432
process_command("COMMIT", Frame, State) ->
tonyg@10
   433
    transactional_action(Frame, "COMMIT", fun commit_transaction/2, State);
tonyg@10
   434
process_command("ABORT", Frame, State) ->
tonyg@10
   435
    transactional_action(Frame, "ABORT", fun abort_transaction/2, State);
tonyg@3
   436
process_command("SUBSCRIBE",
tonyg@3
   437
		Frame = #stomp_frame{headers = Headers},
tonyg@3
   438
		State = #state{ticket = Ticket}) ->
tonyg@1
   439
    AckMode = case stomp_frame:header(Frame, "ack", "auto") of
tonyg@1
   440
		  "auto" -> auto;
tonyg@1
   441
		  "client" -> client
tonyg@1
   442
	      end,
tonyg@1
   443
    case stomp_frame:header(Frame, "destination") of
tonyg@1
   444
	{ok, QueueStr} ->
tonyg@1
   445
	    ConsumerTag = case stomp_frame:header(Frame, "id") of
tonyg@1
   446
			      {ok, Str} ->
tonyg@1
   447
				  list_to_binary("T_" ++ Str);
tonyg@1
   448
			      not_found ->
tonyg@1
   449
				  list_to_binary("Q_" ++ QueueStr)
tonyg@1
   450
			  end,
tonyg@1
   451
	    Queue = list_to_binary(QueueStr),
tonyg@3
   452
	    {ok, send_method(#'basic.consume'{ticket = Ticket,
tonyg@1
   453
					      queue = Queue,
tonyg@1
   454
					      consumer_tag = ConsumerTag,
tonyg@1
   455
					      no_local = false,
tonyg@1
   456
					      no_ack = (AckMode == auto),
tonyg@1
   457
					      exclusive = false,
tonyg@1
   458
					      nowait = true},
tonyg@3
   459
			     send_method(#'queue.declare'{ticket = Ticket,
tonyg@1
   460
							  queue = Queue,
tonyg@1
   461
							  passive = false,
tonyg@1
   462
							  durable = false,
tonyg@1
   463
							  exclusive = falxse,
tonyg@1
   464
							  auto_delete = true,
tonyg@1
   465
							  nowait = true,
tonyg@1
   466
							  arguments =
tonyg@9
   467
							    make_string_table(fun user_header_key/1,
tonyg@1
   468
									      Headers)},
tonyg@1
   469
					 State))};
tonyg@1
   470
	not_found ->
tonyg@1
   471
	    {ok, send_error("Missing destination",
tonyg@1
   472
			    "SUBSCRIBE must include a 'destination' header\n",
tonyg@1
   473
			    State)}
tonyg@1
   474
    end;
tonyg@1
   475
process_command("UNSUBSCRIBE", Frame, State) ->
tonyg@1
   476
    ConsumerTag = case stomp_frame:header(Frame, "id") of
tonyg@1
   477
		      {ok, IdStr} ->
tonyg@1
   478
			  list_to_binary("T_" ++ IdStr);
tonyg@1
   479
		      not_found ->
tonyg@1
   480
			  case stomp_frame:header(Frame, "destination") of
tonyg@1
   481
			      {ok, QueueStr} ->
tonyg@1
   482
				  list_to_binary("Q_" ++ QueueStr);
tonyg@1
   483
			      not_found ->
tonyg@1
   484
				  missing
tonyg@1
   485
			  end
tonyg@1
   486
		  end,
tonyg@1
   487
    if
tonyg@1
   488
	ConsumerTag == missing ->
tonyg@1
   489
	    {ok, send_error("Missing destination or id",
tonyg@1
   490
			    "UNSUBSCRIBE must include a 'destination' or 'id' header\n",
tonyg@1
   491
			    State)};
tonyg@1
   492
	true ->
tonyg@1
   493
	    {ok, send_method(#'basic.cancel'{consumer_tag = ConsumerTag,
tonyg@1
   494
					     nowait = true},
tonyg@1
   495
			    State)}
tonyg@1
   496
    end;
tonyg@1
   497
process_command("DISCONNECT", _Frame, State) ->
tonyg@1
   498
    {ok, send_method(#'channel.close'{reply_code = 200, reply_text = <<"">>,
tonyg@1
   499
				      class_id = 0, method_id = 0}, State)};
tonyg@1
   500
process_command(Command, _Frame, State) ->
tonyg@1
   501
    {ok, send_error("Bad command",
tonyg@1
   502
		    "Could not interpret command " ++ Command ++ "\n",
tonyg@1
   503
		    State)}.