aton.api.slurm

Description

Functions to handle Slurm calls, to run calculations in clusters.

Index

sbatch() Sbatch all calculations
squeue() Get a Pandas DataFrame with info about the submitted calculations
scancel() Scancel all calculations, optionally applying some filters
scancel_errors() Scancel calculations with specific errors
scancel_here() Scancel all calculations running from a specific folder
get_running_here() Get a list with all jobs running in a given folder
check_template() Checks that the slurm template is OK, and provides an example if not

  1"""
  2# Description
  3
  4Functions to handle Slurm calls, to run calculations in clusters.  
  5
  6
  7# Index
  8
  9| | |
 10| --- | --- |
 11| `sbatch()`           | Sbatch all calculations |
 12| `squeue()`           | Get a Pandas DataFrame with info about the submitted calculations |
 13| `scancel()`          | Scancel all calculations, or applying some filters |
 14| `scancel_errors()`   | Scancel calculations with specific errors |
 15| `scancel_here()`     | Scancel all calculations running from a specific folder |
 16| `get_running_here()` | Get a list with all jobs running in a given folder |
 17| `check_template()`   | Checks that the slurm template is OK, and provides an example if not |
 18
 19---
 20"""
 21
 22
 23import os
 24import pandas as pd
 25import aton.call as call
 26import aton.file as file
 27import aton.txt.find as find
 28import aton.txt.edit as edit
 29from aton._version import __version__
 30
 31
 32def sbatch(
 33        prefix:str='',
 34        template:str='template.slurm',
 35        in_ext:str='.in',
 36        out_ext:str='.out',
 37        folder=None,
 38        files:list=[],
 39        testing:bool=False,
 40    ) -> None:
 41    """Sbatch all the calculations at once.
 42
 43    Calculation names should follow `prefix_ID.ext`,
 44    with `prefix` as the common name across calculations,
 45    followed by the calculation ID, used as JOB_NAME.
 46    The extensions from `in_ext` and `out_ext` ('.in' and '.out' by default)
 47    will be used for the INPUT and OUTPUT filenames of the slurm template.
 48
 49    The slurm template, `template.slurm` by default,
 50    must contain the keywords `JOBNAME`, `INPUT` and `OUTPUT`:
 51    ```
 52    #SBATCH --job-name=JOBNAME
 53    srun --cpu_bind=cores pw.x -inp INPUT > OUTPUT
 54    ```
 55
 56    Runs from the specified `folder`, current working directory if empty.
 57
 58    If more control is required, a custom list of `files` can be specified for sbatching.
 59
 60    If `testing = True` it skips the final sbatching,
 61    just printing the commands on the screen.
 62    """
 63    print('Sbatching all calculations...\n')
 64    key_input = 'INPUT'
 65    key_output = 'OUTPUT'
 66    key_jobname = 'JOBNAME'
 67    slurm_folder = 'slurms'
 68    folder = call.here(folder)
 69    # Get input files and abort if not found
 70    if not files:
 71        inputs_raw = file.get_list(folder=folder, include=prefix, abspath=False)
 72    else:
 73        inputs_raw = files
 74    inputs = []
 75    for filename in inputs_raw:
 76        if filename.endswith(in_ext):
 77            inputs.append(filename)
 78    if len(inputs) == 0:
 79        raise FileNotFoundError(f"Input files were not found! Expected {prefix}ID.{in_ext}")
 80    # Make the folder for the sbatch'ed slurm files
 81    call.bash(f"mkdir {slurm_folder}", folder, True, True)
 82    # Get the template
 83    slurm_file = check_template(template, folder)
 84    if not slurm_file:
 85        print(f'Aborting... Please correct {template}\n')
 86        return None
 87    for filename in inputs:
 88        # Get the file ID
 89        basename: str = os.path.basename(filename)
 90        basename_out: str = basename.replace(in_ext, out_ext)
 91        calc_id = basename.replace(prefix, '')
 92        calc_id = calc_id.replace(in_ext, '')
 93        calc_id = calc_id.replace('_', '')
 94        calc_id = calc_id.replace('-', '')
 95        calc_id = calc_id.replace('.', '')
 96        # Create slurm file for this supercell
 97        slurm_id = prefix + calc_id + '.slurm'
 98        # fixing dictionary with the words to replace in the template
 99        fixing_dict = {
100            key_jobname: calc_id,
101            key_input: basename,
102            key_output: basename_out
103        }
104        edit.from_template(slurm_file, slurm_id, fixing_dict)
105        if testing:
106            call.bash(f"echo {slurm_id}", folder)
107        else:
108            call.bash(f"sbatch {slurm_id}", folder, True, False)
109        call.bash(f"mv {slurm_id} {slurm_folder}", folder, False, True)  # Do not raise error if we can't move the file
110    print(f'Done! Temporary slurm files were moved to ./{slurm_folder}/\n')
111
112
def squeue(user) -> pd.DataFrame:
    """Return a Pandas DataFrame with the queued jobs of a specific `user`."""
    output = call.bash(command=f'squeue -u {user}', verbose=False).stdout
    rows = output.strip().split('\n')
    header = rows[0].split()
    body = [row.split() for row in rows[1:]]
    return pd.DataFrame(body, columns=header)
