Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Component to allow running Python scripts."""
2 
3 import datetime
4 import glob
5 import logging
6 from numbers import Number
7 import operator
8 import os
9 import time
10 from typing import Any
11 
12 from RestrictedPython import (
13  compile_restricted_exec,
14  limited_builtins,
15  safe_builtins,
16  utility_builtins,
17 )
18 from RestrictedPython.Eval import default_guarded_getitem
19 from RestrictedPython.Guards import (
20  full_write_guard,
21  guarded_iter_unpack_sequence,
22  guarded_unpack_sequence,
23 )
24 import voluptuous as vol
25 
26 from homeassistant.const import CONF_DESCRIPTION, CONF_NAME, SERVICE_RELOAD
27 from homeassistant.core import (
28  HomeAssistant,
29  ServiceCall,
30  ServiceResponse,
31  SupportsResponse,
32 )
33 from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
34 from homeassistant.helpers.service import async_set_service_schema
35 from homeassistant.helpers.typing import ConfigType
36 from homeassistant.loader import bind_hass
37 from homeassistant.util import raise_if_invalid_filename
38 import homeassistant.util.dt as dt_util
39 from homeassistant.util.yaml.loader import load_yaml_dict
40 
# Module-level logger; per-script loggers are created as children of this name.
_LOGGER = logging.getLogger(__name__)

# Integration domain: every discovered script is registered as a service under it.
DOMAIN = "python_script"

# Folder, resolved through hass.config.path(), that is scanned for "*.py" scripts.
FOLDER = "python_scripts"

# The integration's own configuration only has to be a mapping; other top-level
# keys in the configuration are allowed through untouched.
CONFIG_SCHEMA = vol.Schema({DOMAIN: vol.Schema(dict)}, extra=vol.ALLOW_EXTRA)
48 
# Attribute whitelists enforced by the restricted ``_getattr_`` hook in
# ``execute``: a script may only read the listed attribute names on the
# corresponding object, anything else raises a ScriptError.

# Attributes reachable on the ``hass`` object itself.
ALLOWED_HASS = {"bus", "services", "states"}

# Attributes reachable on ``hass.bus``.
ALLOWED_EVENTBUS = {"fire"}

# Attributes reachable on ``hass.states``.
ALLOWED_STATEMACHINE = {
    "entity_ids",
    "all",
    "get",
    "is_state",
    "is_state_attr",
    "remove",
    "set",
}

# Attributes reachable on ``hass.services``.
ALLOWED_SERVICEREGISTRY = {"services", "has_service", "call"}

# Attributes reachable on the ``time`` object exposed to scripts (a TimeWrapper).
ALLOWED_TIME = {
    "sleep",
    "strftime",
    "strptime",
    "gmtime",
    "localtime",
    "ctime",
    "time",
    "mktime",
}

# Attributes reachable on the ``datetime`` module.
ALLOWED_DATETIME = {"date", "time", "datetime", "timedelta", "tzinfo"}

# Attributes reachable on ``homeassistant.util.dt`` (exposed as ``dt_util``).
ALLOWED_DT_UTIL = {
    "utcnow",
    "now",
    "as_utc",
    "as_timestamp",
    "as_local",
    "utc_from_timestamp",
    "start_of_local_day",
    "parse_datetime",
    "parse_date",
    "get_age",
}

# Key of the per-service field descriptions in the service schema.
CONF_FIELDS = "fields"
86 
87 
# NOTE(review): the ``class`` statement for this docstring was missing from the
# listing. The name is taken from the ``raise ScriptError(...)`` /
# ``except ScriptError`` uses in this file; the HomeAssistantError base class is
# assumed (it is imported above) -- confirm against the upstream source.
class ScriptError(HomeAssistantError):
    """When a script error occurs."""
90 
91 
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Initialize the Python script component.

    Registers one service per script found in the scripts folder and a
    reload service that repeats the discovery on demand.

    Returns False (after logging a warning) when the scripts folder does not
    exist, True otherwise.
    """
    path = hass.config.path(FOLDER)

    # Without the folder there is nothing to register, so setup is reported
    # as failed.
    if not os.path.isdir(path):
        _LOGGER.warning("Folder %s not found in configuration folder", FOLDER)
        return False

    discover_scripts(hass)

    def reload_scripts_handler(call: ServiceCall) -> None:
        """Handle reload service calls."""
        discover_scripts(hass)

    hass.services.register(DOMAIN, SERVICE_RELOAD, reload_scripts_handler)

    return True
