Data Enrichment using IoT Hubs, Azure Functions and CrateDB¶
This integration document details how to create an enrichment pipeline between data ingested into an Azure IoT Hub and CrateDB, using serverless Azure Functions.
Abstract¶
In IoT-scale data solutions, it is common to ingest data from a multi-device source such as an Azure IoT Hub into a database such as CrateDB for analysis. However, many use cases also call for the raw data in the IoT Hub source to be transformed or enriched before final insertion into CrateDB.
This can be accomplished using Azure Functions: event-driven code triggered by new data flowing into an IoT Hub.
An Azure Function is a short-lived, serverless computation that is triggered by external events. The trigger produces an input payload, which is delivered to the Azure Function. The Azure Function then does computation with this payload and subsequently outputs its result to other Azure Functions, computation services, or storage services.
A common pattern is to use an Azure Function to enrich and ingest data to a CrateDB instance by connecting that Azure Function to an IoT Hub’s new messages trigger.
When new messages arrive in an IoT Hub, they will be batched and sent to the Azure Function. This Azure Function can then perform enrichment computations on the raw payload. Finally, it ingests that payload into a CrateDB instance using whichever client corresponds to the language the Azure Function is written in.
Implementation¶
Set Up¶
To illustrate how this architecture can be used, we shall create a scenario in which we have a fleet of satellites, each sending its location as a latitude and longitude coordinate pair to an Azure IoT Hub. Each payload from each satellite looks like:
{
    "id": "Zero Gravitas",
    "type": "unmanned",
    "location": {
        "longitude": -164.5984,
        "latitude": -24.9734
    },
    "timestamp": 1588240576
}
Each payload contains an id field with the satellite’s name, the type of satellite, the location of the satellite, and the timestamp of when the location metric was taken.
The goal of this enrichment is two-fold. First, we transform the location item into a WKT string, such as POINT (-164.5984 -24.9734), before it is ingested. Note that WKT lists the longitude before the latitude. Second, we choose which column each payload is ingested into, depending on whether the satellite is manned or unmanned.
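The two enrichment steps can be sketched as small helper functions. This is only an illustration: the names toWktPoint and sinkColumnFor are hypothetical and do not appear in the Azure Function shown later, which performs the same steps inline.

```javascript
// Hypothetical helpers illustrating the two enrichment steps.

// Convert a { longitude, latitude } object into a WKT point string.
// WKT lists the x coordinate (longitude) before the y coordinate (latitude).
function toWktPoint(location) {
    return `POINT (${location.longitude} ${location.latitude})`;
}

// Pick the sink column for a satellite type, or null if the type is unknown.
// Assumes the column names match the type names, as in this document's table.
function sinkColumnFor(type) {
    if (type === 'manned' || type === 'unmanned') {
        return type;
    }
    return null;
}

const payload = {
    id: 'Zero Gravitas',
    type: 'unmanned',
    location: { longitude: -164.5984, latitude: -24.9734 },
    timestamp: 1588240576
};

console.log(toWktPoint(payload.location)); // POINT (-164.5984 -24.9734)
console.log(sinkColumnFor(payload.type));  // unmanned
```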
Prerequisites¶
To deploy this architecture, you need the following:
An Azure IoT Hub with data flowing into it. Azure has some useful data simulators.
A running CrateDB cluster.
CrateDB¶
Your CrateDB table’s structure will look like this:
CREATE TABLE IF NOT EXISTS "doc"."satellites" (
    "ts" TIMESTAMP WITHOUT TIME ZONE,
    "ts_month" TIMESTAMP WITHOUT TIME ZONE GENERATED ALWAYS AS date_trunc('month', "ts"),
    "unmanned" OBJECT(STRICT) AS (
        "id" TEXT,
        "location" GEO_POINT,
        "timestamp" TIMESTAMP WITHOUT TIME ZONE
    ),
    "manned" OBJECT(STRICT) AS (
        "id" TEXT,
        "location" GEO_POINT,
        "timestamp" TIMESTAMP WITHOUT TIME ZONE
    ),
    "debug" OBJECT(DYNAMIC)
) CLUSTERED INTO 4 SHARDS
PARTITIONED BY ("ts_month");
Azure Function¶
First, create an environment for your function application. After following the documentation for whichever environment you develop in (for example, Visual Studio Code), you should have a folder structure like this:
ReferenceArchitectureProject
| - IoTHubToCrateDBFunction
| | - index.js
| | - function.json
| - node_modules
| - host.json
| - package.json
| - extensions.csproj
| - local.settings.json
Your Azure Function, IoTHubToCrateDBFunction, will be composed of two files:
function.json, which defines the metadata of your function.
index.js, which is your core Azure Function.
Dependencies¶
Your Azure Function will rely on the node-postgres library to connect to CrateDB. You can install this dependency by executing:
$ npm install pg
within the ReferenceArchitectureProject folder. The dependency should now be in the node_modules folder, which you will bundle with your Azure Function when you deploy the application.
function.json¶
Your function.json defines which events you want to bind to, as well as settings such as the event hub name and the consumer group.
{
    "bindings": [{
        "type": "eventHubTrigger",
        "name": "eventHubMessages",
        "direction": "in",
        "eventHubName": "default",
        "connection": "EventHubConnectionString",
        "cardinality": "many",
        "consumerGroup": "cratefunction"
    }]
}
Except for consumerGroup, all values can be kept as-is and do not need to be customized. Here are the parameters in more detail:
Parameter | Description |
---|---|
type | The type of binding we associate with this function. In this case, it is an event hub trigger. |
name | The name that is used for the bound data in the Azure Function. |
direction | Whether the binding is for receiving data into the function, or sending the data from the function. It is set to "in" because this trigger receives data. |
eventHubName | The name of the event hub. The event hub name value in the connection string overwrites this parameter at runtime, so you can leave this default value unchanged. |
connection | The name of the Azure Function application setting that contains our event hub’s namespace connection string. |
cardinality | Whether we want to use batching, or to consume messages one by one. Batching is far more efficient for our use case, which is why it is set to "many". |
consumerGroup | Which consumer group will be used to consume messages from the event hub. Ensure that it is unique, and not already being used by another consumer. If omitted, it will use the default consumer group of $Default. |
For more information, please consult the Azure Event Hubs bindings for Azure Functions documentation.
local.settings.json¶
To run this Azure Function application locally, you need to create a local.settings.json file at the root of your Azure Functions Application project. It should have the following content:
{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "your web storage connection",
        "FUNCTIONS_WORKER_RUNTIME": "node",
        "EventHubConnectionString": "connection string to your event hub",
        "CrateConnectionString": "connection string to CrateDB",
        "SinkTable": "doc.satellites",
        "SinkColumnUnmanned": "unmanned",
        "SinkColumnManned": "manned"
    }
}
The AzureWebJobsStorage value can be obtained by navigating to your storage account, selecting “Access keys”, and copying one of the shown connection strings.
The EventHubConnectionString can be copied from the “Event Hub-compatible endpoint” field under the IoT hub’s “Built-in endpoints” section. Ensure that the event hub connection string includes EntityPath=EVENTHUBNAME at the end of it.
The CrateConnectionString should be of the form postgres://username:password@cratedbaddress:5432. If the CrateDB cluster requires SSL, append ?ssl=true.
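As an illustration, the connection string can be assembled from its parts as sketched below. The function name buildCrateConnectionString and the host cratedb.example.com are hypothetical. The sketch assumes that reserved characters in the username or password need to be percent-encoded for the URL to parse correctly, which is the usual convention for connection URLs.

```javascript
// Hypothetical helper that assembles a connection string of the form
// postgres://username:password@cratedbaddress:5432[?ssl=true]
function buildCrateConnectionString({ user, password, host, port = 5432, ssl = false }) {
    // Percent-encode credentials so characters such as "@" or "/" do not break the URL
    const credentials = `${encodeURIComponent(user)}:${encodeURIComponent(password)}`;
    const base = `postgres://${credentials}@${host}:${port}`;
    // Append the SSL flag only when the cluster requires it
    return ssl ? `${base}?ssl=true` : base;
}

const connectionString = buildCrateConnectionString({
    user: 'crate',
    password: 'p@ss/word',
    host: 'cratedb.example.com', // placeholder address
    ssl: true
});

console.log(connectionString);
```

The resulting value is what you would store in the CrateConnectionString setting.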
SinkTable is the name of the table you will be ingesting data into, with the SinkColumnManned and SinkColumnUnmanned parameters defining which columns you will ingest manned and unmanned satellite data into, respectively.
These settings are not deployed when you deploy the Azure Function. Instead, before you deploy, you should set up an Azure Function Application within the Azure Portal. You can also do this directly from VSCode, or using the Azure CLI. All settings shown above within the Values object need to be set up within the Azure Function App’s Application Settings, which can be found under the “Configuration” menu item.
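Because the settings are configured separately from the deployed code, it can be useful to check at startup that none of them is missing. The following is a minimal sketch under that assumption; the helper findMissingSettings is hypothetical and only covers the settings named in this document that the function and its trigger binding read.

```javascript
// Settings from local.settings.json that the function and its binding rely on
const REQUIRED_SETTINGS = [
    'EventHubConnectionString',
    'CrateConnectionString',
    'SinkTable',
    'SinkColumnUnmanned',
    'SinkColumnManned'
];

// Hypothetical helper: return the names of settings that are absent or empty
function findMissingSettings(env) {
    return REQUIRED_SETTINGS.filter((name) => !env[name]);
}

const missing = findMissingSettings(process.env);
if (missing.length > 0) {
    console.warn(`Missing application settings: ${missing.join(', ')}`);
}
```

Azure Functions exposes application settings to Node.js through process.env, which is also how index.js reads them.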
index.js¶
This file defines your Azure Function. Here it is in full, annotated so that you can go through it step by step:
// Import the pg library we use to connect to CrateDB
const { Pool } = require('pg');

// Import settings from the Azure Function Application's settings
const CRATE_CONNECTION_STRING = process.env['CrateConnectionString'];
const SINK_TABLE = process.env['SinkTable'];
const SINK_COLUMN_TIMESTAMPS = "ts";
const SINK_COLUMN_UNMANNED = process.env['SinkColumnUnmanned'];
const SINK_COLUMN_MANNED = process.env['SinkColumnManned'];
const SINK_COLUMN_DEBUG = "debug";

// A pool of connections to CrateDB that our Azure Function can utilize.
// Notice that this instantiation takes place outside of our Azure Function itself.
// This means we can use this pool across multiple Azure Function invocations.
const cratePool = new Pool({
    connectionString: CRATE_CONNECTION_STRING,
    idleTimeoutMillis: 15000,
    connectionTimeoutMillis: 5000,
    query_timeout: 30000,
});

// This is the Azure Function that will be called when new event hub messages are processed.
// It receives the context, an object that provides information and functions related to
// the invocation, as well as an array of event hub messages.
module.exports = async function (context, eventHubMessages) {
    // Initialise empty arrays for storing rows
    let timestamps = []
    let manned = []
    let unmanned = []
    let debugs = []

    // Iterate over the received event hub messages
    for (var i = 0; i < eventHubMessages.length; i++) {
        // Extract the timestamp from the message
        let timestamp = eventHubMessages[i]['timestamp'];
        if (timestamp === undefined) {
            context.log("Timestamp missing");
            context.log(`'${JSON.stringify(eventHubMessages[i])}'`);
            continue;
        }

        // Get the enqueued time of the event hub payload for debugging purposes.
        // With cardinality "many", the trigger metadata is exposed as one array entry per message.
        let debug = {
            'enqueued_time': context.bindingData.enqueuedTimeUtcArray[i]
        };

        // Extract the satellite type
        let satelliteType = eventHubMessages[i]['type'];
        // Ditch the type, now that we no longer need it in the payload.
        // JSON.stringify omits properties whose value is undefined.
        eventHubMessages[i]['type'] = undefined

        // Replace the location with a WKT string (longitude first, then latitude)
        let longitude = eventHubMessages[i]['location']['longitude']
        let latitude = eventHubMessages[i]['location']['latitude']
        eventHubMessages[i]['location'] = `POINT (${longitude} ${latitude})`

        // Set the specific column depending on whether the type is manned or unmanned
        if (satelliteType === undefined) {
            context.log("Satellite type missing");
            context.log(`'${JSON.stringify(eventHubMessages[i])}'`);
            continue;
        } else if (satelliteType === 'manned') {
            timestamps.push(timestamp)
            manned.push(`'${JSON.stringify(eventHubMessages[i])}'`)
            unmanned.push("null")
            debugs.push(`'${JSON.stringify(debug)}'`)
        } else if (satelliteType === 'unmanned') {
            timestamps.push(timestamp)
            manned.push("null")
            unmanned.push(`'${JSON.stringify(eventHubMessages[i])}'`)
            debugs.push(`'${JSON.stringify(debug)}'`)
        } else {
            context.log("Incompatible satellite type", satelliteType);
            context.log(`'${JSON.stringify(eventHubMessages[i])}'`);
        }
    }

    // Nothing to insert if every message in the batch was skipped
    if (timestamps.length === 0) {
        return;
    }

    // Construct SQL insertion statement
    // We do it this way so we can bulk insert the whole payload of event hub messages at once, rather than inserting row by row.
    // However, the pg client does not support bulk inserts on the client side. Instead, we use UNNEST to do bulk insertion on the server side.
    // See: https://cratedb.com/blog/bulk-inserts-with-unnest for more information.
    const stmt = `INSERT INTO ${SINK_TABLE} (${SINK_COLUMN_TIMESTAMPS}, ${SINK_COLUMN_UNMANNED}, ${SINK_COLUMN_MANNED}, ${SINK_COLUMN_DEBUG}) ` +
        `(SELECT * FROM UNNEST (['${timestamps.join("','")}'], [${unmanned}], [${manned}], [${debugs}]));`

    const crateClient = await cratePool.connect();
    try {
        // Await the query so the function does not complete before the insert has finished
        await crateClient.query(stmt);
    } catch (err) {
        context.log.error(err);
        throw err;
    } finally {
        // Always return the connection to the pool
        crateClient.release();
    }
};
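The function above interpolates JSON documents directly into the SQL statement, so a value containing a single quote (for example, a satellite named O'Neill) would end the string literal early. One way to guard against this is to double single quotes before interpolation, which is the standard SQL escape for string literals. The helper toSqlStringLiteral below is a hypothetical sketch and is not part of the function as written.

```javascript
// Hypothetical helper: serialise a value and wrap it as a SQL string literal,
// doubling any single quotes so they cannot terminate the literal early.
function toSqlStringLiteral(value) {
    const text = typeof value === 'string' ? value : JSON.stringify(value);
    return `'${text.replace(/'/g, "''")}'`;
}

const row = { id: "O'Neill", location: 'POINT (-164.5984 -24.9734)' };

console.log(toSqlStringLiteral(row));
// '{"id":"O''Neill","location":"POINT (-164.5984 -24.9734)"}'
```

Under this approach, calls such as manned.push(`'${JSON.stringify(eventHubMessages[i])}'`) would become manned.push(toSqlStringLiteral(eventHubMessages[i])).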
Testing¶
Before deploying the function, you can test it locally. For VSCode, please see the documentation’s debugging section for details. The following JSON document can be used as a test message:
{"input": "{\"id\": \"Zero Gravitas\", \"type\": \"unmanned\", \"location\": {\"longitude\": -164.5984,\"latitude\": -24.9734},\"timestamp\": 1588240576000}"}
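Note that the timestamp in this test message is expressed in milliseconds (1588240576000). The sketch below parses the test document and shows how the same instant is read when the value is given in milliseconds rather than seconds. It assumes the sink column interprets numeric timestamps as milliseconds since the Unix epoch, as JavaScript's Date does; the variable names are illustrative only.

```javascript
// The test document wraps the device payload as a JSON string under "input"
const testDocument = {
    input: '{"id": "Zero Gravitas", "type": "unmanned", "location": {"longitude": -164.5984,"latitude": -24.9734},"timestamp": 1588240576000}'
};

// Parse the inner payload, which has the same shape as a device message
const message = JSON.parse(testDocument.input);

// Interpreted as milliseconds, the timestamp falls in April 2020
const asMilliseconds = new Date(message.timestamp).toISOString();

// The same digits without the trailing zeros (a value in seconds),
// if read as milliseconds, would land in January 1970 instead
const secondsReadAsMilliseconds = new Date(1588240576).toISOString();

console.log(asMilliseconds);            // 2020-04-30T09:56:16.000Z
console.log(secondsReadAsMilliseconds); // 1970-01-19T09:10:40.576Z
```

If your devices report timestamps in seconds, one option is to multiply by 1000 inside the Azure Function before pushing the value into the timestamps array.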
To test the deployed Azure Function against an actual IoT hub, you can install VSCode’s Azure IoT Hub extension. Its documentation describes how to create a new device and send a device-to-cloud (D2C) message for testing purposes.