PostgreSQL → ClickHouse CDC Support — Full Production Pipeline with Verification Toolkit#1245
Open
minguyen9988 wants to merge 71 commits into develop from
Conversation
subkanthi reviewed Mar 2, 2026
subkanthi requested changes Mar 2, 2026
added 10 commits
March 3, 2026 06:48
added 3 commits
March 4, 2026 02:14
added 4 commits
March 13, 2026 20:40
- Fix testSourceDirectory in sink-connector/pom.xml: changed from src/test to src/test/java to prevent java.lang.SecurityException (prohibited package name: java.*)
- Update ClickHouseDataTypeMapperDDLTest: DateTime64 expectations now include UTC timezone
- Update ClickHouseTableOperationsBaseTest: DateTime64 expectations now include UTC timezone
This was referenced Mar 16, 2026
subkanthi requested changes Mar 24, 2026
…for heartbeat events
- Consolidate 30 individual @Test methods into 7 @ParameterizedTest methods with @MethodSource in UtilsSchemaAwareTest.java (addresses review comment #22)
- Add null check for chStruct in DebeziumChangeEventCapture.java:1184 to skip Debezium internal heartbeat events that produce a null ClickHouseStruct
subkanthi requested changes Mar 31, 2026
… delegate to DBMetadata
subkanthi requested changes Apr 4, 2026
…stgresConnectorConfig, move extractDebeziumSchema to ClickHouseConverter, reduce test timeouts
- Restore ReplicatedRMTClickHouse22TIT and ReplicatedRMTDDLClickHouse22TIT tests from origin/2.9.1, upgrading the ClickHouse Docker image from 22.3 to 24.8
- Move extractDebeziumSchema() and safeGet() from PostgresSchemaChangeDetector to ClickHouseConverter as public static methods; PostgresSchemaChangeDetector now delegates to ClickHouseConverter.extractDebeziumSchema()
- Extract PostgresConnectorConfig class from DebeziumChangeEventCapture to encapsulate Postgres-specific fields (schemaPrefixEnabled, databaseSchemaSuffix, commonSchemaTemplate, commonDatabasePrefix, postgresSchemaChangeDetector) and their initialization logic; removes a TODO comment
- Reduce AlterTableModifyColumnIT retry count from 40 to 10 (max wait from 200s to 50s)
subkanthi added a commit that referenced this pull request on Apr 7, 2026
Snapshot of sink-connector/python from minguyen/postgres (PR #1245) for review and merge independent of the Postgres/Java connector changes. Includes Python-focused .gitignore entries for the toolkit. Made-with: Cursor
…python) Restore sink-connector/python to match 2.9.1 so PR #1245 focuses on Postgres/Java connector work. Python toolkit changes live on feature/ch-sink-tools-python for a dedicated PR. Made-with: Cursor
Author
thanks
added 2 commits
April 7, 2026 22:10
…ing type
When PostgreSQL text[] columns are auto-created as Nullable(String) in ClickHouse, the ARRAY branch in PreparedStatementFieldMapper calls setArray(), which triggers a ClassCastException. Now checks the ClickHouse column type first: if it is not Array(...), the array value is serialized as a JSON string using setString(). Fixes the ClassCastException on the LiteLLM_VerificationToken table for columns: models, allowed_cache_controls, allowed_routes, policies, access_group_ids
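The fix described in this commit can be sketched in Python; the real change lives in the Java `PreparedStatementFieldMapper`, so the function name and return shape below are illustrative, not connector code:

```python
import json

def bind_array_value(ch_column_type: str, value: list):
    """Mirror of the fix: only bind a real array when the ClickHouse
    column is actually Array(...); otherwise serialize the list to a
    JSON string so Nullable(String) columns avoid a ClassCastException."""
    if ch_column_type.startswith("Array("):
        return ("array", value)
    return ("string", json.dumps(value))
```

For a column auto-created as `Nullable(String)`, the array value is stored as its JSON text instead of failing the insert.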
…taException
PostgreSQL connectors use "db" in the source struct, not "databaseName" (MySQL). Check schema.field() existence before calling Struct.get() to avoid a DataException on every record during schema-drift detection.
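The guard can be sketched in Python, modeling Kafka Connect's `Schema.field()` / `Struct.get()` pair with a dict plus a set of declared field names (names here are illustrative):

```python
def database_from_source(source: dict, declared_fields: set):
    """Read the database name without throwing on missing fields:
    PostgreSQL connectors populate "db", MySQL populates "databaseName".
    Checking membership first mirrors the schema.field() existence check."""
    for field in ("db", "databaseName"):
        if field in declared_fields:
            return source.get(field)
    return None
```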
PR: PostgreSQL → ClickHouse CDC Support — Full Production Pipeline with Verification Toolkit
Summary
This PR brings PostgreSQL → ClickHouse CDC replication from 40% production readiness to 100%, covering the entire journey from a state where only INSERT operations worked through to production-verified replication of 97.7 million rows across 36 tables with cryptographic integrity verification. The work spans the Java sink connector (ANTLR4-based DDL parser, factory pattern isolation, schema drift detection), a new Python toolchain for parallel bulk snapshots and cross-database checksum verification, and 18 iterative validation runs that identified and explained every single data divergence down to 6 rows (0.0000083%). All four production readiness gates (Development, Staging, Pilot, Full Rollout) now pass.
Motivation / Problem Statement
What Was Broken
Before this branch, the PostgreSQL CDC path in the ClickHouse sink connector was in an incomplete state:
- No `postgres_dumper.py`, no bulk snapshot pipeline
- `interval` type crashes the parser
- `snapshot.mode: initial` causes OOM on large databases

What Worked
- `pgoutput` plugin connecting to PostgreSQL logical replication
- `ReplacingMergeTree` table creation with `_version`/`is_deleted` columns

Why This Work Was Needed
The connector was being deployed for production use against a 97.7M-row PostgreSQL database (system). With UPDATE, DELETE, and TRUNCATE untested, no bulk snapshot mechanism, and a DDL parser that crashed on `interval` types, the deployment would have resulted in silent data loss, schema corruption, and undetectable divergence. There was no way to even verify whether replication was correct — checksum verification did not exist.

What Changed
Java Sink Connector
New Files
- `PostgreSQLDDLParserService.java`: ANTLR4-based DDL parser that handles `interval` and other complex types
- `DDLParserFactory.java`: returns `MySQLDDLParserService` or `PostgreSQLDDLParserService` based on database type
- `PostgreSQLDDLListener.java`: parse-tree listener used by the PostgreSQL DDL parser
- `PostgresSchemaChangeDetector.java`: detects schema drift in incoming CDC events
- `PostgresSchemaReconciler.java`: generates `ALTER TABLE` statements when schema drift is detected

Key Behavioral Changes
- The previous parser failed on `interval` types with `TABLE METADATA not retrieved` errors. The ANTLR4 parser handles all 40+ PostgreSQL types.
- `DDLParserFactory` ensures the MySQL DDL path is completely isolated from PostgreSQL changes, preventing regressions in the existing MySQL pipeline.
- Schema drift detection now applies `ALTER TABLE` statements to ClickHouse.

ANTLR4 Grammars
New Files
- `PostgreSQLParser.g4`
- `PostgreSQLLexer.g4`

These grammars live alongside the existing MySQL grammars (`MySqlParser.g4`, `MySqlLexer.g4`) and are compiled by the ANTLR4 Maven plugin during the build.

Python Tools (ch-sink-tools)

- Package name: `ch-sink-tools`
- Distribution: `ch_sink_tools-0.2.0-py3-none-any.whl`
- Entry points: 9 CLI commands
Snapshot Pipeline (New)
- `postgres_dumper.py`: parallel bulk snapshot; saves the LSN into the `_debezium_offset` table so the CDC connector starts from exactly the right position.
- `postgres_type_mapper.py`: generates `CREATE TABLE ... ENGINE = ReplacingMergeTree(_version)` statements with `is_deleted` and `_version` columns.
- `db/postgres.py`: PostgreSQL database access layer.
- `postgres_loader.py`: bulk-loads dump files via `clickhouse-client --query "INSERT INTO ... FORMAT TSV"`.

Checksum Verification (New)
- `top_level_postgres_checksum.py`
- `postgres_table_checksum.py`
- `auto_diff.py`
- `_expressions.py`

CLI Entry Points
- `ch-checksum`
- `ch-pg-checksum`
- `ch-pg-count`
- `ch-ch-checksum`
- `ch-ch-count`
- `ch-pg-dump`
- `ch-mysql-checksum`
- `ch-mysql-dump`
- `ch-mysql-load`

Integration Tests
New Test Classes
- `PostgresInitialDockerIT.java`
- `PostgresInitialDockerWKeeperMapStorageIT.java`
- `PostgresDeleteOperationsIT.java`
- `PostgresUpdateOperationsIT.java`
- `PostgresSchemaDriftIT.java`

Deployment Configuration
- `config_postgres_system.yml`
- `run_checksum.sh`

Architecture
Two-Phase Snapshot + CDC Pipeline
Debezium's `snapshot.mode: initial` caused OOM on the 97.7M-row system database. The solution separates the initial snapshot from ongoing CDC:

```mermaid
sequenceDiagram
    participant PG as PostgreSQL
    participant Dumper as postgres_dumper.py
    participant CH as ClickHouse
    participant Debezium as Sink Connector
    Note over PG,Dumper: Phase 1: Bulk Snapshot
    Dumper->>PG: SELECT pg_current_wal_lsn()
    PG-->>Dumper: LSN = 0/1A2B3C4D
    Dumper->>PG: COPY table TO STDOUT (×8 parallel threads)
    PG-->>Dumper: TSV data streams
    Dumper->>CH: INSERT INTO table FORMAT TSV (bulk load)
    Dumper->>CH: INSERT INTO _debezium_offset (saved_lsn)
    Note over Debezium,CH: Phase 2: CDC (snapshot.mode=never)
    Debezium->>PG: START_REPLICATION SLOT ... LSN 0/1A2B3C4D
    PG-->>Debezium: WAL stream (INSERT/UPDATE/DELETE/DDL)
    Debezium->>CH: ReplacingMergeTree INSERT (with _version, is_deleted)
```

Key design decision: The saved LSN from the dump is injected into ClickHouse's `_debezium_offset` table so the connector starts from exactly the right WAL position — no data duplication, no data loss.

Factory Pattern for MySQL/PostgreSQL Isolation
```mermaid
classDiagram
    class DDLParserFactory {
        +createParser(dbType: String) DDLParserService
    }
    class DDLParserService {
        <<interface>>
        +parseDDL(ddl: String) List~AlterStatement~
    }
    class MySQLDDLParserService {
        +parseDDL(ddl: String) List~AlterStatement~
    }
    class PostgreSQLDDLParserService {
        +parseDDL(ddl: String) List~AlterStatement~
    }
    class PostgreSQLDDLListener {
        -alterStatements: List~AlterStatement~
        +enterCreateTable(ctx)
        +enterAlterTable(ctx)
        +enterColumnDefinition(ctx)
    }
    DDLParserFactory ..> DDLParserService : creates
    DDLParserService <|.. MySQLDDLParserService
    DDLParserService <|.. PostgreSQLDDLParserService
    PostgreSQLDDLParserService --> PostgreSQLDDLListener : uses
```
PostgreSQL DDL Capture via Event Triggers
PostgreSQL logical replication does not capture DDL natively (unlike MySQL binlog). The solution uses a multi-step event trigger pipeline:
```mermaid
flowchart LR
    A[DDL Statement<br/>on PostgreSQL] -->|event trigger| B[_debezium_ddl_log<br/>metadata table]
    B -->|replicated via CDC| C[Sink Connector]
    C -->|DDLParserFactory| D{Database Type?}
    D -->|MySQL| E[MySQLDDLParserService<br/>existing ANTLR4 parser]
    D -->|PostgreSQL| F[PostgreSQLDDLParserService<br/>new ANTLR4 parser]
    F --> G[PostgreSQLDDLListener]
    G --> H[ClickHouse ALTER TABLE]
```

1. An event trigger writes each DDL statement into the `_debezium_ddl_log` metadata table (replicated via CDC as regular data)
2. `DDLParserFactory` routes to the appropriate parser based on database type
3. `PostgreSQLDDLParserService` uses ANTLR4 to translate PostgreSQL DDL → ClickHouse DDL

Schema Drift Detection and Auto-Reconciliation
```mermaid
flowchart TD
    A[CDC Event Arrives] --> B[PostgresSchemaChangeDetector]
    B --> C{Schema matches<br/>ClickHouse table?}
    C -->|Yes| D[Process normally]
    C -->|No| E[Detect change type]
    E --> F[ADD COLUMN]
    E --> G[DROP COLUMN]
    E --> H[ALTER COLUMN TYPE]
    E --> I[RENAME COLUMN]
    F --> J[PostgresSchemaReconciler]
    G --> J
    H --> J
    I --> J
    J --> K[Generate ALTER TABLE DDL]
    K --> L[Apply to ClickHouse]
    L --> D
```

The connector compares each incoming Debezium CDC event's schema against the current ClickHouse table schema. When drift is detected, `PostgresSchemaReconciler` generates and applies the corresponding `ALTER TABLE` statement before processing the event.

Checksum Verification Architecture
```mermaid
flowchart TD
    A[ch-pg-checksum] --> B[top_level_postgres_checksum.py]
    B --> C{WAL pause<br/>enabled?}
    C -->|Yes| D[Pause WAL replay on PG standby]
    C -->|No| E[Skip]
    D --> F[Flush connector via REST API]
    E --> F
    F --> G[Phase A: CH Snapshot<br/>~1.1 seconds]
    G --> H[Phase B: PG Query<br/>~15 minutes]
    H --> I{Compare per-table<br/>checksums}
    I -->|PASS| J[Report PASS]
    I -->|FAIL| K{Auto-diff<br/>enabled?}
    K -->|Yes| L[auto_diff.py<br/>Binary search]
    K -->|No| M[Report FAIL]
    L --> N[Report divergent rows]
```

Three-Tier Checksum Algorithm
Four-Bucket MD5 Accumulation
Each row's MD5 hash (128 bits) is split into 4 × 32-bit integers. These are summed independently across all rows in each bucket, then compared between PostgreSQL and ClickHouse. This avoids single-hash collision vulnerabilities that arise when XOR-aggregating MD5 values.
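A minimal Python sketch of the bucket arithmetic. Row serialization here is just one string per row; the real tool derives per-column hash expressions (presumably in `_expressions.py`), so treat this as an illustration of the accumulation step only:

```python
import hashlib

def four_bucket_checksum(rows):
    """Split each row's 16-byte MD5 into four 32-bit big-endian integers
    and sum each position independently across all rows."""
    sums = [0, 0, 0, 0]
    for row in rows:
        digest = hashlib.md5(row.encode()).digest()   # 16 bytes
        for i in range(4):
            sums[i] += int.from_bytes(digest[4 * i:4 * i + 4], "big")
    return tuple(sums)
```

Because per-bucket addition is commutative, PostgreSQL and ClickHouse can scan rows in any order and still produce directly comparable sums.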
Auto-Diff Binary Search
When checksums fail, `auto_diff.py` uses XOR-aggregate functions (`bit_xor` on PostgreSQL / `groupBitXor` on ClickHouse) to perform recursive 10-way chunk subdivision of the primary key range:

```mermaid
flowchart TD
    A[Table FAIL: 72.4M rows] --> B[Split into 10 chunks]
    B --> C[XOR-aggregate each chunk on PG and CH]
    C --> D{Which chunks<br/>differ?}
    D --> E[Chunk 3: 7.2M rows]
    D --> F[Chunk 7: 7.2M rows]
    E --> G[Split into 10 sub-chunks]
    F --> H[Split into 10 sub-chunks]
    G --> I[...]
    H --> J[...]
    I --> K[6 divergent rows found]
    J --> K
    style A fill:#f96,stroke:#333
    style K fill:#6f9,stroke:#333
```

Performance: Found 6 divergent rows among 72,382,183 rows in 115 seconds (4 levels of 10-way subdivision).
Data Type Coverage
40+ PostgreSQL types mapped to ClickHouse equivalents:
| PostgreSQL type | ClickHouse type |
| --- | --- |
| smallint / int2 | Int16 |
| integer / int4 / serial | Int32 |
| bigint / int8 / bigserial | Int64 |
| numeric(p,s) / decimal(p,s) | Decimal(p,s) |
| real / float4 | Float32 |
| double precision / float8 | Float64 |
| smallserial | Int16 |
| money | Decimal(19,2) |
| character(n) / char(n) | String |
| character varying(n) / varchar(n) | String |
| text | String |
| bytea | String |
| boolean / bool | Bool |
| uuid | UUID |
| date | Date32 (Date32 supports 1900–2299) |
| timestamp | DateTime64(6) |
| timestamp with time zone / timestamptz | DateTime64(6, 'UTC') |
| time | String |
| time with time zone / timetz | String |
| interval | String |
| json | String |
| jsonb | String |
| xml | String |
| inet | String |
| cidr | String |
| macaddr | String |
| macaddr8 | String |
| bit(n) | String |
| bit varying(n) / varbit(n) | String |
| point | String |
| line | String |
| lseg | String |
| box | String |
| path | String |
| polygon | String |
| circle | String |
| integer[] | Array(Int32) |
| text[] | Array(String) |
| uuid[] | Array(UUID) |
| boolean[] | Array(Bool) |
| hstore | Map(String, String) |
| tstzrange | String |
| int4range | String |
| numrange | String |
| daterange | String |
| enum | String |
| tsvector | String |
| tsquery | String |
| oid | UInt32 |

Verification Results
Checksum Run Progression (Runs 11–28)
Final Metrics
- xxx — 72,382,183 rows
Auto-diff binary search in 72,382,183 rows found exactly 6 divergent rows (0.0000083%):
Divergent Row IDs: 365768857, 365995738, 366168733, 366197677, 366227330, 366227332
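As a quick arithmetic check of the percentage quoted above (pure arithmetic, no project code):

```python
divergent, total = 6, 72_382_183

# fraction of divergent rows, expressed as a percentage
pct = divergent / total * 100

assert f"{pct:.7f}" == "0.0000083"  # matches the 0.0000083% figure
```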
All 6 rows belong to the same mock user (`aabbas-mock-1`, `owner_id=1877`) and share the same root causes:

- `xx.tags`: quote-escaping difference (`'` → `\'`) in JSONB string values during CDC serialization
- `xx.until`: `DateTime64` range limit
- `xx.value`: empty strings (`''`) converted to ClickHouse `NULL` for `Nullable(String)` columns

Why These Are Acceptable for Production

The `skip_columns` configuration excludes these columns from checksum comparison. The connector team has been notified for an upstream fix.

Running Instructions
1. Build the Java Connector JAR
To run the tests (requires Docker):
2. Build the Python Wheel
After installation, all 9 CLI commands are available:
```shell
ch-pg-checksum --help
ch-pg-dump --help
# etc.
```

3. Run the Initial Snapshot (postgres_dumper)
What this does:
- Runs `COPY ... TO STDOUT` for each table in parallel (8 threads)
- Generates `CREATE TABLE` DDL using `postgres_type_mapper.py`
- Bulk-loads the TSV streams via `clickhouse-client`
- Saves the snapshot LSN into the `_debezium_offset` table

Expected output (system, 36 tables):
4. Start the CDC Connector
Critical config settings for PostgreSQL:
5. Run Checksum Verification
What this does:
Expected output:
6. Run Auto-Diff
Auto-diff runs automatically when checksum verification finds FAIL tables (if enabled in config). To run manually:
Configuration Reference
The main configuration file is
config_postgres_system.yml. Key sections:PostgreSQL Connection
ClickHouse Connection
Checksum Configuration
Auto-Diff Configuration
Breaking Changes
- `DDLParserFactory` introduced: DDL parsing is now routed through the factory; existing MySQL deployments continue to use `MySQLDDLParserService`.
- `snapshot.mode: initial` no longer required for PostgreSQL: do not use the `initial` snapshot for PostgreSQL (OOM on large databases); use `postgres_dumper.py` for the initial snapshot, then set `snapshot.mode: never`.
- `_debezium_ddl_log` table required on PostgreSQL for DDL capture.

Known Issues / Limitations
Connector Bugs (Upstream — Debezium / Sink Connector)
- Empty strings (`''`) converted to `NULL` for `Nullable(String)` columns; workaround: `skip_columns` in checksum config
- Quote escaping (`'` → `\'`) in JSONB values, observed on `xx` and `alerts_alertevent`
- `DateTime64` range capped at year 2299: `DateTime64(6)` caps at 2299-12-31
- `interval` type: `interval` has no ClickHouse equivalent; mapped to `String` — human-readable representation preserved
- `time`/`timetz` types: mapped to `String`

Auto-Diff Known Issues
- A column named id conflicts with an internal alias in the XOR-aggregate query (`_xor_hash`)

Files Changed
New Files — Java
- sink-connector-lightweight/src/main/java/.../ddl/parser/PostgreSQLDDLParserService.java
- sink-connector-lightweight/src/main/java/.../ddl/parser/DDLParserFactory.java
- sink-connector-lightweight/src/main/java/.../ddl/parser/PostgreSQLDDLListener.java
- sink-connector-lightweight/src/main/java/.../schema/PostgresSchemaChangeDetector.java
- sink-connector-lightweight/src/main/java/.../schema/PostgresSchemaReconciler.java

New Files — ANTLR4 Grammars

- sink-connector-lightweight/src/main/antlr4/postgres/PostgreSQLParser.g4
- sink-connector-lightweight/src/main/antlr4/postgres/PostgreSQLLexer.g4

New Files — Integration Tests

- sink-connector-lightweight/src/test/.../PostgresInitialDockerIT.java
- sink-connector-lightweight/src/test/.../PostgresInitialDockerWKeeperMapStorageIT.java
- sink-connector-lightweight/src/test/.../PostgresDeleteOperationsIT.java
- sink-connector-lightweight/src/test/.../PostgresUpdateOperationsIT.java
- sink-connector-lightweight/src/test/.../PostgresSchemaDriftIT.java

New Files — Python

- sink-connector/python/db_dump/postgres_dumper.py
- sink-connector/python/db_load/postgres_type_mapper.py
- sink-connector/python/db/postgres.py
- sink-connector/python/db_load/postgres_loader.py
- sink-connector/python/db_compare/top_level_postgres_checksum.py
- sink-connector/python/db_compare/postgres_table_checksum.py
- sink-connector/python/db_compare/auto_diff.py
- sink-connector/python/db_compare/_expressions.py

New Files — Deployment

- sink-connector-lightweight/deployment/system/config_postgres_system.yml
- sink-connector-lightweight/deployment/system/run_checksum.sh

New Files — Planning Documents

- plans/postgres/*.md (23 files)

Modified Files

- sink-connector-lightweight/pom.xml
- .gitignore

Planning Documents
The `plans/postgres/` directory contains 23 documents tracking the full journey from assessment through production verification, including the `ch-sink-tools` wheel package design with 9 CLI entry points.

Story Arc
The 23 documents trace six phases of work:
Assessment (Plans 01–09, EXECUTIVE-SUMMARY) — PostgreSQL CDC at 40% readiness. UPDATE/DELETE untested. No batch dump. Only INSERT works. `interval` type crashes DDL parser.

Architecture & Design (Plans 08, 10) — Factory pattern for MySQL/PostgreSQL isolation. ANTLR4 DDL parser replacing regex. Event trigger system for DDL capture. Two-phase snapshot+CDC pipeline design.
Implementation (Plan 11) — Python snapshot pipeline built and deployed to system. `interval` → `String` mapping fix. ANTLR parser integrated. Full 36-table snapshot completed with 8 parallel threads.

Checksum Development (Plans 12–16) — Three-tier checksum algorithm with four-bucket MD5 accumulation. LSN-aware verification. TZ bug found and fixed. WAL replay pause designed and implemented. Connector flush/resume API integrated.
Iterative Validation (Plans 13, 15, 17–18) — 18 checksum runs with progressive bug fixes. Run 26 achieved 36/36 PASS with 97.7M rows. Root causes of all mismatches identified.
Automation & Packaging (Plans 19–21) — Auto-diff binary search finds 6 divergent rows in 115 seconds. `ch_sink_tools` wheel package with 9 CLI entry points.