Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The FireServiceRota integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 
8 from pyfireservicerota import (
9  ExpiredTokenError,
10  FireServiceRota,
11  FireServiceRotaIncidents,
12  InvalidAuthError,
13  InvalidTokenError,
14 )
15 
16 from homeassistant.config_entries import ConfigEntry
17 from homeassistant.const import CONF_TOKEN, CONF_URL, CONF_USERNAME, Platform
18 from homeassistant.core import HomeAssistant
19 from homeassistant.exceptions import ConfigEntryAuthFailed
20 from homeassistant.helpers.dispatcher import dispatcher_send
21 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
22 
23 from .const import DATA_CLIENT, DATA_COORDINATOR, DOMAIN, WSS_BWRURL
24 
25 MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=60)
26 
27 _LOGGER = logging.getLogger(__name__)
28 
29 PLATFORMS = [Platform.BINARY_SENSOR, Platform.SENSOR, Platform.SWITCH]
30 
31 
32 async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
33  """Set up FireServiceRota from a config entry."""
34 
35  hass.data.setdefault(DOMAIN, {})
36 
37  client = FireServiceRotaClient(hass, entry)
38  await client.setup()
39 
40  if client.token_refresh_failure:
41  return False
42 
43  async def async_update_data():
44  return await client.async_update()
45 
46  coordinator = DataUpdateCoordinator(
47  hass,
48  _LOGGER,
49  config_entry=entry,
50  name="duty binary sensor",
51  update_method=async_update_data,
52  update_interval=MIN_TIME_BETWEEN_UPDATES,
53  )
54 
55  await coordinator.async_config_entry_first_refresh()
56 
57  hass.data[DOMAIN][entry.entry_id] = {
58  DATA_CLIENT: client,
59  DATA_COORDINATOR: coordinator,
60  }
61 
62  await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
63 
64  return True
65 
66 
67 async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
68  """Unload FireServiceRota config entry."""
69 
70  await hass.async_add_executor_job(
71  hass.data[DOMAIN][entry.entry_id].websocket.stop_listener
72  )
73  unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
74  if unload_ok:
75  del hass.data[DOMAIN][entry.entry_id]
76  return unload_ok
77 
78 
80  """Handle authentication tokens."""
81 
82  def __init__(self, hass, entry, fsr):
83  """Initialize the oauth object."""
84  self._hass_hass = hass
85  self._entry_entry = entry
86 
87  self._url_url = entry.data[CONF_URL]
88  self._username_username = entry.data[CONF_USERNAME]
89  self._fsr_fsr = fsr
90 
91  async def async_refresh_tokens(self) -> bool:
92  """Refresh tokens and update config entry."""
93  _LOGGER.debug("Refreshing authentication tokens after expiration")
94 
95  try:
96  token_info = await self._hass_hass.async_add_executor_job(
97  self._fsr_fsr.refresh_tokens
98  )
99 
100  except (InvalidAuthError, InvalidTokenError) as err:
101  raise ConfigEntryAuthFailed(
102  "Error refreshing tokens, triggered reauth workflow"
103  ) from err
104 
105  _LOGGER.debug("Saving new tokens in config entry")
106  self._hass_hass.config_entries.async_update_entry(
107  self._entry_entry,
108  data={
109  "auth_implementation": DOMAIN,
110  CONF_URL: self._url_url,
111  CONF_USERNAME: self._username_username,
112  CONF_TOKEN: token_info,
113  },
114  )
115 
116  return True
117 
118 
120  """Define a FireServiceRota websocket manager object."""
121 
122  def __init__(self, hass, entry):
123  """Initialize the websocket object."""
124  self._hass_hass = hass
125  self._entry_entry = entry
126 
127  self._fsr_incidents_fsr_incidents = FireServiceRotaIncidents(on_incident=self._on_incident_on_incident)
128  self.incident_dataincident_data = None
129 
130  def _construct_url(self) -> str:
131  """Return URL with latest access token."""
132  return WSS_BWRURL.format(
133  self._entry_entry.data[CONF_URL], self._entry_entry.data[CONF_TOKEN]["access_token"]
134  )
135 
136  def _on_incident(self, data) -> None:
137  """Received new incident, update data."""
138  _LOGGER.debug("Received new incident via websocket: %s", data)
139  self.incident_dataincident_data = data
140  dispatcher_send(self._hass_hass, f"{DOMAIN}_{self._entry.entry_id}_update")
141 
142  def start_listener(self) -> None:
143  """Start the websocket listener."""
144  _LOGGER.debug("Starting incidents listener")
145  self._fsr_incidents_fsr_incidents.start(self._construct_url_construct_url())
146 
147  def stop_listener(self) -> None:
148  """Stop the websocket listener."""
149  _LOGGER.debug("Stopping incidents listener")
150  self._fsr_incidents_fsr_incidents.stop()
151 
152 
154  """Getting the latest data from fireservicerota."""
155 
156  def __init__(self, hass, entry):
157  """Initialize the data object."""
158  self._hass_hass = hass
159  self._entry_entry = entry
160 
161  self._url_url = entry.data[CONF_URL]
162  self._tokens_tokens = entry.data[CONF_TOKEN]
163 
164  self.entry_identry_id = entry.entry_id
165  self.unique_idunique_id = entry.unique_id
166 
167  self.token_refresh_failuretoken_refresh_failure = False
168  self.incident_idincident_id = None
169  self.on_dutyon_duty = False
170 
171  self.fsrfsr = FireServiceRota(base_url=self._url_url, token_info=self._tokens_tokens)
172 
174  self._hass_hass,
175  self._entry_entry,
176  self.fsrfsr,
177  )
178 
179  self.websocketwebsocket = FireServiceRotaWebSocket(self._hass_hass, self._entry_entry)
180 
181  async def setup(self) -> None:
182  """Set up the data client."""
183  await self._hass_hass.async_add_executor_job(self.websocketwebsocket.start_listener)
184 
185  async def update_call(self, func, *args):
186  """Perform update call and return data."""
187  if self.token_refresh_failuretoken_refresh_failure:
188  return None
189 
190  try:
191  return await self._hass_hass.async_add_executor_job(func, *args)
192  except (ExpiredTokenError, InvalidTokenError):
193  await self._hass_hass.async_add_executor_job(self.websocketwebsocket.stop_listener)
194  self.token_refresh_failuretoken_refresh_failure = True
195 
196  if await self.oauthoauth.async_refresh_tokens():
197  self.token_refresh_failuretoken_refresh_failure = False
198  await self._hass_hass.async_add_executor_job(self.websocketwebsocket.start_listener)
199 
200  return await self._hass_hass.async_add_executor_job(func, *args)
201 
202  async def async_update(self) -> dict | None:
203  """Get the latest availability data."""
204  data = await self.update_callupdate_call(
205  self.fsrfsr.get_availability, str(self._hass_hass.config.time_zone)
206  )
207 
208  if not data:
209  return None
210 
211  self.on_dutyon_duty = bool(data.get("available"))
212 
213  _LOGGER.debug("Updated availability data: %s", data)
214  return data
215 
216  async def async_response_update(self) -> dict | None:
217  """Get the latest incident response data."""
218 
219  if not self.incident_idincident_id:
220  return None
221 
222  _LOGGER.debug("Updating response data for incident id %s", self.incident_idincident_id)
223 
224  return await self.update_callupdate_call(self.fsrfsr.get_incident_response, self.incident_idincident_id)
225 
226  async def async_set_response(self, value) -> None:
227  """Set incident response status."""
228 
229  if not self.incident_idincident_id:
230  return
231 
232  _LOGGER.debug(
233  "Setting incident response for incident id '%s' to state '%s'",
234  self.incident_idincident_id,
235  value,
236  )
237 
238  await self.update_callupdate_call(self.fsrfsr.set_incident_response, self.incident_idincident_id, value)
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:32
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:67
None dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:137