|
matthew@6072
|
1 |
%% The contents of this file are subject to the Mozilla Public License
|
|
matthew@6072
|
2 |
%% Version 1.1 (the "License"); you may not use this file except in
|
|
matthew@6072
|
3 |
%% compliance with the License. You may obtain a copy of the License
|
|
matthew@6072
|
4 |
%% at http://www.mozilla.org/MPL/
|
|
matthew@2926
|
5 |
%%
|
|
matthew@6072
|
6 |
%% Software distributed under the License is distributed on an "AS IS"
|
|
matthew@6072
|
7 |
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
|
matthew@6072
|
8 |
%% the License for the specific language governing rights and
|
|
matthew@6072
|
9 |
%% limitations under the License.
|
|
matthew@2926
|
10 |
%%
|
|
matthew@6072
|
11 |
%% The Original Code is RabbitMQ.
|
|
matthew@2926
|
12 |
%%
|
|
matthew@6072
|
13 |
%% The Initial Developer of the Original Code is VMware, Inc.
|
|
emile@8906
|
14 |
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
|
|
matthew@2926
|
15 |
%%
|
|
matthew@2926
|
16 |
|
|
matthew@2926
|
17 |
-module(worker_pool).
|
|
matthew@2926
|
18 |
|
|
matthew@2926
|
19 |
%% Generic worker pool manager.
|
|
matthew@2926
|
20 |
%%
|
|
matthew@2926
|
21 |
%% Supports nested submission of jobs (nested jobs always run
|
|
matthew@2926
|
22 |
%% immediately in current worker process).
|
|
matthew@2926
|
23 |
%%
|
|
matthew@2926
|
24 |
%% Possible future enhancements:
|
|
matthew@2926
|
25 |
%%
|
|
matthew@2926
|
26 |
%% 1. Allow priorities (basically, change the pending queue to a
|
|
matthew@2926
|
27 |
%% priority_queue).
|
|
matthew@2926
|
28 |
|
|
matthew@2926
|
29 |
-behaviour(gen_server2).
|
|
matthew@2926
|
30 |
|
|
matthias@3008
|
31 |
-export([start_link/0, submit/1, submit_async/1, idle/1]).
|
|
matthew@2926
|
32 |
|
|
matthew@2926
|
33 |
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
matthew@2926
|
34 |
terminate/2, code_change/3]).
|
|
matthew@2926
|
35 |
|
|
matthew@2926
|
36 |
%%----------------------------------------------------------------------------
|
|
matthew@2926
|
37 |
|
|
matthew@2926
|
38 |
-ifdef(use_specs).
|
|
matthew@2926
|
39 |
|
|
matthias@8788
|
40 |
-type(mfargs() :: {atom(), atom(), [any()]}).
|
|
matthias@8788
|
41 |
|
|
matthias@4392
|
42 |
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
|
|
matthias@8788
|
43 |
-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
|
|
matthias@8788
|
44 |
-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
|
|
alexandru@7795
|
45 |
-spec(idle/1 :: (any()) -> 'ok').
|
|
matthew@2926
|
46 |
|
|
matthew@2926
|
47 |
-endif.
|
|
matthew@2926
|
48 |
|
|
matthew@2926
|
49 |
%%----------------------------------------------------------------------------
|
|
matthew@2926
|
50 |
|
|
matthew@2926
|
51 |
%% Name under which the pool manager is locally registered; the API
%% functions below address the server through this name.
-define(SERVER, ?MODULE).

%% Values handed to gen_server2 in the {backoff, ...} tuple returned
%% from init/1. Their exact meaning (and unit) is defined by
%% gen_server2's hibernation backoff -- consult that module.
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE,   10000).

%% Server state:
%%   available - queue of ids of workers that have reported idle
%%   pending   - queue of requests ({next_free, From} or
%%               {run_async, Fun}) waiting for a worker
-record(state, {
          available,
          pending
         }).
