Тёмный

Kafka Connect JDBC sink deep-dive: Working with Primary Keys 

Robin Moffatt
Подписаться 4,2 тыс.
Просмотров 12 тыс.
50% 1

The Kafka Connect JDBC Sink can be used to stream data from a Kafka topic to a database such as Oracle, Postgres, MySQL, DB2, etc. This video explains how to configure it to handle primary keys based on your data using the `pk.mode` and `pk.fields` configuration options.
✍️ [Blog] Kafka Connect JDBC Sink deep-dive: Working with Primary Keys rmoff.net/2021/03/12/kafka-co...
💾 Kafka Connect JDBC Connector download: www.confluent.io/hub/confluen...
📑 Kafka Connect JDBC Sink documentation: docs.confluent.io/kafka-conne...
----
☁️ Confluent Cloud (fully managed Apache Kafka, Kafka Connect, ksqlDB, Schema Registry): www.confluent.io/confluent-cl...
🤔 Questions? Join the Confluent Community at confluent.io/community/ask-th...
----
Learn more about Kafka Connect here:
🏃‍♂️ Quick: rmoff.dev/what-is-kafka-connect
🚶 More detail: rmoff.dev/zero-to-hero
----
🕐 Timecodes:
00:00:00 Introduction
00:00:12 What is Kafka Connect?
00:00:20 Introduction to the Kafka Connect JDBC sink connector
00:00:42 Primary Keys - introduction
00:01:16 Creating a connector - no primary key in the target database
00:06:12 Creating a connector - using a field from the message value as primary key in the target table
00:12:35 Configuring UPSERT operations for Kafka Connect JDBC sink connector
00:16:17 Creating a connector - composite key from fields in the value of the message
00:19:25 Keys in Kafka Messages
00:20:45 Primitives and Complex data types in Kafka messages keys
00:21:37 Creating a connector - setting the primary key using the a Kafka message's primitive key
00:29:26 Configuring DELETEs from tombstone messages
00:32:05 Using structured keys from Kafka message in the primary key of the target table
00:36:08 Specifying specific fields from structured key to use as primary key
00:37:42 Retaining a field from a structured key as a non-key field in the target table
00:41:34 Recap - pk.mode and pk.fields
00:42:23 Recap - Serialization

Наука

Опубликовано:

 

3 авг 2024

Поделиться:

Ссылка:

Скачать:

Готовим ссылку...

Добавить в:

