8 Replies Latest reply on Jun 14, 2011 10:15 PM by Ahmad Fattahi

    Using PI Adaptors for StreamInsight




      I am a beginner in PI and StreamInsight. I have created a module using ACE in which I need to compare a tag's current value with its average value over the last 24 hours. Doing it with ACE required simple code like:


      If Flow1.Avg("-24h", "*", 0) - Flow1.Value > Flow1.Avg("-24h", "*", 0) * 0.1 Then
                  FlowOut.Value = "Bad Data"
      End If




      I just wonder whether it is possible to do the same with StreamInsight?

        • Re: Using PI Adaptors for StreamInsight
          Ahmad Fattahi



          Thanks for bringing up the question about the new OSIsoft product. It does offer strong functionality for Complex Event Processing (CEP). The answer to your question is yes: this type of calculation is handled very well by PI for StreamInsight. However, you should be aware of the differences between PI ACE and PI for StreamInsight, as well as their use cases. PI ACE gives you a powerful library to access PI Points in a .NET programmatic environment. It also gives you out-of-the-box access to the PI Points, which makes it very easy to get started.


          PI for StreamInsight, on the other hand, is extremely powerful in handling large streams of data. It is capable of reading many data streams off the PI Snapshot, performing analytical operations on them, and writing the results back to PI Archives. Microsoft StreamInsight Server handles the queries for you. Another distinction is that it is based on LINQ (Language Integrated Query); so, if you are more of a query-based language programmer, this may be a plus for you.


          To do what you just described in PI for StreamInsight, in summary we need to configure input and output adapters first. Here are examples:



          SnapshotInputConfig inputConfig = new SnapshotInputConfig();
          inputConfig.Server = inputPIServer;
          inputConfig.PointsQuery = new string[] { inputTag };


          SnapshotOutputConfig outputConfig =
          new SnapshotOutputConfig();
          outputConfig.Server = outputPIServer;



          We would then define the Input Stream. Here is an example:

          var input = CepStream>.Create("PI Input Stream",
          typeof(SnapshotInputFactory), inputConfig, EventShape.Point,

          You can also perform analytical calculations such as averaging by defining a Window (time interval) of a certain length and using the Avg method on it. Typical cases include bringing in tens of PI tags and working with them; PI for StreamInsight is really powerful in handling such a large number of data streams.


          To see more on all these topics and how to configure adapters, define streams, and do calculations, please refer to "PI for StreamInsight User Guide" and "PI for StreamInsight Reference" in the vCampus Library, which contain lots of examples and sample code. The PI for StreamInsight install kit also comes with a few sample projects you may want to look at. You can also watch this vCampus-exclusive Webinar for more discussion and explanation. Also, we are planning to have a brand new Webinar on the exact same topic toward the end of July (currently scheduled for July 27). Stay tuned.



          • Re: Using PI Adaptors for StreamInsight

            Hi Reyhaneh


            If you’re comparing values over periods like 24 hours, you’ll need windows to bound those periods.  Let’s begin by generating a stream of 24-hour (daily) averages using a tumbling window.  Assuming we had a stream of raw events, rawEventStream, we could write:


            var averageStream = from e in rawEventStream
                                group e by e.Path into eachGroup
                                from window in eachGroup.TumblingWindow(TimeSpan.FromHours(24),
                                    // second argument was lost in the post; ClipToWindowEnd is assumed
                                    HoppingWindowOutputPolicy.ClipToWindowEnd)
                                select new PIEventBasic<Double>
                                {
                                    Path = eachGroup.Key,
                                    Value = window.Avg(e => e.Value),
                                    Id = 0,
                                    Status = 0,
                                };




            This results in a stream of “interval”-shaped events, each starting at the beginning of a day and ending at the end of that day.  I use PIEventBasic<Double> so that the event payload matches the raw stream, which lets me join the two streams by Path.  This way we can process a bunch of tags with a single query:


            var badEventStream = from avg in averageStream
                                 join raw in rawEventStream
                                 on avg.Path equals raw.Path
                                 // 10% threshold, as in the original ACE expression
                                 where avg.Value - raw.Value > avg.Value * 0.1
                                 select new
                                 {
                                     Path = raw.Path.Replace("Flow", "FlowOut"),
                                     Value = "Bad Data",
                                 };




            The result is a stream of events whose Path is converted from “FLOWn” to “FLOWOUTn” and whose value is “Bad Data” whenever the results of the comparison are true.  The resulting stream contains only those events determined to be bad and their time stamps match the raw events.
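            To see what the two queries compute without a StreamInsight server, here is a minimal sketch in plain LINQ to Objects. It is only an illustration: the sample readings are made up, calendar-day buckets stand in for the 24-hour tumbling windows, and the 10% threshold is the one from the original ACE expression.

```csharp
using System;
using System.Linq;

// Hypothetical raw events (tag path, timestamp, value); not real PI data.
var raw = new[]
{
    (Path: "Flow1", Time: new DateTime(2011, 6, 1, 6, 0, 0), Value: 100.0),
    (Path: "Flow1", Time: new DateTime(2011, 6, 1, 12, 0, 0), Value: 80.0),
    (Path: "Flow1", Time: new DateTime(2011, 6, 1, 18, 0, 0), Value: 120.0),
    (Path: "Flow1", Time: new DateTime(2011, 6, 2, 6, 0, 0), Value: 50.0),
};

// One average per tag and per calendar day, mimicking a 24-hour tumbling window.
var averages = raw
    .GroupBy(e => (e.Path, Day: e.Time.Date))
    .Select(g => (g.Key.Path, g.Key.Day, Avg: g.Average(e => e.Value)))
    .ToList();

// Join each raw event to the average of the window it falls into and keep
// only the events that are more than 10% below that average.
var bad = (from avg in averages
           join e in raw on (avg.Path, avg.Day) equals (e.Path, Day: e.Time.Date)
           where avg.Avg - e.Value > avg.Avg * 0.1
           select (Path: e.Path.Replace("Flow", "FlowOut"), e.Time, Value: "Bad Data"))
          .ToList();

foreach (var b in bad)
    Console.WriteLine($"{b.Path} {b.Time:u} {b.Value}");
```

            With this sample data, only the 12:00 reading on the first day is flagged, because it is the only one more than 10% below its day's average.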


            If this doesn’t make sense or you need more details, let me know.

              • Re: Using PI Adaptors for StreamInsight
                Ahmad Fattahi

                Sorry, my initial attempt to post got butchered for some reason. If you are reading the posts in email, you may want to double-check my original post.

                • Re: Using PI Adaptors for StreamInsight



                  We already discussed this via email, but I want to share the information with the vCampus audience. Erwin already solved most of the problem; here are some further aspects. It seems that in your expression you want to compare each value with the average over the last 24h, seen from that value. That's a sliding window, implemented with the Snapshot operator. Assuming that you have a set of tags imported into StreamInsight, you would compute the average for each of them separately as follows (almost the same as Erwin's, just a different window):


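                  // compute the 24h average for each tag separately
                  // (grouped by Path so that the key matches the join on Path below)
                  var avgStream = from e in input
                                  group e by e.Path into g
                                  from win in g
                                            .AlterEventDuration(e => TimeSpan.FromHours(24))
                                            // window operator was lost in the post; a snapshot window is assumed from the text above
                                            .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                                  select new { Path = g.Key, Avg = win.Avg(e => e.Value) };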
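                  As a rough illustration of what the 24-hour sliding window means for a single tag, the following plain LINQ to Objects sketch computes, for each reading, the average of the readings from the preceding 24 hours (the reading itself included). The sample data and the helper name TrailingAverage are made up; the sketch mimics the semantics only and is not StreamInsight code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical readings for one tag.
var readings = new List<(DateTime Time, double Value)>
{
    (new DateTime(2011, 6, 1, 0, 0, 0), 10.0),
    (new DateTime(2011, 6, 1, 12, 0, 0), 20.0),
    (new DateTime(2011, 6, 2, 6, 0, 0), 60.0),
};

// Average of all readings whose timestamp lies in (t - 24h, t].
// This mirrors an event that has been stretched to 24 hours and is
// therefore still "alive" when a later reading arrives.
double TrailingAverage(IEnumerable<(DateTime Time, double Value)> source, DateTime t) =>
    source
        .Where(r => r.Time > t - TimeSpan.FromHours(24) && r.Time <= t)
        .Average(r => r.Value);

foreach (var r in readings)
    Console.WriteLine($"{r.Time:u} value={r.Value} avg24h={TrailingAverage(readings, r.Time)}");
```

                  Note that the first reading no longer contributes to the average of the third one, because it is more than 24 hours older.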


                  Now you can express your check by comparing each original event with the currently valid average - in StreamInsight terms, this is a join:


                  // for each tag, compare the average with the real-time value
                  var check = from a in avgStream
                              join b in input
                              on a.Path equals b.Path
                              select new { a.Path, Check = Math.Abs(b.Value - a.Avg) > a.Avg * 0.1 };


                  Where Erwin used a filter, I am actually keeping all the results, whether the condition is true or false. I use a filter a little later, when I am correlating across all my tags.
                  The check expression will give you a series of point events, with the same timestamps as the original input events. Hence before we can compare between tags, we need to make sure that we have a continuous signal, with a value at each point in time:


                  // turn the result into a continuous signal for each tag:
                  var checkSignal = check
                                        .AlterEventDuration(e => TimeSpan.MaxValue)
                                        .ClipEventDuration(check, (e1, e2) => e1.Path == e2.Path);
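                  To picture what these two operators do together: each point event is first stretched indefinitely and then cut off where the next event for the same tag begins. Here is a minimal sketch of that idea in plain LINQ to Objects (made-up data; not StreamInsight code):

```csharp
using System;
using System.Linq;

// Hypothetical point events: (tag path, timestamp, check result).
var points = new[]
{
    (Path: "Flow0", Time: new DateTime(2011, 6, 1, 8, 0, 0), Check: false),
    (Path: "Flow0", Time: new DateTime(2011, 6, 1, 9, 0, 0), Check: true),
    (Path: "Flow1", Time: new DateTime(2011, 6, 1, 8, 30, 0), Check: true),
};

// For each tag, every event lasts until the next event of the same tag starts.
// The last event of a tag stays open-ended (DateTime.MaxValue).
var signal = points
    .GroupBy(p => p.Path)
    .SelectMany(g =>
    {
        var ordered = g.OrderBy(p => p.Time).ToList();
        return ordered.Select((p, i) => (
            p.Path,
            Start: p.Time,
            End: i + 1 < ordered.Count ? ordered[i + 1].Time : DateTime.MaxValue,
            p.Check));
    })
    .ToList();

foreach (var s in signal)
    Console.WriteLine($"{s.Path} [{s.Start:u}, {s.End:u}) check={s.Check}");
```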


                  We can now examine how many of our input tags did fulfill the condition (here is the filter for the check value) at each point in time:


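                  // compute the periods where condition is met over all tags:
                  var result = from win in checkSignal
                                               .Where(e => e.Check)
                                               // window operator was lost in the post; a snapshot window is assumed
                                               .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                               select new PIEventBasic<bool>
                               {
                                   Path = "Flow2",
                                   Value = win.Count() >= 2,
                                   Id = 0,
                                   Status = 0,
                               };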



                  In your example, you looked at two tags, flow0 and flow1, so if the count result is 2, your overall condition is met and "true" is written to the output tag.


                  The nice thing about this query is that it doesn't matter whether you are looking at a single tag or are comparing 2, 3, or more tags - you simply change the last count comparison.
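                  To illustrate that last point, here is a small sketch in plain LINQ to Objects that counts, at a given instant, how many tags currently fulfill the condition and compares the count with a threshold. The intervals, the helper names, and the requiredTags constant are made up for the example; it is not StreamInsight code.

```csharp
using System;
using System.Linq;

// Hypothetical continuous check signals: one interval per tag and state.
var signal = new[]
{
    (Path: "Flow0", Start: new DateTime(2011, 6, 1, 8, 0, 0), End: new DateTime(2011, 6, 1, 9, 0, 0), Check: false),
    (Path: "Flow0", Start: new DateTime(2011, 6, 1, 9, 0, 0), End: DateTime.MaxValue, Check: true),
    (Path: "Flow1", Start: new DateTime(2011, 6, 1, 8, 30, 0), End: DateTime.MaxValue, Check: true),
};

// Change this threshold to require more or fewer tags.
const int requiredTags = 2;

// Number of tags whose check is true at time t.
int TagsFulfilling(DateTime t) =>
    signal.Count(s => s.Check && s.Start <= t && t < s.End);

// Overall condition: enough tags fulfill the check at time t.
bool ConditionMet(DateTime t) => TagsFulfilling(t) >= requiredTags;

Console.WriteLine(ConditionMet(new DateTime(2011, 6, 1, 8, 45, 0))); // only Flow1 is true here
Console.WriteLine(ConditionMet(new DateTime(2011, 6, 1, 9, 15, 0))); // Flow0 and Flow1 are both true here
```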



                    • Re: Using PI Adaptors for StreamInsight
                      Ahmad Fattahi

                      Thanks, Roman and Erwin, for the great suggestions and for keeping this thread up to date. I am wondering what the memory footprint of large queries with PI for StreamInsight would be. Since everything is kept in memory, we get superior speed and performance. On the flip side, if we try to compute the average of a very active tag over 24 hours and do this for many tags, wouldn't that consume a lot of memory?
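                      As a back-of-envelope illustration of the concern, the following sketch multiplies out the number of events a 24-hour window would hold. All figures (1,000 tags, one event per second per tag, 100 bytes per event) are made-up assumptions, and it assumes that every event in the window stays in memory, which may not match what the engine actually does.

```csharp
using System;

// All inputs are illustrative assumptions, not measurements.
const int tagCount = 1000;
const double eventsPerSecondPerTag = 1.0;
const int bytesPerEvent = 100;

// Events held if every event of the last 24 hours is retained for every tag.
double windowSeconds = TimeSpan.FromHours(24).TotalSeconds;
double eventsInWindow = tagCount * eventsPerSecondPerTag * windowSeconds;
double gigabytes = eventsInWindow * bytesPerEvent / 1e9;

Console.WriteLine($"{eventsInWindow:N0} events, roughly {gigabytes:F2} GB");
```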


                      All I am trying to clarify is the use cases and considerations for different analytical purposes. Thanks for the great discussion!