src/worker_pool_worker.erl
author Simon MacMullen <simon@rabbitmq.com>
Tue May 21 14:19:43 2013 +0100 (3 days ago)
changeset 11897 07cf05043a49
parent 10794 aafb326b75df
parent 11388 95fe4d9ef70a
permissions -rw-r--r--
Merge bug25563
     1 %% The contents of this file are subject to the Mozilla Public License
     2 %% Version 1.1 (the "License"); you may not use this file except in
     3 %% compliance with the License. You may obtain a copy of the License
     4 %% at http://www.mozilla.org/MPL/
     5 %%
     6 %% Software distributed under the License is distributed on an "AS IS"
     7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
     8 %% the License for the specific language governing rights and
     9 %% limitations under the License.
    10 %%
    11 %% The Original Code is RabbitMQ.
    12 %%
    13 %% The Initial Developer of the Original Code is VMware, Inc.
    14 %% Copyright (c) 2007-2013 VMware, Inc.  All rights reserved.
    15 %%
    16 
    17 -module(worker_pool_worker).
    18 
    19 -behaviour(gen_server2).
    20 
    21 -export([start_link/1, submit/2, submit_async/2, run/1]).
    22 
    23 -export([set_maximum_since_use/2]).
    24 
    25 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
    26          terminate/2, code_change/3, prioritise_cast/3]).
    27 
    28 %%----------------------------------------------------------------------------
    29 
    30 -ifdef(use_specs).
    31 
    32 -type(mfargs() :: {atom(), atom(), [any()]}).
    33 
    34 -spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
    35 -spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
    36 -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
    37 -spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
    38 -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
    39 
    40 -endif.
    41 
    42 %%----------------------------------------------------------------------------
    43 
    44 -define(HIBERNATE_AFTER_MIN, 1000).
    45 -define(DESIRED_HIBERNATE, 10000).
    46 
    47 %%----------------------------------------------------------------------------
    48 
    49 start_link(WId) ->
    50     gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]).
    51 
    52 submit(Pid, Fun) ->
    53     gen_server2:call(Pid, {submit, Fun}, infinity).
    54 
    55 submit_async(Pid, Fun) ->
    56     gen_server2:cast(Pid, {submit_async, Fun}).
    57 
    58 set_maximum_since_use(Pid, Age) ->
    59     gen_server2:cast(Pid, {set_maximum_since_use, Age}).
    60 
    61 run({M, F, A}) ->
    62     apply(M, F, A);
    63 run(Fun) ->
    64     Fun().
    65 
    66 %%----------------------------------------------------------------------------
    67 
    68 init([WId]) ->
    69     ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
    70                                              [self()]),
    71     ok = worker_pool:idle(WId),
    72     put(worker_pool_worker, true),
    73     {ok, WId, hibernate,
    74      {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
    75 
    76 prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
    77 prioritise_cast(_Msg,                          _Len, _State) -> 0.
    78 
    79 handle_call({submit, Fun}, From, WId) ->
    80     gen_server2:reply(From, run(Fun)),
    81     ok = worker_pool:idle(WId),
    82     {noreply, WId, hibernate};
    83 
    84 handle_call(Msg, _From, State) ->
    85     {stop, {unexpected_call, Msg}, State}.
    86 
    87 handle_cast({submit_async, Fun}, WId) ->
    88     run(Fun),
    89     ok = worker_pool:idle(WId),
    90     {noreply, WId, hibernate};
    91 
    92 handle_cast({set_maximum_since_use, Age}, WId) ->
    93     ok = file_handle_cache:set_maximum_since_use(Age),
    94     {noreply, WId, hibernate};
    95 
    96 handle_cast(Msg, State) ->
    97     {stop, {unexpected_cast, Msg}, State}.
    98 
    99 handle_info(Msg, State) ->
   100     {stop, {unexpected_info, Msg}, State}.
   101 
   102 code_change(_OldVsn, State, _Extra) ->
   103     {ok, State}.
   104 
   105 terminate(_Reason, State) ->
   106     State.