Stream Updates: new features in PI Web API 2018 SP1

Blog Post created by jbazis Employee on Jun 5, 2019

Stream Updates allows a PI Web API client to retrieve data updates from PI Points and AF Attributes without using Channels (which are based on WebSockets). With Stream Updates, you register for data streams of interest with an HTTP POST request to a new "streams/updates" endpoint. You then retrieve data updates by polling with a marker that the registration call returns to you. We first introduced Stream Updates as a Community Technology Preview (CTP) in PI Web API 2018. We've added some new features in PI Web API 2018 SP1.
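
To make the register-then-poll flow concrete, here is a minimal sketch of the marker handling on its own, with the HTTP calls replaced by delegates. The names PollResult, MarkerPolling, register, and poll are hypothetical and used only for illustration. The only thing taken from this post is the pattern itself: start from the marker returned by registration, then carry forward the LatestMarker from each response.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shape of one poll result: the next marker plus the events returned.
public sealed class PollResult
{
    public string LatestMarker { get; set; } = "";
    public List<string> Events { get; set; } = new List<string>();
}

public static class MarkerPolling
{
    // Runs a fixed number of polls, always passing the most recent marker.
    // 'register' stands in for the HTTP POST, 'poll' for the HTTP GET.
    public static List<string> Run(Func<string> register, Func<string, PollResult> poll, int pollCount)
    {
        var received = new List<string>();
        string marker = register();              // marker from the registration call
        for (int i = 0; i < pollCount; i++)
        {
            PollResult result = poll(marker);    // ask for updates since 'marker'
            received.AddRange(result.Events);
            marker = result.LatestMarker;        // carry the new marker into the next poll
        }
        return received;
    }
}
```

In a real client the two delegates would wrap the POST and GET requests shown in the sample code at the end of this post.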

 

Changes in AF metadata

Previously, you would not be notified if there was a change in AF metadata. Now, you are notified if metadata – like AF Data Reference, units of measure, and description – has changed. The exact metadata change is not reported. Query the AF Attribute to see what changed; then, register again for Stream Updates to receive further updates.

 

This feature reacts to AF metadata changes only. Changes to PI Point attributes are not reported.

 

If an AF Attribute metadata change has occurred, the response payload will contain this:

"Exception": {
"Errors": [
"The signup was updated and any cached data could be invalid."
]
}
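
As a rough illustration, a client could check each data-update response for this message before trusting cached metadata. The sketch below assumes the Exception object sits at the top level of the response, which is how the sample code at the end of this post reads it. The class and method names are hypothetical. It uses System.Text.Json only to stay free of package dependencies (the sample code in this post uses Json.NET instead).

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class StreamUpdateErrors
{
    // Message text quoted in this post for an AF metadata change.
    public const string SignupUpdated =
        "The signup was updated and any cached data could be invalid.";

    // Returns the strings under Exception.Errors, or an empty list if absent.
    public static List<string> GetErrors(string responseJson)
    {
        var errors = new List<string>();
        using (JsonDocument doc = JsonDocument.Parse(responseJson))
        {
            if (doc.RootElement.ValueKind == JsonValueKind.Object &&
                doc.RootElement.TryGetProperty("Exception", out JsonElement exception) &&
                exception.ValueKind == JsonValueKind.Object &&
                exception.TryGetProperty("Errors", out JsonElement list) &&
                list.ValueKind == JsonValueKind.Array)
            {
                foreach (JsonElement item in list.EnumerateArray())
                {
                    errors.Add(item.ToString());
                }
            }
        }
        return errors;
    }

    // True when the payload carries the metadata-change message.
    public static bool MetadataChanged(string responseJson)
    {
        return GetErrors(responseJson).Contains(SignupUpdated);
    }
}
```

When MetadataChanged returns true, the client would query the AF Attribute and register again, as described above. Matching on the exact message text is fragile, so treat the constant as something to verify against your own server's responses.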

 

There is one type of edit to an AF Attribute that will be reported to your client in PI Web API 2018 SP1. If the value of a static Attribute is changed, the new value will be returned to your client as a data update.

 

Marker error always available

If a passed marker is valid, but the AF Attribute you are tracking experiences an error, you will receive a message in the Exception section of your response payload. For example, if an AF Attribute you are tracking is deleted, you will get this error in your response payload:

"Exception": {
"Errors": [
"The signup was removed because it is no longer valid."
]
}
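
One possible way to act on the two messages quoted in this post is to map them to a follow-up action. This is only a sketch: the SignupAction enum and the classifier are hypothetical, the suggested reactions are our reading of the text above rather than documented behavior, and any message not quoted in this post is simply flagged for investigation.

```csharp
using System;
using System.Collections.Generic;

public enum SignupAction { Continue, RequeryAndReRegister, StopTracking, Investigate }

public static class SignupErrorClassifier
{
    // Message texts as quoted in this post.
    const string Updated = "The signup was updated and any cached data could be invalid.";
    const string Removed = "The signup was removed because it is no longer valid.";

    // Precedence: Removed, then Updated, then any unrecognized message.
    public static SignupAction Classify(IEnumerable<string> errors)
    {
        var action = SignupAction.Continue;
        foreach (string error in errors)
        {
            if (error == Removed)
            {
                return SignupAction.StopTracking;          // signup is gone, nothing more to poll
            }
            if (error == Updated)
            {
                action = SignupAction.RequeryAndReRegister; // metadata changed
            }
            else if (action == SignupAction.Continue)
            {
                action = SignupAction.Investigate;          // message not covered here
            }
        }
        return action;
    }
}
```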

 

Selected Fields

Most controllers in PI Web API support the optional selectedFields parameter that allows you to choose which fields in the standard payload are returned. This parameter has been added to Stream Updates registration and data updates. For example, to retrieve only the latest marker and the data updates, you would issue an HTTP GET request to:

https://myserver.com/piwebapi/streams/updates/{marker}?selectedFields=LatestMarker;Events
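
A small helper can keep the query string consistent. The sketch below is illustrative only: StreamUpdatesUrl and ForPolling are hypothetical names. It follows the same convention as the sample code at the end of this post, where selectedFields is passed as a query parameter with field names separated by semicolons.

```csharp
using System;

public static class StreamUpdatesUrl
{
    // Builds {baseUrl}/streams/updates/{marker}[?selectedFields=a;b]
    public static string ForPolling(string baseUrl, string marker, params string[] selectedFields)
    {
        string url = baseUrl.TrimEnd('/') + "/streams/updates/" + Uri.EscapeDataString(marker);
        if (selectedFields != null && selectedFields.Length > 0)
        {
            // Field names are joined with ';' as in the example URL in this post.
            url += "?selectedFields=" + string.Join(";", selectedFields);
        }
        return url;
    }
}
```

For example, StreamUpdatesUrl.ForPolling("https://my.server.com/piwebapi", "abc123", "LatestMarker", "Events") returns https://my.server.com/piwebapi/streams/updates/abc123?selectedFields=LatestMarker;Events, where "abc123" is a made-up marker.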

 

PreviousEventAction

When processing data updates, it is often useful to know what happened to the previous event received by the PI Data Archive. We have added the PreviousEventAction item to each Event in the response payload. Possible values are PreviousEventArchived and PreviousEventDeleted.
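
As an illustration, the sketch below tallies the events in one response by their PreviousEventAction value. It assumes the response has a top-level Events array whose items each carry a PreviousEventAction string. The class name and the "(missing)" bucket are hypothetical, and System.Text.Json is used here only to avoid a package dependency.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class PreviousEventActions
{
    // Counts events per PreviousEventAction value in a data-update payload.
    public static Dictionary<string, int> Tally(string responseJson)
    {
        var counts = new Dictionary<string, int>();
        using (JsonDocument doc = JsonDocument.Parse(responseJson))
        {
            if (doc.RootElement.ValueKind != JsonValueKind.Object ||
                !doc.RootElement.TryGetProperty("Events", out JsonElement events) ||
                events.ValueKind != JsonValueKind.Array)
            {
                return counts; // no Events array in this payload
            }
            foreach (JsonElement ev in events.EnumerateArray())
            {
                string action = "(missing)";
                if (ev.ValueKind == JsonValueKind.Object &&
                    ev.TryGetProperty("PreviousEventAction", out JsonElement a) &&
                    a.ValueKind == JsonValueKind.String)
                {
                    action = a.GetString() ?? "(missing)";
                }
                counts.TryGetValue(action, out int n); // n stays 0 for a new key
                counts[action] = n + 1;
            }
        }
        return counts;
    }
}
```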

 

Performance

How fast can you expect to retrieve data updates this way? We set up a test environment with a 34 GB PI Data Archive server with 6 cores receiving data values into 250,000 PI Points at 1-second intervals each. Our 8 GB PI Web API server had 8 cores. Our test program registered for Stream Updates in blocks of 40 points each. We were able to sustain 40,000 data updates per second. To achieve this retrieval rate, we did need to edit some of the PI Update Manager tuning parameters and PI Web API configuration parameters. For PI Update Manager, we set MaxUpdateQueue and TotalUpdateQueue to 2 million to overcome the default queue size of 50,000. In PI Web API, we set the CacheInstanceUpdateInterval parameter to 1 (second) and the CacheInstanceUpdateHoldoffTime parameter to 0 (seconds) so that new events are queried more often and the queue empties faster.

Knowing that PI Data Archive is capable of significantly higher throughput, we believe the current throughput is memory-bound in PI Web API; however, we also believe the throughput is sufficient for most uses of Stream Updates. If you have specific use cases where higher throughput is needed, please share them with us at https://feedback.osisoft.com.

Additionally, we do not recommend using the exact tuning parameter values described above: TotalUpdateQueue should be greater than MaxUpdateQueue. For additional information on these PI Data Archive tuning parameters, please refer to Knowledge Base article KB3151OSI8.
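
To get a feel for why the queue size matters at this rate, a back-of-the-envelope calculation helps. The sketch below assumes, purely for illustration, that each data update occupies one queue entry and that nothing is drained from the queue in the meantime. QueueHeadroom and SecondsToFill are hypothetical names, not PI settings.

```csharp
using System;

public static class QueueHeadroom
{
    // Seconds until a queue of 'queueSize' entries fills if nothing is drained.
    public static double SecondsToFill(long queueSize, double eventsPerSecond)
    {
        if (eventsPerSecond <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(eventsPerSecond));
        }
        return queueSize / eventsPerSecond;
    }
}
```

Under these assumptions, SecondsToFill(50000, 40000) is 1.25 seconds for the default queue size, while SecondsToFill(2000000, 40000) is 50 seconds for the 2 million setting used in the test.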

 

Sample C# code demonstrating the new features

This sample is an update from our original blog post:

PIWebAPIClient.cs

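using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;

public class PIWebAPIClient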
    {
        private HttpClient client;
        private string baseUrl;
        public PIWebAPIClient(string url, string username, string password)
        {
            client = new HttpClient();
            if (!String.IsNullOrEmpty(username) && !String.IsNullOrEmpty(password))
            {
                string auth = Convert.ToBase64String(Encoding.ASCII.GetBytes(string.Format("{0}:{1}", username, password)));
                client.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", auth);
            }
            client.DefaultRequestHeaders.Add("X-Requested-With", "asdf");  // avoids CSRF warning
            baseUrl = url;
        }
        public PIWebAPIClient(string url)
        {
            client = new HttpClient();
            baseUrl = url;
        }
        public async Task<object> GetAsync(string uri)
        {
            HttpResponseMessage response = await client.GetAsync(uri);
            var jsonString = await response.Content.ReadAsStringAsync();
            var json = JsonConvert.DeserializeObject<object>(jsonString);
            if (!response.IsSuccessStatusCode)
            {
                var responseMessage = "Response status code does not indicate success: " + (int)response.StatusCode + " (" + response.StatusCode + " ). ";
                throw new HttpRequestException(responseMessage + Environment.NewLine + jsonString);
            }
            return json;
        }
        public async Task<object> PostAsync(string uri)
        {
            HttpResponseMessage response = await client.PostAsync(uri, null);
            var jsonString = await response.Content.ReadAsStringAsync();
            var json = JsonConvert.DeserializeObject<object>(jsonString);
            if (!response.IsSuccessStatusCode)
            {
                var responseMessage = "Response status code does not indicate success: " + (int)response.StatusCode + " (" + response.StatusCode + " ). ";
                throw new HttpRequestException(responseMessage + Environment.NewLine + jsonString);
            }
            return json;
        }
        public async Task<dynamic> RegisterForStreamUpdates(string webId, string selectedFields)
        {
            string url = baseUrl + "/streams/" + webId + "/updates";
            if (!String.IsNullOrEmpty(selectedFields))
            {
                url += "?selectedFields=" + selectedFields;
            }
            dynamic response = await PostAsync(url);
            return response;
        }
        public async Task<dynamic> RetrieveStreamUpdates(string marker, string selectedFields)
        {
            string url = baseUrl + "/streams/updates/" + marker;
            if (!String.IsNullOrEmpty(selectedFields))
            {
                url += "?selectedFields=" + selectedFields;
            }
            dynamic response = await GetAsync(url);
            return response;
        }
        public string GetVersion()
        {
            string url = baseUrl + "/system";
            dynamic response = GetAsync(url).Result;
            return response.ProductVersion;
        }
    }

 

Program.cs

using System;

class Program
    {
        static string baseUrl = "https://my.server.com/piwebapi";
        static string marker = null;
        static string username = "username";
        static string password = "password";
        static string webId = "myWebId";
        static PIWebAPIClient client = null;
        static void Main(string[] args)
        {
            client = new PIWebAPIClient(baseUrl, username, password);
            Console.WriteLine("PI Web API Version: {0}", client.GetVersion());
            // Register for Stream Updates requesting only LatestMarker and Status
            dynamic response = client.RegisterForStreamUpdates(webId, "LatestMarker;Status").Result;
            marker = response.LatestMarker;
            string stat = response.Status;
            // SourceName was not requested in selectedFields above, so this is null
            string src = response.SourceName;
            //ReceiveUpdates is called every 10 seconds until you explicitly exit the application
            TimeSpan startTimeSpan = TimeSpan.Zero;
            TimeSpan periodTimeSpan = TimeSpan.FromSeconds(10);
            var timer = new System.Threading.Timer((e) =>
            {
                Console.WriteLine("{0},{1}", DateTime.Now.ToString(), marker);
                ReceiveUpdates(ref marker);
            }, null, startTimeSpan, periodTimeSpan);
            Console.ReadLine();
        }
        public static void ReceiveUpdates(ref string marker)
        {
            dynamic update = client.RetrieveStreamUpdates(marker, "Status;LatestMarker;Events;Exception.Errors").Result;
            //dynamic update = client.RetrieveStreamUpdates(marker, null).Result;
            Console.WriteLine(update);
            Console.WriteLine("Press the Enter key to exit anytime!");
            marker = update.LatestMarker;
            dynamic excp = update.Exception;
            if (excp != null)
            {
                throw new System.Exception(excp.Errors[0].ToString());
            }
        }
    }
