
Starting Reactive Extensions With Existing Events In C#.

Reactive Extensions (Rx) treats events as streams that work with LINQ, so we can query, filter, and transform them. It's particularly useful when business logic grows complex or when you have to deal with asynchrony. I like this way of handling events because it makes the logic for complicated situations much easier to follow.

There are plenty of learning resources on Rx out there; one of my favourites is Introduction to Rx.
I put this piece together as a starting point for introducing Rx to my development team at work and, honestly, to help me learn it. If it helps someone along the way, great!

I’ll go through a fictitious existing class that fires an event and show how we can make it more reactive by treating those events as a stream.

Problem Context

Imagine we have a factory class that produces vehicles. Each time a vehicle is produced, it fires an event, and we want to do something important in response. Let’s say Hondas have a paint finish problem: another line of vehicles (Chevy) is produced at the same time and sprays far too much paint, which drifts onto the Honda production line. This only happens when a Honda and a Chevy are created at the same moment.

Traditionally, handling a separate event for each type of vehicle would be easy. Now, however, we need to see which vehicles were produced at the same time so we can call a QA manager over to double-check the finish. Doing that with plain event handlers would mean fairly complex logic with mutable state variables, plus care not to disturb anything else.
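To make that concrete, here is a rough sketch of what the hand-rolled version might look like with plain event handlers. The `ContaminationTracker` class and its `Record` method are hypothetical names, not part of the original program; the sketch only assumes that each event carries a product name, an ID, and a timestamp.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: remembers vehicles per whole second using mutable state.
public class ContaminationTracker
{
    private readonly object _gate = new object();

    // Grows without bound in this sketch; real code would also need to prune old seconds.
    private readonly Dictionary<DateTime, List<string>> _bySecond =
        new Dictionary<DateTime, List<string>>();

    // Call this from every factory's event handler.
    // Returns all vehicles recorded for that second once there are two or more,
    // otherwise an empty list.
    public IReadOnlyList<string> Record(string productName, int id, DateTime producedAt)
    {
        // Truncate the timestamp to the whole second it falls in.
        var key = new DateTime(
            producedAt.Ticks - producedAt.Ticks % TimeSpan.TicksPerSecond,
            producedAt.Kind);

        lock (_gate) // handlers may be invoked from different threads
        {
            List<string> bucket;
            if (!_bySecond.TryGetValue(key, out bucket))
            {
                bucket = new List<string>();
                _bySecond[key] = bucket;
            }

            bucket.Add(productName + " with ID " + id);
            return bucket.Count > 1 ? bucket.ToArray() : new string[0];
        }
    }
}
```

Every handler would have to call `Record` and act on the result, and the shared dictionary, the lock, and the key logic all live outside the events themselves. That bookkeeping is what the Rx version lets us express as a query.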

Vehicle Factory Class

public class Factory  
{
    private string _name;
    private string _productName;

    public event EventHandler<VehicleProducedEventArgs> ProducedVehicle;

    public Factory(string name, string productName)
    {
        _name = name;
        _productName = productName;
    }

    public void MakeProduct(int delay, int id)
    {
        Thread.Sleep(delay);
        OnProducedProduct(new VehicleProducedEventArgs(_productName, id));
    }

    protected virtual void OnProducedProduct(VehicleProducedEventArgs e)
    {
        EventHandler<VehicleProducedEventArgs> handler = ProducedVehicle;
        if (handler != null)
        {
            handler(this, e);
        }
    }
}

The custom event arguments it fires:

public class VehicleProducedEventArgs : EventArgs
{
    public int Id { get; private set; }
    public string ProductName { get; set; }
    public DateTime ManufactureDate { get; private set; }

    public VehicleProducedEventArgs(string productName, int id)
    {
        this.ProductName = productName;
        ManufactureDate = DateTime.Now;
        this.Id = id;
    }
}

Here we have our standard vehicle factory class, which creates vehicles for us. It raises a standard event with the event argument VehicleProducedEventArgs. When a new vehicle is created, it is given an Id, a manufacture date, and a product name. We will query this information later with Reactive Extensions to pick out the events we are interested in.
Normally, an event handler attached to the ProducedVehicle event fires for every vehicle, whether we care about it or not. As stated above, we only want to look at specific kinds of events and let the rest of the system chug along normally.

Main Vehicle Maker Program

Our main console program has the code below. We'll also need to pull in the Rx package from NuGet with Install-Package Rx-Main (the package ID used by Rx 2.x; later releases ship as System.Reactive).

static void Main(string[] args)  
{
    var hondaFactory = new Factory("CarFactory", "Honda");
    var chevyFactory = new Factory("TruckFactory", "Chevy");

    var hondaProductionStream = Observable.FromEventPattern<VehicleProducedEventArgs>(hondaFactory, "ProducedVehicle");
    var chevyProductionStream = Observable.FromEventPattern<VehicleProducedEventArgs>(chevyFactory, "ProducedVehicle");
    var hondaAndChevyProductionStream = Observable.Merge(hondaProductionStream, chevyProductionStream);


    var productionLineGroup = hondaAndChevyProductionStream
        // Key is the second-of-minute (0-59) of the manufacture timestamp
        .GroupBy(evt => evt.EventArgs.ManufactureDate.Second)
        .Subscribe(grp =>
            {
                var contaminatedVehicles = new List<VehicleProducedEventArgs>();

                grp.Subscribe(
                    vehicle =>
                    {
                        contaminatedVehicles.Add(vehicle.EventArgs);
                        if (contaminatedVehicles.Count > 1) // two or more vehicles share this second
                        {
                            contaminatedVehicles.ForEach(v => Console.WriteLine(v.ProductName + " with ID " + v.Id + " might be contaminated!"));
                        }
                    }
                );
            }
    );

    for (var i = 0; i <= 3; i++)
    {
        hondaFactory.MakeProduct(500, i); // delay (ms) to make a Honda, plus an arbitrary ID
        chevyFactory.MakeProduct(1000, i + 10);
    }

    Console.WriteLine("Done");
    Console.ReadLine();
}

We created two factories that produce two makes of vehicle: Honda and Chevy. From them I made three observable streams that we can query: one per factory, plus a merged stream that contains the events fired by both.
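The string-based `FromEventPattern` overload used above finds the event by name at run time, so a typo in "ProducedVehicle" is not caught by the compiler. As a hedged alternative, Rx also has an overload that takes add/remove handler delegates. The sketch below assumes the `System.Reactive` package and uses trimmed stand-ins for the classes from this post; `FactoryStreams` and `ToProductionStream` are made-up names for illustration.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

// Trimmed stand-ins for the classes shown earlier in the post.
public class VehicleProducedEventArgs : EventArgs
{
    public int Id { get; private set; }
    public string ProductName { get; private set; }

    public VehicleProducedEventArgs(string productName, int id)
    {
        ProductName = productName;
        Id = id;
    }
}

public class Factory
{
    private readonly string _productName;

    public event EventHandler<VehicleProducedEventArgs> ProducedVehicle;

    public Factory(string productName)
    {
        _productName = productName;
    }

    public void MakeProduct(int id)
    {
        var handler = ProducedVehicle;
        if (handler != null)
        {
            handler(this, new VehicleProducedEventArgs(_productName, id));
        }
    }
}

public static class FactoryStreams
{
    // Hypothetical helper: wraps the event without naming it in a string.
    public static IObservable<EventPattern<VehicleProducedEventArgs>> ToProductionStream(
        this Factory factory)
    {
        return Observable.FromEventPattern<VehicleProducedEventArgs>(
            handler => factory.ProducedVehicle += handler,  // attach on subscription
            handler => factory.ProducedVehicle -= handler); // detach on disposal
    }
}
```

With this in place, `hondaFactory.ToProductionStream()` could stand in for the string-based call, and the rest of the query would stay the same.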

When running the program, any two vehicles created in the same second produce this kind of output:
[Image: Vehicle Contamination Example Output]

We can picture the streams like this:
[Image: Observable Vehicle Streams]
Notice that the third stream is the merged stream of events. From it we can group the events by second to see which vehicles were created at the same time. After grouping, we subscribe to each group, which contains one or more vehicle events that share a second. We keep a small list of vehicles per group and, as each event arrives (the events are pushed to us over time), check whether the list now holds more than one vehicle. If it does, we know paint contamination is possible.
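One caveat with grouping on `ManufactureDate.Second`: that property is the second-of-minute, so vehicles built exactly one minute apart would land in the same group. For a demo that runs for a few seconds this doesn't matter, but a longer-running version might key on the whole timestamp truncated to the second. A minimal sketch, assuming the `System.Reactive` package; `SameSecond`, `TruncateToSecond`, and `Collisions` are hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SameSecond
{
    // Drops the sub-second part, so every instant within one
    // calendar second maps to the same key.
    public static DateTime TruncateToSecond(DateTime t)
    {
        return new DateTime(t.Ticks - t.Ticks % TimeSpan.TicksPerSecond, t.Kind);
    }

    // Emits a snapshot of a group each time its second or a later item arrives.
    public static IObservable<IList<T>> Collisions<T>(
        IObservable<T> source, Func<T, DateTime> timestampOf)
    {
        return source
            .GroupBy(item => TruncateToSecond(timestampOf(item)))
            .SelectMany(group => group
                // Build a fresh list per item instead of mutating shared state.
                .Scan(new List<T>(), (soFar, item) => new List<T>(soFar) { item })
                .Where(items => items.Count > 1)
                .Select(items => (IList<T>)items));
    }
}
```

With the merged stream from this post, the call might look like `SameSecond.Collisions(hondaAndChevyProductionStream, e => e.EventArgs.ManufactureDate)`, and the subscriber would receive the list of possibly contaminated vehicles directly.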
Every other event is simply ignored. Additionally, if at a later date we wanted to do something with the Honda factory alone, we could subscribe to its stream and perform some kind of action, like so:

var hondaLine = hondaProductionStream
    .Subscribe(h =>
    {
        Console.WriteLine("Do some action to all Hondas");
    });

Or, if we wanted to target only certain Hondas, say every other one, we could add a LINQ Where clause and perform a different action:

var customPaintJobHondaLine = hondaProductionStream  
    .Where(h => h.EventArgs.Id % 2 == 0)
    .Subscribe(h =>
        {
            Console.WriteLine("Apply Custom paint job for every other vehicle: " + h.EventArgs.Id);
        });
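Each `Subscribe` call above returns an `IDisposable`. Disposing it stops the subscriber from receiving further events, which is roughly the Rx counterpart of `-=` on a plain event. A small sketch, assuming the `System.Reactive` package and using a `Subject<int>` as a stand-in for a factory stream (the `SubscriptionDemo` class is hypothetical):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SubscriptionDemo
{
    // Returns the IDs the even-ID subscriber saw before it was disposed.
    public static List<int> Run()
    {
        var ids = new Subject<int>();   // stand-in for hondaProductionStream
        var seen = new List<int>();

        IDisposable subscription = ids
            .Where(id => id % 2 == 0)
            .Subscribe(id => seen.Add(id));

        ids.OnNext(0);
        ids.OnNext(1);                  // filtered out by Where
        ids.OnNext(2);

        subscription.Dispose();         // stop listening
        ids.OnNext(4);                  // no longer observed

        return seen;                    // holds 0 and 2
    }
}
```

Holding on to the value returned by `Subscribe` (as `hondaLine` and `customPaintJobHondaLine` do above) is what makes this kind of clean-up possible later.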

Reactive Extensions looks to be a powerful tool in the tool belt. I definitely plan to play more with it.
