Home Assistant Unofficial Reference 2024.12.1
ring_buffer.py
Go to the documentation of this file.
1 """Implementation of a ring buffer using bytearray."""
2 
3 
class RingBuffer:
    """Basic ring buffer using a bytearray.

    Stores at most ``maxlen`` bytes. Once the buffer is full, newly written
    bytes overwrite the oldest ones.

    Not threadsafe.
    """

    def __init__(self, maxlen: int) -> None:
        """Initialize empty buffer able to hold ``maxlen`` bytes."""
        self._buffer_buffer = bytearray(maxlen)
        self._pos_pos = 0
        self._length_length = 0
        self._maxlen_maxlen = maxlen

    @property
    def maxlen(self) -> int:
        """Return the maximum size of the buffer."""
        return self._maxlen_maxlen

    @property
    def pos(self) -> int:
        """Return the current put position."""
        return self._pos_pos

    def __len__(self) -> int:
        """Return the length of data stored in the buffer."""
        return self._length_length

    def put(self, data: bytes) -> None:
        """Put a chunk of data into the buffer, possibly wrapping around.

        If the chunk is longer than the buffer, only its trailing ``maxlen``
        bytes are kept; the put position still advances by the full chunk
        length (modulo ``maxlen``).
        """
        data_len = len(data)
        maxlen = self._maxlen_maxlen
        if data_len == 0 or maxlen == 0:
            # Nothing to store, or nowhere to store it.
            return

        start = self._pos_pos
        write_len = data_len
        if data_len > maxlen:
            # The leading bytes would be overwritten by the trailing ones
            # anyway. Skip them (advancing the position as if they had been
            # written) so the slice assignments below never receive more
            # bytes than the target slice holds, which would silently grow
            # the bytearray beyond maxlen.
            dropped = data_len - maxlen
            start = (start + dropped) % maxlen
            data = data[dropped:]
            write_len = maxlen

        end = start + write_len
        if end >= maxlen:
            # Split into two chunks: up to the end of the buffer, then the
            # remainder from the beginning.
            head_len = maxlen - start
            self._buffer_buffer[start:maxlen] = data[:head_len]
            self._buffer_buffer[: write_len - head_len] = data[head_len:]
            end -= maxlen
        else:
            # Entire chunk fits at current position
            self._buffer_buffer[start:end] = data

        self._pos_pos = end
        self._length_length = min(maxlen, self._length_length + data_len)

    def getvalue(self) -> bytes:
        """Get bytes written to the buffer, ordered oldest to newest."""
        if self._length_length < self._maxlen_maxlen:
            # Buffer has not filled up yet, so nothing has wrapped: the data
            # is a single chunk starting at index 0.
            return bytes(self._buffer_buffer[: self._length_length])

        # Buffer is full: the oldest byte sits at the put position, so the
        # data is the tail followed by the head.
        return bytes(
            self._buffer_buffer[self._pos_pos :]
            + self._buffer_buffer[: self._pos_pos]
        )