rust-cratedb 1.0

This article is more than 4 years old

With the 1.0 release of my Rust driver for CrateDB, I decided to share how the driver works as well as document some of the decisions around its creation.

Check out the repository for the driver on GitHub.

CrateDB

CrateDB is a distributed SQL database and BLOB store that is particularly well suited for anything that generates a large volume of data and requires analysis—sensor data being a prime example.

Because CrateDB offers a RESTful endpoint that JSON-encodes SQL statements and data, a driver for CrateDB is little more than a wrapper around an HTTP client that takes care of encoding and error handling.

CrateDB also supports the Postgres wire protocol, which is more efficient than JSON and makes use of streaming.

Architecture

Designing a driver API from scratch is hard. And without a common interface proposal (such as JPA or .NET's datasource), switching database drivers requires rewriting a good part of the application.

That's why I looked at rust-postgres when creating rust-cratedb. I wanted to design a familiar-feeling API, and I wanted the internals structured so that the external SQL interface is a trait. Traits let us swap out backends, which should make the driver easier to maintain and port later on.

Here's a rough overview of the architecture:

Diagram Rust CrateDB Architecture

The required imports are minimal:

extern crate cratedb;
use cratedb::{Cluster, NoParams};
use cratedb::sql::QueryRunner; // SQL query trait
use cratedb::blob::{BlobContainer, BlobRef}; // BLOB container trait
use cratedb::row::ByIndex; // index-based access to the result columns

Backend

The Backend (pictured above) is where the magic happens.

The Backend takes a string payload and a few parameters and then sends the request off to a Cluster. The Cluster represents multiple CrateDB nodes that will be "load balanced" whenever a query is executed.

In fact, nodes are chosen at random:

fn get_endpoint(&self, endpoint_type: EndpointType)
  -> Option<String> {
    if !self.nodes.is_empty() {
        // pick one of the known nodes at random
        let node = random::<usize>() % self.nodes.len();
        let host = self.nodes[node].as_str();
        // append the path for the requested endpoint type
        let t = match endpoint_type {
            EndpointType::SQL => "_sql",
            EndpointType::Blob => "_blobs",
        };
        Some(format!("{}{}", host, t))
    } else {
        None
    }
}

This function selects a node and uses its address to create an endpoint for the driver. It then passes the address on to the Backend implementation attached to the Cluster.

We use HTTP for this implementation, but this could be changed as long as the following trait is implemented:

pub trait Backend {
    fn execute(&self,
               to: Option<String>,
               payload: String)
               -> Result<(BackendResult, String), BackendError>;

    fn upload_blob(&self,
                   to: Option<String>,
                   bucket: &str,
                   sha1: &[u8],
                   f: &mut Read)
                   -> Result<BackendResult, BackendError>;

    fn delete_blob(&self,
                   to: Option<String>,
                   bucket: &str,
                   sha1: &[u8])
                   -> Result<BackendResult, BackendError>;

    fn fetch_blob(&self,
                  to: Option<String>,
                  bucket: &str,
                  sha1: &[u8])
                  -> Result<(BackendResult, Box<Read>), BackendError>;
}

This interface notably uses String as the payload data type, so any encoding/decoding happens beforehand, which keeps dependencies separated into their layers.
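To make the "swap the backend" idea concrete, here is a minimal sketch that is not taken from the driver. It assumes a cut-down Backend trait with only execute and a plain String error, plus two hypothetical types, RecordingBackend and Client, that exist only for this illustration. The point is that the calling layer encodes the payload and only ever talks to the trait.

```rust
use std::cell::RefCell;

// Simplified stand-in for the driver's Backend trait (assumption: only
// `execute`, and String instead of BackendResult/BackendError).
pub trait Backend {
    fn execute(&self, to: Option<String>, payload: String) -> Result<String, String>;
}

// Hypothetical in-memory backend: records requests instead of sending them.
pub struct RecordingBackend {
    pub requests: RefCell<Vec<(String, String)>>,
    canned_response: String,
}

impl RecordingBackend {
    pub fn new(canned_response: &str) -> RecordingBackend {
        RecordingBackend {
            requests: RefCell::new(Vec::new()),
            canned_response: canned_response.to_string(),
        }
    }
}

impl Backend for RecordingBackend {
    fn execute(&self, to: Option<String>, payload: String) -> Result<String, String> {
        // No endpoint means the cluster had no node to offer.
        let url = to.ok_or_else(|| "no node available".to_string())?;
        self.requests.borrow_mut().push((url, payload));
        Ok(self.canned_response.clone())
    }
}

// Hypothetical caller: generic over the trait, so any backend can be plugged in.
pub struct Client<B: Backend> {
    pub backend: B,
    pub endpoint: Option<String>,
}

impl<B: Backend> Client<B> {
    // Encoding happens here, before the backend sees the payload.
    // (Naive formatting without escaping; a real driver would use a JSON encoder.)
    pub fn run(&self, sql: &str) -> Result<String, String> {
        let payload = format!("{{\"stmt\": \"{}\"}}", sql);
        self.backend.execute(self.endpoint.clone(), payload)
    }
}
```

A backend like this is also handy in unit tests, since the code under test cannot tell whether its payload went over HTTP or into a Vec.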

Upon entering the function, a client is created to match the required encryption mode. In the case of the HTTPBackend, the client comes from its client factory, which also helps maintainability:

fn delete_blob(&self,
               to: Option<String>,
               bucket: &str,
               sha1: &[u8])
               -> Result<BackendResult, BackendError> {
    if let Ok(to) = make_blob_url(to, bucket, sha1) {
        // a fresh client per request, chosen by URL scheme (http/https)
        let client =
            self.client_factory.client(to.scheme().to_string());
        client
            .delete(to)
            .send()
            .map(|r| parse_status(&r.status))
            .map_err(BackendError::from_transport)
    } else {
        Err(BackendError::new("Invalid blob url".to_string()))
    }
}

As a consequence, each time a request is issued, a new object is created and disposed of after the execution. For HTTP, these clients are basically hyper::client::Client objects. However, in future implementations, this might change.

SQL and BLOB

Using traits in Rust for separating concerns creates very neat APIs that can easily be extended later on. Consequently, it made a lot of sense to transition to two distinct interfaces when adding BLOB support. One for each domain: SQL and BLOBs.

The formerly unified SQL interface was renamed to QueryRunner, which (I admit) is not the best name, but it works for now.

However, the functions it provided stayed the same:

pub trait QueryRunner {
    fn query<SQL, S>(&self, sql: SQL, params: Option<Box<S>>)
      -> Result<(f64, RowIterator), CrateDBError>
      where SQL: Into<String>, S: Serialize;

    fn bulk_query<SQL, S>(&self, sql: SQL, params: Box<S>)
      -> Result<(f64, Vec<i64>),  CrateDBError>
      where SQL: Into<String>, S: Serialize;
}

Parameters are provided as a generic Box<S>, which makes JSON serialization of arbitrary structs possible (thank you serde).

Allowing arbitrary types with Rust's very rigid type system (❤️) is difficult and sometimes requires a more hacky approach. For example, although the query function has a parameter of type Option<Box<S>>, it's not possible to pass in a bare None. A bare None says nothing about S, so the compiler cannot infer the type.

To tell the compiler more about the type, the ::<> syntax (the "turbofish") can be used to annotate None with a boxed type. This is an annotation, not a cast. Still, this is not enough, since the box needs a type that implements Serialize. As a solution, we provide a NoParams type, so the user can write c.query("create blob table b", None::<Box<NoParams>>) to pass in a None with the expected type.
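The inference problem is easy to reproduce outside the driver. The following is a minimal sketch under stated assumptions: Serialize here is a hand-written stand-in for serde's trait so the example has no dependencies, NoParams is a local marker type that merely mirrors the driver's, and build_payload is a hypothetical function with the same generic shape as query.

```rust
// Stand-in for serde's `Serialize` (assumption, to avoid a dependency).
pub trait Serialize {
    fn to_json(&self) -> String;
}

// Local marker type mirroring the driver's `NoParams`.
pub struct NoParams;

impl Serialize for NoParams {
    fn to_json(&self) -> String {
        "[]".to_string()
    }
}

impl Serialize for Vec<i64> {
    fn to_json(&self) -> String {
        let items: Vec<String> = self.iter().map(|v| v.to_string()).collect();
        format!("[{}]", items.join(", "))
    }
}

// Hypothetical function with the same shape as `QueryRunner::query`:
// `S` only ever appears inside the Option, so `None` alone cannot fix it.
pub fn build_payload<SQL, S>(sql: SQL, params: Option<Box<S>>) -> String
where
    SQL: Into<String>,
    S: Serialize,
{
    let stmt: String = sql.into();
    match params {
        Some(p) => format!("{{\"stmt\": \"{}\", \"args\": {}}}", stmt, p.to_json()),
        None => format!("{{\"stmt\": \"{}\"}}", stmt),
    }
}

pub fn without_params() -> String {
    // build_payload("select 1", None);  // does not compile: type annotations needed
    build_payload("select 1", None::<Box<NoParams>>)
}

pub fn with_params() -> String {
    // Here `S` is inferred as Vec<i64> from the argument itself.
    build_payload("select ?", Some(Box::new(vec![1i64, 2])))
}
```

An alternative design would be a separate method without a params argument, at the cost of a larger trait surface.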

While the SQL interface is relatively simple, the BLOB store requires more operations to get things done.

The BLOB functions translate directly to HTTP calls underneath, so I recommend that you decouple BLOB operations from time-sensitive code. In particular, put will create a SHA1 hash of the file contents, which may take some time.

Side note: to reduce memory consumption (and have the ability to work with GiB-scale files), rust-cratedb works with streams:

pub trait BlobContainer {
    fn list<TBL: Into<String>>(&self, table: TBL)
      -> Result<Vec<BlobRef>, BlobError>;
    fn put<TBL: Into<String>, B: Read + Seek>(&self, table: TBL, blob: &mut B)
      -> Result<BlobRef, BlobError>;
    fn delete(&self, blob: BlobRef)
      -> Result<(), BlobError>;
    fn get(&self, blob: &BlobRef)
      -> Result<Box<Read>, BlobError>;
}
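Hashing a stream before uploading it means reading it twice, which is presumably why put asks for Read + Seek rather than just Read. Here is a rough sketch of that pattern, not the driver's code: it reads in fixed-size chunks so memory use stays constant, then rewinds. Since SHA-1 is not in the standard library, the 64-bit FNV-1a hash stands in for it, and digest_stream is a hypothetical helper name.

```rust
use std::io::{self, Read, Seek, SeekFrom};

// Stand-in for SHA-1 (assumption): 64-bit FNV-1a, fed chunk by chunk.
// Returns (hash, number of bytes read) and leaves the stream at its start.
pub fn digest_stream<B: Read + Seek>(blob: &mut B) -> io::Result<(u64, u64)> {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV-1a offset basis
    let mut total: u64 = 0;
    let mut buf = [0u8; 8192]; // fixed buffer: memory use is independent of blob size

    loop {
        let n = blob.read(&mut buf)?;
        if n == 0 {
            break; // end of stream
        }
        for &byte in &buf[..n] {
            hash ^= byte as u64;
            hash = hash.wrapping_mul(0x100000001b3); // FNV prime
        }
        total += n as u64;
    }

    // Rewind so the very same stream can be sent as the request body next.
    blob.seek(SeekFrom::Start(0))?;
    Ok((hash, total))
}
```

With a std::fs::File or a std::io::Cursor as input, the caller gets the digest for the blob URL and can hand the rewound stream straight to the upload step.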

Queries run synchronously: a call to these functions blocks until the request has finished, and only then returns the server time (the f64 part of the tuple) along with the result.

An async version of the API is planned. 😉

Errors

Error handling is one of the weaker points of the driver.

There are several kinds of errors that should bubble up but currently do not. But do not panic: the most important ones are handled internally. For example, connection errors are handled with exponential backoff.
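For readers unfamiliar with the term, exponential backoff means retrying a failed operation while doubling the wait between attempts. The sketch below illustrates the general technique only; it is not the driver's internal implementation, and retry_with_backoff, max_attempts, and base_delay are hypothetical names.

```rust
use std::thread;
use std::time::Duration;

// Runs `op` until it succeeds or `max_attempts` is reached, doubling the
// pause after each failure. `op` receives the 1-based attempt number.
// The operation is always tried at least once.
pub fn retry_with_backoff<T, E, F>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: F,
) -> Result<T, E>
where
    F: FnMut(u32) -> Result<T, E>,
{
    let mut delay = base_delay;
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                thread::sleep(delay);
                delay = delay.saturating_mul(2); // 1x, 2x, 4x, ... the base delay
                attempt += 1;
            }
        }
    }
}
```

Production implementations usually also cap the delay and add random jitter so that many clients do not retry in lockstep.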

Here are the situations when the driver should report Err() results:

  • I/O from web requests or provided file streams
  • Transport from in-transit web requests (and/or because the library uses this type)
  • Parsers
  • CrateDB's errors
  • HTTP statuses for BLOBs

This leads to situations where the request succeeds (no I/O errors occurred), yet the HTTP status was, for example, 4xx. This is a status that, for a SQL statement, still requires the JSON parser to parse the response (but not for a BLOB operation).

To avoid getting lost in if/match cascades, it's important to fail early and provide unwrapped variables for any following code.

To achieve some of these feats, an enum was the most practical approach:

#[derive(Debug, Clone)]
pub enum BlobError {
     Action(CrateDBError),
     Transport(BackendError),
}

While this still causes some cascades to be formed, they become much shorter and much more readable:

match self.backend
    .fetch_blob(url, &blob.table, &blob.sha1)
    .map_err(BlobError::Transport) { ... }

With this, any error can simply be forwarded in the match clause.
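Here is a self-contained sketch of the same pattern. The BackendError and CrateDBError structs are simplified stand-ins, and fetch_status and get_blob are hypothetical functions invented for the illustration. Note that it uses the ? operator for the early return where the driver's snippet above uses a match; the wrapping via map_err is the same.

```rust
// Simplified stand-ins for the driver's error types (assumption).
#[derive(Debug, Clone, PartialEq)]
pub struct BackendError(pub String);

#[derive(Debug, Clone, PartialEq)]
pub struct CrateDBError {
    pub code: u16,
    pub message: String,
}

#[derive(Debug, Clone, PartialEq)]
pub enum BlobError {
    Action(CrateDBError),
    Transport(BackendError),
}

// Hypothetical transport step: fails if the node is unreachable,
// otherwise yields the HTTP status code of the response.
fn fetch_status(reachable: bool, status: u16) -> Result<u16, BackendError> {
    if reachable {
        Ok(status)
    } else {
        Err(BackendError("connection refused".to_string()))
    }
}

pub fn get_blob(reachable: bool, status: u16) -> Result<&'static str, BlobError> {
    // Fail early: a transport failure is wrapped and returned right here,
    // so the code below works with a plain, unwrapped status.
    let status = fetch_status(reachable, status).map_err(BlobError::Transport)?;

    match status {
        200 => Ok("blob contents"),
        // The request went through, but the server rejected it.
        code => Err(BlobError::Action(CrateDBError {
            code,
            message: "blob request rejected".to_string(),
        })),
    }
}
```

Callers can then match on BlobError::Transport to decide about retries and on BlobError::Action to report what the server said.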

Example

After all this explanation, let's finally take a look at the driver's interface.

Here's what it looks like to connect to an existing CrateDB instance:

extern crate cratedb;

use cratedb::{Cluster, NoParams};
use cratedb::sql::QueryRunner;

fn main() {
    let c: Cluster = Cluster::from_string("http://localhost:4200");
    // NoParams tells the compiler which type the empty parameter list has
    let (elapsed, rows) =
      c.query("select * from sys.cluster", None::<Box<NoParams>>)
        .unwrap();
}

For more complex examples, check out the README.md.

Roadmap

CrateDB is moving ahead quickly and it's important to keep up, especially regarding the Enterprise Edition and its authentication features.

Here's what's coming to rust-cratedb soon:

  • 1.1: Authentication, certificate support, better errors
  • 1.2: Postgres Protocol support (not quite sure on that yet)
  • 2.0: Async API

Please check out the GitHub repository and open issues, submit pull requests, or help review code. Any and all help is appreciated!