src/worker_pool_worker.erl
author Simon MacMullen <simon@rabbitmq.com>
Mon, 28 Jul 2014 19:00:04 +0100
changeset 13738 32b9f859b7eb
parent 13330 989f9da88bb5
permissions -rw-r--r--
force_boot requires no app.
     1 %% The contents of this file are subject to the Mozilla Public License
     2 %% Version 1.1 (the "License"); you may not use this file except in
     3 %% compliance with the License. You may obtain a copy of the License
     4 %% at http://www.mozilla.org/MPL/
     5 %%
     6 %% Software distributed under the License is distributed on an "AS IS"
     7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
     8 %% the License for the specific language governing rights and
     9 %% limitations under the License.
    10 %%
    11 %% The Original Code is RabbitMQ.
    12 %%
    13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
    14 %% Copyright (c) 2007-2014 GoPivotal, Inc.  All rights reserved.
    15 %%
    16 
    17 -module(worker_pool_worker).
    18 
    19 -behaviour(gen_server2).
    20 
    21 -export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]).
    22 
    23 -export([set_maximum_since_use/2]).
    24 
    25 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
    26          terminate/2, code_change/3, prioritise_cast/3]).
    27 
    28 %%----------------------------------------------------------------------------
    29 
    30 -ifdef(use_specs).
    31 
    32 -type(mfargs() :: {atom(), atom(), [any()]}).
    33 
    34 -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
    35 -spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
    36 -spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
    37 -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
    38 -spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
    39 -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
    40 
    41 -endif.
    42 
    43 %%----------------------------------------------------------------------------
    44 
    45 -define(HIBERNATE_AFTER_MIN, 1000).
    46 -define(DESIRED_HIBERNATE, 10000).
    47 
    48 %%----------------------------------------------------------------------------
    49 
    50 start_link() ->
    51     gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
    52 
    53 next_job_from(Pid, CPid) ->
    54     gen_server2:cast(Pid, {next_job_from, CPid}).
    55 
    56 submit(Pid, Fun) ->
    57     gen_server2:call(Pid, {submit, Fun, self()}, infinity).
    58 
    59 submit_async(Pid, Fun) ->
    60     gen_server2:cast(Pid, {submit_async, Fun}).
    61 
    62 set_maximum_since_use(Pid, Age) ->
    63     gen_server2:cast(Pid, {set_maximum_since_use, Age}).
    64 
    65 run({M, F, A}) ->
    66     apply(M, F, A);
    67 run(Fun) ->
    68     Fun().
    69 
    70 %%----------------------------------------------------------------------------
    71 
    72 init([]) ->
    73     ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
    74                                              [self()]),
    75     ok = worker_pool:ready(self()),
    76     put(worker_pool_worker, true),
    77     {ok, undefined, hibernate,
    78      {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
    79 
    80 prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
    81 prioritise_cast({next_job_from, _CPid},        _Len, _State) -> 7;
    82 prioritise_cast(_Msg,                          _Len, _State) -> 0.
    83 
    84 handle_call({submit, Fun, CPid}, From, undefined) ->
    85     {noreply, {job, CPid, From, Fun}, hibernate};
    86 
    87 handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) ->
    88     erlang:demonitor(MRef),
    89     gen_server2:reply(From, run(Fun)),
    90     ok = worker_pool:idle(self()),
    91     {noreply, undefined, hibernate};
    92 
    93 handle_call(Msg, _From, State) ->
    94     {stop, {unexpected_call, Msg}, State}.
    95 
    96 handle_cast({next_job_from, CPid}, undefined) ->
    97     MRef = erlang:monitor(process, CPid),
    98     {noreply, {from, CPid, MRef}, hibernate};
    99 
   100 handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) ->
   101     gen_server2:reply(From, run(Fun)),
   102     ok = worker_pool:idle(self()),
   103     {noreply, undefined, hibernate};
   104 
   105 handle_cast({submit_async, Fun}, undefined) ->
   106     run(Fun),
   107     ok = worker_pool:idle(self()),
   108     {noreply, undefined, hibernate};
   109 
   110 handle_cast({set_maximum_since_use, Age}, State) ->
   111     ok = file_handle_cache:set_maximum_since_use(Age),
   112     {noreply, State, hibernate};
   113 
   114 handle_cast(Msg, State) ->
   115     {stop, {unexpected_cast, Msg}, State}.
   116 
   117 handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) ->
   118     ok = worker_pool:idle(self()),
   119     {noreply, undefined, hibernate};
   120 
   121 handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
   122     {noreply, State, hibernate};
   123 
   124 handle_info(Msg, State) ->
   125     {stop, {unexpected_info, Msg}, State}.
   126 
   127 code_change(_OldVsn, State, _Extra) ->
   128     {ok, State}.
   129 
   130 terminate(_Reason, State) ->
   131     State.