CHPC Parallelization

Parallel (an exercise)

Now, we’ll use the Rscript for using the parallel package Case 2: Single job, multicore job:

Exercise:

  1. Pull the file 02-mclapply.R to your local machine and then send it to hpc.
  2. Copy a version of the slurm script from 01-sapply.slurm to have the name 02-mclapply.slurm (or a similar name) and add the SBATCH option:
#SBATCH --cpus-per-task=4

Also, change the R script to 02-mclapply.R.

  1. Run the results and send it back to your local machine.

Modify by leaving open the number of cores requested

[JC to revisit with question to CHPC]

Threading (chatgpt definition): Threading in a cluster environment is a way to distribute and manage parallel tasks across multiple cores, processors, or even nodes on a computing cluster. In R, threading enables functions to run simultaneously rather than sequentially, making the most of the cluster’s computational resources to speed up processing.

“By default we have turned off multi-threading by setting the environmental variable OMP_NUM_THREADS to 1, i.e. 

setenv OMP_NUM_THREADS 1   # Tcsh/Csh Shell
export OMP_NUM_THREADS=1   # Bash Shell

to facilitate easier use of parallel independent calculations. If you want to run R in a multithreaded fashion (e.g. on a compute node), we strongly recommend not to use more threads than there are physical cores on the node.” (Using R at CHPC)

Running many independent R batch calculations as one job (on Utah’s cluster environment)

https://www.chpc.utah.edu/documentation/software/r-language.php#rmulti

“We mentioned above that both versions of R were built using the multi-threaded MKL library. The thread based parallelization is useful for vectorized R programs, but, not all workflows vectorize. Therefore, if one has many independent calculations to run, it is more efficient to run single threaded R and use SLURM’s capability of running independent calculations within a job in parallel.” (Running many independent R batch calculations as one job)

Single-threaded R on each core

In this approach, you need to create three files: (1) script file, (2) slurm file, (3) a shell (wrapper) file that can execute a job targetting specific cores

Create a new directory called pi-nodes-and-cores:

cd chpc-examples
mkdir pi-nodes-and-cores
cd pi-nodes-and-cores

Create the script file

Copy the script for pi 01-sapply.R into pi-nodes-and-cores with a new name 03-sapply.R:

cp ../pi/01-sapply.R 03-sapply.R

Make the following modifications which we’ll revisit later:

# parameters (objects that will be set in a slurm/shell script)
if(!is.element("seed",ls())) seed <- 12322
if(!is.element("out_dir",ls())) outdir <- "sims"


# Model parameters
nsims <- 1e3
n     <- 1e4

# Function to simulate pi
simpi <- function(i) {

  p <- matrix(runif(n*2, -1, 1), ncol = 2)
  mean(sqrt(rowSums(p^2)) <= 1) * 4

}

# Approximation
# set.seed(12322)
ans <- sapply(1:nsims, simpi)

message("Pi: ", mean(ans))

# Save answer to a file denoted by it's seed
saveRDS(ans, paste0(out_dir,"/s_",seed,".rds"))

Create the slurm file

Create a slurm file

vim nodes-and-cores.sl

Include something similar to the following:

#!/bin/bash
  
#SBATCH --time=00:02:00 
# SBATCH --mail-type=END
# SBATCH --mail-user=jonathan.chipman@hci.utah.edu
#SBATCH -o o_%j 
#SBATCH -e e_%j
# SBATCH -a 1-9
# SBATCH --account=chipman-np
# SBATCH --partition=chipman-np
# SBATCH --account=hci-collab
# SBATCH --partition=hci-kp
#SBATCH --account=owner-guest
#SBATCH --partition=notchpeak-guest
# SBATCH --partition=kingspeak-guest
# SBATCH --account=chipman
# SBATCH --partition=notchpeak-freecycle
# SBATCH --partition=kingspeak
# SBATCH --constraint="m256|m384|m512|m1024" # for KP
# SBATCH --constraint="m256|m384|m512|m768" # for NP
# SBATCH --constraint="rom"
#SBATCH --job-name=n-a-c

