1 %% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP
2 %% distribution, with the following modifications:
4 %% 1) the module name is gen_server2
6 %% 2) more efficient handling of selective receives in callbacks
7 %% gen_server2 processes drain their message queue into an internal
8 %% buffer before invoking any callback module functions. Messages are
9 %% dequeued from the buffer for processing. Thus the effective message
10 %% queue of a gen_server2 process is the concatenation of the internal
11 %% buffer and the real message queue.
12 %% As a result of the draining, any selective receive invoked inside a
13 %% callback is less likely to have to scan a large message queue.
15 %% 3) gen_server2:cast is guaranteed to be order-preserving
16 %% The original code could reorder messages when communicating with a
17 %% process on a remote node that was not currently connected.
19 %% 4) The callback module can optionally implement prioritise_call/4,
20 %% prioritise_cast/3 and prioritise_info/3. These functions take
21 %% Message, From, Length and State or just Message, Length and State
22 %% (where Length is the current number of messages waiting to be
23 %% processed) and return a single integer representing the priority
24 %% attached to the message, or 'drop' to ignore it (for
25 %% prioritise_cast/3 and prioritise_info/3 only). Messages with
26 %% higher priorities are processed before requests with lower
27 %% priorities. The default priority is 0.
29 %% 5) The callback module can optionally implement
30 %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
31 %% called immediately prior to and post hibernation, respectively. If
32 %% handle_pre_hibernate returns {hibernate, NewState} then the process
33 %% will hibernate. If the module does not implement
34 %% handle_pre_hibernate/1 then the default action is to hibernate.
36 %% 6) init can return a 4th arg, {backoff, InitialTimeout,
37 %% MinimumTimeout, DesiredHibernatePeriod} (all in milliseconds,
38 %% 'infinity' does not make sense here). Then, on all callbacks which
39 %% can return a timeout (including init), timeout can be
40 %% 'hibernate'. When this is the case, the current timeout value will
41 %% be used (initially, the InitialTimeout supplied from init). After
42 %% this timeout has occurred, hibernation will occur as normal. Upon
43 %% awaking, a new current timeout value will be calculated.
45 %% The purpose is that the gen_server2 takes care of adjusting the
46 %% current timeout value such that the process will increase the
47 %% timeout value repeatedly if it is unable to sleep for the
48 %% DesiredHibernatePeriod. If it is able to sleep for the
49 %% DesiredHibernatePeriod it will decrease the current timeout down to
50 %% the MinimumTimeout, so that the process is put to sleep sooner (and
51 %% hopefully stays asleep for longer). In short, should a process
52 %% using this receive a burst of messages, it should not hibernate
53 %% between those messages, but as the messages become less frequent,
54 %% the process will not only hibernate, it will do so sooner after
55 %% each message.
57 %% When using this backoff mechanism, normal timeout values (i.e. not
58 %% 'hibernate') can still be used, and if they are used then the
59 %% handle_info(timeout, State) will be called as normal. In this case,
60 %% returning 'hibernate' from handle_info(timeout, State) will not
61 %% hibernate the process immediately, as it would if backoff wasn't
62 %% being used. Instead it'll wait for the current timeout as described
63 %% above.
65 %% 7) The callback module can return from any of the handle_*
66 %% functions, a {become, Module, State} triple, or a {become, Module,
67 %% State, Timeout} quadruple. This allows the gen_server to
68 %% dynamically change the callback module. The State is the new state
69 %% which will be passed into any of the callback functions in the new
70 %% module. Note there is no form also encompassing a reply, thus if
71 %% you wish to reply in handle_call/3 and change the callback module,
72 %% you need to use gen_server2:reply/2 to issue the reply manually.
74 %% 8) The callback module can optionally implement
75 %% format_message_queue/2 which is the equivalent of format_status/2
76 %% but where the second argument is specifically the priority_queue
77 %% which contains the prioritised message_queue.
79 %% 9) The function with_state/2 can be used to debug a process with
80 %% heavyweight state (without needing to copy the entire state out of
81 %% process as sys:get_status/1 would). Pass through a function which
82 %% can be invoked on the state, get back the result. The state is not
83 %% modified.
85 %% All modifications are (C) 2009-2013 VMware, Inc.
87 %% ``The contents of this file are subject to the Erlang Public License,
88 %% Version 1.1, (the "License"); you may not use this file except in
89 %% compliance with the License. You should have received a copy of the
90 %% Erlang Public License along with this software. If not, it can be
91 %% retrieved via the world wide web at http://www.erlang.org/.
93 %% Software distributed under the License is distributed on an "AS IS"
94 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
95 %% the License for the specific language governing rights and limitations
96 %% under the License.
98 %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
99 %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
100 %% AB. All Rights Reserved.''
104 -module(gen_server2).
106 %%% ---------------------------------------------------
108 %%% The idea behind THIS server is that the user module
109 %%% provides (different) functions to handle different
111 %%% If the Parent process terminates the Module:terminate/2
112 %%% function is called.
114 %%% The user module should export:
118 %%% {ok, State, Timeout}
119 %%% {ok, State, Timeout, Backoff}
123 %%% handle_call(Msg, {From, Tag}, State)
125 %%% ==> {reply, Reply, State}
126 %%% {reply, Reply, State, Timeout}
128 %%% {noreply, State, Timeout}
129 %%% {stop, Reason, Reply, State}
130 %%% Reason = normal | shutdown | Term terminate(State) is called
132 %%% handle_cast(Msg, State)
134 %%% ==> {noreply, State}
135 %%% {noreply, State, Timeout}
136 %%% {stop, Reason, State}
137 %%% Reason = normal | shutdown | Term terminate(State) is called
139 %%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
141 %%% ==> {noreply, State}
142 %%% {noreply, State, Timeout}
143 %%% {stop, Reason, State}
144 %%% Reason = normal | shutdown | Term, terminate(State) is called
146 %%% terminate(Reason, State) Let the user module clean up
147 %%% Reason = normal | shutdown | {shutdown, Term} | Term
148 %%% always called when server terminates
152 %%% handle_pre_hibernate(State)
154 %%% ==> {hibernate, State}
155 %%% {stop, Reason, State}
156 %%% Reason = normal | shutdown | Term, terminate(State) is called
158 %%% handle_post_hibernate(State)
160 %%% ==> {noreply, State}
161 %%% {stop, Reason, State}
162 %%% Reason = normal | shutdown | Term, terminate(State) is called
164 %%% The work flow (of the server) can be described as follows:
166 %%% User module Generic
167 %%% ----------- -------
168 %%% start -----> start
172 %%% handle_call <----- .
175 %%% handle_cast <----- .
177 %%% handle_info <----- .
179 %%% terminate <----- .
184 %%% ---------------------------------------------------
187 -export([start/3, start/4,
188 start_link/3, start_link/4,
192 multi_call/2, multi_call/3, multi_call/4,
194 enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
197 -export([system_continue/3,
199 system_code_change/4,
203 -export([init_it/6]).
205 -import(error_logger, [format/2]).
208 -record(gs2_state, {parent, name, state, mod, time,
209 timeout_state, queue, debug, prioritisers}).
213 %%%=========================================================================
214 %%% Specs. These exist only to shut up dialyzer's warnings
215 %%%=========================================================================
217 -type(gs2_state() :: #gs2_state{}).
219 -spec(handle_common_termination/3 ::
220 (any(), atom(), gs2_state()) -> no_return()).
221 -spec(hibernate/1 :: (gs2_state()) -> no_return()).
222 -spec(pre_hibernate/1 :: (gs2_state()) -> no_return()).
223 -spec(system_terminate/4 :: (_, _, _, gs2_state()) -> no_return()).
225 -type(millis() :: non_neg_integer()).
227 %%%=========================================================================
229 %%%=========================================================================
231 -callback init(Args :: term()) ->
232 {ok, State :: term()} |
233 {ok, State :: term(), timeout() | hibernate} |
234 {ok, State :: term(), timeout() | hibernate,
235 {backoff, millis(), millis(), millis()}} |
237 {stop, Reason :: term()}.
238 -callback handle_call(Request :: term(), From :: {pid(), Tag :: term()},
240 {reply, Reply :: term(), NewState :: term()} |
241 {reply, Reply :: term(), NewState :: term(), timeout() | hibernate} |
242 {noreply, NewState :: term()} |
243 {noreply, NewState :: term(), timeout() | hibernate} |
244 {stop, Reason :: term(),
245 Reply :: term(), NewState :: term()}.
246 -callback handle_cast(Request :: term(), State :: term()) ->
247 {noreply, NewState :: term()} |
248 {noreply, NewState :: term(), timeout() | hibernate} |
249 {stop, Reason :: term(), NewState :: term()}.
250 -callback handle_info(Info :: term(), State :: term()) ->
251 {noreply, NewState :: term()} |
252 {noreply, NewState :: term(), timeout() | hibernate} |
253 {stop, Reason :: term(), NewState :: term()}.
254 -callback terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
257 -callback code_change(OldVsn :: (term() | {down, term()}), State :: term(),
259 {ok, NewState :: term()} | {error, Reason :: term()}.
261 %% It's not possible to define "optional" -callbacks, so putting specs
262 %% for handle_pre_hibernate/1 and handle_post_hibernate/1 will result
263 %% in warnings (the same applied for the behaviour_info before).
267 -export([behaviour_info/1]).
269 behaviour_info(callbacks) ->
270 [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
271 {terminate,2},{code_change,3}];
272 behaviour_info(_Other) ->
277 %%% -----------------------------------------------------------------
278 %%% Starts a generic server.
279 %%% start(Mod, Args, Options)
280 %%% start(Name, Mod, Args, Options)
281 %%% start_link(Mod, Args, Options)
282 %%% start_link(Name, Mod, Args, Options) where:
283 %%% Name ::= {local, atom()} | {global, atom()}
284 %%% Mod ::= atom(), callback module implementing the 'real' server
285 %%% Args ::= term(), init arguments (to Mod:init/1)
286 %%% Options ::= [{timeout, Timeout} | {debug, [Flag]}]
287 %%% Flag ::= trace | log | {logfile, File} | statistics | debug
288 %%% (debug == log && statistics)
289 %%% Returns: {ok, Pid} |
290 %%% {error, {already_started, Pid}} |
292 %%% -----------------------------------------------------------------
293 start(Mod, Args, Options) ->
294 gen:start(?MODULE, nolink, Mod, Args, Options).
296 start(Name, Mod, Args, Options) ->
297 gen:start(?MODULE, nolink, Name, Mod, Args, Options).
299 start_link(Mod, Args, Options) ->
300 gen:start(?MODULE, link, Mod, Args, Options).
302 start_link(Name, Mod, Args, Options) ->
303 gen:start(?MODULE, link, Name, Mod, Args, Options).
306 %% -----------------------------------------------------------------
307 %% Make a call to a generic server.
308 %% If the server is located at another node, that node will
309 %% be monitored.
310 %% If the client is trapping exits and is linked server termination
311 %% is handled here (? Shall we do that here (or rely on timeouts) ?).
312 %% -----------------------------------------------------------------
313 call(Name, Request) ->
314 case catch gen:call(Name, '$gen_call', Request) of
318 exit({Reason, {?MODULE, call, [Name, Request]}})
321 call(Name, Request, Timeout) ->
322 case catch gen:call(Name, '$gen_call', Request, Timeout) of
326 exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
329 %% -----------------------------------------------------------------
330 %% Make a cast to a generic server.
331 %% -----------------------------------------------------------------
332 cast({global,Name}, Request) ->
333 catch global:send(Name, cast_msg(Request)),
335 cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
336 do_cast(Dest, Request);
337 cast(Dest, Request) when is_atom(Dest) ->
338 do_cast(Dest, Request);
339 cast(Dest, Request) when is_pid(Dest) ->
340 do_cast(Dest, Request).
342 do_cast(Dest, Request) ->
343 do_send(Dest, cast_msg(Request)),
346 cast_msg(Request) -> {'$gen_cast',Request}.
348 %% -----------------------------------------------------------------
349 %% Send a reply to the client.
350 %% -----------------------------------------------------------------
351 reply({To, Tag}, Reply) ->
352 catch To ! {Tag, Reply}.
354 %% -----------------------------------------------------------------
355 %% Asynchronous broadcast, returns nothing, it's just send'n'pray
356 %% -----------------------------------------------------------------
357 abcast(Name, Request) when is_atom(Name) ->
358 do_abcast([node() | nodes()], Name, cast_msg(Request)).
360 abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
361 do_abcast(Nodes, Name, cast_msg(Request)).
363 do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) ->
364 do_send({Name,Node},Msg),
365 do_abcast(Nodes, Name, Msg);
366 do_abcast([], _,_) -> abcast.
368 %%% -----------------------------------------------------------------
369 %%% Make a call to servers at several nodes.
370 %%% Returns: {[Replies],[BadNodes]}
371 %%% A Timeout can be given
373 %%% A middleman process is used in case late answers arrive after
374 %%% the timeout. If they would be allowed to clog the caller's message
375 %%% queue, it would probably become confused. Late answers will
376 %%% now arrive to the terminated middleman and so be discarded.
377 %%% -----------------------------------------------------------------
378 multi_call(Name, Req)
379 when is_atom(Name) ->
380 do_multi_call([node() | nodes()], Name, Req, infinity).
382 multi_call(Nodes, Name, Req)
383 when is_list(Nodes), is_atom(Name) ->
384 do_multi_call(Nodes, Name, Req, infinity).
386 multi_call(Nodes, Name, Req, infinity) ->
387 do_multi_call(Nodes, Name, Req, infinity);
388 multi_call(Nodes, Name, Req, Timeout)
389 when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
390 do_multi_call(Nodes, Name, Req, Timeout).
392 %% -----------------------------------------------------------------
393 %% Apply a function to a generic server's state.
394 %% -----------------------------------------------------------------
395 with_state(Name, Fun) ->
396 case catch gen:call(Name, '$with_state', Fun, infinity) of
400 exit({Reason, {?MODULE, with_state, [Name, Fun]}})
403 %%-----------------------------------------------------------------
404 %% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
406 %% Description: Makes an existing process into a gen_server.
407 %% The calling process will enter the gen_server receive
408 %% loop and become a gen_server process.
409 %% The process *must* have been started using one of the
410 %% start functions in proc_lib, see proc_lib(3).
411 %% The user is responsible for any initialization of the
412 %% process, including registering a name for it.
413 %%-----------------------------------------------------------------
414 enter_loop(Mod, Options, State) ->
415 enter_loop(Mod, Options, State, self(), infinity, undefined).
417 enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
418 enter_loop(Mod, Options, State, self(), infinity, Backoff);
420 enter_loop(Mod, Options, State, ServerName = {_, _}) ->
421 enter_loop(Mod, Options, State, ServerName, infinity, undefined);
423 enter_loop(Mod, Options, State, Timeout) ->
424 enter_loop(Mod, Options, State, self(), Timeout, undefined).
426 enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
427 enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
429 enter_loop(Mod, Options, State, ServerName, Timeout) ->
430 enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
432 enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
433 Name = get_proc_name(ServerName),
434 Parent = get_parent(),
435 Debug = debug_options(Name, Options),
436 Queue = priority_queue:new(),
437 Backoff1 = extend_backoff(Backoff),
438 loop(find_prioritisers(
439 #gs2_state { parent = Parent, name = Name, state = State,
440 mod = Mod, time = Timeout, timeout_state = Backoff1,
441 queue = Queue, debug = Debug })).
443 %%%========================================================================
444 %%% Gen-callback functions
445 %%%========================================================================
447 %%% ---------------------------------------------------
448 %%% Initiate the new process.
449 %%% Register the name using the Rfunc function
450 %%% Calls the Mod:init/Args function.
451 %%% Finally an acknowledge is sent to Parent and the main
453 %%% ---------------------------------------------------
454 init_it(Starter, self, Name, Mod, Args, Options) ->
455 init_it(Starter, self(), Name, Mod, Args, Options);
456 init_it(Starter, Parent, Name0, Mod, Args, Options) ->
458 Debug = debug_options(Name, Options),
459 Queue = priority_queue:new(),
460 GS2State = find_prioritisers(
461 #gs2_state { parent = Parent,
466 case catch Mod:init(Args) of
468 proc_lib:init_ack(Starter, {ok, self()}),
469 loop(GS2State #gs2_state { state = State,
471 timeout_state = undefined });
472 {ok, State, Timeout} ->
473 proc_lib:init_ack(Starter, {ok, self()}),
474 loop(GS2State #gs2_state { state = State,
476 timeout_state = undefined });
477 {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
478 Backoff1 = extend_backoff(Backoff),
479 proc_lib:init_ack(Starter, {ok, self()}),
480 loop(GS2State #gs2_state { state = State,
482 timeout_state = Backoff1 });
484 %% For consistency, we must make sure that the
485 %% registered name (if any) is unregistered before
486 %% the parent process is notified about the failure.
487 %% (Otherwise, the parent process could get
488 %% an 'already_started' error if it immediately
489 %% tried starting the process again.)
490 unregister_name(Name0),
491 proc_lib:init_ack(Starter, {error, Reason}),
494 unregister_name(Name0),
495 proc_lib:init_ack(Starter, ignore),
498 unregister_name(Name0),
499 proc_lib:init_ack(Starter, {error, Reason}),
502 Error = {bad_return_value, Else},
503 proc_lib:init_ack(Starter, {error, Error}),
507 name({local,Name}) -> Name;
508 name({global,Name}) -> Name;
509 %% name(Pid) when is_pid(Pid) -> Pid;
510 %% when R12 goes away, drop the line beneath and uncomment the line above
513 unregister_name({local,Name}) ->
514 _ = (catch unregister(Name));
515 unregister_name({global,Name}) ->
516 _ = global:unregister_name(Name);
517 unregister_name(Pid) when is_pid(Pid) ->
519 %% Under R12 let's just ignore it, as we have a single term as Name.
520 %% On R13 it will never get here, as we get tuple with 'local/global' atom.
521 unregister_name(_Name) -> ok.
523 extend_backoff(undefined) ->
525 extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
526 {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}.
528 %%%========================================================================
529 %%% Internal functions
530 %%%========================================================================
531 %%% ---------------------------------------------------
533 %%% ---------------------------------------------------
534 loop(GS2State = #gs2_state { time = hibernate,
535 timeout_state = undefined }) ->
536 pre_hibernate(GS2State);
538 process_next_msg(drain(GS2State)).
542 Input -> drain(in(Input, GS2State))
546 process_next_msg(GS2State = #gs2_state { time = Time,
547 timeout_state = TimeoutState,
549 case priority_queue:out(Queue) of
550 {{value, Msg}, Queue1} ->
551 process_msg(Msg, GS2State #gs2_state { queue = Queue1 });
553 {Time1, HibOnTimeout}
554 = case {Time, TimeoutState} of
555 {hibernate, {backoff, Current, _Min, _Desired, _RSt}} ->
558 %% wake_hib/7 will set Time to hibernate. If
559 %% we were woken and didn't receive a msg
560 %% then we will get here and need a sensible
561 %% value for Time1, otherwise we crash.
562 %% R13B1 always waits infinitely when waking
563 %% from hibernation, so that's what we do
570 %% Time could be 'hibernate' here, so *don't* call loop
572 drain(in(Input, GS2State #gs2_state { queue = Queue1 })))
577 GS2State #gs2_state { queue = Queue1 });
580 GS2State #gs2_state { queue = Queue1 })
585 wake_hib(GS2State = #gs2_state { timeout_state = TS }) ->
586 TimeoutState1 = case TS of
589 {SleptAt, TimeoutState} ->
590 adjust_timeout_state(SleptAt, now(), TimeoutState)
593 drain(GS2State #gs2_state { timeout_state = TimeoutState1 })).
595 hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) ->
596 TS = case TimeoutState of
597 undefined -> undefined;
598 {backoff, _, _, _, _} -> {now(), TimeoutState}
600 proc_lib:hibernate(?MODULE, wake_hib,
601 [GS2State #gs2_state { timeout_state = TS }]).
603 pre_hibernate(GS2State = #gs2_state { state = State,
605 case erlang:function_exported(Mod, handle_pre_hibernate, 1) of
607 case catch Mod:handle_pre_hibernate(State) of
608 {hibernate, NState} ->
609 hibernate(GS2State #gs2_state { state = NState } );
611 handle_common_termination(Reply, pre_hibernate, GS2State)
617 post_hibernate(GS2State = #gs2_state { state = State,
619 case erlang:function_exported(Mod, handle_post_hibernate, 1) of
621 case catch Mod:handle_post_hibernate(State) of
623 process_next_msg(GS2State #gs2_state { state = NState,
625 {noreply, NState, Time} ->
626 process_next_msg(GS2State #gs2_state { state = NState,
629 handle_common_termination(Reply, post_hibernate, GS2State)
632 %% use hibernate here, not infinity. This matches
633 %% R13B. The key is that we should be able to get through
634 %% to process_msg calling sys:handle_system_msg with Time
635 %% still set to hibernate, iff that msg is the very msg
636 %% that woke us up (or the first msg we receive after
638 process_next_msg(GS2State #gs2_state { time = hibernate })
641 adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
642 DesiredHibPeriod, RandomState}) ->
643 NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
644 CurrentMicros = CurrentTO * 1000,
645 MinimumMicros = MinimumTO * 1000,
646 DesiredHibMicros = DesiredHibPeriod * 1000,
647 GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros,
649 %% If enough time has passed between the last two messages then we
650 %% should consider sleeping sooner. Otherwise stay awake longer.
651 case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of
652 true -> lists:max([MinimumTO, CurrentTO div 2]);
655 {Extra, RandomState1} = random:uniform_s(Base, RandomState),
656 CurrentTO1 = Base + Extra,
657 {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
659 in({'$gen_cast', Msg} = Input,
660 GS2State = #gs2_state { prioritisers = {_, F, _} }) ->
661 in(Input, F(Msg, GS2State), GS2State);
662 in({'$gen_call', From, Msg} = Input,
663 GS2State = #gs2_state { prioritisers = {F, _, _} }) ->
664 in(Input, F(Msg, From, GS2State), GS2State);
665 in({'$with_state', _From, _Fun} = Input, GS2State) ->
666 in(Input, 0, GS2State);
667 in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) ->
668 in(Input, infinity, GS2State);
669 in({system, _From, _Req} = Input, GS2State) ->
670 in(Input, infinity, GS2State);
671 in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) ->
672 in(Input, F(Input, GS2State), GS2State).
674 in(_Input, drop, GS2State) ->
677 in(Input, Priority, GS2State = #gs2_state { queue = Queue }) ->
678 GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }.
680 process_msg({system, From, Req},
681 GS2State = #gs2_state { parent = Parent, debug = Debug }) ->
682 %% gen_server puts Hib on the end as the 7th arg, but that version
683 %% of the fun seems not to be documented so leaving out for now.
684 sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State);
685 process_msg({'$with_state', From, Fun},
686 GS2State = #gs2_state{state = State}) ->
687 reply(From, catch Fun(State)),
689 process_msg({'EXIT', Parent, Reason} = Msg,
690 GS2State = #gs2_state { parent = Parent }) ->
691 terminate(Reason, Msg, GS2State);
692 process_msg(Msg, GS2State = #gs2_state { debug = [] }) ->
693 handle_msg(Msg, GS2State);
694 process_msg(Msg, GS2State = #gs2_state { name = Name, debug = Debug }) ->
695 Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}),
696 handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }).
698 %%% ---------------------------------------------------
699 %%% Send/receive functions
700 %%% ---------------------------------------------------
701 do_send(Dest, Msg) ->
702 catch erlang:send(Dest, Msg).
704 do_multi_call(Nodes, Name, Req, infinity) ->
706 Monitors = send_nodes(Nodes, Name, Tag, Req),
707 rec_nodes(Tag, Monitors, Name, undefined);
708 do_multi_call(Nodes, Name, Req, Timeout) ->
714 %% Middleman process. Should be unsensitive to regular
715 %% exit signals. The sychronization is needed in case
716 %% the receiver would exit before the caller started
718 process_flag(trap_exit, true),
719 Mref = erlang:monitor(process, Caller),
722 Monitors = send_nodes(Nodes, Name, Tag, Req),
723 TimerId = erlang:start_timer(Timeout, self(), ok),
724 Result = rec_nodes(Tag, Monitors, Name, TimerId),
725 exit({self(),Tag,Result});
726 {'DOWN',Mref,_,_,_} ->
727 %% Caller died before sending us the go-ahead.
732 Mref = erlang:monitor(process, Receiver),
733 Receiver ! {self(),Tag},
735 {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
737 {'DOWN',Mref,_,_,Reason} ->
738 %% The middleman code failed. Or someone did
739 %% exit(_, kill) on the middleman process => Reason==killed
743 send_nodes(Nodes, Name, Tag, Req) ->
744 send_nodes(Nodes, Name, Tag, Req, []).
746 send_nodes([Node|Tail], Name, Tag, Req, Monitors)
747 when is_atom(Node) ->
748 Monitor = start_monitor(Node, Name),
749 %% Handle non-existing names in rec_nodes.
750 catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req},
751 send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]);
752 send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
753 %% Skip non-atom Node
754 send_nodes(Tail, Name, Tag, Req, Monitors);
755 send_nodes([], _Name, _Tag, _Req, Monitors) ->
758 %% Against old nodes:
759 %% If no reply has been delivered within 2 secs. (per node) check that
760 %% the server really exists and wait for ever for the answer.
762 %% Against contemporary nodes:
763 %% Wait for reply, server 'DOWN', or timeout from TimerId.
765 rec_nodes(Tag, Nodes, Name, TimerId) ->
766 rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
768 rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
770 {'DOWN', R, _, _, _} ->
771 rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
772 {{Tag, N}, Reply} -> %% Tag is bound !!!
774 rec_nodes(Tag, Tail, Name, Badnodes,
775 [{N,Reply}|Replies], Time, TimerId);
776 {timeout, TimerId, _} ->
778 %% Collect all replies that already have arrived
779 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
781 rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
785 monitor_node(N, false),
786 rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
787 {{Tag, N}, Reply} -> %% Tag is bound !!!
788 receive {nodedown, N} -> ok after 0 -> ok end,
789 monitor_node(N, false),
790 rec_nodes(Tag, Tail, Name, Badnodes,
791 [{N,Reply}|Replies], 2000, TimerId);
792 {timeout, TimerId, _} ->
793 receive {nodedown, N} -> ok after 0 -> ok end,
794 monitor_node(N, false),
795 %% Collect all replies that already have arrived
796 rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
798 case rpc:call(N, erlang, whereis, [Name]) of
799 Pid when is_pid(Pid) -> % It exists try again.
800 rec_nodes(Tag, [N|Tail], Name, Badnodes,
801 Replies, infinity, TimerId);
803 receive {nodedown, N} -> ok after 0 -> ok end,
804 monitor_node(N, false),
805 rec_nodes(Tag, Tail, Name, [N|Badnodes],
806 Replies, 2000, TimerId)
809 rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
810 case catch erlang:cancel_timer(TimerId) of
811 false -> % It has already sent it's message
813 {timeout, TimerId, _} -> ok
817 _ -> % Timer was cancelled, or TimerId was 'undefined'
822 %% Collect all replies that already have arrived
823 rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
825 {'DOWN', R, _, _, _} ->
826 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
827 {{Tag, N}, Reply} -> %% Tag is bound !!!
829 rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
832 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
834 rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
838 monitor_node(N, false),
839 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
840 {{Tag, N}, Reply} -> %% Tag is bound !!!
841 receive {nodedown, N} -> ok after 0 -> ok end,
842 monitor_node(N, false),
843 rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
845 receive {nodedown, N} -> ok after 0 -> ok end,
846 monitor_node(N, false),
847 rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
849 rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
853 %%% ---------------------------------------------------
854 %%% Monitor functions
855 %%% ---------------------------------------------------
857 start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
858 if node() =:= nonode@nohost, Node =/= nonode@nohost ->
860 self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
863 case catch erlang:monitor(process, {Name, Node}) of
866 monitor_node(Node, true),
868 Ref when is_reference(Ref) ->
873 %% Cancels a monitor started with Ref=erlang:monitor(_, _).
874 unmonitor(Ref) when is_reference(Ref) ->
875 erlang:demonitor(Ref),
877 {'DOWN', Ref, _, _, _} ->
883 %%% ---------------------------------------------------
884 %%% Message handling functions
885 %%% ---------------------------------------------------
887 dispatch({'$gen_cast', Msg}, Mod, State) ->
888 Mod:handle_cast(Msg, State);
889 dispatch(Info, Mod, State) ->
890 Mod:handle_info(Info, State).
892 common_reply(_Name, From, Reply, _NState, [] = _Debug) ->
895 common_reply(Name, {To, _Tag} = From, Reply, NState, Debug) ->
897 sys:handle_debug(Debug, fun print_event/3, Name, {out, Reply, To, NState}).
899 common_noreply(_Name, _NState, [] = _Debug) ->
901 common_noreply(Name, NState, Debug) ->
902 sys:handle_debug(Debug, fun print_event/3, Name, {noreply, NState}).
904 common_become(_Name, _Mod, _NState, [] = _Debug) ->
906 common_become(Name, Mod, NState, Debug) ->
907 sys:handle_debug(Debug, fun print_event/3, Name, {become, Mod, NState}).
909 handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
913 case catch Mod:handle_call(Msg, From, State) of
914 {reply, Reply, NState} ->
915 Debug1 = common_reply(Name, From, Reply, NState, Debug),
916 loop(GS2State #gs2_state { state = NState,
919 {reply, Reply, NState, Time1} ->
920 Debug1 = common_reply(Name, From, Reply, NState, Debug),
921 loop(GS2State #gs2_state { state = NState,
924 {stop, Reason, Reply, NState} ->
926 (catch terminate(Reason, Msg,
927 GS2State #gs2_state { state = NState })),
928 common_reply(Name, From, Reply, NState, Debug),
931 handle_common_reply(Other, Msg, GS2State)
933 handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) ->
934 Reply = (catch dispatch(Msg, Mod, State)),
935 handle_common_reply(Reply, Msg, GS2State).
%% Handles the callback results shared by calls, casts and infos.
%%
%% noreply results (with or without Time1) go through common_noreply/3
%% and re-enter loop/1 with NState as the callback state.
%% become results (with or without Time1) go through common_become/4 and
%% re-enter loop/1 with both the callback module and state replaced.
%% Anything else is passed on to handle_common_termination/3.
937 handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
941 Debug1 = common_noreply(Name, NState, Debug),
942 loop(GS2State #gs2_state {state = NState,
945 {noreply, NState, Time1} ->
946 Debug1 = common_noreply(Name, NState, Debug),
947 loop(GS2State #gs2_state {state = NState,
%% The callback module changes here, so find_prioritisers/1 is run on
%% the updated state to pick up the new module's prioritise_* functions.
950 {become, Mod, NState} ->
951 Debug1 = common_become(Name, Mod, NState, Debug),
952 loop(find_prioritisers(
953 GS2State #gs2_state { mod = Mod,
957 {become, Mod, NState, Time1} ->
958 Debug1 = common_become(Name, Mod, NState, Debug),
959 loop(find_prioritisers(
960 GS2State #gs2_state { mod = Mod,
965 handle_common_termination(Reply, Msg, GS2State)
%% Final stage of callback-result handling: every branch ends in
%% terminate/3.
%%   {stop, Reason, NState} terminates with Reason and NState as the
%%       callback state;
%%   a result that binds What terminates with What as the reason and the
%%       unchanged GS2State (NOTE(review): presumably this is the
%%       {'EXIT', What} produced by a caught crash -- confirm);
%%   anything else terminates with {bad_return_value, Reply}.
968 handle_common_termination(Reply, Msg, GS2State) ->
970 {stop, Reason, NState} ->
971 terminate(Reason, Msg, GS2State #gs2_state { state = NState });
973 terminate(What, Msg, GS2State);
975 terminate({bad_return_value, Reply}, Msg, GS2State)
978 %%-----------------------------------------------------------------
979 %% Callback functions for system messages handling.
980 %%-----------------------------------------------------------------
%% sys callback: resumes normal operation after a system message.
%% The parent and debug options handed back by sys are stored in the
%% server state before the main loop is re-entered.
system_continue(NewParent, NewDebug, Suspended) ->
    Resumed = Suspended#gs2_state{debug = NewDebug, parent = NewParent},
    loop(Resumed).
%% sys callback: shuts the server down on request of the sys machinery.
%% The debug options handed back by sys are stored in the server state
%% and terminate/3 is invoked with [] in the message position.
system_terminate(ExitReason, _Parent, NewDebug, Suspended) ->
    Final = Suspended#gs2_state{debug = NewDebug},
    terminate(ExitReason, [], Final).
%% sys callback for code upgrades: invokes Mod:code_change/3 under catch
%% with the old version, the current callback state and Extra.
%% When a NewState is obtained it is stored in the server state and
%% find_prioritisers/1 is run on the result.
%% NOTE(review): presumably the prioritisers are refreshed because the
%% upgraded module may export a different set of prioritise_* functions
%% -- confirm.
987 system_code_change(GS2State = #gs2_state { mod = Mod,
989 _Module, OldVsn, Extra) ->
990 case catch Mod:code_change(OldVsn, State, Extra) of
992 NewGS2State = find_prioritisers(
993 GS2State #gs2_state { state = NewState }),
999 %%-----------------------------------------------------------------
1000 %% Format debug messages. Print them as the call-back module sees
1001 %% them, not as the real erlang messages. Use trace for that.
1002 %%-----------------------------------------------------------------
%% Formatter passed to sys:handle_debug/4. Writes one "*DBG*" line for a
%% debug event to the io device Dev; Name identifies the server.
%%
%% {in, Msg}: an incoming message. Calls and casts are unwrapped so that
%% only the request (and, for calls, the caller) is shown; any other
%% message is printed whole.
1003 print_event(Dev, {in, Msg}, Name) ->
1005 {'$gen_call', {From, _Tag}, Call} ->
1006 io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
1007 [Name, Call, From]);
1008 {'$gen_cast', Cast} ->
1009 io:format(Dev, "*DBG* ~p got cast ~p~n",
1012 io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
%% {out, Msg, To, State}: a reply Msg sent to To, together with the
%% resulting state.
1014 print_event(Dev, {out, Msg, To, State}, Name) ->
1015 io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
1016 [Name, Msg, To, State]);
%% {noreply, State}: a callback returned without replying.
1017 print_event(Dev, {noreply, State}, Name) ->
1018 io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
%% Fallback for every other event. {become, Mod, NState} events have no
%% clause of their own and are therefore printed here.
1019 print_event(Dev, Event, Name) ->
1020 io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]).
1023 %%% ---------------------------------------------------
1024 %%% Terminate the server.
1025 %%% ---------------------------------------------------
%% Shuts the server down. Msg is the message being handled when
%% termination was triggered; it is only used for error reporting.
%%
%% Mod:terminate(Reason, State) is called under catch.
%% If a failure R is obtained, error_info/6 reports R with the original
%% Reason as root cause.
%% Reasons of the form {shutdown, _} have a branch of their own.
%% In the branch that reports Reason itself, error_info/6 is given
%% 'undefined' as the root cause.
%% NOTE(review): presumably R comes from an {'EXIT', R} result of the
%% callback -- confirm.
1027 terminate(Reason, Msg, #gs2_state { name = Name,
1031 case catch Mod:terminate(Reason, State) of
1033 error_info(R, Reason, Name, Msg, State, Debug),
1041 {shutdown,_}=Shutdown ->
1044 error_info(Reason, undefined, Name, Msg, State, Debug),
%% Emits the "Generic server ... terminating" report.
%%
%% Reason is the termination reason to report, RootCause is 'undefined'
%% or the reason terminate/3 was originally given, Name identifies the
%% server, Msg is the last message handled, State the callback state and
%% Debug the sys debug state.
%%
%% First clause: nothing is formatted when the server is named
%% application_controller.
1049 error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
1050 %% OTP-5811 Don't send an error report if it's the system process
1051 %% application_controller which is terminating - let init take care
1054 error_info(Reason, RootCause, Name, Msg, State, Debug) ->
%% Both Reason and RootCause are passed through error_reason/1 before
%% being printed.
1055 Reason1 = error_reason(Reason),
1057 "** Generic server ~p terminating~n"
1058 "** Last message in was ~p~n"
1059 "** When Server state == ~p~n"
1060 "** Reason for termination == ~n** ~p~n",
%% With a root cause present, an extra "In 'terminate' callback" section
%% is appended to the format string and the root cause is printed too.
1062 undefined -> format(Fmt, [Name, Msg, State, Reason1]);
1063 _ -> format(Fmt ++ "** In 'terminate' callback "
1064 "with reason ==~n** ~p~n",
1065 [Name, Msg, State, Reason1,
1066 error_reason(RootCause)])
%% The collected sys debug log is printed after the report.
1068 sys:print_log(Debug),
%% Makes 'undef' termination reasons more descriptive.
%%
%% For {undef, [{M,F,A} | MFAs]}: if module M is not loaded the reason
%% becomes {'module could not be loaded', Stack}; if M is loaded but
%% M:F/length(A) is not exported it becomes {'function not exported',
%% Stack}. Stack is the original [{M,F,A} | MFAs] list.
%%
%% NOTE(review): the first clause only matches when the top stack entry
%% is a three-element {M,F,A} tuple, and length(A) requires A to be an
%% argument list. Entries of any other shape fall through to the second
%% clause. Newer OTP releases are believed to add a location element to
%% stack entries -- confirm whether this clause still matches there.
1071 error_reason({undef,[{M,F,A}|MFAs]} = Reason) ->
1072 case code:is_loaded(M) of
1073 false -> {'module could not be loaded',[{M,F,A}|MFAs]};
1074 _ -> case erlang:function_exported(M, F, length(A)) of
1076 false -> {'function not exported',[{M,F,A}|MFAs]}
1079 error_reason(Reason) ->
1082 %%% ---------------------------------------------------
1083 %%% Misc. functions.
1084 %%% ---------------------------------------------------
%% Looks up option Op in a list of options.
%% The first clause matches when the head of the list is an {Op, Value}
%% pair for the requested Op; the second clause skips any other head and
%% leaves the remaining Options to be searched.
%% debug_options/2 expects {ok, Value} from this function on a hit.
1086 opt(Op, [{Op, Value}|_]) ->
1088 opt(Op, [_|Options]) ->
%% Builds the sys debug state for server Name from the start options.
%% The value of the 'debug' option, if opt/2 finds one, is passed to
%% dbg_options/2; otherwise dbg_options/2 is called with an empty list.
1093 debug_options(Name, Opts) ->
1094 case opt(debug, Opts) of
1095 {ok, Options} -> dbg_options(Name, Options);
1096 _ -> dbg_options(Name, [])
%% Resolves the debug options to use for server Name.
%% With an empty option list the generic_debug emulator argument is
%% consulted via init:get_argument/1 and the Opts chosen from its result
%% are passed to dbg_opts/2. A non-empty list is passed to dbg_opts/2
%% unchanged.
1099 dbg_options(Name, []) ->
1101 case init:get_argument(generic_debug) of
1107 dbg_opts(Name, Opts);
1108 dbg_options(Name, Opts) ->
1109 dbg_opts(Name, Opts).
%% Turns a list of debug options into a sys debug state by calling
%% sys:debug_options/1 under catch. One branch prints an "ignoring
%% erroneous debug options" message via format/2.
%% NOTE(review): presumably that is the branch taken when
%% sys:debug_options/1 fails -- confirm.
1111 dbg_opts(Name, Opts) ->
1112 case catch sys:debug_options(Opts) of
1114 format("~p: ignoring erroneous debug options - ~p~n",
%% Determines the name of the calling process from the name
%% specification it was started with, checking that a registration
%% really refers to this process.
%%
%% Pid: accepted as is (guarded by is_pid/1).
1121 get_proc_name(Pid) when is_pid(Pid) ->
%% {local, Name}: the process must currently be registered locally under
%% exactly Name; a different registered name, or any other result of
%% process_info/2, exits with process_not_registered.
1123 get_proc_name({local, Name}) ->
1124 case process_info(self(), registered_name) of
1125 {registered_name, Name} ->
1127 {registered_name, _Name} ->
1128 exit(process_not_registered);
1130 exit(process_not_registered)
%% {global, Name}: whereis_name/1 must resolve Name to self(); the other
%% branches exit with process_not_registered_globally.
1132 get_proc_name({global, Name}) ->
1133 case whereis_name(Name) of
1135 exit(process_not_registered_globally);
1136 Pid when Pid =:= self() ->
1139 exit(process_not_registered_globally)
1143 case get('$ancestors') of
1144 [Parent | _] when is_pid(Parent)->
1146 [Parent | _] when is_atom(Parent)->
1147 name_to_pid(Parent);
1149 exit(process_was_not_started_by_proc_lib)
%% Resolves a registered name to a process. The local registry is tried
%% first with whereis/1; whereis_name/1 is consulted in a nested case.
%% One branch of that nested lookup exits with
%% could_not_find_registerd_name (the atom is spelled this way in the
%% code and is part of the exit reason).
1152 name_to_pid(Name) ->
1153 case whereis(Name) of
1155 case whereis_name(Name) of
1157 exit(could_not_find_registerd_name);
%% Looks Name up directly in the global_names ETS table. When exactly one
%% five-element entry is found and its pid lives on the local node, the
%% pid's liveness is checked with is_process_alive/1.
%% NOTE(review): global_names is presumably the table maintained by the
%% 'global' name server -- confirm.
1165 whereis_name(Name) ->
1166 case ets:lookup(global_names, Name) of
1167 [{_Name, Pid, _Method, _RPid, _Ref}] ->
1168 if node(Pid) == node() ->
1169 case is_process_alive(Pid) of
%% Resolves the priority functions for the current callback module and
%% stores them in the server state as {Call, Cast, Info}.
%%
%% Each entry is obtained from function_exported_or_default/4, which is
%% given the callback name and arity to look for (prioritise_call/4,
%% prioritise_cast/3, prioritise_info/3) together with a fallback fun
%% that always yields priority 0. The fallbacks take one argument fewer
%% than the callbacks because they receive no queue length.
find_prioritisers(#gs2_state{mod = Mod} = GS2State) ->
    ZeroForCall  = fun (_Msg, _From, _State) -> 0 end,
    ZeroForOther = fun (_Msg, _State) -> 0 end,
    CallPrio = function_exported_or_default(Mod, 'prioritise_call', 4,
                                            ZeroForCall),
    CastPrio = function_exported_or_default(Mod, 'prioritise_cast', 3,
                                            ZeroForOther),
    InfoPrio = function_exported_or_default(Mod, 'prioritise_info', 3,
                                            ZeroForOther),
    GS2State#gs2_state{prioritisers = {CallPrio, CastPrio, InfoPrio}}.
%% Builds a prioritiser fun for callback Mod:Fun/Arity.
%%
%% When Mod exports Fun/Arity, a wrapper fun is built that takes one
%% argument fewer than the callback: the wrapper computes Length itself
%% as priority_queue:len(Queue) of the server state it is given and
%% calls the callback under catch.
%%   Arity 3: wrapper takes (Msg, GS2State) and calls
%%            Mod:Fun(Msg, Length, State).
%%   Arity 4: wrapper takes (Msg, From, GS2State) and calls
%%            Mod:Fun(Msg, From, Length, State).
%% An integer result has a branch of its own; a result bound to Err is
%% handed to handle_common_termination/3, which ends in terminate/3.
%% NOTE(review): presumably Default is returned when Mod does not export
%% Fun/Arity -- confirm.
1188 function_exported_or_default(Mod, Fun, Arity, Default) ->
1189 case erlang:function_exported(Mod, Fun, Arity) of
1190 true -> case Arity of
1191 3 -> fun (Msg, GS2State = #gs2_state { queue = Queue,
1193 Length = priority_queue:len(Queue),
1194 case catch Mod:Fun(Msg, Length, State) of
1197 Res when is_integer(Res) ->
1200 handle_common_termination(Err, Msg, GS2State)
1203 4 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue,
1205 Length = priority_queue:len(Queue),
1206 case catch Mod:Fun(Msg, From, Length, State) of
1207 Res when is_integer(Res) ->
1210 handle_common_termination(Err, Msg, GS2State)
1217 %%-----------------------------------------------------------------
1218 %% Status information
1219 %%-----------------------------------------------------------------
%% Produces the status report for this server from a five-element
%% StatusData list: process dictionary, sys state, parent, debug state
%% and the #gs2_state{} record.
%% NOTE(review): presumably invoked on behalf of sys:get_status -- confirm.
1220 format_status(Opt, StatusData) ->
1221 [PDict, SysState, Parent, Debug,
1222 #gs2_state{name = Name, state = State, mod = Mod, queue = Queue}] =
1224 NameTag = if is_pid(Name) ->
1229 Header = lists:concat(["Status for generic server ", NameTag]),
1230 Log = sys:get_debug(log, Debug, []),
%% Callback-specific section: Mod:format_status/2 is tried through
%% callback/4; the fallback thunk yields a plain {"State", State} entry.
1231 Specfic = callback(Mod, format_status, [Opt, [PDict, State]],
1232 fun () -> [{data, [{"State", State}]}] end),
%% Queued messages: Mod:format_message_queue/2 is tried through
%% callback/4; the fallback thunk lists the priority queue as is.
1233 Messages = callback(Mod, format_message_queue, [Opt, Queue],
1234 fun () -> priority_queue:to_list(Queue) end),
1236 {data, [{"Status", SysState},
1238 {"Logged events", Log},
1239 {"Queued messages", Messages}]} |
1242 callback(Mod, FunName, Args, DefaultThunk) ->
1243 case erlang:function_exported(Mod, FunName, length(Args)) of
1244 true -> case catch apply(Mod, FunName, Args) of
1245 {'EXIT', _} -> DefaultThunk();
1248 false -> DefaultThunk()