推广 热搜:   公司  快速  中国  企业    行业  设备  上海  未来 

【大数据学习篇8】 热门品类Top10分析

   日期:2024-11-14     移动:http://keair.bhha.com.cn/mobile/quote/952.html

掌握热门品类Top10分析实现思路

【大数据学习篇8】 热门品类Top10分析

掌握如何创建Spark连接并读取数据集

掌握利用Spark获取业务数据

掌握利用Spark统计品类的行为类型

掌握利用Spark过滤品类的行为类型

掌握利用Spark合并相同品类的行为类型

掌握利用Spark根据品类的行为类型进行排序

掌握将数据持久化到Hbase数据库

熟悉通过Spark On YARN运行程序

品类指商品所属分类,用户在访问电商网站时,通常会产生很多行为,例如查看商品的信息、将感兴趣的商品加入购物车和购买商品等,这些行为都将作为数据被网站存储。本章我们将通过对电商网站存储的用户行为数据进行分析,从而统计出排名前10的热门品类。

某电商网站2019年11月产生的用户行为数据存储在文件user_session.txt,该文件中的每一行数据都表示一个用户行为。

{"user_session":"0000007c-adbf-4ed7-af17-d1fef9763d67","event_type":"view","category_id":"2053013553090134275", "user_id":"560165420","product_id":"8900305","address_name":"Maryland","event_time":"2019-11-18 09:16:19"}

user_session:用于标识用户行为的唯一值。

event_type:表示用户行为的类型,包括view(查看)、cart(加入购物车)和purchase(购买)行为。

category_id:表示商品品类ID。

user_id:表示用户ID。

product_id:表示商品ID。

address_name:表示产生事件的区域。

event_time:表示产生事件的具体时间。

                

 分别统计各个品类商品的查看次数、 加入购物车次数以及购买次数。

 将同一品类中商品的查看、加入购物车 以及购买次数进行合并。

           

        自定义排序规则按照各个品类中商品的查看、 加入购物车和购买次数进行降序排序,获取 排名前10的品类,就是热门品类Top10。排 序时,优先按照各个品类商品的查看次数降 序排列,如果查看次数相同,则按照各个品 类商品的加入购物车次数进行降序排列。如 果查看次数和加入购车次数都相同,那么按 照各品类商品的购买次数进行降序排列。

       

将同一品类中商品的查看、加入购物车和 购买次数映射到自定义排序规则中进行排 序处理。         

 

        读取数据集中的行为类型(event_type)和品类ID(category_id)数据,为了便于后续聚合处理时,将相同Key的Value值进行累加,计算每个品类中不同行为出现的总次数,这里需要对输出结果的数据格式进行转换处理,将行为类型和品类ID作为Key,值1作为Value。 

统计各个品类的查看、加入购物车和购买次数。

        将聚合结果进行过滤处理,并分为三部分数据,分别是各个品类查看次数、各个品类加入购物车次数和各个品类购买次数。对过滤后的三部分数据进行转换处理,去除数据中的行为类型字段。此步目的是为了后续合并操作时,明确同一品类中不同行为类型所处位置。

        将Key值相同的Value进行合并处理,目的是为了将相同品类的查看次数、加入购物车次数和购买次数合并到一行。

         对每个品类中查看次数(viewcount)、加入购物车次数(cartcount)和购买次数(purchasecount)进行排序处理,在排序过程会涉及三类值的排序,因此这里需要使用Spark的二次排序,在排序操作时使用自定义排序的方式进行处理。

  本项目在Windows环境下通过IntelliJ IDEA工具构建Maven项目实现,需要提前在Windows环境下安装JDK1.8环境。

创建Maven项目

打开IntelliJ IDEA开发工具进入IntelliJ IDEA欢迎界面。

        在IntelliJ IDEA欢迎界面单击下拉框“Configure”,依次选择“Project Defaults”→“Project Structure”选项,配置项目使用的JDK。

配置Maven项目的组织名(GroupId)和项目工程名(ArtifactId)。 

配置项目名称(Project name)和项目本地的存放目录(Project location)。

 Maven项目创建完成后的目录结构。

导入依赖

        在项目pom.xml文件中添加如下配置内容: 对项目中Netty依赖进行多版本管理,避免本地运行出现多个版本的Netty导致程序出现NoSuchMethodError异常。 引入JSON依赖,用于解析JSON数据。 引入Hbase依赖,用于操作Hbase数据库。 引入Spark依赖,用于开发Spark数据分析程序。 指定Maven编译的JDK版本。 配置程序打包方式并指定程序主类。

