In today's Internet of Things world, where a lot of everyday products are embedded with hardware and software that allow integration of the physical and computer domains, a lot of information is being generated that will need to be stored and processed.This is where Event Hubs can be used for high throughput and event processing. Because Azure Service Bus queues, relays or topics have scale limits, Microsoft introduced Event hubs, last year, that are highly scalable data ingress services that can handle millions of events per second. It can provide event and telemetry ingress to the cloud at massive scale, due to its partition design, with low latency and high reliability. Event Hubs are used where you may want to ingest, persist and process a high volume of data. For PI users this can be a repository where events from multiple devices can be temporarily stored in event hubs and be brought determinately into the PI System. Data from PI can also be published to event hubs to be picked up by other systems.

 

 

To send your event data to the Event Hub you must use an event publisher. You can publish an event using AMPQ 1.0 or HTTPS as either a individual or batched transaction, in which AMQP is preferable because of its TLS stateful channel. The data is stored in various partitions with the number of partitions (2 - 32) specified at creation where the number of partitions is determined by the number of consumers that are downstream reading in parallel. The sender can use a Partition Key used to map incoming event data into specific partitions for the purposes of data organization. Partition keys are important for organizing data for downstream processing but are independent of the partitions themselves. If partitions keys are not used then a round robin approach is used for the partition assignment. Events can be stored up to 30 days maximum with a minimum of 24 hours, with the standard tier being 1 - 7 days, and are managed by the service bus and cannot be removed prior to their expiration.

 

Any entity that reads event data from an Event Hub is a consumer. Consumers read events through partitions in a consumer group using AMPQ 1.0, in which each consumer only reads a specific subset, or partition, of the message stream. This pattern enables horizontal scale for event processing and provides other stream-focused features. There is no need to poll for available data from the event hubs. Consumers use consumer groups which is a state, offset or position of the entire event hub. Consumer groups enable multiple consumers to each have a separate view of the event stream, and to read the stream independently at differing rates with their own offsets. There can be up to 20 consumer groups in a standard tier event hub with all event hubs have a default consumer group. As a consumer you can use an offset into the partition to specify where to start reading. The offset can be a byte or time-stamp that is stored by the consuming entity. Checkpointing can be used to mark the position within a partition and return to that mark in case the reader disconnects. When the reader reconnects it begins reading at the checkpoint that was previously submitted by the last reader of that partition in that consumer group. It is also possible to return to older data by specifying a lower offset from this checkpointing process. Checkpointing can provide robustness in the event of a failover between readers running on different machines.

 

Event Hubs provide capacity by Throughput Unit (TU) and a single TU provides a 1000 events/second ingress and double that for egress. The default for TU is 1 and is shared by all Event Hubs in that namespace and can be changed to a value from 1 - 20.

 

Authentication is provided by SAS and is provided at the namespace and event hub level. Publishers and Consumers can be given certain privileges such as Send, Listen and Manage. A SAS token is generated from a SAS key and is an SHA hash of a URL, encoded in a specific format. Using the name of the key (policy) and the token, Service Bus can regenerate the hash and thus authenticate the sender.

 

 

 

It is easy to get started publishing and consuming event hubs from Azure. If you don't have a Azure subscription you can sign up for a free 30 day trial. Event hubs are created at the namespace level of the Service Bus.

 

When you create a new event hub you will need to give it a name inside the namespace (i.e. ds03)

You can then specify the number of partitions: 2 - 32 (default - 4) and the number of retention days:  1- 7.

 

You will also need to create a Storage Account that is used by the consumer for checkpointing.

 

The Storage Bus keys required for the connection string can be found by clicking the Manage Access Keys link at the bottom of the storage account page.

 

 

