1 """Twitter platform for notify component."""
3 from __future__
import annotations
5 from datetime
import datetime, timedelta
6 from functools
import partial
7 from http
import HTTPStatus
13 from TwitterAPI
import TwitterAPI
14 import voluptuous
as vol
19 PLATFORM_SCHEMA
as NOTIFY_PLATFORM_SCHEMA,
20 BaseNotificationService,
28 _LOGGER = logging.getLogger(__name__)
30 CONF_CONSUMER_KEY =
"consumer_key"
31 CONF_CONSUMER_SECRET =
"consumer_secret"
32 CONF_ACCESS_TOKEN_SECRET =
"access_token_secret"
36 PLATFORM_SCHEMA = NOTIFY_PLATFORM_SCHEMA.extend(
38 vol.Required(CONF_ACCESS_TOKEN): cv.string,
39 vol.Required(CONF_ACCESS_TOKEN_SECRET): cv.string,
40 vol.Required(CONF_CONSUMER_KEY): cv.string,
41 vol.Required(CONF_CONSUMER_SECRET): cv.string,
42 vol.Optional(CONF_USERNAME): cv.string,
50 discovery_info: DiscoveryInfoType |
None =
None,
51 ) -> TwitterNotificationService:
52 """Get the Twitter notification service."""
55 config[CONF_CONSUMER_KEY],
56 config[CONF_CONSUMER_SECRET],
57 config[CONF_ACCESS_TOKEN],
58 config[CONF_ACCESS_TOKEN_SECRET],
59 config.get(CONF_USERNAME),
64 """Implementation of a notification service for the Twitter service."""
75 """Initialize the service."""
78 self.
apiapi = TwitterAPI(
79 consumer_key, consumer_secret, access_token_key, access_token_secret
83 """Tweet a message, optionally with media."""
84 data = kwargs.get(ATTR_DATA)
85 targets = kwargs.get(ATTR_TARGET)
89 media = data.get(ATTR_MEDIA)
90 if not self.
hasshass.config.is_allowed_path(media):
91 _LOGGER.warning(
"'%s' is not a whitelisted directory", media)
95 for target
in targets:
103 """Tweet a message, optionally with media."""
105 user_resp = self.
apiapi.request(
"users/lookup", {
"screen_name": user})
106 user_id = user_resp.json()[0][
"id"]
107 if user_resp.status_code != HTTPStatus.OK:
110 _LOGGER.debug(
"Message posted: %s", user_resp.json())
114 "type":
"message_create",
116 "target": {
"recipient_id": user_id},
117 "message_data": {
"text": message},
121 resp = self.
apiapi.request(
"direct_messages/events/new", json.dumps(event))
123 resp = self.
apiapi.request(
124 "statuses/update", {
"status": message,
"media_ids": media_id}
127 if resp.status_code != HTTPStatus.OK:
130 _LOGGER.debug(
"Message posted: %s", resp.json())
138 with open(media_path,
"rb")
as file:
139 total_bytes = os.path.getsize(media_path)
140 (media_category, media_type) = self.
media_infomedia_info(media_path)
141 resp = self.
upload_media_initupload_media_init(media_type, media_category, total_bytes)
143 if 199 > resp.status_code < 300:
147 media_id = resp.json()[
"media_id"]
151 if 199 > resp.status_code < 300:
155 if resp.json().
get(
"processing_info")
is None:
162 """Determine mime type and Twitter media category for given media."""
163 (media_type, _) = mimetypes.guess_type(media_path)
166 "media %s is mime type %s and translates to %s",
171 return media_category, media_type
174 """Upload media, INIT phase."""
175 return self.
apiapi.request(
179 "media_type": media_type,
180 "media_category": media_category,
181 "total_bytes": total_bytes,
186 """Upload media, chunked append."""
189 while bytes_sent < total_bytes:
190 chunk = file.read(4 * 1024 * 1024)
192 if not HTTPStatus.OK <= resp.status_code < HTTPStatus.MULTIPLE_CHOICES:
195 segment_id = segment_id + 1
196 bytes_sent = file.tell()
201 """Upload media, APPEND phase."""
202 return self.
apiapi.request(
204 {
"command":
"APPEND",
"media_id": media_id,
"segment_index": segment_id},
209 """Upload media, FINALIZE phase."""
210 return self.
apiapi.request(
211 "media/upload", {
"command":
"FINALIZE",
"media_id": media_id}
215 """Upload media, STATUS phase."""
216 resp = self.
apiapi.request(
218 {
"command":
"STATUS",
"media_id": media_id},
219 method_override=
"GET",
221 if resp.status_code != HTTPStatus.OK:
222 _LOGGER.error(
"Media processing error: %s", resp.json())
223 processing_info = resp.json()[
"processing_info"]
225 _LOGGER.debug(
"media processing %s status: %s", media_id, processing_info)
227 if processing_info[
"state"]
in {
"succeeded",
"failed"}:
231 check_after_secs = processing_info[
"check_after_secs"]
233 "media processing waiting %s seconds to check status",
str(check_after_secs)
236 when = datetime.now() +
timedelta(seconds=check_after_secs)
242 """Determine Twitter media category by mime type."""
243 if media_type
is None:
246 if media_type.startswith(
"image/gif"):
248 if media_type.startswith(
"video/"):
250 if media_type.startswith(
"image/"):
257 """Log upload progress."""
258 _LOGGER.debug(
"%s of %s bytes uploaded",
str(bytes_sent),
str(total_bytes))
262 """Log error response."""
263 obj = json.loads(resp.text)
265 error_message = obj[
"errors"]
267 error_message = obj[
"error"]
269 error_message = resp.text
270 _LOGGER.error(
"Error %s: %s", resp.status_code, error_message)
274 """Log error response, during upload append phase."""
275 obj = json.loads(resp.text)
276 error_message = obj[
"errors"][0][
"message"]
277 error_code = obj[
"errors"][0][
"code"]
279 "Error %s: %s (Code %s)", resp.status_code, error_message, error_code
web.Response get(self, web.Request request, str config_key)
None open(self, **Any kwargs)
CALLBACK_TYPE async_track_point_in_time(HomeAssistant hass, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action, datetime point_in_time)