Eugene Lee

Async Streaming with AF SDK

Blog Post created by Eugene Lee on Jun 10, 2019

Disclaimer:

Any of the code in this blog could contain bugs and shouldn’t be used in production without extensive testing.

You agree that if you use any of the provided code in your own production code that you accept all ownership, risks, liabilities, and responsibilities associated with the performance, support, and maintenance of the code.

Introduction

Greetings everyone! 

 

In this blog post, we shall be discussing about a concept called Async Streaming and how we can use it with AF SDK to help make more responsive, scalable and concurrent applications.

 

Async Streaming is a new feature in C# that will be natively supported in version 8 and .NET Core 3. Even though AF SDK is not supported in .NET Core, there are still libraries available out there that can bring the benefits of Async Streaming to AF SDK.

 

Async Streaming can be advantageous in many cases. For example:

  1. In front-end applications, the main UI thread can stay responsive during a data access call.
  2. In both client and server applications, the number of threads used to service a call can be reduced, as waiting threads won't be blocked and can be returned to the thread pool for re-use.
  3. The effect of latency is mitigated because remote calls can be executed concurrently.
  4. Receiving data and processing it as it is retrieved in a way that doesn't block while we wait.

 

What is Asynchronous programming?

Asynchronous programming is a means of parallel programming in which a unit of work runs separately from the main application thread and notifies the calling thread of its completion, failure or progress.


This is the first thing you will find if you do a Google search using that term. In layman terms, this means that it will be able to help us achieve more responsive applications by not blocking the main thread.


AF SDK bulk calls

Let's examine the available data access bulk calls for PI Points that offer async behavior.

 

 

We notice that the methods with native async behavior tend to return one value per PI Point. If you look at their counterparts which return multiple values per PI Point, we find that they are not natively async.

 

 

I shall use the RecordedValues method as an example here. Below is a snippet of how we normally call this method.

 

We can make a wrapper called GetRecordedValues to return us a list of the recorded values. The return type is IEnumerable<AFValues>.

 

Disclaimer:

Any of the code in this blog could contain bugs and shouldn’t be used in production without extensive testing.

You agree that if you use any of the provided code in your own production code that you accept all ownership, risks, liabilities, and responsibilities associated with the performance, support, and maintenance of the code.

 

private static IEnumerable<AFValues> GetRecordedValues(PIPointList pointList)
{
    PIPagingConfiguration config = new PIPagingConfiguration(PIPageType.TagCount, 1);
    var timeRange = new AFTimeRange("*-10y", "*");

    try
    {
        var listResults = pointList.RecordedValues(timeRange, AFBoundaryType.Inside, null, true, config);
        return listResults;
    }
    catch (OperationCanceledException)
    {
        // Errors that occur during bulk calls get trapped here
        // The actual error is stored on the PIPagingConfiguration object
        Console.WriteLine(config.Error.Message);
        return null;
    }
    catch (Exception otherEx)
    {
        // Errors that occur in an iterative fallback method get trapped here
        Console.WriteLine(otherEx.Message);
        return null;
    }
}

 

And then we can consume the wrapper using a foreach loop.

 

var afvalslist = GetRecordedValues(pointList);
foreach (var pointResults in afvalslist)
{
    foreach (var item in pointResults)
    {
        Console.WriteLine("Timestamp: " + item.Timestamp + "\tValue: " + item.Value + "\tName: " + pointResults.PIPoint);
    }
    Console.WriteLine();
}

 

Now, this sample is generally fine in most cases. The only bad thing about it is that it doesn't have any async behavior. If the PI Data Archive is busy serving other users or applications, threads may get blocked such that responsiveness and performance will suffer. What can we do to improve upon our code?

 

This is where Async Streaming can save the day!

 

Async Streaming

Async Streaming makes it possible to await for a stream of results. As I mentioned in the introduction above, there are libraries out there to integrate AF SDK with Async Streaming. For this blog post, I am going to use one of them called AsyncEnumerator found here

 

https://www.nuget.org/packages/AsyncEnumerator/

 

The package can be easily installed from NuGet via

 

Install-Package AsyncEnumerator -Version 2.2.2


It introduces 2 new interfaces called IAsyncEnumerable and IAsyncEnumerator. Lets examine each of them to understand how it helps us to do Async Streaming.

 

public interface IAsyncEnumerator
{
    object Current { get; }
    Task<bool> MoveNextAsync(CancellationToken cancellationToken = default);
}

 

The Current property is the same as IEnumerator's version. It gets the element in the collection at the current position of the enumerator. What's different is the MoveNextAsync method. Over here, we can see that it returns a Task to us. Thus, we can start the task and continue on with our work while letting the task run in the background. MoveNextAsync does not block the thread compared to MoveNext of IEnumerator.

 

public interface IAsyncEnumerable
{
    Task<IAsyncEnumerator> GetAsyncEnumeratorAsync(CancellationToken cancellationToken = default);
}

 

GetAsyncEnumeratorAsync creates an enumerator that iterates through a collection asynchronously. This also returns a Task which returns an IAsyncEnumerator when it is complete.

 

General usage patterns

