From 2fc20eefcc85942e9643fe98e1fc729a361889aa Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 20 Apr 2024 08:26:42 -0500 Subject: [PATCH 1/2] Add simple Spark notebook and environment --- spark.ipynb | 161 ++++++++++++++++++++++++++++++++++++++++++++++++++++ spark.yml | 16 ++++++ 2 files changed, 177 insertions(+) create mode 100644 spark.ipynb create mode 100644 spark.yml diff --git a/spark.ipynb b/spark.ipynb new file mode 100644 index 0000000..2131e31 --- /dev/null +++ b/spark.ipynb @@ -0,0 +1,161 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "f8eafed2-77a1-4691-8e8b-aeb1187ce8f5", + "metadata": { + "tags": [] + }, + "source": [ + "\n", + "Spark on Coiled\n", + "===============\n", + "\n", + "\n", + "\n", + "Coiled can run Spark Jobs.\n", + "\n", + "You get all the same Coiled ease of use features:\n", + "\n", + "1. Quick startup\n", + "2. Copies all of your local packages and code\n", + "3. Runs in any region on any hardware\n", + "4. Runs from your local notebook\n", + "\n", + "But now rather than just Dask you can run Spark too." + ] + }, + { + "cell_type": "markdown", + "id": "0d130128-ac72-4ce6-87b1-b7a20337fd2a", + "metadata": {}, + "source": [ + "### Read a little bit of data with pandas" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "09728a96-0c84-4198-ab52-4dcdfd704606", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "df = pd.read_parquet(\n", + " \"s3://coiled-data/uber/part.0.parquet\",\n", + ")\n", + "df" + ] + }, + { + "cell_type": "markdown", + "id": "3148ad8d-3de7-47b6-91a3-1d1f5a393f64", + "metadata": {}, + "source": [ + "## Start Spark cluster to read lots of data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9a9e6076-c8b3-4282-90a4-0fe3ab49440d", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import coiled\n", + "\n", + "cluster = coiled.Cluster(\n", + " n_workers=10,\n", + " worker_memory=\"16 GiB\",\n", + " region=\"us-east-2\",\n", + ")\n", + "\n", + "spark = cluster.get_spark()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "33b598a4-fe0a-43c5-8007-0e955ac193f9", + "metadata": {}, + "outputs": [], + "source": [ + "df = spark.read.parquet(\"s3a://coiled-data/uber\")\n", + "df.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "56e2b982-af1b-4140-8cc1-414343ba1f0a", + "metadata": {}, + "outputs": [], + "source": [ + "df.count()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "829de2bc-ed09-4e10-b06f-268aa79ead59", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import dask\n", + "import dask.dataframe as dd\n", + "dask.config.set({\"dataframe.convert-string\": True}) # use PyArrow strings by default\n", + "\n", + "while True:\n", + " client.restart()\n", + "\n", + " df = dd.read_parquet(\n", + " \"s3://coiled-datasets/uber-lyft-tlc/\",\n", + " storage_options={\"anon\": True},\n", + " ).persist()\n", + "\n", + " for _ in range(10):\n", + " df[\"tipped\"] = df.tips != 0\n", + "\n", + " df.groupby(\"hvfhs_license_num\").tipped.mean().compute()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f4b6b9f9-7ef3-4ca0-b769-0dd7e4ce6b0b", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python [conda env:spark]", + "language": "python", + "name": "conda-env-spark-py" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/spark.yml b/spark.yml new file mode 100644 index 0000000..1b5940f --- /dev/null +++ b/spark.yml @@ -0,0 +1,16 @@ +name: spark +channels: + - conda-forge +dependencies: + - python=3.11 + - dask + - coiled + - ipykernel + - pyspark==3.4.1 + - pyarrow + - grpcio + - grpcio-status + - openjdk~=11.0 + - protobuf + - jupyterlab + - s3fs From c6498572351f920748750c5b2751459b36eecb2a Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Tue, 23 Apr 2024 09:23:07 -0500 Subject: [PATCH 2/2] remove last cell --- spark.ipynb | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/spark.ipynb b/spark.ipynb index 2131e31..fc35b18 100644 --- a/spark.ipynb +++ b/spark.ipynb @@ -100,41 +100,6 @@ "source": [ "df.count()" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "829de2bc-ed09-4e10-b06f-268aa79ead59", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "import dask\n", - "import dask.dataframe as dd\n", - "dask.config.set({\"dataframe.convert-string\": True}) # use PyArrow strings by default\n", - "\n", - "while True:\n", - " client.restart()\n", - "\n", - " df = dd.read_parquet(\n", - " \"s3://coiled-datasets/uber-lyft-tlc/\",\n", - " storage_options={\"anon\": True},\n", - " ).persist()\n", - "\n", - " for _ in range(10):\n", - " df[\"tipped\"] = df.tips != 0\n", - "\n", - " df.groupby(\"hvfhs_license_num\").tipped.mean().compute()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f4b6b9f9-7ef3-4ca0-b769-0dd7e4ce6b0b", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": {