Introduction

 

On this blog post, we will show you samples about how to use PI Web API Batch and Channels in C#. They both are some of the new features (in CTP) of PI Web API 2015 R3, Nevertheless, on the upcoming PI Web API 2016 release, the Batch service will be merged into the core services.

 

Please refer to this GitHub repository which was created to store the source code package of this blog post.

 

We will create a console application with the JSON.NET library, which can be downloaded using NuGet Package Manager.

 

The old way to make multiple requests with Tasks

 

First, let's remember how we used to run multiple HTTP requests simultaneously on previous version of the product. Please refer the code snippet below which shows how to create 50 new PI Points using C# Tasks.

 

     public static void Start()
        {
            int piPointsToBeCreated = 50;


            List<string> piPointNames = new List<string>();
            for (int i = 0; i < piPointsToBeCreated; i++)
            {
                piPointNames.Add("PIWebAPITest" + i.ToString());
            }
            
            Task<int>[] tasks = new Task<int>[piPointNames.Count];


            for (int i = 0; i < piPointNames.Count; i++)
            {
                tasks[i] = new Task<int>((piPointName) =>
                {
                    return CreatePIPoint(piPointName.ToString());
                }, piPointNames[i]);
            }


            Stopwatch stopWatch = new Stopwatch();
            stopWatch.Start();
            // start the antecedent tasks
            foreach (Task t in tasks)
            {
                t.Start();
            }


            Task.WaitAll(tasks);
            stopWatch.Stop();
            Console.WriteLine("Time elapsed: " + stopWatch.ElapsedMilliseconds);



            Console.WriteLine("Press enter to finish");
            Console.ReadLine();
        }




        internal static int CreatePIPoint(string piPointName)
        {
            string url = "https://marc-web-sql.marc.net/piwebapi/dataservers/s0IQnVxJzj6UuohiErVnqUqgTUFSQy1QSTIwMTQ/points";
            dynamic postBody = new JObject();
            postBody.Name = piPointName;
            postBody.PointClass = "classic";
            postBody.PointType = "Float32";
            postBody.Future = false;


            WebRequest request = WebRequest.Create(url);


            request.Credentials = new NetworkCredential("username", "password");


            ((HttpWebRequest)request).UserAgent = ".NET Framework Example Client";


            request.Method = "POST";
            request.ContentType = "application/json";


            byte[] byteArray = Encoding.UTF8.GetBytes(postBody.ToString());
            request.ContentLength = byteArray.Length;
            Stream dataStream = request.GetRequestStream();
            dataStream.Write(byteArray, 0, byteArray.Length);
            dataStream.Close();


            WebResponse response = request.GetResponse();
            return Convert.ToInt32(((System.Net.HttpWebResponse)(response)).StatusCode);
        }

 

By viewing the console application, it was needed 3.8 seconds for the 50 tags to be created.

 

 

You can find more information about this technique by reading my previous blog post: PI Web API 2015 R2 - New features demo - C#

 

The new way to make multiple requests with PI Web API Batch

 

The Batch controller has only one Action called Execute. The description below was copied from the PI Web API Online help:

 

 

Execute a batch of requests against the service. As shown in the Sample Request, the input is a dictionary with IDs as keys and request objects as values. Each request object specifies the HTTP method and the resource and, optionally, the content and a list of parent IDs. The list of parent IDs specifies which other requests must complete before the given request will be executed. The example first creates an element, then gets the element by the response's Location header, then creates an attribute for the element. Note that the resource can be an absolute URL or a JsonPath that references the response to the parent request. The batch's response is a dictionary uses keys corresponding those provided in the request, with response objects containing a status code, response headers, and the response body.

 

 

