Will Spark load all the data into memory?
1. Spark Functions
1.1 count, first, take
```python
lines = sc.textFile("file:///home/blair/../input.txt")
lines.count()    # number of lines in the file
lines.first()    # first line
lines.take(3)    # first 3 lines as a list
```
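This is also the answer to the question in the title: `textFile()` does not eagerly load the file. A minimal sketch of the lazy behavior, assuming the same `sc` as above and a hypothetical path:

```python
# Transformations only record lineage; nothing is read from disk yet.
lines = sc.textFile("file:///home/blair/input.txt")  # hypothetical path
print(lines)          # prints only the RDD lineage description, no data
print(lines.count())  # count() is an action: only now is the file scanned
```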
1.2 filter
```python
# keep only the lines that mention "Python"
pythonLines = lines.filter(lambda line: "Python" in line)

# equivalent version with a named function
def hasPython(line):
    return "Python" in line

pythonLines = lines.filter(hasPython)
```
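If `pythonLines` will be hit by several actions, it can be cached so the file is not re-read each time; a sketch, assuming the `pythonLines` defined above:

```python
pythonLines.persist()   # mark the RDD for in-memory caching
pythonLines.count()     # first action materializes and caches the partitions
pythonLines.first()     # later actions reuse the cache instead of re-reading the file
```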
1.3 sc init
```python
from pyspark import SparkConf, SparkContext

sc.stop()  # stop the existing SparkContext before creating a new one
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
```
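A variant sketch: `SparkContext.getOrCreate` returns the active context if one already exists, which avoids the "only one SparkContext may be running" error (the master and app name below are placeholders):

```python
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("demo")  # placeholder values
sc = SparkContext.getOrCreate(conf)  # reuse the running context if there is one
print(sc.master, sc.appName)
```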
1.4 avg
```python
data = [1, 5, 7, 10, 23, 20, 6, 5, 10, 7, 10]
rdd_data = sc.parallelize(data)
s = rdd_data.reduce(lambda x, y: x + y + 0.0)  # "+ 0.0" forces a float sum
n = rdd_data.count()
avg = s / n
print("average:", avg)
```
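Numeric RDDs also ship with built-in statistics helpers; a sketch of the same average using them:

```python
rdd_data = sc.parallelize([1, 5, 7, 10, 23, 20, 6, 5, 10, 7, 10])
print(rdd_data.mean())                    # same value as the manual reduce/count
print(rdd_data.sum(), rdd_data.count())   # the two ingredients separately
```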
1.5 Mode (most frequent value)
```python
data = [1, 5, 7, 10, 23, 20, 7, 5, 10, 7, 10]
rdd_data = sc.parallelize(data)

# count the occurrences of each value
rdd_count = rdd_data.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
# find the highest occurrence count
max_count = rdd_count.map(lambda x: x[1]).reduce(lambda x, y: x if x >= y else y)
print(max_count)

# keep the values that reach max_count; if there are several, average them
rdd_mode = rdd_count.filter(lambda x: x[1] == max_count).map(lambda x: x[0])
mode = rdd_mode.reduce(lambda x, y: x + y + 0.0) / rdd_mode.count()
print("mode:", mode)
```
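When the number of distinct values is small, `countByValue()` returns the counts as a dict on the driver, which shortens this considerably; a sketch:

```python
cnt = sc.parallelize([1, 5, 7, 10, 23, 20, 7, 5, 10, 7, 10]).countByValue()
max_cnt = max(cnt.values())
modes = [k for k, v in cnt.items() if v == max_cnt]
print(sum(modes) / len(modes))   # 8.5: both 7 and 10 appear three times
```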
1.6 TopN
```python
students = [("LiLei", 18, 87), ("HanMeiMei", 16, 77), ("DaChui", 16, 66),
            ("Jim", 18, 77), ("RuHua", 18, 50)]
n = 3
rdd_students = sc.parallelize(students)
rdd_sorted = rdd_students.sortBy(lambda x: x[2], ascending=False)  # sort by score, descending
students_topn = rdd_sorted.take(n)
print(students_topn)
```
[('LiLei', 18, 87), ('HanMeiMei', 16, 77), ('Jim', 18, 77)]
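`takeOrdered` folds the sort and the take into a single action; a sketch over the same `rdd_students`:

```python
# negate the score to get descending order
students_topn = rdd_students.takeOrdered(3, key=lambda x: -x[2])
print(students_topn)   # the same three records as above
```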
1.7 Sort and return the rank index
```python
data = [1, 7, 8, 5, 3, 18, 34, 9, 0, 12, 8]
rdd_data = sc.parallelize(data)
# sortByKey works on (key, value) pairs, so wrap each element first
rdd_sorted = rdd_data.map(lambda x: (x, 1)).sortByKey().map(lambda x: x[0])
rdd_sorted_index = rdd_sorted.zipWithIndex()  # pair each element with its position
print(rdd_sorted_index.collect())
```
output:
[(0, 0), (1, 1), (3, 2), (5, 3), (7, 4), (8, 5), (8, 6), (9, 7), (12, 8), (18, 9), (34, 10)]
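`sortBy` avoids the `(x, 1)` wrapping entirely; a sketch producing the same pairs:

```python
rdd_sorted = sc.parallelize([1, 7, 8, 5, 3, 18, 34, 9, 0, 12, 8]).sortBy(lambda x: x)
print(rdd_sorted.zipWithIndex().collect())   # same output as above
```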
1.8 Secondary sort
```python
# Task: given student records of (name, age, score),
# sort by score descending; break ties by age descending.
students = [("LiLei", 18, 87), ("HanMeiMei", 16, 77), ("DaChui", 16, 66),
            ("Jim", 18, 77), ("RuHua", 18, 50)]
rdd_students = sc.parallelize(students)
```
```python
%%writefile student.py
class Student:
    def __init__(self, name, age, score):
        self.name = name
        self.age = age
        self.score = score

    # define ">" so sortBy can compare Student objects:
    # higher score first, then higher age on equal scores
    def __gt__(self, other):
        if self.score > other.score:
            return True
        elif self.score == other.score and self.age > other.age:
            return True
        else:
            return False
```
```python
from student import Student

rdd_sorted = rdd_students \
    .map(lambda t: Student(t[0], t[1], t[2])) \
    .sortBy(lambda x: x, ascending=False) \
    .map(lambda student: (student.name, student.age, student.score))

rdd_sorted.collect()
```
[('LiLei', 18, 87), ('Jim', 18, 77), ('HanMeiMei', 16, 77), ('DaChui', 16, 66), ('RuHua', 18, 50)]
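The `Student` class exists only to define the ordering; a composite key gives the same result without it. A sketch:

```python
# sort by the (score, age) tuple, both components descending
rdd_sorted2 = rdd_students.sortBy(lambda t: (t[2], t[1]), ascending=False)
print(rdd_sorted2.collect())   # same order as the Student-class version
```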
1.9 Join operations
```python
# Task: join class rosters with scores, then keep the classes
# whose average score is above 75.
classes = [("class1", "LiLei"), ("class1", "HanMeiMei"),
           ("class2", "DaChui"), ("class2", "RuHua")]
scores = [("LiLei", 76), ("HanMeiMei", 80), ("DaChui", 70), ("RuHua", 60)]

rdd_classes = sc.parallelize(classes).map(lambda x: (x[1], x[0]))  # key by student name
rdd_scores = sc.parallelize(scores)
# join on name, then re-key as (class, score)
rdd_join = rdd_scores.join(rdd_classes).map(lambda t: (t[1][1], t[1][0]))

def average(iterator):
    data = list(iterator)
    s = 0.0
    for x in data:
        s = s + x
    return s / len(data)

rdd_result = rdd_join.groupByKey().map(lambda t: (t[0], average(t[1]))).filter(lambda t: t[1] > 75)
print(rdd_result.collect())
```
[('class1', 78.0)]
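`groupByKey` shuffles every score to its class's partition before averaging; carrying `(sum, count)` pairs through `reduceByKey` lets Spark combine values map-side first. A sketch over the same `rdd_join`:

```python
rdd_avg = (rdd_join
           .mapValues(lambda s: (s, 1.0))                        # (class, (score, 1))
           .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))  # per-class (sum, count)
           .mapValues(lambda t: t[0] / t[1])                      # average
           .filter(lambda t: t[1] > 75))
print(rdd_avg.collect())   # [('class1', 78.0)]
```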
1.10 Mode within each group
```python
students = [
    ("class1", 15), ("class1", 15), ("class2", 16),
    ("class2", 16), ("class1", 17), ("class2", 19)
]
```
run
```python
def mode(arr):
    # count occurrences, then average the values that hit the maximum count
    dict_cnt = {}
    for x in arr:
        dict_cnt[x] = dict_cnt.get(x, 0) + 1
    max_cnt = max(dict_cnt.values())
    most_values = [k for k, v in dict_cnt.items() if v == max_cnt]
    s = 0.0
    for x in most_values:
        s = s + x
    return s / len(most_values)

rdd_students = sc.parallelize(students)
# collect each class's ages into a list, then take the mode per class
rdd_classes = rdd_students.aggregateByKey([], lambda arr, x: arr + [x],
                                          lambda arr1, arr2: arr1 + arr2)
rdd_mode = rdd_classes.map(lambda t: (t[0], mode(t[1])))
print(rdd_mode.collect())
```
[('class1', 15.0), ('class2', 16.0)]
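For comparison, `groupByKey` reaches the same result with less ceremony, at the cost of shuffling every raw value; a sketch reusing the `mode` helper above:

```python
rdd_mode2 = rdd_students.groupByKey().map(lambda t: (t[0], mode(list(t[1]))))
print(rdd_mode2.collect())   # [('class1', 15.0), ('class2', 16.0)]
```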