Friday, May 18, 2012

Erlang's gen_fsm with queueing

OK, no time for a full-blown, polished blog post. When using gen_fsm I realized that I often want some sort of queueing. Two reasons motivate this:
  1. to reduce the complexity caused by {States} x {Events}
  2. to implement some kind of asynchronous command execution processing model

I usually encounter both reasons, and I ended up with two useful functions that I copy into every gen_fsm I build:

%% Replay deferred events through ready/2, oldest first; with nothing left
%% to replay, settle in the ready state.
deq(#state{evtq = []} = Drained) ->
    {next_state, ready, Drained};
deq(#state{evtq = [Oldest | Remaining]} = Pending) ->
    ?MODULE:ready(Oldest, Pending#state{evtq = Remaining}).

This requires adding evtq as a field to the #state{} record.

The function deq/1 is used to work off events (one could say commands) until the queue is empty, at which point the process finally rests in the state ready.

This function is called at the point where one would otherwise return {next_state, ready, State}. So instead of transitioning directly to ready, we call the deq function, which will process all deferred events.

On the other hand there is a function enq/2 which is called whenever the process is in a state where it cannot handle a new request, and wants to defer the event:

%% Defer Msg by putting it at the tail of the queue, so that deq/1 (which
%% takes from the head) replays events in arrival order.
enq(Msg, #state{evtq = Deferred} = State) ->
    WithMsg = Deferred ++ [Msg],
    State#state{evtq = WithMsg}.

Isn't this helpful? For how little complexity it adds, this little trick offers a lot of value for me.

Here is a small example. Imagine a gen_fsm representing an RTP connection. The connection must be established by some DSP hardware, and the DSP will asynchronously tell the process when it is done.

After that, the connection calls a callback provided by the user to indicate the connection can now be assumed established. Some time in the future the user requests the RTP connection process to tell the dsp resource to stop sending data to the rtp destination, and the connection process will exit.

Now there are quite a few clauses in this li'l state machine, because the user might send the disconnect request at any time, before or after the RTP connection is established.

To handle this the process simply queues the disconnect request until it is ready. I hope this incomplete piece of code helps:

-module(rtp_connection).

-behaviour(gen_fsm).

-export(.....).

start_link(...) -> ...


[gen_fsm boilerplate]

%%------------------------------------------------------------------------------
%% @doc
%% Requests that the connection process behind `Ref' shuts down and resets the
%% RTP destination of the voice channel. The request is delivered to the
%% process as an asynchronous `disconnect' event.
%% @end
%%------------------------------------------------------------------------------
-spec disconnect(core_rtp:connection()) -> ok.
disconnect(Ref) ->
    ConnectionPid = lbm_object:pid(Ref),
    gen_fsm:send_event(ConnectionPid, disconnect).

%%%=============================================================================
%%% acme_voice_channel_op Callbacks
%%%=============================================================================

%%------------------------------------------------------------------------------
%% @private
%% Hands `Ctx' to the process referenced by `CBRef' as a synchronous
%% `{perform_update, Ctx}' event and returns whatever that process replies.
%%------------------------------------------------------------------------------
update(CBRef, Ctx) ->
    TargetPid = lbm_object:pid(CBRef),
    Request = {perform_update, Ctx},
    gen_fsm:sync_send_event(TargetPid, Request).

%%------------------------------------------------------------------------------
%% @private
%% Notifies the process referenced by `CBRef' with an asynchronous
%% `update_success' event.
%%------------------------------------------------------------------------------
voice_config_succeeded(CBRef) ->
    TargetPid = lbm_object:pid(CBRef),
    gen_fsm:send_event(TargetPid, update_success).

%%------------------------------------------------------------------------------
%% @private
%% Notifies the process referenced by `CBRef' with an asynchronous
%% `update_failure' event.
%%------------------------------------------------------------------------------
voice_config_failed(CBRef) ->
    TargetPid = lbm_object:pid(CBRef),
    gen_fsm:send_event(TargetPid, update_failure).

%%%=============================================================================
%%% gen_fsm Callbacks
%%%=============================================================================

%% Data carried through every state of the connection FSM.
-record(state, {
          %% Reference created in init/1 and handed to the RTP callback once
          %% the connection is established.
          self_ref :: core_connection:ref(),
          %% Voice channel process that is asked to perform updates.
          channel  :: pid(),
          %% RTP destination written into the update context while connecting.
          rtp_dest :: core_rtp:endpoint(),
          %% Callback reference notified via core_rtp_cb:rtp_connected/2.
          rtp_cb   :: core_rtp_cb:ref(),
          %% Deferred events, oldest first. Defaults to the empty queue so that
          %% enq/2 and deq/1 also work on a state built without setting it.
          evtq = [] :: [any()]}).

%%------------------------------------------------------------------------------
%% @private
%% Builds the initial FSM data with an empty event queue, posts a `connect'
%% event to itself, and starts in the `initializing' state.
%%------------------------------------------------------------------------------
init({ChannelPid, RTPDest, RTPCB, ChannelEP}) ->
    SelfRef = core_connection:ref(self(), ?MODULE, ChannelEP, RTPDest),
    %% The connect event is handled by initializing/2 once init/1 has returned.
    gen_fsm:send_event(self(), connect),
    InitialState = #state{self_ref = SelfRef,
                          channel = ChannelPid,
                          rtp_dest = RTPDest,
                          rtp_cb = RTPCB,
                          evtq = []},
    {ok, initializing, InitialState}.

%%%=============================================================================
%%% gen_fsm state functions
%%%=============================================================================

%%------------------------------------------------------------------------------
%% @private
%% Asynchronous events while initializing: `connect' asks the voice channel
%% for an update; every other event is queued for later.
%%------------------------------------------------------------------------------
initializing(connect, #state{channel = Channel} = State) ->
    OpRef = acme_voice_channel_op:ref(self(), ?MODULE),
    acme_voice_channel:request_update(Channel, OpRef),
    {next_state, initializing, State};
initializing(Event, State) ->
    Deferred = enq(Event, State),
    {next_state, initializing, Deferred}.

%%------------------------------------------------------------------------------
%% @private
%% Synchronous `perform_update' while initializing: replies with the context
%% after setting the RTP destination on it, then moves on to `connecting'.
%%------------------------------------------------------------------------------
initializing({perform_update, Ctx}, _From, #state{rtp_dest = Dest} = State) ->
    {reply, acme_channel_common:set_rtp_dest(Ctx, Dest), connecting, State}.

%%------------------------------------------------------------------------------
%% @private
%% Asynchronous events while connecting: a failed update stops the process,
%% a successful one notifies the RTP callback and then replays the queued
%% events; every other event is queued for later.
%%------------------------------------------------------------------------------
connecting(update_failure, State) ->
    {stop, voice_config_failed, State};
connecting(update_success, #state{self_ref = SelfRef, rtp_cb = Callback} = State) ->
    core_rtp_cb:rtp_connected(Callback, SelfRef),
    deq(State);
connecting(Event, State) ->
    Deferred = enq(Event, State),
    {next_state, connecting, Deferred}.

%%------------------------------------------------------------------------------
%% @private
%% Events in the ready state: `disconnect' asks the voice channel for an
%% update and moves to `disconnecting'; any other event is reported as
%% ignored, after which the remaining queue is processed.
%%------------------------------------------------------------------------------
ready(disconnect, #state{channel = Channel} = State) ->
    OpRef = acme_voice_channel_op:ref(self(), ?MODULE),
    acme_voice_channel:request_update(Channel, OpRef),
    {next_state, disconnecting, State};
ready(Unexpected, State) ->
    Report = [{?MODULE, self()}, {ignoring, Unexpected}],
    error_logger:error_report(acme_driver, Report),
    deq(State).
%%------------------------------------------------------------------------------
%% @private
%% Synchronous `perform_update' while disconnecting: replies with the context
%% after resetting its RTP destination and stays in `disconnecting'.
%%------------------------------------------------------------------------------
disconnecting({perform_update, Ctx}, _From, State) ->
    {reply, acme_channel_common:reset_rtp_dest(Ctx), disconnecting, State}.

%%------------------------------------------------------------------------------
%% @private
%% Asynchronous events while disconnecting: a failed update stops the process
%% with reason `voice_config_failed', a successful one stops it normally.
%% Anything else (for example a repeated `disconnect', which callers can send
%% at any time) is reported and ignored, so that a stray event cannot crash the
%% process with `function_clause' in the middle of the teardown.
%%------------------------------------------------------------------------------
disconnecting(update_failure, State) ->
    {stop, voice_config_failed, State};
disconnecting(update_success, State) ->
    {stop, normal, State};
disconnecting(Msg, State) ->
    %% Same report format as the fallback clause of ready/2.
    error_logger:error_report(acme_driver,
                              [{?MODULE, self()}, {ignoring, Msg}]),
    {next_state, disconnecting, State}.

%%------------------------------------------------------------------------------
%% @private
%% Replays deferred events through ready/2, oldest first. With an empty queue
%% the process settles in the `ready' state.
%%------------------------------------------------------------------------------
deq(#state{evtq = []} = Drained) ->
    {next_state, ready, Drained};
deq(#state{evtq = [Oldest | Remaining]} = Pending) ->
    ?MODULE:ready(Oldest, Pending#state{evtq = Remaining}).

%%------------------------------------------------------------------------------
%% @private
%% Defers `Msg' by putting it at the tail of the event queue, so that deq/1
%% (which takes from the head) replays events in arrival order.
%%------------------------------------------------------------------------------
enq(Msg, #state{evtq = Deferred} = State) ->
    WithMsg = Deferred ++ [Msg],
    State#state{evtq = WithMsg}.

Thanks for reading, feedback very welcome.

Follow me on Twitter @SvenHeyll

No comments: