1 %% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP
2 %% distribution, with the following modifications:
4 %% 1) the module name is gen_server2
6 %% 2) more efficient handling of selective receives in callbacks
7 %% gen_server2 processes drain their message queue into an internal
8 %% buffer before invoking any callback module functions. Messages are
9 %% dequeued from the buffer for processing. Thus the effective message
10 %% queue of a gen_server2 process is the concatenation of the internal
11 %% buffer and the real message queue.
12 %% As a result of the draining, any selective receive invoked inside a
13 %% callback is less likely to have to scan a large message queue.
15 %% 3) gen_server2:cast is guaranteed to be order-preserving
16 %% The original code could reorder messages when communicating with a
17 %% process on a remote node that was not currently connected.
19 %% 4) The callback module can optionally implement prioritise_call/3,
20 %% prioritise_cast/2 and prioritise_info/2. These functions take
21 %% Message, From and State or just Message and State and return a
22 %% single integer representing the priority attached to the message.
23 %% Messages with higher priorities are processed before requests with
24 %% lower priorities. The default priority is 0.
26 %% 5) The callback module can optionally implement
27 %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
28 %% called immediately prior to and post hibernation, respectively. If
29 %% handle_pre_hibernate returns {hibernate, NewState} then the process
30 %% will hibernate. If the module does not implement
31 %% handle_pre_hibernate/1 then the default action is to hibernate.
33 %% 6) init can return a 4th arg, {backoff, InitialTimeout,
34 %% MinimumTimeout, DesiredHibernatePeriod} (all in
35 %% milliseconds). Then, on all callbacks which can return a timeout
36 %% (including init), timeout can be 'hibernate'. When this is the
37 %% case, the current timeout value will be used (initially, the
38 %% InitialTimeout supplied from init). After this timeout has
39 %% occurred, hibernation will occur as normal. Upon awaking, a new
40 %% current timeout value will be calculated.
42 %% The purpose is that the gen_server2 takes care of adjusting the
43 %% current timeout value such that the process will increase the
44 %% timeout value repeatedly if it is unable to sleep for the
45 %% DesiredHibernatePeriod. If it is able to sleep for the
46 %% DesiredHibernatePeriod it will decrease the current timeout down to
47 %% the MinimumTimeout, so that the process is put to sleep sooner (and
48 %% hopefully stays asleep for longer). In short, should a process
49 %% using this receive a burst of messages, it should not hibernate
50 %% between those messages, but as the messages become less frequent,
51 %% the process will not only hibernate, it will do so sooner after
54 %% When using this backoff mechanism, normal timeout values (i.e. not
55 %% 'hibernate') can still be used, and if they are used then the
56 %% handle_info(timeout, State) will be called as normal. In this case,
57 %% returning 'hibernate' from handle_info(timeout, State) will not
58 %% hibernate the process immediately, as it would if backoff wasn't
59 %% being used. Instead it'll wait for the current timeout as described
62 %% 7) The callback module can return from any of the handle_*
63 %% functions, a {become, Module, State} triple, or a {become, Module,
64 %% State, Timeout} quadruple. This allows the gen_server to
65 %% dynamically change the callback module. The State is the new state
66 %% which will be passed into any of the callback functions in the new
67 %% module. Note there is no form also encompassing a reply, thus if
68 %% you wish to reply in handle_call/3 and change the callback module,
69 %% you need to use gen_server2:reply/2 to issue the reply manually.
71 %% 8) The callback module can optionally implement
72 %% format_message_queue/2 which is the equivalent of format_status/2
73 %% but where the second argument is specifically the priority_queue
74 %% which contains the prioritised message_queue.
76 %% All modifications are (C) 2009-2011 VMware, Inc.
78 %% ``The contents of this file are subject to the Erlang Public License,
79 %% Version 1.1, (the "License"); you may not use this file except in
80 %% compliance with the License. You should have received a copy of the
81 %% Erlang Public License along with this software. If not, it can be
82 %% retrieved via the world wide web at http://www.erlang.org/.
84 %% Software distributed under the License is distributed on an "AS IS"
85 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
86 %% the License for the specific language governing rights and limitations
89 %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
90 %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
91 %% AB. All Rights Reserved.''
97 %%% ---------------------------------------------------
99 %%% The idea behind THIS server is that the user module
100 %%% provides (different) functions to handle different
102 %%% If the Parent process terminates the Module:terminate/2
103 %%% function is called.
105 %%% The user module should export:
109 %%% {ok, State, Timeout}
110 %%% {ok, State, Timeout, Backoff}
114 %%% handle_call(Msg, {From, Tag}, State)
116 %%% ==> {reply, Reply, State}
117 %%% {reply, Reply, State, Timeout}
119 %%% {noreply, State, Timeout}
120 %%% {stop, Reason, Reply, State}
121 %%% Reason = normal | shutdown | Term terminate(State) is called
123 %%% handle_cast(Msg, State)
125 %%% ==> {noreply, State}
126 %%% {noreply, State, Timeout}
127 %%% {stop, Reason, State}
128 %%% Reason = normal | shutdown | Term terminate(State) is called
130 %%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
132 %%% ==> {noreply, State}
133 %%% {noreply, State, Timeout}
134 %%% {stop, Reason, State}
135 %%% Reason = normal | shutdown | Term, terminate(State) is called
137 %%% terminate(Reason, State) Let the user module clean up
138 %%% always called when server terminates
142 %%% handle_pre_hibernate(State)
144 %%% ==> {hibernate, State}
145 %%% {stop, Reason, State}
146 %%% Reason = normal | shutdown | Term, terminate(State) is called
148 %%% handle_post_hibernate(State)
150 %%% ==> {noreply, State}
151 %%% {stop, Reason, State}
152 %%% Reason = normal | shutdown | Term, terminate(State) is called
154 %%% The work flow (of the server) can be described as follows:
156 %%% User module Generic
157 %%% ----------- -------
158 %%% start -----> start
162 %%% handle_call <----- .
165 %%% handle_cast <----- .
167 %%% handle_info <----- .
169 %%% terminate <----- .
174 %%% ---------------------------------------------------
177 -export([start/3, start/4,
178 start_link/3, start_link/4,
182 multi_call/2, multi_call/3, multi_call/4,
183 enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
185 -export([behaviour_info/1]).
188 -export([system_continue/3,
190 system_code_change/4,
194 -export([init_it/6]).
196 -import(error_logger, [format/2]).
199 -record(gs2_state, {parent, name, state, mod, time,
200 timeout_state, queue, debug, prioritise_call,
201 prioritise_cast, prioritise_info}).
203 %%%=========================================================================
204 %%% Specs. These exist only to shut up dialyzer's warnings
205 %%%=========================================================================
209 -type(gs2_state() :: #gs2_state{}).
211 -spec(handle_common_termination/3 ::
212 (any(), atom(), gs2_state()) -> no_return()).
213 -spec(hibernate/1 :: (gs2_state()) -> no_return()).
214 -spec(pre_hibernate/1 :: (gs2_state()) -> no_return()).
215 -spec(system_terminate/4 :: (_, _, _, gs2_state()) -> no_return()).
219 %%%=========================================================================
221 %%%=========================================================================
223 behaviour_info(callbacks) ->
224 [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
225 {terminate,2},{code_change,3}];
226 behaviour_info(_Other) ->
229 %%% -----------------------------------------------------------------
230 %%% Starts a generic server.
231 %%% start(Mod, Args, Options)
232 %%% start(Name, Mod, Args, Options)
233 %%% start_link(Mod, Args, Options)
234 %%% start_link(Name, Mod, Args, Options) where:
235 %%% Name ::= {local, atom()} | {global, atom()}
236 %%% Mod ::= atom(), callback module implementing the 'real' server
237 %%% Args ::= term(), init arguments (to Mod:init/1)
238 %%% Options ::= [{timeout, Timeout} | {debug, [Flag]}]
239 %%% Flag ::= trace | log | {logfile, File} | statistics | debug
240 %%% (debug == log && statistics)
241 %%% Returns: {ok, Pid} |
242 %%% {error, {already_started, Pid}} |
244 %%% -----------------------------------------------------------------
245 start(Mod, Args, Options) ->
246 gen:start(?MODULE, nolink, Mod, Args, Options).
248 start(Name, Mod, Args, Options) ->
249 gen:start(?MODULE, nolink, Name, Mod, Args, Options).
251 start_link(Mod, Args, Options) ->
252 gen:start(?MODULE, link, Mod, Args, Options).
254 start_link(Name, Mod, Args, Options) ->
255 gen:start(?MODULE, link, Name, Mod, Args, Options).
258 %% -----------------------------------------------------------------
259 %% Make a call to a generic server.
260 %% If the server is located at another node, that node will
262 %% If the client is trapping exits and is linked server termination
263 %% is handled here (? Shall we do that here (or rely on timeouts) ?).
264 %% -----------------------------------------------------------------
265 call(Name, Request) ->
266 case catch gen:call(Name, '$gen_call', Request) of
270 exit({Reason, {?MODULE, call, [Name, Request]}})
273 call(Name, Request, Timeout) ->
274 case catch gen:call(Name, '$gen_call', Request, Timeout) of
278 exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
281 %% -----------------------------------------------------------------
282 %% Make a cast to a generic server.
283 %% -----------------------------------------------------------------
284 cast({global,Name}, Request) ->
285 catch global:send(Name, cast_msg(Request)),
287 cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
288 do_cast(Dest, Request);
289 cast(Dest, Request) when is_atom(Dest) ->
290 do_cast(Dest, Request);
291 cast(Dest, Request) when is_pid(Dest) ->
292 do_cast(Dest, Request).
294 do_cast(Dest, Request) ->
295 do_send(Dest, cast_msg(Request)),
298 cast_msg(Request) -> {'$gen_cast',Request}.
300 %% -----------------------------------------------------------------
301 %% Send a reply to the client.
302 %% -----------------------------------------------------------------
303 reply({To, Tag}, Reply) ->
304 catch To ! {Tag, Reply}.
306 %% -----------------------------------------------------------------
307 %% Asyncronous broadcast, returns nothing, it's just send'n pray
308 %% -----------------------------------------------------------------
309 abcast(Name, Request) when is_atom(Name) ->
310 do_abcast([node() | nodes()], Name, cast_msg(Request)).
312 abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
313 do_abcast(Nodes, Name, cast_msg(Request)).
315 do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) ->
316 do_send({Name,Node},Msg),
317 do_abcast(Nodes, Name, Msg);
318 do_abcast([], _,_) -> abcast.
320 %%% -----------------------------------------------------------------
321 %%% Make a call to servers at several nodes.
322 %%% Returns: {[Replies],[BadNodes]}
323 %%% A Timeout can be given
325 %%% A middleman process is used in case late answers arrives after
326 %%% the timeout. If they would be allowed to glog the callers message
327 %%% queue, it would probably become confused. Late answers will
328 %%% now arrive to the terminated middleman and so be discarded.
329 %%% -----------------------------------------------------------------
330 multi_call(Name, Req)
331 when is_atom(Name) ->
332 do_multi_call([node() | nodes()], Name, Req, infinity).
334 multi_call(Nodes, Name, Req)
335 when is_list(Nodes), is_atom(Name) ->
336 do_multi_call(Nodes, Name, Req, infinity).
338 multi_call(Nodes, Name, Req, infinity) ->
339 do_multi_call(Nodes, Name, Req, infinity);
340 multi_call(Nodes, Name, Req, Timeout)
341 when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
342 do_multi_call(Nodes, Name, Req, Timeout).
345 %%-----------------------------------------------------------------
346 %% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
348 %% Description: Makes an existing process into a gen_server.
349 %% The calling process will enter the gen_server receive
350 %% loop and become a gen_server process.
351 %% The process *must* have been started using one of the
352 %% start functions in proc_lib, see proc_lib(3).
353 %% The user is responsible for any initialization of the
354 %% process, including registering a name for it.
355 %%-----------------------------------------------------------------
356 enter_loop(Mod, Options, State) ->
357 enter_loop(Mod, Options, State, self(), infinity, undefined).
359 enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
360 enter_loop(Mod, Options, State, self(), infinity, Backoff);
362 enter_loop(Mod, Options, State, ServerName = {_, _}) ->
363 enter_loop(Mod, Options, State, ServerName, infinity, undefined);
365 enter_loop(Mod, Options, State, Timeout) ->
366 enter_loop(Mod, Options, State, self(), Timeout, undefined).
368 enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
369 enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
371 enter_loop(Mod, Options, State, ServerName, Timeout) ->
372 enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
374 enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
375 Name = get_proc_name(ServerName),
376 Parent = get_parent(),
377 Debug = debug_options(Name, Options),
378 Queue = priority_queue:new(),
379 Backoff1 = extend_backoff(Backoff),
380 loop(find_prioritisers(
381 #gs2_state { parent = Parent, name = Name, state = State,
382 mod = Mod, time = Timeout, timeout_state = Backoff1,
383 queue = Queue, debug = Debug })).
385 %%%========================================================================
386 %%% Gen-callback functions
387 %%%========================================================================
389 %%% ---------------------------------------------------
390 %%% Initiate the new process.
391 %%% Register the name using the Rfunc function
392 %%% Calls the Mod:init/Args function.
393 %%% Finally an acknowledge is sent to Parent and the main
395 %%% ---------------------------------------------------
396 init_it(Starter, self, Name, Mod, Args, Options) ->
397 init_it(Starter, self(), Name, Mod, Args, Options);
398 init_it(Starter, Parent, Name0, Mod, Args, Options) ->
400 Debug = debug_options(Name, Options),
401 Queue = priority_queue:new(),
402 GS2State = find_prioritisers(
403 #gs2_state { parent = Parent,
408 case catch Mod:init(Args) of
410 proc_lib:init_ack(Starter, {ok, self()}),
411 loop(GS2State #gs2_state { state = State,
413 timeout_state = undefined });
414 {ok, State, Timeout} ->
415 proc_lib:init_ack(Starter, {ok, self()}),
416 loop(GS2State #gs2_state { state = State,
418 timeout_state = undefined });
419 {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
420 Backoff1 = extend_backoff(Backoff),
421 proc_lib:init_ack(Starter, {ok, self()}),
422 loop(GS2State #gs2_state { state = State,
424 timeout_state = Backoff1 });
426 %% For consistency, we must make sure that the
427 %% registered name (if any) is unregistered before
428 %% the parent process is notified about the failure.
429 %% (Otherwise, the parent process could get
430 %% an 'already_started' error if it immediately
431 %% tried starting the process again.)
432 unregister_name(Name0),
433 proc_lib:init_ack(Starter, {error, Reason}),
436 unregister_name(Name0),
437 proc_lib:init_ack(Starter, ignore),
440 unregister_name(Name0),
441 proc_lib:init_ack(Starter, {error, Reason}),
444 Error = {bad_return_value, Else},
445 proc_lib:init_ack(Starter, {error, Error}),
449 name({local,Name}) -> Name;
450 name({global,Name}) -> Name;
451 %% name(Pid) when is_pid(Pid) -> Pid;
452 %% when R12 goes away, drop the line beneath and uncomment the line above
455 unregister_name({local,Name}) ->
456 _ = (catch unregister(Name));
457 unregister_name({global,Name}) ->
458 _ = global:unregister_name(Name);
459 unregister_name(Pid) when is_pid(Pid) ->
461 %% Under R12 let's just ignore it, as we have a single term as Name.
462 %% On R13 it will never get here, as we get tuple with 'local/global' atom.
463 unregister_name(_Name) -> ok.
465 extend_backoff(undefined) ->
467 extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
468 {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}.
470 %%%========================================================================
471 %%% Internal functions
472 %%%========================================================================
473 %%% ---------------------------------------------------
475 %%% ---------------------------------------------------
476 loop(GS2State = #gs2_state { time = hibernate,
477 timeout_state = undefined }) ->
478 pre_hibernate(GS2State);
480 process_next_msg(drain(GS2State)).
484 Input -> drain(in(Input, GS2State))
488 process_next_msg(GS2State = #gs2_state { time = Time,
489 timeout_state = TimeoutState,
491 case priority_queue:out(Queue) of
492 {{value, Msg}, Queue1} ->
493 process_msg(Msg, GS2State #gs2_state { queue = Queue1 });
495 {Time1, HibOnTimeout}
496 = case {Time, TimeoutState} of
497 {hibernate, {backoff, Current, _Min, _Desired, _RSt}} ->
500 %% wake_hib/7 will set Time to hibernate. If
501 %% we were woken and didn't receive a msg
502 %% then we will get here and need a sensible
503 %% value for Time1, otherwise we crash.
504 %% R13B1 always waits infinitely when waking
505 %% from hibernation, so that's what we do
512 %% Time could be 'hibernate' here, so *don't* call loop
514 drain(in(Input, GS2State #gs2_state { queue = Queue1 })))
519 GS2State #gs2_state { queue = Queue1 });
522 GS2State #gs2_state { queue = Queue1 })
527 wake_hib(GS2State = #gs2_state { timeout_state = TS }) ->
528 TimeoutState1 = case TS of
531 {SleptAt, TimeoutState} ->
532 adjust_timeout_state(SleptAt, now(), TimeoutState)
535 drain(GS2State #gs2_state { timeout_state = TimeoutState1 })).
537 hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) ->
538 TS = case TimeoutState of
539 undefined -> undefined;
540 {backoff, _, _, _, _} -> {now(), TimeoutState}
542 proc_lib:hibernate(?MODULE, wake_hib,
543 [GS2State #gs2_state { timeout_state = TS }]).
545 pre_hibernate(GS2State = #gs2_state { state = State,
547 case erlang:function_exported(Mod, handle_pre_hibernate, 1) of
549 case catch Mod:handle_pre_hibernate(State) of
550 {hibernate, NState} ->
551 hibernate(GS2State #gs2_state { state = NState } );
553 handle_common_termination(Reply, pre_hibernate, GS2State)
559 post_hibernate(GS2State = #gs2_state { state = State,
561 case erlang:function_exported(Mod, handle_post_hibernate, 1) of
563 case catch Mod:handle_post_hibernate(State) of
565 process_next_msg(GS2State #gs2_state { state = NState,
567 {noreply, NState, Time} ->
568 process_next_msg(GS2State #gs2_state { state = NState,
571 handle_common_termination(Reply, post_hibernate, GS2State)
574 %% use hibernate here, not infinity. This matches
575 %% R13B. The key is that we should be able to get through
576 %% to process_msg calling sys:handle_system_msg with Time
577 %% still set to hibernate, iff that msg is the very msg
578 %% that woke us up (or the first msg we receive after
580 process_next_msg(GS2State #gs2_state { time = hibernate })
583 adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
584 DesiredHibPeriod, RandomState}) ->
585 NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
586 CurrentMicros = CurrentTO * 1000,
587 MinimumMicros = MinimumTO * 1000,
588 DesiredHibMicros = DesiredHibPeriod * 1000,
589 GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros,
591 %% If enough time has passed between the last two messages then we
592 %% should consider sleeping sooner. Otherwise stay awake longer.
593 case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of
594 true -> lists:max([MinimumTO, CurrentTO div 2]);
597 {Extra, RandomState1} = random:uniform_s(Base, RandomState),
598 CurrentTO1 = Base + Extra,
599 {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
601 in({'$gen_cast', Msg} = Input,
602 GS2State = #gs2_state { prioritise_cast = PC }) ->
603 in(Input, PC(Msg, GS2State), GS2State);
604 in({'$gen_call', From, Msg} = Input,
605 GS2State = #gs2_state { prioritise_call = PC }) ->
606 in(Input, PC(Msg, From, GS2State), GS2State);
607 in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) ->
608 in(Input, infinity, GS2State);
609 in({system, _From, _Req} = Input, GS2State) ->
610 in(Input, infinity, GS2State);
611 in(Input, GS2State = #gs2_state { prioritise_info = PI }) ->
612 in(Input, PI(Input, GS2State), GS2State).
614 in(Input, Priority, GS2State = #gs2_state { queue = Queue }) ->
615 GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }.
617 process_msg({system, From, Req},
618 GS2State = #gs2_state { parent = Parent, debug = Debug }) ->
619 %% gen_server puts Hib on the end as the 7th arg, but that version
620 %% of the fun seems not to be documented so leaving out for now.
621 sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State);
622 process_msg({'EXIT', Parent, Reason} = Msg,
623 GS2State = #gs2_state { parent = Parent }) ->
624 terminate(Reason, Msg, GS2State);
625 process_msg(Msg, GS2State = #gs2_state { debug = [] }) ->
626 handle_msg(Msg, GS2State);
627 process_msg(Msg, GS2State = #gs2_state { name = Name, debug = Debug }) ->
628 Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}),
629 handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }).
631 %%% ---------------------------------------------------
632 %%% Send/recive functions
633 %%% ---------------------------------------------------
634 do_send(Dest, Msg) ->
635 catch erlang:send(Dest, Msg).
637 do_multi_call(Nodes, Name, Req, infinity) ->
639 Monitors = send_nodes(Nodes, Name, Tag, Req),
640 rec_nodes(Tag, Monitors, Name, undefined);
641 do_multi_call(Nodes, Name, Req, Timeout) ->
647 %% Middleman process. Should be unsensitive to regular
648 %% exit signals. The sychronization is needed in case
649 %% the receiver would exit before the caller started
651 process_flag(trap_exit, true),
652 Mref = erlang:monitor(process, Caller),
655 Monitors = send_nodes(Nodes, Name, Tag, Req),
656 TimerId = erlang:start_timer(Timeout, self(), ok),
657 Result = rec_nodes(Tag, Monitors, Name, TimerId),
658 exit({self(),Tag,Result});
659 {'DOWN',Mref,_,_,_} ->
660 %% Caller died before sending us the go-ahead.
665 Mref = erlang:monitor(process, Receiver),
666 Receiver ! {self(),Tag},
668 {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
670 {'DOWN',Mref,_,_,Reason} ->
671 %% The middleman code failed. Or someone did
672 %% exit(_, kill) on the middleman process => Reason==killed
676 send_nodes(Nodes, Name, Tag, Req) ->
677 send_nodes(Nodes, Name, Tag, Req, []).
679 send_nodes([Node|Tail], Name, Tag, Req, Monitors)
680 when is_atom(Node) ->
681 Monitor = start_monitor(Node, Name),
682 %% Handle non-existing names in rec_nodes.
683 catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req},
684 send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]);
685 send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
686 %% Skip non-atom Node
687 send_nodes(Tail, Name, Tag, Req, Monitors);
688 send_nodes([], _Name, _Tag, _Req, Monitors) ->
691 %% Against old nodes:
692 %% If no reply has been delivered within 2 secs. (per node) check that
693 %% the server really exists and wait for ever for the answer.
695 %% Against contemporary nodes:
696 %% Wait for reply, server 'DOWN', or timeout from TimerId.
698 rec_nodes(Tag, Nodes, Name, TimerId) ->
699 rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
701 rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
703 {'DOWN', R, _, _, _} ->
704 rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
705 {{Tag, N}, Reply} -> %% Tag is bound !!!
707 rec_nodes(Tag, Tail, Name, Badnodes,
708 [{N,Reply}|Replies], Time, TimerId);
709 {timeout, TimerId, _} ->
711 %% Collect all replies that already have arrived
712 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
714 rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
718 monitor_node(N, false),
719 rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
720 {{Tag, N}, Reply} -> %% Tag is bound !!!
721 receive {nodedown, N} -> ok after 0 -> ok end,
722 monitor_node(N, false),
723 rec_nodes(Tag, Tail, Name, Badnodes,
724 [{N,Reply}|Replies], 2000, TimerId);
725 {timeout, TimerId, _} ->
726 receive {nodedown, N} -> ok after 0 -> ok end,
727 monitor_node(N, false),
728 %% Collect all replies that already have arrived
729 rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
731 case rpc:call(N, erlang, whereis, [Name]) of
732 Pid when is_pid(Pid) -> % It exists try again.
733 rec_nodes(Tag, [N|Tail], Name, Badnodes,
734 Replies, infinity, TimerId);
736 receive {nodedown, N} -> ok after 0 -> ok end,
737 monitor_node(N, false),
738 rec_nodes(Tag, Tail, Name, [N|Badnodes],
739 Replies, 2000, TimerId)
742 rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
743 case catch erlang:cancel_timer(TimerId) of
744 false -> % It has already sent it's message
746 {timeout, TimerId, _} -> ok
750 _ -> % Timer was cancelled, or TimerId was 'undefined'
755 %% Collect all replies that already have arrived
756 rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
758 {'DOWN', R, _, _, _} ->
759 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
760 {{Tag, N}, Reply} -> %% Tag is bound !!!
762 rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
765 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
767 rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
771 monitor_node(N, false),
772 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
773 {{Tag, N}, Reply} -> %% Tag is bound !!!
774 receive {nodedown, N} -> ok after 0 -> ok end,
775 monitor_node(N, false),
776 rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
778 receive {nodedown, N} -> ok after 0 -> ok end,
779 monitor_node(N, false),
780 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
782 rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
786 %%% ---------------------------------------------------
787 %%% Monitor functions
788 %%% ---------------------------------------------------
790 start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
791 if node() =:= nonode@nohost, Node =/= nonode@nohost ->
793 self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
796 case catch erlang:monitor(process, {Name, Node}) of
799 monitor_node(Node, true),
801 Ref when is_reference(Ref) ->
806 %% Cancels a monitor started with Ref=erlang:monitor(_, _).
807 unmonitor(Ref) when is_reference(Ref) ->
808 erlang:demonitor(Ref),
810 {'DOWN', Ref, _, _, _} ->
816 %%% ---------------------------------------------------
817 %%% Message handling functions
818 %%% ---------------------------------------------------
820 dispatch({'$gen_cast', Msg}, Mod, State) ->
821 Mod:handle_cast(Msg, State);
822 dispatch(Info, Mod, State) ->
823 Mod:handle_info(Info, State).
825 common_reply(_Name, From, Reply, _NState, [] = _Debug) ->
828 common_reply(Name, From, Reply, NState, Debug) ->
829 reply(Name, From, Reply, NState, Debug).
831 common_debug([] = _Debug, _Func, _Info, _Event) ->
833 common_debug(Debug, Func, Info, Event) ->
834 sys:handle_debug(Debug, Func, Info, Event).
836 handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
840 case catch Mod:handle_call(Msg, From, State) of
841 {reply, Reply, NState} ->
842 Debug1 = common_reply(Name, From, Reply, NState, Debug),
843 loop(GS2State #gs2_state { state = NState,
846 {reply, Reply, NState, Time1} ->
847 Debug1 = common_reply(Name, From, Reply, NState, Debug),
848 loop(GS2State #gs2_state { state = NState,
852 Debug1 = common_debug(Debug, fun print_event/3, Name,
854 loop(GS2State #gs2_state {state = NState,
857 {noreply, NState, Time1} ->
858 Debug1 = common_debug(Debug, fun print_event/3, Name,
860 loop(GS2State #gs2_state {state = NState,
863 {stop, Reason, Reply, NState} ->
865 (catch terminate(Reason, Msg,
866 GS2State #gs2_state { state = NState })),
867 reply(Name, From, Reply, NState, Debug),
870 handle_common_reply(Other, Msg, GS2State)
872 handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) ->
873 Reply = (catch dispatch(Msg, Mod, State)),
874 handle_common_reply(Reply, Msg, GS2State).
876 handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
880 Debug1 = common_debug(Debug, fun print_event/3, Name,
882 loop(GS2State #gs2_state { state = NState,
885 {noreply, NState, Time1} ->
886 Debug1 = common_debug(Debug, fun print_event/3, Name,
888 loop(GS2State #gs2_state { state = NState,
891 {become, Mod, NState} ->
892 Debug1 = common_debug(Debug, fun print_event/3, Name,
893 {become, Mod, NState}),
894 loop(find_prioritisers(
895 GS2State #gs2_state { mod = Mod,
899 {become, Mod, NState, Time1} ->
900 Debug1 = common_debug(Debug, fun print_event/3, Name,
901 {become, Mod, NState}),
902 loop(find_prioritisers(
903 GS2State #gs2_state { mod = Mod,
908 handle_common_termination(Reply, Msg, GS2State)
911 handle_common_termination(Reply, Msg, GS2State) ->
913 {stop, Reason, NState} ->
914 terminate(Reason, Msg, GS2State #gs2_state { state = NState });
916 terminate(What, Msg, GS2State);
918 terminate({bad_return_value, Reply}, Msg, GS2State)
921 reply(Name, {To, Tag}, Reply, State, Debug) ->
922 reply({To, Tag}, Reply),
924 Debug, fun print_event/3, Name, {out, Reply, To, State}).
927 %%-----------------------------------------------------------------
928 %% Callback functions for system messages handling.
929 %%-----------------------------------------------------------------
930 system_continue(Parent, Debug, GS2State) ->
931 loop(GS2State #gs2_state { parent = Parent, debug = Debug }).
933 system_terminate(Reason, _Parent, Debug, GS2State) ->
934 terminate(Reason, [], GS2State #gs2_state { debug = Debug }).
936 system_code_change(GS2State = #gs2_state { mod = Mod,
938 _Module, OldVsn, Extra) ->
939 case catch Mod:code_change(OldVsn, State, Extra) of
941 NewGS2State = find_prioritisers(
942 GS2State #gs2_state { state = NewState }),
948 %%-----------------------------------------------------------------
949 %% Format debug messages. Print them as the call-back module sees
950 %% them, not as the real erlang messages. Use trace for that.
951 %%-----------------------------------------------------------------
952 print_event(Dev, {in, Msg}, Name) ->
954 {'$gen_call', {From, _Tag}, Call} ->
955 io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
957 {'$gen_cast', Cast} ->
958 io:format(Dev, "*DBG* ~p got cast ~p~n",
961 io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
963 print_event(Dev, {out, Msg, To, State}, Name) ->
964 io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
965 [Name, Msg, To, State]);
966 print_event(Dev, {noreply, State}, Name) ->
967 io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
968 print_event(Dev, Event, Name) ->
969 io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]).
972 %%% ---------------------------------------------------
973 %%% Terminate the server.
974 %%% ---------------------------------------------------
976 terminate(Reason, Msg, #gs2_state { name = Name,
980 case catch Mod:terminate(Reason, State) of
982 error_info(R, Reason, Name, Msg, State, Debug),
990 {shutdown,_}=Shutdown ->
993 error_info(Reason, undefined, Name, Msg, State, Debug),
998 error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
999 %% OTP-5811 Don't send an error report if it's the system process
1000 %% application_controller which is terminating - let init take care
1003 error_info(Reason, RootCause, Name, Msg, State, Debug) ->
1004 Reason1 = error_reason(Reason),
1006 "** Generic server ~p terminating~n"
1007 "** Last message in was ~p~n"
1008 "** When Server state == ~p~n"
1009 "** Reason for termination == ~n** ~p~n",
1011 undefined -> format(Fmt, [Name, Msg, State, Reason1]);
1012 _ -> format(Fmt ++ "** In 'terminate' callback "
1013 "with reason ==~n** ~p~n",
1014 [Name, Msg, State, Reason1,
1015 error_reason(RootCause)])
1017 sys:print_log(Debug),
1020 error_reason({undef,[{M,F,A}|MFAs]} = Reason) ->
1021 case code:is_loaded(M) of
1022 false -> {'module could not be loaded',[{M,F,A}|MFAs]};
1023 _ -> case erlang:function_exported(M, F, length(A)) of
1025 false -> {'function not exported',[{M,F,A}|MFAs]}
1028 error_reason(Reason) ->
1031 %%% ---------------------------------------------------
1032 %%% Misc. functions.
1033 %%% ---------------------------------------------------
1035 opt(Op, [{Op, Value}|_]) ->
1037 opt(Op, [_|Options]) ->
1042 debug_options(Name, Opts) ->
1043 case opt(debug, Opts) of
1044 {ok, Options} -> dbg_options(Name, Options);
1045 _ -> dbg_options(Name, [])
1048 dbg_options(Name, []) ->
1050 case init:get_argument(generic_debug) of
1056 dbg_opts(Name, Opts);
1057 dbg_options(Name, Opts) ->
1058 dbg_opts(Name, Opts).
1060 dbg_opts(Name, Opts) ->
1061 case catch sys:debug_options(Opts) of
1063 format("~p: ignoring erroneous debug options - ~p~n",
1070 get_proc_name(Pid) when is_pid(Pid) ->
1072 get_proc_name({local, Name}) ->
1073 case process_info(self(), registered_name) of
1074 {registered_name, Name} ->
1076 {registered_name, _Name} ->
1077 exit(process_not_registered);
1079 exit(process_not_registered)
1081 get_proc_name({global, Name}) ->
1082 case whereis_name(Name) of
1084 exit(process_not_registered_globally);
1085 Pid when Pid =:= self() ->
1088 exit(process_not_registered_globally)
1092 case get('$ancestors') of
1093 [Parent | _] when is_pid(Parent)->
1095 [Parent | _] when is_atom(Parent)->
1096 name_to_pid(Parent);
1098 exit(process_was_not_started_by_proc_lib)
1101 name_to_pid(Name) ->
1102 case whereis(Name) of
1104 case whereis_name(Name) of
1106 exit(could_not_find_registerd_name);
1114 whereis_name(Name) ->
1115 case ets:lookup(global_names, Name) of
1116 [{_Name, Pid, _Method, _RPid, _Ref}] ->
1117 if node(Pid) == node() ->
1118 case is_process_alive(Pid) of
1128 find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
1129 PrioriCall = function_exported_or_default(
1130 Mod, 'prioritise_call', 3,
1131 fun (_Msg, _From, _State) -> 0 end),
1132 PrioriCast = function_exported_or_default(Mod, 'prioritise_cast', 2,
1133 fun (_Msg, _State) -> 0 end),
1134 PrioriInfo = function_exported_or_default(Mod, 'prioritise_info', 2,
1135 fun (_Msg, _State) -> 0 end),
1136 GS2State #gs2_state { prioritise_call = PrioriCall,
1137 prioritise_cast = PrioriCast,
1138 prioritise_info = PrioriInfo }.
1140 function_exported_or_default(Mod, Fun, Arity, Default) ->
1141 case erlang:function_exported(Mod, Fun, Arity) of
1142 true -> case Arity of
1143 2 -> fun (Msg, GS2State = #gs2_state { state = State }) ->
1144 case catch Mod:Fun(Msg, State) of
1145 Res when is_integer(Res) ->
1148 handle_common_termination(Err, Msg, GS2State)
1151 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) ->
1152 case catch Mod:Fun(Msg, From, State) of
1153 Res when is_integer(Res) ->
1156 handle_common_termination(Err, Msg, GS2State)
1163 %%-----------------------------------------------------------------
1164 %% Status information
1165 %%-----------------------------------------------------------------
1166 format_status(Opt, StatusData) ->
1167 [PDict, SysState, Parent, Debug,
1168 #gs2_state{name = Name, state = State, mod = Mod, queue = Queue}] =
1170 NameTag = if is_pid(Name) ->
1175 Header = lists:concat(["Status for generic server ", NameTag]),
1176 Log = sys:get_debug(log, Debug, []),
1177 Specfic = callback(Mod, format_status, [Opt, [PDict, State]],
1178 fun () -> [{data, [{"State", State}]}] end),
1179 Messages = callback(Mod, format_message_queue, [Opt, Queue],
1180 fun () -> priority_queue:to_list(Queue) end),
1182 {data, [{"Status", SysState},
1184 {"Logged events", Log},
1185 {"Queued messages", Messages}]} |
1188 callback(Mod, FunName, Args, DefaultThunk) ->
1189 case erlang:function_exported(Mod, FunName, length(Args)) of
1190 true -> case catch apply(Mod, FunName, Args) of
1191 {'EXIT', _} -> DefaultThunk();
1194 false -> DefaultThunk()