点击领取

400-616-5551

您所在位置: 首页> 学习资讯> 大数据分析入门课程都讲什么?

大数据分析入门课程都讲什么?

发布百知教育 来源:学习资讯 2019-09-24

本文为《大数据分析师入门课程》系列的第8篇,主要讲解大数据分析师必须了解的Spark基础知识,前7篇分别是JAVA基础SCALA基础SQL基础SQL进阶HIVE基础HIVE进阶HDFS&YARN基础


依照惯例,首先,我们就以下三个问题进行简单说明。


  • 为什么讲Spark?

  • 本文的主要目标是什么?

  • 本文的讲解思路是什么?



为什么讲Spark?



随着并行数据分析变得越来越流行,各行各业的使用者们迫切需要更好的数据分析工具,Spark 应运而生。作为MapReduce的继任者,Spark具备以下优势特性。


1.高效性

内存计算下,Spark 比 MapReduce 快100倍。Spark使用最先进的DAG调度程序、查询优化程序和物理执行引擎,实现批量和流式数据的高性能。


2.易用性

Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建多样的应用。


3.通用性

Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。


这些不同类型的处理都可以在同一个应用中无缝使用。这对于企业应用来说,就可使用一个平台来进行不同的工程实现,减少了人力开发和平台部署成本。


4.兼容性

Spark能够跟很多开源工程兼容使用。如Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且Spark可以读取多种数据源,如HDFS、HBase、MySQL等。


对于任何一家已经部署好Hadoop基础集群的企业来说,在不需要进行任何数据迁移和处理的情况下,就可以快速使用上Spark强大的数据处理和计算能力。



本文的主要目标是什么?



通过以上对Spark几大特性的介绍,可以看出Spark可支持开展多种大数据相关工作,比如大数据算法、大数据底层架构开发、数据分析等。


由于本文属于《大数据分析师入门课程》系列,故本文针对Spark的内容讲解主要是围绕数据分析工作中经常涉及到的核心概念和常用的操作等相关知识点。


学习完本文,你将会对如何使用Spark完成数据分析工作有一个更深入的理解。



本文的讲解思路是什么?



第1部分,主要讲解Spark相比MapReduce的优势。


第2部分,主要跟大家一起来看下Spark生态系统具体包括哪些组成部分。


第3部分,针对Spark的三种主要数据组织类型进行一一介绍、对其三者之间的异同点进行总结以及给出三者之间转化的方式。


第4部分,共享变量,包括广播变量和累加器。


第5部分,主要介绍Spark中使用YARN进行资源管理时,任务的提交流程和方式。

 

话不多说,让我们直接开始吧!


SPARK与MAPREDUCE比较


MapReduce作为Hadoop的两大重要组成部分之一,相信绝大部分从业者在大数据学习之初,都学习过这个分布式计算框架。


Spark继承了其分布式并行计算的优点,并改善了MapReduce的几个不足,具体表现为以下几个方面。


1.Spark可以把中间结果放在内存中,迭代运算效率高


MapReduce把中间计算结果放在磁盘上,这样势必会影响整体的运行速度,而Spark使用DAG分布式并行计算编程框架,缩短了中间处理环节,提高了处理效率。


2.Spark容错性高


Spark引进了弹性分布式数据集(Resilient Distribued Dataset,RDD)的概念,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,可以对它们进行重建。另外,RDD在计算时,可以通过CheckPoint来实现容错。


3.Spark更加易用


Spark提供了丰富的开箱即用的数据集操作算子,使处理和分析数据变得更加简单。


SPARK生态系统


前面对Spark的特性和优势介绍了很多,接下来让我们直接来看看Spark生态系统的全貌吧。


大数据分析


Spark已经发展成为包含众多子项目的大数据计算平台。 


伯克利(Spark的创造者,加州大学伯克利分校)将Spark的整个生态系统称为伯克利数据分析栈(BDAS)。


以SparkCore为核心,能够读取本地文件(如文本文件)、HDFS、Amazon S3、Hbase和Alluxio等数据源,利用Standalone、EC2、MESOS、YARN等资源调度管理,完成应用程序对数据的分析与处理,这些应用程序来自Spark的不同组件,包括Spark Streaming实时流处理应用、SparkSQL的即席查询、采样近似查询引擎BlinkDB的权衡查询、MLBase/Mllib的机器学习、GraphX的图处理和SparkR的数据计算等。


其中,大数据分析工作涉及到的基础知识点有SparkCore中的RDD、SparkSQL(系列课程的下一篇文章将会单独讲解SparkSQL)、如何进行本地运行测试、如何定义和使用广播变量和累计器、如何在YARN上运行等相关知识点。


这里首先简单介绍下SparkCore。


SparkCore是整个BDAS生态系统的核心组件,是一个分布式大数据处理框架,SparkCore提供了多种资源调度管理,通过内存计算、有向无环图(DAG)等机制保证分布式计算的快速,并引入了RDD的抽象保证了数据的高容错性。

 

接下来开始针对数据分析工作中涉及到的Spark相关知识点进行逐一讲解。



03

