SOCKS is an Internet protocol that exchanges network packets between a client and server through a proxy server. SOCKS5 optionally provides authentication so only authorized users may access a server.
In today's connected world, internet security and privacy are increasingly important concerns. One way to enhance security and privacy while using the internet is by using a proxy server. A proxy server acts as an intermediary between your computer and the internet, routing your traffic through a different server and masking your IP address.
SOCKS5 is one of the most popular types of proxy servers, offering a high level of security and flexibility. In this article, we'll walk through the process of building a SOCKS5 proxy server using Python. With Python's powerful networking libraries and easy-to-learn syntax, building a proxy server is easier than you might think.
By the end of this article, you'll have a working SOCKS5 proxy server that you can use to enhance your online security and privacy. So, let's get started!
If Python 3 is not already installed, install it from Python.org. Then create a file called proxy.py and open it in your favourite code editor; I am using VS Code. The following commands work in a Linux terminal or Windows PowerShell.
$ cd Desktop
$ mkdir "Socks5 Proxy Server"
$ cd "Socks5 Proxy Server"
$ code proxy.py
Import the following modules.
import socket
import threading
import select
socket module:
The 'socket' module exposes the operating system's socket interface, which lets client and server machines communicate through socket endpoints. The 'socket' API supports both connection-oriented and connectionless network protocols.
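As a minimal sketch of what a socket endpoint looks like in practice, the snippet below uses socket.socketpair(), which returns two already-connected stream sockets, so no port needs to be bound. The variable names are illustrative only and are not part of the proxy.

```python
import socket

# socketpair() returns two connected sockets, handy for local experiments.
left, right = socket.socketpair()
try:
    left.sendall(b"ping")        # write on one endpoint
    received = right.recv(4)     # read on the other endpoint
finally:
    left.close()
    right.close()

print(received)
```

The proxy itself uses the same send and receive calls, only on sockets obtained from accept() and connect() instead of socketpair().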
threading module:
Python threading allows different parts of your program to run concurrently and can simplify your design. Here we use it to handle each client connection in its own thread, so that one slow client does not block the others.
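The thread-per-client idea can be sketched without any networking. In this hedged example, worker() is a hypothetical stand-in for the code that would handle one client connection.

```python
import threading

results = []
lock = threading.Lock()

def worker(client_id):
    # Stand-in for handling one client; the lock protects the shared list.
    with lock:
        results.append(client_id)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()   # wait for every worker to finish

print(sorted(results))
```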
select module:
Python's select() function is a direct interface to the underlying operating system implementation. It monitors sockets, open files, and pipes (anything with a fileno() method that returns a valid file descriptor) until they become readable or writable, or a communication error occurs.
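To make the behaviour of select() concrete, here is a small sketch using a connected socket pair. The timeouts and variable names are assumptions chosen for the demonstration.

```python
import select
import socket

a, b = socket.socketpair()
try:
    # Nothing has been sent yet, so a zero-timeout poll finds no readable socket.
    idle, _, _ = select.select([a, b], [], [], 0)
    nothing_ready = (idle == [])

    a.sendall(b"x")
    # b now has one byte waiting; wait up to one second for select() to report it.
    ready, _, _ = select.select([a, b], [], [], 1.0)
    b_ready = b in ready
    a_ready = a in ready
finally:
    a.close()
    b.close()

print(nothing_ready, b_ready, a_ready)
```

Only the socket with pending data is reported as readable, which is what lets a single loop serve two sockets without blocking on either one.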
Then create a Proxy class that will be the entry point of our SOCKS5 proxy server. The username and password attributes hold the credentials that clients must present to use the proxy.
SOCKS_VERSION = 5

class Proxy:
    def __init__(self):
        self.username = "username"
        self.password = "password"

    def run(self, host, port):
        pass

if __name__ == "__main__":
    proxy = Proxy()
    proxy.run("127.0.0.1", 3000)
Inside the run method we initialize and bind our server socket. Once we start listening for incoming connections, we call .accept() to accept each one; this method returns a tuple containing the client socket and its address.
    def run(self, host, port):
        # SOCK_STREAM selects TCP
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind((host, port))
        s.listen()
        while True:
            conn, addr = s.accept()
            print("* new connection from {}".format(addr))
            # handle each client in its own thread
            t = threading.Thread(target=self.handle_client, args=(conn,))
            t.start()

    def handle_client(self, connection):
        # greeting header
        # read and unpack 2 bytes from client
        version, nmethods = connection.recv(2)
We read two bytes from the client: the SOCKS version and the number of authentication methods the client supports. The version is 5 for SOCKS5. We use the method count to read the list of methods the client offers and check whether username/password authentication is among them.
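One caveat worth illustrating: recv(n) may return fewer than n bytes on a real network, so a robust server usually loops until it has everything. The recv_exact helper below is a hypothetical addition, not part of the article's code, shown reading a greeting that arrives in two pieces.

```python
import socket

def recv_exact(sock, count):
    """Read exactly `count` bytes, or raise if the peer closes early."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("peer closed before sending all bytes")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)

client, server = socket.socketpair()
try:
    client.sendall(bytes([5, 2]))   # VER=5, NMETHODS=2
    client.sendall(bytes([0, 2]))   # the two offered methods: 0 and 2
    version, nmethods = recv_exact(server, 2)
    methods = list(recv_exact(server, nmethods))
finally:
    client.close()
    server.close()

print(version, nmethods, methods)
```

Swapping connection.recv(n) for such a helper would be one way to harden the handshake code; the rest of this article keeps the simpler direct calls.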
        # get available methods [0, 1, 2]
        methods = self.get_available_methods(nmethods, connection)
        # accept only USERNAME/PASSWORD auth
        if 2 not in set(methods):
            # close connection
            connection.close()
            return
The get_available_methods implementation is as follows.
    def get_available_methods(self, nmethods, connection):
        methods = []
        for _ in range(nmethods):
            # each method is a single byte
            methods.append(ord(connection.recv(1)))
        return methods
This reads every method the client offers, one byte each. We use the list to check whether the client supports username/password authentication (method 2). If it doesn't, we close the connection.
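The article's server simply closes the connection when method 2 is missing. As an alternative sketch, RFC 1928 defines the method byte 0xFF for "no acceptable methods", which lets the server tell the client why it is being turned away. The function name below is hypothetical.

```python
USERNAME_PASSWORD = 2
NO_ACCEPTABLE_METHODS = 0xFF

def method_selection_reply(offered_methods):
    """Build the 2-byte reply: SOCKS version, then the chosen method."""
    if USERNAME_PASSWORD in offered_methods:
        chosen = USERNAME_PASSWORD
    else:
        chosen = NO_ACCEPTABLE_METHODS
    return bytes([5, chosen])

print(method_selection_reply([0, 2]).hex())
print(method_selection_reply([0]).hex())
```

With this approach the server would send the reply first and close the connection only after a 0xFF answer.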
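        # send method selection message: version 5, method 2 (username/password)
        connection.sendall(bytes([SOCKS_VERSION, 2]))
If the client offers username/password authentication, we reply with the SOCKS version and the chosen method (2). This tells the client which authentication method to use.
        if not self.verify_credentials(connection):
            return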
The verify_credentials implementation is as follows.
    def verify_credentials(self, connection):
        version = ord(connection.recv(1))  # should be 1
        username_len = ord(connection.recv(1))
        username = connection.recv(username_len).decode('utf-8')
        password_len = ord(connection.recv(1))
        password = connection.recv(password_len).decode('utf-8')
        if username == self.username and password == self.password:
            # success, status = 0
            response = bytes([version, 0])
            connection.sendall(response)
            return True
        # failure, status != 0
        response = bytes([version, 0xFF])
        connection.sendall(response)
        connection.close()
        return False
Now we check whether the username and password sent by the client are correct. If they are, we send a success response (status 0); otherwise we send a failure response and close the connection.
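The layout of the authentication message (version, username length, username, password length, password) can be sketched as a pure function over bytes. Both helper names are hypothetical, and hmac.compare_digest is swapped in for the plain == comparison used in the article because its running time does not depend on where the inputs differ.

```python
import hmac

def parse_auth_request(data):
    """Split an auth message into (version, username, password) as bytes."""
    version = data[0]
    ulen = data[1]
    username = data[2:2 + ulen]
    plen = data[2 + ulen]
    password = data[3 + ulen:3 + ulen + plen]
    return version, username, password

def credentials_match(username, password, expected_username, expected_password):
    # Evaluate both comparisons so a wrong username does not short-circuit.
    user_ok = hmac.compare_digest(username, expected_username)
    pass_ok = hmac.compare_digest(password, expected_password)
    return user_ok and pass_ok

request = bytes([1, 8]) + b"username" + bytes([8]) + b"password"
version, username, password = parse_auth_request(request)
print(version, username, password)
print(credentials_match(username, password, b"username", b"password"))
```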
        # request (version=5)
        version, cmd, _, address_type = connection.recv(4)
        if address_type == 1:  # IPv4
            address = socket.inet_ntoa(connection.recv(4))
        elif address_type == 3:  # Domain name
            domain_length = connection.recv(1)[0]
            address = connection.recv(domain_length)
            address = socket.gethostbyname(address)
        else:
            # 8 = address type not supported (IPv6 is not handled here)
            connection.sendall(self.generate_failed_reply(address_type, 8))
            connection.close()
            return
        # convert 2 big-endian bytes to an unsigned port number
        port = int.from_bytes(connection.recv(2), 'big', signed=False)
Then we read 4 bytes from the client. The first byte contains the SOCKS version, the second contains the command (such as CONNECT), the third is reserved and ignored, and the fourth contains the address type, such as IPv4 or domain name. We use this information to read the destination address and port accordingly.
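The request layout described above can be sketched as a parser over an in-memory byte string. parse_request is a hypothetical helper; unlike the article's code it also recognises address type 4 (IPv6) and leaves domain names unresolved, both of which are assumptions made for illustration.

```python
import ipaddress
import socket

def parse_request(data):
    """Return (cmd, address_type, address, port) from a SOCKS5 request."""
    version, cmd, _reserved, address_type = data[:4]
    if address_type == 1:            # IPv4: 4 raw bytes
        address = socket.inet_ntoa(data[4:8])
        offset = 8
    elif address_type == 3:          # domain: 1 length byte, then the name
        length = data[4]
        address = data[5:5 + length].decode("ascii")
        offset = 5 + length
    elif address_type == 4:          # IPv6: 16 raw bytes
        address = str(ipaddress.IPv6Address(data[4:20]))
        offset = 20
    else:
        raise ValueError("unknown address type {}".format(address_type))
    # The port is always the last 2 bytes, in network (big-endian) order.
    port = int.from_bytes(data[offset:offset + 2], "big")
    return cmd, address_type, address, port

name = b"example.com"
by_domain = bytes([5, 1, 0, 3, len(name)]) + name + (443).to_bytes(2, "big")
by_ipv4 = bytes([5, 1, 0, 1, 127, 0, 0, 1]) + (80).to_bytes(2, "big")
print(parse_request(by_domain))
print(parse_request(by_ipv4))
```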
        try:
            if cmd == 1:  # CONNECT
                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.connect((address, port))
                bind_address = remote.getsockname()
                print("* Connected to {} {}".format(address, port))
                addr = int.from_bytes(socket.inet_aton(bind_address[0]), 'big', signed=False)
                port = bind_address[1]
                reply = b''.join([
                    SOCKS_VERSION.to_bytes(1, 'big'),
                    int(0).to_bytes(1, 'big'),  # status: succeeded
                    int(0).to_bytes(1, 'big'),  # reserved
                    int(1).to_bytes(1, 'big'),  # address type: IPv4
                    addr.to_bytes(4, 'big'),
                    port.to_bytes(2, 'big')
                ])
            else:
                # only CONNECT is supported; 7 = command not supported
                reply = self.generate_failed_reply(address_type, 7)
        except Exception as e:
            # return connection refused error
            reply = self.generate_failed_reply(address_type, 5)
        connection.sendall(reply)
        # establish data exchange
        if reply[1] == 0 and cmd == 1:
            self.exchange_loop(connection, remote)
        connection.close()
Once verification is done and the reply has been sent, the handshake is complete and we are ready to start the exchange loop.
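The reply sent at the end of the handshake is a fixed 10-byte message when the bound address is IPv4. As a sketch, the same bytes can be produced with struct.pack instead of joining individual to_bytes() calls. build_reply and its default arguments are hypothetical.

```python
import socket
import struct

def build_reply(status, bind_host="0.0.0.0", bind_port=0):
    """Pack VER, REP, RSV, ATYP, BND.ADDR and BND.PORT in network byte order."""
    # "!" = network order without padding, B = 1 byte, 4s = 4 raw bytes, H = 2 bytes
    return struct.pack(
        "!BBBB4sH",
        5,                              # SOCKS version
        status,                         # 0 = succeeded, anything else = error code
        0,                              # reserved
        1,                              # address type: IPv4
        socket.inet_aton(bind_host),
        bind_port,
    )

success = build_reply(0, "127.0.0.1", 3000)
refused = build_reply(5)
print(success.hex(), len(success))
print(refused.hex(), len(refused))
```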
    def exchange_loop(self, client, remote):
        while True:
            # wait until client or remote is available for read
            r, w, e = select.select([client, remote], [], [])
            if client in r:
                data = client.recv(4096)
                # empty data means the client closed the connection
                if not data:
                    break
                # sendall keeps sending until every byte is written
                remote.sendall(data)
            if remote in r:
                data = remote.recv(4096)
                if not data:
                    break
                client.sendall(data)
        remote.close()
Using the select module, we wait until the client or remote socket is readable. If the client socket is readable, we read up to 4 KB from it and send that to the remote; if the remote socket is readable, we read up to 4 KB from it and send that to the client.
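The relay can be tried out locally without a real client or remote server. This sketch assumes two socket pairs standing in for the two TCP connections; relay() is a hypothetical variant of the exchange loop that stops when either side closes.

```python
import select
import socket
import threading

def relay(client, remote):
    """Copy bytes in both directions until either side closes."""
    while True:
        readable, _, _ = select.select([client, remote], [], [])
        for source in readable:
            data = source.recv(4096)
            if not data:          # empty read: the peer closed its end
                return
            target = remote if source is client else client
            target.sendall(data)

# app <-> proxy_client_side plays the client connection,
# proxy_remote_side <-> server plays the connection to the destination.
app, proxy_client_side = socket.socketpair()
proxy_remote_side, server = socket.socketpair()
worker = threading.Thread(target=relay, args=(proxy_client_side, proxy_remote_side))
worker.start()

app.sendall(b"hello")
to_server = server.recv(5)
server.sendall(b"world")
to_app = app.recv(5)

app.close()                # end of stream makes relay() return
worker.join(timeout=5)
for s in (proxy_client_side, proxy_remote_side, server):
    s.close()

print(to_server, to_app)
```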
The complete proxy.py:
import socket
import threading
import select

SOCKS_VERSION = 5

class Proxy:
    def __init__(self):
        self.username = "username"
        self.password = "password"
    def handle_client(self, connection):
        # greeting header
        # read and unpack 2 bytes from a client
        version, nmethods = connection.recv(2)
        # get available methods [0, 1, 2]
        methods = self.get_available_methods(nmethods, connection)
        # accept only USERNAME/PASSWORD auth
        if 2 not in set(methods):
            # close connection
            connection.close()
            return
        # send method selection message: version 5, method 2 (username/password)
        connection.sendall(bytes([SOCKS_VERSION, 2]))
        if not self.verify_credentials(connection):
            return
        # request (version=5)
        version, cmd, _, address_type = connection.recv(4)
        if address_type == 1:  # IPv4
            address = socket.inet_ntoa(connection.recv(4))
        elif address_type == 3:  # Domain name
            domain_length = connection.recv(1)[0]
            address = connection.recv(domain_length)
            address = socket.gethostbyname(address)
        else:
            # 8 = address type not supported (IPv6 is not handled here)
            connection.sendall(self.generate_failed_reply(address_type, 8))
            connection.close()
            return
        # convert 2 big-endian bytes to an unsigned port number
        port = int.from_bytes(connection.recv(2), 'big', signed=False)
        try:
            if cmd == 1:  # CONNECT
                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.connect((address, port))
                bind_address = remote.getsockname()
                print("* Connected to {} {}".format(address, port))
                addr = int.from_bytes(socket.inet_aton(bind_address[0]), 'big', signed=False)
                port = bind_address[1]
                reply = b''.join([
                    SOCKS_VERSION.to_bytes(1, 'big'),
                    int(0).to_bytes(1, 'big'),  # status: succeeded
                    int(0).to_bytes(1, 'big'),  # reserved
                    int(1).to_bytes(1, 'big'),  # address type: IPv4
                    addr.to_bytes(4, 'big'),
                    port.to_bytes(2, 'big')
                ])
            else:
                # only CONNECT is supported; 7 = command not supported
                reply = self.generate_failed_reply(address_type, 7)
        except Exception as e:
            # return connection refused error
            reply = self.generate_failed_reply(address_type, 5)
        connection.sendall(reply)
        # establish data exchange
        if reply[1] == 0 and cmd == 1:
            self.exchange_loop(connection, remote)
        connection.close()

    def exchange_loop(self, client, remote):
        while True:
            # wait until client or remote is available for read
            r, w, e = select.select([client, remote], [], [])
            if client in r:
                data = client.recv(4096)
                # empty data means the client closed the connection
                if not data:
                    break
                # sendall keeps sending until every byte is written
                remote.sendall(data)
            if remote in r:
                data = remote.recv(4096)
                if not data:
                    break
                client.sendall(data)
        remote.close()
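
    def generate_failed_reply(self, address_type, error_number):
        # address_type is kept for the callers but not echoed back: the zeroed
        # address below is 4 bytes long, so the reply is always marked as IPv4
        return b''.join([
            SOCKS_VERSION.to_bytes(1, 'big'),
            error_number.to_bytes(1, 'big'),
            int(0).to_bytes(1, 'big'),
            int(1).to_bytes(1, 'big'),
            int(0).to_bytes(4, 'big'),
            int(0).to_bytes(2, 'big')  # the port field is 2 bytes
        ])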

    def verify_credentials(self, connection):
        version = ord(connection.recv(1))  # should be 1
        username_len = ord(connection.recv(1))
        username = connection.recv(username_len).decode('utf-8')
        password_len = ord(connection.recv(1))
        password = connection.recv(password_len).decode('utf-8')
        if username == self.username and password == self.password:
            # success, status = 0
            response = bytes([version, 0])
            connection.sendall(response)
            return True
        # failure, status != 0
        response = bytes([version, 0xFF])
        connection.sendall(response)
        connection.close()
        return False

    def get_available_methods(self, nmethods, connection):
        methods = []
        for i in range(nmethods):
            methods.append(ord(connection.recv(1)))
        return methods

    def run(self, host, port):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind((host, port))
        s.listen()
        print("* Socks5 proxy server is running on {}:{}".format(host, port))
        while True:
            conn, addr = s.accept()
            print("* new connection from {}".format(addr))
            t = threading.Thread(target=self.handle_client, args=(conn,))
            t.start()

if __name__ == "__main__":
    proxy = Proxy()
    proxy.run("127.0.0.1", 3000)
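To try the server out, a client has to send the same three messages the server parses: the greeting, the username/password message, and the CONNECT request. The helpers below are a hedged sketch of that client side; every function name is hypothetical, and open_tunnel assumes the proxy answers with the 10-byte IPv4-style reply built in this article.

```python
import socket

def build_greeting():
    # VER=5, NMETHODS=1, METHODS=[2] (username/password only)
    return bytes([5, 1, 2])

def build_auth(username, password):
    user = username.encode("utf-8")
    secret = password.encode("utf-8")
    # VER=1, ULEN, UNAME, PLEN, PASSWD
    return bytes([1, len(user)]) + user + bytes([len(secret)]) + secret

def build_connect(host, port):
    name = host.encode("ascii")
    # VER=5, CMD=1 (CONNECT), RSV=0, ATYP=3 (domain), length-prefixed name, port
    return bytes([5, 1, 0, 3, len(name)]) + name + port.to_bytes(2, "big")

def recv_exact(sock, count):
    data = b""
    while len(data) < count:
        chunk = sock.recv(count - len(data))
        if not chunk:
            raise ConnectionError("proxy closed the connection")
        data += chunk
    return data

def open_tunnel(proxy_host, proxy_port, username, password, host, port):
    """Return a socket whose traffic is relayed to host:port by the proxy."""
    sock = socket.create_connection((proxy_host, proxy_port))
    try:
        sock.sendall(build_greeting())
        if recv_exact(sock, 2) != bytes([5, 2]):
            raise ConnectionError("proxy did not select username/password auth")
        sock.sendall(build_auth(username, password))
        if recv_exact(sock, 2)[1] != 0:
            raise ConnectionError("authentication failed")
        sock.sendall(build_connect(host, port))
        reply = recv_exact(sock, 10)   # assumes an IPv4-style 10-byte reply
        if reply[1] != 0:
            raise ConnectionError("proxy refused the request, code {}".format(reply[1]))
    except Exception:
        sock.close()
        raise
    return sock
```

With proxy.py running, a call such as open_tunnel("127.0.0.1", 3000, "username", "password", "example.com", 80) would be expected to return a socket on which an HTTP request can be written directly.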