Robin Moffatt
Robin has been speaking at conferences since 2009 including QCon, Devoxx, Strata, Kafka Summit, and Øredev. You can find many of his talks online at rmoff.net/talks/, and his blog articles at rmoff.net/. Outside of work he enjoys drinking good beer and eating fried breakfasts, although generally not at the same time.
ksqlDB HOWTO: Handling Time (9:08, 3 years ago)
ksqlDB HOWTO: Stateful Aggregates (13:56, 3 years ago)
ksqlDB HOWTO: Joins (10:23, 3 years ago)
ksqlDB HOWTO: Schema Manipulation (10:56, 3 years ago)
ksqlDB HOWTO: Filtering (9:25, 3 years ago)
Exploring the Kafka Connect REST API (20:56, 3 years ago)
From Zero to Hero with Kafka Connect (33:49, 3 years ago)
Kafka Connect in 60 seconds (1:01, 3 years ago)
Comments
@jorgeluiscarhuaricaaguilar4849
Hello Robin, I was trying to follow your video, but the Voluble connector is no longer supported, so the step at the 9-minute mark can't be followed. Can you suggest other options to produce data like that connector does? I also tried the kafka-console-producer command, but I couldn't get data inserted into my bucket yet.
@rmoff
@rmoff 21 hours ago
Check out shadowtraffic.io - it's what Voluble was initially conceived as.
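Another way to keep a steady flow of test data flowing, as an alternative to the Voluble connector used in the video, is the kafka-connect-datagen source connector, which ships with several example schemas. A minimal sketch, with the topic name and schema choice as placeholders:

    CREATE SOURCE CONNECTOR SOURCE_DATAGEN_ORDERS WITH (
      'connector.class' = 'io.confluent.kafka.connect.datagen.DatagenConnector',
      'kafka.topic'     = 'orders',      -- hypothetical target topic
      'quickstart'      = 'orders',      -- one of the bundled example schemas
      'max.interval'    = '1000',        -- ms between generated records
      'iterations'      = '10000000'
    );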
@walterferreiradossantos2378
@walterferreiradossantos2378 2 days ago
Robin, I need your help. I'm new to development with Kafka. I have Kafka Connect with CDC as the producer and the JDBC sink connector as the consumer. I can do an upsert correctly, but I can't get the delete operation to work. Is it possible to use the JDBC sink connector to handle all operations (insert, update and delete)? Can you help me with an example of Kafka Connect from SQL Server to SQL Server without using Debezium?
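For the delete part: the JDBC sink can only apply deletes when it receives tombstone (null-value) records and the record key carries the primary key, which is something a CDC source like Debezium produces but the plain JDBC source connector does not. A sketch of the relevant sink settings, with the connector name, connection details, topic, and key column as placeholders:

    CREATE SINK CONNECTOR SINK_SQLSERVER_TARGET WITH (
      'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
      'connection.url'      = 'jdbc:sqlserver://target-host:1433;databaseName=demo',  -- hypothetical
      'connection.user'     = 'demo',
      'connection.password' = 'demo',
      'topics'              = 'dbo.my_table',      -- hypothetical topic
      'insert.mode'         = 'upsert',
      'pk.mode'             = 'record_key',        -- deletes require the key to carry the PK
      'pk.fields'           = 'id',                -- hypothetical key column
      'delete.enabled'      = 'true'               -- applied when a tombstone (null value) arrives
    );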
@jeremykenn
@jeremykenn 20 days ago
create stream test02 (col1 int, col2 varchar) with (kafka_topic='test02', partitions=1, value_format='AVRO'); only returns col1 and col2; I don't see the default ROWTIME and ROWKEY (ksql 7.3.2).
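For reference, ROWTIME is a pseudocolumn in ksqlDB, so it doesn't appear in DESCRIBE output but can still be selected, and in recent versions ROWKEY has been replaced by explicitly declared key columns. A minimal check:

    SELECT ROWTIME, col1, col2 FROM test02 EMIT CHANGES;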
@georgelza
@georgelza 1 month ago
Realized this is an old-ish video... you don't show at any point how you started your kafkacat container, and of course kafkacat has since been renamed to kcat.
@Saikrishnabollabathini
@Saikrishnabollabathini 1 month ago
Hi, is there a way to modify the payload as needed? For example, if the Kafka message is {"C":1, "C2":"Test"}, the Elasticsearch document needs to be created as {"C":1, "searchFields":{"C2":"Test"}}. Is there a way to achieve this?
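One way to shape the document like that before it reaches Elasticsearch is to build the nested field in ksqlDB with a STRUCT. A sketch, assuming the topic has already been declared as a stream with columns C and C2 (the stream name here is hypothetical):

    CREATE STREAM with_search_fields AS
      SELECT C,
             STRUCT(C2 := C2) AS searchFields   -- becomes a nested object in the sink document
      FROM source_stream;                       -- hypothetical source stream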
@user-ld8op1lb1p
@user-ld8op1lb1p 1 month ago
So Amazing
@georgelza
@georgelza 1 month ago
Why oh why have I not found this before... it would have saved me so much grief, hehehe... now to figure out why my local CP Kafka Connect doesn't want to sink to a MongoDB Atlas-hosted collection...
@user-zc7vb8mc8s
@user-zc7vb8mc8s 1 month ago
Awesome content Robin, thanks. One question, if possible: in a topic where I can't guarantee the order of the messages, does changing the timestamp for a TABLE solve the problem of late messages? The table records should reflect the latest event based not on arrival in the topic, but on the event itself...
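For reference on the timestamp part: ksqlDB uses the record's Kafka timestamp unless you point it at a field in the payload with the TIMESTAMP property, which is what lets downstream windowing and joins work on event time rather than arrival time. A sketch with illustrative names:

    CREATE TABLE customer_latest (id VARCHAR PRIMARY KEY, name VARCHAR, event_ts BIGINT)
      WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='AVRO', TIMESTAMP='event_ts');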
@amuse_me
@amuse_me 1 month ago
Where can I find the documentation that you used? I really appreciate any help you can provide.
@LMGaming0
@LMGaming0 2 months ago
Just wondering if there is already a video on creating a custom partitioner and using it in my connector? I've been struggling today to implement that since I'm not a Java guy... thanks
@Pinaldba
@Pinaldba 2 months ago
@Robin, I have the Azure Synapse JDBC driver kafka-connect-azure-sql-dw-1.0.7.jar in the path /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc, but I still get the error: Caused by: java.sql.SQLException: No suitable driver found for "jdbc:sqlserver. I have tried all the possible options and none of them worked.
@AjayMatai
@AjayMatai 2 months ago
Is there a Kafka connector that would read from Kafka, do some transforms, and write back to another Kafka topic?
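Kafka-to-Kafka transformation is usually the job of a stream processor such as ksqlDB or Kafka Streams rather than a connector, since Connect is designed for moving data between Kafka and external systems. A minimal ksqlDB sketch with illustrative names:

    CREATE STREAM orders_src (id VARCHAR KEY, amount DOUBLE, country VARCHAR)
      WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');

    -- persistent query: reads 'orders', writes transformed records to 'orders_uk'
    CREATE STREAM orders_uk WITH (KAFKA_TOPIC='orders_uk') AS
      SELECT id, amount FROM orders_src WHERE country = 'UK';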
@LMGaming0
@LMGaming0 2 months ago
4 years later, still useful.
@shibimukesh9369
@shibimukesh9369 2 months ago
Using the MongoDB sink connector I tried to flatten the message, but I'm getting an error.
@shibimukesh9369
@shibimukesh9369 2 months ago
How do I handle a nested document? Only the root level is working.
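If the aim is to flatten nested fields before the sink writes them, the stock Flatten SMT is one option. A sketch of the relevant settings, written here as a ksqlDB CREATE SINK CONNECTOR with placeholder names and connection details:

    CREATE SINK CONNECTOR SINK_MONGO_FLAT WITH (
      'connector.class'              = 'com.mongodb.kafka.connect.MongoSinkConnector',
      'topics'                       = 'my_topic',                -- hypothetical
      'connection.uri'               = 'mongodb://mongo:27017',   -- hypothetical
      'database'                     = 'demo',
      'collection'                   = 'docs_flat',
      'transforms'                   = 'flatten',
      'transforms.flatten.type'      = 'org.apache.kafka.connect.transforms.Flatten$Value',
      'transforms.flatten.delimiter' = '_'    -- nested.field becomes nested_field
    );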
@vivekshah1664
@vivekshah1664 3 months ago
Hi Robin, I am a software engineer at a startup. Last year we built a pipeline to sync our Postgres data to Elasticsearch and Cassandra. It was all custom Java code with a lot of operational handling. Thank you for this video, I am planning to use Connect for those pipelines.
@timothyvandermeer9845
@timothyvandermeer9845 3 months ago
FANTASTIC. thank you brother
@Abhijit_Patra
@Abhijit_Patra 3 months ago
I am trying to do this but I get a null value.
@chintanthakker4129
@chintanthakker4129 3 months ago
I tried this with the MongoSinkConnector and I am seeing a failure with this message: DataException: Only Map objects supported in absence of schema for [field insertion], found java.lang.String in requireMap. Correct me if I'm wrong: do I need to use value.converter with JsonConverter as the value?
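That DataException usually means the sink is being handed plain strings (for example from StringConverter) instead of structured values; with schemaless JSON in the topic, JsonConverter with schemas.enable=false hands the sink a Map. A sketch with placeholder names and connection details:

    CREATE SINK CONNECTOR SINK_MONGO_JSON WITH (
      'connector.class'                = 'com.mongodb.kafka.connect.MongoSinkConnector',
      'topics'                         = 'my_topic',                -- hypothetical
      'connection.uri'                 = 'mongodb://mongo:27017',   -- hypothetical
      'database'                       = 'demo',
      'collection'                     = 'docs',
      'key.converter'                  = 'org.apache.kafka.connect.storage.StringConverter',
      'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
      'value.converter.schemas.enable' = 'false'   -- bare JSON values reach the sink as Maps
    );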
@user-qd4nw5mi6y
@user-qd4nw5mi6y 3 months ago
I'm attempting to create a stream from a multi-schema topic, similar to the train example at the end of this video. I would then split it into multiple streams/topics for each message type. However, I can't seem to create the stream in a way that it's populated with the messages from the multi-schema topic. Is there an actual example you can reference for this scenario?
@wongsiewwai
@wongsiewwai 4 months ago
thank you.
@nesreenmohd665
@nesreenmohd665 4 months ago
Thanks
@sivakumarm1381
@sivakumarm1381 5 months ago
Thank you Robin for the excellent explanation. In all your examples you're using auto.create set to true; I was trying auto.create set to false, creating the table beforehand, but I am getting the error below. I have tried every option I can think of. Will you be able to help? Thanks in advance. Caused by: io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "table_name" is missing and auto-creation is disabled
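With auto.create=false the JDBC sink resolves the target table from the topic name by default, so a table pre-created under a different name looks "missing"; table.name.format lets you point the connector at the existing table. A sketch with placeholder names and connection details:

    CREATE SINK CONNECTOR SINK_JDBC_EXISTING_TABLE WITH (
      'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
      'connection.url'      = 'jdbc:postgresql://postgres:5432/demo',   -- hypothetical target
      'connection.user'     = 'demo',
      'connection.password' = 'demo',
      'topics'              = 'orders',
      'auto.create'         = 'false',
      'table.name.format'   = 'public.orders_precreated'   -- must name the table created up front
    );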
@Agrossio
@Agrossio 7 months ago
Great video!! Will this work for processing a CSV with 1,000,000 records? Would it take less than an hour to save it into an Oracle database?
@karthikb.s.k.4486
@karthikb.s.k.4486 7 months ago
Thank you for the tutorials, Robin. If the schema changes frequently based on business requirements, do we have to drop the stream and re-create it each time? Please let me know.
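For reference, newer ksqlDB releases support CREATE OR REPLACE, which can evolve a stream in place for some additive changes (such as appending columns) without dropping it; the restrictions vary by version, so it's worth checking the docs for yours. A sketch with illustrative columns:

    CREATE OR REPLACE STREAM orders_src (id VARCHAR KEY, amount DOUBLE, country VARCHAR, new_col VARCHAR)
      WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');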
@raghavankasthuri2908
@raghavankasthuri2908 9 months ago
Hi @Robin Moffatt, many thanks. I'd appreciate clarification on whether I'm thinking about this Kafka implementation correctly. Say a train route is A-B-C-D-E-F-G-H. Train 1 starts at station A and stops only at stations C and F before reaching destination H. If we want the dashboard displays at stations C, F and H to show where the train is (for example: the train has already left station A and is between stations B and C), do we use the Kafka Streams API to display this highly volatile information while the train is on the move? In other words, as Kafka updates the topic ("where is the train currently"), the implementation should call subscribe() and poll() to pull data from the partition every so often and display the message in real time: "Train is presently between stations B and C and is running 6 minutes late". I would much appreciate your confirmation that I'm thinking of the right example.
@rmoff
@rmoff 9 months ago
Hi, yes, I think you would use some stream processing logic to do this, I guess with some kind of timer to trigger it, since there'd be no event as such until the next one was received from the source.
@raghavankasthuri2908
@raghavankasthuri2908 9 months ago
Hi @@rmoff, thank you. I'd be grateful if you could confirm that Kafka streaming is about an event that can change its state/value over a period of time. In other words, the event here is "where is the train", with the dashboard displaying "between stations B and C" and later "between C and D", due to arrive in 5 minutes and later due in 2 minutes/1 minute. Am I right in my assumption that Kafka streaming is used for streaming data pertaining to an event whose state and value change every so often? Many thanks.
@RobertMetzger
@RobertMetzger 8 months ago
@@raghavankasthuri2908 You'd probably model this in a way that each train emits events regularly, or when it arrives at a station. Kafka Streams (or any other stream processor) would subscribe to those stream events and emit arrival-time estimations for all the relevant stations.
@raghavankasthuri2908
@raghavankasthuri2908 8 months ago
@@RobertMetzger Many thanks, much appreciated.
@dswan01
@dswan01 9 months ago
So I assume you can use ksqlDB to transform the data and then pass it to a stream to update the target database. Is this correct?
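Yes, that is the usual pattern: a persistent ksqlDB query writes the transformed data to a new topic, and a sink connector pushes that topic to the target database. A condensed sketch, assuming a stream such as orders_src (as in the earlier sketch) has already been declared over the source topic:

    CREATE STREAM orders_clean WITH (KAFKA_TOPIC='orders_clean') AS
      SELECT id, UCASE(country) AS country, amount FROM orders_src;

    CREATE SINK CONNECTOR SINK_PG_ORDERS_CLEAN WITH (
      'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
      'connection.url'      = 'jdbc:postgresql://postgres:5432/demo',   -- hypothetical target
      'connection.user'     = 'demo',
      'connection.password' = 'demo',
      'topics'              = 'orders_clean',
      'auto.create'         = 'true'
    );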
@nejbahadnane7517
@nejbahadnane7517 9 months ago
Thanks 🙏 great video
@rmoff
@rmoff 9 months ago
Thanks, glad you liked it :)
@sdon1011
@sdon1011 10 months ago
Very interesting series of videos, very helpful. A little remark: at 38:58, it seems that the order value being inserted was way higher than the currently displayed maximum (22190899.73 vs 216233.09), and yet the value was not updated.
@shibilpm9873
@shibilpm9873 10 months ago
Why is everyone using Confluent Kafka for this and that? I want to do this in production and Confluent Kafka is not open source. Can anyone suggest an article or video to refer to? I want to load a CSV or JSON file into Kafka as a table.
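For file ingest on plain Apache Kafka with just Kafka Connect, the community kafka-connect-spooldir source connector is one commonly used option for CSV and JSON files. A sketch with placeholder paths and topic, written as a ksqlDB statement for brevity (the same properties work through the Connect REST API):

    CREATE SOURCE CONNECTOR SOURCE_CSV_SPOOLDIR WITH (
      'connector.class'           = 'com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector',
      'input.path'                = '/data/unprocessed',   -- hypothetical paths
      'finished.path'             = '/data/processed',
      'error.path'                = '/data/error',
      'input.file.pattern'        = '.*\.csv',
      'topic'                     = 'csv_data',            -- hypothetical topic
      'csv.first.row.as.header'   = 'true',
      'schema.generation.enabled' = 'true'
    );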
@user-br1tt1wf7v
@user-br1tt1wf7v 11 months ago
Great stuff, you helped me understand it easily. Thx
@hamoudy41
@hamoudy41 11 months ago
My data in Kafka is schema-less (string keys, JSON values). I want to sink it to Postgres. Any tips? Do I need Schema Registry? What converters/transformers (if any) do I need?
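The JDBC sink needs a schema to build and populate the target table, so one common approach is to declare the shape of the JSON in ksqlDB and re-serialise it as Avro (using Schema Registry) before sinking. A sketch with illustrative topic and column names:

    CREATE STREAM src_json (id VARCHAR KEY, name VARCHAR, amount DOUBLE)
      WITH (KAFKA_TOPIC='my_json_topic', VALUE_FORMAT='JSON');   -- hypothetical topic and columns

    CREATE STREAM src_avro WITH (KAFKA_TOPIC='my_json_topic_avro', VALUE_FORMAT='AVRO') AS
      SELECT * FROM src_json;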
@mukeshdharajiya1261
@mukeshdharajiya1261 11 months ago
Thank you so much for your best explanation 🏆
@rmoff
@rmoff 9 months ago
Glad it was helpful!
@MohamedAyman-wu6gk
@MohamedAyman-wu6gk 11 months ago
Hi Robin, I'm working with Strimzi Kafka on OpenShift. When I pulled the ksqlDB server and ksqlDB CLI images at version 0.15.0, I got this error on any command: "The server has encountered an incompatible entry in its log and cannot process further DDL statements. This is most likely due to the service being rolled back to an earlier version." The problem is that when I use newer versions, or the latest version, I get the errors below when trying to declare fields as KEY or PRIMARY KEY: "KSQL currently only supports KEY columns named ROWKEY", "extraneous input 'PRIMARY'". So I'm struggling to deal with keys, and I have to go live soon, so I'd appreciate your support. Thanks in advance.
@rmoff
@rmoff 11 months ago
Hi, A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. thanks.
@piusono
@piusono 11 months ago
Thanks for the video. Though it is about 2 years old, I still find it useful. However, any idea why I get the error "Failed to create connector: {"error_code":409,"message":"Connector SINK_FOO_01_0 already exists"}" when I run CREATE SINK CONNECTOR SINK_FOO_01_0 WITH (...) for the first time, following your steps? Everything else worked up to that point.
@rmoff
@rmoff 11 months ago
Hi, I don't work with Kafka directly anymore. A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. thanks.
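For the 409 itself: Kafka Connect returns it when a connector with that name is already registered on the shared Connect cluster, for example from an earlier attempt. One way to check and clear it from ksqlDB (a sketch):

    SHOW CONNECTORS;
    DROP CONNECTOR SINK_FOO_01_0;
    -- then re-run the CREATE SINK CONNECTOR statement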
@scr33tch
@scr33tch 1 year ago
Thanks Robin. Your videos are by far the best, most detailed Kafka resources.
@rmoff
@rmoff 9 months ago
Thank you! :)
@marcorossi2268
@marcorossi2268 1 year ago
What if I want to shred the JSON into sub-messages and publish each section to a different topic that ends up in a different destination table?
@rmoff
@rmoff 1 year ago
Hi, I don't work with Kafka directly anymore. A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. thanks.
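One common way to do this in ksqlDB is to declare the nested payload as STRUCTs and fan each section out to its own stream, and therefore its own topic and sink. A sketch with hypothetical topic and field names:

    CREATE STREAM orders_raw (id VARCHAR KEY,
                              customer STRUCT<id VARCHAR, name VARCHAR>,
                              payment  STRUCT<method VARCHAR, amount DOUBLE>)
      WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');

    CREATE STREAM customers_out WITH (KAFKA_TOPIC='customers') AS
      SELECT id, customer->id AS customer_id, customer->name AS customer_name FROM orders_raw;

    CREATE STREAM payments_out WITH (KAFKA_TOPIC='payments') AS
      SELECT id, payment->method AS pay_method, payment->amount AS pay_amount FROM orders_raw;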
@Evkayne
@Evkayne 1 year ago
thank you
@user-rd4ih1uw8p
@user-rd4ih1uw8p 1 year ago
Great tutorial and very helpful! Thanks for this
@emmanuelharel
@emmanuelharel 1 year ago
Hello, does anyone know if it is possible to write to the WAL of a Postgres database from ksqlDB?
@emmanuelharel
@emmanuelharel 1 year ago
@@RobinNMoffatt To implement the outbox pattern from SQL within a transaction. It would mean, for example with Postgres, using the pg_logical_emit_messages function to store the message at the same time as storing an entity in a table, but doing all of this from ksqlDB or Flink SQL. Does it make sense?
@robertoojeda3263
@robertoojeda3263 1 year ago
Hello Robin! Good video. Do you have a video that uses a Dead Letter Queue topic storing multiple messages with different schemas, in other words one that uses the properties confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy and TopicRecordNameStrategy?
@alexandersk.8963
@alexandersk.8963 1 year ago
Amazing materials for me, Robin, thank you a lot!
@tanakpek6270
@tanakpek6270 1 year ago
Hi Robin, awesome video! I really appreciate this. I was wondering: I applied these two SMTs and had JSON_SR configured everywhere, but the key was always prepended with \" and appended with \" between the quotes. What is causing this?
@rmoff
@rmoff 1 year ago
Hi, glad you liked the video! a good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. thanks.
@rushij6874
@rushij6874 1 year ago
Great video, Robin! I really appreciated the clear and concise way you explained the concepts. Your examples were also really helpful in solidifying my understanding of the Kafka use case. Overall, I think you did an excellent job and I look forward to seeing more of your videos in the future. Keep up the great work!
@mathiasyeremiaaryadi9097
@mathiasyeremiaaryadi9097 1 year ago
Can we make the insert query persistent, so that it runs automatically rather than only at the time we write the query?
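Yes: in ksqlDB a CREATE STREAM ... AS SELECT (or CREATE TABLE ... AS SELECT, or INSERT INTO ... SELECT) is a persistent query that keeps running on the server and processes new records as they arrive, unlike an ad-hoc SELECT in the CLI. A sketch, assuming a stream such as orders_src already exists:

    CREATE STREAM orders_enriched WITH (KAFKA_TOPIC='orders_enriched') AS
      SELECT id, amount * 1.2 AS amount_with_tax FROM orders_src;

    SHOW QUERIES;   -- the query above appears here and runs until terminated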
@bonolomokgadi9025
@bonolomokgadi9025 1 year ago
Hi Robin. I'm seeing the below error. The key converter is StringConverter.
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:220)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
    at org.apache.kafka.connect.runtime.TransformationChain.transformRecord(TransformationChain.java:70)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:357)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:271)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.DataException: Field does not exist: ENTITY_NO
    at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:89)
    at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$transformRecord$0(TransformationChain.java:70)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
    ... 12 more
Below is the config:
"key.ignore": "false",
"transforms": "CastToInt64,CastToString,copyIdToKey,extractKeyFromStruct",
"transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields": "ENTITY_NO",
"transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyFromStruct.field": "ENTITY_NO",
"transforms.CastToInt64.spec": "REQUESTING_ENTITY_NO:int64",
"transforms.CastToInt64.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.CastToString.spec": "REQUESTING_ENTITY_NO:string",
"transforms.CastToString.type": "org.apache.kafka.connect.transforms.Cast$Value",
@carlaguelpa4574
@carlaguelpa4574 1 year ago
@rmoff, thanks for your videos, they are the best! We have a situation: we have implemented Debezium with Kafka and we are experiencing a performance issue. When both source and sink connectors are working at the same time, consumer performance decreases significantly. Here are some numbers for what we have:
1 Kafka cluster with 3 nodes (8GB RAM, -Xms 4096m, -Xmx 4096m each); all topics with replication factor = 3
1 ZooKeeper cluster with 3 nodes (4GB RAM, -Xms 2048m, -Xmx 2048m each)
1 Connect cluster with 5 nodes (12GB RAM each, -Xms 6168m, -Xmx 6168m) with 1 partition and 1 task, but we have tried with 1/5/100 partitions and 1/5/15/100 tasks
1 source connector per table (4 tables)
1 sink connector per table (4 tables)
We are not using Schema Registry.
The problem: the target DB has a delay that increases when there are more than 750 messages per second. Message size = 6KB. Right now we are processing 2000 messages of 6KB per second (this was the best performance we got, and only with the sink connectors enabled). When we enable only the sink connectors, performance is good; the same goes for source connectors with no sinks, performance is great. Resources are OK; source and target environments have plenty of CPU and memory. When we look at messages consumed per second, there is a wait of around 30 seconds, then it consumes some messages, waits, and then repeats.
Question: what can we do to improve consumer performance? Could you help us?
@schoesa
@schoesa 1 year ago
If I run docker-compose up -d, it hangs every time while downloading the Kafka Connect JDBC hub plugin.
@rmoff
@rmoff 1 year ago
hi, the best place to get help is at www.confluent.io/en-gb/community/ask-the-community/ :)
@heinhtetzaw9463
@heinhtetzaw9463 1 year ago
When I create a stream with Avro format, I get an "Unable to create schema from topic, Connection reset" error.
@rmoff
@rmoff 1 year ago
hi, the best place to get help is at www.confluent.io/en-gb/community/ask-the-community/ :)