
Importing Data – Stream


Time series applications work with real-time data in most cases. This chapter provides an example of how data can be streamed into CrateDB using Apache Kafka and Apache Flink.

Ingesting time series data in a streaming fashion is crucial for several reasons. Firstly, streaming ingestion of time series data ensures that the database is always up-to-date, enabling accurate and current insights. Furthermore, it allows for real-time analysis and decision making, which is particularly important for industries where immediate action based on the latest data can lead to significant operational improvements or prevent potential issues. Lastly, it can efficiently handle the high volume and velocity of time series data, ensuring that the system can keep up with the data flow and prevent data loss. 

The architecture of the demo is designed to handle real-time ingestion and processing of time series data. It starts by periodically querying a weather API through HTTP requests to gather the latest weather data. This data is then pushed into a Kafka topic, which serves as a buffer and ensures that no data is lost even if there is a spike in incoming data. Then, a Flink job consumes the data from Kafka and incrementally ingests it into CrateDB. This architecture enables real-time data analysis, as the data in CrateDB is kept up-to-date with the latest weather conditions. Other services can read either directly from the stream or from CrateDB. You can find the demo artifacts in our GitHub examples repository.
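The producer side of this architecture can be sketched in a few lines of Python. This is a minimal illustration rather than the code shipped in the demo: the function names are hypothetical, the URL follows weatherapi.com's current-conditions endpoint, and the Kafka client is replaced by an injected send callable so that the sketch runs without a broker.

```python
import json
import time
import urllib.parse
import urllib.request
from typing import Callable

# Current-conditions endpoint of weatherapi.com; key and city are query parameters.
API_URL = "https://api.weatherapi.com/v1/current.json"


def build_url(api_key: str, city: str) -> str:
    """Compose the request URL for one city."""
    query = urllib.parse.urlencode({"key": api_key, "q": city})
    return f"{API_URL}?{query}"


def fetch_weather(url: str) -> dict:
    """Perform the HTTP GET and decode the JSON response."""
    with urllib.request.urlopen(url, timeout=10) as response:
        return json.load(response)


def poll(fetch: Callable[[], dict], send: Callable[[bytes], None],
         interval_s: float, iterations: int) -> None:
    """Fetch a record, hand it to the sink, wait, and repeat."""
    for i in range(iterations):
        record = fetch()
        # Kafka messages are bytes, so the JSON document is forwarded unchanged.
        send(json.dumps(record).encode("utf-8"))
        if i < iterations - 1:
            time.sleep(interval_s)
```

In a real setup, fetch would wrap fetch_weather(build_url(...)) and send would be the publish call of whichever Kafka client library is in use. Passing both in as callables keeps the polling logic testable on its own.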

In addition to the streaming data, this example demonstrates how easy it is to work with JSON data and how CrateDB’s flexible schema speeds up the development process. 

A complete JSON record is returned from the weather service with information about the location and the current weather conditions. As the details of both sections can change over time, both are created as dynamic objects in CrateDB. There is no need for a complex schema and type conversions while keeping the ability to automatically index all individual attributes for fast filtering and aggregations. Without further ado, one can create aggregations like the calculation of the average temperature in France. 
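As an illustration, the average temperature in France could be queried through CrateDB's HTTP endpoint as sketched below. The table name weather_data is a placeholder, and the attribute names country and temp_c are assumed from the weather service's JSON response. The column name current is quoted as a precaution in case it collides with a keyword.

```python
import json
import urllib.request

# Hypothetical table name; attribute names assumed from the weather API response.
AVG_TEMP_FRANCE = """
SELECT avg("current"['temp_c']) AS avg_temp_c
FROM weather_data
WHERE location['country'] = ?
"""


def sql_request(stmt: str, args: list,
                base_url: str = "http://localhost:4200") -> urllib.request.Request:
    """Build a POST request for CrateDB's /_sql HTTP endpoint."""
    body = json.dumps({"stmt": stmt, "args": args}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/_sql",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run(request: urllib.request.Request) -> list:
    """Send the request and return the result rows."""
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)["rows"]
```

Calling run(sql_request(AVG_TEMP_FRANCE, ["France"])) against a running instance would return a single row holding the average. The same statement can also be pasted into the admin UI console with the parameter replaced by a literal.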

Also, the development process itself gets much easier. On the producer side, we can simply write the JSON document into the Kafka topic. The Flink schema contains only two attributes, location and current conditions, both represented as Strings, i.e. JSON-formatted text.

With the automatic interpretation of JSON text in CrateDB, the actual write into the database is a simple insert of these two attributes based on their textual representations. If new attributes are added to location or current, they are automatically added in CrateDB as well, including type inference and automatic indexing for immediate querying.
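The write path described above can be sketched as follows. The demo itself performs this step in a Flink job, so this Python version is only an illustration. It assumes a placeholder table named weather_data and relies on CrateDB casting JSON text to an object when it is written into an object column. If that implicit cast is not applied in a given setup, an explicit cast to OBJECT in the statement would be the alternative.

```python
import json


def split_record(raw: str) -> tuple[str, str]:
    """Return the location and current sections as JSON-formatted text."""
    record = json.loads(raw)
    return json.dumps(record["location"]), json.dumps(record["current"])


def insert_payload(raw: str, table: str = "weather_data") -> dict:
    """Build a parameterized insert; both arguments stay plain strings."""
    location, current = split_record(raw)
    return {
        # Hypothetical table name; "current" is quoted as a precaution.
        "stmt": f'INSERT INTO {table} (location, "current") VALUES (?, ?)',
        "args": [location, current],
    }
```

Because both sections travel as text, an attribute that newly appears in the source data needs no change to this code. It simply becomes part of the JSON string.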

Let us have a look at the actual demonstration of the streaming ingest process. You can find all the resources in our CrateDB examples repository on GitHub.

As a first step, please clone the repository to your local machine. 

Once this is done, please change into the Apache Kafka Flink streaming directory. It contains a couple of resources that we will use to demonstrate the ingest. 

The .env file contains multiple parameters that we need to set before we can start the demo via docker compose. In order to read the data from the weather API, we need to acquire a free API key. Please visit the website weatherapi.com, sign up for a free account and generate a new API key. We will copy it into our .env file. Furthermore, we configure the producer to collect new data from the service every 5 seconds.
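To make the shape of these settings concrete, here is a small sketch that parses .env-style text into a dictionary. The variable names WEATHER_API_KEY and FETCH_INTERVAL_SECONDS are hypothetical placeholders and may differ from the names used in the repository's .env file.

```python
def parse_env(text: str) -> dict[str, str]:
    """Parse KEY=VALUE lines, skipping blank lines and comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip().strip('"')
    return settings


# Placeholder values; the variable names are assumptions for illustration.
SAMPLE_ENV = """\
# weather service credentials
WEATHER_API_KEY=your-api-key
FETCH_INTERVAL_SECONDS=5
"""

settings = parse_env(SAMPLE_ENV)
interval_s = int(settings["FETCH_INTERVAL_SECONDS"])
```

Docker compose reads the .env file itself, so a parser like this is only needed when a script wants to use the same settings outside of the containers.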

After saving the configuration settings, we start the demo via docker compose. It will automatically download and start multiple images: Kafka and Flink, including ZooKeeper, CrateDB, as well as a producer forwarding the results of API calls into a Kafka topic.

Once the download is complete, please open CrateDB’s admin UI in your web browser at http://localhost:4200. You will find an empty CrateDB instance in which we create the previously described table weather data Flink. It contains a generated timestamp to identify when a record has been inserted as well as the two sections location and current conditions as dynamic objects leveraging CrateDB's dynamic schema capabilities.
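A table of this shape could be defined roughly as shown below. This is a sketch under assumptions: the table name weather_data and the column name ts are placeholders, not the names used in the demo, and the exact DDL in the repository may differ. The statement is kept in a Python string so that it can be sent to CrateDB by a script or pasted into the admin UI console.

```python
def create_table_stmt(table: str = "weather_data") -> str:
    """Return DDL for a table with a generated timestamp and two dynamic objects."""
    return (
        f"CREATE TABLE IF NOT EXISTS {table} (\n"
        # Filled in by the database at insert time, not supplied by the writer.
        "    ts TIMESTAMP WITH TIME ZONE GENERATED ALWAYS AS CURRENT_TIMESTAMP,\n"
        # Dynamic objects accept new attributes without a schema change.
        "    location OBJECT(DYNAMIC),\n"
        '    "current" OBJECT(DYNAMIC)\n'
        ")"
    )


ddl = create_table_stmt()
```

Only the two object columns are written by the ingest job. The timestamp column records when each row arrived, which is what later makes time-based filtering possible.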

Once the table is created, we see that the data from the Kafka topic is written automatically to CrateDB, i.e. the Flink job is running correctly. The first two records that existed in the Kafka topic by this time have been inserted successfully.  

Let us have a look at the actual data. As we can see, one more row has arrived in the meantime, and the structure from the web service call delivered in JSON has been correctly interpreted and inserted into the two objects. We can now use this data for further analysis as we demonstrated in the previous chapters about efficiently querying JSON data in CrateDB.
