Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for Iperf3 network measurement tool."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 
8 import iperf3
9 import voluptuous as vol
10 
12  DOMAIN as SENSOR_DOMAIN,
13  SensorDeviceClass,
14  SensorEntityDescription,
15  SensorStateClass,
16 )
17 from homeassistant.const import (
18  CONF_HOST,
19  CONF_HOSTS,
20  CONF_MONITORED_CONDITIONS,
21  CONF_PORT,
22  CONF_PROTOCOL,
23  CONF_SCAN_INTERVAL,
24  UnitOfDataRate,
25 )
26 from homeassistant.core import HomeAssistant, ServiceCall
28 from homeassistant.helpers.discovery import async_load_platform
29 from homeassistant.helpers.dispatcher import dispatcher_send
30 from homeassistant.helpers.event import async_track_time_interval
31 from homeassistant.helpers.typing import ConfigType
32 
33 DOMAIN = "iperf3"
34 DATA_UPDATED = f"{DOMAIN}_data_updated"
35 
36 _LOGGER = logging.getLogger(__name__)
37 
38 CONF_DURATION = "duration"
39 CONF_PARALLEL = "parallel"
40 CONF_MANUAL = "manual"
41 
42 DEFAULT_DURATION = 10
43 DEFAULT_PORT = 5201
44 DEFAULT_PARALLEL = 1
45 DEFAULT_PROTOCOL = "tcp"
46 DEFAULT_INTERVAL = timedelta(minutes=60)
47 
48 ATTR_DOWNLOAD = "download"
49 ATTR_UPLOAD = "upload"
50 ATTR_VERSION = "Version"
51 ATTR_HOST = "host"
52 
53 SENSOR_TYPES: tuple[SensorEntityDescription, ...] = (
55  key=ATTR_DOWNLOAD,
56  name=ATTR_DOWNLOAD.capitalize(),
57  icon="mdi:download",
58  state_class=SensorStateClass.MEASUREMENT,
59  device_class=SensorDeviceClass.DATA_RATE,
60  native_unit_of_measurement=UnitOfDataRate.MEGABITS_PER_SECOND,
61  ),
63  key=ATTR_UPLOAD,
64  name=ATTR_UPLOAD.capitalize(),
65  icon="mdi:upload",
66  state_class=SensorStateClass.MEASUREMENT,
67  device_class=SensorDeviceClass.DATA_RATE,
68  native_unit_of_measurement=UnitOfDataRate.MEGABITS_PER_SECOND,
69  ),
70 )
71 SENSOR_KEYS: list[str] = [desc.key for desc in SENSOR_TYPES]
72 
73 PROTOCOLS = ["tcp", "udp"]
74 
75 HOST_CONFIG_SCHEMA = vol.Schema(
76  {
77  vol.Required(CONF_HOST): cv.string,
78  vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
79  vol.Optional(CONF_DURATION, default=DEFAULT_DURATION): vol.Range(5, 10),
80  vol.Optional(CONF_PARALLEL, default=DEFAULT_PARALLEL): vol.Range(1, 20),
81  vol.Optional(CONF_PROTOCOL, default=DEFAULT_PROTOCOL): vol.In(PROTOCOLS),
82  }
83 )
84 
85 CONFIG_SCHEMA = vol.Schema(
86  {
87  DOMAIN: vol.Schema(
88  {
89  vol.Required(CONF_HOSTS): vol.All(cv.ensure_list, [HOST_CONFIG_SCHEMA]),
90  vol.Optional(CONF_MONITORED_CONDITIONS, default=SENSOR_KEYS): vol.All(
91  cv.ensure_list, [vol.In(SENSOR_KEYS)]
92  ),
93  vol.Optional(CONF_SCAN_INTERVAL, default=DEFAULT_INTERVAL): vol.All(
94  cv.time_period, cv.positive_timedelta
95  ),
96  vol.Optional(CONF_MANUAL, default=False): cv.boolean,
97  }
98  )
99  },
100  extra=vol.ALLOW_EXTRA,
101 )
102 
103 SERVICE_SCHEMA = vol.Schema({vol.Optional(ATTR_HOST, default=None): cv.string})
104 
105 
106 async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
107  """Set up the iperf3 component."""
108  hass.data[DOMAIN] = {}
109 
110  conf = config[DOMAIN]
111  for host in conf[CONF_HOSTS]:
112  data = hass.data[DOMAIN][host[CONF_HOST]] = Iperf3Data(hass, host)
113 
114  if not conf[CONF_MANUAL]:
115  async_track_time_interval(hass, data.update, conf[CONF_SCAN_INTERVAL])
116 
117  def update(call: ServiceCall) -> None:
118  """Service call to manually update the data."""
119  called_host = call.data[ATTR_HOST]
120  if called_host in hass.data[DOMAIN]:
121  hass.data[DOMAIN][called_host].update()
122  else:
123  for iperf3_host in hass.data[DOMAIN].values():
124  iperf3_host.update()
125 
126  hass.services.async_register(DOMAIN, "speedtest", update, schema=SERVICE_SCHEMA)
127 
128  hass.async_create_task(
130  hass,
131  SENSOR_DOMAIN,
132  DOMAIN,
133  {CONF_MONITORED_CONDITIONS: conf[CONF_MONITORED_CONDITIONS]},
134  config,
135  )
136  )
137 
138  return True
139 
140 
142  """Get the latest data from iperf3."""
143 
144  def __init__(self, hass, host):
145  """Initialize the data object."""
146  self._hass_hass = hass
147  self._host_host = host
148  self.datadata = {ATTR_DOWNLOAD: None, ATTR_UPLOAD: None, ATTR_VERSION: None}
149 
150  def create_client(self):
151  """Create a new iperf3 client to use for measurement."""
152  client = iperf3.Client()
153  client.duration = self._host_host[CONF_DURATION]
154  client.server_hostname = self._host_host[CONF_HOST]
155  client.port = self._host_host[CONF_PORT]
156  client.num_streams = self._host_host[CONF_PARALLEL]
157  client.protocol = self._host_host[CONF_PROTOCOL]
158  client.verbose = False
159  return client
160 
161  @property
162  def protocol(self):
163  """Return the protocol used for this connection."""
164  return self._host_host[CONF_PROTOCOL]
165 
166  @property
167  def host(self):
168  """Return the host connected to."""
169  return self._host_host[CONF_HOST]
170 
171  @property
172  def port(self):
173  """Return the port on the host connected to."""
174  return self._host_host[CONF_PORT]
175 
176  def update(self, now=None):
177  """Get the latest data from iperf3."""
178  if self.protocolprotocolprotocol == "udp":
179  # UDP only have 1 way attribute
180  result = self._run_test_run_test(ATTR_DOWNLOAD)
181  self.datadata[ATTR_DOWNLOAD] = self.datadata[ATTR_UPLOAD] = getattr(
182  result, "Mbps", None
183  )
184  self.datadata[ATTR_VERSION] = getattr(result, "version", None)
185  else:
186  result = self._run_test_run_test(ATTR_DOWNLOAD)
187  self.datadata[ATTR_DOWNLOAD] = getattr(result, "received_Mbps", None)
188  self.datadata[ATTR_VERSION] = getattr(result, "version", None)
189  self.datadata[ATTR_UPLOAD] = getattr(
190  self._run_test_run_test(ATTR_UPLOAD), "sent_Mbps", None
191  )
192 
193  dispatcher_send(self._hass_hass, DATA_UPDATED, self.hosthost)
194 
195  def _run_test(self, test_type):
196  """Run and return the iperf3 data."""
197  client = self.create_clientcreate_client()
198  client.reverse = test_type == ATTR_DOWNLOAD
199  try:
200  result = client.run()
201  except (AttributeError, OSError, ValueError) as error:
202  _LOGGER.error("Iperf3 error: %s", error)
203  return None
204 
205  if result is not None and hasattr(result, "error") and result.error is not None:
206  _LOGGER.error("Iperf3 error: %s", result.error)
207  return None
208 
209  return result
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:106
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
None async_load_platform(core.HomeAssistant hass, Platform|str component, str platform, DiscoveryInfoType|None discovered, ConfigType hass_config)
Definition: discovery.py:152
None dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:137
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
Definition: event.py:1679