src/worker_pool.erl
author Simon MacMullen <simon@rabbitmq.com>
Fri Feb 03 15:59:12 2012 +0000 (3 months ago)
changeset 8922 4f87837a40be
parent 8788 2bb681711fb9
permissions -rw-r--r--
Merge bug24702
matthew@6072
     1
%% The contents of this file are subject to the Mozilla Public License
matthew@6072
     2
%% Version 1.1 (the "License"); you may not use this file except in
matthew@6072
     3
%% compliance with the License. You may obtain a copy of the License
matthew@6072
     4
%% at http://www.mozilla.org/MPL/
matthew@2926
     5
%%
matthew@6072
     6
%% Software distributed under the License is distributed on an "AS IS"
matthew@6072
     7
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
matthew@6072
     8
%% the License for the specific language governing rights and
matthew@6072
     9
%% limitations under the License.
matthew@2926
    10
%%
matthew@6072
    11
%% The Original Code is RabbitMQ.
matthew@2926
    12
%%
matthew@6072
    13
%% The Initial Developer of the Original Code is VMware, Inc.
emile@8906
    14
%% Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
matthew@2926
    15
%%
matthew@2926
    16
matthew@2926
    17
-module(worker_pool).
matthew@2926
    18
matthew@2926
    19
%% Generic worker pool manager.
matthew@2926
    20
%%
matthew@2926
    21
%% Supports nested submission of jobs (nested jobs always run
matthew@2926
    22
%% immediately in current worker process).
matthew@2926
    23
%%
matthew@2926
    24
%% Possible future enhancements:
matthew@2926
    25
%%
matthew@2926
    26
%% 1. Allow priorities (basically, change the pending queue to a
matthew@2926
    27
%% priority_queue).
matthew@2926
    28
matthew@2926
    29
-behaviour(gen_server2).
matthew@2926
    30
matthias@3008
    31
-export([start_link/0, submit/1, submit_async/1, idle/1]).
matthew@2926
    32
matthew@2926
    33
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
matthew@2926
    34
         terminate/2, code_change/3]).
matthew@2926
    35
matthew@2926
    36
%%----------------------------------------------------------------------------
matthew@2926
    37
matthew@2926
    38
-ifdef(use_specs).
matthew@2926
    39
matthias@8788
    40
-type(mfargs() :: {atom(), atom(), [any()]}).
matthias@8788
    41
matthias@4392
    42
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
matthias@8788
    43
-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
matthias@8788
    44
-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
alexandru@7795
    45
-spec(idle/1 :: (any()) -> 'ok').
matthew@2926
    46
matthew@2926
    47
-endif.
matthew@2926
    48
matthew@2926
    49
%%----------------------------------------------------------------------------
matthew@2926
    50
matthew@2926
    51
-define(SERVER, ?MODULE).
matthew@2926
    52
-define(HIBERNATE_AFTER_MIN, 1000).
matthew@2926
    53
-define(DESIRED_HIBERNATE, 10000).
matthew@2926
    54
matthew@2926
    55
-record(state, { available, pending }).
matthew@2926
    56
matthew@2926
    57
%%----------------------------------------------------------------------------
matthew@2926
    58
matthew@2926
    59
