The top-N problem: which 10 URLs were visited most often last week? Which 10 cats are the heaviest of all the cats?
This article follows the MapReduce/Hadoop top-N solution pattern and first assumes that all input keys are unique. That is, for a given input set {<K,V>}, every K appears only once.
For example, in the cat data below, cat1 never appears a second time.
Input:
top10data.txt:
cat1,12
cat2,13
cat3,14
cat4,15
cat5,10
cat100,100
cat200,200
cat300,300
cat1001,1001
cat67,67
cat22,22
cat23,23
cat1000,1000
cat2000,2000
cat400,400
cat500,500
top10data2.txt:
cat8,8
cat9,9
cat7,7
cat6,6
cat450,450
cat350,350
cat150,150
cat3000,3000
cat1500,1500
cat1601,1601
cat201,201
cat222,222

Spark solution: use the mapPartitions() function to compute a local top-10 for each partition, then use collect() to build the final top-10 list.
The unique-key case:
// Import the required classes
import java.util.{SortedMap, TreeMap}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConverters._

object TopN_1 {
  def main(args: Array[String]): Unit = {
    // Connect to the Spark master
    val conf = new SparkConf().setAppName("Chenjie's first spark App").setMaster("local")
    val sc = new SparkContext(conf)

    // Read the input files from HDFS and create an RDD
    val lines = sc.textFile("hdfs://pc1:9000/input/top10data*.txt")
    lines.foreach(println) // print the input for inspection

    val broadcastTopN = sc.broadcast(10)         // the N in top-N
    val broadcastDirection = sc.broadcast("top") // "top" for the largest N, "bottom" for the smallest

    // Split each line into cat name and weight; map it to a (name, weight) pair
    val pairs = lines.map { line =>
      val tokens = line.split(",")
      (tokens(0), tokens(1).toInt)
    }

    // Compute a local top-10 for every partition
    val partitions: RDD[SortedMap[Integer, String]] = pairs.mapPartitions { iter =>
      println("----------------------- processing one partition -----------------------")
      import scala.collection.mutable.ArrayBuffer
      val arrayBuffer = ArrayBuffer[SortedMap[Integer, String]]()
      val top10: SortedMap[Integer, String] = new TreeMap[Integer, String]() // local top-10 for this partition
      while (iter.hasNext) {
        val tuple = iter.next
        top10.put(tuple._2, tuple._1)
        if (top10.size() > broadcastTopN.value) {
          if (broadcastDirection.value.equals("top"))
            top10.remove(top10.firstKey()) // evict the smallest key to keep the largest N
          else
            top10.remove(top10.lastKey()) // evict the largest key to keep the smallest N
        }
      }
      arrayBuffer += top10
      println("Local top-10 of this partition:")
      arrayBuffer.foreach(println)
      arrayBuffer.iterator
    }

    // Use collect() to merge the per-partition lists into the final top-10
    val finaltop10: SortedMap[Integer, String] = new TreeMap[Integer, String]()
    val alltop10: Array[SortedMap[Integer, String]] = partitions.collect()
    for (localtop10 <- alltop10) {
      localtop10.entrySet().asScala.foreach { entry =>
        finaltop10.put(entry.getKey, entry.getValue)
        if (finaltop10.size() > broadcastTopN.value) {
          if (broadcastDirection.value.equals("top"))
            finaltop10.remove(finaltop10.firstKey())
          else
            finaltop10.remove(finaltop10.lastKey())
        }
      }
    }
    println("Overall top-10:")
    finaltop10.entrySet().asScala.foreach(println)
  }
}
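The core of the program is the bounded TreeMap: the weight is the key, so the map stays sorted, and as soon as it grows past N the entry at the unwanted end is evicted. Below is a minimal, Spark-free sketch of that eviction logic; the object name LocalTopNSketch is hypothetical, N is set to 3 so eviction actually triggers, and the four pairs are taken from the sample input:

import java.util.TreeMap

object LocalTopNSketch {
  def main(args: Array[String]): Unit = {
    val n = 3 // keep only the 3 heaviest here, so eviction triggers
    // A handful of (name, weight) pairs from the sample input
    val pairs = Seq(("cat1", 12), ("cat100", 100), ("cat3000", 3000), ("cat500", 500))
    val topN = new TreeMap[Integer, String]()
    for ((name, weight) <- pairs) {
      topN.put(weight, name)
      if (topN.size() > n)
        topN.remove(topN.firstKey()) // evict the current smallest weight
    }
    println(topN) // {100=cat100, 500=cat500, 3000=cat3000}
  }
}

Run against the two sample files, the full program should report the ten heaviest cats, cat350 (350) up through cat3000 (3000), printed in ascending key order because a TreeMap iterates its keys ascending.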
The non-unique-key case: simply apply a reduceByKey operation to the pair collection first, so duplicate keys are aggregated before the per-partition top-10 is computed:
// Import the required classes
import java.util.{SortedMap, TreeMap}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConverters._

object TopN_1 {
  def main(args: Array[String]): Unit = {
    // Connect to the Spark master
    val conf = new SparkConf().setAppName("Chenjie's first spark App").setMaster("local")
    val sc = new SparkContext(conf)

    // Read the input files from HDFS and create an RDD
    val lines = sc.textFile("hdfs://pc1:9000/input/top10data*.txt")
    lines.foreach(println) // print the input for inspection

    val broadcastTopN = sc.broadcast(10)         // the N in top-N
    val broadcastDirection = sc.broadcast("top") // "top" for the largest N, "bottom" for the smallest

    // Split each line into cat name and weight; map it to a (name, weight) pair
    val pairs = lines.map { line =>
      val tokens = line.split(",")
      (tokens(0), tokens(1).toInt)
    }

    // Aggregate duplicate keys first, then compute a local top-10 for every partition
    val partitions: RDD[SortedMap[Integer, String]] = pairs.reduceByKey(_ + _).mapPartitions { iter =>
      println("----------------------- processing one partition -----------------------")
      import scala.collection.mutable.ArrayBuffer
      val arrayBuffer = ArrayBuffer[SortedMap[Integer, String]]()
      val top10: SortedMap[Integer, String] = new TreeMap[Integer, String]() // local top-10 for this partition
      while (iter.hasNext) {
        val tuple = iter.next
        top10.put(tuple._2, tuple._1)
        if (top10.size() > broadcastTopN.value) {
          if (broadcastDirection.value.equals("top"))
            top10.remove(top10.firstKey()) // evict the smallest key to keep the largest N
          else
            top10.remove(top10.lastKey()) // evict the largest key to keep the smallest N
        }
      }
      arrayBuffer += top10
      println("Local top-10 of this partition:")
      arrayBuffer.foreach(println)
      arrayBuffer.iterator
    }

    // Use collect() to merge the per-partition lists into the final top-10
    val finaltop10: SortedMap[Integer, String] = new TreeMap[Integer, String]()
    val alltop10: Array[SortedMap[Integer, String]] = partitions.collect()
    for (localtop10 <- alltop10) {
      localtop10.entrySet().asScala.foreach { entry =>
        finaltop10.put(entry.getKey, entry.getValue)
        if (finaltop10.size() > broadcastTopN.value) {
          if (broadcastDirection.value.equals("top"))
            finaltop10.remove(finaltop10.firstKey())
          else
            finaltop10.remove(finaltop10.lastKey())
        }
      }
    }
    println("Overall top-10:")
    finaltop10.entrySet().asScala.foreach(println)
  }
}
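A caveat that applies to both programs above: because the (aggregated) weight serves as the TreeMap key, two cats that end up with the same weight collide, and only one of them survives in the list. A minimal sketch of one way around this, keeping a sorted set of (weight, name) pairs so ties are preserved; the object name TopNWithTies and the tied sample values are hypothetical:

import scala.collection.mutable

object TopNWithTies {
  def main(args: Array[String]): Unit = {
    val n = 2
    // Hypothetical data: cat8 and cat9 tie at weight 8
    val pairs = Seq(("cat8", 8), ("cat9", 8), ("cat7", 7))
    // Ordered lexicographically by (weight, name); equal weights no longer overwrite each other
    val topN = mutable.TreeSet.empty[(Int, String)]
    for ((name, weight) <- pairs) {
      topN += ((weight, name))
      if (topN.size > n)
        topN -= topN.head // evict the smallest (weight, name) pair
    }
    topN.toSeq.reverse.foreach(println) // largest first: (8,cat9), (8,cat8)
  }
}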
Alternatively, the same result can be implemented with the takeOrdered() method:
// Import the required classes
import org.apache.spark.{SparkConf, SparkContext}

object TopN_1 {
  def main(args: Array[String]): Unit = {
    // Connect to the Spark master
    val conf = new SparkConf().setAppName("Chenjie's first spark App").setMaster("local")
    val sc = new SparkContext(conf)

    // Read the input files from HDFS and create an RDD
    val lines = sc.textFile("hdfs://pc1:9000/input/top10data*.txt")
    lines.foreach(println) // print the input for inspection

    // Split each line into cat name and weight; map it to a (name, weight) pair
    val pairs = lines.map { line =>
      val tokens = line.split(",")
      (tokens(0), tokens(1).toInt)
    }

    // Aggregate duplicate keys, then take the 10 pairs with the largest values;
    // negating the comparison makes takeOrdered() return the largest rather than the smallest
    pairs.reduceByKey(_ + _).takeOrdered(10)(new Ordering[(String, Int)] {
      override def compare(x: (String, Int), y: (String, Int)): Int = -x._2.compareTo(y._2)
    }).foreach(println)
  }
}
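Since the takeOrdered() call negates the comparison just to get descending order, Spark's built-in top() action is a slightly more direct equivalent for the "largest N" direction. A minimal sketch, assuming the same pairs RDD as in the program above:

// Equivalent to the negated-comparator takeOrdered(10) above;
// top() already returns the largest elements under the given Ordering
pairs.reduceByKey(_ + _)
  .top(10)(Ordering.by((pair: (String, Int)) => pair._2))
  .foreach(println)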