121
122
def scancel(
        user:str,
        text:str='',
        status:str='',
        jobs:list=None,
        key_jobid:str='JOBID',
        key_name:str='NAME',
        key_status:str='ST',
        testing:bool=False,
        ) -> None:
    """Cancel all `user` jobs.

    If a particular `text` string is provided,
    only the calculations containing said string in the name will be deleted.

    If a particular `status` string is provided,
    only the calculations with said status will be cancelled.

    If a list of `jobs` is provided, those JOBIDs will be cancelled.

    These filters can all be combined to provide strict control.

    If `testing = True`, it shows the calculations that would be deleted.

    If the slurm squeue titles are different in your cluster,
    you can specify them with `key_jobid`, `key_status` and `key_name`.
    """
    df = squeue(user)
    if testing:
        print('aton.api.slurm.scancel(testing=True):')
        print(f'The following calculations would be killed for the user {user}')
        print(f'{key_jobid}   {key_status}   {key_name}')
    # Normalize the JOBID filter once, outside the loop.
    # `jobs` defaults to None instead of a mutable [] default.
    jobs = [str(j).strip() for j in jobs] if jobs else []
    jobid_list = df[key_jobid].tolist()
    name_list = df[key_name].tolist()
    status_list = df[key_status].tolist()
    for jobid, name, st in zip(jobid_list, name_list, status_list):
        # Delete if both filters pass. Note that an empty `text`
        # is a substring of every name, so it matches everything.
        will_delete: bool = (status == '' or status == st) and text in name
        if jobs:
            will_delete = will_delete and jobid in jobs
        if will_delete:
            if testing:
                print(f'{jobid}   {st}   {name}')
            else:
                call.bash(f'scancel {jobid}')
176
177
def scancel_errors(
        user:str='',
        errors=None,
        folder=None,
        prefix:str='slurm-',
        sufix:str='.out',
        key_jobid:str='JOBID',
        testing=False,
        ) -> None:
    """Cancel all running jobs with any matching `errors` in a given `folder`.

    By default it matches Out Of Memory errors (OOM).
    The jobs will be detected from the `<prefix>JOBID<sufix>` files, `slurm-JOBID.out` by default.

    If an `user` name is provided, it additionally checks if the jobs are actually scheduled
    to avoid reporting or killing jobs that have already finished.
    If the slurm squeue 'JOBID' title is different in your cluster,
    change it with `key_jobid`.

    If `testing=True`, it will only print the jobs that would be cancelled.
    """
    # `errors` defaults to None instead of a mutable list default,
    # which would be shared across calls.
    if errors is None:
        errors = ['oom_killed', 'OOM Killed', 'Out Of Memory']
    elif not isinstance(errors, list):
        errors = [str(errors)]
    filenames = get_running_here(user=user, folder=folder, prefix=prefix, sufix=sufix, key_jobid=key_jobid, get_files=True)
    # Find errors in the slurm files
    filenames_to_cancel = []
    for f in filenames:
        # Filenames are relative (abspath=False upstream), so search inside
        # `folder` when one was given; previously the relative name was
        # searched against the CWD, missing files in other folders.
        filepath = os.path.join(folder, f) if folder else f
        if any(find.lines(filepath=filepath, key=e) for e in errors):
            filenames_to_cancel.append(f)
    # Convert filenames to JOBIDs
    jobs_to_cancel = [f.replace(prefix, '').replace(sufix, '') for f in filenames_to_cancel]
    # Report or scancel
    if testing:
        print('The following jobs would be cancelled:')
        for job in jobs_to_cancel:
            print(f'{job}')
        return None
    for job in jobs_to_cancel:
        call.bash(f'scancel {job}', folder)
    return None