创建项目目录

在项目SparkProject中新建Package包。

        在“New Package”窗口的文本输入框“Enter new package name”中输入“cn.itcast.top10”设置Package名称,用于存放实现热门品类Top10分析的类文件。

在Package包“cn.itcast.top10”新建类。

        在“Create New Class”窗口的文本输入框“Name”中输入“CategoryTop10”设置类名称,在类中实现热门品类Top10分析。

        在类CategoryTop10中定义main()方法,该方法是Java程序执行的入口,在main()方法中实现Spark Core程序。

public class CategoryTop10 {     public static void main(String[] arg){     //实现热门品类Top10分析     } }

        在main()方法中,创建JavaSparkContext和SparkConf对象,JavaSparkContext对象用于实现Spark程序,SparkConf对象用于配置Spark程序相关参数。

SparkConf conf = new SparkConf(); //设置Application名称为top3_area_product conf.setAppName("top10_category"); JavaSparkContext sc = new JavaSparkContext(conf);

         在main()方法中,调用JavaSparkContext对象的textFile()方法读取外部文件,将文件中的数据加载到textFileRDD。

JavaRDD<String> textFileRDD = sc.textFile(arg[0]);

        在main()方法中,使用mapToPair()算子转换textFileRDD的每一行数据,用于获取每一行数据中的行为类型和品类ID数据,将转换结果加载到transProductRDD。

JavaPairRDD<Tuple2<String,String>,Integer> transformRDD =     textFileRDD.mapToPair(new PairFunction<String,Tuple2<String, String>, Integer>() {     @Override     public Tuple2<Tuple2<String, String>, Integer> call(String s) throws Exception {         JSonObject json = JSONObject.parseObject(s);         String category_id = json.getString("category_id");         String event_type = json.getString("event_type");         return new Tuple2<>(new Tuple2<>(category_id,event_type), new Integer(1));     } });

        在main()方法中,使用reduceByKey()算子对transformRDD进行聚合操作,用于统计每个品类中商品被查看、加入购物车和购买的次数,将统计结果加载到aggregationRDD。

JavaPairRDD<Tuple2<String, String>, Integer> aggregationRDD =         transformRDD.reduceByKey(                 new Function2<Integer, Integer, Integer>() {     @Override     public Integer call(Integer integer1, Integer integer2)             throws Exception {         return integer1 + integer2;     } });

在main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据中行为类型为加入购物车和购买的数据,只保留行为类型为查看的数据,然后使用mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被查看次数和品类ID数据,最终将转换结果加载到getViewCategoryRDD。

JavaPairRDD<String,Integer> getViewCategoryRDD =aggregationRDD .filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {         @Override         public Boolean call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {             String action = tuple2._1._2;             return action.equals("view");         }     }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {                 @Override                 public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> tuple2)throws Exception {                     return new Tuple2<>(tuple2._1._1,tuple2._2);                 }             });

        在main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据中行为类型为查看和购买的数据,只保留行为类型为加入购物车的数据,然后使用mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被加入购物车次数和品类ID数据,最终将转换结果加载到getCartCategoryRDD。

JavaPairRDD<String,Integer> getCartCategoryRDD = aggregationRDD         .filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {             @Override             public Boolean call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {                 String action = tuple2._1._2;                 return action.equals("cart");             }         }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {             @Override             public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {                 return new Tuple2<>(tuple2._1._1,tuple2._2);             }         });

        在main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据中行为类型为查看和加入购物车的数据,只保留行为类型为购买的数据,然后使用mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被购买次数和品类ID数据,最终将转换结果加载到getPurchaseCategoryRDD。

JavaPairRDD<String,Integer> getPurchaseCategoryRDD = aggregationRDD     .filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {         @Override         public Boolean call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {             String action = tuple2._1._2;             return action.equals("purchase");         }     }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {         @Override         public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {             return new Tuple2<>(tuple2._1._1,tuple2._2);         }     });

        在main()方法中,使用leftOuterJoin(左外连接)算子合并getViewCategoryRDD、getCartCategoryRDD和getPurchaseCategoryRDD,用于合并同一品类的查看次数、加入购物车次数和购买次数,将合并结果加载到joinCategoryRDD。

