1 """Define the Azure DevOps DataUpdateCoordinator."""
3 from collections.abc
import Callable
4 from datetime
import timedelta
6 from typing
import Final
8 from aioazuredevops.client
import DevOpsClient
9 from aioazuredevops.helper
import (
11 work_item_types_states_filter,
12 work_items_by_type_and_state,
14 from aioazuredevops.models.build
import Build
15 from aioazuredevops.models.core
import Project
16 from aioazuredevops.models.work_item_type
import Category
25 from .const
import CONF_ORG, DOMAIN
26 from .data
import AzureDevOpsData
28 BUILDS_QUERY: Final =
"?queryOrder=queueTimeDescending&maxBuildsPerDefinition=1"
29 IGNORED_CATEGORIES: Final[list[Category]] = [Category.COMPLETED, Category.REMOVED]
33 """Handle exceptions or None to always return a value or raise."""
35 async
def handler(*args, **kwargs):
37 response = await func(*args, **kwargs)
38 except aiohttp.ClientError
as exception:
39 raise UpdateFailed
from exception
50 """Class to manage and fetch Azure DevOps data."""
59 logger: logging.Logger,
63 """Initialize global Azure DevOps data updater."""
76 @ado_exception_none_handler
79 personal_access_token: str,
81 """Authorize with Azure DevOps."""
83 personal_access_token,
86 if not self.
clientclient.authorized:
88 translation_domain=DOMAIN,
89 translation_key=
"authentication_failed",
90 translation_placeholders={
"title": self.
titletitle},
95 @ado_exception_none_handler
100 """Get the project."""
106 @ado_exception_none_handler
107 async
def _get_builds(self, project_name: str) -> list[Build] |
None:
108 """Get the builds."""
109 return await self.
clientclient.get_builds(
115 @ado_exception_none_handler
117 self, project_name: str
118 ) -> list[WorkItemTypeAndState] |
None:
119 """Get the work items."""
122 work_item_types := await self.
clientclient.get_work_item_types(
131 work_item_ids := await self.
clientclient.get_work_item_ids(
135 states=work_item_types_states_filter(
137 ignored_categories=IGNORED_CATEGORIES,
145 work_items := await self.
clientclient.get_work_items(
154 return work_items_by_type_and_state(
157 ignored_categories=IGNORED_CATEGORIES,
161 """Fetch data from Azure DevOps."""
163 builds = await self.
_get_builds_get_builds(self.project.name)
164 work_items = await self.
_get_work_items_get_work_items(self.project.name)
168 project=self.project,
170 work_items=work_items,
Project|None get_project(self, str project)
None __init__(self, HomeAssistant hass, logging.Logger logger, *ConfigEntry entry)
list[Build]|None _get_builds(self, str project_name)
list[WorkItemTypeAndState]|None _get_work_items(self, str project_name)
bool authorize(self, str personal_access_token)
AzureDevOpsData _async_update_data(self)
Callable ado_exception_none_handler(Callable func)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)