RDD&DataFrame&DataSet



Spark提供了三种数据集的抽象概念,RDD、DataFrame和DataSet,分别对应了同名的三个对象和三组不同的API,接下来我们来进行逐个讲解,并举例说明其通常的用法。


  • RDD


RDD全称Resilient Distributed Dataset,弹性分布式数据集,它是记录的只读分区集合,是Spark的基本数据结构,见名释义:


弹性,表现在两个方面,一是当计算过程中内存不足时可刷写到磁盘等外存上,可与外存做灵活的数据交换;二是RDD使用了一种“血统”的容错机制,在结构更新和丢失后可随时根据血统进行数据模型的重建;


分布式,可分布在多台机器上进行并行计算;


数据集,一组只读的、可分区的分布式数据集合,集合内包含了多个分区,分区依照特定规则将具有相同属性的数据记录放在一起,每个分区相当于一个数据集片段。

 

在实际数据分析工作中,怎么样创建一个RDD呢?


创建方式一般有以下两种:



val num = Array(1,2,3,4,5)

val rdd = sc.parallelize(num)

// 或者

val rdd = sc.makeRDD(num)


2.使用本地文件或HDFS创建RDD,RDD的数据源是本地文件系统或HDFS的数据,使用 textFile 方法创建RDD。


val rdd = sc.textFile("hdfs://hans/data_warehouse/test/data")


RDD支持两种类型的操作:转换(Transformation)和行动(Action)。


转换操作是从已经存在的数据集中创建一个新的数据集,而行动操作是在数据集上进行计算后返回结果到 Driver。


转换操作都具有 Lazy 特性,即 Spark 不会立刻进行实际的计算,只会记录执行的轨迹,只有触发行动操作的时候,它才会根据 DAG 图真正执行。


数据分析工作中常用的转换与行动操作如下图所示:


转换算子

含义

map(func)

返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

filter(func)

返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

flatMap(func)

类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

mapPartitions(func)

类似于map,但独立地在RDD的每一个分片上运行

mapPartitionsWithIndex(func)

类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是

(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子

union(otherDataset)

对源RDD和参数RDD求并集后返回一个新的RDD

intersection(otherDataset)

对源RDD和参数RDD求交集后返回一个新的RDD

distinct([numTasks]))

对源RDD进行去重后返回一个新的RDD

groupByKey([numTasks])

在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置

aggregateByKey(zeroValue)(seqOp, combOp,  [numTasks])

先按分区聚合 再总的聚合   每次要跟初始值交流 例如:aggregateByKey(0)(_+_ , _+_)  对(K,V)的RDD进行操作

sortByKey([ascending], [numTasks])

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

与sortByKey类似,但是更灵活  第一个参数是根据什么排序,第二个是怎么排序,false表示倒序,第三个参数表示排序后分区数 ,默认与原RDD一样

join(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD,相当于内连接(求交集)

cogroup(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD

cartesian(otherDataset)

两个RDD的笛卡尔积

pipe(command, [envVars])

调用外部程序

coalesce(numPartitions)   

重新分区,第一个参数是要分多少区,第二个参数是否shuffle,默认false

repartition(numPartitions)

重新分区,必须shuffle,参数表示是要分多少区

repartitionAndSortWithinPartitions(partitioner)

重新分区+排序,比先分区再排序效率高,对(K,V)的RDD进行操作

foldByKey(zeroValue)(seqOp)

该函数用于(K,V)做折叠,合并处理  ,与aggregate类似   第一个括号的参数应用于每个V值  第二括号函数是聚合例如:_+_

combineByKey

合并相同的key的值,如rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m:  Int, n: Int) => m + n)

partitionBy(partitioner)

对RDD进行分区,partitioner是分区器

cache

RDD缓存,可以避免重复计算从而减少时间,区别:cache内部调用了persist算子,cache默认就一个缓存级别MEMORY-ONLY,而persist则可以选择不同缓存级别

persist

Subtract(rdd)

返回前RDD元素不在后RDD的RDD

leftOuterJoin

leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。

rightOuterJoin

rightOuterJoin类似于SQL中的右外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可

subtractByKey

substractByKey和基本转换操作中的subtract类似只不过这里是针对K的,返回在主RDD中出现且不在otherRDD中出现的元素


动作算子

含义

reduce(func)

通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的

collect()

在驱动程序中,以数组的形式返回数据集的所有元素

count()

返回RDD的元素个数

first()

返回RDD的第一个元素(类似于take(1))

take(n)

返回一个由数据集的前n个元素组成的数组

takeSample(withReplacement,num, [seed])

返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子

saveAsTextFile(path)

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

saveAsSequenceFile(path) 

将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是HDFS或者其他Hadoop支持的文件系统

saveAsObjectFile(path) 

先将RDD转为数组,然后序列化,然后将结果变换为(null,byteWritable)

countByKey()

针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数

foreach(func)

在数据集的每一个元素上,运行函数func进行更新

aggregate

先对分区进行操作,再总体操作