# Job Parameters
export EXE=nodes-and-cores.sh
export SCRIPT_FILE=03-sapply.R
export OUT_DIR=sims

# Load R
module load R

# Run an array of serial jobs
export OMP_NUM_THREADS=1

echo " Calculation started at:`date`"
echo " #$SLURM_TASKS_PER_NODE cores detected on `hostname`"

# Create the my.config.$SLURM_JOBID file on the fly
for (( i=0; i < $SLURM_TASKS_PER_NODE ; i++ )); \
 do echo $i $EXE $i $SCRIPT_FILE $OUT_DIR ; \
done > my.config.$UUFSCELL.$SLURM_JOBID

# Running a task on each core
srun --multi-prog my.config.$UUFSCELL.$SLURM_JOBID

# Run aggregate results
# Rscript nodes-and-cores-agg.R

# Clean-up unnessecary files
# rm R_* o_ e_* my*

echo " Calculation ended at:`date`"
  1. SLURM options: I’ve set my slurm file to easily toggle between different slurm options. By putting a space between #SBATCH (i.e., # SBATCH) the command is commented out. If I wished to target the kingspeak-guest partition, I could comment out notchpeak-guest and uncomment kingspeak-guest.

  2. # SBATCH -a 1-9: This is a command for a single job running an “array” of sub-jobs. We’ll revisit this later.

  3. JOB PARAMETERS: Here we’ve created variable names that will be called with creating multiple versions of the script.

  4. my.config.[] files: Here we create a file to target each core on the node, denoted by $UUFSCELL.$SLURM_JOBID.

  5. srun --multi-prog: Schedule slurm to run each configuration file

Create the shell file

vim nodes-and-cores.sh
#!/bin/bash 
  
# NOTES ON COMMAND LINE VARIABLES:
#   EXE         : rwapper.sh 
#   TASK_ID     : Id of the task
#   SCRIPT_FILE : Script is identical for each task
#   OUT_DIR     : EACH task has its own output directory

# Retrieve variable from the command line
EXE=$0          # Sets EXE to the script's name (the name with which it was called)
TASK_ID=$1      # Gets the first argument passed to the script (task ID)
SCRIPT_FILE=$2  # Gets the second argument passed to the script (the script file name)
OUT_DIR=$3      # Gets the third argument passed to the script (where to save the output)

if [ "$#" -ne 3 ] ; then
     echo "  ERROR: Command line needs 3 parameters (refer to slurm file for-loop, 3 parameters after $EXE)"
     echo "  Current arg list: $@"
else
     echo "  TaskID:$TASK_ID started at `date`"

     seed=`date +%s`${TASK_ID}
     Rfile=R_${SLURM_JOBID}_${TASK_ID}'.R'

     echo 'seed    <- '"${seed: -7}"  >  $Rfile
     echo 'out_dir <- paste0("'${OUT_DIR}'","/")' >> $Rfile
     cat ${SCRIPT_FILE}                  >> $Rfile
     chmod +x $Rfile

     # Run file
     Rscript $Rfile # > $TASK_ID.out 2>&1

     echo "  TaskID:$TASK_ID ended at `date`"
fi
  1. Bash if-else: Checks the number of input parameters (set in the slurm for-loop) and stops if different than expected (in this case 3 parameters)

  2. seed: Uses a combination of today’s date (time in seconds since Jan 1, 1970) and the task_id (an id for each iteration)

  3. Rfile: Creates a unique file name for each iteration

  4. Creating the R file: The first echo statements print to the new file Rfile and the script file is appended at the end. … Going back, these lines are created before the first few lines in 03-sapply.R. Hence, 03-sapply.R can create these variables even if not previously created (as done here).

  5. chmod +x: Ensures the owner has executable permissions for the file (so that the file can run).

  6. Rscript: Runs the file

Run the file and watch

Stepping back: Though that set up looks like alot, most of it can be simply modified for a given script and key simulation parameters.

Now, let’s run the slurm file and see what happens:

