OK, limits on the cache and on prefetch.
I decided the right thing to do is to prefer older messages in the cache to younger ones, because they're more likely to be used sooner. This means we just fill the cache up and then leave it alone, which is nice and simple.
Things are pretty much OK with it now, but the whole notion of prefetch is still wrong and needs to be changed so that it is driven by the mixed queue, not the disk_queue. For one thing, if two or more queues currently issue prefetch requests and the first fills the cache up, then the second won't do anything. The cache is useful, but it shouldn't be abused for prefetching purposes: the two things are separate.
1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License at
4 %% http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8 %% License for the specific language governing rights and limitations
11 %% The Original Code is RabbitMQ.
13 %% The Initial Developers of the Original Code are LShift Ltd,
14 %% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
16 %% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17 %% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18 %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19 %% Technologies LLC, and Rabbit Technologies Ltd.
21 %% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
22 %% Ltd. Portions created by Cohesive Financial Technologies LLC are
23 %% Copyright (C) 2007-2009 Cohesive Financial Technologies
24 %% LLC. Portions created by Rabbit Technologies Ltd are Copyright
25 %% (C) 2007-2009 Rabbit Technologies Ltd.
27 %% All Rights Reserved.
29 %% Contributor(s): ______________________________________.
%% Access control for the broker: login checks, vhost and resource access
%% checks, and management of users, vhosts and per-vhost permissions (see
%% the export lists below for the public API).
32 -module(rabbit_access_control).
%% NOTE(review): no qlc usage is visible in this listing; confirm whether
%% this include is still needed.
33 -include_lib("stdlib/include/qlc.hrl").
34 -include("rabbit.hrl").
%% Authentication and authorisation checks.
36 -export([check_login/2, user_pass_login/2,
37 check_vhost_access/2, check_resource_access/3]).
%% User management. NOTE(review): the end of this export list is not
%% visible in this listing.
38 -export([add_user/2, delete_user/1, change_password/2, list_users/0,
%% Virtual host management.
40 -export([add_vhost/1, delete_vhost/1, list_vhosts/0]).
%% Permission management.
41 -export([set_permissions/5, clear_permissions/2,
42 list_vhost_permissions/1, list_user_permissions/1]).
44 %%----------------------------------------------------------------------------
%% Type and function specifications for the module's API.
%% user(), vhost(), username(), password(), regexp(), r/1 and not_found()
%% are not defined in the visible code (presumably they come from
%% rabbit.hrl; confirm).
%% permission_atom() names the three permission kinds.
48 -type(permission_atom() :: 'configure' | 'read' | 'write').
%% Login and access checks.
50 -spec(check_login/2 :: (binary(), binary()) -> user()).
51 -spec(user_pass_login/2 :: (username(), password()) -> user()).
52 -spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok').
53 -spec(check_resource_access/3 ::
54 (username(), r(atom()), permission_atom()) -> 'ok').
%% User management.
55 -spec(add_user/2 :: (username(), password()) -> 'ok').
56 -spec(delete_user/1 :: (username()) -> 'ok').
57 -spec(change_password/2 :: (username(), password()) -> 'ok').
58 -spec(list_users/0 :: () -> [username()]).
59 -spec(lookup_user/1 :: (username()) -> {'ok', user()} | not_found()).
%% Virtual host management.
60 -spec(add_vhost/1 :: (vhost()) -> 'ok').
61 -spec(delete_vhost/1 :: (vhost()) -> 'ok').
62 -spec(list_vhosts/0 :: () -> [vhost()]).
%% Permission management; each permission is a regular expression.
63 -spec(set_permissions/5 ::
64 (username(), vhost(), regexp(), regexp(), regexp()) -> 'ok').
65 -spec(clear_permissions/2 :: (username(), vhost()) -> 'ok').
66 -spec(list_vhost_permissions/1 ::
67 (vhost()) -> [{username(), regexp(), regexp(), regexp()}]).
68 -spec(list_user_permissions/1 ::
69 (username()) -> [{vhost(), regexp(), regexp(), regexp()}]).
73 %%----------------------------------------------------------------------------
%% check_login(Mechanism, Response): authenticates a client from the
%% authentication mechanism name and the raw response binary. The clauses
%% visible here handle <<"PLAIN">> and <<"AMQPLAIN">> by extracting a user
%% name and password and delegating to user_pass_login/2; any other
%% mechanism is reported through rabbit_misc:protocol_error with
%% access_refused.
%% NOTE(review): some lines of this definition are not visible in this
%% listing; the comments describe only the code shown.
75 %% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
76 %% apparently, by OpenAMQ.
77 check_login(<<"PLAIN">>, Response) ->
%% The response is split on zero bytes and must yield exactly two tokens
%% (user, password); any other count fails the match below.
78 [User, Pass] = [list_to_binary(T) ||
79 T <- string:tokens(binary_to_list(Response), [0])],
80 user_pass_login(User, Pass);
81 %% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually
82 %% defines this as PLAIN, but in 0-9 that definition is gone, instead
83 %% referring generically to "SASL security mechanism", i.e. the above.
84 check_login(<<"AMQPLAIN">>, Response) ->
%% The response is parsed as a table; both LOGIN and PASSWORD must be
%% present as longstr fields.
85 LoginTable = rabbit_binary_parser:parse_table(Response),
86 case {lists:keysearch(<<"LOGIN">>, 1, LoginTable),
87 lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of
88 {{value, {_, longstr, User}},
89 {value, {_, longstr, Pass}}} ->
90 user_pass_login(User, Pass);
92 %% Is this an information leak?
93 rabbit_misc:protocol_error(
%% NOTE(review): the message below spells the mechanism "AMQPPLAIN" while
%% the clause matches "AMQPLAIN"; this looks like a typo in the message.
95 "AMQPPLAIN auth info ~w is missing LOGIN or PASSWORD field",
%% Fallback clause: every other mechanism name is refused.
99 check_login(Mechanism, _Response) ->
100 rabbit_misc:protocol_error(
101 access_refused, "unsupported authentication mechanism '~s'",
%% user_pass_login(User, Pass): looks the user up with lookup_user/1 and
%% returns the user record when Pass equals the stored password field.
%% A password mismatch and an unknown user both raise an access_refused
%% protocol error with the identical message.
%% NOTE(review): some lines of this definition are not visible in this
%% listing.
104 user_pass_login(User, Pass) ->
%% NOTE(review): this debug log includes the supplied password verbatim;
%% consider redacting it.
105 ?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]),
106 case lookup_user(User) of
109 Pass == U#user.password -> U;
111 rabbit_misc:protocol_error(
112 access_refused, "login refused for user '~s'", [User])
114 {error, not_found} ->
115 rabbit_misc:protocol_error(
116 access_refused, "login refused for user '~s'", [User])
%% internal_lookup_vhost_access(Username, VHostPath): inside an mnesia
%% transaction, reads the rabbit_user_permission entry keyed by the
%% #user_vhost{} pair built from the two arguments.
%% NOTE(review): the handling of the read result is not visible in this
%% listing.
119 internal_lookup_vhost_access(Username, VHostPath) ->
120 %% TODO: use dirty ops instead
121 rabbit_misc:execute_mnesia_transaction(
123 case mnesia:read({rabbit_user_permission,
124 #user_vhost{username = Username,
125 virtual_host = VHostPath}}) of
%% check_vhost_access(User, VHostPath): takes the username from the #user{}
%% record and consults internal_lookup_vhost_access/2. The visible failure
%% branch raises an access_refused protocol error naming the vhost and the
%% user.
%% NOTE(review): the case patterns are not visible in this listing.
131 check_vhost_access(#user{username = Username}, VHostPath) ->
132 ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]),
133 case internal_lookup_vhost_access(Username, VHostPath) of
137 rabbit_misc:protocol_error(
138 access_refused, "access to vhost '~s' refused for user '~s'",
139 [VHostPath, Username])
%% Translates a permission atom into the tuple position of the matching
%% field of the #permission{} record, so that the field can be fetched
%% with element/2.
permission_index(read)      -> #permission.read;
permission_index(write)     -> #permission.write;
permission_index(configure) -> #permission.configure.
%% check_resource_access(Username, Resource, Permission): checks whether
%% the user may perform the given kind of operation on the resource.
%% NOTE(review): several lines of this definition (including the third
%% argument of each clause head) are not visible in this listing; the
%% comments describe only the code shown.
%%
%% Clause 1: an exchange with the empty name is checked under the name
%% <<"amq.default">> instead.
146 check_resource_access(Username,
147 R = #resource{kind = exchange, name = <<"">>},
149 check_resource_access(Username,
150 R#resource{name = <<"amq.default">>},
%% Clause 2: resources whose name starts with "amq.gen" are matched without
%% looking at the user (body not visible here).
152 check_resource_access(_Username,
153 #resource{name = <<"amq.gen",_/binary>>},
%% Clause 3: general case. The user's permission entry for the resource's
%% vhost is fetched with a dirty read, and the resource name is matched
%% against the pattern stored in the field selected by permission_index/1.
%% A refusal raises an access_refused protocol error.
156 check_resource_access(Username,
157 R = #resource{virtual_host = VHostPath, name = Name},
159 Res = case mnesia:dirty_read({rabbit_user_permission,
160 #user_vhost{username = Username,
161 virtual_host = VHostPath}}) of
164 [#user_permission{permission = P}] ->
166 binary_to_list(Name),
167 binary_to_list(element(permission_index(Permission), P))) of
168 {match, _, _} -> true;
173 true -> rabbit_misc:protocol_error(
174 access_refused, "access to ~s refused for user '~s'",
175 [rabbit_misc:rs(R), Username])
%% add_user(Username, Password): inside an mnesia transaction, reads the
%% rabbit_user entry for Username with a write lock and either writes a new
%% #user{} record or aborts with {user_already_exists, Username}; a
%% "Created user" info message is logged afterwards.
%% The password is stored in the record exactly as supplied (no hashing is
%% visible here).
%% NOTE(review): the case patterns and the end of this definition are not
%% visible in this listing.
178 add_user(Username, Password) ->
179 R = rabbit_misc:execute_mnesia_transaction(
181 case mnesia:wread({rabbit_user, Username}) of
183 ok = mnesia:write(rabbit_user,
184 #user{username = Username,
185 password = Password},
188 mnesia:abort({user_already_exists, Username})
191 rabbit_log:info("Created user ~p~n", [Username]),
%% delete_user(Username): in one mnesia transaction, deletes the rabbit_user
%% entry and every rabbit_user_permission object returned by the
%% match_object query below, then logs a "Deleted user" info message.
%% The transaction body is wrapped by rabbit_misc:with_user (presumably it
%% fails when the user does not exist; confirm against rabbit_misc).
%% NOTE(review): the match pattern and the end of this definition are not
%% fully visible in this listing.
194 delete_user(Username) ->
195 R = rabbit_misc:execute_mnesia_transaction(
196 rabbit_misc:with_user(
199 ok = mnesia:delete({rabbit_user, Username}),
200 [ok = mnesia:delete_object(
201 rabbit_user_permission, R, write) ||
202 R <- mnesia:match_object(
203 rabbit_user_permission,
204 #user_permission{user_vhost = #user_vhost{
211 rabbit_log:info("Deleted user ~p~n", [Username]),
%% change_password(Username, Password): in an mnesia transaction wrapped by
%% rabbit_misc:with_user, overwrites the rabbit_user entry with a #user{}
%% record carrying the new password, then logs a "Changed password" info
%% message. The password is stored exactly as supplied.
%% NOTE(review): parts of this definition are not visible in this listing.
214 change_password(Username, Password) ->
215 R = rabbit_misc:execute_mnesia_transaction(
216 rabbit_misc:with_user(
219 ok = mnesia:write(rabbit_user,
220 #user{username = Username,
221 password = Password},
224 rabbit_log:info("Changed password for user ~p~n", [Username]),
%% Returns every key of the rabbit_user table using a dirty (non-
%% transactional) read.
%% NOTE(review): the function head is not visible in this listing;
%% presumably this is the body of list_users/0. Confirm.
228 mnesia:dirty_all_keys(rabbit_user).
%% Fetches the rabbit_user entry stored under Username with a dirty read.
%% Per the spec, the result is {ok, User} or a not-found error.
lookup_user(Username) ->
    UserKey = {rabbit_user, Username},
    rabbit_misc:dirty_read(UserKey).
%% add_vhost(VHostPath): inside an mnesia transaction, reads the
%% rabbit_vhost entry with a write lock and either creates the vhost or
%% aborts with {vhost_already_exists, VHostPath}. Creating a vhost writes
%% its #vhost{} record and declares the exchanges listed below in it. An
%% "Added vhost" info message is logged afterwards.
%% NOTE(review): the case patterns, part of the exchange list and the end
%% of this definition are not visible in this listing.
233 add_vhost(VHostPath) ->
234 R = rabbit_misc:execute_mnesia_transaction(
236 case mnesia:wread({rabbit_vhost, VHostPath}) of
238 ok = mnesia:write(rabbit_vhost,
239 #vhost{virtual_host = VHostPath},
%% One exchange is declared per {Name, Type} pair.
241 [rabbit_exchange:declare(
242 rabbit_misc:r(VHostPath, exchange, Name),
243 Type, true, false, []) ||
246 {<<"amq.direct">>, direct},
247 {<<"amq.topic">>, topic},
248 {<<"amq.match">>, headers}, %% per 0-9-1 pdf
249 {<<"amq.headers">>, headers}, %% per 0-9-1 xml
250 {<<"amq.fanout">>, fanout}]],
253 mnesia:abort({vhost_already_exists, VHostPath})
256 rabbit_log:info("Added vhost ~p~n", [VHostPath]),
%% delete_vhost(VHostPath): first deletes every queue listed for the vhost
%% (outside any transaction, see the FIXME), then runs
%% internal_delete_vhost/1 inside an mnesia transaction wrapped by
%% rabbit_misc:with_vhost, and finally logs a "Deleted vhost" info message.
%% NOTE(review): parts of this definition are not visible in this listing.
259 delete_vhost(VHostPath) ->
260 %%FIXME: We are forced to delete the queues outside the TX below
261 %%because queue deletion involves sending messages to the queue
262 %%process, which in turn results in further mnesia actions and
263 %%eventually the termination of that process.
%% Each deletion must return {ok, _}; anything else fails the match.
264 lists:foreach(fun (Q) ->
265 {ok,_} = rabbit_amqqueue:delete(Q, false, false)
267 rabbit_amqqueue:list(VHostPath)),
268 R = rabbit_misc:execute_mnesia_transaction(
269 rabbit_misc:with_vhost(
272 ok = internal_delete_vhost(VHostPath)
274 rabbit_log:info("Deleted vhost ~p~n", [VHostPath]),
%% internal_delete_vhost(VHostPath): removes everything that belongs to the
%% vhost, in this order: every exchange listed for it, every permission
%% entry reported by list_vhost_permissions/1 (via clear_permissions/2),
%% and finally the rabbit_vhost entry itself.
%% delete_vhost/1 calls this from inside an mnesia transaction, while
%% clear_permissions/2 and the listing helpers open transactions of their
%% own, so those run nested.
%% NOTE(review): the end of this definition is not visible in this listing.
277 internal_delete_vhost(VHostPath) ->
278 lists:foreach(fun (#exchange{name=Name}) ->
279 ok = rabbit_exchange:delete(Name, false)
281 rabbit_exchange:list(VHostPath)),
282 lists:foreach(fun ({Username, _, _, _}) ->
283 ok = clear_permissions(Username, VHostPath)
285 list_vhost_permissions(VHostPath)),
286 ok = mnesia:delete({rabbit_vhost, VHostPath}),
%% Returns every key of the rabbit_vhost table using a dirty (non-
%% transactional) read.
%% NOTE(review): the function head is not visible in this listing;
%% presumably this is the body of list_vhosts/0. Confirm.
290 mnesia:dirty_all_keys(rabbit_vhost).
%% validate_regexp(RegexpBin): converts the binary to a string and parses
%% it as a regular expression; a parse error is thrown as
%% {error, {invalid_regexp, Regexp, Reason}}.
%% NOTE(review): this relies on the legacy regexp module, which later OTP
%% releases replaced with re; confirm the targeted OTP version.
%% NOTE(review): the success branch is not visible in this listing.
292 validate_regexp(RegexpBin) ->
293 Regexp = binary_to_list(RegexpBin),
294 case regexp:parse(Regexp) of
296 {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
%% set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm):
%% validates the three permission patterns with validate_regexp/1 (which
%% throws on an invalid pattern) before opening the transaction, then
%% writes a #user_permission{} record for the user/vhost pair inside an
%% mnesia transaction wrapped by rabbit_misc:with_user_and_vhost.
%% NOTE(review): the rest of the record and the end of this definition are
%% not visible in this listing.
299 set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
%% The mapped result is discarded; the call is made only for validation.
300 lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
301 rabbit_misc:execute_mnesia_transaction(
302 rabbit_misc:with_user_and_vhost(
304 fun () -> ok = mnesia:write(
305 rabbit_user_permission,
306 #user_permission{user_vhost = #user_vhost{
308 virtual_host = VHostPath},
309 permission = #permission{
310 configure = ConfigurePerm,
%% clear_permissions(Username, VHostPath): inside an mnesia transaction
%% wrapped by rabbit_misc:with_user_and_vhost, deletes the
%% rabbit_user_permission entry keyed by the user/vhost pair.
%% NOTE(review): parts of this definition are not visible in this listing.
316 clear_permissions(Username, VHostPath) ->
317 rabbit_misc:execute_mnesia_transaction(
318 rabbit_misc:with_user_and_vhost(
321 ok = mnesia:delete({rabbit_user_permission,
322 #user_vhost{username = Username,
323 virtual_host = VHostPath}})
%% Lists the permission entries recorded for VHostPath as
%% {Username, ConfigurePerm, WritePerm, ReadPerm} tuples. The query matches
%% any user within the given vhost and is wrapped by rabbit_misc:with_vhost
%% before being handed to list_permissions/1.
list_vhost_permissions(VHostPath) ->
    MatchFun = match_user_vhost('_', VHostPath),
    Query = rabbit_misc:with_vhost(VHostPath, MatchFun),
    [{User, Configure, Write, Read} ||
        {User, _VHost, Configure, Write, Read} <- list_permissions(Query)].
%% Lists the permission entries recorded for Username as
%% {VHostPath, ConfigurePerm, WritePerm, ReadPerm} tuples. The query matches
%% the given user within any vhost and is wrapped by rabbit_misc:with_user
%% before being handed to list_permissions/1.
list_user_permissions(Username) ->
    MatchFun = match_user_vhost(Username, '_'),
    Query = rabbit_misc:with_user(Username, MatchFun),
    [{VHost, Configure, Write, Read} ||
        {_User, VHost, Configure, Write, Read} <- list_permissions(Query)].
%% list_permissions(QueryThunk): runs QueryThunk inside an mnesia
%% transaction and flattens each #user_permission{} record it returns into
%% a {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} tuple.
%% NOTE(review): part of the record pattern is not visible in this listing.
338 list_permissions(QueryThunk) ->
339 [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
340 #user_permission{user_vhost = #user_vhost{username = Username,
341 virtual_host = VHostPath},
342 permission = #permission{
343 configure = ConfigurePerm,
346 %% TODO: use dirty ops instead
347 rabbit_misc:execute_mnesia_transaction(QueryThunk)].
349 match_user_vhost(Username, VHostPath) ->
350 fun () -> mnesia:match_object(
351 rabbit_user_permission,
352 #user_permission{user_vhost = #user_vhost{
354 virtual_host = VHostPath},