Live Stream on Jan 23rd: Unlocking Real Time Insights in the Renewable Energy Sector with CrateDB

Register now
Skip to content
Blog

Replicating CDC Events from DynamoDB to CrateDB

In this article, we focus on replicating data from DynamoDB in real time to CrateDB, using Change Data Capture (CDC) events.

Architecture

To be cloud-native, we only want to use AWS services to implement our data pipeline. We will use the following services:

  • DynamoDB: The starting point where data resides and is actively changed through INSERT, UPDATE, or DELETE statements.
  • Kinesis: DynamoDB will ingest CDC events into Kinesis as a message queue.
  • Lambda: CDC events must be decoded, interpreted, and translated into standard SQL. Lambda is a perfect fit to carry out such compute tasks in a serverless mode.
  • Elastic Container Registry (ECR): To facilitate testability and dependency management of our Lambda function, we bundle it as an Open Container Initiative (OCI) image that we push to an ECR repository.
  • CloudFormation: To be production-ready, we will deploy all components as a CloudFormation template, following the Infrastructure as code (IaC) paradigm.

Dynamo_to_Lambda

Lambda Function

The core component of our CDC replay solution is the Lambda function. In a simplified way, it works as shown below. The two core dependencies used are:

  1. commons-codec: Provides the logic to transform a DynamoDB CDC event into a CrateDB-compatible INSERT, UPDATE, or DELETE statement.
  2. crate: The CrateDB HTTP driver for sending SQL statements to CrateDB.

The handler method gets invoked when a new batch of events arrives from Kinesis. We iterate through each of the events and forward them to CrateDB:

import base64
import json
import logging
import os

import sqlalchemy as sa
from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB

logger = logging.getLogger(__name__)

engine = sa.create_engine(os.environ.get("CRATEDB_SQLALCHEMY_URL", "crate://"))
cdc = DynamoCDCTranslatorCrateDB(table_name=os.environ.get("CRATEDB_TABLE", "default"))

def handler(event, _context):
    for record in event["Records"]:
          # Log and decode event.
          record_data = json.loads(base64.b64decode(record["kinesis"]["data"]).decode("utf-8"))

          # Process record.
          sql = cdc.to_sql(record_data)
          with engine.connect() as connection:
              connection.execute(sa.text(sql))

    logger.info(f"Successfully processed {len(event['Records'])} records.")

A more sophisticated version of the Lambda function can be found on GitHub.

Deployment

To set up our data pipeline, we use cratedb-toolkit. The toolkit is Python-based and includes a set of subsystems to conveniently carry out common tasks, ranging from data imports to date retention policy enforcement. In our case, it takes care of provisioning the CloudFormation template, building the OCI image, as well as deploying it.

After installing it with a simple pip install --upgrade "cratedb-toolkit[kinesis]", we can create our resources programmatically.

We start with saving the above Lambda function as kinesis_lambda.py and build an OCI image. The image gets pushed to ECR.

python_image = LambdaPythonImage(
    name="cratedb-kinesis-lambda",
    entrypoint_file=Path("kinesis_lambda.py"),
    entrypoint_handler="kinesis_lambda.handler",
)
python_image.publish() 

We can find the image on ECR when navigating to our repository:

cratedb-kinesis-lamda

Next, we define the DynamoDB table, Kinesis stream, Lambda function and connect all of them. The Lambda function also points to our CrateDB Cloud target cluster.

# Define an AWS CloudFormation software stack.
stack = DynamoDBKinesisPipe(
    project="testdrive-dynamodb",
    stage="dev",
    region="eu-west-1",
    description="DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB",
    table_name="table-testdrive-demo",
    stream_name="dynamodb-cdc-demo",
    environment={
        "CRATEDB_SQLALCHEMY_URL": "crate://admin:dZ..qB@example.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true",
        "CRATEDB_TABLE": "transactions",
    },
)
# Add components to the stack.
stack.table().processor(
    LambdaFactory(
        name="DynamoDBCrateDBProcessor",
        oci_uri=python_image.uri,
        handler=python_image.entrypoint_handler,
    )
).connect() 

When navigating to CloudFormation in the AWS console, we can now see all our services deployed:

dynamodb

A full version of the provisioning script can be found on GitHub, together with exact step-by-step instructions.

Finally, in CrateDB Cloud, we provision the target table with a simple CREATE TABLE statement. We encapsulate the JSON payload into CrateDB’s OBJECT container data type that fully indexes all JSON properties: CREATE TABLE transactions (data OBJECT(DYNAMIC));

Execution

To populate the DynamoDB table with some initial data, we use the AWS CLI:

READING_SQL="{'timestamp': '2024-07-12T01:17:52', 'device': 'foo', 'temperature': 41.40, 'humidity': 89.64}"
aws dynamodb execute-statement --statement "INSERT INTO \"table-testdrive-demo\" VALUE ${READING_SQL};"

Querying the CrateDB sink table shows the data showing up almost instantly:
cratedb-sink-table

Summary

We have shown that DynamoDB CDC events can be processed and replayed to CrateDB through native AWS services. cratedb-toolkit provides ready-to-use automation that connects all components and simplifies deployment.

While this particular example focused on DynamoDB CDC events, the approach is universal by retrieving events from Kinesis, and can also be applied to other source systems for CDC events, such as MongoDB.