1 """Support for the Torque OBD application."""
3 from __future__
import annotations
7 from aiohttp
import web
8 import voluptuous
as vol
12 PLATFORM_SCHEMA
as SENSOR_PLATFORM_SCHEMA,
21 API_PATH =
"/api/torque"
23 DEFAULT_NAME =
"vehicle"
26 ENTITY_NAME_FORMAT =
"{0} {1}"
28 SENSOR_EMAIL_FIELD =
"eml"
29 SENSOR_NAME_KEY =
r"userFullName(\w+)"
30 SENSOR_UNIT_KEY =
r"userUnit(\w+)"
31 SENSOR_VALUE_KEY =
r"k(\w+)"
33 NAME_KEY = re.compile(SENSOR_NAME_KEY)
34 UNIT_KEY = re.compile(SENSOR_UNIT_KEY)
35 VALUE_KEY = re.compile(SENSOR_VALUE_KEY)
37 PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
39 vol.Required(CONF_EMAIL): cv.string,
40 vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
46 """Convert pid from hex string to integer."""
53 async_add_entities: AddEntitiesCallback,
54 discovery_info: DiscoveryInfoType |
None =
None,
56 """Set up the Torque platform."""
57 vehicle: str |
None = config.get(CONF_NAME)
58 email: str |
None = config.get(CONF_EMAIL)
59 sensors: dict[int, TorqueSensor] = {}
61 hass.http.register_view(
67 """Handle data from Torque requests."""
76 sensors: dict[int, TorqueSensor],
77 async_add_entities: AddEntitiesCallback,
79 """Initialize a Torque view."""
86 def get(self, request: web.Request) -> str |
None:
87 """Handle Torque data request."""
90 if self.
emailemail
is not None and self.
emailemail != data[SENSOR_EMAIL_FIELD]:
96 is_name = NAME_KEY.match(key)
97 is_unit = UNIT_KEY.match(key)
98 is_value = VALUE_KEY.match(key)
102 names[pid] = data[key]
106 temp_unit = data[key]
107 if "\\xC2\\xB0" in temp_unit:
108 temp_unit = temp_unit.replace(
"\\xC2\\xB0", DEGREE)
110 units[pid] = temp_unit
114 self.
sensorssensors[pid].async_on_update(data[key])
116 new_sensor_entities: list[TorqueSensor] = []
117 for pid, name
in names.items():
118 if pid
not in self.
sensorssensors:
120 ENTITY_NAME_FORMAT.format(self.
vehiclevehicle, name), units.get(pid)
122 new_sensor_entities.append(torque_sensor_entity)
123 self.
sensorssensors[pid] = torque_sensor_entity
125 if new_sensor_entities:
132 """Representation of a Torque sensor."""
135 """Initialize the sensor."""
142 """Return the name of the sensor."""
143 return self.
_name_name
147 """Return the unit of measurement."""
148 return self.
_unit_unit
152 """Return the state of the sensor."""
157 """Return the default icon of the sensor."""
162 """Receive an update."""
str|None get(self, web.Request request)
None __init__(self, str|None email, str|None vehicle, dict[int, TorqueSensor] sensors, AddEntitiesCallback async_add_entities)
def __init__(self, name, unit)
def native_unit_of_measurement(self)
def async_on_update(self, value)
None async_write_ha_state(self)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)