1 """Component that will help set the OpenALPR cloud for ALPR processing."""
3 from __future__
import annotations
6 from base64
import b64encode
7 from http
import HTTPStatus
11 import voluptuous
as vol
16 PLATFORM_SCHEMA
as IMAGE_PROCESSING_PLATFORM_SCHEMA,
17 ImageProcessingDeviceClass,
18 ImageProcessingEntity,
35 _LOGGER = logging.getLogger(__name__)
38 ATTR_PLATES =
"plates"
39 ATTR_VEHICLES =
"vehicles"
41 EVENT_FOUND_PLATE =
"image_processing.found_plate"
43 OPENALPR_API_URL =
"https://api.openalpr.com/v1/recognize"
60 PLATFORM_SCHEMA = IMAGE_PROCESSING_PLATFORM_SCHEMA.extend(
62 vol.Required(CONF_API_KEY): cv.string,
63 vol.Required(CONF_REGION): vol.All(vol.Lower, vol.In(OPENALPR_REGIONS)),
71 async_add_entities: AddEntitiesCallback,
72 discovery_info: DiscoveryInfoType |
None =
None,
74 """Set up the OpenALPR cloud API platform."""
75 confidence = config[CONF_CONFIDENCE]
77 "secret_key": config[CONF_API_KEY],
80 "country": config[CONF_REGION],
85 camera[CONF_ENTITY_ID], params, confidence, camera.get(CONF_NAME)
87 for camera
in config[CONF_SOURCE]
92 """Base entity class for ALPR image processing."""
94 _attr_device_class = ImageProcessingDeviceClass.ALPR
97 """Initialize base ALPR entity."""
98 self.
platesplates: dict[str, float] = {}
103 """Return the state of the entity."""
108 for i_pl, i_co
in self.
platesplates.items():
109 if i_co > confidence:
116 """Return device specific state attributes."""
117 return {ATTR_PLATES: self.
platesplates, ATTR_VEHICLES: self.
vehiclesvehicles}
120 """Send event with new plates and store data."""
121 run_callback_threadsafe(
127 """Send event with new plates and store data.
129 Plates are a dict in follow format:
130 { '<plate>': confidence }
131 This method must be run in the event loop.
135 for plate, confidence
in plates.items()
138 new_plates = set(plates) - set(self.
platesplates)
141 for i_plate
in new_plates:
142 self.
hasshass.bus.async_fire(
147 ATTR_CONFIDENCE: plates.get(i_plate),
157 """Representation of an OpenALPR cloud entity."""
159 def __init__(self, camera_entity, params, confidence, name=None):
160 """Initialize OpenALPR cloud API."""
170 self.
_name_name = f
"OpenAlpr {split_entity_id(camera_entity)[1]}"
174 """Return minimum confidence for send events."""
179 """Return camera entity id from process pictures."""
184 """Return the name of the entity."""
185 return self.
_name_name
190 This method is a coroutine.
193 params = self.
_params_params.copy()
195 body = {
"image_bytes":
str(b64encode(image),
"utf-8")}
198 async
with asyncio.timeout(self.
timeouttimeout):
199 request = await websession.post(
200 OPENALPR_API_URL, params=params, data=body
203 data = await request.json()
205 if request.status != HTTPStatus.OK:
206 _LOGGER.error(
"Error %d -> %s", request.status, data.get(
"error"))
209 except (TimeoutError, aiohttp.ClientError):
210 _LOGGER.error(
"Timeout for OpenALPR API")
217 for row
in data[
"plate"][
"results"]:
220 for p_data
in row[
"candidates"]:
222 result.update({p_data[
"plate"]:
float(p_data[
"confidence"])})
float|None confidence(self)
None process_plates(self, dict[str, float] plates, int vehicles)
def extra_state_attributes(self)
None async_process_plates(self, dict[str, float] plates, int vehicles)
def async_process_image(self, image)
def __init__(self, camera_entity, params, confidence, name=None)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)