src/worker_pool_worker.erl
author Jean-Sebastien Pedron <jean-sebastien@rabbitmq.com>
Tue, 17 Feb 2015 17:28:45 +0100
changeset 14460 cbfc99c735ad
parent 13883 5a63c9e273cc
permissions -rw-r--r--
stable to default
     1 %% The contents of this file are subject to the Mozilla Public License
     2 %% Version 1.1 (the "License"); you may not use this file except in
     3 %% compliance with the License. You may obtain a copy of the License
     4 %% at http://www.mozilla.org/MPL/
     5 %%
     6 %% Software distributed under the License is distributed on an "AS IS"
     7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
     8 %% the License for the specific language governing rights and
     9 %% limitations under the License.
    10 %%
    11 %% The Original Code is RabbitMQ.
    12 %%
    13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
    14 %% Copyright (c) 2007-2014 GoPivotal, Inc.  All rights reserved.
    15 %%
    16 
    17 -module(worker_pool_worker).
    18 
    19 %% Executes jobs (functions) submitted to a worker pool with worker_pool:submit/1,
    20 %% worker_pool:submit/2 or worker_pool:submit_async/1.
    21 %%
    22 %% See worker_pool for an overview.
    23 
    24 -behaviour(gen_server2).
    25 
    26 -export([start_link/0, next_job_from/2, submit/3, submit_async/2, run/1]).
    27 
    28 -export([set_maximum_since_use/2]).
    29 
    30 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
    31          terminate/2, code_change/3, prioritise_cast/3]).
    32 
    33 %%----------------------------------------------------------------------------
    34 
    35 -ifdef(use_specs).
    36 
    37 -type(mfargs() :: {atom(), atom(), [any()]}).
    38 
    39 -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
    40 -spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
    41 -spec(submit/3 :: (pid(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A).
    42 -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
    43 -spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
    44 -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
    45 
    46 -endif.
    47 
    48 %%----------------------------------------------------------------------------
    49 
    50 -define(HIBERNATE_AFTER_MIN, 1000).
    51 -define(DESIRED_HIBERNATE, 10000).
    52 
    53 %%----------------------------------------------------------------------------
    54 
    55 start_link() ->
    56     gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
    57 
    58 next_job_from(Pid, CPid) ->
    59     gen_server2:cast(Pid, {next_job_from, CPid}).
    60 
    61 submit(Pid, Fun, ProcessModel) ->
    62     gen_server2:call(Pid, {submit, Fun, self(), ProcessModel}, infinity).
    63 
    64 submit_async(Pid, Fun) ->
    65     gen_server2:cast(Pid, {submit_async, Fun}).
    66 
    67 set_maximum_since_use(Pid, Age) ->
    68     gen_server2:cast(Pid, {set_maximum_since_use, Age}).
    69 
    70 run({M, F, A}) -> apply(M, F, A);
    71 run(Fun)       -> Fun().
    72 
    73 run(Fun, reuse) ->
    74     run(Fun);
    75 run(Fun, single) ->
    76     Self = self(),
    77     Ref = make_ref(),
    78     spawn_link(fun () ->
    79                        put(worker_pool_worker, true),
    80                        Self ! {Ref, run(Fun)},
    81                        unlink(Self)
    82                end),
    83     receive
    84         {Ref, Res} -> Res
    85     end.
    86 
    87 %%----------------------------------------------------------------------------
    88 
    89 init([]) ->
    90     ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
    91                                              [self()]),
    92     ok = worker_pool:ready(self()),
    93     put(worker_pool_worker, true),
    94     {ok, undefined, hibernate,
    95      {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
    96 
    97 prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
    98 prioritise_cast({next_job_from, _CPid},        _Len, _State) -> 7;
    99 prioritise_cast(_Msg,                          _Len, _State) -> 0.
   100 
   101 handle_call({submit, Fun, CPid, ProcessModel}, From, undefined) ->
   102     {noreply, {job, CPid, From, Fun, ProcessModel}, hibernate};
   103 
   104 handle_call({submit, Fun, CPid, ProcessModel}, From, {from, CPid, MRef}) ->
   105     erlang:demonitor(MRef),
   106     gen_server2:reply(From, run(Fun, ProcessModel)),
   107     ok = worker_pool:idle(self()),
   108     {noreply, undefined, hibernate};
   109 
   110 handle_call(Msg, _From, State) ->
   111     {stop, {unexpected_call, Msg}, State}.
   112 
   113 handle_cast({next_job_from, CPid}, undefined) ->
   114     MRef = erlang:monitor(process, CPid),
   115     {noreply, {from, CPid, MRef}, hibernate};
   116 
   117 handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, ProcessModel}) ->
   118     gen_server2:reply(From, run(Fun, ProcessModel)),
   119     ok = worker_pool:idle(self()),
   120     {noreply, undefined, hibernate};
   121 
   122 handle_cast({submit_async, Fun}, undefined) ->
   123     run(Fun),
   124     ok = worker_pool:idle(self()),
   125     {noreply, undefined, hibernate};
   126 
   127 handle_cast({set_maximum_since_use, Age}, State) ->
   128     ok = file_handle_cache:set_maximum_since_use(Age),
   129     {noreply, State, hibernate};
   130 
   131 handle_cast(Msg, State) ->
   132     {stop, {unexpected_cast, Msg}, State}.
   133 
   134 handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) ->
   135     ok = worker_pool:idle(self()),
   136     {noreply, undefined, hibernate};
   137 
   138 handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
   139     {noreply, State, hibernate};
   140 
   141 handle_info(Msg, State) ->
   142     {stop, {unexpected_info, Msg}, State}.
   143 
   144 code_change(_OldVsn, State, _Extra) ->
   145     {ok, State}.
   146 
   147 terminate(_Reason, State) ->
   148     State.