最新动态
Flink 获取 Kafka 中的数据,分流存储到 Redis、MySQL 中
2024-11-09 19:40

Fl<em></em>ink 获取 Kafka 中的数据,分流存储到 Redis、MySQL 中

案例:实时处理电商订单信息

使用 消费 中的数据,并进行相应的数据统计计算。 数据格式为

 

字段描述为

其中 order_status 订单状态的描述为:1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。

提示:pom 文件依赖,放在在文章最后有需要自取。

需求一:统计商城实时订单实收金额

注意(需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加 代码实现

 

测试 这里我就不开启 了,直接使用上一篇文章中写的 Flume 从端口获取数据,输出到多端(Kafka、HDFS)单文件 文件,去监控端口转存到 中。

如果没有配置 Flume,可以直接使用 开启一个生产者: (注意需要进入到 Kafka 安装目录下)

开启 Flume 监控端口后,就可以直接用 登陆端口,进行发送数据。 发送数据

 

运行结果

成功

需求二:将上面的最后计算的结果,存储到 Redis 中(Key 为:totalprice

关键知识点

Redis Sink

Flink 有专门的 Sink 到 Redis 的对象(RedisSink

创建时,还需要设置输入数据的泛型

需要传入的数据为 Redis 的基本配置:主机、端口…,redisSinkMapper 方法。 我们先创建一个 构造器

 

创建一个 RedisSink(MyRedis 见下方 自定义 RedisMapper 类

自定义 RedisMapper 类

类源码

 

通过源码我们可以知道, 表示传入数据的泛型,还需要重写三个方法(创建 Redis 描述器:使用什么方法写入 Redis,比如:HSET、SET…),(传入 Redis 中键值对的 Key 值),(传入 Redis 中键值对的值)。

代码示例

 

整体代码如下

 

需要先启动 Redis(后台启动) 进入 Redis 交互界面

输入数据

 

测试结果

需求三:使用侧边流,监控发现 order_status 字段为退回完成,将退回总额存入到 Redis 中,将 order_status 字段为取消订单的存入到 MySQL 中(Sink 到 MySQL 的偷懒没有仔细写了,直接放在最后的代码里面了)。
侧输出流

需要创建一个为侧输出流的变量: 参数说明

  • :输入数据类型
  • :设置创建侧边流的 id,必须为字符串类型。

然后我们就可以使用万能的 进行分流操作

 

注意我们这里使用了侧输出流,上面的 filter 过滤就可以去掉了,因为我们等于使用侧输出流的方式将数据过滤掉了。

完整代码

 

文件中需要的依赖

    以上就是本篇文章【Flink 获取 Kafka 中的数据,分流存储到 Redis、MySQL 中】的全部内容了,欢迎阅览 ! 文章地址:http://keair.bhha.com.cn/quote/548.html 
     动态      相关文章      文章      同类文章      热门文章      栏目首页      网站地图      返回首页 康宝晨移动站 http://keair.bhha.com.cn/mobile/ , 查看更多