问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

sparksession 作用域

发布网友 发布时间:2022-04-23 20:27

我来回答

1个回答

热心网友 时间:2022-04-11 04:16

Apache Spark 2.0引入了SparkSession,其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互。
创建SparkSession
在2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext,代码如下:

//set up the spark configuration and create contextsval sparkConf = new SparkConf().setAppName("SparkSessionZipsExample").setMaster("local")// your handle to SparkContext to access other context like SQLContextval sc = new SparkContext(sparkConf).set("spark.some.config.option", "some-value")val sqlContext = new org.apache.spark.sql.SQLContext(sc)
然而在Spark 2.0中,我们可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。使用生成器的设计模式(builder design pattern),如果我们没有创建SparkSession对象,则会实例化出一个新的SparkSession对象及其相关的上下文。
// Create a SparkSession. No need to create SparkContext// You automatically get it as part of the SparkSessionval warehouseLocation = "file:${system:user.dir}/spark-warehouse"val spark = SparkSession
.builder()
.appName("SparkSessionZipsExample")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
到现在我们可以使用上面创建好的spark对象,并且访问其public方法。
配置Spark运行相关属性
一旦我们创建好了SparkSession,我们就可以配置Spark运行相关属性。比如下面代码片段我们修改了已经存在的运行配置选项。
//set new runtime optionsspark.conf.set("spark.sql.shuffle.partitions", 6)
spark.conf.set("spark.executor.memory", "2g")//get all settingsval configMap:Map[String, String] = spark.conf.getAll()

获取Catalog元数据
  通常我们想访问当前系统的Catalog元数据。SparkSession提供了catalog实例来操作metastore。这些方法放回的都是Dataset类型的,所有我们可以使用Dataset相关的API来访问其中的数据。如下代码片段,我们展示了所有的表并且列出当前所有的数据库:
//fetch metadata data from the catalog
scala> spark.catalog.listDatabases.show(false)+--------------+---------------------+--------------------------------------------------------+|name |description |locationUri |+--------------+---------------------+--------------------------------------------------------+|default |Default Hive database|hdfs://iteblogcluster/user/iteblog/hive/warehouse |+--------------+---------------------+--------------------------------------------------------+
scala> spark.catalog.listTables.show(false)+----------------------------------------+--------+-----------+---------+-----------+|name |database|description|tableType|isTemporary|+----------------------------------------+--------+-----------+---------+-----------+|iteblog |default |null |MANAGED |false ||table2 |default |null |EXTERNAL |false ||test |default |null |MANAGED |false |+----------------------------------------+--------+-----------+---------+-----------+

创建Dataset和Dataframe
  使用SparkSession APIs创建 DataFrames 和 Datasets的方法有很多,其中最简单的方式就是使用spark.range方法来创建一个Dataset。当我们学习如何操作Dataset API的时候,这个方法非常有用。操作如下:
scala> val numDS = spark.range(5, 100, 5)
numDS: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> numDS.orderBy(desc("id")).show(5)+---+| id|+---+| 95|| 90|| 85|| 80|| 75|+---+only showing top 5 rows
scala> numDS.describe().show()+-------+------------------+|summary| id|+-------+------------------+| count| 19|| mean| 50.0|| stddev|28.136571693556885|| min| 5|| max| 95|+-------+------------------+scala> val langPercentDF = spark.createDataFrame(List(("Scala", 35), | ("Python", 30), ("R", 15), ("Java", 20)))
langPercentDF: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> val lpDF = langPercentDF.withColumnRenamed("_1", "language").withColumnRenamed("_2", "percent")
lpDF: org.apache.spark.sql.DataFrame = [language: string, percent: int]

scala> lpDF.orderBy(desc("percent")).show(false)+--------+-------+ |language|percent|+--------+-------+|Scala |35 ||Python |30 ||Java |20 ||R |15 |+--------+-------+

使用SparkSession读取CSV
创建完SparkSession之后,我们就可以使用它来读取数据,下面代码片段是使用SparkSession来从csv文件中读取数据:

val df = sparkSession.read.option("header","true").
csv("src/main/resources/sales.csv")

上面代码非常像使用SQLContext来读取数据,我们现在可以使用SparkSession来替代之前使用SQLContext编写的代码。下面是完整的代码片段:

package com.iteblogimport org.apache.spark.sql.SparkSession/*** Spark Session example**/object SparkSessionExample {def main(args: Array[String]) {val sparkSession = SparkSession.builder.master("local").appName("spark session example").getOrCreate()val df = sparkSession.read.option("header","true").csv("src/main/resources/sales.csv")df.show()}}

使用SparkSession API读取JSON数据
  我们可以使用SparkSession来读取JSON、CVS或者TXT文件,甚至是读取parquet表。比如在下面代码片段里面,我将读取邮编数据的JSON文件,并且返回DataFrame对象:
