跳转至

2023年第7周_新的旅途

本周报分享每周所学所知所闻所感,时间范围是 2023-02-13 至 2023-02-19,会记录一些工作及生活上有意思的事情。本周报每周日晚上发布,会同步到本人博客。

见闻

为个人开发者实现了一套集成化的收费方案

最近一周在捣腾 RSS 订阅软件的事情,发现市面上可以主动发现独立博客,并可以订阅万物的软件很少,用得很顺滑的工具也非常少。于是想开发软件来实现,但是服务器维护费用也是非常高的,所以想做一个可以付费订阅的软件。但是作为个人开发者,开通支付渠道门槛非常高。于是在网上摸索了很久,找到了一个基本可行的方案,v2geek「收费5%还是有点高的」。它基本满足我的需求,但是没有提供任何开发的接口,不能和软件体系闭环。于是我采用了爬虫技术,基本实现了平台和软件的闭环,可以达到在软件上支付订阅费用,立马给订阅者分发授权码!一整套操作,不需要任何人工操作。

#coding=utf-8
import requests
import re
from bs4 import BeautifulSoup
import uuid

class V2GeeKPay(object):
    """Scraper-based client for the v2geek.com merchant dashboard ("udmin").

    v2geek exposes no public API, so this class drives the HTML pages
    directly with a logged-in session cookie: listing projects, importing
    license tokens, creating orders, polling payment status and
    closing/deleting orders — so license distribution needs no manual step.

    NOTE(review): every request is sent with ``verify=False`` (TLS
    certificate verification disabled); kept as-is, but worth revisiting.
    """

    def __init__(self, cookies, project=""):
        """
        :param cookies: value of the site's ``_sessions`` cookie of a
            logged-in user; it is what actually authenticates us.
        :param project: optional project key; when non-empty, the
            per-project URLs are resolved immediately via ``set_project``.
        :raises ValueError: if the cookie is expired/invalid, or *project*
            does not belong to the logged-in user.
        """
        self.cookies = cookies
        # Browser-like headers so the dashboard serves the normal HTML.
        self.headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "zh-CN,zh;q=0.9,und;q=0.8,zh-Hans;q=0.7,en;q=0.6",
            "cache-control": "max-age=0",
            "cookie": "_sessions={}".format(cookies),
            "referer": "https://v2geek.com/udmin/projects",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "Linux",
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "same-origin",
            "sec-fetch-user": "?1",
            "upgrade-insecure-requests": "1",
        }
        flag, self.project_list, self.user_name = self.get_project_list()
        if not flag:
            raise ValueError("cookie过期,请检查")
        if project != "":
            self.set_project(project)

    def set_project(self, project):
        """Select *project* (by key) and precompute its dashboard URLs.

        :param project: the project key as it appears in the project URL.
        :raises ValueError: if the key is not in ``self.project_list``.
        """
        for project_obj in self.project_list:
            if project_obj["key"] == project:
                self.pay_js = "https://v2geek.com/{}/{}/widget.js".format(self.user_name, project)
                self.login_url = "https://v2geek.com/users/sign_in"
                self.import_url = "https://v2geek.com/udmin/projects/{}/licenses/import".format(project)
                self.project_url = "https://v2geek.com/udmin/projects/{}".format(project)
                self.order_url = "https://v2geek.com/udmin/orders"
                self.project = project
                return
        raise ValueError("当前用户不存在此项目,请检查")

    def get_project_list(self):
        """Scrape the project list page for this user's projects.

        :returns: ``(ok, projects, user_name)``; each project dict carries
            ``name``, ``price`` (currency symbols stripped) and ``key``.
            Any network/parse failure yields ``(False, [], "")`` so callers
            can treat it as an expired cookie.
        """
        project_list_url = "https://v2geek.com/udmin/projects"
        session = requests.Session()
        try:
            response = str(session.get(project_list_url, headers=self.headers, verify=False).content, encoding="utf-8")
            html_obj = BeautifulSoup(response, "html.parser")
            tr_list = html_obj.select("tbody")[0].select("tr")
            # Column index -> field name for the table cells we keep.
            dict_project_info = {
                0: "name",
                1: "price"
            }
            user_name = ""
            project_list = []
            for tr in tr_list:
                temp = {}
                for idx, td in enumerate(tr.select("td")):
                    if idx in dict_project_info:
                        pro_key = dict_project_info[idx]
                        temp[pro_key] = td.text.strip().replace("¥", "").replace("元", "")
                        if pro_key == "name":
                            # The project link path looks like /<user>/<key>.
                            key_list = td.select("a")[0].get("href").split("/")
                            # BUG FIX: was `if user_name != ""` — inverted,
                            # so the (initially empty) name was never set.
                            if user_name == "":
                                user_name = key_list[1]
                            temp["key"] = key_list[2]
                project_list.append(temp)
            return True, project_list, user_name
        except Exception:
            # Best effort: report any failure as "not logged in".
            return False, [], ""

    def generateAuthToken(self, numbers=100):
        """Generate *numbers* random license tokens (32-char uuid4 hex)."""
        return [uuid.uuid4().hex for _ in range(numbers)]

    def import_auth_token(self, auth_token_list):
        """Import license tokens into the project via the dashboard form.

        :param auth_token_list: iterable of token strings, one per license.
        :returns: ``(True, "success")`` or ``(False, <reason>)``.
        """
        # Fetching the token list doubles as a session check and yields the
        # fresh CSRF token required for the POST.
        saved_token_list, unused_token, rest_token = self.get_auth_token_list()
        if rest_token != "":
            data = {
                "utf8": (None, "✓"),
                "authenticity_token": (None, rest_token),
                "licenses": (None, "\n".join(auth_token_list)),
                "commit": (None, "提交")
            }
            session = requests.Session()
            # BUG FIX: was `headers=headers` (undefined name -> NameError).
            session.post(self.import_url, data=data, headers=self.headers, verify=False)
            print("导入成功")
            return True, "success"
        else:
            print("cookie过期")
            return False, "cookie已过期,请联系工程师处理"

    def get_auth_token_list(self):
        """Scrape every license token of the current project.

        Walks all pagination pages.

        :returns: ``(all_tokens, unused_tokens, csrf_token)``; each token is
            a dict with ``license_id``, ``auth_token``, ``use_status`` and
            ``order_id`` ("-" when not yet assigned to an order).  Failure
            yields ``([], [], "")``.
        """
        session = requests.Session()
        try:
            response = str(session.get(self.project_url, headers=self.headers, verify=False).content, encoding="utf-8")
            html_obj_first = BeautifulSoup(response, "html.parser")
            page = html_obj_first.select(".pagination")
            rest_token = ""
            for meta in html_obj_first.select("meta"):
                if meta.get("name") == "csrf-token":
                    rest_token = meta.get("content")
            html_obj_list = [html_obj_first]
            if len(page):
                # The last pagination link carries ?page=<max>.
                page_num = int(page[0].select("a")[-1].get("href").split("=")[1].strip()) + 1
                for i in range(2, page_num):
                    # BUG FIX: was `headers=headers` (undefined name).
                    response = str(session.get(self.project_url + "?page={}".format(i), headers=self.headers, verify=False).content, encoding="utf-8")
                    html_obj_list.append(BeautifulSoup(response, "html.parser"))
            # Cell position (mod 4) -> field name inside a license row.
            dict_order_info = {
                0: "auth_token",
                1: "use_status",
                2: "order_id",
            }
            auth_token_list = []
            for html_obj in html_obj_list:
                for tr in html_obj.select("tbody")[0].select("tr"):
                    if "license" in tr.get("id"):
                        temp = {
                            "license_id": tr.get("id")
                        }
                        for idx, td in enumerate(tr.select("td")):
                            key_id = idx % 4
                            if key_id in dict_order_info:
                                temp[dict_order_info[key_id]] = td.text.strip()
                        auth_token_list.append(temp)
            # Tokens with order_id "-" have not been handed out yet;
            # replenishing the pool is left to the calling business system.
            unused_token = [t for t in auth_token_list if t.get("order_id") == "-"]
            return auth_token_list, unused_token, rest_token
        except Exception:
            return [], [], ""

    def verify_order_token(self, order_id):
        """Check whether *order_id* already has a license token assigned.

        :returns: ``(True, token)`` when assigned, else ``(False, "")``.
        """
        auth_token_list, unused_token, rest_token = self.get_auth_token_list()
        for auth_token in auth_token_list:
            if auth_token.get("order_id") == order_id:
                return True, auth_token.get("auth_token")
        return False, ""

    def query_order_status(self, order_id):
        """Poll the public checkout-status endpoint for *order_id*.

        :returns: the raw (stripped) response body.
        """
        headers = {'content-type': 'application/json', 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36'}
        order_status_url = "https://v2geek.com/{user_name}/{project}/checkouts/{order_id}/status".format(user_name=self.user_name, project=self.project, order_id=order_id)
        session = requests.Session()
        res = session.get(order_status_url, headers=headers, verify=False)
        return str(res.content, encoding='utf8').strip()

    def query_order_list(self):
        """Scrape all orders of this account across every pagination page.

        :returns: ``(orders, csrf_token)``; each order dict carries
            ``order_status``, ``order_id``, ``pay_num``, ``real_pay``,
            ``order_time``, ``pay_time`` and ``email``.  ``([], "")`` on
            failure.
        """
        session = requests.Session()
        res = session.get(self.order_url, headers=self.headers, verify=False)
        html_content = str(res.content, encoding='utf8')
        html_obj_first = BeautifulSoup(html_content, "html.parser")
        try:
            rest_token = ""
            for meta in html_obj_first.select("meta"):
                if meta.get("name") == "csrf-token":
                    rest_token = meta.get("content")
            html_obj_list = [html_obj_first]
            page = html_obj_first.select(".pagination")
            if len(page):
                page_num = int(page[0].select("a")[-1].get("href").split("=")[1].strip()) + 1
                for i in range(2, page_num):
                    response = str(session.get(self.order_url + "?page={}".format(i), headers=self.headers, verify=False).content, encoding="utf-8")
                    html_obj_list.append(BeautifulSoup(response, "html.parser"))

            # Column index -> field name (column 0 is skipped on purpose).
            dict_order_info = {
                1: "order_status",
                2: "order_id",
                3: "pay_num",
                4: "real_pay",
                5: "order_time",
                6: "pay_time",
                7: "email"
            }
            order_list = []
            for html_obj in html_obj_list:
                for tr in html_obj.select("tbody")[0].select("tr"):
                    temp = {}
                    for idx, td in enumerate(tr.select("td")):
                        if idx in dict_order_info:
                            temp[dict_order_info[idx]] = td.text.strip().replace("¥", "").replace("元", "")
                    order_list.append(temp)
            return order_list, rest_token
        except Exception:
            return [], ""

    def clear_expired_order(self):
        """Close (and delete) every unpaid or already-closed order."""
        order_list, rest_token = self.query_order_list()
        if rest_token != "":
            for order in order_list:
                if order["order_status"] in ("未支付", "已关闭"):
                    print(self.close_order(order["order_id"]))
            return True, "success"
        else:
            return False, "cookie过期"

    def close_order(self, order_id):
        """Close and then delete the order *order_id*.

        Only unpaid orders are closed first; already-closed orders are
        deleted directly.

        :returns: ``(ok, message)``.
        """
        order_close_url = "https://v2geek.com/udmin/orders/{}/close".format(order_id)
        order_list, rest_token = self.query_order_list()
        if rest_token == "":
            return False, "cookie过期"
        for order in order_list:
            if order["order_id"] != order_id:
                continue
            if order["order_status"] == "已关闭":
                self.delete_order(order_id, rest_token)
                # BUG FIX: returned a bare True here while every other path
                # returns a (bool, message) tuple.
                return True, "success"
            elif order["order_status"] == "未支付":
                session = requests.Session()
                data = {
                    "_method": (None, "post"),
                    "authenticity_token": (None, rest_token),
                }
                res = session.post(order_close_url, data=data, headers=self.headers, verify=False)
                close_content = str(res.content, encoding='utf8')
                html_obj = BeautifulSoup(close_content, "html.parser")
                # BUG FIX: the original bailed out with "关闭失败" as soon as
                # it met any <meta> that was not csrf-token; scan them all
                # and decide afterwards.
                rest_token = ""
                for meta in html_obj.select("meta"):
                    if meta.get("name") == "csrf-token":
                        rest_token = meta.get("content")
                if rest_token:
                    self.delete_order(order_id, rest_token)
                    return True, "success"
                return False, "关闭失败"
            else:
                return False, "订单为{}状态,不支持关闭".format(order["order_status"])
        # BUG FIX: the original fell through and implicitly returned None
        # when the order id was not found in the list.
        return False, "关闭失败"

    def delete_order(self, order_id, token):
        """POST the dashboard's method-override DELETE for *order_id*.

        :param token: a fresh CSRF token obtained from a dashboard page.
        """
        delete_url = "https://v2geek.com/udmin/orders/{}".format(order_id)
        data = {
            "_method": (None, "delete"),
            "authenticity_token": (None, token),
        }
        session = requests.Session()
        session.post(delete_url, data=data, headers=self.headers, verify=False)

    def moke_pay(self, email, coupon=None):
        """Create a WeChat-pay order by simulating the payment widget.

        Downloads the project's ``widget.js``, extracts the embedded CSRF
        token and form action URL, submits the checkout form, then parses
        the resulting page for the order id and QR code.

        :param email: buyer email posted with the order.
        :param coupon: optional coupon code.
        :returns: ``(order_id, qrcode_src, paying_url)``.
        :raises ValueError: when the widget/checkout markup has changed and
            can no longer be parsed.
        """
        headers = {'content-type': 'application/json', 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36'}
        session = requests.Session()
        res = session.get(self.pay_js, headers=headers, verify=False)
        js = str(res.content, encoding='utf8')

        def get_pay_token(js):
            """Pull the authenticity token out of the widget's inline form."""
            it = re.finditer(r"value=([\s\S]*?)/>", js)
            try:
                for match in it:
                    token = match.group().split("\"")[1].replace('\\', "")
                    # Heuristic: the CSRF token is the only long value attr.
                    if len(token) > 16:
                        return token
            except Exception:
                raise ValueError("代码格式错误了,请联系工程师进行修复")

        def get_order_id(content):
            """Extract the order id from the /checkouts/<id>/status link."""
            it = re.finditer(r"/checkouts/([\s\S]*?)/status", content.decode("utf-8"))
            try:
                for match in it:
                    return match.group().split("/")[2]
            except Exception:
                raise ValueError("获取订单,代码格式错误,请联系工程师进行修复")

        def get_pay_url(js):
            """Extract the checkout form's action URL from the widget JS."""
            it = re.finditer(r"action=([\s\S]*?)/>", js)
            try:
                for match in it:
                    ret = match.group()
                    reg = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
                    for url in re.findall(reg, ret):
                        if len(url) > 16:
                            return url.replace('\\', "")
            except Exception:
                raise ValueError("代码格式错误,请联系工程师进行修复")

        token = get_pay_token(js)
        pay_url = get_pay_url(js)
        data = {
            "utf8": (None, "✓"),
            "authenticity_token": (None, token),
            "order[buyer_email]": (None, email),
            "order[coupon]": (None, coupon),
            "order[payment]": (None, "wechat"),
            "commit": (None, "提交")
        }
        try:
            r = session.post(pay_url, data=data, verify=False)
            pay_content = r.content
            order_id = get_order_id(pay_content)
            html_obj = BeautifulSoup(pay_content, "html.parser")
            back_url = "https://v2geek.com/{}/{}/checkouts/{}/paying".format(self.user_name, self.project, order_id)
            return order_id, html_obj.select('.qrcode')[0].get('src'), back_url
        except Exception:
            raise ValueError("代码格式错误,请联系工程师进行修复")

开发一款RSS订阅软件

在这信息爆炸的时代,我们身边围绕着十分多的信息,加上推荐系统不断在为我们构建信息围城,我们需要寻找更好的阅读体验的方式,去打破信息围城,以突破信息边界,让我们和世界的信息差不断缩小。有时候也在迷茫,想寻找更好的资讯渠道,比如优质博主,优质信息。知识付费盛行,导致优质信息获取进一步变得十分困难。所以内心一直在想做一款能够对内容进行筛选的 RSS 订阅软件,来丰富自己的阅读,并提升阅读体验和更快地找到优质的信息。