|
|
matthew@2926
|
56 |
|
|
matthew@2926
|
57 |
%%----------------------------------------------------------------------------
|
|
matthew@2926
|
58 |
|
|
matthew@2926
|
59 |
%% Starts the pool manager as a gen_server2 process registered locally
%% under ?SERVER and links it to the caller. No init argument is
%% needed; the start-up timeout is disabled (infinity).
start_link() ->
    ServerName = {local, ?SERVER},
    StartOpts  = [{timeout, infinity}],
    gen_server2:start_link(ServerName, ?MODULE, [], StartOpts).
|
|
matthew@2926
|
62 |
|
|
matthew@2926
|
63 |
%% Runs Fun and returns its result.
%%
%% When the calling process has the process-dictionary key
%% worker_pool_worker set to true, the job is run directly through
%% worker_pool_worker:run/1 (nested submission). Otherwise the pool
%% server is asked for the next free worker -- waiting without timeout
%% until one is handed out -- and the job is submitted to that worker.
submit(Fun) ->
    run_or_dispatch(get(worker_pool_worker), Fun).

%% Dispatches on the value read from the process dictionary: only the
%% exact atom 'true' selects the direct path.
run_or_dispatch(true, Fun) ->
    worker_pool_worker:run(Fun);
run_or_dispatch(_Other, Fun) ->
    WorkerPid = gen_server2:call(?SERVER, next_free, infinity),
    worker_pool_worker:submit(WorkerPid, Fun).
|
|
matthew@2926
|
69 |
|
|
matthias@3008
|
70 |
%% Hands Fun to the pool server as a {run_async, Fun} cast and returns
%% the result of the cast without waiting for the job to run.
submit_async(Fun) ->
    Request = {run_async, Fun},
    gen_server2:cast(?SERVER, Request).
|
|
matthias@3008
|
72 |
|
|
matthew@2926
|
73 |
%% Tells the pool server, via an {idle, WId} cast, that the worker
%% identified by WId is free to take work.
idle(WId) ->
    Notification = {idle, WId},
    gen_server2:cast(?SERVER, Notification).
|
|
matthew@2926
|
75 |
|
|
matthew@2926
|
76 |
%%----------------------------------------------------------------------------
|
|
matthew@2926
|
77 |
|
|
matthew@2926
|
78 |
%% gen_server2 callback. Starts with no idle workers and no waiting
%% requests, asks to hibernate straight away, and passes the backoff
%% settings built from the HIBERNATE_* macros to gen_server2.
init([]) ->
    InitialState = #state { available = queue:new(),
                            pending   = queue:new() },
    Backoff = {backoff,
               ?HIBERNATE_AFTER_MIN,
               ?HIBERNATE_AFTER_MIN,
               ?DESIRED_HIBERNATE},
    {ok, InitialState, hibernate, Backoff}.
