diff --git a/Cargo.lock b/Cargo.lock index dabb11f8c..7a9c56f3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7569,6 +7569,7 @@ dependencies = [ "futures", "paranoid-android", "parking_lot 0.12.3", + "prost", "rand", "thiserror 2.0.6", "tokio", @@ -7579,6 +7580,7 @@ dependencies = [ "uuid 1.11.0", "xmtp_api_grpc", "xmtp_common", + "xmtp_content_types", "xmtp_cryptography", "xmtp_id", "xmtp_mls", diff --git a/bindings_ffi/Cargo.toml b/bindings_ffi/Cargo.toml index 5831c3d7d..fc33b5e52 100644 --- a/bindings_ffi/Cargo.toml +++ b/bindings_ffi/Cargo.toml @@ -12,10 +12,12 @@ futures.workspace = true tracing.workspace = true tracing-subscriber = { workspace = true, features = ["registry", "env-filter", "fmt", "json"] } parking_lot.workspace = true +prost.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["macros"] } uniffi = { version = "0.28.0", default-features = false, features = ["tokio"] } xmtp_api_grpc = { path = "../xmtp_api_grpc" } +xmtp_content_types = { path = "../xmtp_content_types" } xmtp_cryptography = { path = "../xmtp_cryptography" } xmtp_id = { path = "../xmtp_id" } xmtp_mls = { path = "../xmtp_mls" } diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 1bcfd7699..fc43d7fdc 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -19,8 +19,8 @@ use xmtp_mls::groups::device_sync::preference_sync::UserPreferenceUpdate; use xmtp_mls::groups::scoped_client::LocalScopedGroupClient; use xmtp_mls::groups::HmacKey; use xmtp_mls::storage::group::ConversationType; -use xmtp_mls::storage::group_message::MsgQueryArgs; use xmtp_mls::storage::group_message::SortDirection; +use xmtp_mls::storage::group_message::{ContentType, MsgQueryArgs}; use xmtp_mls::{ api::ApiClientWrapper, builder::ClientBuilder, @@ -1290,6 +1290,38 @@ pub struct FfiListMessagesOptions { pub limit: Option, pub delivery_status: Option, pub direction: Option, + pub content_types: Option>, +} + +#[derive(uniffi::Enum, Clone)] +pub enum FfiContentType { + Unknown, + Text, + GroupMembershipChange, + GroupUpdated, + Reaction, + ReadReceipt, + Reply, + Attachment, + RemoteAttachment, + TransactionReference, +} + +impl From for ContentType { + fn from(value: FfiContentType) -> Self { + match value { + FfiContentType::Unknown => ContentType::Unknown, + FfiContentType::Text => ContentType::Text, + FfiContentType::GroupMembershipChange => ContentType::GroupMembershipChange, + FfiContentType::GroupUpdated => ContentType::GroupUpdated, + FfiContentType::Reaction => ContentType::Reaction, + FfiContentType::ReadReceipt => ContentType::ReadReceipt, + FfiContentType::Reply => ContentType::Reply, + FfiContentType::Attachment => ContentType::Attachment, + FfiContentType::RemoteAttachment => ContentType::RemoteAttachment, + FfiContentType::TransactionReference => ContentType::TransactionReference, + } + } } #[derive(uniffi::Record, Clone, Default)] @@ -1362,6 +1394,9 @@ impl FfiConversation { kind, delivery_status, direction, + content_types: opts + .content_types + .map(|types| types.into_iter().map(Into::into).collect()), ..Default::default() })? .into_iter() @@ -1908,19 +1943,25 @@ mod tests { }; use crate::{ connect_to_backend, get_inbox_id_for_address, inbox_owner::SigningError, FfiConsent, - FfiConsentEntityType, FfiConsentState, FfiConversation, FfiConversationCallback, - FfiConversationMessageKind, FfiCreateGroupOptions, FfiGroupPermissionsOptions, - FfiInboxOwner, FfiListConversationsOptions, FfiListMessagesOptions, FfiMetadataField, - FfiPermissionPolicy, FfiPermissionPolicySet, FfiPermissionUpdateType, FfiSubscribeError, + FfiConsentEntityType, FfiConsentState, FfiContentType, FfiConversation, + FfiConversationCallback, FfiConversationMessageKind, FfiCreateGroupOptions, FfiDirection, + FfiGroupPermissionsOptions, FfiInboxOwner, FfiListConversationsOptions, + FfiListMessagesOptions, FfiMetadataField, FfiPermissionPolicy, FfiPermissionPolicySet, + FfiPermissionUpdateType, FfiSubscribeError, }; use ethers::utils::hex; - use std::sync::{ - atomic::{AtomicU32, Ordering}, - Arc, Mutex, + use prost::Message; + use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, Mutex, + }, }; use tokio::{sync::Notify, time::error::Elapsed}; use xmtp_common::tmp_path; use xmtp_common::{wait_for_eq, wait_for_ok}; + use xmtp_content_types::{read_receipt, text::TextCodec, ContentCodec}; use xmtp_cryptography::{signature::RecoverableSignature, utils::rng}; use xmtp_id::associations::{ generate_inbox_id, @@ -1931,6 +1972,7 @@ mod tests { storage::EncryptionKey, InboxOwner, }; + use xmtp_proto::xmtp::mls::message_contents::{ContentTypeId, EncodedContent}; const HISTORY_SYNC_URL: &str = "http://localhost:5558"; @@ -5027,4 +5069,95 @@ mod tests { Ok(_) => panic!("Expected an error, but got Ok"), } } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_can_list_messages_with_content_types() { + // Create test clients + let alix = new_test_client().await; + let bo = new_test_client().await; + + // Alix creates group with Bo + let alix_group = alix + .conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + // Bo syncs to get the group + bo.conversations().sync().await.unwrap(); + let bo_group = bo.conversation(alix_group.id()).unwrap(); + + // Alix sends first message + alix_group.send("hey".as_bytes().to_vec()).await.unwrap(); + + // Bo syncs and responds + bo_group.sync().await.unwrap(); + let bo_message_response = TextCodec::encode("hey alix".to_string()).unwrap(); + let mut buf = Vec::new(); + bo_message_response.encode(&mut buf).unwrap(); + bo_group.send(buf).await.unwrap(); + + // Bo sends read receipt + let read_receipt_content_id = ContentTypeId { + authority_id: "xmtp.org".to_string(), + type_id: read_receipt::ReadReceiptCodec::TYPE_ID.to_string(), + version_major: 1, + version_minor: 0, + }; + let read_receipt_encoded_content = EncodedContent { + r#type: Some(read_receipt_content_id), + parameters: HashMap::new(), + fallback: None, + compression: None, + content: vec![], + }; + + let mut buf = Vec::new(); + read_receipt_encoded_content.encode(&mut buf).unwrap(); + bo_group.send(buf).await.unwrap(); + + // Alix syncs and gets all messages + alix_group.sync().await.unwrap(); + let latest_message = alix_group + // ... existing code ... + .find_messages(FfiListMessagesOptions { + direction: Some(FfiDirection::Descending), + limit: Some(1), + ..Default::default() + }) + .await + .unwrap(); + + // Verify last message is the read receipt + assert_eq!(latest_message.len(), 1); + let latest_message_encoded_content = + EncodedContent::decode(latest_message.last().unwrap().content.clone().as_slice()) + .unwrap(); + assert_eq!( + latest_message_encoded_content.r#type.unwrap().type_id, + "readReceipt" + ); + + // Get only text messages + let text_messages = alix_group + .find_messages(FfiListMessagesOptions { + content_types: Some(vec![FfiContentType::Text]), + direction: Some(FfiDirection::Descending), + limit: Some(1), + ..Default::default() + }) + .await + .unwrap(); + + // Verify last message is "hey alix" when filtered + assert_eq!(text_messages.len(), 1); + let latest_message_encoded_content = + EncodedContent::decode(text_messages.last().unwrap().content.clone().as_slice()) + .unwrap(); + let text_message = TextCodec::decode(latest_message_encoded_content).unwrap(); + assert_eq!(text_message, "hey alix"); + } } diff --git a/xmtp_mls/src/groups/mls_sync.rs b/xmtp_mls/src/groups/mls_sync.rs index f9bcba57a..522969392 100644 --- a/xmtp_mls/src/groups/mls_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -1466,8 +1466,7 @@ where let mut result = vec![]; for payload in payloads { let mut sender_hmac = sender_hmac.clone(); - // When we switch to V2, update with the header bytes. - sender_hmac.update(&[]); + sender_hmac.update(payload); let sender_hmac = sender_hmac.finalize(); result.push(GroupMessageInput { diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index b50f09650..fed9e31b2 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -591,7 +591,7 @@ pub(crate) mod tests { EncryptedMessageStore::remove_db_files(db_path) } - #[tokio::test] + #[wasm_bindgen_test::wasm_bindgen_test(unsupported = tokio::test)] async fn test_dm_id_migration() { let db_path = tmp_path(); let opts = StorageOption::Persistent(db_path.clone());