博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark的TopN解决方案(键唯一的情况、键不唯一的情况)
阅读量:2491 次
发布时间:2019-05-11

本文共 5827 字,大约阅读时间需要 19 分钟。

TopN问题:上星期访问次数最多的10个URL是哪些?所有猫中体重最大的10只猫是哪些?


本文使用 MapReduce/Hadoop的TopN解决方案,假设所有输入键都是唯一的。也就是说,对于一个给定的输入集合{<K,V>},所有K都是唯一的。

例如对于下面的猫,cat1不会再出现第二次

输入:

top10data.txt

cat1,12cat2,13cat3,14cat4,15cat5,10cat100,100cat200,200cat300,300cat1001,1001cat67,67cat22,22cat23,23cat1000,1000cat2000,2000cat400,400cat500,500

top10data2.txt:

cat8,8cat9,9cat7,7cat6,6cat450,450cat350,350cat150,150cat3000,3000cat1500,1500cat1601,1601cat201,201cat222,222
Spark解决方案:使用mapPartitions()函数对每一个分区求top10(),最后使用collect()创建最终的top10列表

键唯一的情况:

//导入必要的类import java.util.{SortedMap, TreeMap}import akka.io.Udp.SO.Broadcastimport org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object TopN_1 {  def main(args: Array[String]): Unit = {    //连接SparkMaster    val conf = new SparkConf().setAppName("Chenjie's first spark App").setMaster("local")    val sc = new SparkContext(conf)    //从HDFS中读取输入文件并创建RDD    val lines = sc.textFile("hdfs://pc1:9000/input/top10data*.txt")    lines.foreach(println)//输出观察    val broadcastTopN = sc.broadcast[Integer](10)//指定TopN的N值    val broadcastDirection = sc.broadcast("top")//指定取最大的还是最小的,top/bottom    val pairs = lines.map { line =>      val tokens = line.split(",")      (tokens(0), tokens(1).toInt)    }//将每一行按照猫名和体重分开,并映射为以猫名为key,体重为value的键值对    val partitions:RDD[SortedMap[Integer, String]] = pairs.mapPartitions {iter =>      //对于每一个分区应用map      println("-----------------------开始处理一个分区---------------------------")      import scala.collection.mutable.ArrayBuffer      val arrayBuffer = ArrayBuffer[SortedMap[Integer, String]]()      var top10: SortedMap[Integer, String] = new TreeMap[Integer, String]()//为各输入分区创建一个本地top10列表      while (iter.hasNext) {        val tuple = iter.next        top10.put(tuple._2, tuple._1)        if (top10.size() > broadcastTopN.value) {          if(broadcastDirection.value.equals("top"))             top10.remove(top10.firstKey())          else            top10.remove(top10.lastKey())        }      }      arrayBuffer += (top10)      println("这个分区的top10如下:")      arrayBuffer.foreach(println)      arrayBuffer.iterator    }    var finaltop10: SortedMap[Integer, String] = new TreeMap[Integer, String]()    val alltop10:Array[SortedMap[Integer, String]] = partitions.collect()//使用collect()创建最终的top10列表    for(localtop10:SortedMap[Integer, String] <- alltop10){      localtop10.entrySet().toArray.foreach{entry=>        finaltop10.put(entry.toString.split("=")(0).toInt,entry.toString.split("=")(1))        if(finaltop10.size() > 10){          finaltop10.remove(finaltop10.firstKey())        }      }    }    println("总的top10如下:")    finaltop10.entrySet().toArray().foreach(println)  }}

键不唯一的情况:仅仅只需要对键值对集合运用reduceByKey操作即可

