Skip to content

Commit

Permalink
refactor(live_location_share): exclude the live location events of th…
Browse files Browse the repository at this point in the history
…e rooms own user
  • Loading branch information
torrybr committed Jan 15, 2025
1 parent fe3cc09 commit dab2960
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 23 deletions.
27 changes: 15 additions & 12 deletions crates/matrix-sdk/src/live_location_share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,23 @@ impl ObservableLiveLocation {
/// Get a stream of [`LiveLocationShare`].
pub fn subscribe(&self) -> impl Stream<Item = LiveLocationShare> {
let stream = self.observable_room_events.subscribe();

stream! {
for await (event, room) in stream {
yield LiveLocationShare {
last_location: LastLocation {
location: event.content.location,
ts: event.origin_server_ts,
},
beacon_info: room
.get_user_beacon_info(&event.sender)
.await
.ok()
.map(|info| info.content),
user_id: event.sender,
};
if event.sender != room.own_user_id() {
yield LiveLocationShare {
last_location: LastLocation {
location: event.content.location,
ts: event.origin_server_ts,
},
beacon_info: room
.get_user_beacon_info(&event.sender)
.await
.ok()
.map(|info| info.content),
user_id: event.sender,
};
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/matrix-sdk/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3244,6 +3244,9 @@ impl Room {
///
/// The returned observable will receive the newest event for each sync
/// response that contains an `m.beacon` event.
///
/// Returns a stream of `ObservableLiveLocation` events from other users
/// in the room, excluding the live location events of the room's own user.
pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
ObservableLiveLocation::new(&self.client, self.room_id())
}
Expand Down
114 changes: 103 additions & 11 deletions crates/matrix-sdk/tests/integration/room/beacon/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::{Duration, UNIX_EPOCH};

use futures_util::{pin_mut, StreamExt as _};
use futures_util::{pin_mut, FutureExt, StreamExt as _};
use js_int::uint;
use matrix_sdk::{config::SyncSettings, live_location_share::LiveLocationShare};
use matrix_sdk_test::{
Expand Down Expand Up @@ -130,8 +130,8 @@ async fn test_send_location_beacon_with_expired_live_share() {
},
"event_id": "$15139375514XsgmR:localhost",
"origin_server_ts": 1_636_829_458,
"sender": "@example:localhost",
"state_key": "@example:localhost",
"sender": "@example2:localhost",
"state_key": "@example2:localhost",
"type": "org.matrix.msc3672.beacon_info",
"unsigned": {
"age": 7034220
Expand Down Expand Up @@ -192,8 +192,8 @@ async fn test_most_recent_event_in_stream() {
},
"event_id": "$15139375514XsgmR:localhost",
"origin_server_ts": millis_time,
"sender": "@example:localhost",
"state_key": "@example:localhost",
"sender": "@example2:localhost",
"state_key": "@example2:localhost",
"type": "org.matrix.msc3672.beacon_info",
"unsigned": {
"age": 7034220
Expand Down Expand Up @@ -235,7 +235,7 @@ async fn test_most_recent_event_in_stream() {
},
"event_id": format!("$event_for_stream_{nth}"),
"origin_server_ts": 1_636_829_458,
"sender": "@example:localhost",
"sender": "@example2:localhost",
"type": "org.matrix.msc3672.beacon",
"unsigned": {
"age": 598971
Expand All @@ -256,7 +256,7 @@ async fn test_most_recent_event_in_stream() {
let LiveLocationShare { user_id, last_location, beacon_info } =
stream.next().await.expect("Another live location was expected");

assert_eq!(user_id.to_string(), "@example:localhost");
assert_eq!(user_id.to_string(), "@example2:localhost");

assert_eq!(last_location.location.uri, "geo:24.9575274619722,12.494122581370175;u=24");

Expand Down Expand Up @@ -305,8 +305,8 @@ async fn test_observe_single_live_location_share() {
},
"event_id": "$test_beacon_info",
"origin_server_ts": millis_time,
"sender": "@example:localhost",
"state_key": "@example:localhost",
"sender": "@example2:localhost",
"state_key": "@example2:localhost",
"type": "org.matrix.msc3672.beacon_info",
}
]
Expand Down Expand Up @@ -341,7 +341,7 @@ async fn test_observe_single_live_location_share() {
},
"event_id": "$location_event",
"origin_server_ts": millis_time,
"sender": "@example:localhost",
"sender": "@example2:localhost",
"type": "org.matrix.msc3672.beacon",
});

Expand All @@ -362,7 +362,7 @@ async fn test_observe_single_live_location_share() {
let LiveLocationShare { user_id, last_location, beacon_info } =
stream.next().await.expect("Another live location was expected");

assert_eq!(user_id.to_string(), "@example:localhost");
assert_eq!(user_id.to_string(), "@example2:localhost");
assert_eq!(last_location.location.uri, "geo:10.000000,20.000000;u=5");
assert_eq!(last_location.ts, current_time);

Expand All @@ -374,3 +374,95 @@ async fn test_observe_single_live_location_share() {
assert_eq!(beacon_info.timeout, Duration::from_millis(3000));
assert_eq!(beacon_info.ts, current_time);
}

#[async_test]
async fn test_observing_live_location_does_not_return_own_beacon_updates() {
let (client, server) = logged_in_client_with_server().await;

let mut sync_builder = SyncResponseBuilder::new();

let current_time = MilliSecondsSinceUnixEpoch::now();
let millis_time = current_time
.to_system_time()
.unwrap()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

mock_sync(
&server,
json!({
"next_batch": "s526_47314_0_7_1_1_1_1_1",
"rooms": {
"join": {
*DEFAULT_TEST_ROOM_ID: {
"state": {
"events": [
{
"content": {
"description": "Live Share",
"live": true,
"org.matrix.msc3488.ts": millis_time,
"timeout": 3000,
"org.matrix.msc3488.asset": { "type": "m.self" }
},
"event_id": "$15139375514XsgmR:localhost",
"origin_server_ts": millis_time,
"sender": "@example:localhost",
"state_key": "@example:localhost",
"type": "org.matrix.msc3672.beacon_info",
"unsigned": {
"age": 7034220
}
}
]
}
}
}
}

}),
None,
)
.await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;

let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();
let observable_live_location_shares = room.observe_live_location_shares();
let stream = observable_live_location_shares.subscribe();
pin_mut!(stream);

let mut timeline_events = Vec::new();

timeline_events.push(sync_timeline_event!({
"content": {
"m.relates_to": {
"event_id": "$15139375514XsgmR:localhost",
"rel_type": "m.reference"
},
"org.matrix.msc3488.location": {
"uri": "geo:1.9575274619722,12.494122581370175;u=1"
},
"org.matrix.msc3488.ts": 1_636_829_458
},
"event_id": "$152037dfsef280074GZeOm:localhost",
"origin_server_ts": 1_636_829_458,
"sender": "@example:localhost",
"type": "org.matrix.msc3672.beacon",
"unsigned": {
"age": 598971
}
}));

sync_builder.add_joined_room(
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_bulk(timeline_events.clone()),
);

mock_sync(&server, sync_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;

assert!(stream.next().now_or_never().is_none());
}

0 comments on commit dab2960

Please sign in to comment.