CrateDB Blog | Development, integrations, IoT, & more

Data Processing and Analytics with Dask and CrateDB

Written by Marija Selakovic | 2023-06-21

In this tutorial, we’ll explore how to leverage the power of CrateDB, a distributed SQL database, in conjunction with Dask, to perform efficient data processing and analysis tasks.

Dask is a parallel computing library that enables distributed computing for tasks such as data processing and machine learning.

Prerequisites

Before getting started, you should have the following installed:

  • Python 3
  • CrateDB
  • Dask

You can use pip to install Dask. To make sure that you have everything required for the most common uses of Dask (e.g., Dask Dataframe, Dask Array, etc) use the following command:

Inserting data

For this tutorial, we chose to use the California housing prices dataset, also available on Kaggle. This dataset is a popular dataset for regression tasks, consisting of median house values in census tracts in California, making it an excellent starting point for implementing basic machine learning algorithms.

Before importing data, create a california_housing table in CrateDB:

Use COPY FROM command to import housing data:

Using Dask to query the data

Dask provides three methods to read a SQL query or q database table into a Dataframe: read_sql, read_sql_table, and read_sql_query. The read_sql method is a convenience wrapper around the other two and it will delegate to a specific function based on the provided input. To use this method you will need the following parameters:

  • sql: name of a SQL table in a database or an SQL query to be executed,
  • uri: the full sqlalchemy URI for the database connection
  • index_col: the index column. The index column is used by Dask to split up the query on multiple machines.

Now, let’s load the data from a California housing dataset to a Dask Dataframe:

In the above example, we read the data from california_housing dataset and use total_rooms as an index column.

If you want to run read_sql with a query to be executed, you will need to provide an SQLAlchemy Selectable query. The following example shows how to query several columns from california_housing table and load the result to the Dask Dataframe.

Now that we loaded the data we can use the df.head() to show the first n rows in the dataset:

Linear regression with Dask and CrateDB

In the following example, we will illustrate how to perform a linear regression task on the California housing data. We will train a machine learning model that predicts the median house value based on the other variables in the dataset. Before we start, we need to categorize the ocean_proximity column as the only non-number column:

The above code will transform the last column so that it contains a number representing a certain category, as illustrated below:

The next step is to split the data into training and testing sets and for that, we can use the dask_ml library. For the linear regression estimator to work with the data we need to transform training and testing sets into Dask arrays:

Now we can perform a linear regression task on the data. First, we need to create a linear regression estimator and fit the estimator to the training data:

In the last line, we use the estimator to make predictions on the testing data. To evaluate the performance of our linear regression model, we can calculate the mean squared error (MSE) and the coefficient of determination (R²) on the testing data:

The last two lines will output the mean squared error and R² score for our linear regression model.

Using Dask to write to CrateDB

Dask also provides support for storing Dask Dataframe to a SQL table with the to_sql method. To illustrate the concurrent write of a Dask Dataframe to CrateDB we first create a Pandas DataFrame using the makeTimeDataFrame function with a frequency of one second and a total of 1,5 million periods as illustrated below.

Then, we create the Dask Dataframe from the Pandas Dataframe and divide the data into 4 partitions, allowing for parallel processing:

Finally, with the to_sql() method we load the data to a CrateDB database:

The to_sql() method takes several arguments:

  • "demo": the name of the table where the data will be loaded.
  • uri: the connection string to the CrateDB database.
  • index=False: specifies that the index column in the DataFrame should not be included in the database table.
  • if_exists="replace": specifies that if the table already exists, it should be replaced with the new data. Other possible values are ‘fail’, ‘replace’, and ‘append’.
  • chunksize=10000: the number of rows to be inserted at a time. It may be helpful to experiment with different chunk sizes to find the optimal value for the specific use case.
  • parallel=True: specifies that the insertion process should be done in parallel.

On an M1 machine with 16 GB of RAM, the entire process of loading the 1.5 million records worth of data into the database, takes approximately 15 seconds. Without using parallel=True, the total runtime increases to 22 seconds, thus demonstrating that it is more efficient than running insert operations subsequently.

Conclusions

In this tutorial, we’ve covered the essentials of using CrateDB with Dask for efficient data processing and analysis. By combining the distributed capabilities of CrateDB with the parallel computing power of Dask, you can unlock the potential to handle large-scale datasets, perform complex queries, and leverage advanced analytics techniques.

To learn more about updates, features, and other questions you might have, join our CrateDB community.