227
228
def scancel_here(
        user:str='',
        folder=None,
        prefix:str='slurm-',
        sufix:str='.out',
        key_jobid:str='JOBID',
        testing:bool=False,
        ) -> None:
    """Cancel every running job detected in a given `folder`.

    Jobs are detected from the `<prefix>JOBID<sufix>` files (`slurm-JOBID.out` by default).

    Providing an `user` name additionally checks that the jobs are still scheduled,
    so already-finished jobs are neither reported nor killed.
    Set `key_jobid` if the squeue 'JOBID' title is different in your cluster.

    With `testing=True`, the jobs that would be cancelled are only printed.
    """
    detected = get_running_here(user=user, folder=folder, prefix=prefix, sufix=sufix, key_jobid=key_jobid, get_files=False)
    if testing:
        print('The following jobs would be cancelled:')
        for jobid in detected:
            print(f'{jobid}')
    else:
        for jobid in detected:
            call.bash(f'scancel {jobid}', folder)
    return None
257
258
def get_running_here(
        user:str='',
        folder=None,
        prefix:str='slurm-',
        sufix:str='.out',
        key_jobid:str='JOBID',
        get_files=False
        ) -> list:
    """Return a list with all running `jobs` in a given `folder`, or current working directory if not specified.

    Jobs are detected from the `<prefix>JOBID<sufix>` files, `slurm-JOBID.out` by default.

    Providing an `user` name additionally filters out jobs that are no longer
    scheduled, by checking the squeue output.
    Set `key_jobid` if the squeue 'JOBID' title is different in your cluster.

    With `get_files=True`, the matching filenames are returned instead of the JOBIDs.
    """
    filenames = file.get_list(folder=folder, include=prefix, abspath=False)
    if not filenames:
        raise FileNotFoundError(f'To detect the calculations, {prefix}JOBID{sufix} files are needed!\nConfigure the folder, as well as the prefix and sufix if necessary.')
    # Strip prefix and sufix to obtain the JOBIDs
    jobs = [name.replace(prefix, '').replace(sufix, '') for name in filenames]
    if user:
        # Keep only the jobs still present in the user's queue
        scheduled = squeue(user)[key_jobid].tolist()
        jobs = [j for j in jobs if j in scheduled]
        if get_files:
            return [f for f in filenames if any(j in f for j in jobs)]
    if get_files:
        return filenames
    return jobs
297
298
def check_template(
        template:str='template.slurm',
        folder=None,
    ) -> str:
    """Check the slurm `template` inside `folder`, to be used by `sbatch()`.

    The current working directory is used if `folder` is not provided.
    If the file does not exist or is invalid, creates a `template_EXAMPLE.slurm`
    file for reference and returns None; otherwise returns the template path.
    """
    folder = call.here(folder)
    slurm_example = 'template_EXAMPLE.slurm'
    new_slurm_file = os.path.join(folder, slurm_example)
    # Default slurm template.
    # The shebang must be the very first line: sbatch requires scripts to
    # start with '#!', and interpreters ignore a shebang on any later line.
    content =f"""#!/bin/bash
# Automatic slurm template created with ATON {__version__}
# https://pablogila.github.io/aton

#SBATCH --partition=general
#SBATCH --qos=regular
#SBATCH --job-name=JOBNAME
#SBATCH --ntasks=16
#SBATCH --time=12:00:00
#SBATCH --mem=64G

module purge
module load QuantumESPRESSO/7.3-foss-2023a

srun --cpu_bind=cores pw.x -inp INPUT > OUTPUT
"""
    # If the slurm template does not exist, create an example and abort
    slurm_file = file.get(folder, template, return_anyway=True)
    if not slurm_file:
        with open(new_slurm_file, 'w') as f:
            f.write(content)
        print(f'!!! WARNING:  Slurm template missing, an example was generated automatically:\n'
              f'{slurm_example}\n'
              f'PLEASE CHECK it, UPDATE it and RENAME it to {template}\n'
              'before using aton.api.slurm.sbatch()\n')
        return None
    # Check that the slurm file contains the INPUT, OUTPUT and JOBNAME keywords
    key_input = find.lines(slurm_file, 'INPUT')
    key_output = find.lines(slurm_file, 'OUTPUT')
    key_jobname = find.lines(slurm_file, 'JOBNAME')
    missing = []
    if not key_input:
        missing.append('INPUT')
    if not key_output:
        missing.append('OUTPUT')
    if not key_jobname:
        missing.append('JOBNAME')
    if len(missing) > 0:
        with open(new_slurm_file, 'w') as f:
            f.write(content)
        print('!!! WARNING:  Some keywords were missing from your slurm template,\n'
              f'PLEASE CHECK the example at {slurm_example}\n'
              'before using aton.api.slurm.sbatch()\n'
              f'The following keywords were missing from your {template}:')
        for key in missing:
            print(key)
        print('')
        return None
    return slurm_file  # Ready to use!