The idea of BATCH is to make a single HTTP request that will actually execute many requests on the server side.  This can be achieved as the body request is an array of objects, each one has some properties to define as Method, Resource and Content which represents a single HTTP request. In order to create 50 PI Points, the body message is an array with 50 objects, each one will create a specific PI Point. Please refer to the example below:

 

 

        public static void Start()
        {
            int piPointsToBeCreated = 50;


            List<string> piPointNames = new List<string>();
            for (int i = 0; i < piPointsToBeCreated; i++)
            {
                piPointNames.Add("PIWebAPITest" + i.ToString());
            }


            dynamic globalBatch = new JObject();
            for (int i = 0; i < piPointNames.Count; i++)
            {
                dynamic postCreatePoint = new JObject();
                postCreatePoint.Name = piPointNames[i];
                postCreatePoint.PointClass = "classic";
                postCreatePoint.PointType = "Float32";
                postCreatePoint.Future = false;
      


                dynamic batchCreatePoint = new JObject();
                batchCreatePoint.Method = "POST";
                batchCreatePoint.Resource = "https://marc-web-sql.marc.net/piwebapi/dataservers/s0IQnVxJzj6UuohiErVnqUqgTUFSQy1QSTIwMTQ/points";
                batchCreatePoint.Content = postCreatePoint.ToString();
                globalBatch[(i+1).ToString()] = batchCreatePoint;
            }
            Console.WriteLine(globalBatch.ToString());
            Stopwatch stopWatch = new Stopwatch();
            stopWatch.Start();
      
            Task<int> task = Task<int>.Factory.StartNew(() =>
            {
                return SendBatchRequest(globalBatch);
            });




            task.Wait();
            stopWatch.Stop();
            Console.WriteLine("Time elapsed: " + stopWatch.ElapsedMilliseconds);


            Console.WriteLine("Press enter to finish");
            Console.ReadLine();
        }




        internal static int SendBatchRequest(dynamic postBatch)
        {
            string url = "https://marc-web-sql.marc.net/piwebapi/batch";




            WebRequest request = WebRequest.Create(url);


            request.Credentials =  new NetworkCredential("username", "password");


            ((HttpWebRequest)request).UserAgent = ".NET Framework Example Client";


            request.Method = "POST";
            request.ContentType = "application/json";


            byte[] byteArray = Encoding.UTF8.GetBytes(postBatch.ToString());
            request.ContentLength = byteArray.Length;
            Stream dataStream = request.GetRequestStream();
            dataStream.Write(byteArray, 0, byteArray.Length);
            dataStream.Close();


            WebResponse response = request.GetResponse();
            return Convert.ToInt32(((System.Net.HttpWebResponse)(response)).StatusCode);
        }

 

By viewing the console application, it was needed only 1.2 seconds (31% of 3.8s) for the 50 tags to be created using BATCH.

 

 

There is no doubt about the performance improvement that your app can benefit using this service.

 

 

Chaining requests with Batch

 

The previous example has shown how to use BATCH with independent requests as no response was used as an input to generate the resource for another request. Nevertheless, this is not always true. Imagine a scenario with two requests where:

 

  1. The first request gets the attributes information from the sinusoid PI Point of the MARC-PI2014 PI Data Archive, including the WebId
  2. The second request uses the WebId to generate a new Resource (link) in order to retrieve recorded values of the SINUSOID.

 

The previous example, the response of the first request is used to generate the url of the second request. This is called Chaining Requests.

 

