WIP using new Message on peer event loop
gabrieldemian committed Aug 15, 2024
1 parent b10bb28 commit dbe81a7
Showing 8 changed files with 463 additions and 326 deletions.
197 changes: 187 additions & 10 deletions crates/vincenzo/src/extensions/core/codec.rs
@@ -1,14 +1,14 @@
use bytes::{Buf, BufMut, BytesMut};
use futures::SinkExt;
use futures::{Sink, SinkExt};
use std::io::Cursor;
use tokio::io;
use tokio::{io, sync::oneshot};
use tokio_util::codec::{Decoder, Encoder};
use tracing::trace;
use tracing::{debug, trace, warn};

use super::{Block, BlockInfo, Message};
use crate::{
bitfield::Bitfield, error::Error, extensions::extended::ExtensionTrait,
peer::Peer,
bitfield::Bitfield, disk::DiskMsg, error::Error,
extensions::extended::ExtensionTrait, peer::Peer,
};

/// Core messages exchanged after a successful handshake.
@@ -69,11 +69,11 @@ impl TryFrom<u8> for CoreId {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct CoreCodec;

impl Encoder<Core> for CoreCodec {
type Error = io::Error;
type Error = Error;

fn encode(
&mut self,
@@ -273,13 +273,190 @@ impl ExtensionTrait for CoreCodec {
// maybe create another trait for an extension that has an id?
const ID: u8 = 255;

async fn handle_msg<T: SinkExt<Message> + Sized + std::marker::Unpin>(
async fn handle_msg<
T: SinkExt<Message>
+ Sized
+ std::marker::Unpin
+ Sink<Message, Error = Error>,
>(
&self,
msg: Self::Msg,
msg: &Self::Msg,
peer: &mut Peer,
sink: &mut T,
) -> Result<(), Error> {
todo!()
let local = peer.ctx.local_addr;
let remote = peer.ctx.remote_addr;

match msg {
Core::KeepAlive => {
debug!("{local} keepalive");
}
Core::Bitfield(bitfield) => {
// take entire pieces from the bitfield
// and put them in pending_requests
debug!("{local} bitfield");

let mut b = peer.ctx.pieces.write().await;
*b = bitfield.clone();

// remove excess bits
let pieces =
peer.torrent_ctx.info.read().await.pieces() as usize;

if bitfield.len() != pieces && pieces > 0 && peer.have_info {
unsafe {
b.set_len(pieces);
}
}

debug!("{local} bitfield is len {:?}", bitfield.len());
drop(b);

let peer_has_piece = peer.has_piece_not_in_local().await;
debug!("{local} peer_has_piece {peer_has_piece}");

if peer_has_piece {
debug!("{local} interested due to Bitfield");

peer.session.state.am_interested = true;
sink.send(Core::Interested.into()).await?;

if peer.can_request() {
peer.prepare_for_download().await;
peer.request_block_infos(sink).await?;
}
}
}
Core::Unchoke => {
peer.session.state.peer_choking = false;
debug!("{local} unchoke");

if peer.can_request() {
peer.prepare_for_download().await;
peer.request_block_infos(sink).await?;
}
}
Core::Choke => {
peer.session.state.peer_choking = true;
debug!("{local} choke");
peer.free_pending_blocks().await;
}
Core::Interested => {
debug!("{local} interested");
peer.session.state.peer_interested = true;
}
Core::NotInterested => {
debug!("{local} NotInterested");
peer.session.state.peer_interested = false;
}
Core::Have(piece) => {
debug!("{local} Have {piece}");
// Have is usually sent when the peer has downloaded
// a new piece. However, some peers, after the handshake,
// send an incomplete bitfield followed by a sequence of
// Haves. They do this to try to prevent censorship
// from ISPs.
// Overwrite the piece on the bitfield, if the peer has one.
let ctx = peer.ctx.clone();
let mut pieces = ctx.pieces.write().await;

if pieces.clone().get(*piece).is_none() {
warn!(
"{local} sent Have but it's bitfield is out of bounds"
);
warn!("initializing an empty bitfield with the len of the piece {piece}");
*pieces = Bitfield::from_vec(vec![0u8; *piece]);
}

pieces.set(*piece, true);
drop(pieces);

let torrent_ctx = peer.torrent_ctx.clone();
let local_bitfield = torrent_ctx.bitfield.read().await;
let piece = local_bitfield.get(*piece);

// maybe become interested in peer and request blocks
if !peer.session.state.am_interested {
if let Some(a) = piece {
if *a {
debug!("already have this piece, ignoring");
} else {
debug!(
"We do not have this piece, sending interested"
);
debug!("{local} we are interested due to Have");

peer.session.state.am_interested = true;
sink.send(Core::Interested.into()).await?;

if peer.can_request() {
peer.prepare_for_download().await;
peer.request_block_infos(sink).await?;
}
}
}
}
}
Core::Piece(block) => {
debug!("{local} piece {}", block.index);
debug!(
"index: {:?}, begin: {:?}, len: {:?}",
block.index,
block.begin,
block.block.len()
);

peer.handle_piece_msg(block.clone()).await?;
if peer.can_request() {
peer.prepare_for_download().await;
peer.request_block_infos(sink).await?;
}
}
Core::Cancel(block_info) => {
debug!("{local} cancel from {remote}");
debug!("{block_info:?}");
peer.incoming_requests.remove(block_info);
}
Core::Request(block_info) => {
debug!("{local} request from {remote}");
debug!("{block_info:?}");

if !peer.session.state.peer_choking {
let begin = block_info.begin;
let index = block_info.index as usize;
let (tx, rx) = oneshot::channel();

// warn if the peer already requested this block
if peer.incoming_requests.contains(block_info) {
// TODO: if peer keeps spamming us, close connection
warn!("Peer sent duplicate block request");
}

peer.incoming_requests.insert(block_info.clone());

peer.torrent_ctx
.disk_tx
.send(DiskMsg::ReadBlock {
block_info: block_info.clone(),
recipient: tx,
info_hash: peer.torrent_ctx.info_hash,
})
.await?;

let bytes = rx.await?;

let block = Block { index, begin, block: bytes };
let _ = sink.send(Core::Piece(block).into()).await;
}
}
Core::Extended(_, _) => {
// this branch is only used when the local peer converts an
// extended enum message into a Core message just
// to send it on the sink.
}
}

Ok(())
}

fn codec(&self) -> Self::Codec {
