AnsweredAssumed Answered

AFDataReference datapipe not showing up

Question asked by ezanstra on Feb 25, 2020
Latest reply on Feb 27, 2020 by ezanstra

Hello all,

 

I'm experimenting with AFSDK, and am trying to implement a datareference, plus a datapipe for live triggering of analyses etc.

For now, I'm implementing a simple cosinus-over-time data generator. I can get the plugin installed, the getValue(s) and interpolatedValue functions work as expected in AF, but the datapipe is not showing up as a supported method in AF explorer.

Does anybody have experience with live data ingestion in AF and point us in the right direction?

The source of the datareference I've implemented is as follows:

using OSIsoft.AF.Asset;
using System;
using System.ComponentModel;
using System.Runtime.InteropServices;
using OSIsoft.AF.Time;
using System.Collections.Generic;
using OSIsoft.AF.Data;
using System.Linq;
using FluentScheduler;
using System.Collections.Concurrent;

namespace AFDataSourceTest
{
    [Serializable]
    [Guid("5D31BE7A-C7C1-4AE8-9677-30A7C24B435D")]
    [Description("Cosinus; generate cosinus values")]
    public class CosinusDataSource : AFDataReference
    {
        public static readonly string cosinusTimespanParam = "cosinusTimespan", intervalParam = "interval";
        public static readonly char defaultParamSeperator = ';', defaultAttributeSeperator = '=';

        public override string ConfigString
        {
            get
            {
                return cosinusTimespanParam + defaultAttributeSeperator + CosinusTimespan
                    + defaultParamSeperator +
                    intervalParam + defaultAttributeSeperator + Interval;
            }
            set
            {
                if (value != null)
                {
                    Dictionary<string, string> attributes = getAttributes(value);
                    CosinusTimespan = long.Parse(attributes[cosinusTimespanParam]);
                    Interval = int.Parse(attributes[intervalParam]);
                    SaveConfigChanges();
                }
            }
        }

        public int Interval { get; private set; }

        public long CosinusTimespan { get; private set; }

        public override AFDataReferenceMethod SupportedMethods
        {
            get
            {
                return AFDataReferenceMethod.GetValue | AFDataReferenceMethod.GetValues;
            }
        }
        public override AFDataMethods SupportedDataMethods
        {
            get
            {
                return AFDataMethods.DataPipe | AFDataMethods.InterpolatedValue;
            }
        }

        public override AFDataReferenceContext SupportedContexts
        {
            get
            {
                return AFDataReferenceContext.Time;
            }
        }
        private Dictionary<string, string> getAttributes(String cfg)
        {
            return getAttributes(cfg, defaultParamSeperator, defaultAttributeSeperator);
        }

        private Dictionary<string, string> getAttributes(String cfg, char parameterSeperator, char attributeSeperator)
        {
            Dictionary<string, string> result = new Dictionary<string, string>();
            string[] paramAttrs = cfg.Split(parameterSeperator);
            foreach (string paramAttr in paramAttrs)
            {
                string[] paramAttrSplit = paramAttr.Split(attributeSeperator);
                result[paramAttrSplit[0]] = paramAttrSplit[1];
            }
            return result;
        }

        public override AFValue InterpolatedValue(AFTime time, AFAttributeList inputAttributes, AFValues inputValues)
        {
            AFValue result = new AFValue();
            long epochNow = (new DateTimeOffset(time.LocalTime)).ToUnixTimeMilliseconds();
            result.Timestamp = time;
            result.Value = getTimeValue(epochNow);
            return result;
        }

        public override AFValue GetValue(object context, object timeContext, AFAttributeList inputAttributes, AFValues inputValues)
        {
            AFValue result = new AFValue();
            long epochNow = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
            long epochNowIntervalRounded = (epochNow / Interval) * Interval;
            result.Attribute = this.Attribute;
            result.Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(epochNowIntervalRounded).UtcDateTime;
            result.Value = getTimeValue(epochNowIntervalRounded);
            return result;
        }

        public override AFValues GetValues(object context, AFTimeRange timeRange, int numberOfValues, AFAttributeList inputAttributes, AFValues[] inputValues)
        {
            AFValues result = new AFValues();
            DateTime startTime = timeRange.StartTime.LocalTime;
            DateTime endTime = timeRange.EndTime.LocalTime;
            long startTimeTs = (new DateTimeOffset(startTime)).ToUnixTimeMilliseconds();
            long endTimeTs = (new DateTimeOffset(endTime)).ToUnixTimeMilliseconds();
            if (startTimeTs % Interval > 0)
            {
                startTimeTs += (Interval - (startTimeTs % Interval));
            }
            long loopTs = startTimeTs;
            while (loopTs <= endTimeTs)
            {
                AFValue resultItem = new AFValue();
                resultItem.Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(loopTs).UtcDateTime;
                resultItem.Value = getTimeValue(loopTs);
                result.Add(resultItem);
                loopTs += Interval;
            }

            return result;
        }

        private double getTimeValue(long timestamp)
        {
            double partOfTimespan = timestamp % CosinusTimespan;
            double result = Math.Cos(Math.PI * 2 * (partOfTimespan / CosinusTimespan));
            return result;
        }

        public static object CreateDataPipe()
        {
            CosinusValueEventGenerator pipe = new CosinusValueEventGenerator();
            return pipe;
        }


        public class CosinusValueEventGenerator : AFEventSource
        {
            protected override void Dispose(bool disposing)
            {
                foreach(Schedule schedule in schedulers.Values)
                {
                    schedule.Disable();
                }

            }
            private Dictionary<int, IList<CosinusDataSource>> intervalDatasources = new Dictionary<int, IList<CosinusDataSource>>();
            private Dictionary<int, Schedule> schedulers = new Dictionary<int, Schedule>();

            protected override void AddSignups(IList<AFAttribute> streams, bool getInitialEvents)
            {
                base.AddSignups(streams, getInitialEvents);
                var cosinusDataSources = (from attr in streams where attr.DataReference.GetType() == typeof(CosinusDataSource) select (CosinusDataSource)attr.DataReference);
                foreach (CosinusDataSource cosinusDataSource in cosinusDataSources)
                {
                    if (!intervalDatasources.ContainsKey(cosinusDataSource.Interval)) {
                        intervalDatasources[cosinusDataSource.Interval] = new List<CosinusDataSource>();
                        schedulers[cosinusDataSource.Interval] = schedule(cosinusDataSource.Interval);
                    }
                    intervalDatasources[cosinusDataSource.Interval].Add(cosinusDataSource);
                }
               
            }
            ConcurrentQueue<AFValue> afUpdateQueue = new ConcurrentQueue<AFValue>();

            private Schedule schedule(int interval)
            {
                Schedule result = (new Schedule(() => { run(interval); }));
                result.ToRunEvery(interval).Milliseconds();
                return result;
            }

            private void run(int interval)
            {
                foreach(CosinusDataSource cosinusDataSource in intervalDatasources[interval])
                {
                    AFValue afValue = cosinusDataSource.GetValue(null, null, null, null);
                    afUpdateQueue.Enqueue(afValue);
                }
            }


            protected override bool GetEvents()
            {
                AFValue afValue = null;
                while (afUpdateQueue.TryDequeue(out afValue))
                {
                    AFDataPipeEvent ev = new AFDataPipeEvent(AFDataPipeAction.Add, afValue);
                    base.PublishEvent(afValue.Attribute, ev);
                }

                return false;
            }
        }
    }
}

 

Thanks in advance,

 

Eyso

Outcomes