Let's take a look at how this works in C#. Please refer to the code snippet below:

 

 

        public static void Start()
        {


            dynamic globalBatch = new JObject();
            globalBatch["1"] = new JObject();
            globalBatch["1"].Method = "GET";
            globalBatch["1"].Resource = @"https://marc-web-sql.marc.net/piwebapi/points?path=\\marc-pi2014\sinusoid";
            globalBatch["1"].Headers = new JObject();
            globalBatch["1"].Headers["Cache-Control"] = "no-cache";




            globalBatch["2"] = new JObject();
            globalBatch["2"].Method = "GET";
            globalBatch["2"].Resource = "https://marc-web-sql.marc.net/piwebapi/streamsets/recorded?webId={0}";
            globalBatch["2"].Parameters = new JArray(new object[] { "$.1.Content.WebId" });
            globalBatch["2"].ParentIds = new JArray(new object[] { "1" });


            Console.WriteLine(globalBatch.ToString());
            Stopwatch stopWatch = new Stopwatch();
            stopWatch.Start();


            dynamic responseObject = SendBatchRequest(globalBatch);


            stopWatch.Stop();
            Console.WriteLine("SYNC: Time elapsed: " + stopWatch.ElapsedMilliseconds);


        }




        internal static dynamic SendBatchRequest(dynamic postBatch)
        {
            string url = "https://marc-web-sql.marc.net/piwebapi/batch";
            WebRequest request = WebRequest.Create(url);
            request.Credentials = new NetworkCredential("username", "password");
            ((HttpWebRequest)request).UserAgent = ".NET Framework Example Client";
            request.Method = "POST";
            request.ContentType = "application/json";
            byte[] byteArray = Encoding.UTF8.GetBytes(postBatch.ToString());
            request.ContentLength = byteArray.Length;
            Stream dataStream = request.GetRequestStream();
            dataStream.Write(byteArray, 0, byteArray.Length);
            dataStream.Close();
            string result = string.Empty;
            WebResponse response = request.GetResponse();
            using (var reader = new StreamReader(response.GetResponseStream()))
            {
                result = reader.ReadToEnd();
                //Console.WriteLine(result);
            }
            return JsonConvert.DeserializeObject<dynamic>(result);
        }

 

Note that:

  • ParentsId = ["1"] on the second request was used to make PI Web API wait for the first request to finish before starting the second request.
  • The Resource property of the second request used a parameter {0} on the link defined on the Parameter property.
  • The Parameter property has an array with only one element which is $.1.Content.WebId. This means that the first parameter of the second Resource is the WebId of the response of the first request.

 

 

Running async

 

Let's improve our code by making our program run async methods.

 

        public async static Task StartAsync()
        {


            dynamic globalBatch = new JObject();
            globalBatch["1"] = new JObject();
            globalBatch["1"].Method = "GET";
            globalBatch["1"].Resource = @"https://marc-web-sql.marc.net/piwebapi/points?path=\\marc-pi2014\sinusoid";
            globalBatch["1"].Headers = new JObject();
            globalBatch["1"].Headers["Cache-Control"] = "no-cache";




            globalBatch["2"] = new JObject();
            globalBatch["2"].Method = "GET";
            globalBatch["2"].Resource = "https://marc-web-sql.marc.net/piwebapi/streamsets/recorded?webId={0}";
            globalBatch["2"].Parameters = new JArray(new object[] { "$.1.Content.WebId" });
            globalBatch["2"].ParentIds = new JArray(new object[] { "1" });


            Console.WriteLine(globalBatch.ToString());
            Stopwatch stopWatch = new Stopwatch();
            stopWatch.Start();


            dynamic responseObject = await SendBatchRequestAsync(globalBatch);


            stopWatch.Stop();
            Console.WriteLine("ASYNC: Time elapsed: " + stopWatch.ElapsedMilliseconds);


        }








        internal async static Task<dynamic> SendBatchRequestAsync(dynamic postBatch)
        {
            string url = "https://marc-web-sql.marc.net/piwebapi/batch";




            WebRequest request = WebRequest.Create(url);


            request.Credentials = new NetworkCredential("marc.adm", "kk");


            ((HttpWebRequest)request).UserAgent = ".NET Framework Example Client";


            request.Method = "POST";
            request.ContentType = "application/json";


            byte[] byteArray = Encoding.UTF8.GetBytes(postBatch.ToString());
            request.ContentLength = byteArray.Length;
            Stream dataStream = request.GetRequestStream();
            dataStream.Write(byteArray, 0, byteArray.Length);
            dataStream.Close();
            string result = string.Empty;
            WebResponse response = await request.GetResponseAsync();
            using (var reader = new StreamReader(response.GetResponseStream()))
            {
                result = reader.ReadToEnd();
                //Console.WriteLine(result);
            }
            return JsonConvert.DeserializeObject<dynamic>(result);
        }

 

