1 """Data coordinator for monarch money."""
4 from dataclasses import dataclass
5 from datetime import datetime, timedelta
7 from aiohttp import ClientResponseError
8 from gql.transport.exceptions import TransportServerError
9 from monarchmoney import LoginFailedException
10 from typedmonarchmoney import TypedMonarchMoney
11 from typedmonarchmoney.models import (
13 MonarchCashflowSummary,
22 from .const import LOGGER
27 """Data class to hold monarch data."""
29 account_data: dict[str, MonarchAccount]
30 cashflow_summary: MonarchCashflowSummary
34 """Data update coordinator for Monarch Money."""
36 config_entry: ConfigEntry
42 client: TypedMonarchMoney,
44 """Initialize the coordinator."""
54 """Obtain subscription ID in setup phase."""
56 sub_details: MonarchSubscription = (
57 await self.client.get_subscription_details()
59 except (TransportServerError, LoginFailedException, ClientResponseError) as err:
64 """Fetch data for all accounts."""
68 account_data, cashflow_summary = await asyncio.gather(
69 self.client.get_accounts_as_dict_with_id_key(),
70 self.client.get_cashflow_summary(
71 start_date=f"{now.year}-01-01", end_date=f"{now.year}-12-31"
75 return MonarchData(account_data=account_data, cashflow_summary=cashflow_summary)
79 """Return cashflow summary."""
80 return self.data.cashflow_summary
84 """Return accounts."""
85 return list(self.data.account_data.values())
89 """Return value accounts."""
90 return [x for x in self.accounts if x.is_value_account]
94 """Return accounts that aren't assets."""
95 return [x for x in self.accounts if x.is_balance_account]
MonarchCashflowSummary cashflow_summary(self)
list[MonarchAccount] value_accounts(self)
MonarchData _async_update_data(self)
None __init__(self, HomeAssistant hass, TypedMonarchMoney client)
list[MonarchAccount] accounts(self)
list[MonarchAccount] balance_accounts(self)