RJKSolutions

Sporadic staleness after stream join

Discussion created by RJKSolutions on Aug 13, 2012
Latest reply on Aug 14, 2012 by RJKSolutions

I have an interesting but equally frustrating problem that I cannot pinpoint. At first glance it doesn't seem to be a problem with the PI StreamInsight adapters, but you never know...

I can re-create the problem with a single PI Point, one whose data is generated by the PI Random Simulator every second and which has no exception or compression settings. It gets a new value every second, no problem. If I create a simple query that reads the value and writes it to the console, it works just fine and I see every single value. For example:

 

AdvanceTimeSettings inputPointStreamConfig = new AdvanceTimeSettings
(
    new AdvanceTimeGenerationSettings(TimeSpan.FromMilliseconds(1000), TimeSpan.FromMilliseconds(0), true),
    null,
    AdvanceTimePolicy.Drop
);

CepStream<PIEvent<Double>> inputPointStream = CepStream<PIEvent<Double>>.Create
(
    "PI Point Input Stream",
    typeof(SnapshotInputFactory),
    inputPointsConfig,
    EventShape.Point,
    inputPointStreamConfig
);

var inputSignalStreamTracerQuery = inputPointStream.ToQuery
(
    streaminsightApp,
    "PI Point Tracer Output Stream",
    string.Empty,
    typeof(TracerFactory),
    new TracerConfig
    {
        DisplayCtiEvents = true,
        SingleLine = true,
        TraceType = TraceTypeValue.Console
    },
    EventShape.Point,
    StreamEventOrder.FullyOrdered
);

 

Now, what I really want is to push a value to the output adapter based on a wall clock, so I need to inject some wall-clock CTIs, which are imported by the stream created from the PI adapters. To generate the CTIs I use an observable timer class that emits dummy events to force a push to the output on each timer interval. In theory this should work, but in practice it works for a few seconds, then the output repeats the same (stale) value, then it comes back to life, then goes stale again, and so on. There is no pattern to the staleness; it seems to be sporadic. The period of staleness also varies in length. My immediate thought was that the PI adapters were not delivering the data, but I can run the simple query above alongside the joined query below, and the simple query processes all results during the stale periods of the joined query. I have tried numerous tests but always end up with stale periods of data, so I am at a loss; it must be a behaviour of StreamInsight that I cannot see. Any thoughts?

 

As a test to rule out the import of CTIs into the PI adapters as the issue, I created two matching streams from the PI adapters and imported CTIs from one of them into the other, and I don't get the staleness issue.

 

Anyway, this is the joined query code:

 

AdvanceTimeSettings inputPointStreamConfig = new AdvanceTimeSettings
(
    null,
    new AdvanceTimeImportSettings("heartbeat"),
    AdvanceTimePolicy.Drop
);

CepStream<PIEvent<Double>> inputPointStream = CepStream<PIEvent<Double>>.Create
(
    "PI Point Input Stream",
    typeof(SnapshotInputFactory),
    inputPointsConfig,
    EventShape.Point,
    inputPointStreamConfig
);

var inputHeartbeat = new HeartBeat(1000, inputPoints)
    .ToPointStream
    (
        streaminsightApp,
        e => PointEvent.CreateInsert<PIEvent<Double>>(DateTimeOffset.UtcNow, e),
        AdvanceTimeSettings.StrictlyIncreasingStartTime,
        "heartbeat"
    );

var inputSignalStream = from eachPointEvent in inputPointStream
                            .AlterEventDuration(eventDuration => TimeSpan.MaxValue)
                            .ClipEventDuration(inputPointStream, (firstEvent, secondEvent) => firstEvent.Path == secondEvent.Path)
                        from eachHeartbeat in inputHeartbeat
                        where eachPointEvent.Path == eachHeartbeat.Path
                        select eachPointEvent;

var inputSignalStreamTracerQuery = inputSignalStream.ToQuery
(
    streaminsightApp,
    "PI Point Tracer Output Stream",
    string.Empty,
    typeof(TracerFactory),
    new TracerConfig
    {
        DisplayCtiEvents = true,
        SingleLine = true,
        TraceType = TraceTypeValue.Console
    },
    EventShape.Point,
    StreamEventOrder.FullyOrdered
);

 

 

 

sealed class HeartBeat : IObservable<PIEvent<Double>>, IDisposable
{
    private bool _done;
    private readonly List<IObserver<PIEvent<Double>>> _observers;
    private readonly Dictionary<String, PIEvent<Double>> _dummyevents;
    private readonly Random _random;
    private readonly object _sync;
    private readonly Timer _timer;
    private readonly int _timerPeriod;
    private readonly string[] _tags;

    public HeartBeat(int timerPeriod, string[] tags)
    {
        _done = false;
        _observers = new List<IObserver<PIEvent<Double>>>();
        _random = new Random();
        _tags = tags;
        _sync = new object();
        _timer = new Timer(EmitRandomValue);
        _timerPeriod = timerPeriod;

        _dummyevents = new Dictionary<String, PIEvent<Double>>();
        foreach (string _tag in tags)
        {
            _dummyevents.Add(_tag, new PIEvent<double> { Id = 0, Timestamp = DateTime.UtcNow, Path = _tag, Value = 0, Status = 0, ArrivalTime = DateTime.UtcNow, Annotation = null, IsAnnotated = false, IsEdited = false, IsQuestionable = false, StatusText = "" });
        }

        Schedule();
    }

    public IDisposable Subscribe(IObserver<PIEvent<Double>> observer)
    {
        lock (_sync)
        {
            _observers.Add(observer);
        }
        return new Subscription(this, observer);
    }

    public void OnNext(PIEvent<Double> ev)
    {
        lock (_sync)
        {
            if (!_done)
            {
                foreach (var observer in _observers)
                {
                    observer.OnNext(ev);
                }
            }
        }
    }

    public void OnError(Exception e)
    {
        lock (_sync)
        {
            foreach (var observer in _observers)
            {
                observer.OnError(e);
            }
            _done = true;
        }
    }

    public void OnCompleted()
    {
        lock (_sync)
        {
            foreach (var observer in _observers)
            {
                observer.OnCompleted();
            }
            _done = true;
        }
    }

    void IDisposable.Dispose()
    {
        _timer.Dispose();
    }

    private void Schedule()
    {
        lock (_sync)
        {
            if (!_done)
            {
                _timer.Change(_timerPeriod, Timeout.Infinite);
            }
        }
    }

    private void EmitRandomValue(object _)
    {
        DateTime dt = DateTime.UtcNow;
        foreach (string t in _tags)
        {
            PIEvent<Double> ev = _dummyevents[t];
            ev.ArrivalTime = dt;
            ev.Timestamp = dt;
            OnNext(ev);
        }
        Schedule();
    }

    private sealed class Subscription : IDisposable
    {
        private readonly HeartBeat _subject;
        private IObserver<PIEvent<Double>> _observer;

        public Subscription(HeartBeat subject, IObserver<PIEvent<Double>> observer)
        {
            _subject = subject;
            _observer = observer;
        }

        public void Dispose()
        {
            IObserver<PIEvent<Double>> observer = _observer;
            if (null != observer)
            {
                lock (_subject._sync)
                {
                    _subject._observers.Remove(observer);
                }
                _observer = null;
            }
        }
    }
}