sbatch nodes-and-cores.sl
watch sq

At any point run ctrl+c to get out of watch sq. Look at the files created:

ls
vim my.[file]
vim R_[file]
vim o_[file]
vim e_[file]
cd sims
ls

Count the number of files created in sims (for each file there are 1K replicates):

ls | wc -l

Now, we need a file to aggregate results. In this case, the results are a vector, which can combined as below. With more complex output, I often find it most helpful to use ondemand to open RStudio and write a script that aggregates results.

vim nodes-and-cores-agg.R 
# List all .rds files in the directory
file_paths <- list.files(path = "sims", full.names = TRUE)

# Read and combine into a single vector
ests <- unlist(lapply(file_paths, readRDS))

save(ests, file="03-pi-ests.rds")

At this point, you can transfer the results back to your local machine!

Ramping up using arrays and adding new options

A job array performs the same slurm job on multiple iterations.

Go back to the slurm file, uncomment the array option, and run again sbatch.

vim nodes-and-cores.sl

Uncommenting the array option will now run 9 times the number of replicates as before.

#SBATCH -a 1-9
watch sq

Now, check how many simulation iterations have been saved:

cd sims
ls
ls | wc -l

View job details

Calling sacct can show job details (though the default printing can be busy):

sacct -j [job_number]

Here the sa alias can be helpful:

sa [job_number]

Modifying the slurm call to delete temporary files and to aggregate results

Very quickly the number of files in your directory can grow.

You can manually delete undesired files:

rm o_* e_* R_* my*

And/or, you can add these into your slurm script

vim nodes-and-cores.sl

Uncomment the line:

rm R_* o_ e_* my*

You could also uncomment the line to aggregate the files

Rscript nodes-and-cores-agg.R

/scratch/local

See Utah’s storage policies.

For temporary calculations that will be re-aggregated, consider using the scratch directory. These files will be deleted when not touched for 60 days.

Create a new directory called pi-nodes-and-cores-scratch and copy files from pi-nodes-and-cores