// read the json file and create the dataframe
scala> val jsonFile = "/user/iteblog.json"
jsonFile: String = /user/iteblog.json
scala> val zipsDF = spark.read.json(jsonFile)
zipsDF: org.apache.spark.sql.DataFrame = [_id: string, city: string ... 3 more fields]

scala> zipsDF.filter(zipsDF.col("pop") > 40000).show(10, false)+-----+----------+-----------------------+-----+-----+|_id |city |loc |pop |state|+-----+----------+-----------------------+-----+-----+|01040|HOLYOKE |[-72.626193, 42.202007]|43704|MA ||01085|MONTGOMERY|[-72.754318, 42.129484]|40117|MA ||01201|PITTSFIELD|[-73.247088, 42.453086]|50655|MA ||01420|FITCHBURG |[-71.803133, 42.579563]|41194|MA ||01701|FRAMINGHAM|[-71.425486, 42.300665]|65046|MA ||01841|LAWRENCE |[-71.166997, 42.711545]|45555|MA ||01902|LYNN |[-70.941989, 42.469814]|41625|MA ||01960|PEABODY |[-70.961194, 42.532579]|47685|MA ||02124|DORCHESTER|[-71.072898, 42.287984]|48560|MA ||02146|BROOKLINE |[-71.128917, 42.339158]|56614|MA |+-----+----------+-----------------------+-----+-----+only showing top 10 rows

在SparkSession中还用Spark SQL
  通过SparkSession我们可以访问Spark SQL中所有函数,正如你使用SQLContext访问一样。下面代码片段中,我们创建了一个表,并在其中使用SQL查询:
// Now create an SQL table and issue SQL queries against it without// using the sqlContext but through the SparkSession object.// Creates a temporary view of the DataFrame
scala> zipsDF.createOrReplaceTempView("zips_table")

scala> zipsDF.cache()
res3: zipsDF.type = [_id: string, city: string ... 3 more fields]

scala> val resultsDF = spark.sql("SELECT city, pop, state, _id FROM zips_table")
resultsDF: org.apache.spark.sql.DataFrame = [city: string, pop: bigint ... 2 more fields]

scala> resultsDF.show(10)+------------+-----+-----+-----+| city| pop|state| _id|+------------+-----+-----+-----+| AGAWAM|15338| MA|01001|| CUSHMAN|36963| MA|01002|| BARRE| 4546| MA|01005|| BELCHERTOWN|10579| MA|01007|| BLANDFORD| 1240| MA|01008|| BRIMFIELD| 3706| MA|01010|| CHESTER| 1688| MA|01011||CHESTERFIELD| 177| MA|01012|| CHICOPEE|23396| MA|01013|| CHICOPEE|31495| MA|01020|+------------+-----+-----+-----+only showing top 10 rows

使用SparkSession读写Hive表
下面我们将使用SparkSession创建一个Hive表,并且对这个表进行一些SQL查询,正如你使用HiveContext一样:
sparksql参数设为永久生效

当我们在Spark SQL中设置参数时,默认情况下,参数的作用范围只限于当前SparkSession或SparkContext的生命周期。一旦SparkSession或SparkContext关闭,参数的取值也会被重置为默认值。这在某些情况下可能会导致问题,特别是当我们需要在整个应用程序执行过程中保持一致的参数设置时。为了解决这个问题,我们可以将Sp...

Load Port、SMIF

威孚(苏州)半导体技术有限公司是一家专注生产、研发、销售晶圆传输设备整机模块(EFEM/SORTER)及核心零部件的高科技半导体公司。公司核心团队均拥有多年半导体行业从业经验,其中技术团队成员博士、硕士学历占比80%以上,依托丰富的软件底层...

Spark-sql读取hive分区表限制分区过滤条件及限制分区数量

1、自定义规则CheckPartitionTable类,实现Rule 然后通过此种方法创建SparkSession 2、自定义规则CheckPartitionTable类,实现Rule,将规则类追加致Optimizer.batches: Seq[Batch]中 1、CheckPartitionTable规则执行类,需要通过引入sparkSession从而获取到引入conf;需要继承Rule[LogicalPlan];2、通过splitPredicates...

tekla临时视图怎么关闭

1、首先需要创建一个基本的SparkSession作为Spark中所有功能的入口点。2、读取文件创建DataFrame。3、创建临时视图。4、创建全局临时视图。5、重新创建一个临时视图来验证其作用域为当前会话。6、重新创建一个全局临时视图来验证其作用域为一个Spark应用程序。7、关闭tekla临时视图即可。

idea上找不到sparksession

idea上找不到sparksession?答案如下:代码和设置出现错误导致!首先第一步先点击打开设置按钮,然后帐户管理在页面点击账号安全中心进入即可完成!

用spark获取日志文件中记录内容?

val spark = SparkSession.builder.appName("Log Analysis").getOrCreate()val logLines = spark.sparkContext.textFile("path/to/logs.txt")logLines.collect().foreach(println)在上面的代码中,我们首先创建了一个`SparkSession`对象,这是与Spark交互的主要入口点。然后,我们使用`textFile`方法...

