Enabling and configuring Change Data Capture (CDC)

How to enable and configure Change Data Capture (CDC) in K8ssandra using Apache Pulsar

If you have an Apache Pulsar cluster available you can make data flow from Apache Cassandra into Apache Pulsar when it undergoes a mutation using our CDC (Change Data Capture) functionality.

This can be useful in real time data integration scenarios, where you want to feed data into multiple data stores but don’t want to modify your application to write to all of them (this often introduces complexities around acknowledgements, retries and consistency).

At present, only Pulsar destinations are supported, however Pulsar has a wide array of connectors which can then feed data into diverse downstream systems.

How to enable CDC

Have a Pulsar cluster available

Firstly, you will need a Pulsar Cluster available. One can be deployed for test purposes by running the below commands:

helm repo add datastax-pulsar https://datastax.github.io/pulsar-helm-chart
helm repo update
helm install pulsar --create-namespace -n pulsar datastax-pulsar/pulsar

Deploy a K8ssandraCluster

Next, you’ll want to deploy a K8ssandraCluster with the Pulsar service’s location referenced:

apiVersion: k8ssandra.io/v1alpha1
kind: K8ssandraCluster
metadata:
  name: test
spec:
  auth: false
  cassandra:
    serverVersion: "4.0.4"
    datacenters:
      - metadata:
          name: dc1
        size: 3
        cdc:
          pulsarServiceUrl: pulsar://pulsar-proxy.pulsar.svc.cluster.local:6650
        storageConfig:
          cassandraDataVolumeClaimSpec:
            storageClassName: standard
            accessModes:
              - ReadWriteOnce
            resources:
              requests:
                storage: 5Gi

Create your tables with cdc=true

When creating your tables, you’ll need to do so with cdc=true as below:

kubectl exec -it test-dc1-default-sts-0 -- bash
cqlsh test-dc1-all-pods-service.default.svc.cluster.local
CREATE KEYSPACE db1 WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'dc1':'3'};
CREATE TABLE IF NOT EXISTS db1.table1 (key text PRIMARY KEY, c1 text) WITH cdc=true;

Create the Pulsar connectors for your tables

There are several ways to create the Pulsar connectors. One is via the admin GUI, but the other is via the CLI, as below:

PULSAR_POD=$(kubectl get pods -n pulsar -l=app=pulsar -l=component=bastion -o=jsonpath='{.items[0].metadata.name}')
kubectl exec -it -n pulsar $PULSAR_POD -- bash

./bin/pulsar-admin source create \
    --source-type cassandra-source \
    --tenant public \
    --namespace default \
    --name cassandra-source-db1-table1 \
    --destination-topic-name data-db1.table1 \
    --source-config  '{
    "events.topic": "persistent://public/default/events-db1.table1",
    "keyspace": "db1",
    "table": "table1",
    "contactPoints": "test-dc1-all-pods-service.default.svc.cluster.local",
    "port": 9042,
    "loadBalancing.localDc": "dc1",
    "auth.provider": "None"
    }';

Note that you will need to create separate connectors for each table whose mutations you want to capture.

Watch the Pulsar topic

This can be done via the command line as below:

./bin/pulsar-client consume persistent://public/default/data-db1.table1 --schema-type auto_consume --subscription-position Earliest --subscription-name mysubs --num-messages 0

Mutate data in Cassandra

To demonstrate that this is all working, you can run the following statements in cqlsh.

INSERT INTO db1.table1 (key,c1) VALUES ('0','bob1');
INSERT INTO db1.table1 (key,c1) VALUES ('0','bob2'); INSERT INTO db1.table1 (key,c1) VALUES ('1','bob2');
DELETE FROM db1.table1 WHERE key='1';
ALTER TABLE db1.table1 ADD c2 int;
CREATE TYPE db1.t1 (a text, b text);
ALTER TABLE db1.table1 ADD c3 t1;
INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('3','bob', 1, {a:'a', b:'b'});
INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('4','bob', 1, {a:'a', b:'b'});
INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('5','bob', 1, {a:'a', b:'b'});
INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('6','bob', 1, {a:'a', b:'b'});
INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('7','bob', 1, {a:'a', b:'b'});
INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('8','bob', 1, {a:'a', b:'b'});
INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('9','bob', 1, {a:'a', b:'b'});
INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('10','bob', 1, {a:'a', b:'b'});

Last modified April 18, 2024: Release v1.15.0 (9071e39)