|
|
matthew@2926
|
81 |
|
|
matthew@2926
|
82 |
%% gen_server2 callback for synchronous requests.
%%
%% next_free: if an idle worker is queued, take it off the queue and
%% reply with its pid. Otherwise remember the caller in the pending
%% queue and do not reply yet; the reply is sent from the {idle, _}
%% cast handler once a worker frees up.
%%
%% Any other request stops the server with {unexpected_call, Msg}.
handle_call(next_free, Caller, #state { available = Idle,
                                        pending   = Waiting } = State) ->
    case queue:out(Idle) of
        {{value, WorkerId}, IdleRest} ->
            WorkerPid = get_worker_pid(WorkerId),
            {reply, WorkerPid, State #state { available = IdleRest },
             hibernate};
        {empty, _} ->
            Waiting1 = queue:in({next_free, Caller}, Waiting),
            {noreply, State #state { pending = Waiting1 }, hibernate}
    end;

handle_call(Unexpected, _Caller, State) ->
    StopReason = {unexpected_call, Unexpected},
    {stop, StopReason, State}.
|
|
matthew@2926
|
96 |
|
|
matthew@2926
|
97 |
%% gen_server2 callback for asynchronous requests.
%%
%% {idle, WId}: a worker has become free. If nothing is pending the
%% worker id is queued as available. Otherwise the oldest pending
%% request is served with this worker: a waiting next_free caller gets
%% the worker's pid as its reply, a queued async job is submitted to
%% the worker.
%%
%% {run_async, Fun}: submit Fun to an idle worker if there is one,
%% otherwise queue the job as pending.
%%
%% Any other cast stops the server with {unexpected_cast, Msg}.
handle_cast({idle, WorkerId}, #state { available = Idle,
                                       pending   = Waiting } = State) ->
    NewState =
        case queue:out(Waiting) of
            {empty, _} ->
                State #state { available = queue:in(WorkerId, Idle) };
            {{value, {next_free, Caller}}, WaitingRest} ->
                WorkerPid = get_worker_pid(WorkerId),
                gen_server2:reply(Caller, WorkerPid),
                State #state { pending = WaitingRest };
            {{value, {run_async, Fun}}, WaitingRest} ->
                WorkerPid = get_worker_pid(WorkerId),
                worker_pool_worker:submit_async(WorkerPid, Fun),
                State #state { pending = WaitingRest }
        end,
    {noreply, NewState, hibernate};

handle_cast({run_async, Fun}, #state { available = Idle,
                                       pending   = Waiting } = State) ->
    NewState =
        case queue:out(Idle) of
            {{value, WorkerId}, IdleRest} ->
                WorkerPid = get_worker_pid(WorkerId),
                worker_pool_worker:submit_async(WorkerPid, Fun),
                State #state { available = IdleRest };
            {empty, _} ->
                State #state { pending = queue:in({run_async, Fun}, Waiting) }
        end,
    {noreply, NewState, hibernate};

handle_cast(Unexpected, State) ->
    StopReason = {unexpected_cast, Unexpected},
    {stop, StopReason, State}.
|
|
matthew@2926
|
123 |
|
|
matthew@2926
|
124 |
%% gen_server2 callback for plain messages. The pool manager expects
%% none, so any message stops the server with {unexpected_info, Msg}
%% and the state left as it was.
handle_info(Msg, State) ->
    StopReason = {unexpected_info, Msg},
    {stop, StopReason, State}.
|
|
matthew@2926
|
126 |
|
|
matthew@2926
|
127 |
%% gen_server2 callback for code upgrades. No state conversion is
%% performed: the state is returned as it was received.
code_change(_FromVsn, CurrentState, _UpgradeExtra) ->
    {ok, CurrentState}.
|
|
matthew@2926
|
129 |
|
|
matthew@2926
|
130 |
%% gen_server2 callback run on shutdown. Performs no clean-up and
%% simply returns the final state.
terminate(_Why, FinalState) ->
    FinalState.
|
|
matthew@2926
|
132 |
|
|
matthew@2926
|
133 |
%%----------------------------------------------------------------------------
|
|
matthew@2926
|
134 |
|
|
matthew@2926
|
135 |
%% Resolves a worker id to the pid currently registered for it with
%% the worker_pool_sup supervisor.
%%
%% The supervisor's child list is searched on the child id (first
%% tuple element) with lists:keyfind/3 rather than a hand-written
%% dropwhile. The exact match on WId in the pattern below is
%% deliberate: if WId is not one of the supervisor's children the
%% match fails and the caller crashes with badmatch instead of
%% carrying on with a bogus pid.
%%
%% The second element of the child tuple is returned as-is, i.e.
%% whatever the supervisor reports for that child.
get_worker_pid(WId) ->
    Children = supervisor:which_children(worker_pool_sup),
    {WId, Pid, _Type, _Modules} = lists:keyfind(WId, 1, Children),
    Pid.
|