Motivation

 

Many use cases of async PI AF SDK methods will involve making multiple async calls and awaiting their results. For example, we can make an async call for each PI Point in a collection of these objects. This query seems similar to a synchronous bulk call using PIPointList or AFListData. However, the difference is that with async, we can vary the query parameters (such as time range) for each PI Point, which is not possible with the bulk call.

 

In this post, we introduce a pattern that allows PI AF SDK clients to consume the results of async calls as they are completed. One major motivation for doing so is greater concurrency. Processing the result of each async call as it completes allows us to use both CPU and IO resources more effectively.

 

The blog post will walk through some code patterns we can use when making multiple async calls in PI AF SDK.

 

Problem definition

 

We focus on an event frame use case here, although the concepts can apply to any use case involving "heterogeneous" queries across a list of PI Points or AF Attributes (i.e. the query parameters differ for each PIPoint/AFAttribute, so a bulk call cannot be used).

 

We would like to perform summary calculations on a list of event frames. For each event frame, we perform a summary calculation for a PI Point within the event frame time range. Each event frame has a different start and end time. There are multiple ways we can programmatically solve this problem.

  1. Synchronous loop over the list of event frames
  2. Parallel tasks over the event frames
  3. Async calls that are processed only after all tasks complete
  4. Async calls using Task.WhenAny to process tasks as they complete
  5. Async calls using IObservable<T> to process tasks as they complete

 

The code for these examples is hosted at:

GitHub - osisoft/PI-AF-SDK-AsyncToObservable

0. Find the Event Frames

AFDatabase database = PISystem.CreatePISystem("SERVER").Databases["AsyncExamples"];
AFEventFrameSearch efSearch = new AFEventFrameSearch(database, "EFSearch", new List<AFSearchToken> {
     new AFSearchToken(AFSearchFilter.Name, AFSearchOperator.Equal, "EF*"),
     new AFSearchToken(AFSearchFilter.Start, AFSearchOperator.GreaterThan, "01-jan-2012"),
     new AFSearchToken(AFSearchFilter.End, AFSearchOperator.LessThan, "01-jan-2013"),
});
IEnumerable<AFEventFrame> efs = efSearch.FindEventFrames(0, false, 1000).Take(10000);

 

We show this because we are using the new search in PI AF SDK 2016 to find our event frames. The important concept to know here is that the new search returns an IEnumerable collection of event frames that we can loop over. We just find the first 10,000 event frames to keep things simpler.

 

1. Synchronous loop over the list of event frames

public static class SyncExample
{
     public static void Run(IEnumerable<AFEventFrame> efs)
     {
          foreach (AFEventFrame ef in efs)
          {
               OutputInfo output = Common.GetSummary(ef);
               Common.WriteToConsole(output);
          }
          Common.WriteCompletion();
     }
}

 

This one is fairly simple. We just loop over the event frames and make a summary call on each (i.e. Common.GetSummary makes an AFData.Summary call). See the GitHub code for the implementation of the Common methods; their details are not important for these examples. Here, "processing" simply means printing some output info to the Console, but the discussion is meant to apply to any generic processing task.

 

2. Parallel tasks over the event frames

public static class ParallelExample
{
     public static void Run(IEnumerable<AFEventFrame> efs)
     {
          Parallel.ForEach(efs, new ParallelOptions { MaxDegreeOfParallelism = 8 }, ef =>
          {
               OutputInfo output = Common.GetSummary(ef);
               Common.WriteToConsole(output);
          });
          Common.WriteCompletion();
     }
}

 

Here, we use the .NET Task Parallel Library (TPL) to parallelize the calls over the event frames. TPL is convenient to use and perhaps ideal if our client machine has many cores, but TPL is best suited to CPU-bound work, whereas our data access goes over the network, so each worker thread sits blocked while it waits on the call. For server-side code constrained by the number of available threads, such an approach likely will not scale well. In addition, it is a poor fit for web services with many users, as it can demand many threads per user.

 

3. Async calls that are processed only after all tasks complete

public static class TaskExample1
{
     public static void Run(IEnumerable<AFEventFrame> efs)
     {
          Task<IList<OutputInfo>> efTasks = GetOutputInfoList(efs);

          IList<OutputInfo> output = efTasks.Result;
          foreach (OutputInfo info in output)
          {
               Common.WriteToConsole(info);
          }
          Common.WriteCompletion();
     }

     private static async Task<IList<OutputInfo>> GetOutputInfoList(IEnumerable<AFEventFrame> efs)
     {
          Task<OutputInfo>[] tasks = efs.Select(async ef => await Common.GetSummaryAsync(ef)).ToArray();
          return await Task.WhenAll(tasks);
     }
}

 

Here, we launch an async call for each event frame (i.e. Common.GetSummaryAsync makes an AFData.SummaryAsync call). We launch them all at once and then await them all. This suspends execution of GetOutputInfoList until all calls complete, so the method only returns once we've received results from all of our async calls. The caller in this case must block (with Task.Result) and can only write to the Console (i.e. process the results) once all the tasks are complete.

 

Note that we are achieving PI Data Archive concurrency here and will get client-side concurrency within processing by AF SDK code itself, but our custom code will not get client-side concurrency. In a producer-consumer analogy, we allow our producers to provide results concurrently, but we don't allow our consumers to consume until all the results are provided, and therefore, we are sacrificing some degree of concurrency in this approach.

 

4. Async calls using Task.WhenAny to process tasks as they complete

public static class TaskExample2
{
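     public static void Run(IEnumerable<AFEventFrame> efs)
     {
          // Block until every result has been written, as TaskExample1 does with Task.Result.
          WriteOutputInfo(efs).Wait();
     }
     // Returns Task rather than void so the caller can observe completion and exceptions.
     private static async Task WriteOutputInfo(IEnumerable<AFEventFrame> efs)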
     {
          List<Task<OutputInfo>> tasks = efs.Select(async ef => await Common.GetSummaryAsync(ef)).ToList();
          while (tasks.Count > 0)
          {
               var t = await Task.WhenAny(tasks);
               tasks.Remove(t);
               Common.WriteToConsole(await t);
          }
          Common.WriteCompletion();
     }
}

 

Above, we've shown the pattern introduced by Stephen Toub in "Processing tasks as they complete" (see References). Instead of waiting for all tasks to complete via WhenAll, we wait for just one to complete via WhenAny. Then, we remove it from the list, extract the result of the task, and repeat until the list of tasks is empty. Note several bottlenecks in this case. First, we must materialize the task list. Second, removing a task from the list is O(N). Third, each call to WhenAny registers a continuation with every remaining task, which is also O(N). Because the latter two costs are paid on each of the N iterations, the overall time complexity is O(N^2), which is not great if we have a large number of tasks. Although we allow our consumers to process at the same time our producers are producing, we've added extra overhead in the processing. The same post introduces another approach to optimize further. We won't discuss it here, but we would like to suggest a way to tackle this problem in a more "natural" way.
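To see the completion-order behavior in isolation, here is a minimal, self-contained sketch of the same WhenAny loop. It does not touch PI AF SDK: SimulateCallAsync is a hypothetical stand-in for Common.GetSummaryAsync built on Task.Delay, and the delays are arbitrary values chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAnySketch
{
    // Hypothetical stand-in for Common.GetSummaryAsync: waits, then returns its id.
    private static async Task<int> SimulateCallAsync(int id, int delayMs)
    {
        await Task.Delay(delayMs);
        return id;
    }

    // Returns the ids in the order in which their tasks completed.
    public static async Task<List<int>> RunAsync()
    {
        // Arbitrary delays: call 0 is the slowest and call 2 the fastest.
        int[] delays = { 600, 300, 50 };
        List<Task<int>> tasks = delays
            .Select((delayMs, id) => SimulateCallAsync(id, delayMs))
            .ToList();

        var completionOrder = new List<int>();
        while (tasks.Count > 0)
        {
            // Each WhenAny call registers a continuation on every remaining task.
            Task<int> finished = await Task.WhenAny(tasks);
            tasks.Remove(finished);              // linear scan of the list
            completionOrder.Add(await finished); // await rethrows if the task faulted
        }
        return completionOrder;
    }
}
```

With delays this far apart, the returned order should typically be 2, 1, 0: the reverse of the launch order, because the fastest call is consumed first.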

 

5. Async calls using IObservable<T> to process tasks as they complete

public static class ObservableExample
{
     public static void Run(IEnumerable<AFEventFrame> efs)
     {
          IObservable<OutputInfo> obs = efs.ToObservable()
          .Select(i => Observable.FromAsync(() => Common.GetSummaryAsync(i)))
          .Merge();

          obs.Subscribe(Common.WriteToConsole, Common.WriteCompletion);
     }
}

 

What we'd really like to do is launch a bunch of async calls and process the results in the order of task completion, not initialization. In essence, we'd like to "observe" the completion of tasks as a continuous (but bounded) stream. For this pattern, I'm a big proponent of using Rx.NET (NuGet: rx-main).

 

Above, we're using Rx.NET to create an observable sequence of results from tasks as they complete. From the IEnumerable<AFEventFrame>, we convert this to an observable sequence of event frames. For each event frame observed, we make an async summary call returning a Task<OutputInfo>. Here is an important step. Via Observable.FromAsync, we convert the asynchronous receipt of a single result from Task into the asynchronous observation of a sequence producing a single result. Wrapping the Task in this way also ensures it does not start right away but only when we have a subscribed observer! At this point, we have an IObservable<IObservable<OutputInfo>>. The Merge method allows us to "flatten" the observable hierarchy we just created into just a single sequence producing an OutputInfo object for each observed event frame (LINQ experts may recognize this is basically a SelectMany operation). Finally, we subscribe an implicit observer via the Subscribe method. Our observer's OnNext calls WriteToConsole and OnCompleted calls WriteCompletion.
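To make the push model concrete without depending on Rx.NET, the sketch below hand-rolls the core idea: attach a continuation to each task that pushes its result to an OnNext-style callback, and invoke an OnCompleted-style callback after the last task finishes. It only illustrates the concept and is not how Rx.NET implements Merge. The names PushSketch and ForEachCompleted are made up for this example, the tasks are simulated with Task.Delay, and error propagation (OnError) is omitted, so faulted or cancelled tasks are simply skipped.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PushSketch
{
    // Pushes each result to onNext as soon as its task completes, then calls
    // onCompleted once after the last task has finished.
    public static void ForEachCompleted<T>(
        IReadOnlyCollection<Task<T>> tasks, Action<T> onNext, Action onCompleted)
    {
        int remaining = tasks.Count;
        if (remaining == 0) { onCompleted(); return; }

        object gate = new object();
        foreach (Task<T> task in tasks)
        {
            task.ContinueWith(t =>
            {
                // Serialize the callbacks so onNext never runs concurrently.
                lock (gate)
                {
                    if (t.Status == TaskStatus.RanToCompletion) onNext(t.Result);
                    if (--remaining == 0) onCompleted();
                }
            });
        }
    }

    // Demo with two simulated calls; the delays are arbitrary.
    public static List<int> Run()
    {
        Task<int>[] tasks =
        {
            Task.Delay(400).ContinueWith(_ => 1),
            Task.Delay(50).ContinueWith(_ => 2),
        };
        var results = new List<int>();
        var done = new TaskCompletionSource<bool>();
        ForEachCompleted<int>(tasks, results.Add, () => done.SetResult(true));
        done.Task.Wait(); // block only so the demo can return the collected results
        return results;
    }
}
```

With these delays the results should typically arrive as 2, then 1. Each task gets exactly one continuation, so this approach avoids the repeated registration cost of the WhenAny loop.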

 

The benefit of Rx is that it allows us to declaratively chain data pipelines together, as we could using LINQ to IEnumerable.

 

For example, we can implement buffering easily if we wanted to process a group of items. Below, we write the items to the Console in groups of 1,000.

IObservable<IList<OutputInfo>> obs = efs.ToObservable()
     .Select(i => Observable.FromAsync(() => Common.GetSummaryAsync(i)))
     .Merge()
     .Buffer(1000);
obs.Subscribe(Common.WriteBufferToConsole, Common.WriteCompletion);

 

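What if we wanted to process the results in the order of the event frames? Easy, just change Merge() to Concat(). Keep in mind that Concat subscribes to one inner sequence at a time, and because Observable.FromAsync does not start a call until it is subscribed to, the summary calls then run one after another rather than concurrently.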
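If results must be handled in event-frame order while the calls still run concurrently, one Rx-free option is to start every task first and then await them in their original order. The sketch below illustrates this with simulated calls; SimulateCallAsync is a hypothetical stand-in for Common.GetSummaryAsync and the delays are arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class OrderedSketch
{
    // Hypothetical stand-in for Common.GetSummaryAsync: waits, then returns its id.
    private static async Task<int> SimulateCallAsync(int id, int delayMs)
    {
        await Task.Delay(delayMs);
        return id;
    }

    // Returns the ids in the order in which they were processed.
    public static async Task<List<int>> RunAsync()
    {
        int[] delays = { 300, 100, 200 };

        // ToList() forces the Select to run now, so every call is already in flight.
        List<Task<int>> tasks = delays
            .Select((delayMs, id) => SimulateCallAsync(id, delayMs))
            .ToList();

        var processed = new List<int>();
        foreach (Task<int> task in tasks)
        {
            // Awaiting in list order preserves the input order. A slow early call
            // delays processing of later results that may already be available.
            processed.Add(await task);
        }
        return processed;
    }
}
```

The processed order is 0, 1, 2 regardless of the delays. The trade-off is that a slow call at the front of the list holds up the consumer even when later results are ready.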

 

Observable sequences are an appropriate pattern to use when you want to process a list of tasks as they complete. The learning curve for Rx is steep, but it can really make code more readable, declarative, and shorter.

 

Bulk call analogy

 

There is an analogy here to the PI AF SDK bulk calls in terms of degree of "laziness". In the bulk call enumerable pattern, we (the client) will page through the results and ask the server to make them available. We do not have to wait for the server to return all of the results before we can start processing. In an async pattern, we can achieve a similar "laziness" by processing the list of tasks as they complete. We do not have to wait for all tasks to complete before processing the results. In the former, we still synchronously "pull" to ask for the next results. In the latter, we receive the results via an asynchronous "push". If you've read my earlier post on using Rx with AFDataPipe, you won't be surprised that the interface to use in the latter case is IObservable<T>.

 

If there is one takeaway from this post, it is the table below (inspiration from GitHub - Reactive-Extensions/Rx.NET). It shows the analogy we can draw between synchronous and asynchronous method interfaces and how this analogy is fleshed out in our own PI AF SDK methods.

 

| Pattern | Single return value: return type | Single return value: PI AF SDK example | Multiple return values: return type | Multiple return values: PI AF SDK example |
|---|---|---|---|---|
| Pull/Synchronous/Interactive | T | PIPoint.RecordedValues | IEnumerable<T> | PIPointList.RecordedValues |
| Push/Asynchronous/Reactive | Task<T> | PIPoint.RecordedValuesAsync | IObservable<T> | Rx.NET example in this blog post |

 

Using a PI AF SDK example of RecordedValues calls, we have

T = AFValues

We encourage you to check the signatures of the PI AF SDK methods listed in the table above to verify this. As of 2016, PI AF SDK does not natively offer an IObservable<T> return type for historical data retrieval, but we have shown here that it is possible to implement this pattern yourself.

 

Throttling

 

There is currently a client-side cap on the number of outstanding calls to a PI Data Archive. If you are making multiple async calls, it is important to keep this cap in mind. Having a queue of tasks or using an async Semaphore pattern can be a way to throttle the calls.
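As one possible shape for the async semaphore pattern, the sketch below uses SemaphoreSlim.WaitAsync to limit how many calls are in flight at once. The names ThrottleSketch and RunThrottledAsync are made up for this example, callAsync is a placeholder for something like Common.GetSummaryAsync, and maxConcurrent is an assumed value you would pick to stay under the actual cap.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Runs one async call per input, with at most maxConcurrent calls in flight.
    // Results are returned in input order.
    public static async Task<TResult[]> RunThrottledAsync<TInput, TResult>(
        IEnumerable<TInput> inputs,
        Func<TInput, Task<TResult>> callAsync,
        int maxConcurrent)
    {
        using (var gate = new SemaphoreSlim(maxConcurrent))
        {
            IEnumerable<Task<TResult>> tasks = inputs.Select(async input =>
            {
                await gate.WaitAsync();        // wait (without blocking a thread) for a free slot
                try
                {
                    return await callAsync(input);
                }
                finally
                {
                    gate.Release();            // free the slot even if the call throws
                }
            });

            // WhenAll enumerates the sequence, which starts every wrapper task;
            // only maxConcurrent of them get past the gate at any moment.
            return await Task.WhenAll(tasks);
        }
    }
}
```

A call such as ThrottleSketch.RunThrottledAsync(efs, Common.GetSummaryAsync, 50) would be one way to use it, where 50 is purely illustrative. This version waits for all results before returning; the same gate could be placed inside any of the process-as-they-complete patterns above.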

 

References

 

The ideas we've introduced here are not new and have been extensively discussed in the .NET literature. They are worth revisiting, however, if you plan to use some of the async methods introduced in PI AF SDK 2016. Here is a list of some of our favorites:

 

Introduction to Rx

Async Iterators - Dave Sexton Blog

Processing tasks as they complete - .NET Parallel Programming - Site Home - MSDN Blogs

Help me evangelize Rx: Select + async selector lambda  

Async, await, and yield return

c# - Asynchronous iterator Task<IEnumerable<T>> - Stack Overflow 

Proposal: language support for async sequences · Issue #261 · dotnet/roslyn · GitHub