PI AF 2.x allows the creation of custom data references, which extend the capability of PI AF by defining the information and mechanism necessary for an AF Attribute to read, calculate, and/or write a value. There are many reasons to implement a custom data reference (e.g. displaying data in AF from an external data source, or custom calculations, though we encourage checking out asset analytics first!). In the background, data references are simply .NET classes that derive from the OSIsoft.AF.Asset.AFDataReference base class and are registered and stored in the AF Server. If you haven’t worked with custom data references before, please check out the white paper “Implementing AF 2.x Data References”, available for download from the Tech Support Download Center.

 

The purpose of this blog post is to discuss a less-documented feature, which is to implement AF Data Pipes for custom data references. An AF Data Pipe is simply a collection of AF Attribute objects that are signed up for data change events on the server. It is often used by clients to get all data change events for AF Attributes. This question was first brought up in a thread: Custom Data Reference - How To Implement Data Pipe? After realizing that there is limited information about this topic, I have decided to test out implementing an AF Data Pipe.

 

 

Background

There are two types of data source for the AF Data Pipe (AFDataPipe): calculation or system of record. AFDataPipe considers a data reference a calculation data source if it has one or more inputs. For a calculation data reference to support the data pipe, at least one of its inputs must support the data pipe.

 

In general, for a data reference plugin to support AFDataPipe:

  • If the data reference is not a system of record and relies on AF SDK to get its input data, AF SDK can retrieve the inputs and call AFDataReference.GetValue to compute the output for the data pipe event.
  • If the data reference is a system of record, it can implement a class derived from AFEventSource and expose a static CreateDataPipe method in the plugin that returns an instance of this derived class.

 

 

My Custom Data Reference

In this blog post, I will implement a simple data reference based on a system of record to illustrate how to support AFDataPipe. This data reference, called SQLDR, simply gets the timestamps and values from the specified SQL table for display in PI AF.

 

Please feel free to skip this section if you are familiar with implementing a custom data reference. I’m using AF SDK 2.6.2 in this example.

 

Creating the Visual Studio Project

  1. Create a new Class Library using the .NET Framework 4.5.
  2. Add a reference to OSIsoft.AFSDK.dll, located in the %pihome%\AF\PublicAssemblies\4.0 folder.
  3. Rename the class file (I renamed mine to SQLDR.cs).

 

Preparing our new class library to implement AFDataReference

Add the using directive:

using OSIsoft.AF.Asset;

 

Add inheritance for SQLDR to AFDataReference:

public class SQLDR : AFDataReference

 

Note that each custom AFDataReference class implementation requires a unique System.Runtime.InteropServices.GuidAttribute attribute to be specified to uniquely identify the data reference in AF. A System.SerializableAttribute and System.ComponentModel.DescriptionAttribute are also required on the class before an AFDataReference can be used in AF.

 

using System;
using System.ComponentModel;
using System.Runtime.InteropServices;

 

[Serializable]
[Guid("A1AC3A39-9E55-4700-BBC0-68299E67C4A1")]
[Description("SQLDR; Get values from a SQL table")]
public class SQLDR : AFDataReference

 

 

Implementing AFDataReference

ConfigString

There are 3 pieces of information we need for our custom data reference: the name of the SQL server, the name of the database, and the name of the table containing the data. We will implement a simple configuration where these values are stored in a single string, separated by semicolons.

 

// Private fields storing configuration of data reference
private string _tableName = String.Empty;
private string _dbName = String.Empty;
private string _sqlName = String.Empty;

// Public property for name of the SQL table
public string TableName
{
    get
    {
        return _tableName;
    }
    set
    {
        if (_tableName != value)
        {
            _tableName = value;
            SaveConfigChanges();
        }
    }
}

// Public property for name of the SQL database
public string DBName
{
    get
    {
        return _dbName;
    }
    set
    {
        if (_dbName != value)
        {
            _dbName = value;
            SaveConfigChanges();
        }
    }
}

// Public property for name of the SQL instance
public string SQLName
{
    get
    {
        return _sqlName;
    }
    set
    {
        if (_sqlName != value)
        {
            _sqlName = value;
            SaveConfigChanges();
        }
    }
}

// Get or set the config string for the SQL data reference
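public override string ConfigString
{
    get
    {
        return String.Format("{0};{1};{2}", SQLName, DBName, TableName);
    }
    set
    {
        if (value != null)
        {
            // Expect "server;database;table"; ignore strings with fewer than three parts
            string[] configSplit = value.Split(';');
            if (configSplit.Length >= 3)
            {
                // Assign the backing fields directly so that the change is saved only once
                _sqlName = configSplit[0];
                _dbName = configSplit[1];
                _tableName = configSplit[2];
                SaveConfigChanges();
            }
        }
    }
}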
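
The semicolon-separated format can also be checked in isolation. Here is a minimal sketch of a stricter parser; the SqlDrConfig class and its TryParse method are hypothetical names used only for illustration (they are not part of AF SDK or of the original solution), and it assumes that exactly three non-empty parts are required.

```csharp
using System;

// Hypothetical helper (not part of AF SDK): parses "server;database;table".
public static class SqlDrConfig
{
    // Returns true and fills the three outputs only when exactly three
    // non-empty parts are present (after trimming); otherwise returns false.
    public static bool TryParse(string config, out string server, out string database, out string table)
    {
        server = database = table = String.Empty;
        if (String.IsNullOrWhiteSpace(config))
        {
            return false;
        }

        string[] parts = config.Split(';');
        if (parts.Length != 3)
        {
            return false;
        }

        for (int i = 0; i < parts.Length; i++)
        {
            parts[i] = parts[i].Trim();
            if (parts[i].Length == 0)
            {
                return false;
            }
        }

        server = parts[0];
        database = parts[1];
        table = parts[2];
        return true;
    }
}
```

A setter built on such a helper could leave the current configuration untouched whenever TryParse returns false, which matters because clients may assign an empty or partial string while the attribute is still being configured.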

 

 

GetValue/GetValues

Next, we need to tell AF what values to retrieve and display when a client requests values for the data reference. Since we will be getting data from a SQL server, let’s write a separate SQLHelper class.

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Data;
using System.Data.SqlClient;

namespace AFSDK_CustomDR_SQL
{
    /*****************************************************************************************
     * SQL Helper class to get data from the specified SQL Server, SQL Database and SQL Table. 
     * Given a start and end time, the static GetSQLData method will return a SQLDataReader
     * object that contains all values between the time interval in ascending order.
     * Given the end time only, only the most current value will be returned. 
     *****************************************************************************************/
    class SQLHelper
    {
        public static SqlDataReader GetSQLData(string sqlServer, string sqlDb, string sqlTable, DateTime startTime, DateTime endTime)
        {
            // Construct connection string to SQL Server based on input parameters for SQL server name and database name.
            string connectString = String.Format("server={0}; database={1}; Integrated Security=SSPI; Connection Timeout=10", sqlServer, sqlDb);
            SqlConnection sqlConnection = new SqlConnection(connectString);

            // Construct SQL query
            string query;
            using (SqlCommand cmd = new SqlCommand())
            {
                cmd.Connection = sqlConnection;

                // SQL query for the most recent values before the end time
                if (startTime == DateTime.MinValue)
                {
                    query = String.Format("SELECT TOP 1 pi_time, pi_value FROM {0}.{1} WHERE pi_time <= @time ORDER BY pi_time DESC", sqlDb, sqlTable);
                    SqlParameter sqlTime = cmd.Parameters.Add(new SqlParameter("time", System.Data.SqlDbType.DateTime2));
                    sqlTime.Value = endTime;
                    cmd.CommandText = query;
                }

                // SQL query for all values over a specified time range
                else
                {
                    query = String.Format("SELECT pi_time, pi_value FROM {0}.{1} WHERE pi_time >= @startTime AND pi_time <= @endTime ORDER BY pi_time ASC", sqlDb, sqlTable);
                    SqlParameter sqlStartTime = cmd.Parameters.Add(new SqlParameter("startTime", System.Data.SqlDbType.DateTime2));
                    SqlParameter sqlEndTime = cmd.Parameters.Add(new SqlParameter("endTime", System.Data.SqlDbType.DateTime2));
                    sqlStartTime.Value = startTime;
                    sqlEndTime.Value = endTime;
                    cmd.CommandText = query;
                }

                /* Open SQL connection and return the SqlDataReader object. Use CommandBehavior.CloseConnection to ensure that the 
                 * SQL connection is closed when the SqlDataReader object is closed. */
                sqlConnection.Open();
                SqlDataReader sqlReader = cmd.ExecuteReader(CommandBehavior.CloseConnection);
                return sqlReader;
            }
        }
    }
}
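
In SQLHelper the time values are passed as SQL parameters, but the database and table names are inserted into the query text with String.Format, because object names cannot be parameterized. Since those names come from the attribute's config string, it may be worth validating them first. The sketch below is one possible approach, not something the original solution does; SqlIdentifier and Quote are hypothetical names, and it assumes the names are simple identifiers made of letters, digits and underscores.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper (not part of the original solution or of any library).
public static class SqlIdentifier
{
    // Letters, digits and underscores only, not starting with a digit.
    private static readonly Regex SimpleName = new Regex(@"\A[A-Za-z_][A-Za-z0-9_]*\z");

    // Returns the name wrapped in square brackets, or throws if it is not a simple name.
    public static string Quote(string name)
    {
        if (name == null || !SimpleName.IsMatch(name))
        {
            throw new ArgumentException("Not a simple SQL identifier: " + name);
        }
        return "[" + name + "]";
    }
}
```

With this in place, the format arguments in GetSQLData could be SqlIdentifier.Quote(sqlDb) and SqlIdentifier.Quote(sqlTable) instead of the raw strings. If your table names include a schema or unusual characters, the pattern would need to be relaxed accordingly.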

 

 

Now, we are ready to override the GetValue and GetValues methods in our main SQLDR class.

 

Add the using directive:

using System.Data;
using System.Data.SqlClient;
using OSIsoft.AF.Time;

 

Override GetValue and GetValues:

// Return latest value if timeContext is null, otherwise return latest value before a specific time
public override AFValue GetValue(object context, object timeContext, AFAttributeList inputAttributes, AFValues inputValues)
{
    AFValue currentVal = new AFValue();
    DateTime time;
    if (timeContext != null)
    {
        time = ((AFTime)timeContext).LocalTime;
    }
    else
    {
        time = DateTime.Now;
    }
    using (SqlDataReader reader = SQLHelper.GetSQLData(SQLName, DBName, TableName, DateTime.MinValue, time))
    {
        if (reader.Read())
        {
            currentVal.Timestamp = AFTime.Parse(reader["pi_time"].ToString());
            currentVal.Value = reader["pi_value"];
        }
    }

    return currentVal;
}

// Return all values (converted to AFValues) over a specific time interval
public override AFValues GetValues(object context, AFTimeRange timeRange, int numberOfValues, AFAttributeList inputAttributes, AFValues[] inputValues)
{
    AFValues values = new AFValues();
    DateTime startTime = timeRange.StartTime.LocalTime;
    DateTime endTime = timeRange.EndTime.LocalTime;
    using (SqlDataReader reader = SQLHelper.GetSQLData(SQLName, DBName, TableName, startTime, endTime))
    {
        while (reader.Read())
        {
            AFValue newVal = new AFValue();
            newVal.Timestamp = AFTime.Parse(reader["pi_time"].ToString());
            newVal.Value = reader["pi_value"];
            values.Add(newVal);
        }
    }
    return values;
}

 

 

Note that we could also implement Rich Data Access (RDA) methods such as RecordedValue, InterpolatedValue, etc. These were omitted in this short example. However, I encourage you to check out this PI Developers Club discussion about the topic: When would one need to implement AFDataMethods on a custom DR?

 

SupportedMethods, SupportedDataMethods and SupportedContexts

Finally, we will specify the minimal list of supported methods for our simple custom data reference.

 

Add the using directive:

using OSIsoft.AF.Data;

 

Override SupportedMethods, SupportedDataMethods, and SupportedContexts:

public override AFDataReferenceMethod SupportedMethods
{
    get
    {
        return AFDataReferenceMethod.GetValue | AFDataReferenceMethod.GetValues;
    }
}

public override AFDataMethods SupportedDataMethods
{
    get
    {
        return AFDataMethods.None;
    }
}

public override AFDataReferenceContext SupportedContexts
{
    get
    {
        return AFDataReferenceContext.Time;
    }
}

 

 

Testing

To test our SQLDR data reference so far, let’s register it using the RegPlugIn.exe utility. Navigate to the %pihome%\AF directory and run the following command:

RegPlugIn /PISystem:your-af-server "C:\path\to\dll"

 

Verify that the plug-in is registered by running:

RegPlugIn /PISystem:your-af-server /List

 

[Image: regplugin-list.png]

 

Finally, open up PI System Explorer and create a new Attribute using our custom data reference SQLDR!

 

[Images: CDRTest.png, CDRTest2.png]

 

 

Implementing the Data Pipe

Alright! We have a working data reference. We are now ready to implement the AF Data Pipe.

 

If you sign up for updates at this point, you will not receive any, because the data reference does not yet support AFDataPipe.

 

There are a few things we will need to do:

  1. Implement AFEventSource by overriding GetEvents() and Dispose(bool); AddSignUp and RemoveSignUp can be overridden as well if you need them. AFEventSource is a public abstract class that provides a basis for data reference developers to implement the Data Pipe feature. The base class handles all the interaction with the AF SDK pipe modules, so the implementer does not have to worry about interfacing with different data sinks.
  2. Add a static method, CreateDataPipe, in your implementation of AFDataReference to return an AFEventSource object that you have implemented.
  3. Explicitly state support for the data pipe by changing SupportedDataMethods to include AFDataMethods.DataPipe.

 

1. Inheriting AFEventSource

The AFEventSource base class has methods to publish data pipe events and report errors, and it maintains a dictionary of the AFAttributes being monitored by the event source. We will create a new class (EventSource.cs) which inherits from the AFEventSource base class.

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using OSIsoft.AF;
using OSIsoft.AF.Asset;
using OSIsoft.AF.Data;
using OSIsoft.AF.Time;

namespace AFSDK_CustomDR_SQL
{
    /*****************************************************************************************
     * The EventSource class implements AFEventSource to specify how to get data pipe events 
     * from the system of record. 
     *****************************************************************************************/
    class EventSource : AFEventSource
     {

 

 

Storing Timestamps

First, we would like to store the start time when the data pipe was initiated, as well as the last timestamp where data was retrieved for each AF Attribute. Since there can be a list of AFAttributes signed up for the data pipe, we will keep a dictionary of <AFAttribute, AFTime> to record the last timestamps for each AF Attribute.

 

// Last timestamps for each AF Attribute
Dictionary<AFAttribute, AFTime> _lastTimes = new Dictionary<AFAttribute, AFTime>();

// Start time when the pipe is initiated
AFTime _startTime;

// Initialize the start time for the event source
public EventSource()
{
    _startTime = new AFTime("*");
}

 

 

The GetEvents method

Next, we will implement the GetEvents method, which is designed to get data pipe events from the system of record. The base class maintains a dictionary of the AFAttributes being monitored by the event source. The idea here is to publish an AFDataPipeEvent for every value between the last timestamp and the current time, for each AF Attribute that is signed up for the data pipe.

 

// Get new events for the pipe from the last timestamps till current time of evaluation
protected override bool GetEvents()
{
    // Set evaluation time to current time
    AFTime evalTime = AFTime.Now;

    // Get the list of AF Attributes signed up on the data pipe
    IEnumerable<AFAttribute> signupList = base.Signups;

    // Get values for each AF Attribute, one at a time
    foreach (AFAttribute att in signupList)
    {
        if (!ReferenceEquals(att, null))
        {
            // Add AF Attribute if it hasn't been added to the _lastTimes dictionary yet
            if (!_lastTimes.ContainsKey(att))
            {
                _lastTimes.Add(att, this._startTime);
            }

            // Set time range to get all values between last timestamps to current evaluation time
            AFTimeRange timeRange = new AFTimeRange(_lastTimes[att], evalTime);

            /* Note: Use RecordedValues if supported. A GetValues call may return interpolated values at the start and end times,
             * which can be problematic in a data pipe implementation. GetValues is used in this simple example because
             * the GetValues implementation in my custom DR does not return interpolated values at the start and end times. */
            AFValues vals = att.GetValues(timeRange, 0, att.DefaultUOM);

            // Store old last time for the AF Attribute
            AFTime lastTime = _lastTimes[att];

            // Publish each value to the data pipe
            foreach (AFValue val in vals)
            {
                // Record latest timestamp
                if (val.Timestamp > lastTime)
                {
                    lastTime = val.Timestamp;
                }
                AFDataPipeEvent ev = new AFDataPipeEvent(AFDataPipeAction.Add, val);
                base.PublishEvent(att, ev);
            }

            // Add a tick to the latest time stamp to prevent the next GetValues call from returning value at the same time
            _lastTimes[att] = lastTime + TimeSpan.FromTicks(1);
        }
    }
    return false;
}
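
The "last timestamp plus one tick" bookkeeping can be tried out without an AF server. The following is a minimal sketch that mimics the idea with plain DateTime values; TickCursor and Poll are hypothetical names, and the in-memory list stands in for the SQL table. Unlike the listing above, this sketch only moves the cursor when a poll actually returns something.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the polling logic; it uses plain DateTime values
// instead of AF SDK types so that it can run without an AF server.
public class TickCursor
{
    private DateTime _next;

    public TickCursor(DateTime start)
    {
        _next = start;
    }

    // Returns the timestamps in [cursor, now] in ascending order and moves the
    // cursor one tick past the newest one returned.
    public List<DateTime> Poll(IEnumerable<DateTime> source, DateTime now)
    {
        List<DateTime> fresh = source
            .Where(t => t >= _next && t <= now)
            .OrderBy(t => t)
            .ToList();

        if (fresh.Count > 0)
        {
            // Only advance when something was found, so empty polls leave the cursor alone
            _next = fresh[fresh.Count - 1].AddTicks(1);
        }
        return fresh;
    }
}
```

Polling the same source twice without new rows returns an empty list the second time, because the inclusive lower bound now sits one tick after the newest timestamp already seen.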

 

 

The Dispose method

Finally, we will override the Dispose method to clean up objects.

 

// Dispose resources
protected override void Dispose(bool disposing)
{
    _lastTimes = null;
}

 

 

2. Add the CreateDataPipe method

In the main SQLDR class, we will add a static method CreateDataPipe that returns an AFEventSource object.

 

// Return an AFEventSource object for this custom data reference
public static object CreateDataPipe()
{
    EventSource pipe = new EventSource();
    return pipe;
}

 

 

3. Include Data Pipe in Supported Data Methods

Last but not least, we need to tell clients that data pipe is supported for our custom data reference. We will therefore change the SupportedDataMethods to include DataPipe.

 

public override AFDataMethods SupportedDataMethods
{
    get
    {
        return AFDataMethods.DataPipe;
    }
}

 

 

Testing Data Pipe Functionality

To test, let’s rebuild our solution and re-register the data reference to the AF server. Remember RegPlugIn? We will be using it to first un-register the data reference:

RegPlugIn /PISystem:your-af-server name-of-dll.dll /unregister

 

(You can find the name of the dll by inspecting the output of RegPlugIn /PISystem:your-AF-server /List)

 

Then, we will reregister using the same command as before:

RegPlugIn /PISystem:your-af-server "C:\path\to\dll"

 

Make sure the SQLDR attribute still looks good in PSE! (Reopen PSE to make sure that the latest data reference version is downloaded.)

 

This time, we will be testing the event pipe functionality in a custom AF SDK application. Before testing, I created 3 attributes: one using the PI Point data reference, mapped to the PI tag CDT158 (cdt158), and two attributes using our custom data reference SQLDR (SQLDRTest and SQLDRTest2). The code for the test is taken from Marco’s blog post: Using data pipes with future data in PI AF SDK 2.7. I will not go into the detailed implementation here; nonetheless, here are the results:

[Image: datapipe_results.png]

 

Notice that our SQLDR data reference gets AFDataPipe updates, just like a PI Point data reference! (You might be wondering why the values of the custom data reference are identical to those of CDT158. The truth is, I am using the PI Interface for RDBMS to write the value of CDT158 to a SQL table, then using the custom data reference to read from the same SQL table. So the fact that we see the same value updates as for CDT158 shows that my data pipe is working!)

 

 

Limitations in this Example

Note that in the above example, we loop through each AFAttribute and make a separate GetValues call for it. If you have implemented RDA methods and your data source supports bulk calls, you can make a bulk call on an AFAttributeList in the GetEvents method instead. The downside is that the bulk call uses the same time range for every attribute, which may not suit your data reference. If you want to keep track of the list of AFAttributes signed up for the data pipe yourself, you can also override AddSignUp and RemoveSignUp in your AFEventSource class.

 

 

Things to Note

When implementing data pipes, be careful not to retrieve the same event twice. You will notice that we add a tick to lastTime in order to get around this issue. In addition, we use SqlDbType.DateTime2 (instead of DateTime) for the query parameters to get higher precision when retrieving values.
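
To see why the parameter type matters for the one-tick offset, here is a small sketch using System.Data.SqlTypes.SqlDateTime, which models the SQL Server datetime type with its resolution of 1/300 of a second. PrecisionDemo and its methods are hypothetical names, and the sketch assumes a DateTime parameter would be rounded the same way SqlDateTime rounds.

```csharp
using System;
using System.Data.SqlTypes;

// Hypothetical demo class; it only illustrates the rounding, it is not part of the solution.
public static class PrecisionDemo
{
    // Round-trips a .NET DateTime through SqlDateTime, which models the
    // SQL Server datetime type (1/300 second resolution).
    public static DateTime ThroughSqlDateTime(DateTime value)
    {
        return new SqlDateTime(value).Value;
    }

    // True when the one-tick offset is lost after conversion to datetime.
    public static bool TickIsLost(DateTime lastTime)
    {
        DateTime next = lastTime.AddTicks(1);
        return ThroughSqlDateTime(next) == ThroughSqlDateTime(lastTime);
    }
}
```

For a timestamp that falls on a whole second, TickIsLost returns true: after rounding, the start of the next query range is the old timestamp again, so the inclusive comparison would return the same row a second time. DateTime2 keeps the 100-nanosecond resolution of a .NET tick, so the offset survives.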

 

 

Debugging

If you need to debug your custom data reference at any time during development, check out the following resources:

(Note that the second reference is written specifically for debugging a custom delivery channel, but the general principles can be applied to debugging custom data references.)

 

 

Conclusion

I hope this blog post is useful to you. The full VS 2013 solution (including the test project) is in the GitHub repository afsdk-customdr-sql. Please feel free to send me any feedback and comments!