7 Replies Latest reply on Apr 3, 2018 1:29 AM by natdavidson

    AF Datapipes and PI Datapipes

    natdavidson

      I have an AF hierarchy whose elements have attributes that run analyses to determine whether a parent attribute should be a 1 or a 0. I wanted to monitor these parent attributes for changes from 0 to 1 and thought an AF Datapipe would be the best way to go, but I ran into memory problems once all the attributes were added (about 13.5k), probably because of the 20 or so calculations (from its child attributes) that each attribute depends on. We fixed the memory problem by making the attribute a PI Point data reference, but that has exposed another problem, which may be new or may have been there all along. Since this attribute relies on analyses in its child attributes, and is itself an analysis that outputs to a PI Point, the AF Datapipe sees every write to the attribute as an update, so every attribute is sent to the AF Datapipe whether or not its value changed. I'm now exploring whether this is a better case for a PI Datapipe and have written some test code, but I seem to get the same results: every PI Point is sent to the Datapipe because it is being updated. Could someone help me find a way to get updates only on value changes (i.e. when the value actually changes, not merely when the archive is updated)? My code is below; I tried to generalize it, so it's a little rough.

       

                  string MyDocumentsPath = Environment.GetFolderPath(Environment.SpecialFolder.MyDocuments);
                  string loggingFilePath = MyDocumentsPath + @"\Logging\";
                  string loggingFileName = "";
                  string logfile = "";
                  PISystems myPISystems = new PISystems();
                  PISystem myPISystem = myPISystems["AFSystem"];
                  PIServers myPIServers = new PIServers();
                  PIServer myPIServer = myPIServers["PIServer"];
      
      
                  AFDatabase myDB;
                  AFNamedCollection<AFAttribute> colAttributes;
                  colAttributes = null;
                  AFNamedCollection<AFElement> colCityElements;
                  myDB = myPISystem.Databases["DATABASE"];
                  AFDataPipe myAFDP = new AFDataPipe();
                  List<string> PointsNames = new List<string>();
      
      
                  colAttributes = AFAttribute.FindElementAttributes(
                      myDB,
                      null,
                      null,
                      null,
                      null,
                      AFElementType.Any,
                      "Template1",
                      null,
                      TypeCode.Empty,
                      true,
                      AFSortField.Name,
                      AFSortOrder.Ascending,
                      20000);
      
      
          //PI Datapipe experiment
                  foreach (AFAttribute att in colAttributes)
                  {
                      try {PointsNames.Add(att.PIPoint.Name); } //Try block because some points can't be built yet, but still appear in search
                      catch { };                
                  }
                  IList<PIPoint> attributePoints = PIPoint.FindPIPoints(myPIServer, PointsNames, null);
                  List<AFDataPipeEvent> pipeResults = new List<AFDataPipeEvent>();
                  using (PIDataPipe myPIDP = new PIDataPipe(AFDataPipeType.Archive))
                  {
                      int teststop3 = 0;
                      myPIDP.AddSignups(attributePoints);
                      while (!(Console.KeyAvailable && Console.ReadKey(true).Key == ConsoleKey.Escape))
                      {                    
                          AFListResults<PIPoint, AFDataPipeEvent> myResults = myPIDP.GetUpdateEvents(14000);
                          if (myResults.Results.Count > 0)
                          {
                              foreach (var item in myResults.Results)
                              {
                                  if(item.PreviousEventAction.ToString() == item.Action.ToString())
                                  {
                                      teststop3++;
                                  }
                              }
      
      
                          }
                      }
                  }
      
      
          //Pre-PI Datapipe experiment
                  myAFDP.AddSignups(colAttributes);
                  AFListResults<AFAttribute, AFDataPipeEvent> updateResults;
                  while (!(Console.KeyAvailable && Console.ReadKey(true).Key == ConsoleKey.Escape))
                  {
                      updateResults = myAFDP.GetUpdateEvents();
                      if (updateResults.Results.Count > 0)
                      {
                          CustomClass VerifyResult = new CustomClass();

                          foreach (AFDataPipeEvent item in updateResults)
                          {
                              AFAttribute itemAttribute = item.Value.Attribute;
                              if (itemAttribute.Parent == null)
                              {
                                  //Log result
                              }
                              else
                              {
                                  //Log different result
                              }
                              VerifyResult.VerifyValues(itemAttribute, logfile, true);
                          }
                      }
                      else
                          Console.WriteLine("AFDataPipe was empty. " + updateResults.Results.Count + " results.");
                  }
      
        • Re: AF Datapipes and PI Datapipes
          John Messinger

          Hi Nate,

           

          When using AFDataPipe, I prefer to use the IObserver pattern, as it provides a significant performance improvement. There is a very simple example implementation of this available at "How to use the PIDataPipe or the AFDataPipe". Bear in mind that this is a simple example and uses some hard-coded objects to demonstrate the basics. Having said that, my own implementations might look a little like the following:

           

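          using System;
          using System.Collections.Generic;
          using System.Linq;
          using OSIsoft.AF;        // AFListResults
          using OSIsoft.AF.Asset;  // AFAttribute, AFValue
          using OSIsoft.AF.Data;   // AFDataPipe, AFDataPipeEvent, AFDataPipeAction

          public class SnapshotObserver : IObserver<AFDataPipeEvent>
              {
                  // Buffers the values received since the last call to GetDataPipeData().
                  private AFListResults<AFAttribute, AFValue> _snapshotValues = new AFListResults<AFAttribute, AFValue>();
          
                  // Returns a copy of the buffered values and empties the buffer.
                  public List<AFValue> GetDataPipeData()
                  {
                      var values = _snapshotValues.Results.ToList();
                      _snapshotValues.Results.Clear();
                      return values;
                  }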
                  public void OnCompleted()
                  {
                      
                  }
          
                  public void OnError(Exception error)
                  {
                      
                  }
          
                  public void OnNext(AFDataPipeEvent dataPipeEvent)
                  {
                      if (dataPipeEvent.Action == AFDataPipeAction.Add || dataPipeEvent.Action == AFDataPipeAction.Update) 
                      {
                          _snapshotValues.AddResult(dataPipeEvent.Value);
                      }
                  }
              }
          
          public class DataPipe : IDisposable
              {
                  private readonly AFDataPipe _dataPipe = new AFDataPipe();
                  private readonly SnapshotObserver _observer = new SnapshotObserver();
          
                  public DataPipe(IList<AFAttribute> attributesToSubscribe)
                  {
                      _dataPipe.Subscribe(_observer);
                      var signupResult = _dataPipe.AddSignups(attributesToSubscribe);
                  }
          
          
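                  // OperationResult<T> is not an AF SDK type; presumably a helper from the poster's own code base (not shown).
                  public OperationResult<List<AFValue>> CheckForNewData()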
                  {
                      try
                      {
                          bool bMoreEvents = true;
                          while (bMoreEvents)
                          {
                              _dataPipe.GetObserverEvents(out bMoreEvents);
                          }
          
                          return OperationResult<List<AFValue>>.CreateSuccessResult(_observer.GetDataPipeData());
                      }
                      catch (Exception e)
                      {
                          return OperationResult<List<AFValue>>.CreateFailure(e);
                      }
                  }
          
          
                  public void Dispose()
                  {
                      _dataPipe.Dispose();
                  }
              }
          

           

          Note in the OnNext() method of the SnapshotObserver class that we use the AFDataPipeAction enumeration to determine what type of updates we are interested in adding to the pipe for retrieval by the client caller.
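
If the goal is to pass along only events whose value differs from the last one seen for the same attribute, the OnNext() filter could be extended along the following lines. This is a minimal sketch rather than AF SDK code: PipeEvent, ChangeOnlyObserver and DrainChanges are hypothetical names, and PipeEvent is a stand-in for the information carried by AFDataPipeEvent.Value so that the example runs without the AF SDK installed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a data pipe event: a source key (e.g. an attribute path),
// a timestamp and a value. With the AF SDK these would be read from the pipe event.
public sealed class PipeEvent
{
    public PipeEvent(string source, DateTime timestamp, object value)
    {
        Source = source;
        Timestamp = timestamp;
        Value = value;
    }

    public string Source { get; }
    public DateTime Timestamp { get; }
    public object Value { get; }
}

// Observer that buffers only events whose value differs from the last value
// seen for the same source. A newer timestamp alone is not treated as a change.
public sealed class ChangeOnlyObserver : IObserver<PipeEvent>
{
    private readonly Dictionary<string, object> _lastValues = new Dictionary<string, object>();
    private readonly List<PipeEvent> _changes = new List<PipeEvent>();

    public void OnNext(PipeEvent e)
    {
        object previous;
        bool seen = _lastValues.TryGetValue(e.Source, out previous);
        _lastValues[e.Source] = e.Value;

        // The first event for a source only establishes the baseline.
        // Note: boxed values of different numeric types (int 1 vs double 1.0) compare as unequal.
        if (seen && !object.Equals(previous, e.Value))
        {
            _changes.Add(e);
        }
    }

    public void OnError(Exception error) { }

    public void OnCompleted() { }

    // Returns the buffered changes and clears the buffer.
    public List<PipeEvent> DrainChanges()
    {
        var drained = new List<PipeEvent>(_changes);
        _changes.Clear();
        return drained;
    }
}
```

The trade-off of this design is that the client keeps one cached value per signed-up attribute, which for roughly 13.5k attributes is small compared with the event traffic it suppresses.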

           

          Reading your post, I'm a little confused about what exactly you are trying to do. You are monitoring for changes to the value of attributes in the parents of a set of elements where analyses are running and outputting the result to the parent elements, but this is causing issues because you are seeing all updates?

          2 of 2 people found this helpful
          • Re: AF Datapipes and PI Datapipes
            yyang

            Hello Nate,

             

            If we understand you correctly, the key thing you want to do is to detect when the attribute (a step function) changes its value from 0 to 1. So it seems that you need to use something (pseudocode) like this:

             

            If PrevVal('YourAttribute','*') = 0 and TagVal('YourAttribute','*') = 1 then ...

             

            to detect the change.

             

            If we are correct, maybe you can simply use PI Analysis in PSE to do that to save time?

             

            If you have to write code to do that, you could also just add similar if statements to your code.
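
As a rough illustration of what such an if statement could look like on the client side, the sketch below remembers the previous value per tag and reports only 0 to 1 transitions, mirroring the PrevVal(...) = 0 and TagVal(...) = 1 check above. RisingEdgeDetector and Update are hypothetical names, the values are assumed to be integers, and nothing here calls the AF SDK.

```csharp
using System;
using System.Collections.Generic;

// Tracks the last value per tag name and reports 0 -> 1 transitions.
public sealed class RisingEdgeDetector
{
    private readonly Dictionary<string, int> _previous = new Dictionary<string, int>();

    // Returns true only when the stored previous value is 0 and the new value is 1.
    // The first value seen for a name never fires, because there is no previous value yet.
    public bool Update(string name, int value)
    {
        int previous;
        bool hadPrevious = _previous.TryGetValue(name, out previous);
        _previous[name] = value;
        return hadPrevious && previous == 0 && value == 1;
    }
}
```

Each event coming out of the data pipe would be passed to Update() with its tag name and value, and only the calls that return true would be logged or acted on.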

              • Re: AF Datapipes and PI Datapipes
                natdavidson

                Yes, we want to detect a change from 0 to 1, but any change in the value would be fine. Right now, we're getting all changes, because the timestamp is getting updated.

                 

                The pseudocode that you suggested looks like it's more PI Datapipe based? That could be a solution. I did try a test attribute that just referenced the attribute that is the PI tag data reference, but I got the same issue: it sees that attribute get an update because of the timestamp issue.

                 

                There's a lot I can do in the code to get around all of these issues. I'm just trying to understand whether there's a way I could improve my interaction with the data from the datapipe, or possibly make some small change to the attribute itself.

              • Re: AF Datapipes and PI Datapipes
                dmoler

                Hi Nate,

                 

                When you get these new events, is the timestamp on the AFValue updated? Is what you are observing the output of an event-triggered analysis? I suspect that you are observing the actual value updates that the analysis service is sending to the PI Data Archive. Analytics does not do exception reporting, so these events are all sent and will be detected in a snapshot or time series data pipe. You could modify the analysis, or add another analysis, to filter duplicate values.

                  • Re: AF Datapipes and PI Datapipes
                    natdavidson

                    The timestamp is updated, which is why I think I'm seeing all the attributes come through the data pipe. Setting up the analysis to filter the duplicate values would possibly fix my problem. If you could point me in the right direction, that would be appreciated.

                      • Re: AF Datapipes and PI Datapipes
                        dmoler

                        To decide where to fix this, you should think about how you want your data streams to behave. If it is valuable to have the snapshot value updated for the output (e.g. to know that it has updated recently) then you could leave it as is and filter on the client side. If you want to consume a filtered stream of data, you could create a new analysis like this:

                         

                        If(DeltaValue(var, TRUE) = 0) THEN NoOutput() Else(var)

                         

                        which will only output a value when the variable "var" has changed from the last evaluation. You could also use that as a filter for your output in the original analysis if you don't want to send those updates.

                        2 of 2 people found this helpful