Track Real-Time Tweets in PI using the Twitter Streaming API

Blog Post created by bshang Employee on Apr 8, 2015

Did you know the PI System can be the perfect infrastructure to track real-time tweets and analyze them with Asset Framework?


Here, I will show how easy it is to write a C# Windows Service that


1) signs up for updates from Twitter using the Twitter Streaming API

2) filters tweets by topic

3) stores them in real-time into PI


The example files are posted at the bottom of this post. They consist of:


1) A Visual Studio 2013 solution containing the Windows Service (PI Twitter Service)

2) A "starter" AF Element Template (XML file)


Here is a sneak preview of what our service does



Our service collects tweets in real-time from Twitter and distributes them to the corresponding AF elements and attributes based on a configured topic.



Twitter Streaming API


We use the Twitter Streaming API to collect tweets for our PI System in real-time.


What is it?


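The Twitter Streaming API gives low-latency access to Twitter's stream of public tweets over a long-lived HTTP connection. Its sample endpoint delivers a random sample (~1%) of all public tweets, and its filter endpoint delivers public tweets that match keywords we specify, which is what our service uses. For more information, please visit Twitter Streaming API. There is also a Firehose API that provides all (non-sampled) public tweets, but it requires a higher level of access.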
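To see what a filtered request carries, here is a minimal sketch, assuming the v1.1 filter endpoint URL and its `track` parameter (a comma-separated list of phrases) as documented at the time of writing. `FilterRequest` and `BuildBody` are hypothetical names; in our service Tweetinvi builds and OAuth-signs this request, so nothing like this appears in the attached solution.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper showing the shape of a keyword-filtered stream request.
public static class FilterRequest
{
    // Assumed v1.1 endpoint for keyword-filtered public tweets (sent as an HTTP POST)
    public const string Endpoint = "https://stream.twitter.com/1.1/statuses/filter.json";

    // Each track is a space-separated phrase (spaces act as AND); tracks are
    // joined with commas, which Twitter treats as OR.
    public static string BuildBody(IEnumerable<string> tracks)
    {
        string joined = string.Join(",", tracks.Select(t => t.Trim()).Where(t => t.Length > 0));
        return "track=" + Uri.EscapeDataString(joined);
    }
}
```

For example, the tracks "data science" and "IoT" produce the body `track=data%20science%2CIoT`.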


A request for tweets via the Streaming API can be thought of as a very long-lived HTTP request. The client processes the resulting response (JSON) incrementally. There is no need to continuously poll a REST API endpoint. We can think of it as similar to downloading an infinitely long file over HTTP. Twitter will keep the connection open barring any network interruptions.
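On the client side, "processing incrementally" means handling each message as soon as its line is complete. The sketch below assumes the stream delivers one JSON message per line and sends empty lines as keep-alives, and it uses a MemoryStream to stand in for the HTTP response body. `StreamingSketch.ReadMessages` is a hypothetical name; in our service Tweetinvi does this reading and the JSON parsing for us.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class StreamingSketch
{
    // Yields each non-empty line (one JSON message) as soon as it is complete,
    // so the caller can process tweets while the response is still "downloading".
    public static IEnumerable<string> ReadMessages(Stream body)
    {
        using (var reader = new StreamReader(body, Encoding.UTF8))
        {
            string line;
            // ReadLine blocks until a full line arrives; null means the connection closed
            while ((line = reader.ReadLine()) != null)
            {
                if (line.Length == 0)
                    continue; // keep-alive newline: nothing to process
                yield return line;
            }
        }
    }
}
```

With a real connection, the loop only ends when the connection closes, so the service keeps receiving tweets for as long as Twitter keeps the response open.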


First steps: Registering the Twitter app


Before we can use the Streaming API, or any of Twitter's APIs, we need to register our (third-party) Twitter application.


1) Create a new Twitter app. Sign in to your Twitter account and navigate to Twitter Application Management. Then, click Create New App.



2) Enter app details. Follow the instructions for entering the Name, Description, Website, and CallbackURL. For Website, we can just use, and leave CallbackURL blank. The latter two fields are not important for our purposes, since our app will be a Windows Service rather than a web application.


3) Set up the keys and tokens. After registering the app, click on the "Keys and Access Tokens" tab. Then, generate the consumer key, consumer secret, access token, and access token secret. Save these values; our service reads them from its app.config file.


Detail note: These keys and tokens are used because Twitter uses OAuth to authenticate our applications and to let users authorize these applications to access user information. If you've ever registered for a website using "Sign in with Google+" or "Sign in with Facebook", that was likely done using OAuth. For our purposes, we don't need to know the inner workings of OAuth, only the context in which it is used. As we'll see, our wrapper library, Tweetinvi, handles the authentication in the background for us.
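Because the service reads these four values from the appSettings section of app.config, it helps to fail fast when one of them is missing. This is a minimal sketch, not code from the attached solution: `TwitterKeys` and `Require` are hypothetical names, and the key names match the ones the TwitterStream constructor below reads. ConfigurationManager.AppSettings is a NameValueCollection, so the helper accepts one.

```csharp
using System;
using System.Collections.Specialized;

// Hypothetical holder for the four values generated on the "Keys and Access Tokens" tab.
public sealed class TwitterKeys
{
    public string ConsumerKey { get; private set; }
    public string ConsumerSecret { get; private set; }
    public string AccessToken { get; private set; }
    public string AccessTokenSecret { get; private set; }

    // Reads the keys from an appSettings-style collection and throws a clear
    // error naming the first key that is missing or empty.
    public static TwitterKeys From(NameValueCollection settings)
    {
        return new TwitterKeys
        {
            ConsumerKey = Require(settings, "consumerKey"),
            ConsumerSecret = Require(settings, "consumerSecret"),
            AccessToken = Require(settings, "accessToken"),
            AccessTokenSecret = Require(settings, "accessTokenSecret")
        };
    }

    private static string Require(NameValueCollection settings, string name)
    {
        string value = settings[name];
        if (string.IsNullOrWhiteSpace(value))
            throw new InvalidOperationException("Missing appSettings key: " + name);
        return value;
    }
}
```

In the service, the call would be `TwitterKeys.From(ConfigurationManager.AppSettings)`.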


Tweetinvi - .NET Library for Twitter Streaming API


Tweetinvi is a .NET library that encapsulates the functionality of the Streaming API and exposes it as a set of simpler classes and methods, so the client code is not burdened by lower-level tasks such as performing the OAuth workflow, creating HTTP clients, executing requests, and parsing responses. For Tweetinvi details, please visit their CodePlex page Tweetinvi - a friendly Twitter C# API. We will use this library in our Windows Service application. In the attached Visual Studio solution, we installed Tweetinvi from NuGet (search "Tweetinvi").

NOTE: .NET Framework 4.0 or higher is required.


PI Twitter Service


PI Twitter Service is a Windows Service, included in the attached Visual Studio solution, that signs up for Twitter Streaming API updates and sends the tweet text and associated information (creator, location, hashtags, URLs) to PI. The full Visual Studio 2013 solution is at the bottom of this post, along with a sample AF Element Template. In addition to demonstrating how to send real-time tweets to PI, other goodies include:


1) An example implementation of the Observer pattern, typically useful for "dataflow" type apps.

2) An example of a Windows Service that can:

    a) Run interactively. It checks the Environment.UserInteractive property, so you can run this app either as a service or from the console.

    b) Be debugged in the Visual Studio IDE. This is not trivial, as explained here: How to: Debug Windows Service Applications
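The interactive/service switch usually looks like the sketch below. It assumes a .NET Framework project with a reference to System.ServiceProcess.dll; `TwitterServiceSketch`, `StartWork`, `StopWork` and `ServiceEntryPoint` are hypothetical names, not the ones in the attached solution. Because both modes call the same start/stop methods, you can press F5 in Visual Studio and debug the service logic as an ordinary console app.

```csharp
using System;
using System.ServiceProcess; // reference System.ServiceProcess.dll (.NET Framework)

// Hypothetical service; the real one would create the TwitterStream and pipes in StartWork.
public class TwitterServiceSketch : ServiceBase
{
    public bool IsRunning { get; private set; }

    // Called by the Service Control Manager when running as a service
    protected override void OnStart(string[] args) { StartWork(); }
    protected override void OnStop() { StopWork(); }

    // Shared by service mode and console mode
    public void StartWork() { IsRunning = true; }
    public void StopWork() { IsRunning = false; }
}

public static class ServiceEntryPoint
{
    // Call this from Main
    public static void Run()
    {
        var service = new TwitterServiceSketch();
        if (Environment.UserInteractive)
        {
            // Started from a console window or the Visual Studio debugger
            service.StartWork();
            Console.WriteLine("Running interactively. Press Enter to stop.");
            Console.ReadLine();
            service.StopWork();
        }
        else
        {
            // Started by the Service Control Manager
            ServiceBase.Run(service);
        }
    }
}
```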


Our app does many things, but this post will not touch on all aspects. To learn how to create a Windows Service, MSDN is a great resource: How to: Create Windows Services. For the Observer pattern, MSDN also provides a good resource: Observer Design Pattern.

AF Database design

An AF database is used to organize our streaming tweets. The service application writes to AF attributes belonging to AF elements that match the "topic" of the tweet. Let's look at the AF structure now.

I've created an AF database called Twitter. It contains one AF Element Template, called "Twitter Topic Template". In this design, each AF element represents a "topic", and that topic is determined by its set of filters, or keywords (see the Keywords AF attribute in the image below). In essence, our Windows Service filters the tweets and redirects them to the AF element that contains the matching keywords. The tweet information is then written to the AF attributes (PI Point Data References). The AF attributes are shown below.



The category "Configuration" stores the keywords, represented by a string array. Any tweet containing words that match our specified keywords will have its information stored under the AF attributes of this AF element. The category "Tweet Data" stores all of the information associated with a tweet. For example, if our keywords were "datascience" and "data science", then any tweets containing these keywords will be written to attributes belonging to our "Data Science" AF element. Therefore, we can think of an AF element as an abstraction providing the infrastructure to track tweets on a given topic. Here are the AF elements (topics) I've decided to track.
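Conceptually, this routing rule is a lookup from each topic's Keywords array to the tweets that mention any of them. The sketch below models the AF elements as a plain dictionary (an assumption for illustration; the service reads the keywords from AF) and uses the same case-insensitive substring test as the TwitterPipe class later in this post. `TopicRouter` is a hypothetical name. A single tweet may land in several topics.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TopicRouter
{
    // Returns every topic whose keywords appear (case-insensitively) anywhere in the tweet text.
    public static List<string> MatchTopics(IDictionary<string, string[]> topics, string tweetText)
    {
        return topics
            .Where(t => t.Value.Any(k => tweetText.IndexOf(k, StringComparison.OrdinalIgnoreCase) >= 0))
            .Select(t => t.Key)
            .ToList();
    }
}
```

Substring matching is deliberately loose: "#DataScience" matches the keyword "datascience", but the keyword "IoT" also matches the word "idiot".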



Observer pattern implementation


We use the .NET implementation of the Observer pattern (the IObservable<T> and IObserver<T> interfaces), as suggested by the first diagram above. An observable TwitterStream object represents all of the input tweets we receive from Twitter. Different AF elements can subscribe to this stream via an observer TwitterPipe object.
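For readers new to the .NET flavor of the pattern, here is a minimal, library-free sketch of the same shape: an observable keeps a list of observers, pushes each item to all of them with OnNext(), and returns an IDisposable that removes the observer again. `MessageStream` and `RecordingPipe` are hypothetical stand-ins for TwitterStream and TwitterPipe, with strings in place of ITweet.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for TwitterStream: pushes each incoming message to every subscriber.
public class MessageStream : IObservable<string>
{
    private readonly List<IObserver<string>> _observers = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        if (!_observers.Contains(observer))
            _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    public void Publish(string message)
    {
        // Iterate over a copy so an observer may unsubscribe while being notified
        foreach (var observer in _observers.ToArray())
            observer.OnNext(message);
    }

    // Removes the observer from the list when the subscription is disposed
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<string>> _observers;
        private readonly IObserver<string> _observer;

        public Unsubscriber(List<IObserver<string>> observers, IObserver<string> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() { _observers.Remove(_observer); }
    }
}

// Stand-in for TwitterPipe: records the messages it receives.
public class RecordingPipe : IObserver<string>
{
    public readonly List<string> Received = new List<string>();
    public void OnNext(string value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Disposing the value returned by Subscribe() is how a pipe stops receiving items without the stream needing to know anything about it.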


TwitterStream Observable


The TwitterStream object uses the Tweetinvi .NET library to establish a streaming HTTP connection with Twitter's servers. Let's look at the TwitterStream class below.


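    using System;
    using System.Collections.Generic;
    using System.Configuration;      // ConfigurationManager; reference System.Configuration.dll
    using System.Threading.Tasks;
    using Tweetinvi;                 // plus Tweetinvi's namespaces for ITweet, IFilteredStream, Language and the event args (vary by version)

    public class TwitterStream : IObservable<ITweet>
    {
        private IFilteredStream _internalStream;
        private IList<IObserver<ITweet>> _pipes;

        public TwitterStream()
        {
            // Read the keys and tokens from app.config
            string accessToken = ConfigurationManager.AppSettings["accessToken"];
            string accessTokenSecret = ConfigurationManager.AppSettings["accessTokenSecret"];
            string consumerKey = ConfigurationManager.AppSettings["consumerKey"];
            string consumerSecret = ConfigurationManager.AppSettings["consumerSecret"];

            TwitterCredentials.SetCredentials(accessToken, accessTokenSecret, consumerKey, consumerSecret);

            this._internalStream = Stream.CreateFilteredStream();
            // English-only filter (call as in Tweetinvi 0.9.x; check the name in your version)
            this._internalStream.AddTweetLanguageFilter(Language.English);
            this._pipes = new List<IObserver<ITweet>>();

            //Signup the delegate for the MatchingTweetReceived event
            this._internalStream.MatchingTweetReceived += (sender, args) => ProcessTweet(args);
        }

        public IDisposable Subscribe(IObserver<ITweet> pipe)
        {
            if (!_pipes.Contains(pipe))
                _pipes.Add(pipe);
            return new Unsubscriber(_pipes, pipe);
        }

        public void AddTrack(string s)
        {
            this._internalStream.AddTrack(s);
        }

        public Task StartStreamAsync()  // a Task rather than async void, so callers can observe errors
        {
            return this._internalStream.StartStreamMatchingAnyConditionAsync();
        }

        public void StopStream()
        {
            this._internalStream.StopStream();
        }

        private void ProcessTweet(MatchedTweetReceivedEventArgs args)
        {
            //Update observers (iterate over a copy in case one unsubscribes meanwhile)
            foreach (var observer in new List<IObserver<ITweet>>(_pipes))
                observer.OnNext(args.Tweet);
        }

        private sealed class Unsubscriber : IDisposable  // removes a pipe on Dispose()
        {
            private readonly IList<IObserver<ITweet>> _pipes;
            private readonly IObserver<ITweet> _pipe;
            public Unsubscriber(IList<IObserver<ITweet>> pipes, IObserver<ITweet> pipe)
            { _pipes = pipes; _pipe = pipe; }
            public void Dispose() { _pipes.Remove(_pipe); }
        }
    } //end class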


We have two fields.

  • IFilteredStream _internalStream is our interface to the private internal stream in Tweetinvi's library.
  • IList<IObserver<ITweet>> _pipes is the list of observers that sign up for updates from this TwitterStream.


Our constructor does the following:

  1. Reads our keys and tokens from the app.config file.
  2. Passes these credentials to Tweetinvi via TwitterCredentials.SetCredentials().
  3. Creates a filtered Twitter stream exposing IFilteredStream.
  4. Specifies that we only want tweets in English (but you can choose any language you want!).
  5. Provides a delegate that the IFilteredStream calls whenever a matching tweet is received.


Our TwitterStream class also has methods to

  1. Subscribe to the stream via the Observer pattern (Subscribe()).
  2. Add a filter via AddTrack().
  3. Start and stop the stream (StartStreamAsync() and StopStream()).
  4. Process an incoming tweet via ProcessTweet(), which our delegate calls. This method simply calls OnNext() on each observer, in accordance with the .NET Observer pattern.


Detail note: A "track" is the Twitter API's term for a keyword filter. A track is a list of keywords separated by spaces, and for Twitter to send a tweet to the client stream, the tweet must contain ALL of the keywords in that track (in any order). A stream can have multiple tracks, in which case the client can specify whether it wants tweets that match at least one track or only tweets that match every track. Since we start the stream by calling StartStreamMatchingAnyConditionAsync(), we receive tweets that match at least one track.
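To make the track rules concrete, here is a simplified local model of them: a track matches when every space-separated term appears as a word in the tweet (case-insensitively, in any order), and with "any condition" semantics a tweet is accepted if at least one track matches. This is an approximation for illustration only; Twitter's server-side matching treats punctuation, hashtags, URLs and mentions in its own way. `TrackMatcher` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public static class TrackMatcher
{
    // Split text into lower-case word tokens; characters such as '#', ',' and '.' separate words.
    private static HashSet<string> Tokens(string text)
    {
        return new HashSet<string>(
            Regex.Split(text.ToLowerInvariant(), @"[^\p{L}\p{N}]+").Where(t => t.Length > 0));
    }

    // A track matches when ALL of its space-separated terms occur in the tweet, in any order.
    public static bool MatchesTrack(string tweetText, string track)
    {
        HashSet<string> words = Tokens(tweetText);
        return track.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
                    .All(term => words.Contains(term.ToLowerInvariant()));
    }

    // "Any condition": the tweet is delivered if at least one track matches.
    public static bool MatchesAnyTrack(string tweetText, IEnumerable<string> tracks)
    {
        return tracks.Any(track => MatchesTrack(tweetText, track));
    }
}
```

Note that under this model the "data science" track accepts "Science meets big data today", even though the exact phrase never appears in it.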


TwitterPipe Observer


Let's look at a stripped-down version of our observer TwitterPipe class now. It writes to 5 AF attributes. The full implementation is in the attached Visual Studio solution.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using OSIsoft.AF.Asset;   // AFElement, AFAttribute, AFValue
    using OSIsoft.AF.Data;    // AFListData, AFUpdateOption, AFBufferOption
    using OSIsoft.AF.Time;    // AFTime
    using Tweetinvi;          // plus Tweetinvi's namespaces for ITweet, IHashtagEntity, IUrlEntity (vary by version)

    public class TwitterPipe : IObserver<ITweet>
    {
        private AFElement _element;
        private IList<string> _filters;

        public TwitterPipe(AFElement element)
        {
            this._element = element;
            // Attributes is an enum of attribute names defined in the attached solution (not shown)
            AFAttribute keywordsAttr = _element.Attributes[Attributes.Keywords.ToString()];
            string[] keywords = keywordsAttr.GetValue().Value as string[];
            // An element with an empty Keywords attribute simply gets no filters
            this._filters = keywords != null ? keywords.ToList() : new List<string>();
        }

        public void SubscribeToStream(TwitterStream twitterStream)
        {
            // Register each keyword as a track on the shared stream, then start observing it
            foreach (string s in this._filters)
                twitterStream.AddTrack(s);
            twitterStream.Subscribe(this);
        }

        public virtual void OnNext(ITweet tweet)
        {
            //Check if this tweet matches pipe's filter
            if (!IsMatchingTweet(tweet.Text))
                return;

            //Get timestamp of tweet
            AFTime timestamp = new AFTime(tweet.CreatedAt);

            //Extract tweet info and assign to AFValue and AFAttribute
            AFValue text = new AFValue(_element.Attributes["Text"], tweet.Text, timestamp);
            AFValue creator = new AFValue(_element.Attributes["Creator"], tweet.Creator.Name, timestamp);
            AFValue location = new AFValue(_element.Attributes["Location"], tweet.Creator.Location, timestamp);

            string hashtags = String.Join(" ", tweet.Hashtags.Select(p => p.Text));
            AFValue tags = new AFValue(_element.Attributes["Hashtags"], hashtags, timestamp);

            string sUrl = String.Join(" ", tweet.Urls.Select(p => p.URL));
            AFValue urls = new AFValue(_element.Attributes["Urls"], sUrl, timestamp);

            // Write all five values in one call (buffered if PI buffering is available)
            IList<AFValue> vals = new List<AFValue> { text, creator, location, tags, urls };
            AFListData.UpdateValues(vals, AFUpdateOption.Insert, AFBufferOption.BufferIfPossible);
        }

        private bool IsMatchingTweet(string text)
        {
            // Case-insensitive substring match against any of the topic's keywords
            foreach (string s in this._filters)
            {
                if (text.IndexOf(s, StringComparison.OrdinalIgnoreCase) >= 0)
                    return true;
            }
            return false;
        }

        // TwitterStream never calls OnCompleted or OnError, so they are no-ops here
        public void OnCompleted() { }

        public virtual void OnError(Exception e) { }
    } //end class


We have two fields.

  • AFElement _element is the element representing our "topic". For example, this would correspond to our "Big Data", "IoT", and "Smart Cities" elements in our diagram above.
  • IList<string> _filters is the list of filters for our topic. For example, for an "IoT" topic, the list could be {"IoT", "Internet of Things"}.


Our constructor does the following:

  1. Sets the AFElement field.
  2. Reads the "Keywords" AF attribute and stores the keywords in our IList<string> _filters field.


Our TwitterPipe exposes methods to

  1. Add its filters as tracks on the TwitterStream and add itself to the stream's subscribers, via SubscribeToStream(TwitterStream twitterStream).
  2. Process incoming tweets using OnNext(), per the Observer pattern.
  3. Check, via the private method IsMatchingTweet(string text), whether an incoming tweet matches any of the pipe's filters.


Detail note: When the TwitterPipe subscribes to the TwitterStream, it adds all of its filters as tracks for the TwitterStream. Therefore, the TwitterStream contains tracks for all of the filters from all subscribed pipes. An incoming tweet from Twitter's servers has matched at least one of these tracks, but the stream sends it to every pipe. It is up to each pipe, via IsMatchingTweet(string text), to decide whether the tweet matches the filters that define its AF element's topic.
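The interplay between the shared stream and the pipes can be modeled without Tweetinvi or AF. In this sketch (hypothetical `TopicPipe` and `FanOut` names, strings instead of ITweet), the stream's track list is the union of every pipe's filters, every tweet is delivered to every pipe, and each pipe keeps only the tweets that pass the same substring test as TwitterPipe.IsMatchingTweet. Removing duplicate keywords across topics is an addition of this sketch; the service as described simply adds every filter as a track.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, library-free stand-in for a TwitterPipe bound to one topic.
public class TopicPipe
{
    public string Topic { get; private set; }
    public IList<string> Filters { get; private set; }
    public List<string> Stored { get; private set; }

    public TopicPipe(string topic, params string[] filters)
    {
        Topic = topic;
        Filters = filters;
        Stored = new List<string>();
    }

    // Case-insensitive substring match on any filter, as in IsMatchingTweet
    public void OnNext(string tweetText)
    {
        if (Filters.Any(f => tweetText.IndexOf(f, StringComparison.OrdinalIgnoreCase) >= 0))
            Stored.Add(tweetText);
    }
}

public static class FanOut
{
    // The stream's tracks: union of all pipes' filters, duplicates removed ignoring case
    public static List<string> CollectTracks(IEnumerable<TopicPipe> pipes)
    {
        return pipes.SelectMany(p => p.Filters)
                    .Distinct(StringComparer.OrdinalIgnoreCase)
                    .ToList();
    }

    // Every tweet goes to every pipe; each pipe decides whether it belongs to its topic
    public static void Deliver(IEnumerable<TopicPipe> pipes, string tweetText)
    {
        foreach (var pipe in pipes)
            pipe.OnNext(tweetText);
    }
}
```

For example, a "Big Data" pipe with filters "big data" and "bigdata" plus a "Data Science" pipe with "data science" and "BigData" register three distinct tracks, and the tweet "Big data tools" ends up only in the "Big Data" pipe.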


How are the Observers and Observable linked together?


This is done via an AFObservers class.


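    using OSIsoft.AF;         // PISystems, PISystem, AFDatabase
    using OSIsoft.AF.Asset;   // AFElements, AFElement

    public class AFObservers
    {
        private AFElements _elements;

        public AFObservers(string server, string database)
        {
            PISystem ps = new PISystems()[server];
            AFDatabase db = ps.Databases[database];
            this._elements = db.Elements;
        }

        public void AddSubscribers(TwitterStream twitterStream)
        {
            foreach (AFElement element in this._elements)   // one pipe per topic element
            {
                TwitterPipe pipe = new TwitterPipe(element);
                pipe.SubscribeToStream(twitterStream);
            }
        }
    }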


It contains an AFElements collection as a field representing our list of topics to track. AddSubscribers() creates a TwitterPipe for each element in this list and subscribes it to the TwitterStream passed in.


After establishing these links, all we need to do is call StartStreamAsync() on the TwitterStream class to begin collecting tweets!


It would take too long to go through every detail in this Windows Service app. Please take a break if you managed to read this far! The explanations above should provide a good starting point when looking through the Visual Studio source code.




Let's view all the tweets we collected on the topic of "Machine Learning".


Instead of tweets, we can also look at hashtags.


There are many possibilities now that we can collect tweets in real-time using PI. We can use Asset-Based Analytics and Event Frames, perhaps to detect when a certain topic goes viral. We can create new apps with PI that perform sentiment analyses, track trends, and offer personalized news or daily notification summaries.


Something to be explored is integration with Esri ArcGIS maps for visualizing tweets spatially in real-time. The current limitation is that only a very small percentage of tweets are geo-enabled (i.e. they send latitude/longitude along with the tweet). It is possible to extract the user profile location to guess at the geo-coordinates, but this information is often sparse and unclean, as shown below.


Some clever engineering can probably overcome this and allow us to infer or guess at the latitude and longitude. With the PI Integrator for Esri, we would then be able to visualize the tweets on a map as they arrive in real-time. That will be a project for next time.



What if we sent all (sampled) tweets to PI?


Instead of applying filters or tracks, we can have Twitter send us a sample (~1%) of all the real-time tweets. This amounts to about 60 tweets/second. Can the PI server handle this load?


Of course! Here are the stats I found:

1) PI Buffer Subsystem (pibufss) max events in queue: ~5

2) PI Snapshot Subsystem (pisnapss) avg. CPU usage: 0.6%

3) PI Archive Subsystem (piarchss) avg. CPU usage: 0.3%


Of course, there are more metrics to look at, but for the PI Server, this Twitter stream hardly makes a blip. It would be interesting to see how the PI Server would respond to the Twitter Firehose API, which sends ALL (unsampled) public tweets, about 6000 tweets/sec. My guess is that the server will fare just fine.
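A quick back-of-the-envelope check of those rates, assuming every tweet is written as the five values (text, creator, location, hashtags, URLs) that TwitterPipe stores. `LoadEstimate` is a hypothetical helper; the measurements above come from the actual test, not from this arithmetic.

```csharp
using System;

public static class LoadEstimate
{
    // PI events per second if every tweet is written as 'valuesPerTweet' values
    public static int EventsPerSecond(int tweetsPerSecond, int valuesPerTweet)
    {
        return tweetsPerSecond * valuesPerTweet;
    }

    // Tweets accumulated in one day at a constant rate
    public static long TweetsPerDay(int tweetsPerSecond)
    {
        return (long)tweetsPerSecond * 24 * 60 * 60;
    }
}
```

At 60 tweets/sec this gives 300 events/sec and about 5.2 million tweets per day; at the Firehose rate of about 6000 tweets/sec it would be 30,000 events/sec.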


I hope to have demonstrated a non-traditional, but fun use case for the PI Server. Although traditionally seen as a "process historian", our wonderful PI Server can do so much more. It can really be a very general and scalable infrastructure for any sort of high frequency real-time information, whether the data is coming from an OPC server or a streaming RESTful API.


If you find this post interesting, we invite you to join our Programming Hackathon at TechCon 2015.


Registration is now open!