kafka_fdw
kafka_fdw : kafka Foreign Data Wrapper for CSV formatted messages
Overview
| ID | Extension | Package | Version | Category | License | Language |
|---|---|---|---|---|---|---|
| 8730 | kafka_fdw | kafka_fdw | 0.0.3 | FDW | PostgreSQL | C |
| Attribute | Has Binary | Has Library | Need Load | Has DDL | Relocatable | Trusted |
|---|---|---|---|---|---|---|
| --s-d-r | No | Yes | No | Yes | Yes | No |
| Relationships | |
|---|---|
| See Also | pgmq, mongo_fdw, redis_fdw, wrappers, multicorn, redis, hdfs_fdw, wal2json |
Packages
| Type | Repo | Version | PG Major Compatibility | Package Pattern | Dependencies |
|---|---|---|---|---|---|
| EXT | PIGSTY | 0.0.3 | 18, 17, 16, 15, 14 | kafka_fdw | - |
| RPM | PIGSTY | 0.0.3 | 18, 17, 16, 15, 14 | kafka_fdw_$v | - |
| DEB | PIGSTY | 0.0.3 | 18, 17, 16, 15, 14 | postgresql-$v-kafka-fdw | - |
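In the package patterns above, $v stands for the PostgreSQL major version. A quick sketch of how the RPM and DEB names expand (the version value is illustrative):

```shell
# $v in the package pattern is the PG major version;
# names follow the patterns in the table above.
v=17
echo "RPM: kafka_fdw_${v}"              # kafka_fdw_$v
echo "DEB: postgresql-${v}-kafka-fdw"   # postgresql-$v-kafka-fdw
```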
| Linux / PG | PG18 | PG17 | PG16 | PG15 | PG14 |
|---|---|---|---|---|---|
| el8.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| el8.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| el9.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| el9.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| el10.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| el10.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| d12.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| d12.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| d13.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| d13.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u22.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u22.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u24.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u24.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
Source
pig build pkg kafka_fdw;     # build rpm/deb

Install
Make sure the PGDG and PIGSTY repos are available:
pig repo add pgsql -u        # add both repos and update the cache

Install this extension with pig:
pig install kafka_fdw;       # install via package name, for the active PG version
pig install kafka_fdw -v 18; # install for PG 18
pig install kafka_fdw -v 17; # install for PG 17
pig install kafka_fdw -v 16; # install for PG 16
pig install kafka_fdw -v 15; # install for PG 15
pig install kafka_fdw -v 14; # install for PG 14

Create this extension with:
CREATE EXTENSION kafka_fdw;

Usage
Syntax:
CREATE EXTENSION kafka_fdw;
CREATE SERVER kafka_server FOREIGN DATA WRAPPER kafka_fdw OPTIONS (brokers 'localhost:9092');

Source: README
kafka_fdw is a foreign data wrapper that exposes Kafka messages as PostgreSQL foreign tables. The upstream README explicitly warns that the project is not yet production ready.
Server and Mapping
Define a foreign server with the Kafka broker list, then add a user mapping:
CREATE EXTENSION kafka_fdw;
CREATE SERVER kafka_server
FOREIGN DATA WRAPPER kafka_fdw
OPTIONS (brokers 'localhost:9092');
CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;

Foreign Tables
Kafka foreign tables must declare two metadata columns, one marked with partition 'true' and one marked with offset 'true'. The remaining columns describe the message payload.
CSV Messages
CREATE FOREIGN TABLE kafka_test (
part int OPTIONS (partition 'true'),
offs bigint OPTIONS (offset 'true'),
some_int int,
some_text text,
some_date date,
some_time timestamp
)
SERVER kafka_server
OPTIONS (
format 'csv',
topic 'contrib_regress',
batch_size '30',
buffer_delay '100'
);

For CSV, columns are mapped by position. Upstream notes that schema enforcement depends on the message writer, so strict parsing and junk-handling options matter when input quality is uncertain.
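As an illustration (this sample is not from the upstream docs), a message on the contrib_regress topic would carry the payload fields in declaration order; the partition and offset columns come from Kafka metadata, not the message body:

```text
5464565,some text,2024-05-01,2024-05-01 12:34:56
```

Here 5464565 maps to some_int, 'some text' to some_text, and so on by position.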
JSON Messages
CREATE FOREIGN TABLE kafka_test_json (
part int OPTIONS (partition 'true'),
offs bigint OPTIONS (offset 'true'),
some_int int OPTIONS (json 'int_val'),
some_text text OPTIONS (json 'text_val'),
some_date date OPTIONS (json 'date_val'),
some_time timestamp OPTIONS (json 'time_val')
)
SERVER kafka_server
OPTIONS (
format 'json',
topic 'contrib_regress_json',
batch_size '30',
buffer_delay '100'
);

For JSON, each column can map to an object key with the json option. The current implementation supports JSON objects, not top-level JSON arrays.
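For instance, a message matching the mapping above would look like the following (an illustrative payload, not from the upstream README); each json option names the object key whose value populates that column:

```json
{"int_val": 42, "text_val": "hello", "date_val": "2024-05-01", "time_val": "2024-05-01 12:34:56"}
```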
Querying and Producing
The offset and partition columns are special, and the upstream README recommends specifying them in queries whenever possible:
SELECT * FROM kafka_test WHERE part = 0 AND offs > 1000 LIMIT 60;
SELECT *
FROM kafka_test
WHERE (part = 0 AND offs > 100)
OR (part = 1 AND offs > 300)
OR (part = 3 AND offs > 700);

Messages can also be produced with INSERT statements. If a partition value is supplied, it is used; otherwise Kafka’s builtin partitioner chooses one:
INSERT INTO kafka_test(part, some_int, some_text)
VALUES
(0, 5464565, 'some text goes into partition 0'),
(NULL, 5464565, 'some text goes into partition selected by kafka');

Error Handling
The default behavior is permissive:
- missing trailing columns are treated as NULL
- extra fields are ignored
- unparsable values still raise errors by default

Relevant table options and helper columns include:
- strict 'true' to reject column count mismatches
- ignore_junk 'true' to set malformed values to NULL
- columns marked junk 'true' to capture the original payload
- columns marked junk_error 'true' to capture parsing errors
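A sketch combining these options, based on the CSV table from the example above; the kafka_test_junk table name and the raw_payload / parse_error column names are illustrative, not fixed by the extension:

```sql
CREATE FOREIGN TABLE kafka_test_junk (
    part        int     OPTIONS (partition 'true'),
    offs        bigint  OPTIONS (offset 'true'),
    some_int    int,
    some_text   text,
    -- captures the original message when a row fails to parse
    raw_payload text    OPTIONS (junk 'true'),
    -- captures the parser's error message for that row
    parse_error text    OPTIONS (junk_error 'true')
)
SERVER kafka_server
OPTIONS (
    format 'csv',
    topic 'contrib_regress',
    ignore_junk 'true'   -- set malformed values to NULL instead of raising errors
);
```

With this shape, bad messages surface as rows with NULL payload columns plus a populated raw_payload and parse_error, so they can be inspected with an ordinary WHERE parse_error IS NOT NULL query instead of aborting the scan.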
Build Notes
The extension links against librdkafka, and the upstream build instructions are the standard PGXS workflow:
make && make install
make installcheck

The test setup assumes Kafka on localhost:9092 and ZooKeeper on localhost:2181.