PostgreSQL Database CDC
Overview
The PostgresSourceOperator uses logical replication and the wal2json plugin to capture changes from PostgreSQL in real time. If configured to do so, it also creates or verifies a publication and a logical replication slot.
Key Features
- Optional Automatic Setup: When ConfigureCDCInServer = true, the operator attempts to create a publication (my_publication) and a logical replication slot (my_slot) if they do not exist.
- Replica Identity: Supports setting REPLICA IDENTITY to DEFAULT or FULL (via ReplicaIdentityMode) for more complete update/delete data.
- Initial Load: If DoInitialLoad = true, performs a single SELECT * FROM schema.table before streaming from the WAL.
- Checkpointing: Maintains the last processed LSN in the state store to resume from the same position upon restart.
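To make the Replica Identity feature more concrete, here is a minimal sketch of the equivalent manual setup on the server. The table name my_schema.my_table is a placeholder, and whether the operator issues exactly these statements internally is an assumption, not something this page confirms.

```sql
-- Placeholder table name; replace with your own schema and table.
-- FULL makes UPDATE and DELETE events carry the complete old row.
ALTER TABLE my_schema.my_table REPLICA IDENTITY FULL;

-- Inspect the current setting.
-- relreplident: 'd' = default (primary key), 'f' = full,
--               'i' = index, 'n' = nothing
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'my_schema.my_table'::regclass;
```

With DEFAULT, old-row data is limited to the primary key columns, so FULL is the safer choice when downstream consumers need the previous values of every column.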
Server Configuration Prerequisites for PostgreSQL
- wal_level = logical
  In your postgresql.conf, ensure:
      wal_level = logical
  Then restart PostgreSQL for the change to take effect.
- Role Privileges
  The connecting user must have replication privileges (superuser or a role with the REPLICATION attribute) in order to create and read from logical replication slots.
- Plugin Installation
  The operator uses the wal2json plugin, so wal2json must be installed on your PostgreSQL instance. On many systems, wal2json is included by default; on others, install it from your package manager or build it from source. For example, on Debian/Ubuntu with PostgreSQL 14:
      sudo apt-get install postgresql-14-wal2json
- Publication / Slot
  - If ConfigureCDCInServer = false, you must manually create a publication and replication slot. Example:
        CREATE PUBLICATION my_publication FOR TABLE my_schema.my_table;
        SELECT * FROM pg_create_logical_replication_slot('my_slot', 'wal2json');
  - If ConfigureCDCInServer = true, the operator attempts to create them if they do not exist.
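The prerequisites above can be checked from any SQL client before starting the operator. The following is a sketch that uses only standard PostgreSQL catalog views; the names my_publication and my_slot match the examples on this page and should be replaced with your own.

```sql
-- 1. WAL level must report 'logical'.
SHOW wal_level;

-- 2. The connecting role needs the REPLICATION attribute (or superuser).
SELECT rolname, rolreplication, rolsuper
FROM pg_roles
WHERE rolname = current_user;

-- 3. The publication exists and covers the expected table(s).
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'my_publication';

-- 4. The slot exists and was created with the wal2json plugin.
SELECT slot_name, plugin, slot_type, active, confirmed_flush_lsn
FROM pg_replication_slots
WHERE slot_name = 'my_slot';
```

If the second query shows rolreplication = false for a non-superuser, an administrator can grant the attribute with ALTER ROLE ... WITH REPLICATION.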
Basic Usage Example
using System;
using Cortex.Streams;
using Cortex.Streams.PostgreSQL;
using Cortex.States;

// 1. Set up connection and table info
string connectionString = "Host=myHost;Database=myDB;Username=myUser;Password=myPass;";
string schemaName = "public";
string tableName = "Customers";

// 2. Configure Postgres settings
var postgresSettings = new PostgresSettings
{
    DoInitialLoad = true,
    PullInterval = TimeSpan.FromSeconds(3),
    ConfigureCDCInServer = true,
    ReplicaIdentity = ReplicaIdentityMode.Full // Ensure full row data for DELETEs
};

// 3. Create the Postgres CDC source
var pgCdcOperator = new PostgresSourceOperator(
    connectionString,
    schemaName,
    tableName,
    slotName: "my_slot",
    publicationName: "my_publication",
    postgresSettings
);

// 4. Build and start the stream
var stream = StreamBuilder<PostgresRecord, PostgresRecord>
    .CreateNewStream("Postgres CDC Stream")
    .Stream(pgCdcOperator)
    .Sink(record =>
    {
        Console.WriteLine($"[PostgresCDC] {record.Operation} => {record.Data.Count} columns changed.");
    })
    .Build();
stream.Start();

Handling Complex Scenarios
- Multiple Tables: Create multiple operators (or a single publication with multiple tables) and configure them accordingly.
- Clustering / HA: For high availability, ensure the replication slot is managed in your failover strategy.
- Performance: Tuning wal2json parameters (for example, leaving pretty-print disabled) or writing large transactions in chunks (wal2json's write-in-chunks option) can improve throughput.
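The scenarios above map to a few server-side statements. This is a sketch under stated assumptions: my_schema.my_other_table is a hypothetical second table, the monitoring query must run on the primary, and this page does not say whether the operator exposes wal2json options, so the peek call only demonstrates the plugin options directly.

```sql
-- Multiple tables: add a second (hypothetical) table to the same publication.
ALTER PUBLICATION my_publication ADD TABLE my_schema.my_other_table;

-- HA / operations: WAL retained for each slot. A steadily growing value
-- means the consumer is falling behind or has stopped.
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

-- Performance: preview up to 10 pending changes without consuming them,
-- with compact JSON and per-change chunks. Only works while no other
-- consumer is attached to the slot.
SELECT data
FROM pg_logical_slot_peek_changes(
    'my_slot', NULL, 10,
    'pretty-print', '0',
    'write-in-chunks', '1'
);

-- Cleanup: drop a slot that is no longer consumed so it stops pinning WAL.
-- Fails while a consumer is still connected to the slot.
SELECT pg_drop_replication_slot('my_slot');
```

An abandoned slot keeps WAL on disk indefinitely, so the retained_wal check is worth including in whatever monitoring accompanies your failover strategy.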