21 Replies Latest reply on Oct 18, 2018 12:40 PM by Roger Palmen

    Best practice for high performance / throughput reading

    oysteintorget

      What is the best practice to achieve high performance for a single read query in AF SDK and/or PI Web API?

       

      My use case is that I want to fetch large amounts of data out of PI for machine learning purposes. The machine learning models will be created in Python. At the moment I do not know which tags are useful, and for the tags that I read I want all the data stored in PI so I can train the machine learning models.

       

      I have tried to read the data using both the AF SDK in C# and the PI Web API, and in both cases the number of values returned per second is less than I expected (about 50 000 values per second for the PI Web API and about 200 000 for AF SDK un-cached reads).

       

      I have several questions related to this.

      1. Do the performance numbers I am seeing look about like what I can expect for a PI Data Archive hosted on a physical machine with 24 cores, 32 GB RAM, and data stored on NVMe SSD drives? Or should I expect a lot more, meaning that our setup is wrong?
      2. If the performance is what I can expect, what is the recommended way to achieve better performance?
        1. Run several parallel queries for different tags?
        2. Duplicate data from the PI data archive into another storage solution like Parquet files and use that for analysis?
        3. Something else?
        • Re: Best practice for high performance / throughput reading
          John Messinger

          'Best practice' in this kind of scenario to me would involve using appropriate bulk data queries to return the required data in the most efficient manner. Without any information on the data access methods you are using, we can only make assumptions. My preference and experience lean more towards the AF SDK than the PI Web API, so I'm going to talk in terms of the SDK going forward.

           

          To maximise data retrieval you should be using bulk data queries - something like calling RecordedValues on an AFAttributeList or PIPointList object. The other part of this is that your PI Data Archive needs to be at a minimum version to support using a single bulk RPC - if your Data Archive server is a fairly recent version (within the last 3-4 years) this won't be a problem.

           

          Will running queries in parallel improve the performance of your data retrieval? Maybe. Bulk calls in parallel can be the fastest option. However, the number of threads available on the server to service these RPCs is limited, and those threads are also servicing requests from other users and applications, so querying in multiple threads in your code won't necessarily improve the overall throughput you get. The server also limits the number of concurrent bulk calls that can be executed. If you haven't seen it, have a read of KB01216.
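          As a rough, language-agnostic sketch of that pattern (Python here, since the extracted data is headed for Python anyway; `fetch_recorded` is a hypothetical stand-in for the real AF SDK or PI Web API call, not an actual PI function):

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_recorded(tag):
    # Hypothetical stand-in for a real data call such as
    # PIPoint.RecordedValues -- here it just fabricates dummy events.
    return [(tag, i) for i in range(1_000)]

def fetch_all(tags, max_in_flight=8):
    # Cap the number of in-flight requests: running more concurrent
    # calls than the server has archive threads generally does not
    # improve throughput (see KB01216).
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        return list(pool.map(fetch_recorded, tags))

results = fetch_all([f"tag{i}" for i in range(20)])
```

The point of the cap is that the client should queue work itself rather than flood the server with more RPCs than it has threads to service.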

           

          The other aspect of your question is whether the performance numbers you are seeing are about what you can expect. Again, it depends. While your PI Data Archive server appears to be well spec'd, what other constraints are on this server? How many users and other applications are generally pulling data from this system? What about data ingress - how many data sources are writing data to this server, and at what kind of frequency? In a nutshell, how busy is this system in general? These things could all impose limits on the throughput of your application.

           

          Finally, I'm wondering what your requirement for higher throughput is. Does your ML model require streaming data for training, or are you dumping the data to flat files or another system for loading into the model? How much more throughput than 200K events per second are you expecting to achieve?

          3 of 3 people found this helpful
            • Re: Best practice for high performance / throughput reading
              oysteintorget

              Thank you for a detailed reply. I will see if I can answer all your points.

              • I tried multiple ways of getting the data: through the attribute, directly from the PIServer by getting the PIPoint name, and through an AFAttributeList. There was no significant performance difference between them, except that reading through the AFAttributeList sometimes timed out.
              • As far as I know we are running version 2017 R2 or something like that.
              • The number of current users of the system is low. When I logged into the server to check the load, it appeared that I was the only one reading data out of it. Quite a bit of data is flowing in, but I am querying historical data, so it does not touch the same files as the writes.
              • For now I do not require streaming the data. That will come later, once the ML models have been created. For now it is just about getting all the data out of the system so that our data scientists can start to look at the data and train the models.
              • My expectation was at least 10x the performance I am currently getting. That would be more comparable to other time-series databases we have in use for other purposes. In particular, I would expect the cached read performance to be much higher. If I understand the storage model of PI correctly, it stores all points for a time period in one file. It would therefore be easy to know whether historical data could be cached by the server or not.

               

              I will have a look at that KB and see if that helps.

                • Re: Best practice for high performance / throughput reading
                  John Messinger

                  I think that 10x your current performance of 200K events per second (~2M events per second) is not likely to happen on your current system. I don't know what the current performance benchmarks are for data extraction - perhaps Stephen Kwan can shed some light on this.

                   

                  Whilst your data is stored on SSDs, disk I/O won't be such a limiting factor. As far as data caching goes, the Archive Read cache typically stores about 4KB of data per PI Point, the same as the Archive Write cache. This isn't going to be a lot of data per tag, so I wouldn't expect the read cache to provide much of a performance increase for your use case. Otherwise, your data is going to be stored across multiple archive files (depending on the time range that you are requesting and the size of your archive files). Because of this, one possible optimisation to your data extraction might be to reduce the timespan of each call. If a single data request spans multiple archive files for the specified time range, then the loading and unloading of files in and out of the file system cache may negatively impact your overall performance.
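                  A minimal sketch of that timespan-splitting idea (illustrative only - real archive file boundaries depend on your server's archive configuration, so the 30-day chunk below is an assumption, not a recommendation):

```python
from datetime import datetime, timedelta

def split_time_range(start, end, chunk):
    # Break one long query span into shorter sub-spans so that each
    # call is more likely to stay within a single archive file.
    spans = []
    t = start
    while t < end:
        spans.append((t, min(t + chunk, end)))
        t += chunk
    return spans

# e.g. one year of history split into roughly monthly requests
spans = split_time_range(datetime(2017, 1, 1), datetime(2018, 1, 1),
                         timedelta(days=30))
```

Each sub-span would then be issued as its own RecordedValues call, serially or with a capped number in flight.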

                   

                  I would also recommend reviewing the blog post at Improving PI Data Archive read performance by prefetching PI Data into File System Cache as this may also help with your performance optimisations. One of the comments on this post also talks about reprocessing your archives - if you have deleted large numbers of tags for example, reprocessing can optimise the internal archive structure for reading large amounts of data.

                   

                  Lastly, have a look at KB00717 as well. It's a slightly older article, but as it has been updated in the past year, there is probably some useful information in there on benchmarking your disk performance. While you should be getting good performance from your SSDs, disk I/O could still be a factor in your performance issues.

                  1 of 1 people found this helpful
                    • Re: Best practice for high performance / throughput reading
                      oysteintorget

                      Thank you for the links. The server has a bit of free RAM at the moment, so maybe it is possible to get some more file system caching to improve things. I wish there were a way to tune this on the PI Server so that it would cache more data internally. RAM is fairly cheap, so having a 250 GB RAM cache would have been great.

                       

                      It is good to know that expecting millions of events per second is not realistic. Then I can stop barking up the wrong tree and look at alternative ways to solve this.

                       

                      I am wondering if my usage scenario is not that common, since there appears to be no ready-made solution for it. E.g. reading 1 year of data for 100 tags with 4 values per minute gives (4 * 60 * 24 * 365 * 100 events) / (200 000 events per second) = approx 1 050 seconds, or about 17.5 minutes, to read the data. That is OK if you are just reading the data once, but not something you want to do often.
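                      That estimate can be checked with a few lines of Python:

```python
# 1 year of data, 100 tags, 4 values per minute, read at 200 000 events/s
events_per_tag = 4 * 60 * 24 * 365      # 2,102,400 events per tag per year
total_events = events_per_tag * 100     # 100 tags
seconds = total_events / 200_000        # sustained read rate
print(f"{total_events:,} events take about {seconds / 60:.1f} minutes")
```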

                        • Re: Best practice for high performance / throughput reading
                          dmoler

                          For what it's worth, it is possible to retrieve several million evt/s from a client application using concurrent calls. As noted above, you should cap the number of in-flight calls to the PI Data Archive, but you will probably see gains going up to ~2x the number of archive threads (which is tunable via the PIarchss_ThreadCount parameter and defaults to 8). Data rates that high probably require the data to be in memory on the Data Archive, so the other recommendations are worth pursuing. The PIPoint.RecordedValuesAsync calls work well for managing concurrent data extractions across several tags and chunking in time.

                          3 of 3 people found this helpful
                            • Re: Best practice for high performance / throughput reading
                              oysteintorget

                              Splitting into several tasks does give significant performance improvements in my case.

                               

                              Extending my tests by splitting the reading of RecordedValues across parallel Tasks, I got a read speed of about 500 000 data points / second for a first-time read and 900 000 data points / second for subsequent reads of the same data. This was when splitting across 8 tasks. Adding additional tasks did not appear to improve performance, which is consistent with the default PIarchss_ThreadCount parameter being 8. I am fairly certain no-one has changed it on our system, but I do not know where to check.

                                • Re: Best practice for high performance / throughput reading
                                  tramachandran

                                  To access PIarchss_ThreadCount: PI SMT -> Operation -> Tuning Parameters -> Archive tab

                                   

                                  Note:

                                  • The typical recommendation is to keep it at 2x the number of processor cores.
                                  • Depending on the number of processors on the machine, this value may be increased so that more RPC requests can be handled simultaneously. If all the threads are busy, RPCs are queued up and processed in chronological order.
                                  • Any changes to the parameter take effect only after a server restart.
                                  1 of 1 people found this helpful
                      • Re: Best practice for high performance / throughput reading
                        tramachandran

                        I couldn't have said it better myself - John Messinger has made some great points. Would it be possible for you to share more information on your ML platform and requirements? The solution to a streaming problem may be a little different from the solution to a data extraction problem.

                        I also want to let you know about the PI Integrator for Business Analytics, which presents PI System data in a form suited to business intelligence tools, including but not limited to Tableau, TIBCO Spotfire, QlikView, and Microsoft Power BI, for reporting and analytics. Business Intelligence (BI) client tools offer the ability to run retrospective analyses on a much larger set of your real-time PI System data.

                        2 of 2 people found this helpful
                        • Re: Best practice for high performance / throughput reading
                          Rick Davin

                          How many tags are we talking about here?  Are you reading recorded values or interpolated data?  How big is the time span?  Is there any subsecond data?  What is the sampling rate of the data?

                          1 of 1 people found this helpful
                            • Re: Best practice for high performance / throughput reading
                              oysteintorget
                              • How many tags is a bit hard to say. Usually it will be a smaller number, like 10-100, though I can easily picture a data scientist wanting to access many more for a preliminary analysis.
                              • Reading recorded values.
                              • The time span is long for this scenario, as you want all available data to train your model, i.e. as far back as you have history. For us that is currently about 1 year, but the longer we collect data, the longer the period becomes.
                              • No sub-second data for now, to my knowledge, but I do not know about all installed sensors. Sub-second data might come in the future if it is not there already.
                              • Again, I am not sure about the sampling rate for all tags. One of the signals I tested got a new value on average about every 15 seconds.
                                • Re: Best practice for high performance / throughput reading
                                  Rick Davin

                                  I'm still thinking out loud here ... new values every 15 seconds would be over 2 million uncompressed values per year.  For one tag.  What is your ArcMaxCollect set to?

                                   

                                  Is your model using all the history in 1 calculation? That is, you may have 10-100 tags, each with 500K to 2 million values, being fed into one calculation? Or would the 10-100 tags each feed a single value into a calculation, and you want to perform 500K to 2 million calculations?

                                   

                                  For pulling this much data, parallel may not be the best answer, because you would be submitting 10-100 requests that are all crawling over the archives. If you do experiment with parallel calls, you are advised to set MaxDegreeOfParallelism to 3 or 4. Be forewarned that sending in 100 parallel calls means you would only be competing against yourself for resources, while your other calls are busy crawling over the archives. The key reason to use parallel calls is to take advantage of network latency, and having your calls queued up waiting for the crawling to stop is not taking advantage of latency. (This is mentioned in the KB previously provided by John Messinger - see KB01216 - AF SDK Performance: Serial vs. Parallel vs. Bulk.)

                                  2 of 2 people found this helpful
                              • Re: Best practice for high performance / throughput reading
                                Roger Palmen

                                Just to set the stage: in a recent attempt, I did manage to pull data from the PI Server using a custom C# AF SDK application on the order of 1 to 2 million events per minute of the tool's run time. But all the previous comments apply here: it depends on many factors, and this was a highly optimised setup for extracting the data.

                                • Re: Best practice for high performance / throughput reading
                                  dmoler

                                  Below is an example of chunked, concurrent requests using async calls for points over a time range. It can be altered to make different calls, use attributes, etc. I'm trying to illustrate an effective way of extracting data, so I used a simple algorithm for stitching boundaries (skip 1s forward). Rick Davin has a nice blog post that gives a much more robust treatment of stitching query results together.

                                   

                                  Usage would be something like this:

                                              var stopwatch = Stopwatch.StartNew();
                                              long count = 0;
                                              // try concurrentRequestCount = 1-2 times the number of archive processing threads
                                              foreach (var result in SampleDataExtractor.QueryForData(piPoints, new AFTimeRange("1-jan-2018", "*"), concurrentRequestCount: 8))
                                              {
                                                  // don't spend too much time here. If processing needs more work, send to a background task
                                                  count += result.Count;
                                              }
                                              stopwatch.Stop();
                                              Console.WriteLine($"{count.ToString("N0")} in {stopwatch.Elapsed} = {((double)count / stopwatch.Elapsed.TotalSeconds).ToString("N0")} evt/s");
                                  

                                   

                                   

                                      static class SampleDataExtractor
                                      {
                                          private const int ChunkSize = 50_000; // this is a well balanced chunk size for good performance
                                  
                                  
                                          public static IEnumerable<IList<AFValue>> QueryForData(IEnumerable<PIPoint> piPoints, AFTimeRange timeRange, int concurrentRequestCount, CancellationToken cancellationToken = default(CancellationToken))
                                          {
                                              // create lists to track queries in progress
                                              List<ChunkedQuery> queriesInProgress = new List<ChunkedQuery>(concurrentRequestCount);
                                              List<Task> tasksInProgress = new List<Task>(concurrentRequestCount);
                                  
                                  
                                              // create a queue for queries that won't run right away
                                              Queue<ChunkedQuery> queriesQueue = new Queue<ChunkedQuery>();
                                  
                                  
                                              // populate in-progress requests and queue the rest
                                              foreach (var piPoint in piPoints)
                                              {
                                                  var query = new ChunkedQuery(piPoint, timeRange);
                                                  if (queriesInProgress.Count < concurrentRequestCount)
                                                  {
                                                      // get the query to execute and request the next chunk
                                                      var task = query.RequestNextChunk(cancellationToken);
                                  
                                  
                                                      // add these to the list
                                                      queriesInProgress.Add(query);
                                                      tasksInProgress.Add(task);
                                                  }
                                                  else
                                                  {
                                                       // if the in-progress items are full, queue up
                                                      queriesQueue.Enqueue(query);
                                                  }
                                              }
                                  
                                  
                                              // keep going while there are queries still being processed.
                                              while (tasksInProgress.Count > 0)
                                              {
                                                  if (cancellationToken.IsCancellationRequested)
                                                  {
                                                      Task.WaitAll(tasksInProgress.ToArray());
                                                      cancellationToken.ThrowIfCancellationRequested();
                                                  }
                                  
                                  
                                                  // wait for task to finish
                                                  var completedIndex = Task.WaitAny(tasksInProgress.ToArray());
                                  
                                  
                                                  // check if the completed chunk completed the query.
                                                  if (queriesInProgress[completedIndex].TryGetCompletedResult(out AFValues result))
                                                  {
                                                      // if there are queries left in queue, run them
                                                      if (queriesQueue.Count > 0)
                                                      {
                                                          var nextQuery = queriesQueue.Dequeue();
                                                          queriesInProgress[completedIndex] = nextQuery;
                                                          tasksInProgress[completedIndex] = nextQuery.RequestNextChunk(cancellationToken);
                                                      }
                                                      else
                                                      {
                                                          // if nothing is left in queue, remove the item
                                                          queriesInProgress.RemoveAt(completedIndex);
                                                          tasksInProgress.RemoveAt(completedIndex);
                                                      }
                                  
                                  
                                                      // Then yield the result to the caller.
                                                      // The caller will block further work if they don't MoveNext quickly but this gives an opportunity to release memory.
                                                      yield return result;
                                                  }
                                                  else
                                                  {
                                                      // if this query isn't done yet, request the next chunk
                                                      tasksInProgress[completedIndex] = queriesInProgress[completedIndex].RequestNextChunk(cancellationToken);
                                                  }
                                              }
                                          }
                                  
                                  
                                          private class ChunkedQuery
                                          {
                                              private readonly PIPoint _piPoint;
                                              private readonly List<AFValues> _queryResults = new List<AFValues>();
                                              private AFTimeRange _remainingRangeToQuery;
                                  
                                  
                                              public ChunkedQuery(PIPoint piPoint, AFTimeRange timeRange)
                                              {
                                                  _piPoint = piPoint;
                                                  _remainingRangeToQuery = timeRange;
                                              }
                                  
                                  
                                              public Task RequestNextChunk(CancellationToken cancellationToken)
                                              {
                                                  if (_remainingRangeToQuery.Span == TimeSpan.Zero)
                                                      throw new InvalidOperationException("Data retrieval is already completed");
                                  
                                  
                                                  return RequestNextChunkInternal(cancellationToken);
                                              }
                                  
                                  
                                              private async Task RequestNextChunkInternal(CancellationToken cancellationToken)
                                              {
                                                  var values = await _piPoint.RecordedValuesAsync(_remainingRangeToQuery, AFBoundaryType.Inside, filterExpression: null, includeFilteredValues: false, maxCount: ChunkSize, cancellationToken: cancellationToken);
                                                  if (values.Count < ChunkSize)
                                                  {
                                                      _remainingRangeToQuery = new AFTimeRange(_remainingRangeToQuery.EndTime, _remainingRangeToQuery.EndTime);
                                                  }
                                                  else
                                                  {
                                                      // note we are bumping by 1s in this situation - won't be valid for all situations
                                                      if (_remainingRangeToQuery.StartTime > _remainingRangeToQuery.EndTime)
                                                          _remainingRangeToQuery = new AFTimeRange(values[values.Count - 1].Timestamp - TimeSpan.FromSeconds(1), _remainingRangeToQuery.EndTime);
                                                      else
                                                          _remainingRangeToQuery = new AFTimeRange(values[values.Count - 1].Timestamp + TimeSpan.FromSeconds(1), _remainingRangeToQuery.EndTime);
                                                  }
                                                  _queryResults.Add(values);
                                              }
                                  
                                  
                                              public bool TryGetCompletedResult(out AFValues result)
                                              {
                                                  if (_remainingRangeToQuery.Span == TimeSpan.Zero)
                                                  {
                                                      var values = new AFValues(_queryResults.Sum(r => r.Count));
                                                      values.PIPoint = _piPoint;
                                                      foreach (var queryResult in _queryResults)
                                                      {
                                                          values.AddRange(queryResult);
                                                      }
                                                      result = values;
                                                      return true;
                                                  }
                                                  else
                                                  {
                                                      result = default(AFValues);
                                                      return false;
                                                  }
                                              }
                                          }
                                      }
                                  
                                  3 of 3 people found this helpful
                                    • Re: Best practice for high performance / throughput reading
                                      oysteintorget

                                      Thank you. This was really useful.

                                       

                                       I get 1 050 000 points per second with 12 concurrent calls on our setup. I think it can be increased further, since the server has 24 cores and I do not think that anyone has changed the PIarchss_ThreadCount configuration parameter. I will try to find out what it is set to and whether we might increase it.

                                        • Re: Best practice for high performance / throughput reading
                                          Roger Palmen

                                           I've given this a lot of thought before, and have had some discussions with clients and OSIsoft on the throughput you can achieve on mass data extraction from PI. Nobody seems to want to commit to a specific achievable limit (although I have seen some tests by OSIsoft), but there is also no reason why the PI System should be limited to a specific number.

                                           In theory there is no 'hard limit' on the throughput you can achieve. So if you hit a limit (and given the throughput of 1M events/second you achieve, you have), there should be an infrastructure bottleneck you can identify.

                                           

                                          The 3 main bottlenecks are:

                                            • Re: Best practice for high performance / throughput reading
                                              gregor

                                               This is an interesting discussion, and I am impressed by the potential for improvement with asynchronous calls. I wasn't expecting a factor of 5 without changing any tuning parameters on the server. For sure, hardware sets the limits in the end, as pointed out by Roger.

                                               

                                               There is one aspect which I believe was only scratched. Good PI Data Archive performance starts with data acquisition. I admit this is of less help when the history is already recorded, but that makes it all the more important as a reminder. A smaller data volume helps to accomplish good read performance - but what are the things to consider?

                                               

                                              1. Scan frequency, Exception Reporting and Compression

                                                 It depends on the sensor and the process which read frequencies are reasonable. Scanning faster than a sensor is able to deliver only generates stress for the DCS. Exception Reporting is used to remove signal noise; looking at the sensor's reading accuracy should help you adjust the exception settings for your PI Points. Because PI Interfaces perform the Exception Reporting, this can already help to reduce the required network bandwidth. Compression reduces the number of archived values for periods where the slope of the reading curve doesn't change much. For those who are concerned about losing accuracy: please enable compression and set its deviation to 0. This will remove all readings along a straight line without losing any accuracy.

                                                 

                                              2. Data Type

                                                Yes, these days CPUs have 64-bit registers, but when a value is written to disk, the required space depends on the data type. And yes, I know disk space is cheap these days. But if a value is stored with 2 or 4 times more precision than required, reading it back will also be 2 to 4 times slower. Please keep in mind that you want to deal with huge data volumes. Again, please consult the sensor specifications for the sensor's actual accuracy.

                                                Let's talk about strings. Each character takes at least one byte, so the longer a string is, the more space it requires and the worse the impact on performance. PI offers the Digital data type, which lets you refer to a string representation by an integer index (15-bit unsigned). Whenever possible, use Digital State Sets and Digital points to store text information efficiently.
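The relative value sizes are easy to sanity-check. The sketch below uses Python's standard `struct` module to compare the raw per-value storage of Float64, Float32, and a 15-bit Digital state index; it illustrates only the value portion of an event (the archive's actual record layout, with timestamps and overhead, is internal to the Data Archive), and the state set is a made-up example:

```python
import struct

n = 1_000_000  # one million archived values

# Raw bytes for the value portion of each event, per data type
float64_bytes = n * struct.calcsize('d')   # 8 bytes per value
float32_bytes = n * struct.calcsize('f')   # 4 bytes per value
digital_bytes = n * struct.calcsize('h')   # 2 bytes per state index

print(float64_bytes // float32_bytes)   # 2 -- Float64 doubles the value storage
print(float32_bytes // digital_bytes)   # 2

# A Digital point stores only an index into a state set, not the string
states = ['Off', 'Starting', 'Running', 'Stopping']
readings = ['Running', 'Running', 'Off']
encoded = [states.index(r) for r in readings]
print(encoded)                           # [2, 2, 0]
```

Storing the strings themselves would cost one byte per character per event; the Digital index costs the same two bytes no matter how long the state name is.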

                                                 

                                              3. Sub second timestamps

                                                Yes, this is the one thing which was briefly mentioned, but why is there an impact on performance? With whole-second timestamps, the PI Data Archive stores a small offset in seconds instead of the full timestamp. When timestamps contain a sub-second portion, the PI Data Archive stores the full timestamp. No, sub-second timestamps are not illegal, but they are expensive from a performance perspective. The rule of thumb is: if there is no additional value in recording sub-seconds, use whole seconds.
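The storage difference can be sketched as follows. The exact on-disk layout is internal to the Data Archive, so the field sizes here are assumptions chosen purely to illustrate the offset-versus-full-timestamp idea described above:

```python
import struct
from datetime import datetime, timedelta

base = datetime(2018, 10, 18, 0, 0, 0)   # illustrative record base time

def illustrative_size(ts):
    """Illustration only: a whole-second event can be stored as a small
    offset from the record's base time; a sub-second event needs the
    full timestamp plus the sub-second portion. Sizes are assumed."""
    if ts.microsecond == 0:
        offset = int((ts - base).total_seconds())  # this small offset is all that is stored
        return struct.calcsize('i')                # e.g. a 4-byte second offset
    return struct.calcsize('q') + struct.calcsize('i')  # e.g. full 8-byte time + sub-seconds

print(illustrative_size(base + timedelta(seconds=42)))                   # 4
print(illustrative_size(base + timedelta(seconds=42, milliseconds=1)))   # 12
```

One millisecond of unneeded resolution triples the per-event timestamp cost in this sketch, which is the point Gregor is making.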

                                              3 of 3 people found this helpful
                                                • Re: Best practice for high performance / throughput reading
                                                  Rick Davin

                                                  Nice points, Gregor.

                                                   

                                                  The biggest issue with sub-second timestamps is people introducing them unwittingly.  Generally this happens when someone uses DateTime.Now, DateTime.UtcNow, or AFTime.Now.  I would suggest using AFTime.NowInWholeSeconds where possible.
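Since the original poster is working in Python, a hedged analogue of the same idea: truncate "now" to the whole second before writing, so sub-second timestamps never sneak into the archive. The function name is made up for this sketch:

```python
from datetime import datetime, timezone

def now_in_whole_seconds():
    """Analogue of AFSDK's AFTime.NowInWholeSeconds: current UTC time
    truncated to the whole second, so writes don't carry a
    sub-second portion."""
    return datetime.now(timezone.utc).replace(microsecond=0)

ts = now_in_whole_seconds()
print(ts.microsecond)   # 0 -- no sub-second portion to archive
```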

                                                   

                                                  As for data types, I cringe every time I see a Float64 tag.  Not just because it takes up twice as much disk space as it needs, but because it gives a false impression of the precision and accuracy of the data.  In the real world, where data is read from field devices, the majority of those devices support Float32 (Single), which has more limited precision.  And even then, the value from the field device carries a certain amount of noise in the signal, reducing the actual precision further. Seeing more decimal places does not magically make the value better.
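A quick way to see how little of a Float64 value is real: round-trip it through 32-bit storage. The sketch below (helper name is hypothetical) shows that only about 7 significant digits survive Float32, which is typically still more than the field device's signal-to-noise ratio justifies:

```python
import struct

def to_float32(x):
    """Round-trip a Python float (64-bit) through 32-bit storage."""
    return struct.unpack('f', struct.pack('f', x))[0]

reading = 123.456789012345        # more digits than any field device delivers
print(to_float32(reading))        # roughly 123.45679 -- ~7 significant digits survive
```

If the round-tripped value is indistinguishable from the original within the sensor's accuracy, the extra 4 bytes per event bought nothing.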

                                                • Re: Best practice for high performance / throughput reading
                                                  Rick Davin

                                                  Hi Roger,

                                                   

                                                  It's hard to commit to what's achievable because there is a staggering number of factors at play.  Not just with the PI Data Archive, which might be a collective, but the entire PI System and network involved.

                                                   

                                                  I've followed the news about the 32 core AMD Threadripper 2, and saw an article about it being overclocked to 6 GHz across all 32 cores.  Does this tell me what's achievable?  Well, the guy was using a Liquid Nitrogen cooler and later reports were that it was only on 1 of the cores.  So tricking out a system just to see what is possible does not really tell us anything that is practical.

                                                   

                                                  Let's say we can trick out a PI Data Archive with all the fanciest and fastest hardware.  Let's load it down with 100 tags that have 1 second data for several years.  We could then publish stats on how long it takes us to retrieve 1 year of data for all 100 tags.  Would that be helpful to you?  What if I then told you that we (1) turned off all data collecting nodes or any interfaces/services feeding data and (2) had no other users of that PIDA besides ourselves?  Would you then even think you could come close to achieving the same results in a production environment?  Would you be slightly upset with us for teasing you with impractical numbers?

                                                  2 of 2 people found this helpful
                                                    • Re: Best practice for high performance / throughput reading
                                                      Roger Palmen

                                                      I agree there are many factors at play. But a thorough understanding of which factors contribute to the potential throughput really helps. Which knobs do I need to turn to increase throughput?

                                                      Your CPU cooling example is actually quite good. If you know CPU speed (and not CPU count) is your bottleneck, then that is what you need to address.

                                                       

                                                      And in the end, there are no more than a handful of bottlenecks, but finding them does require a thorough understanding of the PI architecture and how it cooperates with the OS: tuning parameters and system settings versus Performance Counters. I'm not saying it's easy to understand, but it is doable to get insight into where the bottleneck lies. I just can't accept people saying "No, this is the maximum" without being able to explain which limitation is being hit.