How to Stream Data using Apache Kafka & Debezium from Postgres | Real Time ETL | ETL | Part 2 

BI Insights Inc
Subscribers: 14K
Views: 16K

Published: 21 Oct 2024

Comments: 45
@BiInsightsInc 7 months ago
Link to the series: ru-vid.com/group/PLaz3Ms051BAkwR7d9voHsflTRmumfkGVW
@sunsas4275 1 year ago
Hello! As a newbie data engineer, I've found your videos to be incredibly helpful. The way you explain concepts makes it easy for me to grasp and apply them in my work. Thank you for sharing your knowledge and helping me on my learning journey! Looking forward to your next videos.
@allthatyouare 9 months ago
Gold content. Thank you!
@aniketrele7688 3 months ago
Hi, are the connector name and topic name always the same? Can you name your topic something else? It would be helpful if you want to have multiple topics for one connector. Thanks in advance.
@BiInsightsInc 3 months ago
Hi there, no, your connector name can be different from your topic name. You can also have multiple connectors read from the same topic.
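As a rough illustration of the answer above: in a Debezium PostgreSQL source connector, the connector name and the topic names are set independently. The sketch below assumes Debezium 2.x, where each captured table gets a topic named `<topic.prefix>.<schema>.<table>` (older releases use `database.server.name` as the prefix instead). The connector name, prefix, and table names are hypothetical, and the database connection settings are omitted.

```python
def build_connector(name, topic_prefix, tables):
    """Return a Kafka Connect registration payload (illustrative values only)."""
    return {
        "name": name,  # connector name: free to choose, unrelated to topic names
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "topic.prefix": topic_prefix,  # assumption: Debezium 2.x property name
            "table.include.list": ",".join(tables),
            # database.hostname, database.user, etc. are omitted in this sketch
        },
    }


def expected_topics(payload):
    """List the topics Debezium would write to: one per captured table."""
    cfg = payload["config"]
    return [f'{cfg["topic.prefix"]}.{table}' for table in cfg["table.include.list"].split(",")]


payload = build_connector(
    "inventory-connector", "pgserver", ["public.customers", "public.orders"]
)
print(expected_topics(payload))
```

One connector therefore feeds several topics here, one per table in `table.include.list`.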
@edisonngizwenayo5752 10 months ago
Hello! Thank you for the amazing content, which briefly explains data streaming with CDC. I have a quick question about where in the container Debezium stores the configuration created when setting up a connector. I am asking so I know how to persist a connection for later use even when the container stops. Thanks.
@BiInsightsInc 10 months ago
You can save the connector configs in the /kafka/config folder and attach it in the yml file. This way you can persist the connector settings and start the connector via the CLI. Connector settings are only removed when you destroy and re-create the Debezium container; on stop/restart they are persisted.
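One way to keep a connector definition outside the container is to store it as a JSON file and re-register it through the Kafka Connect REST API. This is a minimal sketch, not the setup from the video: it assumes Kafka Connect is reachable at a URL such as http://localhost:8083, and the function names, file path, and connector name are hypothetical. `PUT /connectors/{name}/config` creates the connector if it is missing and updates it otherwise.

```python
import json
import urllib.request
from pathlib import Path


def save_connector_config(path, config):
    """Write the connector's config map to a JSON file on the host."""
    Path(path).write_text(json.dumps(config, indent=2))


def build_register_request(connect_url, name, path):
    """Build (but do not send) the REST request that registers the saved config."""
    config = json.loads(Path(path).read_text())
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )

# To actually send it once Kafka Connect is up:
#   urllib.request.urlopen(build_register_request(url, name, path))
```

Keeping the file under version control means the connector can be re-created after the container is destroyed.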
@MrSanjuuk 3 months ago
Where is the PySpark script video?
@chald244 8 months ago
How do you handle pipeline disruptions? Can you provide some insights on the points below? 1. There seems to be a known limitation with PostgreSQL databases: transactions that have already been read by a CDC replication task can't be reprocessed, even when the task is restarted from an old LSN value. 2. It also appears that the task can't be moved between the replicate servers without coordinating with the PostgreSQL DBA to update the pg_hba.conf file. Can we create a script to overcome this, or are there better alternatives?
@BiInsightsInc 6 months ago
Hey Chald244, you can use the function below to read the logical replication data. The peek function behaves just like the pg_logical_slot_get_binary_changes() function, except that changes are returned but not consumed; that is, they will be returned again on future calls. You may also want to check which plugin you used to create the replication slot, as it determines what sort of values are returned when you query the replication data. The default is `pgoutput`; you may want to create the logical replication slot with `test_decoding` to get text data back. Otherwise, you may need to decode the WAL data before you can use it. Here is the documentation for the replication functions: www.postgresql.org/docs/9.4/functions-admin.html
pg_logical_slot_peek_binary_changes(slot_name name, upto_lsn pg_lsn, upto_nchanges int, VARIADIC options text[])
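A hedged sketch of the peek approach described above, using the text variant `pg_logical_slot_peek_changes`, which suits a slot created with `test_decoding`. It assumes a DB-API cursor with `%s` placeholders (as in psycopg2). The slot name is hypothetical, and a slot that another consumer is actively streaming from cannot be peeked at the same time.

```python
# Peek returns changes without consuming them, so the same rows come back on the next call.
PEEK_SQL = "SELECT lsn, xid, data FROM pg_logical_slot_peek_changes(%s, NULL, %s)"


def peek_changes(cursor, slot_name, max_changes=10):
    """Return up to max_changes pending changes from a logical replication slot.

    upto_lsn is passed as NULL, so only upto_nchanges limits the result.
    """
    cursor.execute(PEEK_SQL, (slot_name, max_changes))
    return cursor.fetchall()

# Example call, assuming `conn` is an open psycopg2 connection:
#   with conn.cursor() as cur:
#       for lsn, xid, data in peek_changes(cur, "demo_slot"):
#           print(lsn, xid, data)
```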
@andriifadieiev9757 1 year ago
Hello! I just found your amazing channel and am enjoying it a lot. I have a question about the subject. I reproduced your setup and it works just fine for inserts and updates, but I noticed that on delete no message is produced to the Kafka topic. Any tips on how to fix this? In any case, thank you for your content!
@andriifadieiev9757 1 year ago
Actually, after reading the Debezium docs, I got it working by adding "transforms.unwrap.delete.handling.mode": "rewrite" to the connector config.
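For context, the setting above belongs to Debezium's event flattening transform (`ExtractNewRecordState`), registered here under the alias `unwrap`. The sketch below merges it into an existing config map. The helper name is hypothetical, and newer Debezium releases may rename or replace this option, so check the docs for your version.

```python
DELETE_REWRITE_SETTINGS = {
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    # "rewrite" keeps delete events and marks them with a __deleted field
    "transforms.unwrap.delete.handling.mode": "rewrite",
}


def with_delete_rewrite(config):
    """Return a copy of a connector config with delete rewriting enabled."""
    merged = dict(config)  # copy so the caller's dict is not mutated
    merged.update(DELETE_REWRITE_SETTINGS)
    return merged
```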
@r4m0n5t3r 7 months ago
Hi. Newbie here. I am encountering the error ModuleNotFoundError: No module named 'kafka.vendor.six.moves' when I try to run something via Jupyter. Any suggestions on how to fix this?
@BiInsightsInc 7 months ago
Per their docs this appears to be a Python 3.12 issue. I'd suggest you downgrade your Python version and try again.
@ayocs2 1 month ago
Hello Sir, how do I get the entire CDC stream, including inserts, updates, and deletes?
@BiInsightsInc 1 month ago
Hey there, you can enable "key.converter.schemas.enable" in the connector. This will include the schema-level changes. It gives you additional details and fields called "before" and "after", which contain the state of a row before and after an event. This way you can track updates and deletes in the database.
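To show what the "before" and "after" fields look like on the consumer side, here is a small sketch that reads a Debezium change event that has not been flattened by a transform. The envelope carries an `op` code (`c`, `u`, `d`, or `r`) next to the two row images. The function name is hypothetical, and on PostgreSQL the content of `before` depends on the table's REPLICA IDENTITY setting.

```python
import json

OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_change(raw_value):
    """Summarise one Debezium change event given its JSON value (str or bytes)."""
    event = json.loads(raw_value)
    # With schemas enabled, the JSON converter wraps the envelope in "payload"
    if "schema" in event and "payload" in event:
        event = event["payload"]
    return {
        "operation": OPERATIONS.get(event["op"], event["op"]),
        "before": event.get("before"),
        "after": event.get("after"),
    }
```

A delete arrives with `after` set to null, and an insert with `before` set to null.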
@macetesdev5738 25 days ago
Why didn't you create the table from a SELECT command?
@BiInsightsInc 23 days ago
You can create a table via the SELECT command, but using the DDL gives you fine-grained control over the table definition.
@jootuubanen7727 7 months ago
How about deletes with this technique and setup?
@BiInsightsInc 7 months ago
The connector continuously captures committed row-level changes (inserts, updates, and deletes) in a PostgreSQL database. You can add a few more properties to the connector to get the deleted flag in the key/value pair. Below are the relevant properties; here is the link to the Debezium docs: debezium.io/documentation/reference/stable/transformations/event-flattening.html
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
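With those two properties set, a consumer sees two kinds of delete signal: a flattened record whose `__deleted` field is true, followed by a tombstone (a record with a null value). The sketch below classifies a record value accordingly. The function name is hypothetical, and the flag is compared as text because it commonly arrives as the string "true" or "false".

```python
import json


def classify_record(raw_value):
    """Classify a flattened Debezium record value (bytes, str, or None)."""
    if raw_value is None:
        return "tombstone", None  # null value: marker used by log compaction
    row = json.loads(raw_value)
    # With schemas enabled, the actual row sits under "payload"
    if "schema" in row and "payload" in row:
        row = row["payload"]
    deleted = str(row.pop("__deleted", "false")).lower() == "true"
    return ("delete" if deleted else "upsert"), row
```

A downstream loader can then delete the target row on "delete", insert or update it on "upsert", and skip tombstones.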
@timiayoade9953 1 year ago
Hi. I love your videos. I have been trying this project for months now, but I keep getting "connecting to my Ip address refused". How can I solve this problem? I have been stuck here for months.
@BiInsightsInc 1 year ago
Try connecting to this Postgres database from outside this project and make sure the connection works. Here are a few steps to remedy Postgres connection issues.
1. In the Postgres installation directory, locate and open postgresql.conf and add this line:
listen_addresses = '*'
2. Open the file named pg_hba.conf and add this line:
host all all 0.0.0.0/0 md5
3. Restart your PostgreSQL server and try again.
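Before changing any settings, it can help to confirm whether the Postgres port is reachable at all from the machine or container that runs the pipeline. The sketch below only tests the TCP connection, not authentication, so it separates a "connection refused" problem from a credentials or pg_hba.conf problem. The function name is hypothetical and 5432 is assumed as the default Postgres port.

```python
import socket


def can_reach(host, port=5432, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names
        return False

# Example: can_reach("192.168.1.10") from the host, or can_reach("postgres")
# from inside a container on the same Docker network.
```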
@technicalking4711 7 months ago
How can I do the same with Amazon DynamoDB? Can you please make a video on this?
@BiInsightsInc 7 months ago
You can check the AWS stack and see if there is any service that supports CDC with DynamoDB. Amazon DynamoDB is a NoSQL database and may not offer the same capabilities as a traditional SQL database. This streaming stack depends on PostgreSQL's built-in replication process.
@thejasreddy6859 1 year ago
If possible, please do videos on Flink using Scala.
@hungnguyenthanh4101 1 year ago
Can you make a video showing ETL using Kafka to extract, PySpark to process, and an upload to S3? Could Airflow be used to manage it?
@BiInsightsInc 1 year ago
I will try and cover this in the upcoming videos in the PySpark series. Stay tuned.
@hungnguyenthanh4101 1 year ago
@BiInsightsInc Thank you so much, I'm looking forward to your video.
@paulaganbi5236 1 year ago
Hello, I'm currently encountering the error KeyError: 'PGPASS'. I would love to know how to resolve this.
@BiInsightsInc 1 year ago
This means you are missing the environment variable "PGPASS". You can remove it and provide the password in the script to resolve the error. Another option is to define this environment variable, and the script should then work as expected.
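The KeyError comes from reading a variable that is not set, as in `os.environ['PGPASS']`. A small sketch of a friendlier lookup, assuming the script reads the database username from PGUID and the password from PGPASS (the function name is hypothetical):

```python
import os


def get_db_credentials(env=os.environ):
    """Read the database username and password from environment variables."""
    missing = [name for name in ("PGUID", "PGPASS") if name not in env]
    if missing:
        # Fail with an explicit message instead of a bare KeyError
        raise RuntimeError("Missing environment variable(s): " + ", ".join(missing))
    return env["PGUID"], env["PGPASS"]
```

Set both variables before starting Python or Jupyter, for example with `export PGUID=...` and `export PGPASS=...` on Linux or macOS, so the process inherits them.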
@paulaganbi5236 1 year ago
@BiInsightsInc Thank you so much for your quick response. Am I supposed to put my Postgres superuser password in that parameter?
@paulaganbi5236 1 year ago
And what would I put in place of PGUID?
@BiInsightsInc 1 year ago
@paulaganbi5236 That will be your database username.
@paulaganbi5236 1 year ago
Hi Haq, I'm still encountering difficulties. Do I put the Postgres server password or the database password in the password parameter? Thanks.
@hungnguyenthanh4101 1 year ago
Hi, can you give me the file to import the data tables like in the video?
@BiInsightsInc 1 year ago
Please check the description. All files used in the video are available in the GitHub repo.
@thejasreddy6859 1 year ago
Apache Flink