Load MQTT messages into CrateDB using Node-RED

Niklas Schmidtmer

March 29, 2023

Node-RED is a workflow automation tool that lets you orchestrate message flows and transformations through a web interface. This tutorial shows how to read messages from an MQTT broker with Node-RED and insert them into CrateDB.

Prerequisites

You need:

  1. A running Node-RED installation.

  2. The node-red-contrib-postgresql module installed.

  3. A running MQTT broker. This tutorial uses HiveMQ Cloud.

1

Provision CrateDB

First, create the target table in CrateDB:

CREATE TABLE nodered_target (
   ts TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,
   payload OBJECT(DYNAMIC)
);

Store the payload as CrateDB’s OBJECT data type to accommodate an evolving schema. For production, also consider the partitioning and sharding guide.

2

Publish messages to MQTT

First, generate data to populate the MQTT topic with Node-RED. If you already have an MQTT topic with regular messages, you can skip this part.

The inject node creates a JSON payload with three attributes.

In this example, two fields are static; only the timestamp changes. Download the full workflow definition: flows-producer.json (1.3 KB)
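As a rough illustration of the message shape, the inject node's output could be approximated in JavaScript as follows. The field names `sensorId` and `location` and their values are hypothetical placeholders, not the names used in the original flow; only the idea of two static fields plus a changing timestamp comes from the text above.

```javascript
// Sketch of a payload with two static fields and one changing timestamp.
// ASSUMPTION: the field names and values below are hypothetical placeholders.
function buildPayload(now = Date.now()) {
  return {
    sensorId: "sensor-1", // static (hypothetical)
    location: "office",   // static (hypothetical)
    timestamp: now,       // changes with every injected message
  };
}

// In a Node-RED function node, this could be used as:
//   msg.payload = buildPayload();
//   return msg;
```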

3

Consume messages into CrateDB

To ingest efficiently, group messages into batches and use multi-value INSERT statements to avoid generating one INSERT per message. The flow consists of four nodes:

  1. HiveMQ Cloud: Consume messages from the configured MQTT topic.

  2. join: Merge a given number of messages into one array. The array length determines the number of rows inserted into CrateDB in one multi-value statement.

    Configure the join node to forward a message to the subsequent function node when either a) the array reaches a target size, or b) a timeout elapses. Tune these values based on your message rate and acceptable end‑to‑end latency.

  3. function: Reduce the array to a SQL VALUES string ((p1), (p2), ...) for the INSERT query.

  4. postgresql: Execute the INSERT using the CrateDB connection, interpolating values from the payload.
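Step 3 can be sketched as a small JavaScript helper of the kind you might place in the function node. The helper name `toValuesString` is hypothetical, and the sketch assumes that each array element is a JSON-serializable payload and that CrateDB accepts a JSON string literal for the OBJECT column. It is not the code from flows-consumer.json.

```javascript
// Turn an array of payloads into a SQL VALUES string: ('{...}'), ('{...}')
// ASSUMPTION: toValuesString is a hypothetical helper, not part of Node-RED.
function toValuesString(payloads) {
  return payloads
    .map((payload) => {
      // Serialize the payload to JSON text.
      const json = JSON.stringify(payload);
      // Double single quotes so the text is a valid SQL string literal.
      const escaped = json.replace(/'/g, "''");
      return `('${escaped}')`;
    })
    .join(", ");
}

// In a Node-RED function node, this could be used as:
//   msg.payload = toValuesString(msg.payload);
//   return msg;
```

Doubling single quotes covers standard SQL string literals only; it is a minimal safeguard rather than a full defence against malformed input.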

Security note: Prefer parameterized queries to avoid SQL injection. If you must build a VALUES string, properly escape or encode all user-provided content.
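One way to use parameterized queries, as the note above recommends, is to generate numbered placeholders and pass the serialized payloads separately. The sketch below assumes that the postgresql node reads the statement from `msg.query` and positional parameters from `msg.params`; verify this against the node-red-contrib-postgresql documentation for your version. The helper name `buildParameterizedInsert` is hypothetical.

```javascript
// Build a multi-value INSERT with $1, $2, ... placeholders plus a params array.
// ASSUMPTION: buildParameterizedInsert is a hypothetical helper.
// ASSUMPTION: CrateDB converts the JSON text parameters to OBJECT values;
// an explicit cast in the statement may be needed in your setup.
function buildParameterizedInsert(payloads) {
  // One placeholder group per row: ($1), ($2), ...
  const placeholders = payloads.map((_, i) => `($${i + 1})`).join(", ");
  return {
    query: `INSERT INTO nodered_target (payload) VALUES ${placeholders}`,
    params: payloads.map((payload) => JSON.stringify(payload)),
  };
}

// In a Node-RED function node, this could be used as:
//   const { query, params } = buildParameterizedInsert(msg.payload);
//   msg.query = query;
//   msg.params = params;
//   return msg;
```

Because the payload text never becomes part of the SQL statement itself, no manual quote escaping is required.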

Download the full workflow definition: flows-consumer.json (2.6 KB)

4

Test the workflow

To test the workflow, click the square to the left of the timestamp node to inject a message. In this configuration, an INSERT triggers after two messages, or after ten seconds if a second message does not arrive.
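The count-or-timeout behaviour described here can be modelled in a few lines of JavaScript. This is only a simplified model of the semantics, not the join node's actual implementation. The class name `Batcher` and its methods are hypothetical, and the current time is passed in explicitly so the logic is easy to follow.

```javascript
// Simplified model of "emit after N messages or after a timeout".
// ASSUMPTION: Batcher is a hypothetical class, not a Node-RED API.
class Batcher {
  constructor(maxSize, timeoutMs) {
    this.maxSize = maxSize;
    this.timeoutMs = timeoutMs;
    this.items = [];
    this.firstAt = null; // arrival time of the first message in the open batch
  }

  // Add a message; returns the full batch, or null while the batch is open.
  push(item, nowMs) {
    if (this.items.length === 0) this.firstAt = nowMs;
    this.items.push(item);
    return this.items.length >= this.maxSize ? this.flush() : null;
  }

  // Call periodically; returns a partial batch once the timeout has elapsed.
  tick(nowMs) {
    const expired =
      this.items.length > 0 && nowMs - this.firstAt >= this.timeoutMs;
    return expired ? this.flush() : null;
  }

  // Hand out the collected messages and start a new, empty batch.
  flush() {
    const batch = this.items;
    this.items = [];
    this.firstAt = null;
    return batch;
  }
}

// Two messages or ten seconds, as in the configuration described above.
const batcher = new Batcher(2, 10000);
batcher.push("m1", 0);    // batch still open
batcher.push("m2", 1000); // batch is full and gets emitted
```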

Then run a SELECT statement on your CrateDB cluster to see the inserted rows:

SELECT *
FROM nodered_target;
