Load MQTT messages into CrateDB using Node-RED¶
Node-RED is a workflow automation tool that lets you orchestrate message flows and transformations through a web interface. This tutorial shows how to read messages from an MQTT broker with Node-RED and insert them into CrateDB.
Prerequisites¶
You need:
A running Node-RED installation.
The node-red-contrib-postgresql module installed.
A running MQTT broker. This tutorial uses HiveMQ Cloud.
Provision CrateDB¶
First of all, we create the target table in CrateDB:
CREATE TABLE nodered_target (
ts TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,
payload OBJECT(DYNAMIC)
);
Store the payload as CrateDB’s OBJECT data type to accommodate an evolving schema. For production, also consider the partitioning and sharding guide.
Publish messages to MQTT¶
First, generate data to populate the MQTT topic with Node-RED. If you already
have an MQTT topic with regular messages, you can skip this part.

The inject node creates a JSON payload with three attributes:

In this example, two fields are static; only the timestamp changes. Download the full workflow definition: flows-producer.json (1.3 KB)
Consume messages into CrateDB¶
To ingest efficiently, group messages into batches and use
multi-value INSERT statements
to avoid generating one INSERT per message:

HiveMQ Cloud: Consume messages from the configured MQTT topic.
join: Merge a given number of messages into one array. The array length determines the number of rows inserted into CrateDB in one multi-value statement.Configure the
joinnode to forward a message to the subsequentfunctionnode when either a) the array reaches a target size, or b) a timeout elapses. Tune these values based on your message rate and acceptable end‑to‑end latency.function: Reduce the array to a SQL VALUES string ((p1), (p2), ...) for the INSERT query.postgresql: Execute the INSERT using the CrateDB connection, interpolating values from the payload.
+{note} Security: Prefer parameterized queries to avoid SQL injection. If you must build a VALUES string, ensure proper escaping/encoding of all user-provided content. +
Download the full workflow definition: flows-consumer.json (2.6 KB)
Test the workflow¶
To test the workflow, click the square to the left of the timestamp node
(
)
to inject a message. In this configuration, an INSERT triggers after two
messages or after ten seconds if a second message does not arrive.
Then run a SELECT statement on your CrateDB cluster to see the inserted rows:
SELECT *
FROM nodered_target;
