Holger Amort's Blog
Hi Guys,


Just a quick update for those who are using the ROSIsoft R package.
I developed and released this library a few years ago (check out my post here: ROSIsoft, OSIsoft PI and R - Version 2) as a community package for anyone interested in experimenting with OSIsoft PI and R.
I revisited the project recently and made some minor updates to add support for R version 3.5+.
The library was built with a fairly simple purpose: provide a way to read PI data directly into an R data frame.




Let me know if you have any problems with the latest build.


For a copy, bug fixes or enhancements, just drop me a quick note.

Last year we developed a calculation engine for C# and VB.Net applications: Advanced Analytics Engine for AF


The idea was to expand the capabilities of the PI Analysis manager by providing a framework for .NET development. We also developed a mock-up for easy migration of legacy PI-ACE code.


The next step was to integrate R and Python scripts and make it seamless to use scripted languages, and there we hit a roadblock. We didn't find an open or closed source web service that fit our requirements. The main requirements were that it has to be Windows service based, support different data types, support the major operations (push/execute/pull), and provide metrics to optimize load balancing.


We ended up writing our own microservice: R and Python WebService I


The last task is to extend the existing calculation engine with the new web services. To do that we need configuration options for the inputs and outputs, and hence a redesign of the UI.


Here is a first look at the new engine:




R and Python WebService I

Posted by ernstamort Jan 14, 2019

Although developing models and algorithms in languages such as R and Python is straightforward, integrating these scripts is not - especially not in Windows environments. The main reason is that the scripts require a runtime environment to execute, and any other system needs to communicate with these engines. Or in .NET terms: you have to call unmanaged code from managed code.


The other main hurdle is scalability. R and Python can be set up to be thread safe, in the sense that they provide a lock so that they can be called from different threads. But since script execution is single threaded, adding more calculations requires separate engines/cores/threads. The challenge then is to distribute calculations over a core pool in order to minimize latency and fully take advantage of parallel execution.


Another aspect is that R and Python scripts are very different from C# or VB.Net code: they typically run slower and the calculations are more complex. Tuning a system of tens or hundreds of models requires metrics such as execution time and client/server latency. Ideally these metrics could also be used to automatically load balance the system.
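To illustrate how such metrics could drive load balancing, here is a minimal sketch in Python (the actual service is .NET based; the `Core` class, the load metric and the `dispatch` function are hypothetical, illustrative names, not part of the product):

```python
import random

class Core:
    """Hypothetical engine core tracking a simple load metric."""
    def __init__(self, name):
        self.name = name
        self.total_ms = 0.0        # accumulated execution time (the metric)

    def execute(self, cost_ms):
        self.total_ms += cost_ms   # record execution time for balancing

def dispatch(cores, cost_ms):
    # send the calculation to the currently least-loaded core
    core = min(cores, key=lambda c: c.total_ms)
    core.execute(cost_ms)
    return core.name

cores = [Core("r1"), Core("r2"), Core("r3")]
random.seed(0)
for _ in range(300):
    dispatch(cores, random.uniform(1, 20))   # calls with varying cost
loads = [c.total_ms for c in cores]
# the greedy least-loaded rule keeps the cores roughly evenly loaded
```

A simple greedy rule like this bounds the gap between the busiest and the idlest core by the cost of a single call, which is why per-call timing metrics are so useful.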


The idea was to create a web service that provides a common interface and can communicate with different scripting languages. The web service can be configured to spin up several cores, each with its own communication channel and internal buffering. The cores load standard and custom libraries on start-up based on configuration settings.


The service is pretty simple and just has GET commands for each language, for example:





For each call, the returned JSON contains the time for core processing and execution.


The data model is based on the sequence of operations that is necessary to execute a command/script. There are Push, Pull and Execute operations, and an equation such as a + b = c would be split up into the following four operations:



  1. Push a
  2. Push b
  3. Execute: a + b = c
  4. Pull c


The advantage of this approach is that it minimizes the number of calls to the server. It also provides a common interface to both languages, which makes the client-side programming easier and allows flexibility in the way models or scripts can be called.
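The operation sequence above can be sketched with a toy engine (illustrative Python standing in for the real R/Python cores; `ScriptCore` and its methods are hypothetical names, not the service's API):

```python
# Sketch of the push/execute/pull operation model: a dictionary stands in
# for the variable environment a scripting core holds between calls.
class ScriptCore:
    def __init__(self):
        self.env = {}             # variables held by the engine between calls

    def push(self, name, value):
        self.env[name] = value    # Push: place an input variable in the engine

    def execute(self, expression):
        # Execute: run a script/expression against the pushed variables
        exec(expression, {}, self.env)

    def pull(self, name):
        return self.env[name]     # Pull: read a result variable back out

# a + b = c as the four operations described above
core = ScriptCore()
core.push("a", 2.0)               # 1. Push a
core.push("b", 3.0)               # 2. Push b
core.execute("c = a + b")         # 3. Execute
result = core.pull("c")           # 4. Pull c
```

Because the variables stay in the engine between operations, a model can be pushed once and then executed repeatedly with new inputs, which is what keeps the number of server calls low.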


Currently the data model supports the following data types:

  •      Single Value
  •      Data Frame (only for R)


Here are just a few examples:


Push double value:


Push and pull same value:


a+b=c Example:



Sleep for 1 second:


Note: All times are in milliseconds.


In the next blog post I will provide some more examples and also share some load balancing metrics.

When I first read the work by Dacorogna and Müller on unevenly spaced time series operators in finance, I thought it should also have broad applications in manufacturing data analysis. The underlying idea is to build a family of common operators, such as the moving average and moving standard deviation, on the basis of the exponential moving average. The benefit is that you don't have to store a window of historical data in order to calculate the next estimate; you only need to store the current and, in some cases, the previous value.


The calculations are then of course much faster than the traditional approach, and you avoid the network latency of going back to the server to request historical data. There is the additional benefit that the calculation results are more continuous across regime changes. There are also some drawbacks; for example, the algorithm needs a build-up time, so the calculations have to be preconditioned.
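The update rule for an exponential moving average on unevenly spaced data can be sketched as follows (illustrative Python, not the engine's code; the decay factor mu = exp(-dt/tau) is the standard choice for this operator family):

```python
import math

def ema_update(prev_ema, prev_time, value, time, tau):
    """One update of an exponential moving average on an unevenly spaced
    series: only the previous estimate and timestamp need to be stored."""
    mu = math.exp(-(time - prev_time) / tau)   # decay depends on the time gap
    return mu * prev_ema + (1.0 - mu) * value

# feed in irregularly spaced samples of a constant signal
times  = [0.0, 1.0, 3.5, 4.0, 9.0]
values = [5.0, 5.0, 5.0, 5.0, 5.0]
ema = values[0]
for i in range(1, len(times)):
    ema = ema_update(ema, times[i - 1], values[i], times[i], tau=2.0)
# for a constant input the estimate stays at the input, regardless of spacing
```

Because each step only touches the previous estimate, the operator runs in constant time and memory per event, which is what makes the whole family so cheap.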


We have included a whole range of continuous operators in the Advanced Analytics Engine, but they can of course also be implemented in a standard Custom Data Reference:


EMA = exponential moving average (similar to Moving average - Wikipedia )

CMa = Continuous moving average

CMsd = Continuous moving standard deviation


and others. The benefit of these operators is that they can easily be combined to create new ones. The Z-score, for example, is really useful for identifying outliers:


z = (x − x̄) / S (from Standard score - Wikipedia)


A look at the normal distribution curve shows how it works:

If abs(Z-score) > 3, you can suspect that the value is an outlier. The amazing part of these operators is that the score can be computed in real time, and since the calculation is so fast, you can perform outlier detection for every new point at high sampling rates.
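A minimal sketch of event-based outlier flagging with exponentially weighted mean and variance (illustrative Python, not the engine's actual operators; the smoothing factor and the injected outlier are arbitrary choices):

```python
import math, random

def z_score(x, mean, std):
    return (x - mean) / std if std > 0 else 0.0

def update_stats(mean, var, x, mu=0.95):
    """Continuous (exponential) estimates of mean and variance, one update
    per event - no window of history is stored."""
    mean_new = mu * mean + (1 - mu) * x
    var_new  = mu * var  + (1 - mu) * (x - mean_new) ** 2
    return mean_new, var_new

random.seed(1)
mean, var = 0.0, 1.0
flagged = []
samples = [random.gauss(0, 1) for _ in range(500)] + [10.0]  # inject an outlier
for x in samples:
    std = math.sqrt(var)
    if abs(z_score(x, mean, std)) > 3:       # flag before updating the stats
        flagged.append(x)
    mean, var = update_stats(mean, var, x)
# the injected value of 10.0 is flagged as an outlier
```

Note that the score is computed before the statistics are updated, so an extreme value cannot mask itself by inflating the variance estimate it is tested against.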


To test this we can simulate a sinusoid and add some Gaussian noise. The first example shows how the CMa operator filters a very noisy signal:

Since we can compute the average and standard deviation in real time, we can also monitor the z-score on an event basis. The following shows the real-time Zscore operator:

I used the absolute value; suspect samples can then be automatically flagged if the z-score is greater than 3.


Some properties of any moving operator are that:


     a) you are basically creating a modified time series - so these are not the original data points
     b) there is also some lag in the filtered signal


Especially the first point is important in regulated industries, where original values are required.

One of the things I have always pondered is whether you can create a time series from the original data points that denoises the signal. The OSIsoft compression algorithm by design oversamples extreme values. So can you resample the original series in a way that removes extreme values in real time?


One way to do this is to calculate the median of a moving window and preserve the time stamp of the result:

The algorithm resamples the original data series, but keeps only the original data points that have a smaller distance to the signal. This can also be seen in the summary statistics and the histogram:



(Summary statistics table: Stat | Raw | Position Median)
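The moving-median resampling described above can be sketched as follows (illustrative Python, not the actual implementation; the window size and sample data are made up):

```python
def median_resample(points, window=5):
    """Resample by taking, for each window, the original (time, value)
    sample whose value is the window median - timestamps are preserved."""
    out = []
    for i in range(0, len(points) - window + 1, window):
        chunk = sorted(points[i:i + window], key=lambda p: p[1])
        out.append(chunk[len(chunk) // 2])   # middle element = median sample
    return out

# spikes (9.9 and -8.0) are dropped; surviving points keep their timestamps
pts = [(0, 1.0), (1, 1.1), (2, 9.9), (3, 0.9), (4, 1.0),
       (5, 1.2), (6, -8.0), (7, 1.1), (8, 1.0), (9, 1.3)]
clean = median_resample(pts, window=5)
```

Because the output is a subset of the original samples, no interpolated or synthetic values are introduced, which is exactly the property that matters in regulated environments.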


In my view the AF layer is the hand-off between the engineering and data science teams. Data provided downstream should be conditioned and cleaned to improve the success rate of the modeling efforts.

Site applications such as alarming should also be based on high-quality data streams. Real-time operators and the resampling methods are great tools that can be implemented with little effort.

VCampus has always been an ideas incubator where people share new developments and best practices. One of the things I was always interested in is real-time analytics, where data are analyzed with very little latency. This is especially important in data-driven process analytics, where data are used for making decisions on process changes.


A couple of months ago we developed a prototype that executes C# code in real time using a calculation engine. The idea was to close the gap between site-specific low-level calculations that can be performed in PI Analysis and high-level data analytics that typically reside at the Enterprise level. The demo was a good start, and thanks to my manager Damien O'Connor we found a project that supported the effort and were able to productize it.


The main driver for the project was PI-ACE to AF migration, as there are still a lot of ACE projects out there and the product is reaching end of life. So naturally the engine had to be extended to fully support VB.NET. We received a lot of ACE code that will need to be migrated, and interestingly much of it was around batch processing. Engineers have been leveraging ACE to trigger custom batches and modify existing structures.


During the project we also had the opportunity to work with our intern Pablo Gomez. He investigated real-time operators that can now be called as extensions to a time series operation. In addition, we worked on the R and Python implementation and are unit testing the integration. The data science GUI is very different from the .NET programming one and is tailored to let data scientists call their functions easily.


In summary, it is really nice to see something first developed in VCampus make it into the real world and allow engineers to develop ultra-fast real-time solutions.


Here is a screenshot of the engine in AF:


We also set up a Github page for more information:


GitHub - tqsintegration/Advanced-Analytics-Engine-for-AF: A custom AF Data Reference, that allows users to template, sto…


R Calculation Server Design

Posted by ernstamort Mar 21, 2018

An R scripting engine for PI is a really powerful way to apply R time series libraries to real-time data. One challenge of using the R libraries out of the box is that they are written for evenly spaced data points, while sensor data are often unevenly spaced. This requires interpolation to retrieve an evenly spaced vector.


In my last blog I showed the Custom Data Reference to configure a calculation.


The question is how you provide interpolated values in real time in a scalable way. One approach is to use the Rx libraries to create Observables that buffer the data, interpolate them and send them to the R engine for calculation. There are already several articles on VCampus describing how to use Rx with the AF SDK, for example:


Async with PI AF SDK: From Tasks to Observables


The Rx library is a great way to handle data in motion and an excellent fit for PI data.


The job of providing real-time interpolations to R can be broken down into the following tasks:

  1. Subscribe to PI data
  2. Provide rolling windows for one or more variables
  3. Sample the stream based on the polling rate; polling rate = 0 would be event based
  4. Group the stream by variable; here you would transition from Observable to List
  5. Perform Interpolation
  6. Calculate R
  7. Send results to PI


Each task is one layer, so if you create, for example, two windows of different sizes over "variable1", you still only subscribe to one data feed.

Therefore the only load on the PI server itself is providing the real-time data stream.


A marble diagram shows the main operations on the data stream (here is a nice website to play with Rx operators):



The operations are mostly out-of-the-box functionality of the Rx library. The interpolation uses the previous value for the left-side boundary and the current value for the right-side boundary.
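The interpolation step itself can be sketched like this (illustrative Python, not the Rx/.NET implementation; the boundary handling follows the description above - previous sample on the left, current sample on the right):

```python
def interpolate(samples, grid):
    """Linearly interpolate an unevenly spaced series of (time, value)
    pairs onto a regular grid of timestamps."""
    out, j = [], 0
    for t in grid:
        # advance to the last sample at or before t
        while j + 1 < len(samples) and samples[j + 1][0] <= t:
            j += 1
        t0, v0 = samples[j]
        if j + 1 < len(samples):
            t1, v1 = samples[j + 1]
            v = v0 + (v1 - v0) * (t - t0) / (t1 - t0)
        else:
            v = v0                      # past the last sample: hold the value
        out.append((t, v))
    return out

# an identity series stays the identity: interpolated values equal the times
raw = [(0.0, 0.0), (3.0, 3.0), (7.0, 7.0)]
even = interpolate(raw, grid=[0.0, 2.0, 4.0, 6.0])
```

Because the pointer `j` only moves forward, interpolating a whole grid is a single pass over the samples, which is why this step adds so little overhead to the pipeline.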


When you put this all together the question is: So how fast is this and how scalable is this?


The second question is easier to answer in that Rx was designed for this type of application; by abstracting away the hard parts of threading, locking and syncing, scaling up seems almost too easy. There have been some questions raised about the performance though:


c# - Reactive Extensions seem very slow - am I doing something wrong? - Stack Overflow


I did some measurements and found that interpolating 100 data points takes ~200 microseconds. Considering that R script execution time is in the range of milliseconds, this is really negligible.


In summary, complex event processing (CEP) frameworks such as Rx, which are designed to handle large data streams, are ideal for performing real-time calculations. Vector-based calculations require an additional fast interpolation step that can be integrated into the data flow. By placing the buffering and interpolation on the calculation engine, the PI server is spared the additional load of providing a vast number of very similar interpolations.

Time series data are a natural application for R, so the OSIsoft PI database is a good fit as a data source. Getting PI or AF data into R requires some plumbing and flattening of the data structures. The following library takes care of this:


ROSIsoft, OSIsoft PI and R - Version 2


But how do you deploy a model? How do you get inputs and outputs from a script engine from and to PI? And this in real time ...


I have been working on this problem for a while now, and it goes beyond the plumbing part: engineers have to be able to quickly develop and deploy models and work within the context of their AF and EF models. A graphical user interface needs to facilitate the process and allow the user to visualize the calculation.


There is also a need to work with time series vectors/matrices instead of single-value calculations. Time series vectors/matrices can feed into forecasting and multivariate models and allow for much more sophisticated solutions. Calculations also need to support several inputs and outputs, and both current and future values.


Most important, IMO, is that any solution should integrate into the existing AF and EF architecture. AF currently offers Custom Data References (CDR) to extend the existing functionality. Here are a couple of great references:


Implementing the AF Data Pipe in a Custom Data Reference

Developing the Wikipedia Data Reference - Part 1

Developing the Wikipedia Data Reference - Part 2


For this project I used the CDR just as a configuration and modeling environment. The calculation is performed on an R Data Server, which will be described in my next blog post.


The "R Calculation Engine" CDR can be selected as any other reference:


The first step in the calculation is to frame the analysis by creating an element template with the input and output attributes. Then the calculation can be configured using the following GUI:


Each attribute is mapped to an R variable (input or output). The time stamp is translated to a POSIXct data type and the value to double precision. The data can be used in R as follows:


     Variable$Time : Array of time stamps

     Variable$Value : Array of values


The model can be developed interactively and is executed in an R engine. The results are translated back to the AFValues type and displayed in the plot window.


Once the model is developed, the model settings are stored in a JSON string. As mentioned earlier, this custom data reference handles the configuration of the calculation. Real-time and historical calculations are better performed in a separate service, which I will describe in my next blog.

Almost a year ago I wrote a blog about an R library to connect to OSIsoft PI and AF (ROSIsoft). The feedback was great and I tried to respond quickly.

I am also seeing more and more projects with modeling and forecasting needs, which is a great development.


The problem is that the build process is rather lengthy; you have to work in both the .NET and R environments to put a package together. That is an inherent problem with scripting languages: they just don't do well with loops. So you need to build an intermediate layer between the AF-SDK objects and the simple array types that R understands.


There were also other problems with the initial approach:


  1. The results were returned in lists, whereas most applications in R work with data.frames.
  2. Time stamps were returned as string or double (Excel) types instead of R date-time types such as POSIXct; this required an extra conversion step in R.
  3. Function and variable descriptions were missing.
  4. And as mentioned, the build process was mostly manual.


I automated the build process and created a scripting engine in VS to write the R functions. That really helped accelerate the build process.


The library is loaded the same way:


  1. Installation is done manually. In RStudio select Tools\Install packages… and when the dialog opens, change the option “Install from:” to “Package Archive File”.
  2. After the installation the library is loaded with: library(ROSIsoft)
  3. Before connecting to an AF or PI server, run AFSetup() once; this will install dependencies such as rClr.
  4. To connect to a PI or AF server use: connector<-ConnectToPIWithPrompt("<PI Server>") or connector<-ConnectToAFWithPrompt("<AF Server>","AFDatabase")

That's it.

But the data model looks now different:

And the data calls return the R time stamps:



Or values:

Some people have asked about summary types, which are a really good way to downsample data. This function now also returns a data.frame:

Or for multiple values:

The package still needs a fair amount of testing, and I would be happy to receive your feedback. As I mentioned, due to the automated build process, maintenance of this package should now be much easier.

The PI2PI interface is routinely used to copy data from a source server to a target server. This is especially useful when consolidating, for example, data from a site server onto an enterprise server. A lot of companies differentiate between production PI servers and application PI servers, where business units have direct access for visualization, reporting and analysis. The PI2PI interface is also the gate between different networks, which is an important aspect of cyber security.


One drawback of copying data between servers is that the PI2PI interface adds latency to the data flow - this is of course to be expected, since data have to be read from the source and then written to the target.


Latency is an important factor when designing applications, especially event-driven calculations. In PI, an event is triggered when a data value enters the snapshot or archive queue. This process can be monitored and the latency calculated:


Measuring latency using PowerShell


When I measured the latency of data values on a production system where two PI2PI interfaces were used in series, I was surprised by the measurements. The average latency was in the range I expected, but the standard deviation and distribution seemed odd.


To understand the effect better, I put together a small simulation in R. Here are the results for a system with two PI2PI interfaces in series:


This was not an exact match of the production system, but it showed some of the same patterns. The simulation was performed for a tag with a polling rate of 3 sec and PI2PI interfaces with a 5 sec scan rate each.


Since this looks like a decent model, we can optimize the distribution by varying the parameters. The PI2PI scan rates seem to have the largest impact, so we can rerun the same model with 2 sec scan rates:


This already looks better! We could try to fit the MC parameters to the real measurements, but this model is good enough to get the basic metrics.
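A Monte Carlo model of this kind can be sketched as follows (illustrative Python, not the original R simulation; the scan-clock phases are drawn at random for each trial, which averages over possible interface configurations - in a real deployment the phases are fixed, which is one reason the measured distributions look multimodal):

```python
import random, statistics

def simulate_latency(poll_s, scan_s, n_interfaces=2, n=20000, seed=42):
    """Monte Carlo sketch of PI2PI latency: a tag polled every poll_s
    seconds passes through n_interfaces interfaces in series, each
    picking the value up at its next scan tick (scan_s period)."""
    rng = random.Random(seed)
    latencies = []
    for _ in range(n):
        # event time: on the source polling grid, with a random phase
        t = poll_s * rng.randrange(0, 2000) + rng.uniform(0, poll_s)
        arrival = t
        for _ in range(n_interfaces):
            phase = rng.uniform(0, scan_s)        # scan clock offset
            # wait for the interface's next scan after the arrival time
            k = (arrival - phase) // scan_s + 1
            arrival = phase + k * scan_s
        latencies.append(arrival - t)
    return statistics.mean(latencies)

slow = simulate_latency(poll_s=3, scan_s=5)   # 5 s scan rates
fast = simulate_latency(poll_s=3, scan_s=2)   # 2 s scan rates
# mean latency drops roughly in proportion to the scan rate
```

With random phases, each interface contributes on average half a scan period, so two interfaces in series give a mean latency of about one full scan period - consistent with the scan rate dominating the result.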


So in summary:

  1. The PI2PI interface adds latency to the data flow and modifies the distribution (non-normal, several modes)
  2. Event-based apps should be robustified to take this into account
  3. It's in general a good idea to measure latency in time-critical applications
  4. A simple MC model can help to understand the data flow and optimize settings

Dashboards are not Analysis

Posted by ernstamort Jan 21, 2018

Somebody had to say it ...


It's a common approach in data analytics to provide insights by creating colorful dashboards. These tools have become very powerful and the charts or graphs are very impressive. But is this really analytics ...?


Most data-driven analysis works like the following:


     Training: Historical Data + Algo => Model

     Application: Real Time Data + Model => Prediction, Regression, Classification, .... depending on the algorithm


So what is the Algo part in dashboards...? Correct: it's the developer or user. Humans still have an unmatched capability in image processing, which includes classification and pattern recognition. The training of our human algo machine is the experience, skills and expertise we gain by working with data for a long time. This is evolution at its finest ... good to know what's a mammoth and what's a saber-tooth.


(Somewhere on LinkedIn I read an article that a group has developed a system that can detect  90% of pedestrians crossing a street ... I'm pretty sure I can do better ....)


And even though this is a powerful combination, it is an outdated concept. Nobody has time to stare at a screen for prolonged periods, especially not in 24/7 operations. It is also a waste of your most valuable/experienced resources to perform simple classification tasks.


There is no way around it: most of the analysis has to be performed by algorithms. The system has to be set up in a way that information flows to the operator and doesn't have to be retrieved from screens. IMO this will lead to an enormous modeling effort during the transition - which has already started - but will in the end lead to much more robust production.

The topic of system latency has come up a couple of times in recent projects. If you really think about it, this is not surprising: the more manufacturing gets integrated, the more data have to be synchronized and/or orchestrated between different applications. Here are just some examples:

  1. MES: Manufacturing execution systems typically connect to a variety of data sources, so the workflow developer needs to know the timeout settings for different applications. Connections to the automation system will have a very low latency, but what is the expected latency of the historian?
  2. Analysis: More and more companies move towards real-time analytics. But how fast can you really expect calculations to be updated? This is especially true for Enterprise-level systems, which are typically cloned from source PI servers by way of PI-to-PI. So you are looking at a data flow of, for example:

    Source -> PI Data Archive (local) -> PI-to-PI -> PI Data Archive (region) -> PI-to-PI -> PI Data Archive (enterprise)

    and latency in each step.

  3. Reports: One example is product release reports - how long do you need to wait to make sure that all data have been collected?


The PI time series object provides a time stamp which is typically provided by the source system. This time stamp bubbles up through interfaces and data archives unchanged. This makes sense when you are comparing historical data, but it masks the latency in your data.


To detect when a data point actually gets queued and then recorded at the data server, PI offers two event queues that can be monitored:


AFDataPipeType.Snapshot ... to monitor the snapshot queue

AFDataPipeType.Archive ... to monitor the archive queue


There are some excellent articles on VCampus on how to poll these queues, like: How to use the PIDataPipe or the AFDataPipe


You can also use PowerShell scripts, which have the advantage of being lighter applications that can be combined with the existing OSIsoft PowerShell library. PowerShell is also available on most servers, so you don't need a separate development environment for code changes.


The first step is to connect to the PI Server using the AFSDK:

function Connect-PIServer{
   param(
      [Parameter(Mandatory=$true, Position=0)]
      [string]$ServerName,
      # typical AF SDK install path - adjust for your environment
      [string]$Library = "C:\Program Files (x86)\PIPC\AF\PublicAssemblies\4.0\OSIsoft.AFSDK.dll"
   )

   Add-Type -Path $Library

   # look up the server in the known-servers table and connect
   $PIServers = New-Object OSIsoft.AF.PI.PIServers
   $PIServer = $PIServers[$ServerName]
   $PIServer.Connect()
   return $PIServer
}

The function opens a connection to the server and returns the .NET object.

Monitoring the queues and writing out the values then looks like the following:


function Get-PointReference{
   param(
      [Parameter(Mandatory=$true, Position=0)]
      $PIServer,
      [Parameter(Mandatory=$true, Position=1)]
      [string]$TagName
   )

   # resolve the tag name to a PIPoint object
   return [OSIsoft.AF.PI.PIPoint]::FindPIPoint($PIServer, $TagName)
}

function Get-QueueValues{
   param(
      [Parameter(Mandatory=$true, Position=0)]
      $PIServer,
      [Parameter(Mandatory=$true, Position=1)]
      [string]$TagName,
      [Parameter(Mandatory=$true, Position=2)]
      [int]$DurationSeconds
   )

   $EndTime = (Get-Date).AddSeconds($DurationSeconds)

   # get the pi point and create a NET list
   $PIPoint = Get-PointReference $PIServer $TagName
   $PIPointList = New-Object System.Collections.Generic.List[OSIsoft.AF.PI.PIPoint]
   $PIPointList.Add($PIPoint)

   # create the pipelines
   $ArchivePipeline=[OSIsoft.AF.PI.PIDataPipe]::new([OSIsoft.AF.Data.AFDataPipeType]::Archive)

   $SnapShotPipeline=[OSIsoft.AF.PI.PIDataPipe]::new([OSIsoft.AF.Data.AFDataPipeType]::Snapshot)

   # add signups
   $ArchivePipeline.AddSignups($PIPointList) | Out-Null
   $SnapShotPipeline.AddSignups($PIPointList) | Out-Null

   # now the polling
   While((Get-Date) -lt $EndTime){

   $ArchiveEvents = $ArchivePipeline.GetUpdateEvents(1000);

   $SnapShotEvents = $SnapShotPipeline.GetUpdateEvents(1000);

   # time at which the events were picked up locally
   $RecordedTime = Get-Date

   # format output:
   foreach($ArchiveEvent in $ArchiveEvents){

   New-Object PSObject -Property @{

  Name =  $ArchiveEvent.Value.PIPoint.Name

  Type = "ArchiveEvent"

  Action = $ArchiveEvent.Action

  TimeStamp = $ArchiveEvent.Value.Timestamp.LocalTime.ToString("yyyy-MM-dd HH:mm:ss.fff")

  QueueTime = $RecordedTime.ToString("yyyy-MM-dd HH:mm:ss.fff")

  Value = $ArchiveEvent.Value.Value.ToString()
   }
   }

   foreach($SnapShotEvent in $SnapShotEvents){

   New-Object PSObject -Property @{

  Name =  $SnapShotEvent.Value.PIPoint.Name

  Type = "SnapShotEvent"

  Action = $SnapShotEvent.Action

  TimeStamp = $SnapShotEvent.Value.Timestamp.LocalTime.ToString("yyyy-MM-dd HH:mm:ss.fff")

  QueueTime = $RecordedTime.ToString("yyyy-MM-dd HH:mm:ss.fff")

  Value = $SnapShotEvent.Value.Value.ToString()
   }
   }

   # 150 ms delay
   Start-Sleep -m 150
   }

   $ArchivePipeline.Dispose()
   $SnapShotPipeline.Dispose()
}

These functions are all you need to monitor events coming into a single server. The latency is simply the difference between the value's time stamp and the time it is recorded.


Measuring the latency between two servers - for example a local and an enterprise server - can be done the same way. You just need two server objects and then monitor the snapshot (or archive) events.


function Get-Server2ServerLatency{
   param(
      [Parameter(Mandatory=$true, Position=0,
       ValueFromPipeline=$true, ValueFromPipelineByPropertyName=$true)]
      $SourceServer,
      [Parameter(Mandatory=$true, Position=1,
       ValueFromPipeline=$true, ValueFromPipelineByPropertyName=$true)]
      $TargetServer,
      [Parameter(Mandatory=$true, Position=2,
       ValueFromPipeline=$true, ValueFromPipelineByPropertyName=$true)]
      [string]$SourceTag,
      [Parameter(Mandatory=$true, Position=3)]
      [string]$TargetTag,
      [Parameter(Mandatory=$true, Position=4)]
      [int]$DurationSeconds
   )

   $EndTime = (Get-Date).AddSeconds($DurationSeconds)

   $SourceList = New-Object System.Collections.Generic.List[OSIsoft.AF.PI.PIPoint]
   $SourceList.Add([OSIsoft.AF.PI.PIPoint]::FindPIPoint($SourceServer, $SourceTag))

   $TargetList = New-Object System.Collections.Generic.List[OSIsoft.AF.PI.PIPoint]
   $TargetList.Add([OSIsoft.AF.PI.PIPoint]::FindPIPoint($TargetServer, $TargetTag))

   # create the pipelines
   $SourcePipeline=[OSIsoft.AF.PI.PIDataPipe]::new([OSIsoft.AF.Data.AFDataPipeType]::Snapshot)

   $TargetPipeline=[OSIsoft.AF.PI.PIDataPipe]::new([OSIsoft.AF.Data.AFDataPipeType]::Snapshot)

   # add signups
   $SourcePipeline.AddSignups($SourceList) | Out-Null
   $TargetPipeline.AddSignups($TargetList) | Out-Null

   # now the polling
   While((Get-Date) -lt $EndTime){

   $SourceEvents = $SourcePipeline.GetUpdateEvents(1000);

   $TargetEvents = $TargetPipeline.GetUpdateEvents(1000);

   # time at which the events were picked up locally
   $RecordedTime = Get-Date

   # format output:
   foreach($SourceEvent in $SourceEvents){

   New-Object PSObject -Property @{

  Name =  $SourceEvent.Value.PIPoint.Name

  Type = "SourceEvent"

  Action = $SourceEvent.Action

  TimeStamp = $SourceEvent.Value.Timestamp.LocalTime.ToString("yyyy-MM-dd HH:mm:ss.fff")

  QueueTime = $RecordedTime.ToString("yyyy-MM-dd HH:mm:ss.fff")

  Value = $SourceEvent.Value.Value.ToString()
   }
   }

   foreach($TargetEvent in $TargetEvents){

   New-Object PSObject -Property @{

  Name =  $TargetEvent.Value.PIPoint.Name

  Type = "TargetEvent"

  Action = $TargetEvent.Action

  TimeStamp = $TargetEvent.Value.Timestamp.LocalTime.ToString("yyyy-MM-dd HH:mm:ss.fff")

  QueueTime = $RecordedTime.ToString("yyyy-MM-dd HH:mm:ss.fff")

  Value = $TargetEvent.Value.Value.ToString()
   }
   }

   # 150 ms delay
   Start-Sleep -m 150
   }

   $SourcePipeline.Dispose()
   $TargetPipeline.Dispose()
}


Here is a quick test with a PI2PI interface reading from and writing to the same server:

Get-Server2ServerLatency $srv $srv sinusoid sinusclone 30 | Out-GridView

As you can see, the difference between target and source is a bit over 1 sec, which is to be expected since the scan rate is 1 second.

I have pondered this topic for a while now and thought it might be a good discussion topic. A lot of architecture discussions I have been involved with go like “so we PI2PI the local data to the enterprise level, create BI views and then bolt on whatever big data analytics”. These must also be the three phrases that most likely get you through your next system architect interview (in any permutation).


But what does that really mean?


When you think about it, it goes back to the difference between the time series that sensors produce and the ones that data applications expect. To make a longer story short, sensors create unevenly spaced time series, with unequal spacing between points in time. In PI this is mostly caused by the compression algorithm. On the other side, data applications expect table-like data structures, where data are evenly spaced (I call them SQL data ....).


That’s where interpolation comes in: you can transform any unevenly spaced time series into an evenly spaced one by interpolating on regular intervals.


So where is the problem?


Most data analytics are low-frequency applications; the time scale might be minutes or even hours. On the other side, sensor data are high frequency - let’s say seconds or a couple of seconds. So when you apply data analytics you are performing two operations:

  •      down sampling
  •      and interpolation.


Just as an example, in batch analysis you might sample 100 data points from a total batch duration of a couple of days. That is a data point every 30 min … ! So you are acquiring samples at maybe a 5-sec rate and then using only data every 30 min …?


So why even sample at a 5-sec. rate if you discard most of the samples …?


A better use of the data is to take advantage of the larger sample number by averaging the signal. In that approach, your standard deviation (e.g. noise) improves by 1/sqrt(n). In the above example - 5 sec vs 30 min - that is a factor of almost 20! In PI this can be accomplished by using the exponential moving average in AF Analysis.
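A quick simulation illustrates the 1/sqrt(n) improvement (illustrative Python; the signal level, noise figure and trial count are made up for the example):

```python
import random, statistics

random.seed(7)
n = 360                        # 30 min of 5-second samples
true_value = 100.0
sigma = 2.0                    # sensor noise (standard deviation)

# compare a single sampled point vs. the average of the full window,
# repeated over many trials to estimate the residual noise of each
single, averaged = [], []
for _ in range(2000):
    window = [true_value + random.gauss(0, sigma) for _ in range(n)]
    single.append(window[-1])                  # one sample every 30 min
    averaged.append(statistics.fmean(window))  # mean of all 360 samples

improvement = statistics.stdev(single) / statistics.stdev(averaged)
# expected improvement ~ sqrt(360), i.e. close to 19
```

The averaged series has roughly 19 times less noise than the naively downsampled one, which matches the 1/sqrt(n) rule for n = 360.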


IMO, interpolating at low frequency is really not a good idea, because it discards a large fraction of data that could be used to improve the data quality. Better to create moving averages and interpolate on those to improve the signal-to-noise ratio (SNR) and the model quality. Even shorter moving averages improve the SNR and will benefit the model quality. One thing to keep in mind is that the model stdev depends on the data stdev - in multivariate statistics the noise of the prediction is at best twice that of the raw data. So every improvement on the raw data side helps predictability on the model end.

A couple of months ago I developed a Custom Data Reference (CDR) for R calculations, and on the surface it looked fairly simple. But after using it for a bit it became pretty clear that this solution is not scalable.


The main problem when using a lot of R functions is that most of them require evenly spaced time series - data need to be sampled with equal spacing. Due to exception and compression, PI data are unevenly spaced and therefore need to be interpolated before they can be used in most R packages. In the CDR, I used the interpolation function of the PI system to do the work, which made the workflow easy. In event-driven calculation mode this is:

           A new event arrives
           Get n interpolated values over a time span
           Submit the samples to R
           Calculate in R – I used linear regression as an example
           Get the result (slope) back
           Write to PI
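The interpolate-then-regress core of these steps can be sketched as follows. This is an illustrative Python version (stdlib only), not the actual CDR code: irregular samples are linearly interpolated onto n equally spaced points over the window, and the least-squares slope is computed from them.

```python
from bisect import bisect_left

def interpolate(times, values, t):
    """Linear interpolation of irregularly spaced samples at time t."""
    j = bisect_left(times, t)
    if j == 0:
        return values[0]
    if j >= len(times):
        return values[-1]
    w = (t - times[j - 1]) / (times[j] - times[j - 1])
    return values[j - 1] * (1 - w) + values[j] * w

def window_slope(times, values, end, window_s, n):
    """Interpolate n equally spaced values over [end - window_s, end]
    and return the least-squares slope through them."""
    xs = [end - window_s + i * window_s / (n - 1) for i in range(n)]
    ys = [interpolate(times, values, x) for x in xs]
    mx, my = sum(xs) / n, sum(ys) / n
    num = sum((a - mx) * (b - my) for a, b in zip(xs, ys))
    return num / sum((a - mx) ** 2 for a in xs)

# compressed (irregularly spaced) source data following y = 0.5 * t
ts = [0.0, 3.0, 11.0, 24.0, 30.0]
vs = [0.5 * t for t in ts]
s = window_slope(ts, vs, end=30.0, window_s=30.0, n=10)
```

Since the source data lie exactly on a line with slope 0.5, the interpolated window reproduces that slope.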


In history recovery mode, a single CDR would slow down my development system severely, so I didn’t even try to do the same for, let’s say, 100 attributes.


So the basic idea to improve performance is to develop an R Analysis service that takes the load off the PI server and performs all the data conditioning and the calculation. The requirements are:


                Each calculation\point is independently configurable
                Calculations are event or time triggered (elapsed)
                Interpolations are performed by the service
                Interpolated or recorded data are sent to R
                Results are written back to a PI tag
                Each window calculation runs in sequence – this is required to carry over calculations between time frames


Since R is single threaded, I thought that ~500 calculations per service would be a good goal.


To isolate each step, I created three worker threads:

               Interpolation
               R Calcs
               Write to PI

and used Rx for all event driven components.


When I first tested the application, I measured around 1 – 2 milliseconds per R call, which I thought was fast enough for most purposes. That quickly turned out to be a wrong assumption …

Let’s say you have a source tag with a 5-second sampling rate and you perform event-based calculations. For a history recovery over a 3-month period this translates to:

5 sec/sample = 720 values/hour ≈ 17,000 values/day ≈ 1.5 million values / 3 months


1 – 2 milliseconds of latency per R call means you can process only 500 – 1,000 values/second, so the ~1.5 million values would take close to an hour.


So 500 calculations per server would be unrealistic, and there would also be a significant lag introduced in the real-time application (0.5 – 1 sec).
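To make the arithmetic explicit, here is the back-of-the-envelope calculation as a small Python check (a strict 5-second period over 90 days, and the ~2 ms per-call latency measured above):

```python
# Event-triggered history recovery: how long does 3 months of 5-second data take?
sample_period_s = 5
days = 90
samples = days * 24 * 3600 // sample_period_s       # events to recover
calls_per_second = 500                               # ~2 ms per R call
recovery_hours = samples / calls_per_second / 3600   # ~0.86 h for one tag
```

One tag alone takes the better part of an hour, so hundreds of tags per server are clearly out of reach at this call latency.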


To speed up the calculation there are only two knobs you can tweak:

               Reduce the number of R calls from .NET
               Optimize the R function itself



To reduce the number of calls from .NET, I changed the R code to accept a matrix instead of a vector. This makes it possible to perform multiple calculations per R call, which is especially important in history recovery. The following shows an example of the modified regression function:


regression <- function(m, WindowSizeInSeconds, NoOfPoints) {
  # stuff that can be calculated once for all evaluations:
  # build the model matrix
  span <- WindowSizeInSeconds / (NoOfPoints - 1)
  x <- matrix(seq(0, WindowSizeInSeconds, span), nrow = NoOfPoints)
  # actual worker function
  calc <- function(y) {
    temp <- .lm.fit(cbind(1, x), y)
    # temp <- lm(y ~ x, method = "qr", singular.ok = TRUE)
    temp$coefficients[2]  # the slope
  }
  # calculate for each row of the matrix
  apply(m, 1, calc)
}


R offers a couple of functions for benchmarking, and I found that 'microbenchmark' works really well. The following shows the difference between two approaches to calculating the linear regression:


Unit: microseconds

          expr     min      lq     mean   median       uq      max neval
     lm(y ~ x) 820.018 852.420 1025.027 951.4145 1163.563 1796.691   100
 .lm.fit(x, y)  47.711  53.835   63.004  59.3205   66.847  140.071   100



The first expression, lm(y ~ x), is the recommended way to do linear regression; each call takes ~950 microseconds or ~1 millisecond. The second, .lm.fit(x, y), is a stripped-down version and, as you can see, much faster at ~60 microseconds. So just by changing the R function you can achieve a 15-fold increase in performance.
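The reason the stripped-down call is so much cheaper is that a general-purpose fit also computes an intercept, residuals and diagnostics, while the slope alone is just cov(x, y)/var(x). Here is an illustrative Python sketch of that minimal computation (stdlib only, mirroring the idea rather than the R code):

```python
import statistics

def fast_slope(x, y):
    """Closed-form least-squares slope: cov(x, y) / var(x)."""
    mx, my = statistics.fmean(x), statistics.fmean(y)
    num = sum((a - mx) * (b - my) for a, b in zip(x, y))
    den = sum((a - mx) ** 2 for a in x)
    return num / den

x = [i * 5.0 for i in range(100)]    # 100 points at 5-second spacing
y = [2.0 * a + 1.0 for a in x]       # exact line y = 2x + 1
m = fast_slope(x, y)
```

Because only the quantity we actually need is computed, there is no per-call overhead from building a full model object.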


The following shows the R Service performance using the new approach:


The yellow curve shows the interpolation queue, which is able to process 450,000 interpolations in under 10 seconds. The green line shows the R calculation queue, which processes 450,000 linear regressions in R in ~45 seconds. The PI system (blue curve) obviously had no problem writing the results back to the data archive.


The next figure shows the linear regression in blue of a sinusoid in green:


In summary, the R service approach is promising. I haven't found many R applications that process this many events, so some of the solutions might be unique to this problem. Benchmarking the R functions will always be a necessary step to evaluate at what sampling rate a calculation can be deployed. As mentioned, there are large differences between the performance of different R functions\packages. The main reason is that these functions are often designed not for speed but to produce a wide range of statistics. So for interactive sessions these functions are very convenient, but they might not be ideal for real-time applications.


The next step will be more testing and some more examples. My focus is on signal processing and forecasting, but let me know if you have other applications.

I'm also looking for beta testers – people who are brave enough to run this on their machines …

AF Analysis is a pretty powerful tool and covers the majority of use cases. But there are also situations where more advanced solutions are required.

The following is a common request: Basic Linear Regression: Slope, Intercept, and R-squared


And although the math is not difficult and code snippets are available on the internet, I don't think it's a good idea to create modules just for that specific need. There will always be something missing:

     You start with linear regression,
     then you need something to clean up outliers and missing values,
     maybe you want to robustify your calculations,
     a non-linear function might give you a better fit,
     one variable might not be enough to describe your data …


Solutions for these problems are easy in MATLAB and R since they already have plenty of libraries available. The following describes how to build an R custom data reference.



You need Visual Studio and a 64-bit R installation: Microsoft R Open: The Enhanced R Distribution · MRAN

Set up a standard class library, similar to this post: Implementing the AF Data Pipe in a Custom Data Reference

In addition you will need the following NuGet packages:

    NuGet Gallery | R.NET
    NuGet Gallery | Costura.Fody 1.3.3


There are alternatives for both NuGet packages, but I have only tested this combination.

And of course a standard AF client installation.

It's also useful to include a logger; I really like the following: Simple Log - CodeProject


In the project setup you need to set the bitness of the library to x64.


To automate the testing I used the following: Developing the Wikipedia Data Reference - Part 2

which I changed to the x64 deployment.


Before you start I would also execute RSetReg.exe in the R home directory.


The Code


     We first need to set the config string:

        public override string ConfigString
        {
            get
            {
                return $"{AttributeName};" +
                       $"{WindowSizeInSeconds};" +
                       $"{NoOfSegments};" +
                       $"{RFunctionName}";
            }
            set
            {
                if (value == null) return;
                string[] configSplit = value.Split(';');
                AttributeName = configSplit[0].Trim('\r', '\n');
                WindowSizeInSeconds = Convert.ToDouble(configSplit[1]);
                NoOfSegments = Convert.ToInt32(configSplit[2]);
                RFunctionName = configSplit[3].Trim('\r', '\n');
            }
        }


The idea is to have a function, based on a source attribute, that is executed over a window of a given size with a given number of points.

In the property setter you can already set the attribute:


        private string _AttributeName;
        public string AttributeName
        {
            private set
            {
                if (_AttributeName != value)
                {
                    _AttributeName = value;
                    // get the referenced attribute
                    var frame = Attribute.Element as AFEventFrame;
                    var element = Attribute.Element as AFElement;
                    if (element != null) SourceAttribute = element.Attributes[_AttributeName];
                }
            }
            get { return _AttributeName; }
        }
        public AFAttribute SourceAttribute { private set; get; }
        public double WindowSizeInSeconds { private set; get; }
        public int NoOfSegments { private set; get; }
        public string RFunctionName { private set; get; }
        private REngine engine { get; set; }


Next we add the R engine in the constructor:


        // initialize REngine
        public CalculateInR()
        {
            // set up logger
            string pathAppData = Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData);
            SimpleLog.SetLogDir(pathAppData + @"\CalculateInR", true);
            SimpleLog.SetLogFile(logDir: pathAppData + @"\CalculateInR",
                prefix: "CalculateInR_", writeText: false);
            SimpleLog.WriteText = true;
            try
            {
                // create R instance - R is single threaded!
                engine = REngine.GetInstance();
                // set working directory
                // source the function
                // might need to install and load libraries in R
            }
            catch (Exception ex)
            {
                // log the exception
            }
        }


At minimum you would need to set the working directory and source your R code. For more advanced calculations you might also need to install\load libraries.


Next we need to build the helper method to send the values to R and get the results back:


        private double ExecuteRFunction(AFValues values)
        {
            var vector = engine.CreateNumericVector(values.Select(n => Convert.ToDouble(n.Value)));
            // make the symbols unique; R is single threaded and instances share the variable space
            var uniquex = "x" + Attribute.ID.ToString().Replace("-", "");
            var uniquer = "r" + Attribute.ID.ToString().Replace("-", "");
            // set symbol
            engine.SetSymbol(uniquex, vector);
            // perform calculation
            string executionString = uniquer + "<-" + RFunctionName +
                                     "(" + uniquex + "," + WindowSizeInSeconds + "," + NoOfSegments + ")";
            double result;
            try
            {
                result = engine.Evaluate(executionString).AsNumeric()[0];
            }
            catch (Exception ex)
            {
                result = Double.NaN;
            }
            return result;
        }

        private AFValues CreateVector(DateTime endTime)
        {
            var timeRange = new AFTimeRange(endTime - TimeSpan.FromSeconds(WindowSizeInSeconds), endTime);
            AFTimeSpan span = new AFTimeSpan(TimeSpan.FromSeconds(timeRange.Span.TotalSeconds / NoOfSegments));
            return SourceAttribute.Data.InterpolatedValues(timeRange, span, null, "", true);
        }

This pretty much follows the examples here: Basic types with R.NET  | R.NET -- user version

Since R is single threaded and different instances share the same variable space, I recommend making the R variable names unique. I measured the execution time from .NET and it took ~1 ms; this of course depends on the type of calculation you perform. There is also some overhead on the PI side when requesting interpolated values.


Next we need to define the GetValue and GetValues methods:

        public override AFValue GetValue(object context, object timeContext, AFAttributeList inputAttributes,
            AFValues inputValues)
        {
            var currentContext = context as AFDataReferenceContext?;
            var endTime = ((AFTime?)timeContext)?.LocalTime ?? DateTime.Now;
            // get the function result from R
            var values = CreateVector(endTime);
            return new AFValue(null, ExecuteRFunction(values), endTime);
        }

        public override AFValues GetValues(object context, AFTimeRange timeRange, int numberOfValues,
            AFAttributeList inputAttributes, AFValues[] inputValues)
        {
            AFValues values = new AFValues();
            DateTime startTime = timeRange.StartTime.LocalTime;
            DateTime endTime = timeRange.EndTime.LocalTime;
            // step through the timeRange
            double span = (endTime - startTime).TotalSeconds / numberOfValues;
            for (var index = 0; index < numberOfValues; index++)
            {
                var sampleTime = startTime + TimeSpan.FromSeconds(index * span);
                var tmpValues = CreateVector(sampleTime);
                values.Add(new AFValue(null, ExecuteRFunction(tmpValues), sampleTime));
            }
            return values;
        }


Then we just populate the data methods and create the data pipe using Daphne Ng's code.


So now we have a custom data reference that can execute a function with the following inputs: x, WindowSizeInSeconds, and NoOfSegments.

In R we can develop the code for the linear regression, which is basically just calling the lm function; I believe it is included in the standard installation. Since for this example we are only interested in the slope, the R wrapper code looks as follows:


regression <- function(x, WindowSizeInSeconds, NoOfSegments) {
  # equally spaced time axis matching the interpolated samples
  t <- seq(0, WindowSizeInSeconds, length.out = length(x))
  fit <- lm(x ~ t)
  # return only the slope
  unname(coef(fit)[2])
}


After registration, we can use the CDR in AF Analysis, which provides all the plumbing to call the CDR on point updates.




Here is the result of a 10-min average and a linear regression with a 10-min window applied to a fast-moving 1 h sinusoid:


The more I read about R, the more interested I get. It really provides solutions for a wide range of problems. The R packages are well documented, have examples to get you started, and I found that the creators and authors are very approachable. There is a learning curve, especially if you want to create custom scripts, but by and large you get more work out of it than you have to invest.


For PI users the most interesting packages are the time series applications. I can recommend the packages by Prof. Rob Hyndman of Monash University.



One of the challenges is that R time series packages are mostly designed for homogeneous time series (identical point-to-point distance), but due to its compression algorithm PI stores heterogeneous data series (varying point-to-point distance). So, in almost all cases the data have to be resampled and down sampled.


The following are common steps for down sampling:

Data cleansing, outlier removal
Moving average
Resampling at lower rate
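The smoothing and resampling steps can be sketched end to end. This is an illustrative Python version (stdlib only, not the R implementation; the outlier-cleansing step is omitted for brevity):

```python
import math

def moving_average(values, window):
    """Trailing moving average; the warm-up uses the samples seen so far."""
    out, acc = [], 0.0
    for i, v in enumerate(values):
        acc += v
        if i >= window:
            acc -= values[i - window]
        out.append(acc / min(i + 1, window))
    return out

# 720 samples at 5-second spacing (one hour of data), smoothed, then decimated
raw = [math.sin(2 * math.pi * i / 720) for i in range(720)]
smoothed = moving_average(raw, 12)       # 1-minute moving average
low_rate = smoothed[::6]                 # resample at a 30-second rate
```

Smoothing before decimating is the key point: the discarded samples still contribute to the values that are kept.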

This process can be very resource-intensive, especially when applied in real time. One approach is to use a set of algorithms based on the exponential moving average. The following is an excellent read:

And here are some R code examples:
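One such building block is an exponential moving average whose weight adapts to the gap between unevenly spaced samples (alpha = 1 - exp(-dt/tau)). Here is a Python sketch of the idea (an illustration only, not the ROSIsoft CalculateEMA code):

```python
import math

def ema_irregular(times, values, tau):
    """Exponential moving average for unevenly spaced samples:
    the smoothing weight adapts to the gap dt between points."""
    out = [values[0]]
    for i in range(1, len(values)):
        dt = times[i] - times[i - 1]
        alpha = 1.0 - math.exp(-dt / tau)
        out.append(out[-1] + alpha * (values[i] - out[-1]))
    return out

ts = [0.0, 5.0, 7.0, 30.0, 31.0]   # compressed, unevenly spaced timestamps
vs = [10.0, 12.0, 11.0, 9.0, 14.0]
sm = ema_irregular(ts, vs, tau=10.0)
```

Because each output is a convex combination of the previous average and the new sample, the smoothed series always stays inside the range of the raw data.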


The last package you need is an R data access package – I attached ROSIsoft to the post.


Quick primer on ROSIsoft


The library is built using the rClr package and a wrapper DLL. The wrapper DLL is necessary to do the plumbing between .NET data and basic R types. I simplified the AF data models to make them more compatible with R.


Installation is done manually. In RStudio select Tools\Install Packages … and when the dialog opens, change the option “Install from:” to “Package Archive File”.


After the installation the library is loaded with: library(ROSIsoft)

(If you are missing a library or package, the process is always the same)


To connect to the AF and PI servers, first run: AFSetup()


This will also install the rClr package, which is included in the ROSIsoft package.


All functions are documented in the help file, although I have to spend some more time on it. To connect to the PI or AF server use the following:

 connector<-ConnectToPIWithPrompt("<PI Server>")


connector<-ConnectToAFWithPrompt("<AF Server>","AFDatabase")


The connector object contains information about the PI and AF servers as well as their connection states. It’s also the only object that needs to be instantiated; all other methods are static.


To get values just use the GetPIPointValues() function. It requires a retrieval-type constant as a string, which can be looked up with the GetRetrievalTypes() function.

Getting some recorded values for the sinusoid (sinusoid1H is a faster-moving sinusoid for testing) is then straightforward:


values <- GetPIPointValues("sinusoid1H","T+8H","T+10h","recorded",10)


Plotting requires the xts package to convert the string datetime into an R time object.




which produces the following plot:

As I mentioned above, most R packages require homogeneous time series. Since I didn't find all the functions I needed in R, I added a couple of real-time operators and also exception\compression functions:


     ApplyCompression: apply different exception\compression settings to the time series
     CalculateEMA: calculate a real-time exponential moving average
     CalculateMA: calculate a real-time moving average
     CalculateMSD: calculate a real-time moving standard deviation
     CalculateZScore: calculate a real-time moving z-score; helpful for outlier detection and removal
     CalculateOutlier: outlier removal based on the z-score
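To illustrate the moving z-score idea behind the last two functions, here is a Python sketch (with hypothetical window and threshold parameters, not the ROSIsoft implementation): a sample is flagged as an outlier when it deviates from the trailing window mean by more than a few standard deviations.

```python
import statistics

def remove_outliers(values, window=20, threshold=3.0):
    """Replace samples whose moving z-score exceeds the threshold
    with the trailing window mean."""
    cleaned = []
    for i, v in enumerate(values):
        hist = cleaned[max(0, i - window):i]
        if len(hist) >= 3:
            mu = statistics.fmean(hist)
            sd = statistics.stdev(hist)
            if sd > 0 and abs(v - mu) / sd > threshold:
                v = mu   # spike detected: substitute the local mean
        cleaned.append(v)
    return cleaned

data = [10.0, 10.1, 9.9, 10.2, 10.0, 95.0, 10.1, 9.8]   # 95.0 is a spike
clean = remove_outliers(data, window=5, threshold=3.0)
```

Using the already-cleaned history for the statistics keeps one spike from masking the next.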


I will provide some data sets in upcoming posts that are a good starting point. Here is an example of using the ApplyCompression function on the same time series:




and then the plot:



New version as of 08/06/2017