Delta表能够经过多种办法创立。创立表的办法主要取决于您对东西集的了解程度。如果您主要是SQL开发人员,能够运用SQL的CREATE TABLE来创立Delta表,而Python用户或许更喜欢运用DataFrameWriter API或细粒度且易于运用的DeltaTableBuilder API。

在创立表时,您能够界说生成列,其值是依据Delta表中其他列上的用户指定函数主动生成的。虽然存在一些约束,但生成列是丰富Delta表形式的强大办法。

Delta表能够经过规范的ANSI SQL或运用盛行的PySpark DataFrameReader API来读取。您能够运用经典的SQL INSERT句子写入Delta表,也能够将DataFrame附加到表中。最终,利用SQL的COPY INTO选项是快速附加大量数据的绝佳办法。

依据您常常运用的查询形式对Delta表进行分区能够明显改善查询和DML功用。组成Delta表的各个文件将以子目录的形式安排,这些子目录与分区列的值对齐。

Delta Lake答应您在业务日志中的提交条目中相关自界说元数据。这能够用于标记敏感的提交以进行审计。您还能够在表特点中存储自界说标签,就像您能够为云资源增加标签一样,现在您也能够将这些标签与Delta表相关起来。您还能够修正某些Delta功用。例如,您能够将delta.appendonly特点相关到表上以避免删除和更新。

创立Delta表

Delta Lake答应咱们以三种不同的办法创立表:

  1. SQL数据界说言语(DDL)指令 SQL开发人员现已十分了解经典的CREATE TABLE指令,您只需增加一些特点即可运用它来创立Delta表。
  2. PySpark DataFrameWriter API 大数据Python(和Scala)开发人员很或许现已十分了解这个API,您能够持续运用它来操作Delta表。
  3. DeltaTableBuilder API 这是专为Delta表规划的新API。它选用了盛行的Builder形式,为每个Delta表和列特点供给十分精密的操控。

在接下来的章节中,咱们将亲自体验这些表创立办法中的每一种。

运用SQL DDL创立Delta表

在Spark核算环境中运用的SQL版别被称为Spark SQL,它是Spark支持的ANSI SQL的变种。Spark SQL通常与ANSI规范SQL兼容。有关Spark SQL变种的更多详细信息,请参阅Spark文档。

正如前面说到的,您能够在Spark SQL中运用规范的SQL DDL指令来创立Delta表:

%sql
-- Create a Delta table by specifying the delta format, followed
-- by the path in quotes
CREATE TABLE IF NOT EXISTS delta.`/mnt/datalake/book/chapter03/rateCard`
(
    rateCodeId   INT,
    rateCodeDesc STRING
)
USING DELTA

您运用的表名的表明法是file_format | path_to_table,其间file_format是delta,而path_to_table是Delta表的途径。在实践应用中,运用这种格局或许会变得繁琐,由于文件途径或许会变得适当长。这便是目录的用处。目录答应您运用database.table_name表明法注册表,其间database是表的逻辑分组,而table_name是表的缩写。例如,如果您首要创立了一个名为taxidb的数据库,如下所示:

%sql
CREATE DATABASE IF NOT EXISTS taxidb;

然后,您能够依照以下办法创立上述表:

%sql
-- Create the table using the taxidb catalog
CREATE TABLE IF NOT EXISTS taxidb.rateCard
(
    rateCodeId   INT,
    rateCodeDesc STRING
)
USING DELTA
LOCATION '/mnt/datalake/book/chapter03/rateCard'

从此刻开始,您能够将这个Delta表称为taxidb.rateCard,这比delta./mnt/datalake/book/chapter03/rateCard或者或许更长的途径更简略回忆和键入。Spark生态系统中最广泛运用的目录是Hive目录。

当在创立表的数据湖方位运转目录列表时,您会看到咱们的目录是空的(由于您尚未加载任何数据),除了包含表的业务日志的_delta_log目录:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/rateCard
total 12
drwxrwxrwx 2 root root 4096 Dec  2 19:02 .
drwxrwxrwx 2 root root 4096 Dec  2 19:02 ..
drwxrwxrwx 2 root root 4096 Dec  2 16:40 _delta_log

