案例:实时处理电商订单信息
使用 消费 中的数据,并进行相应的数据统计计算。 数据格式为:
字段描述为:
其中 order_status 订单状态的描述为:1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。
提示:pom 文件依赖,放在在文章最后有需要自取。
需求一:统计商城实时订单实收金额
注意:(需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加) 代码实现:
测试: 这里我就不开启 了,直接使用上一篇文章中写的 Flume 从端口获取数据,输出到多端(Kafka、HDFS)单文件 文件,去监控端口转存到 中。
如果没有配置 Flume,可以直接使用 开启一个生产者: (注意需要进入到 Kafka 安装目录下)
开启 Flume 监控端口后,就可以直接用 登陆端口,进行发送数据。 发送数据:
运行结果:
成功!!
需求二:将上面的最后计算的结果,存储到 Redis 中(Key 为:totalprice)
关键知识点: 、
Redis Sink
Flink 有专门的 Sink 到 Redis 的对象(RedisSink):
创建时,还需要设置输入数据的泛型:
需要传入的数据为 Redis 的基本配置:主机、端口…,redisSinkMapper 方法。 我们先创建一个 构造器:
创建一个 RedisSink: (MyRedis 见下方 自定义 RedisMapper 类)
自定义 RedisMapper 类
类源码:
通过源码我们可以知道, 表示传入数据的泛型,还需要重写三个方法:(创建 Redis 描述器:使用什么方法写入 Redis,比如:HSET、SET…),(传入 Redis 中键值对的 Key 值),(传入 Redis 中键值对的值)。
代码示例:
整体代码如下:
需要先启动 Redis:(后台启动) 进入 Redis 交互界面:
输入数据:
测试结果:
需求三:使用侧边流,监控发现 order_status 字段为退回完成,将退回总额存入到 Redis 中,将 order_status 字段为取消订单的存入到 MySQL 中(Sink 到 MySQL 的偷懒没有仔细写了,直接放在最后的代码里面了)。
侧输出流
需要创建一个为侧输出流的变量: 参数说明:
- :输入数据类型;
- :设置创建侧边流的 id,必须为字符串类型。
然后我们就可以使用万能的 进行分流操作:
注意我们这里使用了侧输出流,上面的 filter 过滤就可以去掉了,因为我们等于使用侧输出流的方式将数据过滤掉了。
完整代码:
文件中需要的依赖: