All Places > PI Developers Club > Blog > 2015 > April

Did you know the PI System can be the perfect infrastructure to track real-time tweets and analyze them with Asset Framework?

 

Here, I will show how easy it is to write a C# Windows Service that

 

1) signs up for updates from Twitter using the Twitter Streaming API

2) filters tweets by topic

3) stores them in real-time into PI

 

The example files are posted below. They consist of

 

1) A Visual Studio 2013 solution containing the Windows Service (PI Twitter Service)

2) A "starter" AF Element Template (XML file)

 

Here is a sneak preview of what our service does

diagram.PNG

 

Our service collects tweets in real-time from Twitter and distributes them to the corresponding AF elements and attributes based on a configured topic.

 

 

Twitter Streaming API

 

We use the Twitter Streaming API to collect tweets for our PI System in real-time.

 

What is it?

 

The Twitter Streaming API is a RESTful API that provides a way to receive a sample (~1%) of all public tweets via Twitter's Public API. For more information, please visit Twitter Streaming API. There is also a Firehose API that provides all (non-sampled) public tweets, but this requires a higher level of access.

 

A request for tweets via the Streaming API can be thought of as a very long-lived HTTP request. The client processes the resulting response (JSON) incrementally. There is no need to continuously poll a REST API endpoint. We can think of it as similar to downloading an infinitely long file over HTTP. Twitter will keep the connection open barring any network interruptions.
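The "infinitely long file" idea can be sketched without touching the network: the Streaming API delivers one JSON object per line over the long-lived response, and the client processes each line as it arrives. In this hedged sketch, a plain TextReader stands in for the HTTP response stream, and the "text" property name mirrors Twitter's tweet payload; everything else is illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;

// Sketch: read a line-delimited JSON stream incrementally, emitting each
// tweet's text as soon as its line arrives -- no polling involved.
public static class StreamReaderSketch
{
    // 'reader' stands in for the HTTP response body; a real client would
    // wrap HttpClient's response opened with ResponseHeadersRead.
    public static IEnumerable<string> ReadTweetTexts(TextReader reader)
    {
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            if (string.IsNullOrWhiteSpace(line)) continue; // keep-alive newlines
            using (JsonDocument doc = JsonDocument.Parse(line))
            {
                if (doc.RootElement.TryGetProperty("text", out JsonElement text))
                    yield return text.GetString();
            }
        }
    }
}
```

Because the reader blocks until the next line is available, the same loop works unchanged whether the source is a local string or a connection Twitter holds open for hours.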

 

First steps: Registering the Twitter app

 

Before the Streaming API, or any of Twitter's APIs, can be used, we need to register our (third-party) Twitter application.

 

1) Create a new Twitter app. Sign in to your Twitter account and navigate to Twitter Application Management. Then, click Create New App.

createnewapp.PNG

 

2) Enter app details. Follow the instructions for entering the Name, Description, Website, and CallbackURL. For Website, we can just use https://www.google.com, and leave CallbackURL blank. The latter two fields are not important for our purposes since our app will be a Windows Service rather than a web application.

registerapp.PNG


3) Set up the keys and tokens. After registering the app, click on the "Keys and Access Tokens" tab. Then, generate the consumer key, consumer secret, access token, and access token secret. Please save these values; we will need them later.

 

Detail note: These keys and tokens are used because Twitter uses OAuth to authenticate our applications and allow users to authorize these applications to access user information. If you've ever registered for a website using "Sign in with Google+" or "Sign in with Facebook", that was likely done using OAuth. For our purposes, we will not need to know the inner workings of OAuth, but only recognize the context it is being used in. We'll see that our wrapper libraries will handle the authentication in the background for us.

 

Tweetinvi - .NET Library for Twitter Streaming API

 

Tweetinvi is a .NET library that encapsulates the functionality of the Streaming API and exposes it as a set of simpler classes and methods, so the client code is not burdened by lower-level tasks such as performing the OAuth workflow, creating HTTP clients, executing requests, and parsing responses. For Tweetinvi details, please visit their CodePlex page Tweetinvi - a friendly Twitter C# API. We will use this library in our Windows Service application. As part of the attached Visual Studio solution, we've installed Tweetinvi 0.9.6.1 from NuGet (search "Tweetinvi").


NOTE: .NET Framework 4.0 or higher is required.


 

PI Twitter Service

 

PI Twitter Service is a Windows Service included in this Visual Studio solution that signs up for Twitter Streaming API updates and sends the tweet text and associated information (creator, hashtags, urls) to PI. The full Visual Studio 2013 solution is at the bottom of this post, along with a sample AF Element Template. In addition to demonstrating how to send real-time tweets to PI, other goodies include:

 

1) An example implementation of the Observer pattern, typically useful for "dataflow" type apps.

2) An example of a Windows Service that can:

    a) Be run interactively, by checking the Environment.UserInteractive variable. Therefore, you can run this app as a service or via the console.

    b) Be debugged in the Visual Studio IDE. Not a trivial issue, as explained here: How to: Debug Windows Service Applications
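The dual-mode idea above can be sketched as follows. The service-specific calls (ServiceBase.Run, the worker start/stop methods) are shown as comments so the sketch stays platform-neutral; the class and method names here are illustrative, not taken from the attached solution.

```csharp
using System;

// Sketch of a dual-mode entry point: the same binary runs as a Windows
// Service or interactively, decided at runtime by Environment.UserInteractive.
public static class DualModeHost
{
    // Pure decision helper, split out so the branching logic is testable.
    public static string ChooseMode(bool userInteractive)
    {
        return userInteractive ? "console" : "service";
    }

    public static void RunHost()
    {
        if (ChooseMode(Environment.UserInteractive) == "console")
        {
            // Console mode: start the worker directly and wait for a key.
            // Convenient for debugging in the Visual Studio IDE.
            // service.StartWorker(); Console.ReadLine(); service.StopWorker();
            Console.WriteLine("Running interactively; press Enter to stop.");
        }
        else
        {
            // Service mode: hand control to the Service Control Manager.
            // ServiceBase.Run(new PITwitterService());
        }
    }
}
```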

 

Our app does many things, and this post will not touch on every aspect. For learning how to create a Windows Service, MSDN is a great resource: How to: Create Windows Services. For the Observer pattern, MSDN also provides a good resource: Observer Design Pattern.


AF Database design


An AF database is used to organize our streaming tweets. The service application writes to AF attributes belonging to AF elements that match the "topic" of the tweet. Let's look at the AF structure now.


I've created an AF database called Twitter. It contains one AF Element Template, called "Twitter Topic Template". In this design, each AF element represents a "topic", and that topic is determined by its set of filters, or keywords (see the Keywords AF attribute in the image below). In essence, our Windows Service will filter the tweets and redirect them to the AF element that contains the matching keywords. Then, tweet information is written to the AF attributes (PI Point Data References). The AF attributes are shown below.

afattributes.PNG

 

The category "Configuration" stores the keywords, represented by a string array. Any tweet containing words matching our specified keywords will have its information stored under the AF attributes for this AF element. The category "Tweet Data" stores all of the information associated with a tweet. For example, if our keywords were "datascience" and "data science", then any tweets containing these keywords will be written to attributes belonging to our "Data Science" AF element. Therefore, we can think of an AF element as an abstraction providing the infrastructure to track tweets of a given topic. Here are the AF elements (topics) I've decided to track.

elementstotrack.PNG
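The keyword-based routing just described can be reduced to a minimal sketch: each topic owns a keyword list, and a tweet is delivered to every topic whose keywords it contains. The topic names and keywords below are illustrative only, not read from the AF database.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sketch of topic routing: return every topic whose keyword list matches
// the tweet text (case-insensitive substring match, as in the service).
public static class TopicRouter
{
    public static IList<string> MatchTopics(
        string tweetText, IDictionary<string, string[]> topicKeywords)
    {
        return topicKeywords
            .Where(kv => kv.Value.Any(k =>
                tweetText.IndexOf(k, StringComparison.OrdinalIgnoreCase) >= 0))
            .Select(kv => kv.Key)
            .ToList();
    }
}
```

In the real service each matched topic corresponds to an AF element, and the tweet's details are written to that element's attributes.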

 

Observer pattern implementation

 

We make use of the Observer pattern (the .NET implementation of it), which is suggested by the first diagram above. There is an observable TwitterStream object representing all of our input tweets that we receive from Twitter. Different AF elements can subscribe to this stream, via an observer TwitterPipe object.

 

TwitterStream Observable

 

The TwitterStream object uses the Tweetinvi .NET library to establish a streaming HTTP connection with Twitter's servers. Let's look at the TwitterStream class below.

 

    public class TwitterStream : IObservable<ITweet>
    {
        private IFilteredStream _internalStream;
        private IList<IObserver<ITweet>> _pipes;

        public TwitterStream()
        {
            string accessToken = ConfigurationManager.AppSettings["accessToken"];
            string accessTokenSecret = ConfigurationManager.AppSettings["accessTokenSecret"];
            string consumerKey = ConfigurationManager.AppSettings["consumerKey"];
            string consumerSecret = ConfigurationManager.AppSettings["consumerSecret"];

            TwitterCredentials.SetCredentials(accessToken, accessTokenSecret, consumerKey, consumerSecret);

            this._internalStream = Stream.CreateFilteredStream();
            this._internalStream.AddTweetLanguageFilter(Language.English);

            this._pipes = new List<IObserver<ITweet>>();

            //Signup the delegate for the MatchingTweetReceived event
            this._internalStream.MatchingTweetReceived += (sender, args) =>
            {
                ProcessTweet(args);
            };
        }

        //IObservable
        public IDisposable Subscribe(IObserver<ITweet> pipe)
        {
            if (!_pipes.Contains(pipe))
            {
                _pipes.Add(pipe);
            }
            return new Unsubscriber(); //Unsubscriber implementation omitted here; see the full solution
        }

        public void AddTrack(string s)
        {
            this._internalStream.AddTrack(s);
        }

        public async void StartStreamAsync()
        {
            await this._internalStream.StartStreamMatchingAnyConditionAsync();
        }

        public void StopStream()
        {
            _internalStream.StopStream();
        }

        private void ProcessTweet(MatchedTweetReceivedEventArgs args)
        {
            //Update observers
            foreach (var observer in _pipes)
            {
                observer.OnNext(args.Tweet);
            }
        }
    } //end class

 

We have two fields.

  • IFilteredStream _internalStream is our interface to the private internal stream in Tweetinvi's library.
  • IList<IObserver<ITweet>> _pipes is the list of observers that sign up for updates from this TwitterStream.

 

Our constructor does the following:

  1. Read our keys and tokens from the app.config file. Lines 8-11
  2. Pass these credentials to Tweetinvi. Line 13
  3. Create a filtered Twitter stream exposing IFilteredStream. Line 15
  4. Specify to only receive tweets in English. Line 16 (but you can choose any language you want!)
  5. Provide a delegate to be called by the IFilteredStream when a matching tweet is received. Line 21.

 

Our public TwitterStream exposes methods to

  1. Subscribe to the stream via the Observer pattern. Line 28
  2. Add a filter via AddTrack(). Line 37
  3. Start/stop the stream. Lines 42 and 47.
  4. Process an incoming tweet via ProcessTweet() (used in our delegate instance). Inside this method, it just calls observer.OnNext() in accordance with the .NET Observer pattern. Line 52

 

Detail note: A "track" is the Twitter API's terminology for a filter by keywords. One track is configured by providing a list of keywords, separated by spaces. For Twitter to send a tweet to the client stream, the tweet must contain ALL of the keywords in that track. A stream can have multiple tracks. If so, the client can specify whether Twitter should send tweets that match at least one track's keywords, or only tweets that match the keywords of every track. Since we start the stream by calling StartStreamMatchingAnyConditionAsync(), we receive tweets that match at least one track's keywords.
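To make the rule concrete, here is a small sketch of the matching semantics: a tweet matches a track only if it contains every keyword in it, and under "match any condition" one matching track is enough. The real matching happens server-side at Twitter; this only mimics it.

```csharp
using System;
using System.Linq;

// Sketch of Twitter's "track" semantics: all keywords within a track must
// match, and (in any-condition mode) any one track matching is sufficient.
public static class TrackMatcher
{
    public static bool MatchesTrack(string tweetText, string track)
    {
        return track
            .Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
            .All(k => tweetText.IndexOf(k, StringComparison.OrdinalIgnoreCase) >= 0);
    }

    public static bool MatchesAnyTrack(string tweetText, params string[] tracks)
    {
        return tracks.Any(t => MatchesTrack(tweetText, t));
    }
}
```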

 

TwitterPipe Observer

 

Let's look at a stripped-down version of our observer TwitterPipe class now. It writes to 5 AF attributes. The full implementation is in the attached Visual Studio solution.

    public class TwitterPipe : IObserver<ITweet>
    {
        private AFElement _element;
        private IList<string> _filters;

        public TwitterPipe(AFElement element)
        {
            this._element = element;
            this._filters = new List<string>();
            AFAttribute keywordsAttr = _element.Attributes[Attributes.Keywords.ToString()];
            string[] keywords = keywordsAttr.GetValue().Value as string[];
            this._filters = keywords.ToList<string>();
        }

        public void SubscribeToStream(TwitterStream twitterStream)
        {
            twitterStream.Subscribe(this);
            foreach (string s in this._filters)
            {
                twitterStream.AddTrack(s);
            }
        }

        public virtual void OnNext(ITweet tweet)
        {
            string tweetText = tweet.Text;

            //Check if this tweet matches pipe's filter
            bool isMatched = IsMatchingTweet(tweetText);
            if (isMatched)
            {
                //Get timestamp of tweet
                AFTime timestamp = new AFTime(tweet.CreatedAt);


                //Extract tweet info and assign to AFValue and AFAttribute
                AFValue text = new AFValue(_element.Attributes["Text"], tweet.Text, timestamp);
                AFValue creator = new AFValue(_element.Attributes["Creator"], tweet.Creator.Name, timestamp);      
                AFValue location = new AFValue(_element.Attributes["Location"], tweet.Creator.Location, timestamp);

                string[] hashTagArray = tweet.Hashtags.Select<IHashtagEntity, string>(p => p.Text).ToArray<string>();
                string hashtags = String.Join(" ", hashTagArray);
                AFValue tags = new AFValue(_element.Attributes["Hashtags"], hashtags, timestamp);

                string[] urlArray = tweet.Urls.Select<IUrlEntity, string>(p => p.URL).ToArray<string>();
                string sUrl = String.Join(" ", urlArray);
                AFValue urls = new AFValue(_element.Attributes["Urls"], sUrl, timestamp);

                IList<AFValue> vals = new List<AFValue>();
                vals.Add(text);
                vals.Add(creator);
                vals.Add(location);
                vals.Add(tags);
                vals.Add(urls);

                AFListData.UpdateValues(vals, AFUpdateOption.Insert, AFBufferOption.BufferIfPossible);
            }
        }

        private bool IsMatchingTweet(string text)
        {
            bool isMatched = false;
            foreach (string s in this._filters)
            {
                isMatched = text.IndexOf(s, StringComparison.OrdinalIgnoreCase) >= 0;
                if (isMatched) return true;
            }
            return false;
        }

        public void OnCompleted()
        {
            throw new NotImplementedException();
        }

        public virtual void OnError(Exception e)
        {
            throw new NotImplementedException();
        }
    } //end class

 

We have two fields.

  • AFElement _element is the element representing our "topic". For example, this would correspond to our "Big Data", "IoT", and "Smart Cities" elements in our diagram above.
  • IList<string> _filters is the list of filters for our topic. For example, for an "IoT" topic, the list could be {"IoT", "Internet of Things"}.

 

Our constructor does the following:

  1. Sets the AFElement field. Line 8
  2. Reads the "Keywords" AF attribute and stores them in our IList<string> _filters variable. Lines 9-12.

 

Our TwitterPipe exposes methods to

  1. Add the pipe to the list of subscribers of TwitterStream via SubscribeToStream(TwitterStream stream). Line 15.
  2. Process incoming tweets using OnNext() via the Observer pattern implementation. Line 24
  3. Check whether an incoming tweet matches any of the pipe's filters via the private method IsMatchingTweet(string text). Line 59

 

Detail note: When the TwitterPipe subscribes to the TwitterStream, it adds all of its filters as tracks for the TwitterStream. Therefore, the TwitterStream contains tracks for all of the filters from all subscribed pipes. When an incoming tweet arrives from Twitter's servers, the tweet will match the filter criteria for at least one of the pipes, but the stream will send the tweet to all pipes. It is up to the pipe via IsMatchingTweet(string text) to decide if it matches the set of filters that define the AF element's topic.
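The fan-out described in this note can be reduced to a minimal sketch, with strings standing in for ITweet and a single keyword standing in for each pipe's filter list; everything here is illustrative, not code from the solution.

```csharp
using System;
using System.Collections.Generic;

// Minimal stand-in for the stream/pipe fan-out: the stream pushes every
// tweet to every subscribed pipe, and each pipe applies its own filter.
public class MiniStream : IObservable<string>
{
    private sealed class Noop : IDisposable { public void Dispose() { } }

    private readonly List<IObserver<string>> _pipes = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> pipe)
    {
        if (!_pipes.Contains(pipe)) _pipes.Add(pipe);
        return new Noop(); // the full solution returns a real unsubscriber
    }

    public void Push(string tweet)
    {
        // Every subscribed pipe sees every tweet, matching or not.
        foreach (var pipe in _pipes) pipe.OnNext(tweet);
    }
}

public class MiniPipe : IObserver<string>
{
    private readonly string _keyword;
    public readonly List<string> Matched = new List<string>();

    public MiniPipe(string keyword) { _keyword = keyword; }

    public void OnNext(string tweet)
    {
        // The pipe, not the stream, decides whether the tweet is on-topic.
        if (tweet.IndexOf(_keyword, StringComparison.OrdinalIgnoreCase) >= 0)
            Matched.Add(tweet);
    }

    public void OnCompleted() { }
    public void OnError(Exception e) { }
}
```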

 

How are the Observers and Observable linked together?

 

This is done via an AFObservers class.

 

    public class AFObservers
    {
        private AFElements _elements;

        public AFObservers(string server, string database)
        {
            PISystem ps = new PISystems()[server];
            AFDatabase db = ps.Databases[database];
            this._elements = db.Elements;
        }

        public void AddSubscribers(TwitterStream twitterStream)
        {
            foreach (AFElement element in this._elements)
            {
                TwitterPipe pipe = new TwitterPipe(element);
                pipe.SubscribeToStream(twitterStream);                  
            }
        }
    }

 

It contains an AFElements collection as a field, representing our list of topics to track. From this element list, AddSubscribers() creates a TwitterPipe for each element and subscribes it to the passed-in TwitterStream.

 

After establishing these links, all we need to do is call StartStreamAsync() on the TwitterStream object to begin collecting tweets!

 

It would take too long to go through every detail of this Windows Service app. Please take a break if you managed to read this far. The explanations above should be a good starting point when looking through the Visual Studio source code.

 

 

Results


Let's view all the tweets we collected on the topic of "Machine Learning".

ml_tweets.PNG

Instead of tweets, we can also look at hash tags.

ml_hashtags.PNG

There are many possibilities now that we can collect tweets in real-time using PI. We can use Asset-Based Analytics and Event Frames, perhaps to detect when a certain topic goes viral. We can create new apps with PI that perform sentiment analyses, track trends, and offer personalized news or daily notification summaries.

 

Something to be explored is integration with Esri ArcGIS maps for visualizing tweets spatially in real-time. The current limitation is that only a very small percentage of tweets are geo-enabled (i.e. they send latitude/longitude along with the tweet). It is possible to extract the user profile location to guess at the geo-coordinates, but this information is often sparse and unclean, as shown below.

location.PNG

Some clever engineering can probably overcome this and allow us to infer or guess at the latitude and longitude. With the PI Integrator for Esri, we would then be able to visualize the tweets on a map as they arrive in real-time. That will be a project for next time.

 

 

What if we sent all (sampled) tweets to PI?

 

Instead of applying filters or tracks, we can have Twitter send us a sample (~1%) of all the real-time tweets. This amounts to about 60 tweets/second. Can the PI server handle this load?

 

Of course! Here are the stats I found:

1) pibufss max events in queue: ~5

2) pisnapss avg. CPU usage: 0.6%

3) piarchss avg. CPU usage: 0.3%

 

Naturally, there are more metrics to look at, but for the PI Server, this Twitter stream hardly makes a blip. It would be interesting to see how the PI Server would respond to the Twitter Firehose API, which sends ALL (non-sampled) public tweets. This means about 6,000 tweets/sec. My guess is that the server would fare just fine.

 

I hope to have demonstrated a non-traditional but fun use case for the PI Server. Although traditionally seen as a "process historian", our wonderful PI Server can do so much more. It can really be a very general and scalable infrastructure for any sort of high-frequency real-time information, whether the data is coming from an OPC server or a streaming RESTful API.

 

If you find this post interesting, we invite you to join our Programming Hackathon at TechCon 2015.

 

Registration is now open!

hackathon_sig.png

Hi all,

 

The Esri Integrator team has been hard at work since our first release this past fall – and PI Integrator for Esri ArcGIS 2014 SP1 is here!

 

2014 SP1

This release brings some welcome enhancements to the product:

  • Compatibility with:
    • The newly-released Esri ArcGIS 10.3 platform
    • Windows-based security in ArcGIS
    • Oracle-backed ArcGIS Server instances
    • Future versions of the PI AF SDK
  • Support for AF Child Attributes
  • Direct support for Portal for ArcGIS data stores in the GeoEvent Extension when publishing layers. Previously, the Server data store was necessary; now we can go directly to Portal.

 

  • A slew of bug fixes, among which the interesting bits are fixes for
    • A few possible odd behaviors in the Configuration Wizard which would prevent publishing. These are cleaned up.
    • Inability to access the CSV definition for manually creating the target Feature Service, and subsequent inability to select a manually-created Feature Service in the Wizard. Manual publishing works as expected now.
    • UI glitches when defining a new layer, especially if that new layer is large. This service pack doesn’t bring many design changes to the UI, but it certainly polishes what is already there.

 

The release notes go into further detail.

 

Two other general items of note:

 

  • Two of the Esri products we interact with have been renamed with ArcGIS 10.3. The GeoEvent Processor is now the GeoEvent Extension, and the Portal Extension for Server is now Portal for Server. Our docs reflect this, though if you’re still using ArcGIS 10.2.2, the renaming hasn’t struck you yet.
  • ArcGIS 10.3 introduces a new database option underlying the ArcGIS Server: the “ArcGIS datastore,” which is a free PostgreSQL incarnation that ostensibly is a great alternative to paid SQL Server and Oracle underpinnings. But it is not compatible with the Esri Integrator product. OSIsoft and Esri are working together to establish the cause and resolution. In the meantime, we didn’t want to hold the release of this service pack!

 

We, the members of the development team, are encouraged by the feedback you’ve given us thus far, and excited that this service pack will help the momentum continue.

 

ArcGIS Extensibility

Ok, enough about the Integrator product. Now a word on something arguably more interesting – the PI data itself, once it’s available in ArcGIS! (This being, of course, the whole point of the Esri Integrator...)

 

The PI ecosystem currently lacks direct Esri map plugins (but we’re working on that!) and the ArcGIS side is ripe for more tools and widgets that expose PI data. There’s a lot to be done in that area. Esri has wonderful partner support, their ArcGIS platform is highly extensible, and customers regularly adopt partner add-ins and solutions. We encourage all of you PI acolytes to explore that world of ArcGIS extensibility. Time to see what all that PI data can really do when delivered into such a rich and extensible GIS platform!

 

All the best,
Brandon
and the rest of the Esri Integrator development team

We have made our PI Developer Technologies more easily available. Anyone with an account on the Tech Support website or PI Square is now able to download the PI AF SDK, PI Web API, the PI OPC DA and PI OPC HDA Servers, and all products in the PI SQL Framework (PI OLEDB Provider, PI OLEDB Enterprise, PI JDBC Driver and PI ODBC Driver). We are doing this to make it easier for you to build applications that leverage PI System data. We are also allowing you to blog in public forums about using our PI Developer Technologies products, and to contribute to open-source projects that involve the PI System.

 

This does not mean that the PI Developer Technologies are license-free. You still need a PI System Access (PSA) license to deploy your applications. This is not a new requirement and we are not changing it. You might think of this as a run-time license. Note that the PI OPC DA and HDA Servers were also available under the older DA license. Talk to your Account Manager if you have questions about PSA or DA licenses.