Мой плейлист
Посмотреть позже
Комментарии : 43   
@prasa1
@prasa1 3 года назад
Thank You so much for this great video. Appreciate all the good work you are doing in terms of creating and publishing these demos
@rmoff
@rmoff 3 года назад
Glad it was helpful! :)
@almeida1738
@almeida1738 2 года назад
Excellent, well illustrated and so helpful! Thank you
@rmoff
@rmoff 2 года назад
Glad it was helpful!
@umangsingh4858
@umangsingh4858 Год назад
Great video.
@lupzhohenheim5333
@lupzhohenheim5333 3 года назад
Good stuff Robin.
@rmoff
@rmoff 3 года назад
Thanks :)
@ppsps5728
@ppsps5728 3 года назад
As always great video..Thanks a lot for sharing these with us..! Do you plan to connect Confluent CDC connector for Oracle in future videos? Also which terminal theme/plugin you're using .. Love that !!
@rmoff
@rmoff 3 года назад
Thanks for the feedback! I'll certainly add that connector to my TODO list :) I'm using oh-my-zsh with bira theme and a custom colour theme :)
@xingshengqian2395
@xingshengqian2395 2 года назад
Hi Robin, thank you for the video on JDBC Sink. Great content! I have 2 quick questions: 1) Whhen create a JDBC sink, what if the table already exists and you don't want to delete it and create from scratch 2) Can you change the primary key column as the first column instead of the last column in the sink table?
@rmoff
@rmoff 2 года назад
hi @Xingsheg - please post these to forum.confluent.io/ and I will try to answer there :)
@mamataranisen9262
@mamataranisen9262 2 года назад
Hi Robin, thanks for this demonstration. How can we produce tombstone messages ? I don't have debezium src connector in my system. Is there any workaround?
@sivakumarm1381
@sivakumarm1381 6 месяцев назад
Thank you Robin for excellent explanation. In all your examples you're using with auto.create option with true , where I was trying to auto.create with false where I am creating the table before hand but I am getting error as below. I have tried all possible option. Will you able to help . Thanks in advance. Caused by: io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "table_name" is missing and auto-creation is disabled
@Extraneon
@Extraneon 3 года назад
Hi Robin, thanks for the video. I love the extra details! I have a question that is not so much about CDC, but on how to share the normalised data of a database as a nice single entity (document) with the organisation. As an example I imagine an order, which refers to an address and a list of order items each in their own table. The order, the address and the order items are all mutable - the customer can change it all and as a result I want to send to my clients a single Kafka document with everything in it. How would you go about this? Both in combining the 3 change streams into a single new stream regardless of what changed, and only sharing the nice API with the organisation? In short, I'd love a video with a special focus on sharing a maintainable API with the rest of the organisation.
@rmoff
@rmoff 3 года назад
Hi, this is a great question! Head over to forum.confluent.io/c/architecture-and-design/29 and post it there :)
@ismailchafai1767
@ismailchafai1767 3 года назад
Thank you so much Robin for all the videos, really really helpful. I've started from zero and growing to hero with your material. One topic that is still confusing to me is schema-registry in testing scenario. Do you have any material on this ? I've managed to use mock schema registry in unit tests using a url starting with mock://. However, when trying this in integration tests, I have the famous error `org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 1`. Any input ? Many thanks
@rmoff
@rmoff 3 года назад
Hi Ismail, I'm glad you find my videos useful :) Regarding your question I suggest you head over to forum.confluent.io/c/schema-registry/19 and ask there.
@kevinbegrow4748
@kevinbegrow4748 3 года назад
Thank you Robin, this video helped a lot. I have one question: I still having a performance issue when using sink connector in upsert mode and with pk for record_values and the PK over 3 fields. The lag grows quickly even though kafka connect workers is a 3 nodes cluster as well as broker. I use 30 partitions and 30 sink tasks. The DB it selfs is a SQL server which should have enough power. But we are talking about 220 million rows which would need to be created. Btw - same works fast in insert and pk set to none. Do you have any idea?
@rmoff
@rmoff 3 года назад
I don't have any suggestions, other than speak to your DBA :) The fact that it works fine without PK demonstrates that it's a performance issue on the database side.
@amawaziny
@amawaziny 3 года назад
Thank you so much for this great video, it helped me alot. Could you please make a video on Debezium connector, ksqlDB and JDBC Sink connector write changes to postgres? Example: two databases with different structure, got CDC from the source with Debezium and transform it with ksqlDB then write it down to postgres with JDBC Sink
@rmoff
@rmoff 3 года назад
Sure - do you have an example of the kind of different structures that you're dealing with?
@amawaziny
@amawaziny 3 года назад
Yes, the source db contains three tables 1- REQUESTS with columns (ID NUMBER PK, CREATE_DATE TIMESTAMP(6), CREATED_BY VARCHAR2(255 char)) 2- REQUEST_DETAILS with columns (REQUEST_ID FK, DISTRICT_ID NUMBER(19) FK , NOTES VARCHAR2(4000 char)) 3- DISTRICTS is a lookup table with columns (ID, NAME) And in Sink database there are just two tables 1- CASES with columns (ID NUMBER PK, CREATE_DATE TIMESTAMP(6), CREATED_BY VARCHAR2(255 char), DISTRICT_ID NUMBER(19) FK , NOTES VARCHAR2(4000 char)) 2- LIST_OF_VALUES with columns (ID, NAME, PARENT_ID FK on the same table) We want to capture changes on the source database using Debezium connector and transform them using ksqlDB then write them down with JDBC Sink connector. Note: the ids in the lookup table are not the same so I think we will use ksqlDB table to put this mapping and update it when changed, then join the captured changes with this table to get the correct id. Also it's not only one direction sync, it's bidirectional Thank you in advance :)
@rmoff
@rmoff 3 года назад
@@amawaziny Can you post this over at forum.confluent.io/and I can try and answer there
@walterferreiradossantos2378
@walterferreiradossantos2378 25 дней назад
Robin i need your help. I´m new on development with kafka. So i have kafka connect with cdc as consumer and producer with jdbc connect sink. I can to to a upsert correctly, but i can´t to do work the delete operation. It is possible to use jdbc connect sink for make work all operations like insert, update and delete...? Can you help me please with a example kafka connect sql server to sql server without use debezium?
@stavroskoureas9069
@stavroskoureas9069 Год назад
How to handle tables which are only having columns as primary key and not any other column value? The syntax generated from JDBC Sink is wrong -> java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "public"."ModelSegment" ("OID","tenantx_id") VALUES (6,1) ON CONFLICT ("OID","tenantx_id") DO UPDATE SET was aborted: ERROR: syntax error at end of input Any suggestion?
@raskoldi
@raskoldi 3 года назад
Hi Robin first of all thanks for effort and all other thing you do for help. So one thing is confusing me a lot is converters which you said at 4:49 "when you are reading a data from a topic we have to know to how to sterilized" when we reading data from topic to sink somewhere in DB so you know it was avro and you used avro converter however in your other video which "Kafka connect in action JDBC sink" you made similar example again creating stream with topic test01 and format was avro again but this time when configuring sink connector you specified the value converter with StringConverter instead of Avro like this video. So this converters are interchangeable like if we can use String Converter if it written to topic as Avro or vice versa?
@rmoff
@rmoff 2 года назад
Hi Ali, Can you link to the specific bit in the other video where I use a StringConverter? If the data is Avro, you have to use the Avro converter to deserialise it.
@EzzouineMohammed
@EzzouineMohammed 2 года назад
Hello Robin, I have a topic powered by kafka streams developed by ksql with criteria in the wehre part and the result is owned by kafka connect sink to a destination database, I want to delete the record from the database table if its value is changed over time and is not returned by the stream is this possible?
@shalandichannel
@shalandichannel 2 года назад
is it possible to handle composite primary keys on record_keys without having to declare them explicitly in the connector? I am using JDBC to sink many tables at once and many of these topics have different key names, some have just one PK and others have 2, all record keys are defined in JSON format, without AVRO schema. It would be nice to know a flexible approach to handle this.
@shalandichannel
@shalandichannel 2 года назад
i have found the answer myself. It works without declaring explicit fields in the pk.fields property
@piusono
@piusono Год назад
Thanks for the video. Though it is about 2 years old, I still find it useful. However, any idea why the error "Failed to create connector: {"error_code":409,"message":"Connector SINK_FOO_01_0 already exists"}" when I run for the first time, CREATE SINK CONNECTOR SINK_FOO_01_0 WITH (...) following your steps? Everything else worked up to that point.
@rmoff
@rmoff Год назад
Hi, I don't work with Kafka directly anymore. A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. thanks.
@sivakaranb7265
@sivakaranb7265 3 года назад
Hi Robin, Thanks for the video. I have a question related to this. How can we create single sink connectors for multiple tables with primary key column names are different in each table. Also how to do the same if we enable the delete mode. It will be helpful if you give some idea.
@rmoff
@rmoff 3 года назад
You can't - you need one connector per unique set of primary key column names
@sivakaranb7265
@sivakaranb7265 3 года назад
Oh thanks for the reply.. I got that.. so one connector for one table i need to go, Am i right?
@rmoff
@rmoff 3 года назад
@@sivakaranb7265 Yes - unless multiple tables happened to share the same primary key field(s). For example, if they all had a key of ID then you could (I think) have multiple topics/tables in a single connector
@sivakaranb7265
@sivakaranb7265 3 года назад
Ok. Thank you Robin.
@tirelibirefe
@tirelibirefe 2 года назад
can ksql connect to tls secured Kafka? How?
@rmoff
@rmoff 2 года назад
Yes, see docs.ksqldb.io/en/latest/operate-and-deploy/installation/server-config/security/#configure-ksqldb-for-secured-apache-kafka-clusters If you have more questions then head over to forum.confluent.io/
@MohamedAyman-wu6gk
@MohamedAyman-wu6gk Год назад
Hi Robin, I'm working with Strimzi Kafka on openshift, and when I pulled the images for KSQL server and KSQL cli with 0.15.0 versions I got this error while writing any command.: The server has encountered an incompatible entry in its log and cannot process further DDL statements. This is most likely due to the service being rolled back to an earlier version. and the problem that when I'm using newer versions or the latest version I got the below error when I'm trying to identify fields as KEY or PRIMARY KEY: KSQL currently only supports KEY columns named ROWKEY, extraneous input 'PRIMARY.' so im struggiling to deal with keys and i have to go live soon so please kindly your support. Thanks in advance.
@rmoff
@rmoff Год назад
Hi, A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. thanks.
@vinaygold20
@vinaygold20 2 года назад
Hi Robin, I have a scenario to do 'Insert Only'. I just need to allow inserts onto a table. I should not allow updates. To handle this, I user insert.mode as 'insert' but sink connector is failing with duplicates error (a primary key violation from rds mysql). how can i handle this situtaion. Please help.
@rmoff
@rmoff 2 года назад
Please ask this question at forum.confluent.io/. Thanks!
Далее
From Zero to Hero with Kafka Connect
33:49
Просмотров 27 тыс.
Integrating Oracle and Kafka
54:49
Просмотров 22 тыс.
Kafka Connect Iceberg Sink
40:20
Просмотров 427
ksqlDB and the Kafka Connect JDBC Sink
37:35
Просмотров 8 тыс.
ksqlDB HOWTO: Stateful Aggregates
13:56
Просмотров 2,8 тыс.
How to create a Kafka Connect connector
33:40
Kafka Connect in Action: Loading a CSV file into Kafka
43:21
From Zero to Hero with Kafka Connect by Robin Moffatt
44:41
Wait... PostgreSQL can do WHAT?
20:33
Просмотров 191 тыс.
Building your First Connector for Kafka Connect
1:18:46
Exploring the Kafka Connect REST API
20:56
Просмотров 10 тыс.