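[Shared by a group member] Logging in to Qzone and scraping albums with Python

Shared by group member Pluto

Scraping Qzone albums with Python

Project background

A few days ago I was browsing my QQ albums; some of the photos are quite old and bring back memories. I wanted to export the albums in one click, only to find that QQ albums offer no such feature. Having to write the export code myself was a headache, but I decided to just get on with it.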

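Page analysis

The first step is to find the album-list endpoint, which is easy to locate: https://user.qzone.qq.com/proxy/domain/photo.qzone.qq.com/fcgi-bin/fcg_list_album_v3. After stripping the unnecessary query parameters, what remains is g_tk, hostUin, uin, inCharset, outCharset, pageNumModeSort and pageNumModeClass. g_tk is the value produced by the g_tk hashing algorithm. hostUin and uin identify the logged-in QQ account and the QQ account being visited; since we are downloading our own albums, both are our own QQ number. inCharset and outCharset are the input and output encodings and are fixed. pageNumModeSort and pageNumModeClass control how much album information is returned per page and can keep their default values.

Next comes the endpoint that lists the photos in an album, which is also easy to find: https://h5.qzone.qq.com/proxy/domain/photo.qzone.qq.com/fcgi-bin/cgi_list_photo. It takes a number of parameters too; after removing the unnecessary ones, what remains is g_tk, hostUin, uin, inCharset, outCharset, topicId, pageNum and page. The first five are the same as for the previous endpoint, topicId is the album id, and pageNum and page are the number of photos per page and the page index. Setting pageNum to the number of photos in the album and page to 0 (or omitting page) returns the data for every photo in the album.

The album data contains the information for each photo, including its URL, and the photo can be downloaded through that URL. With that, we can start writing the code.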
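
As a small illustration of the last point, the sketch below pulls the album name and photo URLs out of a parsed response. The sample dict is hypothetical and trimmed down to the fields this post uses (data.topic.name and data.photoList[].url); the helper name photo_urls and the handling of an empty album are assumptions, not part of the original code.

```python
# Hypothetical, trimmed-down response shaped like the fields used in this post.
sample = {
    "data": {
        "topic": {"name": "Holiday"},
        "photoList": [
            {"url": "https://example.com/a.jpg"},
            {"url": "https://example.com/b.jpg"},
        ],
    }
}


def photo_urls(album_data):
    """Return (album name, list of photo URLs) from a parsed photo-list response."""
    data = album_data["data"]
    # Assumption: photoList may be missing or null for an empty album, so fall back to [].
    photos = data.get("photoList") or []
    return data["topic"]["name"], [p["url"] for p in photos]


name, urls = photo_urls(sample)
print(name, len(urls))
```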

Scraping approach

Based on the analysis above, we first define a class QQZonePictures and initialize its parameters and attributes.

     def __init__(self, cookies=None, gtk=None, uin=None):
         self.cookies = cookies
         self.gtk = gtk
         self.uin = uin
         self.root = self.Mkdir_path(".//images//")
         self.url_list = 'https://user.qzone.qq.com/proxy/domain/photo.qzone.qq.com/fcgi-bin/fcg_list_album_v3'
         self.url_photo = 'https://h5.qzone.qq.com/proxy/domain/photo.qzone.qq.com/fcgi-bin/cgi_list_photo'
         self.header = {
             'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;'
                       'q=0.8,application/signed-exchange;v=b3;q=0.9',
             'accept-encoding': 'gzip, deflate, br',
             'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
             'cache-control': 'no-cache',
             'cookie': self.cookies,
             'pragma': 'no-cache',
             'sec-fetch-dest': 'document',
             'sec-fetch-mode': 'navigate',
             'sec-fetch-site': 'none',
             'sec-fetch-user': '?1',
             'upgrade-insecure-requests': '1',
             'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                           'AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/80.0.3987.116 Safari/537.36',
         }

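Next we need two methods, one per endpoint. The raw responses have to be cleaned before they can be used, so we also add a data-cleaning method that cleans the fetched data and returns it as a dict.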

     def Clean_data(self, string):
         # Both endpoints return JSONP, e.g. `_Callback({...});`.
         # Keep only the JSON between the first '(' and the last ')'.
         start, end = string.find('(') + 1, string.rfind(')')
         # strict=False tolerates raw tabs/newlines inside string values
         data = json.loads(string[start:end], strict=False)
         return data

     def Get_photo_lists(self):
         param = {
             'g_tk': self.gtk,
             'hostUin': self.uin,
             'uin': self.uin,
             'inCharset': 'utf-8',
             'outCharset': 'utf-8',
             'pageNumModeSort': '40',
             'pageNumModeClass': '15'
         }
         res = requests.get(self.url_list, headers=self.header, params=param)
         Photo_lists_data = self.Clean_data(res.text)
         return Photo_lists_data

     def Get_photos(self, list_id, num):
         param = {
             'g_tk': self.gtk,
             'hostUin': self.uin,
             'uin': self.uin,
             'inCharset': 'utf-8',
             'outCharset': 'utf-8',
             'topicId': list_id,
             'pageNum': num,
         }
         res = requests.get(self.url_photo, headers=self.header, params=param)
         Photos_data = self.Clean_data(res.text)
         return Photos_data
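
A cleaning step is needed because both endpoints answer with JSONP rather than plain JSON. The sketch below shows that step in isolation, locating the payload by its parentheses so that spaces inside album names survive. The function name strip_jsonp and the sample body are illustrative assumptions; a real response carries many more fields.

```python
import json


def strip_jsonp(text):
    """Parse a JSONP body such as `_Callback({...});` into a Python object."""
    start = text.find("(")
    end = text.rfind(")")
    if start == -1 or end <= start:
        raise ValueError("not a JSONP response")
    # strict=False tolerates raw control characters (tabs, newlines) inside strings.
    return json.loads(text[start + 1:end], strict=False)


# Hypothetical response body, reduced to the keys used in this post.
body = '_Callback({"code": 0, "data": {"albumListModeSort": [{"id": "abc", "name": "My trip", "total": 3}]}});'
parsed = strip_jsonp(body)
print(parsed["data"]["albumListModeSort"][0]["name"])
```

If the server ever returns an error page instead of JSONP, the ValueError gives a clearer failure than a JSON decoding error deep inside the scraper.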

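At this point we can fetch every album in the Qzone account and the photo information inside each one. What remains is downloading the photos, for which we need one method that creates the target folder and one that downloads the images.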

     def Mkdir_path(self, path):
         # makedirs also creates missing parent folders and ignores existing ones
         os.makedirs(path, exist_ok=True)
         return path

     def Downloads(self, data):
         file_name, count = data["data"]["topic"]['name'], 0
         root = self.Mkdir_path(self.root + file_name + '//')
         print(f"Album {file_name}: download started...")
         for photo in data["data"]["photoList"]:
             path = root + f"image_{count}.jpg"
             read = requests.get(photo['url'])
             with open(path, 'wb') as file:
                 file.write(read.content)
             print(f"image_{count}.jpg downloaded")
             count = count + 1
         print(f"Album {file_name}: download finished...")
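
The download method uses the album name directly as a folder name. Album names are free text, so they may contain characters that a file system rejects; that is an assumption about the data, not something the original post reports. A minimal sketch of a sanitizing helper (the name safe_dir_name and the fallback value are made up for illustration):

```python
import re


def safe_dir_name(name, fallback="album"):
    """Replace characters that Windows or POSIX paths reject, and trim the result."""
    cleaned = re.sub(r'[\\/:*?"<>|\x00-\x1f]', "_", name).strip(" .")
    # An album named only with dots or spaces would otherwise yield an empty folder name.
    return cleaned or fallback


print(safe_dir_name('2015/summer: "beach"?'))
```

In Downloads, the folder could then be built from safe_dir_name(file_name) instead of file_name.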

Finally, we add a main method that ties the methods and attributes of the download class together.

     def main(self):
         photos_lists = self.Get_photo_lists()
         for photos_list in photos_lists["data"]["albumListModeSort"]:
             list_id = photos_list['id']
             num = photos_list['total']
             Photos_data = self.Get_photos(list_id, num)
             self.Downloads(Photos_data)

Complete code

 import requests
 import random
 import time
 import json
 import util
 import os


 class QQZonePictures:
     def __init__(self, cookies=None, gtk=None, uin=None):
         self.cookies = cookies
         self.gtk = gtk
         self.uin = uin
         self.root = self.Mkdir_path(".//images//")
         self.url_list = 'https://user.qzone.qq.com/proxy/domain/photo.qzone.qq.com/fcgi-bin/fcg_list_album_v3'
         self.url_photo = 'https://h5.qzone.qq.com/proxy/domain/photo.qzone.qq.com/fcgi-bin/cgi_list_photo'
         self.header = {
             'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;'
                       'q=0.8,application/signed-exchange;v=b3;q=0.9',
             'accept-encoding': 'gzip, deflate, br',
             'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
             'cache-control': 'no-cache',
             'cookie': self.cookies,
             'pragma': 'no-cache',
             'sec-fetch-dest': 'document',
             'sec-fetch-mode': 'navigate',
             'sec-fetch-site': 'none',
             'sec-fetch-user': '?1',
             'upgrade-insecure-requests': '1',
             'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                           'AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/80.0.3987.116 Safari/537.36',
         }

     def Clean_data(self, string):
         # Both endpoints return JSONP, e.g. `_Callback({...});`.
         # Keep only the JSON between the first '(' and the last ')'.
         start, end = string.find('(') + 1, string.rfind(')')
         # strict=False tolerates raw tabs/newlines inside string values
         data = json.loads(string[start:end], strict=False)
         return data

     def Mkdir_path(self, path):
         # makedirs also creates missing parent folders and ignores existing ones
         os.makedirs(path, exist_ok=True)
         return path

     def Get_photo_lists(self):
         param = {
             'g_tk': self.gtk,
             'hostUin': self.uin,
             'uin': self.uin,
             'inCharset': 'utf-8',
             'outCharset': 'utf-8',
             'pageNumModeSort': '40',
             'pageNumModeClass': '15'
         }
         res = requests.get(self.url_list, headers=self.header, params=param)
         Photo_lists_data = self.Clean_data(res.text)
         return Photo_lists_data

     def Get_photos(self, list_id, num):
         param = {
             'g_tk': self.gtk,
             'hostUin': self.uin,
             'uin': self.uin,
             'inCharset': 'utf-8',
             'outCharset': 'utf-8',
             'topicId': list_id,
             'pageNum': num,
         }
         res = requests.get(self.url_photo, headers=self.header, params=param)
         Photos_data = self.Clean_data(res.text)
         return Photos_data

     def Downloads(self, data):
         file_name, count = data["data"]["topic"]['name'], 0
         root = self.Mkdir_path(self.root + file_name + '//')
         print(f"Album {file_name}: download started...")
         for photo in data["data"]["photoList"]:
             path = root + f"image_{count}.jpg"
             read = requests.get(photo['url'])
             with open(path, 'wb') as file:
                 file.write(read.content)
             print(f"image_{count}.jpg downloaded")
             count = count + 1
         print(f"Album {file_name}: download finished...")

     def main(self):
         photos_lists = self.Get_photo_lists()
         for photos_list in photos_lists["data"]["albumListModeSort"]:
             list_id = photos_list['id']
             num = photos_list['total']
             Photos_data = self.Get_photos(list_id, num)
             self.Downloads(Photos_data)


 if __name__ == '__main__':
     # QQZone is the QR-code login class listed under Solution ③ below
     Login = QQZone()
     cookies, gtk, uin = Login.login()
     Spider = QQZonePictures(cookies=cookies, gtk=gtk, uin=uin)
     Spider.main()

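Simulating the Qzone login

The scraping approach should be clear by now, but the three parameters g_tk, cookies and uin passed in at the end of the complete code probably still raise questions. Obtaining them requires simulating a login, and simulating the Qzone login is fairly involved, so I offer three solutions; their implementations will be analyzed in detail in later posts.

Solution ①

We already saw g_tk, cookies and uin while looking for the endpoints, so the simplest option is to log in to Qzone manually, copy these values from the browser, and enter them by hand; the program above can then be run directly.

Solution ②

Use selenium to mimic what a person does: click "账号密码登录" (account/password login), locate the account and password inputs and fill them in, then locate the login button and click it. Note that the login form lives inside a frame, visible in the page source as "login_frame", so the driver must switch to that frame first.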
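
For the manual route, g_tk does not have to be copied separately: it can be recomputed from the p_skey cookie with the same hash the login classes in this post use. The sketch below assumes the cookie string was copied from the browser's request headers; the helper names, the placeholder cookie values, and the assumption that the uin cookie is the QQ number prefixed with "o" and zero padding are illustrative only.

```python
def parse_cookie_header(cookie_header):
    """Turn 'a=1; b=2' into {'a': '1', 'b': '2'}."""
    pairs = (item.split("=", 1) for item in cookie_header.split(";") if "=" in item)
    return {key.strip(): value.strip() for key, value in pairs}


def compute_g_tk(p_skey):
    """Hash p_skey the same way the login classes in this post do."""
    h = 5381
    for ch in p_skey:
        h += (h << 5) + ord(ch)
    return h & 0x7FFFFFFF


# Placeholder cookie string; paste the real one copied from the browser.
cookie_header = "uin=o0123456789; p_skey=abc"
cookies = parse_cookie_header(cookie_header)
g_tk = compute_g_tk(cookies["p_skey"])
# Assumption: the uin cookie is the QQ number prefixed with 'o' and zero padding.
uin = cookies["uin"].lstrip("o0")
print(g_tk, uin)
# The values could then be passed on, e.g.:
# QQZonePictures(cookies=cookie_header, gtk=g_tk, uin=uin).main()
```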

from selenium import webdriver
from selenium.webdriver.common.by import By
import requests
import copy
import time
import re


class QQZone:
    def __init__(self, username=None, password=None):
        self.url_login = 'https://i.qq.com/'
        self.username = username
        self.password = password
        self.qzonetoken = None
        self.cookies = None
        self.g_tk = None
        self.headers = {
            'host': 'user.qzone.qq.com',
            'accept-encoding': 'gzip, deflate, br',
            'accept-language': 'zh-CN,zh;q=0.8',
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:66.0) Gecko/20100101 Firefox/66.0',
            'connection': 'keep-alive',
            'referer': 'https://qzone.qq.com/',
        }

    def driver(self):
        # Headless variant (needs: from selenium.webdriver.chrome.options import Options)
        # chrome_options = Options()
        # chrome_options.add_argument('--headless')
        # chrome_options.add_argument('--disable-gpu')
        # driver = webdriver.Chrome(options=chrome_options)

        # Regular (visible) browser window
        driver = webdriver.Chrome()
        driver.get(self.url_login)
        driver.switch_to.frame('login_frame')

        # Switch to account/password login
        # (find_element_by_id was removed in Selenium 4, hence find_element + By)
        log_method = driver.find_element(By.ID, 'switcher_plogin')
        log_method.click()

        # Enter account and password, then log in
        account_input = driver.find_element(By.ID, 'u')
        account_input.send_keys(self.username)
        password_input = driver.find_element(By.ID, 'p')
        password_input.send_keys(self.password)
        login_button = driver.find_element(By.ID, 'login_button')
        login_button.click()

        driver.switch_to.default_content()
        # Crude fixed wait (an assumption) so the login redirect can set its cookies
        time.sleep(5)
        # get_cookies() returns a list of dicts; flatten it to {name: value}
        self.cookies = {c['name']: c['value'] for c in driver.get_cookies()}
        return driver

    def back_session(self):
        # Create a session object
        my_session = requests.session()
        headers = copy.deepcopy(self.headers)
        headers['host'] = 'h5.qzone.qq.com'
        # Convert the dict to a cookiejar so the cookies can be attached to the session
        c = requests.utils.cookiejar_from_dict(self.cookies, cookiejar=None, overwrite=True)
        my_session.headers = headers
        # Attach the cookies to the session
        my_session.cookies.update(c)
        return my_session

    # Compute g_tk
    def get_g_tk(self, driver):
        # get_cookie() returns a dict; the hash is computed over its 'value'
        p_skey = driver.get_cookie('p_skey')['value']
        hashes = 5381
        for letter in p_skey:
            hashes += (hashes << 5) + ord(letter)
        self.g_tk = hashes & 0x7fffffff
        return self.g_tk

    # Extract qzonetoken
    def get_qzonetoken(self, driver):
        html = driver.page_source
        xpat = r'window\.g_qzonetoken = \(function\(\)\{ try\{return (.*?);\} catch\(e\)'
        qzonetoken = re.compile(xpat).findall(html)[0]
        self.qzonetoken = qzonetoken.strip('\"')
        return self.qzonetoken

    def login(self):
        driver = self.driver()
        self.get_g_tk(driver)
        self.get_qzonetoken(driver)
        # QQZonePictures expects the cookies as one 'name=value; ...' header string
        cookie_str = '; '.join(f'{k}={v}' for k, v in self.cookies.items())
        return cookie_str, self.g_tk, self.username
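
One detail worth spelling out: QQZonePictures places its cookies argument straight into a request header, so it needs a single string, whereas Selenium's driver.get_cookies() returns a list of dicts. A minimal sketch of that conversion follows; the helper name cookies_to_header and the sample values are hypothetical.

```python
def cookies_to_header(selenium_cookies):
    """Join Selenium-style cookie dicts into a 'name=value; ...' header string."""
    return "; ".join(f"{c['name']}={c['value']}" for c in selenium_cookies)


# Hypothetical values in the shape returned by driver.get_cookies().
sample_cookies = [
    {"name": "uin", "value": "o0123456789", "domain": ".qq.com"},
    {"name": "p_skey", "value": "abc", "domain": ".qzone.qq.com"},
]
header = cookies_to_header(sample_cookies)
print(header)
```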

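Solution ③

Use Qzone's QR-code login endpoint: scan the QR code with mobile QQ to log in to Qzone. This approach is simple, efficient and practical, and the QQZone() class used in the complete code logs in this way.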
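
The polling loop in the class below splits each reply on single quotes and reads positions 1, 5 and 11. The sketch here shows why those indices line up with the status code, the redirect URL and the nickname. The sample reply is hypothetical (only its ptuiCB('...','...') shape is inferred from the indexing in the code), and parse_ptui_callback is an illustrative name.

```python
import re

# Hypothetical reply text in the shape the polling loop expects.
sample_reply = "ptuiCB('0','0','https://example.com/check_sig?uin=123456789&x=1','0','ok','nick');"


def parse_ptui_callback(text):
    """Split a ptuiCB('...','...') reply into status, redirect URL, nickname and uin."""
    parts = text.split("'")
    # Odd indices hold the quoted arguments: 1 = status, 5 = URL, 11 = nickname.
    status = parts[1]
    url = parts[5] if len(parts) > 5 else ""
    nickname = parts[11] if len(parts) > 11 else ""
    uin = re.search(r"uin=(\d+)", url)
    return status, url, nickname, (uin.group(1) if uin else None)


print(parse_ptui_callback(sample_reply))
```

Per the status table in the code, '0' means the scan was verified, '65' that the QR code expired, and '66' and '67' that the code is still valid or being verified.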

 import matplotlib.image as mpimg
 import matplotlib.pyplot as plt
 from http import cookiejar
 from PIL import Image
 import datetime
 import requests
 import random
 import time
 import copy
 import json
 import util  # not a standard-library module; presumably the author's own helper providing date_to_millis
 import os
 import re


 class QQZone:
     def __init__(self):
         self.qzone_login_url = 'https://xui.ptlogin2.qq.com/cgi-bin/xlogin?proxy_url=https%3A//qzs.qq.com/qzone/v6' \
                                '/portal/proxy.html&daid=5&&hide_title_bar=1&low_login=0&qlogin_auto_login=1&no_ver' \
                                'ifyimg=1&link_target=blank&appid=549000912&style=22&target=self&s_url=https%3A%2F%' \
                                '2Fqzs.qq.com%2Fqzone%2Fv5%2Floginsucc.html%3Fpara%3Dizone&pt_qr_app=%E6%89%8B%E6%9' \
                                'C%BAQQ%E7%A9%BA%E9%97%B4&pt_qr_link=https%3A//z.qzone.com/download.html&self_regur' \
                                'l=https%3A//qzs.qq.com/qzone/v6/reg/index.html&pt_qr_help_link=https%3A//z.qzone.c' \
                                'om/download.html&pt_no_auth=0'
         self.headers = {
             'host': 'user.qzone.qq.com',
             'accept-encoding': 'gzip, deflate, br',
             'accept-language': 'zh-CN,zh;q=0.8',
             'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
             'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:66.0) Gecko/20100101 Firefox/66.0',
             'connection': 'keep-alive',
             'referer': 'https://qzone.qq.com/',
         }
         self.h5_headers = copy.deepcopy(self.headers)
         self.QR_CODE_PATH = './/images/QQ登录二维码'
         self.req = requests.Session()
         self.cookies = cookiejar.CookieJar()
         self.req.cookies = self.cookies
         self.raw_nickname = None
         self.raw_username = None
         self.qzonetoken = None
         self.nickname = None
         self.username = None
         self.g_tk = 0

     def login(self):
         # Log in by scanning the QR code
         if self.login_with_qr_code():
             # Compute g_tk from the cookies
             self.get_g_tk()
             # Serialize the cookies into a header string
             cookie = requests.utils.dict_from_cookiejar(self.cookies)
             cookies = self.change_dict_to_cookie(cookie)
             # Return cookies, g_tk and username
             return cookies, self.g_tk, self.username
         else:
             return None, None, None

     def login_with_qr_code(self):
         cookies = cookiejar.Cookie(version=0, name='_qz_referrer', value='qzone.qq.com', port=None,
                                    port_specified=False, domain='qq.com', domain_specified=False,
                                    domain_initial_dot=False, path='/', path_specified=True, secure=False,
                                    expires=None, discard=True, comment=None, comment_url=None,
                                    rest={'HttpOnly': None}, rfc2109=False)
         login_url = 'https://ssl.ptlogin2.qq.com/ptqrshow?appid=549000912&e=2&l=M&s=3&d=72&v=4&t=0.{0}6252926{1}' \
                     '2285{2}86&daid=5'.format(random.randint(0, 9), random.randint(0, 9), random.randint(0, 9))
         start_time = util.date_to_millis(datetime.datetime.utcnow())
         self.headers['host'] = 'ssl.ptlogin2.qq.com'
         self.cookies.set_cookie(cookies)
         qr_res = self.req.get(url=login_url, headers=self.headers, timeout=20)
         self.save_image(qr_res.content, self.QR_CODE_PATH)
         print('Please scan the QR code with QQ to log in')
         lena = mpimg.imread(self.QR_CODE_PATH + '.jpg')
         plt.imshow(lena)  # show the image
         plt.axis('off')  # hide the axes
         plt.show()
         # Fallback way to display the image
         # image = Image.open(self.QR_CODE_PATH + '.jpg')
         # image.show()
         print(f'QR code image saved to: {self.QR_CODE_PATH}.jpg')
         login_sig = self.get_cookie('pt_login_sig')
         qr_sig = self.get_cookie('qrsig')
         print("success to download qr code")
         while True:
             self.headers['referer'] = self.qzone_login_url
             res = self.req.get(
                 f'https://ssl.ptlogin2.qq.com/ptqrlogin?u1=https%3A%2F%2Fqzs.qq.com%2Fqzone%2Fv5%2Floginsucc.html'
                 f'%3Fpara%3Dizone&ptqrtoken={self.get_qr_token(qr_sig)}&ptredirect=0&h=1&t=1&g=1&from_ui=1&ptlang'
                 f'=2052&action=0-0-{util.date_to_millis(datetime.datetime.utcnow()) - start_time}&js_ver=10220&js'
                 f'_type=1&login_sig={login_sig}&pt_uistyle=40&aid=549000912&daid=5&', headers=self.headers)
             content = res.content.decode("utf-8")
             ret = content.split("'")
             if ret[1] in {'0': 'verified', '65': 'QR code expired'}:
                 break
             elif ret[1] in {'66': 'not expired yet', '67': 'verifying'}:
                 pass
             time.sleep(2)

         # Delete the QR code image
         if os.path.exists(self.QR_CODE_PATH + '.jpg'):
             os.remove(self.QR_CODE_PATH + '.jpg')

         # Return the login status
         if ret[1] == '0':
             print("scan qr code success")
             self.nickname = ret[11]
             self.req.get(url=ret[5])
             username = re.findall(r'uin=([0-9]+?)&', ret[5])[0]
             self.username = username
             skey = self.get_cookie('p_skey')
             self.g_tk = self.get_GTK(skey)
             self.headers['host'] = 'user.qzone.qq.com'
             self.raw_username = copy.deepcopy(self.username)
             self.raw_nickname = copy.deepcopy(self.nickname)
             self.headers.pop('referer')
             self.get_qzone_token()
             print("User " + self.username + " logged in successfully!")
             return True
         else:
             print("Failed to login with qr code")
             return False

     def get_cookie(self, key):
         for c in self.cookies:
             if c.name == key:
                 return c.value
         return ''

     def get_GTK(self, skey):
         hash = 5381
         for i in range(0, len(skey)):
             hash += (hash << 5) + self.utf8_unicode(skey[i])
         return hash & 0x7fffffff

     def utf8_unicode(self, c):
         if len(c) == 1:
             return ord(c)
         elif len(c) == 2:
             n = (ord(c[0]) & 0x3f) << 6
             n += ord(c[1]) & 0x3f
             return n
         elif len(c) == 3:
             n = (ord(c[0]) & 0x1f) << 12
             n += (ord(c[1]) & 0x3f) << 6
             n += ord(c[2]) & 0x3f
             return n
         else:
             n = (ord(c[0]) & 0x0f) << 18
             n += (ord(c[1]) & 0x3f) << 12
             n += (ord(c[2]) & 0x3f) << 6
             n += ord(c[3]) & 0x3f
             return n

     def change_dict_to_cookie(self, cookie):
         cookies = ''
         for key in cookie:
             cookies += key + '=' + str(cookie[key]) + '; '
         return cookies

     def get_qr_token(self, qrsig):
         e = 0
         for i in qrsig:
             e += (e << 5) + ord(i)
         return 2147483647 & e

     # Core "encrypted" field: g_tk hashed from p_skey
     def get_g_tk(self):
         cookies = requests.utils.dict_from_cookiejar(self.cookies)
         p_skey = cookies['p_skey']
         h = 5381
         for i in p_skey:
             h += (h << 5) + ord(i)
         self.g_tk = h & 2147483647

     def save_image(self, image, name):
         try:
             if not os.path.exists(".//images//"):
                 os.mkdir(".//images//")
             file_image = open(name + '.jpg', 'wb+')
             file_image.write(image)
             file_image.close()
         except BaseException as e:
             print(e, "Failed to save image:" + name)

     def get_qzone_token(self):
         url = 'https://user.qzone.qq.com/' + self.raw_username + '/main'
         res = self.req.get(url=url, headers=self.headers, timeout=20)
         content = res.content.decode("utf-8")
         # Raw string so the regex backslashes are not treated as string escapes
         qzonetoken = re.findall(r'g_qzonetoken = \(function\(\)\{ try\{return "(.*)?"', content)[0]
         self.qzonetoken = qzonetoken
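
The class above calls util.date_to_millis, which comes from a util module that is not shown in this post. To run the code without that module, a stand-in could look like the sketch below. Its behaviour (milliseconds since the Unix epoch for a naive UTC datetime) is an assumption inferred from how the value is used to measure elapsed time in the action parameter, not the author's actual implementation.

```python
import datetime


def date_to_millis(dt):
    """Milliseconds since the Unix epoch for a naive UTC datetime (assumed behaviour)."""
    epoch = datetime.datetime(1970, 1, 1)
    return int((dt - epoch).total_seconds() * 1000)


start = date_to_millis(datetime.datetime(2020, 1, 1))
later = date_to_millis(datetime.datetime(2020, 1, 1, 0, 0, 2))
print(later - start)
```

Saved as util.py next to the script, this would satisfy the import util line; only the difference between two calls matters to the polling URL.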