Live Stream: Turbocharge your aggregations, search & AI models & get real-time insights

Register now
Skip to content
Blog

Getting started with CrateDB as an MQTT endpoint

This article is more than 4 years old

Please note: MQTT support was deprecated in CrateDB 3.2 and will probably be removed by 4.0

MQTT (Message Queue Telemetry Transport) is a machine-to-machine communication protocol particularly well suited for machine data and Internet of Things (IoT) applications. It has gained a lot of popularity because of its simplicity and low network resource usage.

The MQTT protocol employs a publish-subscribe model where clients publish messages to a central broker which then dispatches them to subscribers and then acknowledges them according to the specified quality of service.

MQTT Broker Schema

This model works well with event driven architectures where the subscribers can react to delivered messages.

However, sometimes it's useful to to store these messages so that you can analyze them and react to them (or a filtered or transformed version of them) a later point in time.

Normally, to achieve this, the subscribers would each need to individually transform (if necessary) and persist the messages as they receive them from the broker.

MQTT Broker Schema 2

CrateDB simplifies this by acting as an MQTT endpoint that writes messages to user defined tables according to user defined rules. These tables can then be queried or polled at will by CrateDB clients.

With this setup, no intermediary brokers or message queues are needed.

CrateDB as an MQTT endpoint

Let's try this out for ourselves.

In this post I will show you how to set up a simple script that publishes system monitoring information via MQTT to CrateDB acting as an MQTT endpoint.

Publishing System Monitoring Data

Feel free to use any MQTT client to send messages on topic events/system_load with a valid JSON payload.

I will show you how to setup a Python 3 script that does this.

Create a project folder for the script:

$ mkdir system_load_mqtt_producer
$ cd system_load_mqtt_producer

We're going to need two dependencies:

List them as requirements, like so:

$ cat > requirements.txt << EOF
paho-mqtt==1.3.0
psutil==5.4.1
EOF

Now, create a file called

publish.py

with the following code:

import time
import paho.mqtt.client as mqtt
import psutil

if __name__ == '__main__':
    mqttc = mqtt.Client()
    mqttc.max_inflight_messages_set(1000)
    mqttc.connect('127.0.0.1', port=1883, keepalive=60)

    while True:
        time.sleep(1)

        payload = '' \
            .format(psutil.cpu_percent(), psutil.virtual_memory().percent)

        result = mqttc.publish('events/system_load', qos=1, payload=payload)
        print(result)

This simple script will send an MQTT message every second reporting the system CPU and memory usage.

Configure CrateDB as an MQTT Endpoint

If you don't already have CrateDB installed, download and install CrateDB.

We need to configure CrateDB to act as a MQTT endpoint. This is as simple as setting the ingestion.mqtt.enabled configuration setting to true.

Find your config/crate.yml file, and add this line:

ingestion.mqtt.enabled: true

MQTT ingestion has now been activated.

By default, CrateDB will listen for MQTT messages on port 1883, but this can be changed using the ingestion.mqtt.port setting. We could also enable SSL if we wanted, but we'll skip that for this tutorial.

Start CrateDB.

The output from CrateDB should indicate that it is listening on port 1883:

publish_address {127.0.0.1:1883}, bound_addresses {127.0.0.1:1883}

Before we can start publishing messages, we need to create a table where CrateDB can store MQTT messages.

Start crash and connect to your CrateDB node:

\c localhost:4200

If you are not running CrateDB locally, replace localhost with the correct hostname.

The target table (where the messages will be written) needs to have the same data structure as the MQTT message structure.

So, create table like this:

CREATE TABLE IF NOT EXISTS mqtt.system_information (
    client_id STRING,
    packet_id INTEGER,
    topic STRING,
    ts TIMESTAMP,
    payload OBJECT(IGNORED),
    PRIMARY KEY ("client_id", "packet_id")
);

Next, create the ingestion rule that will route the system monitoring events to the mqtt.system_information table:

CREATE INGEST RULE system_monitoring
                ON mqtt
             WHERE topic LIKE 'events/system%'
              INTO mqtt.system_information;

This instructs CrateDB to create an ingestion rule named system_monitoring that will write MQTT messages with a topic that starts with events/system to the mqtt.system_information table.

Notice that the topic is a field in the MQTT message.

You can construct more complex filtering scenarios involving other fields, if you want to. For example, adding AND client_id='FE:12:34:56' to the WHERE clause would filter messages so that only messages from the FE:12:34:56 client would be written to the table.

Furthermore, you can use the timestamp field to filter messages that are received during certain hours or on certain days.

Test Your Setup

Let's review what we have:

  • CrateDB configured as an MQTT endpoint that writes messages to the mqtt.system_information table when the topic starts with events/system
  • A small python script that regularly publishes MQTT messages with the topic events/system_load

Let's test things out by starting the script.

First, make sure CrateDB is running.

Them, in the script folder you created before, run this:

python3 -m venv venv

This initialises a Python virtual environment using the env directory. You will need to run this command every time you want to start the script in a new shell.

Now, install the dependencies, like so:

pip install -r requirements.txt

Finally, run the script:

python publish.py

Now, connect to CrateDB and check the mqtt.system_information table.

You should see something like this:

select * from mqtt.system_information;
+--------------------------------------+-----------+-----------------------------------------------------------+--------------------+---------------+
| client_id                            | packet_id | payload                                                   | topic              |            ts |
+--------------------------------------+-----------+-----------------------------------------------------------+--------------------+---------------+
| 16e69b59-cf38-4eca-a3cf-91202025aebf |         3 | {"cpu_percent": "12.6", "virtual_memory_percent": "75.7"} | events/system_load | 1510315872014 |
| 16e69b59-cf38-4eca-a3cf-91202025aebf |         5 | {"cpu_percent": "12.8", "virtual_memory_percent": "75.8"} | events/system_load | 1510315874023 |
| 16e69b59-cf38-4eca-a3cf-91202025aebf |         7 | {"cpu_percent": "16.4", "virtual_memory_percent": "76.0"} | events/system_load | 1510315876026 |

Hey presto! Our script is publishing system load information via MQTT to CrateDB, which is then writing it to a table.

Wrap Up

In this post, we learned how to:

  • Write a simple Python script that publishes MQTT messages
  • Configure crate as an MQTT endpoint
  • Create a an ingestion rule that writes MQTT messages to a CrateDB table

From here, you could experiment further by writing a client that polls CrateDB for messages.

Or, if you were feeling really adventurous, you could write your own ingestion source plugin for CrateDB. And if it’s something the community could benefit from, we’re interested in paying you. Get in touch.