1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
11 %% The Original Code is RabbitMQ.
13 %% The Initial Developer of the Original Code is VMware, Inc.
14 %% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
19 %% Generic worker pool manager.
21 %% Supports nested submission of jobs (nested jobs always run
22 %% immediately in current worker process).
24 %% Possible future enhancements:
26 %% 1. Allow priorities (basically, change the pending queue to a
29 -behaviour(gen_server2).
31 -export([start_link/0, submit/1, submit_async/1, idle/1]).
33 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
34 terminate/2, code_change/3]).
36 %%----------------------------------------------------------------------------
40 -type(mfargs() :: {atom(), atom(), [any()]}).
42 -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
43 -spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
44 -spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
45 -spec(idle/1 :: (any()) -> 'ok').
49 %%----------------------------------------------------------------------------
51 -define(SERVER, ?MODULE).
52 -define(HIBERNATE_AFTER_MIN, 1000).
53 -define(DESIRED_HIBERNATE, 10000).
55 -record(state, { available, pending }).
57 %%----------------------------------------------------------------------------
60 gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
61 [{timeout, infinity}]).
64 case get(worker_pool_worker) of
65 true -> worker_pool_worker:run(Fun);
66 _ -> Pid = gen_server2:call(?SERVER, next_free, infinity),
67 worker_pool_worker:submit(Pid, Fun)
71 gen_server2:cast(?SERVER, {run_async, Fun}).
74 gen_server2:cast(?SERVER, {idle, WId}).
76 %%----------------------------------------------------------------------------
79 {ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
80 {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
82 handle_call(next_free, From, State = #state { available = Avail,
83 pending = Pending }) ->
84 case queue:out(Avail) of
87 State #state { pending = queue:in({next_free, From}, Pending) },
89 {{value, WId}, Avail1} ->
90 {reply, get_worker_pid(WId), State #state { available = Avail1 },
94 handle_call(Msg, _From, State) ->
95 {stop, {unexpected_call, Msg}, State}.
97 handle_cast({idle, WId}, State = #state { available = Avail,
98 pending = Pending }) ->
99 {noreply, case queue:out(Pending) of
101 State #state { available = queue:in(WId, Avail) };
102 {{value, {next_free, From}}, Pending1} ->
103 gen_server2:reply(From, get_worker_pid(WId)),
104 State #state { pending = Pending1 };
105 {{value, {run_async, Fun}}, Pending1} ->
106 worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
107 State #state { pending = Pending1 }
110 handle_cast({run_async, Fun}, State = #state { available = Avail,
111 pending = Pending }) ->
113 case queue:out(Avail) of
115 State #state { pending = queue:in({run_async, Fun}, Pending)};
116 {{value, WId}, Avail1} ->
117 worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
118 State #state { available = Avail1 }
121 handle_cast(Msg, State) ->
122 {stop, {unexpected_cast, Msg}, State}.
124 handle_info(Msg, State) ->
125 {stop, {unexpected_info, Msg}, State}.
127 code_change(_OldVsn, State, _Extra) ->
130 terminate(_Reason, State) ->
133 %%----------------------------------------------------------------------------
135 get_worker_pid(WId) ->
136 [{WId, Pid, _Type, _Modules} | _] =
137 lists:dropwhile(fun ({Id, _Pid, _Type, _Modules})
138 when Id =:= WId -> false;
141 supervisor:which_children(worker_pool_sup)),