Home Assistant Unofficial Reference 2024.12.1
sensor.py
Go to the documentation of this file.
1 """Support for haveibeenpwned (email breaches) sensor."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 from http import HTTPStatus
7 import logging
8 
9 import requests
10 import voluptuous as vol
11 
13  PLATFORM_SCHEMA as SENSOR_PLATFORM_SCHEMA,
14  SensorEntity,
15 )
16 from homeassistant.const import CONF_API_KEY, CONF_EMAIL
17 from homeassistant.core import HomeAssistant
19 from homeassistant.helpers.entity_platform import AddEntitiesCallback
20 from homeassistant.helpers.event import track_point_in_time
21 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
22 from homeassistant.util import Throttle
23 import homeassistant.util.dt as dt_util
24 
25 _LOGGER = logging.getLogger(__name__)
26 
27 DATE_STR_FORMAT = "%Y-%m-%d %H:%M:%S"
28 
29 HA_USER_AGENT = "Home Assistant HaveIBeenPwned Sensor Component"
30 
31 MIN_TIME_BETWEEN_FORCED_UPDATES = timedelta(seconds=5)
32 MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15)
33 
34 URL = "https://haveibeenpwned.com/api/v3/breachedaccount/"
35 
36 PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
37  {
38  vol.Required(CONF_EMAIL): vol.All(cv.ensure_list, [cv.string]),
39  vol.Required(CONF_API_KEY): cv.string,
40  }
41 )
42 
43 
45  hass: HomeAssistant,
46  config: ConfigType,
47  add_entities: AddEntitiesCallback,
48  discovery_info: DiscoveryInfoType | None = None,
49 ) -> None:
50  """Set up the HaveIBeenPwned sensor."""
51  emails = config[CONF_EMAIL]
52  api_key = config[CONF_API_KEY]
53  data = HaveIBeenPwnedData(emails, api_key)
54 
55  add_entities(HaveIBeenPwnedSensor(data, email) for email in emails)
56 
57 
59  """Implementation of a HaveIBeenPwned sensor."""
60 
61  _attr_attribution = "Data provided by Have I Been Pwned (HIBP)"
62 
63  def __init__(self, data, email):
64  """Initialize the HaveIBeenPwned sensor."""
65  self._state_state = None
66  self._data_data = data
67  self._email_email = email
68  self._unit_of_measurement_unit_of_measurement = "Breaches"
69 
70  @property
71  def name(self):
72  """Return the name of the sensor."""
73  return f"Breaches {self._email}"
74 
75  @property
77  """Return the unit the value is expressed in."""
78  return self._unit_of_measurement_unit_of_measurement
79 
80  @property
81  def native_value(self):
82  """Return the state of the device."""
83  return self._state_state
84 
85  @property
87  """Return the attributes of the sensor."""
88  val = {}
89  if self._email_email not in self._data_data.data:
90  return val
91 
92  for idx, value in enumerate(self._data_data.data[self._email_email]):
93  tmpname = f"breach {idx + 1}"
94  datetime_local = dt_util.as_local(
95  dt_util.parse_datetime(value["AddedDate"])
96  )
97  tmpvalue = f"{value['Title']} {datetime_local.strftime(DATE_STR_FORMAT)}"
98  val[tmpname] = tmpvalue
99 
100  return val
101 
102  async def async_added_to_hass(self) -> None:
103  """Get initial data."""
104  # To make sure we get initial data for the sensors ignoring the normal
105  # throttle of 15 minutes but using an update throttle of 5 seconds
106  self.hasshass.async_add_executor_job(self.update_nothrottleupdate_nothrottle)
107 
108  def update_nothrottle(self, dummy=None):
109  """Update sensor without throttle."""
110  self._data_data.update_no_throttle()
111 
112  # Schedule a forced update 5 seconds in the future if the update above
113  # returned no data for this sensors email. This is mainly to make sure
114  # that we don't get HTTP Error "too many requests" and to have initial
115  # data after hass startup once we have the data it will update as
116  # normal using update
117  if self._email_email not in self._data_data.data:
119  self.hasshass,
120  self.update_nothrottleupdate_nothrottle,
121  dt_util.now() + MIN_TIME_BETWEEN_FORCED_UPDATES,
122  )
123  return
124 
125  self._state_state = len(self._data_data.data[self._email_email])
126  self.schedule_update_ha_stateschedule_update_ha_state()
127 
128  def update(self) -> None:
129  """Update data and see if it contains data for our email."""
130  self._data_data.update()
131 
132  if self._email_email in self._data_data.data:
133  self._state_state = len(self._data_data.data[self._email_email])
134 
135 
137  """Class for handling the data retrieval."""
138 
139  def __init__(self, emails, api_key):
140  """Initialize the data object."""
141  self._email_count_email_count = len(emails)
142  self._current_index_current_index = 0
143  self.datadata = {}
144  self._email_email = emails[0]
145  self._emails_emails = emails
146  self._api_key_api_key = api_key
147 
148  def set_next_email(self):
149  """Set the next email to be looked up."""
150  self._current_index_current_index = (self._current_index_current_index + 1) % self._email_count_email_count
151  self._email_email = self._emails_emails[self._current_index_current_index]
152 
154  """Get the data for a specific email."""
155  self.updateupdate(no_throttle=True)
156 
157  @Throttle(MIN_TIME_BETWEEN_UPDATES, MIN_TIME_BETWEEN_FORCED_UPDATES)
158  def update(self, **kwargs):
159  """Get the latest data for current email from REST service."""
160  try:
161  url = f"{URL}{self._email}?truncateResponse=false"
162  header = {"User-Agent": HA_USER_AGENT, "hibp-api-key": self._api_key_api_key}
163  _LOGGER.debug("Checking for breaches for email: %s", self._email_email)
164  req = requests.get(url, headers=header, allow_redirects=True, timeout=5)
165 
166  except requests.exceptions.RequestException:
167  _LOGGER.error("Failed fetching data for %s", self._email_email)
168  return
169 
170  if req.status_code == HTTPStatus.OK:
171  self.datadata[self._email_email] = sorted(
172  req.json(), key=lambda k: k["AddedDate"], reverse=True
173  )
174 
175  # Only goto next email if we had data so that
176  # the forced updates try this current email again
177  self.set_next_emailset_next_email()
178 
179  elif req.status_code == HTTPStatus.NOT_FOUND:
180  self.datadata[self._email_email] = []
181 
182  # only goto next email if we had data so that
183  # the forced updates try this current email again
184  self.set_next_emailset_next_email()
185 
186  else:
187  _LOGGER.error(
188  "Failed fetching data for %s (HTTP Status_code = %d)",
189  self._email_email,
190  req.status_code,
191  )
None schedule_update_ha_state(self, bool force_refresh=False)
Definition: entity.py:1244
None setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback add_entities, DiscoveryInfoType|None discovery_info=None)
Definition: sensor.py:49
def add_entities(account, async_add_entities, tracked)
Definition: sensor.py:40