Skip to content

Commit

Permalink
Copy Files From Source Repo (2023-09-15 17:35)
Browse files Browse the repository at this point in the history
  • Loading branch information
olprod committed Sep 16, 2023
1 parent 515ae0a commit a7075c0
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 56 deletions.
38 changes: 19 additions & 19 deletions Instructions/Labs/03b-medallion-lakehouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ lab:

1. 在 Power BI 门户左下角,选择 Power BI 图标并切换到“数据工程”体验 。 如果你未看到数据工程体验,请与 Fabric 管理员联系,并请求[启用 Fabric](https://learn.microsoft.com/fabric/admin/fabric-switch)

2. 在“**Synapse 数据工程**”主页中,新建名为“**销售**”的**湖屋**
2. 在“Synapse 数据工程”主页中,新建名为“销售”的湖屋

大约一分钟后,一个新的空湖屋创建完成。 需要将一些数据引入数据湖屋进行分析。 有多种方法可以执行此操作,但在本练习中,只需将文本文件下载到本地计算机(或者实验室 VM,如果适用),然后将其上传到湖屋。

Expand Down Expand Up @@ -155,11 +155,11 @@ lab:

10. 使用 ****▷** (*运行单元格*)** 按钮运行单元格以执行代码。

11. 在湖屋资源管理器窗格的“Tables”部分选择“...”,然后选择“刷新” 。 此时会看到列出的新 sales_silver 表。 **▲** (三角形图标)表示它是一个 Delta 表。
11. 在湖屋资源管理器窗格的“Tables”部分选择“...”,然后选择“刷新” 。 此时会看到列出的新 sales_silver 表。 ▲(三角形图标)表示它是一个 Delta 表。

![湖屋中 sales_silver 表的屏幕截图。](./Images/sales-silver-table.png)

> **注意**:如果看不到新表,请等待几秒钟,然后再次选择“**刷新**”,或刷新整个浏览器选项卡。
> 注意:如果看不到新表,请等待几秒钟,然后再次选择“刷新”,或刷新整个浏览器选项卡。

12. 现在,你将对 Delta 表执行**更新插入操作**,根据特定条件更新现有记录,并在找不到匹配项时插入新记录。 添加新代码块并粘贴以下代码:

Expand Down Expand Up @@ -255,7 +255,7 @@ lab:

2. 在湖屋资源管理器窗格中,选择“添加”,然后选择前面创建的“Sales”湖屋来添加“Sales”湖屋。 你将在资源管理器窗格的“Tables”部分看到列出的 sales_silver 表 。

3. 在现有代码块中,删除样本文本并**添加以下代码**,以便将数据加载到数据帧并开始构建星型架构,然后运行它:
3. 在现有代码块中,删除样本文本并添加以下代码,以便将数据加载到数据帧并开始构建星型架构,然后运行它:

```python
# Load data to the dataframe as a starting point to create the gold layer
Expand All @@ -282,7 +282,7 @@ lab:

> 注意:你可以随时运行 `display(df)` 命令来查看工作进度。 在这种情况下,可以运行“display(dfdimDate_gold)”来查看 dimDate_gold 数据帧的内容。

5. 在新代码块中,**添加并运行以下代码**,为你的日期维度 **dimdate_gold** 创建数据帧:
5. 在新代码块中,添加并运行以下代码,为你的日期维度 dimdate_gold 创建数据帧:

```python
from pyspark.sql.functions import col, dayofmonth, month, year, date_format
Expand All @@ -302,7 +302,7 @@ lab:
display(dfdimDate_gold.head(10))
```

6. 你将代码分离到新的代码块中,以便了解和观察在转换数据时笔记本中发生的情况。 在另一个新代码块中,**添加并运行以下代码**以在新数据传入时更新日期维度
6. 你将代码分离到新的代码块中,以便了解和观察在转换数据时笔记本中发生的情况。 在另一个新代码块中,添加并运行以下代码以在新数据传入时更新日期维度

```python
from delta.tables import *
Expand Down Expand Up @@ -335,7 +335,7 @@ lab:
```

恭喜! 你的日期维度全部设置好了。 现在,你将创建客户维度。
7. 要生成客户维度表,**添加新代码块**,然后粘贴并运行以下代码:
7. 要生成客户维度表,添加新代码块,然后粘贴并运行以下代码:

```python
from pyspark.sql.types import *
Expand All @@ -352,7 +352,7 @@ lab:
.execute()
```

8. 在新代码块中,**添加并运行以下代码**以删除重复的客户,选择特定列,然后拆分“CustomerName”列以创建“First”和“Last”姓名列:
8. 在新代码块中,添加并运行以下代码以删除重复的客户,选择特定列,然后拆分“CustomerName”列以创建“First”和“Last”姓名列:

```python
from pyspark.sql.functions import col, split
Expand All @@ -365,7 +365,7 @@ lab:

# Display the first 10 rows of the dataframe to preview your data

display(dfdimDate_gold.head(10))
display(dfdimCustomer_silver .head(10))
```

此处,你通过执行各种转换(例如删除重复项、选择特定列以及拆分“CustomerName”列以创建“First”和“Last”名称列)创建了一个新的数据帧 dfdimCustomer_silver。 结果是一个数据帧,其中包含已清理和结构化的客户数据,包括从“CustomerName”列中提取的单独“First”和“Last”名称列。
Expand All @@ -385,12 +385,12 @@ lab:

# Display the first 10 rows of the dataframe to preview your data

display(dfdimDate_gold.head(10))
display(dfdimCustomer_gold.head(10))
```

此处,将清理和转换客户数据 (dfdimCustomer_silver),方法是执行左反联接以排除 dimCustomer_gold 表中已存在的重复项,然后使用 monotonically_increasing_id() 函数生成唯一的 CustomerID 值。

10. 现在,可以确保客户表在新数据传入时保持最新状态。 **在新代码块中**,粘贴并运行以下代码:
10. 现在,可以确保客户表在新数据传入时保持最新状态。 在新代码块中,粘贴并运行以下代码:

```python
from delta.tables import *
Expand Down Expand Up @@ -470,16 +470,16 @@ lab:
display(dfdimProduct_gold.head(10))
```

14. 与对其他维度执行的操作类似,你需要确保产品表在新数据传入时保持最新。 **在新代码块中**,粘贴并运行以下代码:
14. 与对其他维度执行的操作类似,你需要确保产品表在新数据传入时保持最新。 在新代码块中,粘贴并运行以下代码:

```python
from delta.tables import *
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, 'Tables/dimproduct_gold')
deltaTable = DeltaTable.forPath(spark, 'Tables/dimproduct_gold')

dfUpdates = dfdimProduct_gold
dfUpdates = dfdimProduct_gold

deltaTable.alias('silver') \
deltaTable.alias('silver') \
.merge(
dfUpdates.alias('updates'),
'silver.ItemName = updates.ItemName AND silver.ItemInfo = updates.ItemInfo'
Expand All @@ -503,7 +503,7 @@ lab:

**构建好维度后,最后一步是创建事实数据表。**

15. **在新代码块中**,粘贴并运行以下代码以创建**事实数据表**
15. 在新代码块中,粘贴并运行以下代码以创建事实数据表

```python
from pyspark.sql.types import *
Expand All @@ -520,7 +520,7 @@ lab:
.execute()
```

16. **在新代码块中**,粘贴并运行以下代码以创建**新的数据帧**,将销售数据与客户和产品信息(包括客户 ID、商品 ID、订单日期、数量、单价和税费)合并:
16. 在新代码块中,粘贴并运行以下代码以创建新的数据帧,将销售数据与客户和产品信息(包括客户 ID、商品 ID、订单日期、数量、单价和税费)合并:

```python
from pyspark.sql.functions import col
Expand Down Expand Up @@ -583,7 +583,7 @@ lab:

此处,将使用 Delta Lake 的合并操作将新的销售额数据 (dffactSales_gold) 同步和更新到 factsales_gold 表。 此操作将比较现有数据(银表)和新数据(更新数据帧)之间的订单日期、客户 ID 和商品 ID,更新匹配记录并根据需要插入新记录。

现在,你有了一个经过策划和建模的****,可用于报告和分析。
现在,你有了一个经过策划和建模的金层,可用于报告和分析。

## 创建数据集

Expand Down
62 changes: 25 additions & 37 deletions Instructions/Labs/10-ingest-notebooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ lab:

完成本实验室大约需要 30 分钟。

对于这种体验,我们将跨多个笔记本代码单元格生成代码,这可能无法反映你会如何在环境中执行此操作;但是它对于调试很有用
对于此体验,你将跨多个笔记本代码单元格生成代码,这可能无法反映出你在你的环境中执行此操作的方式,但是,它对于调试是有用的

由于我们还在使用示例数据集,因此优化并不能反映在大规模生产中可能看到的情况;但是,仍然可以看到改进,而当每一毫秒都很重要时,优化就是关键。
由于你也在使用示例数据集,因此优化并不能反映出在大规模生产中你可能看到的情况。但是,你仍然可以看到改进,而当每一毫秒都很重要时,优化就是关键。

> 注意:完成本练习需要 Microsoft Fabric 许可证 。 有关如何启用免费 Fabric 试用版许可证的详细信息,请参阅 [Fabric 入门](https://learn.microsoft.com/fabric/get-started/fabric-trial)
>
Expand All @@ -22,7 +22,7 @@ lab:

首先创建一个启用了 Fabric 试用版的工作区、一个新的湖屋以及湖屋中的目标文件夹。

1. 登录 [Microsoft Fabric](https://app.fabric.microsoft.com) (`https://app.fabric.microsoft.com`),然后选择“Synapse 数据工程”体验。
1. 登录 [Microsoft Fabric](https://app.fabric.microsoft.com) (`https://app.fabric.microsoft.com`),然后选择“**数据工程**”体验。

![Synapse 数据工程体验的屏幕截图](Images/data-engineering-home.png)

Expand All @@ -36,7 +36,7 @@ lab:

1. 在工作区中,选择“+ 新建”>“湖屋”,提供一个名称,然后选择“创建” 。

> :memo: 注意:创建一个没有表或文件的新湖屋可能需要几分钟时间
> **注意:** 创建一个没有******文件**的新湖屋可能需要几分钟时间
![新湖屋的屏幕截图](Images/new-lakehouse.png)

Expand All @@ -54,14 +54,14 @@ lab:

1. 从湖屋的顶部菜单中,选择“打开笔记本”>“新建笔记本”,新建的笔记本会立即打开。

> :bulb: 提示:可以从此笔记本中访问湖屋资源管理器,并且可以在完成本练习时刷新以查看进度。
> **提示:** 可以从此笔记本中访问湖屋资源管理器,并且可以在完成本练习时刷新以查看进度。
1. 请注意,在默认单元格中,代码设置为“PySpark (Python)”。

1. 将以下代码插入代码单元格,该代码将:
1. 声明连接字符串的参数
1. 生成连接字符串
1. 将数据读取到 DataFrame
- 声明连接字符串的参数
- 生成连接字符串
- 将数据读取到 DataFrame

```Python
# Azure Blob Storage access info
Expand All @@ -81,7 +81,7 @@ lab:

预期结果:命令应会成功并输出 `wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow`

> :memo: 注意:Spark 会话从第一次代码运行时开始,因此可能需要更长时间才能完成。
> **注意:** Spark 会话从第一次代码运行时开始,因此可能需要更长时间才能完成。

1. 若要将数据写入文件,现在需要 RawData 文件夹的 ABFS 路径 。

Expand All @@ -99,9 +99,9 @@ lab:
blob_df.limit(1000).write.mode("overwrite").parquet(output_parquet_path)
```

1. output_parquet_path应如下所示:`abfss://Spark@onelake.dfs.fabric.microsoft.com/DPDemo.Lakehouse/Files/RawData/yellow_taxi`
1. 添加你的 **RawData** ABFS 路径并选择“ **▷ 运行单元格**”以将 1000 行写入 yellow_taxi.parquet 文件。

1. 选择代码单元格旁边的“▷ 运行单元格”,将 1000 行写入 yellow_taxi.parquet 文件。
1. output_parquet_path应如下所示:`abfss://Spark@onelake.dfs.fabric.microsoft.com/DPDemo.Lakehouse/Files/RawData/yellow_taxi`

1. 若要确认从湖屋资源管理器加载数据,请选择“文件”>...>“刷新”。

Expand All @@ -116,6 +116,9 @@ lab:
```python
from pyspark.sql.functions import col, to_timestamp, current_timestamp, year, month

# Read the parquet data from the specified path
raw_df = spark.read.parquet(output_parquet_path)

# Add dataload_datetime column with current timestamp
filtered_df = raw_df.withColumn("dataload_datetime", current_timestamp())

Expand All @@ -132,10 +135,10 @@ lab:

1. 选择代码单元格旁边的“▷ 运行单元格”。

* 这会添加一个时间戳列 dataload_datetime,记录数据加载到 Delta 表的时间
* 筛选 storeAndFwdFlag 中的 NULL
* 将筛选的数据加载到 Delta 表中
* 显示单行进行验证
- 这会添加一个时间戳列 dataload_datetime,记录数据加载到 Delta 表的时间
- 筛选 storeAndFwdFlag 中的 NULL
- 将筛选的数据加载到 Delta 表中
- 显示单行进行验证

1. 查看并确认显示的结果,如下图所示:

Expand All @@ -151,10 +154,10 @@ lab:

```python
from pyspark.sql.functions import col, to_timestamp, current_timestamp, year, month

# Read the parquet data from the specified path
raw_df = spark.read.parquet("**InsertYourABFSPathHere**")
raw_df = spark.read.parquet(output_parquet_path)

# Add dataload_datetime column with current timestamp
opt_df = raw_df.withColumn("dataload_datetime", current_timestamp())

Expand All @@ -175,8 +178,6 @@ lab:
display(opt_df.limit(1))
```

1. 再次获取 ABFS 路径,并在运行单元格之前更新块中的代码 。

1. 确认结果与优化代码之前的结果相同。

现在,记下这两个代码块的运行时间。 运行时间会有所不同,但可以看到使用优化的代码可以显著提高性能。
Expand Down Expand Up @@ -216,29 +217,16 @@ lab:
opttable_df = spark.sql('SELECT * FROM yellow_taxi_opt')

# Display results
display(opttable_df.limit(3))
display(opttable_df.limit(10))
```

1. 现在,选择顶部菜单栏中的“全部运行”。
1. 现在,为这两个查询中的第一个查询选择“**运行单元格**”按钮旁边的 ▼ 箭头,然后从下拉列表中选择“**运行此单元格及其之下**”。

这将运行所有代码单元格,并让你看到从开始到结束的完整过程。 你将能够看到优化代码块和非优化代码块之间的执行时间
这将运行最后两个代码单元格。 请注意查询具有非优化数据的表和具有优化数据的表之间的执行时间差异

## 清理资源

在本练习中,你了解了如何创建:

* 工作区
* 湖屋
* Fabric 笔记本
* PySpark 代码,用于:
* 与外部数据源连接
* 将数据读取到 DataFrame
* 将 DataFrame 数据写入 Parquet 文件
* 读取 Parquet 文件中的数据
* 转换 DataFrame 中的数据
* 将 DataFrame 数据加载到 Delta 表
* 优化 Delta 表写入
* 使用 SQL 查询 Delta 表数据
在本练习中,你在 Fabric 中将笔记本与 PySpark 配合使用来加载数据,并将其保存到 Parquet。 然后,你使用该 Parquet 文件进一步转换数据,并优化了 Delta 表写入。 最后,你使用 SQL 查询 Delta 表。

如果已完成探索,可删除为本练习创建的工作区。

Expand Down

0 comments on commit a7075c0

Please sign in to comment.