1 """The National Weather Service integration."""
3 from __future__
import annotations
5 from collections.abc
import Awaitable, Callable
6 from dataclasses
import dataclass
10 from pynws
import NwsNoDataError, SimpleNWS, call_with_retry
19 TimestampDataUpdateCoordinator,
26 DEFAULT_SCAN_INTERVAL,
31 from .coordinator
import NWSObservationDataUpdateCoordinator
# Module-level logger; it is also the logger handed to the refresh debouncers built in this file.
33 _LOGGER = logging.getLogger(__name__)
# Platforms that entry setup forwards to and that entry unload tears down again.
35 PLATFORMS = [Platform.SENSOR, Platform.WEATHER]
# Alias for a ConfigEntry parameterized with this integration's NWSData container.
37 type NWSConfigEntry = ConfigEntry[NWSData]
41 """Return unique id for entries in configuration."""
42 return f
"{latitude}_{longitude}"
47 """Data for the National Weather Service integration."""
50 coordinator_observation: NWSObservationDataUpdateCoordinator
51 coordinator_forecast: TimestampDataUpdateCoordinator[
None]
52 coordinator_forecast_hourly: TimestampDataUpdateCoordinator[
None]
56 """Set up a National Weather Service entry."""
57 latitude = entry.data[CONF_LATITUDE]
58 longitude = entry.data[CONF_LONGITUDE]
59 api_key = entry.data[CONF_API_KEY]
60 station = entry.data[CONF_STATION]
65 nws_data = SimpleNWS(latitude, longitude, api_key, client_session)
66 await nws_data.set_station(station)
# Factory for the forecast coordinator's update method: it builds the nested
# `update_forecast` coroutine function (which closes over `nws_data`) and returns it.
# In this file it is used with (0, 0) as the coordinator's initial update_method and
# later with (RETRY_INTERVAL, RETRY_STOP), assigned to coordinator_forecast.update_method
# after the first refresh.
# NOTE(review): some lines of this block are not visible in this view (presumably the
# `try:` header, the remaining call_with_retry arguments and the except body), so how
# `retry_interval` / `retry_stop` are forwarded cannot be confirmed from here.
68 def async_setup_update_forecast(
69 retry_interval: datetime.timedelta | float,
70 retry_stop: datetime.timedelta | float,
71 ) -> Callable[[], Awaitable[
None]]:
72 async
def update_forecast() -> None:
73 """Retrieve forecast."""
# The fetch goes through pynws' call_with_retry, which is given nws_data.update_forecast.
75 await call_with_retry(
76 nws_data.update_forecast,
# NwsNoDataError is caught below; the handler body is not visible here - TODO confirm what it raises.
81 except NwsNoDataError
as err:
# Return the coroutine function itself (not an awaited result) for use as an update method.
84 return update_forecast
# Hourly counterpart of the forecast factory: it builds the nested
# `update_forecast_hourly` coroutine function (which closes over `nws_data`) and returns it.
# In this file it is used with (0, 0) as the coordinator's initial update_method and
# later with (RETRY_INTERVAL, RETRY_STOP), assigned to
# coordinator_forecast_hourly.update_method after the first refresh.
# NOTE(review): some lines of this block are not visible in this view (presumably the
# `try:` header, the remaining call_with_retry arguments and the except body), so how
# `retry_interval` / `retry_stop` are forwarded cannot be confirmed from here.
86 def async_setup_update_forecast_hourly(
87 retry_interval: datetime.timedelta | float,
88 retry_stop: datetime.timedelta | float,
89 ) -> Callable[[], Awaitable[
None]]:
90 async
def update_forecast_hourly() -> None:
91 """Retrieve forecast hourly."""
# The fetch goes through pynws' call_with_retry, which is given nws_data.update_forecast_hourly.
93 await call_with_retry(
94 nws_data.update_forecast_hourly,
# NwsNoDataError is caught below; the handler body is not visible here - TODO confirm what it raises.
99 except NwsNoDataError
as err:
# Return the coroutine function itself (not an awaited result) for use as an update method.
102 return update_forecast_hourly
114 name=f
"NWS forecast station {station}",
115 update_method=async_setup_update_forecast(0, 0),
116 update_interval=DEFAULT_SCAN_INTERVAL,
117 request_refresh_debouncer=debounce.Debouncer(
118 hass, _LOGGER, cooldown=DEBOUNCE_TIME, immediate=
True
126 name=f
"NWS forecast hourly station {station}",
127 update_method=async_setup_update_forecast_hourly(0, 0),
128 update_interval=DEFAULT_SCAN_INTERVAL,
129 request_refresh_debouncer=debounce.Debouncer(
130 hass, _LOGGER, cooldown=DEBOUNCE_TIME, immediate=
True
135 coordinator_observation,
136 coordinator_forecast,
137 coordinator_forecast_hourly,
141 await coordinator_observation.async_refresh()
142 await coordinator_forecast.async_refresh()
143 await coordinator_forecast_hourly.async_refresh()
146 coordinator_forecast.update_method = async_setup_update_forecast(
147 RETRY_INTERVAL, RETRY_STOP
149 coordinator_forecast_hourly.update_method = async_setup_update_forecast_hourly(
150 RETRY_INTERVAL, RETRY_STOP
153 await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
159 """Unload a config entry."""
160 return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
164 """Return device registry information."""
166 entry_type=DeviceEntryType.SERVICE,
168 manufacturer=
"National Weather Service",
169 name=f
"NWS: {latitude}, {longitude}",
bool async_unload_entry(HomeAssistant hass, NWSConfigEntry entry)
bool async_setup_entry(HomeAssistant hass, NWSConfigEntry entry)
DeviceInfo device_info(float latitude, float longitude)
str base_unique_id(float latitude, float longitude)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)