Now that you have created an event hub we can send and receive events to it. The attached zip file contains a solution to help you get started publishing and consuming data from event hubs. Once you have downloaded and unzipped the project you will need to update the app.config for both the sender and receiver projects. You will need to use the keys specific for Send and Listen for the event hub. The receiver requires a storage connection and you will need to provide a connection string for it as well.

 

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <system.serviceModel>
    <extensions>
      <!-- In this extension section we are introducing all known service bus extensions. User can remove the ones they don't need. -->
      <behaviorExtensions>
        <add name="connectionStatusBehavior" type="Microsoft.ServiceBus.Configuration.ConnectionStatusElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="transportClientEndpointBehavior" type="Microsoft.ServiceBus.Configuration.TransportClientEndpointBehaviorElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="serviceRegistrySettings" type="Microsoft.ServiceBus.Configuration.ServiceRegistrySettingsElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </behaviorExtensions>
      <bindingElementExtensions>
        <add name="netMessagingTransport" type="Microsoft.ServiceBus.Messaging.Configuration.NetMessagingTransportExtensionElement, Microsoft.ServiceBus,  Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="tcpRelayTransport" type="Microsoft.ServiceBus.Configuration.TcpRelayTransportElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="httpRelayTransport" type="Microsoft.ServiceBus.Configuration.HttpRelayTransportElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="httpsRelayTransport" type="Microsoft.ServiceBus.Configuration.HttpsRelayTransportElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="onewayRelayTransport" type="Microsoft.ServiceBus.Configuration.RelayedOnewayTransportElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </bindingElementExtensions>
      <bindingExtensions>
        <add name="basicHttpRelayBinding" type="Microsoft.ServiceBus.Configuration.BasicHttpRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="webHttpRelayBinding" type="Microsoft.ServiceBus.Configuration.WebHttpRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="ws2007HttpRelayBinding" type="Microsoft.ServiceBus.Configuration.WS2007HttpRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="netTcpRelayBinding" type="Microsoft.ServiceBus.Configuration.NetTcpRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="netOnewayRelayBinding" type="Microsoft.ServiceBus.Configuration.NetOnewayRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="netEventRelayBinding" type="Microsoft.ServiceBus.Configuration.NetEventRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="netMessagingBinding" type="Microsoft.ServiceBus.Messaging.Configuration.NetMessagingBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </bindingExtensions>
    </extensions>
  </system.serviceModel>
  <appSettings>
    <!-- Azure Storage Connection String. -->
    <add key="AzureStorageConnectionString" value="DefaultEndpointsProtocol=https;AccountName=YourAccountName;AccountKey=UT3bd2ysBN7BdKh+1RDdmB/9fxgTEs2ZAROSe+FQyTbXLgbaP0ShIB9Ik4v/90Gfb8lglZOHqmVsKMPRK5q8UQ=="/>
   <!--add key="Microsoft.ServiceBus.ConnectionString"-->
    <add key="Microsoft.ServiceBus.ConnectionString" value="Endpoint=sb://YourNamespace.servicebus.windows.net/;SharedAccessKeyName=Website;SharedAccessKey=JsW2L2qF80pQdES0Yi0gWuXeV6v7JUrWN5yEKLIDpNc="/>
   </appSettings>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.1"/>
  </startup>
</configuration>


 

You will then need to resolve the Microsoft references from NuGet and the OSIsoft.AFSDK, OSIsoft.PI.Configuration,OSIsoft.PI.Net and OSIsoft.PI.Net.Core from the GAC on the local machine where the PI AF client has been installed.

 

For the AF reference used in the Receiver you will need to have PI AF Client installed and reference the OSIsoft.AFSDK.dll.

 

Code snippet for the Sender that uses the EventHubConnectionString and the EventHubName to create a simple event hub client and send the data to the event hub asynchronously.

 

using System.Configuration;
using System.Collections.Generic;
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;


namespace Sender
{
    class Program
    {
        static string eventHubName;
                