JavaPairRDD<String,Tuple2<Integer, Optional<Integer>>> tmpJoinCategoryRDD         =getViewCategoryRDD.leftOuterJoin(getCartCategoryRDD); JavaPairRDD<String,Tuple2<Tuple2<Integer, Optional<Integer>>,Optional<Integer>>> joinCategoryRDD         = tmpJoinCategoryRDD.leftOuterJoin(getPurchaseCategoryRDD);

Optional类是一个包含有可选值的包装类,它既可以含有对象也可以为空,主要为了解决空指针异常的问题,因为某些品类中的商品可能被查看但并未被购买或加入购物车。

        在包“cn.itcast.top10”中创建文件CategorySortKey.java,用于实现自定义排序。在类CategorySortKey中继承比较器接口Comparable和序列化接口Serializable,并实现Comparable接口的compareTo()方法。

import java.io.Serializable; public class CategorySortKey implements Comparable<CategorySortKey>,Serializable{      ......     @Override     public int compareTo(CategorySortKey other) {         if(viewCount - other.getViewCount() != 0) {             return (int) (viewCount - other.getViewCount());         } else if(cartCount - other.getCartCount() != 0) {             return (int) (cartCount - other.getCartCount());         } else if(purchaseCount - other.getPurchaseCount() != 0) {             return (int) (purchaseCount - other.getPurchaseCount());         }         return 0;     } }

        在main()方法中,使用mapTopair()算子转换joinCategoryRDD,将joinCategoryRDD中品类被查看次数、加入购物车次数和购买次数映射到自定义排序类CategorySortKey,通过transCategoryRDD加载转换结果。

JavaPairRDD<CategorySortKey,String> transCategoryRDD = joinCategoryRDD .mapToPair(new PairFunction<Tuple2<String, Tuple2<Tuple2<Integer, Optional<Integer>>,Optional<Integer>>>,CategorySortKey,String>() {      @Override      public Tuple2<CategorySortKey,String> call(Tuple2<String, Tuple2<Tuple2<Integer, Optional<Integer>>, Optional<Integer>>> tuple2) throws Exception {                    String category_id = tuple2._1;                    int viewcount = tuple2._2._1._1;                    int cartcount = 0;                    int purchasecount = 0;                    if (tuple2._2._1._2.isPresent()){  cartcount = tuple2._2._1._2.get().intValue();}                    if (tuple2._2._2.isPresent()){ purchasecount = tuple2._2._2.get().intValue(); }                    CategorySortKey sortKey = new CategorySortKey(viewcount, cartcount, purchasecount);                    return new Tuple2<>(sortKey,category_id);                }            });

        在main()方法中,通过sortByKey()算子对transCategoryRDD进行排序操作,使transCategoryRDD中品类被查看次数、加入购物车次数和购买次数根据自定义排序类CategorySortKey指定的排序规则进行排序,将排序结果加载到sortedCategoryRDD。

JavaPairRDD<CategorySortKey,String> sortedCategoryRDD = transCategoryRDD.sortByKey(false);

        在main()方法中,使用take()算子获取sortedCategoryRDD前10个元素,即热门品类Top10分析结果,将分析结果加载到top10CategoryList。

List<Tuple2<CategorySortKey, String>> top10CategoryList = sortedCategoryRDD.take(10);

封装工具类:

(1)在项目SparkProject的 java目录新建Package包“cn.itcast.hbase”,用于存放实现数据持久化的Java文件。在包“cn.itcast.hbase”下创建文件HbaseConnect.java,用于实现封装Hbase数据库连接工具类,在类中实现连接Hbase数据库的操作。

(2)在项目SparkProject的包“cn.itcast.hbase”中创建文件HbaseUtils.java,用于实现封装Hbase数据库操作工具类,在类中实现创建Hbase数据表和向Hbase数据表中插入数据的操作。

持久化热门品类Top10分析结果

        在类CategoryTop10中添加方法top10ToHbase(),用于将热门品类Top10分析结果持久化到Hbase数据库中,该方法包含参数top10CategoryList,表示热门品类Top10分析结果数据。

