通过kettle消费Kafka消费者数据
原文地址: https://blog.csdn.net/qq_41600330/article/details/122845271
环境介绍:
Oracle 11g
kafka2.8.0
kafka-connect-oracle
kettle 9.1
用8.0的版本测试,流程有调整,放在最后
1.启动
1.1启动zookeeper
1.2启动Kafka
1.3启动连接器
2.kettle配置
2.1添加转换流程
2.1.1设置Kafka consumer属性
transformation:新建一个,用于返回流中的结果
connection:选择direct ,右侧输入bootstrap-server
topics:选择需要消费的topic
consumer-group:定义消费者组
duration: 处理批次间隔
number-of-records: 处理批次条数
maximum concurrent batches:最大并发批次
message prefetch limit:消息预取限制,防止读取数据量太大,造成kettle挂掉
offset-management: 偏移量管理
在读取到记录时,提交偏移量。
当duration和number-of-records之中任一个满足时,就会把读取到的数据发送给后续流程。
topic中默认信息:
返回记录的路径,会在上边新建的transformation中生成get-records-from-stream流程,
此时看到新增的转换是这样的,并给他添加一个写日志操作(也可以不加),写日志中获取一下字段,方便在控制台输出获取到的记录
设置偏移量,其他不用管
2.2.3字段选择
选择需要的字段
2.2.4json-input
从字段获取源,并解析json
2.2.5表输出
提交记录数设置为1,否则不到记录数不会提交到数据库表中;
提前创先存储信息相关表;
字段映射关系:
2.2.6保存,启动
3.效果
kettle8.0版本
流程设计如下:
在kafka-consumer组件中添加的transformation中进行输出和解析,只需运行Kafka-consumer所在的转换即可
————————————————
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_41600330/article/details/122845271