当您翻开_delta_log目录时,您会看到咱们的第一个业务日志条目:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/rateCard/_delta_log
total 15
drwxrwxrwx 2 root root 4096 Dec  2 19:02 .
drwxrwxrwx 2 root root 4096 Dec  2 19:02 ..
-rwxrwxrwx 1 root root 1886 Dec  2 19:02 00000000000000000000.crc
-rwxrwxrwx 1 root root  939 Dec  2 19:02 00000000000000000000.json

在第2章关于业务日志的评论中,您现已了解了能够写入业务日志条目的不同操作。其间一个操作是元数据操作,它描绘了表的形式、分区列(如果适用)和其他信息。这个元数据操作总是写入为咱们的新表创立的第一个业务日志条目中。

要找到这个元数据操作,您能够在业务条目中查找字符串”metadata”:

%sh
grep metadata /dbfs/mnt/datalake/book/chapter03/rateCard
  /_delta_log/00000.json > /tmp/metadata.json
python -m json.tool /tmp/metadata.json

这会发生以下输出:

{
    "metaData": {
        "id": "f79c4c11-a807-49bc-93f4-2bbe778e2a04",
        "format": {
            "provider": "parquet",
            "options": {}
        },
        "schemaString": "{"type":"struct",
        "fields":[{"name":"rateCodeId",
        "type":"integer","nullable":true,
        "metadata":{}},{"name":"rateCodeDesc",
        "type":"string","nullable":true,
        "metadata":{}}]}",
        "partitionColumns": [],
        "configuration": {},
        "createdTime": 1670007736533
    }
}

在这里,您能够看到Delta Lake已将表的形式写入业务日志条目,以及一些审计和分区信息。

DESCRIBE句子

SQL的DESCRIBE指令可用于回来Parquet文件或Delta表的基本元数据信息。回来表的元数据包含每个列的以下信息:

  • 列名
  • 列数据类型
  • 应用于列的任何注释

以下是一个表级别的DESCRIBE指令示例:

%sql
DESCRIBE TABLE taxidb.rateCard;
 -------------- ----------- --------- 
| col_name     | data_type | comment |
 -------------- ----------- --------- 
| rateCodeId   | int       | <null>  |
| rateCodeDesc | string    | <null>  |
 -------------- ----------- --------- 

当您想要查找Delta Lake特定的特点时,还能够运用DESCRIBE TABLE EXTENDED指令,它供给更详细的元数据信息,包含以下通用特点:

  • 创立表的数据库的目录称号(在这种情况下是Hive元数据存储)
  • Hive数据库
  • 表名
  • 底层文件的方位
  • 表的一切者
  • 表特点

还包含以下Delta Lake特定的特点:

  • delta.minReaderVersion 读取该Delta表的读取器所需的最低协议读取器版别。
  • delta.minWriterVersion 向该Delta表写入数据的写入器所需的最低协议写入器版别。有关一切可用表特点的完好列表,请参阅Delta Lake文档。

以下是DESCRIBE TABLE EXTENDED指令的示例:

%sql
DESCRIBE TABLE EXTENDED taxidb.rateCard;

这会生成以下输出:

 ------------------------------ ------------------------------ --------- 
| col_name                     | data_type                    | comment |
 ------------------------------ ------------------------------ --------- 
| rateCodeId                   | int                          | <null>  |
| rateCodeDesc                 | string                       | <null>  |
|                              |                              |         |
| # Detailed Table Information |                              |         |
| Catalog                      | hive_metastore               |         |
| Database                     | taxidb                       |         |
| Table                        | ratecard                     |         |
| Type                         | EXTERNAL                     |         |
| Location                     | dbfs:/.../chapter03/rateCard |         |
| Provider                     | delta                        |         |
| Owner                        | root                         |         |
| Table Properties             | [delta.minReaderVersion=1,   |         |
|                              |  delta.minWriterVersion=2]   |         |
 ------------------------------ ------------------------------ --------- 

到目前为止,咱们现已介绍了怎么运用SQL DDL创立Delta表。在接下来的部分,咱们将切换回Python,并看看怎么运用了解的PySpark DataFrames来创立新的Delta表。

运用DataFrameWriter API创立Delta表

Spark DataFrames类似于联系数据库表或带有标题的Excel电子表格。数据以不同数据类型的行和列的形式存在。用于读取、写入和操作DataFrame的函数集合被统称为Spark DataFrameWriter API。

