Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for imap integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Mapping
7 from datetime import datetime, timedelta
8 import email
9 from email.header import decode_header, make_header
10 from email.message import Message
11 from email.utils import parseaddr, parsedate_to_datetime
12 import logging
13 from typing import TYPE_CHECKING, Any
14 
15 from aioimaplib import AUTH, IMAP4_SSL, NONAUTH, SELECTED, AioImapException
16 
17 from homeassistant.config_entries import ConfigEntry
18 from homeassistant.const import (
19  CONF_PASSWORD,
20  CONF_PORT,
21  CONF_USERNAME,
22  CONF_VERIFY_SSL,
23  CONTENT_TYPE_TEXT_PLAIN,
24 )
25 from homeassistant.core import HomeAssistant
26 from homeassistant.exceptions import (
27  ConfigEntryAuthFailed,
28  ConfigEntryError,
29  TemplateError,
30 )
31 from homeassistant.helpers.json import json_bytes
32 from homeassistant.helpers.template import Template
33 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
34 from homeassistant.util import dt as dt_util
35 from homeassistant.util.ssl import (
36  SSLCipherList,
37  client_context,
38  create_no_verify_ssl_context,
39 )
40 
41 from .const import (
42  CONF_CHARSET,
43  CONF_CUSTOM_EVENT_DATA_TEMPLATE,
44  CONF_EVENT_MESSAGE_DATA,
45  CONF_FOLDER,
46  CONF_MAX_MESSAGE_SIZE,
47  CONF_SEARCH,
48  CONF_SERVER,
49  CONF_SSL_CIPHER_LIST,
50  DEFAULT_MAX_MESSAGE_SIZE,
51  DOMAIN,
52  MESSAGE_DATA_OPTIONS,
53 )
54 from .errors import InvalidAuth, InvalidFolder
55 
56 _LOGGER = logging.getLogger(__name__)
57 
58 BACKOFF_TIME = 10
59 
60 EVENT_IMAP = "imap_content"
61 MAX_ERRORS = 3
62 MAX_EVENT_DATA_BYTES = 32168
63 
64 DIAGNOSTICS_ATTRIBUTES = ["date", "initial"]
65 
66 
async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL:
    """Connect to imap server and return client.

    The returned client is logged in and has the configured folder selected.

    Args:
        data: Config entry data. Reads CONF_SERVER, CONF_PORT, CONF_USERNAME,
            CONF_PASSWORD and CONF_FOLDER, plus the optional CONF_VERIFY_SSL
            (default True) and CONF_SSL_CIPHER_LIST.

    Raises:
        InvalidAuth: The client is neither authenticated nor selected after
            the login attempt.
        InvalidFolder: The client is not in the selected state after the
            folder was selected.
    """
    ssl_cipher_list: str = data.get(CONF_SSL_CIPHER_LIST, SSLCipherList.PYTHON_DEFAULT)
    verify_ssl = data.get(CONF_VERIFY_SSL, True)
    if verify_ssl:
        ssl_context = client_context(ssl_cipher_list=SSLCipherList(ssl_cipher_list))
    else:
        ssl_context = create_no_verify_ssl_context()
    client = IMAP4_SSL(data[CONF_SERVER], data[CONF_PORT], ssl_context=ssl_context)
    _LOGGER.debug(
        "Wait for hello message from server %s on port %s, verify_ssl: %s",
        data[CONF_SERVER],
        data[CONF_PORT],
        verify_ssl,
    )
    await client.wait_hello_from_server()

    # Each step only runs when the protocol state says it is still needed.
    if client.protocol.state == NONAUTH:
        _LOGGER.debug(
            "Authenticating with %s on server %s",
            data[CONF_USERNAME],
            data[CONF_SERVER],
        )
        await client.login(data[CONF_USERNAME], data[CONF_PASSWORD])
    if client.protocol.state not in {AUTH, SELECTED}:
        raise InvalidAuth("Invalid username or password")
    if client.protocol.state == AUTH:
        _LOGGER.debug(
            "Selecting mail folder %s on server %s",
            data[CONF_FOLDER],
            data[CONF_SERVER],
        )
        await client.select(data[CONF_FOLDER])
    if client.protocol.state != SELECTED:
        raise InvalidFolder(f"Folder {data[CONF_FOLDER]} is invalid")
    return client
