如何使用Wireshark抓包分析socks5协议

如何使用Wireshark构造Socks5协议的pcap包

运行环境

服务器:Debian 10

客户端:Kali linux

安装PySocks

安装Python3-pip服务

apt-get update
apt-get install python3-pip

使用pip安装PySocks

后面的python脚本中,import的包中使用到了PySocks第三方库,这里需要事先安装好PySocks

pip3 install PySocks

开启wireshark抓包

开启wireshark抓包,然后运行以下内容

服务器端运行python脚本

设置好编码格式

这里需要在python的代码里设置好UTF-8编码,# coding=UTF-8

不然会出现报错

Non-ASCII character '\xe5' 

Python脚本代码

新建一个server.py的文件

# coding=UTF-8
import select
import socket
import struct
from socketserver import StreamRequestHandler, ThreadingTCPServer
SOCKS_VERSION = 5
class SocksProxy(StreamRequestHandler):
    def handle(self):
        print('Accepting connection from {}'.format(self.client_address))
        # 协商
        # 从客户端读取并解包两个字节的数据
        header = self.connection.recv(2)
        version, nmethods = struct.unpack("!BB", header)
        # 设置socks5协议,METHODS字段的数目大于0
        assert version == SOCKS_VERSION
        assert nmethods > 0
        # 接受支持的方法
        methods = self.get_available_methods(nmethods)
        # 无需认证
        if 0 not in set(methods):
            self.server.close_request(self.request)
            return
        # 发送协商响应数据包
        self.connection.sendall(struct.pack("!BB", SOCKS_VERSION, 0))
        # 请求
        version, cmd, _, address_type = struct.unpack("!BBBB", self.connection.recv(4))
        assert version == SOCKS_VERSION
        if address_type == 1:  # IPv4
            address = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == 3:  # Domain name
            domain_length = self.connection.recv(1)[0]
            address = self.connection.recv(domain_length)
            #address = socket.gethostbyname(address.decode("UTF-8"))  # 将域名转化为IP,这一行可以去掉
        elif address_type == 4: # IPv6
            addr_ip = self.connection.recv(16)
            address = socket.inet_ntop(socket.AF_INET6, addr_ip)
        else:
            self.server.close_request(self.request)
            return
        port = struct.unpack('!H', self.connection.recv(2))[0]
        # 响应,只支持CONNECT请求
        try:
            if cmd == 1:  # CONNECT
                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.connect((address, port))
                bind_address = remote.getsockname()
                print('Connected to {} {}'.format(address, port))
            else:
                self.server.close_request(self.request)
            addr = struct.unpack("!I", socket.inet_aton(bind_address[0]))[0]
            port = bind_address[1]
            #reply = struct.pack("!BBBBIH", SOCKS_VERSION, 0, 0, address_type, addr, port)
            # 注意:按照标准协议,返回的应该是对应的address_type,但是实际测试发现,当address_type=3,也就是说是域名类型时,会出现卡死情况,但是将address_type该为1,则不管是IP类型和域名类型都能正常运行
            reply = struct.pack("!BBBBIH", SOCKS_VERSION, 0, 0, 1, addr, port)
        except Exception as err:
            logging.error(err)
            # 响应拒绝连接的错误
            reply = self.generate_failed_reply(address_type, 5)
        self.connection.sendall(reply)
        # 建立连接成功,开始交换数据
        if reply[1] == 0 and cmd == 1:
            self.exchange_loop(self.connection, remote)
        self.server.close_request(self.request)
    def get_available_methods(self, n):
        methods = []
        for i in range(n):
            methods.append(ord(self.connection.recv(1)))
        return methods
    def generate_failed_reply(self, address_type, error_number):
        return struct.pack("!BBBBIH", SOCKS_VERSION, error_number, 0, address_type, 0, 0)
    def exchange_loop(self, client, remote):
        while True:
            # 等待数据
            r, w, e = select.select([client, remote], [], [])
            if client in r:
                data = client.recv(4096)
                if remote.send(data) <= 0:
                    break
            if remote in r:
                data = remote.recv(4096)
                if client.send(data) <= 0:
                    break
if __name__ == '__main__':
    # 使用socketserver库的多线程服务器ThreadingTCPServer启动代理
    with ThreadingTCPServer(('0.0.0.0', 9011), SocksProxy) as server:
        server.serve_forever()

使用Python3运行

python3 server.py

客户端连接

curl -v --socks5 192.168.47.129:9011 http://www.baidu.com
客户端界面截图
服务器端界面截图

Wireshark抓包结果

