Skip to content

Commit

Permalink
Add support for a persistent ChannelConnection (kaspanet#229)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiram88 authored Jul 20, 2023
1 parent 5cf2ed8 commit 27d61e1
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 17 deletions.
5 changes: 3 additions & 2 deletions indexes/processor/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use kaspa_core::{
};
use kaspa_index_core::notifier::IndexNotifier;
use kaspa_notify::{
connection::ChannelType,
events::{EventSwitches, EventType},
scope::{PruningPointUtxoSetOverrideScope, Scope, UtxosChangedScope},
};
Expand All @@ -27,8 +28,8 @@ impl IndexService {
pub fn new(consensus_notifier: &Arc<ConsensusNotifier>, utxoindex: Option<UtxoIndexProxy>) -> Self {
// Prepare consensus-notify objects
let consensus_notify_channel = Channel::<ConsensusNotification>::default();
let consensus_notify_listener_id =
consensus_notifier.register_new_listener(ConsensusChannelConnection::new(consensus_notify_channel.sender()));
let consensus_notify_listener_id = consensus_notifier
.register_new_listener(ConsensusChannelConnection::new(consensus_notify_channel.sender(), ChannelType::Closable));

// Prepare the index-processor notifier
// No subscriber is defined here because the subscription are manually created during the construction and never changed after that.
Expand Down
4 changes: 2 additions & 2 deletions notify/src/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ where
mod tests {
use super::*;
use crate::{
connection::ChannelConnection,
connection::{ChannelConnection, ChannelType},
listener::Listener,
notification::test_helpers::*,
notifier::test_helpers::{
Expand Down Expand Up @@ -263,7 +263,7 @@ mod tests {
let mut notification_receivers = Vec::with_capacity(listener_count);
for _ in 0..listener_count {
let (sender, receiver) = unbounded();
let connection = TestConnection::new(sender);
let connection = TestConnection::new(sender, ChannelType::Closable);
let listener = Listener::new(connection);
listeners.push(listener);
notification_receivers.push(receiver);
Expand Down
21 changes: 18 additions & 3 deletions notify/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,32 @@ pub trait Connection: Clone + Debug + Send + Sync + 'static {
fn is_closed(&self) -> bool;
}

#[derive(Clone, Debug)]
pub enum ChannelType {
Closable,
Persistent,
}

#[derive(Clone, Debug)]
pub struct ChannelConnection<N>
where
N: Notification,
{
sender: Sender<N>,
channel_type: ChannelType,
}

impl<N> ChannelConnection<N>
where
N: Notification,
{
pub fn new(sender: Sender<N>) -> Self {
Self { sender }
pub fn new(sender: Sender<N>, channel_type: ChannelType) -> Self {
Self { sender, channel_type }
}

/// Close the connection, ignoring the channel type
pub fn force_close(&self) -> bool {
self.sender.close()
}
}

Expand Down Expand Up @@ -65,7 +77,10 @@ where
}

fn close(&self) -> bool {
self.sender.close()
match self.channel_type {
ChannelType::Closable => self.sender.close(),
ChannelType::Persistent => false,
}
}

fn is_closed(&self) -> bool {
Expand Down
3 changes: 2 additions & 1 deletion notify/src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ mod tests {
use super::{test_helpers::*, *};
use crate::{
collector::CollectorFrom,
connection::ChannelType,
converter::ConverterFrom,
events::EVENT_TYPE_ARRAY,
notification::test_helpers::*,
Expand Down Expand Up @@ -748,7 +749,7 @@ mod tests {
let mut notification_receivers = Vec::with_capacity(listener_count);
for _ in 0..listener_count {
let (sender, receiver) = unbounded();
let connection = TestConnection::new(sender);
let connection = TestConnection::new(sender, ChannelType::Closable);
listeners.push(notifier.register_new_listener(connection));
notification_receivers.push(receiver);
}
Expand Down
5 changes: 3 additions & 2 deletions rpc/grpc/server/src/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use kaspa_grpc_core::{
},
RPC_MAX_MESSAGE_SIZE,
};
use kaspa_notify::{events::EVENT_TYPE_ARRAY, notifier::Notifier, subscriber::Subscriber};
use kaspa_notify::{connection::ChannelType, events::EVENT_TYPE_ARRAY, notifier::Notifier, subscriber::Subscriber};
use kaspa_rpc_core::{
api::rpc::DynRpcService,
notify::{channel::NotificationChannel, connection::ChannelConnection},
Expand Down Expand Up @@ -45,7 +45,8 @@ impl ConnectionHandler {
pub fn new(core_service: DynRpcService, core_notifier: Arc<Notifier<Notification, ChannelConnection>>, manager: Manager) -> Self {
// Prepare core objects
let core_channel = NotificationChannel::default();
let core_listener_id = core_notifier.register_new_listener(ChannelConnection::new(core_channel.sender()));
let core_listener_id =
core_notifier.register_new_listener(ChannelConnection::new(core_channel.sender(), ChannelType::Closable));

// Prepare internals
let core_events = EVENT_TYPE_ARRAY[..].into();
Expand Down
10 changes: 6 additions & 4 deletions rpc/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use kaspa_index_core::{
use kaspa_mining::{manager::MiningManagerProxy, mempool::tx::Orphan};
use kaspa_notify::{
collector::DynCollector,
connection::ChannelType,
events::{EventSwitches, EventType, EVENT_TYPE_ARRAY},
listener::ListenerId,
notifier::Notifier,
Expand Down Expand Up @@ -95,8 +96,8 @@ impl RpcCoreService {
) -> Self {
// Prepare consensus-notify objects
let consensus_notify_channel = Channel::<ConsensusNotification>::default();
let consensus_notify_listener_id =
consensus_notifier.register_new_listener(ConsensusChannelConnection::new(consensus_notify_channel.sender()));
let consensus_notify_listener_id = consensus_notifier
.register_new_listener(ConsensusChannelConnection::new(consensus_notify_channel.sender(), ChannelType::Closable));

// Prepare the rpc-core notifier objects
let mut consensus_events: EventSwitches = EVENT_TYPE_ARRAY[..].into();
Expand All @@ -118,8 +119,9 @@ impl RpcCoreService {
let index_converter = Arc::new(IndexConverter::new(config.clone()));
if let Some(ref index_notifier) = index_notifier {
let index_notify_channel = Channel::<IndexNotification>::default();
let index_notify_listener_id =
index_notifier.clone().register_new_listener(IndexChannelConnection::new(index_notify_channel.sender()));
let index_notify_listener_id = index_notifier
.clone()
.register_new_listener(IndexChannelConnection::new(index_notify_channel.sender(), ChannelType::Closable));

let index_events: EventSwitches = [EventType::UtxosChanged, EventType::PruningPointUtxoSetOverride].as_ref().into();
let index_collector =
Expand Down
5 changes: 3 additions & 2 deletions rpc/wrpc/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
service::Options,
};
use kaspa_grpc_client::GrpcClient;
use kaspa_notify::{events::EVENT_TYPE_ARRAY, notifier::Notifier, scope::Scope, subscriber::Subscriber};
use kaspa_notify::{connection::ChannelType, events::EVENT_TYPE_ARRAY, notifier::Notifier, scope::Scope, subscriber::Subscriber};
use kaspa_rpc_core::{
api::rpc::{DynRpcService, RpcApi},
notify::{channel::NotificationChannel, connection::ChannelConnection, mode::NotificationMode},
Expand Down Expand Up @@ -56,7 +56,8 @@ impl Server {
let rpc_core = if let Some(service) = core_service {
// Prepare rpc service objects
let notification_channel = NotificationChannel::default();
let listener_id = service.notifier().register_new_listener(ChannelConnection::new(notification_channel.sender()));
let listener_id =
service.notifier().register_new_listener(ChannelConnection::new(notification_channel.sender(), ChannelType::Closable));

// Prepare notification internals
let enabled_events = EVENT_TYPE_ARRAY[..].into();
Expand Down
3 changes: 2 additions & 1 deletion wallet/core/src/wallet.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::result::Result;
use crate::wallets::HDWalletGen1;
use kaspa_notify::{
connection::ChannelType,
listener::ListenerId,
scope::{Scope, VirtualDaaScoreChangedScope},
};
Expand Down Expand Up @@ -40,7 +41,7 @@ impl Wallet {
let (listener_id, notification_receiver) = match rpc.notification_mode() {
NotificationMode::MultiListeners => {
let notification_channel = Channel::unbounded();
let connection = ChannelConnection::new(notification_channel.sender);
let connection = ChannelConnection::new(notification_channel.sender, ChannelType::Closable);
(rpc.register_new_listener(connection), notification_channel.receiver)
}
NotificationMode::Direct => (ListenerId::default(), rpc.notification_channel_receiver()),
Expand Down

0 comments on commit 27d61e1

Please sign in to comment.