PostgreSQL Historical Log by Table

In my current project, there is a need to track data changes in PostgreSQL tables. The end goal is that, when a row changes, we copy the previous version of the row before the change transaction completes and write it to a logging table. We will accomplish this in the following steps:

  •  Create a table LIKE the table that needs logging
  •  Create a trigger function for that table
  •  Apply that function as a trigger

Table Like

First we need an example table to get started with. For a simple example, let's use a basic address table.

create table address
(
address_id integer not null,
type varchar(100),
street1 varchar(120) not null,
street2 varchar(120),
street3 varchar(120),
street4 varchar(120),
city varchar(80),
po_box_code varchar(20) not null,
phone_number varchar(50),
date_created timestamp with time zone not null default current_timestamp
);


This basic table has enough constraints to make a decent example. We need to create a copy of this table, one where the columns have the same names and types, but with as few constraints as possible. Luckily for us, PostgreSQL provides a feature for just this situation: the like_option of the CREATE TABLE statement. According to the latest documentation (PostgreSQL 12) at the time of writing this post:

The LIKE clause specifies a table from which the new table automatically copies all column names, their data types, and their not-null constraints.

Unlike INHERITS, the new table and original table are completely decoupled after creation is complete. Changes to the original table will not be applied to the new table, and it is not possible to include data of the new table in scans of the original table.

Also unlike INHERITS, columns and constraints copied by LIKE are not merged with similarly named columns and constraints. If the same name is specified explicitly or in another LIKE clause, an error is signaled.

The optional like_option clauses specify which additional properties of the original table to copy. Specifying INCLUDING copies the property, specifying EXCLUDING omits the property. EXCLUDING is the default. If multiple specifications are made for the same kind of object, the last one is used.

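We want to exclude constraints so that when our trigger fires, it can write the data to the columns without worrying about whether those values are valid. Note that, as the documentation says, LIKE always copies NOT NULL constraints; EXCLUDING CONSTRAINTS only leaves out CHECK constraints (and EXCLUDING is already the default). Because the trigger only copies rows that already satisfied these constraints in the address table, the copied NOT NULL constraints do no harm here. The resulting table definition looks like the following: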

create table logging_address
(
like address EXCLUDING CONSTRAINTS,
operation char(10) not null,
date_operated timestamp with time zone not null default current_timestamp
);


Logging Function

Next, there needs to be a trigger that logs the data. To create a new trigger in PostgreSQL, you follow these steps:

    • First, create a trigger function using CREATE FUNCTION statement.
    • Second, bind the trigger function to a table by using CREATE TRIGGER statement.

A trigger function is similar to an ordinary function. However, a trigger function does not declare any arguments, and its return type is trigger. Inside this trigger function, insert the old data into the logging table. The resulting trigger function looks like this:

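create function address_trigger_function()
returns trigger as $$
BEGIN
-- Copy the row as it was before the change; date_operated falls back to its column default
insert into logging_address (address_id, type, street1, street2, street3, street4, city, po_box_code, phone_number, date_created, operation)
values (old.address_id, old.type, old.street1, old.street2, old.street3, old.street4, old.city, old.po_box_code, old.phone_number, old.date_created, TG_OP);
-- A BEFORE trigger that returns NULL skips the operation, and there is no NEW row for a DELETE
IF TG_OP = 'DELETE' THEN
RETURN OLD;
END IF;
RETURN NEW;
end;
$$ LANGUAGE plpgsql;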

TG_OP is a special variable of type text containing INSERT, UPDATE, DELETE, or TRUNCATE, telling which operation fired the trigger. Likewise, OLD holds the row as it was before an UPDATE or DELETE.

Implementing the Trigger

As mentioned earlier, the second step is to bind the trigger function to a table using the CREATE TRIGGER statement. This part is fairly easy.

CREATE TRIGGER address_versioning_trigger
BEFORE UPDATE OR DELETE ON address
FOR EACH ROW EXECUTE PROCEDURE address_trigger_function();


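Now, whenever you update or delete a record in the address table, the previous version of the row is logged in the logging_address table. Inserts are not logged, since the trigger only fires on UPDATE and DELETE and a new row has no previous version.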
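PostgreSQL uses the return value of a BEFORE row trigger to decide whether the operation goes ahead: returning a row lets it proceed, while returning NULL silently skips it for that row. Because there is no NEW row for a DELETE, a history trigger has to return OLD in that case. The TypeScript below is only a small in-memory model of that contract; the Table class and makeHistoryTrigger are hypothetical illustrations, not PostgreSQL APIs.

```typescript
// Illustrative model of PostgreSQL BEFORE row-trigger semantics; not PostgreSQL code.
type Row = Record<string, unknown>;
type Operation = "UPDATE" | "DELETE";

// A BEFORE row trigger returns the row to continue with, or null to skip the operation.
type BeforeRowTrigger = (op: Operation, oldRow: Row, newRow: Row | null) => Row | null;

interface LogEntry {
  row: Row; // copy of OLD, the row before the change
  operation: Operation; // plays the role of TG_OP
}

class Table {
  rows: Row[] = [];
  private readonly trigger: BeforeRowTrigger;

  constructor(trigger: BeforeRowTrigger) {
    this.trigger = trigger;
  }

  update(index: number, changes: Row): boolean {
    const oldRow = this.rows[index];
    const result = this.trigger("UPDATE", oldRow, { ...oldRow, ...changes });
    if (result === null) return false; // trigger returned null: update skipped
    this.rows[index] = result;
    return true;
  }

  delete(index: number): boolean {
    // There is no NEW row for a DELETE, so the model passes null
    const result = this.trigger("DELETE", this.rows[index], null);
    if (result === null) return false; // trigger returned null: delete skipped
    this.rows.splice(index, 1);
    return true;
  }
}

// History trigger: copy OLD into the log, then let the operation proceed.
function makeHistoryTrigger(log: LogEntry[]): BeforeRowTrigger {
  return (op, oldRow, newRow) => {
    log.push({ row: { ...oldRow }, operation: op });
    return op === "DELETE" ? oldRow : newRow;
  };
}
```

A trigger that always returns NEW, modeled as `(_op, _old, newRow) => newRow`, turns every delete into a no-op, which is the failure mode to look for when testing the real trigger.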

New Pluralsight Courses Released!

My new Pluralsight courses Cleaning and Preparing Data in Microsoft Azure and Architecting Xamarin.Forms Applications for Code Reuse were just released! Here are the synopses:

Cleaning and Preparing Data in Microsoft Azure

Abstract

This course targets software developers and data scientists looking to understand the initial steps in a machine learning solution. The content will showcase methods and tools available using Microsoft Azure.

Description

No data science project of merit has ever started with great data ready to plug into an algorithm. In this course, Cleaning and Preparing Data in Microsoft Azure, you’ll learn foundational knowledge of the steps required to utilize data in a machine learning project. First, you’ll discover different types of data and languages. Next, you’ll learn about managing large data sets and handling bad data. Finally, you’ll explore how to utilize Azure Notebooks. When you’re finished with this course, you’ll have the skills and knowledge of preparing data needed for use in Microsoft Azure. Software required: Microsoft Azure.

Architecting Xamarin.Forms Applications for Code Reuse

Abstract

A well-architected application is flexible to changing business requirements. This course will teach you how to architect Xamarin.Forms applications in a way that promotes reusable patterns.

Description

As business requirements change, so do solution assumptions. In this course, Architecting Xamarin.Forms Applications for Code Reuse, you’ll learn different architectural patterns in Xamarin.Forms. First, you’ll explore project structure and organization. Next, you’ll discover patterns and standards to promote code sharing. Finally, you’ll learn how to utilize dependency injection in Xamarin.Forms. When you’re finished with this course, you’ll have the skills and knowledge of architecting Xamarin.Forms projects needed to optimally promote code reuse.

New Pluralsight Course Released!

My new Pluralsight course Sourcing Data in Microsoft Azure was just released! Here is the synopsis:

Abstract

This course targets software developers looking to source data from inside and outside of the cloud. The content will also showcase methods and tools available using Microsoft Azure.

Description

The cloud has nearly infinite compute power for processing. In this course, Sourcing Data in Microsoft Azure, you’ll learn foundational knowledge of data types, data policy, and finding data. First, you’ll learn how to register data sources with Azure Data Catalog. Next, you’ll discover how to extract, transform, and load data with Azure Data Factory. Finally, you’ll explore how to set up data processing with Azure HDInsight. When you’re finished with this course, you’ll have the skills and knowledge of the tools and processes needed to source data in Microsoft Azure. Software required: Microsoft Azure portal.

New Pluralsight Course Released!

My new Pluralsight course Deploying and Managing Models in Microsoft Azure was just released! Here is the synopsis:

Abstract

In this course, you’ll learn how data science practitioners can utilize tools for managing the models they create. You’ll also see those tools showcased in Microsoft Azure.

Description

One of the most overlooked processes in data science is managing the life cycle of models. In this course, Deploying and Managing Models in Microsoft Azure, you’ll gain foundational knowledge of Azure Machine Learning. First, you’ll discover how to create and utilize Azure Machine Learning. Next, you’ll find out how to integrate with Azure DevOps. Finally, you’ll explore how to utilize them together to automate the deployment and management of models. When you’re finished with this course, you’ll have the skills and knowledge of model life cycle management needed to manage a machine learning project. Software required: Microsoft Azure.

Using Angular Kendo Grid with Elastic Search and ASP.NET Core

There was a need to use a Kendo Grid in an Angular 5 website where the backing store for the data was Elastic Search. Filtering local data was simple enough, but filtering the full data set required server-side integration. The server was running ASP.NET Core.

To get started, create a view and view model for Angular to expose the grid.

import { Component } from '@angular/core';
import { process, State } from '@progress/kendo-data-query';
import { sampleProducts } from './products';
import {
GridComponent,
GridDataResult,
DataStateChangeEvent
} from '@progress/kendo-angular-grid';
@Component({
selector: 'my-app',
template: `
<kendo-grid
[data]="gridData"
[pageSize]="state.take"
[skip]="state.skip"
[sort]="state.sort"
[filter]="state.filter"
[sortable]="true"
[pageable]="true"
[filterable]="true"
(dataStateChange)="dataStateChange($event)">
<kendo-grid-column field="ProductID" title="ID" width="40" [filterable]="false">
</kendo-grid-column>
<kendo-grid-column field="ProductName" title="Product Name">
</kendo-grid-column>
<kendo-grid-column field="FirstOrderedOn" title="First Ordered On" width="240" filter="date" format="{0:d}">
</kendo-grid-column>
<kendo-grid-column field="UnitPrice" title="Unit Price" width="180" filter="numeric" format="{0:c}">
</kendo-grid-column>
<kendo-grid-column field="Discontinued" width="120" filter="boolean">
<ng-template kendoGridCellTemplate let-dataItem>
<input type="checkbox" [checked]="dataItem.Discontinued" disabled/>
</ng-template>
</kendo-grid-column>
</kendo-grid>
`
})
export class AppComponent {
public state: State = {
skip: 0,
take: 5,
// Initial filter descriptor
filter: {
logic: 'and',
filters: [{ field: 'ProductName', operator: 'contains', value: 'Chef' }]
}
};
public gridData: GridDataResult = process(sampleProducts, this.state);
public dataStateChange(state: DataStateChangeEvent): void {
this.state = state;
this.gridData = process(sampleProducts, this.state);
}
}


import { NgModule } from '@angular/core';
import { BrowserModule } from '@angular/platform-browser';
import { BrowserAnimationsModule } from '@angular/platform-browser/animations';
import { HttpClientModule } from '@angular/common/http';
import { GridModule } from '@progress/kendo-angular-grid';
import { AppComponent } from './app.component';
@NgModule({
bootstrap: [
AppComponent
],
declarations: [
AppComponent
],
imports: [
BrowserModule,
BrowserAnimationsModule,
HttpClientModule,
GridModule
]
})
export class AppModule { }


import { AppModule } from './app.module';
import { platformBrowserDynamic } from '@angular/platform-browser-dynamic';
const platform = platformBrowserDynamic();
platform.bootstrapModule(AppModule);


To wire the view and view model to the server side data, there needs to be an Angular HTTP service and an ASP.NET Core Controller. The controller needs to be able to accept the filter and paging options of the grid as the user changes them and react to them server side. To accomplish this, some objects need to be created to handle the request:

First, the filter object, which changes whenever a filter is selected or cleared, must be mapped to a C# object that can be serialized. The structure of the Kendo Grid filter is as follows:

filter: {
      logic: 'and',
      filters: [{ field: 'ProductName', operator: 'contains', value: 'Chef' }]
}

To make that object transportable to C#, let's create some POCOs:

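using System.Collections.Generic;
using Newtonsoft.Json;

// One shape covers both composite filters (logic + filters) and leaf filters (field + operator + value)
public class CompositeFilterDescriptor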
{
[JsonProperty("logic")]
public string Logic { get; set; }
[JsonProperty("filters")]
public ICollection<CompositeFilterDescriptor> Filters { get; set; }
[JsonProperty("field")]
public string Field { get; set; }
[JsonProperty("operator")]
public string Operator { get; set; }
[JsonProperty("value")]
public object Value { get; set; }
[JsonProperty("ignoreCase")]
public bool? IgnoreCase { get; set; }
}

public class Page
{
public int Skip { get; set; }
public int Take { get; set; }
}


public class RequestDto
{
public CompositeFilterDescriptor Filters { get; set; }
public Page Page { get; set; }
}

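On the Angular side, the HTTP service mainly has to turn the grid's state into the RequestDto shape before posting it. A minimal sketch, assuming the interfaces below (hypothetical mirrors of the Kendo state and the RequestDto POCO) and Json.NET's default case-insensitive property matching on the server:

```typescript
// Hypothetical mirrors of the Kendo filter JSON and the server-side RequestDto.
interface FilterDescriptor {
  field: string;
  operator: string;
  value?: unknown;
  ignoreCase?: boolean;
}

interface CompositeFilterDescriptor {
  logic: "and" | "or";
  filters: Array<FilterDescriptor | CompositeFilterDescriptor>;
}

// The subset of the grid state this sketch reads.
interface GridState {
  skip?: number;
  take?: number;
  filter?: CompositeFilterDescriptor;
}

// camelCase names; Json.NET matches them to Filters, Page, Skip and Take case-insensitively.
interface RequestBody {
  filters: CompositeFilterDescriptor | null;
  page: { skip: number; take: number };
}

function toRequestBody(state: GridState, defaultTake = 5): RequestBody {
  const filter = state.filter;
  return {
    // null tells the server that no filter is active
    filters: filter && filter.filters.length > 0 ? filter : null,
    page: { skip: state.skip ?? 0, take: state.take ?? defaultTake },
  };
}
```

In an Angular service, the result could then be posted with HttpClient, for example `this.http.post(url, toRequestBody(state))`, where `url` depends on how the GridController route is configured.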

Now let's create an ASP.NET Core controller endpoint for our filter.

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Nest;

public class GridController : Controller
{
    private readonly ElasticClient _elasticClient;

    public GridController()
    {
        // Initialize ElasticClient (or inject it through the constructor)
    }

    [HttpPost]
    public async Task<IActionResult> PostRequest([FromBody]RequestDto request)
    {
        // MyDocument is a placeholder for the document type stored in your index
        var compositeFilterMapper = new CompositeFilterMapper<MyDocument>(request.Filters);
        // Grab one page from Elastic Search, filtered by the grid's current filter
        var searchResponse = await _elasticClient.SearchAsync<MyDocument>(search => search
            .Index("your-index")
            .From(request.Page.Skip)
            .Size(request.Page.Take)
            .Query(q => compositeFilterMapper.GetQueryContainer(q)));
        return Ok(new ResponseDto<MyDocument>
        {
            Results = searchResponse.Documents,
            Count = searchResponse.Total
        });
    }
}


public class ResponseDto<T>
{
public IEnumerable<T> Results { get; set; }
public long Count { get; set; }
}


The only piece still missing for the query to work is the CompositeFilterMapper.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using Nest;

// Translates a Kendo Grid CompositeFilterDescriptor into a NEST query for documents of type T
public class CompositeFilterMapper<T> where T : class
{
    private readonly CompositeFilterDescriptor _compositeFilterDescriptor;

    public CompositeFilterMapper(CompositeFilterDescriptor compositeFilterDescriptor)
    {
        _compositeFilterDescriptor = compositeFilterDescriptor;
    }

    public QueryContainer GetQueryContainer(QueryContainerDescriptor<T> query)
    {
        // No filter selected in the grid: match every document
        if (_compositeFilterDescriptor?.Filters == null || !_compositeFilterDescriptor.Filters.Any())
        {
            return query.MatchAll();
        }
        return ApplyQuery(_compositeFilterDescriptor.Filters, _compositeFilterDescriptor.Logic);
    }

    private QueryContainer ApplyQuery(IEnumerable<CompositeFilterDescriptor> filters, string filterLogic)
    {
        var innerQueries = new List<QueryContainer>();
        foreach (var filter in filters)
        {
            // Composite filters recurse; leaf filters become a single field query
            innerQueries.Add(filter.Filters != null && filter.Filters.Any()
                ? ApplyQuery(filter.Filters, filter.Logic)
                : CreateQuery(filter.Field, filter.Operator, filter.Value));
        }

        var query = new QueryContainerDescriptor<T>();
        switch (filterLogic)
        {
            case "and":
                return query.Bool(b => b.Must(innerQueries.ToArray()));
            case "or":
                return query.Bool(b => b.Should(innerQueries.ToArray()));
            default:
                throw new ArgumentOutOfRangeException(nameof(filterLogic), filterLogic, "Expected 'and' or 'or'");
        }
    }

    private QueryContainer CreateQuery(string field, string filterOperator, object value)
    {
        var query = new QueryContainerDescriptor<T>();
        var fieldType = MapToType(field);
        var propertyExpression = MapToProperty(field);
        if (fieldType == typeof(string))
        {
            return CreateStringFilter(query, propertyExpression, filterOperator, value);
        }
        if (fieldType == typeof(DateTime))
        {
            return CreateDateTimeFilter(query, propertyExpression, filterOperator, value);
        }
        throw new ArgumentOutOfRangeException(nameof(field), field, "Unsupported field type");
    }

    private QueryContainer CreateDateTimeFilter(QueryContainerDescriptor<T> query,
        Expression<Func<T, object>> propertyExpression, string filterOperator, object value)
    {
        // Json.NET parses ISO 8601 strings in an object property as DateTime by default
        DateMath Date() => DateMath.Anchored((DateTime)value);
        switch (filterOperator)
        {
            case "eq":
                return query.DateRange(d => d.Field(propertyExpression).GreaterThanOrEquals(Date()).LessThanOrEquals(Date()));
            case "neq":
                return query.Bool(b => b.MustNot(m => m.DateRange(d => d.Field(propertyExpression).GreaterThanOrEquals(Date()).LessThanOrEquals(Date()))));
            case "lt":
                return query.DateRange(d => d.Field(propertyExpression).LessThan(Date()));
            case "lte":
                return query.DateRange(d => d.Field(propertyExpression).LessThanOrEquals(Date()));
            case "gt":
                return query.DateRange(d => d.Field(propertyExpression).GreaterThan(Date()));
            case "gte":
                return query.DateRange(d => d.Field(propertyExpression).GreaterThanOrEquals(Date()));
            default:
                // isnull, startswith, contains, etc. are not supported for dates here
                throw new ArgumentException($"Operator '{filterOperator}' is not supported for date and time queries");
        }
    }

    private QueryContainer CreateStringFilter(QueryContainerDescriptor<T> query,
        Expression<Func<T, object>> propertyExpression, string filterOperator, object value)
    {
        var text = value as string;
        switch (filterOperator)
        {
            case "eq":
                return query.QueryString(qs => qs.Fields(fs => fs.Field(propertyExpression)).Query(text));
            case "neq":
                return query.Bool(b => b.MustNot(m => m.QueryString(qs => qs.Fields(fs => fs.Field(propertyExpression)).Query(text))));
            case "isnull":
                // A missing or null field has no indexed value
                return query.Bool(b => b.MustNot(m => m.Exists(e => e.Field(propertyExpression))));
            case "isnotnull":
                return query.Exists(e => e.Field(propertyExpression));
            case "startswith":
                // Prefix and wildcard queries behave best on keyword (not analyzed) fields
                return query.Prefix(p => p.Field(propertyExpression).Value(text));
            case "endswith":
                return query.Wildcard(w => w.Field(propertyExpression).Value("*" + text));
            case "contains":
                return query.Match(m => m.Field(propertyExpression).Query(text).Operator(Operator.And));
            case "doesnotcontain":
                return query.Bool(b => b.MustNot(n => n.Match(m => m.Field(propertyExpression).Query(text).Operator(Operator.And))));
            case "isempty":
                // Verbatim keeps NEST from dropping the empty-string term as conditionless
                return query.Term(t => t.Field(propertyExpression).Value("").Verbatim());
            case "isnotempty":
                return query.Bool(b => b.MustNot(n => n.Term(t => t.Field(propertyExpression).Value("").Verbatim())));
            default:
                // lt, lte, gt and gte are not supported for strings
                throw new ArgumentException($"Operator '{filterOperator}' is not supported for string queries");
        }
    }

    private Expression<Func<T, object>> MapToProperty(string fieldName)
    {
        switch (fieldName)
        {
            // Return an expression for each grid field, e.g. case "ProductName": return x => x.ProductName;
            // (this requires T to be a concrete document type)
            default:
                throw new ArgumentOutOfRangeException(nameof(fieldName), fieldName, "Unmapped field");
        }
    }

    private Type MapToType(string fieldName)
    {
        switch (fieldName)
        {
            // Map each grid field name to its CLR type, e.g. return typeof(string);
            default:
                throw new ArgumentOutOfRangeException(nameof(fieldName), fieldName, "Unmapped field");
        }
    }
}

You will need to build in your own expression and type mappings for properties, but otherwise this is built for strings and DateTimes. From this base, you should be able to implement the other types and queries the Kendo Grid needs to work with Elastic Search.
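To make the mapping itself easier to reason about, here is a hypothetical TypeScript function that performs a similar translation from a Kendo composite filter to raw Elasticsearch query DSL JSON. It is not the C# mapper above: it deliberately uses a term query for eq and assumes keyword fields for the term, prefix, and wildcard clauses, so treat it as a sketch to adapt to your own index mapping.

```typescript
// Hypothetical shapes for a Kendo filter tree and the resulting query DSL.
type Leaf = { field: string; operator: string; value?: unknown };
type Composite = { logic: "and" | "or"; filters: Array<Leaf | Composite> };
type Query = Record<string, unknown>;

function isComposite(f: Leaf | Composite): f is Composite {
  return (f as Composite).filters !== undefined;
}

// One Kendo leaf filter becomes one query clause.
function leafToQuery(f: Leaf): Query {
  const v = f.value;
  switch (f.operator) {
    case "eq": return { term: { [f.field]: v } };
    case "neq": return { bool: { must_not: [{ term: { [f.field]: v } }] } };
    case "contains": return { match: { [f.field]: { query: v, operator: "and" } } };
    case "doesnotcontain": return { bool: { must_not: [{ match: { [f.field]: { query: v, operator: "and" } } }] } };
    case "startswith": return { prefix: { [f.field]: v } };
    case "endswith": return { wildcard: { [f.field]: `*${String(v)}` } };
    case "isnull": return { bool: { must_not: [{ exists: { field: f.field } }] } };
    case "isnotnull": return { exists: { field: f.field } };
    // Kendo's comparison operator names match the range query keys
    case "lt": case "lte": case "gt": case "gte":
      return { range: { [f.field]: { [f.operator]: v } } };
    default: throw new Error(`Unsupported operator: ${f.operator}`);
  }
}

// "and" maps to bool.must, "or" to bool.should; an empty filter matches everything.
function toElasticQuery(filter: Composite | null | undefined): Query {
  if (!filter || filter.filters.length === 0) return { match_all: {} };
  const clauses = filter.filters.map(f => (isComposite(f) ? toElasticQuery(f) : leafToQuery(f)));
  return { bool: { [filter.logic === "and" ? "must" : "should"]: clauses } };
}
```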

Elastic Search – “All shards failed” on pagination

If you are trying to page past the first 10,000 documents in an Elasticsearch index and have not raised the max_result_window setting for that index, you may receive one of the following two errors:

  • All shards failed
  • Result window is too large, from + size must be less than or equal to: [10000] but was [*].

To resolve this, raise the max_result_window setting for the index that is being paged through. Use cURL or the Kibana Dev Tools console to make a PUT request that updates the setting:

PUT {MY_INDEX}/_settings
{
   "index" : { "max_result_window" : {MAX_VALUE} }
}

Replace MY_INDEX with the index whose settings you want to update and MAX_VALUE with the maximum value for the result window. For the current project it was set to 50,000,000. Keep in mind that search requests use heap memory and time proportional to from + size, so very deep pages get more expensive as the window grows.
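The rule behind both errors is that from + size may not exceed index.max_result_window, which defaults to 10,000. A small TypeScript sketch of that check, plus the body of the settings request; the helper names are hypothetical.

```typescript
// Elasticsearch rejects a search when from + size exceeds index.max_result_window.
const DEFAULT_MAX_RESULT_WINDOW = 10_000;

function exceedsResultWindow(from: number, size: number, maxResultWindow: number = DEFAULT_MAX_RESULT_WINDOW): boolean {
  return from + size > maxResultWindow;
}

// Hypothetical helper describing the PUT {MY_INDEX}/_settings request.
function maxResultWindowRequest(index: string, maxResultWindow: number): { method: string; path: string; body: string } {
  return {
    method: "PUT",
    path: `/${encodeURIComponent(index)}/_settings`,
    body: JSON.stringify({ index: { max_result_window: maxResultWindow } }),
  };
}
```

A grid asking for skip 9,995 and take 10 would trip the default limit, while the same request is fine once the window is raised to 50,000,000.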

Storing Event Data in Elastic Search

There was a CPU and network issue when the hosts uploaded data directly from the client to Elastic Search. To allow for the same data load without the Elastic Search overhead running on the client, the following architecture was used:

  • Hosts use Event Hubs to upload the telemetry data
  • Consume Event Hub data with Stream Analytics
  • Output Stream Analytics query to Azure Function
  • Azure Function to upload output to Elastic Search

Event Hub

To start, the hosts needed an Event Hub to upload the data. Because Stream Analytics can ingest from both, Azure IoT Hub could be used for other projects. Event Hub was chosen so that each client would not need to be provisioned as a device.

Create an Event Hubs namespace

  1. Log on to the Azure portal, and click Create a resource at the top left of the screen.
  2. Click Internet of Things, and then click Event Hubs.
  3. In Create namespace, enter a namespace name. The system immediately checks to see if the name is available.
  4. After making sure the namespace name is available, choose the pricing tier (Basic or Standard). Also, choose an Azure subscription, resource group, and location in which to create the resource.
  5. Click Create to create the namespace. You may have to wait a few minutes for the system to fully provision the resources.
  6. In the portal list of namespaces, click the newly created namespace.
  7. Click Shared access policies, and then click RootManageSharedAccessKey.
  8. Click the copy button to copy the RootManageSharedAccessKey connection string to the clipboard. Save this connection string in a temporary location, such as Notepad, to use later.

Create an event hub

  1. In the Event Hubs namespace list, click the newly created namespace.
  2. In the namespace blade, click Event Hubs.
  3. At the top of the blade, click Add Event Hub.
  4. Type a name for your event hub, then click Create.

Your event hub is now created, and you have the connection strings you need to send and receive events.

Stream Analytics

Create a Stream Analytics job

  1. In the Azure portal, click the plus sign and then type STREAM ANALYTICS in the text window to the right. Then select Stream Analytics job in the results list.
  2. Enter a unique job name and verify the subscription is the correct one for your job. Then either create a new resource group or select an existing one on your subscription.
  3. Then select a location for your job. For speed of processing and reduction of cost in data transfer, selecting the same location as the resource group and intended storage account is recommended.

    Note

    You should create this storage account only once per region. This storage will be shared across all Stream Analytics jobs that are created in that region.

  4. Check the box to place your job on your dashboard and then click CREATE.
  5. You should see ‘Deployment started…’ displayed in the top right of your browser window. Soon it will change to a notification that the deployment has completed.

Create an Azure Stream Analytics query

After your job is created it’s time to open it and build a query. You can easily access your job by clicking the tile for it.


In the Job Topology pane click the QUERY box to go to the Query Editor. The QUERY editor allows you to enter a T-SQL query that performs the transformation over the incoming event data.


Create data stream input from Event Hubs

Azure Event Hubs provides highly scalable publish-subscribe event ingestors. An event hub can collect millions of events per second, so that you can process and analyze the massive amounts of data produced by your connected devices and applications. Event Hubs and Stream Analytics together provide you with an end-to-end solution for real-time analytics—Event Hubs let you feed events into Azure in real time, and Stream Analytics jobs can process those events in real time. For example, you can send web clicks, sensor readings, or online log events to Event Hubs. You can then create Stream Analytics jobs to use Event Hubs as the input data streams for real-time filtering, aggregating, and correlation.

The default timestamp of events coming from Event Hubs in Stream Analytics is the timestamp that the event arrived in the event hub, which is EventEnqueuedUtcTime. To process the data as a stream using a timestamp in the event payload, you must use the TIMESTAMP BY keyword.

Consumer groups

You should configure each Stream Analytics event hub input to have its own consumer group. When a job contains a self-join or when it has multiple inputs, some input might be read by more than one reader downstream. This situation impacts the number of readers in a single consumer group. To avoid exceeding the Event Hubs limit of five readers per consumer group per partition, it’s a best practice to designate a consumer group for each Stream Analytics job. There is also a limit of 20 consumer groups per event hub. For more information, see Event Hubs Programming Guide.

Configure an event hub as a data stream input

The following table explains each property in the New input blade in the Azure portal when you configure an event hub as input.

Input alias: A friendly name that you use in the job’s query to reference this input.
Service bus namespace: An Azure Service Bus namespace, which is a container for a set of messaging entities. When you create a new event hub, you also create a Service Bus namespace.
Event hub name: The name of the event hub to use as input.
Event hub policy name: The shared access policy that provides access to the event hub. Each shared access policy has a name, permissions that you set, and access keys.
Event hub consumer group (optional): The consumer group to use to ingest data from the event hub. If no consumer group is specified, the Stream Analytics job uses the default consumer group. We recommend that you use a distinct consumer group for each Stream Analytics job.
Event serialization format: The serialization format (JSON, CSV, or Avro) of the incoming data stream.
Encoding: UTF-8 is currently the only supported encoding format.
Compression (optional): The compression type (None, GZip, or Deflate) of the incoming data stream.

When your data comes from an event hub, you have access to the following metadata fields in your Stream Analytics query:

EventProcessedUtcTime: The date and time that the event was processed by Stream Analytics.
EventEnqueuedUtcTime: The date and time that the event was received by Event Hubs.
PartitionId: The zero-based partition ID for the input adapter.

For example, using these fields, you can write a query like the following example:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Azure Functions (In Preview)

Azure Functions is a serverless compute service that enables you to run code on-demand without having to explicitly provision or manage infrastructure. It lets you implement code that is triggered by events occurring in Azure or third-party services. This ability of Azure Functions to respond to triggers makes it a natural output for an Azure Stream Analytics job. This output adapter allows users to connect Stream Analytics to Azure Functions, and run a script or piece of code in response to a variety of events.

Azure Stream Analytics invokes Azure Functions via HTTP triggers. The new Azure Function Output adapter is available with the following configurable properties:

Function App: Name of your Azure Functions App.
Function: Name of the function in your Azure Functions App.
Max Batch Size: This property can be used to set the maximum size for each output batch that is sent to your Azure Function. By default, this value is 256 KB.
Max Batch Count: As the name indicates, this property lets you specify the maximum number of events in each batch that gets sent to Azure Functions. The default max batch count value is 100.
Key: If you want to use an Azure Function from another subscription, you can do so by providing the key to access your function.

Note that when Azure Stream Analytics receives a 413 (HTTP Request Entity Too Large) response from the Azure Function, it reduces the size of the batches it sends to Azure Functions. In your Azure Function code, return this status code to make sure that Azure Stream Analytics doesn’t send oversized batches. Also, make sure that the max batch count and size values used in the function are consistent with the values entered in the Stream Analytics portal.

Also, when no events land in a time window, no output is generated, so the function is not called for that window. This behavior is consistent with the built-in windowed aggregate functions.
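A sketch of the size check described above, written as a plain TypeScript function so it could sit behind any HTTP trigger. It assumes each batch arrives as a JSON array of output rows and uses the default limits listed above (256 KB and 100 events); the function name and the 400 response for malformed bodies are illustrative choices, not part of the Stream Analytics contract.

```typescript
// Assumed limits; keep them in sync with Max Batch Size and Max Batch Count on the output.
const MAX_BATCH_BYTES = 256 * 1024;
const MAX_BATCH_COUNT = 100;

// Returns the HTTP status a function could answer with for one incoming batch.
function batchStatus(rawBody: string, maxBytes: number = MAX_BATCH_BYTES, maxCount: number = MAX_BATCH_COUNT): 200 | 400 | 413 {
  const bytes = new TextEncoder().encode(rawBody).length; // UTF-8 size of the body
  if (bytes > maxBytes) return 413; // asks Stream Analytics for smaller batches
  let records: unknown;
  try {
    records = JSON.parse(rawBody);
  } catch {
    return 400; // not valid JSON
  }
  if (!Array.isArray(records)) return 400; // a batch is expected to be a JSON array
  return records.length > maxCount ? 413 : 200;
}
```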

Query

The query itself is basic for now. There is no need for the advanced query features of Stream Analytics for the host data at the moment; however, they will be used later for creating workflows for spawning and reducing hosts.

Currently, the query batches the data coming from the event hub every second. This is simple to accomplish using the windowing functions provided by Stream Analytics. In the Westworld of Warcraft host query, a tumbling window batches the data every second. The query looks as follows:

SELECT
    Collect()
INTO
    ElasticUploadFunction
FROM
    HostIncomingData
GROUP BY TumblingWindow(Duration(second, 1), Offset(millisecond, -1))
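Conceptually, a tumbling window splits the stream into fixed, back-to-back, non-overlapping intervals, and Collect() gathers every event in an interval into one array. The TypeScript below models that grouping under simplifying assumptions: timestamps are epoch milliseconds and windows are start-inclusive, [start, start + size). The Offset(millisecond, -1) in the query shifts the window edges by one millisecond, which should make each window include its start instant and exclude its end; the sketch simply assumes that behavior and the function name is hypothetical.

```typescript
// Timestamps are epoch milliseconds.
interface TimedEvent<T> {
  timestamp: number;
  payload: T;
}

// Groups events into back-to-back, non-overlapping windows [start, start + windowMs).
function tumblingWindows<T>(events: TimedEvent<T>[], windowMs: number): Map<number, T[]> {
  const windows = new Map<number, T[]>();
  for (const e of events) {
    const start = Math.floor(e.timestamp / windowMs) * windowMs; // window this event falls into
    const bucket = windows.get(start);
    if (bucket) {
      bucket.push(e.payload);
    } else {
      windows.set(start, [e.payload]);
    }
  }
  // Windows without events never appear, just as Stream Analytics emits nothing for them
  return windows;
}
```

With a 1,000 ms window, events at 0 ms and 999 ms land in the same batch, while an event at 1,000 ms starts the next one.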

 

Azure Function

Create a function app

You must have a function app to host the execution of your functions. A function app lets you group functions as a logical unit for easier management, deployment, and sharing of resources.

  1. Click Create a resource in the upper left-hand corner of the Azure portal, then select Compute > Function App.
  2. Use the function app settings as specified below.
    App name (suggested value: a globally unique name): Name that identifies your new function app. Valid characters are a-z, 0-9, and -.
    Subscription (suggested value: your subscription): The subscription under which this new function app is created.
    Resource Group (suggested value: myResourceGroup): Name for the new resource group in which to create your function app.
    OS (suggested value: Windows): Serverless hosting is currently only available when running on Windows. For Linux hosting, see Create your first function running on Linux using the Azure CLI.
    Hosting plan (suggested value: Consumption plan): Hosting plan that defines how resources are allocated to your function app. In the default Consumption Plan, resources are added dynamically as required by your functions. In this serverless hosting, you only pay for the time your functions run.
    Location (suggested value: West Europe): Choose a region near you or near other services your functions access.
    Storage account (suggested value: a globally unique name): Name of the new storage account used by your function app. Storage account names must be between 3 and 24 characters in length and may contain numbers and lowercase letters only. You can also use an existing account.
  3. Click Create to provision and deploy the new function app. You can monitor the status of the deployment by clicking the Notification icon in the upper-right corner of the portal. Clicking Go to resource takes you to your new function app.

Next, add an HTTP-triggered function that receives each batch from Stream Analytics and uploads it to Elastic Search:

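// Assumes Azure Functions v1 (TraceWriter), NEST 6.x and Microsoft.AspNet.WebApi.Client for ReadAsAsync.
// HostEvent is a placeholder POCO for one record of the Stream Analytics output.
[FunctionName("StreamOutputToElastic")]
public static async Task<HttpResponseMessage> Run([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequestMessage req, TraceWriter log)
{
var node = new Uri("http://{username}:{password}@{ipaddress}:9200");
var settings = new ConnectionSettings(node);
settings.DefaultIndex("my-index");
var elasticClient = new ElasticClient(settings);
// Stream Analytics posts each batch as a JSON array of output rows
var records = await req.Content.ReadAsAsync<List<HostEvent>>();
// IndexManyAsync sends the whole batch to the default index in one bulk request
var response = await elasticClient.IndexManyAsync(records);
if (!response.IsValid)
{
log.Error($"Bulk upload to Elastic Search failed: {response.DebugInformation}");
return req.CreateResponse(HttpStatusCode.InternalServerError);
}
return req.CreateResponse(HttpStatusCode.OK);
}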
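NEST's IndexManyAsync sends the batch to Elastic Search through the bulk API, which takes newline-delimited JSON: an action line followed by the document, for each record. The TypeScript sketch below only shows that body shape; it assumes the typeless action format of Elasticsearch 7 and later (6.x clusters also expect a document type), and toBulkBody is a hypothetical helper.

```typescript
// Builds an Elasticsearch _bulk request body that indexes every document into one index.
function toBulkBody(index: string, docs: object[]): string {
  const lines: string[] = [];
  for (const doc of docs) {
    lines.push(JSON.stringify({ index: { _index: index } })); // action and metadata line
    lines.push(JSON.stringify(doc)); // source document line
  }
  // The bulk API requires the final line to end with a newline
  return lines.length ? lines.join("\n") + "\n" : "";
}
```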

Elastic Search

Now the data is in Elastic Search and, if the instructions in the Elastic Search setup post were followed, it should be accessible from the Kibana endpoint.

Getting started with Elastic Search in Azure

For the Westworld of Warcraft project, a data store for the host data was required, and Elastic Search was chosen because it also needed to be learned for another client. To get started, an Elastic Search cluster needed to be deployed in the Azure environment. There is a template in the Azure Marketplace that makes setup easy.

Both Azure docs and Elastic have getting started guides that should be looked over before setting up an enterprise cluster in Azure. For the Westworld of Warcraft project, there only needed to be a public endpoint for ingest, a public endpoint for consumption, and a public endpoint for a jump box. Using the Azure Marketplace template, Kibana was selected as the jump box and the load balancer was set to external. Setting the load balancer to external was specific to this project because the Westworld of Warcraft clients needed a public endpoint to upload to (this changed after a better solution was found).


Once the cluster is successfully deployed, a public IP address is created for the load balancer. That IP address is used by the Elastic NEST library in the application. To connect to the external load balancer, use a URI in the following format:

http://{username}:{password}@{external loadbalancer ip}:9200

Now test your configuration with the following code (replacing the URI string with your components):

public class Person
{
    public int Id { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
}
var settings = new ConnectionSettings
     (new Uri("http://{username}:{password}@{external loadbalancer ip}:9200"))
    .DefaultIndex("people");

var client = new ElasticClient(settings);

var person = new Person
{
    Id = 1,
    FirstName = "Martijn",
    LastName = "Laarman"
};

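var indexResponse = client.IndexDocument(person);

// IsValid is false when the request failed; DebugInformation describes why
Console.WriteLine(indexResponse.IsValid ? "Indexed" : indexResponse.DebugInformation);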