
Kafka Connect in Action: Elasticsearch 

Robin Moffatt
23K views

This is a walkthrough of how to stream data from #ApacheKafka to #Elasticsearch using #KafkaConnect and the Elasticsearch sink connector. You can try the whole tutorial for yourself here: rmoff.dev/kafka-elasticsearch
ℹ️Contents:
00:00:00 Introduction
00:07:17 Updating documents in Elasticsearch
00:11:38 Deleting documents in Elasticsearch with the sink connector
00:13:25 Funny when live demos don't work
00:16:14 Schemas and Elasticsearch Document Mappings
00:25:57 Sending data to Elasticsearch without a declared schema
00:28:57 Getting Timestamps from Kafka to Elasticsearch
00:33:14 Using dynamic field mapping to guess at date field types
00:36:21 Using Single Message Transform to set timestamp schema type explicitly
00:38:49 Dynamic Templates for data type mapping from Kafka in Elasticsearch
00:47:20 Changing the target index name in Elasticsearch
00:48:01 Using RegEx to change the target index name
00:49:35 Time-based partitioning of indices in Elasticsearch
00:51:57 Combining Single Message Transform in Kafka Connect
00:53:00 Error handling
00:57:13 Configure Kafka Connect to not fail on deserialisation errors
00:59:31 Setting up a Dead Letter Queue
01:02:23 Connector error handling outside of the Kafka Connect framework
Try it out for yourself: rmoff.dev/kafka-elasticsearch
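As a rough sketch of the kind of sink configuration the chapters above cover (document IDs from keys, deletes via null values, tolerant error handling, and a dead letter queue), the Python below builds a connector config as a dictionary and prints it as JSON. The topic name `orders`, the dead letter queue topic `dlq_orders`, and the Elasticsearch hostname are placeholders assumed here, not values from the video.

```python
import json


def build_sink_config(topic, dlq_topic):
    """Build an Elasticsearch sink connector config as a plain dict."""
    return {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://elasticsearch:9200",  # assumed hostname
        "topics": topic,
        # Use the Kafka message key as the Elasticsearch document ID, so a
        # later message with the same key updates the existing document.
        "key.ignore": "false",
        # Let Elasticsearch work out the mapping instead of the record schema.
        "schema.ignore": "true",
        # Treat a null-valued message (tombstone) as a delete of that document.
        "behavior.on.null.values": "delete",
        # Keep the connector running when a record cannot be deserialised...
        "errors.tolerance": "all",
        # ...and route the failed records to a dead letter queue topic.
        "errors.deadletterqueue.topic.name": dlq_topic,
        "errors.deadletterqueue.topic.replication.factor": "1",
        "errors.deadletterqueue.context.headers.enable": "true",
    }


# Placeholder names, assumed for illustration only.
print(json.dumps(build_sink_config("orders", "dlq_orders"), indent=2))
```

The printed JSON is the shape of payload that the Kafka Connect REST API accepts as a connector configuration.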
--
☁️ Confluent Cloud ☁️
Confluent Cloud is a managed Apache Kafka and Confluent Platform service. It scales to zero and lets you get started with Apache Kafka at the click of a mouse. You can sign up at confluent.cloud/signup?... and use code 60DEVADV for $60 towards your bill (small print: www.confluent.io/confluent-cl...)

Category: Science
Published: 11 Jul 2024

Comments: 50
@ptssrinivasan 4 years ago
Thanks for the video. It gave me a good start with Kafka & ELK!
@sonicjs2332 2 years ago
Why can't all tech demos be this good? Great video, very succinct and easy to follow!
@rmoff 2 years ago
Thanks :)
@jester667 2 years ago
Because you have to have a deep understanding of the topic you are talking about. That can't be achieved if you release about two videos a week covering the whole spectrum of current, hot IT topics. It takes years to master a subject and understand it thoroughly enough to teach it this smoothly.
@vigneshmuthusamy9332 2 years ago
You're such a life saver, providing such useful videos. Thank you so much for the knowledge you share.
@rmoff 2 years ago
Happy to help!
@adautosb 3 years ago
Really helpful video! Great job man!
@rmoff 3 years ago
Glad it helped!
@carlaguelpa4574 1 year ago
@rmoff, thanks for your videos, they are the best! We have a situation: we have implemented Debezium with Kafka and are experiencing a performance issue. When both the source and sink connectors are running at the same time, consumer performance decreases significantly. Here are some numbers on what we have:
1 Kafka cluster with 3 nodes (8 GB RAM, -Xms: 4096m -Xmx: 4096m each); all topics with replication factor = 3
1 ZooKeeper cluster with 3 nodes (4 GB RAM, -Xms: 2048m -Xmx: 2048m each)
1 Connect cluster with 5 nodes (12 GB RAM each, -Xms: 6168m -Xmx: 6168m) with 1 partition and 1 task, but we have also tried 1 / 5 / 100 partitions and 1 / 5 / 15 / 100 tasks
1 source connector per table (4 tables)
1 sink connector per table (4 tables)
We are not using Schema Registry
The problem: the target DB has a delay that increases when the message rate goes above 750 messages per second. Message size = 6 KB. We are now processing 2000 messages of 6 KB per second (this was the best performance we got, and it was with ONLY the sink connectors enabled). When we enable only the sink connectors the performance is good; likewise, with the source connectors and no sinks, performance is great. Resources are OK: the source and target environments have plenty of CPU and memory. When we look at the messages consumed per second, there is a wait of about 30 seconds, then it consumes some messages, waits, and so on. Question: what can we do to improve the consumer performance? Could you help us?
@carloshfmaciel 3 years ago
Very well explained!
@rmoff 3 years ago
Thanks :)
@norindermedflera5849 3 years ago
Brilliant!
@rmoff 3 years ago
Thanks :)
@ariunbattogtokhjargal8906 3 years ago
You saved my ass. Thanks man
@rmoff 3 years ago
Happy to have helped :)
@luizakharatyan 2 years ago
Thanks for the video, it is really helpful, like all of your tutorials. I have a question: what if, for a delete action, we send a message with a non-null body? Is there any way to apply a transformation conditionally, for example to apply a drop-value transformation only to rows that have the deleted flag set to true in their bodies?
@rmoff 2 years ago
Glad you like the video :) A good place to ask your question is over on forum.confluent.io/.
@samgargle 4 years ago
@rmoff, thanks for demonstrating this. I have one question; I am pretty new to Kafka, so pardon my lack of knowledge. We have streamed multiple SQL tables to Kafka using a source connector. If we now have to read data from multiple topics using a single connector, how can we set the primary key of each table as the Elasticsearch _id?
@rmoff 4 years ago
If you set the key of the Kafka message when you *ingest* the data from the tables then you can set key.ignore=false and the sink will use the message key (which would be the table key) for the Elasticsearch ID. For any more questions, the best place to ask is: → Mailing list: groups.google.com/forum/#!forum/confluent-platform or → Slack group: cnfl.io/slack
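A minimal sketch of that setup, assuming the source connector has already set each message key to the table's primary key. The topic names `customers` and `orders` and the Elasticsearch hostname are hypothetical placeholders.

```python
import json
from typing import Dict, List


def sink_config_for_tables(topics: List[str]) -> Dict[str, str]:
    """One sink connector reading several topics, with IDs from message keys."""
    return {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://elasticsearch:9200",  # assumed hostname
        # A single connector can subscribe to a comma-separated list of topics.
        "topics": ",".join(topics),
        # false = use the Kafka message key as the Elasticsearch _id.
        "key.ignore": "false",
    }


config = sink_config_for_tables(["customers", "orders"])  # hypothetical topics
print(json.dumps(config, indent=2))
```

This only helps if the key is populated on ingest; with a null key there is nothing for the sink to use as the ID.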
@jeremykenn 20 days ago
create stream test02 (col1 int, col2 varchar) with (kafka_topic='test02', partitions=1, value_format='AVRO'); only returned col1 and col2. I don't see the default rowtime and rowkey (ksql 7.3.2).
@Saikrishnabollabathini 1 month ago
Hi, is there a way to modify the payload as needed? For example, if the Kafka message is {"C":1, "C2":"Test"}, the document in Elasticsearch needs to be created as {"C":1, "searchFields":{"C2":"Test"}}. Is there a way to achieve this?
@ptssrinivasan 3 years ago
I tried a dynamic template in Elasticsearch (ES), but it is not working. ES always maps Order_TS_EPOCH as a long type.
@jogendersingh4095 4 years ago
Thank you for the knowledge on Kafka Connect! It is really helpful. I have a question: how can we transform the data to a different format before pushing it to Elastic?
@rmoff 4 years ago
Can you explain what you mean by "different format", maybe with an example?
@jogendersingh4095 4 years ago
@rmoff Hello Robin. I am reading data in Avro format. The data has many fields (nested objects); I want to extract a particular nested object, flatten it, and push only this extracted object to Elastic. I am not sure whether this can be done with a custom transformation. Thank you for your reply!
@rmoff 4 years ago
@jogendersingh4095 Yes, Single Message Transform can do this (dropping fields, flattening remaining ones). If that doesn't suffice then check out ksqlDB ru-vid.com/video/%D0%B2%D0%B8%D0%B4%D0%B5%D0%BE-ad02yDTAZx0.html (that video is about the JDBC sink but the principles shown still apply)
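One way this could look, as a sketch only: chain the built-in `ExtractField$Value` and `Flatten$Value` transforms so that the nested object replaces the record value and is then flattened. The field name `shipping_address`, the topic `orders`, and the transform aliases are hypothetical.

```python
import json


def add_extract_and_flatten(config, nested_field):
    """Return a copy of config with two chained Single Message Transforms."""
    updated = dict(config)
    updated.update({
        # Transforms run in the order they are listed here.
        "transforms": "extractNested,flatten",
        # 1. Replace the whole record value with one nested field.
        "transforms.extractNested.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.extractNested.field": nested_field,
        # 2. Flatten any structure left inside it, joining names with "_".
        "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
        "transforms.flatten.delimiter": "_",
    })
    return updated


base = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "orders",  # hypothetical topic
}
config = add_extract_and_flatten(base, "shipping_address")  # hypothetical field
print(json.dumps(config, indent=2))
```

If the reshaping needs more than extracting and flattening, stream processing such as ksqlDB (as the reply suggests) is likely the better fit.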
@alexandersk.8963 1 year ago
Amazing materials for me, Robin, thank you a lot!
@vvmanyam1 3 years ago
Thanks for the video. With key.ignore=false, the Kafka event key is mapped to the _id of the index. Is there a way to also populate the topic key as an additional attribute in the index?
@rmoff 3 years ago
If you're using ksqlDB you could create a new field that holds the key value. It would also be a good use of Single Message Transform but I'm not aware of one that exists already that does it.
@vigneshmuthusamy9332 2 years ago
Hi Robin, I am trying this out without ksqlDB. These are the steps I followed:
1. Started all services using docker-compose, except ksqlDB and kcat.
2. Created the Kafka topic.
3. Created the Elasticsearch index.
4. Created the sink connector.
5. Published one message to the topic from the CLI and verified that the message is present in the Kafka topic.
Issue: there are no messages in the ES index. I checked the status of the connector; it is running with no errors, but the message is not sent to the index. Is there any explicit mapping that needs to be done for the messages to be sent to that particular index? (There wasn't any while using ksqlDB, but I suspect there might be something when not using it.)
@rmoff 2 years ago
This is best asked over at forum.confluent.io/
@georgelza 2 years ago
Question: is there something we need to install on our local docker-compose build of confluent-platform before executing the below command:
@georgelza 2 years ago
I might be answering my own question: the environment used here is a sub-environment of demo-scene. I'm busy pulling it now and will then retry the steps. It would still be great to have a video of the cp-all-in-one environment, showing step by step how to enable an Elasticsearch sink connector on it, i.e. how to add the connector library via a Docker image and what to change where.
@ashishpadakannaya9446 4 years ago
Is it possible to create a *specific* Elasticsearch schema at the Kafka topic level? For our use case, we use eager global ordinals for certain fields (www.elastic.co/guide/en/elasticsearch/reference/master/eager-global-ordinals.html). How would we define this at the Kafka level?
@rmoff 4 years ago
Not that I know of. You could pre-create the index and set schema.ignore=true in the connector.
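A hedged sketch of that workaround: build the request that pre-creates the index with an explicit mapping, then have the connector ignore the record schema. The index name `test01`, the field name `category`, and the Elasticsearch URL are assumptions for illustration, and the typeless mapping body assumes Elasticsearch 7 or later. The request is only built here, not sent.

```python
import json
import urllib.request


def build_create_index_request(es_url, index, field):
    """Build (but do not send) a PUT request that pre-creates an index."""
    body = {
        "mappings": {
            "properties": {
                # Keyword field with eager global ordinals enabled.
                field: {"type": "keyword", "eager_global_ordinals": True}
            }
        }
    }
    return urllib.request.Request(
        url=f"{es_url}/{index}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Hypothetical index and field names.
request = build_create_index_request("http://localhost:9200", "test01", "category")

# Connector side: ignore the record schema so the pre-created mapping applies.
connector_overrides = {"topics": "test01", "schema.ignore": "true"}

print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
# To actually create the index: urllib.request.urlopen(request)
```

The index has to exist before the connector writes its first document, otherwise Elasticsearch creates it with a dynamically inferred mapping.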
@ashishpadakannaya9446 4 years ago
Please accept my knees! Note for the uninitiated: you might need to configure Docker to allocate around 3 GB of RAM, especially for Kafka Connect.
@rmoff 4 years ago
Yes, good point, thanks @ashish. For the full stack including Confluent Control Center and everything I usually allocate 8GB.
@MohanRadhakrishnan 2 years ago
We want a query log, using a JDBC connector and Kafka Connect to Elasticsearch. Does a stream make sense here? Some consumers send to the Elasticsearch sink and others send to devices, on different topics. How do consumer groups help here?
@rmoff 2 years ago
With the data in Kafka you can route it to different places as you require, either by pre-processing it with ksqlDB/Kafka Streams, or using Predicate/Filter in the Kafka Connect sink Single Message Transform configuration. If you have more questions please head over to forum.confluent.io/
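As an illustration of the Predicate/Filter option, the sketch below adds a `Filter` transform guarded by a `TopicNameMatches` predicate, so records from matching topics are dropped before they reach Elasticsearch. Predicates require Apache Kafka 2.6 or later. The topic patterns and aliases are hypothetical.

```python
import json


def drop_topics_matching(config, pattern):
    """Return a copy of config that drops records from topics matching pattern."""
    updated = dict(config)
    updated.update({
        "transforms": "dropMatching",
        # Filter drops every record for which its predicate is true.
        "transforms.dropMatching.type": "org.apache.kafka.connect.transforms.Filter",
        "transforms.dropMatching.predicate": "topicMatches",
        "predicates": "topicMatches",
        # True when the record's topic name matches the regular expression.
        "predicates.topicMatches.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
        "predicates.topicMatches.pattern": pattern,
    })
    return updated


base = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics.regex": "querylog-.*",  # hypothetical topic naming scheme
}
config = drop_topics_matching(base, "querylog-device-.*")
print(json.dumps(config, indent=2))
```

Each sink connector runs in its own consumer group, so a second connector or application can read the same topics independently.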
@ajpieri 3 years ago
I have seen a number of tutorials that say to use the JSON converter and to set value.converter.schemas.enable to true; however, that doesn't seem to work in my case. The data is being put into Kafka via a connector that uses the io.confluent.connect.avro.AvroConverter. This means the data is in the topic with an Avro schema. To get the Elasticsearch sink connector to work, I needed to set it to use the io.confluent.connect.avro.AvroConverter as well, not the JSON converter. This was a little confusing for me, because I thought the JSON converter would take it from Avro to JSON, but that doesn't seem to be the case. I guess the AvroConverter will do this if the data is already in Avro. If it is in JSON, will it convert it to Avro? Thoughts?
@rmoff 3 years ago
Hi Tony, check out www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained. A converter in Kafka Connect is a way of telling it how to serialise the data into bytes (for a source connector) or deserialise from bytes (for a sink connector). If you're still not clear on this then head over to forum.confluent.io/ and I will try and answer your question there.
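To make the point concrete: the sink's converter has to match how the bytes in the topic were written, rather than the format you want in Elasticsearch. The helper below is a sketch that picks value converter settings from the topic's serialisation; the function name, the format labels, and the Schema Registry URL are assumptions.

```python
def converter_settings(serialisation, schema_registry_url="http://schema-registry:8081"):
    """Pick value converter settings that match how the topic was written."""
    if serialisation == "avro":
        # Written with AvroConverter: the schema lives in Schema Registry.
        return {
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": schema_registry_url,
        }
    if serialisation == "json":
        # Plain JSON messages with no embedded schema.
        return {
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
        }
    if serialisation == "json-with-schema":
        # JSON where every message carries "schema" and "payload" elements.
        return {
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "true",
        }
    raise ValueError(f"unknown serialisation: {serialisation}")


# Topic written by a source connector using AvroConverter, as in the question.
print(converter_settings("avro"))
```

The same choice applies to `key.converter` when the message key is serialised differently from the value.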
@yatinbaraiya9145 3 years ago
@Robin Moffatt First of all, thanks for the excellent video and presentation. I want the data to flow as MySQL ---> Kafka Connect ---> Elasticsearch. That works perfectly with insert/update DML SQL commands, but DDL statements do not work: for example, after "alter table order Add iCount Int;" the change is not reflected in Elasticsearch. Is there any solution for that?
@rmoff 3 years ago
Hmmm, I'm not sure. The best place to ask any further questions is: → Slack group: cnfl.io/slack or → Mailing list: groups.google.com/forum/#!forum/confluent-platform
@LinuxGamerYT 2 years ago
I have a question: what can I do if my Kafka is somewhere else and not on my local machine?
@rmoff 2 years ago
You can configure your Kafka Connect worker to connect to your Kafka cluster on its remote location. If you have further questions please do post details at forum.confluent.io/
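In practice this comes down to the worker's `bootstrap.servers` setting. The sketch below renders the relevant lines of a worker properties file; the broker hostnames and group ID are hypothetical, and the other settings a distributed worker requires (converters, internal storage topics, any security settings) are left out.

```python
def worker_properties(bootstrap_servers, group_id="connect-cluster"):
    """Render the worker settings that point Kafka Connect at a remote cluster."""
    props = {
        # Comma-separated host:port pairs of the remote Kafka brokers.
        "bootstrap.servers": ",".join(bootstrap_servers),
        # Workers sharing a group.id form one Connect cluster.
        "group.id": group_id,
    }
    return "\n".join(f"{key}={value}" for key, value in props.items())


# Hypothetical remote broker addresses.
rendered = worker_properties(["broker1.example.com:9092", "broker2.example.com:9092"])
print(rendered)
```

The addresses must be reachable from the worker, and they must match what the brokers advertise to clients.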
@vigneshmuthusamy9332 2 years ago
Is it possible to send a Kafka message without a key to a topic connected to a Kafka connector?
@rmoff 2 years ago
Yes. With the Elasticsearch connector you set key.ignore=true.
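With `key.ignore=true` the connector does not need a message key, because it derives the document ID from the record's topic, partition, and offset. The function below is only an illustrative model of that documented behaviour, not the connector's own code; its name and parameters are made up for this sketch.

```python
def expected_document_id(key_ignore, key, topic, partition, offset):
    """Model which Elasticsearch _id a record ends up with (illustrative only)."""
    if key_ignore:
        # key.ignore=true: the ID comes from the record's coordinates,
        # so messages without a key are fine.
        return f"{topic}+{partition}+{offset}"
    if key is None:
        # key.ignore=false: the key is the ID, so it has to be present.
        raise ValueError("key.ignore=false requires a non-null message key")
    return str(key)


print(expected_document_id(True, None, "test01", 0, 42))
print(expected_document_id(False, "order-17", "test01", 0, 42))
```

One consequence is that every message becomes a new document, so updates and deletes by key are not possible in this mode.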
@thenaughtyanil 2 years ago
Why do we need ksql? Can somebody help me understand this?
@rmoff 2 years ago
Here are a couple of videos that you'll find useful: * developer.confluent.io/learn-kafka/ksqldb/intro/ * ru-vid.com/video/%D0%B2%D0%B8%D0%B4%D0%B5%D0%BE-7mGBxG2NhVQ.html
@leamon9024 3 years ago
Hi, nice video! How do I pass an Elasticsearch configuration file to the kafka-connect Docker container without using kSQL? For example: { 'connector.class': 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', 'connection.url' : 'elasticsearch:9200', 'value.converter' : 'io.confluent.connect.avro.AvroConverter', 'value.converter.schema.registry.url' : 'schema-registry:8081', 'key.converter' : 'io.confluent.connect.avro.AvroConverter', 'key.converter.schema.registry.url' : 'schema-registry:8081', 'type.name' : '_doc', 'topics' : 'test01', 'key.ignore' : 'true', 'schema.ignore' : 'false' }
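One possible route, sketched here rather than taken from the thread, is the Kafka Connect REST API: a `PUT` to `/connectors/<name>/config` creates or updates a connector from a JSON config. The REST address `localhost:8083`, the connector name `sink-elastic-test01`, and the `http://` prefixes on the URLs are assumptions; the remaining values come from the comment above. The request is only built, not sent.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed Kafka Connect REST endpoint


def build_connector_request(name, config):
    """Build (but do not send) a request that creates or updates a connector."""
    return urllib.request.Request(
        url=f"{CONNECT_URL}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),  # JSON needs double quotes
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


config = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",  # scheme added (assumption)
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "type.name": "_doc",  # kept as in the comment above
    "topics": "test01",
    "key.ignore": "true",
    "schema.ignore": "false",
}

request = build_connector_request("sink-elastic-test01", config)  # hypothetical name
print(request.get_method(), request.full_url)
# To submit it to a running worker: urllib.request.urlopen(request)
```

Note that the single-quoted config in the comment is not valid JSON as written; `json.dumps` takes care of the quoting here.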