
Erlang's gen_fsm with queueing

OK, no time for a full-blown, polished blog post. When using gen_fsm I realized that I often want some sort of queueing. Two reasons motivate this:
  1. to reduce the complexity caused by {States} x {Events}
  2. to implement some kind of asynchronous command execution processing model

I usually encounter both reasons, and I ended up with two useful functions that I copy into every gen_fsm I build:

deq(State = #state{evtq = []}) ->
    {next_state, ready, State};
deq(State = #state{evtq = [Msg | T]}) ->
    ?MODULE:ready(Msg, State#state{evtq = T}).

This requires adding evtq as a field to the #state{} record.

The function deq/1 is used to work off events (one could say commands) until the queue is empty, at which point the process finally rests in the state ready.

This function is called at the point where one would otherwise return {next_state, ready, State}. So instead of transitioning directly to ready, we call deq/1, which first processes all deferred events.

On the other hand there is a function enq/2, which is called whenever the process is in a state where it cannot handle a new request and wants to defer the event:

enq(Msg, State = #state{evtq = EvtQ}) ->
    State#state{evtq = EvtQ ++ [Msg]}.

Isn't this helpful? For how little code it takes, this trick offers a lot of value for me.
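To see the two functions working together outside of a real gen_fsm, here is a minimal, self-contained sketch. The module name evtq_demo, the handled field and the stand-in ready/2 are assumptions made purely for illustration; only enq/2 and deq/1 are taken from the snippets above.

```erlang
-module(evtq_demo).
-export([demo/0, ready/2]).

%% Hypothetical minimal state: the event queue plus a log of handled events.
-record(state, {evtq = []    :: [any()],
                handled = [] :: [any()]}).

%% Defer an event by appending it to the queue (FIFO order).
enq(Msg, State = #state{evtq = EvtQ}) ->
    State#state{evtq = EvtQ ++ [Msg]}.

%% Work off deferred events one by one; rest in `ready` once the queue is empty.
deq(State = #state{evtq = []}) ->
    {next_state, ready, State};
deq(State = #state{evtq = [Msg | T]}) ->
    ?MODULE:ready(Msg, State#state{evtq = T}).

%% Stand-in for a real state function: remember the event, then keep draining.
ready(Msg, State = #state{handled = H}) ->
    deq(State#state{handled = H ++ [Msg]}).

%% Queue two events while "busy", then drain them in arrival order.
demo() ->
    S0 = enq(b, enq(a, #state{})),
    {next_state, ready, #state{evtq = [], handled = Handled}} = deq(S0),
    Handled.
```

Calling evtq_demo:demo() returns the events in the order they were queued, which is the property the deferral relies on.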

Here is a small example. Imagine a gen_fsm representing an RTP connection. The connection must be established by some DSP hardware, and the DSP will asynchronously tell the process when it's done.

After that, the connection calls a callback provided by the user to indicate that the connection can now be assumed established. Some time in the future the user asks the RTP connection process to tell the DSP resource to stop sending data to the RTP destination, and the connection process will exit.

Now there are quite a few clauses in this little state machine, because the user might send the disconnect request at any time before or after the RTP connection is established.

To handle this, the process simply queues the disconnect request until it is ready. I hope this incomplete piece of code helps:

-module(rtp_connection).

-behaviour(gen_fsm).

-export(.....).

start_link(...) -> ...


[gen_fsm boilerplate]

%%------------------------------------------------------------------------------
%% @doc
%% Will exit the process and reset the RTP destination of the voice channel.
%% @end
%%------------------------------------------------------------------------------
-spec disconnect(core_rtp:connection()) ->
       ok.
disconnect(Ref) ->
    gen_fsm:send_event(lbm_object:pid(Ref), disconnect).

%%%=============================================================================
%%% acme_voice_channel_op Callbacks
%%%=============================================================================

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
update(CBRef, Ctx) ->
    gen_fsm:sync_send_event(lbm_object:pid(CBRef), {perform_update, Ctx}).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
voice_config_succeeded(CBRef) ->
    gen_fsm:send_event(lbm_object:pid(CBRef), update_success).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
voice_config_failed(CBRef) ->
    gen_fsm:send_event(lbm_object:pid(CBRef), update_failure).

%%%=============================================================================
%%% gen_fsm Callbacks
%%%=============================================================================

-record(state, {
          self_ref :: core_connection:ref(),
          channel  :: pid(),
          rtp_dest :: core_rtp:endpoint(),
          rtp_cb   :: core_rtp_cb:ref(),
          evtq     :: [any()]}).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
init({ChannelPid, RTPDest, RTPCB, ChannelEP}) ->
    Ref = core_connection:ref(self(), ?MODULE, ChannelEP, RTPDest),
    gen_fsm:send_event(self(), connect),
    {ok,
     initializing,
     #state{
       self_ref = Ref,
       channel = ChannelPid,
       rtp_dest = RTPDest,
       rtp_cb = RTPCB,
       evtq = []}}.

%%%=============================================================================
%%% gen_fsm state functions
%%%=============================================================================

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
initializing(connect, State = #state{channel = Ch}) ->
    Self = acme_voice_channel_op:ref(self(), ?MODULE),
    acme_voice_channel:request_update(Ch, Self),
    {next_state, initializing, State};

initializing(M, State) ->
    {next_state, initializing, enq(M, State)}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
initializing({perform_update, Ctx},
             _From,
             State = #state{rtp_dest = RtpDest}) ->
    NewCtx = acme_channel_common:set_rtp_dest(Ctx, RtpDest),
    {reply, NewCtx, connecting, State}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
connecting(update_failure, State) ->
    {stop, voice_config_failed, State};
connecting(update_success, State = #state{self_ref =  Self,
                                          rtp_cb = RtpCB}) ->
    core_rtp_cb:rtp_connected(RtpCB, Self),
    deq(State);

connecting(M, State) ->
    {next_state, connecting, enq(M, State)}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
ready(disconnect, State = #state{channel = Ch}) ->
    Self = acme_voice_channel_op:ref(self(), ?MODULE),
    acme_voice_channel:request_update(Ch, Self),
    {next_state, disconnecting, State};

ready(Msg, State) ->
    error_logger:error_report(acme_driver,
                              [{?MODULE, self()}, {ignoring, Msg}]),
    deq(State).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
disconnecting({perform_update, Ctx},
              _From,
              State) ->
    NewCtx = acme_channel_common:reset_rtp_dest(Ctx),
    {reply, NewCtx, disconnecting, State}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
disconnecting(update_failure, State) ->
    {stop, voice_config_failed, State};
disconnecting(update_success, State) ->
    {stop, normal, State}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
deq(State = #state{evtq = []}) ->
    {next_state, ready, State};
deq(State = #state{evtq = [Msg | T]}) ->
    ?MODULE:ready(Msg, State#state{evtq = T}).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
enq(Msg, State = #state{evtq = EvtQ}) ->
    State#state{evtq = EvtQ ++ [Msg]}.
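A side note on enq/2: appending with ++ copies the whole list on every call, which is fine for the handful of events deferred here. If the queue could grow large, the standard queue module is one possible alternative. The following is only a sketch of that variant, not what the code above does; the module name evtq_queue_demo and the helper next/1 are made up for illustration.

```erlang
-module(evtq_queue_demo).
-export([demo/0]).

%% Same idea as the evtq field above, but backed by the stdlib queue module.
-record(state, {evtq = queue:new() :: queue:queue(any())}).

%% Defer an event; queue:in/2 adds it at the rear of the queue.
enq(Msg, State = #state{evtq = Q}) ->
    State#state{evtq = queue:in(Msg, Q)}.

%% Hypothetical helper: take the oldest deferred event, if there is one.
next(State = #state{evtq = Q}) ->
    case queue:out(Q) of
        {{value, Msg}, Q1} -> {Msg, State#state{evtq = Q1}};
        {empty, _}         -> empty
    end.

%% Defer two events, then take them out again in arrival order.
demo() ->
    S0 = enq(disconnect, enq(update_success, #state{})),
    {First, S1} = next(S0),
    {Second, S2} = next(S1),
    empty = next(S2),
    [First, Second].
```

In a real gen_fsm, deq/1 would call next/1 and dispatch the returned message to ?MODULE:ready/2, exactly as the list-based version does.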

Thanks for reading, feedback very welcome.

Follow me on Twitter @SvenHeyll
