Merge pull request #242 from MrPowers/202-drop-pyspark2
Drop Spark-2 support and update dependencies
SemyonSinchenko authored Jul 14, 2024
2 parents e1b66c8 + c92c381 commit 407f463
Showing 11 changed files with 1,190 additions and 1,254 deletions.
40 changes: 9 additions & 31 deletions .github/workflows/ci.yml
@@ -17,18 +17,12 @@ jobs:
fail-fast: false
matrix:
include:
- pyspark-version: 2.4.8 # latest published 2.x version
pip-packages: "pypandoc==1.7 pyspark==2.4.8" # downgrade of pypandoc necessary
- pyspark-version: 3.0.3
pip-packages: "pyspark==3.0.3"
- pyspark-version: 3.1.3
pip-packages: "pyspark==3.1.3"
- pyspark-version: 3.2.4
pip-packages: "pyspark==3.2.4"
- pyspark-version: 3.3.2
pip-packages: "pyspark==3.3.2"
- pyspark-version: 3.4.0
pip-packages: "pyspark==3.4.0"
- pyspark-version: 3.3.4
pip-packages: "pyspark==3.3.4"
- pyspark-version: 3.4.3
pip-packages: "pyspark==3.4.3"
- pyspark-version: 3.5.1
pip-packages: "pyspark==3.5.1"

steps:
- uses: actions/checkout@v1
@@ -39,33 +33,17 @@ jobs:
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: '8' # Supported by Spark 2.x & 3.x

- name: Get supported Python Version depending on PySpark
uses: haya14busa/action-cond@v1
id: python_version
with:
cond: ${{ startsWith(matrix.pyspark-version, '2.') }}
if_true: '3.7' # latest supported version for PySpark 2.x
if_false: '3.9' # PySpark 3+
java-version: '17' # Spark 4.0 will drop Java 11;

- name: Set up Python ${{ steps.python_version.outputs.value }}
uses: actions/setup-python@v2
with:
python-version: ${{ steps.python_version.outputs.value }}

- name: Get supported Poetry version
uses: haya14busa/action-cond@v1
id: poetry_version
with:
cond: ${{ startsWith(matrix.pyspark-version, '2.') }}
if_true: '1.5.1' # latest supported version for PySpark 2.x
if_false: '1.6.1' # PySpark 3+
python-version: '3.10'

- name: Install Poetry
uses: snok/install-poetry@v1
with:
version: ${{ steps.poetry_version.outputs.value }}
version: '1.6.1'

- name: Cache Poetry virtualenv
uses: actions/cache@v1
4 changes: 2 additions & 2 deletions .github/workflows/lint.yaml
@@ -9,8 +9,8 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: 3.9
python-version: '3.10'
- name: Run Ruff
uses: chartboost/ruff-action@v1
with:
version: 0.0.291
version: 0.5.1
17 changes: 17 additions & 0 deletions Makefile
@@ -1,5 +1,7 @@
# COMMON CLI COMMANDS FOR DEVELOPMENT

all: help

.PHONY: install_test
install_test:
@poetry install --with=development,testing
@@ -19,3 +21,18 @@ test:
.PHONY: lint
lint:
@poetry run ruff check --fix quinn

.PHONY: format
format:
@poetry run ruff format quinn

.PHONY: help
help:
@echo '................... Quinn ..........................'
@echo 'help - print this message'
@echo 'lint - run linter'
@echo 'format - reformat the code'
@echo 'test - run tests'
@echo 'install_test - install test deps'
@echo 'install_deps - install dev deps'
@echo 'update_deps - update and install deps'
2,288 changes: 1,119 additions & 1,169 deletions poetry.lock

Large diffs are not rendered by default.

38 changes: 23 additions & 15 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "quinn"
version = "0.10.3"
version = "1.0.0"
description = "Pyspark helper methods to maximize developer efficiency"
authors = ["MrPowers <[email protected]>"]

@@ -22,7 +22,7 @@ build-backend = "poetry.masonry.api"
###########################################################################

[tool.poetry.dependencies]
python = ">=3.7,<4.0"
python = ">=3.9,<4.0"


###########################################################################
@@ -42,26 +42,28 @@ optional = true
optional = true

[tool.poetry.group.development.dependencies]
pyspark = ">2"
pyspark = ">3"
semver = "^3"

[tool.poetry.group.testing.dependencies]
pytest = "^7"
chispa = "0.9.4"
chispa = "^0.10"
pytest-describe = "^2"
pyspark = ">2"
pyspark = ">3"
semver = "^3"

[tool.poetry.group.linting.dependencies]
ruff = "^0.0.291"
ruff = "^0.5"

[tool.poetry.group.docs.dependencies]
mkdocstrings-python = "^0.8.3"
mkdocs-gen-files = "^0.4.0"
mkdocs-literate-nav = "^0.6.0"
mkdocs-section-index = "^0.3.5"
markdown-include = "^0.8.1"
mkdocs = "^1"
# All the dependencies related to mkdocs;
# We are pinning only the main version of mkdocs.
mkdocstrings-python = "*"
mkdocs-gen-files = "*"
mkdocs-literate-nav = "*"
mkdocs-section-index = "*"
markdown-include = "*"
mkdocs = "^1.6"
jupyterlab = "*"
mkdocs-jupyter = "*"
mkdocs-material = "*"
@@ -74,8 +76,11 @@ markdown-exec = "*"
###########################################################################

[tool.ruff]
select = ["ALL"]
line-length = 150
extend-exclude = ["tests", "docs"]

[tool.ruff.lint]
select = ["ALL"]
ignore = [
"D100",
"D203", # Ignore blank line before summary of class
@@ -94,10 +99,13 @@ ignore = [
"PTH123", # Don't force use of Pathlib
"PTH207", # Don't force use of Pathlib
"PTH113", # Don't force use of Pathlib
"ISC001", # Recommended by Ruff
"COM812", # Recommended by Ruff
]
extend-exclude = ["tests", "docs"]

[tool.ruff.per-file-ignores]
[tool.ruff.lint.per-file-ignores]
"quinn/extensions/column_ext.py" = ["FBT003", "N802"]
"quinn/extensions/__init__.py" = ["F401", "F403"]
"quinn/__init__.py" = ["F401", "F403"]
"quinn/functions.py" = ["FBT003"]
"quinn/keyword_finder.py" = ["A002"]
4 changes: 1 addition & 3 deletions quinn/append_if_schema_identical.py
@@ -36,9 +36,7 @@ def append_if_schema_identical(source_df: DataFrame, target_df: DataFrame) -> DataFrame:
source_schema_list = [(field.name, str(field.dataType)) for field in source_schema]
target_schema_list = [(field.name, str(field.dataType)) for field in target_schema]

unmatched_cols = [
col for col in source_schema_list if col not in target_schema_list
]
unmatched_cols = [col for col in source_schema_list if col not in target_schema_list]
error_message = (
f"The schemas of the source and target dataframes are not identical."
f"From source schema column {unmatched_cols} is missing in target schema"
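For context, a minimal usage sketch of this helper, assuming a running SparkSession and illustrative column names; it imports directly from the module path shown above and assumes the function returns the union of the two frames when every (name, type) pair matches:

from pyspark.sql import SparkSession

from quinn.append_if_schema_identical import append_if_schema_identical

spark = SparkSession.builder.getOrCreate()

# Two DataFrames with the same column names and types (illustrative data).
source_df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "letter"])
target_df = spark.createDataFrame([(3, "c")], ["id", "letter"])

# Succeeds because every (name, type) pair in the source exists in the target;
# a mismatch is reported with the error message built above.
combined = append_if_schema_identical(source_df, target_df)
combined.show()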
1 change: 1 addition & 0 deletions quinn/keyword_finder.py
@@ -54,6 +54,7 @@
"sparkContext",
]


@dataclass
class SearchResult:
"""Class to hold the results of a file search.
7 changes: 2 additions & 5 deletions quinn/math.py
@@ -12,6 +12,7 @@
# limitations under the License.

"""Math routines for PySpark."""

from __future__ import annotations

from typing import Optional, Union
@@ -40,11 +41,7 @@ def rand_laplace(

u = F.rand(seed)

return (
F.when(u < F.lit(0.5), mu + beta * F.log(2 * u))
.otherwise(mu - beta * F.log(2 * (1 - u)))
.alias("laplace_random")
)
return F.when(u < F.lit(0.5), mu + beta * F.log(2 * u)).otherwise(mu - beta * F.log(2 * (1 - u))).alias("laplace_random")


def div_or_else(
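The reflowed return above keeps the same inverse-CDF transform: draw u ~ Uniform(0, 1) with F.rand(seed), then map it through the Laplace quantile function with location mu and scale beta. A plain-Python sketch of that transform, separate from the PySpark code, for readers who want to check the math:

import math
import random


def laplace_sample(mu: float = 0.0, beta: float = 1.0) -> float:
    """Draw one Laplace(mu, beta) value via the inverse CDF, mirroring rand_laplace."""
    u = random.random()  # analogous to F.rand(seed)
    if u < 0.5:
        return mu + beta * math.log(2 * u)
    return mu - beta * math.log(2 * (1 - u))


print([laplace_sample() for _ in range(5)])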
4 changes: 2 additions & 2 deletions quinn/schema_helpers.py
@@ -132,7 +132,7 @@ def _lookup_type(type_str: str) -> T.DataType:

return type_lookup[type_str]

def _convert_nullable(null_str: str) -> bool:
def _convert_nullable(null_str: Optional[str]) -> bool:
if null_str is None:
return True

@@ -160,7 +160,7 @@ def _convert_nullable(null_str: str) -> bool:
name=row["name"],
dataType=_lookup_type(row["type"]),
nullable=_convert_nullable(row["nullable"]) if "nullable" in row else True,
metadata=_validate_json(row["metadata"] if "metadata" in row else None),
metadata=_validate_json(row["metadata"] if "metadata" in row else None), # noqa: SIM401
)
fields.append(field)

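The widened Optional[str] annotation reflects that the nullable cell may be missing from a schema CSV row, in which case the field defaults to nullable. A small sketch of that convention; only the None branch is taken from the diff, and the "true"/"false" string handling below is an assumption:

from typing import Optional


def convert_nullable(null_str: Optional[str]) -> bool:
    """Missing cell -> nullable by default; otherwise parse the string (assumed encoding)."""
    if null_str is None:  # column omitted in the CSV row
        return True
    return null_str.strip().lower() == "true"  # assumed "true"/"false" convention


print(convert_nullable(None))     # True
print(convert_nullable("false"))  # False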
8 changes: 3 additions & 5 deletions quinn/split_columns.py
@@ -117,17 +117,15 @@ def _num_delimiter(col_value1: str) -> int:
df = df.withColumn("del_length", num_udf(df[col_name])) # noqa: PD901
df.cache()
# Drop the original column if the new columns were created successfully
df = df.select( # noqa: PD901
df = df.select( # noqa: PD901
[c for c in df.columns if c not in {"del_length", col_name}],
)

elif mode == "permissive":
# Create an array of select expressions to create new columns from the split values
# Use the default value if a split value is missing or empty
select_exprs = select_exprs = [
when(length(split_col_expr.getItem(i)) > 0, split_col_expr.getItem(i))
.otherwise(default)
.alias(new_col_names[i])
select_exprs = [
when(length(split_col_expr.getItem(i)) > 0, split_col_expr.getItem(i)).otherwise(default).alias(new_col_names[i])
for i in range(len(new_col_names))
]

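The reflowed select_exprs implements the permissive branch: each split token is kept when it is non-empty and replaced by the default value otherwise. A standalone PySpark sketch of that technique with illustrative data; it mirrors the expression above rather than calling quinn's public API:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length, split, when

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("John:Doe:NY",), ("Jane::LA",)], ["raw"])

new_col_names = ["first", "last", "city"]
default = "N/A"
split_col_expr = split(col("raw"), ":")

# Keep each token when non-empty, otherwise substitute the default value.
select_exprs = [
    when(length(split_col_expr.getItem(i)) > 0, split_col_expr.getItem(i)).otherwise(default).alias(new_col_names[i])
    for i in range(len(new_col_names))
]

df.select("raw", *select_exprs).show()  # "Jane::LA" yields first=Jane, last=N/A, city=LA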
33 changes: 11 additions & 22 deletions quinn/transformations.py
@@ -64,12 +64,7 @@ def with_some_columns_renamed(
"""

def _(df: DataFrame) -> DataFrame:
cols = [
F.col(f"`{col_name}`").alias(fun(col_name))
if change_col_name(col_name)
else F.col(f"`{col_name}`")
for col_name in df.columns
]
cols = [F.col(f"`{col_name}`").alias(fun(col_name)) if change_col_name(col_name) else F.col(f"`{col_name}`") for col_name in df.columns]
return df.select(*cols)

return _
@@ -120,7 +115,9 @@ def sort_columns( # noqa: C901,PLR0915
"""

def sort_nested_cols(
schema: StructType, is_reversed: bool, base_field: str="",
schema: StructType,
is_reversed: bool,
base_field: str = "",
) -> list[str]:
# recursively check nested fields and sort them
# https://stackoverflow.com/questions/57821538/how-to-sort-columns-of-nested-structs-alphabetically-in-pyspark
@@ -146,7 +143,9 @@ def parse_fields(

results.extend(
sort_nested_cols(
new_struct, is_reversed, base_field=new_base_field,
new_struct,
is_reversed,
base_field=new_base_field,
),
)
return results
@@ -242,10 +241,7 @@ def fix_nullability(field: StructField, result_dict: dict) -> None:
if not sort_nested:
return top_level_sorted_df

is_nested: bool = any(
isinstance(i.dataType, (StructType, ArrayType))
for i in top_level_sorted_df.schema
)
is_nested: bool = any(isinstance(i.dataType, (StructType, ArrayType)) for i in top_level_sorted_df.schema)

if not is_nested:
return top_level_sorted_df
@@ -281,10 +277,7 @@ def flatten_struct(df: DataFrame, col_name: str, separator: str = ":") -> DataFrame:
:rtype: List[Column]
"""
struct_type = complex_fields(df.schema)[col_name]
expanded = [
F.col(f"`{col_name}`.`{k}`").alias(col_name + separator + k)
for k in [n.name for n in struct_type.fields]
]
expanded = [F.col(f"`{col_name}`.`{k}`").alias(col_name + separator + k) for k in [n.name for n in struct_type.fields]]
return df.select("*", *expanded).drop(F.col(f"`{col_name}`"))


Expand All @@ -302,9 +295,7 @@ def flatten_map(df: DataFrame, col_name: str, separator: str = ":") -> DataFrame
"""
keys_df = df.select(F.explode_outer(F.map_keys(F.col(f"`{col_name}`")))).distinct()
keys = [row[0] for row in keys_df.collect()]
key_cols = [
F.col(f"`{col_name}`").getItem(k).alias(col_name + separator + k) for k in keys
]
key_cols = [F.col(f"`{col_name}`").getItem(k).alias(col_name + separator + k) for k in keys]
return df.select(
[F.col(f"`{col}`") for col in df.columns if col != col_name] + key_cols,
)
@@ -407,9 +398,7 @@ def explode_array(df: DataFrame, col_name: str) -> DataFrame:

# Sanitize column names with the specified replace_char
if sanitized_columns:
sanitized_columns = [
sanitize_column_name(col_name, replace_char) for col_name in df.columns
]
sanitized_columns = [sanitize_column_name(col_name, replace_char) for col_name in df.columns]
df = df.toDF(*sanitized_columns) # noqa: PD901

return df
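The flatten_map hunk in this file keeps the same two-step pattern: collect the distinct map keys with an explode, then project one column per key using the separator naming convention. A minimal standalone sketch of that pattern with illustrative data, separate from quinn's implementation:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, {"a": 10, "b": 20}), (2, {"a": 30})],
    ["id", "props"],
)

# Step 1: find every key that appears in the map column.
keys_df = df.select(F.explode_outer(F.map_keys(F.col("props")))).distinct()
keys = [row[0] for row in keys_df.collect()]

# Step 2: one new column per key, named "<col><separator><key>".
separator = ":"
key_cols = [F.col("props").getItem(k).alias("props" + separator + k) for k in keys]

df.select([F.col(c) for c in df.columns if c != "props"] + key_cols).show()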
