OPC-UA and MQTT are two of the most widely used protocols for moving data off industrial equipment and IoT devices. Getting those readings into a SQL database without writing a custom integration layer is where most setups stall. This tutorial shows how Telegraf bridges that gap: OPC-UA or MQTT on the input side, CrateDB on the output, and standard SQL from the moment the first reading lands.
What Telegraf does in this stack
Telegraf separates data collection from data delivery. Input plugins read from sources: OPC-UA servers, MQTT brokers, Modbus devices, Kafka topics, and dozens more. Output plugins write to destinations. The two sides are independent.
In this setup:
- inputs.opcua reads from factory-floor OPC-UA servers
- inputs.mqtt_consumer subscribes to MQTT broker topics
- outputs.cratedb writes everything to CrateDB in a consistent schema
You can run both inputs simultaneously. OPC-UA readings from PLCs and MQTT readings from edge sensors land in the same table and are queryable together in a single SQL statement.
Configuring the OPC-UA input
OPC-UA is the dominant protocol for factory-floor communication: PLCs, SCADA systems, process historians, and most modern industrial equipment expose an OPC-UA server by default. Telegraf's polling input reads configured node values at a fixed interval.
This input block reads temperature and pressure nodes and tags each reading with the production line and plant:
    [[inputs.opcua]]
      name = "opcua"
      endpoint = "opc.tcp://plc-host:4840"
      connect_timeout = "10s"
      request_timeout = "5s"
      security_policy = "None"
      security_mode = "None"

      [[inputs.opcua.nodes]]
        name = "temperature"
        namespace = "3"
        identifier_type = "s"
        identifier = "Temperature"
        tags = [["line", "line1"], ["plant", "plant_a"]]

      [[inputs.opcua.nodes]]
        name = "pressure"
        namespace = "3"
        identifier_type = "s"
        identifier = "Pressure"
        tags = [["line", "line1"], ["plant", "plant_a"]]
endpoint is the OPC-UA server URL on your PLC or SCADA host. The port is typically 4840.
namespace and identifier locate the node in the OPC-UA address space. The exact values come from your server's browse tree. Tools like UaExpert (free from Unified Automation) let you browse the server and copy node identifiers directly.
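OPC-UA node IDs are often written as a single string such as `ns=3;s=Temperature`, while the Telegraf config splits the same information into three settings. The following Python sketch shows that mapping, assuming the common `ns=<namespace>;<type>=<identifier>` form. `split_node_id` is a hypothetical helper written for illustration, not part of Telegraf or any OPC-UA library.

```python
def split_node_id(node_id: str) -> dict:
    """Split an OPC-UA node ID string into the three node settings.

    Hypothetical helper: handles only "ns=<n>;<type>=<identifier>".
    """
    ns_part, _, id_part = node_id.partition(";")
    if not ns_part.startswith("ns=") or "=" not in id_part:
        raise ValueError(f"unexpected node ID format: {node_id!r}")

    # Split on the first "=" only, so identifiers containing "=" survive.
    identifier_type, _, identifier = id_part.partition("=")
    # s = string, i = numeric, g = GUID, b = opaque (byte string)
    if identifier_type not in ("s", "i", "g", "b"):
        raise ValueError(f"unknown identifier type: {identifier_type!r}")

    return {
        "namespace": ns_part[len("ns="):],
        "identifier_type": identifier_type,
        "identifier": identifier,
    }


if __name__ == "__main__":
    print(split_node_id("ns=3;s=Temperature"))
```

The three returned values correspond to the namespace, identifier_type, and identifier settings in each node block.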
tags adds key-value pairs to every reading from this node. Tag values become filterable columns in CrateDB. Add as many tags as your query needs: line, plant, asset ID, equipment type.
For secured OPC-UA connections with Basic256Sha256 or Aes256_Sha256_RsaPss policies, add your certificate and key paths to the config block. The Telegraf documentation covers the certificate setup for each security mode.
If your OPC-UA server supports subscriptions (push-based delivery rather than polling), use inputs.opcua_listener instead. The node configuration is the same; the delivery model differs.
Configuring the MQTT input
MQTT is the protocol of choice for edge devices and sensors that need a lightweight pub/sub channel: vibration sensors, energy meters, environmental monitors, and constrained hardware that cannot run an OPC-UA server.
This input block subscribes to factory sensor topics and expects JSON payloads:
    [[inputs.mqtt_consumer]]
      servers = ["tcp://mqtt-broker:1883"]
      topics = [
        "factory/+/temperature",
        "factory/+/pressure",
        "factory/+/vibration"
      ]
      data_format = "json"
      json_time_key = "ts"
      json_time_format = "unix_ms"
      topic_tag = "topic"

      [inputs.mqtt_consumer.tags]
        protocol = "mqtt"
The + wildcard in topic patterns matches one path segment. factory/+/temperature captures factory/line1/temperature, factory/line2/temperature, and all other lines in a single subscription.
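To make the wildcard semantics concrete, here is a minimal Python sketch of MQTT topic-filter matching. `topic_matches` is a hypothetical helper written for illustration; the broker does this matching itself, and the sketch ignores edge cases such as `$`-prefixed system topics. It covers `+` (exactly one level) and `#` (all remaining levels).

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription filter.

    Hypothetical helper for illustration only.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches everything from here down.
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # Literal levels must match exactly; "+" accepts any one level.
            return False

    # Every filter level matched; the topic must not have extra levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    for t in ("factory/line1/temperature", "factory/line1/zone-a/temperature"):
        print(t, topic_matches("factory/+/temperature", t))
```

Because `+` matches exactly one segment, a deeper topic such as factory/line1/zone-a/temperature is not captured by factory/+/temperature and would need its own pattern.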
data_format controls how Telegraf parses each message payload. json works for most modern IoT devices. For binary formats such as Protobuf or Avro, Telegraf has corresponding parsers: set data_format to the matching type and add the parser-specific config.
json_time_key tells Telegraf which JSON field contains the reading timestamp. Set json_time_format to match how your devices encode time: unix_ms for milliseconds since epoch, unix for seconds, RFC3339 for ISO 8601 strings. If your devices do not include a timestamp in the payload, remove both lines; Telegraf then stamps each reading with its own clock.
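As a sanity check on timestamp encoding, the following Python sketch builds a payload shaped like the one this input expects and expresses one instant in the three encodings mentioned above. Only the `ts` key comes from the config; the `temperature` field and its value are assumptions for illustration.

```python
import json
from datetime import datetime, timezone

# One instant, expressed the three ways a device might encode it.
instant = datetime(2026, 6, 4, 9, 59, 0, tzinfo=timezone.utc)

unix_s = int(instant.timestamp())   # matches json_time_format = "unix"
unix_ms = unix_s * 1000             # matches json_time_format = "unix_ms"
rfc3339 = instant.isoformat().replace("+00:00", "Z")  # RFC3339 string

# Payload as an edge device might publish it (field names are illustrative).
payload = json.dumps({"ts": unix_ms, "temperature": 71.4})

# Decoding the millisecond value recovers the original instant.
decoded = json.loads(payload)
recovered = datetime.fromtimestamp(decoded["ts"] / 1000, tz=timezone.utc)

print(payload)
print(unix_s, unix_ms, rfc3339)
print(recovered == instant)
```

A mismatch between the device's encoding and json_time_format (seconds parsed as milliseconds, for example) places readings decades away from their real time, so it is worth checking one payload by hand before going live.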
topic_tag preserves the full MQTT topic as a tag on every record. This lets you filter by topic in SQL: WHERE tags['topic'] LIKE 'factory/line1/%'.
The CrateDB output
    [[outputs.cratedb]]
      url = "postgres://crate@localhost/doc?sslmode=disable"
      timeout = "5s"
      table = "sensor_data"
      table_create = true
      key_separator = "_"
table_create = true creates the target table on the first write. No DDL required before you start. All inputs writing to this output share the same table.
For CrateDB Cloud, use your cluster credentials:
    [[outputs.cratedb]]
      url = "postgres://your-user:your-password@your-cluster.cratedb.net:5432/doc?sslmode=require"
      timeout = "5s"
      table = "sensor_data"
      table_create = true
      key_separator = "_"
For a full walkthrough of output plugin configuration, the data model CrateDB uses for Telegraf writes, and how to update Grafana dashboards for the object column schema, see Migrating from InfluxDB to CrateDB: The Telegraf Output Plugin Swap.
The auto-created table in CrateDB
The auto-created table has this structure:
| column_name | data_type |
|---|---|
| hash_id | TEXT |
| timestamp | TIMESTAMP WITH TIME ZONE |
| name | TEXT |
| tags | OBJECT |
| fields | OBJECT |
| day | TIMESTAMP WITH TIME ZONE |
name is the Telegraf measurement name: "opcua" for OPC-UA readings, "mqtt_consumer" for MQTT readings. tags holds all tag key-value pairs. fields holds all numeric sensor values. day is an auto-generated partition column.
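The mapping from one reading to one row can be sketched in a few lines of Python. This is an illustration of the column layout only, not the plugin's code: `to_row` is a hypothetical helper, and hash_id is left empty because the output plugin generates it.

```python
from datetime import datetime, timezone


def to_row(name: str, tags: dict, fields: dict, ts: datetime) -> dict:
    """Shape one reading like a row of the auto-created table.

    Illustrative only: hash_id is generated by the output plugin and is
    left as None here rather than guessed at.
    """
    # Truncate to midnight, which is what a per-day partition value holds.
    day = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return {
        "hash_id": None,         # set by the plugin, not reproduced here
        "timestamp": ts,
        "name": name,            # measurement name, e.g. "opcua"
        "tags": dict(tags),      # OBJECT column: tag key-value pairs
        "fields": dict(fields),  # OBJECT column: sensor values
        "day": day,              # partition value derived from timestamp
    }


row = to_row(
    "opcua",
    {"plant": "plant_a", "line": "line1"},
    {"temperature": 71.4, "pressure": 1.011},
    datetime(2026, 6, 4, 9, 59, 0, tzinfo=timezone.utc),
)
print(row["name"], row["day"].isoformat(), row["fields"]["temperature"])
```

Because tags and fields are object columns, a new tag or sensor value does not change the table's top-level column list; it becomes a new key inside the object.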
Run this query if you want to create the table manually:
    CREATE TABLE IF NOT EXISTS doc.sensor_data (
      hash_id TEXT,
      "timestamp" TIMESTAMP WITH TIME ZONE,
      name TEXT,
      tags OBJECT (DYNAMIC),
      fields OBJECT (DYNAMIC),
      day TIMESTAMP WITH TIME ZONE GENERATED ALWAYS AS date_trunc('day', "timestamp"),
      PRIMARY KEY (hash_id, day)
    ) PARTITIONED BY (day);
If Telegraf is not yet connected to your cluster, run this query to insert sample data and test your queries against a live table.
    INSERT INTO doc.sensor_data (hash_id, "timestamp", name, tags, fields) VALUES
      ('a1b2c3d4e5f6', '2026-06-04T09:59:00.000Z', 'opcua', {plant = 'plant_a', line = 'line1', asset_id = 'asset-001'}, {temperature = 71.4, pressure = 1.011}),
      ('b2c3d4e5f6a1', '2026-06-04T09:59:10.000Z', 'opcua', {plant = 'plant_a', line = 'line1', asset_id = 'asset-001'}, {temperature = 72.3, pressure = 1.013}),
      ('c3d4e5f6a1b2', '2026-06-04T09:59:20.000Z', 'opcua', {plant = 'plant_a', line = 'line1', asset_id = 'asset-001'}, {temperature = 73.1, pressure = 1.015}),
      ('d4e5f6a1b2c3', '2026-06-04T09:59:30.000Z', 'opcua', {plant = 'plant_a', line = 'line1', asset_id = 'asset-002'}, {temperature = 74.0, pressure = 1.018}),
      ('e5f6a1b2c3d4', '2026-06-04T09:59:40.000Z', 'opcua', {plant = 'plant_a', line = 'line1', asset_id = 'asset-002'}, {temperature = 73.8, pressure = 1.017}),
      ('f6a1b2c3d4e5', '2026-06-04T09:59:50.000Z', 'opcua', {plant = 'plant_a', line = 'line1', asset_id = 'asset-002'}, {temperature = 72.9, pressure = 1.014}),
      -- different line: filtered out by WHERE tags['line'] = 'line1'
      ('a1b2c3d4e5f7', '2026-06-04T09:59:20.000Z', 'opcua', {plant = 'plant_a', line = 'line2', asset_id = 'asset-003'}, {temperature = 68.5, pressure = 0.998}),
      -- different protocol: filtered out by WHERE name = 'opcua'
      ('a1b2c3d4e5f8', '2026-06-04T09:59:30.000Z', 'mqtt_consumer', {plant = 'plant_a', line = 'line1', asset_id = 'asset-001'}, {vibration = 0.42, humidity = 55.0});
Verifying the data landed
After restarting Telegraf, confirm data is arriving:
    SELECT * FROM doc.sensor_data LIMIT 5;
To see which sources are active and how many records each has written:
    SELECT name, COUNT(*) AS row_count
    FROM doc.sensor_data
    GROUP BY name
    ORDER BY row_count DESC;
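The same per-source count can also be scripted, for example as a health check. The sketch below posts the statement to CrateDB's HTTP `/_sql` endpoint using only the Python standard library. It assumes a local, unauthenticated cluster on the default HTTP port 4200; the URL, `build_request`, and `run_query` are illustrative names, so adjust them for your deployment.

```python
import json
import urllib.request
from typing import Optional

# Assumption: CrateDB's HTTP endpoint is reachable locally on port 4200.
CRATEDB_SQL_URL = "http://localhost:4200/_sql"

COUNT_BY_SOURCE = (
    "SELECT name, COUNT(*) AS row_count "
    "FROM doc.sensor_data GROUP BY name ORDER BY row_count DESC"
)


def build_request(stmt: str, args: Optional[list] = None) -> urllib.request.Request:
    """Build a POST request carrying the statement as a JSON body."""
    body = json.dumps({"stmt": stmt, "args": args or []}).encode("utf-8")
    return urllib.request.Request(
        CRATEDB_SQL_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_query(stmt: str, args: Optional[list] = None) -> list:
    """Send the statement and return the result rows (needs a live cluster)."""
    with urllib.request.urlopen(build_request(stmt, args), timeout=5) as resp:
        return json.load(resp)["rows"]


# Show the JSON body that would be sent, without contacting the server.
print(build_request(COUNT_BY_SOURCE).data.decode("utf-8"))
```

Against a running cluster, `run_query(COUNT_BY_SOURCE)` returns one `[name, row_count]` pair per active source.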
Querying sensor data
All queries use standard SQL. Values inside the fields and tags object columns use bracket notation.
Read the latest readings from a production line
    SELECT
      timestamp,
      fields['temperature'] AS temperature,
      fields['pressure'] AS pressure
    FROM doc.sensor_data
    WHERE name = 'opcua'
      AND tags['plant'] = 'plant_a'
      AND tags['line'] = 'line1'
    ORDER BY timestamp DESC
    LIMIT 20;
Aggregate over a rolling time window
    SELECT
      date_trunc('minute', timestamp) AS bucket,
      AVG(fields['temperature']) AS avg_temp,
      MAX(fields['temperature']) AS max_temp,
      MIN(fields['pressure']) AS min_pressure
    FROM doc.sensor_data
    WHERE name = 'opcua'
      AND tags['line'] = 'line1'
      AND timestamp > NOW() - INTERVAL '1 hour'
    GROUP BY bucket
    ORDER BY bucket;
This query maps directly to a Grafana time-series panel. Add CrateDB as a PostgreSQL data source in Grafana, paste this query, and you have a live dashboard.
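To see what the aggregation computes, the following Python sketch reproduces the per-minute bucketing over the six line1 OPC-UA sample rows from the INSERT statement earlier. It illustrates the logic only; CrateDB performs this work inside the query, and the time-range filter is omitted here.

```python
from collections import defaultdict
from datetime import datetime, timezone

# (timestamp, temperature, pressure) for the line1 OPC-UA sample rows.
readings = [
    ("2026-06-04T09:59:00", 71.4, 1.011),
    ("2026-06-04T09:59:10", 72.3, 1.013),
    ("2026-06-04T09:59:20", 73.1, 1.015),
    ("2026-06-04T09:59:30", 74.0, 1.018),
    ("2026-06-04T09:59:40", 73.8, 1.017),
    ("2026-06-04T09:59:50", 72.9, 1.014),
]

buckets = defaultdict(list)
for ts, temperature, pressure in readings:
    moment = datetime.fromisoformat(ts).replace(tzinfo=timezone.utc)
    # Same idea as date_trunc('minute', timestamp): drop seconds and below.
    bucket = moment.replace(second=0, microsecond=0)
    buckets[bucket].append((temperature, pressure))

summary = {}
for bucket in sorted(buckets):
    temps = [t for t, _ in buckets[bucket]]
    pressures = [p for _, p in buckets[bucket]]
    summary[bucket] = {
        "avg_temp": sum(temps) / len(temps),
        "max_temp": max(temps),
        "min_pressure": min(pressures),
    }

for bucket, stats in summary.items():
    print(bucket.isoformat(), stats)
```

All six sample rows fall within the same minute, so the sketch produces a single bucket; a live feed yields one row per minute in the selected range.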
Correlating OPC-UA and MQTT in one query
This is where a SQL analytics layer adds something a purpose-built time-series database cannot match without an external join step.
OPC-UA data from PLCs (name = 'opcua') and MQTT data from edge sensors (name = 'mqtt_consumer') land in the same table. One query reads across both protocols, correlating by line and time bucket:
    SELECT
      opc.timestamp,
      opc.tags['line'] AS line,
      opc.fields['temperature'] AS plc_temperature,
      opc.fields['pressure'] AS plc_pressure,
      mqtt.fields['vibration'] AS sensor_vibration
    FROM doc.sensor_data AS opc
    JOIN doc.sensor_data AS mqtt
      ON date_trunc('minute', opc.timestamp) = date_trunc('minute', mqtt.timestamp)
      AND opc.tags['line'] = mqtt.tags['line']
    WHERE opc.name = 'opcua'
      AND mqtt.name = 'mqtt_consumer'
      AND opc.tags['line'] = 'line1'
      AND opc.timestamp > NOW() - INTERVAL '30 minutes'
    ORDER BY opc.timestamp DESC;
Next, create and populate an assets metadata table that holds equipment context:
    CREATE TABLE IF NOT EXISTS doc.assets (
      asset_id TEXT PRIMARY KEY,
      asset_type TEXT,
      location TEXT
    );

    INSERT INTO doc.assets (asset_id, asset_type, location) VALUES
      ('asset-001', 'conveyor', 'plant_a / line1 / zone-a'),
      ('asset-002', 'compressor', 'plant_a / line1 / zone-b'),
      ('asset-003', 'conveyor', 'plant_a / line2 / zone-a'),
      ('asset-004', 'heat-exchanger', 'plant_a / line2 / zone-c');
You can then join the sensor_data table against it in the same statement:
    SELECT
      s.timestamp,
      s.fields['temperature'] AS temperature,
      s.fields['pressure'] AS pressure,
      a.asset_type,
      a.location
    FROM doc.sensor_data AS s
    JOIN doc.assets AS a
      ON s.tags['asset_id'] = a.asset_id
    WHERE s.name = 'opcua'
      AND s.timestamp > NOW() - INTERVAL '1 hour'
    ORDER BY s.timestamp DESC;
No separate pipeline step. No data export. The join runs on data that arrived seconds ago.
Where this fits in the industrial stack
Telegraf handles the OT connectivity layer. CrateDB handles the analytics layer. The industrial data historian typically stays at the OT layer: historians are optimized for ingestion and process data storage at the edge. CrateDB sits above it as the SQL analytics layer where dashboards, anomaly detection queries, and cross-plant comparisons run.
For a complete picture of how this fits into the broader IoT analytics architecture (including ingestion, storage, and query patterns for multi-site deployments), see Real-Time IoT Analytics at Scale.
Next steps
The full stack runs in under 30 minutes from a fresh install: Telegraf collecting from your OPC-UA server or MQTT broker, CrateDB storing the data, and a Grafana dashboard querying it. The OT connectivity objection is a config file, not a project.
To test the query patterns against pre-loaded sensor data before touching your production pipeline, run queries on live data at cratedb.com/explore.