最后通过一段代码示例来看看具体怎么使用这些算子,这里会演示部分算子的使用情况,读者朋友们可以动手实际操作一下,以加强对以上所有算子的学习和了解。


import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

 

object SparkWordCountWithScala {

  def main(args: Array[String]): Unit = {

 

    val conf = new SparkConf()

    /**

      * 如果这个参数不设置,默认认为你运行的是集群模式

      * 如果设置成local代表运行的是local模式

      */

   conf.setMaster("local")

    //设置任务名

   conf.setAppName("WordCount")

    //创建SparkCore的程序入口

    val sc = new SparkContext(conf)

    //读取文件生成RDD

    val file: RDD[String] = sc.textFile("E:\\hello.txt")

    //把每一行数据按照,分割

    val word: RDD[String] = file.flatMap(_.split(","))

    //让每一个单词都出现一次

    val wordOne: RDD[(String, Int)] = word.map((_,1))

    //单词计数

    val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_)

    //按照单词出现的次数降序排序

    val sortRdd: RDD[(String, Int)] = wordCount.sortBy(tuple => tuple._2,false)

    //将最终的结果进行保存

    sortRdd.saveAsTextFile("E:\\result")

 

    sc.stop()

  }

}

示例代码中,将运行模式设置为local模式,就可以在本地对代码进行调试。


  • DataFrame&DataSet


理解了RDD,DataFrame理解起来就比较容易了,DataFrame的思想来源于Python的pandas库,RDD是一个数据集,DataFrame在RDD的基础上加了Schema(描述数据的信息,可以认为是元数据,DataFrame曾经就有个名字叫SchemaRDD)。


假设RDD中的两行数据长这样:




那么DataFrame中的数据长这样:


从上面两个图可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了。



DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where ...),让我们通过代码示例来了解下DataFrame的常见操作。


import org.apache.spark.sql.SparkSession

 val spark = SparkSession

   .builder()

   .appName("Spark SQL basicexample")

   .getOrCreate()

 //引入Spark的隐式类型转换,如将RDD转换成DataFrame

 import spark.implicits._

 val df = spark.read.json("/data/tmp/SparkSQL/people.json")

 df.show() //打印DataFrame的内容

//+---+-------+

//|age|   name|

//+---+-------+

//|   |Michael|

//| 19|   Andy|

//| 30| Justin|

//+---+-------+

df.printSchema()  //打印出DataFrame的表结构

//root

// |-- age: string (nullable = true)

// |-- name: string (nullable = true)

df.select("name").show()

//等同于select name fromDataFrame的SQL语句

df.select($"name",$"age" + 1).show()

//等同于select name,age+1 from DataFrame的SQL语句

//此处注意,如果对列进行操作,所有列名前都必须加上$符号

df.filter($"age" > 21).show()

//等同于select * fromDataFrame where age>21 的SQL语句

df.groupBy("age").count().show()

//等同于select age,count(age)from DataFrame group by age;

//同时也可以直接写SQL进行DataFrame数据的分析

df.createOrReplaceTempView("people")

val sqlDF = spark.sql("selectage,count(age) from people group by age")

sqlDF.show()


DataSet是DataFrame API的扩展。相较于RDD来说,DataSet提供了强类型支持,区别也是给RDD的每行数据加了类型约束。


假设RDD中的两行数据长这样:


那么DataSet中的数据长这样:


或者长这样(每行数据是个Object):


引入DataSet有两个重要原因:


1.对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运行),如果提交到集群运行时才发现错误,实在是浪费时间,DataSet相比DataFrame来说,它提供了编译时类型检查。


示例如下:

代码中json文件中并没有score字段,但是用DataFrame能编译通过,运行时才报异常。


32.png


而使用DataSet实现,会在代码编写时就报错,出错提前到了编译之前。


33.png


2.引入DataSet另一个重要原因是RDD转换DataFrame后不可逆,但RDD转换DataSet是可逆的。


示例如下:

通过RDD创建DataFrame,再通过DataFrame转换成RDD,发现RDD的类型变成了Row类型。


34.png


通过RDD创建DataSet,再通过DataSet转换为RDD,发现RDD还是原始类型。


35.png

 

因为DataSet吸收了RDD和DataFrame的优点,实际工作使用中,所以可以像操作RDD和DataFrame一样来操作DataSet,看下边一个简单的例子。


因为DataSet吸收了RDD和DataFrame的优点,实际工作使用中,所以可以像操作RDD和DataFrame一样来操作DataSet


参考文献


[1]《Spark之深入理解RDD结构》作者 TccccD


[2]《Spark RDD详解》作者 通凡


[3]《Spark快速大数据分析》作者 [美] Holden Karau  [美] Andy Konwinski [美] Patrick Wendell  [加] Matei Zaharia

[4]《Spark on YARN 的两种模式》作者  Python之简


[5] 《图解Spark核心技术与案例实战》作者 郭景瞻

[6] Spark官网


文章转载自公众号  大数据与人工智能 , 作者 HappyMint


上一篇:Java数据结构 | 新手教程,建议收藏!

下一篇:想去IT教育培训机构学习,先了解这些

相关推荐