Achieve Maximum Performance With DataPipes and the Observer Pattern

Blog Post created by rborges Employee on Apr 10, 2018


The AF SDK provides two different ways to get live data updates and I recently did some stress tests on AFDataPipes, comparing the observer pattern (GetObserverEvents) with the more traditional GetUpdateEvents. My goal was to determine if there is a preferred implementation.


The Performance Test

The setup is simple: listen to 5332 attributes that are updated at a rate of 20 events per second. This produces over 100k events per second that we should process. I agree that this is not a challenging stress test but is on par with what we usually see on customers around the globe. The server is very modest, with only 8GB of RAM and around 1.2GHz of processor speed (it’s an old spare laptop that we have here at the office). Here is the code I used to fetch data using GetUpdateEvents (DON’T USE IT - Later in this article, I will show the code I've used to test the observer pattern implementation):


var dataPipe = new AFDataPipe();
CancellationTokenSource source = new CancellationTokenSource();
Task.Run(async () =>
        while (!source.IsCancellationRequested)
            // Here we fetch new data
            var updates = dataPipe.GetUpdateEvents();
            foreach (var update in updates)
                Console.WriteLine("{0}, Value {1}, TimeStamp: {2}",
            await Task.Delay(500);
    catch (Exception exception)
        Console.WriteLine("Server sent an error: {0}", exception.Message);
}, source.Token);


After several hours running the application, I noticed that the GetUpdateEvents was falling behind and sometimes it was leaving some data for the next iteration. This is not a problem per se as, eventually, it would catch up with current data. I suspected that this would happen, but I decided to investigate what was going on. After some bit twiddling, I noticed something weird. Below we have a chart with the memory used by the application. On the top, we have the one that uses GetObserverEvents. On the bottom the GetUpdateEvents. They both use the same amount of memory but look closely at the number of GC calls executed by the .NET Framework.


2018-04-09 11_39_55-ObservableTest (Running) - Microsoft Visual Studio.png

(using GetObserverEvents)

2018-04-09 11_37_51-ObservableTest (Running) - Microsoft Visual Studio.png

(Using GetUpdateEvents)



Amazingly, this is expected as we are running the code on a server with a limited amount of memory and GetUpdates has extra code to deal with. Honestly, I was expected an increased memory usage and the GC kicking in like this was a surprise. Ultimately, the .NET framework is trying to save my bad code by constantly freeing resources back to the system.


Can this be fixed? Absolutely, but it is a waste of time as you could use this effort to implement the observer pattern (that handles all of this natively) and get some extra benefits:

  • Because it allows you to decouple your event handling from the code that is responsible for fetching the new data.
  • Because it is OOP and easier to encapsulate.
  • Because it is easier to control the flow.


Observer Pattern Implementation for AF SDK

In this GitHub file, you can find the full implementation of a reusable generic class that listens to AF attributes and executes a callback when new data arrives. It's very simple, efficient and has a minimal memory footprint. Let’s break down the most important aspects of it so I can explain what’s going on and show how it works.


The class starts by implementing the IObserver interface. This allows it to subscribe itself to receive notifications of incoming data. I also implement IDisposable because the observer pattern can cause memory leaks when you fail to explicitly unsubscribe to observers. This is known as the lapsed listener problem and it is a very common cause of memory issues:


public class AttributeListener : IObserver<AFDataPipeEvent>, IDisposable


Then comes our constructor:


public AttributeListener(List<AFAttribute> attributes, Action<AFDataPipeEvent> onNewData, Action<Exception> onError, Action onFinish)
      _dataPipe = new AFDataPipe();


Here I expect some controversy. First, because we are moving the subject to inside the observer and breaking the traditional structure of the pattern. Secondly, by using Action callbacks I’m going against the Event Pattern that Microsoft has been using since the first version of the .NET framework and has a lot of fans. It's a matter of preference and there are no performance differences. I personally don’t like events because they are too verbose and we usually don't remove the accessor (ie: implement a -=) and that can cause memory leaks. By the way, I’m not alone on this preference for reactive design as even the folks from Redmond think that reactive code is more suitable for the observer pattern. The takeaway here is how we subscribe the class to the AFDataPipe while keeping the data handling oblivious to it, giving us maximum encapsulation and modularity.


Now comes the important stuff, the code that does the polling:


public void StartListening()
    if (Attributes.Count > 0)
        Task.Run(async () =>
        while (!_source.IsCancellationRequested)
            await Task.Delay(500);
        }, _source.Token);


There is not much to talk about this code.  It starts a background thread with a cancellable loop that polls new data every 500 milliseconds. The await operator (together with the async modifier) allows our anonymous function to run fully asynchronous. Additionally, note how the cancellation token is used twice: as a regular token for the thread created by Task.Run(), but also as a loop breaker, ensuring that there will be no more calls to the server. To see how the cancelation is handled, give a look at the StopListening method of the class.


When a new DataPipeEvent arrives, the AF SDK calls the OnNext method of the IObserver. In our case it’s a simple code that only executes the callback provided to the constructor:


public void OnNext(AFDataPipeEvent pipeEvent)


Caveat lector: This is an oversimplified version of the actual implementation. In the final version of the class , the IObserver implementations are actually piping data to a BufferBlock that fires your Action whenever a new AFDataPipeEvent comes in. I'm using a producer-consumer pattern based on Microsoft's Dataflow library.


Finally, here’s an example of how this class should be used. The full code is available in this GitHub repo:


static void Main(string[] args)
    // We start by getting the database that we want to get data from
    AFDatabase database = (new PISystems())["MySystem"].Databases["MyDB"];
    // Defining our callbacks
    void newDataCallback(AFDataPipeEvent pipeEvent)
        Console.WriteLine("{0}, Value {1}, TimeStamp: {2}",
        pipeEvent.Value.Attribute.GetPath(), pipeEvent.Value.Value, pipeEvent.Value.Timestamp.ToString());
    void errorCallback(Exception exception) => Console.WriteLine("Server sent an error: {0}", exception.Message);
    void finishCallback() => Console.WriteLine("Finished");
    // Then we search for the attributes that we want
    IEnumerable<AFAttribute> attributes = null;
    using (AFAttributeSearch attributeSearch =
        new AFAttributeSearch(database, "ROPSearch", @"Element:{ Name:'Rig*' } Name:'ROP'"))
        attributeSearch.CacheTimeout = TimeSpan.FromMinutes(10);
        attributes = attributeSearch.FindAttributes();
    // We proceed by creating our listener
    var listener = new AttributeListener(attributes.Take(10).ToList(), finishCallback);
    // Now we inform the user that a key press cancels everything
    Console.WriteLine("Press any key to quit");
    // Now we consume new data arriving
    listener.ConsumeDataAsync(newDataCallback, errorCallback);
    // Then we wait for an user key press to end it all
    // User pressed a key, let's stop everything


Simple and straightforward. I hope you like it. And please, let me know whether you agree or disagree with me. This is an important topic and everybody benefits from this discussion!


UPDATE: Following David's comments, I updated the class to offer both async and sync data handling. Give a look at my final code to see how to use the two methods. Keep in mind that the sync version will run on the main thread and block it, so I strongly suggest you use the async call ConsumeDataAsync(). If you need to update your GUI from a separated thread, use Control.Invoke.


Related posts

Barry Shang's post on Reactive extensions for AF SDK

Patrice Thivierge 's post on how to use DataPipes

Marcos Loeff's post on observer pattern with PI Web API channels

David Moler's comment on GetObserverEvents performance

Barry Shang's post on async observer calls