//导入必要的类import java.util.{SortedMap, TreeMap}import akka.io.Udp.SO.Broadcastimport org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object TopN_1 {  def main(args: Array[String]): Unit = {    //连接SparkMaster    val conf = new SparkConf().setAppName("Chenjie's first spark App").setMaster("local")    val sc = new SparkContext(conf)    //从HDFS中读取输入文件并创建RDD    val lines = sc.textFile("hdfs://pc1:9000/input/top10data*.txt")    lines.foreach(println)//输出观察    val broadcastTopN = sc.broadcast[Integer](10)//指定TopN的N值    val broadcastDirection = sc.broadcast("top")//指定取最大的还是最小的,top/bottom    val pairs = lines.map { line =>      val tokens = line.split(",")      (tokens(0), tokens(1).toInt)    }//将每一行按照猫名和体重分开,并映射为以猫名为key,体重为value的键值对    val partitions:RDD[SortedMap[Integer, String]] =  pairs.reduceByKey(_+_).mapPartitions {iter =>      //对于每一个分区应用map      println("-----------------------开始处理一个分区---------------------------")      import scala.collection.mutable.ArrayBuffer      val arrayBuffer = ArrayBuffer[SortedMap[Integer, String]]()      var top10: SortedMap[Integer, String] = new TreeMap[Integer, String]()//为各输入分区创建一个本地top10列表      while (iter.hasNext) {        val tuple = iter.next        top10.put(tuple._2, tuple._1)        if (top10.size() > broadcastTopN.value) {          if(broadcastDirection.value.equals("top"))            top10.remove(top10.firstKey())          else            top10.remove(top10.lastKey())        }      }      arrayBuffer += (top10)      println("这个分区的top10如下:")      arrayBuffer.foreach(println)      arrayBuffer.iterator    }    var finaltop10: SortedMap[Integer, String] = new TreeMap[Integer, String]()    val alltop10:Array[SortedMap[Integer, String]] = partitions.collect()//使用collect()创建最终的top10列表    for(localtop10:SortedMap[Integer, String] <- alltop10){      localtop10.entrySet().toArray.foreach{entry=>        finaltop10.put(entry.toString.split("=")(0).toInt,entry.toString.split("=")(1))        if(finaltop10.size() > 10){          finaltop10.remove(finaltop10.firstKey())        }      }    }    println("总的top10如下:")    finaltop10.entrySet().toArray().foreach(println)  }}

此外,还可以用takeOrdered()方法实现

//导入必要的类import java.util.{SortedMap, TreeMap}import akka.io.Udp.SO.Broadcastimport org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object TopN_1 {  def main(args: Array[String]): Unit = {    //连接SparkMaster    val conf = new SparkConf().setAppName("Chenjie's first spark App").setMaster("local")    val sc = new SparkContext(conf)    //从HDFS中读取输入文件并创建RDD    val lines = sc.textFile("hdfs://pc1:9000/input/top10data*.txt")    lines.foreach(println) //输出观察    val broadcastTopN = sc.broadcast[Integer](10)    //指定TopN的N值    val broadcastDirection = sc.broadcast("top") //指定取最大的还是最小的,top/bottom    val pairs = lines.map { line =>      val tokens = line.split(",")      (tokens(0), tokens(1).toInt)    } //将每一行按照猫名和体重分开,并映射为以猫名为key,体重为value的键值对    pairs.reduceByKey(_ + _).takeOrdered(10)(new Ordering[(String,Int)]{      override def compare(x: (String, Int), y: (String, Int)): Int = - x._2.compareTo(y._2)    }).foreach(println)  }}

转载地址:http://ckqrb.baihongyu.com/

你可能感兴趣的文章
关于PCB设计中过孔能否打在焊盘上的两种观点
查看>>
PCB反推理念
查看>>
京东技术架构(一)构建亿级前端读服务
查看>>
php 解决json_encode中文UNICODE转码问题
查看>>
LNMP 安装 thinkcmf提示404not found
查看>>
PHP empty、isset、innull的区别
查看>>
apache+nginx 实现动静分离
查看>>
通过Navicat远程连接MySQL配置
查看>>
phpstorm开发工具的设置用法
查看>>
Linux 系统挂载数据盘
查看>>
Git基础(三)--常见错误及解决方案
查看>>
Git(四) - 分支管理
查看>>
PHP Curl发送数据
查看>>
HTTP协议
查看>>
HTTPS
查看>>
git add . git add -u git add -A区别
查看>>
apache下虚拟域名配置
查看>>
session和cookie区别与联系
查看>>
PHP 实现笛卡尔积
查看>>
Laravel中的$loop
查看>>