通过kettle组件消费Kafka下topic中数据

Posted on Posted in 经验示例

通过kettle消费Kafka消费者数据

原文地址: https://blog.csdn.net/qq_41600330/article/details/122845271

环境介绍:

Oracle 11g

kafka2.8.0

kafka-connect-oracle

kettle 9.1

用8.0的版本测试,流程有调整,放在最后

1.启动

1.1启动zookeeper

1.2启动Kafka

1.3启动连接器

2.kettle配置

2.1添加转换流程

2.1.1设置Kafka consumer属性

transformation:新建一个,用于返回流中的结果

connection:选择direct ,右侧输入bootstrap-server

topics:选择需要消费的topic

consumer-group:定义消费者组

duration: 处理批次间隔

number-of-records: 处理批次条数

maximum concurrent batches:最大并发批次

message prefetch limit:消息预取限制,防止读取数据量太大,造成kettle挂掉

offset-management: 偏移量管理

在读取到记录时,提交偏移量。

当duration和number-of-records之中任一个满足时,就会把读取到的数据发送给后续流程。

topic中默认信息:

返回记录的路径,会在上边新建的transformation中生成get-records-from-stream流程,

此时看到新增的转换是这样的,并给他添加一个写日志操作(也可以不加),写日志中获取一下字段,方便在控制台输出获取到的记录

设置偏移量,其他不用管

2.2.3字段选择

选择需要的字段

2.2.4json-input

从字段获取源,并解析json

2.2.5表输出

提交记录数设置为1,否则不到记录数不会提交到数据库表中;

提前创先存储信息相关表;

字段映射关系:

2.2.6保存,启动

3.效果

kettle8.0版本

流程设计如下:

在kafka-consumer组件中添加的transformation中进行输出和解析,只需运行Kafka-consumer所在的转换即可

————————————————

                            版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

                        

原文链接:https://blog.csdn.net/qq_41600330/article/details/122845271