SparkSQL_"spark.sql(\"update usr_info set 'age_range'=-1 wh-程序员宅基地

技术标签: PySpark  Spark  Sql  

1. 预览

  • Spark SQL是Spark处理结构化数据的模块
  • Spark SQL的功能之一是执行SQL查询, 也支持Hive, 查询结果以Dataset/DataFrame的形式返回
  • 一个DataFrame是一个Dataset组成的指定列

2. 开始入门

2.1 起始点: SparkSession

Spark SQL中所有功能的入口是SparkSession类.

使用SparkSession.builder创建一个SparkSession

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName('appName').config('spark.some.config.option', 'some-value').getOrCreate()
>>> spark
SparkSession - in-memory

SparkContext

Spark UI

Version
v2.2.1
Master
local[*]
AppName
appName

2.2 创建数据框

可以从以下对象中创建DataFrame:
- 从一个已经存在的RDD
- 从Spark数据源
- 从Hive表

示例: 基于一个JSON文件创建一个DataFrame

>>> df = spark.read.json(r"C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.json")
>>> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2.3 DataFrame相关操作

举例简单介绍一下DataFrame的操作, 完整列表请参考DataFrame函数指南

>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

>>> df.select('name').show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

>>> df.select(df['name'], df['age']+1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

>>> df.filter(df['age']>12).show()
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+

>>> df.groupBy('age').count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

2.4 运行SQL查询

SparkSession的sql函数可以运行SQL查询, 返回DataFrame

>>> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
>>> df.createOrReplaceTempView('people')
>>> sqlDF = spark.sql('select * from people')
>>> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2.5 全局临时视图

  • Spark SQL中的临时视图是session级别的, 会随着session的消失而消失.
  • 如果想让一个临时视图在所有session中相互传递并且可用, 直到Spark应用退出, 可以创建一个全局的临时视图.
  • 全局的临时视图存在于系统数据库global_temp中, 我们必须加上库名去引用
    select * from global_temp.table1
>>> df.createGlobalTempView('people')
>>> spark.sql('select * from global_temp.people').show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
>>> spark.newSession().sql('select * from global_temp.people').show()  # 新建一个session结果还是一样, 说明是全局的
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2.6 创建Datasets(Java and Scala) pass

2.7 将RDD转化为DataFrame的两种方式

  • textfile => [Row1, Row2, …] => DataFrame
>>> from pyspark.sql import Row
>>> sc = spark.sparkContext

>>> lines = sc.textFile(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.txt')
>>> parts = lines.map(lambda x: x.split(","))
>>> people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
>>> people.collect()
[Row(age=29, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]
>>> schemaPeople = spark.createDataFrame(people)
>>> schemaPeople.show()
+---+-------+
|age|   name|
+---+-------+
| 29|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+
>>> schemaPeople.createOrReplaceTempView('people')
>>> teenagers = spark.sql('select * from people where age between 13 and 19')
>>> teenagers.show()
+---+------+
|age|  name|
+---+------+
| 19|Justin|
+---+------+
  • textfile => [data, schema] => DataFrame
    • RDD从原始的RDD穿件一个RDD的toples或者一个列表;
    • Step 1 被创建后, 创建 Schema 表示一个 StructType 匹配 RDD 中的结构.
    • 通过 SparkSession 提供的 createDataFrame 方法应用 Schema 到 RDD .
>>> from pyspark.sql.types import StringType, IntegerType, StructType, StructField
>>> sc = spark.sparkContext
>>> lines = sc.textFile(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.txt')
>>> parts = lines.map(lambda x: x.split(','))
>>> people = parts.map(lambda p: (p[0], p[1].strip()))
>>> fields = [StructField(name, StringType(), True) for name in ['name', 'age']]
>>> schema = StructType(fields)
>>> schemaPeople = spark.createDataFrame(people, schema=schema)
>>> result = spark.sql('select name from people')
>>> result.show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

2.8 Aggregations(Java and Scala) pass

  • Untyped User-Defined Aggregate Functions
  • Type-Safe User-Defined Aggregate Functions

3. 数据源

  • spark.read.load(path): 读取parquet文件为DataFrame
  • spark.write.save(path): 保存DataFrame为parquet文件
  • spark.read.load(path, format="json"): 指定源数据类型为给定格式
    • json
    • parquet
    • jdbc
    • orc
    • libsvm
    • csv
    • text
  • spark.write.save(path, format="json")
  • spark.sql(“select * from parquet.examples/src/main/resources/users.parquet“): 直接在文件上运行SQL
>>> df = spark.read.load(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\users.parquet')
>>> df.select('name', 'favorite_color').write.save('nameAndFavColors.parquet')
  • 保存模式(Scala and Java) pass

3.1.4 保存到持久表

>>> df = spark.read.load(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\users.parquet')
>>> df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
>>> # df.write.option('path1', 'path2').saveAsTable()

DataFrames 也可以使用 saveAsTable 命令作为 persistent tables (持久表)保存到 Hive metastore 中. 请注意, existing Hive deployment (现有的 Hive 部署)不需要使用此功能. Spark 将为您创建默认的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 与 createOrReplaceTempView 命令不同, saveAsTable 将 materialize (实现) DataFrame 的内容, 并创建一个指向 Hive metastore 中数据的指针. 即使您的 Spark 程序重新启动, Persistent tables (持久性表)仍然存在, 因为您保持与同一个 metastore 的连接. 可以通过使用表的名称在 SparkSession 上调用 table 方法来创建 persistent tabl (持久表)的 DataFrame .

对于 file-based (基于文件)的 data source (数据源), 例如 text, parquet, json等, 您可以通过 path 选项指定 custom table path (自定义表路径), 例如 df.write.option(“path”, “/some/path”).saveAsTable(“t”) . 当表被 dropped (删除)时, custom table path (自定义表路径)将不会被删除, 并且表数据仍然存在. 如果未指定自定义表路径, Spark 将把数据写入 warehouse directory (仓库目录)下的默认表路径. 当表被删除时, 默认的表路径也将被删除.

从 Spark 2.1 开始, persistent datasource tables (持久性数据源表)将 per-partition metadata (每个分区元数据)存储在 Hive metastore 中. 这带来了几个好处:

由于 metastore 只能返回查询的必要 partitions (分区), 因此不再需要将第一个查询上的所有 partitions discovering 到表中.
Hive DDLs 如 ALTER TABLE PARTITION … SET LOCATION 现在可用于使用 Datasource API 创建的表.
请注意, 创建 external datasource tables (外部数据源表)(带有 path 选项)的表时, 默认情况下不会收集 partition information (分区信息). 要 sync (同步) metastore 中的分区信息, 可以调用 MSCK REPAIR TABLE .

3.1.5 分桶, 排序和分区

对于基于文件的数据源, 也可以对输出进行bucket, sort和partition操作, 前二者仅适用于持久表

>>> df.write.bucketBy(42, 'name').sortBy('age').saveAsTable('pepple_buckted')

在使用Dataset API时, partitioning可以同时和save和saveAsTable一起使用

df.write.partitionBy('favorite_color').format('parquet').save('namesPartyByColor.parquet')

可以为单个表使用partitioning和bucketing

df = spark.read.parquet(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\users.parquet')
df.write.partitionBy('favorite_color').bucketBy(42, 'name').saveAsTable('people_partitioned_bucketed')

partitionBy 创建一个 directory structure (目录结构), 如 Partition Discovery 部分所述. 因此, 对 cardinality (基数)较高的 columns 的适用性有限. 相反, bucketBy 可以在固定数量的 buckets 中分配数据, 并且可以在 a number of unique values is unbounded (多个唯一值无界时)使用数据.

3.2 Parquet Files

3.2.1 以编程的方式加载数据

>>> peopleDF = spark.read.json(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.json')
>>> peopleDF.write.parquet('people.parquet')
>>> parquetFile = spark.read.parquet('people.parquet')
>>> parquetFile.createOrReplaceTempView('parquetFile')
>>> spark.sql('select name from parquetFile where age between 13 and 19').show()
+------+
|  name|
+------+
|Justin|
+------+

3.2.2 分区发现

3.2.3 模式合并

3.2.4 Hive metastore Oarquet table conversion

3.2.5 配置

可以使用SparkSession上的setConf方法或使用SQL运行set key=value命令来完成parquet的设置

参数名称 默认值 含义
spark.sql.parquet.binaryAsString false 一些其他 Parquet-producing systems (Parquet 生产系统), 特别是 Impala, Hive 和旧版本的 Spark SQL , 在 writing out (写出) Parquet schema 时, 不区分 binary data (二进制数据)和 strings (字符串). 该 flag 告诉 Spark SQL 将 binary data (二进制数据)解释为 string (字符串)以提供与这些系统的兼容性.
spark.sql.parquet.int96AsTimestamp true 一些 Parquet-producing systems , 特别是 Impala 和 Hive , 将 Timestamp 存入INT96 . 该 flag 告诉 Spark SQL 将 INT96 数据解析为 timestamp 以提供与这些系统的兼容性.
spark.sql.parquet.cacheMetadata true 打开 Parquet schema metadata 的缓存. 可以加快查询静态数据.
spark.sql.parquet.compression.codec snappy 在编写 Parquet 文件时设置 compression codec (压缩编解码器)的使用. 可接受的值包括: uncompressed, snappy, gzip, lzo .
spark.sql.parquet.filterPushdown true 设置为 true 时启用 Parquet filter push-down optimization .
spark.sql.hive.convertMetastoreParquet true 当设置为 false 时, Spark SQL 将使用 Hive SerDe 作为 parquet tables , 而不是内置的支持.
spark.sql.parquet.mergeSchema false 当为 true 时, Parquet data source (Parquet 数据源) merges (合并)从所有 data files (数据文件)收集的 schemas , 否则如果没有可用的 summary file , 则从 summary file 或 random data file 中挑选 schema .
spark.sql.optimizer.metadataOnly true 如果为 true , 则启用使用表的 metadata 的 metadata-only query optimization 来生成 partition columns (分区列)而不是 table scans (表扫描). 当 scanned (扫描)的所有 columns (列)都是 partition columns (分区列)并且 query (查询)具有满足 distinct semantics (不同语义)的 aggregate operator (聚合运算符)时, 它将适用.

3.3 Json数据集

通过spark.read.json方法将数据集加载为DataFrame, 支持两种格式:
- 常规的多行json文件, 如\examples\src\main\resources\people.json
- 独立有效的json对象rdd, 如['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']

>>> sc = spark.sparkContext
>>> path = r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.json'
>>> peopleDF = spark.read.json(path)
>>> peopleDF.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
>>> peopleDF.createOrReplaceTempView('people')
>>> spark.sql('select name from people where age between 13 and 19').show()
+------+
|  name|
+------+
|Justin|
+------+

>>> jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
>>> rdd = sc.parallelize(jsonStrings)
>>> people2 = spark.read.json(rdd)
>>> people2.show()
+---------------+----+
|        address|name|
+---------------+----+
|[Columbus,Ohio]| Yin|
+---------------+----+

3.4 Hive表

  • Spark SQL还支持读取和写入Hive中的数据.
  • 但是, Hive中有大量的依赖关系, 因此这些依赖关系不包含在默认Spark分发中.
  • 如果在类路径中找到Hive依赖项, Spark将自动加载它们.
  • 请注意, 这些Hive依赖关系也必须存在于所有工作节点上, 因为它们将需要访问Hive序列化和反序列化库SerDes, 以访问存储在Hive中的数据
  • 通过修改hive-site.xml, core-site.xml, hdfs-site.xml文件来完成配置
  • 当使用Hive时, 必须用Hive支持实例化SparkSession, 包括:
    • 连接到持续的Hive转移
    • 支持Hive serdes
    • 支持Hive用户自定义功能
  • 没有现有Hive部署的用户仍可以启用Hive支持
  • 当hive-site.xml未配置时, 上下文会自动在当前目录创建metastore_db, 并创建有spark.sql.warehouse.dir配置的目录, 该目录默认为Sprk应用程序当前目录中的spark-warehouse目录
  • 请注意,自从2.0.0以来,hive-site.xml 中的 hive.metastore.warehouse.dir 属性已被弃用。 而是使用 spark.sql.warehouse.dir 来指定仓库中数据库的默认位置
  • 您可能需要向启动 Spark 应用程序的用户授予写权限
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession, Row
warehouse_location=abspath('spark-warehouse')
spark = SparkSession.builder.config('spark.sql.warehouse.dir', warehouse_location).enableHiveSupport().getOrCreate()
spark.sql('CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive')
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

3.4.1 指定Hive表的存储格式

创建Hive表时
- 需要定义输入格式和输出格式
- 还需要定义该表如何将数据反序列化为行, 或将行序列化为数据, 即serde
- 以下选项可用于指定存储格式 (“serde”, “input format”, “output format”),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')
- 默认情况下,我们将以纯文本形式读取表格文件
- 请注意,Hive 存储处理程序在创建表时不受支持,您可以使用 Hive 端的存储处理程序创建一个表,并使用 Spark SQL 来读取它

Property Name Meaning
fileFormat fileFormat是一种存储格式规范的包,包括 “serde”,”input format” 和 “output format”。 目前我们支持6个文件格式:’sequencefile’,’rcfile’,’orc’,’parquet’,’textfile’和’avro’。
inputFormat, outputFormat 这两个选项将相应的 “InputFormat” 和 “OutputFormat” 类的名称指定为字符串文字,例如: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。 这两个选项必须成对出现,如果您已经指定了 “fileFormat” 选项,则无法指定它们。
serde 此选项指定 serde 类的名称。 当指定 fileFormat 选项时,如果给定的 fileFormat 已经包含 serde 的信息,那么不要指定这个选项。 目前的 “sequencefile”, “textfile” 和 “rcfile” 不包含 serde 信息,你可以使用这3个文件格式的这个选项。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这些选项只能与 “textfile” 文件格式一起使用。它们定义如何将分隔的文件读入行。

3.4.2 与不同版本的Hive Metastore进行交互

3.5 JDBC连接其他数据库

  • Spark SQL可以使用JDBC读取其他数据库的数据, 这个功能优于使用JdbcRDD(因为直接返回DataFrame结果)
  • 要开始使用, 你需要在Spark类路径中包含特定数据库的JDBC driver程序. 例如, 要从Spark Shell连接到postgres, 运行命令:spark-shell –driver-class-path postgresql-9.4.1207.jar –jars postgresql-9.4.1207.jar
  • Spark支持以下option
    • url: 要连接的JDBC URL, 如jdbc:postgresql://localhost/test?user=fred&password=secret
    • dbtable: 需读取的JDBC表
    • driver: 用于连接到此URL的JDBC driver程序的类名
    • partitionColumn, lowerBound, upperBound:如果指定了这些选项,则必须指定这些选项。 另外,必须指定 numPartitions. 他们描述如何从多个 worker 并行读取数据时将表给分区。partitionColumn 必须是有问题的表中的数字列。 请注意,lowerBound 和 upperBound 仅用于决定分区的大小,而不是用于过滤表中的行。 因此,表中的所有行将被分区并返回。此选项仅适用于读操作。
    • numPartitions: 在表读写中可以用于并行度的最大分区数。这也确定并发JDBC连接的最大数量。 如果要写入的分区数超过此限制,则在写入之前通过调用 coalesce(numPartitions) 将其减少到此限制。
    • fetchsize:JDBC 抓取的大小,用于确定每次数据往返传递的行数。 这有利于提升 JDBC driver 的性能,它们的默认值较小(例如: Oracle 是 10 行)。 该选项仅适用于读取操作。
    • batchsize: JDBC 批处理的大小,用于确定每次数据往返传递的行数。 这有利于提升 JDBC driver 的性能。 该选项仅适用于写操作。默认值为 1000.
    • isolationLevel: 事务隔离级别,适用于当前连接。 它可以是 NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, 或 SERIALIZABLE 之一,对应于 JDBC 连接对象定义的标准事务隔离级别,默认为 READ_UNCOMMITTED。 此选项仅适用于写操作。请参考 java.sql.Connection 中的文档。
    • truncate: 这是一个与 JDBC 相关的选项。 启用 SaveMode.Overwrite 时,此选项会导致 Spark 截断现有表,而不是删除并重新创建。 这可以更有效,并且防止表元数据(例如,索引)被移除。 但是,在某些情况下,例如当新数据具有不同的模式时,它将无法工作。 它默认为 false。 此选项仅适用于写操作。
    • createTableOptions: 这是一个与JDBC相关的选项。 如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如:CREATE TABLE t (name string) ENGINE=InnoDB. )。此选项仅适用于写操作。
    • createTableColumnTypes: 使用数据库列数据类型而不是默认值,创建表时。 数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:”name CHAR(64), comments VARCHAR(1024)”)。 指定的类型应该是有效的 spark sql 数据类型。此选项仅适用于写操作。
jdbcDF = spark.read.format('jdbc').option('url', 'jdbc:postgresql:dbserver').option('dbtable', 'schema.tablename').option('user', 'username').optioni('password', 'password').load()

jdbcDF2 = spark.read.jdbc('jdbc:postgresql:dbserver', 'schema.tablename', properise={
   'user': 'username', 'password': 'password'})

jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={
   "user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={
   "user": "username", "password": "password"})

3.6 故障排除

JDBC driver 程序类必须对客户端会话和所有执行程序上的原始类加载器可见。 这是因为 Java 的 DriverManager 类执行安全检查,导致它忽略原始类加载器不可见的所有 driver 程序,当打开连接时。一个方便的方法是修改所有工作节点上的compute_classpath.sh 以包含您的 driver 程序 JAR。
一些数据库,例如 H2,将所有名称转换为大写。 您需要使用大写字母来引用 Spark SQL 中的这些名称。

4. 性能调优

对于某些工作负载,可以通过缓存内存中的数据或打开一些实验选项来提高性能。

4.1 在内存中缓存数据

Spark SQL 可以通过调用 spark.catalog.cacheTable(“tableName”) 或 dataFrame.cache() 来使用内存中的列格式来缓存表。 然后,Spark SQL 将只扫描所需的列,并将自动调整压缩以最小化内存使用量和 GC 压力。 您可以调用 spark.catalog.uncacheTable(“tableName”) 从内存中删除该表

内存缓存的配置可以使用 SparkSession 上的 setConf 方法或使用 SQL 运行 SET key=value 命令来完成。

属性名称 默认 含义
spark.sql.inMemoryColumnarStorage.compressed true 当设置为 true 时,Spark SQL 将根据数据的统计信息为每个列自动选择一个压缩编解码器。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制批量的柱状缓存的大小。更大的批量大小可以提高内存利用率和压缩率,但是在缓存数据时会冒出 OOM 风险。

4.2 其他配置选项

以下选项也可用于调整查询执行的性能。这些选项可能会在将来的版本中被废弃,因为更多的优化是自动执行的

属性名称 默认值 含义
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 在读取文件时,将单个分区打包的最大字节数。
spark.sql.files.openCostInBytes 4194304 (4 MB) 按照字节数来衡量的打开文件的估计费用可以在同一时间进行扫描。 将多个文件放入分区时使用。最好过度估计,那么具有小文件的分区将比具有较大文件的分区(首先计划的)更快。
spark.sql.broadcastTimeout 300 广播连接中的广播等待时间超时(秒)
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置执行连接时将广播给所有工作节点的表的最大大小(以字节为单位)。 通过将此值设置为-1可以禁用广播。 请注意,目前的统计信息仅支持 Hive Metastore 表,其中已运行命令 ANALYZE TABLE COMPUTE STATISTICS noscan。
spark.sql.shuffle.partitions 200 Configures the number of partitions to use when shuffling data for joins or aggregations.

5. 分布式SQL引擎

Spark SQL 也可以充当使用其 JDBC/ODBC 或命令行界面的分布式查询引擎。 在这种模式下,最终用户或应用程序可以直接与 Spark SQL 交互运行 SQL 查询,而不需要编写任何代码。

5.1 运行Thrift JDBC/ODBC

这里实现的 Thrift JDBC/ODBC 服务器对应于 Hive 1.2 中的 HiveServer2。 您可以使用 Spark 或 Hive 1.2.1 附带的直线脚本测试 JDBC 服务器。

要启动 JDBC/ODBC 服务器,请在 Spark 目录中运行以下命令:

./sbin/start-thriftserver.sh

此脚本接受所有 bin/spark-submit 命令行选项,以及 –hiveconf 选项来指定 Hive 属性。 您可以运行 ./sbin/start-thriftserver.sh –help 查看所有可用选项的完整列表。 默认情况下,服务器监听 localhost:10000. 您可以通过环境变量覆盖此行为,即:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

or system properties:

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

现在,您可以使用 beeline 来测试 Thrift JDBC/ODBC 服务器:./bin/beeline

使用 beeline 方式连接到 JDBC/ODBC 服务器:
beeline> !connect jdbc:hive2://localhost:10000

Beeline 将要求您输入用户名和密码。 在非安全模式下,只需输入机器上的用户名和空白密码即可。 对于安全模式,请按照 beeline 文档 中的说明进行操作。

配置Hive是通过将 hive-site.xml, core-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。

您也可以使用 Hive 附带的 beeline 脚本。

Thrift JDBC 服务器还支持通过 HTTP 传输发送 thrift RPC 消息。 使用以下设置启用 HTTP 模式作为系统属性或在 conf/ 中的 hive-site.xml 文件中启用:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要测试,请使用 beeline 以 http 模式连接到 JDBC/ODBC 服务器:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

5.2 运行Spark SQL CLI

Spark SQL CLI 是在本地模式下运行 Hive 转移服务并执行从命令行输入的查询的方便工具。 请注意,Spark SQL CLI 不能与 Thrift JDBC 服务器通信。

要启动 Spark SQL CLI,请在 Spark 目录中运行以下命令:

./bin/spark-sql
配置 Hive 是通过将 hive-site.xml, core-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。 您可以运行 ./bin/spark-sql –help 获取所有可用选项的完整列表。

6. 支持的Hive特性

Spark SQL 支持绝大部分的 Hive 功能,如:

  • Hive query(查询)语句, 包括:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 所有 Hive 操作, 包括:
    • 关系运算符 (=, ⇔, ==, <>, <, >, >=, <=, 等等)
    • 算术运算符 (+, -, *, /, %, 等等)
    • 逻辑运算符 (AND, &&, OR, ||, 等等)
    • 复杂类型的构造
    • 数学函数 (sign, ln, cos, 等等)
    • String 函数 (instr, length, printf, 等等)
  • 用户定义函数 (UDF)
  • 用户定义聚合函数 (UDAF)
  • 用户定义 serialization formats (SerDes)
  • 窗口函数
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • Sub-queries(子查询)
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • Sampling
  • Explain
  • Partitioned tables including dynamic partition insertion
  • View
  • 所有的 Hive DDL 函数, 包括:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
      大部分的 Hive Data types(数据类型), 包括:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

6. 参考

6.1 数据类型

Spark SQL 和 DataFrames 支持下面的数据类型:

  • Numeric types
    • ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.
    • ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.
    • IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.
    • LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.
    • FloatType: Represents 4-byte single-precision floating point numbers.
    • DoubleType: Represents 8-byte double-precision floating point numbers.
    • DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimal consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
  • String type
    • StringType: Represents character string values.
  • Binary type
    • BinaryType: Represents byte sequence values.
  • Boolean type
    • BooleanType: Represents boolean values.
  • Datetime type
    • TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
    • DateType: Represents values comprising values of fields year, month, day.
  • Complex types
    • ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementType. containsNull is used to indicate if elements in a ArrayType value can have null values.
    • MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have null values. valueContainsNull is used to indicate if values of a MapType value can have null values.
    • StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).
      • StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataType. nullable is used to indicate if values of this fields can have null values.

Spark SQL中的数据类型都在pyspark.sql.types的包中, 通过以下方式访问:

from pysparl.sql.types import *
Data type Value type in Python API to access or create a data type
ByteType int or long, Note: Numbers will be converted to 1-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -128 to 127. ByteType()
ShortType int or long, Note: Numbers will be converted to 2-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -32768 to 32767. ShortType()
IntegerType int or long IntegerType()
LongType long, Note: Numbers will be converted to 8-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -9223372036854775808 to 9223372036854775807. Otherwise, please convert data to decimal.Decimal and use DecimalType. LongType()
FloatType float, Note: Numbers will be converted to 4-byte single-precision floating point numbers at runtime. FloatType()
DoubleType float DoubleType()
DecimalType decimal.Decimal DecimalType()
StringType string StringType()
BinaryType bytearray BinaryType()
BooleanType bool BooleanType()
TimestampType datetime.datetime TimestampType()
DateType datetime.date DateType()
ArrayType list, tuple, or array ArrayType(elementType, [containsNull]), Note: The default value of containsNull is True.
MapType dict MapType(keyType, valueType, [valueContainsNull]), Note: The default value of valueContainsNull is True.
StructType list or tuple StructType(fields), Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed.
StructField The value type in Python of the data type of this field (For example, Int for a StructField with the data type IntegerType) StructField(name, dataType, [nullable]), Note: The default value of nullable is True.

6.2 NaN Semantics

当处理一些不符合标准浮点数语义的 float 或 double 类型时,对于 Not-a-Number(NaN) 需要做一些特殊处理. 具体如下:

  • NaN = NaN 返回 true.
  • 在 aggregations(聚合)操作中,所有的 NaN values 将被分到同一个组中.
  • 在 join key 中 NaN 可以当做一个普通的值.
  • NaN 值在升序排序中排到最后,比任何其他数值都大.
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/xiligey1/article/details/80267873

智能推荐

分布式光纤传感器的全球与中国市场2022-2028年:技术、参与者、趋势、市场规模及占有率研究报告_预计2026年中国分布式传感器市场规模有多大-程序员宅基地

文章浏览阅读3.2k次。本文研究全球与中国市场分布式光纤传感器的发展现状及未来发展趋势,分别从生产和消费的角度分析分布式光纤传感器的主要生产地区、主要消费地区以及主要的生产商。重点分析全球与中国市场的主要厂商产品特点、产品规格、不同规格产品的价格、产量、产值及全球和中国市场主要生产商的市场份额。主要生产商包括:FISO TechnologiesBrugg KabelSensor HighwayOmnisensAFL GlobalQinetiQ GroupLockheed MartinOSENSA Innovati_预计2026年中国分布式传感器市场规模有多大

07_08 常用组合逻辑电路结构——为IC设计的延时估计铺垫_基4布斯算法代码-程序员宅基地

文章浏览阅读1.1k次,点赞2次,收藏12次。常用组合逻辑电路结构——为IC设计的延时估计铺垫学习目的:估计模块间的delay,确保写的代码的timing 综合能给到多少HZ,以满足需求!_基4布斯算法代码

OpenAI Manager助手(基于SpringBoot和Vue)_chatgpt网页版-程序员宅基地

文章浏览阅读3.3k次,点赞3次,收藏5次。OpenAI Manager助手(基于SpringBoot和Vue)_chatgpt网页版

关于美国计算机奥赛USACO,你想知道的都在这_usaco可以多次提交吗-程序员宅基地

文章浏览阅读2.2k次。USACO自1992年举办,到目前为止已经举办了27届,目的是为了帮助美国信息学国家队选拔IOI的队员,目前逐渐发展为全球热门的线上赛事,成为美国大学申请条件下,含金量相当高的官方竞赛。USACO的比赛成绩可以助力计算机专业留学,越来越多的学生进入了康奈尔,麻省理工,普林斯顿,哈佛和耶鲁等大学,这些同学的共同点是他们都参加了美国计算机科学竞赛(USACO),并且取得过非常好的成绩。适合参赛人群USACO适合国内在读学生有意向申请美国大学的或者想锻炼自己编程能力的同学,高三学生也可以参加12月的第_usaco可以多次提交吗

MySQL存储过程和自定义函数_mysql自定义函数和存储过程-程序员宅基地

文章浏览阅读394次。1.1 存储程序1.2 创建存储过程1.3 创建自定义函数1.3.1 示例1.4 自定义函数和存储过程的区别1.5 变量的使用1.6 定义条件和处理程序1.6.1 定义条件1.6.1.1 示例1.6.2 定义处理程序1.6.2.1 示例1.7 光标的使用1.7.1 声明光标1.7.2 打开光标1.7.3 使用光标1.7.4 关闭光标1.8 流程控制的使用1.8.1 IF语句1.8.2 CASE语句1.8.3 LOOP语句1.8.4 LEAVE语句1.8.5 ITERATE语句1.8.6 REPEAT语句。_mysql自定义函数和存储过程

半导体基础知识与PN结_本征半导体电流为0-程序员宅基地

文章浏览阅读188次。半导体二极管——集成电路最小组成单元。_本征半导体电流为0

随便推点

【Unity3d Shader】水面和岩浆效果_unity 岩浆shader-程序员宅基地

文章浏览阅读2.8k次,点赞3次,收藏18次。游戏水面特效实现方式太多。咱们这边介绍的是一最简单的UV动画(无顶点位移),整个mesh由4个顶点构成。实现了水面效果(左图),不动代码稍微修改下参数和贴图可以实现岩浆效果(右图)。有要思路是1,uv按时间去做正弦波移动2,在1的基础上加个凹凸图混合uv3,在1、2的基础上加个水流方向4,加上对雾效的支持,如没必要请自行删除雾效代码(把包含fog的几行代码删除)S..._unity 岩浆shader

广义线性模型——Logistic回归模型(1)_广义线性回归模型-程序员宅基地

文章浏览阅读5k次。广义线性模型是线性模型的扩展,它通过连接函数建立响应变量的数学期望值与线性组合的预测变量之间的关系。广义线性模型拟合的形式为:其中g(μY)是条件均值的函数(称为连接函数)。另外,你可放松Y为正态分布的假设,改为Y 服从指数分布族中的一种分布即可。设定好连接函数和概率分布后,便可以通过最大似然估计的多次迭代推导出各参数值。在大部分情况下,线性模型就可以通过一系列连续型或类别型预测变量来预测正态分布的响应变量的工作。但是,有时候我们要进行非正态因变量的分析,例如:(1)类别型.._广义线性回归模型

HTML+CSS大作业 环境网页设计与实现(垃圾分类) web前端开发技术 web课程设计 网页规划与设计_垃圾分类网页设计目标怎么写-程序员宅基地

文章浏览阅读69次。环境保护、 保护地球、 校园环保、垃圾分类、绿色家园、等网站的设计与制作。 总结了一些学生网页制作的经验:一般的网页需要融入以下知识点:div+css布局、浮动、定位、高级css、表格、表单及验证、js轮播图、音频 视频 Flash的应用、ul li、下拉导航栏、鼠标划过效果等知识点,网页的风格主题也很全面:如爱好、风景、校园、美食、动漫、游戏、咖啡、音乐、家乡、电影、名人、商城以及个人主页等主题,学生、新手可参考下方页面的布局和设计和HTML源码(有用点赞△) 一套A+的网_垃圾分类网页设计目标怎么写

C# .Net 发布后,把dll全部放在一个文件夹中,让软件目录更整洁_.net dll 全局目录-程序员宅基地

文章浏览阅读614次,点赞7次,收藏11次。之前找到一个修改 exe 中 DLL地址 的方法, 不太好使,虽然能正确启动, 但无法改变 exe 的工作目录,这就影响了.Net 中很多获取 exe 执行目录来拼接的地址 ( 相对路径 ),比如 wwwroot 和 代码中相对目录还有一些复制到目录的普通文件 等等,它们的地址都会指向原来 exe 的目录, 而不是自定义的 “lib” 目录,根本原因就是没有修改 exe 的工作目录这次来搞一个启动程序,把 .net 的所有东西都放在一个文件夹,在文件夹同级的目录制作一个 exe._.net dll 全局目录

BRIEF特征点描述算法_breif description calculation 特征点-程序员宅基地

文章浏览阅读1.5k次。本文为转载,原博客地址:http://blog.csdn.net/hujingshuang/article/details/46910259简介 BRIEF是2010年的一篇名为《BRIEF:Binary Robust Independent Elementary Features》的文章中提出,BRIEF是对已检测到的特征点进行描述,它是一种二进制编码的描述子,摈弃了利用区域灰度..._breif description calculation 特征点

房屋租赁管理系统的设计和实现,SpringBoot计算机毕业设计论文_基于spring boot的房屋租赁系统论文-程序员宅基地

文章浏览阅读4.1k次,点赞21次,收藏79次。本文是《基于SpringBoot的房屋租赁管理系统》的配套原创说明文档,可以给应届毕业生提供格式撰写参考,也可以给开发类似系统的朋友们提供功能业务设计思路。_基于spring boot的房屋租赁系统论文