import gzip
import io
import json
import logging
import os
import tempfile
import uuid
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Tuple
import geopandas as gpd
import pandas as pd
import pandera.pandas as pa
import requests
from bs4 import BeautifulSoup
from dateutil import parser as dateparser
from dateutil.relativedelta import relativedelta
from lat_lon_parser import parse as parse_lat_lon
from ocha_lens.utils.storm import (
_create_storm_id,
_to_gdf,
check_coordinate_bounds,
check_crs,
check_quadrant_list,
)
# Set up logging
logger = logging.getLogger(__name__)
# Basin code mapping from ATCF to standard codes
# Note: IBTrACS and ECMWF use "EP" for the entire Eastern North Pacific
# (east of 180°W), which includes both NHC (east of 140°W) and CPHC
# (140°W-180°W) areas of responsibility
BASIN_CODE_MAPPING = {
"al": "NA", # Atlantic → North Atlantic
"ep": "EP", # Eastern Pacific (NHC area: east of 140°W)
"cp": "EP", # Central Pacific (CPHC area: 140°W-180°W) → mapped to EP for standardization
}
# NHC API URL
NHC_CURRENT_STORMS_URL = "https://www.nhc.noaa.gov/CurrentStorms.json"
# NHC storm name lookup table
NHC_STORM_TABLE_URL = "https://ftp.nhc.noaa.gov/atcf/archive/storm.table"
# ATCF Archive URL pattern
ATCF_ARCHIVE_URL = (
"https://ftp.nhc.noaa.gov/atcf/archive/{year}/"
"a{basin}{number:02d}{year}.dat.gz"
)
# ATCF aid_public URL pattern (for current/recent years)
ATCF_AID_PUBLIC_URL = (
"https://ftp.nhc.noaa.gov/atcf/aid_public/"
"a{basin}{number:02d}{year}.dat.gz"
)
# ATCF A-deck column names (first 35 standard columns)
# Format documentation: https://ftp.nhc.noaa.gov/atcf/README
# See also: https://github.com/palewire/atcf-data-parser
ATCF_ADECK_COLUMNS = [
"basin", # 0: Basin identifier (AL, EP, CP, etc.)
"cy", # 1: Annual cyclone number (01-99)
"yyyymmddhh", # 2: Warning Date-Time-Group
"technum", # 3: Objective technique sorting number
"tech", # 4: Technique (model/method identifier)
"tau", # 5: Forecast period (hours, 0-168)
"lat", # 6: Latitude (tenths of degrees, N/S)
"lon", # 7: Longitude (tenths of degrees, E/W)
"vmax", # 8: Maximum sustained wind speed (knots)
"mslp", # 9: Minimum sea level pressure (mb)
"ty", # 10: Level of tropical cyclone development
"rad", # 11: Wind intensity for radii (34, 50, 64 kt)
"windcode", # 12: Radius code (AAA, NEQ, etc.)
"rad1", # 13: Radius in NE quadrant (nm)
"rad2", # 14: Radius in SE quadrant (nm)
"rad3", # 15: Radius in SW quadrant (nm)
"rad4", # 16: Radius in NW quadrant (nm)
"pouter", # 17: Pressure of outermost closed isobar
"router", # 18: Radius of outermost closed isobar
"rmw", # 19: Radius of maximum winds
"gusts", # 20: Gust speed (knots)
"eye", # 21: Eye diameter (nm)
"subregion", # 22: Sub-region code
"maxseas", # 23: Maximum seas (ft)
"initials", # 24: Forecaster's initials
"dir", # 25: Storm direction (degrees)
"speed", # 26: Storm speed (knots)
"stormname", # 27: Storm name
"depth", # 28: Depth (D=deep, M=medium, S=shallow)
"seas", # 29: Seas (Wave height, ft)
"seascode", # 30: Seas radius code
"seas1", # 31: Seas radius in NE quadrant
"seas2", # 32: Seas radius in SE quadrant
"seas3", # 33: Seas radius in SW quadrant
"seas4", # 34: Seas radius in NW quadrant
]
# Extend with additional columns for newer ATCF formats (following HDX pattern)
# These handle extended ATCF data that can have up to 75+ columns
for i in range(1, 21):
ATCF_ADECK_COLUMNS.append(f"userdefine{i}")
ATCF_ADECK_COLUMNS.append(f"userdata{i}")
# Official forecast technique identifiers
OFFICIAL_FORECAST_TECHS = [
"OFCL", # Official NHC forecast
"OFCP", # Official forecast (alternate)
"HFIP", # HFIP consensus (sometimes used as official)
]
# Wind radii column names
WIND_RADII_COLUMNS = [
"quadrant_radius_34",
"quadrant_radius_50",
"quadrant_radius_64",
]
# Schema for storm metadata
# Note: genesis_basin represents the storm's designation basin (where it originated)
# and remains constant throughout the storm's lifecycle
STORM_SCHEMA = pa.DataFrameSchema(
{
"atcf_id": pa.Column(str, nullable=False),
"name": pa.Column(
str,
nullable=True,
checks=[
pa.Check(
lambda s: s.upper() == s,
error="County must be uppercase",
element_wise=True,
)
],
),
"number": pa.Column(str, nullable=False),
"season": pa.Column(
"int64", pa.Check.between(1840, 2050), nullable=False
),
"genesis_basin": pa.Column(
str,
pa.Check.isin(list(BASIN_CODE_MAPPING.values())),
nullable=False,
),
"provider": pa.Column(str, nullable=True),
"storm_id": pa.Column(str, nullable=True),
},
strict=True,
coerce=True,
unique=["atcf_id", "storm_id"],
report_duplicates="all",
)
# Schema for track data
# Note: basin represents the storm's standardized designation basin and is
# constant for all track points. It does NOT represent the storm's current
# geographic location. Both NHC (east of 140°W) and CPHC (140°W-180°W) areas
# are coded as "EP" to match IBTrACS/ECMWF standardization.
TRACK_SCHEMA = pa.DataFrameSchema(
{
"atcf_id": pa.Column(str, nullable=False),
"provider": pa.Column(str, nullable=False),
"basin": pa.Column(
str,
pa.Check.isin(list(BASIN_CODE_MAPPING.values())),
nullable=False,
),
"issued_time": pa.Column(pd.Timestamp, nullable=False),
"valid_time": pa.Column(pd.Timestamp, nullable=False),
"leadtime": pa.Column("Int64", pa.Check.ge(0), nullable=False),
"wind_speed": pa.Column(
float, pa.Check.between(0, 300), nullable=False
),
"pressure": pa.Column(
float, pa.Check.between(800, 1100), nullable=True
),
"max_wind_radius": pa.Column("Int64", pa.Check.ge(0), nullable=True),
"last_closed_isobar_radius": pa.Column(
"Int64", pa.Check.ge(0), nullable=True
),
"last_closed_isobar_pressure": pa.Column(
"Int64", pa.Check.between(800, 1100), nullable=True
),
"gust_speed": pa.Column(
"Int64", pa.Check.between(0, 400), nullable=True
),
"nature": pa.Column(str, nullable=True),
"quadrant_radius_34": pa.Column(
"object", checks=pa.Check(check_quadrant_list), nullable=True
),
"quadrant_radius_50": pa.Column(
"object", checks=pa.Check(check_quadrant_list), nullable=True
),
"quadrant_radius_64": pa.Column(
"object", checks=pa.Check(check_quadrant_list), nullable=True
),
"number": pa.Column(str, nullable=True),
"storm_id": pa.Column(str, nullable=True),
"point_id": pa.Column(str, nullable=False),
"geometry": pa.Column(gpd.array.GeometryDtype, nullable=False),
},
strict=True,
coerce=True,
unique=[
"atcf_id",
"valid_time",
"leadtime",
"issued_time",
"geometry",
"wind_speed",
],
report_duplicates="all",
checks=[
pa.Check(
lambda gdf: check_crs(gdf, "EPSG:4326"),
error="CRS must be EPSG:4326",
),
pa.Check(
lambda gdf: check_coordinate_bounds(gdf),
error="All coordinates must be within valid bounds",
),
],
)
# Derive base columns from TRACK_SCHEMA for raw data functions
# Raw data has name, latitude, longitude instead of storm_id, point_id, geometry
def _get_base_columns():
"""Get column list for raw data (before get_tracks transformation)."""
cols = list(TRACK_SCHEMA.columns.keys())
# Remove columns added by get_tracks
for col in ["storm_id", "point_id", "geometry"]:
if col in cols:
cols.remove(col)
# Add columns used by raw data but removed by get_tracks
cols.extend(["name", "latitude", "longitude"])
return cols
BASE_COLUMNS = _get_base_columns()
# Helper Functions
def _fetch_storm_names() -> dict:
"""
Fetch storm name lookup table from NHC archive.
Returns
-------
dict
Mapping of ATCF ID (uppercase) to storm name
Example: {'AL132023': 'LEE', 'EP092023': 'HILARY'}
"""
try:
response = requests.get(NHC_STORM_TABLE_URL, timeout=30)
response.raise_for_status()
# Parse the table
storm_names = {}
for line in response.text.strip().split("\n"):
if not line.strip():
continue
# Split by comma
cols = [c.strip() for c in line.split(",")]
if len(cols) < 21:
continue
name = cols[0] # Column 1: Storm name
atcf_id = cols[20] # Column 21: ATCF ID
if name and atcf_id:
# Convert to uppercase to match output ATCF ID format
storm_names[atcf_id.upper()] = name
logger.info(f"Loaded {len(storm_names)} storm names from NHC archive")
return storm_names
except Exception as e:
logger.warning(f"Failed to fetch storm name table: {e}")
return {}
def _get_basin_code(atcf_id: str) -> str:
"""
Extract and convert ATCF basin code to standard basin code.
Parameters
----------
atcf_id : str
ATCF storm identifier (e.g., "EP092023", "AL142024")
Returns
-------
str
Standard basin code (NA or EP)
"""
basin_prefix = atcf_id[:2].lower()
return BASIN_CODE_MAPPING.get(basin_prefix, basin_prefix.upper())
def _parse_valid_time(valid_time_str: str, issuance: str) -> pd.Timestamp:
"""
Convert relative forecast time to absolute timestamp.
NHC advisories use relative time format like "29/1200Z" which needs
to be converted to an absolute timestamp based on the issuance time.
Parameters
----------
valid_time_str : str
Forecast time in format "DD/HHMMZ" (e.g., "29/1200Z")
issuance : str
Advisory issuance timestamp (ISO format or parseable string)
Returns
-------
pd.Timestamp
UTC timestamp for the valid time
"""
# Parse issuance time
issuance_dt = dateparser.parse(issuance)
# Clean up the valid time string
time_part = valid_time_str.replace("Z", "").strip()
# Parse day and time components
if "/" in time_part:
day_str, hhmm = time_part.split("/")
else:
# Sometimes just "HHMM" without day
day_str = str(issuance_dt.day)
hhmm = time_part
day = int(day_str)
hour = int(hhmm[:2]) if len(hhmm) >= 2 else 0
minute = int(hhmm[2:4]) if len(hhmm) >= 4 else 0
# Create forecast datetime with the specified day/time
forecast_dt = issuance_dt.replace(
day=day, hour=hour, minute=minute, second=0, microsecond=0
)
# Handle month rollover (if forecast day < issuance day, add 1 month)
if day < issuance_dt.day:
forecast_dt = forecast_dt + relativedelta(months=1)
# Convert to pandas Timestamp (forecast_dt already has timezone from issuance_dt)
return pd.Timestamp(forecast_dt)
def _get_provider(atcf_id_or_basin: str) -> str:
"""
Determine the provider (NHC or CPHC) based on ATCF ID or basin prefix.
Parameters
----------
atcf_id_or_basin : str
ATCF ID (e.g., "CP012024") or basin prefix (e.g., "cp")
Returns
-------
str
Provider name ("nhc" or "cphc")
"""
# Extract basin prefix (first 2 chars, lowercase)
basin_prefix = atcf_id_or_basin[:2].lower()
return "cphc" if basin_prefix == "cp" else "nhc"
def _parse_atcf_lat_lon(lat_str: str, lon_str: str) -> Tuple[float, float]:
"""
Parse ATCF latitude and longitude format.
ATCF coordinates are in tenths of degrees with N/S/E/W suffix.
Examples: "253N" = 25.3°N, "755W" = 75.5°W
Parameters
----------
lat_str : str
Latitude string (e.g., "253N", "185S")
lon_str : str
Longitude string (e.g., "755W", "1234E")
Returns
-------
tuple of (float, float)
Latitude and longitude in decimal degrees
"""
# Parse latitude
lat_value = float(lat_str[:-1]) / 10.0
lat_dir = lat_str[-1]
if lat_dir == "S":
lat_value = -lat_value
# Parse longitude
lon_value = float(lon_str[:-1]) / 10.0
lon_dir = lon_str[-1]
if lon_dir == "W":
lon_value = -lon_value
return lat_value, lon_value
def _parse_atcf_adeck(file_path: Path) -> pd.DataFrame:
"""
Parse ATCF A-deck file to standardized DataFrame format.
ATCF A-deck files contain forecast data in comma-delimited format.
This function reads the file, filters for official forecasts (OFCL),
and converts to the standard NHC DataFrame schema.
Parameters
----------
file_path : Path
Path to ATCF A-deck file (.dat or .dat.gz)
Returns
-------
pd.DataFrame
Standardized DataFrame with NHC schema fields
"""
# Extract expected ATCF ID from filename (remove leading 'a' and extension)
# Filename format: aal012023.dat or aal012023.dat.gz
# Uppercase basin letters to match IBTrACS format (AL012023 not al012023)
filename = (
file_path.stem
if file_path.suffix != ".gz"
else file_path.stem.replace(".dat", "")
)
atcf_id = (
filename[1:3].upper() + filename[3:]
) # Uppercase basin code (first 2 chars)
# Determine if file is gzipped
if file_path.suffix == ".gz":
open_func = gzip.open
mode = "rt" # Read as text
else:
open_func = open
mode = "r"
# Read CSV with pre-defined columns (no header in ATCF files)
# Note: ATCF files can have variable number of columns per line (30-75+)
# Using extended column list (75 columns) to handle all ATCF formats
try:
with open_func(file_path, mode) as f:
df = pd.read_csv(
f,
header=None,
names=ATCF_ADECK_COLUMNS,
skipinitialspace=True,
na_values=["", " ", "null"],
keep_default_na=True,
engine="python", # Python engine handles variable-length rows
)
except Exception as e:
logger.error(f"Failed to read ATCF file {file_path}: {e}")
return pd.DataFrame(columns=BASE_COLUMNS)
logger.debug(f"Read {len(df)} records from {file_path}")
# Filter for official forecasts only (OFCL, OFCP, etc.)
# Excludes CARQ (best track observations) and model forecasts
df = df[df["tech"].isin(OFFICIAL_FORECAST_TECHS)].copy()
if df.empty:
logger.warning(
f"No official forecast data (OFCL) found in {file_path}. "
f"Storm may not have had forecasts issued."
)
return pd.DataFrame(columns=BASE_COLUMNS)
logger.debug(f"Filtered to {len(df)} official forecast records")
# Parse issued time (YYYYMMDDHH)
df["issued_time"] = pd.to_datetime(
df["yyyymmddhh"], format="%Y%m%d%H", utc=True
)
# Calculate valid time (issued_time + tau hours)
df["valid_time"] = df.apply(
lambda row: row["issued_time"] + pd.Timedelta(hours=row["tau"]), axis=1
)
# Parse coordinates
coords = df.apply(
lambda row: _parse_atcf_lat_lon(row["lat"], row["lon"])
if pd.notna(row["lat"]) and pd.notna(row["lon"])
else (None, None),
axis=1,
)
df["latitude"] = coords.apply(lambda x: x[0])
df["longitude"] = coords.apply(lambda x: x[1])
df["atcf_id"] = atcf_id
# Validate ATCF ID from filename matches data
year = df["yyyymmddhh"].astype(str).str[:4]
expected_ids = (
df["basin"].str.upper() + df["cy"].astype(str).str.zfill(2) + year
)
# Normalize both IDs by removing punctuation for comparison
# .replace(".0") is to force equivalence of 'EP10.02012' and 'EP102012'
expected_id_normalized = expected_ids.unique()[0].replace(".0", "")
if expected_id_normalized != atcf_id:
logger.error(
f"ATCF ID mismatch in {file_path.name}: "
f"filename={atcf_id}, data={expected_id_normalized}. "
f"Skipping file."
)
return None
if len(expected_ids.unique()) > 1:
logger.warning(
f"Multiple ATCF IDs found in {file_path.name}: {expected_ids.unique()}"
)
# Map basin to standard codes
df["basin_std"] = df["basin"].str.lower().map(BASIN_CODE_MAPPING)
# Get provider based on original basin prefix (before standardization)
df["provider"] = df["basin"].apply(_get_provider)
# Extract storm number
df["number"] = df["cy"].astype(str).str.zfill(2)
# Get storm name from official NHC storm.table
# This is more reliable than ATCF stormname field which is blank for OFCL forecasts
storm_name_lookup = _fetch_storm_names()
# Map storm names using ATCF ID
df["name"] = df["atcf_id"].map(storm_name_lookup)
# Fallback to stormname field if lookup fails (for very recent storms)
if df["name"].isna().any():
df["name"] = df["name"].fillna(
df["stormname"].replace(["", " ", "UNNAMED"], None)
)
# Forward-fill within storm if still missing
df["name"] = df.groupby("atcf_id")["name"].ffill().bfill()
# Wind speed and pressure (float columns - NaN is appropriate)
df["wind_speed"] = pd.to_numeric(df["vmax"], errors="coerce")
df["pressure"] = pd.to_numeric(df["mslp"], errors="coerce")
# ATCF uses 0 as missing value indicator - replace with NaN
df.loc[df["pressure"] == 0, "pressure"] = pd.NA
df.loc[df["wind_speed"] == 0, "wind_speed"] = pd.NA
# Additional fields from ATCF to match IBTrACS schema
# These are Int64 columns - use pd.NA for consistency
# Convert to Int64 and replace 0 with pd.NA (ATCF uses 0 for missing values)
for new_col, atcf_col in [
("max_wind_radius", "rmw"),
("last_closed_isobar_radius", "router"),
("last_closed_isobar_pressure", "pouter"),
("gust_speed", "gusts"),
]:
df[new_col] = pd.to_numeric(df[atcf_col], errors="coerce").astype(
"Int64"
)
df.loc[df[new_col] == 0, new_col] = pd.NA
# Nature field (ty column) - storm type/classification (string column)
df["nature"] = df["ty"].replace(["", " "], pd.NA)
# Leadtime
df["leadtime"] = df["tau"].astype("int64")
# Parse wind radii from ATCF format
# Wind radii are stored in separate rows with rad, rad1-rad4 columns
# Group by forecast key (atcf_id + issued_time + tau) and collect radii
df["rad"] = pd.to_numeric(df["rad"], errors="coerce")
df["rad1"] = pd.to_numeric(df["rad1"], errors="coerce")
df["rad2"] = pd.to_numeric(df["rad2"], errors="coerce")
df["rad3"] = pd.to_numeric(df["rad3"], errors="coerce")
df["rad4"] = pd.to_numeric(df["rad4"], errors="coerce")
# Create forecast key for grouping
df["forecast_key"] = (
df["atcf_id"]
+ "_"
+ df["issued_time"].astype(str)
+ "_"
+ df["tau"].astype(str)
)
# Extract wind radii rows (where rad is 34, 50, or 64)
radii_mask = df["rad"].isin([34, 50, 64])
radii_df = df[radii_mask].copy()
# Create dictionary to store radii by forecast_key
radii_dict = {}
for _, row in radii_df.iterrows():
key = row["forecast_key"]
rad_val = int(row["rad"])
if key not in radii_dict:
radii_dict[key] = {}
# Create list [NE, SE, SW, NW]
radii_list = [row["rad1"], row["rad2"], row["rad3"], row["rad4"]]
# Convert to int list if all values are valid (>= 0) and at least one is positive
# Zero is valid - it means wind doesn't extend in that quadrant at this threshold
if all(pd.notna(r) and r >= 0 for r in radii_list) and any(
r > 0 for r in radii_list
):
radii_list = [int(r) for r in radii_list]
else:
radii_list = None
# Store in dictionary by wind threshold
radii_dict[key][f"quadrant_radius_{rad_val}"] = radii_list
# Initialize wind radii columns with object dtype to hold lists
for col in WIND_RADII_COLUMNS:
df[col] = pd.Series([None] * len(df), dtype="object")
# Merge radii back to main dataframe
for key, radii in radii_dict.items():
indices = df[df["forecast_key"] == key].index
# Assign to each row individually to avoid pandas broadcasting issues
for radii_col in WIND_RADII_COLUMNS:
if radii_col in radii:
for idx in indices:
df.at[idx, radii_col] = radii[radii_col]
# Remove duplicate rows after merging wind radii.
# ATCF files store wind radii at different thresholds (34, 50, 64 kt) on
# separate rows. When these supplementary rows have the same position as
# the main forecast, they are duplicates. In older data (e.g., 2001),
# supplementary rows may have slightly different positions (typically <1°)
# and truncated data (mslp=0, blank fields) - these are kept as separate
# rows to preserve all position information from the source data.
# Note: pressure is intentionally excluded from the subset because
# supplementary rows often have mslp=0 (→NaN) while main rows have
# real pressure values.
df = df.drop_duplicates(
subset=[
"forecast_key",
"latitude",
"longitude",
"wind_speed",
],
keep="first",
)
# Select and rename columns to match schema
# Use BASE_COLUMNS but substitute basin_std for basin during selection
select_cols = [
col if col != "basin" else "basin_std" for col in BASE_COLUMNS
]
result = df[select_cols].rename(columns={"basin_std": "basin"})
# Drop rows with missing coordinates or wind_speed
initial_count = len(result)
result = result.dropna(subset=["latitude", "longitude", "wind_speed"])
dropped = initial_count - len(result)
if dropped > 0:
logger.warning(
f"Dropped {dropped} rows with null wind_speed from {file_path.name}"
)
logger.info(
f"Parsed {len(result)} valid forecast points from {file_path.name}"
)
return result
def _fetch_forecast_advisory(url: str) -> Optional[str]:
"""
Fetch forecast advisory HTML from NHC and extract text content.
Parameters
----------
url : str
URL to the forecast advisory HTML page
Returns
-------
str or None
Extracted text content from <pre> tag, or None if fetch fails
"""
try:
response = requests.get(url, timeout=(10, 30))
response.raise_for_status()
# Parse HTML and extract <pre> tag content
soup = BeautifulSoup(response.content, "html.parser")
pre_tag = soup.find("pre")
if pre_tag is None:
logger.warning(f"No <pre> tag found in advisory at {url}")
return None
return pre_tag.get_text()
except requests.exceptions.Timeout:
logger.warning(f"Timeout fetching advisory from {url}")
return None
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to fetch advisory from {url}: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error parsing advisory from {url}: {e}")
return None
def _parse_forecast_advisory(
advisory_text: str,
storm_id: str,
storm_name: str,
issuance: str,
basin: str,
) -> List[dict]:
"""
Parse NHC forecast advisory text to extract forecast points.
Parses lines starting with "FORECAST VALID" or "OUTLOOK VALID" to
extract position and wind speed forecasts. Also handles "REMNANTS OF
CENTER LOCATED NEAR" lines for dissipating storms.
Parameters
----------
advisory_text : str
Raw text from <pre> tag in advisory HTML
storm_id : str
ATCF storm identifier (e.g., "EP092023")
storm_name : str
Storm name
issuance : str
Issuance timestamp of advisory
basin : str
Basin code (NA or EP)
Returns
-------
list of dict
Forecast points with valid_time, latitude, longitude, wind_speed
"""
forecast_points = []
lines = advisory_text.split("\n")
# Validate ATCF ID matches advisory text
# Look for pattern like "MIAMI FL AL132023" in header
import re
atcf_pattern = r"MIAMI FL\s+([A-Z]{2}\d{6})"
for line in lines[:10]: # Check first 10 lines only
match = re.search(atcf_pattern, line)
if match:
advisory_atcf_id = match.group(1).upper()
if advisory_atcf_id != storm_id.upper():
logger.error(
f"ATCF ID mismatch: parameter={storm_id}, "
f"advisory text={advisory_atcf_id}. Skipping advisory."
)
return []
break
latitude = longitude = maxwind = valid_time = gust_speed = None
radii_34 = radii_50 = radii_64 = None
for ln in lines:
# Save original line for pattern matching before preprocessing
original_ln = ln
# Preprocessing the line (following HDX pattern)
ln = ln.replace("...", " ")
ln = ln.replace(" ", " ")
forecast_line = ln.split()
# Parse FORECAST VALID or OUTLOOK VALID lines
if (
ln.startswith("FORECAST VALID") or ln.startswith("OUTLOOK VALID")
) and len(forecast_line) >= 5:
# Save previous forecast point before starting new one
if all([latitude, longitude, maxwind, valid_time]) and maxwind > 0:
forecast_points.append(
{
"atcf_id": storm_id,
"name": storm_name.upper(),
"basin": basin,
"valid_time": valid_time,
"latitude": latitude,
"longitude": longitude,
"wind_speed": maxwind,
"pressure": pd.NA,
"max_wind_radius": pd.NA, # Not in forecast advisory text
"last_closed_isobar_radius": pd.NA,
"last_closed_isobar_pressure": pd.NA,
"gust_speed": gust_speed
if gust_speed is not None
else pd.NA,
"nature": pd.NA,
"quadrant_radius_34": radii_34,
"quadrant_radius_50": radii_50,
"quadrant_radius_64": radii_64,
}
)
# Reset for next forecast point
latitude = longitude = maxwind = valid_time = gust_speed = None
radii_34 = radii_50 = radii_64 = None
try:
valid_time = _parse_valid_time(forecast_line[2], issuance)
latitude = parse_lat_lon(forecast_line[3])
longitude = parse_lat_lon(forecast_line[4])
except Exception as e:
logger.debug(
f"Could not parse position from: {ln[:80]} | Error: {e}"
)
latitude = longitude = valid_time = None
continue
# Parse REMNANTS OF CENTER LOCATED NEAR lines
elif (
ln.startswith("REMNANTS OF CENTER LOCATED NEAR")
and len(forecast_line) >= 8
):
try:
valid_time = _parse_valid_time(forecast_line[8], issuance)
latitude = parse_lat_lon(forecast_line[5])
longitude = parse_lat_lon(forecast_line[6])
except Exception:
logger.debug(f"Could not parse remnants from: {ln[:50]}")
latitude = longitude = valid_time = None
continue
# Parse MAX WIND lines and optional GUSTS
# Example: "MAX WIND 115 KT GUSTS 140 KT"
if ln.startswith("MAX WIND") and len(forecast_line) >= 3:
try:
maxwind = int(forecast_line[2])
# Check for GUSTS on same line
if "GUSTS" in forecast_line:
gusts_idx = forecast_line.index("GUSTS")
if gusts_idx + 1 < len(forecast_line):
gust_speed = int(forecast_line[gusts_idx + 1])
except ValueError:
logger.debug(f"Could not parse wind from: {ln[:50]}")
maxwind = None
# Parse wind radii lines: "64 KT... 25NE 20SE 15SW 15NW"
# Check original line (before preprocessing removed "...")
if "KT..." in original_ln and any(
original_ln.startswith(f"{kt} KT") for kt in ["64", "50", "34"]
):
try:
# Extract wind threshold (e.g., "64" from ["64", "KT", "50NE", ...])
kt_val = int(forecast_line[0])
# After preprocessing, line is: ["64", "KT", "50NE", "40SE", "35SW", "50NW."]
# Radii values start at index 2
if len(forecast_line) >= 6:
# Extract numerical values from strings like "25NE"
ne = int("".join(filter(str.isdigit, forecast_line[2])))
se = int("".join(filter(str.isdigit, forecast_line[3])))
sw = int("".join(filter(str.isdigit, forecast_line[4])))
nw = int("".join(filter(str.isdigit, forecast_line[5])))
radii_list = [ne, se, sw, nw]
# Assign to appropriate threshold
if kt_val == 64:
radii_64 = radii_list
elif kt_val == 50:
radii_50 = radii_list
elif kt_val == 34:
radii_34 = radii_list
except (ValueError, IndexError) as e:
logger.debug(
f"Could not parse wind radii from: {ln[:50]} - {e}"
)
# Save the last forecast point after loop ends
if all([latitude, longitude, maxwind, valid_time]) and maxwind > 0:
forecast_points.append(
{
"atcf_id": storm_id,
"name": storm_name.upper(),
"basin": basin,
"valid_time": valid_time,
"latitude": latitude,
"longitude": longitude,
"wind_speed": maxwind,
"pressure": pd.NA,
"max_wind_radius": pd.NA, # Not in forecast advisory text
"last_closed_isobar_radius": pd.NA,
"last_closed_isobar_pressure": pd.NA,
"gust_speed": gust_speed if gust_speed is not None else pd.NA,
"nature": pd.NA,
"quadrant_radius_34": radii_34,
"quadrant_radius_50": radii_50,
"quadrant_radius_64": radii_64,
}
)
logger.debug(
f"Parsed {len(forecast_points)} forecast points for {storm_id}"
)
return forecast_points
def _fetch_current_storms_json() -> Optional[dict]:
"""
Fetch current storms JSON from NHC CurrentStorms.json file.
Returns
-------
dict or None
Parsed JSON response, or None if fetch fails
"""
try:
response = requests.get(NHC_CURRENT_STORMS_URL, timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
logger.error(f"Timeout fetching {NHC_CURRENT_STORMS_URL}")
return None
except requests.exceptions.RequestException as e:
logger.error(f"Failed to fetch current storms: {e}")
return None
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON response: {e}")
return None
def _extract_current_observation(storm_dict: dict) -> dict:
"""
Extract current observation from storm dictionary.
Parameters
----------
storm_dict : dict
Storm dictionary from NHC CurrentStorms.json
Returns
-------
dict
Observation record with standardized fields
"""
# Uppercase basin code to match IBTrACS format (AL012023 not al012023)
atcf_id_raw = storm_dict["id"]
atcf_id = atcf_id_raw[:2].upper() + atcf_id_raw[2:]
basin = _get_basin_code(atcf_id)
provider = _get_provider(atcf_id)
# Parse last update time
last_update = pd.Timestamp(storm_dict["lastUpdate"])
return {
"atcf_id": atcf_id,
"name": storm_dict.get("name").upper(),
"number": atcf_id[2:4], # Extract "09" from "EP092023"
"basin": basin,
"provider": provider,
"issued_time": last_update,
"valid_time": last_update,
"leadtime": 0,
"wind_speed": float(storm_dict.get("intensity", 0)),
"pressure": float(storm_dict.get("pressure", 0))
if storm_dict.get("pressure")
else pd.NA,
"latitude": float(storm_dict.get("latitudeNumeric", 0)),
"longitude": float(storm_dict.get("longitudeNumeric", 0)),
"max_wind_radius": pd.NA, # Not available in CurrentStorms.json
"last_closed_isobar_radius": pd.NA,
"last_closed_isobar_pressure": pd.NA,
"gust_speed": pd.NA,
"nature": pd.NA,
"quadrant_radius_34": None, # Not available in CurrentStorms.json
"quadrant_radius_50": None,
"quadrant_radius_64": None,
}
def _process_nhc_to_df(
raw_data: dict,
include_observations: bool = True,
include_forecasts: bool = True,
) -> pd.DataFrame:
"""
Process raw NHC JSON data to DataFrame.
Extracts current observations and fetches/parses forecast advisories
for each active storm. If no active storms or all fetches fail,
returns an empty DataFrame.
Parameters
----------
raw_data : dict
Parsed JSON from NHC CurrentStorms.json
include_observations : bool, default True
Whether to include current observations
include_forecasts : bool, default True
Whether to fetch and include forecast advisories
Returns
-------
pd.DataFrame
Combined DataFrame with observations and/or forecasts
"""
all_records = []
# Check if there are active storms
active_storms = raw_data.get("activeStorms", [])
if not active_storms:
logger.info("No active storms currently")
return pd.DataFrame(columns=BASE_COLUMNS)
logger.info(f"Processing {len(active_storms)} active storms")
for storm in active_storms:
# Uppercase basin code to match IBTrACS format (AL012023 not al012023)
atcf_id_raw = storm["id"]
atcf_id = atcf_id_raw[:2].upper() + atcf_id_raw[2:]
storm_name = storm.get("name", "Unnamed")
basin = _get_basin_code(atcf_id)
logger.debug(f"Processing storm {atcf_id} ({storm_name})")
# Extract current observation
if include_observations:
try:
obs = _extract_current_observation(storm)
all_records.append(obs)
logger.debug(f"Extracted current observation for {atcf_id}")
except Exception as e:
logger.warning(
f"Failed to extract observation for {atcf_id}: {e}"
)
# Fetch and parse forecast advisory
if include_forecasts:
forecast_advisory = storm.get("forecastAdvisory")
if forecast_advisory:
advisory_url = forecast_advisory.get("url")
issuance = forecast_advisory.get("issuance")
if advisory_url and issuance:
logger.info(
f"Fetching forecast advisory for {atcf_id} from {advisory_url}"
)
advisory_text = _fetch_forecast_advisory(advisory_url)
if advisory_text:
logger.info(
f"Successfully fetched advisory for {atcf_id}, parsing..."
)
forecast_points = _parse_forecast_advisory(
advisory_text, atcf_id, storm_name, issuance, basin
)
logger.info(
f"Parsed {len(forecast_points)} forecast points for {atcf_id}"
)
# Add metadata and leadtime to forecast points
provider = _get_provider(atcf_id)
issued_time = pd.Timestamp(issuance, tz="UTC")
for point in forecast_points:
point["provider"] = provider
point["number"] = atcf_id[2:4]
point["issued_time"] = issued_time
# Calculate leadtime in hours
leadtime = int(
(
point["valid_time"] - issued_time
).total_seconds()
/ 3600
)
point["leadtime"] = leadtime
all_records.extend(forecast_points)
logger.debug(
f"Added {len(forecast_points)} forecast points for {atcf_id}"
)
else:
logger.warning(
f"Failed to fetch advisory text for {atcf_id} from {advisory_url}"
)
else:
logger.debug(
f"Missing advisory URL or issuance for {atcf_id}"
)
else:
logger.debug(f"No forecastAdvisory found for {atcf_id}")
if not all_records:
logger.warning("No records extracted from active storms")
return pd.DataFrame(columns=BASE_COLUMNS)
logger.info(f"Extracted {len(all_records)} total records")
df = pd.DataFrame(all_records)
# Coerce datetime columns to naive UTC. Observation rows (from
# pd.Timestamp(...Z)) and forecast rows (from dateparser +
# relativedelta) can end up with mismatched tz state under some
# pandas/dateutil versions, yielding object dtype and breaking
# downstream .dt accessors. Normalize to UTC and then drop the tz so
# the column is plain datetime64[ns] — matches the storms DB schema
# (TIMESTAMP without time zone) and avoids tz-aware/naive comparison
# mismatches in downstream filters.
for col in ("valid_time", "issued_time"):
if col in df.columns:
df[col] = pd.to_datetime(df[col], utc=True).dt.tz_localize(None)
# Coerce numeric columns. Forecast points set pd.NA for fields not in
# the advisory text (e.g. pressure); mixing pd.NA with floats yields
# object dtype, which pandera can't coerce to float64. pd.to_numeric
# turns pd.NA into NaN for float columns; Int64 columns get the
# nullable-int dtype explicitly so pd.NA is preserved.
for col in ("wind_speed", "pressure"):
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce")
for col in (
"leadtime",
"max_wind_radius",
"last_closed_isobar_radius",
"last_closed_isobar_pressure",
"gust_speed",
):
if col in df.columns:
df[col] = pd.array(df[col], dtype="Int64")
return df
# Public API Functions
def _list_available_atcf_files(year: int, basin: str) -> List[int]:
"""
List available ATCF A-deck files for a given year and basin.
For recent years (current year and previous year), checks aid_public
directory first. Falls back to archive directory for older years.
Parameters
----------
year : int
Year to check
basin : str
Basin code (AL, EP, or CP)
Returns
-------
list of int
Storm numbers that have available files
"""
import re
from datetime import datetime
from ftplib import FTP
basin_lower = basin.lower()
ftp_host = "ftp.nhc.noaa.gov"
# For current/recent years (current year + previous year), check aid_public first
# This handles cases where previous year's data hasn't been archived yet
current_year = datetime.utcnow().year
if year >= current_year - 1:
ftp_path = "/atcf/aid_public/"
source = "aid_public"
else:
ftp_path = f"/atcf/archive/{year}/"
source = "archive"
# Connect to FTP server
with FTP(ftp_host, timeout=30) as ftp:
ftp.login() # Anonymous login
# List files in directory
files = ftp.nlst(ftp_path)
# Pattern: aal012023.dat.gz, aep092023.dat.gz, etc.
pattern = rf"a{basin_lower}(\d{{2}}){year}\.dat\.gz"
storm_numbers = []
for file in files:
# Extract just the filename from full path
filename = file.split("/")[-1]
match = re.match(pattern, filename)
if match:
storm_numbers.append(int(match.group(1)))
storm_numbers = sorted(set(storm_numbers))
logger.info(
f"Found {len(storm_numbers)} {basin} storms in {year} ({source}): "
f"{storm_numbers if len(storm_numbers) <= 10 else f'{storm_numbers[:10]}...'}"
)
return storm_numbers
[docs]
def download_nhc_archive(
year: int,
basin: str = "AL",
cache_dir: str = "storm",
use_cache: bool = True,
) -> List[Path]:
"""
Download ATCF archive files for all storms in a given year and basin.
Queries the FTP server to find all available storms for the specified
year and basin, then downloads only those files. Files are saved
with archive naming: a{basin}{number}{year}.dat
(e.g., aal012023.dat) in the {cache_dir}/raw/atcf/ subdirectory.
For recent years (current year and previous year), files are downloaded
from the aid_public directory. For older years, files are downloaded
from the archive directory.
Parameters
----------
year : int
Year to download (e.g., 2023, 2024, 2025)
basin : str, default "AL"
Basin code: "AL" (Atlantic), "EP" (Eastern Pacific),
or "CP" (Central Pacific)
cache_dir : str, default "storm"
Directory to store downloaded files
use_cache : bool, default True
Whether to use existing cached files if available
Returns
-------
list of Path
Paths to downloaded ATCF files
"""
# Create cache directory
cache_path = Path(cache_dir) / "raw" / "atcf"
os.makedirs(cache_path, exist_ok=True)
# Query FTP server to find available storms
storm_numbers = _list_available_atcf_files(year, basin)
if not storm_numbers:
logger.warning(f"No storms found for {basin} {year}")
return []
downloaded_files = []
basin_lower = basin.lower()
# NOTE: Archival usually happens in the spring, where the a-deck files are moved from aid_public to archive.
# For now we're just simplifying things to assume that we pull data from the previous year from the archive url,
# so this needs to happen after things have been migrated.
from datetime import datetime
current_year = datetime.utcnow().year
if year == current_year:
url_template = ATCF_AID_PUBLIC_URL
else:
url_template = ATCF_ARCHIVE_URL
for num in storm_numbers:
# Construct filename to match archive naming: aal012023.dat
filename = f"a{basin_lower}{num:02d}{year}.dat"
file_path = cache_path / filename
# Check cache
if use_cache and file_path.exists():
logger.debug(f"Using cached file: {file_path}")
downloaded_files.append(file_path)
continue
# Construct download URL
url = url_template.format(year=year, basin=basin_lower, number=num)
logger.debug(f"Downloading from {url}")
try:
# Download file
response = requests.get(url, timeout=30)
response.raise_for_status()
# Decompress gzip content and save
with gzip.open(io.BytesIO(response.content), "rt") as gz_file:
content = gz_file.read()
with open(file_path, "w") as f:
f.write(content)
logger.info(f"Downloaded and decompressed {filename}")
downloaded_files.append(file_path)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
logger.debug(
f"Storm {basin_lower}{num:02d}{year} not found (404)"
)
else:
logger.warning(f"HTTP error downloading {url}: {e}")
except Exception as e:
logger.warning(f"Failed to download {url}: {e}")
if not downloaded_files:
logger.warning(f"No files downloaded for {basin} basin, year {year}")
else:
logger.info(
f"Downloaded {len(downloaded_files)} ATCF files for {basin} {year}"
)
return downloaded_files
def _load_nhc_archive(
year: int,
basin: str = "AL",
cache_dir: str = "storm",
use_cache: bool = True,
) -> Optional[pd.DataFrame]:
"""
Load NHC archive data from ATCF files.
Downloads (if needed) and parses ATCF A-deck files for historical
storm forecast data.
Parameters
----------
year : int
Year to load
basin : str, default "AL"
Basin code (AL, EP, or CP)
cache_dir : str, default "storm"
Directory for cached files
use_cache : bool, default True
Whether to use existing cached files
Returns
-------
pd.DataFrame or None
Combined DataFrame with all storm data, or None if no data
"""
# Download ATCF files
logger.info(f"Loading archive data for {basin} basin, year {year}")
file_paths = download_nhc_archive(
year=year,
basin=basin,
cache_dir=cache_dir,
use_cache=use_cache,
)
if not file_paths:
logger.warning(f"No ATCF files available for {basin} {year}")
return None
# Parse each file and combine
all_dfs = []
for file_path in file_paths:
df = _parse_atcf_adeck(file_path)
if not df.empty:
all_dfs.append(df)
if not all_dfs:
logger.warning(f"No data parsed from {basin} {year} files")
return None
# Combine all DataFrames
combined_df = pd.concat(all_dfs, ignore_index=True)
logger.info(
f"Loaded {len(combined_df)} records from "
f"{len(all_dfs)} storms in {basin} {year}"
)
return combined_df
[docs]
def download_nhc(
cache_dir: str = "storm",
use_cache: bool = False,
) -> Optional[Path]:
"""
Download current NHC storm data in JSON format.
Fetches active storm data from the National Hurricane Center's
CurrentStorms.json API and saves to local cache directory. Files are
named using the latest forecast issuance time.
Parameters
----------
cache_dir : str, default "storm"
Directory to store raw JSON files
use_cache : bool, default False
Whether to use existing cached file if available
Returns
-------
Path or None
Path to downloaded JSON file, None if download failed or no active storms
"""
# Create cache directory
cache_path = Path(cache_dir) / "raw"
os.makedirs(cache_path, exist_ok=True)
# Check if we should use an existing cached file
if use_cache:
# Look for any existing file
existing_files = list(cache_path.glob("nhc_*.json"))
if existing_files:
# Use the most recent file
latest_file = max(existing_files, key=lambda p: p.stat().st_mtime)
logger.info(f"Using cached file: {latest_file}")
return latest_file
# Fetch data from NHC
logger.info("Downloading current storms from NHC")
data = _fetch_current_storms_json()
if data is None:
logger.error("Failed to download NHC data")
return None
# Check if there are active storms
active_storms = data.get("activeStorms", [])
if not active_storms:
logger.info("No active storms currently. Not saving file.")
return None
# Extract latest issuance time from all active storms
latest_issuance = None
for storm in active_storms:
issuance_str = storm.get("lastUpdate")
issuance_dt = pd.Timestamp(issuance_str, tz="UTC")
if latest_issuance is None or issuance_dt > latest_issuance:
latest_issuance = issuance_dt
# Use latest issuance for filename, or fall back to current time
filename = f"nhc_{latest_issuance.strftime('%Y%m%d_%H%M')}.json"
file_path = cache_path / filename
# Save to file (overwrite if exists)
try:
with open(file_path, "w") as f:
json.dump(data, f, indent=2)
logger.info(f"Saved NHC data to {file_path}")
return file_path
except Exception as e:
logger.error(f"Failed to save NHC data: {e}")
return None
[docs]
def load_nhc(
file_path: Optional[str] = None,
cache_dir: str = "storm",
use_cache: bool = False,
year: Optional[int] = None,
basin: Optional[str] = None,
) -> Optional[pd.DataFrame]:
"""
Load and process NHC storm data from CurrentStorms.json or historical archive.
Supports two modes:
1. **Current mode** (default): Downloads current storms from NHC CurrentStorms.json
2. **Archive mode**: Downloads historical ATCF data when year is specified
Parameters
----------
file_path : str, optional
Path to NHC JSON file. If None, downloads data
cache_dir : str, default "storm"
Directory for caching downloaded files
use_cache : bool, default False
Whether to use existing cached file if available
year : int, optional
Year for archive mode (e.g., 2023). If specified, loads
historical ATCF data instead of current json data
basin : str, optional
Basin code for archive mode: "AL" (Atlantic), "EP" (Eastern
Pacific), or "CP" (Central Pacific). If None, loads all basins
Returns
-------
pd.DataFrame or None
DataFrame with combined observations and forecasts, or None if
loading fails
"""
# Archive mode: year is specified
if year is not None:
# If basin is None, load all basins
if basin is None:
logger.info(f"Loading archive data for all basins in {year}")
all_dfs = []
for basin_code in ["AL", "EP", "CP"]:
df = _load_nhc_archive(
year=year,
basin=basin_code,
cache_dir=cache_dir,
use_cache=use_cache,
)
if df is not None and not df.empty:
all_dfs.append(df)
if not all_dfs:
logger.warning(f"No data loaded for any basin in {year}")
return None
combined_df = pd.concat(all_dfs, ignore_index=True)
logger.info(
f"Loaded {len(combined_df)} records from "
f"{len(all_dfs)} basins in {year}"
)
return combined_df
else:
logger.info(f"Loading archive data for {basin} {year}")
return _load_nhc_archive(
year=year,
basin=basin,
cache_dir=cache_dir,
use_cache=use_cache,
)
# Current mode: download from API or load from file
if file_path is None:
logger.info("No file path provided, downloading current storms")
file_path = download_nhc(cache_dir=cache_dir, use_cache=use_cache)
if file_path is None:
logger.error("Failed to download NHC data")
return None
else:
file_path = Path(file_path)
# Load JSON data
try:
with open(file_path, "r") as f:
data = json.load(f)
logger.info(f"Loaded NHC data from {file_path}")
except Exception as e:
logger.error(f"Failed to load JSON from {file_path}: {e}")
return None
# Process to DataFrame
df = _process_nhc_to_df(data)
if df.empty:
logger.warning("No data extracted from NHC JSON")
return df
logger.info(f"Loaded {len(df)} records from NHC")
return df
[docs]
def get_storms(df: pd.DataFrame) -> pd.DataFrame:
"""
Extract storm metadata from NHC DataFrame.
Creates a dataset with one row per storm containing identifying
information. This provides a summary of all storms in the dataset
with their basic metadata.
Parameters
----------
df : pd.DataFrame
DataFrame from load_nhc()
Returns
-------
pd.DataFrame
Storm metadata with schema validation applied
"""
if df.empty:
logger.warning("Input DataFrame is empty")
return pd.DataFrame(columns=list(STORM_SCHEMA.columns.keys()))
df_ = df.copy()
# Calculate season from valid_time (coerce via tz-aware UTC and then
# drop the tz, in case the column came in as object dtype from mixed
# tz-aware / tz-naive rows)
df_["season"] = (
pd.to_datetime(df_["valid_time"], utc=True)
.dt.tz_localize(None)
.dt.year
)
# Group by atcf_id to get one row per storm
df_storms = (
df_.groupby("atcf_id")
.agg(
{
"name": "first",
"number": "first",
"basin": "first",
"season": "first",
"provider": "first",
}
)
.reset_index()
)
# Rename basin to genesis_basin
df_storms = df_storms.rename(columns={"basin": "genesis_basin"})
# Create storm_id for named storms
df_storms["storm_id"] = df_storms.apply(_create_storm_id, axis=1)
logger.info(f"Extracted {len(df_storms)} unique storms")
# Validate against schema
return STORM_SCHEMA.validate(df_storms)
[docs]
def get_tracks(df: pd.DataFrame) -> gpd.GeoDataFrame:
"""
Extract track-level data from NHC DataFrame.
Creates a GeoDataFrame with one row per track point (observation or
forecast), including geometry for spatial analysis.
Parameters
----------
df : pd.DataFrame
DataFrame from load_nhc()
Returns
-------
gpd.GeoDataFrame
"""
if df.empty:
logger.warning("Input DataFrame is empty")
# Return empty GeoDataFrame with correct schema
empty_df = pd.DataFrame(columns=list(TRACK_SCHEMA.columns.keys()))
return gpd.GeoDataFrame(empty_df, geometry="geometry", crs="EPSG:4326")
df_ = df.copy()
# Get storms to merge in storm_id (only select needed columns)
df_storms = get_storms(df)
df_tracks = df_.merge(
df_storms[["atcf_id", "storm_id"]], on="atcf_id", how="left"
)
# Drop any extra columns not in schema (like 'name' from merge)
schema_columns = list(TRACK_SCHEMA.columns.keys())
extra_columns = [
col
for col in df_tracks.columns
if col not in schema_columns
and col != "latitude"
and col != "longitude"
]
if extra_columns:
df_tracks = df_tracks.drop(columns=extra_columns)
# Generate point_id for each record
df_tracks["point_id"] = [str(uuid.uuid4()) for _ in range(len(df_tracks))]
# Normalize longitude to [-180, 180]
df_tracks["longitude"] = df_tracks["longitude"].apply(
lambda lon: lon - 360 if lon > 180 else lon
)
# Convert to GeoDataFrame using utility function
gdf_tracks = _to_gdf(df_tracks)
logger.info(f"Created GeoDataFrame with {len(gdf_tracks)} track points")
# Validate against schema
return TRACK_SCHEMA.validate(gdf_tracks)
# =============================================================================
# NHC Wind Speed Probability (WSP)
# =============================================================================
# GIS archive base URL (5km product, 120-hr cumulative)
# Available 2017-04-19 — present (only issued when storms are active)
GIS_ARCHIVE_INDEX_URL = "https://www.nhc.noaa.gov/gis/forecast/archive/"
GIS_ARCHIVE_ZIP_URL = (
"https://www.nhc.noaa.gov/gis/forecast/archive/{issuance}_wsp_120hr5km.zip"
)
# Maps shapefile PERCENTAGE strings to lower-bound integers (e.g. "5-10%" → 5)
WSP_PERCENTAGE_MAP = {
"<5%": 0,
"5-10%": 5,
"10-20%": 10,
"20-30%": 20,
"30-40%": 30,
"40-50%": 40,
"50-60%": 50,
"60-70%": 60,
"70-80%": 70,
"80-90%": 80,
">90%": 90,
}
WSP_POLYGON_SCHEMA = pa.DataFrameSchema(
{
"issued_time": pa.Column(pd.Timestamp, nullable=False),
"wind_threshold_kt": pa.Column(
int,
pa.Check.isin([34, 50, 64]),
nullable=False,
),
"percentage": pa.Column(
int,
pa.Check.isin(list(WSP_PERCENTAGE_MAP.values())),
nullable=False,
),
"geometry": pa.Column(gpd.array.GeometryDtype, nullable=True),
},
strict=True,
coerce=True,
)
[docs]
def get_wsp(
issued_time: Optional[str] = None,
start: Optional[str] = None,
end: Optional[str] = None,
cache_dir: str = "storm",
use_cache: bool = True,
) -> gpd.GeoDataFrame:
"""
Load NHC 5km wind speed probability polygons.
Three modes:
1. **Current** (default, no arguments): fetches the latest issuance from
``CurrentStorms.json``. Only available when storms are active.
2. **Single issuance**: pass ``issued_time`` as YYYYMMDDHH or ISO timestamp.
3. **Archive range**: pass ``start`` (and optionally ``end``) to fetch all
available issuances in the date range from the NHC GIS archive.
Parameters
----------
issued_time : str, optional
Single issuance timestamp (YYYYMMDDHH or ISO format, e.g. '2023082200').
start : str, optional
Start of date range for archive mode (YYYYMMDDHH or ISO format).
end : str, optional
End of date range. Defaults to now. Only used with ``start``.
cache_dir : str, default "storm"
Directory for cached zip files (used in archive mode).
use_cache : bool, default True
Whether to use cached zip files if available.
Returns
-------
gpd.GeoDataFrame
Columns: issued_time, wind_threshold_kt, percentage, geometry (EPSG:4326).
One row per (issued_time, wind threshold, probability band). Empty if no
data is available.
"""
if start is not None:
return _load_nhc_wsp_archive(
start=pd.Timestamp(start, tz="UTC"),
end=(
pd.Timestamp(end, tz="UTC")
if end
else pd.Timestamp.now(tz="UTC")
),
cache_dir=cache_dir,
use_cache=use_cache,
)
if issued_time is not None:
if len(issued_time) == 10 and issued_time.isdigit():
ts = pd.Timestamp(
datetime.strptime(issued_time, "%Y%m%d%H").replace(
tzinfo=timezone.utc
)
)
else:
ts = pd.Timestamp(issued_time, tz="UTC")
ts_str = ts.strftime("%Y%m%d%H")
url = GIS_ARCHIVE_ZIP_URL.format(issuance=ts_str)
logger.info(f"Fetching WSP shapefile for {ts_str}")
gdf = _load_wsp_from_url(url, ts)
if gdf.empty:
return gdf
return WSP_POLYGON_SCHEMA.validate(gdf)
return _load_nhc_wsp_current()
def _load_nhc_wsp_current() -> gpd.GeoDataFrame:
"""Fetch the latest WSP issuance from the active storm JSON."""
data = _fetch_current_storms_json()
if data is None:
logger.info("No active storms — no WSP product available")
return _empty_wsp_gdf()
active_storms = data.get("activeStorms", [])
# All active storms share the same basin-wide WSP product
gis_meta = next(
(
s["windSpeedProbabilitiesGIS"]
for s in active_storms
if s.get("windSpeedProbabilitiesGIS")
and s["windSpeedProbabilitiesGIS"].get("zipFile5km")
),
None,
)
if gis_meta is None:
logger.info("No active storms — no WSP product available")
return _empty_wsp_gdf()
issued_time = pd.Timestamp(gis_meta["issuance"], tz="UTC")
zip_url = gis_meta["zipFile5km"]
logger.info(f"Fetching current WSP from {zip_url} ({issued_time})")
gdf = _load_wsp_from_url(zip_url, issued_time)
if gdf.empty:
return gdf
return WSP_POLYGON_SCHEMA.validate(gdf)
def _load_nhc_wsp_archive(
start: pd.Timestamp,
end: pd.Timestamp,
cache_dir: str,
use_cache: bool,
) -> gpd.GeoDataFrame:
"""Fetch all available WSP shapefiles in the archive between start and end."""
logger.info(
f"Scanning archive for WSP issuances between {start} and {end}"
)
issuances = _list_wsp_archive_issuances(start, end)
if not issuances:
logger.warning(f"No WSP archive files found between {start} and {end}")
return _empty_wsp_gdf()
logger.info(f"Found {len(issuances)} archive issuances to fetch")
cache_path = Path(cache_dir) / "raw" / "nhc_wsp"
if use_cache:
cache_path.mkdir(parents=True, exist_ok=True)
all_gdfs = []
for ts in issuances:
ts_str = ts.strftime("%Y%m%d%H")
cache_file = cache_path / f"{ts_str}_wsp_120hr5km.zip"
if use_cache and cache_file.exists():
logger.debug(f"Using cached file {cache_file.name}")
zip_bytes = cache_file.read_bytes()
else:
url = GIS_ARCHIVE_ZIP_URL.format(issuance=ts_str)
try:
zip_bytes = _fetch_wsp_zip(url)
except Exception as e:
logger.warning(f"Failed to fetch {ts_str}: {e}")
continue
if use_cache:
cache_file.write_bytes(zip_bytes)
gdf = _parse_wsp_zip(zip_bytes, ts)
if not gdf.empty:
all_gdfs.append(gdf)
if not all_gdfs:
return _empty_wsp_gdf()
combined = gpd.GeoDataFrame(
pd.concat(all_gdfs, ignore_index=True),
geometry="geometry",
crs="EPSG:4326",
)
return WSP_POLYGON_SCHEMA.validate(combined)
def _load_wsp_from_url(
url: str, issued_time: pd.Timestamp
) -> gpd.GeoDataFrame:
"""Fetch a single WSP zip by URL and return parsed polygons."""
zip_bytes = _fetch_wsp_zip(url)
return _parse_wsp_zip(zip_bytes, issued_time)
def _fetch_wsp_zip(url: str) -> bytes:
"""Download a NHC WSP GIS zip and return raw bytes."""
response = requests.get(url, timeout=60)
response.raise_for_status()
return response.content
def _parse_wsp_zip(
zip_bytes: bytes, issued_time: pd.Timestamp
) -> gpd.GeoDataFrame:
"""
Parse a NHC 5km WSP zip into a flat GeoDataFrame.
Each zip contains 3 shapefiles (34, 50, 64 kt), each with up to 11 rows
(one per probability band). Returns all three thresholds combined into a
single GeoDataFrame.
Parameters
----------
zip_bytes : bytes
Raw bytes of a NHC 5km WSP zip file.
issued_time : pd.Timestamp
Issuance timestamp to attach to every row.
Returns
-------
gpd.GeoDataFrame
Columns: issued_time, wind_threshold_kt, percentage, geometry (EPSG:4326).
"""
gdfs = []
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
with tempfile.TemporaryDirectory() as tmp:
z.extractall(tmp)
for kt in [34, 50, 64]:
candidates = [
f
for f in z.namelist()
if f.endswith(".shp") and f"wsp{kt}knt" in f
]
if not candidates:
logger.warning(f"No {kt}kt shapefile found in zip")
continue
gdf = gpd.read_file(os.path.join(tmp, candidates[0])).to_crs(
"EPSG:4326"
)
gdf = gdf[["PERCENTAGE", "geometry"]].rename(
columns={"PERCENTAGE": "percentage"}
)
gdf["percentage"] = gdf["percentage"].map(WSP_PERCENTAGE_MAP)
gdf["issued_time"] = issued_time
gdf["wind_threshold_kt"] = kt
gdfs.append(
gdf[
[
"issued_time",
"wind_threshold_kt",
"percentage",
"geometry",
]
]
)
if not gdfs:
return _empty_wsp_gdf()
return gpd.GeoDataFrame(
pd.concat(gdfs, ignore_index=True),
geometry="geometry",
crs="EPSG:4326",
)
def _list_wsp_archive_issuances(
start: pd.Timestamp,
end: pd.Timestamp,
) -> List[pd.Timestamp]:
"""
Scrape the NHC GIS archive index for available 5km WSP issuances.
Parameters
----------
start : pd.Timestamp
Start of the date range (inclusive).
end : pd.Timestamp
End of the date range (inclusive).
Returns
-------
list of pd.Timestamp
Available issuance timestamps in [start, end], sorted chronologically.
"""
response = requests.get(GIS_ARCHIVE_INDEX_URL, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
issuances = []
for link in soup.find_all("a", href=True):
href = link["href"]
if "_wsp_120hr5km.zip" not in href:
continue
filename = href.split("/")[-1]
ts_str = filename.split("_wsp_")[0]
try:
ts = pd.Timestamp(
datetime.strptime(ts_str, "%Y%m%d%H").replace(
tzinfo=timezone.utc
)
)
except ValueError:
continue
if start <= ts <= end:
issuances.append(ts)
return sorted(issuances)
def _empty_wsp_gdf() -> gpd.GeoDataFrame:
"""Return an empty GeoDataFrame with the WSP polygon schema columns."""
empty_df = pd.DataFrame(columns=list(WSP_POLYGON_SCHEMA.columns.keys()))
return gpd.GeoDataFrame(empty_df, geometry="geometry", crs="EPSG:4326")