Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data coordinator for monarch money."""
2 
3 import asyncio
4 from dataclasses import dataclass
5 from datetime import datetime, timedelta
6 
7 from aiohttp import ClientResponseError
8 from gql.transport.exceptions import TransportServerError
9 from monarchmoney import LoginFailedException
10 from typedmonarchmoney import TypedMonarchMoney
11 from typedmonarchmoney.models import (
12  MonarchAccount,
13  MonarchCashflowSummary,
14  MonarchSubscription,
15 )
16 
17 from homeassistant.config_entries import ConfigEntry
18 from homeassistant.core import HomeAssistant
19 from homeassistant.exceptions import ConfigEntryError
20 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
21 
22 from .const import LOGGER
23 
24 
@dataclass
class MonarchData:
    """Data class to hold monarch data."""

    # Result of the client's get_accounts_as_dict_with_id_key() call.
    account_data: dict[str, MonarchAccount]
    # Result of the client's get_cashflow_summary() call for the current year.
    cashflow_summary: MonarchCashflowSummary
31 
32 
34  """Data update coordinator for Monarch Money."""
35 
36  config_entry: ConfigEntry
37  subscription_id: str
38 
39  def __init__(
40  self,
41  hass: HomeAssistant,
42  client: TypedMonarchMoney,
43  ) -> None:
44  """Initialize the coordinator."""
45  super().__init__(
46  hass=hass,
47  logger=LOGGER,
48  name="monarchmoney",
49  update_interval=timedelta(hours=4),
50  )
51  self.clientclient = client
52 
53  async def _async_setup(self) -> None:
54  """Obtain subscription ID in setup phase."""
55  try:
56  sub_details: MonarchSubscription = (
57  await self.clientclient.get_subscription_details()
58  )
59  except (TransportServerError, LoginFailedException, ClientResponseError) as err:
60  raise ConfigEntryError("Authentication failed") from err
61  self.subscription_idsubscription_id = sub_details.id
62 
63  async def _async_update_data(self) -> MonarchData:
64  """Fetch data for all accounts."""
65 
66  now = datetime.now()
67 
68  account_data, cashflow_summary = await asyncio.gather(
69  self.clientclient.get_accounts_as_dict_with_id_key(),
70  self.clientclient.get_cashflow_summary(
71  start_date=f"{now.year}-01-01", end_date=f"{now.year}-12-31"
72  ),
73  )
74 
75  return MonarchData(account_data=account_data, cashflow_summary=cashflow_summary)
76 
77  @property
78  def cashflow_summary(self) -> MonarchCashflowSummary:
79  """Return cashflow summary."""
80  return self.datadata.cashflow_summary
81 
82  @property
83  def accounts(self) -> list[MonarchAccount]:
84  """Return accounts."""
85  return list(self.datadata.account_data.values())
86 
87  @property
88  def value_accounts(self) -> list[MonarchAccount]:
89  """Return value accounts."""
90  return [x for x in self.accountsaccounts if x.is_value_account]
91 
92  @property
93  def balance_accounts(self) -> list[MonarchAccount]:
94  """Return accounts that aren't assets."""
95  return [x for x in self.accountsaccounts if x.is_balance_account]
None __init__(self, HomeAssistant hass, TypedMonarchMoney client)
Definition: coordinator.py:43