1 """Component to allow running Python scripts."""
6 from numbers
import Number
10 from typing
import Any
12 from RestrictedPython
import (
13 compile_restricted_exec,
18 from RestrictedPython.Eval
import default_guarded_getitem
19 from RestrictedPython.Guards
import (
21 guarded_iter_unpack_sequence,
22 guarded_unpack_sequence,
24 import voluptuous
as vol
# Module-level logger for this component.
_LOGGER = logging.getLogger(__name__)

# Service domain used when registering and removing this component's services;
# it is also the top-level configuration key validated by CONFIG_SCHEMA.
DOMAIN = "python_script"

# Directory name resolved through `hass.config.path(FOLDER)`; that directory
# is globbed for `*.py` files.
FOLDER = "python_scripts"

# Configuration validation: the DOMAIN key, when present, must hold a dict.
# `extra=vol.ALLOW_EXTRA` lets keys other than DOMAIN pass through unvalidated.
CONFIG_SCHEMA = vol.Schema({DOMAIN: vol.Schema(dict)}, extra=vol.ALLOW_EXTRA)

# Allow-lists consulted by `protected_getattr` (`name not in ALLOWED_...`):
# an attribute name missing from the applicable set raises ScriptError.
# NOTE(review): presumably this set applies to the `hass` object itself — the
# line pairing it with an object is not visible here; confirm.
ALLOWED_HASS = {"bus", "services", "states"}
# NOTE(review): presumably applies to `hass.bus`; confirm against
# `protected_getattr`.
ALLOWED_EVENTBUS = {"fire"}
51 ALLOWED_STATEMACHINE = {
# Attribute names a script may access on `hass.services`; `protected_getattr`
# raises ScriptError for any other name on that object.
ALLOWED_SERVICEREGISTRY = {"services", "has_service", "call"}

# Attribute names checked by `protected_getattr` via
# `name not in ALLOWED_DATETIME`.
# NOTE(review): presumably applies to the `datetime` module exposed to scripts
# — the pairing line is not visible here; confirm.
ALLOWED_DATETIME = {"date", "time", "datetime", "timedelta", "tzinfo"}

# Key under which a script's field definitions (the "fields" entry read from
# services.yaml, defaulting to {}) are placed in its service description.
CONF_FIELDS = "fields"
89 """When a script error occurs."""
92 def setup(hass: HomeAssistant, config: ConfigType) -> bool:
93 """Initialize the Python script component."""
94 path = hass.config.path(FOLDER)
96 if not os.path.isdir(path):
97 _LOGGER.warning(
"Folder %s not found in configuration folder", FOLDER)
102 def reload_scripts_handler(call: ServiceCall) ->
None:
103 """Handle reload service calls."""
106 hass.services.register(DOMAIN, SERVICE_RELOAD, reload_scripts_handler)
112 """Discover python scripts in folder."""
113 path = hass.config.path(FOLDER)
115 if not os.path.isdir(path):
116 _LOGGER.warning(
"Folder %s not found in configuration folder", FOLDER)
def python_script_service_handler(call: ServiceCall) -> ServiceResponse:
    """Run the script named after the invoked service and hand back its result.

    The service name, the call payload and the caller's return-response flag
    are forwarded unchanged to `execute_script`.
    """
    script_name = call.service
    script_data = call.data
    wants_response = call.return_response
    return execute_script(hass, script_name, script_data, wants_response)
123 existing = hass.services.services.get(DOMAIN, {}).keys()
124 for existing_service
in existing:
125 if existing_service == SERVICE_RELOAD:
127 hass.services.remove(DOMAIN, existing_service)
130 services_yaml = os.path.join(path,
"services.yaml")
131 if os.path.exists(services_yaml):
136 for fil
in glob.iglob(os.path.join(path,
"*.py")):
137 name = os.path.splitext(os.path.basename(fil))[0]
138 hass.services.register(
141 python_script_service_handler,
142 supports_response=SupportsResponse.OPTIONAL,
146 CONF_NAME: services_dict.get(name, {}).
get(
"name", name),
147 CONF_DESCRIPTION: services_dict.get(name, {}).
get(
"description",
""),
148 CONF_FIELDS: services_dict.get(name, {}).
get(
"fields", {}),
153 IOPERATOR_TO_OPERATOR = {
160 "//=": operator.floordiv,
161 "/=": operator.truediv,
162 "<<=": operator.lshift,
163 ">>=": operator.rshift,
164 "@=": operator.matmul,
171 """Implement augmented-assign (+=, -=, etc.) operators for restricted code.
173 See RestrictedPython's `visit_AugAssign` for details.
175 if not isinstance(target, (list, Number, str)):
176 raise ScriptError(f
"The {op!r} operation is not allowed on a {type(target)}")
177 op_fun = IOPERATOR_TO_OPERATOR.get(op)
179 raise ScriptError(f
"The {op!r} operation is not allowed")
180 return op_fun(target, operand)
185 """Execute a script."""
186 filename = f
"{name}.py"
188 with open(hass.config.path(FOLDER, filename), encoding=
"utf8")
as fil:
190 return execute(hass, filename, source, data, return_response=return_response)
194 def execute(hass, filename, source, data=None, return_response=False):
195 """Execute Python source."""
197 compiled = compile_restricted_exec(source, filename=filename)
201 "Error loading script %s: %s", filename,
", ".join(compiled.errors)
205 if compiled.warnings:
207 "Warning loading script %s: %s", filename,
", ".join(compiled.warnings)
210 def protected_getattr(obj, name, default=None):
211 """Restricted method to get attributes."""
212 if name.startswith(
"async_"):
213 raise ScriptError(
"Not allowed to access async methods")
216 and name
not in ALLOWED_HASS
218 and name
not in ALLOWED_EVENTBUS
219 or obj
is hass.states
220 and name
not in ALLOWED_STATEMACHINE
221 or obj
is hass.services
222 and name
not in ALLOWED_SERVICEREGISTRY
224 and name
not in ALLOWED_DT_UTIL
226 and name
not in ALLOWED_DATETIME
227 or isinstance(obj, TimeWrapper)
228 and name
not in ALLOWED_TIME
230 raise ScriptError(f
"Not allowed to access {obj.__class__.__name__}.{name}")
232 return getattr(obj, name, default)
235 "datetime": datetime,
244 "enumerate": enumerate,
246 builtins = safe_builtins.copy()
247 builtins.update(utility_builtins)
248 builtins.update(limited_builtins)
249 builtins.update(extra_builtins)
250 logger = logging.getLogger(f
"{__name__}.{filename}")
251 restricted_globals = {
252 "__builtins__": builtins,
253 "_print_": StubPrinter,
254 "_getattr_": protected_getattr,
255 "_write_": full_write_guard,
257 "_getitem_": default_guarded_getitem,
258 "_iter_unpack_sequence_": guarded_iter_unpack_sequence,
259 "_unpack_sequence_": guarded_unpack_sequence,
260 "_inplacevar_": guarded_inplacevar,
268 _LOGGER.info(
"Executing %s: %s", filename, data)
270 exec(compiled.code, restricted_globals)
272 "Output of python_script: `%s`:\n%s",
274 restricted_globals[
"output"],
277 if not isinstance(restricted_globals[
"output"], dict):
278 output_type = type(restricted_globals[
"output"])
279 restricted_globals[
"output"] = {}
281 f
"Expected `output` to be a dictionary, was {output_type}"
283 except ScriptError
as err:
286 logger.error(
"Error executing script: %s", err)
288 except Exception
as err:
291 f
"Error executing script ({type(err).__name__}): {err}"
293 logger.exception(
"Error executing script")
296 return restricted_globals[
"output"]
300 """Class to handle printing inside scripts."""
303 """Initialize our printer."""
307 _LOGGER.warning(
"Don't use print() inside scripts. Use logger.info() instead")
311 """Wrap the time module."""
317 """Sleep method that warns once."""
318 if not TimeWrapper.warned:
319 TimeWrapper.warned =
True
321 "Using time.sleep can reduce the performance of Home Assistant"
324 time.sleep(*args, **kwargs)
327 """Fetch an attribute from Time module."""
328 attribute = getattr(time, attr)
329 if callable(attribute):
def wrapper(*positional, **named):
    """Forward every argument unchanged to the wrapped callable attribute."""
    result = attribute(*positional, **named)
    return result
def __init__(self, _getattr_)
def _call_print(self, *objects, **kwargs)
def sleep(self, *args, **kwargs)
def __getattr__(self, attr)
web.Response get(self, web.Request request, str config_key)
None open(self, **Any kwargs)
None discover_scripts(HomeAssistant hass)
bool setup(HomeAssistant hass, ConfigType config)
Any guarded_inplacevar(str op, Any target, Any operand)
def execute(hass, filename, source, data=None, return_response=False)
def execute_script(hass, name, data=None, return_response=False)
None async_set_service_schema(HomeAssistant hass, str domain, str service, dict[str, Any] schema)
dict load_yaml_dict(str|os.PathLike[str] fname, Secrets|None secrets=None)
None raise_if_invalid_filename(str filename)