7 Replies Latest reply on Jun 22, 2017 1:21 PM by Rick Davin

    Custom Data Reference - How To Implement Data Pipe?

    MikeRay

      Hi There,

       

      I am working on a custom data reference which consumes web services from a proprietary source data system. I have it up and running in AF Analyses which can even call it as part of a back-fill calculation but only on periodic scheduling. Now I would like it to support natural (event driven) scheduling in AF Analyses. I know I need to have it support AFDataMethods.DataPipe as one of the SupportedDataMethods, however I'm sure there is more to it than that, in terms of providing a mechanism that sends data down the data pipe. I have added this to the list of the SupportedDataMethods and I can check it as part of the event driven schedule dialog but the analysis won't run saying "Did not find any inputs supporting data pipe." Are there any guides on how to do this please?

       

      Any assistance greatly appreciated.

       

      Cheers,

       

      Mike.

        • Re: Custom Data Reference - How To Implement Data Pipe?
          dng

          Hi Mike,

           

          There are two types of data source for the AFDataPipe: calculation or system of record. AFDataPipe considers a data reference a calculation data source if it has one or more inputs. For a calculation data reference to support data pipe, it must have at least one input supporting DataPipe.

           

          For a data reference plugin to support AFDataPipe:

          • If the data reference is not system of record and depends on AFSDK to get the inputs data to the data reference, AFSDK can handle getting the inputs and call AFDataReference.GetValue to compute the output for the data pipe event.
          • If the data reference is system of record, then it can implement a derived class of AFEventSource and expose a static CreateDataPipe method in the plugin to return this derived AFEventSource.

           

          Which type of data source does your data reference use? Does it satisfy the above requirements?

            • Re: Custom Data Reference - How To Implement Data Pipe?
              MikeRay

              Hi Daphne, thanks for the reply.


              This will be a system of record. There is an input attribute which is an external ID that is passed to the web service to identify the data stream that it returns. That external ID attribute is static (no time-stamp) from a table look up and so therefore not a candidate for its own data pipe.


              If that fits the bill for the SoR AFDataPipe, then do I still need to derive my custom DR from AFDataReference, but also make it implement the AFEventSource interface? .NET does not do multiple inheritance so I cant derive from both base classes?


              Thanks,


              Mike.

                • Re: Custom Data Reference - How To Implement Data Pipe?
                  dng

                  Just as I was typing a comment, Rhys beat me to it! Yes, I wasn't clear on my initial description. There are two things you need to do:

                  • Implement AFEventSource with AddSignup, RemoveSignup, GetEvents() and Dispose(bool).
                  • Add a static method, CreateDataPipe, in your implementation of AFDataReference to return an AFEventSource object that you have implemented (see Rhys' reply below).

                   

                  EDIT (more information): AFEventSource is a public abstract class providing a basis for data reference developers to implement the Data Pipe feature. The base class will handle all the interaction with AF SDK pipe modules, and implementer does not have to worry about interfacing with different data sinks. I encourage you to check out the AF SDK Reference Guide for more information about the AFEventSource class.

                   

                  Hope this is clearer. Let me know if you have any questions!

                    • Re: Custom Data Reference - How To Implement Data Pipe?
                      MikeRay

                      Rhys, Daphne, thanks very much for the replies and information, I can move forward with this and give it a try! Will let you know how I get on.

                       

                      Regards,

                       

                      Mike.

                        • Re: Custom Data Reference - How To Implement Data Pipe?
                          dng

                          Hi Michael,

                           

                          I found some examples on the implementation of AFEventSource. This simple implementation should get your started:

                           

                                  private class DirectEventPipe : AFEventSource
                                  {
                                      // Storing the timestamp of the last values retrieved by the data pipe for each attribute
                                      Dictionary<AFAttribute, AFTime> _lastTimes = new Dictionary<AFAttribute, AFTime>();
                                      // The time when the call is made
                                      DateTime _startTime;
                          
                                      public DirectEventPipe()
                                          : base()
                                      {
                                          this._startTime = DateTime.UtcNow;
                                      }
                          
                                      // Get new events for each signed up attribute between the last timestamp till evaluation time
                                      protected override bool GetEvents()
                                      {
                                          AFTime evalTime = AFTime.Now;
                                          IEnumerable<AFAttribute> signupList = base.Signups;
                                          foreach (AFAttribute att in signupList)
                                          {
                                              if (!ReferenceEquals(att, null))
                                              {
                                                  if (!_lastTimes.ContainsKey(att))
                                                  {
                                                      _lastTimes.Add(att, this._startTime);
                                                  }
                                                  AFTimeRange timeRange = new AFTimeRange(_lastTimes[att], evalTime);
                                                  AFValues vals = att.Data.RecordedValues(timeRange, AFBoundaryType.Inside, att.DefaultUOM, null, false);
                                                  AFTime lastTime = _lastTimes[att];
                                                  foreach (AFValue val in vals)
                                                  {
                                                      if (val.Timestamp > lastTime) 
                                                      {
                                                          _lastTimes[att] = val.Timestamp + TimeSpan.FromTicks(1); // This ensures that the next RecordedValues call do not return the same value
                                                      }
                                                      AFDataPipeEvent ev = new AFDataPipeEvent(AFDataPipeAction.Add, val);
                                                      base.PublishEvent(att, ev);
                                                  }
                                              }
                                          }
                                          return false;
                                      }
                          
                                      // Disposing resources
                                      protected override void Dispose(bool disposing)
                                      {
                                          _lastTimes = null;
                                      }
                                  }
                          

                           

                          Note that in the above example, we are looping through each AFAttribute and making a single RecordedValues call. If you have implemented RDA methods and your data source supports bulk calls, you can do a bulk call on AFAttributeList in the GetEvents method. The downside is that you will be using the same time range for your bulk call, which may not apply for your data reference. If you want to keep track of the list of AFAttributes signed up for the data pipe, you can implement AddSignUp and RemoveSignUp as well.

                           

                          Since documentation on this is limited, I am going to provide a full example in the main PI Developers Club space after I clean up some code. Let me know if you have any questions so far!

                  • Re: Custom Data Reference - How To Implement Data Pipe?
                    Rhys Kirk

                    Hi Mike!

                     

                    You have to expose a static method so that Abacus knows you've implemented the DataPipe.

                     

                    static public object CreateDataPipe()

                    {

                        //return my pipe

                    }


                    Note, the object returned needs to be a derived class of AFEventSource and provide the needed overrides of AFEventSource.
                    Last time I worked on this it was very much work in progress from the AF developers so hopefully this will prompt them to provide an update. There was talk of providing a more simplified way for developers to implement this functionality.

                    The other thing to remember is that although it is classed as Event Driven, the events are processed every 250ms by the Analysis Service so you shouldn't expect the events to be processed as soon as they're pushed down a pipe.


                    1 of 1 people found this helpful
                      • Re: Custom Data Reference - How To Implement Data Pipe?
                        Rick Davin

                        Good tips all around.  Implementing a data pipe is one thing.  Having that data pipe trigger an Analysis is another.  Any triggering, be it event-triggered, periodic, or back-filling, is done under the NT Service\PIAnalysisManager account.  If your CDR relies on a system external to AF, then you must be sure that the PIAnalysisManager account has permissions to read dataIf it can't read data, it can't be triggered.  For example, if your CDR connects to SQL Server to provide data to the pipe, you must use SQL Server Management Studio to assign Read permissions to NT Service\PIAnalysisManager.  Don't be fooled or confused as to why you can see data in PSE and even Evaluate in Analytics.  PSE and Evaluate do so under your logged in Windows credentials.