Skip to content

Commit

Permalink
Bump negentropy to 311013ce
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Jan 25, 2025
1 parent 1470b8b commit 4f09609
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 27 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ async-utility = "0.3"
async-wsocket = { git = "https://github.com/yukibtc/async-wsocket", rev = "5fba7927576064ac0698a4ee3df0d26e5cf726dd" }
atomic-destructor = { version = "0.3", default-features = false }
js-sys = "0.3"
negentropy = { version = "0.4", default-features = false }
#negentropy = { version = "0.4", default-features = false }
negentropy = { git = "https://github.com/rust-nostr/negentropy", rev = "311013ce05dd3f670d9d9c444c09195837837271", default-features = false }
negentropy-deprecated = { package = "negentropy", version = "0.3", default-features = false }
nostr = { version = "0.38", path = "./crates/nostr", default-features = false }
nostr-connect = { version = "0.38", path = "./crates/nostr-connect", default-features = false }
Expand Down
18 changes: 9 additions & 9 deletions crates/nostr-relay-builder/src/local/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use async_utility::futures_util::stream::{self, SplitSink};
use async_utility::futures_util::{SinkExt, StreamExt};
use async_wsocket::native::{self, Message, WebSocketStream};
use atomic_destructor::AtomicDestroyer;
use negentropy::{Bytes, Id, Negentropy, NegentropyStorageVector};
use negentropy::{Id, Negentropy, NegentropyStorageVector};
use nostr_database::prelude::*;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
Expand Down Expand Up @@ -286,7 +286,7 @@ impl InnerLocalRelay {

async fn handle_client_msg<S>(
&self,
session: &mut Session,
session: &mut Session<'_>,
ws_tx: &mut WsTx<S>,
msg: ClientMessage,
addr: &SocketAddr,
Expand Down Expand Up @@ -678,17 +678,17 @@ impl InnerLocalRelay {
// Construct negentropy storage, add items and seal
let mut storage = NegentropyStorageVector::with_capacity(items.len());
for (id, timestamp) in items.into_iter() {
let id: Id = Id::new(id.to_bytes());
let id: Id = Id::from_byte_array(id.to_bytes());
storage.insert(timestamp.as_u64(), id)?;
}
storage.seal()?;

// Construct negentropy client
let mut negentropy = Negentropy::new(storage, 60_000)?;
let mut negentropy = Negentropy::owned(storage, 60_000)?;

// Reconcile
let bytes: Bytes = Bytes::from_hex(initial_message)?;
let message: Bytes = negentropy.reconcile(&bytes)?;
let bytes: Vec<u8> = hex::decode(initial_message)?;
let message: Vec<u8> = negentropy.reconcile(&bytes)?;

// Update subscriptions
session
Expand All @@ -700,7 +700,7 @@ impl InnerLocalRelay {
ws_tx,
RelayMessage::NegMsg {
subscription_id,
message: message.to_hex(),
message: hex::encode(message),
},
)
.await
Expand All @@ -712,15 +712,15 @@ impl InnerLocalRelay {
match session.negentropy_subscription.get_mut(&subscription_id) {
Some(negentropy) => {
// Reconcile
let bytes: Bytes = Bytes::from_hex(message)?;
let bytes: Vec<u8> = hex::decode(message)?;
let message = negentropy.reconcile(&bytes)?;

// Reply
self.send_msg(
ws_tx,
RelayMessage::NegMsg {
subscription_id,
message: message.to_hex(),
message: hex::encode(message),
},
)
.await
Expand Down
6 changes: 3 additions & 3 deletions crates/nostr-relay-builder/src/local/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ impl Nip42Session {
}
}

pub(super) struct Session {
pub(super) struct Session<'a> {
pub subscriptions: HashMap<SubscriptionId, Vec<Filter>>,
pub negentropy_subscription: HashMap<SubscriptionId, Negentropy<NegentropyStorageVector>>,
pub negentropy_subscription: HashMap<SubscriptionId, Negentropy<'a, NegentropyStorageVector>>,
pub nip42: Nip42Session,
pub tokens: Tokens,
}

impl Session {
impl Session<'_> {
const MIN: Duration = Duration::from_secs(60);

fn calculate_elapsed_time(&self, now: Instant, last: Instant) -> Duration {
Expand Down
13 changes: 11 additions & 2 deletions crates/nostr-relay-pool/src/relay/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
use std::fmt;
use std::time::Duration;

use nostr::event;
use nostr::event::builder;
use nostr::event::{self, builder};
use nostr::message::MessageHandleError;
use nostr::util::hex;
use nostr_database::DatabaseError;
use tokio::sync::{broadcast, SetError};

Expand All @@ -30,6 +30,8 @@ pub enum Error {
EventBuilder(builder::Error),
/// Partial Event error
PartialEvent(event::partial::Error),
/// Hex error
Hex(hex::Error),
/// Negentropy error
Negentropy(negentropy::Error),
/// Negentropy error
Expand Down Expand Up @@ -132,6 +134,7 @@ impl fmt::Display for Error {
Self::Event(e) => write!(f, "{e}"),
Self::EventBuilder(e) => write!(f, "{e}"),
Self::PartialEvent(e) => write!(f, "{e}"),
Self::Hex(e) => write!(f, "{e}"),
Self::Negentropy(e) => write!(f, "{e}"),
Self::NegentropyDeprecated(e) => write!(f, "{e}"),
Self::Database(e) => write!(f, "{e}"),
Expand Down Expand Up @@ -220,6 +223,12 @@ impl From<event::partial::Error> for Error {
}
}

impl From<hex::Error> for Error {
fn from(e: hex::Error) -> Self {
Self::Hex(e)
}
}

impl From<negentropy::Error> for Error {
fn from(e: negentropy::Error) -> Self {
Self::Negentropy(e)
Expand Down
22 changes: 12 additions & 10 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use async_utility::{task, time};
use async_wsocket::futures_util::{self, SinkExt, StreamExt};
use async_wsocket::{ConnectionMode, Message};
use atomic_destructor::AtomicDestroyer;
use negentropy::{Bytes, Id, Negentropy, NegentropyStorageVector};
use negentropy::{Id, Negentropy, NegentropyStorageVector};
use negentropy_deprecated::{Bytes as BytesDeprecated, Negentropy as NegentropyDeprecated};
use nostr::event::raw::RawEvent;
use nostr::secp256k1::rand::{self, Rng};
Expand Down Expand Up @@ -1567,10 +1567,12 @@ impl InnerRelay {
output: &mut Reconciliation,
) -> Result<(), Error> {
// Prepare the negentropy client
let mut negentropy: Negentropy<NegentropyStorageVector> = prepare_negentropy_client(items)?;
let storage: NegentropyStorageVector = prepare_negentropy_storage(items)?;
let mut negentropy: Negentropy<NegentropyStorageVector> =
Negentropy::borrowed(&storage, NEGENTROPY_FRAME_SIZE_LIMIT)?;

// Initiate reconciliation
let initial_message: Bytes = negentropy.initiate()?;
let initial_message: Vec<u8> = negentropy.initiate()?;

// Subscribe
let mut notifications = self.internal_notification_sender.subscribe();
Expand Down Expand Up @@ -1606,10 +1608,10 @@ impl InnerRelay {
let mut curr_need_ids: Vec<Id> = Vec::new();

// Parse message
let query: Bytes = Bytes::from_hex(message)?;
let query: Vec<u8> = hex::decode(message)?;

// Reconcile
let msg: Option<Bytes> = negentropy.reconcile_with_ids(
let msg: Option<Vec<u8>> = negentropy.reconcile_with_ids(
&query,
&mut curr_have_ids,
&mut curr_need_ids,
Expand All @@ -1618,7 +1620,7 @@ impl InnerRelay {
// Handle message
self.handle_neg_msg(
subscription_id,
msg.map(|m| m.to_bytes()),
msg,
curr_have_ids.into_iter().map(neg_id_to_event_id),
curr_need_ids.into_iter().map(neg_id_to_event_id),
opts,
Expand Down Expand Up @@ -1877,23 +1879,23 @@ fn neg_depr_to_event_id(id: BytesDeprecated) -> Option<EventId> {
EventId::from_slice(id.as_bytes()).ok()
}

fn prepare_negentropy_client(
fn prepare_negentropy_storage(
items: Vec<(EventId, Timestamp)>,
) -> Result<Negentropy<NegentropyStorageVector>, Error> {
) -> Result<NegentropyStorageVector, Error> {
// Compose negentropy storage
let mut storage = NegentropyStorageVector::with_capacity(items.len());

// Add items
for (id, timestamp) in items.into_iter() {
let id: Id = Id::new(id.to_bytes());
let id: Id = Id::from_byte_array(id.to_bytes());
storage.insert(timestamp.as_u64(), id)?;
}

// Seal
storage.seal()?;

// Build negentropy client
Ok(Negentropy::new(storage, NEGENTROPY_FRAME_SIZE_LIMIT)?)
Ok(storage)
}

/// Check if negentropy is supported
Expand Down

0 comments on commit 4f09609

Please sign in to comment.