Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support connections through SOCKS5 proxies #153

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class KafkaAdminClient:
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
socks5_proxy (str): Socks5 proxy URL. Default: None
kafka_client (callable): Custom class / callable for creating KafkaClient instances

"""
Expand Down Expand Up @@ -179,6 +180,7 @@ class KafkaAdminClient:
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'socks5_proxy': None,

# metrics configs
'metric_reporters': [],
Expand Down
2 changes: 2 additions & 0 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class KafkaClient:
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
socks5_proxy (str): Socks5 proxy URL. Default: None
raise_upon_socket_err_during_wakeup (bool): If set to True, raise an exception
upon socket error during wakeup(). Default: False
"""
Expand Down Expand Up @@ -178,6 +179,7 @@ class KafkaClient:
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'socks5_proxy': None,
'raise_upon_socket_err_during_wakeup': False
}

Expand Down
21 changes: 17 additions & 4 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.parser import KafkaProtocol
from kafka.protocol.types import Int32, Int8
from kafka.scram import ScramClient
from kafka.socks5_wrapper import Socks5Wrapper
from kafka.version import __version__


Expand Down Expand Up @@ -182,6 +185,7 @@ class BrokerConnection:
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
socks5_proxy (str): Socks5 proxy URL. Default: None
"""

DEFAULT_CONFIG = {
Expand Down Expand Up @@ -215,7 +219,8 @@ class BrokerConnection:
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None
'sasl_oauth_token_provider': None,
'socks5_proxy': None,
}
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')

Expand All @@ -226,6 +231,7 @@ def __init__(self, host, port, afi, **configs):
self._sock_afi = afi
self._sock_addr = None
self._api_versions = None
self._socks5_proxy = None

self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
Expand Down Expand Up @@ -351,7 +357,11 @@ def connect(self):
log.debug('%s: creating new socket', self)
assert self._sock is None
self._sock_afi, self._sock_addr = next_lookup
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
if self.config["socks5_proxy"] is not None:
self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi)
self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM)
else:
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
Comment on lines +360 to +364
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not the cleanest way to go about including this functionality, but a feature added is better than a feature removed...


for option in self.config['socket_options']:
log.debug('%s: setting socket option %s', self, option)
Expand All @@ -368,8 +378,11 @@ def connect(self):
# to check connection status
ret = None
try:
ret = self._sock.connect_ex(self._sock_addr)
except OSError as err:
if self._socks5_proxy:
ret = self._socks5_proxy.connect_ex(self._sock_addr)
else:
ret = self._sock.connect_ex(self._sock_addr)
except socket.error as err:
ret = err.errno

# Connection succeeded
Expand Down
2 changes: 2 additions & 0 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ class KafkaConsumer:
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
socks5_proxy (str): Socks5 proxy URL. Default: None
kafka_client (callable): Custom class / callable for creating KafkaClient instances
coordinator (callable): Custom class / callable for creating ConsumerCoordinator instances

Expand Down Expand Up @@ -312,6 +313,7 @@ class KafkaConsumer:
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
'socks5_proxy': None,
'kafka_client': KafkaClient,
'coordinator': ConsumerCoordinator,
}
Expand Down
2 changes: 2 additions & 0 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ class KafkaProducer:
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
socks5_proxy (str): Socks5 proxy URL. Default: None
kafka_client (callable): Custom class / callable for creating KafkaClient instances

Note:
Expand Down Expand Up @@ -330,6 +331,7 @@ class KafkaProducer:
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'socks5_proxy': None,
'kafka_client': KafkaClient,
}

Expand Down
248 changes: 248 additions & 0 deletions kafka/socks5_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse

import errno
import logging
import random
import socket
import struct

log = logging.getLogger(__name__)


class ProxyConnectionStates:
    """Symbolic names for the steps of the SOCKS5 handshake state machine.

    Socks5Wrapper.connect_ex() walks through these roughly in declaration
    order; AUTHENTICATING is only entered when the proxy selects
    username/password authentication.
    """
    DISCONNECTED = '<disconnected>'
    CONNECTING = '<connecting>'
    NEGOTIATE_PROPOSE = '<negotiate_propose>'
    NEGOTIATING = '<negotiating>'
    AUTHENTICATING = '<authenticating>'
    REQUEST_SUBMIT = '<request_submit>'
    REQUESTING = '<requesting>'
    READ_ADDRESS = '<read_address>'
    COMPLETE = '<complete>'


class Socks5Wrapper:
    """Socks5 proxy wrapper

    Manages connection through socks5 proxy with support for username/password
    authentication.

    Typical use: construct with the proxy URL, call socket() to obtain the
    underlying socket object, then call connect_ex() (repeatedly, when the
    socket is non-blocking) until it returns 0.
    """

    def __init__(self, proxy_url, afi):
        """Parse the proxy URL and resolve the proxy address.

        Arguments:
            proxy_url (str): Proxy URL; hostname, port and optional
                username/password are taken from it.
            afi: Address family used to filter the proxy DNS lookup
                (socket.AF_INET, socket.AF_INET6 or socket.AF_UNSPEC).

        Raises:
            IndexError: if no usable address could be resolved for the proxy.
        """
        self._buffer_in = b''
        self._buffer_out = b''
        self._proxy_url = urlparse(proxy_url)
        self._sock = None
        self._state = ProxyConnectionStates.DISCONNECTED
        self._target_afi = socket.AF_UNSPEC

        proxy_addrs = self.dns_lookup(self._proxy_url.hostname, self._proxy_url.port, afi)
        if not proxy_addrs:
            # random.choice() on an empty list raises a bare IndexError; keep
            # that exception type but say what actually went wrong.
            raise IndexError(
                "No usable address found for SOCKS5 proxy %s:%s"
                % (self._proxy_url.hostname, self._proxy_url.port)
            )
        self._proxy_addr = random.choice(proxy_addrs)

    @classmethod
    def is_inet_4_or_6(cls, gai):
        """Given a getaddrinfo struct, return True iff ipv4 or ipv6"""
        return gai[0] in (socket.AF_INET, socket.AF_INET6)

    @classmethod
    def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC):
        """Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)

        A failed lookup is logged and reported as an empty list.
        """
        # XXX: all DNS functions in Python are blocking. If we really
        #      want to be non-blocking here, we need to use a 3rd-party
        #      library like python-adns, or move resolution onto its
        #      own thread. This will be subject to the default libc
        #      name resolution timeout (5s on most Linux boxes)
        try:
            candidates = socket.getaddrinfo(host, port, afi, socket.SOCK_STREAM)
        except socket.gaierror as ex:
            # %s rather than %d: the port is None when the URL omits it
            log.warning("DNS lookup failed for proxy %s:%s, %r", host, port, ex)
            return []
        return [gai for gai in candidates if cls.is_inet_4_or_6(gai)]

    def socket(self, family, sock_type):
        """Open and record a socket.

        Arguments:
            family: Address family of the *target* the proxy should connect
                to; remembered for building the connect request.
            sock_type: Socket type passed through to socket.socket().

        Returns the actual underlying socket
        object to ensure e.g. selects and ssl wrapping works as expected.
        """
        self._target_afi = family  # Store the address family of the target
        # The socket itself talks to the proxy, so it uses the proxy's family
        afi, _, _, _, _ = self._proxy_addr
        self._sock = socket.socket(afi, sock_type)
        return self._sock

    def _flush_buf(self):
        """Send out all data that is stored in the outgoing buffer.

        It is expected that the caller handles error handling, including non-blocking
        as well as connection failure exceptions. Bytes already sent are dropped
        from the buffer, so a retry resumes where the previous call stopped.
        """
        while self._buffer_out:
            sent_bytes = self._sock.send(self._buffer_out)
            self._buffer_out = self._buffer_out[sent_bytes:]

    def _peek_buf(self, datalen):
        """Ensure local inbound buffer has enough data, and return that data without
        consuming the local buffer

        Fewer than datalen bytes are returned only if the peer closed the
        connection first.

        It's expected that the caller handles e.g. blocking exceptions"""
        while len(self._buffer_in) < datalen:
            data = self._sock.recv(datalen - len(self._buffer_in))
            if not data:
                # orderly shutdown by the peer; hand back what we have
                break
            self._buffer_in += data

        return self._buffer_in[:datalen]

    def _read_buf(self, datalen):
        """Read and consume bytes from socket connection

        It's expected that the caller handles e.g. blocking exceptions"""
        buf = self._peek_buf(datalen)
        self._buffer_in = self._buffer_in[len(buf):]
        return buf

    def _fail(self, message, *args):
        """Log an error, reset the state machine, close the socket and return ECONNREFUSED."""
        log.error(message, *args)
        self._state = ProxyConnectionStates.DISCONNECTED
        self._buffer_in = b''
        self._buffer_out = b''
        if self._sock is not None:
            self._sock.close()
        return errno.ECONNREFUSED

    def connect_ex(self, addr):
        """Runs a state machine through connection to authentication to
        proxy connection request.

        The somewhat strange setup is to facilitate non-intrusive use from
        BrokerConnection state machine.

        This function is called with a socket in non-blocking mode. Both
        send and receive calls can return in EWOULDBLOCK/EAGAIN which we
        specifically avoid handling here. These are handled in main
        BrokerConnection connection loop, which then would retry calls
        to this function.

        Arguments:
            addr (tuple): Target socket address; addr[0] is the numeric IP
                address string and addr[1] the port.

        Returns:
            int: 0 once the proxy has established the connection, the errno
            from the underlying connect while the TCP connection to the proxy
            is not yet established, or errno.ECONNREFUSED on any handshake
            failure (the socket is closed in that case).
        """

        if self._state == ProxyConnectionStates.DISCONNECTED:
            self._state = ProxyConnectionStates.CONNECTING

        if self._state == ProxyConnectionStates.CONNECTING:
            _, _, _, _, sockaddr = self._proxy_addr
            ret = self._sock.connect_ex(sockaddr)
            if not ret or ret == errno.EISCONN:
                self._state = ProxyConnectionStates.NEGOTIATE_PROPOSE
            else:
                return ret

        if self._state == ProxyConnectionStates.NEGOTIATE_PROPOSE:
            if self._proxy_url.username and self._proxy_url.password:
                # Propose username/password
                self._buffer_out = b"\x05\x01\x02"
            else:
                # Propose no auth
                self._buffer_out = b"\x05\x01\x00"
            self._state = ProxyConnectionStates.NEGOTIATING

        if self._state == ProxyConnectionStates.NEGOTIATING:
            self._flush_buf()
            buf = self._read_buf(2)
            if buf[0:1] != b"\x05":
                return self._fail("Unrecognized SOCKS version")

            method = buf[1:2]
            if method == b"\x00":
                # No authentication required
                self._state = ProxyConnectionStates.REQUEST_SUBMIT
            elif method == b"\x02":
                # Username/password authentication selected
                username = self._proxy_url.username
                password = self._proxy_url.password
                if not username or not password:
                    # we never offered this method, so we have nothing to send
                    return self._fail(
                        "Socks5 proxy selected username/password authentication "
                        "but no credentials are configured")
                # NOTE: credentials are sent exactly as they appear in the
                # URL; percent-escapes are not decoded here.
                user_bytes = username.encode("utf-8")
                pass_bytes = password.encode("utf-8")
                # Lengths are counted in encoded bytes and each travels in a
                # single unsigned byte on the wire.
                if len(user_bytes) > 255 or len(pass_bytes) > 255:
                    return self._fail(
                        "Socks5 proxy username/password must not exceed 255 bytes")
                self._buffer_out = struct.pack(
                    "!BB{}sB{}s".format(len(user_bytes), len(pass_bytes)),
                    1,  # version of the username/password sub-negotiation
                    len(user_bytes),
                    user_bytes,
                    len(pass_bytes),
                    pass_bytes,
                )
                self._state = ProxyConnectionStates.AUTHENTICATING
            else:
                return self._fail("Unrecognized SOCKS authentication method")

        if self._state == ProxyConnectionStates.AUTHENTICATING:
            self._flush_buf()
            buf = self._read_buf(2)
            if buf == b"\x01\x00":
                # Authentication successful
                self._state = ProxyConnectionStates.REQUEST_SUBMIT
            else:
                return self._fail("Socks5 proxy authentication failure")

        if self._state == ProxyConnectionStates.REQUEST_SUBMIT:
            if self._target_afi == socket.AF_INET:
                addr_type = 1
                addr_len = 4
            elif self._target_afi == socket.AF_INET6:
                addr_type = 4
                addr_len = 16
            else:
                return self._fail("Unknown address family, %r", self._target_afi)

            try:
                packed_addr = socket.inet_pton(self._target_afi, addr[0])
            except socket.error:
                # Report a malformed address as a refused connection rather
                # than letting an exception escape mid-handshake.
                return self._fail("Invalid target address %r", addr[0])

            self._buffer_out = struct.pack(
                "!BBBB{}sH".format(addr_len),
                5,  # version
                1,  # command: connect
                0,  # reserved
                addr_type,  # 1 for ipv4, 4 for ipv6 address
                packed_addr,  # either 4 or 16 bytes of actual address
                addr[1],  # port, unsigned 16 bit so ports above 32767 work
            )
            self._state = ProxyConnectionStates.REQUESTING

        if self._state == ProxyConnectionStates.REQUESTING:
            self._flush_buf()
            buf = self._read_buf(2)
            if buf[0:2] == b"\x05\x00":
                self._state = ProxyConnectionStates.READ_ADDRESS
            else:
                return self._fail("Proxy request failed: %r", buf[1:2])

        if self._state == ProxyConnectionStates.READ_ADDRESS:
            # we don't really care about the remote endpoint address, but need to clear the stream
            # Peeking (not reading) keeps this step re-enterable if a later
            # recv would block part-way through the reply.
            buf = self._peek_buf(2)
            if buf[0:2] == b"\x00\x01":
                reply_len = 2 + 4 + 2  # reserved/type + ipv4 address + port
            elif buf[0:2] == b"\x00\x04":
                reply_len = 2 + 16 + 2  # reserved/type + ipv6 address + port
            elif buf[0:2] == b"\x00\x03":
                # domain name: one length byte followed by that many bytes
                buf = self._peek_buf(3)
                if len(buf) < 3:
                    return self._fail("Proxy closed connection while sending bound address")
                name_len = struct.unpack("!B", buf[2:3])[0]
                reply_len = 2 + 1 + name_len + 2
            else:
                return self._fail("Unrecognized remote address type %r", buf[1:2])

            if len(self._read_buf(reply_len)) != reply_len:
                return self._fail("Proxy closed connection while sending bound address")
            self._state = ProxyConnectionStates.COMPLETE

        if self._state == ProxyConnectionStates.COMPLETE:
            return 0

        # not reached;
        # Send and recv will raise socket error on EWOULDBLOCK/EAGAIN that is assumed to be handled by
        # the caller. The caller re-enters this state machine from retry logic with timer or via select & family
        return self._fail("Internal error, state %r not handled correctly", self._state)
Loading