Skip to content

Commit

Permalink
better node distribution for solar module
Browse files Browse the repository at this point in the history
  • Loading branch information
bnb32 committed Sep 19, 2024
1 parent a146672 commit 5b0ca8d
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 52 deletions.
86 changes: 58 additions & 28 deletions sup3r/solar/solar.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def __init__(
nsrdb_fp,
t_slice=slice(None),
tz=-6,
time_shift=-12,
agg_factor=1,
nn_threshold=0.5,
cloud_threshold=0.99,
Expand Down Expand Up @@ -67,12 +66,6 @@ def __init__(
the GAN is trained on data in local time and therefore the output
in sup3r_fps should be treated as local time. For example, -6 is
CST which is default for CONUS training data.
time_shift : int | None
Number of hours to shift time axis. This can be used, for
example, to shift the time index for daily data so that the time
stamp for a given day starts at hour zero instead of at
noon, as is the case for most GCM data. In this case ``time_shift``
would be -12
agg_factor : int
Spatial aggregation factor for nsrdb-to-GAN-meta e.g. the number of
NSRDB spatial pixels to average for a single sup3r GAN output site.
Expand All @@ -91,7 +84,6 @@ def __init__(
self.nn_threshold = nn_threshold
self.cloud_threshold = cloud_threshold
self.tz = tz
self.time_shift = time_shift
self._nsrdb_fp = nsrdb_fp
self._sup3r_fps = sup3r_fps
if isinstance(self._sup3r_fps, str):
Expand Down Expand Up @@ -203,8 +195,7 @@ def time_index(self):
-------
pd.DatetimeIndex
"""
ti = self.gan_data.time_index[self.t_slice]
return ti.shift(self.time_shift, freq='h')
return self.gan_data.time_index[self.t_slice]

@property
def out_of_bounds(self):
Expand Down Expand Up @@ -521,7 +512,7 @@ def get_node_cmd(cls, config):
import_str += 'from rex import init_logger;\n'
import_str += f'from sup3r.solar import {cls.__name__}'

fun_str = get_fun_call_str(cls.run_temporal_chunk, config)
fun_str = get_fun_call_str(cls.run_temporal_chunks, config)

log_file = config.get('log_file', None)
log_level = config.get('log_level', 'INFO')
Expand Down Expand Up @@ -590,22 +581,21 @@ def write(self, fp_out, features=('ghi', 'dni', 'dhi')):
logger.info(f'Finished writing file: {fp_out}')

@classmethod
def run_temporal_chunks(
    cls,
    fp_pattern,
    nsrdb_fp,
    fp_out_suffix='irradiance',
    tz=-6,
    agg_factor=1,
    nn_threshold=0.5,
    cloud_threshold=0.99,
    features=('ghi', 'dni', 'dhi'),
    temporal_ids=None,
):
    """Run the solar module on all spatial chunks for each temporal
    chunk corresponding to the fp_pattern and the given list of
    temporal_ids. This typically gets run from the CLI.

    Parameters
    ----------
    fp_pattern : str
        Unix-style file pattern matching the chunked sup3r GAN output
        files to run the solar module on.
    nsrdb_fp : str
        Filepath to an NSRDB .h5 file containing the source irradiance
        data.
    fp_out_suffix : str
        Suffix appended to the source chunk file name to build the
        output file name (files already ending in
        ``_{fp_out_suffix}.h5`` are ignored as prior outputs).
    tz : int
        The timezone offset (in hours) for the data in sup3r_fps. It is
        assumed that the GAN is trained on data in local time and
        therefore the output in sup3r_fps should be treated as local
        time. For example, -6 is CST which is default for CONUS
        training data.
    agg_factor : int
        Spatial aggregation factor for nsrdb-to-GAN-meta e.g. the number
        of NSRDB spatial pixels to average for a single sup3r GAN output
        site.
    nn_threshold : float
        Nearest-neighbor distance threshold for the
        nsrdb-to-GAN-meta mapping.
    cloud_threshold : float
        Cloudiness threshold used by the solar module.
    features : list | tuple
        List of features to write to disk. These have to be attributes
        of the Solar class (ghi, dni, dhi).
    temporal_ids : list | None
        List of zero-padded temporal ids from the file chunks that match
        fp_pattern. This input typically gets set from the CLI. If None,
        this will run all temporal indices.
    """
    # A single None entry preserves the original behavior of
    # _run_temporal_chunk(temporal_id=None), which runs all temporal
    # indices; otherwise each requested id is run in sequence. This
    # avoids duplicating the full kwarg list across two branches.
    chunk_ids = [None] if temporal_ids is None else temporal_ids
    for temporal_id in chunk_ids:
        cls._run_temporal_chunk(
            fp_pattern=fp_pattern,
            nsrdb_fp=nsrdb_fp,
            fp_out_suffix=fp_out_suffix,
            tz=tz,
            agg_factor=agg_factor,
            nn_threshold=nn_threshold,
            cloud_threshold=cloud_threshold,
            features=features,
            temporal_id=temporal_id,
        )

@classmethod
def _run_temporal_chunk(
cls,
fp_pattern,
nsrdb_fp,
fp_out_suffix='irradiance',
tz=-6,
agg_factor=1,
nn_threshold=0.5,
cloud_threshold=0.99,
features=('ghi', 'dni', 'dhi'),
temporal_id=None,
):
"""Run the solar module on all spatial chunks for a single temporal
chunk corresponding to the fp_pattern. This typically gets run from the
CLI.
See Also
--------
:meth:`run_temporal_chunks`
"""

temp = cls.get_sup3r_fps(fp_pattern, ignore=f'_{fp_out_suffix}.h5')
Expand Down Expand Up @@ -685,7 +716,6 @@ def run_temporal_chunk(
kwargs = {
't_slice': t_slice,
'tz': tz,
'time_shift': time_shift,
'agg_factor': agg_factor,
'nn_threshold': nn_threshold,
'cloud_threshold': cloud_threshold,
Expand Down
92 changes: 68 additions & 24 deletions sup3r/solar/solar_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
Note: file groups (temporal chunks) are distributed across nodes by
splitting the sorted temporal ids over the requested ``max_nodes``.
"""

import copy
import logging
import os

import click
import numpy as np

from sup3r import __version__
from sup3r.solar import Solar
Expand All @@ -19,8 +21,12 @@

@click.group()
@click.version_option(version=__version__)
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.option(
'-v',
'--verbose',
is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.',
)
@click.pass_context
def main(ctx, verbose):
"""Sup3r Solar Command Line Interface"""
Expand All @@ -29,37 +35,59 @@ def main(ctx, verbose):


@main.command()
@click.option('--config_file', '-c', required=True,
type=click.Path(exists=True),
help='sup3r solar configuration .json file.')
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.option(
'--config_file',
'-c',
required=True,
type=click.Path(exists=True),
help='sup3r solar configuration .json file.',
)
@click.option(
'-v',
'--verbose',
is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.',
)
@click.pass_context
def from_config(ctx, config_file, verbose=False, pipeline_step=None):
"""Run sup3r solar from a config file."""
config = BaseCLI.from_config_preflight(ModuleName.SOLAR, ctx, config_file,
verbose)
config = BaseCLI.from_config_preflight(
ModuleName.SOLAR, ctx, config_file, verbose
)
exec_kwargs = config.get('execution_control', {})
hardware_option = exec_kwargs.pop('option', 'local')
log_pattern = config.get('log_pattern', None)
fp_pattern = config['fp_pattern']
basename = config['job_name']
fp_sets, _, temporal_ids, _, _ = Solar.get_sup3r_fps(fp_pattern)
logger.info('Solar module found {} sets of chunked source files to run '
'on. Submitting to {} nodes based on the number of temporal '
'chunks'.format(len(fp_sets), len(set(temporal_ids))))

for i_node, temporal_id in enumerate(sorted(set(temporal_ids))):
temporal_ids = sorted(set(temporal_ids))
max_nodes = config.get('max_nodes', len(temporal_ids))
max_nodes = min((max_nodes, len(temporal_ids)))
logger.info(
'Solar module found {} sets of chunked source files to run '
'on. Submitting to {} nodes based on the number of temporal '
'chunks {} and the requested number of nodes {}'.format(
len(fp_sets),
max_nodes,
len(temporal_ids),
config.get('max_nodes', None),
)
)

temporal_id_chunks = np.array_split(temporal_ids, max_nodes)
for i_node, temporal_ids in enumerate(temporal_id_chunks):
node_config = copy.deepcopy(config)
node_config['log_file'] = (
log_pattern if log_pattern is None
else os.path.normpath(log_pattern.format(node_index=i_node)))
name = ('{}_{}'.format(basename, str(i_node).zfill(6)))
log_pattern
if log_pattern is None
else os.path.normpath(log_pattern.format(node_index=i_node))
)
name = '{}_{}'.format(basename, str(i_node).zfill(6))
ctx.obj['NAME'] = name
node_config['job_name'] = name
node_config["pipeline_step"] = pipeline_step
node_config['pipeline_step'] = pipeline_step

node_config['temporal_id'] = temporal_id
node_config['temporal_ids'] = temporal_ids
cmd = Solar.get_node_cmd(node_config)

if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS:
Expand All @@ -68,9 +96,16 @@ def from_config(ctx, config_file, verbose=False, pipeline_step=None):
kickoff_local_job(ctx, cmd, pipeline_step)


def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r',
memory=None, walltime=4, feature=None,
stdout_path='./stdout/'):
def kickoff_slurm_job(
ctx,
cmd,
pipeline_step=None,
alloc='sup3r',
memory=None,
walltime=4,
feature=None,
stdout_path='./stdout/',
):
"""Run sup3r on HPC via SLURM job submission.
Parameters
Expand All @@ -96,8 +131,17 @@ def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r',
stdout_path : str
Path to print .stdout and .stderr files.
"""
BaseCLI.kickoff_slurm_job(ModuleName.SOLAR, ctx, cmd, alloc, memory,
walltime, feature, stdout_path, pipeline_step)
BaseCLI.kickoff_slurm_job(
ModuleName.SOLAR,
ctx,
cmd,
alloc,
memory,
walltime,
feature,
stdout_path,
pipeline_step,
)


def kickoff_local_job(ctx, cmd, pipeline_step=None):
Expand Down

0 comments on commit 5b0ca8d

Please sign in to comment.