创立保管表

DataFrameWriter API的一个优点是你能够一起创立表并将来自Spark DataFrame的数据刺进其间,如下面的代码片段所示:

INPUT_PATH = '/databricks-datasets/nyctaxi/taxizone/taxi_rate_code.csv'
DELTALAKE_PATH = 
  'dbfs:/mnt/datalake/book/chapter03/createDeltaTableWithDataFrameWriter'
# Read the DataFrame from the input path
df_rate_codes = spark                                              
                .read                                              
                .format("csv")                                     
                .option("inferSchema", True)                       
                .option("header", True)                            
                .load(INPUT_PATH)
# Save our DataFrame as a managed Hive table
df_rate_codes.write.format("delta").saveAsTable('taxidb.rateCard')

在这里,咱们首要从taxi_rate_code.csv文件中填充DataFrame,然后经过指定.format(“delta”)选项将DataFrame保存为Delta表。表的形式将是咱们DataFrame的形式。请注意,这将是一个保管表,由于咱们没有为数据文件指定方位。您能够经过运转SQL的DESCRIBE TABLE EXTENDED指令来验证这一点:

%sql
DESCRIBE TABLE EXTENDED taxidb.rateCard;
 ------------------------------ ---------------------------------------------- 
| col_name                     | data_type                                    |
 ------------------------------ ---------------------------------------------- 
| RateCodeID                   | int                                          |
| RateCodeDesc                 | string                                       |
|                              |                                              |
| # Detailed Table Information |                                              |
| Catalog                      | hive_metastore                               |
| Database                     | taxidb                                       |
| Table                        | ratecard                                     |
| Type                         | MANAGED                                      |
| Location                     | dbfs:/user/hive/warehouse/taxidb.db/ratecard |
| Provider                     | delta                                        |
| Owner                        | root                                         |
| Is_managed_location          | true                                         |
| Table Properties             | [delta.minReaderVersion=1,                   |
|                              |  delta.minWriterVersion=2]                   |
 ------------------------------ ---------------------------------------------- 

咱们能够看到表的数据存储在/user/hive/warehouse方位,而且表的类型设置为MANAGED。

如果在表上运转SELECT指令,您能够看到数据确实成功从CSV文件加载:

%sql
SELECT * FROM taxidb.rateCard
 ------------ ----------------------- 
| RateCodeID | RateCodeDesc          |
 ------------ ----------------------- 
| 1          | Standard Rate         |
| 2          | JFK                   |
| 3          | Newark                |
| 4          | Nassau or Westchester |
| 5          | Negotiated fare       |
| 6          | Group ride            |
 ------------ ----------------------- 

创立一个非保管表

您能够经过一起指定Delta表的途径和称号来创立一个非保管表。在以下代码中,咱们按次序履行了这两个过程。首要,删除现有的表:

%sql
-- Drop the existing table
DROP TABLE IF EXISTS taxidb.rateCard;

接下来,写入并创立表:

# Next, create our Delta table, specifying both
# the path and the Delta table N=name
df_rate_codes                           
        .write                          
        .format("delta")                
        .mode("overwrite")              
        .option('path', DELTALAKE_PATH) 
        .saveAsTable('taxidb.rateCard')

再次经过履行简略的SELECT句子,咱们能够验证DataFrame的数据已被加载:

%sql
SELECT * FROM taxidb.rateCard
 ------------ ----------------------- 
| RateCodeID | RateCodeDesc          |
 ------------ ----------------------- 
| 1          | Standard Rate         |
| 2          | JFK                   |
| 3          | Newark                |
| 4          | Nassau or Westchester |
| 5          | Negotiated fare       |
| 6          | Group ride            |
 ------------ ----------------------- 

运用DeltaTableBuilder API创立Delta表

创立Delta表的最终一种办法是运用DeltaTableBuilder API。由于它专门针对Delta表规划,与传统的DataFrameWriter API相比,它供给了更高程度的细粒度操控。用户能够更轻松地指定附加信息,如列注释、表特点和生成的列。

生成器规划形式在软件言语中十分盛行。生成器形式旨在“将杂乱目标的构建与其表明分离,以便相同的构建过程能够创立不同的表明”。它用于逐步构建杂乱目标,最终一步将回来该目标。

