1 """Code to handle a Motion Gateway."""
7 from motionblinds
import DEVICE_TYPES_WIFI, AsyncMotionMulticast, MotionGateway
11 from .const
import DEFAULT_INTERFACE
13 _LOGGER = logging.getLogger(__name__)
17 """Construct common name part of a device."""
18 if blind.device_type
in DEVICE_TYPES_WIFI:
19 return blind.blind_type
20 return f
"{blind.blind_type} {blind.mac[12:]}"
24 """Class to async connect to a Motion Gateway."""
26 def __init__(self, hass, multicast=None, interface=None):
27 """Initialize the entity."""
35 """Return the class containing all connections to the gateway."""
39 """Update all information of the gateway."""
42 for blind
in self.
gateway_devicegateway_device.device_list.values():
43 blind.Update_from_cache()
46 """Connect to the Motion Gateway."""
47 _LOGGER.debug(
"Initializing with host %s (key %s)", host, key[:3])
49 ip=host, key=key, multicast=self.
_multicast_multicast
56 "Timeout trying to connect to Motion Gateway with host %s", host
60 "Motion gateway mac: %s, protocol: %s detected",
67 """Check if the current interface supports multicast."""
68 with contextlib.suppress(socket.timeout):
73 """Get list of interface to use."""
74 interfaces = [DEFAULT_INTERFACE,
"0.0.0.0"]
75 enabled_interfaces = []
76 default_interface = DEFAULT_INTERFACE
78 adapters = await network.async_get_adapters(self.
_hass_hass)
79 for adapter
in adapters:
80 if ipv4s := adapter[
"ipv4"]:
81 ip4 = ipv4s[0][
"address"]
82 interfaces.append(ip4)
83 if adapter[
"enabled"]:
84 enabled_interfaces.append(ip4)
85 if adapter[
"default"]:
86 default_interface = ip4
88 if len(enabled_interfaces) == 1:
89 default_interface = enabled_interfaces[0]
90 interfaces.remove(default_interface)
91 interfaces.insert(0, default_interface)
95 interfaces.insert(0, self.
_interface_interface)
100 """Connect to the Motion Gateway."""
102 for interface
in interfaces:
104 "Checking Motionblinds interface '%s' with host %s", interface, host
107 check_multicast = AsyncMotionMulticast(interface=interface)
109 await check_multicast.Start_listen()
110 except socket.gaierror:
117 ip=host, key=key, multicast=check_multicast
123 check_multicast.Stop_listen()
124 except socket.gaierror:
130 "Success using Motionblinds interface '%s' with host %s",
138 "Could not find working interface for Motionblinds host %s, using"
def async_get_interfaces(self)
def async_connect_gateway(self, host, key)
def async_check_interface(self, host, key)
def __init__(self, hass, multicast=None, interface=None)
def check_interface(self)