
Erlang's gen_fsm with queueing

OK, no time for a full-blown, polished blog post. When using gen_fsm I realized that I often want some sort of queueing, for two reasons:
  1. to reduce the complexity caused by {States} x {Events}, where every state has to deal with every event
  2. to implement some kind of asynchronous command-processing model

I usually encounter both reasons, and I ended up with two useful functions that I copy into every gen_fsm I build:

deq(State = #state{evtq = []}) ->
    {next_state, ready, State};
deq(State = #state{evtq = [Msg | T]}) ->
    ?MODULE:ready(Msg, State#state{evtq = T}).

This requires adding evtq as a field to the #state{} record.

The function deq/1 works off events (one could say commands) until the queue is empty, and the process finally rests in the state ready.

This function is called at the point where one would otherwise return {next_state, ready, State}. So instead of transitioning directly to ready, we call deq/1, which first processes all deferred events.

On the other hand, there is the function enq/2, which is called whenever the process is in a state where it cannot handle a new request and wants to defer the event:

enq(Msg, State = #state{evtq = EvtQ}) ->
    State#state{evtq = EvtQ ++ [Msg]}.
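
A side note on the append: EvtQ ++ [Msg] copies the whole list on every call, which is fine for the handful of events queued here. If the queue could grow large, a variant based on the standard queue module is one option. This is only a sketch of that alternative, not what the code in this post uses; the module name evtq_queue and the functions new/0 and next/1 are made up for illustration.

```erlang
-module(evtq_queue).
-export([new/0, enq/2, next/1]).

%% Same idea as the evtq field above, but backed by the stdlib queue module.
-record(state, {evtq :: queue:queue()}).

%% Hypothetical constructor, only here to make the sketch self-contained.
new() ->
    #state{evtq = queue:new()}.

%% Append an event at the rear of the queue.
enq(Msg, State = #state{evtq = Q}) ->
    State#state{evtq = queue:in(Msg, Q)}.

%% Take the oldest event from the front, or report that nothing is queued.
next(State = #state{evtq = Q}) ->
    case queue:out(Q) of
        {{value, Msg}, Rest} -> {Msg, State#state{evtq = Rest}};
        {empty, _}           -> empty
    end.
```

Events come out in the order they were put in, exactly as with the list version.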

Isn't this helpful? For the little complexity it adds, this trick offers a lot of value to me.
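
To see the draining behaviour in isolation, here is a minimal sketch that runs outside of gen_fsm. The module name deq_demo, the handled field and drain/1 are made up for illustration. ready/2 is only a stand-in for a real state function, and it is called locally rather than through ?MODULE: so that it does not have to be exported.

```erlang
-module(deq_demo).
-export([drain/1]).

%% evtq is the queue from the post; handled is a hypothetical field that
%% records which events the stand-in state function has processed.
-record(state, {evtq = [] :: [any()], handled = [] :: [any()]}).

%% Stand-in for a gen_fsm state function: disconnect leaves the ready
%% state, every other event is "handled" and draining continues.
ready(disconnect, State) ->
    {next_state, disconnecting, State};
ready(Msg, State = #state{handled = H}) ->
    deq(State#state{handled = H ++ [Msg]}).

deq(State = #state{evtq = []}) ->
    {next_state, ready, State};
deq(State = #state{evtq = [Msg | T]}) ->
    ready(Msg, State#state{evtq = T}).

enq(Msg, State = #state{evtq = EvtQ}) ->
    State#state{evtq = EvtQ ++ [Msg]}.

%% Queue all events in order, drain once, and return
%% {NextStateName, HandledEvents, EventsStillQueued}.
drain(Events) ->
    S0 = lists:foldl(fun enq/2, #state{}, Events),
    {next_state, Next, #state{evtq = Left, handled = H}} = deq(S0),
    {Next, H, Left}.
```

deq_demo:drain([a, b]) returns {ready, [a, b], []}, while deq_demo:drain([a, disconnect, b]) returns {disconnecting, [a], [b]}: as soon as a queued event causes a transition away from ready, draining stops and the remaining events stay in the queue.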

Here is a small example: imagine a gen_fsm representing an RTP connection. The connection must be established by some DSP hardware, and the DSP will tell the process asynchronously when it's done.

After that, the connection process calls a callback provided by the user to indicate that the connection can now be assumed established. Some time in the future the user asks the RTP connection process to tell the DSP resource to stop sending data to the RTP destination, and the connection process exits.

Now there are quite a few clauses in this little state machine, because the user might send the disconnect request at any time, before or after the RTP connection is established.

To handle this, the process simply queues the disconnect request until it is ready. I hope this incomplete piece of code helps:

-module(rtp_connection).

-behaviour(gen_fsm).

-export(.....).

start_link(...) -> ...


[gen_fsm boilerplate]

%%------------------------------------------------------------------------------
%% @doc
%% Resets the RTP destination of the voice channel, then stops the process.
%% @end
%%------------------------------------------------------------------------------
-spec disconnect(core_rtp:connection()) ->
       ok.
disconnect(Ref) ->
    gen_fsm:send_event(lbm_object:pid(Ref), disconnect).

%%%=============================================================================
%%% acme_voice_channel_op Callbacks
%%%=============================================================================

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
update(CBRef, Ctx) ->
    gen_fsm:sync_send_event(lbm_object:pid(CBRef), {perform_update, Ctx}).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
voice_config_succeeded(CBRef) ->
    gen_fsm:send_event(lbm_object:pid(CBRef), update_success).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
voice_config_failed(CBRef) ->
    gen_fsm:send_event(lbm_object:pid(CBRef), update_failure).

%%%=============================================================================
%%% gen_fsm Callbacks
%%%=============================================================================

-record(state, {
          self_ref :: core_connection:ref(),
          channel  :: pid(),
          rtp_dest :: core_rtp:endpoint(),
          rtp_cb   :: core_rtp_cb:ref(),
          evtq     :: [any()]}).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
init({ChannelPid, RTPDest, RTPCB, ChannelEP}) ->
    Ref = core_connection:ref(self(), ?MODULE, ChannelEP, RTPDest),
    gen_fsm:send_event(self(), connect),
    {ok,
     initializing,
     #state{
       self_ref = Ref,
       channel = ChannelPid,
       rtp_dest = RTPDest,
       rtp_cb = RTPCB,
       evtq = []}}.

%%%=============================================================================
%%% gen_fsm state functions
%%%=============================================================================

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
initializing(connect, State = #state{channel = Ch}) ->
    Self = acme_voice_channel_op:ref(self(), ?MODULE),
    acme_voice_channel:request_update(Ch, Self),
    {next_state, initializing, State};

%% Any other event arrives too early: defer it until the FSM is ready.
initializing(M, State) ->
    {next_state, initializing, enq(M, State)}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
initializing({perform_update, Ctx},
             _From,
             State = #state{rtp_dest = RtpDest}) ->
    NewCtx = acme_channel_common:set_rtp_dest(Ctx, RtpDest),
    {reply, NewCtx, connecting, State}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
connecting(update_failure, State) ->
    {stop, voice_config_failed, State};
connecting(update_success, State = #state{self_ref =  Self,
                                          rtp_cb = RtpCB}) ->
    core_rtp_cb:rtp_connected(RtpCB, Self),
    deq(State);

%% Still not ready: keep deferring everything else.
connecting(M, State) ->
    {next_state, connecting, enq(M, State)}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
ready(disconnect, State = #state{channel = Ch}) ->
    Self = acme_voice_channel_op:ref(self(), ?MODULE),
    acme_voice_channel:request_update(Ch, Self),
    {next_state, disconnecting, State};

ready(Msg, State) ->
    error_logger:error_report(acme_driver,
                              [{?MODULE, self()}, {ignoring, Msg}]),
    deq(State).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
disconnecting({perform_update, Ctx},
              _From,
              State) ->
    NewCtx = acme_channel_common:reset_rtp_dest(Ctx),
    {reply, NewCtx, disconnecting, State}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
disconnecting(update_failure, State) ->
    {stop, voice_config_failed, State};
disconnecting(update_success, State) ->
    {stop, normal, State}.

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
deq(State = #state{evtq = []}) ->
    {next_state, ready, State};
deq(State = #state{evtq = [Msg | T]}) ->
    ?MODULE:ready(Msg, State#state{evtq = T}).

%%------------------------------------------------------------------------------
%% @private
%%------------------------------------------------------------------------------
enq(Msg, State = #state{evtq = EvtQ}) ->
    State#state{evtq = EvtQ ++ [Msg]}.
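
For comparison: gen_statem, which superseded gen_fsm in later OTP releases, has event deferral built in through the postpone action. The following is only a rough sketch of how the connecting/ready part might look there, not a port of the module above. The module name postpone_demo and the API functions connected/1 and disconnect/1 are made up for illustration, and all the DSP and callback handling is left out.

```erlang
-module(postpone_demo).
-behaviour(gen_statem).

-export([start_link/0, connected/1, disconnect/1]).
-export([init/1, callback_mode/0, connecting/3, ready/3]).

start_link() ->
    gen_statem:start_link(?MODULE, [], []).

%% Hypothetical API: stands in for the DSP reporting success.
connected(Pid) ->
    gen_statem:cast(Pid, update_success).

disconnect(Pid) ->
    gen_statem:cast(Pid, disconnect).

callback_mode() ->
    state_functions.

init([]) ->
    {ok, connecting, #{}}.

connecting(cast, update_success, Data) ->
    {next_state, ready, Data};
connecting(cast, _Other, _Data) ->
    %% Too early: postpone the event; gen_statem retries it
    %% after the next state change.
    {keep_state_and_data, [postpone]}.

ready(cast, disconnect, _Data) ->
    {stop, normal};
ready(cast, _Other, _Data) ->
    keep_state_and_data.
```

A disconnect sent while the process is still in connecting is postponed and delivered again once the state changes to ready, which is the job enq/2 and deq/1 do by hand above.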

Thanks for reading; feedback is very welcome.

Follow me on Twitter @SvenHeyll
