PostgreSQL-CDC
Supported Version
In order to set up the PostgreSQL Extract Node
, the following provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
Setup PostgreSQL server
Change Data Capture (CDC) allows you to track and propagate changes in a PostgreSQL database to downstream consumers based on its Write-Ahead Log (WAL). You need to ensure that the upstream database is configured to support logical replication. Before using the PostgreSQL connector to monitor the changes committed on a PostgreSQL server, decide which logical decoding plug-in you intend to use. If you plan not to use the native pgoutput
logical replication stream support, then you must install the logical decoding plug-in into the PostgreSQL server.
pgoutput
- Check the
wal_level
configuration setting:
SHOW wal_level;
The default value is replica. For CDC, you’ll need to set it to logical
in the database configuration file (postgresql.conf). Keep in mind that changing the wal_level requires a restart of the Postgres instance and can affect database performance.
- To configure the replication slot regardless of the decoder being used, specify the following in the postgresql.conf file:
decoderbufs is based on Protobuf and maintained by the Debezium community. it.
shared_preload_libraries = 'decoderbufs'
- To configure the replication slot regardless of the decoder being used, specify the following in the postgresql.conf file:
Usage for SQL API
CREATE TABLE `postgresTable`(
`name` STRING,
`age` INT
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'password' = 'inlong',
'database-name' = 'postgres',
'schema-name' = 'public',
'port' = '5432',
'table-name' = 'user',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'feaafacbaddadc'
)
Usage for InLong Manager Client
TODO: It will be supported in the future.
PostgreSQL Extract Node Options
Note: slot.name
is recommended to set for different tables to avoid the potential PSQLException: ERROR: replication slot “flink” is active for PID 974 error.
Note: PSQLException: ERROR: all replication slots are in use Hint: Free one or increase max_replication_slots. We can delete slot by the following statement.
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
`name` STRING,
`age` INT
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'username' = 'postgres',
'password' = 'inlong',
'database-name' = 'postgres',
'schema-name' = 'public',
'port' = '5432',
'table-name' = 'user',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'feaafacbaddadc'