1 """Common code for TCP component."""
3 from __future__
import annotations
9 from typing
import Final
18 CONF_UNIT_OF_MEASUREMENT,
27 from .const
import CONF_BUFFER_SIZE, CONF_VALUE_ON
28 from .model
import TcpSensorConfig
30 _LOGGER: Final = logging.getLogger(__name__)
34 """Base entity class for TCP platform."""
36 def __init__(self, hass: HomeAssistant, config: ConfigType) ->
None:
37 """Set all the config values if they exist and get initial state."""
40 self._config: TcpSensorConfig = {
41 CONF_NAME: config[CONF_NAME],
42 CONF_HOST: config[CONF_HOST],
43 CONF_PORT: config[CONF_PORT],
44 CONF_TIMEOUT: config[CONF_TIMEOUT],
45 CONF_PAYLOAD: config[CONF_PAYLOAD],
46 CONF_UNIT_OF_MEASUREMENT: config.get(CONF_UNIT_OF_MEASUREMENT),
47 CONF_VALUE_TEMPLATE: config.get(CONF_VALUE_TEMPLATE),
48 CONF_VALUE_ON: config.get(CONF_VALUE_ON),
49 CONF_BUFFER_SIZE: config[CONF_BUFFER_SIZE],
50 CONF_SSL: config[CONF_SSL],
51 CONF_VERIFY_SSL: config[CONF_VERIFY_SSL],
54 self.
_ssl_context_ssl_context: ssl.SSLContext |
None =
None
55 if self._config[CONF_SSL]:
57 if not self._config[CONF_VERIFY_SSL]:
59 self.
_ssl_context_ssl_context.verify_mode = ssl.CERT_NONE
61 self.
_state_state: str |
None =
None
66 """Return the name of this sensor."""
67 return self._config[CONF_NAME]
70 """Get the latest value for this sensor."""
71 with socket.socket(socket.AF_INET, socket.SOCK_STREAM)
as sock:
72 sock.settimeout(self._config[CONF_TIMEOUT])
74 sock.connect((self._config[CONF_HOST], self._config[CONF_PORT]))
75 except OSError
as err:
77 "Unable to connect to %s on port %s: %s",
78 self._config[CONF_HOST],
79 self._config[CONF_PORT],
86 sock, server_hostname=self._config[CONF_HOST]
90 sock.send(self._config[CONF_PAYLOAD].encode())
91 except OSError
as err:
93 "Unable to send payload %r to %s on port %s: %s",
94 self._config[CONF_PAYLOAD],
95 self._config[CONF_HOST],
96 self._config[CONF_PORT],
101 readable, _, _ = select.select([sock], [], [], self._config[CONF_TIMEOUT])
105 "Timeout (%s second(s)) waiting for a response after "
106 "sending %r to %s on port %s"
108 self._config[CONF_TIMEOUT],
109 self._config[CONF_PAYLOAD],
110 self._config[CONF_HOST],
111 self._config[CONF_PORT],
115 value = sock.recv(self._config[CONF_BUFFER_SIZE]).decode()
117 value_template = self._config[CONF_VALUE_TEMPLATE]
118 if value_template
is not None:
120 self.
_state_state = value_template.render(parse_result=
False, value=value)
121 except TemplateError:
123 "Unable to render template of %r with value: %r",
124 self._config[CONF_VALUE_TEMPLATE],
None __init__(self, HomeAssistant hass, ConfigType config)