
🎄Twelve Days of SMT 🎄 - Day 2: ValueToKey and ExtractField 

Robin Moffatt

This video explains how to use the ValueToKey and ExtractField Single Message Transforms (SMTs) to set the key of a Kafka message based on a field in the message's value.
Setting the key of a Kafka message is important: the key determines which partition a message is written to, so it ensures correct logical processing when the topic is consumed across multiple partitions, and it is also a requirement when joining to messages in other topics. When using Kafka Connect, the connector may already set the key, which is great. If not, you can use these two SMTs to set it as part of the pipeline, based on a field in the value part of the message.
Kafka Connect is the primary method for integrating Apache Kafka with other systems. It includes Single Message Transforms (SMTs), which can be used to modify messages as they pass through the pipeline on their way into or out of Kafka.
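As a minimal sketch, the two transforms are chained in the connector configuration like this (the transform aliases copyIdToKey and extractKeyFromStruct, and the key field id, are just examples; substitute your own field name):

    "transforms": "copyIdToKey,extractKeyFromStruct",
    "transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.copyIdToKey.fields": "id",
    "transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKeyFromStruct.field": "id"

ValueToKey copies the named field from the message value into the key as a Struct; ExtractField$Key then unwraps that single field so the key is the bare field value.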
---
✍️Blog: rmoff.net/2020/12/09/twelve-d...
👾 Demo code: github.com/confluentinc/demo-...
Learn more about Kafka Connect here:
🏃‍♂️ Quick: • Kafka Connect in 60 se...
🚶 More detail: rmoff.dev/kafka-connect-zero-...
---
☁️ Confluent Cloud: confluent.cloud/signup?...
💾Download Confluent Platform: www.confluent.io/download/?ut...
📺 Kafka Connect connector deep-dives: • Kafka Connect
✍️Kafka Connect documentation: docs.confluent.io/current/con...
🧩Confluent Hub: www.confluent.io/hub/?...

Science

Published 8 Dec 2020

Comments: 19
@davidjohn9083 3 years ago
Helpful explanation, thanks man 👍
@rmoff 3 years ago
Glad it helped
@tanakpek6270 a year ago
Hi Robin, awesome video! I really appreciate this. I was wondering: I applied these two SMTs and had JSON_SR configured everywhere, but the key was always prepended with a \" and appended with a \" between the quotes. What is causing this?
@rmoff a year ago
Hi, glad you liked the video! A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. Thanks.
@muks 2 years ago
Awesome, Robin, that was a good video. I wonder, though: if we have "transforms.extractKeyFromStruct.field": "id", why do we still need "transforms.copyIdToKey.fields": "id"? My understanding of these two config properties is that to extract a key from the value we first copy the source field into the key, and then use a transformation to extract the id (in our case) from the resulting struct and put it back into the key. Is my understanding correct? Thanks, as always, for the awesome knowledge transfer.
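(For reference, a minimal sketch of what each step produces, assuming a message value containing an id field: ValueToKey alone leaves the key as a single-field Struct, and ExtractField$Key is what unwraps it.)

    before:                  key = null            value = {"id": 42, ...}
    after ValueToKey:        key = Struct{id=42}   value unchanged
    after ExtractField$Key:  key = 42              value unchanged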
@sappysaplala 3 years ago
Great video and great series on SMTs, Robin. I've been trying this out with the Debezium connector for Oracle and encountering some challenges: copyIdToKey and extractKeyFromStruct are failing because "Field does not exist" for the ID field. Any tips on debugging? Thank you.
@rmoff 3 years ago
I think Debezium can set the key value based on the primary key in the source. If you want a different field then this would be the correct SMT to use. What value.converter are you using?
@sappysaplala 3 years ago
Thanks @rmoff! io.confluent.connect.avro.AvroConverter is the default converter of my Kafka Connect Docker image; I'm not specifying a converter in the connector configuration. And yes, Debezium provides a key based on the source table's primary key, but it is in struct format {ID=1} and I'd like to get just the integer portion of the key, which is why I'm trying to use extractKeyFromStruct. I've also tried using other fields from the value but the same issue persists. Will test this out more.
@sappysaplala 3 years ago
@rmoff Saw that the transformations were being applied to the schema changes topic, which doesn't have the ID field. Tried recreating the connector with "include.schema.changes": "false" and now it works like a charm. Thank you!
@marcorossi2268 a year ago
What if I want to shred the JSON into sub-messages and publish each section to a different topic that ends up in a different destination table?
@rmoff a year ago
Hi, I don't work with Kafka directly anymore. A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. Thanks.
@bonolomokgadi9025 a year ago
Hi Robin. I'm seeing the below error. The key converter is StringConverter.

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:220)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
	at org.apache.kafka.connect.runtime.TransformationChain.transformRecord(TransformationChain.java:70)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:357)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:271)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.DataException: Field does not exist: ENTITY_NO
	at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:89)
	at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$transformRecord$0(TransformationChain.java:70)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
	... 12 more

Below is the config:

"key.ignore": "false",
"transforms": "CastToInt64,CastToString,copyIdToKey,extractKeyFromStruct",
"transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields": "ENTITY_NO",
"transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyFromStruct.field": "ENTITY_NO",
"transforms.CastToInt64.spec": "REQUESTING_ENTITY_NO:int64",
"transforms.CastToInt64.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.CastToString.spec": "REQUESTING_ENTITY_NO:string",
"transforms.CastToString.type": "org.apache.kafka.connect.transforms.Cast$Value"
@brittanyblassingill 2 years ago
Getting a "no suitable driver found" error for JDBC when trying to create the connector. Any tips?
@rmoff 2 years ago
Check out this note: www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/#no-suitable-driver-found. If you have more questions, please do head to forum.confluent.io/
@abdallhbukhari9847 2 years ago
Hi, can you please tell me how to remove the data types (e.g. "string", "int") from the payload?
@nurkandzhsupov6953 3 years ago
Also, how do you specify multiple fields for TimestampConverter?
@rmoff 3 years ago
How do you mean? If you can clarify, then I can try to cover it in the video I'm about to record on the TimestampConverter :)
@nurkandzhsupov6953 3 years ago
"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "id",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "id"
Is this the right approach?
@rmoff 3 years ago
Yes, that looks right