%% Starts the pool manager through gen_server2 and links it to the
%% caller. The process is registered locally under ?SERVER (which is
%% defined as this module's name) and is started with an infinite
%% start timeout.
%%
%% Returns whatever gen_server2:start_link/4 returns; per the -spec
%% that is {ok, Pid} | {error, Reason}.
start_link() ->
    Name    = {local, ?SERVER},
    Options = [{timeout, infinity}],
    gen_server2:start_link(Name, ?MODULE, [], Options).
matthew@2926
    62
matthew@2926
    63
%% Runs a job synchronously and returns the result of the underlying
%% worker_pool_worker call.
%%
%% The process dictionary key worker_pool_worker decides how the job is
%% dispatched: when it is 'true' the job runs right here in the calling
%% process (this is the "nested submission" case described in the module
%% comment); for any other value the pool manager is asked for a free
%% worker first.
submit(Fun) ->
    do_submit(get(worker_pool_worker), Fun).

%% Dispatch helper for submit/1, keyed on the worker_pool_worker flag.
do_submit(true, Fun) ->
    %% Nested job: run it immediately in the current process.
    worker_pool_worker:run(Fun);
do_submit(_NotAWorker, Fun) ->
    %% Waits without a timeout until the manager hands out a worker pid
    %% (the manager parks the request while no worker is available).
    WorkerPid = gen_server2:call(?SERVER, next_free, infinity),
    worker_pool_worker:submit(WorkerPid, Fun).
matthew@2926
    69
matthias@3008
    70
%% Hands a job to the pool without waiting for it to run: the job is
%% cast to the manager as {run_async, Fun}. The manager passes it to an
%% available worker, or queues it as pending when none is available.
%% Returns the result of the cast ('ok' per the -spec).
submit_async(Fun) ->
    Request = {run_async, Fun},
    gen_server2:cast(?SERVER, Request).
matthias@3008
    72
matthew@2926
    73
%% Tells the pool manager that the worker identified by WId is free,
%% by casting {idle, WId} to it. The manager then either gives that
%% worker the oldest pending request or records it as available.
%% Presumably called by the workers themselves -- the callers are not
%% visible in this module.
idle(WId) ->
    Notification = {idle, WId},
    gen_server2:cast(?SERVER, Notification).
matthew@2926
    75
matthew@2926
    76
%%----------------------------------------------------------------------------
matthew@2926
    77
matthew@2926
    78
%% gen_server2 callback. Builds the initial state with two empty queues:
%%   available - ids of workers that have reported idle and hold no job
%%   pending   - requests waiting for a worker ({next_free, From} or
%%               {run_async, Fun})
%% The process starts out hibernating, and a backoff specification built
%% from ?HIBERNATE_AFTER_MIN and ?DESIRED_HIBERNATE is passed to
%% gen_server2 as the fourth element of the return tuple.
init([]) ->
    EmptyState = #state { available = queue:new(), pending = queue:new() },
    Backoff    = {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
                  ?DESIRED_HIBERNATE},
    {ok, EmptyState, hibernate, Backoff}.
matthew@2926
    81
matthew@2926
    82
%% gen_server2 callback for synchronous requests.
%%
%% next_free: the caller wants the pid of a free worker. If a worker id
%% is waiting in the available queue it is dequeued and its pid is
%% returned straight away. Otherwise the caller is parked in the pending
%% queue as {next_free, From} and no reply is sent yet; the reply is
%% issued later from the {idle, WId} cast handler.
%%
%% Any other request stops the server with {unexpected_call, Msg}.
handle_call(next_free, Caller, State = #state { available = Idle,
                                                pending   = Waiting }) ->
    case queue:out(Idle) of
        {{value, WorkerId}, IdleRest} ->
            WorkerPid = get_worker_pid(WorkerId),
            {reply, WorkerPid, State #state { available = IdleRest },
             hibernate};
        {empty, _Unchanged} ->
            Waiting1 = queue:in({next_free, Caller}, Waiting),
            {noreply, State #state { pending = Waiting1 }, hibernate}
    end;

handle_call(Unexpected, _Caller, State) ->
    {stop, {unexpected_call, Unexpected}, State}.
matthew@2926
    96
matthew@2926
    97
%% gen_server2 callback for asynchronous requests.
%%
%% {idle, WId}: a worker has become free. The oldest pending request, if
%% any, is served with it: a parked next_free caller is sent the worker's
%% pid, a queued async job is passed to the worker. With nothing pending
%% the worker id joins the available queue.
%%
%% {run_async, Fun}: run Fun on an available worker if there is one,
%% otherwise append the job to the pending queue.
%%
%% Any other cast stops the server with {unexpected_cast, Msg}.
handle_cast({idle, WorkerId}, State = #state { available = Idle,
                                               pending   = Waiting }) ->
    State1 =
        case queue:out(Waiting) of
            {{value, {next_free, Caller}}, WaitingRest} ->
                %% Late reply to a submit/1 caller parked by handle_call.
                gen_server2:reply(Caller, get_worker_pid(WorkerId)),
                State #state { pending = WaitingRest };
            {{value, {run_async, Job}}, WaitingRest} ->
                worker_pool_worker:submit_async(get_worker_pid(WorkerId),
                                                Job),
                State #state { pending = WaitingRest };
            {empty, _Unchanged} ->
                State #state { available = queue:in(WorkerId, Idle) }
        end,
    {noreply, State1, hibernate};

handle_cast({run_async, Job}, State = #state { available = Idle,
                                               pending   = Waiting }) ->
    State1 =
        case queue:out(Idle) of
            {{value, WorkerId}, IdleRest} ->
                worker_pool_worker:submit_async(get_worker_pid(WorkerId),
                                                Job),
                State #state { available = IdleRest };
            {empty, _Unchanged} ->
                State #state { pending = queue:in({run_async, Job},
                                                  Waiting) }
        end,
    {noreply, State1, hibernate};

handle_cast(Unexpected, State) ->
    {stop, {unexpected_cast, Unexpected}, State}.
matthew@2926
   123
matthew@2926
   124
%% gen_server2 callback for plain messages. This server expects none,
%% so any message stops it with reason {unexpected_info, Msg}; the
%% state is passed through unchanged.
handle_info(Unexpected, State) ->
    StopReason = {unexpected_info, Unexpected},
    {stop, StopReason, State}.
matthew@2926
   126
matthew@2926
   127
%% gen_server2 callback for code upgrades. No state conversion is
%% performed: the existing state is returned as-is, whatever the old
%% version or extra data.
code_change(_FromVsn, CurrentState, _UpgradeArgs) ->
    {ok, CurrentState}.
matthew@2926
   129
matthew@2926
   130
%% gen_server2 callback invoked on shutdown. Performs no cleanup; it
%% simply evaluates to the state it was given.
terminate(_StopReason, FinalState) ->
    FinalState.
matthew@2926
   132
matthew@2926
   133
%%----------------------------------------------------------------------------
matthew@2926
   134
matthew@2926
   135
%% Resolves a worker id to the pid listed for it among the children of
%% the worker_pool_sup supervisor.
%%
%% The children list is fetched from the supervisor on every call and
%% searched by child id (element 1 of each child tuple) with
%% lists:keyfind/3. The match on the result is deliberately assertive:
%% if no child carries this id the calling process crashes with a
%% badmatch instead of returning an error value.
get_worker_pid(WId) ->
    Children = supervisor:which_children(worker_pool_sup),
    {WId, Pid, _Type, _Modules} = lists:keyfind(WId, 1, Children),
    Pid.