feat(sources): multicast udp socket support #22099
@@ -402,6 +402,31 @@ impl ToValue for SocketAddr { | |||
} | |||
} | |||
|
|||
impl Configurable for Ipv4Addr { |
Inspired by the `SocketAddr` implementation a few lines above.
This looks awesome🤯 |
Hi @nomalord perhaps you could test this? |
I don't really have experience with rust, but I'll try to compile the feature branch and update you🥹 |
Maybe this is helpful to you:
|
Thank you for popping in @pront 😃. I still have to add some tests to validate this PR, plus documentation and a changelog. I will update this as soon as I can and then mark the PR as non-draft. |
} | ||
|
||
fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> { | ||
// TODO: We don't need anything other than a string schema to (de)serialize a `Ipv4Addr`, |
Added the same TODOs as in
vector/lib/vector-config/src/stdlib.rs
Line 391 in e63e2bf
fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> { |
Do you plan to add validation in this PR? I understand we have similar TODOs for `SocketAddr`. If you don't, this is fine with me and we can resolve this PR thread.
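If validation were added later, one possible shape is a post-deserialization check rather than a richer schema. The sketch below assumes exactly that; `parse_multicast_group` is a hypothetical helper and not part of Vector. It only relies on `Ipv4Addr::is_multicast` from the standard library, which accepts addresses in `224.0.0.0/4`.

```rust
use std::net::Ipv4Addr;

/// Hypothetical helper (not Vector's API): parse a configured group string
/// and reject anything that is not an IPv4 multicast address.
fn parse_multicast_group(raw: &str) -> Result<Ipv4Addr, String> {
    // First make sure the string is a well-formed IPv4 address.
    let addr: Ipv4Addr = raw
        .parse()
        .map_err(|e| format!("invalid IPv4 address {raw:?}: {e}"))?;

    // Then make sure it falls inside the multicast range 224.0.0.0/4.
    if addr.is_multicast() {
        Ok(addr)
    } else {
        Err(format!("{addr} is not an IPv4 multicast address"))
    }
}
```

With a check like this, a unicast address such as `127.0.0.1` in `multicast_groups` would be rejected at configuration time instead of failing later at join time.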
send_lines_udp_from(next_addr(), addr, lines) | ||
} | ||
|
||
fn send_lines_udp_from( |
Refactored this a little, because the socket the packets are sent from must be on the same interface the source is listening on. Also, with multicast groups, `127.0.0.1` does not seem to work properly, so we have to use `0.0.0.0` both for the source's listening socket and for the socket that sends the packets. I hope I'm explaining this well.
.join_multicast_v4(group_addr, *listen_addr.ip()) | ||
.map_err(|error| { | ||
// TODO: is this considered a `SocketBindError`? or should we create a new error for this case? | ||
emit!(SocketBindError { |
Is this considered a `SocketBindError`? Or should we create a new error for this case?
Strictly speaking joining and binding are different. If it's not too much effort, we can introduce a new error and include the group addr.
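As a rough illustration of the suggestion above, a dedicated join error could carry the group address alongside the underlying I/O error. The type below is a hypothetical, standalone sketch; the name `MulticastJoinError` and its fields are assumptions, and Vector's internal events are structured differently.

```rust
use std::fmt;
use std::io;
use std::net::Ipv4Addr;

/// Hypothetical error type (not Vector's real internal event): keeps the
/// group and interface so the failure message says what could not be joined.
#[derive(Debug)]
struct MulticastJoinError {
    group_addr: Ipv4Addr,
    interface: Ipv4Addr,
    error: io::Error,
}

impl fmt::Display for MulticastJoinError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "failed to join multicast group {} on interface {}: {}",
            self.group_addr, self.interface, self.error
        )
    }
}

impl std::error::Error for MulticastJoinError {
    // Expose the underlying I/O error to callers walking the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&self.error)
    }
}
```

Keeping the join failure separate from bind failures means an operator can tell from the message alone whether the port or the group membership was the problem.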
// We could support Ipv6 multicast with the | ||
// https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v6 method | ||
// and specifying the interface index as `0`, in order to bind all interfaces. | ||
panic!("IPv6 multicast is not supported") |
I thought about adding IPv6 support too... But as no one has requested it yet, I thought it wouldn't be worth implementing and testing it in this PR. Any thoughts about this? Should I include IPv6 support in this PR?
Good call. Also, a separate PR is better because it makes review easier. We can leave this as a potential improvement until there's demand for it.
panic!("IPv6 multicast is not supported") | |
unimplemented!("IPv6 multicast is not supported") |
// socket is bound to, and not `IP_ADDR_ANY`. We need to use the same address | ||
// for the multicast group join that the user has set in the config. | ||
// if systemd{N} fd sockets are required to work too, we should investigate on this. | ||
SocketListenAddr::SystemdFd(_) => { |
We could call `UdpSocket::local_addr` on this socket, but I wonder whether it would ever return `IPADDR_ANY` (`0.0.0.0`) instead of the real interface address the socket is bound to.
Multicast required setting `0.0.0.0` as the bind address to work. Otherwise, the interface would filter out packets targeting the multicast IP address.
I think it would be difficult to get this working, as I would have to investigate this further, and no one has requested it... Any thoughts?
We can leave this as `unimplemented` and potential future work.
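For whoever picks this up as future work, here is a small sketch of the `local_addr` approach discussed above. `interface_for_join` is a hypothetical helper, and the systemd-provided socket is stood in for by an ordinary `std::net::UdpSocket`. Note that `local_addr` reports the address the socket was bound to, so a socket bound to `0.0.0.0` yields `0.0.0.0` rather than a concrete interface address.

```rust
use std::io;
use std::net::{IpAddr, Ipv4Addr, UdpSocket};

/// Hypothetical helper: derive the `interface` argument for
/// `join_multicast_v4` from the address an already-bound socket reports.
/// Returns `None` for IPv6 sockets, which this sketch does not handle.
fn interface_for_join(socket: &UdpSocket) -> io::Result<Option<Ipv4Addr>> {
    match socket.local_addr()?.ip() {
        IpAddr::V4(ip) => Ok(Some(ip)),
        IpAddr::V6(_) => Ok(None),
    }
}
```

Whether an inherited systemd socket is bound to `0.0.0.0` or to a specific address depends on the unit configuration, so the join would still need to handle both cases.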
}; | ||
for group_addr in config.multicast_groups { | ||
socket | ||
.join_multicast_v4(group_addr, *listen_addr.ip()) |
Should I use `listen_addr.ip()` here or just hardcode `Ipv4Addr::UNSPECIFIED`? If `0.0.0.0` is not set, the interface would filter out the packets targeting the multicast IP.
See the method documentation here: https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v4
and more info about IP multicast here: https://stackoverflow.com/questions/10692956/what-does-it-mean-to-bind-a-multicast-udp-socket
Anyway, whether `listen_addr.ip()` or `Ipv4Addr::UNSPECIFIED` is used here, this would only work if the source's address in the config is `0.0.0.0`, as setting it to `127.0.0.1` (for example) would filter out any packet going to the multicast address.
For example, this config:
[sources.multicast_udp]
type = "socket"
mode = "udp"
address = "127.0.0.1:4242"
multicast_groups = ["224.0.0.2"]
[sinks.console]
type = "console"
inputs = ["multicast_udp"]
encoding.codec = "json"
and this command: `echo hello | nc 224.0.0.2 4242 -u`
won't work, because the socket address is not `0.0.0.0`.
If we hardcode this to `Ipv4Addr::UNSPECIFIED`, the behaviour would be the same; that config won't work.
Is this clear enough?
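Since the config above fails silently, one option (not something this PR does, as far as the thread shows) would be to surface the mismatch at startup. The sketch below assumes the behaviour described in this thread, namely that only a `0.0.0.0` listen address receives the multicast packets; `multicast_config_warning` is a hypothetical helper.

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// Hypothetical check (not part of Vector): returns a warning message when
/// multicast groups are configured but the listen address is not `0.0.0.0`.
fn multicast_config_warning(
    address: SocketAddr,
    multicast_groups: &[Ipv4Addr],
) -> Option<String> {
    // Nothing to warn about without groups, or when listening on 0.0.0.0.
    if multicast_groups.is_empty() || address.ip() == IpAddr::V4(Ipv4Addr::UNSPECIFIED) {
        return None;
    }
    Some(format!(
        "multicast_groups is set but address is {}; packets sent to the groups \
         may be filtered out unless the source listens on 0.0.0.0",
        address.ip()
    ))
}
```

For the example config, `address = "127.0.0.1:4242"` with `multicast_groups = ["224.0.0.2"]` would produce a warning, while `0.0.0.0:4242` would not.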
} | ||
|
||
#[tokio::test] | ||
#[should_panic] |
Had to use `#[should_panic]` here, as `init_udp_with_config` does not return a `Result` to know whether the initialization failed or not:
vector/src/sources/socket/mod.rs
Line 1018 in 12bf074
.unwrap(); |
Makes sense 👍
TIOLI: Leave a TODO comment to refactor `init_udp_with_config` to return a `Result`.
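To illustrate what that refactor could enable: once initialization is fallible, a test can assert on the specific error instead of relying on `#[should_panic]`. The sketch below is hypothetical throughout. `InitError` and `multicast_interface` are invented names, and it assumes the failure under test is an IPv6 listen address combined with multicast groups, which this thread does not confirm.

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// Hypothetical error enum for a fallible initializer.
#[derive(Debug, PartialEq)]
enum InitError {
    Ipv6MulticastUnsupported,
}

/// Hypothetical stand-in for the part of initialization that picks the
/// interface for the group join, returning an error instead of panicking.
fn multicast_interface(
    listen_addr: SocketAddr,
    groups: &[Ipv4Addr],
) -> Result<Option<Ipv4Addr>, InitError> {
    // Without groups there is nothing to join, whatever the address family.
    if groups.is_empty() {
        return Ok(None);
    }
    match listen_addr.ip() {
        IpAddr::V4(ip) => Ok(Some(ip)),
        IpAddr::V6(_) => Err(InitError::Ipv6MulticastUnsupported),
    }
}
```

A test could then use `assert_eq!(result, Err(InitError::Ipv6MulticastUnsupported))`, which fails if initialization breaks for an unrelated reason, something `#[should_panic]` cannot distinguish.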
error, | ||
}) | ||
})?; | ||
info!(message = "Joined multicast group.", group = %group_addr); |
To give a bit of feedback about group joining
I think it's ok to use `info!` here.
cc @jszwedko to double check from a UX perspective 🙏
The `socket` source with `udp` mode now supports joining multicast groups via the `multicast_groups` option | ||
of that source. This allows the source to receive multicast packets from the specified multicast groups. | ||
|
||
Note that in order to work properly, the `socket` address must be set to `0.0.0.0` and not |
Is this notice needed if it was also added to the website docs?
Thanks for adding this. We can definitely keep it here.
Hi @pront, this PR is now ready for review. Just notifying, no rush! |
Thank you 🚀 I will get to this soon. |
Hey @jorgehermo9, great work. Nothing major came out, mostly nits.
|
||
fn send_lines_udp_from( | ||
from: SocketAddr, | ||
addr: SocketAddr, |
Nit:
addr: SocketAddr, | |
to: SocketAddr, |
Here and below.
init_udp_with_config(tx, config).await; | ||
|
||
// We must send packets to the same interface the `socket_address` is bound to | ||
// in order to receive the multicast packets this `from` socket sends |
Nit:
// in order to receive the multicast packets this `from` socket sends | |
// in order to receive the multicast packets the `from` socket sends. |
Thanks for the review @pront! I will happily address the comments as I prefer to fix those nits before merging! |
Closes #5732
This PR is still in draft. I have a few pending TODOs and am also missing tests to properly check this, although the happy path is working.
In order to test this, use this vector config:
and with this command
you can see logs in vector.
I would like to receive some feedback about what the configuration of this setting should look like.
Also, note that IPv6 is not supported. We can work on that, but maybe it is not worth it if no one requests it.
@nomalord take a look at this please. It would be great if you could build the binary from this branch and test whether it works on your systems.
@dalesample as you were the first requester of this, I am also pinging you, just in case (although this issue was created 4 years ago).