diff --git a/my/smscalls.py b/my/smscalls.py index dbcf8b23..23fb5ccb 100644 --- a/my/smscalls.py +++ b/my/smscalls.py @@ -20,7 +20,7 @@ class smscalls(user_config): from datetime import datetime, timezone from pathlib import Path -from typing import NamedTuple, Iterator, Set, Tuple, Optional +from typing import NamedTuple, Iterator, Set, Tuple, Optional, Any, Dict, List from lxml import etree @@ -150,6 +150,165 @@ def _extract_messages(path: Path) -> Iterator[Res[Message]]: ) +class MMSContentPart(NamedTuple): + sequence_index: int + content_type: str + filename: str + text: Optional[str] + data: Optional[str] + + +class MMS(NamedTuple): + dt: datetime + dt_readable: str + parts: List[MMSContentPart] + # NOTE: these is often something like 'Name 1, Name 2', but might be different depending on your client + who: Optional[str] + # NOTE: This can be a single phone number, or multiple, split by '~' or ','. Its better to think + # of this as a 'key' or 'conversation ID', phone numbers are also present in 'addresses' + phone_number: str + addresses: List[Tuple[str, int]] + # 1 = Received, 2 = Sent, 3 = Draft, 4 = Outbox + message_type: int + + @property + def from_user(self) -> str: + # since these can be group messages, we can't just check message_type, + # we need to iterate through and find who sent it + # who is CC/'To' is not obvious in many message clients + # + # 129 = BCC, 130 = CC, 151 = To, 137 = From + for (addr, _type) in self.addresses: + if _type == 137: + return addr + else: + # hmm, maybe return instead? but this probably shouldnt happen, means + # something is very broken + raise RuntimeError(f'No from address matching 137 found in {self.addresses}') + + @property + def from_me(self) -> bool: + return self.message_type == 2 + + +def mms() -> Iterator[Res[MMS]]: + files = get_files(config.export_path, glob='sms-*.xml') + + emitted: Set[Tuple[datetime, Optional[str], str]] = set() + for p in files: + for c in _extract_mms(p): + if isinstance(c, Exception): + yield c + continue + key = (c.dt, c.phone_number, c.from_user) + if key in emitted: + continue + emitted.add(key) + yield c + + +def _resolve_null_str(value: Optional[str]) -> Optional[str]: + if value is None: + return None + # hmm.. theres some risk of the text actually being 'null', but theres + # no way to distinguish that from XML values + if value == 'null': + return None + return value + + +def _extract_mms(path: Path) -> Iterator[Res[MMS]]: + tr = etree.parse(str(path)) + + for mxml in tr.findall('mms'): + dt = mxml.get('date') + dt_readable = mxml.get('readable_date') + message_type = mxml.get('msg_box') + + who = mxml.get('contact_name') + if who is not None and who in UNKNOWN: + who = None + phone_number = mxml.get('address') + + if dt is None or dt_readable is None or message_type is None or phone_number is None: + mxml_str = etree.tostring(mxml).decode('utf-8') + yield RuntimeError(f'Missing one or more required attributes [date, readable_date, msg_box, address] in {mxml_str}') + continue + + addresses: List[Tuple[str, int]] = [] + for addr_parent in mxml.findall('addrs'): + for addr in addr_parent.findall('addr'): + addr_data = addr.attrib + user_address = addr_data.get('address') + user_type = addr_data.get('type') + if user_address is None or user_type is None: + addr_str = etree.tostring(addr_parent).decode() + yield RuntimeError(f'Missing one or more required attributes [address, type] in {addr_str}') + continue + if not user_type.isdigit(): + yield RuntimeError(f'Invalid address type {user_type} {type(user_type)}, cannot convert to number') + continue + addresses.append((user_address, int(user_type))) + + content: List[MMSContentPart] = [] + + for part_root in mxml.findall('parts'): + + for part in part_root.findall('part'): + + # the first item is an SMIL XML element encoded as a string which describes + # how the rest of the parts are laid out + # https://www.w3.org/TR/SMIL3/smil-timing.html#Timing-TimeContainerSyntax + # An example: + # + # + # This seems pretty useless, so we should try and skip it, and just return the + # text/images/data + # + # man, attrib is some internal cpython ._Attrib type which can't + # be typed by any sort of mappingproxy. maybe a protocol could work..? + part_data: Dict[str, Any] = part.attrib # type: ignore + seq: Optional[str] = part_data.get('seq') + if seq == '-1': + continue + + if seq is None or not seq.isdigit(): + yield RuntimeError(f'seq must be a number, was seq={seq} {type(seq)} in {part_data}') + continue + + charset_type: Optional[str] = _resolve_null_str(part_data.get('ct')) + filename: Optional[str] = _resolve_null_str(part_data.get('name')) + # in some cases (images, cards), the filename is set in 'cl' instead + if filename is None: + filename = _resolve_null_str(part_data.get('cl')) + text: Optional[str] = _resolve_null_str(part_data.get('text')) + data: Optional[str] = _resolve_null_str(part_data.get('data')) + + if charset_type is None or filename is None or (text is None and data is None): + yield RuntimeError(f'Missing one or more required attributes [ct, name, (text, data)] must be present in {part_data}') + continue + + content.append( + MMSContentPart( + sequence_index=int(seq), + content_type=charset_type, + filename=filename, + text=text, + data=data + ) + ) + + yield MMS( + dt=_parse_dt_ms(dt), + dt_readable=dt_readable, + who=who, + phone_number=phone_number, + message_type=int(message_type), + parts=content, + addresses=addresses, + ) + + # See https://github.com/karlicoss/HPI/pull/90#issuecomment-702422351 # for potentially parsing timezone from the readable_date def _parse_dt_ms(d: str) -> datetime: @@ -162,4 +321,5 @@ def stats() -> Stats: return { **stat(calls), **stat(messages), + **stat(mms), } diff --git a/tests/smscalls.py b/tests/smscalls.py index d063de13..ef787861 100644 --- a/tests/smscalls.py +++ b/tests/smscalls.py @@ -4,6 +4,7 @@ # TODO implement via stat? def test() -> None: - from my.smscalls import calls, messages + from my.smscalls import calls, messages, mms assert len(list(calls())) > 10 assert len(list(messages())) > 10 + assert len(list(mms())) > 10