sparksql优化方法是什么呢?

配置缓存方式可以通过SparkSession.setConf()方法或SQL运行SET key=value命令实现。配置选项包括:2. 其他优化配置:这些选项有助于提高查询执行性能。但请注意,部分选项可能在未来的Spark版本中被弃用。3. 连接策略提示:使用连接策略提示如BROADCAST、MERGE、SHUFFLE_HASH和SHUFFLE_REPLICATE_NL,指导Spark在...

大数据开发-Spark-一文理解Spark中的Stage,Executor,Driver..._百度知 ...

Spark的运行流程可以分为几个步骤:首先,driver(用户编写的Spark程序)创建SparkContext或SparkSession,并与Cluster Manager通信,将任务分解成Job。Job由一系列Stage组成,Stage之间的执行是串行的,由shuffle、reduceBy和save等操作触发。Task是Stage的基本执行单元,一个Stage可以包含多个Task,每个Task处理...

用Python语言写Spark

在启动 PySpark 程序之前,需要初始化 SparkSession 对象,它是所有操作的起点。对于本地单机模式,使用 "local[*]" 表示使用所有 CPU 核心,这种模式通常能满足开发阶段的需求,并且实现多线程并行运行,使代码编写过程变得简单。Spark 还支持其他分布式模式,如 Standalone,Yarn 和 Mesos 等。构建好 ...

Spark离线开发框架设计与实现

框架会根据开发者配置的任务使用资源大小,完成了SparkSession、SparkContext、SparkConf的创建,同时加载了常用环境变量,开发了通用的UDF函数(如常用的url参数解析等)。其中Application为所有应用的父类,处理流程如图所示,开发者只需编写关注绿色部分即可。 目前,离线框架所支持的常用环境变量如下表所示。 2.2 可扩展工具 ...

spark跳过错误sql

在代码中,需要设置`spark.sql.analyzer.failAmbiguousSelfJoin`配置项为`false`(默认为`true`),这样在执行SQL时就会跳过无法解析的自联接,并输出警告信息。示例代码如下所示:```python#创建SparkSessionfrompyspark.sqlimportSparkSessionspark=SparkSession.builder.appName("SkipErrorSQL").getOrCreate...

phpsession的作用域 session作用域 session的作用域与生命周期 sessionId作用域 cookie和session区别 sparksession sparksession生命周期 kubernetes spark sparksession全局会话
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
会计分录原材料是什么 会计原材料是什么 怎么取消微信绑定的qq号以前微信号忘了 原材料算什么会计科目 以前注册的微信号,绑定了QQ号,现在换微信了,怎么取消以前的绑定?然后把... 一个手机号注册了个微信,然后没设置其他绑定了。 我哥不知情的情况下... ...然后用手机号绑定了另一个用QQ注册的微信号,解绑以后为什么这个手机... qq绑定了微信号怎么取消?那个微信号不记得是啥了,想把现在的微信号绑定... 如果一个男人有很多优点,但是同时有一下的一些缺点,你们说能交往吗?作... 优点往往也是缺点,如真爱请照单全收 昆山区号是多少 如何运行含spark的python脚本 江苏昆山的电话区号是多少? spark输出log信息中怎么过滤INFO信息 0520 是不是一个区号? Spark-shell和Spark-submit提交程序的区别 昆山电话的区号? 安装spark需要安装scala吗 ubuntu 16.04怎么配置spark环境 spark集群搭建时报TimeoutException是怎么回事 如何在Ubuntu下搭建Spark集群 spark独立模式还需要编译吗 spark 怎么启动worker spark 4040页面 不能访问。如果使用的是spark-shell启动写程序,4040可 ... 求助,spark 提交任务到集群报错 spark必须要hadoop吗 在linux上如何配置spark环境,在linux上安装scala和spark老是失败_百度... maven中${spark.version}是什么意思? spark加载hadoop本地库的时候出现不能加载的情况要怎么解决 最新的spark支持python的什么版本 常熟电话区号是0512还是0520? 0520这个区号为什么不是给泰州市而是给了苏州下面的昆山市啊?泰州的邮政编码是225300,像扬州的地级市 江苏省区号 全国各地区号多少? 谁知到中国各个区的区号是多少 请问0520开头短消息不是电话号码那是什么? 0520是哪里的区号 常熟区号 打耳洞的时候变成“洞”的肉到哪儿去了? 耳洞变成这样需要做任何处理吗? 我耳洞还能好吗 怎么变成这样了? 耳洞变成这样了,怎么处理才能痊愈啊 耳洞变成这样怎么处理? 我想问一下耳洞打了两三天后,耳洞洞里发黑是什么原因 该怎么办? 玉米野钓鲫鱼怎么钓 我的耳洞变成了这样 急急急!!!耳洞在昨天突然变成这样!是发炎了吗?! 耳洞变成这样是耐心养好还是放弃 在钓鱼的时候怎样使用玉米饵 我的耳洞最近变成这样了怎么办