        //
        static void Main(string[] args)
        {
            if (args.Length < 1)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("Incorrect number of commandline arguments. Expected: EventHub Name.");
                Console.ResetColor();
                Console.WriteLine("Press Ctrl-C or Enter to exit.");
                Console.ReadLine();
                return;
            }
            else
            {
                eventHubName = args[0];
                Console.WriteLine("EventHub Name: " + eventHubName);
            }


            Console.WriteLine("Press Ctrl-C to stop");
            Console.WriteLine("Press Enter to start");
            Console.ReadLine();
            
            List<string> cityList = new List<string>(new string[] { "Los Angeles","San Francisco","Seattle","New York City","Dallas","Atlanta","Charlotte" });
            
            SendingWeatherMessages(cityList);
           
        }


        //Publish the weather data to the Event Hub
        static async Task SendingWeatherMessages(List<string>cityList)
        {
            var eventHubConnectionString = GetEventHubConnectionString();
            var eventHubClient = EventHubClient.CreateFromConnectionString(eventHubConnectionString, eventHubName);
            List<Task> tasks = new List<Task>();
            
            try
            {
                List<string>.Enumerator e = cityList.GetEnumerator();
                while (e.MoveNext())
                {
                    var eventStream = new WeatherStream()
                    {
                        timeStamp = DateTime.Now,
                        location = e.Current.ToString(),
                        temp = new Random().Next(0,40),
                        wind = new Random().Next(0,20),
                        direction = new Random().Next(0,359),
                        humidity = new Random().Next(10, 80),
                        pressure = new Random().Next(25, 30),
                        bStatus = Convert.ToBoolean(new Random().Next(0, 1))
                    };
                    var bytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventStream));
                    Console.WriteLine("{0} > Sending message: {1}", DateTime.Now.ToString(), eventStream);
                    tasks.Add(eventHubClient.SendAsync(new EventData(bytes)));
                    Task.WaitAll(tasks.ToArray());


                }
            }
            catch (Exception exception)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("{0} > Exception: {1}", DateTime.Now.ToString(), exception.Message);
                Console.ResetColor();
            }
            //delay 
            await Task.Delay(200);
        }


        //Create the Event Hub from the Connection String in the app.config
        static bool CreateEventHub()
        {
            try
            {
                var manager = NamespaceManager.CreateFromConnectionString(GetEventHubConnectionString());


                // Create the Event Hub
                Console.WriteLine("Creating Event Hub...");
                manager.CreateEventHubIfNotExistsAsync(eventHubName).Wait();
                Console.WriteLine("Event Hub is created...");
            }
            catch (AggregateException ex)
            {
                Console.WriteLine(ex.Flatten());
                return false;
            }
            return true;        
        }
        
        //Get the Event Hub Connection String from the app.config
        static string GetEventHubConnectionString()
        {
            var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
            if (string.IsNullOrEmpty(connectionString))
            {
                Console.WriteLine("Did not find Microsoft.ServiceBus.ConnectionString in app.config");
                return string.Empty;
            }
            try
            {
                var builder = new ServiceBusConnectionStringBuilder(connectionString);
                builder.TransportType = TransportType.Amqp;
                return builder.ToString();
            }
            catch (Exception exception)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine(exception.Message);
                Console.ResetColor();
            }
            return null;
        }
    }
}

 

A code snippet of the Receiver uses the EventHubHost and EventProcessorFactory to manage the consuming of the events.

 

using System;
using System.Linq;
using System.Configuration;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;


namespace Receiver
{
    class Program
    {
        static string eventHubName;
        static string hostName;
        static string consumerGroupName;
        static EventProcessorHost host;
        static weathereventprocessorfactory factory;
        public static AF MyAF;


        static void Main(string[] args)
        {
            if (args.Length < 2)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("Incorrect number of arguments. Expected: Host <eventhubname> <hostname> [consumergrou].");
                Console.ResetColor();
                Console.WriteLine("Press Ctrl-C or Enter to exit.");
                Console.ReadLine();
                return;
            }


            eventHubName = args[0];
            Console.WriteLine("EventHub name: " + eventHubName);


            hostName = args[1];
            Console.WriteLine("Host name: " + hostName);


            if (args.Length > 2)
            {
                consumerGroupName = args[2];
            }
            else
            {
                consumerGroupName = EventHubConsumerGroup.DefaultGroupName;
            }


            Console.WriteLine("ConsumerGroup name: " + consumerGroupName);


            Console.WriteLine("Press Ctrl-C or Q to stop the host process");
            Console.WriteLine("Press Enter to start now");
            Console.ReadLine();


            StartHost().Wait();


            while (true)
            {
                if (Console.ReadKey().Key == ConsoleKey.Q)
                {
                    Console.WriteLine("Total received messages: {0}", factory.TotalMessages);
                    host.UnregisterEventProcessorAsync().Wait();
                    return;
                }
            }
        }


        //Start the host
        private static async Task StartHost()
        {
            var eventHubConnectionString = GetEventHubConnectionString();
            var storageConnectionString = GetStorageConnectionString();


            host = new EventProcessorHost(
                hostName,
                eventHubName,
                consumerGroupName,
                eventHubConnectionString,
                storageConnectionString, eventHubName.ToLowerInvariant());


            factory = new weathereventprocessorfactory(hostName);


            try
            {
                Console.WriteLine("{0} > Registering host: {1}", DateTime.Now.ToString(), hostName);
                var options = new EventProcessorOptions();
                await host.RegisterEventProcessorFactoryAsync(factory);


            }
            catch (Exception ex)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("{0} > Exception: {1}", DateTime.Now.ToString(), ex.Message);
                Console.ResetColor();
            }
        }


        //Get the Event Hub Connection String
        static string GetEventHubConnectionString()
        {
            var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
            if (string.IsNullOrEmpty(connectionString))
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("Did not find Service Bus connections string in appsettings (app.config)");
                Console.ResetColor();
                return string.Empty;
            }


            try
            {
                var builder = new ServiceBusConnectionStringBuilder(connectionString);
                return builder.ToString();
            }
            catch (Exception ex)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine(ex.Message);
                Console.ResetColor();
            }
            return string.Empty;
        }


        //Get the Azure Storage Connection String
        private static string GetStorageConnectionString()
        {
            var connectionString = ConfigurationManager.AppSettings["AzureStorageConnectionString"];
            if (string.IsNullOrEmpty(connectionString))
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("Did not find storage connections string in app.config.");
                Console.ResetColor();
                return string.Empty;
            }
            return connectionString;
        }
    }
}

 

Simple AF module creates the AF Database, elements, and attributes with PI data references and writes the event hub data to PI.

 

using System;
using System.Linq;
using System.Collections.Generic;
using System.Text;
using System.Text.RegularExpressions;
using OSIsoft.AF;
using OSIsoft.AF.Asset;
using OSIsoft.AF.PI;
using System.Threading;


namespace Receiver
{
    class AF
    {


        #region Variable Definitions
        // AF Database where asset information will be stored
        private AFDatabase Afdb = null;


        // PISystem object for PI AF 
        private PISystem PISys = null;


        //PISystems object for PI AF
        PISystems PISystems = null;


        //PIServer object for PI 
        public PIServer PISrv = null;


        //PI System name
        string PISysName = "xxxxxxxxx";  //Replace with PI System Name


        // Default AFDB name
        public string AFDBName = "Weather";




        // Default elementTemplate name
        public string ElemTemplName = "City";


        // Default root element name
        public string RootElemName = "Cities";


        // Constants
        private const int NUM_RETRIES = 25;             
        
        #endregion


       


