Spark SQL JSON数据处理
标签:
小编还为您整理了以下内容,可能对您也有帮助:
sparksql可以处理的数据源包括哪些
包括数据文件、Hive表、RDD、外部数据库。SparkSQL是Spark用来处理结构化数据的模块,常用的数据源有:常用的结构化文件如:Json、Parquet、Orc、Avro、TextJdbc相关的数据库Hive表。
什么是spark
spark是一个通用计算框架。
Spark是一个通用计算框架,用于快速处理大规模数据。Spark是一种与Hadoop相似的开源集群计算环境,但Spark在内存中执行任务,比Hadoop更快。Spark支持多种数据源,如CSV、JSON、HDFS、SQL等,并提供了多种高级工具,Spark还提供了分布式计算中的数据共享和缓存机制,使得大规模数据处理变得更加高效和可靠。
Spark支持多种编程语言,如Java、Python、Scala和R语言,并且还提供了超过80种高级算法,使用户可以快速构建不同的应用。同时,Spark还支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法,可以非常方便地与其他的开源产品进行融合,可以访问各种数据源。
常见的计算机框架:
1、TensorFlow
TensorFlow是一个用于机器学习和深度学习的开源框架,由Google开发。它支持多种编程语言,如Python、C++、Java和Scala等,并提供了丰富的API和工具,使得用户可以轻松地构建和训练各种机器学习模型。TensorFlow还具有高度的可扩展性,可以处理大规模的数据集和模型,并且可以在不同的硬件平台上运行。
2、PyTorch
PyTorch是一个用于机器学习和深度学习的开源框架,由开发。它基于动态图模式,使得模型的构建和调试非常容易。PyTorch还提供了强大的GPU加速功能,可以在短时间内对大规模数据集进行训练。
3、Apache Spark
Apache Spark是一个用于大数据处理的快速、通用和容错的开源框架,由Apache软件基金会开发。它支持多种编程语言,如Java、Python、Scala和R等,并提供了多种高级工具和算法,如Spark SQL、Spark Streaming、Spark GraphX等。此外,Spark还提供了分布式计算中的数据共享和缓存机制,使得大规模数据处理变得更加高效和可靠。
如何使用 Spark SQL
一、启动方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
注:/data/spark-1.4.0-bin-cdh4/为spark的安装路径
/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看启动选项
--master MASTER_URL 指定master url
--executor-memory MEM 每个executor的内存,默认为1G
--total-executor-cores NUM 所有executor的总核数
-e <quoted-query-string> 直接执行查询SQL
-f <filename> 以文件方式批量执行SQL
二、Spark sql对hive支持的功能
1、查询语句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作运算:
1) 关系运算:= ==, <>, <, >, >=, <=
2) 算术运算:+, -, *, /, %
3) 逻辑运算:AND, &&, OR, ||
4) 复杂的数据结构
5) 数学函数:(sign, ln, cos, etc)
6) 字符串函数:
3、 UDF
4、 UDAF
5、 用户定义的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查询: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分区表
12、 视图
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE
14、 支持的数据类型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT
三、Spark sql 在客户端编程方式进行查询数据
1、启动spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、编写程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有数据:df.show()
查看表结构:df.printSchema()
只看name列:df.select("name").show()
对数据运算:df.select(df("name"), df("age") + 1).show()
过滤数据:df.filter(df("age") > 21).show()
分组统计:df.groupBy("age").count().show()
1、查询txt数据
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件
val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查询结果数据
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet“)
四、Spark sql性能调优
缓存数据表:sqlContext.cacheTable("tableName")
取消缓存表:sqlContext.uncacheTable("tableName")
spark.sql.inMemoryColumnarStorage.compressedtrue 当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。
spark.sql.inMemoryColumnarStorage.batchSize 10000 柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险
如何使用 Spark SQL
一、启动方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
注:/data/spark-1.4.0-bin-cdh4/为spark的安装路径
/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看启动选项
--master MASTER_URL 指定master url
--executor-memory MEM 每个executor的内存,默认为1G
--total-executor-cores NUM 所有executor的总核数
-e <quoted-query-string> 直接执行查询SQL
-f <filename> 以文件方式批量执行SQL
二、Spark sql对hive支持的功能
1、查询语句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作运算:
1) 关系运算:= ==, <>, <, >, >=, <=
2) 算术运算:+, -, *, /, %
3) 逻辑运算:AND, &&, OR, ||
4) 复杂的数据结构
5) 数学函数:(sign, ln, cos, etc)
6) 字符串函数:
3、 UDF
4、 UDAF
5、 用户定义的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查询: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分区表
12、 视图
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE
14、 支持的数据类型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT
三、Spark sql 在客户端编程方式进行查询数据
1、启动spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、编写程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有数据:df.show()
查看表结构:df.printSchema()
只看name列:df.select("name").show()
对数据运算:df.select(df("name"), df("age") + 1).show()
过滤数据:df.filter(df("age") > 21).show()
分组统计:df.groupBy("age").count().show()
1、查询txt数据
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件
val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查询结果数据
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet“)
四、Spark sql性能调优
缓存数据表:sqlContext.cacheTable("tableName")
取消缓存表:sqlContext.uncacheTable("tableName")
spark.sql.inMemoryColumnarStorage.compressedtrue 当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。
spark.sql.inMemoryColumnarStorage.batchSize 10000 柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险
大数据技术常用的数据处理方式有哪些?
大数据技术常用的数据处理方式,有传统的ETL工具利用多线程处理文件的方式;有写MapRece,有利用Hive结合其自定义函数,也可以利用Spark进行数据清洗等,每种方式都有各自的使用场景。
在实际的工作中,需要根据不同的特定场景来选择数据处理方式。
1、传统的ETL方式
传统的ETL工具比如Kettle、Talend、Informatica等,可视化操作,上手比较快,但是随着数据量上升容易导致性能出问题,可优化的空间不大。
2、Maprece
写Maprece进行数据处理,需要利用java、python等语言进行开发调试,没有可视化操作界面那么方便,在性能优化方面,常见的有在做小表跟大表关联的时候,可以先把小表放到缓存中(通过调用Maprece的api),另外可以通过重写Combine跟Partition的接口实现,压缩从Map到rece中间数据处理量达到提高数据处理性能。
3、Hive
在没有出现Spark之前,Hive可谓独占鳌头,涉及离线数据的处理基本都是基于Hive来做的,Hive采用sql的方式底层基于Hadoop的Maprece计算框架进行数据处理,在性能优化上也不错。
4、Spark
Spark基于内存计算的准Maprece,在离线数据处理中,一般使用Spark sql进行数据清洗,目标文件一般是放在hdf或者nfs上,在书写sql的时候,尽量少用distinct,group by recebykey 等之类的算子,要防止数据倾斜。
大数据技术常用的数据处理方式有哪些?
大数据技术常用的数据处理方式,有传统的ETL工具利用多线程处理文件的方式;有写MapRece,有利用Hive结合其自定义函数,也可以利用Spark进行数据清洗等,每种方式都有各自的使用场景。
在实际的工作中,需要根据不同的特定场景来选择数据处理方式。
1、传统的ETL方式
传统的ETL工具比如Kettle、Talend、Informatica等,可视化操作,上手比较快,但是随着数据量上升容易导致性能出问题,可优化的空间不大。
2、Maprece
写Maprece进行数据处理,需要利用java、python等语言进行开发调试,没有可视化操作界面那么方便,在性能优化方面,常见的有在做小表跟大表关联的时候,可以先把小表放到缓存中(通过调用Maprece的api),另外可以通过重写Combine跟Partition的接口实现,压缩从Map到rece中间数据处理量达到提高数据处理性能。
3、Hive
在没有出现Spark之前,Hive可谓独占鳌头,涉及离线数据的处理基本都是基于Hive来做的,Hive采用sql的方式底层基于Hadoop的Maprece计算框架进行数据处理,在性能优化上也不错。
4、Spark
Spark基于内存计算的准Maprece,在离线数据处理中,一般使用Spark sql进行数据清洗,目标文件一般是放在hdf或者nfs上,在书写sql的时候,尽量少用distinct,group by recebykey 等之类的算子,要防止数据倾斜。
应用Spark技术,SoData数据机器人实现快速、通用数据治理
Spark是处理海量数据的快速通用引擎。作为大数据处理技术,Spark经常会被人们拿来与Hadoop比较。
Hadoop已经成了大数据技术的事实标准,Hadoop MapRece也非常适合于对大规模数据集合进行批处理操作,但是其本身还存在一些缺陷。具体表现在:
1、Hadoop MapRee的表达能力有限。所有计算都需要转换成Map和 Rece两个操作,不能适用于所有场景,对于复杂的数据处理过程难以描述。
2、磁盘I/O开销大。Hadoop MapRece要求每个步骤间的数据序列化到磁盘,所以I/O成本很高,导致交互分析和迭代算法开销很大,而几乎所有的最优化和机器学习都是迭代的。所以,Hadoop MapRece不适合于交互分析和机器学习。
3、计算延迟高。如果想要完成比较复杂的工作,就必须将一系列的MapRece作业串联起来然后顺序执行这些作业。每一个作业都是高时延的,而且只有在前一个作业完成之后下一个作业才能开始启动。因此,Hadoop MapRece不能胜任比较复杂的、多阶段的计算服务。
Spark借鉴Hadoop MapRece技术发展而来,继承了其分布式并行计算的优点的同时,改进了MapRece的许多缺陷。具体优势如下:
1、Spark提供广泛的数据集操作类型(20+种),支持Java,Python和Scala API,支持交互式的Python和Scala的shell。比Hadoop更加通用。
2、Spark提供Cache机制来支持需要反复迭代的计算或者多次数据共享,减少数据读取的I/O开销。Spark使用内存缓存来提升性能,因此进行交互式分析也足够快速,缓存同时提升了迭代算法的性能,这使得Spark非常适合数据理论任务,特别是机器学习。
3、Spark提供了内存计算,把中间结果放到内存中,带来了更高的迭代运算效率。通过支持有向无环图(DAG)的分布式并行计算的编程框架,减少迭代过程中数据需要写入磁盘的需求,提高处理效率。
此外,Spark还能与Hadoop无缝衔接,Spark可以使用YARN作为它的集群管理器,可以读取HDFS、HBase等一切Hadoop的数据。
Spark在最近几年发展迅速,相较于其他大数据平台或框架,Spark的代码库最为活跃。截止目前,最新发布的版本为Spark3.3.0。
也有许多数据治理工具,为了实现实时、通用的数据治理而采用Spark技术。以飞算推出的SoData数据机器人为例,是一套实时+批次、批流一体、高效的数据开发治理工具,能够帮助企业快速实现数据应用。
相较于传统数据加工流程,SoData数据机器人实现了流批一体数据同步机制,基于Spark和Flink框架进行深度二次开发,实现数据采集、集成、转换、装载、加工、落盘全流程实时+批次处理的极致体验,秒级延迟,稳定高效平均延迟5-10s,快速响应企业数据应用需求。
除了具备Spark数据处理的优势,SoData数据机器人的Spark体系还支持从各种数据源执行SQL生成Spark字典表,边开发边调试的Spark-SQL开发,支持任意结果集输出到各类数据库。可视化的运维、开发方式也能在极大降低数据开发、治理、应用门槛的同时,提升效率。
在某综合医院的信息化建设中,SoData数据机器人曾在5分钟内完成原本需要8-9小时才能完成的数据迁移工作。
目前,SoData数据机器人已应用于金融、医疗、能源等多个行业,将持续通过创新技术,为各行业组织机构带来更优质、快速的数据开发、治理、应用体验。
应用Spark技术,SoData数据机器人实现快速、通用数据治理
Spark是处理海量数据的快速通用引擎。作为大数据处理技术,Spark经常会被人们拿来与Hadoop比较。
Hadoop已经成了大数据技术的事实标准,Hadoop MapRece也非常适合于对大规模数据集合进行批处理操作,但是其本身还存在一些缺陷。具体表现在:
1、Hadoop MapRee的表达能力有限。所有计算都需要转换成Map和 Rece两个操作,不能适用于所有场景,对于复杂的数据处理过程难以描述。
2、磁盘I/O开销大。Hadoop MapRece要求每个步骤间的数据序列化到磁盘,所以I/O成本很高,导致交互分析和迭代算法开销很大,而几乎所有的最优化和机器学习都是迭代的。所以,Hadoop MapRece不适合于交互分析和机器学习。
3、计算延迟高。如果想要完成比较复杂的工作,就必须将一系列的MapRece作业串联起来然后顺序执行这些作业。每一个作业都是高时延的,而且只有在前一个作业完成之后下一个作业才能开始启动。因此,Hadoop MapRece不能胜任比较复杂的、多阶段的计算服务。
Spark借鉴Hadoop MapRece技术发展而来,继承了其分布式并行计算的优点的同时,改进了MapRece的许多缺陷。具体优势如下:
1、Spark提供广泛的数据集操作类型(20+种),支持Java,Python和Scala API,支持交互式的Python和Scala的shell。比Hadoop更加通用。
2、Spark提供Cache机制来支持需要反复迭代的计算或者多次数据共享,减少数据读取的I/O开销。Spark使用内存缓存来提升性能,因此进行交互式分析也足够快速,缓存同时提升了迭代算法的性能,这使得Spark非常适合数据理论任务,特别是机器学习。
3、Spark提供了内存计算,把中间结果放到内存中,带来了更高的迭代运算效率。通过支持有向无环图(DAG)的分布式并行计算的编程框架,减少迭代过程中数据需要写入磁盘的需求,提高处理效率。
此外,Spark还能与Hadoop无缝衔接,Spark可以使用YARN作为它的集群管理器,可以读取HDFS、HBase等一切Hadoop的数据。
Spark在最近几年发展迅速,相较于其他大数据平台或框架,Spark的代码库最为活跃。截止目前,最新发布的版本为Spark3.3.0。
也有许多数据治理工具,为了实现实时、通用的数据治理而采用Spark技术。以飞算推出的SoData数据机器人为例,是一套实时+批次、批流一体、高效的数据开发治理工具,能够帮助企业快速实现数据应用。
相较于传统数据加工流程,SoData数据机器人实现了流批一体数据同步机制,基于Spark和Flink框架进行深度二次开发,实现数据采集、集成、转换、装载、加工、落盘全流程实时+批次处理的极致体验,秒级延迟,稳定高效平均延迟5-10s,快速响应企业数据应用需求。
除了具备Spark数据处理的优势,SoData数据机器人的Spark体系还支持从各种数据源执行SQL生成Spark字典表,边开发边调试的Spark-SQL开发,支持任意结果集输出到各类数据库。可视化的运维、开发方式也能在极大降低数据开发、治理、应用门槛的同时,提升效率。
在某综合医院的信息化建设中,SoData数据机器人曾在5分钟内完成原本需要8-9小时才能完成的数据迁移工作。
目前,SoData数据机器人已应用于金融、医疗、能源等多个行业,将持续通过创新技术,为各行业组织机构带来更优质、快速的数据开发、治理、应用体验。
sparkSQL和spark有什么区别?
Spark为结构化数据处理引入了一个称为Spark SQL的编程模块。简而言之,sparkSQL是Spark的前身,是在Hadoop发展过程中,为了给熟悉RDBMS但又不理解MapRece的技术人员提供快速上手的工具。
sparkSQL提供了一个称为DataFrame(数据框)的编程抽象,DF的底层仍然是RDD,并且可以充当分布式SQL查询引擎。
SparkSql有哪些特点呢?
1)引入了新的RDD类型SchemaRDD,可以像传统数据库定义表一样来定义SchemaRDD。
2)在应用程序中可以混合使用不同来源的数据,如可以将来自HiveQL的数据和来自SQL的数据进行Join操作。
3)内嵌了查询优化框架,在把SQL解析成逻辑执行计划之后,最后变成RDD的计算。
Spark SQL(十):Hive On Spark
Hive是目前大数据领域,事实上的SQL标准。其底层默认是基于MapRece实现的,但是由于MapRece速度实在比较慢,因此这几年,陆续出来了新的SQL查询引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。
Spark SQL与Hive On Spark是不一样的。Spark SQL是Spark自己研发出来的针对各种数据源,包括Hive、JSON、Parquet、JDBC、RDD等都可以执行查询的,一套基于Spark计算引擎的查询引擎。因此它是Spark的一个项目,只不过提供了针对Hive执行查询的工功能而已,适合在一些使用Spark技术栈的大数据应用类系统中使用。
而Hive On Spark,是Hive的一个项目,它是将Spark作为底层的查询引擎(不通过MapRece作为唯一的查询引擎)。Hive On Spark,只适用于Hive,在可预见的未来,很有可能Hive默认的底层引擎就从MapRece切换为Spark了;适合于将原有的Hive数据仓库以及数据统计分析替换为Spark引擎,作为全公司通用的大数据统计分析引擎。
Hive On Spark做了一些优化:
1、Map Join
Spark SQL默认对join是支持使用broadcast机制将小表广播到各个节点上,以进行join的。但是问题是,这会给Driver和Worker带来很大的内存开销。因为广播的数据要一直保留在Driver内存中。所以目前采取的是,类似乎MapRece的Distributed Cache机制,即提高HDFS replica factor的复制因子,以让数据在每个计算节点上都有一个备份,从而可以在本地进行数据读取。
2、Cache Table
对于某些需要对一张表执行多次操作的场景,Hive On Spark内部做了优化,即将要多次操作的表cache到内存中,以便于提升性能。但是这里要注意,并不是对所有的情况都会自动进行cache。所以说,Hive On Spark还有很多不完善的地方。
Hive QL语句 =>
语法分析 => AST =>
生成逻辑执行计划 => Operator Tree =>
优化逻辑执行计划 => Optimized Operator Tree =>
生成物理执行计划 => Task Tree =>
优化物理执行计划 => Optimized Task Tree =>
执行优化后的Optimized Task Tree