Marcos Vainer Loeff

Using data pipes with future data in PI AF SDK 2.7

Blog Post created by Marcos Vainer Loeff Employee on Oct 6, 2014

PI Data Archive and PI AF SDK 2015 Beta are released!

As you probably already know, PI AF SDK 2015 Beta and PI Data Archive 2015 Beta were released during UC EMEA 2014 in Lisbon.


On the server side, the PI Data Archive 2015 Beta release introduces two of the major features of PI Server 2015:

  • The ability to store time series data with future time stamps and
  • The ability to migrate PI Batch.

According to the release notes of the PI AF Client 2015, this product release adds a number of new features.  Major enhancements include:

  • Support for Future Data in the PI Data Archive through the AFDataPipe, AFDataCache, and new methods for specifically differentiating current value from end-of-stream values.
  • An update to the security model to match the model exposed by the PI Data Archive. ACLs on AF objects are no longer tied directly to Windows principals, but are instead defined using AF-defined identities.

I was interested in writing something about this new security model, so I opened PI System Explorer and looked for where Identities and Mappings were located. I found them under the PI Asset Server Properties, but when I clicked on them, I found out that I would have to wait a little to use this feature, as shown in Figure 1.




Figure 1 – Screenshot of PI Asset Server Properties of PI System Explorer 2015 Beta.


AF Identities and Mappings cannot be used unless you update your PI AF Server to 2015 Beta, which has not been released yet.


This is why I decided to write about the first major feature of PI AF SDK 2015 Beta: support for future data.

What are future PI Points?

In this new PI Data Archive release, a new PI Point attribute called “future” was introduced. If it is set to 0, the PI Point is a “historical tag” that behaves as in previous versions of the product: it rejects any data with timestamps more than 10 minutes in the future. If it is set to 1, the PI Point accepts future data with timestamps up until January 2038. PI SMT 2015 Beta, which is installed with PI Data Archive 2015 Beta, can create and configure future PI Points. To learn more about future data in PI Data Archive 2015, please refer to the “Guide to PI Data Archive 2015 Beta” manual.
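
The rule above can be written down as a small predicate. This is only a sketch of the behavior as described in this post, not code from the PI Data Archive or the PI AF SDK. The class name, the method name and the futureLimit parameter are made up for illustration; the exact upper limit is passed in by the caller because the post only says "January 2038".

```csharp
using System;

// Illustrative model only: this is not PI Data Archive or AF SDK code.
public static class TimestampRuleSketch
{
    // Historical tags tolerate timestamps up to 10 minutes ahead of the current time.
    public static readonly TimeSpan HistoricalTolerance = TimeSpan.FromMinutes(10);

    // Returns true when a value with the given timestamp would be accepted
    // under the rule described in the text.
    // futureLimit is an assumption supplied by the caller (some instant in January 2038).
    public static bool IsAccepted(bool isFutureTag, DateTime timestamp, DateTime now, DateTime futureLimit)
    {
        if (isFutureTag)
        {
            // future = 1: anything up to the upper limit is accepted.
            return timestamp <= futureLimit;
        }

        // future = 0: reject data more than 10 minutes in the future.
        return timestamp <= now + HistoricalTolerance;
    }
}
```

With this model, a value stamped one hour ahead is rejected for a historical tag and accepted for a future tag, which is the difference the rest of this post relies on.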

Preparing the environment

I decided to use two PI Points that I had already created for the Programming Hackathon at vCampus Live! 2013: Weather_Albany_Pressure and Weather_Albany_Pressure Prediction. The first PI Point, with the future attribute set to 0, was used to store the historical values of the pressure in the city of Albany. The second PI Point, with the future attribute set to 1, was used to store the 15-day forecast of the pressure in the same city.


Let’s now create a test AF database (in my case “AFSDK Test”) with one element called “DataPipe” which has two attributes:

  • Historical → refers to the “Weather_Albany_Pressure” PI Point.
  • Future → refers to the “Weather_Albany_Pressure Prediction” PI Point.

Figure 2 shows the new AF objects that were created.




Figure 2 – AF database created for this blog post.

Changes in OSIsoft.AF.PI

In order for the PI AF SDK to handle future PI Points, a few changes were made to the OSIsoft.AF.PI namespace. The main ones are:

  • The property Future was introduced in the PIPoint class to indicate whether the PI Point is a future tag.
  • The method Snapshot() is deprecated; if called, it is routed to CurrentValue().
  • The method CurrentValue() was added to the PIPoint class.
    • For historical tags, it returns the snapshot that we are all familiar with.
    • For future tags, it returns a value at the current time, calculated by interpolation if the ‘step’ attribute is off.
  • The method EndOfStream() was added to the PIPoint class.
    • For historical tags, it returns the same AFValue object as CurrentValue().
    • For future tags, it returns the AFValue at the end of the stream (the last event of the time series).
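
To make the difference between the two behaviors of a future tag's current value more concrete, here is a small sketch that does not use the PI AF SDK at all. It assumes that the interpolation is linear between the two events surrounding the current time and that a tag with step on simply holds the earlier value. The class and method names are hypothetical.

```csharp
using System;

// Illustrative model only: this is not AF SDK code.
public static class CurrentValueSketch
{
    // Estimates the value at 'now' from the two events on either side of it.
    // step == true : the earlier value is held until the next event.
    // step == false: the value is linearly interpolated (an assumption of this sketch).
    public static double ValueAt(DateTime now,
                                 DateTime beforeTime, double beforeValue,
                                 DateTime afterTime, double afterValue,
                                 bool step)
    {
        if (afterTime <= beforeTime || now < beforeTime || now > afterTime)
        {
            throw new ArgumentException("Expected beforeTime <= now <= afterTime and beforeTime < afterTime.");
        }

        if (step)
        {
            return beforeValue;
        }

        // Fraction of the interval that has elapsed at 'now'.
        double fraction = (now - beforeTime).TotalSeconds / (afterTime - beforeTime).TotalSeconds;
        return beforeValue + fraction * (afterValue - beforeValue);
    }
}
```

For example, with forecast events of 1000 at 12:00 and 1010 at 12:10, calling ValueAt for 12:05 gives 1005 with step off and 1000 with step on. The end-of-stream value, by contrast, would simply be the last forecast event, however far in the future it is.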

We can explore these new features with the code snippet below:

    // Namespaces used: System, System.Collections.Generic, System.Linq, OSIsoft.AF.Asset, OSIsoft.AF.PI
    static void Main(string[] args)
    {
        PIServer myServer = new PIServers()["MARC-RRAS"];
        string[] myPIPointNames = new string[] { "Weather_Albany_Pressure", "Weather_Albany_Pressure Prediction" };
        IList<PIPoint> myPIPoints = PIPoint.FindPIPoints(myServer, myPIPointNames.AsEnumerable());
        foreach (PIPoint myPIPoint in myPIPoints)
        {
            if (myPIPoint.Future)
            {
                // Snapshot() is deprecated: use CurrentValue() and EndOfStream() instead
                AFValue CurrentValue = myPIPoint.CurrentValue();
                AFValue EndOfStream = myPIPoint.EndOfStream();
                Console.WriteLine("Current value of future PI Point {0} is {1}", myPIPoint.Name, CurrentValue.Value.ToString());
                Console.WriteLine("End of stream value of future PI Point {0} is {1}", myPIPoint.Name, EndOfStream.Value.ToString());
            }
            else
            {
                // For historical tags both calls return the same AFValue (the snapshot)
                AFValue CurrentValue = myPIPoint.CurrentValue();
                AFValue EndOfStream = myPIPoint.EndOfStream();
                Console.WriteLine("End of stream value of historical PI Point {0} is {1}", myPIPoint.Name, EndOfStream.Value.ToString());
                Console.WriteLine("Current value of historical PI Point {0} is {1}", myPIPoint.Name, CurrentValue.Value.ToString());
            }
        }
    }

Developing a solution to monitor AFDataPipes for future data

We are going to create a new solution with two projects. The first project is responsible for sending new values to the PI Data Archive and the second project is responsible for detecting data.


The first program will send six values, from 100 to 105, to the historical PI Point and another six values, from 200 to 205, to the future PI Point. There are six iterations, and on each iteration one value is sent to the historical PI Point and one value to the future PI Point. After each iteration, the program waits 10 seconds so that the new values are generated with different timestamps. The historical values are always sent with the current timestamp, while the future values have an offset of 10 minutes into the future.

    // Namespaces used: System, System.Threading, OSIsoft.AF, OSIsoft.AF.Asset, OSIsoft.AF.Time
    static void Main(string[] args)
    {
        PISystem myPISystem = new PISystems()["OSI-SERV"];
        AFAttribute myHistoricalAttribute = AFObject.FindObject(@"\\OSI-SERV\AFSDK Test\DataPipe|Historical") as AFAttribute;
        AFAttribute myFutureAttribute = AFObject.FindObject(@"\\OSI-SERV\AFSDK Test\DataPipe|Future") as AFAttribute;

        // Six iterations: 100..105 go to the historical tag, 200..205 to the future tag
        for (int i = 0; i < 6; i++)
        {
            AFValue myNewHistoricalValue = new AFValue(i + 100, new AFTime("*"));
            AFValue myNewFutureValue = new AFValue(i + 200, new AFTime("*+10m"));

            myHistoricalAttribute.Data.UpdateValue(myNewHistoricalValue, OSIsoft.AF.Data.AFUpdateOption.Replace);
            myFutureAttribute.Data.UpdateValue(myNewFutureValue, OSIsoft.AF.Data.AFUpdateOption.Replace);

            Console.WriteLine("\n\nCurrent time: {0}\n", DateTime.Now.ToString());
            Console.WriteLine("Sent historical value => Value: {0}, Timestamp: {1}", myNewHistoricalValue.Value.ToString(), myNewHistoricalValue.Timestamp.LocalTime.ToString());
            Console.WriteLine("Sent future value => Value: {0}, Timestamp: {1}.", myNewFutureValue.Value.ToString(), myNewFutureValue.Timestamp.LocalTime.ToString());

            // Wait 10 seconds so that the next values get different timestamps
            Thread.Sleep(10000);
        }
    }


When you run the first program, you will get the output shown in Figure 3.




Figure 3 – Output of Program 1.


Remember that the timestamps of the values sent to the future tag are offset by 10 minutes from the current time.


The second program signs up for data change events for those two PI Points. For historical PI Points, we already know how this works: changes to the snapshot and the archive are captured by the data pipe. For future PI Points, the behavior depends on the property myDataPipe.EventHorizonMode, which specifies which events are returned by the data pipe. The two main options are:

  • AFEventHorizonMode.EndOfStream → the data pipe returns only the changes to the time series caused by new events added beyond the end of the stream after the sign-up.
  • AFEventHorizonMode.TimeOffset → the data pipe returns events up to Now + EventHorizonOffset. The default value of EventHorizonOffset is zero. In this case, the data pipe delivers events that were in the future at the moment they reach the present, just before they become part of the past.

The latter option is used in our program, with EventHorizonOffset set to five minutes. In this second program, I chose to use GetObserverEvents() instead of GetUpdateEvents(). Therefore, I created a DataPipeObserver class that implements IObserver<AFDataPipeEvent>, as shown below:



    // Namespaces used: System, OSIsoft.AF.Data
    public class DataPipeObserver : IObserver<AFDataPipeEvent>
    {
        public void OnCompleted() { }

        public void OnError(Exception error) { }

        // Called once for every event delivered by the data pipe
        public void OnNext(AFDataPipeEvent value)
        {
            Console.WriteLine("\n{0} NEW VALUE from PI Point: {1}\n => Value: {2} and Timestamp is {3}.", DateTime.Now.ToString(), value.Value.PIPoint.Name, value.Value.Value, value.Value.Timestamp.LocalTime);
        }
    }

For every new event received, the OnNext() method will be called.


In the second program, it is necessary to register an IObserver for AFDataPipeEvent with the AFDataPipe through the Subscribe method. Then, all the AFDataPipeEvents received by the data pipe will be sent to the IObserver.
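
If the observer pattern combined with polling is new to you, the idea can be illustrated with plain .NET types and no PI AF SDK at all. In the sketch below, PollingPipe<T> is a hypothetical stand-in for a data pipe and DeliverPending() is a hypothetical stand-in for a polling call such as GetObserverEvents(); neither is part of the PI AF SDK. The point it shows is that subscribed observers receive nothing until the polling method is called.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a data pipe: not part of the AF SDK.
public class PollingPipe<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();
    private readonly Queue<T> pending = new Queue<T>();

    // Registers an observer; disposing the returned object unsubscribes it.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // New events wait in the queue until the next poll.
    public void Enqueue(T item)
    {
        pending.Enqueue(item);
    }

    // Pushes every queued event to every observer and returns the number of events delivered.
    public int DeliverPending()
    {
        int delivered = 0;
        while (pending.Count > 0)
        {
            T item = pending.Dequeue();
            foreach (IObserver<T> observer in observers.ToArray())
            {
                observer.OnNext(item);
            }
            delivered++;
        }
        return delivered;
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose()
        {
            observers.Remove(observer);
        }
    }
}

// Collects whatever it receives so that the effect of polling is easy to inspect.
public class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Received = new List<T>();

    public void OnCompleted() { }

    public void OnError(Exception error) { }

    public void OnNext(T value)
    {
        Received.Add(value);
    }
}
```

After subscribing a CollectingObserver<int> and enqueuing two items, its Received list stays empty until DeliverPending() is called. This is why the second program needs a loop that keeps polling the data pipe.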



    // Namespaces used: System, System.Collections.Generic, System.Threading, System.Threading.Tasks,
    // OSIsoft.AF, OSIsoft.AF.Asset, OSIsoft.AF.Data
    static void Main(string[] args)
    {
        PISystem myPISystem = new PISystems()["OSI-SERV"];
        AFElement myElement = AFObject.FindObject(@"\\OSI-SERV\AFSDK Test\DataPipe") as AFElement;
        AFAttribute myHistoricalAttribute = myElement.Attributes["Historical"];
        AFAttribute myFutureAttribute = myElement.Attributes["Future"];

        using (AFDataPipe myDataPipe = new AFDataPipe())
        {
            myDataPipe.EventHorizonMode = AFEventHorizonMode.TimeOffset;
            myDataPipe.EventHorizonOffset = new TimeSpan(0, 5, 0);

            // Sign up for updates for attributes
            myDataPipe.AddSignups(new List<AFAttribute> { myHistoricalAttribute, myFutureAttribute });

            // Register the observer that will receive the AFDataPipeEvents
            IObserver<AFDataPipeEvent> observer = new DataPipeObserver();
            myDataPipe.Subscribe(observer);
            Console.WriteLine("Starting (press any key to exit)");

            // create a cancellation source to terminate the update thread when the user is done
            CancellationTokenSource cancellationSource = new CancellationTokenSource();
            Task task = Task.Run(() =>
            {
                // keep polling while the user hasn't requested cancellation
                while (!cancellationSource.IsCancellationRequested)
                {
                    // Get updates from pipe and process them
                    AFErrors<AFAttribute> myErrors = myDataPipe.GetObserverEvents();

                    // wait for 1 second using the handle provided by the cancellation source
                    cancellationSource.Token.WaitHandle.WaitOne(1000);
                }
            }, cancellationSource.Token);

            Console.ReadKey(); // this will block until the user presses a key
            Console.WriteLine("Exiting updates");
            cancellationSource.Cancel(); // when this is called the update loop will terminate
            task.Wait(); // wait for the task to complete before taking down the pipe
        }
    }

The first and second programs were started together by setting up the solution as shown in Figure 4.




Figure 4 – Solution Property Pages.


The output of the second program is shown in Figure 5.




Figure 5 – Output of Program 2.


Let’s try to understand what happened. For the historical data, the events were captured by the data pipe as soon as they were sent.


For the future values, it took 5 minutes for the second program to display them, because the values were sent with timestamps 10 minutes in the future and EventHorizonOffset was set to 5 minutes. This scenario is useful when you have a display and you want its values to be updated when they are exactly 5 minutes in the future.
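
The five-minute delay can be checked with a little date arithmetic. The sketch below is only a model of the TimeOffset behavior as described in this post: an event becomes visible once its timestamp is no later than Now + EventHorizonOffset. It is not PI AF SDK code, the class and method names are hypothetical, and it ignores the polling interval of the second program.

```csharp
using System;

// Illustrative model only: this is not AF SDK code.
public static class EventHorizonSketch
{
    // True when the event timestamp is at or before the horizon (now + offset).
    public static bool IsInsideHorizon(DateTime now, DateTime eventTimestamp, TimeSpan horizonOffset)
    {
        return eventTimestamp <= now + horizonOffset;
    }

    // Earliest time at which an event written at 'sentAt' can be delivered:
    // either immediately, or when the horizon reaches its timestamp, whichever is later.
    public static DateTime DeliveryTime(DateTime sentAt, DateTime eventTimestamp, TimeSpan horizonOffset)
    {
        DateTime crossesHorizonAt = eventTimestamp - horizonOffset;
        return crossesHorizonAt > sentAt ? crossesHorizonAt : sentAt;
    }
}
```

For a value sent at 12:00:00 with a timestamp of 12:10:00 and an offset of 5 minutes, DeliveryTime returns 12:05:00, which matches the delay seen in Figure 5. For a historical value sent at 12:00:00 with a timestamp of 12:00:00, it returns 12:00:00, so the event is delivered right away.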


In this demo, I continuously wrote data to the future tag using another project from the same solution. In TimeOffset mode, however, the PI AF SDK behaves the same way if the values are already stored in the future archive.


Although data pipes are a complex topic, especially with future data, I hope you got a glimpse of how to use them with future data. Let’s wait for a release of PI AF Server 2015 so I can write something about AF Identities and Mappings!


Stay tuned!