推广 热搜:   公司  快速  中国  企业    行业  设备  上海  未来 

Flink 获取 Kafka 中的数据,分流存储到 Redis、MySQL 中

   日期:2024-11-09     移动:http://keair.bhha.com.cn/mobile/quote/548.html

Fl<em></em>ink 获取 Kafka 中的数据,分流存储到 Redis、MySQL 中

案例:实时处理电商订单信息

使用 消费 中的数据,并进行相应的数据统计计算。 数据格式为

 

字段描述为

其中 order_status 订单状态的描述为:1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。

提示:pom 文件依赖,放在在文章最后有需要自取。

需求一:统计商城实时订单实收金额

注意(需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加 代码实现

 

测试 这里我就不开启 了,直接使用上一篇文章中写的 Flume 从端口获取数据,输出到多端(Kafka、HDFS)单文件 文件,去监控端口转存到 中。

如果没有配置 Flume,可以直接使用 开启一个生产者: (注意需要进入到 Kafka 安装目录下)

开启 Flume 监控端口后,就可以直接用 登陆端口,进行发送数据。 发送数据

 

运行结果

成功

需求二:将上面的最后计算的结果,存储到 Redis 中(Key 为:totalprice

关键知识点

Redis Sink

Flink 有专门的 Sink 到 Redis 的对象(RedisSink

创建时,还需要设置输入数据的泛型

需要传入的数据为 Redis 的基本配置:主机、端口…,redisSinkMapper 方法。 我们先创建一个 构造器

 

创建一个 RedisSink(MyRedis 见下方 自定义 RedisMapper 类

自定义 RedisMapper 类

类源码

 

通过源码我们可以知道, 表示传入数据的泛型,还需要重写三个方法(创建 Redis 描述器:使用什么方法写入 Redis,比如:HSET、SET…),(传入 Redis 中键值对的 Key 值),(传入 Redis 中键值对的值)。

代码示例

 

整体代码如下

 

需要先启动 Redis(后台启动) 进入 Redis 交互界面

输入数据

 

测试结果

需求三:使用侧边流,监控发现 order_status 字段为退回完成,将退回总额存入到 Redis 中,将 order_status 字段为取消订单的存入到 MySQL 中(Sink 到 MySQL 的偷懒没有仔细写了,直接放在最后的代码里面了)。
侧输出流

需要创建一个为侧输出流的变量: 参数说明

  • :输入数据类型
  • :设置创建侧边流的 id,必须为字符串类型。

然后我们就可以使用万能的 进行分流操作

 

注意我们这里使用了侧输出流,上面的 filter 过滤就可以去掉了,因为我们等于使用侧输出流的方式将数据过滤掉了。

完整代码

 

文件中需要的依赖

本文地址:http://keair.bhha.com.cn/quote/548.html    康宝晨 http://keair.bhha.com.cn/ , 查看更多

特别提示:本信息由相关用户自行提供,真实性未证实,仅供参考。请谨慎采用,风险自负。


相关最新动态
推荐最新动态
点击排行
网站首页  |  关于我们  |  联系方式  |  使用协议  |  版权隐私  |  网站地图  |  排名推广  |  广告服务  |  积分换礼  |  网站留言  |  RSS订阅  |  违规举报  |  粤ICP备2023022329号