In a sentence, CDC, or Change Data Capture, is an event-driven pattern that involves watching a data store for changes and producing events to a message broker when those changes occur, so that event consumers listening for those messages can consume and act upon them.
At Ori, we've been experimenting with CDC to significantly reduce complexity of handling database changes, specifically around our notification and history features.
In this post I'm going to go over a practical example of how to create a simple micro-blogging-esque application to demonstrate this pattern with a PostgreSQL database, Apache Pulsar as the message broker, Debezium for the CDC component and Go for the application code itself. Much of, if not all of, what will be demonstrated using these tools is also achievable using another type of database such as MySQL or Cassandra.
Similarly, there are alternative choices for a message broker that would also work, such as Apache Kafka. If you're already familiar with Kafka and haven't yet played around with Pulsar, this makes a great excuse to do so. If you'd like to use different tooling, Debezium's website has documentation detailing how to configure it for other databases and message brokers.
The only commands that need to be run on the host to get this up and running are:
docker
docker-compose
go
bash
curl
cat
This keeps things simple and really only requires Docker and Go to be installed on most systems for you to follow along or create a similar kind of setup. While all the software we depend on, such as Postgres, will run in Docker containers, the Go parts of the application will be compiled and run locally.
The commands in this post can each be executed in order. They will create all the required directories and files and get everything built and running. Also note that this example is meant to highlight how these tools can work together and is by no means a production example as far as security, among other things, goes.
The following is a diagram of how things will look:
With all that in mind, let's take a look at how this can all be put together.
We'll need somewhere to create the Dockerfiles and other configuration.
This is the sample config that we'll be using for Postgres. Adjust where needed, but the main things to note here are that we are setting `shared_preload_libraries` to `decoderbufs,wal2json`, as both of these plugins will be used as part of our CDC setup, and that `wal_level` must be set to `logical`. `wal2json` is a popular plugin used with Postgres to decode data from the transaction log to JSON, which we can then produce as events. `decoderbufs` is the other plugin, a decoder that delivers data as protocol buffers, for use with Debezium.
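The config listing itself isn't reproduced here; a minimal fragment covering the settings just described might look like this (the replication-slot limits are illustrative additions, not values from the original post):

```conf
# postgres.conf -- illustrative fragment; merge with your full config
listen_addresses = '*'

# Load both logical-decoding plugins at startup
shared_preload_libraries = 'decoderbufs,wal2json'

# Required so the write-ahead log carries enough detail for logical decoding
wal_level = logical

# Illustrative values: Debezium needs at least one free replication slot
max_wal_senders = 4
max_replication_slots = 4
```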
Rather than pulling pre-built Docker images, we'll create them from Dockerfiles so that it is a little clearer where everything is and how the various pieces of software work with each other.
First up is the Dockerfile for our Postgres setup, which is essentially the official Postgres image with two additional plugins we chose to install.
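The original Dockerfile isn't shown here; a sketch along these lines, assuming Postgres 11 (the package names and the decoderbufs build steps are my assumptions), captures the idea:

```dockerfile
# Illustrative sketch, not the original Dockerfile.
FROM postgres:11

# wal2json is packaged in the PGDG apt repo that the official image
# already configures; decoderbufs is built from Debezium's source repo.
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
       postgresql-11-wal2json \
       build-essential git pkg-config \
       postgresql-server-dev-11 \
       libprotobuf-c-dev protobuf-c-compiler \
    && git clone https://github.com/debezium/postgres-decoderbufs /tmp/decoderbufs \
    && make -C /tmp/decoderbufs install \
    && rm -rf /tmp/decoderbufs /var/lib/apt/lists/*

# Ship the CDC-enabled configuration alongside this Dockerfile
COPY postgres.conf /etc/postgresql/postgres.conf
CMD ["postgres", "-c", "config_file=/etc/postgresql/postgres.conf"]
```

A build command along the lines of `docker build -t cdc-postgres -f postgres.Dockerfile .` (the tag name is our own choice) then produces the image.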
You'll notice that a couple of things are being installed here: the two popular Postgres plugins mentioned earlier in Postgres' configuration, `wal2json` and `decoderbufs`. Other than that it is mostly plain old Postgres. Let's now build this image:
And now for Pulsar:
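The Pulsar Dockerfile isn't reproduced here either; a sketch, assuming Pulsar 2.5.0 (the version and the download URL pattern are my assumptions), might be:

```dockerfile
# Illustrative sketch: official Pulsar image plus the Debezium connector.
FROM apachepulsar/pulsar:2.5.0

# Pulsar loads IO connectors (.nar archives) from the connectors/ directory.
# If curl is missing in the base image, install it first.
RUN mkdir -p connectors \
    && curl -sSL -o connectors/pulsar-io-debezium-postgres-2.5.0.nar \
       https://archive.apache.org/dist/pulsar/pulsar-2.5.0/connectors/pulsar-io-debezium-postgres-2.5.0.nar
```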
Notice that here all we are doing in addition to using the official Pulsar image is installing the Debezium CDC connector. Let's now build this one:
With both images now built, we'll use `docker-compose` to bring up an instance of each:
There's not a lot going on in `docker-compose.yaml` other than setting some env vars for Postgres and exposing ports so that we're able to connect to Postgres and Pulsar. We can now bring both of these up (you can bring these up right away, or you may wait until we've also written the application code):
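For reference, a minimal `docker-compose.yaml` matching that description could look like this (service names, image tags and credentials are all assumptions):

```yaml
version: "3"
services:
  postgres:
    image: cdc-postgres          # the image built earlier
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: microblog
  pulsar:
    image: cdc-pulsar            # the image built earlier
    command: ["bin/pulsar", "standalone"]
    ports:
      - "6650:6650"              # Pulsar binary protocol (client connections)
      - "8080:8080"              # admin / HTTP API
```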
The application consists of basically three things. There is going to be an HTTP server that handles incoming requests, a database table to store new posts in and an additional application to consume events from Pulsar. We'll start with the database.
Next, let's create a new schema and table for our microposts to be stored in. To make this handy for future use we'll store it in a script:
Here we are simply creating a new schema and a new `microblog.posts` table to store the contents of a post. Let's now create those on-the-fly:
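The script's contents aren't shown above; the DDL presumably looks something like this (only the schema and table names come from the post, the columns are my guesses):

```sql
-- createdb.sql -- illustrative sketch
CREATE SCHEMA IF NOT EXISTS microblog;

CREATE TABLE IF NOT EXISTS microblog.posts (
    id         SERIAL PRIMARY KEY,
    title      TEXT NOT NULL,
    content    TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```

Piping it into the container with something like `cat createdb.sql | docker exec -i <postgres container> psql -U postgres microblog` creates both objects.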
In the above command we're creating a script that executes a command in the Pulsar container, which configures Debezium to watch for data changes in the tables within the `microblog` schema by giving Pulsar access to our database using the `postgres` user (you'll want to create a dedicated Postgres user for this in a production environment).
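The script itself isn't reproduced above. Pulsar's Debezium Postgres connector is driven by a source-config file; a sketch, with hostnames, credentials and the connector version as assumptions, might be:

```yaml
# debezium-postgres-source-config.yaml -- illustrative
tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/pulsar-io-debezium-postgres-2.5.0.nar"
parallelism: 1
configs:
  database.hostname: "postgres"        # the compose service name
  database.port: "5432"
  database.user: "postgres"
  database.password: "postgres"
  database.dbname: "microblog"
  database.server.name: "microblogsrv" # first segment of the topic names
  schema.whitelist: "microblog"        # watch only our schema's tables
  pulsar.service.url: "pulsar://127.0.0.1:6650"
```

A script could then run the connector inside the Pulsar container with something like `docker exec -d <pulsar container> bin/pulsar-admin sources localrun --source-config-file /pulsar/debezium-postgres-source-config.yaml`.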
An important point here is that the names of the topics events will be produced to are predetermined (and in our case created automatically) using the format `${server_name}.${schema_name}.${table_name}`, which means that in our case we will be interested in consuming from the topic named `microblogsrv.microblog.posts`.
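As a quick sanity check on that convention, a throwaway helper (the function name is mine, not from the post) composes the topic name:

```go
package main

import "fmt"

// debeziumTopic follows Debezium's ${server_name}.${schema_name}.${table_name}
// topic-naming convention.
func debeziumTopic(server, schema, table string) string {
	return fmt.Sprintf("%s.%s.%s", server, schema, table)
}

func main() {
	fmt.Println(debeziumTopic("microblogsrv", "microblog", "posts"))
	// prints: microblogsrv.microblog.posts
}
```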
Let's get that running and detach from it.
To start off, we'll need to initialize a new Go project and fetch the required dependencies.
Next we'll create the application server itself. We'll keep things as simple as possible here and not try anything fancy: just an HTTP server using Go's `http` package and a simple database connection pool so that records can be created in our Postgres instance:
Let's start the server now to verify that it can connect to our database and listen on the specified port:
This may take a few moments to build for the first run. If all is well, you should see the following message once it is running:
This means we can now attempt to make a request and see if our post gets created. But before we do that, let's first consume from the topic where events are produced when new posts are created. Similar to Kafka, Pulsar comes with a bunch of useful commands that allow you to easily interact with the instance. We'll use the `pulsar-client consume` command within the Pulsar container to verify that Debezium is producing these events when we create a new post.
Again we'll create a small script for the consumer so that it can be used again in the future if needed. With the server running in our initial tab, you'll need to open a new one and run the following:
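The script isn't shown above; it boils down to a one-liner like this (the container name and subscription name are assumptions):

```bash
#!/usr/bin/env bash
# consume.sh -- tail the CDC topic from inside the Pulsar container.
# -s names the subscription, -n 0 means consume indefinitely.
docker exec -it pulsar bin/pulsar-client consume \
    -s "debug-subscription" -n 0 \
    "persistent://public/default/microblogsrv.microblog.posts"
```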
And now let's execute that:
There will be some initial output, and then it will wait for new events to be produced.
With Postgres, Pulsar, Debezium and our application server now all running and configured, we can open up a new terminal window and make a request to the server:
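The exact request from the post isn't shown; assuming the server sketched earlier (the route, port and payload fields are assumptions), it would be along the lines of:

```bash
curl -i -X POST http://localhost:3000/posts \
    -H "Content-Type: application/json" \
    -d '{"title": "Hello, CDC", "content": "Our first micropost"}'
```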
And if all went well, we should have received a 201 response that resembles the following:
We can expand the JSON output from that message to see what fields were set:
This verifies that our post was created in the database and that a message was sent to Pulsar. As you can see in the JSON message, we have access to the data in the row that was created along with some useful metadata and source data.
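The full message isn't reproduced here, but a Debezium change event for an insert broadly follows this shape (the field values below are placeholders, and depending on converter settings the whole thing may be wrapped in a `schema`/`payload` envelope):

```json
{
  "before": null,
  "after": {
    "id": 1,
    "title": "Hello, CDC",
    "content": "Our first micropost"
  },
  "source": {
    "connector": "postgresql",
    "name": "microblogsrv",
    "schema": "microblog",
    "table": "posts"
  },
  "op": "c",
  "ts_ms": 0
}
```

The `after` block carries the newly created row, while `source` and `op` ("c" for create) are the metadata mentioned above.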
We can also take a look in the database to see our newly created post:
Let's now run that:
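The lookup itself isn't shown; a command along these lines (container and database names are assumptions) does the job:

```bash
docker exec -it postgres \
    psql -U postgres -d microblog \
    -c "SELECT id, title, content FROM microblog.posts;"
```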
Going a step further with this, we can also now write our own consumer application in Go to actually do something with the event:
A few things to note in the consumer. The `microblogEvent` type is one I've created based on the JSON of the Pulsar messages. I've included all the fields that came with the message so that you have a better idea of what data and fields you are given access to.
Pulsar's official Go SDK is used to connect to our Pulsar instance and consume from the `microblogsrv.microblog.posts` topic. Once a message is received, the consumer prints out a line containing the title of the newly created post.
We can now Ctrl+C out of the consumer we started earlier and start our custom Go consumer:
Note that this may also take a few moments to compile the dependencies. You'll see output indicating that the consumer has started once it is ready. In another window we can run a `curl` command similar to the previous one to create a second post:
If you switch back to the custom consumer tab you will notice that it has received a new message and, in our case, will have printed out the title of the new post.
And there we have an end-to-end sample application that demonstrates how you can implement CDC using Postgres, Pulsar, Debezium and Go. To bring everything down it is as simple as:
To run a second time given that the Docker images are already built, we can use the scripts created throughout this post to get everything running again like so and verify it is all working:
And with that we have a very simple end-to-end application that utilises Debezium to demonstrate CDC in a fairly hands-on manner. There are lots of use cases for CDC: common ones include audit logging, keeping data in sync across a microservice architecture, data replication, cache invalidation, and the list goes on. The basic idea is that we become reactive to our data and can act when changes to it occur.