        //Connect and initialize AFDB
        public bool Initialize()
        {
            try
            {
                 Console.WriteLine(String.Format("Initialize AF Server."));
                               
                // Connect to AF Server
                try
                {
                    PISystems = new PISystems();
                }
                catch (Exception e)
                {
                    Console.WriteLine(String.Format("An exception occurred when creating PISystems object. Exception: {0}", e.Message));
                }


                if (PISystems == null)
                {
                    Console.WriteLine("Cannot create PISystems object.");
                }


                PISys = PISystems.DefaultPISystem;
                PISrv = PIServers.GetPIServers(PISys).DefaultPIServer;
                if (PISrv == null)
                {
                    PISrv = PIServers.GetPIServers(PISys).Add(PISysName);
                    Console.WriteLine(String.Format("PI Server '{0}' does not exist in known list -- adding to list of PI Servers", "sPISysName")); 
                }
                else
                {
                    Console.WriteLine(String.Format("Default PI Server '{0}' exists in known PI Server list.", PISrv.Name.ToString())); 
                }


                if (PISys == null)
                {
                    PISys = PISystems.Add(PISysName);
                    Console.WriteLine(String.Format("AF Server '{0}' does not exist in known list -- adding to list of AF Servers", "sPISysName")); 
                }
                else
                {
                    Console.WriteLine(String.Format("AF Server '{0}' exists in known AF Server list.", PISys.Name.ToString())); 
                }


                int tries = 1;


                // If we can't connect, try every 5 seconds for up to 2 minutes (24 re-tries)
                while (tries < NUM_RETRIES)
                {
                    try
                    {
                        PISys.Connect();
                        Console.WriteLine(String.Format("Successfully connected to AF Server '{0}'.", PISys.Name.ToString())); 
                        break;
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Could not establish connection to AF Server. Connection will be re-attempted in 5 seconds.");
                        Thread.Sleep(5000);
                        tries++;
                    }
                }


                // Verify that AF Database exists
                string dbName = AFDBName; 
                Afdb = PISys.Databases[dbName];
                if (Afdb == null)
                {
                    try
                    {
                        Afdb = PISys.Databases.Add(dbName);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(String.Format("Could not create AF Database '{0}'. Exception: {1}", AFDBName, ex.Message)); 
                        Console.WriteLine("Please verify that 'AFDB' value in service configuration file is valid.");
                        return false;
                    }
                }


                if (Afdb != null)
                {
                    Console.WriteLine(String.Format("AF Database exists and is ready to receive asset information:  '{0}'.", AFDBName)); 
                    Afdb.Refresh();
                }
                else
                {
                    Console.WriteLine("AF Database could not be created or located. Please verify that 'AFDB' value in service configuration file is valid.");
                    return false;
                }


                // Create an Element Template
                AFElementTemplate elemTempl;
                elemTempl = Afdb.ElementTemplates[ElemTemplName];
                    
                if (elemTempl == null)
                    elemTempl = Afdb.ElementTemplates.Add(ElemTemplName);
                elemTempl.AllowElementToExtend = true;
                
                // Create an Attribute Template
                try
                {
                   AFAttributeTemplate AttrTempl = elemTempl.AttributeTemplates.Add("attributeTemplate");
                   AttrTempl.Description = "Attribute Template for MyElementTemplate";
                   AttrTempl.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(PISys);
                   AttrTempl.ConfigString = @"\\%Server%\%Element%.%Attribute%;ReadOnly=False;ptclassname=classic;pointtype=Float32;descriptor=testdesc;location1=1";
                }
                catch
                {
                    Console.WriteLine("Attribute Template already exists");
                    
                }
                
                // Create element ()
                Afdb.Elements.Refresh();
                AFElement rootElem = Afdb.Elements[RootElemName];
                if (rootElem == null)
                    rootElem = Afdb.Elements.Add(RootElemName, elemTempl);


                Console.WriteLine("AF Class initialized successfully");
                Afdb.CheckIn();
        }
        catch (Exception ex)
        {
            Console.WriteLine(String.Format("Initialize AF Server '{0}' NOT complete. Exception:  {1}", PISysName, ex.Message)); 
            return false;
        }
        return true;
        }


