Data pipelines with CrateDB and Kestra¶
Kestra provides open‑source workflow automation and orchestration.
You can use it to automate and manage complex workflows efficiently. It integrates with Postgres, Git, Docker, Kubernetes, and more. The web UI lets you create, modify, and manage workflows without writing code.
In this usage guide, we will show you how CrateDB integrates with Kestra using the PostgreSQL plugin to create an efficient and scalable data pipeline.
Run Kestra on Docker¶
You can quickly start Kestra using Docker. First, install Docker on your machine if you haven’t already.
docker run --rm --name=kestra --publish=8080:8080 docker.io/kestra/kestra:latest
This starts the Kestra server on your local machine. Access it by navigating to http://localhost:8080 in your web browser. From there, you can start creating workflows and automating your processes using Kestra’s web interface.
Deploy CrateDB clusters¶
To deploy a new cluster on CrateDB Cloud, sign up for a CrateDB Cloud account. New organizations receive a free trial credit for cluster deployment, scaling, and other operations. After signing up, create a cluster by selecting Create Cluster and choosing your preferred cloud provider and region. In this example, we use a 1‑node cluster with 4 GiB of storage, sufficient for development and low‑traffic applications.
Once your cluster is up and running, use CrateDB’s powerful distributed SQL database features via the web-based Admin UI.
Provision databases¶
Let’s import some data on the first cluster. Navigate to the cluster overview page and click the Learn how to import data link.
CREATE TABLE "nyc_taxi" (
"congestion_surcharge" REAL,
"dolocationid" INTEGER,
"extra" REAL,
"fare_amount" REAL,
"improvement_surcharge" REAL,
"mta_tax" REAL,
"passenger_count" INTEGER,
"payment_type" INTEGER,
"pickup_datetime" TIMESTAMP WITH TIME ZONE,
"pulocationid" INTEGER,
"ratecodeid" INTEGER,
"store_and_fwd_flag" TEXT,
"tip_amount" REAL,
"tolls_amount" REAL,
"total_amount" REAL,
"trip_distance" REAL,
"vendorid" INTEGER) WITH ("column_policy" = 'dynamic', "number_of_replicas" = '0', "refresh_interval" = 10000);
COPY "nyc_taxi" FROM 'https://s3.amazonaws.com/crate.sampledata/nyc.yellowcab/yc.2019.07.gz' WITH (compression = 'gzip');
On the second cluster, create an empty nyc_taxi table.
Transfer data¶
You can move data between CrateDB clusters in several ways. The following example shows how to do this with Kestra.
Define workflow¶
As a next step, we will create a new Kestra Flow to move data between clusters.
Flows in Kestra implement workflows. Each flow uses a declarative YAML model and contains all the tasks in the order they will run. A flow must have an identifier (id), a namespace, and a list of tasks.
Define source¶
To read from a PostgreSQL-compatible database, such as CrateDB,
Kestra offers the io.kestra.plugin.jdbc.postgresql.Query plugin.
The Query task allows users to execute custom SQL queries against a database and use the results in their workflow. It offers various parameters such as auto-committing SQL statements, specifying access-control rules, and storing fetch results.
The following snippet shows the declaration of our workflow and the specification of the first task that selects data from the nyc_taxi table and runs the query on the first CrateDB cluster:
id: cratedb-kestra
namespace: io.kestra.crate
tasks:
- id: query
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://cratedb-kestra.aks1.westeurope.azure.cratedb.net:5432/
username: admin
password: my_super_secret_password
sql: SELECT * FROM doc.nyc_taxi LIMIT 1000
store: true
In this task, we set the store parameter to true to store results for the next task.
Define target¶
In Kestra, a batch task is a type of task that allows you to fetch rows from a table and bulk insert them into another one. We will use an instance of a batch task to fetch the results of the first task and to insert the results into the table on the second cluster.
The following snippet shows the Batch task declaration used for inserting data into the table on the second CrateDB cluster:
- id: update
type: io.kestra.plugin.jdbc.postgresql.Batch
from: "{{ outputs.query.uri }}"
url: jdbc:postgresql://cratedb-kestra2.aks1.eastus2.azure.cratedb.net:5432/
username: admin
password: my_super_secret_password
sql: |
INSERT INTO doc.nyc_taxi VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )
When a Kestra task executes, it may create or modify a resource, such as a file, database record, or API endpoint.
Kestra allows the output produced by the task to be stored in the execution flow context and used by subsequent tasks.
The output object captures information about the task results, including any resources created or modified.
In our example, output.query.uri refers to the URI of the resource the previous task created.
Specifically, it refers to the URI of the database records returned by the SELECT statement.
Define result¶
Finally, once the data are imported to the second table, let’s create a new task that selects data from that table:
- id: select
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://cratedb-kestra2.aks1.eastus2.azure.cratedb.net:5432/
username: admin
password: my_super_secret_password
sql: SELECT MAX_BY(passenger_count, fare_amount) FROM doc.nyc_taxi
store: false
In the last task, we select the passenger_count value with the highest fare_amount using the MAX_BY aggregation function. MAX_BY ranks among CrateDB’s latest aggregation functions. Learn more in our recent blog post.
Execute the flow¶
To execute the flow click on the New Execution button below the Flow specification. To monitor the execution of your Flow, check the Logs view:

The Logs view shows the execution status of each task, as well as the running times. There are other ways to monitor the execution of Flows in Kestra and we refer to the official Kestra documentation for a better overview of its full capabilities.
Finally, let’s check the data in the second cluster. As illustrated below, exactly 1000 records were inserted:

Wrap up¶
If you need to automatically manage CrateDB data pipelines, Kestra provides a strong solution. It lets you define workflows without writing code and integrates with PostgreSQL (and CrateDB), Kubernetes, Docker, Git, and more.
In this usage guide, we have also shown how to deploy your CrateDB cluster in a few clicks. If you want to try it out and enjoy all of the CrateDB features, sign up for the CrateDB Cloud trial.
To learn more about updates, features, and other questions you might have, join the CrateDB community.

