PI Developers Club

Stream Updates in PI Web API

Posted by msingh Employee Aug 22, 2018

What is Stream Updates?

Stream Updates is a mechanism in PI Web API for retrieving data updates for PI Points and Attributes. It is built on top of Streams and StreamSets, which use the HTTP protocol. Stream Updates uses markers to record the event in a stream at which the client last received updates, and uses those markers to return only the updates since that point. Every time you request updates, you get the changes since the marker you supplied, as well as a new hyperlink to use for the next set of updates. Currently, Stream Updates is only available as a CTP (Community Technology Preview) feature with PI Web API 2018.
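
To make the marker idea concrete, here is a toy in-memory model. It is only a sketch of the concept and is not PI Web API code: the `ToyStream` class, its integer markers, and its method names are all hypothetical, and real markers are opaque strings issued by the server.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of marker-based updates (hypothetical, not PI Web API code).
public class ToyStream
{
    private readonly List<string> events = new List<string>();

    // Records a new event at the end of the stream.
    public void Append(string value)
    {
        events.Add(value);
    }

    // Registering returns a marker for the current end of the stream.
    public int Register()
    {
        return events.Count;
    }

    // Returns the events recorded after the marker, plus a new marker for the next call.
    public Tuple<List<string>, int> GetUpdates(int marker)
    {
        List<string> changes = events.GetRange(marker, events.Count - marker);
        return Tuple.Create(changes, events.Count);
    }
}
```

Each call to `GetUpdates` returns only what arrived after the supplied marker, so passing the returned marker into the next call never repeats an event.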

 

Why was Stream Updates added?

Before Stream Updates, retrieving new data for PI Points and Attributes was not very efficient. The only way to get new data was to continually issue requests to find out about changes (polling), which was time-consuming and wasted resources. We had many tweaks and options to make the overall experience less time-consuming, but to achieve better performance we decided to support incremental updates.

 

Why use Stream Updates?

Stream Updates is built to overcome some basic challenges with getting incremental data. It operates over HTTP, which means that all the basic benefits of normal HTTP requests are present. This contrasts with Channels, which are implemented via the WebSockets protocol. In most cases, Stream Updates is more performant than Channels.

Responses are much smaller than with continuous polling because each one contains only the changes instead of the whole response all over again. Stream Updates also requires fewer server and network resources than polling. Unlike Channels, Stream Updates is compatible with claims-based authentication.

 

How to use Stream Updates?

Stream Updates usage consists of these steps:

1. The Client registers an attribute/point for updates by sending a POST request.

2. If registration succeeds, the response contains a marker. Markers are also provided as part of the "Links" object in the response and in the "Location" header of the response. The Client gets updates by sending a GET request that uses this marker.

3. The updates response contains a "LatestMarker" field, which is the current position in the stream. The Client gets new updates after this position by sending a GET request that uses this new marker. These requests can be chained together to get incremental updates for registered resources.
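
The steps above can be sketched as a few URL helpers. This is a minimal sketch, not part of PI Web API: the `StreamUpdatesUrls` class and its method names are hypothetical, and `MarkerFromLocation` assumes that the "Location" header ends with the marker as its last path segment.

```csharp
using System;

// Hypothetical helpers that build the request URLs for the three steps.
public static class StreamUpdatesUrls
{
    // Step 1: URL to POST to in order to register a stream for updates.
    public static string Registration(string baseUrl, string webId)
    {
        return baseUrl.TrimEnd('/') + "/streams/" + Uri.EscapeDataString(webId) + "/updates";
    }

    // Steps 2 and 3: URL to GET in order to receive the updates since a marker.
    public static string Updates(string baseUrl, string marker)
    {
        return baseUrl.TrimEnd('/') + "/streams/updates/" + Uri.EscapeDataString(marker);
    }

    // Assumption: the Location header has the form ".../streams/updates/{marker}".
    public static string MarkerFromLocation(Uri location)
    {
        string path = location.AbsolutePath.TrimEnd('/');
        return Uri.UnescapeDataString(path.Substring(path.LastIndexOf('/') + 1));
    }
}
```

Keeping URL construction in one place makes it easier to see that registration is addressed by WebId, while every later request is addressed only by the most recent marker.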

 

Sample C# client illustrating usage of Stream Updates:

`PIWebAPIClient.cs` is a wrapper around the `HttpClient` which implements the GET and the POST methods.

 

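using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;

public class PIWebAPIClient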
  {
       private HttpClient client;
       private string baseUrl;

       public PIWebAPIClient(string url, string username, string password)
       {
            client = new HttpClient();
            string auth = Convert.ToBase64String(Encoding.ASCII.GetBytes(string.Format("{0}:{1}", username, password)));
            client.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", auth);
            baseUrl = url;
       }

       public PIWebAPIClient(string url)
       {
            client = new HttpClient();
            baseUrl = url;
       }

       public async Task<object> GetAsync(string uri)
       {
            HttpResponseMessage response = await client.GetAsync(uri);
            var jsonString = await response.Content.ReadAsStringAsync();
            var json = JsonConvert.DeserializeObject<object>(jsonString);
            if (!response.IsSuccessStatusCode)
            {
                 var responseMessage = "Response status code does not indicate success: " + (int)response.StatusCode + " (" + response.StatusCode + "). ";
                 throw new HttpRequestException(responseMessage + Environment.NewLine + jsonString);
            }
            return json;
       }

       public async Task<object> PostAsync(string uri)
       {
            HttpResponseMessage response = await client.PostAsync(uri, null);
            var jsonString = await response.Content.ReadAsStringAsync();
            var json = JsonConvert.DeserializeObject<object>(jsonString);
            if (!response.IsSuccessStatusCode)
            {
                 var responseMessage = "Response status code does not indicate success: " + (int)response.StatusCode + " (" + response.StatusCode + "). ";
                 throw new HttpRequestException(responseMessage + Environment.NewLine + jsonString);
            }
            return json;
       }

       public async Task<dynamic> RegisterForStreamUpdates(string webId)
       {
            string url = baseUrl + "/streams/" + webId + "/updates";
            dynamic response = await PostAsync(url);  
            return response;
       }

       public async Task<dynamic> RetrieveStreamUpdates(string marker)
       {
            // Updates are addressed by marker only: {baseUrl}/streams/updates/{marker}
            string url = baseUrl + "/streams/updates/" + marker;
            dynamic response = await GetAsync(url);
            return response;
       }
  }

`Program.cs` is a simple C# class which uses the Client to register for and retrieve Stream Updates.

 

class Program
 {
    static string baseUrl = "https://your-server/piwebapi";
    static string marker = null;
    static string username = "username";
    static string password = "password";
    static PIWebAPIClient client = new PIWebAPIClient(baseUrl, username, password);

    static void Main(string[] args)
    {
        string webId = "webId";
        dynamic response = client.RegisterForStreamUpdates(webId).Result;
        marker = response.LatestMarker;

        var startTimeSpan = TimeSpan.Zero;
        var periodTimeSpan = TimeSpan.FromSeconds(10);
       
        // ReceiveUpdates is called every 10 seconds until the user presses Enter.
        var timer = new System.Threading.Timer((e) =>
        {
            ReceiveUpdates(marker);
        }, null, startTimeSpan, periodTimeSpan);

        Console.ReadLine();
        // Disposing here stops the timer and keeps it referenced while the program waits.
        timer.Dispose();
    }

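    public static void ReceiveUpdates(string currentMarker)
    {
        dynamic update = client.RetrieveStreamUpdates(currentMarker).Result;
        Console.WriteLine(update);
        Console.WriteLine("Press Enter to exit at any time.");
        // Store the new marker in the static field so the next call continues from here.
        // (A parameter named "marker" would shadow the field and the update would be lost.)
        marker = update.LatestMarker;
    }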
 }
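
The timer-based loop above blocks on `.Result`, and timer callbacks can overlap if a request takes longer than the period. As an alternative, here is a minimal sketch of an async loop that chains markers one request at a time. The `MarkerChain` class and the `fetch` delegate are hypothetical; `fetch` is assumed to wrap a call such as `client.RetrieveStreamUpdates(marker)` and return the `LatestMarker` from that response.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: chains markers sequentially until cancellation is requested.
public static class MarkerChain
{
    // fetch: given a marker, performs one update request and returns the new latest marker.
    public static async Task<string> PollAsync(
        string firstMarker,
        Func<string, Task<string>> fetch,
        TimeSpan period,
        CancellationToken token)
    {
        string marker = firstMarker;
        while (!token.IsCancellationRequested)
        {
            // Each request starts from the marker returned by the previous one.
            marker = await fetch(marker);
            try
            {
                await Task.Delay(period, token);
            }
            catch (OperationCanceledException)
            {
                break; // Cancellation arrived while waiting for the next cycle.
            }
        }
        return marker; // The last marker seen, in case the caller wants to resume later.
    }
}
```

Because the next request is only sent after the previous one completes, two requests never use the same marker at the same time.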

For more information, see the topics page of your PI Web API installation: https://your-server/piwebapi/help/topics/stream-updates