public static void top10ToHbase(List<Tuple2<CategorySortKey, String>> top10CategoryList) throws Exception{     HbaseUtils.createTable("top10","top10_category");     String[] column = {"category_id","viewcount","cartcount","purchasecount"};     String viewcount = "" , cartcount = "", purchasecount = "", category_id = "";       int count = 0;     for (Tuple2<CategorySortKey, String> top10: top10CategoryList) {         count++;         viewcount = String.valueOf(top10._1.getViewCount());         cartcount = String.valueOf(top10._1.getCartCount());         purchasecount = String.valueOf(top10._1.getPurchaseCount());         category_id = top10._2;         String[] value = {category_id,viewcount,cartcount,purchasecount};         HbaseUtils.putsToHbase("top10","rowkey_top"+count,"top10_category",column,value);     } }

        在类CategoryTop10的main()方法中,调用方法top10ToHbase()并传入参数top10CategoryList,用于在Spark程序中实现top10ToHbase()方法,将热门品类Top10分析结果持久化到Hbase数据库中的数据表top10。

try {     top10ToHbase(top10CategoryList); } catch (Exception e) {     e.printStackTrace(); } HbaseConnect.closeConnection(); sc.close();

        在IntelliJ IDEA中将热门品类Top10分析程序封装成jar包,并上传到集群环境中,通过spark-submit将程序提交到YARN中运行。

封装jar包:

在IntelliJ IDEA主界面单击右侧“Maven”选项卡打开Maven窗口。

        在Maven窗口单击展开Lifecycle折叠框,双击Lifecycle折叠框中的“package”选项,IntelliJ IDEA会自动将程序封装成jar包,封装完成后,若出现“BUILD SUCCESS”内容,则证明成功封装热门品类Top10分析程序为jar包。

        在项目SparkProject中的target目录下会生成SparkProject-1.0-SNAPSHOT.jar文件,为了便于后续与其它程序区分,这里将默认文件名称修改为CategoryTop10.jar。

将jar包上传到集群:

        使用远程连接工具SecureCRT连接虚拟机Spark01,在存放jar文件的目录/export/SparkJar/(该目录需提前创建)下执行“rz”命令,上传热门品类Top10分析程序的jar包CategoryTop10.jar。

将数据集上传到本地文件系统:

        使用远程连接工具SecureCRT连接虚拟机Spark01,在存放数据文件的目录/export/data/SparkData/(该目录需提前创建)下执行“rz”命令,将数据集user_session.txt上传至本地文件系统。

在HDFS创建存放数据集的目录:

        将数据集上传到HDFS前,需要在HDFS的根目录创建目录spark_data,用于存放数据集user_session.txt。

 hdfs dfs -mkdir /spark_data

上传数据集到HDFS:

        将本地文件系统目录/export/data/SparkData/下的数据集user_session.txt上传到HDFS的spark_data目录下。

 hdfs dfs -put /export/data/SparkData/user_session.txt /spark_data

 提交热门品类Top10分析程序到YARN集群:

        通过Spark安装目录中bin目录下的shell脚本文件spark-submit提交热门品类Top10分析程序 到Hadoop集群的YARN运行。

spark-submit

--master yarn

--deploy-mode cluster

--num-executors 3

--executor-memory 2G

--class cn.itcast.top10.CategoryTop10

/export/SparkJar/CategoryTop10.jar /spark_data/user_session.txt

查看程序运行状态:

        程序运行时在控制台会生成“Application ID”(程序运行时的唯一ID,在浏览器输入“192.168.121.132:8088”,进入YARN的Web UI界面,通过对应“Application ID”查看程序的运行状态,当程序运行完成后State为FINISHED,并且FinalStatus为SUCCEES。

查看程序运行结果

在虚拟机Spark01执行“hbase shell”命令,进入Hbase命令行工具。

在Hbase命令行工具中执行“list”命令,查看Hbase数据库中的所有数据表。

> list TAB

test  

top10      

2 row(s) in 0.1810 seconds

在Hbase命令行工具执行“scan 'top10'”命令,查询数据表top10中的数据。

本文地址:http://keair.bhha.com.cn/quote/952.html    康宝晨 http://keair.bhha.com.cn/ , 查看更多

特别提示:本信息由相关用户自行提供,真实性未证实,仅供参考。请谨慎采用,风险自负。


相关最新动态
推荐最新动态
点击排行
网站首页  |  关于我们  |  联系方式  |  使用协议  |  版权隐私  |  网站地图  |  排名推广  |  广告服务  |  积分换礼  |  网站留言  |  RSS订阅  |  违规举报  |  粤ICP备2023022329号