Please note: MQTT support was deprecated in CrateDB 3.2 and will probably be removed by 4.0.
MQTT (Message Queuing Telemetry Transport) is a machine-to-machine communication protocol that is particularly well suited to machine data and Internet of Things (IoT) applications. It has become popular because of its simplicity and low network resource usage.
The MQTT protocol employs a publish-subscribe model: clients publish messages to a central broker, which dispatches them to subscribers and acknowledges them according to the specified quality of service (QoS).
This model works well with event-driven architectures where the subscribers can react to delivered messages.
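To make the publish-subscribe idea concrete, here is a minimal in-memory sketch. The `ToyBroker` class is a hypothetical stand-in I made up for illustration: it is not an MQTT implementation, it matches topics exactly, and it ignores quality of service entirely.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker (illustration only, not real MQTT)."""

    def __init__(self):
        # Maps a topic name to the list of subscriber callbacks for it.
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, payload):
        # Dispatch the message to every subscriber of this exact topic
        # and report how many subscribers received it.
        callbacks = self._subscribers[topic]
        for callback in callbacks:
            callback(topic, payload)
        return len(callbacks)


broker = ToyBroker()
received = []
broker.subscribe('events/system_load',
                 lambda topic, payload: received.append(payload))
delivered = broker.publish('events/system_load', '{"cpu_percent": "12.6"}')
```

The publisher never talks to the subscriber directly. It only knows the topic name, which is what lets subscribers come and go without the publisher changing.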
However, sometimes it's useful to store these messages so that you can analyze them and react to them (or a filtered or transformed version of them) at a later point in time.
Normally, to achieve this, the subscribers would each need to individually transform (if necessary) and persist the messages as they receive them from the broker.
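As a rough illustration of that per-subscriber work, the sketch below parses a JSON payload and stores it. SQLite (in memory) stands in for whatever database a subscriber might use, and the `system_load` table and `persist_message` function are hypothetical names chosen for this example.

```python
import json
import sqlite3


def persist_message(conn, topic, payload):
    """Transform a raw JSON payload and store it, as each subscriber would."""
    data = json.loads(payload)
    conn.execute(
        'INSERT INTO system_load (topic, cpu_percent) VALUES (?, ?)',
        (topic, float(data['cpu_percent'])),
    )
    conn.commit()


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE system_load (topic TEXT, cpu_percent REAL)')
persist_message(conn, 'events/system_load', '{"cpu_percent": "12.6"}')
rows = conn.execute('SELECT topic, cpu_percent FROM system_load').fetchall()
```

Every subscriber that wants history has to carry code like this, which is the duplication the next paragraph addresses.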
CrateDB simplifies this by acting as an MQTT endpoint that writes messages to user-defined tables according to user-defined rules. These tables can then be queried or polled at will by CrateDB clients.
With this setup, no intermediary brokers or message queues are needed.
Let's try this out for ourselves.
In this post I will show you how to set up a simple script that publishes system monitoring information via MQTT to CrateDB acting as an MQTT endpoint.
Publishing System Monitoring Data
Feel free to use any MQTT client to send messages on topic events/system_load with a valid JSON payload.
I will show you how to set up a Python 3 script that does this.
Create a project folder for the script:
$ mkdir system_load_mqtt_producer
$ cd system_load_mqtt_producer
We're going to need two dependencies: paho-mqtt, an MQTT client library, and psutil, which reports system utilization.
List them as requirements, like so:
$ cat > requirements.txt << EOF
paho-mqtt==1.3.0
psutil==5.4.1
EOF
Now, create a file called publish.py with the following code:
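import time

import paho.mqtt.client as mqtt
import psutil

if __name__ == '__main__':
    mqttc = mqtt.Client()
    mqttc.max_inflight_messages_set(1000)
    # Connect to the MQTT endpoint (CrateDB, once it is configured)
    mqttc.connect('127.0.0.1', port=1883, keepalive=60)
    while True:
        time.sleep(1)
        # JSON payload; field names match the sample query output later in this post
        payload = '{{"cpu_percent": "{}", "virtual_memory_percent": "{}"}}' \
            .format(psutil.cpu_percent(), psutil.virtual_memory().percent)
        result = mqttc.publish('events/system_load', qos=1, payload=payload)
        print(result)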
This simple script will send an MQTT message every second reporting the system CPU and memory usage.
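The script builds its payload with string formatting, which works for numbers but is easy to get wrong once values need escaping. As an alternative sketch, a small helper can build the same payload with `json.dumps`. The `build_payload` name is my own, and the field names are taken from the sample output shown later in this post.

```python
import json


def build_payload(cpu_percent, memory_percent):
    """Serialise the two readings as a JSON object with string values."""
    return json.dumps({
        'cpu_percent': str(cpu_percent),
        'virtual_memory_percent': str(memory_percent),
    })


payload = build_payload(12.6, 75.7)

# In publish.py this could replace the format string, for example:
# payload = build_payload(psutil.cpu_percent(), psutil.virtual_memory().percent)
```

Keeping the values as strings mirrors the sample output. If you would rather store numbers, drop the `str()` calls.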
If you don't already have CrateDB installed, download and install CrateDB.
We need to configure CrateDB to act as an MQTT endpoint. This is as simple as setting the ingestion.mqtt.enabled configuration setting to true.
Find your config/crate.yml file, and add this line:
ingestion.mqtt.enabled: true
MQTT ingestion has now been activated.
By default, CrateDB will listen for MQTT messages on port 1883, but this can be changed using the ingestion.mqtt.port setting. We could also enable SSL if we wanted, but we'll skip that for this tutorial.
Start CrateDB.
The output from CrateDB should indicate that it is listening on port 1883:
publish_address {127.0.0.1:1883}, bound_addresses {127.0.0.1:1883}
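If you would rather check the port from a script than read the log, a plain TCP connection attempt is enough. This is a small sketch using only the standard library. The `port_is_open` helper is a name I chose, and a successful connection only shows that something is listening, not that it speaks MQTT.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


# Example, with CrateDB running locally on the default MQTT port:
# print(port_is_open('127.0.0.1', 1883))
```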
Before we can start publishing messages, we need to create a table where CrateDB can store MQTT messages.
Start crash and connect to your CrateDB node:
\c localhost:4200
If you are not running CrateDB locally, replace localhost with the correct hostname.
The target table (where the messages will be written) needs to match the structure of the MQTT messages.
So, create a table like this:
CREATE TABLE IF NOT EXISTS mqtt.system_information (
    client_id STRING,
    packet_id INTEGER,
    topic STRING,
    ts TIMESTAMP,
    payload OBJECT(IGNORED),
    PRIMARY KEY ("client_id", "packet_id")
);
Next, create the ingestion rule that will route the system monitoring events to the mqtt.system_information table:
CREATE INGEST RULE system_monitoring
ON mqtt
WHERE topic LIKE 'events/system%'
INTO mqtt.system_information;
This instructs CrateDB to create an ingestion rule named system_monitoring that will write MQTT messages with a topic that starts with events/system to the mqtt.system_information table.
Notice that the topic is a field in the MQTT message.
You can construct more complex filtering scenarios involving other fields, if you want to. For example, adding AND client_id='FE:12:34:56' to the WHERE clause would filter messages so that only messages from the FE:12:34:56 client would be written to the table.
Furthermore, you can use the ts (timestamp) field to filter messages that are received during certain hours or on certain days.
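To get a feel for which messages a rule like this would accept, here is a rough Python emulation of the WHERE clause. It is only an approximation under my own assumptions: `like_to_regex` and `rule_matches` are hypothetical helpers, the LIKE translation handles the % and _ wildcards but not escape characters, and none of this is how CrateDB evaluates rules internally.

```python
import re


def like_to_regex(pattern):
    """Translate a SQL LIKE pattern (% and _ wildcards) into a compiled regex."""
    parts = []
    for char in pattern:
        if char == '%':
            parts.append('.*')   # % matches any sequence of characters
        elif char == '_':
            parts.append('.')    # _ matches exactly one character
        else:
            parts.append(re.escape(char))
    return re.compile(''.join(parts), re.DOTALL)


def rule_matches(message, topic_pattern, client_id=None):
    """Emulate: WHERE topic LIKE <topic_pattern> [AND client_id = <client_id>]."""
    if not like_to_regex(topic_pattern).fullmatch(message['topic']):
        return False
    return client_id is None or message['client_id'] == client_id


message = {'client_id': 'FE:12:34:56', 'topic': 'events/system_load'}
accepted = rule_matches(message, 'events/system%')
```

Trying a few topics against the pattern before creating the rule is a cheap way to catch a pattern that is broader or narrower than you intended.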
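Let's review what we have: CrateDB acting as an MQTT endpoint, an ingestion rule that writes messages to the mqtt.system_information table when the topic starts with events/system, and a script that publishes messages on the events/system_load topic.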
Let's test things out by starting the script.
First, make sure CrateDB is running.
Then, in the script folder you created before, run this:
python3 -m venv venv
source venv/bin/activate
The first command initialises a Python virtual environment in the venv directory, and the second activates it. You will need to run the activation command every time you want to start the script in a new shell.
Now, install the dependencies, like so:
pip install -r requirements.txt
Finally, run the script:
python publish.py
Now, connect to CrateDB and check the mqtt.system_information table.
You should see something like this:
select * from mqtt.system_information;
+--------------------------------------+-----------+-----------------------------------------------------------+--------------------+---------------+
| client_id                            | packet_id | payload                                                   | topic              | ts            |
+--------------------------------------+-----------+-----------------------------------------------------------+--------------------+---------------+
| 16e69b59-cf38-4eca-a3cf-91202025aebf |         3 | {"cpu_percent": "12.6", "virtual_memory_percent": "75.7"} | events/system_load | 1510315872014 |
| 16e69b59-cf38-4eca-a3cf-91202025aebf |         5 | {"cpu_percent": "12.8", "virtual_memory_percent": "75.8"} | events/system_load | 1510315874023 |
| 16e69b59-cf38-4eca-a3cf-91202025aebf |         7 | {"cpu_percent": "16.4", "virtual_memory_percent": "76.0"} | events/system_load | 1510315876026 |
+--------------------------------------+-----------+-----------------------------------------------------------+--------------------+---------------+
Hey presto! Our script is publishing system load information via MQTT to CrateDB, which is then writing it to a table.
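If you want to read these rows from a program rather than from crash, one option is CrateDB's HTTP endpoint, which accepts a JSON body with a `stmt` key at `/_sql` and returns `cols` and `rows`. The sketch below assumes a node at localhost:4200. The helper names are my own, and the `ts` conversion assumes the column holds milliseconds since the epoch, as the sample values suggest.

```python
import json
import urllib.request
from datetime import datetime, timedelta, timezone


def query(stmt, url='http://localhost:4200/_sql'):
    """POST a SQL statement to the HTTP endpoint and return the parsed JSON."""
    request = urllib.request.Request(
        url,
        data=json.dumps({'stmt': stmt}).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode('utf-8'))


def rows_as_dicts(result):
    """Pair each row with the column names from {'cols': [...], 'rows': [...]}."""
    return [dict(zip(result['cols'], row)) for row in result['rows']]


def ts_to_datetime(ts_ms):
    """Convert a millisecond epoch timestamp to a UTC datetime."""
    seconds, millis = divmod(ts_ms, 1000)
    return (datetime.fromtimestamp(seconds, tz=timezone.utc)
            + timedelta(milliseconds=millis))


# Usage against a running node (not executed here):
# result = query('SELECT topic, ts, payload FROM mqtt.system_information')
# for row in rows_as_dicts(result):
#     print(ts_to_datetime(row['ts']), row['payload'])
```

Calling `query` in a loop with a `WHERE ts > ...` condition on the last timestamp seen would be one simple way to poll for new messages.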
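In this post, we learned how to enable MQTT ingestion in CrateDB, create a target table and an ingestion rule, and publish system monitoring data from a Python script.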
From here, you could experiment further by writing a client that polls CrateDB for messages.
Or, if you were feeling really adventurous, you could write your own ingestion source plugin for CrateDB. And if it’s something the community could benefit from, we’re interested in paying you. Get in touch.