在这种情况下,咱们正在构建的杂乱目标是一个Delta表。Delta表支持许多选项,规划一个具有许多参数的规范API是有挑战的。因而,DeltaTableBuilder具有许多小办法,比方addColumn(),它们都回来对生成器目标的引证。这样,咱们能够持续增加其他对addColumn()或生成器的办法的调用。咱们调用的最终一个办法是execute(),它收集接收到的一切特点,创立Delta表,并将对表的引证回来给调用者。要运用DeltaTableBuilder,咱们需要进行以下导入:

from delta.tables import *

这个示例创立了一个保管表:

# In this Create Table, you do NOT specify a location, so you are
# creating a MANAGED table
DeltaTable.createIfNotExists(spark)                              
    .tableName("taxidb.greenTaxis")                              
    .addColumn("RideId", "INT", comment = "Primary Key")         
    .addColumn("VendorId", "INT", comment = "Ride Vendor")       
    .addColumn("EventType", "STRING")                            
    .addColumn("PickupTime", "TIMESTAMP")                        
    .addColumn("PickupLocationId", "INT")                        
    .addColumn("CabLicense", "STRING")                           
    .addColumn("DriversLicense", "STRING")                       
    .addColumn("PassengerCount", "INT")                          
    .addColumn("DropTime", "TIMESTAMP")                          
    .addColumn("DropLocationId", "INT")                          
    .addColumn("RateCodeId", "INT", comment = "Ref to RateCard") 
    .addColumn("PaymentType", "INT")                             
    .addColumn("TripDistance", "DOUBLE")                         
    .addColumn("TotalAmount", "DOUBLE")                          
    .execute()

由于每个办法都回来对生成器目标的引证,咱们能够持续调用 .addColumn() 来增加每个列。最终,咱们调用 .execute() 来创立Delta表。

生成的列

Delta Lake支持生成列,这是一种特别类型的列,其值依据用户指定的函数在Delta表中的其他列上主动生成。当写入具有生成列的Delta表而且未明确为它们供给值时,Delta Lake会主动核算这些值。

让咱们创立一个示例。为了保持与出租车主题的一致性,咱们将创立一个黄色出租车表的简化版别:

%sql
CREATE TABLE taxidb.YellowTaxis
(
    RideId               INT        COMMENT 'This is our primary Key column',
    VendorId             INT,
    PickupTime           TIMESTAMP,
    PickupYear           INT        GENERATED ALWAYS AS(YEAR  (PickupTime)),
    PickupMonth          INT        GENERATED ALWAYS AS(MONTH (PickupTime)),
    PickupDay            INT        GENERATED ALWAYS AS(DAY   (PickupTime)),
    DropTime             TIMESTAMP,
    CabNumber            STRING     COMMENT 'Official Yellow Cab Number'
) USING DELTA
LOCATION "/mnt/datalake/book/chapter03/YellowTaxis.delta"
COMMENT 'Table to store Yellow Taxi data'

咱们看到了运用GENERATED ALWAYS AS的列,它从PickupTime列中提取了YEAR、MONTH和DAY。当咱们刺进记录时,这些列的值将主动填充:

%sql
INSERT INTO taxidb.YellowTaxis
    (RideId, VendorId, PickupTime, DropTime, CabNumber)
VALUES
    (5, 101, '2021-7-1T8:43:28UTC 3', '2021-7-1T8:43:28UTC 3', '51-986')

当咱们挑选该记录时,咱们能够看到生成的列已主动填充:

%sql
SELECT PickupTime, PickupYear, PickupMonth, PickupDay FROM taxidb.YellowTaxis
 --------------------------- ------------ ------------- ----------- 
| pickupTime                | pickupYear | pickupMonth | pickupDay |
 --------------------------- ------------ ------------- ----------- 
| 2021-07-01 05:43:28 00:00 | 2021       | 7           | 1         |
 --------------------------- ------------ ------------- ----------- 

当咱们明确为生成列供给一个值时,该值有必要满足约束条件(<value> ⇔ generation expression)IS TRUE,否则刺进操作将失利并报错。

在GENERATED ALWAYS AS中运用的表达式能够是任何Spark SQL函数,只要在给定相同的参数值时一直回来相同的成果,有一些例外情况咱们将很快说到。您能够考虑运用一个生成列来生成一个类似这样的唯一ID列:

