CockroachDB Enterprise changefeeds can stream change data out to Apache Kafka with different configuration settings and options. Confluent Cloud provides a fully managed service for running Apache Kafka as well as the Confluent Cloud Schema Registry.
A schema registry is a repository for schemas, which allows you to share and manage schemas between different services. Confluent Cloud Schema Registries map to Kafka topics in your Confluent Cloud environment.
In this tutorial, you will set up a changefeed to stream data from CockroachDB to a Kafka cluster in Confluent Cloud. You will also connect a Schema Registry that retrieves the schemas from your changefeed's messages.
The workflow involves creating and connecting the following:
- Confluent Cloud Kafka cluster
- Confluent Schema Registry
- Changefeed streaming to your Confluent Cloud Kafka cluster
Before you begin
You will need the following set up before starting this tutorial:
- A CockroachDB cluster. You can use a CockroachDB Cloud or CockroachDB self-hosted cluster.
- If you are using CockroachDB Cloud, refer to the Quickstart with CockroachDB guide. For CockroachDB self-hosted clusters, refer to the install page.
- A Confluent Cloud account. See Confluent's Get started page for details.
- The Confluent CLI. See Install Confluent CLI to set this up. This tutorial uses v3.3.0 of the Confluent CLI. Note that you can also complete the steps in this tutorial in Confluent's Cloud console.
This tutorial uses the Cockroach Labs movr workload as an example database.
Step 1. Create a Confluent Cloud Kafka cluster
In this step, you'll use the Confluent CLI to create and configure a Kafka cluster.
Ensure you are logged in to Confluent Cloud:
confluent login --save
These instructions use the --save flag to store your username and password to a local file for convenience during this tutorial, but you can omit this flag if you would prefer to manually authenticate yourself each time.
List the environments in your Confluent Cloud account:
confluent environment list
If you haven't created an environment explicitly, this command will list a default environment. You can use the default environment for this tutorial.
If you would prefer to create an environment, run the following command with a name for your environment:
confluent environment create {ENVIRONMENT NAME}
Set the environment that you would like to create your cluster in, using the environment's ID, which starts with env-:
confluent environment use {ENVIRONMENT ID}
Create a Kafka cluster:
confluent kafka cluster create movr-confluent-tutorial --cloud "gcp" --region "us-east1"
Here the name of the cluster is movr-confluent-tutorial, but you can change this for your cluster.
Note that the --cloud and --region flags are required when running the create command. See Confluent's documentation on confluent kafka cluster create.
The create command returns your new cluster's details, with a format similar to the following:
+---------------+--------------------------------------------------------+
| ID            | lkc-{ID}                                               |
| Name          | movr-confluent-tutorial                                |
| Type          | BASIC                                                  |
| Ingress       | 100                                                    |
| Egress        | 100                                                    |
| Storage       | 5 TB                                                   |
| Provider      | gcp                                                    |
| Availability  | single-zone                                            |
| Region        | us-east1                                               |
| Status        | PROVISIONING                                           |
| Endpoint      | SASL_SSL://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092 |
| API Endpoint  | https://pkac-ew1dj.us-east1.gcp.confluent.cloud        |
| REST Endpoint | https://pkc-4yyd6.us-east1.gcp.confluent.cloud:443     |
+---------------+--------------------------------------------------------+
You'll need this information later in the tutorial, but you can also access this status at any time with the following command:
confluent kafka cluster describe {CLUSTER ID}
Note: It can take up to 5 minutes for your Kafka cluster to provision. The Status field in the cluster's details will change from PROVISIONING to UP once your Kafka cluster is ready.
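If you would rather wait for provisioning in a script than re-run the describe command by hand, the check can be sketched in Python. This is a rough sketch, not part of the Confluent tooling: the function names parse_status and wait_until_up, the polling interval, and the timeout are hypothetical, and the parser assumes the describe command prints a table laid out like the one shown for the create command.

```python
import re
import subprocess
import time


def parse_status(describe_output: str) -> str:
    """Extract the Status value from a '| Status | ... |' table row."""
    match = re.search(r"\|\s*Status\s*\|\s*(\S+)\s*\|", describe_output)
    if match is None:
        raise ValueError("no Status row found in the describe output")
    return match.group(1)


def wait_until_up(cluster_id: str, interval_s: float = 15.0, timeout_s: float = 600.0) -> None:
    """Poll `confluent kafka cluster describe` until Status is UP (assumed table output)."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        result = subprocess.run(
            ["confluent", "kafka", "cluster", "describe", cluster_id],
            capture_output=True, text=True, check=True,
        )
        if parse_status(result.stdout) == "UP":
            return
        time.sleep(interval_s)
    raise TimeoutError(f"cluster {cluster_id} was not UP after {timeout_s} seconds")


# Example call, with your own cluster ID in place of the placeholder:
# wait_until_up("lkc-{ID}")
```

The parsing is kept separate from the polling loop so that it can be checked against saved output without calling the CLI.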
Step 2. Create a cluster API key and secret
In this step, you'll create an API key and secret for your Kafka cluster, which your changefeed will need to connect to the cluster.
Create the API key for your Kafka cluster:
confluent api-key create --resource {CLUSTER ID}
You will receive output displaying your API key and secret.
To make the consumer setup easier later in the tutorial, you can store the API key locally and set it as your active API key:
confluent api-key store --resource {CLUSTER ID}
confluent api-key use {API KEY} --resource {CLUSTER ID}
This will prompt you to enter your API key and secret. Use the --force flag if you already have a key stored in your local environment.
Step 3. Create Kafka topics
Next, you'll create the Kafka topics for your changefeed messages.
Ensure you have the correct active Kafka cluster:
confluent kafka cluster use {CLUSTER ID}
Set Kafka cluster "lkc-{ID}" as the active cluster for environment "env-{ID}".
Run the following command to create a topic:
confluent kafka topic create users
users will be the topic name for this tutorial. If needed, you can change the topic name for your purposes and run the previous command for each topic you would like to create.
Tip: If you are using a Dedicated Confluent Cloud cluster, you can enable auto topic creation. For further detail, see Enable automatic topic creation.
Step 4. Create a Confluent Schema Registry
In this step, you'll create the Schema Registry in your environment.
Enable the Schema Registry for the active environment:
confluent schema-registry cluster enable --cloud "gcp" --geo "us"
The --cloud and --geo flags are required with this enable command. See the confluent schema-registry cluster enable docs for more detail. To match the Kafka cluster setup for this tutorial, the cloud is set to Google Cloud Platform and the geo to the US.
You will receive output showing the Schema Registry's ID and its endpoint URL:
+--------------+----------------------------------------------------+
| Id           | lsrc-816zp7                                        |
| Endpoint URL | https://psrc-x77pq.us-central1.gcp.confluent.cloud |
+--------------+----------------------------------------------------+
Step 5. Create a Schema Registry API key and secret
Generate an API key and secret for the Schema Registry using the ID from your output:
confluent api-key create --resource {SCHEMA REGISTRY ID}
The output will display your API key and secret. You'll need these to create your Kafka consumer and start your changefeed.
Step 6. Create a Kafka consumer
In this step, you'll start a Kafka consumer for the changefeed messages.
Run the following command to create a consumer:
confluent kafka topic consume users \
--value-format avro \
--from-beginning \
--schema-registry-endpoint {SCHEMA REGISTRY ENDPOINT URL} \
--schema-registry-api-key {SCHEMA REGISTRY API KEY} \
--schema-registry-api-secret {SCHEMA REGISTRY SECRET}
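In this command, you need to pass the following Schema Registry details:
- The Endpoint URL from the Schema Registry's details created in Step 4.
- The API key and secret created in Step 5.
For this command to run successfully, ensure that confluent kafka cluster describe {CLUSTER ID} returns a Status of UP.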
Your terminal will wait for messages after this command has run successfully.
Run confluent schema-registry cluster describe to access details for the Schema Registry, if needed.
Step 7. Prepare your CockroachDB cluster
To create your changefeed, you'll prepare your CockroachDB cluster with the movr workload and enable rangefeeds.
In a new terminal window, initiate the movr workload for your cluster:
cockroach workload init movr {"CONNECTION STRING"}
Run the workload to generate some data:
cockroach workload run movr --duration=1m {"CONNECTION STRING"}
Start a SQL session for your CockroachDB cluster:
cockroach sql --url {"CONNECTION STRING"}
Set your organization name and Enterprise license key:
SET CLUSTER SETTING cluster.organization = '<organization name>';
SET CLUSTER SETTING enterprise.license = '<secret>';
Before you can create an Enterprise changefeed, it is necessary to enable rangefeeds on your cluster:
SET CLUSTER SETTING kv.rangefeed.enabled = true;
Step 8. Create a changefeed
Before running the CREATE CHANGEFEED statement, you must URL-encode both the cluster's and the Schema Registry's API secret key.
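One way to URL-encode a secret is with Python's standard library. The following is a minimal sketch; the function name url_encode_secret is hypothetical and the secret shown is a made-up placeholder, not a real credential.

```python
from urllib.parse import quote


def url_encode_secret(secret: str) -> str:
    """Percent-encode every reserved character in the secret."""
    # safe="" makes quote() encode "/" as well, which it leaves alone by default.
    return quote(secret, safe="")


# Made-up secret for illustration only.
print(url_encode_secret("abc/def+ghi=="))  # prints abc%2Fdef%2Bghi%3D%3D
```

Encoding with an empty safe set matters because API secrets often contain characters such as "/", "+", and "=", which would otherwise be read as part of the URI structure.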
You can also create external connections to define a name for the Kafka and Confluent Schema Registry URIs. This allows you to interact with your defined name instead of the provider-specific URI.
Construct the Kafka URI:
Use the Endpoint from your cluster details and precede it with the kafka:// scheme. For example, an endpoint of pkc-4yyd6.us-east1.gcp.confluent.cloud:9092 would be: kafka://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092.
Since the Kafka cluster uses SASL authentication, you need to pass the following parameters. This includes the cluster API key and secret you created in Step 2:
- tls_enabled=true
- sasl_enabled=true
- sasl_user={CLUSTER API KEY}
- sasl_password={URL-ENCODED CLUSTER SECRET KEY}
- sasl_mechanism=PLAIN
"kafka://{KAFKA ENDPOINT}?tls_enabled=true&sasl_enabled=true&sasl_user={CLUSTER API KEY}&sasl_password={URL-ENCODED CLUSTER SECRET KEY}&sasl_mechanism=PLAIN"
Create an external connection for the Kafka URI:
CREATE EXTERNAL CONNECTION kafka AS "kafka://{KAFKA ENDPOINT}?tls_enabled=true&sasl_enabled=true&sasl_user={CLUSTER API KEY}&sasl_password={URL-ENCODED CLUSTER SECRET KEY}&sasl_mechanism=PLAIN"
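The Kafka URI used in this statement can also be assembled programmatically, which avoids hand-encoding mistakes. This is a sketch under assumptions: build_kafka_uri is a hypothetical helper, and the key and secret values are placeholders. It strips the SASL_SSL:// prefix that appears in the cluster's Endpoint field and percent-encodes the credentials.

```python
from urllib.parse import quote, urlencode


def build_kafka_uri(endpoint: str, api_key: str, api_secret: str) -> str:
    """Build a kafka:// sink URI with the SASL parameters from this tutorial."""
    # Drop a scheme such as SASL_SSL:// if the endpoint was copied with it.
    host = endpoint.split("://", 1)[-1]
    params = [
        ("tls_enabled", "true"),
        ("sasl_enabled", "true"),
        ("sasl_user", api_key),
        ("sasl_password", api_secret),
        ("sasl_mechanism", "PLAIN"),
    ]
    # quote with safe="" percent-encodes "/", "+", and "=" in the secret.
    return f"kafka://{host}?" + urlencode(params, quote_via=quote, safe="")


# Placeholder credentials for illustration only.
print(build_kafka_uri(
    "SASL_SSL://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092", "MYKEY", "s3cr/t+"
))
```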
To construct the Confluent Schema Registry URI, you need:
- Schema Registry API Key created in Step 5.
- URL-encoded Schema Registry secret key created in Step 5.
- The Endpoint URL from the Schema Registry's details created in Step 4. Make sure to add the :443 port to the end of this URL. For example, psrc-x77pq.us-central1.gcp.confluent.cloud:443.
"https://{SCHEMA REGISTRY API KEY}:{URL-ENCODED SCHEMA REGISTRY SECRET KEY}@{SCHEMA REGISTRY ENDPOINT URL}:443"
Tip: Use the timeout={duration} query parameter (duration string) in your Confluent Schema Registry URI to change the default timeout for contacting the schema registry. By default, the timeout is 30 seconds.
Create an external connection for the Confluent Schema Registry URI:
CREATE EXTERNAL CONNECTION confluent_registry AS "https://{SCHEMA REGISTRY API KEY}:{URL-ENCODED SCHEMA REGISTRY SECRET KEY}@{SCHEMA REGISTRY ENDPOINT URL}:443"
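The Schema Registry URI in this statement follows the same pattern of credentials, endpoint, and port, so it can be sketched as a small helper too. The name build_registry_uri and the sample key and secret are hypothetical. The helper appends :443 only when the endpoint has no port yet.

```python
from urllib.parse import quote


def build_registry_uri(endpoint_url: str, api_key: str, api_secret: str) -> str:
    """Build an https:// Schema Registry URI with embedded, URL-encoded credentials."""
    # Work with the bare host so the credentials can be placed before it.
    host = endpoint_url.split("://", 1)[-1].rstrip("/")
    if ":" not in host:
        host += ":443"
    # safe="" ensures characters such as "@" and "/" in the secret are encoded.
    user = quote(api_key, safe="")
    password = quote(api_secret, safe="")
    return f"https://{user}:{password}@{host}"


# Placeholder credentials for illustration only.
print(build_registry_uri(
    "https://psrc-x77pq.us-central1.gcp.confluent.cloud", "SRKEY", "p@ss/word"
))
```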
Create the changefeed with any other options you need to configure your changefeed:
CREATE CHANGEFEED FOR TABLE users INTO "external://kafka" WITH updated, format = avro, confluent_schema_registry = "external://confluent_registry";
See Options for a list of all available Enterprise changefeed options.
Tip: Use the changefeed.schema_registry.retry_count metric to measure the number of request retries performed when sending requests to the schema registry. For more detail on monitoring changefeeds, refer to Monitor and Debug Changefeeds.
Step 9. Verify the output
Move to the terminal window in which you started the Kafka consumer. As the changefeed runs, you will see the change data messages similar to the following:
. . .
{"after":{"users":{"name":{"string":"Michael Clark"},"address":{"string":"85957 Ashley Junctions"},"credit_card":{"string":"4144089313"},"id":{"string":"d84cf3b6-7029-4d4d-aa81-e5caa9cce09e"},"city":{"string":"seattle"}}},"updated":{"string":"1659643584586630201.0000000000"}}
{"after":{"users":{"address":{"string":"17068 Christopher Isle"},"credit_card":{"string":"6664835435"},"id":{"string":"11b99275-92ce-4244-be61-4dae21973f87"},"city":{"string":"amsterdam"},"name":{"string":"John Soto"}}},"updated":{"string":"1659643585384406152.0000000000"}}
{"after":{"users":{"id":{"string":"a4666991-0292-4b00-8df0-d807c10eded5"},"city":{"string":"boston"},"name":{"string":"Anthony Snow"},"address":{"string":"74151 Carrillo Ramp"},"credit_card":{"string":"2630730025"}}},"updated":{"string":"1659643584990243411.0000000000"}}
{"updated":{"string":"1659643584877025654.0000000000"},"after":{"users":{"city":{"string":"seattle"},"name":{"string":"Tanya Holmes"},"address":{"string":"19023 Murphy Mall Apt. 79"},"credit_card":{"string":"6549598808"},"id":{"string":"434d4827-945f-4c7a-8d10-05c03e3bbeeb"}}}}
. . .
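Each message wraps column values in their Avro type, for example {"string": "seattle"}, and reports the updated timestamp as a decimal string. If you post-process the consumer output, a small helper can flatten it. The following is a sketch under assumptions: flatten_message is a hypothetical name, the sample line is a trimmed copy of the first message above, and the part of updated before the decimal point is treated as wall-clock nanoseconds since the Unix epoch.

```python
import json
from datetime import datetime, timezone


def flatten_message(line: str) -> dict:
    """Turn one consumer output line into plain column values plus a UTC timestamp."""
    msg = json.loads(line)
    row = {}
    # "after" maps the table name to its columns; treat a null "after" as no columns.
    for columns in (msg.get("after") or {}).values():
        for name, wrapped in columns.items():
            # Non-null values arrive as a one-key dict such as {"string": "seattle"}.
            row[name] = next(iter(wrapped.values())) if isinstance(wrapped, dict) else wrapped
    updated_at = None
    if msg.get("updated"):
        # Assumption: digits before the "." are nanoseconds since the Unix epoch.
        wall_ns = int(msg["updated"]["string"].split(".")[0])
        updated_at = datetime.fromtimestamp(wall_ns // 1_000_000_000, tz=timezone.utc)
    return {"row": row, "updated_at": updated_at}


# Trimmed copy of the first message shown above.
sample = '{"after":{"users":{"name":{"string":"Michael Clark"},"city":{"string":"seattle"}}},"updated":{"string":"1659643584586630201.0000000000"}}'
result = flatten_message(sample)
print(result["row"]["city"])  # prints seattle
print(result["updated_at"].isoformat())
```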
You can also view the messages for your cluster in the Confluent Cloud console in the Topics sidebar under the Messages tab.
You can use the Schema tab to view the schema for a specific topic.