def sbatch( prefix: str = '', template: str = 'template.slurm', in_ext: str = '.in', out_ext: str = '.out', folder=None, files: list = [], testing: bool = False) -> None:
 33def sbatch(
 34        prefix:str='',
 35        template:str='template.slurm',
 36        in_ext:str='.in',
 37        out_ext:str='.out',
 38        folder=None,
 39        files:list=[],
 40        testing:bool=False,
 41    ) -> None:
 42    """Sbatch all the calculations at once.
 43
 44    Calculation names should follow `prefix_ID.ext`,
 45    with `prefix` as the common name across calculations,
 46    followed by the calculation ID, used as JOB_NAME.
 47    The extensions from `in_ext` and `out_ext` ('.in' and '.out' by default)
 48    will be used for the INPUT and OUTPUT filenames of the slurm template.
 49
 50    The slurm template, `template.slurm` by default,
 51    must contain the keywords `JOBNAME`, `INPUT` and `OUTPUT`:
 52    ```
 53    #SBATCH --job-name=JOBNAME
 54    srun --cpu_bind=cores pw.x -inp INPUT > OUTPUT
 55    ```
 56
 57    Runs from the specified `folder`, current working directory if empty.
 58
 59    If more control is required, a custom list of `files` can be specified for sbatching.
 60
 61    If `testing = True` it skips the final sbatching,
 62    just printing the commands on the screen.
 63    """
 64    print('Sbatching all calculations...\n')
 65    key_input = 'INPUT'
 66    key_output = 'OUTPUT'
 67    key_jobname = 'JOBNAME'
 68    slurm_folder = 'slurms'
 69    folder = call.here(folder)
 70    # Get input files and abort if not found
 71    if not files:
 72        inputs_raw = file.get_list(folder=folder, include=prefix, abspath=False)
 73    else:
 74        inputs_raw = files
 75    inputs = []
 76    for filename in inputs_raw:
 77        if filename.endswith(in_ext):
 78            inputs.append(filename)
 79    if len(inputs) == 0:
 80        raise FileNotFoundError(f"Input files were not found! Expected {prefix}ID.{in_ext}")
 81    # Make the folder for the sbatch'ed slurm files
 82    call.bash(f"mkdir {slurm_folder}", folder, True, True)
 83    # Get the template
 84    slurm_file = check_template(template, folder)
 85    if not slurm_file:
 86        print(f'Aborting... Please correct {template}\n')
 87        return None
 88    for filename in inputs:
 89        # Get the file ID
 90        basename: str = os.path.basename(filename)
 91        basename_out: str = basename.replace(in_ext, out_ext)
 92        calc_id = basename.replace(prefix, '')
 93        calc_id = calc_id.replace(in_ext, '')
 94        calc_id = calc_id.replace('_', '')
 95        calc_id = calc_id.replace('-', '')
 96        calc_id = calc_id.replace('.', '')
 97        # Create slurm file for this supercell
 98        slurm_id = prefix + calc_id + '.slurm'
 99        # fixing dictionary with the words to replace in the template
100        fixing_dict = {
101            key_jobname: calc_id,
102            key_input: basename,
103            key_output: basename_out
104        }
105        edit.from_template(slurm_file, slurm_id, fixing_dict)
106        if testing:
107            call.bash(f"echo {slurm_id}", folder)
108        else:
109            call.bash(f"sbatch {slurm_id}", folder, True, False)
110        call.bash(f"mv {slurm_id} {slurm_folder}", folder, False, True)  # Do not raise error if we can't move the file
111    print(f'Done! Temporary slurm files were moved to ./{slurm_folder}/\n')