%sql
CREATE OR REPLACE TABLE default.dummy
(
    ID   STRING GENERATED ALWAYS AS (UUID()),
    Name STRING
) USING DELTA

但是,当您尝试运转此操作时,您会收到以下错误消息:

Found uuid(). A generated column cannot use a non deterministic expression.

UUID()函数在每次调用时都会回来不同的值,这违反了前面的规则。以下是此规则的一些例外情况,适用于以下类型的函数:

  • 用户自界说函数
  • 聚合函数
  • 窗口函数
  • 回来多行的函数

运用列出的函数创立的GENERATED ALWAYS AS列是有用的,并能够在多种场景中十分有用,例如核算给定记录样本的规范差。

读取Delta表

在读取表时,咱们有几种选项:运用DataFrameReader的SQL和PySpark。当咱们在Databricks Community Edition中运用笔记本时,通常在笔记本内部一起运用SQL和PySpark单元格。有些操作,比方快速的SELECT,用SQL更简略和更快,而杂乱的操作有时更简略在PySpark和DataFrameReader中表达。当然,这也取决于工程师的经验和偏好。咱们主张选用一种务实的办法,依据您当前正在处理的问题,合理混合运用这两种办法。

运用SQL读取Delta表

要读取Delta表,咱们只需翻开一个SQL单元格并编写SQL查询。如果依照GitHub README文件中的阐明设置了环境,咱们将在/mnt/datalake/book/chapter03/YellowTaxisDelta文件夹中拥有一个Delta表:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/YellowTaxisDelta
total 236955
drwxrwxrwx 2 root root      4096 Dec  4 18:04 .
drwxrwxrwx 2 root root      4096 Dec  2 19:02 ..
drwxrwxrwx 2 root root      4096 Dec  4 16:41 _delta_log
-rwxrwxrwx 1 root root 134759123 Dec  4 18:04 part-00000-...-c000.snappy.parquet
-rwxrwxrwx 1 root root 107869302 Dec  4 18:04 part-00001-...-c000.snappy.parquet

咱们能够快速将Delta表方位注册到元数据存储中,如下所示:

%sql
CREATE TABLE taxidb.YellowTaxis
USING DELTA
LOCATION "/mnt/datalake/book/chapter03/YellowTaxisDelta/"

创立表之后,咱们能够快速统计记录的数量:

%sql
SELECT
    COUNT(*)
FROM
    taxidb.yellowtaxis

这会给咱们以下的计数成果:

 ---------- 
| count(1) |
 ---------- 
| 9999995  |
 ---------- 

咱们能够看到有近1000万行可供运用。咱们能够运用另一种DESCRIBE指令变体来获取表的详细信息:

%sql
DESCRIBE TABLE FORMATTED taxidb.YellowTaxis;

DESCRIBE TABLE FORMATTED指令格局化输出,使其更易阅读:

 ------------------------------ -------------------------------------- 
| col_name                     | data_type                            |
 ------------------------------ -------------------------------------- 
| RideId                       | int                                  |
| VendorId                     | int                                  |
| PickupTime                   | timestamp                            |
| DropTime                     | timestamp                            |
| PickupLocationId             | int                                  |
| DropLocationId               | int                                  |
| CabNumber                    | string                               |
| DriverLicenseNumber          | string                               |
| PassengerCount               | int                                  |
| TripDistance                 | double                               |
| RatecodeId                   | int                                  |
| PaymentType                  | int                                  |
| TotalAmount                  | double                               |
| FareAmount                   | double                               |
| Extra                        | double                               |
| MtaTax                       | double                               |
| TipAmount                    | double                               |
| TollsAmount                  | double                               |
| ImprovementSurcharge         | double                               |
|                              |                                      |
| # Detailed Table Information |                                      |
| Catalog                      | hive_metastore                       |
| Database                     | taxidb                               |
| Table                        | YellowTaxis                          |
| Type                         | EXTERNAL                             |
| Location                     | dbfs:/.../chapter03/YellowTaxisDelta |
| Provider                     | delta                                |
| Owner                        | root                                 |
| Table Properties             | [delta.minReaderVersion=1,           |
|                              |  delta.minWriterVersion=2]           |
 ------------------------------ -------------------------------------- 