The main concepts to convert those methods are:

  • Convert methods signature:
    • static void --> async static Task
    • static dynamic --> async static Task<dynamic>
  • Use async methods if possible:
    • request.GetResponse() --> request.GetResponseAsync()
  • Add await for async methods
    • request.GetResponseAsync() --> await request.GetResponseAsync()
    • sendBatchRequestAsync() --> await sendBatchRequestAsync()

 

Finally, we just need to change the Main method from Program.cs in order to be able to make async requests as the Main method runs syncronous.

 


            Task.Run(async () =>
            {
                await ComplexBatchTest.StartAsync();
            }).Wait();

 

 

 

Using Channels (CTP) in C#

 

Under Topics of the PI Web API online help, there is one for Channels. There you can find useful information about how to use this funcionality.

 

By using the WebId of the sinusoid PI Point, I have generated the following URI:

 

wss://marc-web-sql.marc.net/piwebapi/streams/P0IQnVxJzj6UuohiErVnqUqgAQAAAATUFSQy1QSTIwMTRcU0lOVVNPSUQ/channel

 

The first step is to make sure channels is working properly by accessing a pre-built tool available at https://www.websocket.org/echo.htmlIf you are using Basic authentication, you might receive 401 errors after making the HTTP requests. In this case, open another tab on the browser, access PI Web API main endpoint, type your credentials, refresh the original web page and try again.

 

If it is working fine, let's try to receive new updates from the sinusoid PI Point in C#. I have referred to this stackoverflow thread to get started.

 

Please review the code snippet below. I have copied the URI on the code itself to make things easier.

 


        public static async Task Client()
        {


            ClientWebSocket ws = new ClientWebSocket();
            ws.Options.SetRequestHeader("Authorization", "Basic xxxxxxxxxx");


            var uri = new Uri("wss://marc-web-sql.marc.net/piwebapi/streams/P0IQnVxJzj6UuohiErVnqUqgAQAAAATUFSQy1QSTIwMTRcU0lOVVNPSUQ/channel");
            try
            {
                await ws.ConnectAsync(uri, CancellationToken.None);
            }
            catch (Exception)
            {
        
            }


            var buffer = new byte[1024];
            while (true)
            {
                var segment = new ArraySegment<byte>(buffer);


                var result = await ws.ReceiveAsync(segment, CancellationToken.None);


                if (result.MessageType == WebSocketMessageType.Close)
                {
                    await ws.CloseAsync(WebSocketCloseStatus.InvalidMessageType, "I don't do binary", CancellationToken.None);
                    return;
                }


                int count = result.Count;
                while (!result.EndOfMessage)
                {
                    if (count >= buffer.Length)
                    {
                        await ws.CloseAsync(WebSocketCloseStatus.InvalidPayloadData, "That's too long", CancellationToken.None);
                        return;
                    }


                    segment = new ArraySegment<byte>(buffer, count, buffer.Length - count);
                    result = await ws.ReceiveAsync(segment, CancellationToken.None);
                    count += result.Count;
                }


                var message = Encoding.UTF8.GetString(buffer, 0, count);
                Console.WriteLine(">" + message);
            }
        }

 

 

For using Basic Authentication, make sure to set ws.Options.SetRequestHeader properly as shown on this example.

 

Finally, add those lines on the Program.cs in order to start running:

 

            var clientTask1 = ChannelsTest.Client();
            Console.WriteLine("Press enter to finish");
            Console.ReadLine();

 

 

You are supposed to get messages every 30 seconds like this:

 

 

Conclusion

 

I hope you this is a great resource for you to start writing C# applications using PI Web API Batch and Channels. If you have comments or questions, please don't hesitate to post a comment here.

 

Have fun!