Change data capture queries allow you to define the change data emitted to your sink when you create a changefeed. The expression syntax provides a way to select columns and apply filters to further restrict or transform the data in your changefeed messages.
You can use CDC queries to do the following:
- Filter out specific rows and columns from changefeed messages to decrease the load on your downstream sink and support outbox workloads.
- Modify data before it emits to reduce the time and operational burden of filtering or transforming data downstream.
- Stabilize or customize the schema of your changefeed messages for increased compatibility with external systems.
You can use any CockroachDB-supported SQL expression syntax that is not listed in Known limitations to build a changefeed query.
See the Examples section for further use cases.
Syntax
There are two possible components to CDC queries:
- Projections select the columns that you want to emit data from.
- Predicates restrict the resulting column change data based on the filters you apply.
CREATE CHANGEFEED [INTO sink] [WITH options] AS SELECT projection FROM table [WHERE predicate];
Parameter | Description |
---|---|
sink | Specify the sink URL to emit change data to. See Changefeed Sinks for a list of supported sinks. It is also possible to run a changefeed without a sink (CREATE CHANGEFEED WITH ...), which will send changes to the active SQL session. |
options | Set options on the changefeed. See the Options table for a full list. |
projection | Select the columns from which to emit data. |
table | Define the table to which the columns belong. |
predicate | Apply optional filters with a WHERE clause. |
For a SQL diagram of the CDC query syntax, see the CREATE CHANGEFEED page.
To emit different properties for a row, specify the following explicitly in CDC queries:
- cdc_prev: A tuple-typed column that gives changefeeds access to the previous state of a row. For newly inserted rows in a table, the cdc_prev column will emit as NULL. See the Emit the previous state of a row example for more detail.
- CDC queries support system columns, for example:
  - crdb_internal_mvcc_timestamp: Records the timestamp of each row created in a table. If you do not have a timestamp column in the target table, you can access crdb_internal_mvcc_timestamp in a changefeed. See the Determine the age of a row example.
When changes happen to a column that is part of a composite key, the changefeed will produce a delete message and then an insert message.
Known limitations
- You can only apply CDC queries on a single table in each statement.
- Some stable functions, notably functions that return MVCC timestamps, are overridden to return the MVCC timestamp of the event, e.g., transaction_timestamp or statement_timestamp. Additionally, some time-based functions, such as now(), are not supported. We recommend using the transaction_timestamp() function or the crdb_internal_mvcc_timestamp column instead.
- The following are not permitted in CDC queries:
  - Volatile functions.
  - Sub-select queries.
  - Aggregate and window functions (i.e., functions operating over many rows). #98237
- delete changefeed events will only contain the primary key. All other columns will emit as NULL. See Capture delete messages for detail on running a CDC query that emits the deleted values. #83835
- ALTER CHANGEFEED is not fully supported with changefeeds that use CDC queries. You can alter the options that a changefeed uses, but you cannot alter the changefeed target tables. #83033
- Creating a changefeed with CDC queries on tables with more than one column family is not supported. #127761
CDC query function support
The following table outlines functions that are useful with CDC queries:
Function | Description |
---|---|
changefeed_creation_timestamp() | Returns the decimal MVCC timestamp when the changefeed was created. |
event_op() | Returns a string describing the type of event. If a changefeed is running with the diff option, then this function returns 'insert', 'update', or 'delete'. If a changefeed is running without the diff option, it is not possible to determine an update from an insert, so event_op() returns 'upsert' or 'delete'. |
event_schema_timestamp() | Returns the timestamp of schema change events that cause a changefeed message to emit. When the schema change event does not result in a table backfill or scan, event_schema_timestamp() will return the event's timestamp. When the schema change event does result in a table backfill or scan, event_schema_timestamp() will return the timestamp at which the backfill/scan is read, which is the high-water mark time of the changefeed. |
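The event_op() return values described in the table can be summarized with a small Python model. This is only an illustrative sketch of the documented behavior, not CockroachDB code; the function name and its boolean parameters are hypothetical.

```python
def event_op(is_delete: bool, had_previous_row: bool, diff_enabled: bool) -> str:
    """Model the documented event_op() results (hypothetical helper)."""
    if is_delete:
        return "delete"
    if not diff_enabled:
        # Without the diff option, an insert cannot be told apart from an update.
        return "upsert"
    return "update" if had_previous_row else "insert"


print(event_op(is_delete=False, had_previous_row=False, diff_enabled=True))
print(event_op(is_delete=False, had_previous_row=True, diff_enabled=False))
```

A consumer that needs to distinguish inserts from updates would therefore need the changefeed to run with the diff option.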
You can also use the following functions in CDC queries:
- Functions marked as "Immutable" on the Functions and Operators page.
- Non-volatile user-defined functions. See the Queries and user-defined functions example.
- Functions that rely on session data. At the time of changefeed creation, information about the current session is saved. When a CDC query includes one of the functions that use session data, the query will evaluate the saved session data.
- The following "Stable" functions:
  - age()
  - array_to_json()
  - array_to_string()
  - crdb_internal.cluster_id()
  - date_part()
  - date_trunc()
  - extract()
  - format()
  - jsonb_build_array()
  - jsonb_build_object()
  - to_json()
  - to_jsonb()
  - row_to_json()
  - overlaps()
  - pg_collation_for()
  - pg_typeof()
  - quote_literal()
  - quote_nullable()
Unsupported functions
You cannot use the following functions with CDC queries:
- Functions marked as "Volatile" on the Functions and Operators page.
- Functions listed in the Known limitations section on this page.
- Functions marked as "Stable" on the Functions and Operators page, except for those listed previously.
Examples
CDC queries allow you to customize your changefeed for particular scenarios. This section outlines several possible use cases for CDC queries.
CDC queries use the envelope=bare message format by default. The bare message envelope places the output of the SELECT clause at the top level of the message instead of under an "after" key. When there is additional information that the changefeed is sending, such as updated or resolved timestamps, the messages will include a __crdb__ field containing this information. Refer to the Changefeed Messages page for more detail.
Depending on how you are filtering or adapting the message envelope with a CDC query and which sink you're emitting to, message output may vary from some of the example cases in this section.
Refer to CREATE CHANGEFEED
for examples on using the foundational syntax to create a changefeed. For information on sinks, refer to the Changefeed Sinks page.
To optimize the SELECT query you run in your changefeed statement, use the EXPLAIN statement to view a statement plan.
Note that EXPLAIN does not have access to cdc_prev; therefore, you will receive an error if your SELECT query contains cdc_prev.
Filter columns
To emit data from only specific columns in a table, use SELECT {columns} to list the columns you want in each message.
As an example, using the users table from the movr database, you can create a changefeed that will emit messages including only the name and city column data:
CREATE CHANGEFEED INTO "scheme://sink-URI" WITH updated AS SELECT name, city FROM users;
{"__crdb__": {"updated": "1685718676799158675.0000000000"}, "city": "amsterdam", "name": "Thomas Harris"}
{"__crdb__": {"updated": "1685718676799158675.0000000000"}, "city": "amsterdam", "name": "Guy Williams"}
{"__crdb__": {"updated": "1685718676799158675.0000000000"}, "city": "amsterdam", "name": "Guy Williams"}
{"__crdb__": {"updated": "1685718676799158675.0000000000"}, "city": "amsterdam", "name": "Tyler Hunter"}
{"__crdb__": {"updated": "1685718676799158675.0000000000"}, "city": "amsterdam", "name": "Tyler Hunter"}
{"__crdb__": {"updated": "1685718676799158675.0000000000"}, "city": "amsterdam", "name": "Tyler Dalton"}
{"__crdb__": {"updated": "1685718676799158675.0000000000"}, "city": "amsterdam", "name": "Dillon Martin"}
{"__crdb__": {"updated": "1685718676799158675.0000000000"}, "city": "amsterdam", "name": "Lisa Sandoval"}
{"__crdb__": {"updated": "1685718676799158675.0000000000"}, "city": "amsterdam", "name": "Deborah Carson"}
{"__crdb__": {"updated": "1685718676799158675.0000000000"}, "city": "amsterdam", "name": "David Stanton"}
{"__crdb__": {"updated": "1685718676799158675.0000000000"}, "city": "amsterdam", "name": "Maria Weber"}
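Because these messages use the bare envelope, a consumer can separate the row fields from the changefeed metadata by removing the __crdb__ key. The following Python sketch shows one way to do that with a message copied from the output above; the helper name split_bare_message is hypothetical, and the sketch assumes each message arrives as one JSON string.

```python
import json


def split_bare_message(raw: str):
    """Return (row_fields, metadata) for one bare-envelope JSON message."""
    message = json.loads(raw)
    # pop() with a default keeps this safe for messages that carry no metadata.
    metadata = message.pop("__crdb__", {})
    return message, metadata


raw = (
    '{"__crdb__": {"updated": "1685718676799158675.0000000000"}, '
    '"city": "amsterdam", "name": "Thomas Harris"}'
)
row, meta = split_bare_message(raw)
print(row)   # the projected columns only
print(meta)  # the updated timestamp requested with the updated option
```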
Filter delete messages
To remove the delete messages from a changefeed stream, use the event_op()
function:
CREATE CHANGEFEED INTO sink AS SELECT * FROM table WHERE NOT event_op() = 'delete';
Filtering delete messages from your changefeed is helpful for certain outbox table use cases. See Queries and the outbox pattern for further detail.
Capture delete messages
Delete changefeed messages will only contain the primary key value and all other columns will emit as NULL (see the Known limitations). To emit the deleted values, use the envelope=wrapped, format=json, and diff options:
CREATE CHANGEFEED INTO 'external://cloud' WITH envelope='wrapped', format='json', diff AS SELECT * FROM users WHERE event_op() = 'delete';
This will produce a JSON object with before and after keys that contain the prior and current states of the row:
{"after": null, "before": {"address": "95913 Thomas Key Apt. 99", "city": "washington dc", "credit_card": "2702281601", "id": "49a8c43d-8ed8-4d50-ad99-fb314cbe20a1", "name": "Tina Jones"}}
The before value in the delete message, produced by the diff option, will include the entire row. That is, it will not include any projections from a CDC query.
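On the consuming side, the deleted values can be read from the before key whenever after is null. The Python sketch below illustrates this with the message shown above; the helper name deleted_row is hypothetical, and the sketch assumes the wrapped JSON message is delivered as a single string.

```python
import json


def deleted_row(raw: str):
    """Return the prior row state for a wrapped delete message, else None."""
    message = json.loads(raw)
    # A delete has no current state ("after" is null) but has a prior state.
    if message.get("after") is None and message.get("before") is not None:
        return message["before"]
    return None


raw = (
    '{"after": null, "before": {"address": "95913 Thomas Key Apt. 99", '
    '"city": "washington dc", "credit_card": "2702281601", '
    '"id": "49a8c43d-8ed8-4d50-ad99-fb314cbe20a1", "name": "Tina Jones"}}'
)
row = deleted_row(raw)
print(row["id"], row["name"])
```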
Emit the previous state of a row
Changefeeds can access the cdc_prev
hidden column on a table to emit the previous state of a row or column. cdc_prev
is a tuple-typed column that contains the table's columns.
To emit the previous state of a row, it is necessary to explicitly call cdc_prev
:
CREATE CHANGEFEED INTO 'external://sink' AS SELECT rider_id, vehicle_id, cdc_prev FROM movr.rides;
To emit the previous state of a column, you can specify this as a named field from the cdc_prev
tuple with the following syntax:
CREATE CHANGEFEED INTO 'external://sink' AS SELECT owner_id, (cdc_prev).current_location AS previous_location FROM movr.vehicles WHERE (cdc_prev).status = 'in_use';
For newly inserted rows in a table, the cdc_prev
column will emit as NULL
.
If you do not need to select specific columns in a table or filter rows from a changefeed, you can instead create a changefeed using the diff
option to emit a before
field with each message. This field includes the value of the row before the update was applied.
Reference TTL in a CDC query
In CockroachDB, table row deletes occur as a result of regular SQL transactions or through row-level TTL. When your changefeed emits delete event messages, you may need to distinguish between these two types of deletion. For example, you may want your changefeed to emit messages only for row-level TTL deletes.
If you have TTL logic defined with ttl_expiration_expression
or ttl_expire_after
, you can leverage CDC queries to determine whether or not a given row was expired at the time of the changefeed event, including a delete event.
Most users should use ttl_expiration_expression
instead of ttl_expire_after
for the following reasons:
- If you add ttl_expire_after to an existing table, it will cause a full table rewrite, which can affect performance. Specifically, it will result in a schema change that (1) creates a new hidden column crdb_internal_expiration for all rows, and (2) backfills the value of that new column to now() + ttl_expire_after.
- You cannot use ttl_expire_after with an existing TIMESTAMPTZ column.
- If you use ttl_expiration_expression, you can use an existing TIMESTAMPTZ column called e.g. updated_at.
For more detail, refer to the Batch Delete Expire Data with Row-Level TTL page.
ttl_expiration_expression
In some cases, you may have custom expiration logic on rows in a table. You can also write a CDC query to emit rows that have been deleted through row-level TTL using a custom TTL expression.
In the following example, the table uses the ttl_expiration_expression
storage parameter to reference the expired_at
column. To create a changefeed on this table to explicitly emit the previous state of the row for TTL deletions:
CREATE CHANGEFEED INTO 'external://sink'
AS SELECT cdc_prev FROM ttl_test_per_row
WHERE event_op() = 'delete'
AND (cdc_prev).expired_at < statement_timestamp();
For the CREATE TABLE
statement and further details on ttl_expiration_expression
, refer to Using ttl_expiration_expression
.
ttl_expire_after
When the table uses the ttl_expire_after
storage parameter, you can emit rows that were deleted after expiring from the changefeed with syntax similar to:
CREATE CHANGEFEED INTO 'external://sink'
AS SELECT cdc_prev FROM test_table
WHERE event_op() = 'delete'
AND (cdc_prev).crdb_internal_expiration < statement_timestamp();
This changefeed statement:
- Accesses the cdc_prev column for the previous state of the row.
- Searches for delete events in that previous state.
- Finds the TTL expiration timestamp of the deleted rows where it is earlier than the current statement timestamp.
For the CREATE TABLE
statement and further details on ttl_expire_after
, refer to Using ttl_expire_after
.
This will only emit rows that were deleted after expiring. Furthermore, consider that a transactional SQL delete during the window between the row expiring and the TTL job running will also cause this message to emit from the changefeed.
Equally, you can remove the delete messages for expired rows so that they do not emit from your changefeed:
CREATE CHANGEFEED AS SELECT cdc_prev FROM test_table
WHERE NOT (event_op() = 'delete'
AND (cdc_prev).crdb_internal_expiration < statement_timestamp());
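The predicate used in these TTL examples can be modeled in a few lines of Python to show which events it matches. This is an illustrative sketch only: the function name and the sample dates are hypothetical, and event_time stands in for statement_timestamp(), which this page describes as being overridden to return the MVCC timestamp of the event.

```python
from datetime import datetime, timezone
from typing import Optional


def is_expired_delete(op: str, prev_expiration: Optional[datetime],
                      event_time: datetime) -> bool:
    """Model: event_op() = 'delete' AND prev expiration < statement_timestamp()."""
    if op != "delete" or prev_expiration is None:
        # In SQL, a comparison against NULL is not true, so no row is emitted.
        return False
    return prev_expiration < event_time


expiry = datetime(2024, 1, 1, tzinfo=timezone.utc)          # hypothetical
after_expiry = datetime(2024, 1, 2, tzinfo=timezone.utc)    # hypothetical
before_expiry = datetime(2023, 12, 31, tzinfo=timezone.utc)  # hypothetical

print(is_expired_delete("delete", expiry, after_expiry))   # True
print(is_expired_delete("delete", expiry, before_expiry))  # False
print(is_expired_delete("update", expiry, after_expiry))   # False
```

The first case matches regardless of what issued the delete, which is why a transactional delete of an already expired row also produces a message.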
Geofilter a changefeed
When you are working with a REGIONAL BY ROW
table, you can filter the changefeed on the crdb_region
column to create a region-specific changefeed:
CREATE CHANGEFEED INTO sink AS SELECT * FROM table WHERE crdb_region = 'europe-west2';
For more detail on targeting REGIONAL BY ROW
tables with changefeeds, see Changefeeds in Multi-Region Deployments.
If you are running changefeeds from a multi-region cluster, you may want to define which nodes take part in running the changefeed job. You can use the execution_locality
option with key-value pairs to specify the locality designations nodes must meet.
Stabilize the changefeed message schema
As changefeed messages emit from the database, message formats can vary as tables experience schema changes. You can select columns with typecasting to prevent message fields from changing during a changefeed's lifecycle:
CREATE CHANGEFEED INTO sink AS SELECT id::int, name::varchar, admin::bool FROM users;
Shard changefeed messages
CDC queries allow you to emit changefeed messages from the same table to different endpoints. As a result, you can use queries to load balance messages across changefeed sinks without the need for an intermediate system.
In this example, the query uses the ride_id column's UUID to shard the messages. The left() function extracts the first character of the ride_id value, and the IN list matches it against the specified initial characters. The example shards by running several changefeeds on the same table and dividing the 16 possible leading UUID characters (0 through f) among them.
Therefore, the first changefeed created:
CREATE CHANGEFEED INTO 'scheme://sink-URI-1'
AS SELECT * FROM movr.vehicle_location_histories
WHERE left(ride_id::string, 1) IN ('0','1','2','3');
The final changefeed created:
CREATE CHANGEFEED INTO 'scheme://sink-URI-4'
AS SELECT * FROM movr.vehicle_location_histories
WHERE left(ride_id::string, 1) IN ('c','d','e','f');
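To check that a set of shard predicates covers every row exactly once, the split can be modeled outside the database. The Python sketch below assumes four changefeeds; the first and last character groups come from the statements above, while the two middle groups and the sink-URI-2 and sink-URI-3 names are assumptions made for illustration.

```python
# Leading hex characters handled by each sink.
SHARDS = {
    "scheme://sink-URI-1": "0123",
    "scheme://sink-URI-2": "4567",  # assumed grouping
    "scheme://sink-URI-3": "89ab",  # assumed grouping
    "scheme://sink-URI-4": "cdef",
}


def sink_for(ride_id: str) -> str:
    """Return the sink whose predicate matches the first UUID character."""
    first = ride_id[0].lower()
    for sink, chars in SHARDS.items():
        if first in chars:
            return sink
    raise ValueError(f"not a hexadecimal UUID prefix: {first!r}")


# Every hexadecimal digit must be assigned to exactly one shard.
covered = "".join(SHARDS.values())
assert sorted(covered) == sorted("0123456789abcdef")

print(sink_for("efe6468e-f443-463f-a21c-4cb0f6ecf235"))  # scheme://sink-URI-4
```

Because UUID values are spread roughly evenly over the leading character, four groups of four characters should give each sink a similar share of messages.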
View recent changes to a row
You can use CDC queries as a tool for debugging or investigating issues from the SQL shell.
For example, you may need to identify what recently changed in a specific row. You can use the cursor
option with the desired start time and a WHERE
clause describing the row in question. Instead of sending to a sink, a "sinkless" changefeed will allow you to view the results in the SQL shell.
Find the start time. Use the cluster_logical_timestamp() function to calculate the logical time. This will return the logical timestamp for an hour earlier than the statement run time:
SELECT cluster_logical_timestamp() - 3600000000000;
             ?column?
----------------------------------
  1663938662092036106.0000000000
(1 row)
Run the changefeed without a sink and pass the start time to the cursor option:
CREATE CHANGEFEED WITH cursor='1663938662092036106.0000000000' AS SELECT * FROM vehicle_location_histories WHERE ride_id::string LIKE 'f2616bb3%';
To find changes within a time period, use cursor with the end_time option:
CREATE CHANGEFEED WITH cursor='1663938662092036106.0000000000', end_time='1663942405825479261.0000000000' AS SELECT * FROM vehicle_location_histories WHERE ride_id::string LIKE 'f2616bb3%';
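When choosing cursor and end_time values, it can help to translate the decimal timestamps into wall-clock time. The Python sketch below does this on the assumption that the integer part is nanoseconds since the Unix epoch and the fractional part is a logical counter; the helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def hlc_to_datetime(ts: str) -> datetime:
    """Convert a decimal timestamp string to a UTC datetime (microsecond precision)."""
    wall_nanos = int(ts.partition(".")[0])  # integer part, assumed nanoseconds
    seconds, nanos = divmod(wall_nanos, 1_000_000_000)
    return (datetime.fromtimestamp(seconds, tz=timezone.utc)
            + timedelta(microseconds=nanos // 1000))


def hours_before(ts: str, hours: int) -> str:
    """Return a cursor-style timestamp the given number of hours earlier."""
    wall_nanos = int(ts.partition(".")[0])
    return f"{wall_nanos - hours * 3_600_000_000_000}.0000000000"


cursor = "1663938662092036106.0000000000"
print(hlc_to_datetime(cursor).isoformat())  # 2022-09-23T13:11:02.092036+00:00
print(hours_before(cursor, 1))              # 1663935062092036106.0000000000
```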
Determine the age of a row
You can determine the age of a row by using the crdb_internal_mvcc_timestamp
system column and cdc_prev
to access the row's previous state:
CREATE CHANGEFEED INTO 'external://sink'
AS SELECT crdb_internal_mvcc_timestamp - (cdc_prev).crdb_internal_mvcc_timestamp
AS age
FROM movr.rides;
{"age": 1679504962492204986.0000000000}
{"age": 1679577387885735266.0000000000}
{"age": 1679504962492204986.0000000000}
{"age": 1679578262568913199.0000000000}
{"age": 1679504962492381317.0000000000}
{"age": 1679579853238534524.0000000000}
{"age": 1679578374708255008.0000000000}
{"age": 1679504962492381317.0000000000}
{"age": 1679578344852201733.0000000000}
{"age": 1679578242116550285.0000000000}
Recover lost messages
In the event that an incident downstream has affected some rows, you may need a way to recover or evaluate the specific rows. Create a new changefeed that only watches for the affected row(s). Here, the example uses the row's primary key:
CREATE CHANGEFEED INTO 'scheme://cloud'
AS SELECT * FROM movr.vehicle_location_histories
WHERE ride_id = 'efe6468e-f443-463f-a21c-4cb0f6ecf235';
The changefeed will return messages for the specified rows:
{"city": "washington dc", "lat": 128, "long": 11, "ride_id": "efe6468e-f443-463f-a21c-4cb0f6ecf235", "timestamp": "2023-06-02T15:11:30.316547"}
{"city": "washington dc", "lat": 45, "long": -66, "ride_id": "efe6468e-f443-463f-a21c-4cb0f6ecf235", "timestamp": "2023-06-02T15:11:33.700297"}
{"city": "washington dc", "lat": -34, "long": -49, "ride_id": "efe6468e-f443-463f-a21c-4cb0f6ecf235", "timestamp": "2023-06-02T15:11:34.050312"}
{"city": "washington dc", "lat": 1E+1, "long": -27, "ride_id": "efe6468e-f443-463f-a21c-4cb0f6ecf235", "timestamp": "2023-06-02T15:11:36.408561"}
{"city": "washington dc", "lat": 83, "long": 84, "ride_id": "efe6468e-f443-463f-a21c-4cb0f6ecf235", "timestamp": "2023-06-02T15:11:38.026542"}
The output will only include the row's history that has been changed within the garbage collection window. If the change occurred outside of the garbage collection window, it will not be returned as part of this output. See Garbage collection and changefeeds for more detail on how the garbage collection window interacts with changefeeds.
Customize changefeed messages
You can adapt your changefeed messages by filtering the columns, but it is also possible to build message fields with SQL expressions.
In this example, the query adds a summary
field to the changefeed message:
CREATE CHANGEFEED INTO 'external://cloud' AS SELECT *, owner_id::string || ' takes passengers by ' || type || '. They are currently ' || status AS summary FROM vehicles;
{"city": "los angeles", "creation_time": "2019-01-02T03:04:05", "current_location": "43051 Jonathan Fords Suite 36", "ext": {"color": "red"}, "id": "99999999-9999-4800-8000-000000000009", "owner_id": "9eb851eb-851e-4800-8000-00000000001f", "status": "in_use", "summary": "9eb851eb-851e-4800-8000-00000000001f takes passengers by scooter. They are currently in_use", "type": "scooter"}
{"city": "new york", "creation_time": "2019-01-02T03:04:05", "current_location": "64110 Richard Crescent", "ext": {"color": "black"}, "id": "00000000-0000-4000-8000-000000000000", "owner_id": "051eb851-eb85-4ec0-8000-000000000001", "status": "in_use", "summary": "051eb851-eb85-4ec0-8000-000000000001 takes passengers by skateboard. They are currently in_use", "type": "skateboard"}
{"city": "new york", "creation_time": "2019-01-02T03:04:05", "current_location": "64297 Ballard Hollow Suite 30", "ext": {"brand": "Pinarello", "color": "blue"}, "id": "0d393d59-82c0-4762-84d0-71a445283c53", "owner_id": "521a31b0-c8ff-40c4-baac-23f7daa66562", "status": "available", "summary": "521a31b0-c8ff-40c4-baac-23f7daa66562 takes passengers by bike. They are currently available", "type": "bike"}
{"city": "new york", "creation_time": "2019-01-02T03:04:05", "current_location": "86667 Edwards Valley", "ext": {"color": "black"}, "id": "11111111-1111-4100-8000-000000000001", "owner_id": "147ae147-ae14-4b00-8000-000000000004", "status": "in_use", "summary": "147ae147-ae14-4b00-8000-000000000004 takes passengers by scooter. They are currently in_use", "type": "scooter"}
{"city": "new york", "creation_time": "2019-01-02T03:04:05", "current_location": "64297 Ballard Hollow Suite 30", "ext": {"brand": "Pinarello", "color": "blue"}, "id": "64b6fc6a-8019-4b1f-bc30-0a186197ec68", "owner_id": "30583448-8eeb-48e6-8b0d-9842bb26e991", "status": "available", "summary": "30583448-8eeb-48e6-8b0d-9842bb26e991 takes passengers by bike. They are currently available", "type": "bike"}
{"city": "new york", "creation_time": "2019-01-02T03:04:05", "current_location": "64297 Ballard Hollow Suite 30", "ext": {"brand": "Pinarello", "color": "blue"}, "id": "7bd82867-8e7b-4811-a4f9-3e938792fe6c", "owner_id": "e1f215d8-1c47-47a2-b6f8-e8128db2eefb", "status": "available", "summary": "e1f215d8-1c47-47a2-b6f8-e8128db2eefb takes passengers by bike. They are currently available", "type": "bike"}
Create a scheduled changefeed to export filtered data
This example creates a nightly export of some filtered table data with a scheduled changefeed that will run just after midnight every night. The changefeed uses CDC queries to query the table and filter the data it will send to the sink:
CREATE SCHEDULE sf_skateboard FOR CHANGEFEED INTO 'external://cloud-sink' WITH format=csv
AS SELECT current_location, id, type, status FROM vehicles
WHERE city = 'san francisco' AND type = 'skateboard'
RECURRING '1 0 * * *' WITH SCHEDULE OPTIONS on_execution_failure=retry, on_previous_running=start;
The schedule options control the schedule's behavior:
- If it runs into a failure, on_execution_failure=retry will ensure that the schedule retries the changefeed immediately.
- If the previous scheduled changefeed is still running, on_previous_running=start will start a new changefeed at the defined cadence.
Queries and the outbox pattern
The transactional outbox pattern provides a way to publish events reliably through an outbox table before sending to the messaging system. CDC queries can help to streamline this process by eliminating the need for an outbox table in the database. If you also have a requirement to transform the data or remove delete messages from the changefeed payload, queries can achieve this.
For example, you have three tables: users
, accounts
, and dogs
. You need to send all changes to any of those tables to a single Kafka endpoint using a specific structure. Namely, a JSON object like the following:
{
"event_timestamp": 1663698160437524000,
"table": "dogs",
"type": "create",
"data": "{ \"good_boy\": true }"
}
To achieve this, you create changefeeds directly on the tables and transform the result into the required format.
For the previous JSON example:
CREATE CHANGEFEED INTO 'kafka://endpoint?topic_name=events' AS SELECT
event_schema_timestamp()::int AS event_timestamp,
'dogs' AS table,
event_op() AS type,
jsonb_build_object('good_boy',good_boy) AS data
FROM dogs;
This statement does the following:
- Selects the event_timestamp of the event and casts to an INT.
- Sets the type of change using the event_op() function.
- Uses jsonb_build_object() to construct the desired data field.
For the remaining tables, you use the same statement structure to create changefeeds that will send messages to the Kafka endpoint:
CREATE CHANGEFEED INTO 'kafka://endpoint?topic_name=events' AS SELECT
event_schema_timestamp()::int AS event_timestamp,
'users' AS table,
event_op() AS type,
jsonb_build_object('email', email, 'admin', admin) AS data
FROM users;
CREATE CHANGEFEED INTO 'kafka://endpoint?topic_name=events' AS SELECT
event_schema_timestamp()::int AS event_timestamp,
'accounts' AS table,
event_op() AS type,
jsonb_build_object('owner', owner) AS data
FROM accounts;
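Since all three changefeeds write to the same topic, a consumer can use the table field to route each event. The following Python sketch decodes an event shaped like the JSON example earlier in this section; the helper name decode_event is hypothetical, and the handling of a string-encoded data field is an assumption based on that example.

```python
import json


def decode_event(raw: str) -> dict:
    """Parse one outbox-style event and normalize its data field to a dict."""
    event = json.loads(raw)
    data = event["data"]
    if isinstance(data, str):
        # The example event carries data as a JSON-encoded string.
        data = json.loads(data)
    event["data"] = data
    return event


raw = json.dumps({
    "event_timestamp": 1663698160437524000,
    "table": "dogs",
    "type": "create",
    "data": "{ \"good_boy\": true }",
})
event = decode_event(raw)
print(event["table"], event["type"], event["data"])
```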
For a different usage of the outbox pattern, you may still want an events table to track and manage the lifecycle of an event. You can also use CDC queries in this case to filter the event management metadata out of a message.
For example, you might delete a message in your outbox table after processing it (or with row-level TTL). You can filter the delete messages from your changefeed:
CREATE CHANGEFEED INTO 'kafka://endpoint?topic_name=events' AS SELECT * FROM outbox WHERE event_op() != 'delete';
Similarly, if you have a status column in your outbox table tracking its lifecycle, you can filter out updates as well so that only the initial insert sends a message:
CREATE CHANGEFEED INTO 'scheme://sink-URI' AS SELECT status, cdc_prev FROM outbox WHERE (cdc_prev).status IS NULL;
Since all non-primary key columns will be NULL
in the cdc_prev
output for an insert message, insert messages will be sent. Updates will not send, as long as the status was not previously NULL
.
Queries and user-defined functions
You can create CDC queries that include user-defined functions.
The following CREATE FUNCTION
statement builds the doubleRevenue()
function at the database level:
CREATE FUNCTION doubleRevenue(r int)
RETURNS INT IMMUTABLE LEAKPROOF LANGUAGE SQL AS
$$ SELECT 2 * r $$;
You can then use this function within a CDC query targeting a table in the same database:
CREATE CHANGEFEED INTO 'external://sink' AS SELECT rider_id, doubleRevenue(rides.revenue::int) FROM rides WHERE revenue < 30;
Video Demo
For a demo on how to harness CDC queries to filter and produce JSON events, watch the following video: