Home Assistant Unofficial Reference 2024.12.1
notify.py
Go to the documentation of this file.
1 """Twitter platform for notify component."""
2 
3 from __future__ import annotations
4 
5 from datetime import datetime, timedelta
6 from functools import partial
7 from http import HTTPStatus
8 import json
9 import logging
10 import mimetypes
11 import os
12 
13 from TwitterAPI import TwitterAPI
14 import voluptuous as vol
15 
17  ATTR_DATA,
18  ATTR_TARGET,
19  PLATFORM_SCHEMA as NOTIFY_PLATFORM_SCHEMA,
20  BaseNotificationService,
21 )
22 from homeassistant.const import CONF_ACCESS_TOKEN, CONF_USERNAME
23 from homeassistant.core import HomeAssistant
25 from homeassistant.helpers.event import async_track_point_in_time
26 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
27 
# Module-level logger for this platform.
_LOGGER = logging.getLogger(__name__)

# Configuration keys for the Twitter API credentials. The access token and
# username keys (CONF_ACCESS_TOKEN, CONF_USERNAME) come from homeassistant.const.
CONF_CONSUMER_KEY = "consumer_key"
CONF_CONSUMER_SECRET = "consumer_secret"
CONF_ACCESS_TOKEN_SECRET = "access_token_secret"

# Key inside the service call's ``data`` payload that holds the media path.
ATTR_MEDIA = "media"

# Platform configuration: all four credentials are required; the username
# (default direct-message recipient) is optional.
PLATFORM_SCHEMA = NOTIFY_PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_ACCESS_TOKEN): cv.string,
        vol.Required(CONF_ACCESS_TOKEN_SECRET): cv.string,
        vol.Required(CONF_CONSUMER_KEY): cv.string,
        vol.Required(CONF_CONSUMER_SECRET): cv.string,
        vol.Optional(CONF_USERNAME): cv.string,
    }
)
45 
46 
def get_service(
    hass: HomeAssistant,
    config: ConfigType,
    discovery_info: DiscoveryInfoType | None = None,
) -> TwitterNotificationService:
    """Get the Twitter notification service.

    Builds the service from the validated platform configuration. The four
    credential entries are required by the platform schema; the username is
    optional and is passed as ``None`` when absent. ``discovery_info`` is
    accepted for interface compatibility and is not used here.
    """
    return TwitterNotificationService(
        hass,
        config[CONF_CONSUMER_KEY],
        config[CONF_CONSUMER_SECRET],
        config[CONF_ACCESS_TOKEN],
        config[CONF_ACCESS_TOKEN_SECRET],
        config.get(CONF_USERNAME),
    )
61 
62 
class TwitterNotificationService(BaseNotificationService):
    """Implementation of a notification service for the Twitter service.

    A message is sent as a direct message when a recipient is known (an
    explicit target, or the configured default user) and as a status update
    otherwise. An optional media file is uploaded first through the chunked
    ``media/upload`` commands (INIT, APPEND, FINALIZE, STATUS); the message is
    sent from a callback once the upload has finished.
    """

    def __init__(
        self,
        hass,
        consumer_key,
        consumer_secret,
        access_token_key,
        access_token_secret,
        username,
    ):
        """Initialize the service.

        ``username`` is the default direct-message recipient; when it is
        falsy and no target is given, messages are posted as status updates.
        """
        self.default_user = username
        self.hass = hass
        self.api = TwitterAPI(
            consumer_key, consumer_secret, access_token_key, access_token_secret
        )

    def send_message(self, message="", **kwargs):
        """Tweet a message, optionally with media.

        Reads the optional ``data`` and ``target`` entries from ``kwargs``.
        If ``data`` names a media path that is not an allowed path, a warning
        is logged and nothing is sent. Otherwise the message is sent once per
        target, or once to the default user when no targets are given.
        """
        data = kwargs.get(ATTR_DATA)
        targets = kwargs.get(ATTR_TARGET)

        media = data.get(ATTR_MEDIA) if data else None
        # Only consult the allow-list when a media path was actually supplied;
        # a data payload without media must not block a text-only message.
        if media and not self.hass.config.is_allowed_path(media):
            _LOGGER.warning("'%s' is not a whitelisted directory", media)
            return

        # No (or empty) targets: fall back to the configured default user,
        # which may itself be None and then results in a status update.
        for target in targets or [self.default_user]:
            callback = partial(self.send_message_callback, message, target)
            self.upload_media_then_callback(callback, media)

    def send_message_callback(self, message, user, media_id=None):
        """Send the message as a direct message or as a status update.

        With a truthy ``user`` the screen name is resolved to a user id and a
        direct message is created; ``media_id`` is not used in that case.
        Without a user the message is posted as a status update with
        ``media_id`` attached. Failed requests are logged, not raised.
        """
        if user:
            user_resp = self.api.request("users/lookup", {"screen_name": user})
            # Check the lookup result before touching its payload; without a
            # user id there is nobody to send the direct message to.
            if user_resp.status_code != HTTPStatus.OK:
                self.log_error_resp(user_resp)
                return

            users = user_resp.json()
            _LOGGER.debug("User lookup response: %s", users)
            user_id = users[0]["id"]

            event = {
                "event": {
                    "type": "message_create",
                    "message_create": {
                        "target": {"recipient_id": user_id},
                        "message_data": {"text": message},
                    },
                }
            }
            resp = self.api.request("direct_messages/events/new", json.dumps(event))
        else:
            resp = self.api.request(
                "statuses/update", {"status": message, "media_ids": media_id}
            )

        if resp.status_code != HTTPStatus.OK:
            self.log_error_resp(resp)
        else:
            _LOGGER.debug("Message posted: %s", resp.json())

    def upload_media_then_callback(self, callback, media_path=None) -> None:
        """Upload media, then invoke ``callback``.

        Without a media path the callback is invoked immediately with no
        arguments. Otherwise the file is uploaded in chunks and the callback
        receives the resulting media id, either right after FINALIZE or, when
        the response carries ``processing_info``, once status polling ends.
        On any failed upload step the error is logged and the callback is
        not invoked.
        """
        if not media_path:
            callback()
            return

        with open(media_path, "rb") as file:
            total_bytes = os.path.getsize(media_path)
            media_category, media_type = self.media_info(media_path)
            resp = self.upload_media_init(media_type, media_category, total_bytes)

            # Any non-2xx answer to INIT means there is no media id to use.
            if not HTTPStatus.OK <= resp.status_code < HTTPStatus.MULTIPLE_CHOICES:
                self.log_error_resp(resp)
                return

            media_id = self.upload_media_chunked(
                file, total_bytes, resp.json()["media_id"]
            )

        # upload_media_chunked has already logged the failure.
        if media_id is None:
            return

        resp = self.upload_media_finalize(media_id)
        if not HTTPStatus.OK <= resp.status_code < HTTPStatus.MULTIPLE_CHOICES:
            self.log_error_resp(resp)
            return

        if resp.json().get("processing_info") is None:
            callback(media_id)
            return

        self.check_status_until_done(media_id, callback)

    def media_info(self, media_path):
        """Determine mime type and Twitter media category for given media.

        Returns a ``(media_category, media_type)`` tuple; either element may
        be ``None`` when the type cannot be guessed or has no category.
        """
        media_type, _ = mimetypes.guess_type(media_path)
        media_category = self.media_category_for_type(media_type)
        _LOGGER.debug(
            "media %s is mime type %s and translates to %s",
            media_path,
            media_type,
            media_category,
        )
        return media_category, media_type

    def upload_media_init(self, media_type, media_category, total_bytes):
        """Upload media, INIT phase. Returns the API response."""
        return self.api.request(
            "media/upload",
            {
                "command": "INIT",
                "media_type": media_type,
                "media_category": media_category,
                "total_bytes": total_bytes,
            },
        )

    def upload_media_chunked(self, file, total_bytes, media_id):
        """Upload media, chunked append.

        Reads ``file`` in 4 MiB chunks and appends each as its own segment.
        Returns ``media_id`` on success, or ``None`` after logging when a
        segment is rejected or the file ends before ``total_bytes`` were sent.
        """
        segment_id = 0
        bytes_sent = 0
        while bytes_sent < total_bytes:
            chunk = file.read(4 * 1024 * 1024)
            if not chunk:
                # The file is shorter than the size announced in INIT; without
                # this guard the loop would never advance.
                _LOGGER.error(
                    "Media file ended after %s of %s bytes", bytes_sent, total_bytes
                )
                return None
            resp = self.upload_media_append(chunk, media_id, segment_id)
            if not HTTPStatus.OK <= resp.status_code < HTTPStatus.MULTIPLE_CHOICES:
                self.log_error_resp_append(resp)
                return None
            segment_id += 1
            bytes_sent = file.tell()
            self.log_bytes_sent(bytes_sent, total_bytes)
        return media_id

    def upload_media_append(self, chunk, media_id, segment_id):
        """Upload media, APPEND phase. Returns the API response."""
        return self.api.request(
            "media/upload",
            {"command": "APPEND", "media_id": media_id, "segment_index": segment_id},
            {"media": chunk},
        )

    def upload_media_finalize(self, media_id):
        """Upload media, FINALIZE phase. Returns the API response."""
        return self.api.request(
            "media/upload", {"command": "FINALIZE", "media_id": media_id}
        )

    def check_status_until_done(self, media_id, callback, *args) -> None:
        """Upload media, STATUS phase.

        Queries the processing state of ``media_id``. When the state is
        ``succeeded`` or ``failed`` the callback is invoked with the media id;
        otherwise this method re-schedules itself after the delay the API
        asks for. ``*args`` absorbs the datetime the scheduler passes to the
        scheduled action. A non-OK status response is logged and polling
        stops without invoking the callback.
        """
        resp = self.api.request(
            "media/upload",
            {"command": "STATUS", "media_id": media_id},
            method_override="GET",
        )
        if resp.status_code != HTTPStatus.OK:
            _LOGGER.error("Media processing error: %s", resp.json())
            # There is no processing_info to inspect on an error response.
            return
        processing_info = resp.json()["processing_info"]

        _LOGGER.debug("media processing %s status: %s", media_id, processing_info)

        if processing_info["state"] in {"succeeded", "failed"}:
            callback(media_id)
            return

        check_after_secs = processing_info["check_after_secs"]
        _LOGGER.debug(
            "media processing waiting %s seconds to check status", str(check_after_secs)
        )

        # NOTE(review): this uses a naive local datetime and calls an async_*
        # helper from a synchronous method - confirm both are intended.
        when = datetime.now() + timedelta(seconds=check_after_secs)
        myself = partial(self.check_status_until_done, media_id, callback)
        async_track_point_in_time(self.hass, myself, when)

    @staticmethod
    def media_category_for_type(media_type):
        """Determine Twitter media category by mime type.

        Returns ``"tweet_gif"``, ``"tweet_video"`` or ``"tweet_image"``, or
        ``None`` when the type is unknown or not one of those families.
        """
        if media_type is None:
            return None

        # GIFs are checked first because they would also match "image/".
        if media_type.startswith("image/gif"):
            return "tweet_gif"
        if media_type.startswith("video/"):
            return "tweet_video"
        if media_type.startswith("image/"):
            return "tweet_image"

        return None

    @staticmethod
    def log_bytes_sent(bytes_sent, total_bytes):
        """Log upload progress."""
        _LOGGER.debug("%s of %s bytes uploaded", bytes_sent, total_bytes)

    @staticmethod
    def log_error_resp(resp):
        """Log error response.

        Logs the ``errors`` or ``error`` entry of a JSON object body when
        present, and the raw response text otherwise (including when the body
        is not valid JSON).
        """
        try:
            obj = json.loads(resp.text)
        except (TypeError, ValueError):
            # Not a JSON body; fall back to the raw text.
            obj = None

        if isinstance(obj, dict) and "errors" in obj:
            error_message = obj["errors"]
        elif isinstance(obj, dict) and "error" in obj:
            error_message = obj["error"]
        else:
            error_message = resp.text
        _LOGGER.error("Error %s: %s", resp.status_code, error_message)

    @staticmethod
    def log_error_resp_append(resp):
        """Log error response, during upload append phase.

        Logs the message and code of the first entry in the body's ``errors``
        list; if the body does not have that shape, the raw text is logged.
        """
        try:
            first_error = json.loads(resp.text)["errors"][0]
            error_message = first_error["message"]
            error_code = first_error["code"]
        except (LookupError, TypeError, ValueError):
            _LOGGER.error("Error %s: %s", resp.status_code, resp.text)
            return
        _LOGGER.error(
            "Error %s: %s (Code %s)", resp.status_code, error_message, error_code
        )
def upload_media_append(self, chunk, media_id, segment_id)
Definition: notify.py:200
def __init__(self, hass, consumer_key, consumer_secret, access_token_key, access_token_secret, username)
Definition: notify.py:74
def send_message_callback(self, message, user, media_id=None)
Definition: notify.py:102
None check_status_until_done(self, media_id, callback, *args)
Definition: notify.py:214
def upload_media_init(self, media_type, media_category, total_bytes)
Definition: notify.py:173
None upload_media_then_callback(self, callback, media_path=None)
Definition: notify.py:132
def upload_media_chunked(self, file, total_bytes, media_id)
Definition: notify.py:185
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None open(self, **Any kwargs)
Definition: lock.py:86
TwitterNotificationService get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
Definition: notify.py:51
CALLBACK_TYPE async_track_point_in_time(HomeAssistant hass, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action, datetime point_in_time)
Definition: event.py:1462