1 """Support for Plex media server monitoring."""
3 from __future__
import annotations
7 from plexapi.exceptions
import NotFound
8 import requests.exceptions
19 CONF_SERVER_IDENTIFIER,
21 PLEX_UPDATE_LIBRARY_SIGNAL,
22 PLEX_UPDATE_SENSOR_SIGNAL,
24 from .helpers
import get_plex_server, pretty_title
26 LIBRARY_ATTRIBUTE_TYPES = {
27 "artist": [
"artist",
"album"],
28 "photo": [
"photoalbum"],
29 "show": [
"show",
"season"],
32 LIBRARY_PRIMARY_LIBTYPE = {
37 LIBRARY_RECENT_LIBTYPE = {
42 LIBRARY_ICON_LOOKUP = {
43 "artist":
"mdi:music",
46 "show":
"mdi:television",
49 _LOGGER = logging.getLogger(__name__)
54 config_entry: ConfigEntry,
55 async_add_entities: AddEntitiesCallback,
57 """Set up Plex sensor from a config entry."""
58 server_id = config_entry.data[CONF_SERVER_IDENTIFIER]
60 sensors: list[SensorEntity] = [
PlexSensor(hass, plexserver)]
62 def create_library_sensors():
63 """Create Plex library sensors with sync calls."""
66 for library
in plexserver.library.sections()
69 await hass.async_add_executor_job(create_library_sensors)
74 """Representation of a Plex now playing sensor."""
76 _attr_has_entity_name =
True
78 _attr_translation_key =
"plex"
79 _attr_should_poll =
False
80 _attr_native_unit_of_measurement =
"watching"
83 """Initialize the sensor."""
96 """Run when about to be added to hass."""
97 server_id = self.
_server_server.machine_identifier
101 PLEX_UPDATE_SENSOR_SIGNAL.format(server_id),
107 """Set instance object and trigger an entity state update."""
108 _LOGGER.debug(
"Refreshing sensor [%s]", self.
unique_idunique_id)
114 """Return the state attributes."""
115 return self.
_server_server.sensor_attributes
119 """Return a device description for device registry."""
121 identifiers={(DOMAIN, self.
_server_server.machine_identifier)},
123 model=
"Plex Media Server",
124 name=self.
_server_server.friendly_name,
125 sw_version=self.
_server_server.version,
126 configuration_url=f
"{self._server.url_in_use}/web",
131 """Representation of a Plex library section sensor."""
133 _attr_available =
True
134 _attr_entity_registry_enabled_default =
False
135 _attr_should_poll =
False
136 _attr_native_unit_of_measurement =
"Items"
138 def __init__(self, hass, plex_server, plex_library_section):
139 """Initialize the sensor."""
142 self.
server_idserver_id = plex_server.machine_identifier
148 self.
_attr_name_attr_name = f
"{self.server_name} Library - {plex_library_section.title}"
149 self.
_attr_unique_id_attr_unique_id = f
"library-{self.server_id}-{plex_library_section.uuid}"
152 """Run when about to be added to hass."""
156 PLEX_UPDATE_LIBRARY_SIGNAL.format(self.
server_idserver_id),
163 """Update state and attributes for the library sensor."""
164 _LOGGER.debug(
"Refreshing library sensor for '%s'", self.
namename)
170 except requests.exceptions.RequestException
as err:
172 "Could not update library sensor for '%s': %s",
180 """Update library sensor state with sync calls."""
181 primary_libtype = LIBRARY_PRIMARY_LIBTYPE.get(
186 libtype=primary_libtype, includeCollections=
False
188 for libtype
in LIBRARY_ATTRIBUTE_TYPES.get(self.
library_typelibrary_type, []):
191 libtype=libtype, includeCollections=
False
195 recent_libtype = LIBRARY_RECENT_LIBTYPE.get(
199 maxresults=1, libtype=recent_libtype
202 media = recently_added[0]
208 """Return a device description for device registry."""
213 identifiers={(DOMAIN, self.
server_idserver_id)},
215 model=
"Plex Media Server",
217 sw_version=self.
_server_server.version,
218 configuration_url=f
"{self._server.url_in_use}/web",
None async_added_to_hass(self)
def _update_state_and_attrs(self)
def __init__(self, hass, plex_server, plex_library_section)
None async_refresh_sensor(self)
_attr_extra_state_attributes
DeviceInfo|None device_info(self)
def extra_state_attributes(self)
None _async_refresh_sensor(self)
None async_added_to_hass(self)
def __init__(self, hass, plex_server)
DeviceInfo|None device_info(self)
None async_write_ha_state(self)
None async_on_remove(self, CALLBACK_TYPE func)
str|UndefinedType|None name(self)
def pretty_title(media, short_name=False)
PlexServer get_plex_server(HomeAssistant hass, str server_id)
None async_setup_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)