
Motivation

 

Functional Reactive Programming (FRP) has gained momentum in recent years as a means for expressing application logic centered around asynchronous data flows. FRP has foundations in functional languages such as Scheme, Haskell, and Scala, but FRP libraries have since emerged in object-oriented languages such as C#, Java, JavaScript, and Python. You will find FRP principles used in modern web and cloud applications (Microsoft, Google, Facebook, Twitter, Netflix, GitHub). There is even a course for it now on Coursera (Principles of Reactive Programming).

 

How can FRP be used with PI System data? What can I do (or can't do) with FRP + AF SDK? That is the topic of this series of blog posts.

 

This introductory blog post will first briefly cover the basic idea of FRP and then show how, in less than 40 lines of code, we can integrate AF SDK into Rx.NET (a library for reactive programming), immediately giving us a treasure trove of tools such as LINQ semantics for transforming, aggregating, and analyzing observable data streams.

 

GitHub repository

 

The GitHub repository for the code library and examples is here:

 

bzshang/AFSDK-Rx · GitHub

 

It will be updated further in the next few weeks with more functionality and examples.

 

I learned most of the ideas and concepts here from this great resource.

 

Introduction to Rx

 

If interested, I highly recommend reading and re-reading the material above as it may help you discover additional ideas other than those here for programming reactively with AF SDK.

 

What is FRP? What is Rx.NET?

 

What exactly is FRP? While this blog post won't go into the details of FRP, I've linked a few resources below which help to introduce the concept. See References below.

 

The FRP-style of programming has been implemented in C# via Reactive Extensions for .NET (Rx.NET), which is available as a NuGet package.

 

For now, I will try to give my take on "What is FRP" in hopes that it may resonate with others.

 

Reversing the Enumerable

 

We can start to understand FRP via the more familiar IEnumerable interface in C#. With an enumerable collection, we "pull" the next item out (via foreach) and then do something with that item. This is the basic idea of the Iterator pattern.

 

In FRP, we have the IObservable interface. Items are "pushed" from the observable collection, and we react to them. This is the basic idea of the Observer pattern.

 

The Iterator and Observer patterns are duals. We "pull" in the former, and "push" with the latter. In a sense, Iterator (= Enumerator) is to Enumerable as Observer is to Observable.

 

[Figure: iterator-observer.png, from http://www.cs.cornell.edu/courses/cs2112/2014fa/lectures/]

 

In the former, the "pull" (IEnumerator.MoveNext in C#) is a blocking call. The consumer must wait for the data source (or time out) before doing something with the item. In the latter, the receipt of the item is non-blocking. As soon as the item arrives, it is "pushed" to us.
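To make the duality concrete, here is a minimal sketch that consumes the same three values both ways. It assumes the Rx.NET NuGet package is referenced; the class name PullVersusPushDemo is made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PullVersusPushDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        int[] values = { 1, 2, 3 };

        // Pull: the consumer drives. Each MoveNext call asks the source for the next item.
        using (IEnumerator<int> enumerator = ((IEnumerable<int>)values).GetEnumerator())
        {
            while (enumerator.MoveNext())
            {
                log.Add("pulled " + enumerator.Current);
            }
        }

        // Push: the source drives. It invokes our OnNext delegate for each item
        // and our OnCompleted delegate when the sequence ends.
        using (values.ToObservable().Subscribe(
            item => log.Add("pushed " + item),
            () => log.Add("completed")))
        {
        }

        return log;
    }
}
```

An array is a finite, already-available source, so both halves finish immediately here. The difference matters once the source is a live feed such as a data pipe, where the pull loop would sit waiting and the push subscription would simply be called back.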

 

Why Not C# Events?

 

The asynchronous "push" pattern appears often in programming languages, expressed via constructs such as events, event handlers, delegates, callbacks, promises, futures, etc. What are the downsides to these constructs?

 

1) They don't treat the event as a first-class object. Therefore, we cannot perform further composition like we can with class types.

2) Because of this lack of composition, it becomes harder to write application logic that depends on sequences of events.

3) Similarly, it becomes harder to write application logic that depends on events from different sources.

 

As mentioned, one problem when writing event-driven applications is that events are not treated as first-class objects, in the sense that they don't offer any further means of combination. Once you have the event, you can only subscribe/unsubscribe to it, but the language doesn't offer any additional ways to reason about it. In particular, it is hard to reason about sequences of events.

 

This is where the observable comes in. What if we could represent a stream of events as collections? After all, the number of events can range from zero to infinite. Specifically, we can conceive of events as observable sequences, and represent them as an IObservable<T>. Just as we can take a familiar collection (array, list, etc.) and expose it via IEnumerable<T> type, we can do similarly for event collections.

 

See Intro to Rx - Why Rx? and End-to-End Reactive Programming at Netflix for additional discussion.

 

LINQ to Observables

 

Once we have the IObservable<T>, we now have events as first-class objects and can begin to develop means of combination for them. By analogy with LINQ to Enumerables (Objects), we can begin to develop a LINQ to Observables, and use familiar query methods such as Select, Where, and GroupBy to transform, aggregate, and analyze events just as we would with our familiar enumerable collection types.

 

This is where Rx.NET comes in. It offers methods to convert existing events into observable sequences and provides a framework (via LINQ-like extension methods) for manipulating and composing observable sequences. It also offers a framework for implementing the Observer pattern that handles scheduling and concurrency issues that can arise in an asynchronous programming model.
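As an illustration of converting an existing event into an observable sequence, the sketch below wraps a plain C# event with Observable.FromEventPattern and then filters it with Where. The Sensor and ReadingEventArgs types are hypothetical stand-ins invented for this example; they are not part of AF SDK or Rx.NET.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical event source, used only for illustration.
public class ReadingEventArgs : EventArgs
{
    public ReadingEventArgs(double value) { Value = value; }
    public double Value { get; private set; }
}

public class Sensor
{
    public event EventHandler<ReadingEventArgs> ReadingReceived;

    public void Publish(double value)
    {
        EventHandler<ReadingEventArgs> handler = ReadingReceived;
        if (handler != null) handler(this, new ReadingEventArgs(value));
    }
}

public static class EventToObservableDemo
{
    public static List<double> Run()
    {
        var sensor = new Sensor();
        var highReadings = new List<double>();

        // Wrap the event: Rx attaches the handler on Subscribe and detaches it on Dispose.
        IObservable<double> readings = Observable
            .FromEventPattern<ReadingEventArgs>(
                handler => sensor.ReadingReceived += handler,
                handler => sensor.ReadingReceived -= handler)
            .Select(pattern => pattern.EventArgs.Value);

        // The event is now a first-class value that can be composed with LINQ operators.
        using (readings.Where(v => v > 50.0).Subscribe(v => highReadings.Add(v)))
        {
            sensor.Publish(10.0);
            sensor.Publish(75.0);
            sensor.Publish(99.5);
        }

        // Published after the subscription was disposed, so no handler is attached.
        sensor.Publish(80.0);

        return highReadings;
    }
}
```

The point of the sketch is that the filtered stream is an ordinary IObservable<double> that can be passed around, combined with other streams, or transformed further, which a raw event cannot.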

 

When should I use Rx?

 

In the context of AF SDK, when should you think about using Rx?

 

I would argue any time your application logic is driven by composite real-time events, whether they are real-time data or metadata changes. I mention composite because I think it's overkill if the application logic is typically driven by only one event at a time (for these cases, a Task-based async pattern or plain-old C# Events might be a better fit). The need to handle composite events can occur in real-time calculation engines and applications that synchronize data among disparate systems. This doesn't mean you have to use Rx, but it is useful to consider what it could offer. The learning curve is steep but the rewards can be plentiful.

 

An Observable AFDataPipe

 

Enough with the theory. Now onto the practical. In this example, we will create an output time-series stream that represents the sum of two input time-series streams. This is very much like creating a Formula data reference or Analysis calculation that adds the values from two PI Point inputs. Specifically, we sign up for real-time events for SINUSOID and CDT158 and produce an observable sequence of AFDataPipeEvent types representing their sum.

 

This simple example is meant to just give a flavor of FRP using AF SDK with Rx.NET. Rx.NET is a huge library, and this example only scratches the surface.

 

The objectives are as follows:

 

1) From an instance of AFDataPipe, create an IObservable<AFDataPipeEvent>

2) Filter and split the observable sequence into two separate observable sequences exposing snapshot values of SINUSOID and CDT158.

3) Combine the two snapshot sequences into a single observable sequence representing the sum.

 

1) From an instance of AFDataPipe, create an IObservable<AFDataPipeEvent>.

 

This is perhaps the most important part. Without it, the rest is not possible.*

 

First, experienced AF SDK developers may ask: "But wait, doesn't AFDataPipe already implement the Observer pattern, since it exposes a Subscribe method and calls its observers' OnNext methods when new events arrive (via GetObserverEvents)?" This is true. However, although it can be argued that AFDataPipe implements the Observer pattern, it does not actually implement the IObservable<AFDataPipeEvent> interface. This is actually good for us. In Rx.NET, we do not want to actually implement the interfaces ourselves, but use the framework's factory methods to do this for us. This is because the factory methods also take care of scheduling, locking, and concurrency considerations, which we will probably miss by implementing the patterns ourselves. Rx.NET provides a rich library of extension methods for observables, and its self-created observable implementations will operate much more nicely with these extension methods than our custom-created ones.

 

So how do we actually create the IObservable<AFDataPipeEvent> from AFDataPipe? The code to do so is below and is less than 40 lines.

 

using System;
using System.Timers;
using System.Reactive.Linq;
using System.Reactive.Disposables;

using OSIsoft.AF.Data;

namespace AFSDK.Rx
{
    public static class AFDataPipeExtensions
    {
        public static IObservable<AFDataPipeEvent> CreateObservable(this AFDataPipe dataPipe, double interval)
        {
            return Observable.Create<AFDataPipeEvent>(observer =>
            {
                IDisposable dpToken = dataPipe.Subscribe(observer);
                // Poll the data pipe every `interval` milliseconds.
                Timer timer = new Timer();
                timer.Interval = interval;
                timer.Elapsed += (object sender, ElapsedEventArgs e) =>
                {
                    bool hasMoreEvents = true;
                    while (hasMoreEvents)
                    {
                        dataPipe.GetObserverEvents(out hasMoreEvents);
                    }
                };
                // Polling begins only once an observer has subscribed.
                timer.Start();
                // On Dispose, stop the timer and drop the data pipe subscription.
                return Disposable.Create(() =>
                {
                    timer.Dispose();
                    dpToken.Dispose();
                });
            });
        }
    }
}

 

How does this all work? First, the above is an extension method for AFDataPipe. The method CreateObservable accepts one parameter (interval) that controls how often GetObserverEvents should be called. The method returns the IObservable<AFDataPipeEvent> we are looking for.**

 

The key line is line 14, the call to Observable.Create. This is the factory method in Rx.NET that allows us to create IObservable<T> types. The method accepts a delegate which represents what the Subscribe method would have been had we implemented IObservable<T> ourselves. The delegate takes in an IObserver as input. Inside the delegate body, we subscribe the observer to the data pipe via AFDataPipe.Subscribe and set up the polling timer. The timer's Elapsed event fires every time the configured interval has elapsed and calls GetObserverEvents. Internally, this method will call the OnNext method of the observer we passed in. We return an IDisposable where the Dispose implementation is defined via a delegate passed to Disposable.Create.

 

Note that the CreateObservable method is lazy. None of the delegates are actually invoked in this call. We are simply registering a method to be executed later when an observer actually subscribes to our observable via the call to IObservable.Subscribe. Rx.NET saves us the trouble of having to create an actual type implementing the IObservable. We just supply the implementation for IObservable.Subscribe. Rx.NET will perform the bootstrapping that creates the type implementing IObservable.
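The laziness is easy to see in isolation. The following sketch uses Observable.Create without any AF SDK types and records the order in which things happen; the class name and the log messages are invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class LazyCreateDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Nothing inside this delegate runs yet. We are only describing
        // what should happen when an observer subscribes.
        IObservable<int> source = Observable.Create<int>(observer =>
        {
            log.Add("subscribe delegate invoked");
            observer.OnNext(42);
            return Disposable.Create(() => log.Add("disposed"));
        });

        log.Add("observable created");

        // Subscribing is what triggers the delegate above.
        IDisposable subscription = source.Subscribe(x => log.Add("received " + x));

        // Disposing the subscription runs the teardown delegate.
        subscription.Dispose();

        return log;
    }
}
```

The log reads "observable created", "subscribe delegate invoked", "received 42", "disposed", in that order. In the data pipe version, this is why the timer does not start until someone subscribes and why disposing the subscription stops the polling.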

 

It is worth mentioning the broader pattern here. To move towards a true IObservable from AFDataPipe, we've sort of created our own Subscribe method implementation (this is the delegate passed into Observable.Create). Inside our custom Subscribe, we call into the native AFDataPipe.Subscribe. We've created a subscription wrapper method to surface the underlying AFDataPipe as an IObservable<AFDataPipeEvent>.

 

Here is how to create the observable from AFDataPipe and the CreateObservable extension method.

 

AFElement element = AFObject.FindObject(@"\\AFServer\Sandbox\Reactive") as AFElement;
AFAttributeList attrList = new AFAttributeList(element.Attributes);

AFDataPipe dataPipe = new AFDataPipe();
var errors = dataPipe.AddSignups(attrList);

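// CreateObservable requires the polling interval in milliseconds; 1000 is an illustrative value.
IObservable<AFDataPipeEvent> obsDataPipe = dataPipe.CreateObservable(1000);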

 

2) Filter and split the observable sequence into two separate observable sequences exposing snapshot values of SINUSOID and CDT158

 

This is where all the work we did in 1) pays off. Once we have the IObservable<AFDataPipeEvent> pumping out events, further operations can take advantage of the vast extension methods created for observables in Rx.NET. Since observables and iterators are essentially duals, it is no surprise that these extension methods have the flavor of familiar LINQ operators.

 

Because we've signed up both SINUSOID and CDT158 attributes to our AFDataPipe, the IObservable<AFDataPipeEvent> will have events for both of these PI Points. We can split or filter the observable sequence into two observable sequences: one for SINUSOID and one for CDT158. How can we do this? Via the Where (or Filter) operator of course!

 

For example, if I only want a stream of SINUSOID snapshots, here is the code:

 

IObservable<AFDataPipeEvent> sinusoidSnapshots = obsDataPipe.Where(evt => evt.Value.Attribute.Name == "SINUSOID" && evt.Action == AFDataPipeAction.Add);

 

I can do similarly for CDT158.

 

3) Combine the two sequences into a single observable sequence representing the sum.

 

I have two streams (sinusoidSnapshots and cdt158Snapshots). Now, whenever a new snapshot value comes in from either stream, we want to emit a new value representing the sum of the latest snapshot values (i.e. natural scheduling). This is where the CombineLatest operator comes in. The returned stream is of the type IObservable<AFDataPipeEvent[]>. To get the sum, we can use Select (i.e. Map) to transform the stream into an IObservable<AFDataPipeEvent>, where the AFDataPipeEvent encapsulates the sum in an AFValue. I've written an Add extension method to capture these operations.

 

public static IObservable<AFDataPipeEvent> Add(this IObservable<AFDataPipeEvent> source1, IObservable<AFDataPipeEvent> source2)
{
    return source1
        // Emit the latest pair of events whenever either source produces a new one.
        .CombineLatest(source2, (s1, s2) => new[] { s1, s2 })
        .Select(evts =>
        {
            double value1 = Convert.ToDouble(evts[0].Value.Value);
            double value2 = Convert.ToDouble(evts[1].Value.Value);
            double result = value1 + value2;

            // Stamp the sum with the later of the two input timestamps.
            DateTime timestamp = evts[0].Value.Timestamp > evts[1].Value.Timestamp ? evts[0].Value.Timestamp : evts[1].Value.Timestamp;

            AFValue afVal = new AFValue(result, timestamp);
            AFDataPipeEvent dpEvent = new AFDataPipeEvent(AFDataPipeAction.Add, afVal);

            return dpEvent;
        });
}

 

Here is the usage of the method above.

 

IObservable<AFDataPipeEvent> sumStream = sinusoidSnapshots.Add(cdt158Snapshots);
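To see the CombineLatest behavior on its own, without an AF Server, here is a small sketch that replaces the two snapshot streams with Rx.NET Subject<double> instances fed by hand. The class name and the values are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineLatestDemo
{
    public static List<double> Run()
    {
        // Stand-ins for the two snapshot streams.
        var sinusoid = new Subject<double>();
        var cdt158 = new Subject<double>();
        var sums = new List<double>();

        using (sinusoid
            .CombineLatest(cdt158, (s, c) => s + c)
            .Subscribe(sum => sums.Add(sum)))
        {
            sinusoid.OnNext(1.0);  // nothing emitted: cdt158 has no value yet
            cdt158.OnNext(10.0);   // emits 1 + 10 = 11
            sinusoid.OnNext(2.0);  // emits 2 + 10 = 12
            cdt158.OnNext(20.0);   // emits 2 + 20 = 22
        }

        return sums;
    }
}
```

Note that nothing is emitted until both inputs have produced at least one value. After that, every new value on either side yields a new sum using the most recent value from the other side.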

 

Now that we have the sumStream as an IObservable<AFDataPipeEvent>, we can subscribe to it as an observer. In Rx.NET, we can call the Subscribe method on the observable and pass in the implementation of the OnNext, OnError, and OnCompleted methods as delegates.*** Here, we will just implement the OnNext and have it write the timestamp and value to the console.

 

IDisposable subscription = sumStream.Subscribe(evt =>
{
    // The timestamp and the value live on the AFValue carried by the event.
    Console.WriteLine("Timestamp: {0}, SINUSOID + CDT158: {1}", evt.Value.Timestamp.LocalTime.ToString("HH:mm:ss.ffff"), evt.Value.Value);
});
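For completeness, the sketch below shows the Subscribe overload that takes all three delegates. It uses a hand-built source that emits one value and then fails, so that the OnError path is exercised; the exception message is invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class SubscribeWithAllHandlersDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // A source that pushes one value and then signals an error.
        IObservable<int> source = Observable.Create<int>(observer =>
        {
            observer.OnNext(1);
            observer.OnError(new InvalidOperationException("simulated failure"));
            return Disposable.Empty;
        });

        // Rx.NET builds the IObserver<int> for us from these three delegates.
        source.Subscribe(
            value => log.Add("OnNext: " + value),
            error => log.Add("OnError: " + error.Message),
            () => log.Add("OnCompleted"));

        return log;
    }
}
```

The log contains "OnNext: 1" followed by "OnError: simulated failure". OnCompleted is never called, because a sequence terminates with either an error or a completion, not both.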

 

[Figure: console_output.PNG, console output listing the timestamp and sum for each emitted event]

 

Limitations and Assumptions

 

It is important to discuss the limitations and assumptions made in the implementations above.

 

1) The data is in order. If an out-of-order event comes in, I neither update the sumStream nor emit an out-of-order event to its subscribers. I could support this by no longer restricting the event filter to AFDataPipeAction.Add.

2) The data is not late arriving. Every time a new value from one stream comes in, I'm assuming I have the latest value for the other stream.

3) There is no error handling for backpressure scenarios. For example, if new events come in at a faster rate than my OnNext can process them, Rx.NET will buffer internally, but I do not account for when and how to handle growing buffers.

4) I'm sure I forgot many other ones. This is a placeholder for them.

 

What's Next

 

Here are some topics I plan to discuss next.

 

1) How to implement a more robust Observable data pipe using an EventLoopScheduler along with recursive scheduling

2) How to handle late arriving data. This probably involves the use of Virtual Time Scheduling in Rx. Here, the AFValue timestamps drive the Rx scheduler, not the wall clock time. We will need to introduce some notion of consensus among streams for when the "virtual" time should be advanced, and hence, when scheduled operations are executed.

3) How to perform sampling, aggregations, sliding window operations, moving average calculations, and other LINQ to Observable functions.
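As a small preview of item 3, here is one possible way to express a moving average with Rx.NET's Buffer operator. This is only a sketch on a synthetic integer sequence, not the implementation planned for the repository; the window size of 3 is arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class MovingAverageDemo
{
    public static IList<double> Run()
    {
        return Observable.Range(1, 5)          // 1, 2, 3, 4, 5
            .Buffer(3, 1)                      // overlapping windows of 3, advancing by 1
            .Where(window => window.Count == 3) // drop the partial windows flushed at completion
            .Select(window => window.Average())
            .ToList()                          // collect the averages into one list
            .Wait();                           // block until the sequence completes
    }
}
```

The windows are (1, 2, 3), (2, 3, 4), and (3, 4, 5), so the result is 2, 3, 4. Wait is used here only because the demo sequence is finite; on a live stream you would subscribe instead of blocking.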

 

See the updating GitHub repo below:

bzshang/AFSDK-Rx · GitHub

 

Notes

 

* Functional geeks will recognize this as the anamorphism (i.e. entering the monad).

** Our handler for timer.Elapsed could be improved. For example, we could handle AFErrors returned by GetObserverEvents and have logic to deal with disconnections and metadata changes. The coding and logic are more complex and are left out here to keep the focus on using Rx.NET. Our use of Timers is also undesirable, for various reasons that I may discuss in another post. For a more robust implementation that takes advantage of Rx Schedulers and recursive scheduling, see the GitHub repository.

*** We could very well implement IObserver ourselves, but just like with IObservable, it is best to leave the actual creation of the concrete type to the Rx.NET framework and just specify minimally the interface methods (via delegates).

 

A Word About OSIsoft.AF.Reactive

 

vCampus members may remember a prior set of libraries (OSIsoft.AF.Reactive) that implemented the Observer pattern with AFDataPipe. The typical use case was for creating PI adapters to/from Microsoft's StreamInsight. There are some non-trivial differences between the Observable implementation introduced there and this one.

 

Here, I will argue the benefits of using our Observable implementation that leverages Rx.NET.

 

1) The vCampus library used a prior version of the AFDataPipe that lacked the Subscribe and GetObserverEvents methods. Hence, an observable wrapper was written that allowed "subscription" to an AFDataPipe. Also, since GetObserverEvents was unavailable, a custom method was written to call an observer's OnNext method after a GetUpdateEvents call. However, GetObserverEvents should perform better in that it avoids buffering up an AFValue list: it passes the AFValue directly to the observer through AFDataPipeEvent.

 

2) The vCampus library implemented its own IObservable and IObserver implementations, rather than use a framework like Rx.NET. It is entirely possible to integrate the vCampus library with Rx.NET. After all, much of Rx.NET is extension libraries for IObservables. However, these extension methods work "better" (I know, a bit hand waving here) when the framework creates the observable types itself, which is done in our implementation.

 

3) The vCampus library also implemented the Subject interface (ISubject = IObservable + IObserver) itself. As with the other interfaces, if Rx.NET is used, it is recommended to have the framework create the Subject itself.

 

4) The vCampus library implemented the ISubject as a proxy to convert from one IObservable<T> to another IObservable<T2>. The Subject subscribes to the source observable, converts the type, and then exposes its own observable with the new type. Using Rx.NET, we can completely eliminate creating proxy subjects for this purpose, via the Select and SelectMany (i.e. Map and FlatMap) extension methods on the IObservable<T>.
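To illustrate point 4, the sketch below converts an IObservable<double> into an IObservable<string> with a single Select call and no intermediate subject. The input values and the output format are arbitrary choices for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Reactive.Linq;

public static class SelectInsteadOfProxyDemo
{
    public static IList<string> Run()
    {
        IObservable<double> source = new[] { 1.5, 2.5 }.ToObservable();

        // Select maps each element to the new type. No subject has to
        // subscribe to the source and re-publish the converted values.
        IObservable<string> converted = source.Select(
            v => "value=" + v.ToString("F1", CultureInfo.InvariantCulture));

        // Collect the finite demo sequence into a list.
        return converted.ToList().Wait();
    }
}
```

The result is "value=1.5" and "value=2.5". The converted observable is itself lazy: the source is only subscribed to when something subscribes to the converted stream.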

 

References

 

Intro to Rx - Why Rx?  (IMO the best overview and documentation for Rx.NET)

 

Reactive Extensions  - Push vs. Pull (the best IMO description of IEnumerable vs. IObservable)

 

The introduction to Reactive Programming you've been missing · GitHub (a helpful beginner's intro that walks through the concepts)

 

RxMarbles: Interactive diagrams of Rx Observables (marble diagrams illustrating the various Rx extension methods)

 

Reactive Extensions for .NET (landing page for the Rx.NET library)

 

101 Rx Samples - a work in progress - Reactive Framework (Rx) Wiki (a good repository of Rx.NET examples)

 

StackOverflow - What is Functional Reactive Programming? (SO offers some answers)

 

Monads, part one | Fabulous adventures in coding (C# expert Eric Lippert elegantly describes the concept of a monad, a design pattern for types that forms the basis of LINQ and Rx. I will honestly admit I don't understand all of the concepts here, but that's sometimes the beauty... at least you learn something new each time you re-read it!)

 

Reactive programming - Wikipedia, the free encyclopedia (a bit academic as Wikipedia sometimes is, but good references at the end)

 

ReactiveX - Links to More Information (treasure trove of links)

After careful consideration and after talking to many of you, we have decided that it is not appropriate to deprecate the PI OPC DA Server and the PI OPC HDA Server. It turns out that OPC is still a vital interconnection method for many industrial software applications and is still in use in mission-critical applications. So, you can once again consider our OPC Servers to be supported products with a strong future. We are working on a completely new implementation of the PI OPC DA Server using our AF SDK that should solve some of the performance and scalability problems you have been seeing. The architecture of the PI OPC HDA Server will remain as-is for the time being but we will be addressing some of its outstanding issues.

 

The PI OPC Servers are freely available for download from the Technical Support website just as the other Developer Technologies. As with the others, you will need a license from OSIsoft to deploy client applications that use the PI OPC Servers. The PI OPC DA Server and PI OPC HDA Server are covered by the PSA license but also the older DA license. Talk to your Account Manager if you have questions.