Sbatch all the calculations at once.

Calculation names should follow prefix_ID.ext, with prefix as the common name across calculations, followed by the calculation ID, used as JOB_NAME. The extensions from in_ext and out_ext ('.in' and '.out' by default) will be used for the INPUT and OUTPUT filenames of the slurm template.

The slurm template, template.slurm by default, must contain the keywords JOBNAME, INPUT and OUTPUT:

#SBATCH --job-name=JOBNAME
srun --cpu_bind=cores pw.x -inp INPUT > OUTPUT

Runs from the specified folder, current working directory if empty.

If more control is required, a custom list of files can be specified for sbatching.

If testing = True it skips the final sbatching, just printing the commands on the screen.

def squeue(user) -> pandas.core.frame.DataFrame:
114def squeue(user) -> pd.DataFrame:
115    """Returns a Pandas DataFrame with the jobs from a specific `user`"""
116    result = call.bash(command=f'squeue -u {user}', verbose=False)
117    data = result.stdout
118    lines = data.strip().split('\n')
119    data_rows = [line.split() for line in lines[1:]]
120    df = pd.DataFrame(data_rows, columns=lines[0].split())
121    return df

Returns a Pandas DataFrame with the jobs from a specific user

def scancel( user: str, text: str = '', status: str = '', jobs: list = [], key_jobid: str = 'JOBID', key_name: str = 'NAME', key_status: str = 'ST', testing: bool = False) -> None:
124def scancel(
125        user:str,
126        text:str='',
127        status:str='',
128        jobs:list=[],
129        key_jobid:str='JOBID',
130        key_name:str='NAME',
131        key_status:str='ST',
132        testing:bool=False,
133        ) -> None:
134    """Cancel all `user` jobs.
135
136    If a particular `text` string is provided,
137    only the calculations containing said string in the name will be deleted.
138
139    If a particular `status` string is provided,
140    only the calculations with said status will be cancelled.
141
142    If a list of `jobs` is provided, those JOBIDs will be cancelled.
143
144    These filters can all be combined to provide strict control.
145
146    If `testing = True`, it shows the calculations that would be deleted.
147
148    if the slurm squeue titles are different in your cluster,
149    you can specify them with `key_jobid`, `key_status` and `key_name`.
150    """
151    df = squeue(user)
152    if testing:
153        print('aton.api.slurm.scancel(testing=True):')
154        print(f'The following calculations would be killed for the user {user}')
155        print(f'{key_jobid}   {key_status}   {key_name}')
156    jobid_list = df[key_jobid].tolist()
157    name_list = df[key_name].tolist()
158    status_list = df[key_status].tolist()
159    for i, jobid in enumerate(jobid_list):
160        name = name_list[i]
161        st = status_list[i]
162        job = jobid_list[i]
163        # Should we delete this process?
164        bool_1: bool = status == '' and text == ''
165        bool_2: bool = status == st and text == ''
166        bool_3: bool = status == '' and text in name
167        bool_4: bool = status == st and text in name
168        will_delete: bool = bool_1 or bool_2 or bool_3 or bool_4
169        if jobs:
170            jobs = [str(i).strip() for i in jobs]
171            will_delete = will_delete and job in jobs
172        if will_delete:
173            if testing:
174                print(f'{jobid}   {st}   {name}')
175            else:
176                call.bash(f'scancel {jobid}')

Cancel all user jobs.

If a particular text string is provided, only the calculations containing said string in the name will be deleted.

If a particular status string is provided, only the calculations with said status will be cancelled.

If a list of jobs is provided, those JOBIDs will be cancelled.

These filters can all be combined to provide strict control.

If testing = True, it shows the calculations that would be deleted.

If the slurm squeue titles are different in your cluster, you can specify them with key_jobid, key_status and key_name.

