1 """Support for Telegram bot using polling."""
5 from telegram
import Update
6 from telegram.error
import NetworkError, RetryAfter, TelegramError, TimedOut
7 from telegram.ext
import ApplicationBuilder, CallbackContext, TypeHandler
11 from .
import BaseTelegramBotEntity
13 _LOGGER = logging.getLogger(__name__)
17 """Set up the Telegram polling platform."""
18 pollbot =
PollBot(hass, bot, config)
20 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, pollbot.start_polling)
21 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, pollbot.stop_polling)
26 async
def process_error(update: Update, context: CallbackContext) ->
None:
27 """Telegram bot error handler."""
36 except (TimedOut, NetworkError, RetryAfter):
40 if update
is not None:
41 _LOGGER.error(
'Update "%s" caused error: "%s"', update, error)
43 _LOGGER.error(
"%s: %s", error.__class__.__name__, error)
47 """Controls the Application object that holds the bot and an updater.
49 The application is set up to pass telegram updates to `self.handle_update`
53 """Create Application to poll for updates."""
57 self.
applicationapplication.add_handler(TypeHandler(Update, self.handle_update))
58 self.
applicationapplication.add_error_handler(process_error)
61 """Start the polling task."""
62 _LOGGER.debug(
"Starting polling")
64 await self.
applicationapplication.updater.start_polling(error_callback=error_callback)
68 """Stop the polling task."""
69 _LOGGER.debug(
"Stopping polling")
def __init__(self, hass, bot, config)
def stop_polling(self, event=None)
def start_polling(self, event=None)
None process_error(Update update, CallbackContext context)
None error_callback(Exception error, Update|None update=None)
def async_setup_platform(hass, bot, config)