https://fsharpforfunandprofit.com/posts/concurrency-reactive/
Events are everywhere. Almost every program has to handle events, whether it be button clicks in the user interface, listening to sockets in a server, or even a system shutdown notification.
And events are the basis of one of the most common OO design patterns: the “Observer” pattern.
But as we know, event handling, like concurrency in general, can be tricky to implement. Simple event logic is straightforward, but what about logic like “do something if two events happen in a row but do something different if only one event happens” or “do something if two events happen at roughly the same time”? And how easy is it to combine these requirements in other, more complex ways?
Even if you can successfully implement these requirements, the code tends to be spaghetti-like and hard to understand, even with the best intentions.
Is there an approach that can make event handling easier?
We saw in the previous post on message queues that one of the advantages of that approach was that the requests were “serialized”, making it conceptually easier to deal with.
There is a similar approach that can be used with events. The idea is to turn a series of events into an “event stream”. Event streams then become quite like IEnumerables, and so the obvious next step is to treat them in much the same way that LINQ handles collections, so that they can be filtered, mapped, split and combined.
F# has built-in support for this model, as well as for the more traditional approach.
A simple event stream
Let’s start with a simple example to compare the two approaches. We’ll implement the classic event handler approach first.
First, we define a utility function that will:
- create a timer
- register a handler for the Elapsed event
- run the timer for five seconds and then stop it
Here’s the code:
open System
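As a rough illustration of the three steps listed above, a utility along these lines might look like the following. This is only a sketch under assumptions: the name `createTimer`, its parameter order, and the extra `runForMs` argument (used instead of a fixed five seconds) are all hypothetical.

```fsharp
open System

/// Sketch: create a timer, register a handler for its Elapsed event,
/// and return an async task that runs the timer for `runForMs` milliseconds.
/// (`createTimer` and `runForMs` are hypothetical names.)
let createTimer (timerInterval: int) (runForMs: int)
                (eventHandler: System.Timers.ElapsedEventArgs -> unit) =
    // set up a repeating timer
    let timer = new System.Timers.Timer(float timerInterval)
    timer.AutoReset <- true

    // register the handler directly with the event
    timer.Elapsed.Add eventHandler

    // return an async task that starts the timer, waits, and then stops it
    async {
        timer.Start()
        do! Async.Sleep runForMs
        timer.Stop()
    }
```

With this sketch, `createTimer 1000 5000 handler |> Async.RunSynchronously` would tick once a second for five seconds, calling `handler` on each tick.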
Now test it interactively:
// create a handler. The event args are ignored
Now let’s create a similar utility method to create a timer, but this time it will return an “observable” as well, which is the stream of events.
let createTimerAndObservable timerInterval =
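To make the idea concrete, here is one way such a function could be written. Treat it as a sketch: the helper `createTimerAndObservableFor` and its `runForMs` parameter are hypothetical additions so that the running time can be varied, and returning the task and the stream as a pair is an assumption based on how the function is used later on.

```fsharp
open System

/// Sketch: create a timer and return both an async task that runs it for
/// `runForMs` milliseconds and the stream of its Elapsed events.
/// (`createTimerAndObservableFor` and `runForMs` are hypothetical.)
let createTimerAndObservableFor (runForMs: int) (timerInterval: int) =
    let timer = new System.Timers.Timer(float timerInterval)
    timer.AutoReset <- true

    // F# events implement IObservable, so the event itself is the stream;
    // no handler is registered here
    let observable = timer.Elapsed :> IObservable<System.Timers.ElapsedEventArgs>

    let task =
        async {
            timer.Start()
            do! Async.Sleep runForMs
            timer.Stop()
        }

    // return the task and the observable as a pair
    (task, observable)

/// The five-second version, matching the earlier utility.
let createTimerAndObservable timerInterval =
    createTimerAndObservableFor 5000 timerInterval
```

The caller then decides what to do with the stream, for example by piping it into `Observable.subscribe` before running the task with `Async.RunSynchronously`.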
And again test it interactively:
// create the timer and the corresponding observable
The difference is that instead of registering a handler directly with an event, we are “subscribing” to an event stream. Subtly different, and important.
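The contrast between the two styles can be seen side by side in a small sketch. To keep it deterministic, a hand-triggered `Event<string>` stands in for the timer; that substitution, and the names `source` and `messages`, are assumptions made purely for illustration.

```fsharp
// A hand-triggered event stands in for a timer (an assumption, so that
// the example does not depend on real time).
let source = Event<string>()
let messages = System.Collections.Generic.List<string>()

// Classic approach: register a handler directly with the event.
source.Publish.Add (fun msg -> messages.Add ("handler: " + msg))

// Stream approach: subscribe to the event viewed as an observable.
// The subscription is an IDisposable, so it can be cancelled later.
let subscription =
    source.Publish
    |> Observable.subscribe (fun msg -> messages.Add ("stream: " + msg))

source.Trigger "first"     // seen by both the handler and the subscriber
subscription.Dispose()     // unsubscribe from the stream
source.Trigger "second"    // now seen by the handler only
```

One practical difference is visible here: a subscription is a value that can be disposed of, and the stream it came from can be transformed before anyone subscribes to it.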
Counting events
In this next example, we’ll have a slightly more complex requirement:
Create a timer that ticks every 500ms.
To do this in a classic imperative way, we would probably create a class with a mutable counter, as below:
type ImperativeTimerCount() =
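A class of this kind could be sketched as follows. The member name `handleEvent` and the read-only `Count` property are assumptions; `Count` is added only so that the state can be inspected from outside.

```fsharp
/// Sketch of a handler class that counts events with a mutable field.
/// (`handleEvent` and `Count` are hypothetical member names.)
type ImperativeTimerCount() =
    // the mutable state lives inside the class
    let mutable count = 0

    /// expose the current count for inspection
    member this.Count = count

    /// the event handler; the event args are ignored
    member this.handleEvent _ =
        count <- count + 1
        printfn "timer ticked with count %i" count

// simulate two ticks by calling the handler directly
let handler = ImperativeTimerCount()
handler.handleEvent ()
handler.handleEvent ()
```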
We can reuse the utility functions we created earlier to test it:
// create a handler class
Let’s see how we would do this same thing in a functional way:
// create the timer and the corresponding observable
Here we see how you can build up layers of event transformations, just as you do with list transformations in LINQ.
The first transformation is scan, which accumulates state for each event. It is roughly equivalent to the List.fold function that we have seen used with lists, except that it emits the accumulated state after every event rather than only once at the end. In this case, the accumulated state is just a counter.
And then, for each event, the count is printed out.
Note that in this functional approach, we didn’t have any mutable state, and we didn’t need to create any special classes.
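The same layering can be tried out without a real timer. In the sketch below a hand-triggered `Event<unit>` stands in for the timer ticks, and the `seen` list exists only so that the emitted counts can be inspected; both are assumptions for the sake of a deterministic example.

```fsharp
// A hand-triggered event stands in for the timer ticks (an assumption).
let ticks = Event<unit>()
let seen = System.Collections.Generic.List<int>()

let subscription =
    ticks.Publish
    // accumulate a running count; the initial state is 0
    |> Observable.scan (fun count _ -> count + 1) 0
    // for each new count, record it and print it
    |> Observable.subscribe (fun count ->
        seen.Add count
        printfn "timer ticked with count %i" count)

// simulate three ticks
ticks.Trigger ()
ticks.Trigger ()
ticks.Trigger ()
```

After the three ticks, `seen` holds 1, 2 and 3: scan emits a new state for each event, and the initial 0 is never emitted on its own.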
Merging multiple event streams
For a final example, we’ll look at merging multiple event streams.
Let’s make a requirement based on the well-known “FizzBuzz” problem:
Create two timers, called '3' and '5'. The '3' timer ticks every 300ms and the '5' timer ticks
First let’s create some code that both implementations can use.
We’ll want a generic event type that captures the timer id and the time of the tick.
type FizzBuzzEvent = {label:int; time: DateTime}
And then we need a utility function to see if two events are simultaneous. We’ll be generous and allow a time difference of up to 50ms.
let areSimultaneous (earlierEvent,laterEvent) =
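One possible body for this function is sketched below. The choice of `TotalMilliseconds` and of an inclusive 50ms limit are assumptions about how “up to 50ms” should be read.

```fsharp
open System

type FizzBuzzEvent = {label:int; time: DateTime}

/// Sketch: two events count as simultaneous if the later one happened
/// no more than 50ms after the earlier one.
let areSimultaneous (earlierEvent, laterEvent) =
    // subtracting two DateTimes gives a TimeSpan
    let gap = laterEvent.time - earlierEvent.time
    gap.TotalMilliseconds <= 50.0
```

`TotalMilliseconds` is used here rather than `Milliseconds` because the latter is only the milliseconds component of the gap, so a gap of exactly one second would report 0 and be treated as simultaneous.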
In the imperative design, we’ll need to keep track of the previous event, so we can compare them. And we’ll need special case code for the first time, when the previous event doesn’t exist.
type ImperativeFizzBuzzHandler() =
Now the code is beginning to get ugly fast! Already we have mutable state, complex conditional logic, and special cases, just for such a simple requirement.
Let’s test it:
// create the class
It does work, but are you sure the code is not buggy? Are you likely to accidentally break something if you change it?
The problem with this imperative code is that it has a lot of noise that obscures the requirements.
Can the functional version do better? Let’s see!
First, we create two event streams, one for each timer:
let timer3, timerEventStream3 = createTimerAndObservable 300
Next, we convert each event on the “raw” event streams into our FizzBuzz event type:
// convert the time events into FizzBuzz events with the appropriate id
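The conversion is just a map over each raw stream. Here is a sketch in which two hand-triggered events stand in for the raw timer streams; the names `raw3`, `raw5`, `eventStream3`, `eventStream5` and `labels` are assumptions.

```fsharp
open System

type FizzBuzzEvent = {label:int; time: DateTime}

// Hand-triggered events stand in for the two raw timer streams (assumption).
let raw3 = Event<unit>()
let raw5 = Event<unit>()

// Tag each raw tick with its timer id and the time it was observed.
let eventStream3 =
    raw3.Publish |> Observable.map (fun _ -> {label=3; time=DateTime.Now})
let eventStream5 =
    raw5.Publish |> Observable.map (fun _ -> {label=5; time=DateTime.Now})

// record the labels in arrival order so the result can be inspected
let labels = System.Collections.Generic.List<int>()
let sub3 = eventStream3 |> Observable.subscribe (fun e -> labels.Add e.label)
let sub5 = eventStream5 |> Observable.subscribe (fun e -> labels.Add e.label)

raw3.Trigger ()
raw5.Trigger ()
raw3.Trigger ()
```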
Now, to see if two events are simultaneous, we need to compare them from the two different streams somehow.
It’s actually easier than it sounds, because we can:
- combine the two streams into a single stream
- then create pairs of sequential events
- then test the pairs to see if they are simultaneous
- then split the input stream into two new output streams based on that test
Here’s the actual code to do this:
// combine the two streams
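The four steps map onto three functions from the Observable module: `merge`, `pairwise` and `partition`. The sketch below wires them together using hand-triggered events with explicit timestamps in place of real timers, so that the outcome is predictable. The source events, the timestamps, the recording lists and the body of `areSimultaneous` are all assumptions.

```fsharp
open System

type FizzBuzzEvent = {label:int; time: DateTime}

// assumed definition: a gap of at most 50ms counts as simultaneous
let areSimultaneous (earlierEvent, laterEvent) =
    (laterEvent.time - earlierEvent.time).TotalMilliseconds <= 50.0

// Hand-triggered events stand in for the two timer streams (assumption).
let source3 = Event<FizzBuzzEvent>()
let source5 = Event<FizzBuzzEvent>()

// combine the two streams into a single stream
let combinedStream = Observable.merge source3.Publish source5.Publish

// pair each event with the one that came before it
let pairwiseStream = combinedStream |> Observable.pairwise

// split the pairs according to whether they are simultaneous
let simultaneousStream, nonSimultaneousStream =
    pairwiseStream |> Observable.partition areSimultaneous

// record the labels of each pair so the split can be inspected
let simultaneous = System.Collections.Generic.List<int * int>()
let nonSimultaneous = System.Collections.Generic.List<int * int>()
let subSim =
    simultaneousStream
    |> Observable.subscribe (fun (a, b) -> simultaneous.Add ((a.label, b.label)))
let subNonSim =
    nonSimultaneousStream
    |> Observable.subscribe (fun (a, b) -> nonSimultaneous.Add ((a.label, b.label)))

// replay a few ticks with made-up timestamps (ms after an arbitrary start)
let start = DateTime(2020, 1, 1)
let atMs ms = start.AddMilliseconds (float ms)
source3.Trigger({label=3; time=atMs 300})
source5.Trigger({label=5; time=atMs 500})
source3.Trigger({label=3; time=atMs 600})
source3.Trigger({label=3; time=atMs 1500})
source5.Trigger({label=5; time=atMs 1500})
```

Only the last two events are within 50ms of each other, so `simultaneous` ends up with the single pair (3, 5) and the other three pairs land in `nonSimultaneous`.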
Finally, we can split the nonSimultaneousStream again, based on the event id:
// split the non-simultaneous stream based on the id
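This second split could be sketched as a map followed by another partition. Here a hand-triggered stream of pairs stands in for the non-simultaneous stream, and keeping the earlier event of each pair is an assumption about which event should be reported.

```fsharp
open System

type FizzBuzzEvent = {label:int; time: DateTime}

// A hand-triggered stream of event pairs stands in for the
// non-simultaneous stream (assumption).
let nonSimultaneousPairs = Event<FizzBuzzEvent * FizzBuzzEvent>()

let fizzStream, buzzStream =
    nonSimultaneousPairs.Publish
    // reduce each pair to a single event (the earlier one, by assumption)
    |> Observable.map fst
    // split on whether the event id is 3
    |> Observable.partition (fun ev -> ev.label = 3)

// record what arrives on each output stream
let fizzLabels = System.Collections.Generic.List<int>()
let buzzLabels = System.Collections.Generic.List<int>()
let subFizz = fizzStream |> Observable.subscribe (fun ev -> fizzLabels.Add ev.label)
let subBuzz = buzzStream |> Observable.subscribe (fun ev -> buzzLabels.Add ev.label)

// replay two made-up pairs
let start = DateTime(2020, 1, 1)
let ev label ms = {label=label; time=start.AddMilliseconds (float ms)}
nonSimultaneousPairs.Trigger((ev 3 300, ev 5 500))
nonSimultaneousPairs.Trigger((ev 5 500, ev 3 600))
```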
Let’s review so far. We have started with the two original event streams and from them created four new ones:
- combinedStream contains all the events
- simultaneousStream contains only the simultaneous events
- fizzStream contains only the non-simultaneous events with id=3
- buzzStream contains only the non-simultaneous events with id=5
Now all we need to do is attach behavior to each stream:
//print events from the combinedStream
Let’s test it:
// run the two timers at the same time
Here’s all the code in one complete set:
// create the event streams and raw observables
The code might seem a bit long-winded, but this kind of incremental, stepwise approach is very clear and self-documenting.
Some of the benefits of this style are:
- I can see that it meets the requirements just by looking at it, without even running it. Not so with the imperative version.
- From a design point of view, each final “output” stream follows the single responsibility principle – it only does one thing – so it is very easy to associate behavior with it.
- This code has no conditionals, no mutable state, no edge cases. It would be easy to maintain or change, I hope.
- It is easy to debug. For example, I could easily “tap” the output of the simultaneousStream to see if it contains what I think it contains:
// debugging code
This would be much harder in the imperative version.
Summary
Functional Reactive Programming (known as FRP) is a big topic, and we’ve only just touched on it here. I hope this introduction has given you a glimpse of the usefulness of this way of doing things.
If you want to learn more, see the documentation for the F# Observable module, which has the basic transformations used above. There is also the Reactive Extensions (Rx) library, which builds on the IObservable interfaces that shipped as part of .NET 4 and is distributed as a separate library. It contains many additional transformations.