Exploring Azure Event Hubs – Code Implementation

This post is a continuation of a previous post, which describes the architecture for a model where offsite (remote) auto dealerships subscribe to inventory data streamed from a central truck dealership named Contoso Trucks.  If you have not read that post yet, you may want to take a moment to read through it and get up to speed on the solution described here.

 

Before Getting Started


All of the code that will be discussed in this section is stored in GitHub at the following location: https://github.com/jixer/eventhub-demo.  See this post for an overview of how to download code from GitHub.

  

Overview of VS Projects


The main solution file is in the root of the GitHub source code repository and is named “EventHubDemo_V1.sln”.  Open this file in Visual Studio 2013 or 2012 to browse the contents.  Here is the main solution’s structure:

image010

  

Here is a brief description of each project:

  • The SalesApp Forms application serves as the main inventory management system and uses the DAL library to access its database.
  • The Consumer console application is the listener application that sits in the offsite locations.  It listens to the main EventHub for inventory updates and uses those to update its own local database via the UTruck.Offsite.Dal library.
  • The Shared project contains the message contracts that are shared between the producer and consumer applications.  It also contains some libraries that simplify the serialization of the data to XML text.

 

Next, let’s take a look at the setup required to configure our Azure Event Hub.

Implementation of Event Hub


The first thing we need to do is create the Event Hub in Azure.  The steps are fairly straightforward.  Once you are logged into Azure, use the “New” menu item at the bottom right, and navigate to the Event Hub section (at the time of writing this article, the feature is in Preview).

image012

 

Next, provide the necessary details for the event hub.  You can either leverage an existing service bus namespace or create a new one.  I opted in the example below to create a new one.

image014

 

On page 2, provide the partition count and message retention days.  I will not be covering these in this post, but there is a good amount of information regarding these concepts on MSDN.

image016

 

Clicking on the check mark button will finish the wizard.  Once you have completed the wizard, you will be directed to the service bus section in the portal where you will see that the namespace is being activated (ignore the “synapse” namespace in the image; this is a leftover item in my Azure account).  Activation can typically take anywhere between 30 seconds and 5 minutes.

image018

 

Once the namespace is created, you will see in the portal that it is in an “Active” status.

image020

 

If you browse into the namespace and navigate to the “Event Hubs” section, you should see something similar to the following.

image022

 

Code Implementation: Main Dealership


The overall application is fairly straightforward.  There is a main sales application that interfaces with the dealership’s main inventory system.  The application allows users to Add inventory.

image023

 

When new items are added to inventory (via the “Add” button), an event is sent to the Event Hub that instructs all listeners to “Refresh” their data.  This logic is implemented in the main code-behind for Form1 (Form1.cs).


private void btnAddInventory_Click(object sender, EventArgs e)
{
    var truck = new Truck()
    {
        Id = Guid.Parse(txtNewTruckId.Text),
        Make = txtNewTruckMake.Text,
        Model = txtNewTruckModel.Text,
        Year = int.Parse(txtNewTruckYear.Text),
        Msrp = double.Parse(txtNewTruckMsrp.Text)
    };
    var inventoryItem = new InventoryItem()
    {
        Id = Guid.NewGuid(),
        AvailableTruck = truck
    };
    using (var ctx = new MainContext())
    {
        ctx.Trucks.Add(truck);
        ctx.Inventory.Add(inventoryItem);
        ctx.SaveChanges();
    }

    var convertedTruck = Mapper.Map<InventoryTruck>(truck);
    var eventSender = new EventSender();
    eventSender.SendData(convertedTruck);
}

 

The first few lines of code create the truck and add it to the database; these can be ignored for now.  The logic of interest to us is the code that sends the new inventory item to the Event Hub, which is abstracted behind the EventSender class:

public class EventSender
{
    private EventHubSender _sender;
    public EventHubSender Sender
    {
        get
        {
            if (_sender == null)
                _sender = CreatePartitionedSender();
            return _sender;
        }
    }

    public void SendData(InventoryTruck item)
    {
        // delegate to the static overload, using the lazily created sender
        SendData(Sender, item);
    }

    private static EventHubSender CreatePartitionedSender()
    {
        // EventHubClient model (uses implicit factory instance, so all links on same connection)
        EventHubClient eventHubClient = EventHubClient.Create("contoso-trucks");

        // return sender for partition '1'
        return eventHubClient.CreatePartitionedSender("1");
    }

    private static void SendData(EventHubSender partitionedSender, InventoryTruck item)
    {
        // create the change event
        var changeEvent = new InventoryChangeEvent() { ChangeType = InventoryChangeType.Refresh, Truck = item };

        // create a memory stream with the XML serialized data and use to create event data object
        var strm = SerializationHelper.SerializeToMemoryStream(changeEvent);
        EventData data = new EventData(strm);

        // send event data to Event Hub
        partitionedSender.Send(data);
    }
}

 

Look closely at the SendData(…) method.  It leverages an EventHubSender, which is created in the CreatePartitionedSender() method.  Within that method, the EventHubClient.Create() method is supplied the name of our Event Hub.  In addition, there are some settings in the App.config that tell the Create() method how to connect to the Service Bus namespace.  Within the appSettings section of the App.config for the SalesApp, you will find the following configuration:

<appSettings>
  <!-- Service Bus specific app settings for messaging connections -->
  <add
	key="Microsoft.ServiceBus.ConnectionString"
	value="Endpoint=sb://contoso-trucks-ns.servicebus.windows.net/;SharedSecretIssuer=owner;SharedSecretValue=ehdUmNAdHDWesadnUuVef7OH9m0jJjShEJ5mxGtVFAg=" />
</appSettings>

   

This is all that is necessary to create the partitioned sender.

The EventHubSender object is used to send the EventData object to the bus.  The EventData object is created by providing a stream (contains the serialized XML text that represents our underlying InventoryTruck item).
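The SerializationHelper class lives in the Shared project and is not shown in this post.  As a rough sketch of what SerializeToMemoryStream might look like, assuming it wraps the standard XmlSerializer (the contract classes below are simplified stand-ins, not the actual Shared types):

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Simplified stand-ins for the Shared message contracts (assumed shapes).
public enum InventoryChangeType { Refresh, Sold }

public class InventoryTruck
{
    public Guid Id { get; set; }
    public string Make { get; set; }
    public string Model { get; set; }
    public int Year { get; set; }
    public double Msrp { get; set; }
}

public class InventoryChangeEvent
{
    public InventoryChangeType ChangeType { get; set; }
    public InventoryTruck Truck { get; set; }
}

// Hypothetical helper; the real SerializationHelper may differ.
public static class SerializationSketch
{
    // Serializes a public type with a parameterless constructor to XML and
    // returns a stream rewound to the start, ready to pass to EventData.
    public static MemoryStream SerializeToMemoryStream<T>(T item)
    {
        var serializer = new XmlSerializer(typeof(T));
        var stream = new MemoryStream();
        serializer.Serialize(stream, item);
        stream.Position = 0; // rewind so the reader starts at the beginning
        return stream;
    }
}
```

Rewinding the stream matters here: a stream left positioned at its end would hand an empty body to whatever reads it next.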

The application also allows users to sell inventory.

image030

 

The implementation of the “Sell” feature is very similar to the “Add” feature, as shown in the following code (excerpt from Form1.cs):

private void btnSell_Click(object sender, EventArgs e)
{
    Guid truckGuid = Guid.Parse(txtTruckId.Text);
    Truck soldTruck;
    using (var ctx = new MainContext())
    {
        var inventoryQuery = ctx.Inventory.Where(x => x.AvailableTruck.Id == truckGuid);
        var inventoryItem = inventoryQuery.FirstOrDefault();
        soldTruck = inventoryQuery.Select(x => x.AvailableTruck).FirstOrDefault();
        if (inventoryItem != null && soldTruck != null)
        {
            var sale = new Sale()
            {
                Id = Guid.NewGuid(),
                CustomerName = txtName.Text,
                SaleAmount = double.Parse(txtAmount.Text),
                SaleDate = DateTime.Now,
                TruckSold = soldTruck
            };
            ctx.Sales.Add(sale);
            ctx.Inventory.Remove(inventoryItem);
            ctx.SaveChanges(); // persist the sale and the inventory removal
        }
    }

    var convertedTruck = Mapper.Map<InventoryTruck>(soldTruck);
    var eventSender = new EventSender();
    eventSender.SendData(convertedTruck);
}

 

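First, the button handler creates the sale object and updates its own database appropriately.  The last three lines convert the object to the Shared.InventoryTruck item, which is then sent to the bus using another one of our crafty EventSender objects.  Note that EventSender.SendData, as shown earlier, always sets ChangeType to Refresh; for offsite listeners to treat the event as a sale, the change type would need to be set to InventoryChangeType.Sold.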

 

Code Implementation: Example Offsite Location


The offsite location is responsible for monitoring the Event Hub for new events.  We can fire up the Consumer console application and fire through a couple of inventory updates to see what happens:

  1. Start the Consumer console application
  2. Start SalesApp Forms application
  3. Add a new Truck in the SalesApp
  4. Press any key other than ‘q’ in the Consumer console window to force it to retrieve another record
  5. Repeat steps 3 – 4 two more times

 

After repeating those steps, you should see something similar to the screenshot below.

image034

 

Let’s take a look at the code.  Near the beginning of Program.cs, you will find the logic necessary to create the EventHubReceiver, which is responsible for receiving the messages from the Event Hub.  Here is that logic:

static void Main(string[] args)
{
    //***Section Snipped****//

    // obtain the consumer
    EventHubReceiver receiver = GetEventHubReceiver(state);             

    //***Section Snipped****//
}

private static EventHubReceiver GetEventHubReceiver(SiteState state)
{
    // Create the EventHubClient and retrieve the default consumer group
    EventHubClient eventHubClient = EventHubClient.Create("contoso-trucks");
    EventHubConsumerGroup defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup();

    // if state contains an offset, then use it; otherwise use the default (beginning of stream)
    if (string.IsNullOrEmpty(state.Offset))
        return defaultConsumerGroup.CreateReceiver("1");
    else
        return defaultConsumerGroup.CreateReceiver("1", state.Offset);
}

 

Similar to the send side, we must first create an EventHubClient.  The event hub client is created by specifying our “contoso-trucks” event hub in conjunction with the necessary service bus namespace specification in the App.config.  The EventHubClient then creates a receiver through the use of the default consumer group.  The consumer application maintains its state through the use of a state object, which it saves to and restores from a local “state.dat” file.

public class SiteState
{
    public string Offset { get; set; }

    public SiteState()
    {
    }

    public SiteState(string offset)
    {
        Offset = offset;
    }
}
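
The StateManager that persists this object is not shown in the post.  One minimal way it might work, assuming the offset is stored as plain text in the state file (the file format, the path parameter, and the ReadState name are all assumptions; only WriteState appears in the consumer code):

```csharp
using System.IO;

// Stand-in for SiteState so the sketch compiles on its own.
public class SiteStateSketch
{
    public string Offset { get; set; }
}

// Hypothetical helper; the real StateManager may use a different format.
public static class StateManagerSketch
{
    // Returns a state with a null Offset when no file exists yet,
    // which the consumer treats as "start at the beginning of the stream".
    public static SiteStateSketch ReadState(string path)
    {
        var state = new SiteStateSketch();
        if (File.Exists(path))
            state.Offset = File.ReadAllText(path).Trim();
        return state;
    }

    // Overwrites the file with the latest offset.
    public static void WriteState(string path, SiteStateSketch state)
    {
        File.WriteAllText(path, state.Offset ?? string.Empty);
    }
}
```

With this shape, deleting the state file is enough to make the next run replay the stream from the start.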

Here is the main loop responsible for retrieving messages from the Event Hub:

while (continueProcessing)
{
    // retrieve a batch of data
    var data = receiver.Receive(10, TimeSpan.FromSeconds(10));

    // iterate through events
    foreach (EventData item in data)
    {
        // deserialize the InventoryChangeEvent from the message body
        var bodyStream = item.GetBodyStream();
        InventoryChangeEvent changeEvent = SerializationHelper.TryExtractInventoryChangedEvent(bodyStream);

        // verify that deserialization was successful
        if (changeEvent != null)
        {
            // update inventory depending on type of change
            if (changeEvent.ChangeType == InventoryChangeType.Refresh)
            {
                var mergeTruck = Mapper.Map<InventoryItem>(changeEvent.Truck);
                MergeInventory(mergeTruck);
                Console.WriteLine("Merged 1 truck into inventory");
            }
            else if (changeEvent.ChangeType == InventoryChangeType.Sold)
            {
                RemoveInventory(changeEvent.Truck.Id);
                Console.WriteLine("Removed 1 truck from inventory");
            }
        }

        // update the saved state with the new offset
        state.Offset = item.Offset;
    }

    // save state on each round
    StateManager.WriteState(state);

    // management of the console host
    Console.WriteLine("Press any key to continue or 'q' to quit...");
    var keyPress = Console.ReadKey(false);
    continueProcessing = keyPress.KeyChar != 'q';
}

 

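The logic here grabs a batch of up to ten EventData messages at a time, waiting up to ten seconds for each batch.  It then iterates over the batch, writes the contained data into its local data source, and records the offset of each event so that the saved state always points at the last event processed.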
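
The loop relies on SerializationHelper.TryExtractInventoryChangedEvent returning null when a message body cannot be read.  That helper is part of the Shared project and is not shown here; a minimal sketch of a "try" style deserializer, assuming the body is XML produced by XmlSerializer and using a simplified, hypothetical contract, could look like this:

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Hypothetical, simplified contract used only for this sketch.
public enum ChangeKind { Refresh, Sold }

public class ChangeMessage
{
    public ChangeKind ChangeType { get; set; }
    public Guid TruckId { get; set; }
}

public static class DeserializationSketch
{
    // Returns the deserialized message, or null if the body is not valid XML
    // for the expected contract.
    public static ChangeMessage TryExtract(Stream body)
    {
        try
        {
            var serializer = new XmlSerializer(typeof(ChangeMessage));
            return (ChangeMessage)serializer.Deserialize(body);
        }
        catch (InvalidOperationException)
        {
            // XmlSerializer reports malformed or mismatched XML this way.
            return null;
        }
    }
}
```

Returning null instead of throwing lets the loop skip a bad message while still advancing the saved offset past it.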

Now let’s take a look at things in action.  If we delete the state file and spin up a new instance of the Consumer application, it should be capable of receiving the three messages that were originally published to the bus.

Copy the compiled files for the Consumer application to a new place.  Additionally, delete the existing .dat file and point the connectionString in the app.config file to a new database.  If we fire up the application, we should see in the console output that three messages were read from the EventHub.  This scenario represents our offline site.  The offline site is capable of starting at the very beginning of the event stream and retrieving all messages until it is caught up.

image042
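
To make the catch-up behavior concrete, here is a toy in-memory model of a single partition.  It is not the Event Hubs API: real offsets are opaque strings managed by the service, and the class and method names below are invented for illustration.  It only shows why a reader with no saved offset sees every event while a caught-up reader sees only new ones:

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical event record with a simple numeric offset.
public class LoggedEvent
{
    public long Offset { get; set; }
    public string Body { get; set; }
}

// Hypothetical append-only log standing in for one Event Hub partition.
public class PartitionLogSketch
{
    private readonly List<LoggedEvent> _events = new List<LoggedEvent>();

    // Appends an event and assigns it the next sequential offset.
    public void Append(string body)
    {
        _events.Add(new LoggedEvent { Offset = _events.Count, Body = body });
    }

    // Returns up to maxCount events located after the given offset;
    // a null offset means "start from the beginning of the stream".
    public List<LoggedEvent> ReadAfter(long? offset, int maxCount)
    {
        return _events
            .Where(e => offset == null || e.Offset > offset.Value)
            .Take(maxCount)
            .ToList();
    }
}
```

A new site calls ReadAfter(null, 10) and receives the full history, while a site that saved the last offset it processed receives only what was appended afterward.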

 

Now that both instances are at the latest point in time, they should both begin to receive events in real time as they are written to the event hub.  The snappiness (speed) you will see when submitting a couple of sell and add inventory orders is brought to you by the amazing throughput supported by AMQP and the Event Hub platform.

 

Conclusion


Hope you enjoyed this post.  Please use the social features below to share this with your friends.

 

 

 
