Robin has been speaking at conferences since 2009 including QCon, Devoxx, Strata, Kafka Summit, and Øredev. You can find many of his talks online at rmoff.net/talks/, and his blog articles at rmoff.net/. Outside of work he enjoys drinking good beer and eating fried breakfasts, although generally not at the same time.
Hello Robin, I was trying to follow your video, but the Voluble connector is no longer supported, so the step at the 9-minute mark isn't possible to follow. Can you suggest another option to produce data like that connector did? I also tried using the kafka-console-producer command, but I couldn't get data inserted into my bucket yet.
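For anyone else hitting this: one commonly used alternative to Voluble for generating test data is the kafka-connect-datagen connector. A minimal sketch of a config, assuming the connector is installed on the Connect worker (topic name and quickstart dataset are illustrative):

```json
{
  "name": "datagen_orders",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "orders",
    "quickstart": "orders",
    "max.interval": "1000",
    "tasks.max": "1"
  }
}
```

POSTing this to the Connect REST API should start producing synthetic order events to the `orders` topic roughly once a second.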
Robin, I need your help. I'm new to development with Kafka. I have Kafka Connect with CDC as the source and the JDBC sink connector as the consumer. I can do an upsert correctly, but I can't get the delete operation to work. Is it possible to use the JDBC sink connector to make all operations work — insert, update, and delete? Can you help me please with an example of Kafka Connect SQL Server to SQL Server without using Debezium?
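For reference, the JDBC sink handles deletes via tombstone records (a message with a key and a null value), and that requires `delete.enabled` together with `pk.mode=record_key`. A sketch of the relevant sink settings, with illustrative connection details and key field:

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:sqlserver://target-host:1433;databaseName=demo",
  "topics": "my_table",
  "insert.mode": "upsert",
  "delete.enabled": "true",
  "pk.mode": "record_key",
  "pk.fields": "id"
}
```

Note that if the upstream CDC source doesn't emit tombstones for deletes, the sink will never see a delete to apply, so check what the source connector produces for delete events.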
create stream test02 (col1 int, col2 varchar) with (kafka_topic='test02', partitions=1, value_format='AVRO'); only returned col1 and col2 — I don't see the default ROWTIME and ROWKEY (ksql 7.3.2).
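If it helps: from ksqlDB 0.10 onwards, ROWKEY no longer exists as an implicit column (key columns are declared explicitly), and ROWTIME is a pseudo column that DESCRIBE doesn't list but which can still be selected in a query. A sketch:

```sql
-- ROWTIME is a pseudo column: not shown by DESCRIBE, but selectable
SELECT ROWTIME, col1, col2 FROM test02 EMIT CHANGES;
```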
Realized this is an "old-ish" video... you don't show at any point how you started your kafkacat container. Also, of course, kafkacat has now been replaced/renamed to kcat.
Hi, is there a way to modify the payload as needed? For example, if the Kafka message is {"C":1, "C2":"Test"}, in Elasticsearch the document needs to be created as {"C":1, "searchFields":{"C2":"Test"}}. Is there a way to achieve this?
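One way to reshape a payload like that is with stream processing before the sink, rather than in the connector itself. A sketch in ksqlDB using a STRUCT constructor, assuming a stream declared over the source topic (stream and topic names are illustrative):

```sql
CREATE STREAM src (C INT, C2 VARCHAR)
  WITH (KAFKA_TOPIC='source_topic', VALUE_FORMAT='JSON');

-- Wrap C2 inside a nested searchFields struct
CREATE STREAM reshaped WITH (KAFKA_TOPIC='es_topic') AS
  SELECT C, STRUCT(C2 := C2) AS searchFields
  FROM src;
```

The Elasticsearch sink would then read from `es_topic` and index the nested document.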
why why oh why have I not found this before... would have saved me so much grief, hehehe... now to figure out why my local CP Kafka Connect does not want to sink to a MongoDB Atlas-hosted collection...
Awesome content, Robin. Thanks. One question, if possible: in a topic where I can't guarantee the order of the messages, does changing the timestamp for a TABLE solve the problem of late messages? The table records should reflect the last event, based not on topic arrival but on the event itself...
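For reference, ksqlDB lets you base event time on a field in the message itself via the TIMESTAMP property, rather than the broker timestamp. A sketch, with illustrative column names:

```sql
CREATE TABLE t (id VARCHAR PRIMARY KEY, event_ts BIGINT, val INT)
  WITH (KAFKA_TOPIC='my_topic', VALUE_FORMAT='JSON', TIMESTAMP='event_ts');
```

One caveat worth hedging: this governs time-based processing such as windowing, but a table's materialized state is still "latest record by offset" per key, so a late-arriving older event can still overwrite a newer one unless you handle that in your processing logic.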
Just wondering if there is already a video on creating a custom partitioner and using it in my connector? I've been struggling today to implement that, since I'm not a Java guy... thanks
@Robin, I have the Azure Synapse JDBC driver kafka-connect-azure-sql-dw-1.0.7.jar in the path /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc, but I still get an error: Caused by: java.sql.SQLException: No suitable driver found for "jdbc:sqlserver. I have tried all possible options; none of them worked.
Hi Robin, I am a software engineer at a startup. Last year we built a pipeline to sync our Postgres data to Elasticsearch and Cassandra. It was all custom Java code with a lot of operational handling. Thank you for this video — I am planning to use Connect for those pipelines.
I tried the MongoSinkConnector and I am seeing a failure with this message: DataException: Only Map objects supported in absence of schema for [field insertion], found java.lang.String in requireMap. Correct me if I'm wrong — do I need to use value.converter with the value set to JsonConverter?
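For what it's worth, "found java.lang.String ... in requireMap" usually means the sink received the value as a raw string rather than as parsed JSON, which points at the converter. Assuming schemaless JSON values, the relevant settings would be along these lines:

```properties
# Parse the value bytes as JSON; schemas.enable=false because the
# payload has no embedded schema/payload envelope
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

These can be set at the worker level or, prefixed as shown, per connector.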
I'm attempting to create a stream from a multi-schema topic, similar to the train example at the end of this video. I would then split it into multiple streams/topics, one per message type. However, I can't seem to create the stream in a way that it's populated with the messages from the multi-schema topic. Is there an actual example you can reference for this scenario?
Thank you, Robin, for the excellent explanation. In all your examples you're using auto.create set to true; I was trying auto.create set to false, where I create the table beforehand, but I am getting the error below. I have tried every possible option. Would you be able to help? Thanks in advance. Caused by: io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "table_name" is missing and auto-creation is disabled
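A guess, for anyone with the same error: when the table genuinely exists, this is often a name or case mismatch between the topic-derived table name and the pre-created table (quoted identifiers are case-sensitive in many databases). The JDBC sink's `table.name.format` lets you pin the target explicitly. A sketch, with an illustrative schema/table name:

```properties
auto.create=false
# Map the topic to an explicit target table instead of relying on the
# topic name matching the table name exactly (including case)
table.name.format=dbo.table_name
```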
Thank you for the tutorials, Robin. If the schema changes frequently based on business requirements, do we have to drop the stream and recreate it each time? Please let me know.
Hi @Robin Moffatt, many thanks. I'd appreciate clarification on whether I'm thinking about the Kafka implementation correctly in my example below, please: say a train route is A-B-C-D-E-F-G-H. Train 1 starts at station A and stops only at stations C and F before reaching destination H. For the dashboard displays at stations C, F and H, suppose we want to show where the train is (for example: the train has already left station A and is between stations B and C). To display this highly volatile information while the train is on the move, would we use the Kafka Streams API? In other words, as Kafka updates the topic (where the train currently is), the implementation should call the subscribe() and poll() methods to pull data from the partition every so often and display the message in real time: "Train is presently between stations B and C and is running 6 minutes late." I would much appreciate your confirmation that I'm thinking of the right example, please.
Hi, yes, I think you would use some stream processing logic to do this, I guess with some kind of timer to trigger it, since there'd be no event as such until the next one was received from the source.
Hi @@rmoff, thank you. I would be grateful if you could confirm that Kafka streaming is essentially about an event that can change its state/value over a period of time. In other words, the event here is "where is the train": the dashboard displays "between stations B and C" and later "between C and D"; due to arrive in 5 minutes, and later due in 2 minutes / 1 minute. Am I right in my assumption that Kafka streaming is used for streaming data pertaining to an event which changes state and value every so often? Many thanks.
@@raghavankasthuri2908 You'd probably model this so that each train emits events regularly, or when it arrives at a station. Kafka Streams (or any other stream processor) would subscribe to those event streams and emit arrival-time estimations for all the relevant stations.
Very interesting series of videos — very helpful. A little remark: at 38:58, it seems that the order value being inserted was way higher than the currently displayed maximum (22190899.73 vs 216233.09), and yet this value was not updated.
Why is everyone using Confluent Kafka for this and that? I want to do it in production, and Confluent Kafka is not open source. Can anyone suggest an article or video to refer to? I want to load a CSV or JSON file into Kafka as a table.
My data in Kafka is schema-less (string keys, JSON values). I want to sink it to Postgres. Any tips? Do I need Schema Registry? What converters/transformers (if any) do I need?
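One common approach here: the JDBC sink needs a schema to build its INSERT statements, so schemaless JSON has to be given one first. A sketch using ksqlDB to declare the schema and re-serialize as Avro (with Schema Registry), with illustrative stream, topic, and column names:

```sql
-- Declare a schema over the raw JSON topic
CREATE STREAM raw_json (id VARCHAR KEY, amount DOUBLE)
  WITH (KAFKA_TOPIC='source_topic', VALUE_FORMAT='JSON');

-- Write it back out as Avro, which carries the schema the JDBC sink needs
CREATE STREAM typed WITH (KAFKA_TOPIC='typed_topic', VALUE_FORMAT='AVRO') AS
  SELECT * FROM raw_json;
```

The JDBC sink would then read `typed_topic` with `value.converter` set to the Avro converter. An alternative is keeping JSON but embedding schemas in each message (`schemas.enable=true`), which bloats the payload.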
Hi Robin, I'm working with Strimzi Kafka on OpenShift. When I pulled the images for the KSQL server and KSQL CLI at version 0.15.0, I got this error when writing any command: "The server has encountered an incompatible entry in its log and cannot process further DDL statements. This is most likely due to the service being rolled back to an earlier version." The problem is that when I'm using newer versions (or the latest version), I get the error below when trying to identify fields as KEY or PRIMARY KEY: "KSQL currently only supports KEY columns named ROWKEY; extraneous input 'PRIMARY'." So I'm struggling to deal with keys, and I have to go live soon, so I'd kindly appreciate your support. Thanks in advance.
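For context: the "KEY columns named ROWKEY" error is characteristic of ksqlDB versions before 0.10, so seeing it may mean the server actually running is older than the image tag suggests. From 0.10 onwards, the key column is declared inline. A sketch of the newer syntax, with hypothetical table and column names:

```sql
-- ksqlDB 0.10+ syntax: the key column is declared in the schema itself
CREATE TABLE users (
  id   VARCHAR PRIMARY KEY,
  name VARCHAR
) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON');
```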
Thanks for the video. Though it is about two years old, I still find it useful. However, any idea why I get the error "Failed to create connector: {"error_code":409,"message":"Connector SINK_FOO_01_0 already exists"}" when I run CREATE SINK CONNECTOR SINK_FOO_01_0 WITH (...) for the first time, following your steps? Everything else worked up to that point.
@@RobinNMoffatt to implement the outbox pattern from SQL within a transaction. So it would mean, for example with Postgres, using the pg_logical_emit_message function to store the message at the same time as storing an entity in a table — but all of this from ksqlDB or Flink SQL. Does it make sense?
Hello Robin! Good video. Do you have a video that uses a Dead Letter Queue topic storing multiple messages with different schemas — that is, one that uses the properties confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy and TopicRecordNameStrategy?
Hi Robin, awesome video! I really appreciate this. I was wondering: I applied these two SMTs and had JSON_SR configured everywhere, but the key was always prepended with a \" and appended with a \" between the quotes. What is causing this?
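A guess at the cause: escaped quotes wrapping the key usually mean a plain-string key is being serialized through a JSON converter, which encodes the string as a JSON value (i.e. with surrounding quotes). If the key really is just a string, using StringConverter on the key side avoids that:

```properties
# Serialize the key as a raw string rather than as a JSON-encoded value,
# which would wrap it in quotes
key.converter=org.apache.kafka.connect.storage.StringConverter
```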
Great video, Robin! I really appreciated the clear and concise way you explained the concepts. Your examples were also really helpful in solidifying my understanding of the Kafka use case. Overall, I think you did an excellent job, and I look forward to seeing more of your videos in the future. Keep up the great work!
@rmoff, thanks for your videos — they are the best! We have a situation: we have implemented Debezium with Kafka, and we are experiencing a performance issue. When both source and sink connectors are working at the same time, consumer performance decreases significantly. Here are some numbers for what we have:

- 1 Kafka cluster with 3 nodes (8GB RAM, -Xms: 4096m, -Xmx: 4096m each); all topics with replication factor = 3
- 1 ZooKeeper cluster with 3 nodes (4GB RAM, -Xms: 2048m, -Xmx: 2048m each)
- 1 Connect cluster with 5 nodes (12GB RAM each, -Xms: 6168m, -Xmx: 6168m) with 1 partition and 1 task, though we have tried with 1 / 5 / 100 partitions and 1 / 5 / 15 / 100 tasks
- 1 source connector per table (4 tables)
- 1 sink connector per table (4 tables)
- We are not using Schema Registry

The problem: the target DB has a delay that increases when the message rate exceeds 750 msgs per second. Message size = 6kb. Right now we are processing 2000 msgs of 6kb per second (this was the best performance we got, and it was with ONLY the sink connectors enabled). When we enable only the sink connectors, performance is good; the same with source connectors and no sinks — performance is great. Resources are OK; the source and target environments have plenty of CPU and memory. When we watch the messages consumed per second, there is a wait of about 30 seconds, then it consumes some messages and waits, and then again. Question: what can we do to improve the consumer performance? Could you help me?