We can use a general construct such as the one below to consume an async stream of values. Take note that this construct is specific to the library being used. C# 8 has a very similar syntax. This pattern of iteration will not block the thread which is what we desire for our application.

 

await asyncEnumerable.ForEachAsync(async number => {
    await Console.Out.WriteLineAsync($"{number}");
});

 

Behind the scenes, the compiler will translate the ForEachAsync statement to utilize the MoveNextAsync method and then access the Current property to get the element of interest.

 

Cancellation

With this pattern, you can use a cancellation token to stop the streaming. This is useful for implementing timeouts or for the user to cancel the operation. If you look at the parameters of MoveNextAsync, you will notice that it accepts a cancellation token which you can use for notifying the streaming to stop. 

 

public virtual Task<bool> MoveNextAsync(CancellationToken cancellationToken = default)

public static async Task ForEachAsync(this IAsyncEnumerable enumerable, Action<object> action, CancellationToken cancellationToken = default)

 

The ForEachAsync extension method passes this token to MoveNextAsync where we can then retrieve this token with the yield.CancellationToken property to check for cancellation. An example is like the following.

 

token = yield.CancellationToken;
if (token.IsCancellationRequested)
{
    await Console.Out.WriteLineAsync("cancelling");
    yield.Break();
}

 

 

Async Streaming + AF SDK = GetStreamingRecordedValuesAsync

Now that we know what Async Streaming is about, let us improve upon the GetRecordedValues wrapper that was introduced in the previous section. We will leverage on the general usage patterns and also include cancellation in our wrapper.

 

We will call this wrapper GetStreamingRecordedValuesAsync. We will retrieve pages of results from the PI Data Archive one tag at a time as defined by the PIPagingConfiguration settings. The return type is IAsyncEnumerable<AFValue>.

 

Disclaimer:

Any of the code in this blog could contain bugs and shouldn’t be used in production without extensive testing.

You agree that if you use any of the provided code in your own production code that you accept all ownership, risks, liabilities, and responsibilities associated with the performance, support, and maintenance of the code.

 

private static IAsyncEnumerable<AFValue> GetStreamingRecordedValuesAsync(PIPointList pointList)
{
    PIPagingConfiguration config = new PIPagingConfiguration(PIPageType.TagCount, 1);
    var timeRange = new AFTimeRange("*-10y", "*");

    return new AsyncEnumerable<AFValue>(async yield =>
    {
        try
        {
            await Task.Run(async () =>
            {
                var listResults = pointList.RecordedValues(timeRange, AFBoundaryType.Inside, null, true, config);
                CancellationToken token;
                foreach (var pointResults in listResults)
                {
                    token = yield.CancellationToken;
                    if (token.IsCancellationRequested)
                    {
                        await Console.Out.WriteLineAsync("cancelling");
                        yield.Break();
                    }

                    foreach (var result in pointResults)
                    {
                        await yield.ReturnAsync(result);
                    }

                }
            });
        }
        catch (OperationCanceledException)
        {
            // Errors that occur during bulk calls get trapped here
            // The actual error is stored on the PIPagingConfiguration object
            await Console.Out.WriteLineAsync(config.Error.Message);
            yield.Break();
        }
        catch (Exception otherEx)
        {
            // Errors that occur in an iterative fallback method get trapped here
            await Console.Out.WriteLineAsync(otherEx.Message);
            yield.Break();
        }
    });
}

 

With this sample, we will be streaming the recorded values for each PI Point on the list. We can utilize the wrapper using the ForEachAsync loop and pass to it a cancellation token.

 

var cts = new CancellationTokenSource();
var afvalslist = GetStreamingRecordedValuesAsync(pointList);
await afvalslist.ForEachAsync(async item =>
{
    await Console.Out.WriteLineAsync("Timestamp: " + item.Timestamp + "\tValue: " + item.Value.ToString().PadRight(20) + "Name: " + item.PIPoint);
}, cts.Token);

 

This method of streaming ensures the calling thread doesn't get blocked and can continue with other work. To refresh your memory, a PIPointList can contain points from multiple PI Data Archives. For a global enterprise, your PI Data Archives could be scattered around the world. What if your application is hosted in USA but you need data from the server in Singapore? No matter what, latency will definitely affect its performance. You can't beat the laws of physics but at least you are free to do other work while waiting. That's what productivity and concurrency is about!

 

Point of caution

With Async Streaming on the client side, we can conveniently fire and forget calls. However, one has to keep in mind that the server will still need to process the data request.  If every single application just dumps all these data calls asynchronously to the server, it will have some negative effects on the server. Therefore, it is up to the user to implement some kind of throttling.

 

Conclusion

In this blog post, we have looked at what Async Streaming is and how it can help you make responsive, scalable and concurrent applications. In AF SDK, some bulk data calls might not have async methods. However, we can still use async streaming to improve the performance of our application utilizing these methods. I found a feature request here to expose asynchronous interfaces for bulk calls of AF Attributes. You can vote for it if you are interested.

https://pisquare.osisoft.com/ideas/5743-af-sdk-async-data-methods-for-multiple-afattributes

 

I hope you have learnt something useful from this article. Let me know if you have any comments!

Outcomes