# Apache Spark Overview This page describes Spark concepts that are important to understand when using dsgrid. ## Windows Users Spark does not offer a great user experience in Windows. While `pyspark` and `spark-submit` work in local mode, running a cluster requires manual configuration. The developers provide cluster management scripts in bash, and so they do not work in Windows. :::{tip} When running a cluster on your laptop we recommend that you use dsgrid in a **Windows Subsystem for Linux (WSL2)** environment instead of a native Windows environment. ::: ### Windows Setup Requirements If you are running in local mode, you will need Hadoop's `winutils.exe` because Windows doesn't support HDFS. 1. Download `winutils.exe` and `hadoop.dll` files from [winutils repository](https://github.com/steveloughran/winutils) (select the Hadoop version you are using as winutils are specific to Hadoop versions) 2. Copy them into a folder like `C:\hadoop\bin` 3. Set the environment variable `HADOOP_HOME` to `C:\hadoop` 4. Add `%HADOOP_HOME%\bin` to your `PATH` If you get an error like: *"Python was not found; run without arguments to install from the Microsoft Store, or disable this shortcut from Settings > Manage App Execution Aliases."* try setting this environment variable: `PYSPARK_PYTHON=python` (or set the value to `ipython`, if you would prefer). ## Conventions This page uses UNIX conventions for environment variables and running commands in a shell. If you run any of these commands in a native Windows environment, you will have to adjust depending on whether you are running PowerShell or the old Command shell. ### Bash Functionality Explained - `$VARIABLE_NAME` or `${VARIABLE_NAME}`: This uses the environment variable `VARIABLE_NAME`. If the variable isn't defined, bash will use an empty string. - `export VARIABLE_NAME=x`: This sets the environment variable `VARIABLE_NAME` to the value `x` in the current shell. If you want to set this variable for all future shells as well, add the statement to your shell rc file (`$HOME/.bashrc` or `$HOME/.zshrc`). - `export VARIABLE_NAME=$(hostname)`: This sets the environment variable `VARIABLE_NAME` to the string returned by the command `hostname`. In examples listed on this page where you need to enter text yourself, that text is enclosed in `<>` as in `http://:4040`. ## Spark Cluster Modes Apache provides an overview at [Spark Cluster Overview](https://spark.apache.org/docs/latest/cluster-overview.html). The most important parts to understand are how dsgrid uses different cluster modes in different environments: ### Local Computer in Local Mode All Spark components run in a single process. This is great for testing and development but is not performant. It will not use all CPUs on your system. You run in this mode when you type `pyspark` in your terminal. **Use for:** Quick testing and development only. ### Local Computer with Standalone Cluster You must install Spark and then manually start the cluster. Refer to [Installing on Laptop](#installing-on-laptop). This enables full performance on your system. It also allows you to debug your jobs in the Spark UI before or after they complete. **Use for:** Development work that requires full system performance. ### HPC Compute Node in Local Mode Use this only for quick checks. Same points above for local computer in local mode apply. :::{warning} If you use this, set the environment variable `SPARK_LOCAL_DIRS=/tmp/scratch` so that you have enough space. ::: **Use for:** Quick checks only. Create a standalone cluster for real work. ### HPC Compute Node(s) with Standalone Cluster Create a cluster on any number of compute nodes and then use all CPUs for your jobs. Refer to [Installing on HPC](#installing-on-hpc). **Use for:** Production workloads and large-scale queries. ### AWS EMR Cluster The EMR scripts in the dsgrid repo at `/emr` will create a Spark cluster on EC2 compute nodes with a cluster manager. The cluster manager allows multiple users to access a single cluster and offers better job scheduling. Refer to the README.md in that directory. **Use for:** Cloud-based large-scale production workloads. ## Running Spark Applications There are three ways of running Spark applications in Python. `spark-submit` and `pyspark`, provided by the Spark and pyspark installations, are recommended because they allow you to fully customize the execution environment. More details follow in [Tuning Configuration](#tuning-configuration-settings). ### 1. spark-submit This command will create a SparkSession, make that session available to the Python process, and run your code. ```bash $ spark-submit [options] your_script.py [your_script_options] ``` **Use for:** Running Python scripts with Spark. ### 2. pyspark This command will create a SparkSession, make that session available to the Python process, and leave you in the Python interpreter for an interactive session. ```bash $ pyspark [options] ``` **Use for:** Interactive exploration and development. ### 3. Inside Python ```python from pyspark.sql import SparkSession spark = SparkSession.builder.appName("your_app_name").getOrCreate() ``` **Use for:** Programmatic control within Python applications. ## Run pyspark through IPython or Jupyter You can configure `pyspark` to start `IPython` or `Jupyter` instead of the standard Python interpreter by setting the environment variables `PYSPARK_DRIVER_PYTHON` and `PYSPARK_DRIVER_PYTHON_OPTS`. ### IPython ```bash $ export PYSPARK_DRIVER_PYTHON=ipython ``` **Local mode:** ```bash $ pyspark ``` **Cluster mode:** ```bash $ pyspark --master=spark://$(hostname):7077 ``` Now you are in IPython instead of the standard Python interpreter. ### Jupyter ```bash $ export PYSPARK_DRIVER_PYTHON=jupyter $ export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=8889 --ip=0.0.0.0" $ export SPARK_HOME=$(python -c "import pyspark;print(pyspark.__path__[0])") ``` **Local mode:** ```bash $ pyspark ``` **Cluster mode:** ```bash $ pyspark --master=spark://$(hostname):7077 ``` Pyspark will start a Jupyter notebook and you'll see the URL printed in the terminal. If you're on a remote server, like in an HPC environment, you'll need to create an SSH tunnel in order to connect in a browser. Once you connect in a browser, enter the following in a cell in order to connect to this cluster: ```python from pyspark.sql import SparkSession spark = SparkSession.builder.appName("your_app_name").getOrCreate() ``` ## Spark UI The Spark master starts a web application at `http://:8080`. Job information is available at port 4040. You can monitor and debug all aspects of your jobs in this application. You can also inspect all cluster configuration settings. ### SSH Tunnel for Remote Access If your Spark cluster is running on a remote system, like an HPC, you may need to open an SSH tunnel to the master node. Here is how to do that on NLR's Kestrel cluster. **On your laptop:** ```bash $ export COMPUTE_NODE= $ ssh -L 4040:$COMPUTE_NODE:4040 -L 8080:$COMPUTE_NODE:8080 $USER@kestrel.hpc.nrel.gov ``` Then access: - **Master UI:** `http://localhost:8080` - **Job UI:** `http://localhost:4040` ## Installing a Spark Standalone Cluster The next sections describe how to install a standalone cluster on a local system and an HPC. ### Installing on Laptop :::{note} As stated earlier, the scripts mentioned in this section do not work in a native Windows environment. You can still start a cluster in Windows; you just have to run the Java commands yourself. ::: #### Download Spark Download your desired version from [Spark Downloads](https://spark.apache.org/downloads.html) and extract it on your system. :::{important} Choose the version that is set in `pyproject.toml` for `pyspark`. Major, minor, and patch versions must match. ::: ```bash $ cd # directions below assume $HOME $ wget https://dlcdn.apache.org/spark/spark-4.0.0/spark-4.0.0-bin-hadoop3.tgz $ tar -xzf spark-4.0.0-bin-hadoop3.tgz && rm spark-4.0.0-bin-hadoop3.tgz ``` The full instructions to create a cluster are at [Spark Standalone Mode](http://spark.apache.org/docs/latest/spark-standalone.html). The rest of this section documents the requirements for dsgrid. #### Set Environment Variables ```bash $ export SPARK_HOME=$HOME/spark-4.0.0-bin-hadoop3 $ export PATH=$PATH:$SPARK_HOME/sbin ``` Note that after doing this your system will have two versions of `pyspark`: - In your Python virtual environment where you installed dsgrid (because dsgrid installs pyspark) - In `$HOME/spark-4.0.0-bin-hadoop3/bin` If you use a conda virtual environment, when that environment is activated, its `pyspark` will be in your system path. Be sure not to add the spark bin directory to your path so that there are no collisions. :::{warning} Setting `SPARK_HOME` will affect operation of your Python `pyspark` installation in local mode. That may not be what you want if you make settings specific to the standalone cluster. ::: #### Customize Spark Configuration Settings ```bash $ cp $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf $ cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh ``` **Key settings in `spark-defaults.conf`:** - **`spark.driver.memory`** and **`spark.driver.maxResultSize`**: Set to the maximum data sizes that you expect to pull from Spark to Python, such as if you call `df.toPandas()`. `1g` is probably reasonable. - **`spark.sql.shuffle.partitions`**: Set to 1-4x the number of cores in your system. Note that the default value is 200, and you probably don't want that. - **`spark.executor.cores`** and **`spark.executor.memory`**: Set to numbers that allow creation of your desired number of executors. Spark will try to create the most number of executors such that each executor has those resources. For example, if your system has 16 cores and you assign 16g of memory to the worker, `spark.executor.cores 3` and `spark.executor.memory 5g` will result in 3 executors. #### Start the Spark Processes ```bash $ $SPARK_HOME/sbin/start-master.sh ``` Start a worker with this command. Give the worker as much memory as you can afford. Executor memory comes from this pool. You can also configure this in `spark-env.sh`. ```bash $ $SPARK_HOME/sbin/start-worker.sh -m 16g spark://$(hostname):7077 ``` If you add the `sbin` to your `PATH` environment variable, here is a one-liner: ```bash $ start-master.sh && start-worker.sh -m 24g spark://$(hostname):7077 ``` #### Stop the Spark Processes Stop all of the processes when you complete your work. ```bash $ stop-worker.sh && stop-master.sh ``` ### Installing on HPC This section describes how you can run Spark jobs on any number of HPC compute nodes. The scripts and examples described here rely on the SLURM scheduling system and have been tested on NLR's Kestrel cluster. #### Install sparkctl Install the Python package `sparkctl` to run the scripts described below: ```bash $ pip install "sparkctl[pyspark]" ``` The [sparkctl documentation](https://nrel.github.io/sparkctl/) has generic instructions to run Spark in a variety of ways. The rest of this section calls out choices that you should make to run Spark jobs with dsgrid. #### Steps to Start a Cluster 1. **Choose compute node(s) with fast local storage.** This example will allocate one node. Refer to the [Kestrel Filesystems documentation](https://nrel.github.io/HPC/Documentation/Systems/Kestrel/Filesystems/#node-file-system) for more information for this type of compute node. ```bash $ salloc -t 01:00:00 -N1 --account=dsgrid --partition=nvme --mem=240G ``` 2. **Configure Spark parameters** based on the amount of memory and CPU in each compute node. This command must be run on a compute node. The script will check for the environment variable `SLURM_JOB_ID`, which is set by `Slurm`. If you ssh'd into the compute node, it won't be set and then you have to pass it as an argument. ```bash $ sparkctl configure --start ``` Run `sparkctl configure --help` to see all options. Alternatively, or in conjunction with the above command, customize the Spark configuration files in `./conf` as necessary. 3. **Ensure that the dsgrid application uses the Spark configuration** that you just defined. ```bash $ export SPARK_CONF_DIR=$(pwd)/conf $ export JAVA_HOME=/datasets/images/apache_spark/jdk-21.0.7 ``` 4. **Follow the rest of the sparkctl instructions** for starting the cluster and running jobs. See also: - [Run dsgrid on Kestrel](../how_tos/run_on_kestrel) for complete workflow - [Start Spark Cluster on Kestrel](../how_tos/spark_cluster_on_kestrel) for detailed cluster setup ## Tuning Configuration Settings In general you want to run Spark with as many executors as possible on each worker node. The Amazon orchestration software along with the cluster manager *may* take care of that when running on AWS (you will still have to adjust `spark.sql.shuffle.partitions`). You will have to perform more customizations when running a standalone cluster on your laptop or an HPC. ### Configuration Priority Order These are listed in order of priority - later methods will override the earlier methods when allowed. #### 1. Global Spark Configuration Directory This is `$SPARK_HOME/conf` or `$SPARK_CONF_DIR`. You can customize settings in `spark-defaults.conf` and `spark-env.sh`. Make customizations here if you will use the same settings in all jobs. #### 2. Spark Launch Scripts Use `spark-submit` to run scripts. Use `pyspark` to run interactively. Both scripts offer the same startup options. You can choose to run in local mode or attach to a cluster. You can override any setting from #1. Make changes here if you will use different settings across jobs. :::{note} Some settings must be made before the Spark JVM starts, like `spark.driver.memory`, and so this is your last chance to customize those values. ::: #### 3. SparkSession Construction Inside Python You can customize things like executor settings when you construct the `SparkSession` in Python. For example, this code block will create a session where the job starts a single executor with a single core that uses all available memory. ```python from pyspark import SparkConf from pyspark.sql import SparkSession conf = SparkConf().setAppName("my_app") conf.set("spark.executor.cores", 1) conf.set("spark.executor.memory", "16g") conf.setMaster(cluster) spark = SparkSession.builder.config(conf=conf).getOrCreate() ``` #### 4. Dynamic Changes You can make changes to a limited number of settings at runtime. You can't change the number of executor cores because those have already been allocated. You can change the number of shuffle partitions that Spark will use. You may want to change that value if the sizes of the dataframes you're working on change dramatically. ```python from pyspark.sql import SparkSession spark = SparkSession.getActiveSession() spark.conf.set("spark.sql.shuffle.partitions", 500) ``` ## Creating a SparkSession with dsgrid Ensure that the dsgrid software uses the cluster with optimized settings. If you start the dsgrid Python process with the Spark scripts `spark-submit` or `pyspark` and set the `--master` option, those scripts will create a SparkSession attached to the cluster and pass it to the Python process. You can optionally set the `SPARK_CLUSTER` environment variable to the cluster URL and then dsgrid will connect to it. ```bash $ export SPARK_CLUSTER=spark://$(hostname):7077 ``` Using `SPARK_CLUSTER` is a bit simpler, but you cannot configure settings like `spark.driver.memory`, which, as stated earlier, must be set before the JVM is created. ### Using spark-submit with dsgrid Running dsgrid CLI commands through `spark-submit` requires cumbersome syntax because the tool needs to: 1. Detect that the script is Python (which is why this example uses dsgrid-cli.py instead of dsgrid). 2. Know the full path to the script (accomplished with the utility `which`). Here's how to do that: ```bash $ spark-submit --master spark://$(hostname):7077 \ $(which dsgrid-cli.py) \ query project run \ --registry-path=/scratch/${USER}/.dsgrid-registry \ query.json ``` :::{note} If you want to set a breakpoint in your code for debug purposes, you cannot use spark-submit. ::: ## Troubleshooting Configuration Problems Get used to monitoring Spark jobs in the Spark UI. The master is at `http://:8080` and jobs are at `http://:4040`. If a job seems stuck or slow, explore why. Then kill the job, make config changes, and retry. A misconfigured job will take too long or never finish. This section explains some common problems. ### spark.sql.shuffle.partitions The most common performance issue we encounter when running complex queries is due to a non-ideal setting for `spark.sql.shuffle.partitions`. The default Spark value is 200. Some online sources recommend setting it to 1-4x the total number of CPUs in your cluster. This [video](https://www.youtube.com/watch?v=daXEp4HmS-E&t=4251s) by a Spark developer offers a recommendation that has worked out better. **Use this formula:** ``` num_partitions = max_shuffle_write_size / target_partition_size ``` You will have to run your job once to determine `max_shuffle_write_size`. You can find it on the Spark UI Stages tab in the Shuffle Write column. Your `target_partition_size` should be between 128 - 200 MB. The minimum partitions value should be the total number of cores in the cluster unless you want to leave some cores available for other jobs that may be running simultaneously. ### Running Out of Space in Local Mode on HPC The `/tmp` directory on HPC filesystems is very small. If you run Spark local mode with default settings, it will try to use that directory for scratch space and then quickly fill it up and fail. Set the environment variable `SPARK_LOCAL_DIRS` to an appropriate directory. ```bash $ export SPARK_LOCAL_DIRS=/tmp/scratch ``` The scripts discussed above set this environment variable for standalone clusters on an HPC. ### Dynamic Allocation This feature is disabled by default on standalone clusters. It is described in the [Spark documentation](https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation). We have observed two cases where enabling dynamic allocation on a standalone cluster significantly improves performance: - When a Spark job produces a huge number of tasks. This can happen when checking dimension associations as well as queries that map a dataset to project dimensions. - When there is really only enough work for one executor but Spark distributes it to all executors anyway. The inter-process communication adds tons of overhead. The executors also appear to do a lot more work. With dynamic allocation Spark gradually adds executors and these problems don't always occur. The second issue sometimes occurs when submitting a dataset to a project in the registry. We recommend enabling dynamic allocation when doing this. If that is enabled then dsgrid code will reconfigure the SparkSession to use a single executor with one core. Set these values in your `spark-defaults.conf`. `spark.shuffle.service.enabled` must be set before you start the workers. ``` spark.dynamicAllocation.enabled true spark.dynamicAllocation.shuffleTracking.enabled true spark.shuffle.service.enabled true spark.shuffle.service.db.enabled = true spark.worker.cleanup.enabled = true ``` We have not observed any downside to having this feature enabled. ### Slow Local Storage Spark will write lots of temporary data to local storage during shuffle operations. If your joins cause lots of shuffling, it is very important that your local storage be fast. If you direct Spark to use the Lustre filesystem for local storage, your mileage will vary. If the system is idle, it might work. If it is saturated, your job will likely fail. ### Too Many Executors Involved in a Job Some of our Spark jobs, particularly those that create large, in-memory tables from dimension records and mappings, perform much better when there is only one executor and only one CPU. If you think this is happening then set these values in your `spark-defaults.conf`: ``` spark.executor.cores 1 # This value should be greater than half of the memory allocated to the Spark workers, which # will ensure that Spark can only create one executor. spark.executor.memory 15g ``` One common symptom of this type of problem is that a job run in local mode works better than in a standalone cluster. The likely reason is that the standalone cluster has many executors and local mode only has one. ### Partition Size Problems Spark documentation recommends that Parquet partition files be in the range of 100-200 MiB, with 128 MiB being ideal. A very high compression ratio may change that. This affects how much data each task reads from storage into memory. **Check your partition sizes:** ```bash $ find -name "*.parquet" -exec ls -lh {} + ``` **Check the total size:** ```bash $ du -sh ``` Divide the total size by the target partition size to get the desired number of partitions. **If there are too few partitions currently:** ```python df.repartition(target_partitions).write.parquet("data.parquet") ``` **If there are too many partitions currently:** ```python df.coalesce(target_partitions).write.parquet("data.parquet") ``` **If you are partitioning by a column and find that there are many very small partition files:** ```python df.repartition(column_name).write.partitionBy(column_name).parquet("data.parquet") ``` ### Executors Spilling to Disk You are observing that the Spark job seems stalled: - Most of the work in a stage is complete, but one or two executors are still running. - Only 1-2 CPUs out of the entire cluster show any activity. - You see this log message over and over in the `stderr` log files (e.g., `spark_scratch/workers/app-20250115165825-0006/1/stderr`): ``` 25/07/15 14:24:06 INFO UnsafeExternalSorter: Thread 60 spilling sort data of 4.6 GiB to disk (201 times so far) ``` If this has occurred 201 times over more than an hour, then you might be better off canceling the job and re-running with a different configuration. #### Solutions 1. **Ensure `spark.sql.shuffle.partitions` is set to a reasonable value**, as discussed above. 2. **Increase executor memory.** Kestrel compute nodes have disproportionate CPUs with respect to memory. By default, our configuration scripts will allocate 7 GB of memory per executor and this is insufficient for some queries. The error message indicates that the executor was not able to perform a sort in memory, and so spilled to disk. This is very slow. Try to double or triple the executor memory. You can do this by setting the `spark.executor.memory` value in `spark-defaults.conf`. You can set that value directly by editing the file or indirectly by setting `--executor-cores` to 10 or 15 (default is 5), thereby reducing the number of executors (by assigning each one more of the total available cores) and giving each executor more memory. You can also acquire a bigmem node, which has 2 TB of memory. On these nodes, our Spark scripts will allocate 70 GB of memory per executor. The debug partition on Kestrel usually has two bigmem nodes. 3. **Check for data skew.** This has happened frequently when performing dimension mapping operations that explode data sizes by disaggregating or duplicating data. One or two executors end up with significantly more data than the others, and get stuck. Refer to the next section. ### Data Skew If your query will produce high data skew, such as can happen with a query that produces results from large and small counties, you can use a salting technique to balance the data. For example: ```python df.withColumn("salt_column", F.lit(F.rand() * (num_partitions - 1))) \ .groupBy("county", "salt_column") \ .agg(F.sum("county")) \ .drop("salt_column") ``` dsgrid will employ this technique for specific mapping types or when you enable `handle_data_skew` in a dataset's mapping plan. ## Next Steps - Learn about [running dsgrid on Kestrel](../how_tos/run_on_kestrel) - Set up a [Spark cluster on Kestrel](../how_tos/spark_cluster_on_kestrel) - Understand [software architecture](../../software_reference/architecture) - Explore [query concepts](../project_queries/concepts)