109 
110 
def discover_scripts(hass: HomeAssistant) -> None:
    """Discover python scripts in folder.

    Removes every previously registered script service (the reload service is
    kept), then registers one service per ``*.py`` file in the scripts folder
    and attaches the description found in the folder's ``services.yaml``, if
    any. Logs a warning and does nothing when the folder does not exist.
    """
    path = hass.config.path(FOLDER)

    if not os.path.isdir(path):
        _LOGGER.warning("Folder %s not found in configuration folder", FOLDER)
        return

    def python_script_service_handler(call: ServiceCall) -> ServiceResponse:
        """Handle python script service calls."""
        # The service name doubles as the script name (file name without ".py").
        return execute_script(hass, call.service, call.data, call.return_response)

    # Snapshot the names before removing anything, so the loop never iterates
    # a mapping that the removals below could be changing.
    existing = list(hass.services.services.get(DOMAIN, {}))
    for existing_service in existing:
        if existing_service == SERVICE_RELOAD:
            continue
        hass.services.remove(DOMAIN, existing_service)

    # Load user-provided service descriptions from python_scripts/services.yaml
    services_yaml = os.path.join(path, "services.yaml")
    if os.path.exists(services_yaml):
        services_dict = load_yaml_dict(services_yaml)
    else:
        services_dict = {}

    for fil in glob.iglob(os.path.join(path, "*.py")):
        name = os.path.splitext(os.path.basename(fil))[0]
        hass.services.register(
            DOMAIN,
            name,
            python_script_service_handler,
            supports_response=SupportsResponse.OPTIONAL,
        )

        # Look the script's entry up once. ``or {}`` also covers an entry that
        # is present but empty (presumably loaded as None by load_yaml_dict),
        # which would otherwise fail on ``.get``.
        script_desc = services_dict.get(name) or {}
        service_desc = {
            CONF_NAME: script_desc.get("name", name),
            CONF_DESCRIPTION: script_desc.get("description", ""),
            CONF_FIELDS: script_desc.get("fields", {}),
        }
        async_set_service_schema(hass, DOMAIN, name, service_desc)
151 
152 
# Maps each augmented-assignment operator token to the binary ``operator``
# function that computes its result.
IOPERATOR_TO_OPERATOR = {
    "%=": operator.mod,
    "&=": operator.and_,
    "**=": operator.pow,
    "*=": operator.mul,
    "+=": operator.add,
    "-=": operator.sub,
    "//=": operator.floordiv,
    "/=": operator.truediv,
    "<<=": operator.lshift,
    ">>=": operator.rshift,
    "@=": operator.matmul,
    "^=": operator.xor,
    "|=": operator.or_,
}


def guarded_inplacevar(op: str, target: Any, operand: Any) -> Any:
    """Implement augmented-assign (+=, -=, etc.) operators for restricted code.

    See RestrictedPython's `visit_AugAssign` for details.

    Only list, number and string targets are accepted. The result is computed
    with the plain binary operator (``operator.add`` and friends), so a new
    object is returned and ``target`` itself is not mutated.

    Raises ScriptError when the target type or the operator is not allowed.
    """
    # The target type is checked first, so an unsupported type is reported
    # even when the operator is unknown as well.
    if not isinstance(target, (list, Number, str)):
        raise ScriptError(f"The {op!r} operation is not allowed on a {type(target)}")
    op_fun = IOPERATOR_TO_OPERATOR.get(op)
    if not op_fun:
        raise ScriptError(f"The {op!r} operation is not allowed")
    return op_fun(target, operand)
181 
182 
@bind_hass
def execute_script(hass, name, data=None, return_response=False):
    """Execute a script.

    Reads ``<name>.py`` from the scripts folder and hands its source to
    ``execute``; the return value is whatever ``execute`` returns.

    ``name`` is the script name without the ``.py`` extension, ``data`` is
    made available to the script, and ``return_response`` is forwarded to
    ``execute`` unchanged.
    """
    filename = f"{name}.py"
    # Validate the file name before it is used to build a path (presumably
    # raises on an invalid name -- see homeassistant.util).
    raise_if_invalid_filename(filename)
    with open(hass.config.path(FOLDER, filename), encoding="utf8") as fil:
        source = fil.read()
    return execute(hass, filename, source, data, return_response=return_response)