        //Create elements, attributes and update PI values
        public bool EditElements(dynamic weatherdata)
        {
            string cityName = String.Empty;


            try
            {
                Console.WriteLine(String.Format("Editing AF Element '{0}'.", weatherdata.location.ToString()));
                cityName = weatherdata.location;




                AFValue AFVal;
                Afdb.Elements.Refresh();
                // Create element ()
                AFElement rootElem = null;
                AFElement elem = null;
                // Look for top-level object
                rootElem = Afdb.Elements[RootElemName];


                // Find asset's parent
                cityName = weatherdata.location.ToString();
                string parent = rootElem.ToString();
                
                // Create the asset under its parent
                elem = rootElem.Elements[cityName];
                
                if (elem == null)
                {
                    elem = rootElem.Elements.Add(cityName);
                }


                AFAttribute testAtt1;
                List<string> attrList = new List<string>(new string[] { "temp", "wind", "pressure", "direction", "humidity", "bStatus" });
                List<string> UOMList = new List<string>(new string[] { "degrees Celsius", "kilometer per hour", "bar", "degree", "percent"});
                string UOMstring = null;
                
                foreach (string attrName in attrList)
                {
                    if (elem.Attributes[attrName] == null)
                    {
                        testAtt1 = elem.Attributes.Add(attrName);
                        testAtt1.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(PISys);
                        if (attrName.Equals("bStatus"))
                        {
                            testAtt1.ConfigString = @"\\%Server%\%Element%.%Attribute%;ptclassname=classic;ReadOnly=False;pointtype=Int16;pointsource=L";
                        }
                        else
                        {
                            if (String.Compare(attrName, "temp") == 0)
                                UOMstring = "degree Celsius";
                            else if (String.Compare(attrName, "wind") == 0)
                                UOMstring = "kilometer per hour";
                            else if (String.Compare(attrName, "pressure") == 0)
                                UOMstring = "bar";
                            else if (String.Compare(attrName, "direction") == 0)
                                UOMstring = "degree";
                            else if (String.Compare(attrName, "humidity") == 0)
                                UOMstring = "percent";
                            else UOMstring = "";


                            testAtt1.ConfigString = string.Format(@"\\%Server%\%Element%.%Attribute%;ptclassname=classic;ReadOnly=False;pointtype=Float32;UOM={0};pointsource=L",UOMstring);
                        }


                        foreach (AFAttribute attr in elem.Attributes)
                        {
                            testAtt1.DataReference.CreateConfig();
                        }
                    }
                    else
                    {
                        testAtt1 = elem.Attributes[attrName];
                    }
                    if (attrName.Equals("temp"))
                        AFVal = new AFValue(Convert.ToSingle(weatherdata.temp));
                    else if (attrName.Equals("wind"))
                        AFVal = new AFValue(Convert.ToSingle(weatherdata.wind));
                    else if (attrName.Equals("humidity"))
                        AFVal = new AFValue(Convert.ToSingle(weatherdata.humidity));
                    else if (attrName.Equals("direction"))
                        AFVal = new AFValue(Convert.ToSingle(weatherdata.direction));
                    else if (attrName.Equals("pressure"))
                        AFVal = new AFValue(Convert.ToSingle(weatherdata.pressure));
                    else
                        AFVal = new AFValue(Convert.ToSingle(weatherdata.bStatus));
                    testAtt1.SetValue(AFVal);
                }
                PISys.CheckIn(new IAFTransactable[] { rootElem, elem });
            }
            catch (Exception ex)
            {
                Console.WriteLine(String.Format("An error occurred when editing AFElements. Exception:  {0}", ex.Message));
                return false;
            }
            return true;
        }
     }
}

 

The solution as viewed in the PI System Explorer.

 

I hope you find this blog post informative and it spurs your interest. Stayed tuned for more on PI and Event Hubs in upcoming posts.

 

The full VS 2013 solution is attached.