diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index a5fdd59f1..22af16770 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -958,23 +958,10 @@ impl FfiConversations { pub async fn list( &self, opts: FfiListConversationsOptions, - ) -> Result>, GenericError> { - let inner = self.inner_client.as_ref(); - let convo_list: Vec> = inner - .find_groups(opts.into())? - .into_iter() - .map(|group| Arc::new(group.into())) - .collect(); - - Ok(convo_list) - } - - pub async fn list_conversations( - &self, ) -> Result>, GenericError> { let inner = self.inner_client.as_ref(); let convo_list: Vec> = inner - .list_conversations()? + .list_conversations(opts.into())? .into_iter() .map(|conversation_item| { Arc::new(FfiConversationListItem { @@ -992,12 +979,21 @@ impl FfiConversations { pub async fn list_groups( &self, opts: FfiListConversationsOptions, - ) -> Result>, GenericError> { + ) -> Result>, GenericError> { let inner = self.inner_client.as_ref(); - let convo_list: Vec> = inner - .find_groups(GroupQueryArgs::from(opts).conversation_type(ConversationType::Group))? + let convo_list: Vec> = inner + .list_conversations( + GroupQueryArgs::from(opts).conversation_type(ConversationType::Group), + )? .into_iter() - .map(|group| Arc::new(group.into())) + .map(|conversation_item| { + Arc::new(FfiConversationListItem { + conversation: conversation_item.group.into(), + last_message: conversation_item + .last_message + .map(|stored_message| stored_message.into()), + }) + }) .collect(); Ok(convo_list) @@ -1006,12 +1002,19 @@ impl FfiConversations { pub async fn list_dms( &self, opts: FfiListConversationsOptions, - ) -> Result>, GenericError> { + ) -> Result>, GenericError> { let inner = self.inner_client.as_ref(); - let convo_list: Vec> = inner - .find_groups(GroupQueryArgs::from(opts).conversation_type(ConversationType::Dm))? + let convo_list: Vec> = inner + .list_conversations(GroupQueryArgs::from(opts).conversation_type(ConversationType::Dm))? .into_iter() - .map(|group| Arc::new(group.into())) + .map(|conversation_item| { + Arc::new(FfiConversationListItem { + conversation: conversation_item.group.into(), + last_message: conversation_item + .last_message + .map(|stored_message| stored_message.into()), + }) + }) .collect(); Ok(convo_list) @@ -2718,13 +2721,14 @@ mod tests { .await .unwrap(); let bo_group = &bo_groups[0]; - bo_group.sync().await.unwrap(); + bo_group.conversation.sync().await.unwrap(); // alix published + processed group creation and name update assert_eq!(alix_provider.conn_ref().intents_published(), 2); assert_eq!(alix_provider.conn_ref().intents_deleted(), 2); bo_group + .conversation .update_group_name("Old Name2".to_string()) .await .unwrap(); @@ -2746,6 +2750,7 @@ mod tests { // Uncomment the following lines to add more group name updates bo_group + .conversation .update_group_name("Old Name3".to_string()) .await .unwrap(); @@ -2795,7 +2800,10 @@ mod tests { .unwrap(); // Step 4: List conversations and verify - let conversations = alix_conversations.list_conversations().await.unwrap(); + let conversations = alix_conversations + .list(FfiListConversationsOptions::default()) + .await + .unwrap(); // Ensure the group is included assert_eq!(conversations.len(), 1, "Alix should have exactly 1 group"); @@ -2832,7 +2840,10 @@ mod tests { .unwrap(); // Step 4: List conversations and verify - let conversations = alix_conversations.list_conversations().await.unwrap(); + let conversations = alix_conversations + .list(FfiListConversationsOptions::default()) + .await + .unwrap(); // Ensure the group is included assert_eq!(conversations.len(), 1, "Alix should have exactly 1 group"); @@ -2881,7 +2892,10 @@ mod tests { .unwrap(); // Step 7: Fetch the conversation list - let conversations = conversations_api.list_conversations().await.unwrap(); + let conversations = conversations_api + .list(FfiListConversationsOptions::default()) + .await + .unwrap(); // Step 8: Assert the correct order of conversations assert_eq!( @@ -2951,11 +2965,19 @@ mod tests { let alix_group1 = alix_groups[0].clone(); let alix_group5 = alix_groups[5].clone(); - let bo_group1 = bo.conversation(alix_group1.id()).unwrap(); - let bo_group5 = bo.conversation(alix_group5.id()).unwrap(); + let bo_group1 = bo.conversation(alix_group1.conversation.id()).unwrap(); + let bo_group5 = bo.conversation(alix_group5.conversation.id()).unwrap(); - alix_group1.send("alix1".as_bytes().to_vec()).await.unwrap(); - alix_group5.send("alix1".as_bytes().to_vec()).await.unwrap(); + alix_group1 + .conversation + .send("alix1".as_bytes().to_vec()) + .await + .unwrap(); + alix_group5 + .conversation + .send("alix1".as_bytes().to_vec()) + .await + .unwrap(); let bo_messages1 = bo_group1 .find_messages(FfiListMessagesOptions::default()) @@ -3016,6 +3038,7 @@ mod tests { .unwrap() { group + .conversation .remove_members(vec![bo.account_address.clone()]) .await .unwrap(); @@ -3542,17 +3565,26 @@ mod tests { .unwrap(); assert_eq!(bo_groups.len(), 1); let bo_group = bo_groups[0].clone(); - bo_group.sync().await.unwrap(); + bo_group.conversation.sync().await.unwrap(); let bo_messages1 = bo_group + .conversation .find_messages(FfiListMessagesOptions::default()) .await .unwrap(); assert_eq!(bo_messages1.len(), first_msg_check); - bo_group.send("hello2".as_bytes().to_vec()).await.unwrap(); + bo_group + .conversation + .send("hello2".as_bytes().to_vec()) + .await + .unwrap(); message_callbacks.wait_for_delivery(None).await.unwrap(); - bo_group.send("hello3".as_bytes().to_vec()).await.unwrap(); + bo_group + .conversation + .send("hello3".as_bytes().to_vec()) + .await + .unwrap(); message_callbacks.wait_for_delivery(None).await.unwrap(); alix_group.sync().await.unwrap(); @@ -3565,9 +3597,10 @@ mod tests { alix_group.send("hello4".as_bytes().to_vec()).await.unwrap(); message_callbacks.wait_for_delivery(None).await.unwrap(); - bo_group.sync().await.unwrap(); + bo_group.conversation.sync().await.unwrap(); let bo_messages2 = bo_group + .conversation .find_messages(FfiListMessagesOptions::default()) .await .unwrap(); @@ -3803,7 +3836,7 @@ mod tests { let bola_group = bola_groups.first().unwrap(); // Check Bola's group for the added_by_inbox_id of the inviter - let added_by_inbox_id = bola_group.added_by_inbox_id().unwrap(); + let added_by_inbox_id = bola_group.conversation.added_by_inbox_id().unwrap(); // // Verify the welcome host_credential is equal to Amal's assert_eq!( @@ -3986,24 +4019,26 @@ mod tests { let bola_group = bola_groups.first().unwrap(); bola_group + .conversation .update_group_name("new_name".to_string()) .await .unwrap_err(); // Verify that bo CAN update the image url bola_group + .conversation .update_group_image_url_square("https://example.com/image.png".to_string()) .await .unwrap(); // Verify we can read the correct values from the group - bola_group.sync().await.unwrap(); + bola_group.conversation.sync().await.unwrap(); alix_group.sync().await.unwrap(); assert_eq!( - bola_group.group_image_url_square().unwrap(), + bola_group.conversation.group_image_url_square().unwrap(), "https://example.com/image.png" ); - assert_eq!(bola_group.group_name().unwrap(), ""); + assert_eq!(bola_group.conversation.group_name().unwrap(), ""); assert_eq!( alix_group.group_image_url_square().unwrap(), "https://example.com/image.png" @@ -4091,10 +4126,12 @@ mod tests { let bola_group = bola_groups.first().unwrap(); bola_group + .conversation .update_group_name("new_name".to_string()) .await .unwrap_err(); let result = bola_group + .conversation .update_group_name("New Group Name".to_string()) .await; assert!(result.is_err()); @@ -4107,6 +4144,7 @@ mod tests { // Verify that Bola can update the group description let result = bola_group + .conversation .update_group_description("New Description".to_string()) .await; assert!(result.is_ok()); @@ -4990,6 +5028,7 @@ mod tests { .list(FfiListConversationsOptions::default()) .await .unwrap()[0] + .conversation .find_messages(FfiListMessagesOptions::default()) .await .unwrap(); @@ -4998,6 +5037,7 @@ mod tests { .list(FfiListConversationsOptions::default()) .await .unwrap()[0] + .conversation .find_messages(FfiListMessagesOptions::default()) .await .unwrap(); diff --git a/xmtp_api_grpc/src/grpc_api_helper.rs b/xmtp_api_grpc/src/grpc_api_helper.rs index 0fdadaecf..202768a7b 100644 --- a/xmtp_api_grpc/src/grpc_api_helper.rs +++ b/xmtp_api_grpc/src/grpc_api_helper.rs @@ -31,7 +31,7 @@ use xmtp_proto::{ #[tracing::instrument(level = "trace", skip_all)] pub async fn create_tls_channel(address: String) -> Result { - let span = tracing::trace_span!("grpc_connect", address); + let span = tracing::debug_span!("grpc_connect", address); let channel = Channel::from_shared(address) .map_err(|e| Error::new(ErrorKind::SetupCreateChannelError).with(e))? // Purpose: This setting controls the size of the initial connection-level flow control window for HTTP/2, which is the underlying protocol for gRPC. diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 24806dbf7..69be71c2e 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -674,11 +674,14 @@ where .collect()) } - pub fn list_conversations(&self) -> Result>, ClientError> { + pub fn list_conversations( + &self, + args: GroupQueryArgs, + ) -> Result>, ClientError> { Ok(self .store() .conn()? - .fetch_conversation_list()? + .fetch_conversation_list(args)? .into_iter() .map(|conversation_item| { let message = conversation_item.message_id.and_then(|message_id| { diff --git a/xmtp_mls/src/storage/encrypted_store/conversation_list.rs b/xmtp_mls/src/storage/encrypted_store/conversation_list.rs index 58f387a7c..2ee0d72f2 100644 --- a/xmtp_mls/src/storage/encrypted_store/conversation_list.rs +++ b/xmtp_mls/src/storage/encrypted_store/conversation_list.rs @@ -1,8 +1,12 @@ use super::schema::conversation_list::dsl::conversation_list; -use crate::storage::group::{ConversationType, GroupMembershipState}; +use crate::storage::consent_record::ConsentState; +use crate::storage::group::{ConversationType, GroupMembershipState, GroupQueryArgs}; use crate::storage::group_message::{ContentType, DeliveryStatus, GroupMessageKind}; use crate::storage::{DbConnection, StorageError}; -use diesel::{QueryDsl, Queryable, RunQueryDsl, Table}; +use diesel::dsl::sql; +use diesel::{ + BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, Queryable, RunQueryDsl, Table, +}; use serde::{Deserialize, Serialize}; #[derive(Queryable, Debug, Clone, Deserialize, Serialize)] @@ -53,17 +57,111 @@ pub struct ConversationListItem { } impl DbConnection { - pub fn fetch_conversation_list(&self) -> Result, StorageError> { - let query = conversation_list + pub fn fetch_conversation_list>( + &self, + args: A, + ) -> Result, StorageError> { + use crate::storage::schema::consent_records::dsl as consent_dsl; + use crate::storage::schema::conversation_list::dsl as conversation_list_dsl; + + let GroupQueryArgs { + allowed_states, + created_after_ns, + created_before_ns, + limit, + conversation_type, + consent_state, + include_sync_groups, + include_duplicate_dms, + } = args.as_ref(); + let mut query = conversation_list .select(conversation_list::all_columns()) + .filter(conversation_list_dsl::conversation_type.ne(ConversationType::Sync)) .into_boxed(); - Ok(self.raw_query(|conn| query.load::(conn))?) + + if !include_duplicate_dms { + // Group by dm_id and grab the latest group (conversation stitching) + query = query.filter(sql::( + "id IN ( + SELECT id + FROM groups + GROUP BY CASE WHEN dm_id IS NULL THEN id ELSE dm_id END + ORDER BY last_message_ns DESC + )", + )); + } + + if let Some(limit) = limit { + query = query.limit(*limit); + } + + if let Some(allowed_states) = allowed_states { + query = query.filter(conversation_list_dsl::membership_state.eq_any(allowed_states)); + } + + if let Some(created_after_ns) = created_after_ns { + query = query.filter(conversation_list_dsl::created_at_ns.gt(created_after_ns)); + } + + if let Some(created_before_ns) = created_before_ns { + query = query.filter(conversation_list_dsl::created_at_ns.lt(created_before_ns)); + } + + if let Some(conversation_type) = conversation_type { + query = query.filter(conversation_list_dsl::conversation_type.eq(conversation_type)); + } + + let mut conversations = if let Some(consent_state) = consent_state { + if *consent_state == ConsentState::Unknown { + let query = query + .left_join( + consent_dsl::consent_records + .on(sql::("lower(hex(groups.id))") + .eq(consent_dsl::entity)), + ) + .filter( + consent_dsl::state + .is_null() + .or(consent_dsl::state.eq(ConsentState::Unknown)), + ) + .select(conversation_list::all_columns()) + .order(conversation_list_dsl::created_at_ns.asc()); + + self.raw_query(|conn| query.load::(conn))? + } else { + let query = query + .inner_join( + consent_dsl::consent_records + .on(sql::("lower(hex(groups.id))") + .eq(consent_dsl::entity)), + ) + .filter(consent_dsl::state.eq(*consent_state)) + .select(conversation_list::all_columns()) + .order(conversation_list_dsl::created_at_ns.asc()); + + self.raw_query(|conn| query.load::(conn))? + } + } else { + self.raw_query(|conn| query.load::(conn))? + }; + + // Were sync groups explicitly asked for? Was the include_sync_groups flag set to true? + // Then query for those separately + if matches!(conversation_type, Some(ConversationType::Sync)) || *include_sync_groups { + let query = conversation_list_dsl::conversation_list + .filter(conversation_list_dsl::conversation_type.eq(ConversationType::Sync)); + let mut sync_groups = self.raw_query(|conn| query.load(conn))?; + conversations.append(&mut sync_groups); + } + + Ok(conversations) } } #[cfg(test)] pub(crate) mod tests { use crate::storage::group::tests::{generate_group, generate_group_with_created_at}; + use crate::storage::group::GroupQueryArgs; use crate::storage::tests::with_connection; use crate::Store; use wasm_bindgen_test::wasm_bindgen_test; @@ -88,7 +186,9 @@ pub(crate) mod tests { } // Fetch the conversation list - let conversation_list = conn.fetch_conversation_list().unwrap(); + let conversation_list = conn + .fetch_conversation_list(GroupQueryArgs::default()) + .unwrap(); assert_eq!(conversation_list.len(), 1, "Should return one group"); assert_eq!( conversation_list[0].id, group.id, @@ -123,7 +223,9 @@ pub(crate) mod tests { message.store(conn).unwrap(); // Fetch the conversation list - let conversation_list = conn.fetch_conversation_list().unwrap(); + let conversation_list = conn + .fetch_conversation_list(GroupQueryArgs::default()) + .unwrap(); assert_eq!(conversation_list.len(), 3, "Should return all three groups"); assert_eq!( @@ -159,7 +261,9 @@ pub(crate) mod tests { first_message.store(conn).unwrap(); // Fetch the conversation list and check last message - let mut conversation_list = conn.fetch_conversation_list().unwrap(); + let mut conversation_list = conn + .fetch_conversation_list(GroupQueryArgs::default()) + .unwrap(); assert_eq!(conversation_list.len(), 1, "Should return one group"); assert_eq!( conversation_list[0].sent_at_ns.unwrap(), @@ -178,7 +282,9 @@ pub(crate) mod tests { second_message.store(conn).unwrap(); // Fetch the conversation list again and validate the last message is updated - conversation_list = conn.fetch_conversation_list().unwrap(); + conversation_list = conn + .fetch_conversation_list(GroupQueryArgs::default()) + .unwrap(); assert_eq!( conversation_list[0].sent_at_ns.unwrap(), 2000, diff --git a/xmtp_mls/src/storage/encrypted_store/schema_gen.rs b/xmtp_mls/src/storage/encrypted_store/schema_gen.rs index 82a76b76a..e3f160d14 100644 --- a/xmtp_mls/src/storage/encrypted_store/schema_gen.rs +++ b/xmtp_mls/src/storage/encrypted_store/schema_gen.rs @@ -1,5 +1,7 @@ // @generated automatically by Diesel CLI. +use crate::storage::schema::conversation_list; + diesel::table! { association_state (inbox_id, sequence_id) { inbox_id -> Text, @@ -143,4 +145,5 @@ diesel::allow_tables_to_appear_in_same_query!( refresh_state, user_preferences, wallet_addresses, + conversation_list );