def scancel_errors( user: str = '', errors=['oom_killed', 'OOM Killed', 'Out Of Memory'], folder=None, prefix: str = 'slurm-', sufix: str = '.out', key_jobid: str = 'JOBID', testing=False) -> None:
179def scancel_errors(
180        user:str='',
181        errors=['oom_killed', 'OOM Killed', 'Out Of Memory'],
182        folder=None,
183        prefix:str='slurm-',
184        sufix:str='.out',
185        key_jobid:str='JOBID',
186        testing=False,
187        ) -> None:
188    """Cancel all running jobs with any matching `errors` in a given `folder`.
189
190    By default it matches Out Of Memory errors (OOM).
191    The jobs will be detected from the `<prefix>JOBID<sufix>` files, `slurm-JOBID.out` by default.
192
193    If an `user` name is provided, it additionally checks if the jobs are actually scheduled
194    to avoid reporting or killing jobs that have already finished.
195    If the slurm squeue 'JOBID' title is different in your cluster,
196    change it with `key_jobid`.
197
198    If `testing=True`, it will only print the jobs that would be cancelled.
199    """
200    filenames = get_running_here(user=user, folder=folder, prefix=prefix, sufix=sufix, key_jobid=key_jobid, get_files=True)
201    filenames_to_cancel = []
202    jobs_to_cancel = []
203    # Ensure errors is a list
204    if not isinstance(errors, list):
205        errors = [str(errors)]
206    # Find errors in the slurm files
207    for f in filenames:
208        matches = []
209        for e in errors:
210            m = find.lines(filepath=f, key=e)
211            matches.extend(m)
212        if matches:
213            filenames_to_cancel.append(f)
214    # Convert filenames to JOBIDs
215    for f in filenames_to_cancel:
216        f = f.replace(prefix, '')
217        f = f.replace(sufix, '')
218        jobs_to_cancel.append(f)
219    # Report or scancel
220    if testing:
221        print('The following jobs would be cancelled:')
222        for job in jobs_to_cancel:
223            print(f'{job}')
224        return None
225    for job in jobs_to_cancel:
226        call.bash(f'scancel {job}', folder)
227    return None

Cancel all running jobs with any matching errors in a given folder.

By default it matches Out Of Memory errors (OOM). The jobs will be detected from the <prefix>JOBID<sufix> files, slurm-JOBID.out by default.

If a user name is provided, it additionally checks if the jobs are actually scheduled to avoid reporting or killing jobs that have already finished. If the slurm squeue 'JOBID' title is different in your cluster, change it with key_jobid.

If testing=True, it will only print the jobs that would be cancelled.

def scancel_here( user: str = '', folder=None, prefix: str = 'slurm-', sufix: str = '.out', key_jobid: str = 'JOBID', testing: bool = False) -> None:
230def scancel_here(
231        user:str='',
232        folder=None,
233        prefix:str='slurm-',
234        sufix:str='.out',
235        key_jobid:str='JOBID',
236        testing:bool=False,
237        ) -> None:
238    """Cancel all running jobs in a given `folder`.
239
240    The jobs will be detected from the `<prefix>JOBID<sufix>` files, `slurm-JOBID.out` by default.
241
242    If an `user` name is provided, it additionally checks if the jobs are actually scheduled
243    to avoid reporting or killing jobs that have already finished.
244    If the slurm squeue 'JOBID' title is different in your cluster,
245    change it with `key_jobid`.
246
247    If `testing=True`, it will only print the jobs that would be cancelled.
248    """
249    jobs_to_cancel = get_running_here(user=user, folder=folder, prefix=prefix, sufix=sufix, key_jobid=key_jobid, get_files=False)
250    if testing:
251        print('The following jobs would be cancelled:')
252        for job in jobs_to_cancel:
253            print(f'{job}')
254        return None
255    for job in jobs_to_cancel:
256        call.bash(f'scancel {job}', folder)
257    return None

Cancel all running jobs in a given folder.

The jobs will be detected from the <prefix>JOBID<sufix> files, slurm-JOBID.out by default.

If a user name is provided, it additionally checks if the jobs are actually scheduled to avoid reporting or killing jobs that have already finished. If the slurm squeue 'JOBID' title is different in your cluster, change it with key_jobid.

If testing=True, it will only print the jobs that would be cancelled.

