Marcos Vainer Loeff

Using PI Web API Channels with the IObserver pattern

Blog post created by Marcos Vainer Loeff on Feb 27, 2018

Introduction

 

Channels is the PI Web API counterpart of the AFDataPipe feature of the AF SDK. It is a way to receive continuous updates about a stream or stream set. Rather than using a typical HTTP request, channels are accessed using the WebSocket protocol. WebSocket client libraries that can be used with channels are available in several languages.
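To make the WebSocket access concrete, here is a minimal sketch of reading a channel with the standard System.Net.WebSockets.ClientWebSocket class from .NET Framework, without the client library. The class and method names (RawChannelSketch, ListenAsync) are hypothetical, and the use of default Windows credentials is an assumption about how the server is secured.

```csharp
using System;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the PI Web API client library.
public static class RawChannelSketch
{
    // Connects to a channel URL and hands every text message to onMessage
    // until the server closes the socket. Cancelling the token makes the
    // pending ConnectAsync/ReceiveAsync call throw.
    public static async Task ListenAsync(Uri channelUri, Action<string> onMessage, CancellationToken token)
    {
        using (var socket = new ClientWebSocket())
        {
            // Assumption: the server accepts the current Windows identity.
            socket.Options.UseDefaultCredentials = true;
            await socket.ConnectAsync(channelUri, token);

            var buffer = new ArraySegment<byte>(new byte[8192]);
            while (socket.State == WebSocketState.Open)
            {
                using (var message = new MemoryStream())
                {
                    WebSocketReceiveResult result;
                    do
                    {
                        // A message can span several frames; collect them all.
                        result = await socket.ReceiveAsync(buffer, token);
                        message.Write(buffer.Array, 0, result.Count);
                    } while (!result.EndOfMessage);

                    if (result.MessageType == WebSocketMessageType.Close)
                    {
                        await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "closing", CancellationToken.None);
                        break;
                    }

                    onMessage(Encoding.UTF8.GetString(message.ToArray()));
                }
            }
        }
    }
}
```

Each message arrives as raw text that the caller still has to deserialize. The client library methods described in this post take care of that step and deliver typed objects to an observer instead.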

 

The latest release of the PI Web API client library for .NET Framework adds three methods to the PIWebApiClient.Channels class so that this feature can be used on the client side through the IObserver pattern. This pattern was chosen because PI developers are already used to it from AFDataPipe.

 

Methods added to the ChannelsApi class

 

PI Web API Channels can be used with 3 different URLs:

  • wss://myserver/piwebapi/streams/{webId}/channel (webId must be a stream)
  • wss://myserver/piwebapi/streamsets/{webId}/channel (webId must be a stream set)
  • wss://myserver/piwebapi/streamsets/channel?webId={webId} (multiple streams)
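As a small illustration, the three URL forms above could be built with a helper like the one below. ChannelUrls and its method names are hypothetical and not part of the client library. Repeating the webId query parameter once per stream in the third form is an assumption based on the pattern shown in the list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of the PI Web API client library.
public static class ChannelUrls
{
    // Channel for a single stream.
    public static string ForStream(string baseUrl, string webId)
    {
        return baseUrl + "/streams/" + Uri.EscapeDataString(webId) + "/channel";
    }

    // Channel for a stream set identified by one webId.
    public static string ForStreamSet(string baseUrl, string webId)
    {
        return baseUrl + "/streamsets/" + Uri.EscapeDataString(webId) + "/channel";
    }

    // Channel for several independent streams.
    // Assumption: one webId query parameter per stream.
    public static string ForStreams(string baseUrl, IEnumerable<string> webIds)
    {
        string query = string.Join("&", webIds.Select(id => "webId=" + Uri.EscapeDataString(id)));
        return baseUrl + "/streamsets/channel?" + query;
    }
}
```

For example, ChannelUrls.ForStreams("wss://myserver/piwebapi", new[] { "A1", "B2" }) yields wss://myserver/piwebapi/streamsets/channel?webId=A1&webId=B2, where A1 and B2 are placeholder WebIDs.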

 

A stream in PI Web API can be a PI Point or an AF Attribute with a data reference. A stream set is a collection of streams. Streams inside a stream set can be independent, or they can share the same base element (e.g. an element or event frame) or parent (e.g. a parent attribute). Please review the PI Web API help for more information.

 

As a result, the following 3 methods were added to the ChannelsApi:

 

 

Each of these methods returns a task that is still running. They were designed this way to avoid blocking the main thread of your application. The inputs of these methods are:

 

  • The webId of a stream or a stream set, or the list of webIds of multiple streams.
  • An instance of a custom class that implements the IObserver<PIItemsStreamValues> interface. This is where you define how the application processes newly received values.
  • A CancellationToken, obtained from a CancellationTokenSource, which is used to cancel the task.

 

Example

 

Let's start by creating our custom class which implements the IObserver<PIItemsStreamValues> interface.

 

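    // Observer that prints every value received from the channel to the console.
    public class CustomChannelObserver : IObserver<PIItemsStreamValues>
    {
        // Called when the provider has finished sending values.
        public void OnCompleted()
        {
            Console.WriteLine("Completed");
        }

        // Called when the provider reports an error.
        public void OnError(Exception error)
        {
            Console.WriteLine(error.Message);
        }

        // Called for each update; one update can carry values for several streams.
        public void OnNext(PIItemsStreamValues value)
        {
            foreach (PIStreamValues item in value.Items)
            {
                foreach (PITimedValue subItem in item.Items)
                {
                    Console.WriteLine("\n\nName={0}, Path={1}, WebId={2}, Value={3}, Timestamp={4}", item.Name, item.Path, item.WebId, subItem.Value, subItem.Timestamp);
                }
            }
        }
    }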

 

 

Since this is just an example for you to refer to, this custom class only writes the received values to the console. I suggest taking a look at the PIItemsStreamValues class: its Items property holds a collection of PIStreamValues objects, and each PIStreamValues in turn has an Items property that holds a collection of PITimedValue objects.

 

The following snippet, which runs on the main thread, calls the StartStreamSets method:

 

        private static void ChannelsExamples()
        {
            // The second argument (true) selects Kerberos authentication.
            PIWebApiClient client = new PIWebApiClient("https://marc-web-sql.marc.net/piwebapi", true);
            PIPoint point1 = client.Point.GetByPath("\\\\marc-pi2016\\sinusoid");
            PIPoint point2 = client.Point.GetByPath("\\\\marc-pi2016\\sinusoidu");
            PIPoint point3 = client.Point.GetByPath("\\\\marc-pi2016\\cdt158");
            List<string> webIds = new List<string>() { point1.WebId, point2.WebId, point3.WebId };

            CancellationTokenSource cancellationSource = new CancellationTokenSource();
            IObserver<PIItemsStreamValues> observer = new CustomChannelObserver();
            // Returns immediately; new values arrive through observer.OnNext().
            Task channelTask = client.Channel.StartStreamSets(webIds, observer, cancellationSource.Token);
            // Keep the channel open for 2 minutes (120,000 ms).
            System.Threading.Thread.Sleep(120000);
            // Request cancellation, then wait for the channel task to finish.
            cancellationSource.Cancel();
            channelTask.Wait();
        }

 

This is how the code works:

 

  • Create a new PIWebApiClient instance using Kerberos authentication.
  • Get the 3 WebIDs of the SINUSOID, SINUSOIDU and CDT158 points.
  • Create a list of those 3 WebIDs.
  • Instantiate a CancellationTokenSource and the custom observer class that implements IObserver<PIItemsStreamValues>.
  • Call the StartStreamSets method, which returns a task.
  • Wait 2 minutes using the Thread.Sleep() method. During this period, all events are received through the OnNext() method of the custom observer.
  • Call the Cancel() method of the CancellationTokenSource.
  • Wait for the returned task to finish.
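The start, wait, cancel and wait-for-completion sequence above can be tried without a PI Web API server. The sketch below replaces StartStreamSets with a hypothetical stand-in (StartFakeChannel) that pushes a counter to an observer until it is cancelled, and it uses CancellationTokenSource.CancelAfter in place of Thread.Sleep followed by Cancel. All class and method names here are made up for illustration. In particular, the stand-in calling OnCompleted on cancellation is an assumption and says nothing about how the library itself reacts.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stands in for CustomChannelObserver; it stores values instead of printing them.
public class CollectingObserver : IObserver<int>
{
    public List<int> Received { get; } = new List<int>();
    public bool Completed { get; private set; }

    public void OnNext(int value) { Received.Add(value); }
    public void OnError(Exception error) { Console.WriteLine(error.Message); }
    public void OnCompleted() { Completed = true; }
}

public static class LifecycleSketch
{
    // Hypothetical stand-in for StartStreamSets: pushes a counter to the
    // observer every 10 ms until the token is cancelled.
    public static async Task StartFakeChannel(IObserver<int> observer, CancellationToken token)
    {
        int counter = 0;
        try
        {
            while (true)
            {
                observer.OnNext(counter++);
                await Task.Delay(10, token);
            }
        }
        catch (OperationCanceledException)
        {
            // Assumption of this sketch: cancellation ends the sequence cleanly.
            observer.OnCompleted();
        }
    }

    // Starts the fake channel, listens for the given time, then stops it.
    public static CollectingObserver Run(int listenMilliseconds)
    {
        var observer = new CollectingObserver();
        using (var cancellationSource = new CancellationTokenSource())
        {
            Task channelTask = StartFakeChannel(observer, cancellationSource.Token);
            // Schedules the cancellation instead of blocking in Thread.Sleep().
            cancellationSource.CancelAfter(listenMilliseconds);
            channelTask.Wait();
        }
        return observer;
    }
}
```

Calling LifecycleSketch.Run(100) returns an observer that has received at least one value and has been marked as completed. The design point is the same as in the example above: the caller keeps control of the lifetime through the token, while the observer only reacts to values.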

 

The new values are shown on the console:

 

 

Please refer to this sample application for more details of the implementation.

 

Conclusions

 

PI developers who already use the IObserver pattern with AFDataPipe should find it easy to use with PI Web API Channels. Unfortunately, this feature is only available in the PI Web API client library for .NET Framework and not in the .NET Standard version, because the WebSockets implementation in .NET Core is not compatible with PI Web API Channels security.