mkdir pi-nodes-and-cores-scratch
cd pi-nodes-and-cores-scratch
cp ../pi-nodes-and-cores/* .

Modify the slurm script as:

#SBATCH -e e_%j
#SBATCH -a 1-9
# SBATCH --account=chipman-np
# SBATCH --partition=chipman-np
# SBATCH --account=hci-collab
# SBATCH --partition=hci-kp
#SBATCH --account=owner-guest
#SBATCH --partition=notchpeak-guest
# SBATCH --partition=kingspeak-guest
# SBATCH --account=chipman
# SBATCH --partition=notchpeak-freecycle
# SBATCH --partition=kingspeak
# SBATCH --constraint="m256|m384|m512|m1024" # for KP
# SBATCH --constraint="m256|m384|m512|m768" # for NP
# SBATCH --constraint="rom"
#SBATCH --job-name=n-a-c

#set up scratch directory
HERE=$PWD
SCRDIR=/scratch/local/$USER/$SLURM_JOB_ID
mkdir -p $SCRDIR
mkdir -p $SCRDIR/sims

#copy input files and move over to the scratch directory
cp 03-sapply.R nodes-and-cores.sh nodes-and-cores-agg.R $SCRDIR
cd $SCRDIR

# Job Parameters
export EXE=nodes-and-cores.sh
export SCRIPT_FILE=03-sapply.R
export OUT_DIR=$SCRDIR/sims

# Load R
module load R

# Run an array of serial jobs
export OMP_NUM_THREADS=1

echo " Calculation started at:`date`"
echo " #$SLURM_TASKS_PER_NODE cores detected on `hostname`"

# Create the my.config.$SLURM_JOBID file on the fly
for (( i=0; i < $SLURM_TASKS_PER_NODE ; i++ )); \
 do echo $i $EXE $i $SCRIPT_FILE $OUT_DIR ; \
done > my.config.$UUFSCELL.$SLURM_JOBID

# Running a task on each core
srun --multi-prog my.config.$UUFSCELL.$SLURM_JOBID

# Run aggregate results
Rscript nodes-and-cores-agg.R


#copy output to your home directory and clean up
cp 03-pi-ests.rds $HERE
cd $HERE
rm -rf $SCRDIR

# Clean-up unnessecary files
rm R_* o_* e_* my*

echo " Calculation ended at:`date`"

Note the additions of:

  1. Creating a new directory on scratch:

HERE=\(PWD SCRDIR=/scratch/local/\)USER/$SLURM_JOB_ID mkdir -p $SCRDIR mkdir -p $SCRDIR/sims

  • HERE creates a pointer to your current working directory
  • SCRDIR creates a point to the scratch directory that will be created using mkdir.
  • sims is created within $SCRDIR to maintain the previous directory set-up
  1. Copying files to $SCRDIR and moving to $SCRDIR:

cp 03-sapply.R nodes-and-cores.sh nodes-and-cores-agg.R $SCRDIR cd $SCRDIR

  1. Aggregate: we’ve uncommented out the aggregate Rscript and copy the results back to $HERE.

  2. cd back to $HERE and removed files in $SRCDIR.

  3. Cleaned up any unwanted temporary files.

What resources are available on the cluster

To see the details of each node on the cluster, use the command:

sinfo

The aliases created last week can be used to see more information:

si
siMem

slumR

The above strategy uses all cores for each core requested. This may be good if accessing a node provides access to all cores (all or nothing).

Another strategy is to request cores across sharable nodes. This is superior when the nodes being accessed are sharable. slurm can smartly allocate each job to where there are available cores.

Here, we’ll use the slurmR package and follow the example here.

Creating the R script and submitting the slurm call

Create a new directory:

cd ~/chpc-examples
mkdir pi-slurmr

Copy and rename 02-mclapply.R and 02-mclapply.slurm

cp ../pi-mclapply/02-mclapply.R 04-slurmr_sapply.R
cp ../pi-mclapply/02-mclapply.slurm 04-slurmr_sapply.slurm

Install slurmR by doing the following:

module load R
R
install.packages("slurmR", repos = "https://cloud.r-project.org")

Modify 04-slurmr_sapply.R similar to here.

# Model parameters
nsims  <- 1e3
n      <- 1e4
# ncores <- 4L
njobs <- 4

# Function to simulate pi
simpi <- function(i,n.) {

  p <- matrix(runif(n.*2, -1, 1), ncol = 2)
  mean(sqrt(rowSums(p^2)) <= 1) * 4

}

# Approximation
# set.seed(12322)i

library(slurmR)

slurmR::opts_slurmR$set_opts(partition="notchpeak-guest", account="owner-guest")

ans <- slurmR::Slurm_sapply(1:nsims, simpi, n. = n, njobs=njobs, plan="collect")

message("Pi: ", mean(ans))

saveRDS(ans, "04-mclapply.rds")

Note the following:

  1. It is important to include the slurmR::opts_slurmR$set_opts(partition="notchpeak-guest", account="owner-guest") parameters. The Utah CHPC will not run an sbatch job without these parameters.

  2. simpi has a new input, n., and n. should be updated in p <- matrix(runif(n.*2, -1, 1), ncol = 2)

  3. Be sure to remove former calls to mclapply.

Update the slurm script as:

#!/bin/sh
#SBATCH --job-name=sapply
#SBATCH --time=00:10:00
#SBATCH --account=owner-guest
#SBATCH --partition=notchpeak-guest
#SBATCH --output o_%j 
#SBATCH --error e_%j
module load R
Rscript --vanilla 04-slurm_sapply.R

Be sure to remove the cpus-per-task (slurmR will take care of this).

Call the slurm script

sbatch 04-slurm_sapply.slurm
watch sq

Nice features of slurmR

slurmR has some appealing features, some of which include the ability to re-run array iterations that did not complete, the ability to target a specific number of iterations, and the aggregation of results.

slurmR can also be run from within R.

For more information see:

Disclaimer: slurmR could possibly be optimzed beyond the example shown above.