def get_running_here( user: str = '', folder=None, prefix: str = 'slurm-', sufix: str = '.out', key_jobid: str = 'JOBID', get_files=False) -> list:
260def get_running_here(
261        user:str='',
262        folder=None,
263        prefix:str='slurm-',
264        sufix:str='.out',
265        key_jobid:str='JOBID',
266        get_files=False
267        ) -> list:
268    """Return a list with all running `jobs` in a given `folder`, or current working directory if not specified.
269
270    The jobs will be detected from the `<prefix>JOBID<sufix>` files, `slurm-JOBID.out` by default.
271
272    If an `user` name is provided, it additionally checks if the jobs are actually scheduled
273    to avoid reporting or killing jobs that have already finished.
274    If the slurm squeue 'JOBID' title is different in your cluster,
275    change it with `key_jobid`.
276
277    If `get_files=True`, a list with the filenames will be returned instead of the JOBIDs.
278    """
279    filenames = file.get_list(folder=folder, include=prefix, abspath=False)
280    if not filenames:
281        raise FileNotFoundError(f'To detect the calculations, {prefix}JOBID{sufix} files are needed!\nConfigure the folder, as well as the prefix and sufix if necessary.')
282    jobs = []
283    for filename in filenames:
284        filename = filename.replace(prefix, '')
285        filename = filename.replace(sufix, '')
286        jobs.append(filename)
287    # Check if the jobs are actually scheduled
288    if user:
289        df = squeue(user)
290        jobid_list = df[key_jobid].tolist()
291        jobs = [job for job in jobs if job in jobid_list]
292    # Return the filenames if specified
293    if get_files:
294        if user:
295            filenames = [f for f in filenames if any(job in f for job in jobs)]
296        return filenames
297    return jobs

Return a list with all running jobs in a given folder, or current working directory if not specified.

The jobs will be detected from the <prefix>JOBID<sufix> files, slurm-JOBID.out by default.

If a user name is provided, it additionally checks if the jobs are actually scheduled to avoid reporting or killing jobs that have already finished. If the slurm squeue 'JOBID' title is different in your cluster, change it with key_jobid.

If get_files=True, a list with the filenames will be returned instead of the JOBIDs.

def check_template(template: str = 'template.slurm', folder=None) -> str:
300def check_template(
301        template:str='template.slurm',
302        folder=None,
303    ) -> str:
304    """Check the slurm `template` inside `folder`, to be used by `sbatch()`.
305
306    The current working directory is used if `folder` is not provided.
307    If the file does not exist or is invalid, creates a `template_EXAMPLE.slurm` file for reference.
308    """
309    folder = call.here(folder)
310    slurm_example = 'template_EXAMPLE.slurm'
311    new_slurm_file = os.path.join(folder, slurm_example)
312    # Default slurm template
313    content =f"""# Automatic slurm template created with ATON {__version__}
314# https://pablogila.github.io/aton
315
316#!/bin/bash
317#SBATCH --partition=general
318#SBATCH --qos=regular
319#SBATCH --job-name=JOBNAME
320#SBATCH --ntasks=16
321#SBATCH --time=12:00:00
322#SBATCH --mem=64G
323
324module purge
325module load QuantumESPRESSO/7.3-foss-2023a
326
327srun --cpu_bind=cores pw.x -inp INPUT > OUTPUT
328"""
329    # If the slurm template does not exist, create one
330    slurm_file = file.get(folder, template, return_anyway=True)
331    if not slurm_file:
332        with open(new_slurm_file, 'w') as f:
333            f.write(content)
334        print(f'!!! WARNING:  Slurm template missing, an example was generated automatically:\n'
335              f'{slurm_example}\n'
336              f'PLEASE CHECK it, UPDATE it and RENAME it to {template}\n'
337              'before using aton.api.slurm.sbatch()\n')
338        return None
339    # Check that the slurm file contains the INPUT_FILE, OUTPUT_FILE and JOB_NAME keywords
340    key_input = find.lines(slurm_file, 'INPUT')
341    key_output = find.lines(slurm_file, 'OUTPUT')
342    key_jobname = find.lines(slurm_file, 'JOBNAME')
343    missing = []
344    if not key_input:
345        missing.append('INPUT')
346    if not key_output:
347        missing.append('OUTPUT')
348    if not key_jobname:
349        missing.append('JOBNAME')
350    if len(missing) > 0:
351        with open(new_slurm_file, 'w') as f:
352            f.write(content)
353        print('!!! WARNING:  Some keywords were missing from your slurm template,\n'
354              f'PLEASE CHECK the example at {slurm_example}\n'
355              'before using aton.api.slurm.sbatch()\n'
356              f'The following keywords were missing from your {template}:')
357        for key in missing:
358            print(key)
359        print('')
360        return None
361    return slurm_file  # Ready to use!

Check the slurm template inside folder, to be used by sbatch().

The current working directory is used if folder is not provided. If the file does not exist or is invalid, creates a template_EXAMPLE.slurm file for reference.