src/worker_pool.erl
author Jean-Sebastien Pedron <jean-sebastien@rabbitmq.com>
Tue, 17 Feb 2015 17:28:45 +0100
changeset 14460 cbfc99c735ad
parent 14429 2fbd1cff5998
permissions -rw-r--r--
stable to default
     1 %% The contents of this file are subject to the Mozilla Public License
     2 %% Version 1.1 (the "License"); you may not use this file except in
     3 %% compliance with the License. You may obtain a copy of the License
     4 %% at http://www.mozilla.org/MPL/
     5 %%
     6 %% Software distributed under the License is distributed on an "AS IS"
     7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
     8 %% the License for the specific language governing rights and
     9 %% limitations under the License.
    10 %%
    11 %% The Original Code is RabbitMQ.
    12 %%
    13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
    14 %% Copyright (c) 2007-2014 GoPivotal, Inc.  All rights reserved.
    15 %%
    16 
    17 -module(worker_pool).
    18 
    19 %% Generic worker pool manager.
    20 %%
     21 %% Submitted jobs are functions. They can be executed synchronously
     22 %% (using worker_pool:submit/1, worker_pool:submit/2) or asynchronously
     23 %% (using worker_pool:submit_async/1).
    24 %%
    25 %% We typically use the worker pool if we want to limit the maximum
    26 %% parallelism of some job. We are not trying to dodge the cost of
    27 %% creating Erlang processes.
    28 %%
    29 %% Supports nested submission of jobs and two execution modes:
    30 %% 'single' and 'reuse'. Jobs executed in 'single' mode are invoked in
    31 %% a one-off process. Those executed in 'reuse' mode are invoked in a
    32 %% worker process out of the pool. Nested jobs are always executed
     33 %% immediately in the current worker process.
    34 %%
    35 %% 'single' mode is offered to work around a bug in Mnesia: after
    36 %% network partitions reply messages for prior failed requests can be
    37 %% sent to Mnesia clients - a reused worker pool process can crash on
    38 %% receiving one.
    39 %%
    40 %% Caller submissions are enqueued internally. When the next worker
    41 %% process is available, it communicates it to the pool and is
    42 %% assigned a job to execute. If job execution fails with an error, no
    43 %% response is returned to the caller.
    44 %%
    45 %% Worker processes prioritise certain command-and-control messages
    46 %% from the pool.
    47 %%
    48 %% Future improvement points: job prioritisation.
    49 
    50 -behaviour(gen_server2).
    51 
    52 -export([start_link/0, submit/1, submit/2, submit_async/1, ready/1,
    53          idle/1]).
    54 
    55 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
    56          terminate/2, code_change/3]).
    57 
    58 %%----------------------------------------------------------------------------
    59 
%% Type specifications for the exported API. They are only compiled when
%% the use_specs macro is defined.
-ifdef(use_specs).

%% A job described as a three-element tuple instead of a fun (presumably
%% {Module, Function, Args} -- confirm against worker_pool_worker).
-type(mfargs() :: {atom(), atom(), [any()]}).

-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
%% submit/1 and submit/2 return the job's own result (type variable A).
-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
-spec(submit/2 :: (fun (() -> A) | mfargs(), 'reuse' | 'single') -> A).
%% submit_async/1 returns 'ok' whatever the job's result type is.
-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
%% ready/1 and idle/1 take a worker pid.
-spec(ready/1 :: (pid()) -> 'ok').
-spec(idle/1 :: (pid()) -> 'ok').

-endif.
    72 
    73 %%----------------------------------------------------------------------------
    74 
%% Name under which the pool manager process is locally registered.
-define(SERVER, ?MODULE).
%% Values handed to gen_server2 in the {backoff, ...} tuple returned by
%% init/1 (presumably milliseconds -- confirm against gen_server2).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).

%% Manager state:
%%   available - ordset of worker pids currently waiting for a job
%%   pending   - queue of requests ({next_free, From, CPid} or
%%               {run_async, Fun}) that arrived while no worker was
%%               available
-record(state, { available, pending }).
    80 
    81 %%----------------------------------------------------------------------------
    82 
    83 start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
    84                                        [{timeout, infinity}]).
    85 
    86 submit(Fun) ->
    87     submit(Fun, reuse).
    88 
    89 %% ProcessModel =:= single is for working around the mnesia_locker bug.
    90 submit(Fun, ProcessModel) ->
    91     case get(worker_pool_worker) of
    92         true -> worker_pool_worker:run(Fun);
    93         _    -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity),
    94                 worker_pool_worker:submit(Pid, Fun, ProcessModel)
    95     end.
    96 
    97 submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}).
    98 
    99 ready(WPid) -> gen_server2:cast(?SERVER, {ready, WPid}).
   100 
   101 idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}).
   102 
   103 %%----------------------------------------------------------------------------
   104 
   105 init([]) ->
   106     {ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate,
   107      {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
   108 
   109 handle_call({next_free, CPid}, From, State = #state { available = [],
   110                                                       pending   = Pending }) ->
   111     {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)},
   112      hibernate};
   113 handle_call({next_free, CPid}, _From, State = #state { available =
   114                                                            [WPid | Avail1] }) ->
   115     worker_pool_worker:next_job_from(WPid, CPid),
   116     {reply, WPid, State #state { available = Avail1 }, hibernate};
   117 
   118 handle_call(Msg, _From, State) ->
   119     {stop, {unexpected_call, Msg}, State}.
   120 
   121 handle_cast({ready, WPid}, State) ->
   122     erlang:monitor(process, WPid),
   123     handle_cast({idle, WPid}, State);
   124 
   125 handle_cast({idle, WPid}, State = #state { available = Avail,
   126                                            pending   = Pending }) ->
   127     {noreply,
   128      case queue:out(Pending) of
   129          {empty, _Pending} ->
   130              State #state { available = ordsets:add_element(WPid, Avail) };
   131          {{value, {next_free, From, CPid}}, Pending1} ->
   132              worker_pool_worker:next_job_from(WPid, CPid),
   133              gen_server2:reply(From, WPid),
   134              State #state { pending = Pending1 };
   135          {{value, {run_async, Fun}}, Pending1} ->
   136              worker_pool_worker:submit_async(WPid, Fun),
   137              State #state { pending = Pending1 }
   138      end, hibernate};
   139 
   140 handle_cast({run_async, Fun}, State = #state { available = [],
   141                                                pending   = Pending }) ->
   142     {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)},
   143      hibernate};
   144 handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) ->
   145     worker_pool_worker:submit_async(WPid, Fun),
   146     {noreply, State #state { available = Avail1 }, hibernate};
   147 
   148 handle_cast(Msg, State) ->
   149     {stop, {unexpected_cast, Msg}, State}.
   150 
   151 handle_info({'DOWN', _MRef, process, WPid, _Reason},
   152             State = #state { available = Avail }) ->
   153     {noreply, State #state { available = ordsets:del_element(WPid, Avail) },
   154      hibernate};
   155 
   156 handle_info(Msg, State) ->
   157     {stop, {unexpected_info, Msg}, State}.
   158 
   159 code_change(_OldVsn, State, _Extra) ->
   160     {ok, State}.
   161 
   162 terminate(_Reason, State) ->
   163     State.