Kafka Connect in Action: JDBC Sink 

Robin Moffatt
4.2K subscribers · 39K views

Published: 8 Sep 2024

Comments: 109
@user-vu4tj1ix8u 3 years ago
Simply the best instructor I have ever seen. Great job!
@rmoff 3 years ago
Thank you for your kind words! :)
@carlaguelpa4574 1 year ago
@rmoff, thanks for your videos - they're the best! We have a situation: we have implemented Debezium with Kafka, and we are experiencing a performance issue. When both the source and sink connectors are working at the same time, consumer performance decreases significantly. Here is what we have: 1 Kafka cluster with 3 nodes (8GB RAM, -Xms: 4096m -Xmx: 4096m each), all topics with replication factor = 3; 1 ZooKeeper cluster with 3 nodes (4GB RAM, -Xms: 2048m -Xmx: 2048m each); 1 Connect cluster with 5 nodes (12GB RAM each, -Xms: 6168m -Xmx: 6168m) with 1 partition and 1 task, though we have tried with 1/5/100 partitions and 1/5/15/100 tasks; 1 source connector per table (4 tables); 1 sink connector per table (4 tables). We are not using Schema Registry. The problem: the target DB has a delay that increases when the rate exceeds 750 messages per second (message size = 6KB). Right now we are processing 2,000 messages of 6KB per second - the best performance we got, and only with the sink connectors enabled. With only the sink connectors enabled performance is good, and likewise with only the source connectors and no sinks performance is great. Resources are OK; the source and target environments have plenty of CPU and memory. Watching messages consumed per second, there is a wait of about 30 seconds, then it consumes some messages, waits, and repeats. Question: what can we do to improve consumer performance? Could you help me?
@georgelza 4 years ago
Still learning, and getting blown away by the amazingly simple capabilities... once you've figured out how...
@jeffyeh4537 2 years ago
OMG, Robin, your video saved my internship! Thank you soooooo much!
@rmoff 2 years ago
Happy to help! :D
@OwenRubel 4 years ago
Great bit on schemas. Going to link to this in my next video. :)
@mukeshdharajiya1261 1 year ago
Thank you so much for the great explanation 🏆
@rmoff 11 months ago
Glad it was helpful!
@jennana8003 3 years ago
Thank you for sharing!
@rmoff 3 years ago
My pleasure!
@timothyvandermeer9845 5 months ago
FANTASTIC. Thank you, brother!
@JonWolski 4 years ago
Off-topic tip: if you pipe your logs into `less -r` instead of `more` you can get a pager that interprets the ANSI color sequences
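For instance, assuming the Connect worker runs in a Docker container named kafka-connect (the container name is an assumption), the tip looks like:

```bash
# Page through the worker log with ANSI colours rendered, rather than raw escape codes
docker logs kafka-connect 2>&1 | less -r
```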
@hamoudy41 1 year ago
My data in Kafka is schema-less (string keys, JSON values). I want to sink it to Postgres. Any tips? Do I need Schema Registry? What converters or transforms (if any) do I need?
@BenedictReydo 2 years ago
Why do I keep getting an empty set of tables in MySQL even though the connector worked? I also already made sure that I had the MySQL connector in the directory.
@krishnachaitanyapulagam2769 4 years ago
It's a nice video - appreciate your efforts. Can we create a key schema as well for SOME_JSON_AS_AVRO? I have a similar requirement where I need to do an upsert using the JDBC Sink connector, which reads from a topic created out of streams.
@rmoff 4 years ago
Hi, the best place to ask this is on: → Slack group: cnfl.io/slack or → Mailing list: groups.google.com/forum/#!forum/confluent-platform
@dswan01 11 months ago
So I assume you can use ksqlDB to transform the data and then pass the stream on to update the target database. Is this correct?
@rishikesanravichandran9914 3 years ago
When converting plain JSON to JSON Schema using a ksqlDB stream, can we specify only certain fields? In my case I have nested JSON with more than 100 field values, so do I have to explicitly mention each of them while creating the ksqlDB stream?
@rmoff 3 years ago
Yes, if you want to use them. If you have 100 fields on the input message but only want to use 5 of them you can just specify those 5; the remainder will be dropped from any output message that you create.
@rmoff 3 years ago
This is why using plain JSON is kind of a pain, because you end up with the situation of having to manually enter schemas. Whatever is producing that nested JSON of 100 values should instead serialise the data onto Kafka using Avro/Protobuf/JSONSchema.
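A minimal sketch of that re-serialisation, piped into the ksqlDB CLI (the ksqlDB address, stream, topic, and field names are all placeholders, and only two of the many fields are declared):

```bash
# Declare the plain-JSON stream, then write just the wanted fields back out as Avro
ksql http://localhost:8088 <<'EOF'
CREATE STREAM SOURCE_JSON (FIELD1 VARCHAR, FIELD2 INT)
  WITH (KAFKA_TOPIC='source-topic', VALUE_FORMAT='JSON');

CREATE STREAM TARGET_AVRO WITH (VALUE_FORMAT='AVRO') AS
  SELECT FIELD1, FIELD2 FROM SOURCE_JSON;
EOF
```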
@rishikesanravichandran9914 3 years ago
@rmoff Thanks for the clarification. BTW, your video was very informative.
@JonathanLevinTKY 4 years ago
Can you update an existing MySQL table that has more columns than the Kafka stream has - like an id/primary key?
@niklaslehmann6551 2 years ago
When creating the sink connector I get "Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector ...". So after only 3 minutes I cannot even follow your guide. Maybe there is something wrong with the Docker containers?
@rmoff 2 years ago
It sounds like the JDBC connector isn't installed correctly in your connect worker. Did you restart it after installing it? If you're still stuck head to forum.confluent.io/ and post full details of what guide you're following and steps you've taken. Thanks.
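For reference, a hedged sketch of how that installation usually goes on a self-managed worker (the connector version and container name are assumptions):

```bash
# Install the JDBC connector plugin into the worker's plugin path...
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
# ...then restart the worker so it rescans its plugins (Docker container name assumed)
docker restart kafka-connect
```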
@niklaslehmann6551 2 years ago
@rmoff Thanks, that solved it. My proxy settings were blocking the download of the JDBC connector. After disabling them it worked.
@rmoff 2 years ago
@niklaslehmann6551 Great!
@vishalmalhotra2243 3 years ago
@Robin Moffatt Is it possible to have a MySQL database table as the source, with a topic - say "sourceTopic" - and then use this "sourceTopic" in our sink curl config so the sink happens in another MySQL table? Basically trying to set up a table and audit-table kind of scenario here.
@rmoff 3 years ago
I'm not quite clear - you want to read and write from the same MySQL database but from and to different tables? There's no reason why you couldn't do that, in theory. Perhaps head over to forum.confluent.io/ and ask there, with a bit more detail and context around what you're trying to do.
@vishalmalhotra2243 3 years ago
@rmoff I have a table "books" in the database motor. This is my source, and for the source connector I created a topic "mysql-books". So far all is good: I am able to see messages in the Confluent Platform UI. Now I want to sink these messages into another database called motor-audit, so that in audit I can see all the changes that happened to the table "books". I have given the topic "mysql-books" in my curl config for the sink connector, since changes are being published to this topic, but it is not coming up, and I get errors like: 1. (with respect to the basic streams example you showed) Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id 3 - 2. (from my objective) [2021-04-28 18:14:48,231] ERROR WorkerSinkTask{id=sink-jdbc-mysql-02-json-0} Error converting message value in topic 'mysql-books' partition 0 at offset 0 and timestamp 1619602271548: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask:547) org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
@rmoff 3 years ago
@vishalmalhotra2243 Please post this at forum.confluent.io/ and I can help you there :)
@MrNiceseb 4 years ago
Any changes needed for connecting to Postgres instead of MySQL?
@rmoff 4 years ago
You will need to amend the JDBC URL, as well as making sure that the Postgres JDBC driver is installed.
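As a sketch, the JDBC URL change looks like this; everything else in the sink config can stay the same (the connector name, worker address, host, database, topic, and credentials below are all placeholders):

```bash
# Create/update a JDBC sink pointed at Postgres via the Connect REST API
curl -X PUT http://localhost:8083/connectors/sink-jdbc-postgres-01/config \
     -H "Content-Type: application/json" \
     -d '{
           "connector.class":     "io.confluent.connect.jdbc.JdbcSinkConnector",
           "connection.url":      "jdbc:postgresql://postgres:5432/demo",
           "connection.user":     "connect_user",
           "connection.password": "secret",
           "topics":              "test01",
           "auto.create":         "true"
         }'
```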
@dmitriisergeev306 3 years ago
@rmoff You show at 6:44 that we already have the Postgres driver. Is that not enough?
@rmoff 3 years ago
@dmitriisergeev306 Correct - the Postgres JDBC driver ships with it, so you should not need to install it again.
@vishnumurali522 4 years ago
Hi Robin, I understand this tutorial clearly - it is very clear and easy to follow. But I have a doubt: you are using ksqlDB to create a stream from an existing topic, change it to Avro, and create a new topic. I am not using ksqlDB, so how can I do this without creating streams?
@rmoff 4 years ago
If you're not using ksqlDB and you want to apply a schema to the data you'll need to find another route, such as writing your own Kafka Streams application to do this. That's why it's always better to get the *producer* of the data to use a serialisation method that supports schemas (Avro, Protobuf, etc).
@vishnumurali522 4 years ago
@rmoff So, as you said, if I have my Spring Boot application and configure it to use the Avro serializer for the value, then if I send normal JSON data will it be stored in the DB? Or do we need to add something else?
@rmoff 4 years ago
@vishnumurali522 Yes, if you write Avro from your source application then you can use the AvroConverter in the JDBC sink and it will work. If you've got any more questions then head over to the #clients or #connect channel on cnfl.io/slack :-)
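A sketch of the converter side of that sink config (the worker address, topic, connection details, and Schema Registry URL are assumptions):

```bash
# Tell the sink to deserialise Avro values using Schema Registry
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
     -H "Content-Type: application/json" \
     -d '{
           "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
           "connection.url":  "jdbc:mysql://mysql:3306/demo",
           "topics":          "orders",
           "value.converter": "io.confluent.connect.avro.AvroConverter",
           "value.converter.schema.registry.url": "http://schema-registry:8081"
         }'
```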
@shikhachawla7259 4 years ago
Thanks for sharing this - I follow your videos a lot for learning Kafka. I have tried setting up the JDBC sink connector to insert into SQL Server with a batch size of 500, but it inserts into SQL Server row by row rather than in batches, which has a negative impact on SQL Server IO. Is there something you can suggest to get batch insertion working? Will look forward to your response. Thanks
@rmoff 4 years ago
Hi, the best place to ask this is on: → Mailing list: groups.google.com/forum/#!forum/confluent-platform or → Slack group: cnfl.io/slack
@kiran0711 4 years ago
Hi Robin, I am a great fan of the Kafka JDBC source and sink connectors. We are facing a challenge right now: using the Kafka JDBC connectors we are unable to connect to an Oracle database which is in a cloud Kerberized environment. Any video or details would be a great help.
@rmoff 4 years ago
Hi Kiran, this isn't a configuration I've tried, sorry. You could try asking at cnfl.io/slack.
@heinhtetzaw9463 1 year ago
When I create a stream with Avro format, I'm getting an "Unable to create schema from topic, Connection reset" error.
@rmoff 1 year ago
hi, the best place to get help is at www.confluent.io/en-gb/community/ask-the-community/ :)
@Pinaldba 4 months ago
@Robin, I have the Azure Synapse JDBC driver kafka-connect-azure-sql-dw-1.0.7.jar in the path /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc, but I still get the error - Caused by: java.sql.SQLException: No suitable driver found for "jdbc:sqlserver. I have tried all possible options; none of them worked.
@aidangmccarthy 4 years ago
Hi Robin, thanks for sharing. Can you use the Protobuf format from 5.5 instead of Avro? Also, for MySQL I notice you are using version 8. Will this also work on MySQL 5.6 or 5.7? Many thanks, Aidan
@rmoff 4 years ago
Yes, as of Confluent Platform 5.5 you can use Avro, Protobuf, or JSON Schema. It'll work fine with earlier versions of MySQL too.
@sanket386 1 year ago
How can I create a schema with a payload for nested JSON data?
@naironviana 3 years ago
Hi! I'm having trouble working with timestamp (date) values. I can't reflect changes in the sink table if the source table has a timestamp (with a current_timestamp default). Is there any specific transformation to set in the sink/source connectors to solve this problem? Thanks!
@rmoff 3 years ago
Hi, the best place to ask this is forum.confluent.io/ :)
@shashanksrivastava5654 3 years ago
Hi Robin, is it possible to save a single topic's data into multiple tables in Postgres?
@rmoff 3 years ago
Yes, you can create multiple connectors reading from the same topic and route them to different tables using a Single Message Transform like RegexRouter (see rmoff.net/2020/12/11/twelve-days-of-smt-day-4-regexrouter/)
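A sketch of one such connector, with RegexRouter rewriting the topic name before the sink maps it to a table (the connector name, worker address, connection details, topic, and suffix are placeholders):

```bash
# A second sink on the same topic; RegexRouter makes it write to <topic>_audit instead
curl -X PUT http://localhost:8083/connectors/sink-jdbc-orders-audit/config \
     -H "Content-Type: application/json" \
     -d '{
           "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
           "connection.url":  "jdbc:postgresql://postgres:5432/demo",
           "topics":          "orders",
           "transforms":      "route",
           "transforms.route.type":        "org.apache.kafka.connect.transforms.RegexRouter",
           "transforms.route.regex":       "(.*)",
           "transforms.route.replacement": "$1_audit"
         }'
```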
@panjisadewo4891 2 years ago
I am using the JDBC sink connector from Confluent - how do I set this up? Please help.
@rmoff 2 years ago
Please ask this question at forum.confluent.io/. Thanks!
@vinaygold20 2 years ago
Hi Robin, regarding 'blacklist: "COL2"' - it loads NULL in the case of an insert. But if I need to load the value on insert and DO NOT want it updated, what should the configuration be?
@rmoff 2 years ago
Please ask this question at forum.confluent.io/. Thanks!
@MahmoudShash 3 years ago
Hi @Robin, just asking if Kafka Connect works with Cloudera Kafka, and whether I can use it in production.
@rmoff 3 years ago
Kafka Connect is part of Apache Kafka. It's widely used in Production. I'm not familiar with what Cloudera support; you'd need to check with them.
@user-ow6jk5ix5q 4 years ago
Is there a good way to diagnose why a connector becomes degraded? And is it OK to use a symlink to put the jar under the JDBC drivers folder?
@rmoff 4 years ago
1. Diagnosing connector issues - look at the Kafka Connect worker log. 2. Symlink - I don't know, try it :)
@rmoff 4 years ago
For further help, head to #connect on cnfl.io/slack
@SirKober 2 years ago
Isn't there a way to make a JDBC Sink perform an INSERT IGNORE? If I use mode insert I always get duplicate errors. Can it really be that this case has not been considered? (MySQL dialect)
@rmoff 2 years ago
I'm not aware of this being an option. Does it work if you use `upsert` instead?
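For reference, a sketch of the `upsert` variant (the connector name, connection details, topic, and key field are placeholders; the record key must carry the primary key):

```bash
# Upsert instead of insert: the MySQL dialect generates INSERT .. ON DUPLICATE KEY UPDATE
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
     -H "Content-Type: application/json" \
     -d '{
           "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
           "connection.url":  "jdbc:mysql://mysql:3306/demo",
           "topics":          "test01",
           "insert.mode":     "upsert",
           "pk.mode":         "record_key",
           "pk.fields":       "id"
         }'
```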
@thetrilbies1 2 years ago
I tried to replicate the steps at 1:51. When inserting a row into TEST01, I get a "column name ROWKEY does not exist" error.
@arunbhat105 1 year ago
Same error for me too. Were you able to solve the issue?
@sarathbaiju6040 2 years ago
Hi, I have a question regarding automatic table creation in the sink connector: how can we define custom column names in the sink connector configuration? For example, by default the column names are after_userId and after_firstName; I want to change them to UserId and FirstName. How can we do this in the connector configuration?
@rmoff 2 years ago
Hi Sarath, this is a great question to ask over at forum.confluent.io/ :)
@sarathbaiju6040 2 years ago
@rmoff Thanks for the reply - I got the answer. I used a Single Message Transform (SMT) to achieve my requirement.
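Presumably something along the lines of the ReplaceField transform, which renames fields before the sink maps them to columns - a sketch, with the field names taken from the question above and everything else a placeholder:

```bash
# Rename fields so the auto-created table gets the desired column names
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
     -H "Content-Type: application/json" \
     -d '{
           "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
           "connection.url":  "jdbc:mysql://mysql:3306/demo",
           "topics":          "users",
           "transforms":      "rename",
           "transforms.rename.type":    "org.apache.kafka.connect.transforms.ReplaceField$Value",
           "transforms.rename.renames": "after_userId:UserId,after_firstName:FirstName"
         }'
```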
@talahootube 4 years ago
Great video, many thanks. May I get the document you reference in this? Thx
@rmoff 4 years ago
You can find the JDBC Sink connector docs here: rmoff.dev/01r
@talahootube 4 years ago
@rmoff OK, thanks!
@shuchikumari8031 2 years ago
Can you please help me with how to connect MySQL to Kafka? If possible, can you please share any documentation or a link about this? Thank you.
@JordanCricketMoore 2 years ago
You may want to start at the Debezium documentation
@rmoff 2 years ago
If you want to get data from MySQL into Kafka then check out rmoff.dev/no-more-silos
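Not necessarily the method covered in that talk, but as one common option, a JDBC source connector polling a MySQL table looks roughly like this (table, column, and credentials are placeholders; Debezium is the log-based CDC alternative):

```bash
# JDBC source: polls the books table for new rows and writes them to topic mysql-books
curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-01/config \
     -H "Content-Type: application/json" \
     -d '{
           "connector.class":     "io.confluent.connect.jdbc.JdbcSourceConnector",
           "connection.url":      "jdbc:mysql://mysql:3306/demo",
           "connection.user":     "connect_user",
           "connection.password": "secret",
           "table.whitelist":     "books",
           "mode":                "incrementing",
           "incrementing.column.name": "id",
           "topic.prefix":        "mysql-"
         }'
```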
@nirmaladesai6964 3 years ago
Do we need any settings in the Oracle database to start with?
@rmoff 3 years ago
You'll need a user with the appropriate permissions to write to the table(s). If you've got any more questions then head over to forum.confluent.io/ :)
@prateekshajoshi4539 2 years ago
How do I connect Kafka with a MySQL database?
@rmoff 2 years ago
You're on the right video! The Kafka Connect JDBC Sink enables you to stream data from Kafka to MySQL. If you have more questions do head over to forum.confluent.io/
@dmitrysergeev1845 3 years ago
It is not working for me. I have sink-jdbc-mysql-01 | SINK | io.confluent.connect.jdbc.JdbcSinkConnector | WARNING (0/1 tasks RUNNING). How can I fix this?
@rmoff 3 years ago
You need to look at the Kafka Connect worker log to find out the actual error. YouTube comment threads aren't the easiest place to help debug, so head over to #connect on cnfl.io/slack
@vignesh9458 3 years ago
@Robin Moffatt Is it possible to store all the Kafka data in a single column as JSON or Avro? Can you please help?
@rmoff 3 years ago
Do you mean push the complex value of the Kafka message into a single target field in the database? If so, no I don't think this is supported. For more questions, head to forum.confluent.io/
@vignesh9458 3 years ago
@rmoff Exactly, Robin. My requirement is to read the JSON data from Kafka and store it exactly as-is in a single column.
@rmoff 3 years ago
@vignesh9458 I think you would need to pre-process the topic and embed the message in a field first (since the data has to be serialised with a schema). If you post this as a question over at forum.confluent.io/ I can try to answer it properly there.
@vignesh9458 3 years ago
@rmoff Sure, I will post there.
@sumitpratap999 3 years ago
How do I use "pk.mode": "none" with a DB sequence?
@rmoff 3 years ago
@Sumit Can you post full details of your question to forum.confluent.io? I can answer it over there. Thanks!
@sumitpratap999 3 years ago
@rmoff Done - "JDBC Sink connector using DB Sequence for Primary key". Thanks for the reply.
@abhrasarkar8040 3 years ago
Hello sir, I couldn't find 'mysql-connector-java-8.0.20.jar' in the folder 'confluent-hub-components/confluentinc-kafka-connect-jdbc'.
@rmoff 3 years ago
You need to install it yourself. Have a look at rmoff.dev/fix-jdbc-driver-video.
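The gist of that fix for a Dockerised worker, sketched (container name, driver version, and paths are assumptions):

```bash
# Copy the MySQL JDBC driver jar into the JDBC connector's lib directory...
docker cp mysql-connector-java-8.0.20.jar \
  kafka-connect:/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
# ...then restart the worker so the driver is picked up
docker restart kafka-connect
```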
@abhrasarkar8040 3 years ago
@rmoff I've downloaded that JDBC connector and copied it into that particular path using the command 'docker cp /home/foldername/Downloads/mysql-connector-java-8.0.22/. kafka-connect:/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/mysql-connector-java-8.0.22'. I'm not getting any error message, but the MySQL DB is not updating, and my connector is showing 'warning 0/1' status. Is there any chat interface where I can have a brief discussion about this with you? I am new to Kafka. Please help - thanks in advance.
@rmoff 3 years ago
@abhrasarkar8040 The best place to ask any further questions is: → Slack group: cnfl.io/slack or → Mailing list: groups.google.com/forum/#!forum/confluent-platform
@abhrasarkar8040 3 years ago
@rmoff Thank you so much. I think I found the problem: this link is not working - 'cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.19.tar.gz' - so I'm getting the file from this link instead: 'dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.22.tar.gz'. Is that fine?
@abhrasarkar8040 3 years ago
@rmoff Hello Robin! I am getting this error: "java.sql.SQLSyntaxErrorException: Access denied for user 'connect_user'@'%' to database 'demo'". Can you please help me out with this?
@ashrafkhaled9790 4 years ago
How do I do this with Java?
@rmoff 4 years ago
That's the point - you wouldn't. Kafka Connect is the integration API for Apache Kafka, and what you should generally use for getting data in and out of Kafka from systems such as RDBMS. Learn more: rmoff.dev/ljc-kafka-02
@antoxin 4 years ago
Very fast English, but thanks anyway for the informative video.
@rmoff 4 years ago
Glad it was useful - sorry if it's not always easy to follow :)
@coltennabers634 3 years ago
Your keyboard noises are driving me crazy, dude - fix your mic.
@rmoff 3 years ago
Yeah, it's kinda clackity isn't it. I've got a quieter one since I filmed that video :)
@robertoojeda3263 1 year ago
Hello Robin! Good video. Do you have a video that uses a Dead Letter Queue topic storing multiple messages with different schemas - in other words, one that uses the properties confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy and TopicRecordNameStrategy?
@amrish331 2 years ago
Hi, how do I handle an array in an Avro schema with the JDBC sink connector? I am stuck on this. Please help.
@rmoff 2 years ago
Hi, please post this at forum.confluent.io/ :)
@AashishOla 2 years ago
I need your help with a Kafka Connect JDBC sink performance issue. I have sent you a message on Twitter. Looking forward to your response.
@rmoff 2 years ago
The best place to ask is www.confluent.io/en-gb/community/ask-the-community/