
Erlang's gen_fsm with queueing

OK, no time for a full-blown, polished blog post. When using gen_fsm I realized that I often want some sort of queueing. Two reasons motivate this:
  1. to reduce the complexity caused by {States} x {Events}
  2. to implement some kind of asynchronous command-processing model

I usually run into both, and I ended up with two useful functions that I copy into every gen_fsm I build:

deq(State = #state{evtq = []}) ->
    {next_state, ready, State};
deq(State = #state{evtq = [Msg | T]}) ->
    ?MODULE:ready(Msg, State#state{evtq = T}).

This requires adding an evtq field to the #state{} record.

The function deq/1 works off queued events (one could say commands) until the queue is empty, at which point the process comes to rest in the state ready.

It is called at the point where one would otherwise return {next_state, ready, State}. So instead of transitioning directly to ready, we call deq/1, which hands each deferred event to ready/2 in turn.

Its counterpart is enq/2, which is called whenever the process is in a state where it cannot handle a new request and wants to defer the event:

enq(Msg, State = #state{evtq = EvtQ}) ->
    State#state{evtq = EvtQ ++ [Msg]}.

Isn't this helpful? For so little added complexity, this trick gives me a lot of value.
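To see the two functions working together outside of a real gen_fsm, here is a minimal sketch. The module name evtq_demo, the handled field and the stand-in ready/2 are assumptions made up for illustration; only enq/2 and deq/1 are taken from the post. It queues three events and then drains them, showing that they are processed in arrival order.

```erlang
-module(evtq_demo).
-export([enq/2, deq/1, ready/2, demo/0]).

%% Hypothetical state record: evtq is the deferred-event queue from the post,
%% handled is an illustrative field that records the processing order.
-record(state, {evtq = [] :: [any()],
                handled = [] :: [any()]}).

%% Defer an event by appending it to the end of the queue.
enq(Msg, State = #state{evtq = EvtQ}) ->
    State#state{evtq = EvtQ ++ [Msg]}.

%% Work off deferred events one at a time; rest in ready once the queue is empty.
deq(State = #state{evtq = []}) ->
    {next_state, ready, State};
deq(State = #state{evtq = [Msg | T]}) ->
    ?MODULE:ready(Msg, State#state{evtq = T}).

%% Stand-in for a real state function: record the event, then keep draining.
ready(Msg, State = #state{handled = Handled}) ->
    deq(State#state{handled = Handled ++ [Msg]}).

%% Queue three events as if the process were busy, then drain them.
demo() ->
    Busy = lists:foldl(fun enq/2, #state{}, [first, second, third]),
    {next_state, ready, #state{evtq = [], handled = Handled}} = deq(Busy),
    Handled.
```

Calling evtq_demo:demo() returns the events in the order they were queued. In a real gen_fsm, ready/2 would of course act on the event rather than just record it, and it may transition to another state instead of calling deq/1 again.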

Here is a small example. Imagine a gen_fsm representing an RTP connection. The connection must be established by some DSP hardware, and the DSP tells the process asynchronously when it is done.

After that, the connection calls a callback provided by the user to indicate that the connection can now be assumed established. Some time later the user asks the RTP connection process to tell the DSP resource to stop sending data to the RTP destination, and the connection process exits.

Now there are quite a few clauses in this little state machine, because the user might send the disconnect request at any time, before or after the RTP connection is established.

To handle this, the process simply queues the disconnect request until it is ready. I hope this incomplete piece of code helps:

-module(rtp_connection).

-behaviour(gen_fsm).

-export(.....).

start_link(...) -> ...


[gen_fsm boilerplate]

%%------------------------------------------------------------------------------
%% @doc
%% Resets the RTP destination of the voice channel and stops the process.
%% @end
%%------------------------------------------------------------------------------
-spec disconnect(core_rtp:connection()) ->
       ok.
disconnect(Ref) ->
    gen_fsm:send_event(lbm_object:pid(Ref), disconnect).

%%%=============================================================================
%%% acme_voice_channel_op Callbacks
%%%=============================================================================

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
update(CBRef, Ctx) ->
    gen_fsm:sync_send_event(lbm_object:pid(CBRef), {perform_update, Ctx}).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
voice_config_succeeded(CBRef) ->
    gen_fsm:send_event(lbm_object:pid(CBRef), update_success).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
voice_config_failed(CBRef) ->
    gen_fsm:send_event(lbm_object:pid(CBRef), update_failure).

%%%=============================================================================
%%% gen_fsm Callbacks
%%%=============================================================================

-record(state, {
          self_ref :: core_connection:ref(),
          channel  :: pid(),
          rtp_dest :: core_rtp:endpoint(),
          rtp_cb   :: core_rtp_cb:ref(),
          evtq     :: [any()]}).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
init({ChannelPid, RTPDest, RTPCB, ChannelEP}) ->
    Ref = core_connection:ref(self(), ?MODULE, ChannelEP, RTPDest),
    gen_fsm:send_event(self(), connect),
    {ok,
     initializing,
     #state{
       self_ref = Ref,
       channel = ChannelPid,
       rtp_dest = RTPDest,
       rtp_cb = RTPCB,
       evtq = []}}.

%%%=============================================================================
%%% gen_fsm state functions
%%%=============================================================================

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
initializing(connect, State = #state{channel = Ch}) ->
    Self = acme_voice_channel_op:ref(self(), ?MODULE),
    acme_voice_channel:request_update(Ch, Self),
    {next_state, initializing, State};

initializing(M, State) ->
    {next_state, initializing, enq(M, State)}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
initializing({perform_update, Ctx},
             _From,
             State = #state{rtp_dest = RtpDest}) ->
    NewCtx = acme_channel_common:set_rtp_dest(Ctx, RtpDest),
    {reply, NewCtx, connecting, State}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
connecting(update_failure, State) ->
    {stop, voice_config_failed, State};
connecting(update_success, State = #state{self_ref =  Self,
                                          rtp_cb = RtpCB}) ->
    core_rtp_cb:rtp_connected(RtpCB, Self),
    deq(State);

connecting(M, State) ->
    {next_state, connecting, enq(M, State)}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
ready(disconnect, State = #state{channel = Ch}) ->
    Self = acme_voice_channel_op:ref(self(), ?MODULE),
    acme_voice_channel:request_update(Ch, Self),
    {next_state, disconnecting, State};

ready(Msg, State) ->
    error_logger:error_report(acme_driver,
                              [{?MODULE, self()}, {ignoring, Msg}]),
    deq(State).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
disconnecting({perform_update, Ctx},
              _From,
              State) ->
    NewCtx = acme_channel_common:reset_rtp_dest(Ctx),
    {reply, NewCtx, disconnecting, State}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
disconnecting(update_failure, State) ->
    {stop, voice_config_failed, State};
disconnecting(update_success, State) ->
    {stop, normal, State}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
deq(State = #state{evtq = []}) ->
    {next_state, ready, State};
deq(State = #state{evtq = [Msg | T]}) ->
    ?MODULE:ready(Msg, State#state{evtq = T}).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
enq(Msg, State = #state{evtq = EvtQ}) ->
    State#state{evtq = EvtQ ++ [Msg]}.
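
One detail worth noting: EvtQ ++ [Msg] copies the whole list on every append, which is fine for the handful of events deferred here but grows with queue length. If the queue could get long, the same pair of functions can be written on top of the stdlib queue module. The following is only a sketch under that assumption; the module name evtq_queue_demo, new_state/0, the handled field and the stand-in ready/2 are hypothetical and not part of the original code.

```erlang
-module(evtq_queue_demo).
-export([new_state/0, enq/2, deq/1, ready/2, demo/0]).

%% Hypothetical state record; evtq now holds a queue instead of a list.
-record(state, {evtq :: queue:queue(any()),
                handled = [] :: [any()]}).

%% The queue must be initialised explicitly, e.g. in init/1.
new_state() ->
    #state{evtq = queue:new()}.

%% Defer an event; queue:in/2 appends at the rear without copying the queue.
enq(Msg, State = #state{evtq = Q}) ->
    State#state{evtq = queue:in(Msg, Q)}.

%% Take the oldest event from the front, or rest in ready if there is none.
deq(State = #state{evtq = Q}) ->
    case queue:out(Q) of
        {empty, _} ->
            {next_state, ready, State};
        {{value, Msg}, Rest} ->
            ?MODULE:ready(Msg, State#state{evtq = Rest})
    end.

%% Stand-in for a real state function: record the event, then keep draining.
ready(Msg, State = #state{handled = Handled}) ->
    deq(State#state{handled = Handled ++ [Msg]}).

%% Queue three events, then drain them in FIFO order.
demo() ->
    Busy = lists:foldl(fun enq/2, new_state(), [first, second, third]),
    {next_state, ready, #state{handled = Handled}} = deq(Busy),
    Handled.
```

The callers do not change: state functions still call enq/2 to defer and deq/1 where they would return {next_state, ready, State}. Only the initial value of evtq differs (queue:new() instead of []).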

Thanks for reading, feedback very welcome.

Follow me on Twitter @SvenHeyll
