PostgreSQL-CDC

    Supported Version

    In order to set up the PostgreSQL Extract Node, the following provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.

    Setup PostgreSQL server

    Change Data Capture (CDC) allows you to track and propagate changes in a PostgreSQL database to downstream consumers based on its Write-Ahead Log (WAL). You need to ensure that the upstream database is configured to support logical replication. Before using the PostgreSQL connector to monitor the changes committed on a PostgreSQL server, decide which logical decoding plug-in you intend to use. If you plan not to use the native pgoutput logical replication stream support, then you must install the logical decoding plug-in into the PostgreSQL server.

    pgoutput

    1. Check the wal_level configuration setting:
    1. SHOW wal_level;

    The default value is replica. For CDC, you’ll need to set it to logical in the database configuration file (postgresql.conf). Keep in mind that changing the wal_level requires a restart of the Postgres instance and can affect database performance.

    1. To configure the replication slot regardless of the decoder being used, specify the following in the postgresql.conf file:

    decoderbufs is based on Protobuf and maintained by the Debezium community. it.

    1. shared_preload_libraries = 'decoderbufs'
    1. To configure the replication slot regardless of the decoder being used, specify the following in the postgresql.conf file:

    Usage for SQL API

    1. CREATE TABLE `postgresTable`(
    2. `name` STRING,
    3. `age` INT
    4. ) WITH (
    5. 'connector' = 'postgres-cdc',
    6. 'hostname' = 'localhost',
    7. 'password' = 'inlong',
    8. 'database-name' = 'postgres',
    9. 'schema-name' = 'public',
    10. 'port' = '5432',
    11. 'table-name' = 'user',
    12. 'decoding.plugin.name' = 'pgoutput',
    13. 'slot.name' = 'feaafacbaddadc'
    14. )

    Usage for InLong Manager Client

    TODO: It will be supported in the future.

    PostgreSQL Extract Node Options

    Note: slot.name is recommended to set for different tables to avoid the potential PSQLException: ERROR: replication slot “flink” is active for PID 974 error.
    Note: PSQLException: ERROR: all replication slots are in use Hint: Free one or increase max_replication_slots. We can delete slot by the following statement.

    The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

    1. db_name STRING METADATA FROM 'database_name' VIRTUAL,
    2. table_name STRING METADATA FROM 'table_name' VIRTUAL,
    3. `name` STRING,
    4. `age` INT
    5. ) WITH (
    6. 'connector' = 'postgres-cdc',
    7. 'hostname' = 'localhost',
    8. 'username' = 'postgres',
    9. 'password' = 'inlong',
    10. 'database-name' = 'postgres',
    11. 'schema-name' = 'public',
    12. 'port' = '5432',
    13. 'table-name' = 'user',
    14. 'decoding.plugin.name' = 'pgoutput',
    15. 'slot.name' = 'feaafacbaddadc'

    Data Type Mapping