
There is a blog post that shows how to use AFSDK with Python.

PI and Python? PIthon! (Rafael Borges created it)

I sometimes teach training courses at the OSIsoft Japan office, and some customers wanted to use Python to get data from PI.

To connect to PI from Python, you can use PI Web API.

Using PI Web API with Python

AFSDK also works, as Rafael's post shows, so I wrote some code and want to share it with the community.

 

First, download Python.

https://www.python.org/downloads/

Both 2.7 and 3.6 are available. This time, I used Python 3.6.1.

We need to install pythonnet too.

http://pythonnet.github.io/

I installed it with the pip command without any trouble.

From the command prompt, go to the Python 3.6 directory and run the following commands.

cd scripts
pip install pythonnet

In this example, Python is installed in C:\Python36.
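To confirm that the install worked before going further, a quick check like the one below may help. This is only a minimal sketch for Python 3.4 or later: the helper name has_module is my own and is not part of pythonnet. It relies on the fact that pythonnet is imported as the clr module, as the listing in this post does.

```python
import importlib.util


def has_module(name):
    """Return True if the named top-level module can be found by this interpreter."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    # pythonnet is imported as 'clr', so that is the name to look for.
    if has_module("clr"):
        print("pythonnet is installed")
    else:
        print("pythonnet is missing; run 'pip install pythonnet'")
```

If the check reports that pythonnet is missing, make sure pip was run from the same Python installation that runs the script.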

After that, create a PIthon.py file with the following code. Put the file in the directory that contains python.exe and run the following from the command prompt.

python.exe PIthon.py

 

I uploaded the code to GitHub too.

GitHub - kenji0711/PIthon: Python and AFSDK project

 

* This code writes a value to a PI tag.

Please change the write tag name in the code.

writept = PIPoint.FindPIPoint(piServer,"PleaseEnterWriteTagName")

This code also uses the NuGreen database. Please change the element and attribute names to match your own AF objects.

import sys
import clr


sys.path.append(r'C:\Program Files (x86)\PIPC\AF\PublicAssemblies\4.0')  
clr.AddReference('OSIsoft.AFSDK')


from OSIsoft.AF import *
from OSIsoft.AF.PI import *
from OSIsoft.AF.Asset import *
from OSIsoft.AF.Data import *
from OSIsoft.AF.Time import *
from OSIsoft.AF.UnitsOfMeasure import *


print("Welcome to PIthon!!")
# PI Data Archive
piServers = PIServers()  
piServer = piServers.DefaultPIServer


pt = PIPoint.FindPIPoint(piServer, "sinusoid")
name = pt.Name.lower()
# CurrentValue
print('\nShowing PI Tag CurrentValue from {0}'.format(name))
current_value = pt.CurrentValue()
print( '{0}\'s Current Value: {1}'.format(name, current_value.Value))


#recordedvalues
timerange = AFTimeRange("*-3h", "*")
recorded = pt.RecordedValues(timerange, AFBoundaryType.Inside, "", False)
print('\nShowing PI Tag RecordedValues from {0}'.format(name))
for event in recorded:
    print('{0} value: {1}'.format(event.Timestamp.LocalTime, event.Value))


#plotValues
plotvalues = pt.PlotValues(timerange, 1)
print('\nShowing PI Tag PlotValues from {0}'.format(name))
for event in plotvalues:
    print('{0} value: {1}'.format(event.Timestamp.LocalTime, event.Value))


#interpolatedvalues
span = AFTimeSpan.Parse("1h")
interpolated = pt.InterpolatedValues(timerange, span, "", False)
print('\nShowing PI Tag InterpolatedValues from {0}'.format(name))
for event in interpolated:
    print('{0} value: {1}'.format(event.Timestamp.LocalTime, event.Value))


#summariesvalues
summaries = pt.Summaries(timerange, span, AFSummaryTypes.Average, AFCalculationBasis.TimeWeighted, AFTimestampCalculation.Auto)
print('\nShowing PI Tag SummariesValues(Average) from {0}'.format(name))
for summary in summaries:
    for event in summary.Value:
        print('{0} value: {1}'.format(event.Timestamp.LocalTime, event.Value))


#writeValue
writept = PIPoint.FindPIPoint(piServer,"test999")
writeptname = writept.Name.lower()
val = AFValue()
val.Value = 20
#val.Timestamp = AFTime("t+9h")


print('\nWrite value to {0} value: {1}'.format(writeptname, val.Value))
writept.UpdateValue(val, AFUpdateOption.Replace, AFBufferOption.BufferIfPossible)


#Connect to AF
print ('\nConnect to AF')
afServers = PISystems()
afServer = afServers.DefaultPISystem
#DB = afServer.Databases.DefaultDatabase
DB = afServer.Databases.get_Item("NuGreen")
element = DB.Elements.get_Item("NuGreen").Elements.get_Item("Little Rock").Elements.get_Item("Extruding Process").Elements.get_Item("Equipment").Elements.get_Item("K-435")


attribute = element.Attributes.get_Item("Steam Flow")
attval = attribute.GetValue()


print ('Element Name: {0}'.format(element.Name))
print ('Attribute Name: {0} | Value : {1} {2}'.format(attribute.Name, attval.Value, attribute.DefaultUOM))


#create element with attribute
print('\nCreate Element with Attribute')
if DB.Elements.get_Item("Test New Element") is not None:
    print("Already Existing Element: Test New Element")
else:
    newelement = DB.Elements.Add("Test New Element")
    newelement.Description = "Created element from PIthon"
    newattribute = newelement.Attributes.Add("Test Attribute")
    newattribute.DataReferencePlugIn = afServer.DataReferencePlugIns.get_Item("PI Point")
    newattribute.DataReference.ConfigString = "cdt158"
    DB.CheckIn()
    print("Created new Element : Test New Element")












This code runs on both Python 3.6 and 2.7.

With Python 2.7, I had to create the AFValue like this:

val = AFValue()  
val.Value = 20  
val.Timestamp = AFTime("t+9h") 

 

With Python 3.6, the constructor call works:

val = AFValue(20, AFTime("t+9h"))

 

Code is easier to write in Python 3.6 than in 2.7.

The AFSDK help file does not contain Python examples.

So we need to work out for ourselves how to write the code in Python. I hope this post helps someone.

If you have any questions, please leave a comment.
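If the same script has to run on both versions, the two ways of building a value could be hidden behind one helper. The sketch below is only an illustration under assumptions: make_value, TwoArgCtor and NoArgCtor are hypothetical names, the two classes are plain Python stand-ins rather than AFSDK types, and it assumes that a rejected constructor call surfaces as a TypeError.

```python
def make_value(value_cls, value, timestamp):
    """Build a value object, preferring the two-argument constructor."""
    try:
        return value_cls(value, timestamp)
    except TypeError:
        # Assumption: a constructor call that is not accepted raises TypeError.
        # Fall back to the property-by-property style.
        obj = value_cls()
        obj.Value = value
        obj.Timestamp = timestamp
        return obj


class TwoArgCtor(object):
    """Stand-in for a class whose (value, timestamp) constructor is accepted."""

    def __init__(self, value=None, timestamp=None):
        self.Value = value
        self.Timestamp = timestamp


class NoArgCtor(object):
    """Stand-in for a class that only accepts the empty constructor."""

    def __init__(self):
        self.Value = None
        self.Timestamp = None


a = make_value(TwoArgCtor, 20, "t+9h")
b = make_value(NoArgCtor, 20, "t+9h")
print(a.Value, a.Timestamp)
print(b.Value, b.Timestamp)
```

With AFSDK loaded, the call would be make_value(AFValue, 20, AFTime("t+9h")), which is untested here.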

DataPipes are a very convenient way to get a continuous flow of data.  When you use a DataPipe, you subscribe to the PI Server and say "hey PI Server, please keep all data changes for tags A, B, C ... for me".  After that, the only thing you need to do is check for data from time to time to see whether new changes have occurred.  This is a very simple illustration, but it should be enough for you to decide whether you should use a DataPipe.
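The subscribe-then-poll idea can be sketched without any PI software at all. The example below is a toy model only: FakeDataPipe and its methods are hypothetical names of mine, not the AF SDK API. It shows that the "server" keeps changes only for signed-up tags, and that each poll hands over what has accumulated since the previous one.

```python
from collections import deque


class FakeDataPipe:
    """In-memory stand-in for a data pipe; not the AF SDK API."""

    def __init__(self):
        self._signups = set()
        self._pending = deque()

    def add_signups(self, tags):
        # "Please keep all data changes for these tags for me."
        self._signups.update(tags)

    def server_side_change(self, tag, value):
        # The simulated server keeps changes only for signed-up tags.
        if tag in self._signups:
            self._pending.append((tag, value))

    def get_update_events(self):
        # Hand over everything accumulated since the last poll.
        events = list(self._pending)
        self._pending.clear()
        return events


pipe = FakeDataPipe()
pipe.add_signups(["A", "B"])
pipe.server_side_change("A", 1.0)
pipe.server_side_change("C", 9.9)  # not signed up, so it is not kept
pipe.server_side_change("B", 2.5)

first_poll = pipe.get_update_events()
second_poll = pipe.get_update_events()
print(first_poll)
print(second_poll)
```

The second poll comes back empty because nothing changed between the two calls, which is the "check from time to time" part of the description.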

--

Edit: 2015-03-11 I have modified this example to integrate Robert Wai's and Daphne Ng's recommendations.

Edit: 2015-03-12

  • For reference, I attached the initial code sample, AFDataPipeExample - Version 1.cs. This is the simplest approach, from before Robert and Daphne's comments.
  • For reference, I also attached Version 2, which wraps the entire DataPipe, its signups, and its updates into a Task. This works, but once it's started, you cannot access the dataPipe object anymore.
  • The current version implements the IObserver pattern, provides more flexibility than the previous examples, and is better suited for reuse. It should also perform better with a high volume of data, because the IObserver pattern limits the number of AFValues created, which helps minimize the number of generations in .NET garbage collection.
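To make the observer idea concrete, here is a small Python model of it. It is a sketch under assumptions, not the AF SDK: ObservablePipe, ListReceiver and drain are hypothetical names of mine. Because Python has no out parameters, get_observer_events returns the "has more events" flag instead of writing it to an argument. Events are pushed to the subscribed receiver in batches, and the caller keeps polling until the pipe reports that nothing is left.

```python
from collections import deque


class ListReceiver:
    """Observer that collects every event it is given."""

    def __init__(self):
        self.seen = []

    def on_next(self, event):
        self.seen.append(event)


class ObservablePipe:
    """Toy pipe that pushes queued events to its observers when polled."""

    def __init__(self):
        self._observers = []
        self._pending = deque()

    def subscribe(self, observer):
        self._observers.append(observer)

    def enqueue(self, event):
        self._pending.append(event)

    def get_observer_events(self, max_count):
        # Deliver at most max_count events, then report whether more remain.
        delivered = 0
        while self._pending and delivered < max_count:
            event = self._pending.popleft()
            for observer in self._observers:
                observer.on_next(event)
            delivered += 1
        return bool(self._pending)


def drain(pipe, batch_size):
    """Poll until the pipe reports no more events; return the number of polls."""
    polls = 0
    while True:
        has_more = pipe.get_observer_events(batch_size)
        polls += 1
        if not has_more:
            return polls


receiver = ListReceiver()
pipe = ObservablePipe()
pipe.subscribe(receiver)
for i in range(5):
    pipe.enqueue(i)

polls = drain(pipe, batch_size=2)
print(polls, receiver.seen)
```

Nothing reaches the receiver until someone polls, which is the behaviour the KB article mentioned in this post discusses for the real AFDataPipe.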

 

Edit: 2015-05-11 - You may also be interested in this KB article, which explains why the AFDataPipe requires polling: KB01195 - Why AFDataPipe requires polling when it uses IObservable pattern

 

Edit: 2015-11-30: A complete solution that contains the most up to date code and a running PI DataPipe and AFDataPipe example can now be found on GitHub: osisoft/PI-AF-SDK-Clues · GitHub

--

 

Why this example?

I have seen several questions about Data Pipes, and to answer them properly I thought it best to test them myself.  I did, and created this small example, which I believe can help you get started.  It should be good enough to give you the main idea of how Data Pipes work and how you can implement them in your own application. When you run this example, you'll get information about changes to the configured tags and attributes, as shown in the picture below:

2015-03-11_19-01-02_C__CurDev_PIDataAccessSamples_SimpleAFCommandLineApplication_bin_Debug_SimpleAFC.png

 

I would like very much to get your feedback from real use cases and adapt this example so we have a good starting point for people who need to implement AFSDK DataPipes.

 

To test it:

  • Insert the class below into a new command line application.
  • Change the namespace so it fits your application.
  • In the Program.cs file, in the Main function, just call the method:
DataPipeSubscribeExample.Run()

 

Here is the first class, which contains the main logic and parameters. The file should be named DataPipesExamples.cs.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using OSIsoft.AF;
using OSIsoft.AF.Asset;
using OSIsoft.AF.Data;
using OSIsoft.AF.PI;


namespace ConsoleApplication1
{
    public class DataPipeSubscribeExample
    {
        // constants
        private const string AFserverName = "OPTIMUS";
        private const string PIserverName = "OPTIMUS";
        private const string AFdatabaseName = "CasesDatabase";
        private const string AFElementName = "AFDataPipe";
        private const string PITag = "CDT158";


        /// <summary>
        /// This example shows how an AF data pipe works
        /// We create one afDataPipe and two piDataPipes
        /// </summary>
        public static void Run()
        {
            // AF AND PI CONNECTIONS
            //connect to AF Server
            var afServers = new PISystems();
            var afServer = afServers[AFserverName];
            afServer.Connect();
            var database = afServer.Databases[AFdatabaseName];
            var element = database.Elements[AFElementName];  // we select the element we want to monitor


            //PI SERVER
            // Connect PI Server
            var piserver = PIServer.FindPIServer(PIserverName);
            piserver.Connect();


            // get the tag we want to monitor
            var pointList = PIPoint.FindPIPoints(piserver, PITag).ToList();


            // MONITOR THE DATA PIPES
            // here we create a little "scheduler" to monitor changes on our element
            Console.WriteLine("Starting Monitoring Events in data pipe.");


            var dataPipes = new DataPipesHelper();
            dataPipes.AddSignup(element.Attributes);
            dataPipes.AddSignup(pointList, AFDataPipeType.Snapshot);
            dataPipes.AddSignup(pointList, AFDataPipeType.Archive);
            dataPipes.StartListening(TimeSpan.FromSeconds(5));


            // Waiting until user presses Enter, data gets updated by the data pipes class
            Console.WriteLine("Press enter key to stop monitor the Data Pipes.");
         
            // if you uncomment this, you may test to add more signups while its already listening...
            //Console.ReadLine();
            // dataPipes.AddSignup(PIPoint.FindPIPoints(piserver, "TIME.SECONDS").ToList(), AFDataPipeType.Archive);
         
            Console.ReadLine();
         
            // Calling Dispose is IMPORTANT: it removes the signups and terminates the pipes cleanly.
            // If you don't, the PI update manager keeps the signups until they time out. This is not clean and may lead to situations
            // where the PI server consumes unnecessary resources, because it continues to keep values for unused signups (for something like 10 or 30 minutes).
            // This can occur especially during testing, when you start and stop your application a lot.
            // On the PI Server you can run the command pilistupd -cs, under %piserver%\adm, to see active signups.
            dataPipes.Dispose();


            piserver.Disconnect();
            afServer.Disconnect();
            Console.WriteLine("Monitoring Events Stopped");
            Console.WriteLine("Terminated");

        }

    }

    /// <summary>
    /// This class uses the IObserver pattern: the Subscribe method on the DataPipes object
    /// uses an object that we have created to receive the data (AFConsoleDataReceiver and PIConsoleDataReceiver)
    /// It should be flexible enough to fit your needs
    /// </summary>
    public class DataPipesHelper : IDisposable
    {
        // Members
        private readonly AFDataPipe _afDataPipe = new AFDataPipe();
        private readonly PIDataPipe _piSnapshotDataPipe = new PIDataPipe(AFDataPipeType.Snapshot);
        private readonly PIDataPipe _piArchiveDataPipe = new PIDataPipe(AFDataPipeType.Archive);
        private Timer _timer;

        /// <summary>
        /// Constructor
        /// </summary>
        public DataPipesHelper()
        {
           // YOU SHOULD CONSIDER PASSING THE DATA RECEIVERS (AS IOBSERVER OBJECTS ) TO THIS CLASS, HERE THEY ARE HARD CODED FOR DEMONSTRATION PURPOSE    
            _afDataPipe.Subscribe(new AFConsoleDataReceiver());
            _piSnapshotDataPipe.Subscribe(new PIConsoleDataReceiver(AFDataPipeType.Snapshot));
            _piArchiveDataPipe.Subscribe(new PIConsoleDataReceiver(AFDataPipeType.Archive));
        }


        /// <summary>
        /// Starts the data update
        /// </summary>
        /// <param name="checkIntervall">interval that set how often GetObserverEvents is called </param>
        public void StartListening(TimeSpan checkIntervall)
        {
            if (_timer == null)
                _timer = new Timer(CheckForData, null, 0, (int)checkIntervall.TotalMilliseconds);
        }


        /// <summary>
        /// Stops the data update
        /// </summary>
        public void StopListening()
        {
            if (_timer != null)
            {
                _timer.Dispose();
                _timer = null; // lets StartListening create a new timer if called again
            }
        }


        /// <summary>
        /// add  attributes signups to the class
        /// </summary>
        /// <param name="attributes"></param>
        public void AddSignup(IList<AFAttribute> attributes)
        {
            _afDataPipe.AddSignups(attributes);
        }

        /// <summary>
        /// add pipoints signups to the class
        /// </summary>
        /// <param name="piPoints"></param>
        /// <param name="afDataPipeType"></param>
        public void AddSignup(IList<PIPoint> piPoints, AFDataPipeType afDataPipeType)
        {
            switch (afDataPipeType)
            {
                case AFDataPipeType.Archive:
                    _piArchiveDataPipe.AddSignups(piPoints);
                    break;
                case AFDataPipeType.Snapshot:
                    _piSnapshotDataPipe.AddSignups(piPoints);
                    break;


                case AFDataPipeType.TimeSeries:
                    // no break needed: the throw already ends this case
                    throw new NotImplementedException("Time Series Data Pipe is not implemented in this class");
            }

        }

        /// <summary>
        /// This method retrieves data from the PI Update Manager
        /// It is used internally by the timer every X seconds
        /// </summary>
        /// <param name="o"></param>
        private void CheckForData(object o)
        {

            bool hasMoreEvents;
            _afDataPipe.GetObserverEvents();

            do
            {
                _piArchiveDataPipe.GetObserverEvents(1000, out hasMoreEvents);
            }
            while (hasMoreEvents);

            do
            {
                _piSnapshotDataPipe.GetObserverEvents(1000, out hasMoreEvents);
            }
            while (hasMoreEvents);

        }

        /// <summary>
        /// Free the resource
        /// </summary>
        public void Dispose()
        {
            StopListening();
            _afDataPipe.Dispose();
            _piSnapshotDataPipe.Dispose();
            _piArchiveDataPipe.Dispose();

        }
    }

    /// <summary>
    /// This class receives data from AF Data Pipe
    /// </summary>
    public class AFConsoleDataReceiver : IObserver<AFDataPipeEvent>
    {
        /// <summary>
        /// Provides the observer with new data.
        /// </summary>
        /// <param name="value"></param>
        public void OnNext(AFDataPipeEvent value)
        {
            Console.WriteLine("AFDataPipe event - Attribute Name: {0}, Action Type: {1}, Value {2}, TimeStamp: {3}", value.Value.Attribute.Name, value.Action.ToString(), value.Value.Value, value.Value.Timestamp.ToString());
        }

        /// <summary>
        /// An error has occurred
        /// </summary>
        /// <param name="error"></param>
        public void OnError(Exception error)
        {
            Console.WriteLine("Provider has sent an error");
            Console.WriteLine(error.Message);
            Console.WriteLine(error.StackTrace);
        }

        /// <summary>
        /// Notifies the observer that the provider has finished sending push-based notifications.
        /// </summary>
        public void OnCompleted()
        {
            Console.WriteLine("Provider has terminated sending data");
        }
    }

    /// <summary>
    /// This class receives data from a PI Data Pipe
    /// </summary>
    public class PIConsoleDataReceiver : IObserver<AFDataPipeEvent>
    {

        private AFDataPipeType _dataPipeType;

        public PIConsoleDataReceiver(AFDataPipeType afDataPipeType)
        {
            _dataPipeType = afDataPipeType;
        }

        /// <summary>
        /// Provides the observer with new data.
        /// </summary>
        /// <param name="value"></param>
        public void OnNext(AFDataPipeEvent value)
        {
            Console.WriteLine("PIDataPipe event - {4} - Tag Name: {0}, Action Type: {1}, Value {2}, TimeStamp: {3}", value.Value.PIPoint.Name, value.Action.ToString(), value.Value.Value, value.Value.Timestamp.ToString(), _dataPipeType);
        }

        /// <summary>
        /// An error has occurred
        /// </summary>
        /// <param name="error"></param>
        public void OnError(Exception error)
        {
            Console.WriteLine("Provider has sent an error");
            Console.WriteLine(error.Message);
            Console.WriteLine(error.StackTrace);
        }

        /// <summary>
        /// Notifies the observer that the provider has finished sending push-based notifications.
        /// </summary>
        public void OnCompleted()
        {
            Console.WriteLine("Provider has terminated sending data");
        }
    }
}