101 
102 
104  """Class to parse an RFC822 email message."""
105 
106  def __init__(self, raw_message: bytes) -> None:
107  """Initialize IMAP message."""
108  self.email_messageemail_message = email.message_from_bytes(raw_message)
109 
110  @staticmethod
111  def _decode_payload(part: Message) -> str:
112  """Try to decode text payloads.
113 
114  Common text encodings are quoted-printable or base64.
115  Falls back to the raw content part if decoding fails.
116  """
117  try:
118  decoded_payload: Any = part.get_payload(decode=True)
119  if TYPE_CHECKING:
120  assert isinstance(decoded_payload, bytes)
121  content_charset = part.get_content_charset() or "utf-8"
122  return decoded_payload.decode(content_charset)
123  except ValueError:
124  # return undecoded payload
125  return str(part.get_payload())
126 
127  @property
128  def headers(self) -> dict[str, tuple[str, ...]]:
129  """Get the email headers."""
130  header_base: dict[str, tuple[str, ...]] = {}
131  for key, value in self.email_messageemail_message.items():
132  header_instances: tuple[str, ...] = (str(value),)
133  if header_base.setdefault(key, header_instances) != header_instances:
134  header_base[key] += header_instances
135  return header_base
136 
137  @property
138  def message_id(self) -> str | None:
139  """Get the message ID."""
140  value: str
141  for header, value in self.email_messageemail_message.items():
142  if header == "Message-ID":
143  return value
144  return None
145 
146  @property
147  def date(self) -> datetime | None:
148  """Get the date the email was sent."""
149  # See https://www.rfc-editor.org/rfc/rfc2822#section-3.3
150  date_str: str | None
151  if (date_str := self.email_messageemail_message["Date"]) is None:
152  return None
153  try:
154  mail_dt_tm = parsedate_to_datetime(date_str)
155  except ValueError:
156  _LOGGER.debug(
157  "Parsed date %s is not compliant with rfc2822#section-3.3", date_str
158  )
159  return None
160  return mail_dt_tm
161 
162  @property
163  def sender(self) -> str:
164  """Get the parsed message sender from the email."""
165  return str(parseaddr(self.email_messageemail_message["From"])[1])
166 
167  @property
168  def subject(self) -> str:
169  """Decode the message subject."""
170  decoded_header = decode_header(self.email_messageemail_message["Subject"] or "")
171  subject_header = make_header(decoded_header)
172  return str(subject_header)
173 
174  @property
175  def text(self) -> str:
176  """Get the message text from the email.
177 
178  Will look for text/plain or use/ text/html if not found.
179  """
180  message_text: str | None = None
181  message_html: str | None = None
182  message_untyped_text: str | None = None
183 
184  part: Message
185  for part in self.email_messageemail_message.walk():
186  if part.get_content_type() == CONTENT_TYPE_TEXT_PLAIN:
187  if message_text is None:
188  message_text = self._decode_payload_decode_payload(part)
189  elif part.get_content_type() == "text/html":
190  if message_html is None:
191  message_html = self._decode_payload_decode_payload(part)
192  elif (
193  part.get_content_type().startswith("text")
194  and message_untyped_text is None
195  ):
196  message_untyped_text = str(part.get_payload())
197 
198  if message_text is not None and message_text.strip():
199  return message_text
200 
201  if message_html:
202  return message_html
203 
204  if message_untyped_text:
205  return message_untyped_text
206 
207  return str(self.email_messageemail_message.get_payload())
208 
209 
211  """Base class for imap client."""
212 
213  config_entry: ConfigEntry
214  custom_event_template: Template | None
215 
216  def __init__(
217  self,
218  hass: HomeAssistant,
219  imap_client: IMAP4_SSL,
220  entry: ConfigEntry,
221  update_interval: timedelta | None,
222  ) -> None:
223  """Initiate imap client."""
224  self.imap_clientimap_client = imap_client
225  self.auth_errors: int = 0
226  self._last_message_uid_last_message_uid: str | None = None
227  self._last_message_id_last_message_id: str | None = None
228  self.custom_event_templatecustom_event_template = None
229  self._diagnostics_data: dict[str, Any] = {}
230  self._event_data_keys: list[str] = entry.data.get(
231  CONF_EVENT_MESSAGE_DATA, MESSAGE_DATA_OPTIONS
232  )
233  self._max_event_size: int = entry.data.get(
234  CONF_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE
235  )
236  _custom_event_template = entry.data.get(CONF_CUSTOM_EVENT_DATA_TEMPLATE)
237  if _custom_event_template is not None:
238  self.custom_event_templatecustom_event_template = Template(_custom_event_template, hass=hass)
239  super().__init__(
240  hass,
241  _LOGGER,
242  name=DOMAIN,
243  update_interval=update_interval,
244  )
245 
246  async def async_start(self) -> None:
247  """Start coordinator."""
248 
249  async def _async_reconnect_if_needed(self) -> None:
250  """Connect to imap server."""
251  if self.imap_client is None:
252  self.imap_client = await connect_to_server(self.config_entry.data)
253 
254  async def _async_process_event(self, last_message_uid: str) -> None:
255  """Send a event for the last message if the last message was changed."""
256  response = await self.imap_clientimap_client.fetch(last_message_uid, "BODY.PEEK[]")
257  if response.result == "OK":
258  message = ImapMessage(response.lines[1])
259  # Set `initial` to `False` if the last message is triggered again
260  initial: bool = True
261  if (message_id := message.message_id) == self._last_message_id_last_message_id:
262  initial = False
263  self._last_message_id_last_message_id = message_id
264  data = {
265  "entry_id": self.config_entryconfig_entry.entry_id,
266  "server": self.config_entryconfig_entry.data[CONF_SERVER],
267  "username": self.config_entryconfig_entry.data[CONF_USERNAME],
268  "search": self.config_entryconfig_entry.data[CONF_SEARCH],
269  "folder": self.config_entryconfig_entry.data[CONF_FOLDER],
270  "initial": initial,
271  "date": message.date,
272  "sender": message.sender,
273  "subject": message.subject,
274  "uid": last_message_uid,
275  }
276  data.update({key: getattr(message, key) for key in self._event_data_keys})
277  if self.custom_event_templatecustom_event_template is not None:
278  try:
279  data["custom"] = self.custom_event_templatecustom_event_template.async_render(
280  data, parse_result=True
281  )
282  _LOGGER.debug(
283  "IMAP custom template (%s) for msguid %s (%s) rendered to: %s, initial: %s",
284  self.custom_event_templatecustom_event_template,
285  last_message_uid,
286  message_id,
287  data["custom"],
288  initial,
289  )
290  except TemplateError as err:
291  data["custom"] = None
292  _LOGGER.error(
293  "Error rendering IMAP custom template (%s) for msguid %s "
294  "failed with message: %s",
295  self.custom_event_templatecustom_event_template,
296  last_message_uid,
297  err,
298  )
299  if "text" in data:
300  data["text"] = message.text[: self._max_event_size]
301  self._update_diagnostics_update_diagnostics(data)
302  if (size := len(json_bytes(data))) > MAX_EVENT_DATA_BYTES:
303  _LOGGER.warning(
304  "Custom imap_content event skipped, size (%s) exceeds "
305  "the maximal event size (%s), sender: %s, subject: %s",
306  size,
307  MAX_EVENT_DATA_BYTES,
308  message.sender,
309  message.subject,
310  )
311  return
312 
313  self.hasshass.bus.fire(EVENT_IMAP, data)
314  _LOGGER.debug(
315  "Message with id %s (%s) processed, sender: %s, subject: %s, initial: %s",
316  last_message_uid,
317  message_id,
318  message.sender,
319  message.subject,
320  initial,
321  )
322 
323  async def _async_fetch_number_of_messages(self) -> int | None:
324  """Fetch last message and messages count."""
325  await self._async_reconnect_if_needed_async_reconnect_if_needed()
326  await self.imap_clientimap_client.noop()
327  result, lines = await self.imap_clientimap_client.search(
328  self.config_entryconfig_entry.data[CONF_SEARCH],
329  charset=self.config_entryconfig_entry.data[CONF_CHARSET],
330  )
331  if result != "OK":
332  raise UpdateFailed(
333  f"Invalid response for search '{self.config_entry.data[CONF_SEARCH]}': {result} / {lines[0]}"
334  )
335  # Check we do have returned items.
336  #
337  # In rare cases, when no UID's are returned,
338  # only the status line is returned, and not an empty line.
339  # See: https://github.com/home-assistant/core/issues/132042
340  #
341  # Strictly the RfC notes that 0 or more numbers should be returned
342  # delimited by a space.
343  #
344  # See: https://datatracker.ietf.org/doc/html/rfc3501#section-7.2.5
345  if len(lines) == 1 or not (count := len(message_ids := lines[0].split())):
346  self._last_message_uid_last_message_uid = None
347  return 0
348  last_message_uid = (
349  str(message_ids[-1:][0], encoding=self.config_entryconfig_entry.data[CONF_CHARSET])
350  if count
351  else None
352  )
353  if (
354  count
355  and last_message_uid is not None
356  and self._last_message_uid_last_message_uid != last_message_uid
357  ):
358  self._last_message_uid_last_message_uid = last_message_uid
359  await self._async_process_event_async_process_event(last_message_uid)
360 
361  return count
362 
363  async def _cleanup(self, log_error: bool = False) -> None:
364  """Close resources."""
365  if self.imap_clientimap_client:
366  try:
367  if self.imap_clientimap_client.has_pending_idle():
368  self.imap_clientimap_client.idle_done()
369  await self.imap_clientimap_client.stop_wait_server_push()
370  await self.imap_clientimap_client.close()
371  await self.imap_clientimap_client.logout()
372  except (AioImapException, TimeoutError):
373  if log_error:
374  _LOGGER.debug("Error while cleaning up imap connection")
375  finally:
376  self.imap_clientimap_client = None
377 
378  async def shutdown(self, *_: Any) -> None:
379  """Close resources."""
380  await self._cleanup_cleanup(log_error=True)
381 
382  def _update_diagnostics(self, data: dict[str, Any]) -> None:
383  """Update the diagnostics."""
384  self._diagnostics_data.update(
385  {key: value for key, value in data.items() if key in DIAGNOSTICS_ATTRIBUTES}
386  )
387  custom: Any | None = data.get("custom")
388  self._diagnostics_data["custom_template_data_type"] = str(type(custom))
389  self._diagnostics_data["custom_template_result_length"] = (
390  None if custom is None else len(f"{custom}")
391  )
392  self._diagnostics_data["event_time"] = dt_util.now().isoformat()
393 
394  @property
395  def diagnostics_data(self) -> dict[str, Any]:
396  """Return diagnostics info."""
397  return self._diagnostics_data
398 
399 
401  """Class for imap client."""
402 
403  def __init__(
404  self, hass: HomeAssistant, imap_client: IMAP4_SSL, entry: ConfigEntry
405  ) -> None:
406  """Initiate imap client."""
407  _LOGGER.debug(
408  "Connected to server %s using IMAP polling", entry.data[CONF_SERVER]
409  )
410  super().__init__(hass, imap_client, entry, timedelta(seconds=10))
411 
412  async def _async_update_data(self) -> int | None:
413  """Update the number of unread emails."""
414  try:
415  messages = await self._async_fetch_number_of_messages_async_fetch_number_of_messages()
416  except (
417  AioImapException,
418  UpdateFailed,
419  TimeoutError,
420  ) as ex:
421  await self._cleanup_cleanup()
422  self.async_set_update_errorasync_set_update_error(ex)
423  raise UpdateFailed from ex
424  except InvalidFolder as ex:
425  _LOGGER.warning("Selected mailbox folder is invalid")
426  await self._cleanup_cleanup()
427  self.async_set_update_errorasync_set_update_error(ex)
428  raise ConfigEntryError("Selected mailbox folder is invalid.") from ex
429  except InvalidAuth as ex:
430  await self._cleanup_cleanup()
431  self.auth_errorsauth_errors += 1
432  if self.auth_errorsauth_errors <= MAX_ERRORS:
433  _LOGGER.warning("Authentication failed, retrying")
434  else:
435  _LOGGER.warning(
436  "Username or password incorrect, starting reauthentication"
437  )
438  self.config_entryconfig_entry.async_start_reauth(self.hasshass)
439  self.async_set_update_errorasync_set_update_error(ex)
440  raise ConfigEntryAuthFailed from ex
441 
442  self.auth_errorsauth_errors = 0
443  return messages
444 
445 
447  """Class for imap client."""
448 
449  def __init__(
450  self, hass: HomeAssistant, imap_client: IMAP4_SSL, entry: ConfigEntry
451  ) -> None:
452  """Initiate imap client."""
453  _LOGGER.debug("Connected to server %s using IMAP push", entry.data[CONF_SERVER])
454  super().__init__(hass, imap_client, entry, None)
455  self._push_wait_task_push_wait_task: asyncio.Task[None] | None = None
456  self.number_of_messagesnumber_of_messages: int | None = None
457 
458  async def _async_update_data(self) -> int | None:
459  """Update the number of unread emails."""
460  await self.async_startasync_startasync_start()
461  return self.number_of_messagesnumber_of_messages
462 
463  async def async_start(self) -> None:
464  """Start coordinator."""
465  self._push_wait_task_push_wait_task = self.hasshass.async_create_background_task(
466  self._async_wait_push_loop_async_wait_push_loop(), "Wait for IMAP data push"
467  )
468 
469  async def _async_wait_push_loop(self) -> None:
470  """Wait for data push from server."""
471  while True:
472  try:
473  self.number_of_messagesnumber_of_messages = await self._async_fetch_number_of_messages_async_fetch_number_of_messages()
474  except InvalidAuth as ex:
475  self.auth_errorsauth_errors += 1
476  await self._cleanup_cleanup()
477  if self.auth_errorsauth_errors <= MAX_ERRORS:
478  _LOGGER.warning("Authentication failed, retrying")
479  else:
480  _LOGGER.warning(
481  "Username or password incorrect, starting reauthentication"
482  )
483  self.config_entryconfig_entry.async_start_reauth(self.hasshass)
484  self.async_set_update_errorasync_set_update_error(ex)
485  await asyncio.sleep(BACKOFF_TIME)
486  except InvalidFolder as ex:
487  _LOGGER.warning("Selected mailbox folder is invalid")
488  await self._cleanup_cleanup()
489  self.async_set_update_errorasync_set_update_error(ex)
490  await asyncio.sleep(BACKOFF_TIME)
491  continue
492  except (
493  UpdateFailed,
494  AioImapException,
495  TimeoutError,
496  ) as ex:
497  await self._cleanup_cleanup()
498  self.async_set_update_errorasync_set_update_error(ex)
499  await asyncio.sleep(BACKOFF_TIME)
500  continue
501  else:
502  self.auth_errorsauth_errors = 0
503  self.async_set_updated_dataasync_set_updated_data(self.number_of_messagesnumber_of_messages)
504  try:
505  idle: asyncio.Future = await self.imap_clientimap_client.idle_start()
506  await self.imap_clientimap_client.wait_server_push()
507  self.imap_clientimap_client.idle_done()
508  async with asyncio.timeout(10):
509  await idle
510 
511  except (AioImapException, TimeoutError):
512  _LOGGER.debug(
513  "Lost %s (will attempt to reconnect after %s s)",
514  self.config_entryconfig_entry.data[CONF_SERVER],
515  BACKOFF_TIME,
516  )
517  await self._cleanup_cleanup()
518  await asyncio.sleep(BACKOFF_TIME)
519 
520  async def shutdown(self, *_: Any) -> None:
521  """Close resources."""
522  if self._push_wait_task_push_wait_task:
523  self._push_wait_task_push_wait_task.cancel()
524  await super().shutdown()
None __init__(self, HomeAssistant hass, IMAP4_SSL imap_client, ConfigEntry entry, timedelta|None update_interval)
Definition: coordinator.py:222
None __init__(self, HomeAssistant hass, IMAP4_SSL imap_client, ConfigEntry entry)
Definition: coordinator.py:405
None __init__(self, HomeAssistant hass, IMAP4_SSL imap_client, ConfigEntry entry)
Definition: coordinator.py:451
IMAP4_SSL connect_to_server(Mapping[str, Any] data)
Definition: coordinator.py:67
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
ssl.SSLContext client_context(SSLCipherList ssl_cipher_list=SSLCipherList.PYTHON_DEFAULT)
Definition: ssl.py:137
ssl.SSLContext create_no_verify_ssl_context(SSLCipherList ssl_cipher_list=SSLCipherList.PYTHON_DEFAULT)
Definition: ssl.py:144