ok, limits on the cache, and on prefetch.
I decided the right thing to do is to prefer older messages in the cache to younger ones. This is because they're more likely to be used sooner. Which means we just fill it up and then leave it alone, which is nice and simple.
Things are pretty much ok with it now, but the whole notion of prefetch is still wrong and needs to be changed to be driven by the mixed queue, not the disk_queue. For one reason, currently, if two or more queues issue prefetch requests, and the first fills the cache up, then the 2nd won't do anything. The cache is useful, but shouldn't be abused for prefetching purposes. The two things are separate.
1 %% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP
2 %% distribution, with the following modifications:
4 %% 1) the module name is gen_server2
6 %% 2) more efficient handling of selective receives in callbacks
7 %% gen_server2 processes drain their message queue into an internal
8 %% buffer before invoking any callback module functions. Messages are
9 %% dequeued from the buffer for processing. Thus the effective message
10 %% queue of a gen_server2 process is the concatenation of the internal
11 %% buffer and the real message queue.
12 %% As a result of the draining, any selective receive invoked inside a
13 %% callback is less likely to have to scan a large message queue.
15 %% 3) gen_server2:cast is guaranteed to be order-preserving
16 %% The original code could reorder messages when communicating with a
17 %% process on a remote node that was not currently connected.
19 %% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3
20 %% allow callers to attach priorities to requests. Requests with
21 %% higher priorities are processed before requests with lower
22 %% priorities. The default priority is 0.
24 %% 5) On return from init/1, the timeout value {binary, Min} creates a
25 %% binary exponential timeout, where Min is the minimum number of
26 %% milliseconds permitted, and is also used as the current timeout
27 %% value. Returning from handle_* with the timeout value set to
28 %% 'binary' will use the current binary timeout value. handle_info/2
29 %% with the Info of 'timeout' will function normally, and supports the
30 %% return value of {noreply, State, hibernate} which will hibernate
31 %% the process. The current timeout value is:
33 %% a) doubled if the time spent in hibernation is < 4 * the current value;
34 %% b) halved if the time spent in hibernation is > 16 * the current value;
35 %% c) maintained in all other cases
37 %% Explicit timeouts (i.e. not 'binary') from the handle_* functions
38 %% are still supported, and do not have any effect on the current
41 %% 6) init/1 can also return (either a further arg in addition to
42 %% timeout above, or as a key-value list with the timeout as {timeout,
43 %% Timeout}) a minimum priority (key: min_priority). This can also be
44 %% returned from handle_* functions (i.e. {noreply, NewState} or
45 %% {noreply, NewState, Timeout} or {noreply, NewState, Timeout,
46 %% MinPri} or {noreply, NewState, [{min_priority, MinPri}]} or
47 %% {noreply, NewState, [{min_priority, MinPri}, {timeout,
48 %% Timeout}]}). What this does is to only allow messages greater than
49 %% the indicated priority through to the module. To allow any message
50 %% through (as is the default), use 'any'. One effect of this is that
51 %% when hibernating, the process can be woken up to receive a message
52 %% which it then realises it is not interested in. When this happens,
53 %% handle_info(roused_and_disinterested, State) will be called as soon
54 %% as there are no further messages to process (i.e. upon waking, the
55 %% message queue is drained, and a timeout of 0 is used).
57 %% This feature means that you can delay processing lower priority
58 %% messages. For example, when a min_priority of 0 is combined with
59 %% the binary backoff timeout, you can delay processing any
60 %% negative-priority messages until the first timeout fires which
61 %% indicates that, given a steady state, the process has been idle for
62 %% sufficiently long that it's reasonable to expect it to be
63 %% uninterrupted by higher-priority messages for some little while;
64 %% thus preventing low-priority, but lengthy jobs from getting in the
65 %% way of higher priority jobs that need quick responses.
67 %% All modifications are (C) 2009 LShift Ltd.
69 %% ``The contents of this file are subject to the Erlang Public License,
70 %% Version 1.1, (the "License"); you may not use this file except in
71 %% compliance with the License. You should have received a copy of the
72 %% Erlang Public License along with this software. If not, it can be
73 %% retrieved via the world wide web at http://www.erlang.org/.
75 %% Software distributed under the License is distributed on an "AS IS"
76 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
77 %% the License for the specific language governing rights and limitations
80 %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
81 %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
82 %% AB. All Rights Reserved.''
88 %%% ---------------------------------------------------
90 %%% The idea behind THIS server is that the user module
91 %%% provides (different) functions to handle different
93 %%% If the Parent process terminates the Module:terminate/2
94 %%% function is called.
96 %%% The user module should export:
100 %%% {ok, State, Timeout}
104 %%% handle_call(Msg, {From, Tag}, State)
106 %%% ==> {reply, Reply, State}
107 %%% {reply, Reply, State, Timeout}
109 %%% {noreply, State, Timeout}
110 %%% {stop, Reason, Reply, State}
111 %%% Reason = normal | shutdown | Term terminate(State) is called
113 %%% handle_cast(Msg, State)
115 %%% ==> {noreply, State}
116 %%% {noreply, State, Timeout}
117 %%% {stop, Reason, State}
118 %%% Reason = normal | shutdown | Term terminate(State) is called
120 %%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
122 %%% ==> {noreply, State}
123 %%% {noreply, State, Timeout}
124 %%% {stop, Reason, State}
125 %%% Reason = normal | shutdown | Term, terminate(State) is called
127 %%% terminate(Reason, State) Let the user module clean up
128 %%% always called when server terminates
133 %%% The work flow (of the server) can be described as follows:
135 %%% User module Generic
136 %%% ----------- -------
137 %%% start -----> start
141 %%% handle_call <----- .
144 %%% handle_cast <----- .
146 %%% handle_info <----- .
148 %%% terminate <----- .
153 %%% ---------------------------------------------------
156 -export([start/3, start/4,
157 start_link/3, start_link/4,
158 call/2, call/3, pcall/3, pcall/4,
159 cast/2, pcast/3, reply/2,
161 multi_call/2, multi_call/3, multi_call/4,
162 enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6,
165 -export([behaviour_info/1]).
168 -export([system_continue/3,
170 system_code_change/4,
174 -export([init_it/6, print_event/3]).
176 -import(error_logger, [format/2]).
178 %%%=========================================================================
180 %%%=========================================================================
183 -spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}].
186 behaviour_info(callbacks) ->
187 [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
188 {terminate,2},{code_change,3}];
189 behaviour_info(_Other) ->
192 %%% -----------------------------------------------------------------
193 %%% Starts a generic server.
194 %%% start(Mod, Args, Options)
195 %%% start(Name, Mod, Args, Options)
196 %%% start_link(Mod, Args, Options)
197 %%% start_link(Name, Mod, Args, Options) where:
198 %%% Name ::= {local, atom()} | {global, atom()}
199 %%% Mod ::= atom(), callback module implementing the 'real' server
200 %%% Args ::= term(), init arguments (to Mod:init/1)
201 %%% Options ::= [{timeout, Timeout} | {debug, [Flag]}]
202 %%% Flag ::= trace | log | {logfile, File} | statistics | debug
203 %%% (debug == log && statistics)
204 %%% Returns: {ok, Pid} |
205 %%% {error, {already_started, Pid}} |
207 %%% -----------------------------------------------------------------
208 start(Mod, Args, Options) ->
209 gen:start(?MODULE, nolink, Mod, Args, Options).
211 start(Name, Mod, Args, Options) ->
212 gen:start(?MODULE, nolink, Name, Mod, Args, Options).
214 start_link(Mod, Args, Options) ->
215 gen:start(?MODULE, link, Mod, Args, Options).
217 start_link(Name, Mod, Args, Options) ->
218 gen:start(?MODULE, link, Name, Mod, Args, Options).
221 %% -----------------------------------------------------------------
222 %% Make a call to a generic server.
223 %% If the server is located at another node, that node will
225 %% If the client is trapping exits and is linked server termination
226 %% is handled here (? Shall we do that here (or rely on timeouts) ?).
227 %% -----------------------------------------------------------------
228 call(Name, Request) ->
229 case catch gen:call(Name, '$gen_call', Request) of
233 exit({Reason, {?MODULE, call, [Name, Request]}})
236 call(Name, Request, Timeout) ->
237 case catch gen:call(Name, '$gen_call', Request, Timeout) of
241 exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
244 pcall(Name, Priority, Request) ->
245 case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of
249 exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}})
252 pcall(Name, Priority, Request, Timeout) ->
253 case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of
257 exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}})
260 %% -----------------------------------------------------------------
261 %% Make a cast to a generic server.
262 %% -----------------------------------------------------------------
263 cast({global,Name}, Request) ->
264 catch global:send(Name, cast_msg(Request)),
266 cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
267 do_cast(Dest, Request);
268 cast(Dest, Request) when is_atom(Dest) ->
269 do_cast(Dest, Request);
270 cast(Dest, Request) when is_pid(Dest) ->
271 do_cast(Dest, Request).
273 do_cast(Dest, Request) ->
274 do_send(Dest, cast_msg(Request)),
277 cast_msg(Request) -> {'$gen_cast',Request}.
279 pcast({global,Name}, Priority, Request) ->
280 catch global:send(Name, cast_msg(Priority, Request)),
282 pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) ->
283 do_cast(Dest, Priority, Request);
284 pcast(Dest, Priority, Request) when is_atom(Dest) ->
285 do_cast(Dest, Priority, Request);
286 pcast(Dest, Priority, Request) when is_pid(Dest) ->
287 do_cast(Dest, Priority, Request).
289 do_cast(Dest, Priority, Request) ->
290 do_send(Dest, cast_msg(Priority, Request)),
293 cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
295 %% -----------------------------------------------------------------
296 %% Send a reply to the client.
297 %% -----------------------------------------------------------------
298 reply({To, Tag}, Reply) ->
299 catch To ! {Tag, Reply}.
301 %% -----------------------------------------------------------------
302 %% Asyncronous broadcast, returns nothing, it's just send'n prey
303 %%-----------------------------------------------------------------
304 abcast(Name, Request) when is_atom(Name) ->
305 do_abcast([node() | nodes()], Name, cast_msg(Request)).
307 abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
308 do_abcast(Nodes, Name, cast_msg(Request)).
310 do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) ->
311 do_send({Name,Node},Msg),
312 do_abcast(Nodes, Name, Msg);
313 do_abcast([], _,_) -> abcast.
315 %%% -----------------------------------------------------------------
316 %%% Make a call to servers at several nodes.
317 %%% Returns: {[Replies],[BadNodes]}
318 %%% A Timeout can be given
320 %%% A middleman process is used in case late answers arrives after
321 %%% the timeout. If they would be allowed to glog the callers message
322 %%% queue, it would probably become confused. Late answers will
323 %%% now arrive to the terminated middleman and so be discarded.
324 %%% -----------------------------------------------------------------
325 multi_call(Name, Req)
326 when is_atom(Name) ->
327 do_multi_call([node() | nodes()], Name, Req, infinity).
329 multi_call(Nodes, Name, Req)
330 when is_list(Nodes), is_atom(Name) ->
331 do_multi_call(Nodes, Name, Req, infinity).
333 multi_call(Nodes, Name, Req, infinity) ->
334 do_multi_call(Nodes, Name, Req, infinity);
335 multi_call(Nodes, Name, Req, Timeout)
336 when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
337 do_multi_call(Nodes, Name, Req, Timeout).
340 %%-----------------------------------------------------------------
341 %% enter_loop(Mod, Options, State) -> _
342 %% enter_loop(Mod, Options, State, ServerName) -> _
343 %% enter_loop(Mod, Options, State, [{Key, Value}]) -> _
344 %% enter_loop(Mod, Options, State, Timeout) -> _
345 %% enter_loop(Mod, Options, State, ServerName, [{Key, Value}]) -> _
346 %% enter_loop(Mod, Options, State, ServerName, Timeout) -> _
347 %% enter_loop(Mod, Options, State, ServerName, Timeout, MinPri) -> _
349 %% {Key, Value} = {min_priority, MinPri} | {timeout, Timeout}
351 %% Description: Makes an existing process into a gen_server.
352 %% The calling process will enter the gen_server receive
353 %% loop and become a gen_server process.
354 %% The process *must* have been started using one of the
355 %% start functions in proc_lib, see proc_lib(3).
356 %% The user is responsible for any initialization of the
357 %% process, including registering a name for it.
358 %%-----------------------------------------------------------------
359 enter_loop(Mod, Options, State) ->
360 enter_loop(Mod, Options, State, self(), []).
362 enter_loop(Mod, Options, State, ServerName = {_, _}) ->
363 enter_loop(Mod, Options, State, ServerName, []);
365 enter_loop(Mod, Options, State, Opts) when is_list(Opts) ->
366 enter_loop(Mod, Options, State, self(), Opts);
368 enter_loop(Mod, Options, State, Timeout) ->
369 enter_loop(Mod, Options, State, self(), [{timeout, Timeout}]).
371 enter_loop(Mod, Options, State, ServerName, Opts) when is_list(Opts) ->
372 Name = get_proc_name(ServerName),
373 Parent = get_parent(),
374 Debug = debug_options(Name, Options),
375 Queue = priority_queue:new(),
376 [{timeout, Timeout}, {min_priority, MinPri}] = extract_timeout_minpri(Opts),
377 {Timeout1, TimeoutState} = build_timeout_state(Timeout),
378 loop(Parent, Name, State, Mod, Timeout1, TimeoutState, MinPri, Queue, Debug);
380 enter_loop(Mod, Options, State, ServerName, Timeout) ->
381 enter_loop(Mod, Options, State, ServerName, [{timeout, Timeout}]).
383 enter_loop(Mod, Options, State, ServerName, Timeout, MinPri) ->
384 enter_loop(Mod, Options, State, ServerName,
385 [{timeout, Timeout}, {min_priority, MinPri}]).
386 %%%========================================================================
387 %%% Gen-callback functions
388 %%%========================================================================
390 %%% ---------------------------------------------------
391 %%% Initiate the new process.
392 %%% Register the name using the Rfunc function
393 %%% Calls the Mod:init/Args function.
394 %%% Finally an acknowledge is sent to Parent and the main
396 %%% ---------------------------------------------------
397 init_it(Starter, self, Name, Mod, Args, Options) ->
398 init_it(Starter, self(), Name, Mod, Args, Options);
399 init_it(Starter, Parent, Name0, Mod, Args, Options) ->
401 Debug = debug_options(Name, Options),
402 Queue = priority_queue:new(),
403 case catch Mod:init(Args) of
405 proc_lib:init_ack(Starter, {ok, self()}),
406 loop(Parent, Name, State, Mod, infinity, undefined,
408 {ok, State, Timeout} ->
409 proc_lib:init_ack(Starter, {ok, self()}),
410 {Timeout1, TimeoutState} = build_timeout_state(Timeout),
411 loop(Parent, Name, State, Mod, Timeout1, TimeoutState,
413 {ok, State, Timeout, MinPri} ->
414 proc_lib:init_ack(Starter, {ok, self()}),
415 {Timeout1, TimeoutState} = build_timeout_state(Timeout),
416 loop(Parent, Name, State, Mod, Timeout1, TimeoutState,
417 MinPri, Queue, Debug);
419 %% For consistency, we must make sure that the
420 %% registered name (if any) is unregistered before
421 %% the parent process is notified about the failure.
422 %% (Otherwise, the parent process could get
423 %% an 'already_started' error if it immediately
424 %% tried starting the process again.)
425 unregister_name(Name0),
426 proc_lib:init_ack(Starter, {error, Reason}),
429 unregister_name(Name0),
430 proc_lib:init_ack(Starter, ignore),
433 unregister_name(Name0),
434 proc_lib:init_ack(Starter, {error, Reason}),
437 Error = {bad_return_value, Else},
438 proc_lib:init_ack(Starter, {error, Error}),
442 name({local,Name}) -> Name;
443 name({global,Name}) -> Name;
444 %% name(Pid) when is_pid(Pid) -> Pid;
445 %% when R11 goes away, drop the line beneath and uncomment the line above
448 unregister_name({local,Name}) ->
449 _ = (catch unregister(Name));
450 unregister_name({global,Name}) ->
451 _ = global:unregister_name(Name);
452 unregister_name(Pid) when is_pid(Pid) ->
455 build_timeout_state(Timeout) ->
457 {binary, Min} -> {binary, {Min, Min, undefined}};
458 _ -> {Timeout, undefined}
461 extract_timeout_minpri(Opts) ->
462 rabbit_misc:keygets([{timeout, infinity}, {min_priority, any}], Opts).
464 %%%========================================================================
465 %%% Internal functions
466 %%%========================================================================
467 %%% ---------------------------------------------------
469 %%% ---------------------------------------------------
470 loop(Parent, Name, State, Mod, hibernate, undefined, MinPri, Queue, Debug) ->
471 proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, undefined,
472 MinPri, Queue, Debug]);
473 loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined},
474 MinPri, Queue, Debug) ->
475 proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod,
476 {Current, Min, now()},
477 MinPri, Queue, Debug]);
478 loop(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, Debug) ->
480 Input -> loop(Parent, Name, State, Mod,
481 Time, TimeoutState, MinPri, in(Input, Queue), Debug)
483 process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
484 MinPri, Queue, Debug)
487 process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue,
490 any -> priority_queue:out(Queue);
491 _ -> priority_queue:out(MinPri, Queue)
494 {{value, Msg}, Queue1} ->
495 process_msg(Parent, Name, State, Mod,
496 Time, TimeoutState, Queue1, Debug, Msg);
498 Time1 = case {Time, TimeoutState} of
500 {binary, {Current, _Min, undefined}} -> Current;
505 loop(Parent, Name, State, Mod,
506 Time, TimeoutState, MinPri, in(Input, Queue1), Debug)
508 process_msg(Parent, Name, State, Mod,
509 Time, TimeoutState, Queue1, Debug,
510 case Time == hibernate of
511 true -> {roused_and_disinterested, MinPri};
512 false when MinPri =:= any -> timeout;
513 false -> {timeout, MinPri}
518 wake_hib(Parent, Name, State, Mod, TimeoutState, MinPri, Queue, Debug) ->
523 TimeoutState1 = adjust_hibernate_after(TimeoutState),
524 process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1,
525 MinPri, in(Msg, Queue), Debug).
527 adjust_hibernate_after(undefined) ->
529 adjust_hibernate_after({Current, Min, HibernatedAt}) ->
530 NapLengthMicros = timer:now_diff(now(), HibernatedAt),
531 CurrentMicros = Current * 1000,
532 LowTargetMicros = CurrentMicros * 4,
533 HighTargetMicros = LowTargetMicros * 4,
535 NapLengthMicros < LowTargetMicros ->
536 %% nap was too short, don't go to sleep as soon
537 {Current * 2, Min, undefined};
539 NapLengthMicros > HighTargetMicros ->
540 %% nap was long, try going to sleep sooner
541 {lists:max([Min, round(Current / 2)]), Min, undefined};
544 %% nap and timeout seem to be in the right relationship. stay here
545 {Current, Min, undefined}
548 in({'$gen_pcast', {Priority, Msg}}, Queue) ->
549 priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
550 in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
551 priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
553 priority_queue:in(Input, Queue).
555 process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
558 {system, From, Req} ->
559 sys:handle_system_msg
560 (Req, From, Parent, ?MODULE, Debug,
561 [Name, State, Mod, Time, TimeoutState, Queue]);
562 {'EXIT', Parent, Reason} ->
563 terminate(Reason, Name, Msg, Mod, State, Debug);
564 _Msg when Debug =:= [] ->
565 handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue);
567 Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
569 handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue,
573 %%% ---------------------------------------------------
574 %%% Send/recive functions
575 %%% ---------------------------------------------------
576 do_send(Dest, Msg) ->
577 catch erlang:send(Dest, Msg).
579 do_multi_call(Nodes, Name, Req, infinity) ->
581 Monitors = send_nodes(Nodes, Name, Tag, Req),
582 rec_nodes(Tag, Monitors, Name, undefined);
583 do_multi_call(Nodes, Name, Req, Timeout) ->
589 %% Middleman process. Should be unsensitive to regular
590 %% exit signals. The sychronization is needed in case
591 %% the receiver would exit before the caller started
593 process_flag(trap_exit, true),
594 Mref = erlang:monitor(process, Caller),
597 Monitors = send_nodes(Nodes, Name, Tag, Req),
598 TimerId = erlang:start_timer(Timeout, self(), ok),
599 Result = rec_nodes(Tag, Monitors, Name, TimerId),
600 exit({self(),Tag,Result});
601 {'DOWN',Mref,_,_,_} ->
602 %% Caller died before sending us the go-ahead.
607 Mref = erlang:monitor(process, Receiver),
608 Receiver ! {self(),Tag},
610 {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
612 {'DOWN',Mref,_,_,Reason} ->
613 %% The middleman code failed. Or someone did
614 %% exit(_, kill) on the middleman process => Reason==killed
618 send_nodes(Nodes, Name, Tag, Req) ->
619 send_nodes(Nodes, Name, Tag, Req, []).
621 send_nodes([Node|Tail], Name, Tag, Req, Monitors)
622 when is_atom(Node) ->
623 Monitor = start_monitor(Node, Name),
624 %% Handle non-existing names in rec_nodes.
625 catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req},
626 send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]);
627 send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
628 %% Skip non-atom Node
629 send_nodes(Tail, Name, Tag, Req, Monitors);
630 send_nodes([], _Name, _Tag, _Req, Monitors) ->
633 %% Against old nodes:
634 %% If no reply has been delivered within 2 secs. (per node) check that
635 %% the server really exists and wait for ever for the answer.
637 %% Against contemporary nodes:
638 %% Wait for reply, server 'DOWN', or timeout from TimerId.
640 rec_nodes(Tag, Nodes, Name, TimerId) ->
641 rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
643 rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
645 {'DOWN', R, _, _, _} ->
646 rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
647 {{Tag, N}, Reply} -> %% Tag is bound !!!
649 rec_nodes(Tag, Tail, Name, Badnodes,
650 [{N,Reply}|Replies], Time, TimerId);
651 {timeout, TimerId, _} ->
653 %% Collect all replies that already have arrived
654 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
656 rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
660 monitor_node(N, false),
661 rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
662 {{Tag, N}, Reply} -> %% Tag is bound !!!
663 receive {nodedown, N} -> ok after 0 -> ok end,
664 monitor_node(N, false),
665 rec_nodes(Tag, Tail, Name, Badnodes,
666 [{N,Reply}|Replies], 2000, TimerId);
667 {timeout, TimerId, _} ->
668 receive {nodedown, N} -> ok after 0 -> ok end,
669 monitor_node(N, false),
670 %% Collect all replies that already have arrived
671 rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
673 case rpc:call(N, erlang, whereis, [Name]) of
674 Pid when is_pid(Pid) -> % It exists try again.
675 rec_nodes(Tag, [N|Tail], Name, Badnodes,
676 Replies, infinity, TimerId);
678 receive {nodedown, N} -> ok after 0 -> ok end,
679 monitor_node(N, false),
680 rec_nodes(Tag, Tail, Name, [N|Badnodes],
681 Replies, 2000, TimerId)
684 rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
685 case catch erlang:cancel_timer(TimerId) of
686 false -> % It has already sent it's message
688 {timeout, TimerId, _} -> ok
692 _ -> % Timer was cancelled, or TimerId was 'undefined'
697 %% Collect all replies that already have arrived
698 rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
700 {'DOWN', R, _, _, _} ->
701 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
702 {{Tag, N}, Reply} -> %% Tag is bound !!!
704 rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
707 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
709 rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
713 monitor_node(N, false),
714 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
715 {{Tag, N}, Reply} -> %% Tag is bound !!!
716 receive {nodedown, N} -> ok after 0 -> ok end,
717 monitor_node(N, false),
718 rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
720 receive {nodedown, N} -> ok after 0 -> ok end,
721 monitor_node(N, false),
722 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
724 rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
728 %%% ---------------------------------------------------
729 %%% Monitor functions
730 %%% ---------------------------------------------------
732 start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
733 if node() =:= nonode@nohost, Node =/= nonode@nohost ->
735 self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
738 case catch erlang:monitor(process, {Name, Node}) of
741 monitor_node(Node, true),
743 Ref when is_reference(Ref) ->
748 %% Cancels a monitor started with Ref=erlang:monitor(_, _).
749 unmonitor(Ref) when is_reference(Ref) ->
750 erlang:demonitor(Ref),
752 {'DOWN', Ref, _, _, _} ->
758 %%% ---------------------------------------------------
759 %%% Message handling functions
760 %%% ---------------------------------------------------
762 dispatch({'$gen_cast', Msg}, Mod, State) ->
763 Mod:handle_cast(Msg, State);
764 dispatch(Info, Mod, State) ->
765 Mod:handle_info(Info, State).
767 handle_msg({'$gen_call', From, Msg},
768 Parent, Name, State, Mod, TimeoutState, Queue) ->
769 case catch Mod:handle_call(Msg, From, State) of
770 {reply, Reply, NState} ->
772 loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
774 {reply, Reply, NState, Opts} when is_list(Opts) ->
776 [{timeout, Time}, {min_priority, MinPri}] =
777 extract_timeout_minpri(Opts),
778 loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
780 {reply, Reply, NState, Time} ->
782 loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []);
783 {reply, Reply, NState, Time, MinPri} ->
785 loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
788 loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
790 {noreply, NState, Opts} when is_list(Opts) ->
791 [{timeout, Time}, {min_priority, MinPri}] =
792 extract_timeout_minpri(Opts),
793 loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
795 {noreply, NState, Time} ->
796 loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []);
797 {noreply, NState, Time, MinPri} ->
798 loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
800 {stop, Reason, Reply, NState} ->
802 (catch terminate(Reason, Name, Msg, Mod, NState, [])),
805 Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State,
809 Parent, Name, State, Mod, TimeoutState, Queue) ->
810 Reply = (catch dispatch(Msg, Mod, State)),
811 handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
812 TimeoutState, Queue).
814 handle_msg({'$gen_call', From, Msg},
815 Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
816 case catch Mod:handle_call(Msg, From, State) of
817 {reply, Reply, NState} ->
818 Debug1 = reply(Name, From, Reply, NState, Debug),
819 loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
821 {reply, Reply, NState, Opts} when is_list(Opts) ->
822 Debug1 = reply(Name, From, Reply, NState, Debug),
823 [{timeout, Time}, {min_priority, MinPri}] =
824 extract_timeout_minpri(Opts),
825 loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
827 {reply, Reply, NState, Time} ->
828 Debug1 = reply(Name, From, Reply, NState, Debug),
829 loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue,
831 {reply, Reply, NState, Time, MinPri} ->
832 Debug1 = reply(Name, From, Reply, NState, Debug),
833 loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
836 Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
838 loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
840 {noreply, NState, Opts} when is_list(Opts) ->
841 Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
843 [{timeout, Time}, {min_priority, MinPri}] =
844 extract_timeout_minpri(Opts),
845 loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
847 {noreply, NState, Time} ->
848 Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
850 loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue,
852 {noreply, NState, Time, MinPri} ->
853 Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
855 loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
857 {stop, Reason, Reply, NState} ->
859 (catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
860 reply(Name, From, Reply, NState, Debug),
863 handle_common_reply(Other, Parent, Name, Msg, Mod, State,
864 TimeoutState, Queue, Debug)
867 Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
868 Reply = (catch dispatch(Msg, Mod, State)),
869 handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
870 TimeoutState, Queue, Debug).
872 handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
873 TimeoutState, Queue) ->
876 loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
878 {noreply, NState, Opts} when is_list(Opts) ->
879 [{timeout, Time}, {min_priority, MinPri}] =
880 extract_timeout_minpri(Opts),
881 loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
883 {noreply, NState, Time} ->
884 loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []);
885 {noreply, NState, Time, MinPri} ->
886 loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
888 {stop, Reason, NState} ->
889 terminate(Reason, Name, Msg, Mod, NState, []);
891 terminate(What, Name, Msg, Mod, State, []);
893 terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
896 handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue,
900 Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
902 loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
904 {noreply, NState, Opts} when is_list(Opts) ->
905 Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
907 [{timeout, Time}, {min_priority, MinPri}] =
908 extract_timeout_minpri(Opts),
909 loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
911 {noreply, NState, Time} ->
912 Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
914 loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue,
916 {noreply, NState, Time, MinPri} ->
917 Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
919 loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
921 {stop, Reason, NState} ->
922 terminate(Reason, Name, Msg, Mod, NState, Debug);
924 terminate(What, Name, Msg, Mod, State, Debug);
926 terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug)
929 reply(Name, {To, Tag}, Reply, State, Debug) ->
930 reply({To, Tag}, Reply),
931 sys:handle_debug(Debug, {?MODULE, print_event}, Name,
932 {out, Reply, To, State} ).
935 %%-----------------------------------------------------------------
936 %% Callback functions for system messages handling.
937 %%-----------------------------------------------------------------
938 system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, MinPri,
940 loop(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, Debug).
943 -spec system_terminate(_, _, _, [_]) -> no_return().
946 system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time,
947 _TimeoutState, _Queue]) ->
948 terminate(Reason, Name, [], Mod, State, Debug).
950 system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module,
952 case catch Mod:code_change(OldVsn, State, Extra) of
954 {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]};
959 %%-----------------------------------------------------------------
960 %% Format debug messages. Print them as the call-back module sees
961 %% them, not as the real erlang messages. Use trace for that.
962 %%-----------------------------------------------------------------
963 print_event(Dev, {in, Msg}, Name) ->
965 {'$gen_call', {From, _Tag}, Call} ->
966 io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
968 {'$gen_cast', Cast} ->
969 io:format(Dev, "*DBG* ~p got cast ~p~n",
972 io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
974 print_event(Dev, {out, Msg, To, State}, Name) ->
975 io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
976 [Name, Msg, To, State]);
977 print_event(Dev, {noreply, State}, Name) ->
978 io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
979 print_event(Dev, Event, Name) ->
980 io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]).
983 %%% ---------------------------------------------------
984 %%% Terminate the server.
985 %%% ---------------------------------------------------
987 terminate(Reason, Name, Msg, Mod, State, Debug) ->
988 case catch Mod:terminate(Reason, State) of
990 error_info(R, Name, Msg, State, Debug),
998 {shutdown,_}=Shutdown ->
1001 error_info(Reason, Name, Msg, State, Debug),
1006 error_info(_Reason, application_controller, _Msg, _State, _Debug) ->
1007 %% OTP-5811 Don't send an error report if it's the system process
1008 %% application_controller which is terminating - let init take care
1011 error_info(Reason, Name, Msg, State, Debug) ->
1014 {undef,[{M,F,A}|MFAs]} ->
1015 case code:is_loaded(M) of
1017 {'module could not be loaded',[{M,F,A}|MFAs]};
1019 case erlang:function_exported(M, F, length(A)) of
1023 {'function not exported',[{M,F,A}|MFAs]}
1029 format("** Generic server ~p terminating \n"
1030 "** Last message in was ~p~n"
1031 "** When Server state == ~p~n"
1032 "** Reason for termination == ~n** ~p~n",
1033 [Name, Msg, State, Reason1]),
1034 sys:print_log(Debug),
1037 %%% ---------------------------------------------------
1038 %%% Misc. functions.
1039 %%% ---------------------------------------------------
1041 opt(Op, [{Op, Value}|_]) ->
1043 opt(Op, [_|Options]) ->
1048 debug_options(Name, Opts) ->
1049 case opt(debug, Opts) of
1050 {ok, Options} -> dbg_options(Name, Options);
1051 _ -> dbg_options(Name, [])
1054 dbg_options(Name, []) ->
1056 case init:get_argument(generic_debug) of
1062 dbg_opts(Name, Opts);
1063 dbg_options(Name, Opts) ->
1064 dbg_opts(Name, Opts).
1066 dbg_opts(Name, Opts) ->
1067 case catch sys:debug_options(Opts) of
1069 format("~p: ignoring erroneous debug options - ~p~n",
1076 get_proc_name(Pid) when is_pid(Pid) ->
1078 get_proc_name({local, Name}) ->
1079 case process_info(self(), registered_name) of
1080 {registered_name, Name} ->
1082 {registered_name, _Name} ->
1083 exit(process_not_registered);
1085 exit(process_not_registered)
1087 get_proc_name({global, Name}) ->
1088 case global:safe_whereis_name(Name) of
1090 exit(process_not_registered_globally);
1091 Pid when Pid =:= self() ->
1094 exit(process_not_registered_globally)
1098 case get('$ancestors') of
1099 [Parent | _] when is_pid(Parent)->
1101 [Parent | _] when is_atom(Parent)->
1102 name_to_pid(Parent);
1104 exit(process_was_not_started_by_proc_lib)
1107 name_to_pid(Name) ->
1108 case whereis(Name) of
1110 case global:safe_whereis_name(Name) of
1112 exit(could_not_find_registerd_name);
1120 %%-----------------------------------------------------------------
1121 %% Status information
1122 %%-----------------------------------------------------------------
1123 format_status(Opt, StatusData) ->
1124 [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time,
1125 TimeoutState, Queue]] =
1127 NameTag = if is_pid(Name) ->
1132 Header = lists:concat(["Status for generic server ", NameTag]),
1133 Log = sys:get_debug(log, Debug, []),
1135 case erlang:function_exported(Mod, format_status, 2) of
1137 case catch Mod:format_status(Opt, [PDict, State]) of
1138 {'EXIT', _} -> [{data, [{"State", State}]}];
1142 [{data, [{"State", State}]}]
1144 Specfic1 = case TimeoutState of
1145 undefined -> Specfic;
1146 {Current, Min, undefined} ->
1147 [ {"Binary Timeout Current and Min", {Current, Min}}
1151 {data, [{"Status", SysState},
1153 {"Logged events", Log},
1154 {"Queued messages", priority_queue:to_list(Queue)}]} |