05表示socks5协议,00表示无需认证的认证方法,我们这里没设置用户名和密码

如果你需要设置必须认证用户名和密码的socks5协议,可以参考以下代码

import logging
import select
import socket
import struct
from socketserver import StreamRequestHandler, ThreadingTCPServer
logging.basicConfig(level=logging.DEBUG)
SOCKS_VERSION = 5
class SocksProxy(StreamRequestHandler):
   # 用户名为username
    username = 'username'
   # 密码为password
    password = 'password'
    def handle(self):
        logging.info('Accepting connection from %s:%s' % self.client_address)
        # 协商
        # 从客户端读取并解包两个字节的数据
        header = self.connection.recv(2)
        version, nmethods = struct.unpack("!BB", header)
        # 设置socks5协议,METHODS字段的数目大于0
        assert version == SOCKS_VERSION
        assert nmethods > 0
        # 接受支持的方法
        methods = self.get_available_methods(nmethods)
        # 检查是否支持用户名/密码认证方式,不支持则断开连接
        if 2 not in set(methods):
            self.server.close_request(self.request)
            return
        # 发送协商响应数据包
        self.connection.sendall(struct.pack("!BB", SOCKS_VERSION, 2))
        # 校验用户名和密码
        if not self.verify_credentials():
            return
        # 请求
        version, cmd, _, address_type = struct.unpack("!BBBB", self.connection.recv(4))
        assert version == SOCKS_VERSION
        if address_type == 1:  # IPv4
            address = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == 3:  # 域名
            domain_length = self.connection.recv(1)[0]
            address = self.connection.recv(domain_length)
        elif address_type == 4: # IPv6
            addr_ip = self.connection.recv(16)
            address = socket.inet_ntop(socket.AF_INET6, addr_ip)
        else:
            self.server.close_request(self.request)
            return
        port = struct.unpack('!H', self.connection.recv(2))[0]
        # 响应,只支持CONNECT请求
        try:
            if cmd == 1:  # CONNECT
                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.connect((address, port))
                bind_address = remote.getsockname()
                logging.info('Connected to %s %s' % (address, port))
            else:
                self.server.close_request(self.request)
            addr = struct.unpack("!I", socket.inet_aton(bind_address[0]))[0]
            port = bind_address[1]
            reply = struct.pack("!BBBBIH", SOCKS_VERSION, 0, 0, 1, addr, port)
        except Exception as err:
            logging.error(err)
            # 响应拒绝连接的错误
            reply = self.generate_failed_reply(address_type, 5)
        self.connection.sendall(reply)
        # 建立连接成功,开始交换数据
        if reply[1] == 0 and cmd == 1:
            self.exchange_loop(self.connection, remote)
        self.server.close_request(self.request)
    def get_available_methods(self, n):
        methods = []
        for i in range(n):
            methods.append(ord(self.connection.recv(1)))
        return methods
    def verify_credentials(self):
        """校验用户名和密码"""
        version = ord(self.connection.recv(1))
        assert version == 1
        username_len = ord(self.connection.recv(1))
        username = self.connection.recv(username_len).decode('utf-8')
        password_len = ord(self.connection.recv(1))
        password = self.connection.recv(password_len).decode('utf-8')
        if username == self.username and password == self.password:
            # 验证成功, status = 0
            response = struct.pack("!BB", version, 0)
            self.connection.sendall(response)
            return True
        # 验证失败, status != 0
        response = struct.pack("!BB", version, 0xFF)
        self.connection.sendall(response)
        self.server.close_request(self.request)
        return False
    def generate_failed_reply(self, address_type, error_number):
        return struct.pack("!BBBBIH", SOCKS_VERSION, error_number, 0, address_type, 0, 0)
    def exchange_loop(self, client, remote):
        while True:
            # 等待数据
            r, w, e = select.select([client, remote], [], [])
            if client in r:
                data = client.recv(4096)
                if remote.send(data) <= 0:
                    break
            if remote in r:
                data = remote.recv(4096)
                if client.send(data) <= 0:
                    break
if __name__ == '__main__':
    # 使用socketserver库的多线程服务器ThreadingTCPServer启动代理
    with ThreadingTCPServer(('0.0.0.0', 9011), SocksProxy) as server:
        server.serve_forever()

连接该代理的语句,192.168.47.129为搭建socks5代理服务器的ip地址

curl -v  --socks5 192.168.47.129:9011  -U username:password http://www.baidu.com

© 版权声明
THE END
喜欢就支持一下吧
点赞21
分享
评论 共3条

请登录后发表评论