191 
192 
@bind_hass
def execute(hass, filename, source, data=None, return_response=False):
    """Execute Python source.

    Compiles ``source`` with RestrictedPython and runs it with a restricted
    set of builtins and guard hooks. The script sees the globals ``hass``,
    ``data``, ``logger`` and ``output``.

    Returns the script's ``output`` dictionary, or None when the source does
    not compile or when the script fails and ``return_response`` is false.

    When ``return_response`` is true, a failing script raises instead:
    ServiceValidationError for a ScriptError, HomeAssistantError for any
    other exception.
    """

    compiled = compile_restricted_exec(source, filename=filename)

    if compiled.errors:
        _LOGGER.error(
            "Error loading script %s: %s", filename, ", ".join(compiled.errors)
        )
        return None

    if compiled.warnings:
        _LOGGER.warning(
            "Warning loading script %s: %s", filename, ", ".join(compiled.warnings)
        )

    def protected_getattr(obj, name, default=None):
        """Restricted method to get attributes."""
        if name.startswith("async_"):
            raise ScriptError("Not allowed to access async methods")
        # Each guarded object has its own whitelist; access is refused when
        # the object is one of them and the name is not on its list.
        if (
            (obj is hass and name not in ALLOWED_HASS)
            or (obj is hass.bus and name not in ALLOWED_EVENTBUS)
            or (obj is hass.states and name not in ALLOWED_STATEMACHINE)
            or (obj is hass.services and name not in ALLOWED_SERVICEREGISTRY)
            or (obj is dt_util and name not in ALLOWED_DT_UTIL)
            or (obj is datetime and name not in ALLOWED_DATETIME)
            or (isinstance(obj, TimeWrapper) and name not in ALLOWED_TIME)
        ):
            raise ScriptError(f"Not allowed to access {obj.__class__.__name__}.{name}")

        return getattr(obj, name, default)

    extra_builtins = {
        "datetime": datetime,
        "sorted": sorted,
        "time": TimeWrapper(),
        "dt_util": dt_util,
        "min": min,
        "max": max,
        "sum": sum,
        "any": any,
        "all": all,
        "enumerate": enumerate,
    }
    # Later updates win, so the extras override same-named RestrictedPython
    # builtins.
    builtins = safe_builtins.copy()
    builtins.update(utility_builtins)
    builtins.update(limited_builtins)
    builtins.update(extra_builtins)
    # Per-script logger, also handed to the script as ``logger``.
    logger = logging.getLogger(f"{__name__}.{filename}")
    restricted_globals = {
        "__builtins__": builtins,
        "_print_": StubPrinter,
        "_getattr_": protected_getattr,
        "_write_": full_write_guard,
        "_getiter_": iter,
        "_getitem_": default_guarded_getitem,
        "_iter_unpack_sequence_": guarded_iter_unpack_sequence,
        "_unpack_sequence_": guarded_unpack_sequence,
        "_inplacevar_": guarded_inplacevar,
        "hass": hass,
        "data": data or {},
        "logger": logger,
        "output": {},
    }

    try:
        _LOGGER.info("Executing %s: %s", filename, data)
        # pylint: disable-next=exec-used
        exec(compiled.code, restricted_globals)  # noqa: S102
        _LOGGER.debug(
            "Output of python_script: `%s`:\n%s",
            filename,
            restricted_globals["output"],
        )
        # Ensure that we're always returning a dictionary
        if not isinstance(restricted_globals["output"], dict):
            output_type = type(restricted_globals["output"])
            restricted_globals["output"] = {}
            raise ScriptError(  # noqa: TRY301
                f"Expected `output` to be a dictionary, was {output_type}"
            )
    except ScriptError as err:
        if return_response:
            raise ServiceValidationError(f"Error executing script: {err}") from err
        logger.error("Error executing script: %s", err)
        return None
    except Exception as err:
        # Boundary around user code: any failure is either surfaced to the
        # caller or logged with its traceback, never propagated raw.
        if return_response:
            raise HomeAssistantError(
                f"Error executing script ({type(err).__name__}): {err}"
            ) from err
        logger.exception("Error executing script")
        return None

    return restricted_globals["output"]
297 
298 
class StubPrinter:
    """Class to handle printing inside scripts.

    Installed as the ``_print_`` hook of the restricted globals; it never
    prints anything and only logs a warning pointing to ``logger.info()``.
    """

    def __init__(self, _getattr_):
        """Initialize our printer."""
        # The ``_getattr_`` argument is accepted for the hook's call signature
        # and deliberately ignored.

    def _call_print(self, *objects, **kwargs):
        """Print text."""
        # All arguments are discarded; the warning is the only effect.
        _LOGGER.warning("Don't use print() inside scripts. Use logger.info() instead")
308 
309 
class TimeWrapper:
    """Wrap the time module.

    ``sleep`` is overridden to log a one-time performance warning; every other
    attribute is fetched from the ``time`` module on demand.
    """

    # Class variable, only going to warn once per Home Assistant run
    warned = False

    def sleep(self, *args, **kwargs):
        """Sleep method that warns once."""
        if not TimeWrapper.warned:
            # Set the flag on the class so the warning is shared by all instances.
            TimeWrapper.warned = True
            _LOGGER.warning(
                "Using time.sleep can reduce the performance of Home Assistant"
            )

        time.sleep(*args, **kwargs)

    def __getattr__(self, attr):
        """Fetch an attribute from Time module."""
        # Only reached for names not found normally, i.e. everything except
        # ``sleep`` and ``warned``. Unknown names raise AttributeError here.
        attribute = getattr(time, attr)
        if callable(attribute):

            def wrapper(*args, **kw):
                """Wrap to return callable method if callable."""
                return attribute(*args, **kw)

            return wrapper
        return attribute
def _call_print(self, *objects, **kwargs)
Definition: __init__.py:305
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None open(self, **Any kwargs)
Definition: lock.py:86
None discover_scripts(HomeAssistant hass)
Definition: __init__.py:111
bool setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:92
Any guarded_inplacevar(str op, Any target, Any operand)
Definition: __init__.py:170
def execute(hass, filename, source, data=None, return_response=False)
Definition: __init__.py:194
def execute_script(hass, name, data=None, return_response=False)
Definition: __init__.py:184
None async_set_service_schema(HomeAssistant hass, str domain, str service, dict[str, Any] schema)
Definition: service.py:844
dict load_yaml_dict(str|os.PathLike[str] fname, Secrets|None secrets=None)
Definition: loader.py:180
None raise_if_invalid_filename(str filename)
Definition: __init__.py:23