由于Spark SQL支持大多数ANSI SQL子集,咱们能够运用任何类型的杂乱查询。以下是一个示例,回来FareAmount超越50美元的CabNumbers:

%sql
SELECT
    CabNumber,
    AVG(FareAmount) AS AverageFare
FROM
    taxidb.yellowtaxis
GROUP BY
    CabNumber
HAVING
     AVG(FareAmount) > 50
ORDER BY
    2 DESC
LIMIT 5

这给咱们的成果是:

 ----------- ------------- 
| cabnumber | AverageFare |
 ----------- ------------- 
| SIR104    | 111.5       |
| T628190C  | 109.0       |
| PEACE16   | 89.7        |
| T439972C  | 89.5        |
| T802013C  | 85.0        |
 ----------- ------------- 

咱们还能够在Python中直接运用spark.sql,运用规范SQL作为参数。以下是一个简略的Python代码片段,履行与之前的SQL查询相同的操作:

number_of_results = 5
sql_statement = f"""
SELECT
    CabNumber,
    AVG(FareAmount) AS AverageFare
FROM
    taxidb.yellowtaxis
GROUP BY
    CabNumber
HAVING
     AVG(FareAmount) > 50
ORDER BY
    2 DESC
LIMIT {number_of_results}"""
df = spark.sql(sql_statement)
display(df)

这会生成与SQL相同的成果:

 ----------- ------------- 
| cabnumber | AverageFare |
 ----------- ------------- 
| SIR104    | 111.5       |
| T628190C  | 109.0       |
| PEACE16   | 89.7        |
| T439972C  | 89.5        |
| T802013C  | 85.0        |
 ----------- ------------- 

咱们主张运用三重引号语法,这样能够轻松跨多行界说字符串而无需运用持续行。此外,请注意咱们有一个名为number_of_results的变量,然后将三重引号字符串转换为f-string,并运用{}语法将变量刺进到约束中。

运用PySpark读取表

要在PySpark中读取相同的表,您能够运用DataFrameReader。例如,要完结记录的计数,咱们运用以下办法:

df = spark.read.format("delta").table("taxidb.YellowTaxis")
print(f"Number of records: {df.count():,}")

输出:

Number of records: 9,999,995

请注意,咱们指定了Delta格局,由于咱们的表是Delta表,咱们能够运用.table()办法指定咱们要读取整个表。最终,咱们运用了一个f-string,这次运用了“:,”格局化符号,它会在每三位数字之间运用逗号分隔符

接下来,让咱们从头创立之前在SQL中完结的按出租车编号排序的前五个平均费用的代码。以下是Python代码:

# Make sure to import the functions you want to use
from pyspark.sql.functions import col, avg, desc
# Read YellowTaxis into our DataFrame
df = spark.read.format("delta").table("taxidb.YellowTaxis")
# Perform the GROUP BY, average (AVG), HAVING and order by equivalents
# in pySpark
results = df.groupBy("CabNumber")                          
            .agg(avg("FareAmount").alias("AverageFare"))   
            .filter(col("AverageFare") > 50)               
            .sort(col("AverageFare").desc())               
            .take(5)                                      
# Print out the result, since this is a list and not a DataFrame
# you an use list comprehension to output the results in a single
# line
[print(result) for result in results]

咱们将得到以下输出:

Row(CabNumber='SIR104', AverageFare=111.5)
Row(CabNumber='T628190C', AverageFare=109.0)
Row(CabNumber='PEACE16', AverageFare=89.7)
Row(CabNumber='T439972C', AverageFare=89.5)
Row(CabNumber='T802013C', AverageFare=85.0)

咱们能够简略地运用 groupBy() 函数按列分组:

要核算平均值,首要咱们要运用.agg()办法。在办法内部,咱们能够指定要核算的聚合函数,本例中是.avg()(平均值)。在Python中,等效于HAVING条件的是.filter()办法,其间能够运用过滤表达式来指定筛选条件。最终,咱们运用.sort()办法对数据进行排序,然后运用.take()来提取前五个成果。需要注意的是,.take()函数将回来一个Python列表。由于咱们有一个列表,能够运用列表推导式来输出列表中的每个成果。