credit.datasets.hrrr#
HRRRDataset: PyTorch Dataset for HRRR GRIB2 data.
Supports three HRRR products (VALID_PRODUCTS):
- "wrfprs": pressure-level output (default, ~200 MB/file)
- "wrfnat": native/hybrid-sigma level output (~200 MB/file, ~65 levels)
- "wrfsubh": 15-minute sub-hourly surface output (surface vars only)
Tensor keys follow the pattern {user_provided_name}/{hrrr_product}/{field_type}/{dim}/{varname}
where hrrr_product is product-specific:
- "wrfprs" → {user_provided_name}/wrfprs/{field_type}/{dim}/{varname}
- "wrfnat" → {user_provided_name}/wrfnat/{field_type}/{dim}/{varname}
- "wrfsubh" → {user_provided_name}/wrfsubh/{field_type}/2d/{varname}
dim is "3d" for multi-level variables and "2d" for surface variables.
- Tensor shapes (before DataLoader batching):
3D variables: (n_levels, 1, y, x)
2D variables: (1, 1, y, x)
The y / x spatial dimensions correspond to HRRR’s native Lambert
Conformal Conic grid; if extent is specified they reflect the cropped
sub-domain rather than the full CONUS grid (~1059 x 1799).
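The key pattern and per-sample shapes described above can be sketched as follows. This is a minimal illustration only: `sketch_tensor_key` and `sketch_expected_shape` are hypothetical helpers, not functions of this module, and the full-grid size is the approximate figure quoted above.

```python
def sketch_tensor_key(source_name, product, field_type, varname, is_3d):
    """Assemble a key of the form name/product/field_type/dim/varname."""
    # wrfsubh carries surface variables only, so its keys always use "2d".
    if product == "wrfsubh" and is_3d:
        raise ValueError("wrfsubh provides surface (2d) variables only")
    dim = "3d" if is_3d else "2d"
    return "/".join([source_name, product, field_type, dim, varname])


def sketch_expected_shape(n_levels, ny, nx, is_3d):
    """Per-sample shape before DataLoader batching."""
    return (n_levels if is_3d else 1, 1, ny, nx)


key = sketch_tensor_key("Example_HRRR", "wrfprs", "prognostic", "T", True)
shape = sketch_expected_shape(6, 1059, 1799, True)
print(key)    # Example_HRRR/wrfprs/prognostic/3d/T
print(shape)  # (6, 1, 1059, 1799)
```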
Two S3 path layouts are handled automatically:
- v1/v2 (before 2018-07-12):
s3://noaa-hrrr-bdp-pds/hrrr.{YYYYMMDD}/hrrr.t{HH}z.{product}f{FF:02d}.grib2
- v3/v4 (2018-07-12 onward):
s3://noaa-hrrr-bdp-pds/hrrr.{YYYYMMDD}/conus/hrrr.t{HH}z.{product}f{FF:02d}.grib2
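A rough sketch of how the two layouts could be selected from the initialization time. The cutoff date and bucket name come from the description above; `sketch_s3_uri` and the `SKETCH_*` constants are hypothetical names, and the module's own `_hrrr_s3_uri` may differ in detail (for example, it takes a `pandas.Timestamp`).

```python
from datetime import datetime

# Cutoff taken from the layout description; the module stores its own value
# in _HRRR_V3_CUTOFF, which is not shown on this page.
SKETCH_V3_CUTOFF = datetime(2018, 7, 12)
SKETCH_BUCKET = "noaa-hrrr-bdp-pds"


def sketch_s3_uri(init_time, forecast_hour, product="wrfprs"):
    """Build the S3 URI, adding the conus/ subdirectory from v3 onward."""
    day = init_time.strftime("%Y%m%d")
    fname = f"hrrr.t{init_time:%H}z.{product}f{forecast_hour:02d}.grib2"
    subdir = "conus/" if init_time >= SKETCH_V3_CUTOFF else ""
    return f"s3://{SKETCH_BUCKET}/hrrr.{day}/{subdir}{fname}"


print(sketch_s3_uri(datetime(2017, 1, 1, 12), 3))
print(sketch_s3_uri(datetime(2021, 6, 1, 0), 0))
```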
GRIB2 reading#
Both local and remote modes use the same .idx + byte-range pipeline:
Remote mode:
- Fetch the sidecar .idx inventory (~100 KB) via HTTPS to get exact byte offsets for every GRIB message.
- Issue one HTTP Range GET per required message (~50-200 KB each) via requests, with all messages fetched in parallel using concurrent.futures.ThreadPoolExecutor.
Local mode: reads the .idx sidecar from disk, then uses
file.seek() + file.read() — identical byte-range approach, no
full-file scan. The .idx sidecar must be present alongside the grib2;
download it with hrrr_download.py.
For a typical training sample (5 vars x 6 levels ≈ 30 messages) remote mode transfers ~3 MB instead of ~200 MB (~60-100x reduction).
Variable lookup is driven by VAR_REGISTRY. Extend it at import
time to add variables without subclassing:
from credit.datasets.hrrr import VAR_REGISTRY
VAR_REGISTRY["MYVAR"] = {
    "shortName": "myvar", "typeOfLevel": "isobaricInhPa",
    "idx_name": "MYVAR", "idx_level": None,
}
Example YAML (wrfprs, local mode):
data:
source:
Example_HRRR: # User-provided name (arbitrary key)
dataset_type: "HRRR"
# product: "wrfprs" # Optional for PRS product. Default is "wrfprs".
mode: "local"
base_path: "/data/hrrr"
forecast_hour: 0
levels: [250, 500, 700, 850, 925, 1000]
variables:
prognostic:
vars_3D: [T, U, V, Q, GH]
vars_2D: [t2m]
extent: [-130, -60, 20, 55]
start_datetime: "2021-06-01"
end_datetime: "2021-06-05"
timestep: "1h"
forecast_len: 0
Example YAML (wrfnat, remote mode):
data:
source:
Example_HRRR_NAT: # User-provided name (arbitrary key)
dataset_type: "HRRR"
product: "wrfnat" # Options: "wrfprs" (default), "wrfnat", "wrfsubh"
mode: "remote"
forecast_hour: 0
levels: [10, 20, 30, 40, 50] # hybrid level indices 1-65
variables:
prognostic:
vars_3D: [T, U, V, Q]
start_datetime: "2022-01-01"
end_datetime: "2022-01-31"
timestep: "1h"
forecast_len: 0
Example YAML (wrfsubh, remote mode — 15-min output):
data:
source:
Example_HRRR_SUBH: # User-provided name (arbitrary key)
dataset_type: "HRRR"
product: "wrfsubh" # Options: "wrfprs" (default), "wrfnat", "wrfsubh"
mode: "remote"
variables:
prognostic:
vars_2D: [t2m, sp, refc]
start_datetime: "2022-01-01 00:15"
end_datetime: "2022-01-31 00:00"
timestep: "15min"
forecast_len: 0
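Attributes#
- logger
- _HRRR_V3_CUTOFF
- _S3_BUCKET
- _HRRR_HTTPS_BASE
- VAR_REGISTRY
- _MAX_REMOTE_WORKERS
- _HTTP_TIMEOUT
- VALID_PRODUCTS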
Classes#
- HRRRDataset – CREDIT Dataset for HRRR GRIB2 data (wrfprs / wrfnat / wrfsubh).
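Functions#
- _hrrr_s3_uri – Construct the S3 URI for a HRRR grib2 file.
- _hrrr_local_path – Construct the local filesystem path for a HRRR grib2 file.
- _s3_uri_to_https – Convert an s3://noaa-hrrr-bdp-pds/... URI to a public HTTPS URL.
- _parse_idx – Parse a HRRR .idx inventory file into a list of message entries.
- _fetch_idx – Fetch and parse the .idx sidecar for a HRRR grib2 file via HTTPS.
- _fetch_message – Fetch a single GRIB message via an HTTP Range request.
- _build_prs_entry_map – Return a {pressure_level_hPa: idx_entry} dict for a pressure-level variable.
- _resolve_pressure_levels – Return the float pressure levels to fetch, validating against available.
- _build_nat_entry_map – Return {hybrid_level_index: idx_entry} for a wrfnat variable.
- _resolve_nat_levels – Return native level indices to fetch, validating against available.
- _find_subhf_entry – Return the idx entry for a wrfsubh variable at a specific sub-step.
- _fetch_bytes_local – Read a byte range directly from a local GRIB2 file.
- _load_idx_local – Read and parse the .idx sidecar from local disk.
- _to_float32 – Return float32, replacing masked values with NaN.
- _validate_product_request – Validate the dataset request config, raising ValueError for invalid requests.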
Module Contents#
- credit.datasets.hrrr.logger#
- credit.datasets.hrrr._HRRR_V3_CUTOFF#
- credit.datasets.hrrr._S3_BUCKET = 'noaa-hrrr-bdp-pds'#
- credit.datasets.hrrr._HRRR_HTTPS_BASE = 'https://noaa-hrrr-bdp-pds.s3.amazonaws.com'#
- credit.datasets.hrrr.VAR_REGISTRY: dict[str, dict[str, str | None]]#
- credit.datasets.hrrr._MAX_REMOTE_WORKERS = 8#
- credit.datasets.hrrr._HTTP_TIMEOUT: tuple[int, int] = (10, 120)#
- credit.datasets.hrrr.VALID_PRODUCTS#
- credit.datasets.hrrr._hrrr_s3_uri(t: pandas.Timestamp, forecast_hour: int, product: VALID_PRODUCTS = 'wrfprs') str#
Construct the S3 URI for a HRRR grib2 file.
- Parameters:
t (pd.Timestamp) – Initialisation timestamp (UTC).
forecast_hour (int) – Forecast lead hour (FF), e.g. 0 for analysis.
product (VALID_PRODUCTS, optional) – HRRR product name. Defaults to "wrfprs".
- Returns:
S3 URI.
- Return type:
str
- credit.datasets.hrrr._hrrr_local_path(base_path: str, t: pandas.Timestamp, forecast_hour: int, product: VALID_PRODUCTS = 'wrfprs') str#
Construct the local filesystem path for a HRRR grib2 file.
- Parameters:
base_path (str) – Root directory containing HRRR data.
t (pd.Timestamp) – Initialization timestamp (UTC).
forecast_hour (int) – Forecast lead hour (FF), e.g. 0 for analysis.
product (VALID_PRODUCTS, optional) – HRRR product name. Defaults to "wrfprs".
- Returns:
Local filesystem path to the grib2 file.
- Return type:
str
- credit.datasets.hrrr._s3_uri_to_https(s3_uri: str) str#
Convert an s3://noaa-hrrr-bdp-pds/... URI to a public HTTPS URL.
- Parameters:
s3_uri (str) – S3 URI.
- Returns:
Public HTTPS URL
- Return type:
str
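One way such a conversion could look, assuming a plain prefix swap onto the `_HRRR_HTTPS_BASE` value listed under Module Contents. `sketch_s3_to_https` is a hypothetical stand-in; the real function's validation behavior is not documented here.

```python
SKETCH_S3_PREFIX = "s3://noaa-hrrr-bdp-pds/"
SKETCH_HTTPS_BASE = "https://noaa-hrrr-bdp-pds.s3.amazonaws.com"


def sketch_s3_to_https(s3_uri):
    """Swap the s3:// bucket prefix for the bucket's public HTTPS endpoint."""
    if not s3_uri.startswith(SKETCH_S3_PREFIX):
        # Assumption: anything outside the HRRR bucket is rejected.
        raise ValueError(f"not a HRRR bucket URI: {s3_uri}")
    return f"{SKETCH_HTTPS_BASE}/{s3_uri[len(SKETCH_S3_PREFIX):]}"


url = sketch_s3_to_https(
    "s3://noaa-hrrr-bdp-pds/hrrr.20210601/conus/hrrr.t00z.wrfprsf00.grib2"
)
print(url)
```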
- credit.datasets.hrrr._parse_idx(text: str) list[dict[str, str | int | None]]#
Parse a HRRR .idx inventory file into a list of message entries.
Each entry dict has keys: var, level, byte_start, byte_end (None for the last entry, meaning read to EOF).
- Parameters:
text (str) – The content of the .idx file.
- Returns:
Entries parsed from the .idx, in file order.
- Return type:
list[dict[str, str | int | None]]
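A minimal sketch of this kind of parsing, assuming the usual colon-separated inventory line (`message number:byte offset:d=date:variable:level:forecast:`). `sketch_parse_idx` is hypothetical, the byte offsets in the sample text are made up, and the `step` key is included only because the sub-hourly lookup further down refers to a step field.

```python
def sketch_parse_idx(text):
    """Turn .idx text into entry dicts with inclusive byte ranges."""
    rows = []
    for line in text.splitlines():
        parts = line.strip().split(":")
        if len(parts) < 6:
            continue  # skip blank or malformed lines
        rows.append({
            "var": parts[3],
            "level": parts[4],
            "step": parts[5],
            "byte_start": int(parts[1]),
            "byte_end": None,  # filled in below; stays None for the last entry
        })
    # Each message ends one byte before the next message starts.
    for cur, nxt in zip(rows, rows[1:]):
        cur["byte_end"] = nxt["byte_start"] - 1
    return rows


sample_idx = (
    "1:0:d=2021060100:REFC:entire atmosphere:anl:\n"
    "2:375303:d=2021060100:TMP:500 mb:anl:\n"
    "3:960000:d=2021060100:TMP:2 m above ground:anl:\n"
)
entries = sketch_parse_idx(sample_idx)
print(entries[0]["byte_start"], entries[0]["byte_end"])
print(entries[-1]["byte_end"])
```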
- credit.datasets.hrrr._fetch_idx(s3_uri: str) list[dict[str, str | int | None]]#
Fetch and parse the .idx sidecar for a HRRR grib2 file via HTTPS.
- Parameters:
s3_uri (str) – S3 URI
- Raises:
FileNotFoundError – If the .idx file is not found (older v1/v2 files may lack sidecars; pre-download with hrrr_download.py and use local mode instead).
- Returns:
Entries parsed from the .idx, in file order.
- Return type:
list[dict[str, str | int | None]]
- credit.datasets.hrrr._fetch_message(https_url: str, byte_start: int, byte_end: int | None, session=None) bytes#
Fetch a single GRIB message via an HTTP Range request.
- Parameters:
https_url (str) – Public HTTPS URL of the grib2 file.
byte_start (int) – First byte of the message (inclusive).
byte_end (int | None) – Last byte of the message (inclusive), or None for EOF.
session (requests.Session | None, optional) – Optional requests.Session for connection reuse. Falls back to module-level requests.get if None. Defaults to None.
- Returns:
The raw bytes of the GRIB message for that byte range.
- Return type:
bytes
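The inclusive byte range maps directly onto a standard HTTP `Range` header. The sketch below only builds the header and makes no network call; `sketch_range_header` is a hypothetical helper, and passing its result as the `headers` argument of a `requests` GET is an assumption about how the real function works.

```python
def sketch_range_header(byte_start, byte_end=None):
    """Build a Range header; both ends are inclusive, open-ended if byte_end is None."""
    if byte_start < 0:
        raise ValueError("byte_start must be non-negative")
    if byte_end is not None and byte_end < byte_start:
        raise ValueError("byte_end must not precede byte_start")
    last = "" if byte_end is None else str(byte_end)
    return {"Range": f"bytes={byte_start}-{last}"}


print(sketch_range_header(375303, 959999))  # {'Range': 'bytes=375303-959999'}
print(sketch_range_header(960000))          # {'Range': 'bytes=960000-'}
```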
- credit.datasets.hrrr._build_prs_entry_map(idx_entries: list[dict[str, str | int | None]], idx_name: str) dict[float, dict[str, str | None]]#
Return a {pressure_level_hPa: idx_entry} dict for a pressure-level variable.
- Parameters:
idx_entries (list[dict[str, str | int | None]]) – List of entries parsed from the .idx file.
idx_name (str) – Name of the variable to filter for.
- Returns:
Mapping from pressure level (hPa) to the corresponding .idx entry for that variable.
- Return type:
dict[float, dict[str, str | None]]
- credit.datasets.hrrr._resolve_pressure_levels(requested: list[int] | None, prs_map: dict[float, dict[str, str | None]], var_name: str) list[float]#
Return the float pressure levels to fetch, validating against available.
- Parameters:
requested (list[int] | None) – List of requested pressure levels.
prs_map (dict[float, dict[str, str | None]]) – Mapping from _build_prs_entry_map()
var_name (str) – Variable name for error messages (e.g. “T”, “U”, “Q”, etc.)
- Raises:
ValueError – If any requested levels are not found in the available levels for that variable.
- Returns:
The float pressure levels to fetch.
- Return type:
list[float]
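The validation step might look roughly like this. `sketch_resolve_levels` is hypothetical, and returning every available level when `requested` is `None` is an assumption, since the behavior for `None` is not stated above.

```python
def sketch_resolve_levels(requested, available_map, var_name):
    """Return the levels to fetch as floats, or raise if any are missing."""
    available = sorted(available_map)
    if requested is None:
        # Assumption: no explicit request means "use every available level".
        return available
    wanted = [float(level) for level in requested]
    missing = [level for level in wanted if level not in available_map]
    if missing:
        raise ValueError(
            f"{var_name}: levels {missing} not available; have {available}"
        )
    return wanted


prs_map = {500.0: {}, 850.0: {}, 1000.0: {}}
print(sketch_resolve_levels([500, 850], prs_map, "T"))  # [500.0, 850.0]
```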
- credit.datasets.hrrr._build_nat_entry_map(idx_entries: list[dict[str, str | int | None]], idx_name: str) dict[int, dict[str, str | None]]#
Return {hybrid_level_index: idx_entry} for a wrfnat variable.
HRRR native-level .idx entries look like:
TMP:10 hybrid level:anl:
i.e. level ends with " hybrid level" and the prefix is the integer level index (1-65, bottom-up).
- Parameters:
idx_entries (list[dict[str, str | int | None]]) – List of entries parsed from the .idx file.
idx_name (str) – Name of the variable to filter for.
- Returns:
Mapping from hybrid level index to the corresponding .idx entry for that variable.
- Return type:
dict[int, dict[str, str | None]]
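A small sketch of the level-string matching described above, operating on already-parsed entries. `sketch_nat_entry_map` is a hypothetical name and the entry dicts are trimmed to the two keys the lookup needs.

```python
def sketch_nat_entry_map(idx_entries, idx_name):
    """Map hybrid level index -> entry for one variable."""
    suffix = " hybrid level"
    out = {}
    for entry in idx_entries:
        level = entry["level"]
        if entry["var"] != idx_name or not level.endswith(suffix):
            continue
        prefix = level[: -len(suffix)]
        if prefix.isdigit():  # ignore anything that is not a plain integer index
            out[int(prefix)] = entry
    return out


demo_entries = [
    {"var": "TMP", "level": "10 hybrid level"},
    {"var": "TMP", "level": "20 hybrid level"},
    {"var": "UGRD", "level": "10 hybrid level"},
    {"var": "TMP", "level": "2 m above ground"},
]
nat_map = sketch_nat_entry_map(demo_entries, "TMP")
print(sorted(nat_map))  # [10, 20]
```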
- credit.datasets.hrrr._resolve_nat_levels(requested: list[int] | None, nat_map: dict[int, dict[str, str | None]], var_name: str) list[int]#
Return native level indices to fetch, validating against available.
- Parameters:
requested (list[int] | None) – List of requested hybrid levels.
nat_map (dict[int, dict[str, str | None]]) – Mapping from _build_nat_entry_map()
var_name (str) – Variable name for error messages (e.g. “T”, “U”, “Q”, etc.)
- Raises:
ValueError – If any requested levels are not found in the available levels for that variable.
- Returns:
The integer native level indices to fetch.
- Return type:
list[int]
- credit.datasets.hrrr._find_subhf_entry(idx_entries: list[dict[str, str | int | None]], idx_name: str, idx_level: str, step_min: int) dict[str, str | int | None]#
Return the idx entry for a wrfsubh variable at a specific sub-step.
Sub-hourly .idx entries have a step field like "15 min fcst", "30 min fcst", "45 min fcst", "60 min fcst".
- Parameters:
idx_entries (list[dict[str, str | int | None]]) – Parsed .idx entries for the wrfsubh file.
idx_name (str) – Variable name as it appears in the .idx.
idx_level (str) – Level string (e.g. "2 m above ground").
step_min (int) – Sub-step in minutes (15, 30, 45, 60, …).
- Raises:
KeyError – If no matching entry is found.
- Returns:
The matching .idx entry for that variable, level, and step.
- Return type:
dict[str, str | int | None]
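The lookup can be pictured as a linear scan over the parsed entries, matching variable, level, and a step string built from `step_min`. `sketch_find_subh_entry` is hypothetical, and the entries are assumed to carry a `step` key as described above.

```python
def sketch_find_subh_entry(idx_entries, idx_name, idx_level, step_min):
    """Return the first entry matching variable, level, and sub-hourly step."""
    wanted_step = f"{step_min} min fcst"
    for entry in idx_entries:
        if (entry["var"], entry["level"], entry["step"]) == (
            idx_name, idx_level, wanted_step,
        ):
            return entry
    raise KeyError(f"{idx_name} at {idx_level!r}, step {wanted_step!r} not found")


subh_entries = [
    {"var": "TMP", "level": "2 m above ground", "step": "15 min fcst"},
    {"var": "TMP", "level": "2 m above ground", "step": "30 min fcst"},
    {"var": "REFC", "level": "entire atmosphere", "step": "30 min fcst"},
]
hit = sketch_find_subh_entry(subh_entries, "TMP", "2 m above ground", 30)
print(hit["step"])  # 30 min fcst
```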
- credit.datasets.hrrr._fetch_bytes_local(path: str, byte_start: int, byte_end: int | None) bytes#
Read a byte range directly from a local GRIB2 file.
- Parameters:
path (str) – Absolute path to the local grib2 file.
byte_start (int) – First byte (inclusive).
byte_end (int | None) – Last byte (inclusive), or None to read to EOF.
- Returns:
Raw bytes for that message.
- Return type:
bytes
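The seek-and-read approach is simple enough to sketch in full. `sketch_read_range` is a hypothetical stand-in, and the demo uses a ten-byte temporary file in place of a real grib2.

```python
import os
import tempfile


def sketch_read_range(path, byte_start, byte_end=None):
    """Read bytes [byte_start, byte_end] inclusive, or to EOF if byte_end is None."""
    with open(path, "rb") as fh:
        fh.seek(byte_start)
        if byte_end is None:
            return fh.read()
        return fh.read(byte_end - byte_start + 1)


# Demo on a throwaway file so the example runs without any HRRR data.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"0123456789")
    demo_path = tmp.name
try:
    middle = sketch_read_range(demo_path, 2, 5)
    tail = sketch_read_range(demo_path, 7)
finally:
    os.remove(demo_path)

print(middle, tail)  # b'2345' b'789'
```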
- credit.datasets.hrrr._load_idx_local(grib2_path: str) list[dict[str, str | int | None]]#
Read and parse the .idx sidecar from local disk.
Expects the index at {grib2_path}.idx. Download it alongside the grib2 with hrrr_download.py.
- Parameters:
grib2_path (str) – Absolute path to the local grib2 file.
- Raises:
FileNotFoundError – If the .idx file is absent.
- Returns:
Entries parsed from the .idx, in file order.
- Return type:
list[dict[str, str | int | None]]
- credit.datasets.hrrr._to_float32(values: numpy.ndarray) numpy.ndarray#
Return float32, replacing masked values with NaN.
- Parameters:
values (np.ndarray) – Values to convert, potentially a masked array.
- Returns:
Array with masked values filled with NaN and dtype float32.
- Return type:
np.ndarray
- credit.datasets.hrrr._validate_product_request(product_request: str) VALID_PRODUCTS#
Validate the dataset request config, raising ValueError for invalid requests.
- Parameters:
product_request (str) – The HRRR product name from the config (e.g. “wrfprs”, “wrfnat”, “wrfsubh”).
- Raises:
ValueError – If the product is not recognized or mapped to a valid HRRR product.
- Returns:
The validated HRRR product name.
- Return type:
VALID_PRODUCTS
- class credit.datasets.hrrr.HRRRDataset(data_config: dict[str, Any], return_target: bool = False)#
Bases: credit.datasets.base_dataset.BaseDataset
CREDIT Dataset for HRRR GRIB2 data (wrfprs / wrfnat / wrfsubh).
Implements the same field-type semantics as BaseDataset:
- prognostic: input at step 0 and target (autoregressive rollout)
- dynamic_forcing: input at every step; never a target
- diagnostic: target only
- static: input at step 0; never a target, applies to all steps
Both modes use pygrib for GRIB2 decoding. Remote mode fetches the .idx sidecar and issues parallel HTTP Range requests, so no full file download is required.
See module docstring for full output format, tensor shapes, and YAML configuration examples.
- dataset_type#
Tensor key - “HRRR”
- product#
Active HRRR product ("HRRR_PRS" / "wrfprs", "HRRR_NAT" / "wrfnat", or "HRRR_SUBH" / "wrfsubh") with default value "HRRR_PRS".
- datetimes#
DatetimeIndex of valid initialisation timestamps.
- static_metadata#
Dataset-level metadata for MultiSourceDataset.
- dataset_type#
- product: VALID_PRODUCTS#
- mode: str#
- base_path: str | None#
- forecast_hour: int#
- extent: list[float] | None#
- global_levels: list[int] | None#
- num_fetch_workers: int#
- static_metadata: dict[str, Any]#
- _idx_cache: dict[str, list[dict[str, str | int | None]]]#
- _http_session = None#
- _spatial_slice: tuple[slice, slice] | None = None#
- _get_session()#
Return the shared requests.Session, creating it on first call.
Created lazily so the session is never open before a DataLoader worker forks; each worker ends up with its own independent connection pool.
- _get_spatial_slice(lats: numpy.ndarray, lons: numpy.ndarray) tuple[slice, slice]#
Return (row_slice, col_slice) for self.extent, computed once.
The HRRR grid is fixed (Lambert Conformal Conic, ~1059 × 1799), so the bounding-box row/col indices for a given extent are identical for every message and every timestep. The result is cached after the first call so subsequent samples pay no recomputation cost.
- Parameters:
lats (np.ndarray) – 2D latitude array from a decoded pygrib message.
lons (np.ndarray) – 2D longitude array from a decoded pygrib message.
- Raises:
ValueError – If self.extent does not intersect the HRRR domain.
- Returns:
(row_slice, col_slice) ready for direct numpy indexing. Both slices are slice(None) when self.extent is None.
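The bounding-box idea can be illustrated on a toy grid using plain nested lists. This is only a sketch under two assumptions: that `extent` is ordered `[lon_min, lon_max, lat_min, lat_max]` (suggested by the YAML example `[-130, -60, 20, 55]`, but not stated), and that longitudes are already in the same convention as the extent. `sketch_bbox_slices` is hypothetical; the real method works on NumPy arrays and caches its result.

```python
def sketch_bbox_slices(lats, lons, extent):
    """Smallest (row_slice, col_slice) covering all grid points inside extent."""
    if extent is None:
        return slice(None), slice(None)
    lon_min, lon_max, lat_min, lat_max = extent  # assumed ordering
    rows, cols = [], []
    for i, (lat_row, lon_row) in enumerate(zip(lats, lons)):
        for j, (lat, lon) in enumerate(zip(lat_row, lon_row)):
            if lat_min <= lat <= lat_max and lon_min <= lon <= lon_max:
                rows.append(i)
                cols.append(j)
    if not rows:
        raise ValueError("extent does not intersect the grid")
    return slice(min(rows), max(rows) + 1), slice(min(cols), max(cols) + 1)


# Toy 3 x 4 grid: latitude varies by row, longitude by column.
toy_lats = [[30.0] * 4, [35.0] * 4, [40.0] * 4]
toy_lons = [[-110.0, -105.0, -100.0, -95.0]] * 3
row_slice, col_slice = sketch_bbox_slices(toy_lats, toy_lons, [-106, -99, 34, 41])
print(row_slice, col_slice)  # slice(1, 3, None) slice(1, 3, None)
```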
- _register_field(field_type: credit.datasets.base_dataset.VALID_FIELD_TYPES, field_config: dict[str, list[str] | None] | None) None#
Extends the _register_field method of BaseDataset to record levels and to check each variable against the HRRR VAR_REGISTRY.
- Parameters:
field_type (VALID_FIELD_TYPES) – One of VALID_FIELD_TYPES, namely: "prognostic", "dynamic_forcing", "static", "diagnostic".
field_config (dict[str, list[str] | None] | None) – Field-type config dict, or None / null to disable the field.
- Raises:
KeyError – If a variable in the field config is not in the HRRR VAR_REGISTRY.
- _extract_field(field_type: credit.datasets.base_dataset.VALID_FIELD_TYPES, t: pandas.Timestamp, sample: dict[str, Any]) None#
Replace the _extract_field method of BaseDataset to implement the HRRR-specific file resolution and fetching logic.
Load all variables for field_type at time t into sample.
Resolves the file path / URI, loads the .idx (cached), then delegates to _extract_from_idx() with the appropriate byte fetcher for the current mode.
For wrfsubh, t is a 15-min-resolution timestamp. This method derives the HRRR init time and FF file number automatically:
- init_hour = t.floor("1h")
- step_min = minutes since init (15, 30, 45, 60, …)
- ff = ceil(step_min / 60) (file number within the run)
If t is exactly on the hour, it is treated as the 60-min step of the previous hour's run (init_hour -= 1h, step_min = 60).
- Parameters:
field_type (VALID_FIELD_TYPES) – One of VALID_FIELD_TYPES, namely: "prognostic", "dynamic_forcing", "static", "diagnostic".
t (pd.Timestamp) – Initialization timestamp (UTC). For wrfsubh, this is a 15-min-resolution timestamp like 2024-01-01T00:15:00Z.
sample (dict[str, Any]) – The sample dict being built in __getitem__.
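The wrfsubh time arithmetic described for this method can be sketched with the standard library. `sketch_subh_file_for` is a hypothetical helper that uses `datetime` where the real method uses `pandas.Timestamp`; it follows the three rules and the on-the-hour special case as stated.

```python
import math
from datetime import datetime, timedelta


def sketch_subh_file_for(t):
    """Return (init_hour, step_min, ff) for a 15-min-resolution timestamp."""
    init_hour = t.replace(minute=0, second=0, microsecond=0)  # t.floor("1h")
    step_min = int((t - init_hour).total_seconds() // 60)
    if step_min == 0:
        # Exactly on the hour: the 60-min step of the previous hour's run.
        init_hour -= timedelta(hours=1)
        step_min = 60
    ff = math.ceil(step_min / 60)
    return init_hour, step_min, ff


print(sketch_subh_file_for(datetime(2022, 1, 1, 0, 15)))
print(sketch_subh_file_for(datetime(2022, 1, 1, 1, 0)))
```

Both calls resolve to the 00:00 run and file number 1, with `step_min` 15 and 60 respectively.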
- _extract_from_idx(field_type: credit.datasets.base_dataset.VALID_FIELD_TYPES, idx_entries: list[dict[str, str | int | None]], fetcher: Callable[[dict[str, str | int | None]], bytes], vd: dict[str, list[str | int]], sample: dict[str, Any], step_min: int | None = None) None#
Shared fetch-plan → parallel byte fetch → decode → tensor pipeline.
Used by both local and remote modes. The only difference between modes is the fetcher callable that maps an idx entry to raw GRIB bytes. Product-specific level dispatch (pressure vs hybrid-sigma vs sub-hourly) is handled here based on self.product.
- Parameters:
field_type (VALID_FIELD_TYPES) – One of VALID_FIELD_TYPES.
idx_entries (list[dict[str, str | int | None]]) – Parsed .idx entries for the target file.
fetcher – Callable (entry: dict) -> bytes that fetches the raw GRIB message for a given idx entry.
vd (dict[str, list[str | int]]) – Variable dict (vars_3D, vars_2D, levels).
sample (dict[str, Any]) – Output dict to populate in-place.
step_min (int | None) – Sub-hourly step in minutes (15, 30, 45, 60, …). Only used when self.product == "wrfsubh".
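The "fetch plan, then parallel byte fetch" part of this pipeline can be sketched with a thread pool and an injected fetcher. Everything here is illustrative: `sketch_fetch_all` and `in_memory_fetcher` are hypothetical, an in-memory byte string stands in for the grib2 file, and decoding with pygrib is omitted. The worker count of 8 mirrors `_MAX_REMOTE_WORKERS`.

```python
from concurrent.futures import ThreadPoolExecutor


def sketch_fetch_all(plan, fetcher, max_workers=8):
    """Fetch raw bytes for every planned entry in parallel, preserving plan order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fetcher, plan))


# Stand-in for a grib2 file: three "messages" of 4, 6, and 2 bytes.
blob = b"AAAABBBBBBCC"
plan = [
    {"var": "TMP", "byte_start": 0, "byte_end": 3},
    {"var": "UGRD", "byte_start": 4, "byte_end": 9},
    {"var": "VGRD", "byte_start": 10, "byte_end": None},
]


def in_memory_fetcher(entry):
    """Same contract as a local or remote fetcher: idx entry in, raw bytes out."""
    end = None if entry["byte_end"] is None else entry["byte_end"] + 1
    return blob[entry["byte_start"]:end]


messages = sketch_fetch_all(plan, in_memory_fetcher)
print(messages)  # [b'AAAA', b'BBBBBB', b'CC']
```

Because the fetcher is just a callable, swapping local reads for HTTP Range requests would not change the surrounding logic, which matches the description of the two modes above.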