Create a Socks5 Proxy Server Using Python Programming Language

Jeeva
Apr 29, 2024 • Python, Proxy

SOCKS is an Internet protocol that exchanges network packets between a client and server through a proxy server. SOCKS5 optionally provides authentication so only authorized users may access a server.

Introduction

In today's connected world, internet security and privacy are increasingly important concerns. One way to enhance security and privacy while using the internet is by using a proxy server. A proxy server acts as an intermediary between your computer and the internet, routing your traffic through a different server and masking your IP address.

SOCKS5 is one of the most popular proxy protocols. It is flexible, since it can carry any TCP traffic, and it supports optional authentication, although it does not encrypt traffic by itself. In this article, we'll walk through the process of building a SOCKS5 proxy server using Python. With Python's powerful networking libraries and easy-to-learn syntax, building a proxy server is easier than you might think.

By the end of this article, you'll have a working SOCKS5 proxy server that you can use to enhance your online security and privacy. So, let's get started!

Setting Up Project

If Python 3 is not already installed, install it from Python.org. Then create a file called proxy.py and open it in your favourite code editor; I am using VS Code. The following commands work in a Linux terminal or Windows PowerShell.

$ cd Desktop
$ mkdir "Socks5 Proxy Server"
$ cd "Socks5 Proxy Server"
$ code proxy.py

Python Modules

Import the following modules.

import socket
import threading
import select

socket module:

The 'socket' module provides access to the operating system's socket interface, which lets server and client machines communicate over the network through socket endpoints. The 'socket' API supports both connection-oriented (TCP) and connectionless (UDP) network protocols.

threading module:

Python's threading module lets different parts of your program run concurrently and can simplify your design. Here we use it to handle each client connection in its own thread, so one slow client does not block the others.

select module:

Python's select() function is a direct interface to the underlying operating system implementation. It monitors sockets, open files, and pipes (anything with a fileno() method that returns a valid file descriptor) until they become readable or writable, or a communication error occurs.
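
As a small illustration of that behaviour, the sketch below asks select() which of two connected sockets is readable, before and after data is sent. It assumes socket.socketpair() as a convenient local stand-in for real connections (it is not used elsewhere in this article), and the timeout values are arbitrary choices for the demo.

```python
import select
import socket

# Two connected sockets; socketpair() is used only to keep the demo local.
a, b = socket.socketpair()

# Nothing has been sent yet, so neither socket is readable (timeout 0 = poll).
readable, _, _ = select.select([a, b], [], [], 0)
idle_count = len(readable)

# After a writes, b has data waiting and select() reports it as readable.
a.sendall(b"ping")
readable, _, _ = select.select([a, b], [], [], 1.0)
ready_is_b = readable == [b]
data = b.recv(4096)

a.close()
b.close()
```

The proxy relies on exactly this mechanism later on: it waits until either the client or the remote socket has data, then forwards it.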

Creating a Proxy Class

Then create a Proxy class that will be the entry point of our SOCKS5 proxy server. The username and password attributes hold the credentials that clients must present to use the proxy.

SOCKS_VERSION = 5

class Proxy:
    def __init__(self):
        self.username = "username"
        self.password = "password"

    def run(self, host, port):
        pass


if __name__ == "__main__":
    proxy = Proxy()
    proxy.run("127.0.0.1", 3000)

Creating Server Socket & Binding

Inside the run method we create and bind the server socket. Once we start listening for incoming connections, we call .accept() to accept each one; it returns a tuple containing the client socket and the client's address.

def run(self, host, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen()

    while True:
        conn, addr = s.accept()
        print("* new connection from {}".format(addr))
        t = threading.Thread(target=self.handle_client, args=(conn,))
        t.start()
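
To see what .accept() returns without starting the whole proxy, here is a minimal sketch of a listener that accepts a single local connection. The make_listener helper is hypothetical (not part of the article's Proxy class), and setting SO_REUSEADDR and binding to port 0 are assumptions made to keep local testing convenient.

```python
import socket
import threading


def make_listener(host: str, port: int) -> socket.socket:
    """Create a listening TCP socket; SO_REUSEADDR eases quick restarts."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((host, port))
    s.listen()
    return s


# Port 0 asks the OS for any free port, which is handy for a local test.
listener = make_listener("127.0.0.1", 0)
host, port = listener.getsockname()

accepted = []


def accept_one():
    conn, addr = listener.accept()  # returns (client socket, client address)
    accepted.append(addr)
    conn.close()


t = threading.Thread(target=accept_one)
t.start()

client = socket.create_connection((host, port))
client_addr = client.getsockname()
t.join(timeout=5)
client.close()
listener.close()
```

The address reported by accept() is the client's own (IP, port) pair, which is what the proxy prints for each new connection.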

Handle Client

def handle_client(self, connection):
    # greeting header
    # read and unpack 2 bytes from client
    version, nmethods = connection.recv(2)

We read two bytes from the client: the protocol version and the number of authentication methods the client offers. The version should be 5 for SOCKS5. We use the method count to read the list of offered methods and check whether the client supports username/password authentication.
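
To make the greeting layout concrete, the sketch below parses a complete greeting held in a bytes object. parse_greeting is a hypothetical helper written for illustration; the proxy itself reads the same fields directly from the socket.

```python
def parse_greeting(data: bytes):
    """Split a SOCKS5 greeting into (version, list of offered method codes)."""
    if len(data) < 2:
        raise ValueError("greeting too short")
    version, nmethods = data[0], data[1]
    methods = list(data[2:2 + nmethods])
    if len(methods) != nmethods:
        raise ValueError("greeting truncated")
    return version, methods


# A client offering "no authentication" (0) and "username/password" (2).
greeting = bytes([5, 2, 0, 2])
version, methods = parse_greeting(greeting)
print(version, methods)
```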

# get available methods [0, 1, 2]
methods = self.get_available_methods(nmethods, connection)

# accept only USERNAME/PASSWORD auth
if 2 not in set(methods):
    # close connection
    connection.close()
    return

get_available_methods is implemented as follows.

def get_available_methods(self, nmethods, connection):
    methods = []
    for _ in range(nmethods):
        methods.append(ord(connection.recv(1)))
    return methods

This reads every method the client offers, one byte per method. We use the list to check whether the client supports username/password authentication (method 2). If it doesn't, we close the connection.
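
One caveat worth keeping in mind: TCP is a byte stream, so recv(n) may return fewer than n bytes if the rest has not arrived yet. The sketch below shows a hypothetical recv_exact helper (not part of the article's code) that keeps reading until the requested number of bytes is available; socket.socketpair() is used only to simulate a greeting that arrives in two pieces.

```python
import socket


def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes, or raise ConnectionError if the peer closes early."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("socket closed before {} bytes arrived".format(n))
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


# Demo with a local socket pair: the greeting arrives in two separate sends.
client, server = socket.socketpair()
client.sendall(bytes([5]))
client.sendall(bytes([2, 0, 2]))
version, nmethods = recv_exact(server, 2)
methods = list(recv_exact(server, nmethods))
client.close()
server.close()
```

Swapping a helper like this in for the bare recv calls would make the handshake more tolerant of slow or fragmented clients, at the cost of a few extra lines.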

# send welcome message
connection.sendall(bytes([SOCKS_VERSION, 2]))

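If everything is in order, we send the client a method-selection reply (the "welcome message"): the SOCKS version followed by the chosen method, 2 for username/password.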

if not self.verify_credentials(connection):
    return

verify_credentials is implemented as follows.

def verify_credentials(self, connection):
    version = ord(connection.recv(1)) # should be 1
    username_len = ord(connection.recv(1))
    username = connection.recv(username_len).decode('utf-8')
    password_len = ord(connection.recv(1))
    password = connection.recv(password_len).decode('utf-8')

    if username == self.username and password == self.password:
        # success, status = 0
        response = bytes([version, 0])
        connection.sendall(response)
        return True

    # failure, status != 0
    response = bytes([version, 0xFF])
    connection.sendall(response)
    connection.close()
    return False

Now we check whether the username and password sent by the client are correct. If they are, we send a success response (status 0); otherwise we send a failure response and close the connection.
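
The message that verify_credentials reads has a simple layout: a version byte (1), the username length, the username, the password length, and the password. The sketch below builds and parses that layout in memory. Both helper functions are hypothetical names introduced for illustration; they show what a client would send rather than how any particular client library does it.

```python
def build_auth_request(username: str, password: str) -> bytes:
    """Encode what a client sends: VER=1, ULEN, UNAME, PLEN, PASSWD."""
    user = username.encode("utf-8")
    pwd = password.encode("utf-8")
    if not (1 <= len(user) <= 255 and 1 <= len(pwd) <= 255):
        raise ValueError("username and password must be 1-255 bytes long")
    return bytes([1, len(user)]) + user + bytes([len(pwd)]) + pwd


def parse_auth_request(data: bytes):
    """Decode the same layout back into (version, username, password)."""
    version = data[0]
    ulen = data[1]
    username = data[2:2 + ulen].decode("utf-8")
    plen = data[2 + ulen]
    password = data[3 + ulen:3 + ulen + plen].decode("utf-8")
    return version, username, password


request = build_auth_request("username", "password")
print(request)
print(parse_auth_request(request))
```

Because each length fits in a single byte, usernames and passwords are limited to 255 bytes each.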

# request (version=5)
version, cmd, _, address_type = connection.recv(4)

if address_type == 1: # IPv4
    address = socket.inet_ntoa(connection.recv(4))
elif address_type == 3: # Domain name
    domain_length = connection.recv(1)[0]
    address = connection.recv(domain_length)
    address = socket.gethostbyname(address)

# read the 2-byte port number (big-endian, i.e. network byte order)
port = int.from_bytes(connection.recv(2), 'big', signed=False)

Then we read 4 bytes from the client. The first byte contains the SOCKS version and the second contains the command, such as CONNECT. The third byte is reserved, so we ignore it, and the fourth byte contains the address type, such as IPv4 or domain name. We use this information to read the address and port accordingly.
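
The sketch below parses a complete request from a bytes object to show how the fields line up for the two address types handled here. parse_request is a hypothetical helper for illustration; unlike the proxy code, it returns the domain name as text instead of resolving it, so the example runs without any DNS lookup.

```python
import socket


def parse_request(data: bytes):
    """Return (cmd, address_type, address, port) from a SOCKS5 request."""
    version, cmd, _reserved, address_type = data[:4]
    if version != 5:
        raise ValueError("not a SOCKS5 request")
    if address_type == 1:  # IPv4: 4 raw bytes
        address = socket.inet_ntoa(data[4:8])
        offset = 8
    elif address_type == 3:  # domain: 1 length byte, then the name
        length = data[4]
        address = data[5:5 + length].decode("ascii")
        offset = 5 + length
    else:
        raise ValueError("unsupported address type {}".format(address_type))
    port = int.from_bytes(data[offset:offset + 2], "big")
    return cmd, address_type, address, port


# CONNECT to 127.0.0.1:80 and CONNECT to example.com:443
ipv4_request = bytes([5, 1, 0, 1, 127, 0, 0, 1, 0, 80])
domain_request = bytes([5, 1, 0, 3, 11]) + b"example.com" + (443).to_bytes(2, "big")
print(parse_request(ipv4_request))
print(parse_request(domain_request))
```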

try:
    if cmd == 1: # CONNECT
        remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        remote.connect((address, port))
        bind_address = remote.getsockname()
        print("* Connected to {} {}".format(address, port))
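    else:
        # unsupported command: raise so the except block sends a failure reply
        raise ValueError("unsupported command {}".format(cmd))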

    addr = int.from_bytes(socket.inet_aton(bind_address[0]), 'big', signed=False)
    port = bind_address[1]
    reply = b''.join([
        SOCKS_VERSION.to_bytes(1, 'big'),
        int(0).to_bytes(1, 'big'),
        int(0).to_bytes(1, 'big'),
        int(1).to_bytes(1, 'big'),
        addr.to_bytes(4, 'big'),
        port.to_bytes(2, 'big')
    ])
except Exception as e:
    # return connection refused error
    reply = self.generate_failed_reply(address_type, 5)

connection.sendall(reply)

# establish data exchange
if reply[1] == 0 and cmd == 1:
    self.exchange_loop(connection, remote)

connection.close()

Once all the verification is done and the reply has been sent, the handshake is complete and we are ready to start the exchange loop.
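
The reply sent to the client is always ten bytes when an IPv4 bind address is used: version, status, a reserved zero byte, the address type, four address bytes, and two port bytes. The sketch below builds such a reply with a hypothetical build_reply helper; the default of 0.0.0.0 and port 0 for failures is an assumption chosen for the example.

```python
import socket


def build_reply(status: int, bind_host: str = "0.0.0.0", bind_port: int = 0) -> bytes:
    """Encode VER, REP, RSV, ATYP=1 (IPv4), BND.ADDR (4 bytes), BND.PORT (2 bytes)."""
    return b"".join([
        bytes([5, status, 0, 1]),
        socket.inet_aton(bind_host),
        bind_port.to_bytes(2, "big"),
    ])


success = build_reply(0, "127.0.0.1", 3000)  # status 0: request granted
refused = build_reply(5)                      # status 5: connection refused
print(success.hex())
print(refused.hex())
```

A status of 0 tells the client it can start sending application data; any other value means the request failed.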

The Exchange Loop

def exchange_loop(self, client, remote):
    while True:
        # wait until client or remote is available for read
        r, w, e = select.select([client, remote], [], [])

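        if client in r:
            data = client.recv(4096)
            # empty data means the client closed the connection
            if not data:
                break
            remote.sendall(data)

        if remote in r:
            data = remote.recv(4096)
            # empty data means the remote closed the connection
            if not data:
                break
            client.sendall(data)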

Using the select module, we wait until the client or the remote socket is readable. If the client socket is readable, we read up to 4 KB from it and send that to the remote; if the remote socket is readable, we read up to 4 KB from it and send that to the client. The loop ends when either side closes its connection.
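
The loop can be tried without a real network by standing in local socket pairs for the client and remote connections. In the sketch below, relay is a hypothetical standalone variant of exchange_loop (it uses sendall and treats an empty read as a closed connection), and socket.socketpair() is an assumption made only for the demo.

```python
import select
import socket
import threading


def relay(client: socket.socket, remote: socket.socket) -> None:
    """Copy bytes in both directions until either side closes."""
    while True:
        readable, _, _ = select.select([client, remote], [], [])
        for source in readable:
            target = remote if source is client else client
            data = source.recv(4096)
            if not data:  # empty read means the peer closed
                return
            target.sendall(data)


# Local stand-ins: app <-> proxy_client_side, proxy_remote_side <-> server
app, proxy_client_side = socket.socketpair()
proxy_remote_side, server = socket.socketpair()

worker = threading.Thread(target=relay, args=(proxy_client_side, proxy_remote_side))
worker.start()

app.sendall(b"hello")
request = server.recv(4096)   # arrives through the relay
server.sendall(b"world")
response = app.recv(4096)     # the answer travels back the same way

app.close()                   # closing one side ends the relay loop
worker.join(timeout=5)
for s in (proxy_client_side, proxy_remote_side, server):
    s.close()
```

Neither endpoint talks to the other directly; every byte passes through the relay, which is the role the proxy plays between a real client and the destination server.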

Full code

import socket
import threading
import select



SOCKS_VERSION = 5


class Proxy:
    def __init__(self):
        self.username = "username"
        self.password = "password"

    def handle_client(self, connection):
        # greeting header
        # read and unpack 2 bytes from a client
        version, nmethods = connection.recv(2)

        # get available methods [0, 1, 2]
        methods = self.get_available_methods(nmethods, connection)

        # accept only USERNAME/PASSWORD auth
        if 2 not in set(methods):
            # close connection
            connection.close()
            return

        # send welcome message
        connection.sendall(bytes([SOCKS_VERSION, 2]))

        if not self.verify_credentials(connection):
            return

        # request (version=5)
        version, cmd, _, address_type = connection.recv(4)

        if address_type == 1:  # IPv4
            address = socket.inet_ntoa(connection.recv(4))
        elif address_type == 3:  # Domain name
            domain_length = connection.recv(1)[0]
            address = connection.recv(domain_length)
            address = socket.gethostbyname(address)

        # read the 2-byte port number (big-endian, i.e. network byte order)
        port = int.from_bytes(connection.recv(2), 'big', signed=False)

        try:
            if cmd == 1:  # CONNECT
                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.connect((address, port))
                bind_address = remote.getsockname()
                print("* Connected to {} {}".format(address, port))
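            else:
                # unsupported command: raise so the except block sends a failure reply
                raise ValueError("unsupported command {}".format(cmd))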

            addr = int.from_bytes(socket.inet_aton(bind_address[0]), 'big', signed=False)
            port = bind_address[1]

            reply = b''.join([
                SOCKS_VERSION.to_bytes(1, 'big'),
                int(0).to_bytes(1, 'big'),
                int(0).to_bytes(1, 'big'),
                int(1).to_bytes(1, 'big'),
                addr.to_bytes(4, 'big'),
                port.to_bytes(2, 'big')
            ])
        except Exception as e:
            # return connection refused error
            reply = self.generate_failed_reply(address_type, 5)

        connection.sendall(reply)

        # establish data exchange
        if reply[1] == 0 and cmd == 1:
            self.exchange_loop(connection, remote)

        connection.close()


    def exchange_loop(self, client, remote):
        while True:
            # wait until client or remote is available for read
            r, w, e = select.select([client, remote], [], [])

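            if client in r:
                data = client.recv(4096)
                # empty data means the client closed the connection
                if not data:
                    break
                remote.sendall(data)

            if remote in r:
                data = remote.recv(4096)
                # empty data means the remote closed the connection
                if not data:
                    break
                client.sendall(data)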


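    def generate_failed_reply(self, address_type, error_number):
        # address_type is kept for call compatibility; the reply always uses
        # ATYP=1 (IPv4) so that it matches the 4-byte zero address below
        return b''.join([
            SOCKS_VERSION.to_bytes(1, 'big'),
            error_number.to_bytes(1, 'big'),
            int(0).to_bytes(1, 'big'),
            int(1).to_bytes(1, 'big'),
            int(0).to_bytes(4, 'big'),  # BND.ADDR: 0.0.0.0
            int(0).to_bytes(2, 'big')   # BND.PORT: 2 bytes
        ])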


    def verify_credentials(self, connection):
        version = ord(connection.recv(1)) # should be 1

        username_len = ord(connection.recv(1))
        username = connection.recv(username_len).decode('utf-8')

        password_len = ord(connection.recv(1))
        password = connection.recv(password_len).decode('utf-8')

        if username == self.username and password == self.password:
            # success, status = 0
            response = bytes([version, 0])
            connection.sendall(response)
            return True

        # failure, status != 0
        response = bytes([version, 0xFF])
        connection.sendall(response)
        connection.close()
        return False


    def get_available_methods(self, nmethods, connection):
        methods = []
        for i in range(nmethods):
            methods.append(ord(connection.recv(1)))
        return methods

    def run(self, host, port):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind((host, port))
        s.listen()

        print("* Socks5 proxy server is running on {}:{}".format(host, port))

        while True:
            conn, addr = s.accept()
            print("* new connection from {}".format(addr))
            t = threading.Thread(target=self.handle_client, args=(conn,))
            t.start()


if __name__ == "__main__":
    proxy = Proxy()
    proxy.run("127.0.0.1", 3000)