如何使用Wireshark构造Socks5协议的pcap包
运行环境
服务器:Debian 10
客户端:Kali linux
安装PySocks
安装Python3-pip服务
apt-get update
apt-get install python3-pip
使用pip安装PySocks
后面的python脚本中,import的包中使用到了PySocks
第三方库,这里需要事先安装好PySocks
pip3 install PySocks
开启wireshark抓包
开启wireshark抓包,然后运行以下内容
服务器端运行python脚本
设置好编码格式
这里需要在python的代码里设置好UTF-8编码,# coding=UTF-8
不然会出现报错
Non-ASCII character '\xe5'
Python脚本代码
新建一个server.py
的文件
# coding=UTF-8
import select
import socket
import struct
from socketserver import StreamRequestHandler, ThreadingTCPServer
SOCKS_VERSION = 5
class SocksProxy(StreamRequestHandler):
def handle(self):
print('Accepting connection from {}'.format(self.client_address))
# 协商
# 从客户端读取并解包两个字节的数据
header = self.connection.recv(2)
version, nmethods = struct.unpack("!BB", header)
# 设置socks5协议,METHODS字段的数目大于0
assert version == SOCKS_VERSION
assert nmethods > 0
# 接受支持的方法
methods = self.get_available_methods(nmethods)
# 无需认证
if 0 not in set(methods):
self.server.close_request(self.request)
return
# 发送协商响应数据包
self.connection.sendall(struct.pack("!BB", SOCKS_VERSION, 0))
# 请求
version, cmd, _, address_type = struct.unpack("!BBBB", self.connection.recv(4))
assert version == SOCKS_VERSION
if address_type == 1: # IPv4
address = socket.inet_ntoa(self.connection.recv(4))
elif address_type == 3: # Domain name
domain_length = self.connection.recv(1)[0]
address = self.connection.recv(domain_length)
#address = socket.gethostbyname(address.decode("UTF-8")) # 将域名转化为IP,这一行可以去掉
elif address_type == 4: # IPv6
addr_ip = self.connection.recv(16)
address = socket.inet_ntop(socket.AF_INET6, addr_ip)
else:
self.server.close_request(self.request)
return
port = struct.unpack('!H', self.connection.recv(2))[0]
# 响应,只支持CONNECT请求
try:
if cmd == 1: # CONNECT
remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote.connect((address, port))
bind_address = remote.getsockname()
print('Connected to {} {}'.format(address, port))
else:
self.server.close_request(self.request)
addr = struct.unpack("!I", socket.inet_aton(bind_address[0]))[0]
port = bind_address[1]
#reply = struct.pack("!BBBBIH", SOCKS_VERSION, 0, 0, address_type, addr, port)
# 注意:按照标准协议,返回的应该是对应的address_type,但是实际测试发现,当address_type=3,也就是说是域名类型时,会出现卡死情况,但是将address_type该为1,则不管是IP类型和域名类型都能正常运行
reply = struct.pack("!BBBBIH", SOCKS_VERSION, 0, 0, 1, addr, port)
except Exception as err:
logging.error(err)
# 响应拒绝连接的错误
reply = self.generate_failed_reply(address_type, 5)
self.connection.sendall(reply)
# 建立连接成功,开始交换数据
if reply[1] == 0 and cmd == 1:
self.exchange_loop(self.connection, remote)
self.server.close_request(self.request)
def get_available_methods(self, n):
methods = []
for i in range(n):
methods.append(ord(self.connection.recv(1)))
return methods
def generate_failed_reply(self, address_type, error_number):
return struct.pack("!BBBBIH", SOCKS_VERSION, error_number, 0, address_type, 0, 0)
def exchange_loop(self, client, remote):
while True:
# 等待数据
r, w, e = select.select([client, remote], [], [])
if client in r:
data = client.recv(4096)
if remote.send(data) <= 0:
break
if remote in r:
data = remote.recv(4096)
if client.send(data) <= 0:
break
if __name__ == '__main__':
# 使用socketserver库的多线程服务器ThreadingTCPServer启动代理
with ThreadingTCPServer(('0.0.0.0', 9011), SocksProxy) as server:
server.serve_forever()
使用Python3运行
python3 server.py
客户端连接
curl -v --socks5 192.168.47.129:9011 http://www.baidu.com
![图片[1]-如何使用Wireshark抓包分析socks5协议-FancyPig's blog](https://static.iculture.cc/wp-content/uploads/2021/05/image-79-1024x649.png?x-oss-process=image/auto-orient,1/format,webp/watermark,image_cHVibGljL2xvZ28ucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLFBfMTA,x_10,y_10)
![图片[2]-如何使用Wireshark抓包分析socks5协议-FancyPig's blog](https://static.iculture.cc/wp-content/uploads/2021/05/image-80.png?x-oss-process=image/auto-orient,1/format,webp/watermark,image_cHVibGljL2xvZ28ucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLFBfMTA,x_10,y_10)
Wireshark抓包结果
![图片[3]-如何使用Wireshark抓包分析socks5协议-FancyPig's blog](https://static.iculture.cc/wp-content/uploads/2021/05/image-81-1024x385.png?x-oss-process=image/auto-orient,1/format,webp/watermark,image_cHVibGljL2xvZ28ucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLFBfMTA,x_10,y_10)
![图片[4]-如何使用Wireshark抓包分析socks5协议-FancyPig's blog](https://static.iculture.cc/wp-content/uploads/2021/05/image-82-1024x521.png?x-oss-process=image/auto-orient,1/format,webp/watermark,image_cHVibGljL2xvZ28ucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLFBfMTA,x_10,y_10)
05表示socks5协议,00表示无需认证的认证方法,我们这里没设置用户名和密码
如果你需要设置必须认证用户名和密码的socks5协议,可以参考以下代码
import logging
import select
import socket
import struct
from socketserver import StreamRequestHandler, ThreadingTCPServer
logging.basicConfig(level=logging.DEBUG)
SOCKS_VERSION = 5
class SocksProxy(StreamRequestHandler):
# 用户名为username
username = 'username'
# 密码为password
password = 'password'
def handle(self):
logging.info('Accepting connection from %s:%s' % self.client_address)
# 协商
# 从客户端读取并解包两个字节的数据
header = self.connection.recv(2)
version, nmethods = struct.unpack("!BB", header)
# 设置socks5协议,METHODS字段的数目大于0
assert version == SOCKS_VERSION
assert nmethods > 0
# 接受支持的方法
methods = self.get_available_methods(nmethods)
# 检查是否支持用户名/密码认证方式,不支持则断开连接
if 2 not in set(methods):
self.server.close_request(self.request)
return
# 发送协商响应数据包
self.connection.sendall(struct.pack("!BB", SOCKS_VERSION, 2))
# 校验用户名和密码
if not self.verify_credentials():
return
# 请求
version, cmd, _, address_type = struct.unpack("!BBBB", self.connection.recv(4))
assert version == SOCKS_VERSION
if address_type == 1: # IPv4
address = socket.inet_ntoa(self.connection.recv(4))
elif address_type == 3: # 域名
domain_length = self.connection.recv(1)[0]
address = self.connection.recv(domain_length)
elif address_type == 4: # IPv6
addr_ip = self.connection.recv(16)
address = socket.inet_ntop(socket.AF_INET6, addr_ip)
else:
self.server.close_request(self.request)
return
port = struct.unpack('!H', self.connection.recv(2))[0]
# 响应,只支持CONNECT请求
try:
if cmd == 1: # CONNECT
remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote.connect((address, port))
bind_address = remote.getsockname()
logging.info('Connected to %s %s' % (address, port))
else:
self.server.close_request(self.request)
addr = struct.unpack("!I", socket.inet_aton(bind_address[0]))[0]
port = bind_address[1]
reply = struct.pack("!BBBBIH", SOCKS_VERSION, 0, 0, 1, addr, port)
except Exception as err:
logging.error(err)
# 响应拒绝连接的错误
reply = self.generate_failed_reply(address_type, 5)
self.connection.sendall(reply)
# 建立连接成功,开始交换数据
if reply[1] == 0 and cmd == 1:
self.exchange_loop(self.connection, remote)
self.server.close_request(self.request)
def get_available_methods(self, n):
methods = []
for i in range(n):
methods.append(ord(self.connection.recv(1)))
return methods
def verify_credentials(self):
"""校验用户名和密码"""
version = ord(self.connection.recv(1))
assert version == 1
username_len = ord(self.connection.recv(1))
username = self.connection.recv(username_len).decode('utf-8')
password_len = ord(self.connection.recv(1))
password = self.connection.recv(password_len).decode('utf-8')
if username == self.username and password == self.password:
# 验证成功, status = 0
response = struct.pack("!BB", version, 0)
self.connection.sendall(response)
return True
# 验证失败, status != 0
response = struct.pack("!BB", version, 0xFF)
self.connection.sendall(response)
self.server.close_request(self.request)
return False
def generate_failed_reply(self, address_type, error_number):
return struct.pack("!BBBBIH", SOCKS_VERSION, error_number, 0, address_type, 0, 0)
def exchange_loop(self, client, remote):
while True:
# 等待数据
r, w, e = select.select([client, remote], [], [])
if client in r:
data = client.recv(4096)
if remote.send(data) <= 0:
break
if remote in r:
data = remote.recv(4096)
if client.send(data) <= 0:
break
if __name__ == '__main__':
# 使用socketserver库的多线程服务器ThreadingTCPServer启动代理
with ThreadingTCPServer(('0.0.0.0', 9011), SocksProxy) as server:
server.serve_forever()
连接该代理的语句,192.168.47.129
为搭建socks5代理服务器的ip地址
curl -v --socks5 192.168.47.129:9011 -U username:password http://www.baidu.com
© 版权声明
THE END
- 最新
- 最热
只看作者