
Motivation

 

Many use cases of async PI AF SDK methods will involve making multiple async calls and awaiting their results. For example, we can make an async call for each PI Point in a collection of these objects. This query seems similar to a synchronous bulk call using PIPointList or AFListData. However, the difference is that with async, we can vary the query parameters (such as time range) for each PI Point, which is not possible with the bulk call.

 

In this post, we introduce a pattern that allows PI AF SDK clients to consume the results of async calls as they are completed. One major motivation for doing so is greater concurrency. Processing the result of each async call as it completes allows us to use both CPU and IO resources more effectively.

 

This blog post will walk through some code patterns we can use when making multiple async calls in PI AF SDK.

 

Problem definition

 

We focus on an event frame use case here, although the concepts apply to any use case involving "heterogeneous" queries across a list of PI Points or AF Attributes (i.e. the parameters of the query are not the same for each PIPoint/AFAttribute, so a bulk call cannot be used).

 

We would like to perform summary calculations on a list of event frames. For each event frame, we perform a summary calculation for a PI Point within the event frame time range. Each event frame has a different start and end time. There are multiple ways we can programmatically solve this problem.

  1. Synchronous loop over the list of event frames
  2. Parallel tasks over the event frames
  3. Async calls that are processed only after all tasks complete
  4. Async calls using Task.WhenAny to process tasks as they complete
  5. Async calls using IObservable<T> to process tasks as they complete

 

The code for these examples is hosted at:

GitHub - osisoft/PI-AF-SDK-AsyncToObservable

0. Find the Event Frames

AFDatabase database = new PISystems()["SERVER"].Databases["AsyncExamples"];
AFEventFrameSearch efSearch = new AFEventFrameSearch(database, "EFSearch", new List<AFSearchToken> {
     new AFSearchToken(AFSearchFilter.Name, AFSearchOperator.Equal, "EF*"),
     new AFSearchToken(AFSearchFilter.Start, AFSearchOperator.GreaterThan, "01-jan-2012"),
     new AFSearchToken(AFSearchFilter.End, AFSearchOperator.LessThan, "01-jan-2013"),
});
IEnumerable<AFEventFrame> efs = efSearch.FindEventFrames(0, false, 1000).Take(10000);

 

We show this because we are using the new search in PI AF SDK 2016 to find our event frames. The important concept here is that the new search returns an IEnumerable collection of event frames that we can loop over. We just take the first 10,000 event frames to keep things simple.

 

1. Synchronous loop over the list of event frames

public static class SyncExample
{
     public static void Run(IEnumerable<AFEventFrame> efs)
     {
          foreach (AFEventFrame ef in efs)
          {
               OutputInfo output = Common.GetSummary(ef);
               Common.WriteToConsole(output);
          }
          Common.WriteCompletion();
     }
}

 

This one is fairly simple. We just loop over the event frames and make a summary call on each (i.e. Common.GetSummary makes an AFData.Summary call). See the GitHub code for the implementations of the methods above; their details are not important for these examples. Here, to "process" a result we simply print some output info to the Console, but the discussion applies to any generic processing task.

 

2. Parallel tasks over the event frames

public static class ParallelExample
{
     public static void Run(IEnumerable<AFEventFrame> efs)
     {
          Parallel.ForEach(efs, new ParallelOptions { MaxDegreeOfParallelism = 8 }, ef =>
          {
               OutputInfo output = Common.GetSummary(ef);
               Common.WriteToConsole(output);
          });
          Common.WriteCompletion();
     }
}

 

Here, we use the .NET Task Parallel Library (TPL) to parallelize the calls over the event frames. TPL is convenient and perhaps ideal if our client machine has many cores, but TPL is designed for CPU-bound work, while our data access is IO-bound over the network. For server-side code bound by threads, such an approach likely will not scale well. In addition, it is a poor fit for web services with many users, as it can demand many threads per user.

 

3. Async calls that are processed only after all tasks complete

public static class TaskExample1
{
     public static void Run(IEnumerable<AFEventFrame> efs)
     {
          Task<IList<OutputInfo>> efTasks = GetOutputInfoList(efs);

          IList<OutputInfo> output = efTasks.Result;
          foreach (OutputInfo info in output)
          {
               Common.WriteToConsole(info);
          }
          Common.WriteCompletion();
     }

     private static async Task<IList<OutputInfo>> GetOutputInfoList(IEnumerable<AFEventFrame> efs)
     {
          Task<OutputInfo>[] tasks = efs.Select(async ef => await Common.GetSummaryAsync(ef)).ToArray();
          return await Task.WhenAll(tasks);
     }
}

 

Here, we launch an async call for each event frame (i.e. Common.GetSummaryAsync makes an AFData.SummaryAsync call). We launch them all at once and then await their completion. This suspends the current execution until all calls complete. The method GetOutputInfoList only returns once we've received results from all of our async calls. The caller in this case must block (with Task.Result) and can only write to the Console (i.e. process the results) once all the tasks are complete.

 

Note that we achieve PI Data Archive concurrency here, and the AF SDK code itself gets client-side concurrency within its processing, but our custom code does not. In a producer-consumer analogy, we allow our producers to provide results concurrently, but we don't allow our consumers to consume until all the results are provided; therefore, we sacrifice some degree of concurrency in this approach.

 

4. Async calls using Task.WhenAny to process tasks as they complete

public static class TaskExample2
{
     public static void Run(IEnumerable<AFEventFrame> efs)
     {
          WriteOutputInfo(efs).Wait(); // block so the console app does not exit before the async work completes
     }
     private static async Task WriteOutputInfo(IEnumerable<AFEventFrame> efs)
     {
          List<Task<OutputInfo>> tasks = efs.Select(async ef => await Common.GetSummaryAsync(ef)).ToList();
          while (tasks.Count > 0)
          {
               var t = await Task.WhenAny(tasks);
               tasks.Remove(t);
               Common.WriteToConsole(await t);
          }
          Common.WriteCompletion();
     }
}

 

Above, we've shown the pattern introduced in this post by Stephen Toub. Instead of waiting for all tasks to complete via WhenAll, we wait for just one to complete via WhenAny. Then we remove it from the list, extract its result, and repeat until the list of tasks is empty. Note several bottlenecks in this case. First, we must materialize the task list. Second, removing a task from the list is O(N). Third, registering a continuation for each task is also O(N). The latter two result in O(N^2) time complexity, which is not great if we have a large number of tasks. Although we allow our consumers to process while our producers are producing, we've added extra overhead to the processing. In the same post, another approach is introduced to optimize further. We won't discuss it, but would like to suggest a more "natural" way to tackle this problem.

 

5. Async calls using IObservable<T> to process tasks as they complete

public static class ObservableExample
{
     public static void Run(IEnumerable<AFEventFrame> efs)
     {
          IObservable<OutputInfo> obs = efs.ToObservable()
          .Select(i => Observable.FromAsync(() => Common.GetSummaryAsync(i)))
          .Merge();

          obs.Subscribe(Common.WriteToConsole, Common.WriteCompletion);
     }
}

 

What we'd really like to do is launch a bunch of async calls and process the results in the order of task completion, not initialization. In essence, we'd like to "observe" the completion of tasks as a continuous (but bounded) stream. For this pattern, I'm a big proponent of using Rx.NET (NuGet: rx-main).

 

Above, we're using Rx.NET to create an observable sequence of results from tasks as they complete. From the IEnumerable<AFEventFrame>, we convert this to an observable sequence of event frames. For each event frame observed, we make an async summary call returning a Task<OutputInfo>. Here is an important step. Via Observable.FromAsync, we convert the asynchronous receipt of a single result from Task into the asynchronous observation of a sequence producing a single result. Wrapping the Task in this way also ensures it does not start right away but only when we have a subscribed observer! At this point, we have an IObservable<IObservable<OutputInfo>>. The Merge method allows us to "flatten" the observable hierarchy we just created into just a single sequence producing an OutputInfo object for each observed event frame (LINQ experts may recognize this is basically a SelectMany operation). Finally, we subscribe an implicit observer via the Subscribe method. Our observer's OnNext calls WriteToConsole and OnCompleted calls WriteCompletion.

 

The benefit of Rx is that it allows us to declaratively chain data pipelines together, as we could using LINQ to IEnumerable.

 

For example, we can implement buffering easily if we wanted to process a group of items. Below, we write the items to the Console in groups of 1,000.

IObservable<IList<OutputInfo>> obs = efs.ToObservable()
     .Select(i => Observable.FromAsync(() => Common.GetSummaryAsync(i)))
     .Merge()
     .Buffer(1000);
obs.Subscribe(Common.WriteBufferToConsole, Common.WriteCompletion);

 

What if we wanted to process the results in the order of the event frames? Easy, just change Merge() to Concat().
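A minimal sketch of the ordered variant follows (same pipeline as above). Note that because Observable.FromAsync produces cold observables, Concat subscribes to each inner observable only after the previous one completes, so the summary calls start sequentially rather than concurrently:

IObservable<OutputInfo> ordered = efs.ToObservable()
     .Select(i => Observable.FromAsync(() => Common.GetSummaryAsync(i)))
     .Concat(); // emit results in event frame order, not completion order

ordered.Subscribe(Common.WriteToConsole, Common.WriteCompletion);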

 

Observable sequences are an appropriate pattern to use when you want to process a list of tasks as they complete. The learning curve for Rx is steep, but it can really make code more readable, declarative, and shorter.

 

Bulk call analogy

 

There is an analogy here to the PI AF SDK bulk calls in terms of the degree of "laziness". In the bulk call enumerable pattern, we (the client) page through the results and ask the server to make them available. We do not have to wait for the server to return all of the results before we can start processing. In an async pattern, we can achieve a similar "laziness" by processing the list of tasks as they complete. We do not have to wait for all tasks to complete before processing the results. In the former, we still synchronously "pull" to ask for the next results. In the latter, we receive the results via an asynchronous "push". If you've read my earlier post on using Rx with AFDataPipe, you won't be surprised that the interface to use in the latter case is IObservable<T>.

 

If there is one takeaway from this post, it is the table below (inspiration from GitHub - Reactive-Extensions/Rx.NET). It shows the analogy we can draw between synchronous and asynchronous method interfaces and how this analogy is fleshed out in our own PI AF SDK methods.

 

| Pattern | Return type (single value) | PI AF SDK example (single value) | Return type (multiple values) | PI AF SDK example (multiple values) |
| --- | --- | --- | --- | --- |
| Pull / Synchronous / Interactive | T | PIPoint.RecordedValues | IEnumerable<T> | PIPointList.RecordedValues |
| Push / Asynchronous / Reactive | Task<T> | PIPoint.RecordedValuesAsync | IObservable<T> | Rx.NET example in this blog post |

 

Using a PI AF SDK example of RecordedValues calls, we have

T = AFValues

We encourage you to check the PI AF SDK methods listed in the table above to verify. The IObservable<T> return type is not natively supported in PI AF SDK as of 2016 for historical data retrieval, but we have shown here that it is possible to implement this pattern yourself.
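As a minimal sketch of what that could look like, using Observable.FromAsync as in section 5 (we assume the RecordedValuesAsync overload mirrors the parameters of the synchronous RecordedValues call; points and timeRange are placeholders you would supply):

IObservable<AFValues> recorded = points.ToObservable()
     .Select(pt => Observable.FromAsync(() =>
          pt.RecordedValuesAsync(timeRange, AFBoundaryType.Inside, null, false, 0))) // parameters assumed to mirror RecordedValues
     .Merge();

recorded.Subscribe(vals => Console.WriteLine("Received {0} values", vals.Count));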

 

Throttling

 

There is currently a client-side cap on the number of outstanding calls to a PI Data Archive. If you are making multiple async calls, it is important to keep this cap in mind. Having a queue of tasks or using an async Semaphore pattern can be a way to throttle the calls.
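A minimal sketch of the async semaphore idea, reusing Common.GetSummaryAsync from the examples above (the cap of 16 concurrent calls is an arbitrary illustration, not a documented limit):

private static readonly SemaphoreSlim _throttle = new SemaphoreSlim(16); // arbitrary concurrency cap

private static async Task<OutputInfo> GetSummaryThrottledAsync(AFEventFrame ef)
{
     await _throttle.WaitAsync(); // wait for a free slot before issuing the call
     try
     {
          return await Common.GetSummaryAsync(ef);
     }
     finally
     {
          _throttle.Release(); // free the slot for the next pending call
     }
}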

 

References

 

The ideas we've introduced here are not new and have been extensively discussed in the .NET literature. They are worth revisiting however if you plan to use some of the async methods introduced in PI AF SDK 2016. Here is a list of some of our favorites:

 

Introduction to Rx

Async Iterators - Dave Sexton Blog

Processing tasks as they complete - .NET Parallel Programming - Site Home - MSDN Blogs

Help me evangelize Rx: Select + async selector lambda  

Async, await, and yield return

c# - Asynchronous iterator Task<IEnumerable<T>> - Stack Overflow 

Proposal: language support for async sequences · Issue #261 · dotnet/roslyn · GitHub 

Motivation

 

Do you want to potentially increase the speed of your AF SDK asset searches by more than an order of magnitude? If so, please read on.

 

PI AF SDK 2016 introduced new methods to perform searches against assets, such as AF Elements and Event Frames. The goal of this blog post is to demonstrate the superior performance and usage of the new search and convince you that you should be using it.

 

We will present a series of generic use cases and compare "new" and "traditional" methods for searching.

 

  1. Find and process a list of elements belonging to a template
  2. Find and process a list of elements with a certain attribute value
  3. Find and process a list of child elements
  4. How the new search obtains a consistent snapshot of a collection

 

We use the term "process" generally here to mean tasks such as modifying the properties of the element, querying its attributes' values, or signing up its attributes to a data pipe.

 

We use an example AF Database that has one root element called LeafElements and 10,000 child elements. Half of the child elements belong to the "Leaf_Rand" element template and the other half to the "Leaf_Sin" element template. Both element templates have attribute templates with a mix of static and PI Point data references. We adopted the AF database structure from our Large Asset Count Project. Please note that the details of the AF database are not important here; it merely provides a generic structure from which to demonstrate the examples.

 

Note: This post is part of the blog series on new AF SDK 2016 features.

 

 

Examples

Our examples are run on an Azure VM with the following specs:

  •      7 GB RAM
  •      AMD 2.10 GHz
  •      SQL Server 2014 Express
  •      AF Server and AF Client 2.8.0.7444

PI Data Archive, AF Server, SQL Server, and the AF client application are all on the same machine. We don't guarantee the performance numbers below for your environment and network. These results are just a small data point in a large design space, but we hope they provide a reference and encourage you to perform your own tests.

 

 

1. Find and process a list of elements belonging to a template

 

Traditional search

const int pageSize = 1000;
int startIndex = 0;
int totalCount;
AFNamedCollectionList<AFElement> myElements;

do
{
     myElements = AFElement.FindElementsByTemplate(database,
                    null,
                    database.ElementTemplates["Leaf_Sin"],
                    false,
                    AFSortField.Name,
                    AFSortOrder.Ascending,
                    startIndex,
                    pageSize,
                    out totalCount);
     ProcessElements(myElements);

     startIndex += pageSize;
} while (startIndex < totalCount);

 

New search

AFSearchToken templateToken = new AFSearchToken(AFSearchFilter.Template, AFSearchOperator.Equal, database.ElementTemplates["Leaf_Sin"].GetPath());
AFElementSearch elementSearch = new AFElementSearch(database, "FindSinusoidElements", new[] { templateToken });
elementSearch.CacheTimeout = TimeSpan.FromMinutes(10); // Opt in to server-side caching

IEnumerable<AFElement> elements = elementSearch.FindElements(0, false, 1000);
foreach (AFElement element in elements)
{
     ProcessElement(element);
}

 

Code comparison

You can already notice some differences between the traditional and new patterns of searching.

  • The newer pattern for searching presents a more uniform abstraction. The class AFElementSearch represents the search. The search query is described by a list of AFSearchToken objects. Each token corresponds to an AFSearchFilter denoting what condition or property we want to filter on. The search is executed via a FindElements call, and the caller receives an IEnumerable<AFElement> query result that can be looped through. LINQ aficionados should be excited about this.
  • The former pattern relies on various method overloads of FindElements to specify the search criteria. These methods have long argument lists that can be cumbersome to build, and they require the developer to write "wrapper" code, such as a do-while loop, to page through the found elements while keeping track of the loop state. The do-while pattern tends to force the developer to think in terms of pages or batches when, in many cases, a foreach loop over individual elements is more natural. In many cases, code using the new search will be more declarative and easier to maintain.

 

Performance comparison

I know what you are thinking. What about performance?

This search returns 50,000 elements and we restarted the SQL Server after each run. Our ProcessElement(s) methods simply write the AF Element name to the Console. For more expensive tasks, the timings below would be expected to be higher.

 

Traditional search: 3.2 minutes

New search without caching: 1.1 minutes

New search with caching: 0.15 minutes

 

Why is the new search faster?

  • The new search query does not require a sort (to be executed by SQL Server). Note that FindElements requires us to specify an AFSortField, but we do not need to specify one when creating an AFElementSearch. In the new search, we can opt in to sorting by using a token with AFSearchFilter.SortField.
  • The new search allows you to opt in to caching object identifiers of found elements on the server. This is done by setting the AFElementSearch.CacheTimeout property, as in the "New search" code above. This effectively takes a "snapshot" of the found collection, caches it, and provides a server-side iterable that the AF client can consume. Caching of identifiers allows SQL Server to retrieve subsequent items faster by avoiding repetitive queries. The traditional search, which does not implement caching, incurs more overhead on the SQL Server side.
  • Should you opt in to caching when using the new search? Let's see what the docs for CacheTimeout say: If you will only be getting items from the first page, then it is best to leave the cache disabled. If you will be paging through several pages of items, then it is best to enable the cache by setting this property to a value larger than the time you expect to be using the search.
  • Exercise for the interested reader: You can use SQL Server Profiler to look at the differences in implementation between the traditional search and new search (with and without caching).

 

Let's look at another example (if you are not yet convinced).

 

 

2. Find and process a list of elements with a certain attribute value

 

Traditional search

const int pageSize = 1000;
int startIndex = 0;
int totalCount;
AFNamedCollectionList<AFElement> myElements;

AFElementTemplate template = database.ElementTemplates["Leaf"];
AFAttributeValueQuery query = new AFAttributeValueQuery(template.AttributeTemplates["SubTree"], AFSearchOperator.Equal, "1");
do
{
     myElements = AFElement.FindElementsByAttribute(
                    null,
                    "*",
                    new AFAttributeValueQuery[] { query },
                    true,
                    AFSortField.Name,
                    AFSortOrder.Ascending,
                    startIndex,
                    pageSize,
                    out totalCount);
     ProcessElements(myElements);

     startIndex += pageSize;
} while (startIndex < totalCount);

 

New search

AFElementTemplate template = database.ElementTemplates["Leaf"];
AFSearchToken templateToken = new AFSearchToken(AFSearchFilter.Template, AFSearchOperator.Equal, template.GetPath());
AFSearchToken valueToken = new AFSearchToken(AFSearchFilter.Value, AFSearchOperator.Equal, "1", template.AttributeTemplates["SubTree"].GetPath());
AFElementSearch elementSearch = new AFElementSearch(database, "FindSubTreeElements", new[] { templateToken, valueToken });
elementSearch.CacheTimeout = TimeSpan.FromMinutes(10); // Opt in to server-side caching

IEnumerable<AFElement> elements = elementSearch.FindElements(0, false, 1000);
foreach (AFElement element in elements)
{
     ProcessElement(element);
}

 

Search tokens are ANDed together

Here, we have two tokens. As mentioned in the search query syntax documentation, please note that all the search tokens are ANDed together.

 

Performance comparison

This search returns 10,000 elements.

Traditional search: 0.20 minutes

New search without caching: 0.14 minutes

New search with caching: 0.09 minutes

 

Wow, this new search is amazing. Fewer lines of code and better performance. A developer's paradise!

 

 

3. Find and process a list of child elements given a parent

 

Traditional search

const int pageSize = 1000;
int startIndex = 0;
int totalCount;
AFNamedCollectionList<AFElement> myElements;

do
{
     myElements = AFElement.FindElements(database,
                    database.Elements["LeafElements"], 
                    "*",
                    AFSearchField.Name, false,
                    AFSortField.Name,
                    AFSortOrder.Ascending,
                    startIndex,
                    pageSize,
                    out totalCount);
     ProcessElements(myElements);

     startIndex += pageSize;
} while (startIndex < totalCount);

 

New search

AFSearchToken rootToken = new AFSearchToken(AFSearchFilter.Root, AFSearchOperator.Equal, database.Elements["LeafElements"].GetPath());
AFSearchToken descToken = new AFSearchToken(AFSearchFilter.AllDescendants, AFSearchOperator.Equal, "false");
AFElementSearch elementSearch = new AFElementSearch(database, "FindLeafElements", new[] { rootToken, descToken });
elementSearch.CacheTimeout = TimeSpan.FromMinutes(10); // Opt in to server-side caching

IEnumerable<AFElement> elements = elementSearch.FindElements(0, false, 1000);
foreach (AFElement element in elements)
{
     ProcessElement(element);
}

 

Performance comparison

This search returns 100,000 elements.

Traditional search: 9.6 minutes

New search without caching: 5.4 minutes

New search with caching: 0.32 minutes

 

 

4. How the new search obtains a consistent snapshot of a collection

 

We mentioned earlier that the new search has the ability to cache the search results. See for example the properties under AFElementSearch Class. If your application is using the traditional paging pattern and another client is modifying that collection, you may miss items or see duplicates. If you use the new search and opt in to server-side caching, then upon the initial search call, the server will take a "snapshot" of the found items and cache their identifiers. The server will use this cache to provide items as the client iterates. Thus, the client will see a consistent snapshot of the collection at the time of the query and be immune from any modifications to the query result set that could occur as it iterates through the results.

 

 

AFElementSearchBuilder

 

I'm a fan of using the Builder Pattern to construct complex objects. Notice that in the above, we have to construct the filters before constructing the search object. This seems a little backwards; intuitively, we'd like to construct the search object first and then add our filters.

 

We provide an AFElementSearchBuilder class to help with this. Example usage is below:

AFElementSearch elementSearch = AFElementSearchBuilder.Create()
     .SetDatabase(database)
     .SetName("FindLeafElements")
     .AddToken(new AFSearchToken(AFSearchFilter.Root, AFSearchOperator.Equal, elementPath))
     .AddToken(new AFSearchToken(AFSearchFilter.AllDescendants, AFSearchOperator.Equal, "false"))
     .Build();

List<AFElement> elementsList = elementSearch.FindElements(0, false, 1000).ToList();

 

You can follow a similar pattern to write your own builder for AFEventFrameSearch, as sketched below.
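A minimal sketch of such a builder (a hypothetical class, not part of AF SDK; it relies on the AFEventFrameSearch(database, name, tokens) constructor):

public class AFEventFrameSearchBuilder
{
     private AFDatabase _database;
     private string _name;
     private readonly List<AFSearchToken> _tokens = new List<AFSearchToken>();

     public static AFEventFrameSearchBuilder Create()
     {
          return new AFEventFrameSearchBuilder();
     }

     public AFEventFrameSearchBuilder SetDatabase(AFDatabase database) { _database = database; return this; }

     public AFEventFrameSearchBuilder SetName(string name) { _name = name; return this; }

     public AFEventFrameSearchBuilder AddToken(AFSearchToken token) { _tokens.Add(token); return this; }

     public AFEventFrameSearch Build()
     {
          return new AFEventFrameSearch(_database, _name, _tokens);
     }
}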

 


 

 

Conclusion

 

I hope this post demonstrates that using the new search in AF SDK can be a valuable investment and that transitioning does not require much new code. It is desirable when some of the following are true:

  • Your AF database contains large collections (10,000+ elements or event frames)
  • Your applications perform processing on large collections of elements or event frames.
  • Your applications don't require the returned collection to be sorted. You can still opt in to sorting using AFSearchFilter.SortField.
  • You find that some of your current asset search implementations are slow
  • You want assurance that the server provides consistent snapshots of your collections

 

For more information, please consult the PI AF SDK Reference:

Search Query Syntax Overview

AFSearch Class

 

Thank you for reading and as always, questions and comments are welcome below. Please look forward to the next post in this series on asynchronous data access!


PI AF SDK Basic Samples

Posted by bshang, Nov 24, 2015

Dear PI geeks,

 

A new Github repository has been created for newcomers to PI AF SDK.

 

PI-AF-SDK-Basic-Samples

 

osisoft/PI-AF-SDK-Basic-Samples · GitHub

 

This repository provides self-contained and reproducible samples of basic operations using PI AF SDK. The aim is to provide an accessible, hands-on learning experience with the PI AF SDK using code examples that are short, portable, and easy to follow. Many of the examples follow the Examples in the PI AF SDK online reference guide but in the context of a sample NuGreen AF Database.

 

Each example demonstrates a basic operation within the PI AF SDK. These examples present PI AF SDK as a set of basic building blocks. The goal is to enable the user to learn the basic operations of PI AF SDK so they then have the tools to build more specific applications and a foundation to learn more advanced PI AF SDK concepts.

 

The aim is not to cover the entire PI AF SDK, but just enough to get started and understand the broader picture. As experienced users may know, there are many ways to get to a result in PI AF SDK and these examples will only aim to cover a small, but relevant portion.

 

We'll be adding additional examples in the future and welcome any suggestions and contributions.


Arduino To PI with PI Web API

Posted by bshang, Nov 11, 2015

Motivation

 

Internet of Things is all the rage these days. It's currently at the top of the Gartner hype cycle. It's only appropriate that I blog about it soon.

 

In this post, I will show a simple example that collects temperature data at my desk using a sensor, an Arduino Yun, PI Web API, and a PI System. I will also show how something very simple helped us learn some interesting things about temperature behavior in the office.

 

Components

 

The components for a sensor-to-PI System data flow are as follows:

 

Sensor: For the sensor, I chose a simple temperature sensor. It provides a simple low voltage output linearly proportional to the Celsius temperature.

Development Board: I chose the Arduino Yun because it is natively Ethernet and Wi-Fi capable. This component is the device that runs the code. To learn more about it, I encourage you to read the links shared before.

Gateway device: This is the device that offers Internet connectivity. It can be a home router, a cellular router, or another specialized gateway. In this example, we used a cellular router. The development board and router are connected via Ethernet cable, although Wi-Fi would work as well (as long as the router includes a Wi-Fi access point).

Cloud endpoint: PI Web API is the cloud endpoint here. Another choice could be Azure Event Hubs. The endpoint should be HTTP-accessible although other protocols could be used if the device supports it.

PI System: The PI System is the persistence layer that stores the temperature data.

 

Here is a picture of the setup:

arduino_setup.JPG

 

As you see, there are some additional components, such as a breadboard and wires. For the electronics setup, I followed the examples provided in the links below:

SIK Experiment Guide for Arduino - V3.2 - learn.sparkfun.com

Arduino - TemperatureWebPanel

 

Code

 

As seen in the diagram, the Arduino Yun actually contains two processors: an ATmega 32U4 and an AR9331 running Linux. The ATmega 32U4 does not include an operating system and can only run code written in the Arduino programming language (a flavor of C). The AR9331 runs a version of Linux that includes a Python interpreter, so Python code can run on it.

 

Analog inputs, such as those from the temperature sensor, can only be read via the ATmega 32U4. However, Ethernet and Wi-Fi are offered only via the AR9331. Therefore, a Bridge abstraction is used to pass data between the two processors so the analog input can be sent over Internet to PI Web API (This Bridge abstraction is actually implemented via custom TCP sockets but the end developer doesn't need to know too much about this, as you'll see later).

 

There are two Bridge libraries, one on each processor. These libraries can be used to pass data as key-value pairs between code running on the ATmega 32U4 and AR9331.

ATmega 32U4 Bridge library

AR9331 Bridge library (Python)

 

ATmega 32U4 code:

arduino-sketch-temperature · GitHub

 

#include <Bridge.h>

const int temperaturePin = 0;

void setup()
{
  Bridge.begin();
}


void loop()
{
  float voltage, degreesC, degreesF;
  voltage = getVoltage(temperaturePin);
  degreesC = (voltage - 0.5) * 100.0;
  degreesF = degreesC * (9.0/5.0) + 32.0;

  Bridge.put("TEMP", String(degreesF));

  delay(5000); // repeat once per 5 seconds (change as you wish!)
}


float getVoltage(int pin)
{
  return (analogRead(pin) * 0.004882814); // convert the 10-bit ADC reading to volts (5.0 V / 1024 steps)
}

 

As you can see, analogRead(pin) is all that's needed to read from the analog input on the Arduino Yun. Bridge.put is used to pass the temperature to the Python code running on the other processor.

 

 

 

AR9331 code:

arduino-python-temperature.py · GitHub

 

#!/usr/bin/python

import sys
sys.path.insert(0, '/usr/lib/python2.7/bridge/')
import urllib2, urllib
import json

import random
from time import gmtime, strftime, sleep

from bridgeclient import BridgeClient as bridgeclient

client = bridgeclient()

print "python script is starting"

url = 'https://<SERVER>/piwebapi/streams/{webId}/value'

password_manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
password_manager.add_password(None, url, 'username', 'password')

auth = urllib2.HTTPBasicAuthHandler(password_manager) # create an authentication handler
opener = urllib2.build_opener(auth) # create an opener with the authentication handler
urllib2.install_opener(opener) # install the opener... 

req_headers = {'Content-Type': 'application/json'}

while 1:
    val = client.get('TEMP')

    ts = strftime('%Y-%m-%d %H:%M:%S', gmtime()) + 'Z'

    print ts, val

    #urllib2
    req_data = json.dumps({"Timestamp": ts, "Value": val})  # json.dumps produces valid JSON; str() would emit single-quoted, invalid JSON
    request = urllib2.Request(url, headers = req_headers) 
    request.add_data(req_data)

    try:
        response = urllib2.urlopen(request).read()
    except:
        print "Unexpected error:", sys.exc_info()[0]

    sleep(5)

 

We use the Python bridgeclient class to read the temperature passed from the ATmega. We then use the urllib2 Python library to make the REST call to PI Web API.

 

 

Office Temperature Data

 

Let's look at the temperature data collected over 4 days. Interestingly, there is a clear daily pattern in temperature, although there are some differences between days as well. It gets warmer during the afternoon (just in time for a nap), and for the past two days, the heater was turned on in the morning.

 

San Leandro:

blog_temp.PNG

 

We also collected temperature data with a Raspberry Pi in the Philadelphia office and compared office desk temperatures.

 

San Leandro (green) vs. Philadelphia (cyan):

blog_temp_compare.PNG

 

Clearly, there is a large difference in temperature between the two office desks (the Philadelphia sensor is also more accurate). There are also some patterns in temperature behavior in the Philadelphia office, but they are less pronounced and the temperature variations are smoother. Such insight likely cannot be gained from temperature data from a building management system, which gives a more global or averaged view of temperature. There are likely temperature variations within a building, and even on a single floor, depending on closeness to windows, etc. Here, with small sensors and development boards, we can get finer spatial resolution to understand how temperature behaves locally near distributed office desks.

 

Conclusion

 

We showed how simple IoT devices like Arduinos and basic sensors can be used to bring data into the PI System via PI Web API. We then showed some interesting insights gained from only a few days' worth of data collection from two temperature streams. Much more can probably be learned via data collected at the scale of industrial IoT. This concludes the blog post, and I hope you find it helpful. As always, questions/comments are welcome below.

Motivation

 

Functional Reactive Programming (FRP) has gained momentum in recent years as a means for expressing application logic centered around asynchronous data flows. FRP has foundations in functional languages such as Scheme, Haskell, and Scala, but FRP libraries have emerged in recent years in object-oriented languages such as C#, Java, JavaScript, and Python. You will find FRP principles used in modern web and cloud applications (Microsoft, Google, Facebook, Twitter, Netflix, Github). There is even a course for it now on Coursera (Principles of Reactive Programming).

 

How can FRP be used with PI System data? What can I do (or can't do) with FRP + AF SDK? That is the topic of this series of blog posts.

 

This introductory blog post will first briefly cover the basic idea of FRP and then show how, in less than 40 lines of code, we can integrate AF SDK with Rx.NET (a library for reactive programming), immediately giving us a treasure trove of tools such as LINQ semantics for transforming, aggregating, and analyzing observable data streams.

 

GitHub repository

 

The GitHub repository for the code library and examples is here:

 

bzshang/AFSDK-Rx · GitHub

 

It will be updated further in the next few weeks with more functionality and examples.

 

Most of the ideas and concepts here I learned from this great resource:

 

Introduction to Rx

 

If interested, I highly recommend reading and re-reading the material above as it may help you discover additional ideas other than those here for programming reactively with AF SDK.

 

What is FRP? What is Rx.NET?

 

What exactly is FRP? While this blog post won't go into the details of FRP, I've linked a few resources below which help to introduce the concept. See References below.

 

The FRP-style of programming has been implemented in C# via Reactive Extensions for .NET (Rx.NET), which is available as a NuGet package.

 

For now, I will try to give my take on "What is FRP" in hopes that it may resonate with others.

 

Reversing the Enumerable

 

We can start to understand FRP via the more familiar IEnumerable interface in C#. With an enumerable collection, we "pull" the next item out (via foreach) and then do something with that item. This is the basic idea of the Iterator pattern.

 

In FRP, we have the IObservable interface. Items are "pushed" from the observable collection, and we react to them. This is the basic idea of the Observer pattern.

 

The Iterator and Observer patterns are duals. We "pull" in the former, and "push" with the latter. In a sense, Iterator (= Enumerator) is to Enumerable as Observer is to Observable.

 

iterator-observer.png

From http://www.cs.cornell.edu/courses/cs2112/2014fa/lectures/

 

In the former, the "pull" (Iterator.MoveNext) is a blocking call. The consumer must wait for the data source (or time out) before doing something with the item. In the latter, the receipt of the item is non-blocking. As soon as the item arrives, it is "pushed" to us.

 

Why Not C# Events?

 

The asynchronous "push" pattern appears often programming languages, expressed via constructs such as events, event handlers, delegates, callbacks, promises, futures, etc. What are the downsides to these constructs?

 

1) They don't treat the event as a first-class object. Therefore, we cannot perform further composition like we can with class types.

2) Because of this lack of composition, it becomes harder to write application logic that depends on sequences of events.

3) Similarly, it becomes harder to write application logic that depends on events from different sources.

 

As mentioned, one problem when writing event-driven applications is that events are not treated as first-class objects, in the sense that they don't offer any further means of combination. Once you have the event, you can only subscribe/unsubscribe to it, but the language doesn't offer any additional ways to reason about it. In particular, it is hard to reason about sequences of events.

 

This is where the observable comes in. What if we could represent a stream of events as collections? After all, the number of events can range from zero to infinite. Specifically, we can conceive of events as observable sequences, and represent them as an IObservable<T>. Just as we can take a familiar collection (array, list, etc.) and expose it via IEnumerable<T> type, we can do similarly for event collections.

 

See Intro to Rx - Why Rx? and End-to-End Reactive Programming at Netflix for additional discussion.

 

LINQ to Observables

 

Once we have the IObservable<T>, we now have events as first-class objects and can begin to develop means of combination for them. By analogy with LINQ to Enumerables (Objects), we can begin to develop a LINQ to Observables, and use familiar query methods such as Select, Where, and GroupBy to transform, aggregate, and analyze events just as we would with our familiar enumerable collection types.

 

This is where Rx.NET comes in. It offers methods to convert existing events into observable sequences and provides a framework (via LINQ-like extension methods) for manipulating and composing observable sequences. It also offers a framework for implementing the Observer pattern that handles scheduling and concurrency issues that can arise in an asynchronous programming model.
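As a small illustration of that conversion (this wraps a generic .NET timer event, not an AF SDK event; Observable.FromEventPattern is Rx.NET's conversion method):

var timer = new System.Timers.Timer(1000) { Enabled = true };

IObservable<DateTime> ticks = Observable
     .FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(
          h => timer.Elapsed += h,   // attach the handler when an observer subscribes
          h => timer.Elapsed -= h)   // detach when the subscription is disposed
     .Select(e => e.EventArgs.SignalTime);

IDisposable sub = ticks.Where(t => t.Second % 2 == 0) // LINQ-style filtering over the event stream
     .Subscribe(t => Console.WriteLine(t));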

 

When should I use Rx?

 

In the context of AF SDK, when should you think about using Rx?

 

I would argue anytime your application logic is driven by composite real-time events, whether real-time data or metadata changes. I mention composite because I think Rx is overkill if the application logic is typically driven by only one event at a time (for those cases, a Task-based async pattern or plain old C# events might be a better fit). The need to handle composite events can occur in real-time calculation engines and applications that synchronize data among disparate systems. This doesn't mean you have to use Rx, but it is useful to consider what it could offer. The learning curve is steep but the rewards can be plentiful.

 

An Observable AFDataPipe

 

Enough with the theory. Now onto the practical. In this example, we will create an output time-series stream that represents the sum of two input time-series streams. This is very much like creating a Formula data reference or Analysis calculation that adds the values from two PI Point inputs. Specifically, we sign up for real-time events for SINUSOID and CDT158 and produce an observable sequence of AFDataPipeEvent types representing their sum.

 

This simple example is meant to just give a flavor of FRP using AF SDK with Rx.NET. Rx.NET is a huge library, and this example only scratches the surface.

 

The objectives are as follows:

 

1) From an instance of AFDataPipe, create an IObservable<AFDataPipeEvent>

2) Filter and split the observable sequence into two separate observable sequences exposing snapshot values of SINUSOID and CDT158.

3) Combine the two snapshot sequences into a single observable sequence representing the sum.

 

1) From an instance of AFDataPipe, create an IObservable<AFDataPipeEvent>.

 

This is perhaps the most important part. Without it, the rest is not possible.*

 

First, experienced AF SDK developers may ask: "But wait, doesn't AFDataPipe already implement the Observer pattern, since it exposes a Subscribe method and calls its observers' OnNext methods when new events arrive (via GetObserverEvents)?" This is true. However, although it can be argued that AFDataPipe implements the Observer pattern, it does not actually implement the IObservable<AFDataPipeEvent> interface. This is actually good for us. In Rx.NET, we do not want to actually implement the interfaces ourselves, but use the framework's factory methods to do this for us. This is because the factory methods also take care of scheduling, locking, and concurrency considerations, which we will probably miss by implementing the patterns ourselves. Rx.NET provides a rich library of extension methods for observables, and its self-created observable implementations will operate much more nicely with these extension methods than our custom-created ones.

 

So how do we actually create the IObservable<AFDataPipeEvent> from AFDataPipe? The code to do so is below, in less than 40 lines.

 

using System;
using System.Timers;
using System.Reactive.Linq;
using System.Reactive.Disposables;

using OSIsoft.AF.Data;

namespace AFSDK.Rx
{
    public static class AFDataPipeExtensions
    {
        public static IObservable<AFDataPipeEvent> CreateObservable(this AFDataPipe dataPipe, double interval)
        {
            return Observable.Create<AFDataPipeEvent>(observer =>
            {
                IDisposable dpToken = dataPipe.Subscribe(observer);

                Timer timer = new Timer();
                timer.Interval = interval;
                timer.Elapsed += (object sender, ElapsedEventArgs e) =>
                {
                    bool hasMoreEvents = true;
                    while (hasMoreEvents)
                    {
                        dataPipe.GetObserverEvents(out hasMoreEvents);
                    }
                };

                timer.Start();

                return Disposable.Create(() =>
                {
                    timer.Dispose();
                    dpToken.Dispose();     
                });
            });
        }
    }
}

 

How does this all work? First, the above is an extension method for AFDataPipe. The method CreateObservable accepts one parameter (interval) that controls how often GetObserverEvents should be called. The method returns the IObservable<AFDataPipeEvent> we are looking for.**

 

The key line is the call to Observable.Create. This is the factory method in Rx.NET that allows us to create IObservable<T> types. The method accepts a delegate which represents what the Subscribe method would have been had we implemented IObservable<T> ourselves. The delegate takes in an IObserver as input. Inside the delegate body, we subscribe the observer to the data pipe via AFDataPipe.Subscribe and set up the polling timer. The timer's Elapsed event fires every time the configured interval has elapsed and calls GetObserverEvents. Internally, this method calls the OnNext method of the observer we passed in. We return an IDisposable whose Dispose implementation is defined via a delegate passed to Disposable.Create.

 

Note that the CreateObservable method is lazy. None of the delegates are actually invoked in this call. We are simply registering a method to be executed later when an observer actually subscribes to our observable via the call to IObservable.Subscribe. Rx.NET saves us the trouble of having to create an actual type implementing the IObservable. We just supply the implementation for IObservable.Subscribe. Rx.NET will perform the bootstrapping that creates the type implementing IObservable.

 

It is worth mentioning the broader pattern here. To move towards a true IObservable from AFDataPipe, we've sort of created our own Subscribe method implementation (this is the delegate passed into IObservable.Create). Inside our custom Subscribe, we call into the native AFDataPipe.Subscribe. We've created a subscription wrapper method to surface the underlying AFDataPipe as an IObservable<AFDataPipeEvent>.

 

Here is how to create the observable from an AFDataPipe using the CreateObservable extension method.

 

AFElement element = AFObject.FindObject(@"\\AFServer\Sandbox\Reactive") as AFElement;
AFAttributeList attrList = new AFAttributeList(element.Attributes);

AFDataPipe dataPipe = new AFDataPipe();
var errors = dataPipe.AddSignups(attrList);

IObservable<AFDataPipeEvent> obsDataPipe = dataPipe.CreateObservable(1000); // polling interval in ms (1000 is an arbitrary choice)

 

2) Filter and split the observable sequence into two separate observable sequences exposing snapshot values of SINUSOID and CDT158

 

This is where all the work we did in 1) pays off. Once we have the IObservable<AFDataPipeEvent> pumping out events, further operations can take advantage of the vast extension methods created for observables in Rx.NET. Since observables and iterators are essentially duals, it is no surprise that these extension methods have the flavor of familiar LINQ operators.

 

Because we've signed up both SINUSOID and CDT158 attributes to our AFDataPipe, the IObservable<AFDataPipeEvent> will have events for both of these PI Points. We can split or filter the observable sequence into two observable sequences: one for SINUSOID and one for CDT158. How can we do this? Via the Where (or Filter) operator of course!

 

For example, if I only want a stream of SINUSOID snapshots, here is the code:

 

IObservable<AFDataPipeEvent> sinusoidSnapshots = obsDataPipe.Where(evt => evt.Value.Attribute.Name == "SINUSOID" && evt.Action == AFDataPipeAction.Add);

 

I can do similarly for CDT158.

 

3) Combine the two sequences into a single observable sequence representing the sum.

 

I have two streams (sinusoidSnapshots and cdt158Snapshots). Now, whenever a new snapshot value comes in from either stream, we want to emit a new value representing the sum of the latest snapshot values (i.e. natural scheduling). This is where the CombineLatest operator comes in. The returned stream is of type IObservable<AFDataPipeEvent[]>. To get the sum, we can use Select (i.e. Map) to transform the stream into an IObservable<AFDataPipeEvent>, where the AFDataPipeEvent encapsulates the sum in an AFValue. I've written an Add extension method to capture these operations.

 

public static IObservable<AFDataPipeEvent> Add(this IObservable<AFDataPipeEvent> source1, IObservable<AFDataPipeEvent> source2)
{
    return source1
        .CombineLatest(source2, (s1, s2) => new[] { s1, s2 })
        .Select(evts =>
        {
            double sinusoid = Convert.ToDouble(evts[0].Value.Value);
            double cdt158 = Convert.ToDouble(evts[1].Value.Value);
            double result = sinusoid + cdt158;

            DateTime timestamp = evts[0].Value.Timestamp > evts[1].Value.Timestamp ? evts[0].Value.Timestamp : evts[1].Value.Timestamp;

            AFValue afVal = new AFValue(result, timestamp);
            AFDataPipeEvent dpEvent = new AFDataPipeEvent(AFDataPipeAction.Add, afVal);

            return dpEvent;
        });
}

 

Here is the usage of the method above.

 

IObservable<AFDataPipeEvent> sumStream = sinusoidSnapshots.Add(cdt158Snapshots);

 

Now that we have the sumStream as an IObservable<AFDataPipeEvent>, we can subscribe to it as an observer. In Rx.NET, we can call the Subscribe method on the observable and pass in the implementation of the OnNext, OnError, and OnCompleted methods as delegates.*** Here, we will just implement the OnNext and have it write the timestamp and value to the console.

 

IDisposable subscription = sumStream.Subscribe(evt =>
{
    // evt is an AFDataPipeEvent; its AFValue holds the timestamp and the summed value
    Console.WriteLine("Timestamp: {0}, SINUSOID + CDT158: {1}", evt.Value.Timestamp.ToString("HH:mm:ss.ffff"), evt.Value.Value);
});

 

console_output.PNG

 

Limitations and Assumptions

 

It is important to discuss the limitations and assumptions made in the implementations above.

 

1) The data is in order. If an out-of-order event comes in, I do not update the sumStream or emit an out-of-order event to sumStream subscribers. I could support out-of-order data by not restricting the event filter to AFDataPipeAction.Add.

2) The data is not late arriving. Every time a new value from one stream comes in, I'm assuming I have the latest value for the other stream.

3) There is no error handling for backpressure scenarios. For example, if new events come in at a faster rate than my OnNext can process them, Rx.NET will buffer internally, but I do not account for when and how to handle growing buffers.

4) I'm sure I forgot many others. This is a placeholder for them.

 

What's Next

 

Here are some topics I plan to discuss next.

 

1) How to implement a more robust Observable data pipe using an EventLoopScheduler along with recursive scheduling

2) How to handle late arriving data. This probably involves the use of Virtual Time Scheduling in Rx. Here, the AFValue timestamps drive the Rx scheduler, not the wall clock time. We will need to introduce some notion of consensus among streams for when the "virtual" time should be advanced, and hence, when scheduled operations are executed.

3) How to perform sampling, aggregations, sliding window operations, moving average calculations, and other LINQ to Observable functions.

 

See the updating GitHub repo below:

bzshang/AFSDK-Rx · GitHub

 

Notes

 

* Functional geeks will recognize this as the anamorphism (i.e. entering the monad).

** Our handler for timer.Elapsed could be improved. For example, we could handle AFErrors returned by GetObserverEvents and have logic to deal with disconnections and metadata changes. The coding and logic is more complex, and are left out here to focus the scope on using Rx.NET. Our use of Timers is also undesirable, for various reasons that I may discuss in another post. For a more robust implementation that takes advantage of Rx Schedulers and recursive scheduling, see the GitHub repository.

*** We could very well implement IObserver ourselves, but just like with IObservable, it is best to leave the actual creation of the concrete type to the Rx.NET framework and just specify minimally the interface methods (via delegates).

 

A Word About OSIsoft.AF.Reactive

 

vCampus members may remember a prior set of libraries (OSIsoft.AF.Reactive) that implemented the Observer pattern with AFDataPipe. The typical use case was for creating PI adapters to/from Microsoft's StreamInsight. There are some non-trivial differences between the Observable implementation introduced there and this one.

 

Here, I will argue the benefits of using our Observer implementation that leverages Rx.NET.

 

1) The vCampus library used a prior version of the AFDataPipe that lacked the Subscribe and GetObserverEvents methods. Hence, an observable wrapper was written that allowed "subscription" to an AFDataPipe. Also, since GetObserverEvents was unavailable, a custom method was written to call an observer's OnNext method after a GetUpdateEvents call. However, GetObserverEvents should perform better in that it avoids buffering up an AFValue list - it passes the AFValue directly to the observer through AFDataPipeEvent.

 

2) The vCampus library implemented its own IObservable and IObserver implementations, rather than use a framework like Rx.NET. It is entirely possible to integrate the vCampus library with Rx.NET. After all, much of Rx.NET is extension libraries for IObservables. However, these extension methods work "better" (I know, a bit hand waving here) when the framework creates the observable types itself, which is done in our implementation.

 

3) The vCampus library also implemented the Subject interface (ISubject = IObservable + IObserver) itself. As with the other interfaces, if Rx.NET is used, it is recommended to have the framework create the Subject itself.

 

4) The vCampus library implemented the ISubject as a proxy to convert from one IObservable<T> to another IObservable<T2>. The Subject subscribes to the source observable, converts the type, and then exposes its own observable with the new type. Using Rx.NET, we can completely eliminate creating proxy subjects for this purpose, via the Select and SelectMany (i.e. Map) extension methods on the IObservable<T>.
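For instance, instead of a proxy subject, a single Select suffices to re-type a stream (reusing the obsDataPipe observable created earlier in this post):

IObservable<AFValue> valueStream = obsDataPipe.Select(evt => evt.Value); // project each AFDataPipeEvent to its AFValue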

 

References

 

Intro to Rx - Why Rx?  (IMO the best overview and documentation for Rx.NET)

 

Reactive Extensions  - Push vs. Pull (the best IMO description of IEnumerable vs. IObservable)

 

The introduction to Reactive Programming you've been missing · GitHub (a helpful beginner's intro that walks through the concepts)

 

RxMarbles: Interactive diagrams of Rx Observables (marble diagrams illustrating the various Rx extension methods)

 

Reactive Extensions for .NET (landing page for the Rx.NET library)

 

101 Rx Samples - a work in progress - Reactive Framework (Rx) Wiki (a good repository of Rx.NET examples)

 

StackOverflow - What is Functional Reactive Programming? (SO offers some answers)

 

Monads, part one | Fabulous adventures in coding (C# expert Eric Lippert elegantly describes the concept of a monad, a design pattern for types that forms the basis of LINQ and Rx. I will honestly admit I don't understand all of the concepts here, but that's sometimes the beauty... at least you learn something new each time you re-read it!)

 

Reactive programming - Wikipedia, the free encyclopedia (a bit academic as Wikipedia sometimes is, but good references at the end)

 

ReactiveX - Links to More Information (treasure trove of links)


Using PI Web API with Python

Posted by bshang, Jun 4, 2015

Motivation

 

Traditionally, accessing the PI System from the Python programming language can be challenging. See for example these threads.

Working with Pi Data from Python - Python version of Pi SDK?

AF SDK and Python (not IronPython)

Re: What is the simplest way to write to the snapshot table in python, without providing the timestamp.

Python & PI Code

Accessing the REST service from Nodejs or Python with authentication

Re: Using PISDK in Python: How do I access PISDK constants?

 

The primary Python language implementation, CPython, is based on C. The PI SDK, an older PI Developer Technology, is based on COM, while AF SDK, a widely-used PI Developer Technology, is based on .NET. These differences in platform require interfaces or other wrappers that must be used as a bridge, but they can be cumbersome to use.

 

For example, to use PI SDK within CPython, the pywin32 library can be used to allow Python to communicate with COM classes. For AF SDK, the adodbapi library can be used to access AF via PI OLEDB Enterprise. Additionally, Python for .NET may be an option, but this has not been well explored. Regardless, PI SDK will be deprecated and PI OLEDB may not be suitable in some cases. PI ODBC and pyodbc look interesting though and may be worth exploring more... they almost sound like they were made for each other...

 

An alternative approach is to use a .NET implementation of the Python language, such as IronPython. However, one of the strengths of Python is its wide variety of analytics libraries, such as numpy, scipy, pandas, and scikit-learn. These analytics libraries call into highly optimized C and Fortran numerical routines, but integration of these Python libraries into the .NET runtime via IronPython has been historically challenging.

 

PI Web API

 

The addition of PI Web API to the suite of PI Developer Technologies vastly increased the accessible surface area of PI from various programming platforms. One of the benefits of exposing a RESTful API is that much of the web has adopted this architectural style, and hence, the API communicates with programmers via a common "language" (i.e. HTTP). Programmers really just need to understand the basics of formulating an HTTP request and handling a response, and do not need to download custom SDKs or deeply understand an SDK's object model. Once the basics of HTTP are understood, the rest is just implementation in the desired programming language (of course, it is not "just" so simple. I'm looking at you, OAuth...).

 

In addition, documentation for RESTful APIs tends to look fairly similar, so once PI Web API is understood, it is easier to pick up APIs from Facebook, Google, Esri, etc., enabling apps that connect to multiple services and provide a richer contextual experience for the user.

 

YAWAPP

 

This blog post will present YAWAPP (Yet Another Way to Access PI from Python). But hopefully, it will motivate some compelling use cases. As mentioned, RESTful APIs can provide a standardized approach for data integration. Bringing PI System data into Python, particularly CPython, allows this data to be consumed by Python's extensive, optimized numerical libraries. PI Web API can also be used from Python to expose PI data in web and mobile applications via Python's popular web application framework, Django.

 

The complete source code examples for this post are hosted in the GitHub repo:

bzshang/piwebapi-python-examples · GitHub

 

The GitHub repo also contains the technical documentation of the code. The rest of this post will provide a higher-level overview of the steps to follow when using PI Web API within Python.

 

Disclaimer: I am not a Python developer. I used Python once three years ago and then again only recently. But hopefully, I'll demonstrate how easy it is to get started when working with RESTful APIs, even with little prior experience.

 

Which Python?

 

There are many distributions and implementations of the Python language to choose from.

 

Here, I will be using the Python distribution from python.org (CPython), namely version 2.7.10 for Windows.

 

Which IDE?

 

There are many IDEs to choose from. As mentioned, I'm not a Python developer, and previously I hacked away with Notepad++... I decided to use a more extensive IDE this time and chose PyCharm (Community Edition). Why? It was the first one that came up in a Google search, so you are free to explore your own.

 

I found PyCharm offers many features familiar to Visual Studio users (intellisense, code completion, static analysis, debugging, project file/package management, etc.), though the Community edition did not have the "streamlined" feel of Visual Studio, which is to be expected. I will say it does seem to be more enjoyable to program in Python than C#...

 

Which Python packages?

 

I imported three packages into my example project.

 

requests is used as the HTTP client library and seems to be the most popular within the Python community. Amazon, Google, and even POTUS use it. json, as the name suggests, helps me deserialize JSON text into Python dictionaries and vice versa. bunch is a package that provides a wrapper class around Python dictionaries so I can browse a dictionary using dot notation (e.g. dict.key instead of dict["key"]), evocative of the C# anonymous type.
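
As a quick illustration, here is a minimal sketch of the three packages working together (the server URL is a placeholder, and verify=False is only for self-signed test certificates):

import requests
import json
from bunch import bunchify

# Placeholder URL; substitute your own PI Web API endpoint
url = "https://myserver/piwebapi/assetservers"

response = requests.get(url, verify=False)  # verify=False only for test certs
data = bunchify(json.loads(response.text))

# Dot notation courtesy of bunch, instead of data["Items"][0]["Name"]
print(data.Items[0].Name)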

 

Overview of Examples

 

I assume the reader is somewhat familiar with the PI Web API already and its basic operations. Below are some introductory resources to get started:

KB01128 - Introduction to PI Web API

PI Web API Online Reference

Getting started with PI Web API 

Learn the Basics of PI Programming - Simple PI Web API Calls

 

I show how to browse AF databases, elements, and attributes using PI Web API along with the requests library. I also show how to retrieve the current value of an attribute, write to that attribute, and modify its properties. What is really shown, however, is how to perform the following with requests (see the sketch after this list):

  • How to use different HTTP verbs (GET, POST, PATCH in my examples) using requests
  • How to pass in query string parameters in the URL
  • How to set request headers (i.e. Content-Type: application/json)
  • How to send JSON request payloads (body)
  • How to parse the incoming JSON using json and bunch
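
As a rough sketch (the base URL and WebId below are placeholders, not values from the repo), these patterns boil down to the following:

import requests
import json

base_url = "https://myserver/piwebapi"  # placeholder
web_id = "P0AbCdE"                      # placeholder WebId of a stream/attribute

# GET with query string parameters
response = requests.get(base_url + "/streams/" + web_id + "/recorded",
                        params={"startTime": "*-1d", "endTime": "*"})

# POST a JSON payload (body), with an explicit Content-Type header
headers = {"Content-Type": "application/json"}
payload = json.dumps({"Timestamp": "*", "Value": 42})
response = requests.post(base_url + "/streams/" + web_id + "/value",
                         data=payload, headers=headers)

# PATCH to modify properties of an existing resource
response = requests.patch(base_url + "/attributes/" + web_id,
                          data=json.dumps({"Description": "Updated from Python"}),
                          headers=headers)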

 

All of these are well-documented by the requests authors. It could be said that, in my examples, I just swapped in URL strings relevant to PI Web API for theirs.

 

Authentication

 

Something that is not at all trivial, though, is authentication via requests. Out of the box, it supports Basic authentication but not Kerberos. If PI Web API is configured for Kerberos authentication only, then the requests-kerberos package must also be installed.
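
With requests-kerberos installed, authentication is a short sketch like this (the URL is a placeholder):

import requests
from requests_kerberos import HTTPKerberosAuth, OPTIONAL

kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
response = requests.get("https://myserver/piwebapi", auth=kerberos_auth)
print(response.status_code)  # 200 if the Kerberos negotiation succeeded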

 

Examples on GitHub

 

The GitHub repo bzshang/piwebapi-python-examples contains the examples and more technical details. One of the benefits of GitHub is that now I can fix my bugs away from the reader's eyes

 

Please post your questions and comments in this PI Square blog post!

We are excited to present the TechCon Hackathon 2015 winners!

 

The theme of this year's Hackathon was Internet of Things (IoT), focused on personal fitness. Participants were encouraged to expand upon this idea and develop an app leveraging the PI System infrastructure to connect IoT devices.

 

The participants had 24 hours to create an app using any of the following technologies:

  • PI Future data
  • PI Web API 2015
  • PI Integrator for Esri ArcGIS and Esri API's
  • PI Integrator for BI and Azure Machine Learning

 

We provided three sources of data:

  • Fitbit data collected from OSIsoft employee volunteers over the course of 2 months
  • GPX data that tracked geolocation of users during a recorded activity (e.g. running, walking, cycling)
  • Weather data from World Weather Online API (including predictions)

 

Participants were encouraged to connect PI to other data sources as well.

 

Our judges evaluated each app on creativity, technical content, commercial viability, enterprise readiness, and UI design.

 

It is a tough challenge to create an app in 24 hours, and we are amazed at all of the awesome apps that were developed!

 

Congratulations to everyone who participated!

 

 

Without further ado, here are the winners!

 

1st place - Almost No Borders

 

The team members were:

Artur Okolity

Ken Morikawa

Chien Si Harriman

Gregor Vilkner

 

 

For the 1st place prize, each participant received a BLADE 350 QX3 Quadcopter!

drone2.PNG

Team Almost No Borders developed a Mobile Laboratory Platform (MLP). MLP is a cloud-based application that allows scientists, engineers, and medical staff to use their mobile devices paired with sophisticated sensors to make field measurements and share these measurements with peers around the globe. Mobile devices, such as smartphones and tablets, act as field gateways that connect locally to one or more sensors. These devices can perform measurements and continuously stream data to the MLP for processing and storage. Remote users can then browse data from past experiments/measurements and connect to sessions in real-time.

 

The team demonstrated live that users could sign into MLP on their phones, register their device, and connect to the MLP platform. The field devices were Windows Phones connected to Microsoft Band via Bluetooth LE. The AF Server was used as an infrastructure to host MLP and track users and devices. Once connected, users can start measurement sessions and carry out experiments. Remote personnel can sign in to the MLP website, browse archived sessions, view reports, browse currently connected devices, and observe measurements in real-time.

 

Almost No Borders hosted the MLP in Azure. The team used the following technologies:

  • Microsoft Azure
  • Windows Phone 8.1
  • Microsoft Band SDK
  • ASP.NET MVC 5 + Web API 2
  • ASP.NET Signal R
  • PI Web API

 

Here are screenshots of the design they developed!

afstructure.PNGattribute1.PNG

 

Look at the data they were able to get from the Band device!

bandattributes.PNG

 

 

2nd place - FitSquare

 

The team members were:

Ionut Buse

Gaël Cottet

Jean-Francois Beaulieu

 

For the 2nd place prize, each participant received a Fitbit Charge HR!

 

Team FitSquare created the FitSquare app, designed to build a real sports community that promotes collaboration over competition. Their mobile app allows users to plan their sport training sessions, check whether those sessions are risky, ask for assistance, and view stats based on sensors.

 

FitSquare used the following technologies:

  • ASP.NET MVC 5
  • Twitter Web API
  • Esri Maps
  • PI Web API
  • PI Future Data

 

Here is the design for planning a sport session.

planning_session.PNG

 

Using weather predictions stored as PI Future Data, the app estimates the risk level of the planned activity.

risky.PNG

 

FitSquare also provides real-time assistance via Twitter, as shown below.

assistance.PNG

 

Users can also view their stats via PI Web API and AF Event Frames.

stats.PNG

 

3rd place - Unhandled Exception II: Judgment Day

 

The team members were:

Sarven Buyuker

Alan Kenyon

Jerry Vin

 

For the 3rd place prize, each participant received OSIsoft gear and apparel!

 

Team Unhandled Exception created a fitness achievement application and used it to present the 2015 OSIsoft Fitness Awards. They used the Fitbit data we collected from OSIsoft employees and developed a web app with a sleek, responsive UI that showed the top performers in each fitness category.

  • Distance traveled
  • Steps taken
  • Calories burned
  • Active time
  • Sleep time
  • Sedentary time

 

The team used PI Web API to access the fitness data, along with front-end technologies such as:

  • HTML5
  • Javascript (jQuery, Chart.js)
  • CSS3
  • Bootswatch (a Bootstrap modification)
  • PI Web API

 

Without further ado, here are some screenshots of the awards as presented by Unhandled Exception!

titlescreen.PNGawards.PNG

walker.PNGgasguzzler.PNG

Did you know the PI System can be the perfect infrastructure to track real-time tweets and analyze them with Asset Framework?

 

Here, I will show how easy it is to write a C# Windows Service that

 

1) signs up for updates from Twitter using the Twitter Streaming API

2) filters tweets by topic

3) stores them in real-time into PI

 

The example files are posted below. They consist of

 

1) Visual Studio 2013 solution containing the Windows Service (PI Twitter Service)

2) A "starter" AF Element Template (XML file)

 

Here is a sneak preview of what our service does

diagram.PNG

 

Our service collects tweets in real-time from Twitter and distributes them to the corresponding AF elements and attributes based on a configured topic.

 

 

Twitter Streaming API

 

We use the Twitter Streaming API to collect tweets for our PI System in real-time.

 

What is it?

 

The Twitter Streaming API is a RESTful API that provides a way to receive a sample (~1%) of all public tweets in real-time. For more information, please visit Twitter Streaming API. There is also a Firehose API that provides all (non-sampled) public tweets, but this requires a higher level of access.

 

A request for tweets via the Streaming API can be thought of as a very long-lived HTTP request. The client processes the resulting response (JSON) incrementally. There is no need to continuously poll a REST API endpoint. We can think of it as similar to downloading an infinitely long file over HTTP. Twitter will keep the connection open barring any network interruptions.
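
Conceptually, the client side reduces to something like the sketch below (a generic streaming endpoint, not the Twitter specifics; Tweetinvi, introduced later, handles those details for us):

using System;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;

public static class StreamingSketch
{
    public static async Task ConsumeStreamAsync(string url)
    {
        using (HttpClient client = new HttpClient())
        using (Stream body = await client.GetStreamAsync(url))
        using (StreamReader reader = new StreamReader(body))
        {
            string line;
            // This loop runs for as long as the server keeps the connection open
            while ((line = await reader.ReadLineAsync()) != null)
            {
                Console.WriteLine(line); // each line would carry one JSON payload
            }
        }
    }
}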

 

First steps: Registering the Twitter app

 

Before the Streaming API, or any of Twitter's APIs, can be used, we need to register our (third-party) Twitter application.

 

1) Create a new Twitter app. Sign in to your Twitter account and navigate to Twitter Application Management. Then, click Create New App.

createnewapp.PNG

 

2) Enter app details. Follow the instructions for entering the Name, Description, Website, and CallbackURL. For Website, we can just use https://www.google.com, and leave CallbackURL blank. The latter two fields are not important for our purposes, since our app will be a Windows Service rather than a web application.

registerapp.PNG


3) Set up the keys and tokens. After registering the app, click on the "Keys and Access Tokens" tab. Then, generate the consumer key, consumer secret, access token, and access token secret. Please save these values.

 

Detail note: These keys and tokens are used because Twitter uses OAuth to authenticate our applications and allow users to authorize these applications to access user information. If you've ever registered for a website using "Sign in with Google+" or "Sign in with Facebook", that was likely done using OAuth. For our purposes, we will not need to know the inner workings of OAuth, but only recognize the context it is being used in. We'll see that our wrapper libraries will handle the authentication in the background for us.

 

Tweetinvi - .NET Library for the Twitter Streaming API

 

Tweetinvi is a .NET library that encapsulates the functionality of the Streaming API and exposes it as a set of simpler classes and methods, so the client code is not burdened by lower-level tasks such as performing the OAuth workflow, creating HTTP clients, executing requests, and parsing responses. For Tweetinvi details, please visit their CodePlex page Tweetinvi - a friendly Twitter C# API. We will use this library in our Windows Service application. As part of the attached Visual Studio solution, we've installed Tweetinvi 0.9.6.1 from NuGet (search "Tweetinvi").


NOTE: .NET Framework 4.0 or higher is required.


 

PI Twitter Service

 

PI Twitter Service is a Windows Service included in this Visual Studio solution that signs up for Twitter Streaming API updates and sends the tweet text and associated information (creator, hashtags, urls) to PI. The full Visual Studio 2013 solution is at the bottom of this post, along with a sample AF Element Template. In addition to demonstrating how to send real-time tweets to PI, other goodies include:

 

1) An example implementation of the Observer pattern, typically useful for "dataflow" type apps.

2) An example of a Windows Service that can:

    a) Run interactively, by checking the Environment.UserInteractive property (see the sketch after this list). Therefore, you can run this app as a service or via the console.

    b) Be debugged in the Visual Studio IDE. Not a trivial issue, as explained here: How to: Debug Windows Service Applications
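
Here is a minimal sketch of that pattern (PITwitterService and its Start/Stop helpers are stand-ins for the classes in the attached solution):

using System;
using System.ServiceProcess;

static class Program
{
    static void Main()
    {
        var service = new PITwitterService(); // hypothetical ServiceBase-derived class

        if (Environment.UserInteractive)
        {
            // Launched from a console (e.g. while debugging in Visual Studio)
            service.StartInteractive();
            Console.WriteLine("Press any key to stop...");
            Console.ReadKey();
            service.StopInteractive();
        }
        else
        {
            // Launched by the Service Control Manager
            ServiceBase.Run(service);
        }
    }
}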

 

Our app does many things, but this post will not touch on every aspect. For learning how to create a Windows Service, MSDN is a great resource - How to: Create Windows Services. For the Observer pattern, MSDN provides a good resource too - Observer Design Pattern.


AF Database design


An AF database is used to organize our streaming tweets. The service application writes to AF attributes belonging to AF elements that match the "topic" of the tweet. Let's look at the AF structure now.


I've created an AF database called Twitter. It contains one AF Element Template, called "Twitter Topic Template". In this design, each AF element represents a "topic", and that topic is determined by its set of filters, or keywords (see the Keywords AF Attribute in the image below). In essence, our Windows Service will filter the tweets and redirect them to the AF element that contains the matching keywords. Then, tweet information is written to the AF attributes (PI Point Data References). The AF attributes are shown below.

afattributes.PNG

 

The category "Configuration" stores the keywords, represented by a string array. Any tweets containing words matching our specified keywords will be have its information stored under the AF attributes for this AF element. The category "Tweet Data" stores all of the information associated with a tweet. For example, if our keywords were "datascience" and "data science", then any tweets containing these keywords will be written to attributes belonging our "Data Science" AF element. Therefore, we can think of an AF element as an abstraction providing the infrastructure to track tweets of a given topic. Here are the AF elements (topics) I've decided to track.

elementstotrack.PNG

 

Observer pattern implementation

 

We make use of the Observer pattern (the .NET implementation of it), which is suggested by the first diagram above. There is an observable TwitterStream object representing all of our input tweets that we receive from Twitter. Different AF elements can subscribe to this stream, via an observer TwitterPipe object.

 

TwitterStream Observable

 

The TwitterStream object uses the Tweetinvi .NET library to establish a streaming HTTP connection with Twitter's servers. Let's look at the TwitterStream class below.

 

    public class TwitterStream : IObservable<ITweet>
    {
        private IFilteredStream _internalStream;
        private IList<IObserver<ITweet>> _pipes;

        public TwitterStream()
        {
            string accessToken = ConfigurationManager.AppSettings["accessToken"];
            string accessTokenSecret = ConfigurationManager.AppSettings["accessTokenSecret"];
            string consumerKey = ConfigurationManager.AppSettings["consumerKey"];
            string consumerSecret = ConfigurationManager.AppSettings["consumerSecret"];

            TwitterCredentials.SetCredentials(accessToken, accessTokenSecret, consumerKey, consumerSecret);

            this._internalStream = Stream.CreateFilteredStream();
            this._internalStream.AddTweetLanguageFilter(Language.English);

            this._pipes = new List<IObserver<ITweet>>();

            //Signup the delegate for the MatchingTweetReceived event
            this._internalStream.MatchingTweetReceived += (sender, args) =>
            {
                ProcessTweet(args);
            };
        }

        //IObservable
        public IDisposable Subscribe(IObserver<ITweet> pipe)
        {
            if (!_pipes.Contains(pipe))
            {
                _pipes.Add(pipe);
            }
            return new Unsubscriber();
        }

        public void AddTrack(string s)
        {
            this._internalStream.AddTrack(s);
        }

        public async void StartStreamAsync()
        {
            await this._internalStream.StartStreamMatchingAnyConditionAsync();
        }

        public void StopStream()
        {
            _internalStream.StopStream();
        }

        private void ProcessTweet(MatchedTweetReceivedEventArgs args)
        {
            //Update observers
            foreach (var observer in _pipes)
            {
                observer.OnNext(args.Tweet);
            }
        }
    } //end class

 

We have two fields.

  • IFilteredStream _internalStream is our interface to the private internal stream in Tweetinvi's library.
  • IList<IObserver<ITweet>> _pipes is the list of observers that sign up for updates from this TwitterStream.

 

Our constructor does the following:

  1. Read our keys and tokens from the app.config file. Lines 8-11
  2. Pass these credentials to Tweetinvi. Line 13
  3. Create a filtered Twitter stream exposing IFilteredStream. Line 15
  4. Specify to only receive tweets in English. Line 16 (but you can choose any language you want!)
  5. Provide a delegate to be called by the IFilteredStream when a matching tweet is received. Line 21.

 

Our public TwitterStream exposes methods to

  1. Subscribe to the stream via the Observer pattern. Line 28
  2. Add a filter via AddTrack(). Line 37
  3. Start/stop the stream. Lines 42 and 47.
  4. Process an incoming tweet via ProcessTweet() (used in our delegate instance). Inside this method, it just calls observer.OnNext() in accordance with the .NET Observer pattern. Line 52

 

Detail note: A "track" is Twitter API's terminology for a filter by keywords. One track is configured by providing a list of keywords, separated by spaces. For Twitter to send the tweet to the client stream, the tweet must contain at least ALL of the keywords in that track. A stream can have multiple tracks. If so, then the client can specify whether Twitter should send tweets that match at least one track's keywords or it must send tweets that match the keywords of every track. Since we are starting the stream by calling StartStreamMatchingAnyConditionAsync(), we receive tweets that match at least one track's keywords.

 

TwitterPipe Observer

 

Let's look at a stripped-down version of our observer TwitterPipe class now. It writes to 5 AF attributes. The full implementation is in the attached Visual Studio solution.

    public class TwitterPipe : IObserver<ITweet>
    {
        private AFElement _element;
        private IList<string> _filters;

        public TwitterPipe(AFElement element)
        {
            this._element = element;
            this._filters = new List<string>();
            AFAttribute keywordsAttr = _element.Attributes[Attributes.Keywords.ToString()];
            string[] keywords = keywordsAttr.GetValue().Value as string[];
            this._filters = keywords.ToList<string>();
        }

        public void SubscribeToStream(TwitterStream twitterStream)
        {
            twitterStream.Subscribe(this);
            foreach (string s in this._filters)
            {
                twitterStream.AddTrack(s);
            }
        }

        public virtual void OnNext(ITweet tweet)
        {
            string tweetText = tweet.Text;

            //Check if this tweet matches pipe's filter
            bool isMatched = IsMatchingTweet(tweetText);
            if (isMatched)
            {
                //Get timestamp of tweet
                AFTime timestamp = new AFTime(tweet.CreatedAt);
                //Extract tweet info and assign to AFValue and AFAttribute
                AFValue text = new AFValue(_element.Attributes["Text"], tweet.Text, timestamp);
                AFValue creator = new AFValue(_element.Attributes["Creator"], tweet.Creator.Name, timestamp);      
                AFValue location = new AFValue(_element.Attributes["Location"], tweet.Creator.Location, timestamp);

                string[] hashTagArray = tweet.Hashtags.Select<IHashtagEntity, string>(p => p.Text).ToArray<string>();
                string hashtags = String.Join(" ", hashTagArray);
                AFValue tags = new AFValue(_element.Attributes["Hashtags"], hashtags, timestamp);

                string[] urlArray = tweet.Urls.Select<IUrlEntity, string>(p => p.URL).ToArray<string>();
                string sUrl = String.Join(" ", urlArray);
                AFValue urls = new AFValue(_element.Attributes["Urls"], sUrl, timestamp);

                IList<AFValue> vals = new List<AFValue>();
                vals.Add(text);
                vals.Add(creator);
                vals.Add(location);
                vals.Add(tags);
                vals.Add(urls);

                AFListData.UpdateValues(vals, AFUpdateOption.Insert, AFBufferOption.BufferIfPossible);
              }
        }

        private bool IsMatchingTweet(string text)
        {
            bool isMatched = false;
            foreach (string s in this._filters)
            {
                isMatched = text.IndexOf(s, StringComparison.OrdinalIgnoreCase) >= 0;
                if (isMatched) return true;
            }
            return false;
        }

        public void OnCompleted()
        {
            throw new NotImplementedException();
        }

        public virtual void OnError(Exception e)
        {
            throw new NotImplementedException();
        }
    } //end class

 

We have two fields.

  • AFElement _element is the element representing our "topic". For example, this would correspond to our "Big Data", "IoT", and "Smart Cities" elements in our diagram above.
  • IList<string> _filters is the list of filters for our topic. For example, for an "IoT" topic, the list could be {"IoT", "Internet of Things"}.

 

Our constructor does the following:

  1. Sets the AFElement field. Line 8
  2. Reads the "Keywords" AF attribute and stores them in our IList<string> _filters variable. Lines 9-12.

 

Our TwitterPipe exposes methods to

  1. Add the pipe to the list of subscribers of TwitterStream via SubscribeToStream(TwitterStream stream). Line 15.
  2. Process incoming tweets using OnNext() via Observer pattern implementation. Line 24
  3. Check whether an incoming tweet matches any of the pipe's filters, via the private method IsMatchingTweet(string text). Line 59

 

Detail note: When the TwitterPipe subscribes to the TwitterStream, it adds all of its filters as tracks for the TwitterStream. Therefore, the TwitterStream contains tracks for all of the filters from all subscribed pipes. When an incoming tweet arrives from Twitter's servers, the tweet will match the filter criteria for at least one of the pipes, but the stream will send the tweet to all pipes. It is up to the pipe via IsMatchingTweet(string text) to decide if it matches the set of filters that define the AF element's topic.

 

How are the Observers and Observable linked together?

 

This is done via an AFObservers class.

 

    public class AFObservers
    {
        private AFElements _elements;

        public AFObservers(string server, string database)
        {
            PISystem ps = new PISystems()[server];
            AFDatabase db = ps.Databases[database];
            this._elements = db.Elements;
        }

        public void AddSubscribers(TwitterStream twitterStream)
        {
            foreach (AFElement element in this._elements)
            {
                TwitterPipe pipe = new TwitterPipe(element);
                pipe.SubscribeToStream(twitterStream);                  
            }
        }
    }

 

It contains an AFElements collection as a field, representing our list of topics to track. From this element list, via AddSubscribers(), we create a TwitterPipe for each element and subscribe it to the passed-in TwitterStream.

 

After establishing these links, all we need to do is call StartStreamAsync() on the TwitterStream class to begin collecting tweets!
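
Putting it all together, the startup code is essentially this sketch ("SERVER" and "Twitter" are placeholders for your AF server and database names):

TwitterStream twitterStream = new TwitterStream();

AFObservers observers = new AFObservers("SERVER", "Twitter");
observers.AddSubscribers(twitterStream); // one TwitterPipe subscribed per AF element (topic)

twitterStream.StartStreamAsync(); // begin receiving matching tweets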

 

It would take too long to go through every detail in this Windows Service app. Please take a break if you managed to read this far. The explanations above should provide a good way to get started when looking through the Visual Studio source code.

 

 

Results


Let's view all the tweets we collected on the topic of "Machine Learning".

ml_tweets.PNG

Instead of tweets, we can also look at hash tags.

ml_hashtags.PNG

There are many possibilities now that we can collect tweets in real-time using PI. We can use Asset-Based Analytics and Event Frames, perhaps to detect when a certain topic goes viral. We can create new apps with PI that perform sentiment analyses, track trends, and offer personalized news or daily notification summaries.

 

Something to be explored is integration with Esri ArcGIS maps for visualizing tweets spatially in real-time. The current limitation is that only a very small percentage of tweets are geo-enabled (i.e. they send latitude/longitude along with the tweet). It is possible to extract the user profile location to guess at the geo-coordinates, but this information is often sparse and unclean, as shown below.

location.PNG

Some clever engineering can probably overcome this and allow us to infer or guess at the latitude and longitude. With the PI Integrator for Esri, we would then be able to visualize the tweets on a map as they arrive in real-time. That will be a project for next time.

 

 

What if we sent all (sampled) tweets to PI?

 

Instead of applying filters or tracks, we can have Twitter send us a sample (~1%) of all the real-time tweets. This amounts to about 60 tweets/second. Can the PI server handle this load?

 

Of course! Here were the stats I found:

1) pibufss max events in queue: ~5

2) pisnapss avg. CPU usage: 0.6%

3) piarchss avg. CPU usage: 0.3%

 

Of course, there are more metrics to look at, but for the PI Server, this Twitter stream hardly makes a blip. It would be interesting to see how the PI Server would respond to the Twitter Firehose API, which sends ALL (no sampling) public tweets. This means about 6000 tweets/sec. My guess is that the server will fare just fine

 

I hope to have demonstrated a non-traditional, but fun use case for the PI Server. Although traditionally seen as a "process historian", our wonderful PI Server can do so much more. It can really be a very general and scalable infrastructure for any sort of high frequency real-time information, whether the data is coming from an OPC server or a streaming RESTful API.

 

If you find this post interesting, we invite you to join our Programming Hackathon at TechCon 2015.

 

Registration is now open!

hackathon_sig.png

Interested to see how the PI System and Esri ArcGIS maps can be used to show cool visualizations like the one below?

route.PNG

 

At OSIsoft, we’ve developed an internal application, PI Fitness, to store geolocation data (such as latitude and longitude) collected during activities such as running, walking, and even mountain biking!

Other than secretly spying on co-workers’ weekend activities, it is a great tool for us to find new exercise routes or discover shared interests. Using Asset-Based Analytics, we've also created a "leaderboard" of total points to provide some competitive flavor.

 

The architecture looks like this:

 

pifitness.png

One of the data sources is geolocation data stored as GPX files. GPX is an XML schema for exchanging GPS and related data. Free mobile apps such as RunKeeper and Strava allow you to track your geolocation during an activity and later export them as GPX files. Wearable devices such as Garmin watches also expose their data as GPX files.

 

We can bring this data into PI and leverage the Esri Maps JavaScript API to share it via a website.

 

The first part of this blog series will show how to parse the GPX file and use the PI System to store this information and bring more context. The next part will show how to retrieve this data from PI and display routes on a map using the Esri Maps API.

 

The full Visual Studio 2013 solution is attached below, along with an AF Element template and sample GPX file.

 

Special thanks to Paul Pirogovsky for creating the GPX parser!

 

Taking a peek at the GPX file

 

The GPX file is actually very simple. It is just an XML text file with a specific schema. Here’s a sample below:

 

<?xml version="1.0" encoding="UTF-8"?>
<gpx
  version="1.1"
  creator="RunKeeper - http://www.runkeeper.com"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns="http://www.topografix.com/GPX/1/1"
  xsi:schemaLocation="http://www.topografix.com/GPX/1/1 http://www.topografix.com/GPX/1/1/gpx.xsd"
  xmlns:gpxtpx="http://www.garmin.com/xmlschemas/TrackPointExtension/v1">
<trk>
  <name>Running</name>
  <time>2015-03-30T02:34:18Z</time>
<trkseg>
<trkpt lat="37.808881000" lon="-122.256421000"><ele>9.0</ele><time>2015-03-30T02:34:18Z</time></trkpt>
<trkpt lat="37.808849000" lon="-122.256279000"><ele>9.2</ele><time>2015-03-30T02:34:28Z</time></trkpt>
</trkseg>
</trk>
</gpx>

 

You can see it stores a timestamp, latitude, longitude, and elevation.

 

Generate C# classes to represent GPX data

 

Our goal is to generate C# classes that will encapsulate the GPX data after it has been deserialized. We can use the xsd.exe tool. From the Visual Studio Developer Command Prompt, run

 

xsd GpxXmlType.xml

 

(I named my sample file GpxXmlType.xml)

 

Then run

 

xsd GpxXmlType.xsd /classes

 

This generates GpxXmlType.cs, which you want to include in your project.

 

To deserialize, we can run

 

string fileContents = File.ReadAllText(pathToFile);

//Declare _gpxData outside the using block so it stays in scope afterwards
GpxXmlType _gpxData;
XmlSerializer xmlSerializer = new XmlSerializer(typeof(GpxXmlType));
using (TextReader reader = new StringReader(fileContents))
{
    _gpxData = xmlSerializer.Deserialize(reader) as GpxXmlType;
}

 

Once we have _gpxData, we can navigate through the properties of _gpxData to browse the XML structure.
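
For example, here is a small sketch of walking the structure (trksegType and wptType are the xsd-generated type names, the same ones used in the parser later in this post):

foreach (trksegType trackSeg in _gpxData.trk[0].trkseg)   // each <trkseg>
{
    foreach (wptType waypoint in trackSeg.trkpt)          // each <trkpt>
    {
        Console.WriteLine("{0}: lat={1}, lon={2}, ele={3}",
            waypoint.time, waypoint.lat, waypoint.lon, waypoint.ele);
    }
}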


To assist us later, we have also created a simple POCO (plain ol' C# object) called TrackPoint for storing a coordinate event, representing a 4-tuple of Timestamp, Elevation, Latitude, and Longitude.

 

public class TrackPoint
{
    public DateTime Timestamp { get; set; }
    public double Elevation { get; set; }
    public double Latitude { get; set; }
    public double Longitude { get; set; }

    public TrackPoint(DateTime timestamp, double elevation, double latitude, double longitude)
    {
        this.Timestamp = timestamp;
        this.Elevation = elevation;
        this.Latitude = latitude;
        this.Longitude = longitude;
    }
}

 

 

AF Asset Structure

 

Before parsing the GPX data, it is good to decide what our AF structure will look like. Here is mine below.

 

af_structure.PNG

 

I created a "My Routes" element and four attributes underneath. Each attribute corresponds to a data stream exposed in the GPX file.

 

Also, we will create an enumeration type, where the names reflect the AF attributes that store our data. This makes it easier to key each AFValues collection and associate it with an AF Attribute.

public enum GpxAttributes
{
    Activity,
    Elevation,
    Latitude,
    Longitude
}

 

We will also want to create an AF Event Frame for each route to store the start and end times. This makes retrieval of the route data from PI easier later on, as we have a start/end time to "bookmark" the data. It also enables us to perform configured analytics should we want to do so, given the time context provided by an event frame.

eventframes.PNG

To bring data into PI from GPX, we will use our custom GpxXmlType class (generated from xsd) and AF SDK to send these values to the PI Data Archive.

 

Parsing the XML into AFValues


First, we will create a client class that services calls to read GPX data and convert them to AFValue objects. It is called GpxToPIClient.

 

public class GpxToPIClient
{
    private GpxParser _parser;
    public GpxToPIClient(GpxParser parser)
    {
          _parser = parser;
    }

    public IDictionary<GpxAttributes, AFValues> GetTimeSeries()
    {
          return _parser.ParseString();
    }
}

 

It has one dependency, GpxParser, and exposes one method, GetTimeSeries(). GpxParser does the actual work. Let's take a look at its header and constructor.

 

public class GpxParser
{
    private GpxXmlType _gpxData;
    private string _filePath;

    public GpxParser(string filePath)
    {
          this._filePath = filePath;
          string fileContents = File.ReadAllText(this._filePath);

          XmlSerializer xmlSerializer = new XmlSerializer(typeof(GpxXmlType));
          using (TextReader reader = new StringReader(fileContents))
          {
              _gpxData = xmlSerializer.Deserialize(reader) as GpxXmlType;
          }
    }
    ...
}

 

GpxParser holds a reference to GpxXmlType, our class for holding the XML data. The ParseString() method on GpxParser browses the XML data via GpxXmlType and packages it into keyed AFValues collections, which the console application later sends to the PI System. Let's take a look at the full method below.

 

internal IDictionary<GpxAttributes, AFValues> ParseString()
{
    string _activityType = GetActivityType();

      //pull all geopositional data into the class
      List<TrackPoint> _wayPoints = new List<TrackPoint>();
      foreach (trksegType trackSeg in _gpxData.trk[0].trkseg)
      {
            foreach (wptType waypoint in trackSeg.trkpt)
            {
                //Match the TrackPoint constructor order: (timestamp, elevation, latitude, longitude)
                _wayPoints.Add(new TrackPoint(waypoint.time, (double)waypoint.ele, (double)waypoint.lat, (double)waypoint.lon));
            }
      }

      //create AF objects
      AFValues _elevationValues = new AFValues();
      AFValues _latitudeValues = new AFValues();
      AFValues _longitudeValues = new AFValues();
      AFValues _activityValues = new AFValues();

      foreach (TrackPoint point in _wayPoints)
      {
            AFTime timeStamp = new AFTime(point.Timestamp);
            _elevationValues.Add(new AFValue(point.Elevation, timeStamp));
            _latitudeValues.Add(new AFValue(point.Latitude, timeStamp));
            _longitudeValues.Add(new AFValue(point.Longitude, timeStamp));
      }

      //update the activity tag
      DateTime temp_time = (from pt in _wayPoints
                            orderby pt.Timestamp ascending
                            select pt.Timestamp).FirstOrDefault();

      _activityValues.Add(new AFValue(_activityType, temp_time));

      //reset the activity tag to "Idle"
      temp_time = (from pt in _wayPoints
                    orderby pt.Timestamp descending
                    select pt.Timestamp).FirstOrDefault();

      //increment by one second
      temp_time = temp_time.AddSeconds((double)1.0);
      _activityValues.Add(new AFValue("Idle", temp_time));

      //create output dictionary
      IDictionary<GpxAttributes, AFValues> dict = new Dictionary<GpxAttributes, AFValues>();
      dict.Add(GpxAttributes.Activity, _activityValues);
      dict.Add(GpxAttributes.Elevation, _elevationValues);
      dict.Add(GpxAttributes.Latitude, _latitudeValues);
      dict.Add(GpxAttributes.Longitude, _longitudeValues);

      return dict;
}
 

Whew! That is a long method. It certainly doesn't follow the SRP (Single Responsibility Principle), but it does the trick.

 

All we are doing is walking our member variable _gpxData (the deserialized XML) and creating AFValues from it. We are using our enumeration GpxAttributes to key the AFValues collections. One detail is that we reset the Activity value (running, walking, etc.) to "Idle" one second after the last waypoint, marking the end of the actual activity.

 

Creating Event Frames

 

AFEventFrames are perfect objects for providing a time context for our routes. Without them, it would be difficult to determine from the raw PI Point events when one activity starts and ends. We would need to look for transitions within the Activity AF Attribute which can be cumbersome. With event frames, though, it is easy to retrieve the relevant time series data for an activity. We simply get the start/end time for the event frame and pass that into RecordedValues to get the associated timeseries data.
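
For example, here is a sketch of pulling back one attribute's raw data for an event frame (the eventFrame and element variables are assumed):

//Use the event frame's time range to retrieve the raw route data
AFTimeRange range = new AFTimeRange(eventFrame.StartTime, eventFrame.EndTime);
AFAttribute latitude = element.Attributes["Latitude"];

AFValues latValues = latitude.Data.RecordedValues(
    range,
    AFBoundaryType.Inside,  // only events inside the time range
    null,                   // no unit-of-measure conversion
    null,                   // no filter expression
    false,                  // includeFilteredValues (n/a without a filter)
    0);                     // maxCount: 0 = no limit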

 

Using event frames to store activity metadata, it is also possible to compute additional statistics for each activity, such as average speed, distance, and total time. We can also compute statistics across multiple event frames, looking at the distribution of speeds and distances by activity and how they vary over time.

 

We've created a custom EventFrameHelper class to create an event frame from each parsed GPX file. This class also extracts a custom Route object from our stored AF data. The Route object can be conveniently used by the Esri Maps API to show the route on a map. We won't show the full methods here, just the class stub and a rough sketch of CreateEventFrame. Please take a look at the full Visual Studio solution posted below for details.

 

public class EventFrameHelper
{
    public EventFrameHelper();
    public AFEventFrame CreateEventFrame(AFDatabase db, IDictionary<GpxAttributes, AFValues> dict, AFElement primaryRefElement);
    public IList<Route> GetRoutes(AFElement element, object startSearchTime, object endSearchTime);
}
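
Here is a rough sketch of what CreateEventFrame might do, deriving the time range from the parsed Activity values (the real implementation ships in the attached solution):

public AFEventFrame CreateEventFrame(AFDatabase db, IDictionary<GpxAttributes, AFValues> dict, AFElement primaryRefElement)
{
    AFValues activity = dict[GpxAttributes.Activity];
    AFTime start = activity[0].Timestamp;                  // activity start
    AFTime end = activity[activity.Count - 1].Timestamp;   // the "Idle" reset marks the end

    AFEventFrame ef = new AFEventFrame(db, "Route " + start.LocalTime.ToString("yyyy-MM-dd HH:mm"));
    ef.SetStartTime(start);
    ef.SetEndTime(end);
    ef.PrimaryReferencedElement = primaryRefElement;
    db.CheckIn();
    return ef;
}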

 

The GetRoutes method returns a list of Route objects. The Route class combines the AF Event Frame metadata with the PI timeseries data. Here is the class below.

public class Route
{
    public string Name { get; set; }
    public DateTime StartTime { get; set; }
    public DateTime EndTime { get; set; }
    public IList<TrackPoint> Coordinates{ get; set; }

    public Route(string name, DateTime startTime, DateTime endTime, IList<TrackPoint> coordinates)
    {
          this.Name = name;
          this.StartTime = startTime;
          this.EndTime = endTime;
          this.Coordinates = coordinates;
    }
}

 

 

Sample console application

 

Below is a console application that shows how to use every class and method that we have described above. It demonstrates the following:

 

1. Set up the GPX parser classes

2. Parse the file and return a keyed collection of AFValues

3. Send these values to PI

4. Create an AFEventFrame from the GPX data

5. Retrieve the data from PI and create a Route class

 

class Program
{
     static void Main(string[] args)
     {
          //Set up gpx parser
          string filePath = @"C:\example.gpx";
          GpxParser parser = new GpxParser(filePath);
          GpxToPIClient rkClient = new GpxToPIClient(parser);

          //Parse the Gpx file and return a dictionary of AFValues
          IDictionary<GpxAttributes, AFValues> rkData = rkClient.GetTimeSeries();

          //Update data to PI System
          PISystem ps = new PISystems().DefaultPISystem;
          AFDatabase db = ps.Databases["My Activities"];
          AFElement element = db.Elements["My Routes"];

          foreach (var stream in rkData)
          {
              AFValues vals = stream.Value;
              AFAttribute attr = element.Attributes[stream.Key.ToString()];
              AFErrors<AFValue> errors = attr.Data.UpdateValues(vals, AFUpdateOption.Replace);   
          }

          //Create AFEventFrame on AF server
          EventFrameHelper efHelper = new EventFrameHelper();
          AFEventFrame eventFrame = efHelper.CreateEventFrame(db, rkData, element);

          //Retrieve routes from last 60 days
          IList<Route> routes = efHelper.GetRoutes(element, "*-60d", "*");

          Console.WriteLine("Press any key to exit");
          Console.ReadKey();
      }
}

 

 

Want to play with our data? Come join the Programming Hackathon!

 

In PI Fitness, we've already collected over 300 routes!

 

Here is just a small gallery:

 

Running                                                                                              Mountain Biking

marina.PNGmoutain_biking.PNG

Walking                                                                                              Running

commute.PNGrunning.PNG

 

 

If you want to play with the data or learn more, please join our Programming Hackathon!

 

Registration is now open!

hackathon_sig.png

 

 

Next time, Marcos Vainer Loeff will continue with Part 2 of this blog series and show how to display data from PI using Esri ArcGIS maps!
