forked from dpkp/kafka-python
-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support connections through SOCKS5 proxies #153
Open
wbarnha
wants to merge
4
commits into
wbarnha:master
Choose a base branch
from
hnousiainen:socks5_support
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 3 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,248 @@ | ||
try: | ||
from urllib.parse import urlparse | ||
except ImportError: | ||
from urlparse import urlparse | ||
|
||
import errno | ||
import logging | ||
import random | ||
import socket | ||
import struct | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class ProxyConnectionStates: | ||
DISCONNECTED = '<disconnected>' | ||
CONNECTING = '<connecting>' | ||
NEGOTIATE_PROPOSE = '<negotiate_propose>' | ||
NEGOTIATING = '<negotiating>' | ||
AUTHENTICATING = '<authenticating>' | ||
REQUEST_SUBMIT = '<request_submit>' | ||
REQUESTING = '<requesting>' | ||
READ_ADDRESS = '<read_address>' | ||
COMPLETE = '<complete>' | ||
|
||
|
||
class Socks5Wrapper: | ||
"""Socks5 proxy wrapper | ||
|
||
Manages connection through socks5 proxy with support for username/password | ||
authentication. | ||
""" | ||
|
||
def __init__(self, proxy_url, afi): | ||
self._buffer_in = b'' | ||
self._buffer_out = b'' | ||
self._proxy_url = urlparse(proxy_url) | ||
self._sock = None | ||
self._state = ProxyConnectionStates.DISCONNECTED | ||
self._target_afi = socket.AF_UNSPEC | ||
|
||
proxy_addrs = self.dns_lookup(self._proxy_url.hostname, self._proxy_url.port, afi) | ||
# TODO raise error on lookup failure | ||
self._proxy_addr = random.choice(proxy_addrs) | ||
|
||
@classmethod | ||
def is_inet_4_or_6(cls, gai): | ||
"""Given a getaddrinfo struct, return True iff ipv4 or ipv6""" | ||
return gai[0] in (socket.AF_INET, socket.AF_INET6) | ||
|
||
@classmethod | ||
def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC): | ||
"""Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)""" | ||
# XXX: all DNS functions in Python are blocking. If we really | ||
# want to be non-blocking here, we need to use a 3rd-party | ||
# library like python-adns, or move resolution onto its | ||
# own thread. This will be subject to the default libc | ||
# name resolution timeout (5s on most Linux boxes) | ||
try: | ||
return list(filter(cls.is_inet_4_or_6, | ||
socket.getaddrinfo(host, port, afi, | ||
socket.SOCK_STREAM))) | ||
except socket.gaierror as ex: | ||
log.warning("DNS lookup failed for proxy %s:%d, %r", host, port, ex) | ||
return [] | ||
|
||
def socket(self, family, sock_type): | ||
"""Open and record a socket. | ||
|
||
Returns the actual underlying socket | ||
object to ensure e.g. selects and ssl wrapping works as expected. | ||
""" | ||
self._target_afi = family # Store the address family of the target | ||
afi, _, _, _, _ = self._proxy_addr | ||
self._sock = socket.socket(afi, sock_type) | ||
return self._sock | ||
|
||
def _flush_buf(self): | ||
"""Send out all data that is stored in the outgoing buffer. | ||
|
||
It is expected that the caller handles error handling, including non-blocking | ||
as well as connection failure exceptions. | ||
""" | ||
while self._buffer_out: | ||
sent_bytes = self._sock.send(self._buffer_out) | ||
self._buffer_out = self._buffer_out[sent_bytes:] | ||
|
||
def _peek_buf(self, datalen): | ||
"""Ensure local inbound buffer has enough data, and return that data without | ||
consuming the local buffer | ||
|
||
It's expected that the caller handles e.g. blocking exceptions""" | ||
while True: | ||
bytes_remaining = datalen - len(self._buffer_in) | ||
if bytes_remaining <= 0: | ||
break | ||
data = self._sock.recv(bytes_remaining) | ||
if not data: | ||
break | ||
self._buffer_in = self._buffer_in + data | ||
|
||
return self._buffer_in[:datalen] | ||
|
||
def _read_buf(self, datalen): | ||
"""Read and consume bytes from socket connection | ||
|
||
It's expected that the caller handles e.g. blocking exceptions""" | ||
buf = self._peek_buf(datalen) | ||
if buf: | ||
self._buffer_in = self._buffer_in[len(buf):] | ||
return buf | ||
|
||
def connect_ex(self, addr): | ||
"""Runs a state machine through connection to authentication to | ||
proxy connection request. | ||
|
||
The somewhat strange setup is to facilitate non-intrusive use from | ||
BrokerConnection state machine. | ||
|
||
This function is called with a socket in non-blocking mode. Both | ||
send and receive calls can return in EWOULDBLOCK/EAGAIN which we | ||
specifically avoid handling here. These are handled in main | ||
BrokerConnection connection loop, which then would retry calls | ||
to this function.""" | ||
|
||
if self._state == ProxyConnectionStates.DISCONNECTED: | ||
self._state = ProxyConnectionStates.CONNECTING | ||
|
||
if self._state == ProxyConnectionStates.CONNECTING: | ||
_, _, _, _, sockaddr = self._proxy_addr | ||
ret = self._sock.connect_ex(sockaddr) | ||
if not ret or ret == errno.EISCONN: | ||
self._state = ProxyConnectionStates.NEGOTIATE_PROPOSE | ||
else: | ||
return ret | ||
|
||
if self._state == ProxyConnectionStates.NEGOTIATE_PROPOSE: | ||
if self._proxy_url.username and self._proxy_url.password: | ||
# Propose username/password | ||
self._buffer_out = b"\x05\x01\x02" | ||
else: | ||
# Propose no auth | ||
self._buffer_out = b"\x05\x01\x00" | ||
self._state = ProxyConnectionStates.NEGOTIATING | ||
|
||
if self._state == ProxyConnectionStates.NEGOTIATING: | ||
self._flush_buf() | ||
buf = self._read_buf(2) | ||
if buf[0:1] != b"\x05": | ||
log.error("Unrecognized SOCKS version") | ||
self._state = ProxyConnectionStates.DISCONNECTED | ||
self._sock.close() | ||
return errno.ECONNREFUSED | ||
|
||
if buf[1:2] == b"\x00": | ||
# No authentication required | ||
self._state = ProxyConnectionStates.REQUEST_SUBMIT | ||
elif buf[1:2] == b"\x02": | ||
# Username/password authentication selected | ||
userlen = len(self._proxy_url.username) | ||
passlen = len(self._proxy_url.password) | ||
self._buffer_out = struct.pack( | ||
"!bb{}sb{}s".format(userlen, passlen), | ||
1, # version | ||
userlen, | ||
self._proxy_url.username.encode(), | ||
passlen, | ||
self._proxy_url.password.encode(), | ||
) | ||
self._state = ProxyConnectionStates.AUTHENTICATING | ||
else: | ||
log.error("Unrecognized SOCKS authentication method") | ||
self._state = ProxyConnectionStates.DISCONNECTED | ||
self._sock.close() | ||
return errno.ECONNREFUSED | ||
|
||
if self._state == ProxyConnectionStates.AUTHENTICATING: | ||
self._flush_buf() | ||
buf = self._read_buf(2) | ||
if buf == b"\x01\x00": | ||
# Authentication succesful | ||
self._state = ProxyConnectionStates.REQUEST_SUBMIT | ||
else: | ||
log.error("Socks5 proxy authentication failure") | ||
self._state = ProxyConnectionStates.DISCONNECTED | ||
self._sock.close() | ||
return errno.ECONNREFUSED | ||
|
||
if self._state == ProxyConnectionStates.REQUEST_SUBMIT: | ||
if self._target_afi == socket.AF_INET: | ||
addr_type = 1 | ||
addr_len = 4 | ||
elif self._target_afi == socket.AF_INET6: | ||
addr_type = 4 | ||
addr_len = 16 | ||
else: | ||
log.error("Unknown address family, %r", self._target_afi) | ||
self._state = ProxyConnectionStates.DISCONNECTED | ||
self._sock.close() | ||
return errno.ECONNREFUSED | ||
|
||
self._buffer_out = struct.pack( | ||
"!bbbb{}sh".format(addr_len), | ||
5, # version | ||
1, # command: connect | ||
0, # reserved | ||
addr_type, # 1 for ipv4, 4 for ipv6 address | ||
socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address | ||
addr[1], # port | ||
) | ||
self._state = ProxyConnectionStates.REQUESTING | ||
|
||
if self._state == ProxyConnectionStates.REQUESTING: | ||
self._flush_buf() | ||
buf = self._read_buf(2) | ||
if buf[0:2] == b"\x05\x00": | ||
self._state = ProxyConnectionStates.READ_ADDRESS | ||
else: | ||
log.error("Proxy request failed: %r", buf[1:2]) | ||
self._state = ProxyConnectionStates.DISCONNECTED | ||
self._sock.close() | ||
return errno.ECONNREFUSED | ||
|
||
if self._state == ProxyConnectionStates.READ_ADDRESS: | ||
# we don't really care about the remote endpoint address, but need to clear the stream | ||
buf = self._peek_buf(2) | ||
if buf[0:2] == b"\x00\x01": | ||
_ = self._read_buf(2 + 4 + 2) # ipv4 address + port | ||
elif buf[0:2] == b"\x00\x05": | ||
_ = self._read_buf(2 + 16 + 2) # ipv6 address + port | ||
else: | ||
log.error("Unrecognized remote address type %r", buf[1:2]) | ||
self._state = ProxyConnectionStates.DISCONNECTED | ||
self._sock.close() | ||
return errno.ECONNREFUSED | ||
self._state = ProxyConnectionStates.COMPLETE | ||
|
||
if self._state == ProxyConnectionStates.COMPLETE: | ||
return 0 | ||
|
||
# not reached; | ||
# Send and recv will raise socket error on EWOULDBLOCK/EAGAIN that is assumed to be handled by | ||
# the caller. The caller re-enters this state machine from retry logic with timer or via select & family | ||
log.error("Internal error, state %r not handled correctly", self._state) | ||
self._state = ProxyConnectionStates.DISCONNECTED | ||
if self._sock: | ||
self._sock.close() | ||
return errno.ECONNREFUSED |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not the cleanest way to go about including this functionality, but a feature added is better than a feature removed...