# 源码分析-requests

# 入门介绍 requests

requests是python实现的http库,简洁优雅,符合人类使用习惯。

# 源码分析适用人群

  • 会用requests,想深入学习
  • python爱好者

# 学习该章节收获

  • 精通requests源码结构,使用时理解更深刻
  • 以requests为支点,串联python与http知识
  • 源码精读,分析各个模块常用编程技巧
  • 功能模块在实际项目中类似编程实践

# 版本信息 version.py

版本相关信息较多时单独模块来处理

# .-. .-. .-. . . .-. .-. .-. .-.
# |(  |-  |.| | | |-  `-.  |  `-.
# ' ' `-' `-`.`-' `-' `-'  '  `-'

# Package metadata, kept together in a dedicated module.
__title__ = 'requests'
__description__ = 'Python HTTP for Humans.'
__url__ = 'http://python-requests.org'
# Version string in major.minor.patch form.
__version__ = '2.22.0'
# The hex digits mirror __version__ ('2.22.0' -> 0x022200).
__build__ = 0x022200
__author__ = 'Kenneth Reitz'
__author_email__ = 'me@kennethreitz.org'
__license__ = 'Apache 2.0'
__copyright__ = 'Copyright 2019 Kenneth Reitz'
# Unicode escapes: U+2728, U+1F370, U+2728 separated by spaces.
__cake__ = u'\u2728 \U0001f370 \u2728'
  • __version__: major.minor.patch
  • __build__: 版本号的十六进制写法,与__version__一一对应(2.22.0 → 0x022200),并非修订次数

# 网络传输 adapters.py

  • 此章节数据发送与底层urllib3联系紧密,参考urllib3源码泛读
  • BaseAdapter-HTTPAdapter 采用典型的 接口类-实现类,参考设计模式

# 源码分析

# -*- coding: utf-8 -*-
from urllib3.poolmanager import PoolManager

# 模拟数据
class Response(object):
    """Minimal stand-in for ``requests.Response`` used by the adapter demo.

    ``HTTPAdapter.build_response`` reads ``response.cookies`` before it ever
    assigns it, so the attribute has to exist on a freshly created instance.
    """

    def __init__(self):
        # Placeholder cookie container; the demo only passes it along.
        self.cookies = {}
class CaseInsensitiveDict(dict):
    """Minimal stand-in for the real header mapping used by the adapter demo.

    ``HTTPAdapter.build_response`` constructs it from a headers mapping, so it
    must accept constructor arguments; a bare ``object`` subclass does not.
    NOTE: this placeholder is a plain ``dict`` and performs no case folding.
    """
def extract_cookies_to_jar(*arg, **kw):
    """Placeholder that accepts any arguments, does nothing and returns None."""
    return None

# Abstract base: the interface every transport adapter has to provide
class BaseAdapter(object):
    """Interface for transport adapters; subclasses supply ``send`` and ``close``."""

    def __init__(self):
        super(BaseAdapter, self).__init__()

    def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
        """Send *request*; the base class only raises ``NotImplementedError``."""
        raise NotImplementedError()

    def close(self):
        """Release adapter resources; the base class only raises ``NotImplementedError``."""
        raise NotImplementedError()

# Concrete implementation
class HTTPAdapter(BaseAdapter):
    """Simplified transport adapter that sends requests through a urllib3 pool.

    :param max_retries: value forwarded as ``retries`` to the pool's
        ``urlopen`` call in :meth:`send`. Defaults to ``0``.
    """

    def __init__(self, max_retries=0):
        # Instance-level containers for adapter state.
        self.config = {}
        self.proxy_manager = {}
        # send() reads self.max_retries, so it must exist before any request.
        self.max_retries = max_retries
        super(HTTPAdapter, self).__init__()
        self.init_poolmanager()

    def init_poolmanager(self):
        """Create the ``PoolManager`` that hands out connections."""
        self.poolmanager = PoolManager()

    def build_response(self, req, resp):
        """Copy the interesting parts of *resp* onto a new ``Response``.

        This is where the attributes of the object returned to the caller
        come from.

        :param req: the request that was sent (its ``url`` is read)
        :param resp: the low-level response returned by ``urlopen``
        :return: the populated ``Response``
        """
        response = Response()
        response.status_code = getattr(resp, 'status', None)
        response.headers = CaseInsensitiveDict(getattr(resp, 'headers', {}))

        # NOTE(review): this stores the whole headers mapping as the encoding.
        # The full build_response shown later in this document derives it with
        # get_encoding_from_headers(response.headers); that helper is not
        # defined in this simplified snippet, so the line is left as written.
        response.encoding = response.headers
        response.raw = resp
        response.reason = response.raw.reason

        if isinstance(req.url, bytes):
            response.url = req.url.decode('utf-8')
        else:
            response.url = req.url
        extract_cookies_to_jar(response.cookies, req, resp)

        # Back-references so the caller can reach the request and the adapter.
        response.request = req
        response.connection = self
        return response

    def get_connection(self, url):
        """Return the pool-manager connection for *url*."""
        conn = self.poolmanager.connection_from_url(url)
        return conn

    def close(self):
        """Clear the pool manager."""
        self.poolmanager.clear()

    def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
        """Send *request* over a pooled connection and wrap the result.

        Only ``timeout`` is used by this simplified version; ``stream``,
        ``verify``, ``cert`` and ``proxies`` are accepted but ignored.
        """
        conn = self.get_connection(request.url)
        resp = conn.urlopen(
            method=request.method,
            url=request.url,
            body=request.body,
            headers=request.headers,
            redirect=False,
            assert_same_host=False,
            preload_content=False,
            decode_content=False,
            retries=self.max_retries,
            timeout=timeout
        )
        return self.build_response(request, resp)

# 经典视图 api.py

# 源码分析

第一个层次,核心逻辑由模块 sessions.py 实现,为方便使用,对外提供接口,对应HTTP协议的方法

第二个层次,涉及局部代码层次结构时,会有一个通用底层代码,如request,然后在此基础上构建上层代码,如get post

# 局部底层代码
def request(method, url, **kw):
    """Signature sketch of the generic entry point the verb helpers build on."""


def get(url, params=None, **kw):
    """Signature sketch: retrieve a resource."""


def options(url, **kw):
    """Signature sketch: ask which methods the url supports."""


def head(url, **kw):
    """Signature sketch: check that the url is valid."""


def post(url, data=None, json=None, **kw):
    """Signature sketch: create a resource."""


def put(url, data=None, **kw):
    """Signature sketch: replace a resource as a whole."""


def patch(url, data=None, **kw):
    """Signature sketch: update part of a resource."""


def delete(url, **kw):
    """Signature sketch: delete a resource."""

def request(method, url, **kwargs):
    """Send one request through a throw-away session and return its result.

    The session is used as a context manager, so its ``__exit__`` runs before
    this function hands the result back.
    """
    with sessions.Session() as session:
        response = session.request(method=method, url=url, **kwargs)
        return response
        
# The `with` statement drives these two special methods of the Session class
def __enter__(self):
    """Hand the object itself to the ``as`` target."""
    return self


def __exit__(self, *exc_info):
    """Close the object when the ``with`` block is left."""
    self.close()


# Order of execution:
# 1. __enter__ runs and its return value (self) becomes `session`
# 2. session.request(method=method, url=url, **kwargs) runs
# 3. __exit__ runs and calls session.close()

request外部接口函数的参数与session.request绑定方式是一致,详情见 逻辑实现层sessions.py

# 项目实践

# partial偏函数

把一个函数的某些参数设置默认值,返回一个新的函数,调用这个新函数会更简单些

import functools

def show_arg(*arg, **kw):
    """Print the positional tuple and the keyword dict that were received."""
    print(arg, kw)


# Direct call: prints (1, 2, 3) {'a': 'a', 'b': 'b', 'c': 'c'}
show_arg(1, 2, 3, a='a', b='b', c='c')

# Positional arguments frozen in advance; only keywords are supplied at call time
show1 = functools.partial(show_arg, 1, 2, 3)
show1(a='a', b='b', c='c')

# Keyword arguments frozen in advance; only positionals are supplied at call time
show2 = functools.partial(show_arg, a='a', b='b', c='c')
show2(1, 2, 3)

# 程序局部结构

实际项目开发时经常是前后端分离开发,对于后端避免不了API的开发,如何构建程序结构

package
  |-views   # 对外展示接口函数
  |-ops     # 内部实现逻辑
  |-models  # 存储模型

# 版本兼容 compat.py

Python2和Python3的字符编码,模块路径名称的变动,会产生不一致的问题,统一建立新的变量名称是很好的编程实践

import sys
# Interpreter major-version flags used to pick the right imports below.
_ver = sys.version_info
is_py2 = (_ver[0] == 2)
is_py3 = (_ver[0] == 3)

# Prefer simplejson when it is installed (pip install simplejson); the original
# note describes it as faster and as supporting more versions than json.
# Fall back to the standard-library json module otherwise.
try:
    import simplejson as json
except ImportError:
    import json

if is_py2:
    # Python 2 module locations.
    from urllib import (
        quote, unquote, quote_plus, unquote_plus, urlencode, getproxies,
        proxy_bypass, proxy_bypass_environment, getproxies_environment)
    from urlparse import urlparse, urlunparse, urljoin, urlsplit, urldefrag
    from urllib2 import parse_http_list
    import cookielib
    from Cookie import Morsel
    from StringIO import StringIO
    from collections import Callable, Mapping, MutableMapping, OrderedDict

    # Order matters: builtin_str and bytes capture the native str before the
    # name `str` is rebound to unicode.
    builtin_str = str
    bytes = str
    str = unicode
    basestring = basestring
    numeric_types = (int, long, float)
    integer_types = (int, long)

elif is_py3:
    # Python 3 module locations for the same names.
    from urllib.parse import urlparse, urlunparse, urljoin, urlsplit, urlencode, quote, unquote, quote_plus, unquote_plus, urldefrag
    from urllib.request import parse_http_list, getproxies, proxy_bypass, proxy_bypass_environment, getproxies_environment
    from http import cookiejar as cookielib
    from http.cookies import Morsel
    from io import StringIO
    from collections import OrderedDict
    from collections.abc import Callable, Mapping, MutableMapping

    # Same alias names as the Python 2 branch so callers need no version checks.
    builtin_str = str
    str = str
    bytes = bytes
    basestring = (str, bytes)
    numeric_types = (int, float)
    integer_types = (int,)

# 字符编码

统一str表示unicode,bytes表示字节符,数字类型统一为numeric_types和integer_types,详尽参考

# 异常结构 exceptions.py

优秀的判断力来自经验,但经验来自于错误的判断。 - Fred Brooks, 著有《人月神话》

# 背景理论

# 什么是异常

异常就是程序运行时发生错误的信号,构成:异常的追踪信息,异常类,异常值

异常结构把功能逻辑和错误处理分开了,结构更加清晰,防止程序意外崩溃

  • 语法错误:Python解释器进行语法检测,执行前必须改正
  • 逻辑错误:运行期发生的错误
# Each snippet below is independent and raises the named exception when run.

# TypeError: an int is not iterable
for i in 3:
    pass

# ValueError: 'aaa' is not a valid base-10 literal
num = int('aaa')

# IndexError: index is outside the list
li = [1, 2, 3]
li[100]

# NameError: the variable `name` has not been defined
name

# KeyError: the dict has no such key
dic = {"name": "linda"}
dic['age']

# AttributeError: Foo has no attribute x
class Foo: pass
Foo.x

# ZeroDivisionError: division by zero
str1 = 1/0

# 异常的种类

  • AttributeError 试图访问一个对象没有的属性,比如foo.x,但是foo没有属性x
  • IOError 输入/输出异常,基本上是无法打开文件
  • ImportError 无法引入模块或包,基本上是路径问题或名称错误
  • IndentationError 语法错误,代码没有正确对齐
  • IndexError 下标索引超出序列边界,比如当x只有三个元素,却试图访问x[5]
  • KeyError 试图访问字典里不存在的键
  • KeyboardInterrupt Ctrl+C被按下
  • NameError 使用一个还未被赋予对象的变量
  • SyntaxError Python代码非法,代码不能编译
  • TypeError 传入对象类型与要求的不符合
  • ValueError 传入一个调用者不期望的值,即使值的类型是正确的
  • ....

# 异常处理

为了保证程序的健壮性与容错性,即在遇到错误时程序不会崩溃,我们需要对异常进行处理

  • 如果错误发生的条件是可预知的,我们需要用if进行处理:在错误发生之前进行预防
AGE = 10
while True:
    age = input('>>: ').strip()
    # Predictable failure handled up front: int() is only attempted when
    # str.isdigit() accepts the input; otherwise the text is kept as typed.
    age = int(age) if age.isdigit() else age
    if age != AGE:
        continue
    print('you got it')
    break
  • 如果错误发生的条件是不可预知的,则需要用到try...except:在错误发生之后进行处理

# 异常类只能用来处理指定的异常情况,如果非指定异常则无法处理

try:
    int('hello')
except IndexError as err:  # int('hello') raises ValueError, so this clause does not match and the error propagates
    print(err)

# 多分支

# Handlers are tried top to bottom; the first matching clause runs.
try:
    int('hello')
except IndexError as err:
    print('from IndexError: %s' % err)
except KeyError as err:
    print('from KeyError: %s' % err)
except ValueError as err:
    print('from ValueError: %s' % err)
except Exception as err:
    print('from Exception: %s' % err)

# 异常的完整结构

# try / except / else / finally in one statement.
try:
    int('hello')
except Exception as err:
    print('from Exception: %s' % err)
else:
    # Runs only when the try body raised nothing.
    print('try内代码块没有异常,则执行else')
finally:
    # Runs in every case; the place for clean-up work.
    print('无论异常与否,都会执行该模块,进行清理工作')

# 主动触发异常

# Raising explicitly: TypeError is a subclass of Exception, so the handler matches.
try:
    raise TypeError('类型错误')
except Exception as err:
    print('from Exception: %s' % err)

# 自定义异常

class CustomException(Exception):
    """Application-defined error that carries a human-readable message.

    Derives from ``Exception`` rather than ``BaseException`` so that ordinary
    ``except Exception`` handlers catch it; ``BaseException`` is the root that
    also covers ``SystemExit`` and ``KeyboardInterrupt``.

    :param msg: text shown by ``str()`` on the exception
    """

    def __init__(self, msg):
        super(CustomException, self).__init__(msg)
        self.msg = msg

    def __str__(self):
        return '<CustomException: %s>' % self.msg


try:
    raise CustomException('权限错误')
except CustomException as e:
    print(e)

# 断言

两部分合作开发,确保上游满足一定条件

# Contract check between collaborating parties: the `custom` object handed
# over must be a `Custom` instance (both names come from the surrounding
# project). Assert statements are removed when Python runs with -O.
assert isinstance(custom, Custom)

# 异常处理机制

在系统内部,解释器使用一种被称作"块栈"(block stack)的结构来处理异常逻辑。在运行期提前将跳转存储到块栈,遇到异常时解释器会检查当前块栈内是否有匹配的处理逻辑,如果有则跳转并执行相应的指令;如果没有则沿调用栈向外传递,直到被捕获或程序崩溃。

异常对象被保存到当前线程状态里,可用sys.exc_info查看

import sys

# Nothing is being handled yet.
print(sys.exc_info())       # (None, None, None)
try:
    raise Exception('err')
except Exception:
    # Inside the handler the triple is populated, e.g.
    # (<class 'Exception'>, Exception('err'), <traceback object at 0x...>)
    print(sys.exc_info())
# Once the handler has finished, the stored type, value and traceback are cleared.
print(sys.exc_info())       # (None, None, None)

# 源码分析

实际项目开发中,会根据内置异常类自定义各种功能需求类,如class RequestException(IOError):pass

相当于打了不同的锚点,raise异常后,就可以根据不同功能锚点做相应处理

  • 遇到网络问题(如 DNS查询失败、拒绝连接等)时,抛出ConnectionError
  • HTTP请求返回不成功的状态码,r.raise_for_status()会抛出HTTPError
  • 连接超时 ConnectTimeout,读超时ReadTimeout,均继承自Timeout
  • 请求超出最大重定向次数,抛出TooManyRedirects
  • requests显式抛出的异常都继承自 RequestException

# 项目实践

import traceback
import sys

'''
1. 复杂逻辑处理时,如客户详情大量字段,每个字段都有相应的权限、参数检查等操作,
   可以继承Exception自定义不同的异常,最外层捕捉不同异常,实现分步操作
2. as e的e是异常实例,如果想追踪异常栈信息来进行相关操作 traceback
'''

class CustomBaseException(Exception):
    """Root of the application's exception hierarchy.

    :param msg: message stored in ``args`` and shown by ``str()``
    """

    def __init__(self, msg):
        # super() must name this class, not Exception; naming Exception would
        # skip every class that sits between the two in the MRO.
        super(CustomBaseException, self).__init__(msg)


class CustomPermException(CustomBaseException):
    """Permission failure; always carries the fixed message 'perm forbidden'."""

    def __init__(self):
        super(CustomPermException, self).__init__('perm forbidden')


class CustomParamException(CustomBaseException):
    """Parameter-validation failure.

    :param reason: description of what was wrong with the parameters
    """

    def __init__(self, reason):
        super(CustomParamException, self).__init__(reason)

try:
    # Swap in one of the commented raises to exercise the other handlers:
    # raise CustomParamException('Params Error!')
    # raise CustomPermException()
    # raise Exception
    raise Exception('msg')
except CustomPermException:
    # print_exc / format_exc are the shortcut forms of print_exception / format_exception
    traceback.print_exc()
    tb_msg = traceback.format_exc()
    print(tb_msg)
except CustomParamException as e:
    # Print, then collect, the detailed exception information
    exc_triple = (type(e), e, e.__traceback__)
    traceback.print_exception(*exc_triple)
    tb_msg = traceback.format_exception(*exc_triple)
    print(tb_msg)
except Exception as e:
    # For `raise Exception('msg')` the lines printed are, in order:
    #   <class 'Exception'>
    #   True
    #   msg
    #   ('msg',)   -- would be () when the bare class is raised without arguments
    #   the (type, instance, traceback) triple
    for shown in (type(e), isinstance(e, Exception), str(e), e.args, sys.exc_info()):
        print(shown)
    etype, value, tb = sys.exc_info()

# 钩子编程 hooks.py

# hooks.py
# Names of the events that hooks can be attached to
HOOKS = ['response']


def default_hooks():
    """Return a new ``{event: []}`` mapping with one entry per name in HOOKS."""
    return dict((event, []) for event in HOOKS)

def dispatch_hook(key, hooks, hook_data, **kwargs):
    """Run the hooks registered under *key* and return the resulting data.

    :param key: event name to look up in *hooks*
    :param hooks: mapping of event name to a callable or an iterable of
        callables; a falsy value means no hooks
    :param hook_data: value handed to each hook in turn
    :param kwargs: forwarded to every hook call
    :return: *hook_data*, replaced by each hook result that is not ``None``
    """
    registered = (hooks or {}).get(key)
    if not registered:
        return hook_data

    # A single callable is treated as a one-element list.
    if hasattr(registered, '__call__'):
        registered = [registered]

    for callback in registered:
        outcome = callback(hook_data, **kwargs)
        if outcome is not None:
            hook_data = outcome
    return hook_data

# models.py
class RequestHooksMixin(object):
    """Adds hook registration on top of a ``self.hooks`` mapping of lists."""

    def register_hook(self, event, hook):
        """Attach *hook* to *event*.

        *hook* may be a single callable or an iterable; from an iterable only
        the callable members are kept.

        :raises ValueError: if *event* is not a key of ``self.hooks``
        """
        if event not in self.hooks:
            raise ValueError('Unsupported event specified, with event name "%s"' % (event))

        if isinstance(hook, Callable):
            self.hooks[event].append(hook)
            return
        if hasattr(hook, '__iter__'):
            bucket = self.hooks[event]
            for candidate in hook:
                if isinstance(candidate, Callable):
                    bucket.append(candidate)

    def deregister_hook(self, event, hook):
        """Remove *hook* from *event*; return whether it was registered."""
        try:
            self.hooks[event].remove(hook)
        except ValueError:
            return False
        return True

class PreparedRequest(RequestHooksMixin):
    """Cut-down PreparedRequest showing only the hook-related plumbing."""

    def __init__(self):
        # Start from the default (empty) hook lists.
        self.hooks = default_hooks()

    def prepare(self, hooks=None):
        """Register the given hooks on this prepared request."""
        self.prepare_hooks(hooks)

    def prepare_hooks(self, hooks):
        """Register every ``event -> hook`` entry of *hooks*.

        Mirrors the registration loop in ``Request.__init__``; a falsy
        *hooks* registers nothing.
        """
        hooks = hooks or {}
        for event in hooks:
            self.register_hook(event, hooks[event])


# Excerpts from sessions.py. They are fragments, not runnable on their own:
# `self`, `request`, `adapter` and `kwargs` come from the surrounding methods.
# Default hooks
self.hooks = default_hooks()
# Request hooks merged with the session's hooks, e.g. {'response': []}
hooks=merge_hooks(request.hooks, self.hooks)

# Send, then pass the result through the 'response' hooks
r = adapter.send(request, **kwargs)
r = dispatch_hook('response', hooks, r, **kwargs)

# 初始化说 __init__.py

# 背景理论

# 1. __init__.py用来标识所在目录是一个python的模块包(module package)

实际上,如果目录包含__init__.py文件,当导入该目录时,会执行__init__.py里面的代码

request
   |__init__.py  --> print('I from the __init__.py of request.')
   |api.py --> def view(): pass

将工作目录切换到request所在目录,进入交互模式后执行import

>>> import request
I from the __init__.py of request.

# 2. __init__.py用来控制模块的导入,对外提供功能

  • 有时项目目录比较深,可在__init__.py中导入,使用者直接从package顶层即可导入使用
  • 控制模块导入,当import request时,注意request所在目录须为工作目录
# __init__.py: when importing other features here, mind the working directory,
# because this absolute import is resolved from it
from request.api import view

# Executed whenever the package is imported
print('I from the __init__.py of request.')

# 源码分析

# 自己开发的项目,如request的启动目录是固定的,所以from request.api import view
# 作为开源的requests使用相对导入较好 .就表示__init__.py所在的目录即requests
from .__version__ import __title__, __description__, __url__, __version__
from .__version__ import __build__, __author__, __author_email__, __license__
from .__version__ import __copyright__, __cake__

from . import utils
from . import packages
from .models import Request, Response, PreparedRequest
from .api import request, get, head, post, patch, put, delete, options
from .sessions import session, Session
from .status_codes import codes
from .exceptions import (
    RequestException, Timeout, URLRequired,
    TooManyRedirects, HTTPError, ConnectionError,
    FileModeWarning, ConnectTimeout, ReadTimeout
)

# requests 使用方式
>>> requests.__version__
>>> requests.PreparedRequest()
>>> requests.get(url='http://www.baidu.com')
>>> requests.Session()
>>> from requests import codes
>>> from requests import ConnectTimeout
  • warnings 用于提示用户一些错误或过时的用法,后续代码依然执行
  • chardet 对未知bytes的编码进行猜测,然后转换为str
>>> chardet.detect(b'Hello, world!')
{'encoding': 'ascii', 'confidence': 1.0, 'language': ''}
>>> chardet.detect('中华人民共和国'.encode('gbk'))
{'confidence': 0.99, 'encoding': 'GB2312', 'language': 'Chinese'}
  • urllib是Python官方连接的标准库
  • urllib3是第三方库,提供了原生urllib没有的特性,如连接池
  • requests库其实是对urllib3的再次封装,使用更加友好
# Per the original note, nearly every third-party package carries this snippet:
# attach a NullHandler to the logger named after this module.
import logging
from logging import NullHandler

logging.getLogger(__name__).addHandler(NullHandler())
  • check_compatibility和_check_cryptography实现思想可参考异常结构exceptions.py

# 项目结构

# 数据存储 models.py

api.py 定义一系列函数作为外部使用的接口,从而可以看出两者各自的应用场景

# Request

该模块最直观的是定义了一些类,作为存储模型,也就是规定了存储各个字段

用户输入了一系列参数: method, url, headers, files, data, params, auth, cookies, hooks, json

  • 参数较多,首先把这些分散的参数汇聚成一个Request对象
  • 每一个参数需要校验格式化,并且与系统默认的参数合并,PreparedRequest为每个参数定义了各自处理方法
  • PreparedRequest是最终send的合格数据,研究每个处理方法可以加深对HTTP协议的理解,以及处理用户输入的不确定性
# Request is the first wrapper around raw user input;
# its prepare() method turns that input into a PreparedRequest
class Request(RequestHooksMixin):
    """Container that gathers the scattered request arguments in one object."""

    def __init__(self,
            method=None, url=None, headers=None, files=None, data=None,
            params=None, auth=None, cookies=None, hooks=None, json=None):

        # Parameters default to None; the mutable containers are created here,
        # inside the call, whenever None was passed.
        if data is None:
            data = []
        if files is None:
            files = []
        if headers is None:
            headers = {}
        if params is None:
            params = {}
        if hooks is None:
            hooks = {}

        # Begin with the default hook lists, then register what the caller gave.
        self.hooks = default_hooks()
        for event, hook in list(hooks.items()):
            self.register_hook(event=event, hook=hook)

        # By the time __init__ runs, `self` already exists, so the values can
        # simply be stored on it.
        self.method = method
        self.url = url
        self.headers = headers
        self.files = files
        self.data = data
        self.json = json
        self.params = params
        self.auth = auth
        self.cookies = cookies

    def __repr__(self):
        # '<ClassName [identifier]>' makes instances easy to recognise.
        return '<Request [%s]>' % (self.method)

    def prepare(self):
        """Build a ``PreparedRequest`` from the stored values and return it."""
        prepared = PreparedRequest()
        # prepare() stores its results on `prepared`, so the same object is
        # returned once it has been filled in.
        prepared.prepare(
            method=self.method, url=self.url, headers=self.headers,
            files=self.files, data=self.data, json=self.json,
            params=self.params, auth=self.auth, cookies=self.cookies,
            hooks=self.hooks,
        )
        return prepared

# PreparedRequest

PreparedRequest对象p是最终发送时符合HTTP协议规范的数据集合。prepare接收的10个参数经过处理封装到self属性上,结合HTTP协议,分析每个self.prepare_*绑定方法

class PreparedRequest(RequestEncodingMixin, RequestHooksMixin):
    """Holds the processed values that are actually sent.

    Every attribute starts empty and is filled in by the matching
    ``prepare_*`` method when :meth:`prepare` runs.
    """

    def __init__(self):
        self.method = None
        self.url = None
        self.headers = None
        self._cookies = None
        self.body = None
        self.hooks = default_hooks()
        self._body_position = None

    def prepare(self,
            method=None, url=None, headers=None, files=None, data=None,
            params=None, auth=None, cookies=None, hooks=None, json=None):
        """Process each argument with its dedicated ``prepare_*`` method.

        The steps run in a fixed order: method, url, headers, cookies, body,
        auth, hooks.
        """
        self.prepare_method(method)
        self.prepare_url(url, params)
        self.prepare_headers(headers)
        self.prepare_cookies(cookies)
        self.prepare_body(data, files, json)
        self.prepare_auth(auth, url)
        self.prepare_hooks(hooks)

# prepare_method

def prepare_method(self, method):
    """Store the HTTP method on ``self`` in upper case.

    ``self`` is the PreparedRequest acting as the data container. A ``None``
    method is stored as is; anything else is upper-cased and passed through
    ``to_native_string`` (per the original note, this covers Python 2/3
    differences and inputs such as b'post', ending up as 'POST').
    """
    self.method = method
    if self.method is None:
        return
    self.method = to_native_string(self.method.upper())

# prepare_url

http://username:password@www.example.com:80/dir/index.html?uid=1#ch1

scheme    协议名 http:或https: 不区分大小写 最后附一个冒号(:)
auth      登录信息(认证)
host      服务器地址  
port      服务器端口
path      带层次的文件路径
query     查询字符串
fragment  片段标识符
def prepare_url(self, url, params):
    """Normalise *url*, merge *params* into its query string and store it on ``self.url``.

    :param url: the URL as bytes or text
    :param params: extra query parameters, encoded with ``self._encode_params``
    :raises InvalidURL: when parsing fails, the host is missing, or the host label is invalid
    :raises MissingSchema: when the URL has no scheme
    """
    # Always work on text: bytes are decoded, anything else is converted to the text type
    if isinstance(url, bytes):
        url = url.decode('utf8')
    else:
        url = unicode(url) if is_py2 else str(url)
    
    # Only URLs starting with 'http' are processed further; anything else that
    # contains ':' is stored untouched
    url = url.lstrip()
    if ':' in url and not url.lower().startswith('http'):
        self.url = url
        return
    
    try:
        scheme, auth, host, port, path, query, fragment = parse_url(url)
    except LocationParseError as e:
        raise InvalidURL(*e.args)
    if not scheme:
        error = ("Invalid URL {0!r}: No schema supplied. Perhaps you meant http://{0}?")
        error = error.format(to_native_string(url, 'utf8'))
        raise MissingSchema(error)
    if not host:
        raise InvalidURL("Invalid URL %r: No host supplied" % url)
    # Internationalised domain names: non-ASCII hosts are IDNA-encoded
    if not unicode_is_ascii(host):
        try:
            host = self._get_idna_encoded_host(host)
        except UnicodeError:
            raise InvalidURL('URL has an invalid label.')
    elif host.startswith(u'*'):
        raise InvalidURL('URL has an invalid label.')

    # Rebuild the network location, e.g. username:password@www.example.com:80
    netloc = auth or ''
    if netloc:
        netloc += '@'
    netloc += host
    if port:
        netloc += ':' + str(port)
        
    if not path:
        path = '/'
  
    # This is why the `params` argument works: it is appended to the query string
    enc_params = self._encode_params(params)
    if enc_params:
        if query:
            query = '%s&%s' % (query, enc_params)
        else:
            query = enc_params
    # Finally reassemble the processed parts into a URL
    url = requote_uri(urlunparse([scheme, netloc, path, None, query, fragment]))
    # self is the PreparedRequest acting as the data container
    self.url = url

# prepare_headers

def prepare_headers(self, headers):
    """Store *headers* on ``self.headers`` as a ``CaseInsensitiveDict``.

    ``self`` is the PreparedRequest acting as the data container. Each
    ``(name, value)`` pair is passed through ``check_header_validity`` before
    it is stored; a falsy *headers* leaves the mapping empty.
    """
    self.headers = CaseInsensitiveDict()
    if not headers:
        return
    for pair in headers.items():
        check_header_validity(pair)
        name, value = pair
        self.headers[to_native_string(name)] = value

# prepare_cookies

# Cookie is one entry of the headers, which is why prepare_cookies runs after prepare_headers
def prepare_cookies(self, cookies):
    """Store the cookie jar on ``self._cookies`` and set the ``Cookie`` header.

    A ``cookielib.CookieJar`` is used as given; any other value is converted
    with ``cookiejar_from_dict``. The header is written only when
    ``get_cookie_header`` returns something other than ``None``.
    """
    is_jar = isinstance(cookies, cookielib.CookieJar)
    self._cookies = cookies if is_jar else cookiejar_from_dict(cookies)

    header_value = get_cookie_header(self._cookies, self)
    if header_value is None:
        return
    self.headers['Cookie'] = header_value

# prepare_body

def prepare_body(self, data, files, json=None):
    """Build the request body from *data*, *files* or *json* and store it on ``self.body``.

    Also sets Content-Length / Transfer-Encoding and, when not already
    present, Content-Type on ``self.headers``.

    :raises NotImplementedError: when a streamed body is combined with *files*
    """
    body = None
    content_type = None
    
    # `json` is only used when no `data` was given; it is serialised with dumps()
    if not data and json is not None:
        content_type = 'application/json'
        body = complexjson.dumps(json)
        if not isinstance(body, bytes):
            body = body.encode('utf-8')

    # A stream is any iterable that is not a string, list, tuple or mapping
    is_stream = all([
        hasattr(data, '__iter__'),
        not isinstance(data, (basestring, list, tuple, Mapping))
    ])

    try:
        length = super_len(data)
    except (TypeError, AttributeError, UnsupportedOperation):
        length = None

    if is_stream:
        body = data

        if getattr(body, 'tell', None) is not None:
            # Record the current file position before reading.
            # This will allow us to rewind a file in the event
            # of a redirect.
            try:
                self._body_position = body.tell()
            except (IOError, OSError):
                # This differentiates from None, allowing us to catch
                # a failed `tell()` later when trying to rewind the body
                self._body_position = object()

        if files:
            raise NotImplementedError('Streamed bodies and files are mutually exclusive.')

        # A known length is announced; otherwise the body is sent chunked
        if length:
            self.headers['Content-Length'] = builtin_str(length)
        else:
            self.headers['Transfer-Encoding'] = 'chunked'
    else:
        # Multi-part file uploads.
        if files:
            (body, content_type) = self._encode_files(files, data)
        else:
            if data:
                body = self._encode_params(data)
                # Strings and file-like objects get no automatic content type
                if isinstance(data, basestring) or hasattr(data, 'read'):
                    content_type = None
                else:
                    content_type = 'application/x-www-form-urlencoded'

        self.prepare_content_length(body)

        # Add content-type if it wasn't explicitly provided.
        if content_type and ('content-type' not in self.headers):
            self.headers['Content-Type'] = content_type
    
    # self is the PreparedRequest acting as the data container
    self.body = body

# Response

requests会把HTTP返回的信息以对象的形式存储,那类Response就是存储的模型。模型一般是有初始值,在使用的过程中会赋值不同值,满足不同HTTP的返回对象。类Response必然符合HTTP返回信息的相关字段

  • 返回状态码 status_code和描述短语 reason
  • 返回头部字段 headers,以及cookies
  • 网络传输必然涉及到bytes内容的存储 _content和编码信息encoding
  • 整个过程的日志记录信息等 url, history, request, elapsed等
  • 以及其他所需的状态表示和演化而来的property等

从数据流转角度,包括定义模型类,向模型类写数据,从模型类读数据

# 定义模型类

def __init__(self):pass,可以理解为建立库表字段时,定义哪些字段并附加初始值。类的优势可以根据初始属性字段推导出更符合上层使用接口,可仔细分析它们之间的层次结构

# 向模型类写数据

代码写的太好了,不忍加注释,整个过程是属性的确定,没有涉及到Response绑定方法

# requests.adapters.HTTPAdapter
def build_response(self, req, resp):
    """Populate a new ``Response`` from a urllib3 response.

    :param req: The :class:`PreparedRequest <PreparedRequest>` object
    :param resp: The urllib3 response object
    :rtype: requests.Response
    """
    result = Response()

    # Status line and headers
    result.status_code = getattr(resp, 'status', None)
    result.headers = CaseInsensitiveDict(getattr(resp, 'headers', {}))
    result.encoding = get_encoding_from_headers(result.headers)

    # Keep the raw response and take the reason phrase from it
    result.raw = resp
    result.reason = result.raw.reason

    # The URL is always stored as text
    result.url = req.url.decode('utf-8') if isinstance(req.url, bytes) else req.url
    extract_cookies_to_jar(result.cookies, req, resp)

    # Back-references to the request and to the adapter that produced the response
    result.request = req
    result.connection = self
    return result

# 从模型类读数据

具体Response的使用方式,参考官方文档

# 逻辑实现 sessions.py

# 背景理论

编程中何时使用函数和类是很有意思的一件事

函数具有确定性,只要根据定义的参数调用即可

  • 可充当类对象上层的对外使用接口,如api.py
  • 作用于变量,驱动变量变化,达到目标
  • 也可以说隐藏处理的细节,简化调用接口

类的三大特性决定了类是一种更复杂的结构

  • 类是一系列变量存储的集合,__init__决定了对象初始的变量定义
  • 内部方法是用来操作这些变量的
  • 继承和组合让嵌套更加复杂

# 源码分析

class Session(SessionRedirectMixin):
    """Structural outline of the Session class.

    Only ``__init__``, the context-manager methods and the pickle helpers
    carry real code. Every other method is a signature-only placeholder whose
    body is omitted from this outline (``pass``), so in this outline
    ``mount`` stores nothing.
    """

    # Attribute names captured by __getstate__
    __attrs__ = [
        'headers', 'cookies', 'auth', 'proxies', 'hooks', 'params', 'verify',
        'cert', 'prefetch', 'adapters', 'stream', 'trust_env',
        'max_redirects',
    ]

    # Initial attribute values: comparable to declaring the columns of a
    # table together with their defaults
    def __init__(self):
        # Composition is usually `name = Class()`; when building the object
        # is involved, a factory function hides the details.
        # headers: the metadata fields carried by the HTTP protocol
        self.headers = default_headers()

        self.auth = None
        self.proxies = {}

        # See the hooks.py section
        self.hooks = default_hooks()

        # Session is a composite that deals with Request, Response and more;
        # these switches adapt it to different scenarios
        self.params = {}
        self.stream = False
        self.verify = True
        self.cert = None
        self.max_redirects = DEFAULT_REDIRECT_LIMIT
        self.trust_env = True
        self.cookies = cookiejar_from_dict({})

        # Classic in-class pattern:
        #   one variable            -> self.adapters
        #   one method that writes  -> self.mount()
        #   one method that reads   -> self.get_adapter()
        self.adapters = OrderedDict()
        self.mount('https://', HTTPAdapter())
        self.mount('http://', HTTPAdapter())

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    # With the defaults in place, the bound methods take outside arguments
    # and process them into valid values. The methods form layers:
    #  - api.py shows that `request` is the entry point of a session
    #  - the verb helpers (get, options, head, post, put, patch, delete) sit
    #    on top of it, which is where session.get(url) comes from
    #
    # Handling many arguments that each need their own processing, step by step:
    #  - req = Request(**kw) gathers the scattered arguments in one object
    #  - p = PreparedRequest() acts as the processing tool: p.prepare(...)
    #  - p.prepare handles each argument; the finished p goes to send()

    def prepare_request(self, request):
        pass

    def request(self, method, url,
            params=None, data=None, headers=None, cookies=None, files=None,
            auth=None, timeout=None, allow_redirects=True, proxies=None,
            hooks=None, stream=None, verify=None, cert=None, json=None):
        pass  # body omitted in this outline

    def get(self, url, **kwargs):
        pass  # body omitted in this outline

    def options(self, url, **kwargs):
        pass  # body omitted in this outline

    def head(self, url, **kwargs):
        pass  # body omitted in this outline

    def post(self, url, data=None, json=None, **kwargs):
        pass  # body omitted in this outline

    def put(self, url, data=None, **kwargs):
        pass  # body omitted in this outline

    def patch(self, url, data=None, **kwargs):
        pass  # body omitted in this outline

    def delete(self, url, **kwargs):
        pass  # body omitted in this outline

    # Low-level sending logic that the higher-level helpers rely on
    def send(self, request, **kwargs):
        pass  # body omitted in this outline

    def merge_environment_settings(self, url, proxies, stream, verify, cert):
        pass  # body omitted in this outline

    def get_adapter(self, url):
        pass  # body omitted in this outline

    def close(self):
        pass  # body omitted in this outline

    def mount(self, prefix, adapter):
        pass  # body omitted in this outline

    # State used when the session is serialised with pickle
    def __getstate__(self):
        # Dict comprehension over a list of names
        state = {attr: getattr(self, attr, None) for attr in self.__attrs__}
        return state

    def __setstate__(self, state):
        # self is the storage object; getattr/setattr read and write it
        for attr, value in state.items():
            setattr(self, attr, value)

# 核心结构图

# 交互协定 status_code.py

# 理论背景

实际开发中数字来表示不同的状态,状态较少时直接配置常量定义即可,如

# Named states: the name carries the meaning at the call site.
# Values are 0, 1, 2, 3 in the order listed.
PREPARE, GET_TASK, DO_TASK, CLEAN_UP = range(4)

HTTP协议客户端与服务端交互的协议,双方定义的状态码较多,需采用一种更加灵活的方式

# 源码分析

# Raw source data: status code -> tuple of names for that code
_codes = {
    200: ('ok', 'okay', 'all_ok', 'all_okay', 'all_good', '\\o/', '✓'),
    301: ('moved_permanently', 'moved', '\\o-'),
    302: ('found',),
    404: ('not_found', '-o-'),
    500: ('internal_server_error', 'server_error', '/o\\', '✗'),
}

# Module-level LookupDict; a name can be read as codes.okay or codes['okay']
codes = LookupDict(name='status_codes')

# requests/__init__.py does `from .status_codes import codes`, so this module
# body runs at import time


def _init():
    """Bind every name in ``_codes`` (plus its upper-case form) to its code on ``codes``."""
    for code, titles in _codes.items():
        for title in titles:
            setattr(codes, title, code)
            # Names starting with a backslash or a slash get no upper-case twin.
            if title.startswith(('\\', '/')):
                continue
            setattr(codes, title.upper(), code)


# Runs on import: afterwards every name is bound to its status code on `codes`
_init()
  • 全局项目可以使用codes.name来表示相应状态码
  • _codes作为原始数据源,自动加载设置成codes

# 项目实践

# 1. API通用返回格式

前后端分离开发离不开双方返回状态码的格式定义,随着业务的不断增长,状态码定义需可配置

from status_code import store as sc
from flask import jsonify
class APIResult(dict):
    """Uniform API payload with the keys ``code``, ``msg`` and ``result``.

    :param code: status code of the call
    :param result: payload; ``None`` is stored as an empty dict
    :param msg: message for the code; when falsy it is looked up with
        ``sc.get_error_msg(code)``
    """

    def __init__(self, code, result=None, msg=None):
        self['code'] = code
        self['msg'] = msg or sc.get_error_msg(code)
        self['result'] = result if result is not None else {}

    def __call__(self, *arg, **kw):
        """Calling the instance is a shortcut for :meth:`jsonify`."""
        return self.jsonify()

    def jsonify(self):
        """Serialise the payload with flask's ``jsonify`` and disable caching."""
        # Inside a method the bare name resolves to the imported flask
        # function, not to this method.
        json_resp = jsonify(**self)
        json_resp.headers['Cache-Control'] = 'no-cache'
        return json_resp

# resp is the object returned by flask's jsonify
# Format: code, result, msg -- where msg corresponds to code
resp = APIResult(0, result={"data": [1, 2, 3]}, msg="成功")()

如何实现可配置扩展的状态码映射结构体

  • 定义一个结构体存储 错误码: (错误代码名称,默认错误信息)
  • 生成的对象支持 store.E_SUCC
import types

# Default table: error code -> (symbolic name, default message)
DEFAULT_DICT = {
    0: ('E_SUCC', '成功'),
    1: ('E_PARAM', '参数错误'),
    2: ('E_INTER', '程序内部错误'),
    3: ('E_EXTERNAL', '外部接口错误'),
    4: ('E_TIMEOUT', '第三方接口超时'),
    5: ('E_RESRC', '接口不存在'),
    6: ('E_AUTH', '鉴权失败'),
    7: ('E_FORBIDDEN', '访问被禁止'),
    8: ('E_RESOURCE_NOT_FIND', '资源不存在或已删除')
}


class StatusCodeStore(object):
    """Configurable mapping between error codes, symbolic names and messages.

    Symbolic names are readable as attributes, e.g. ``store.E_SUCC``.

    :param codes: dict of ``code -> (name, message)``; any non-dict value is
        replaced by an empty table
    """

    # Store used by set_into_modules() when none is passed explicitly.
    DEFAULT_STORE = None

    def __init__(self, codes=None):
        self.codes = codes if isinstance(codes, dict) else {}
        self.refresh()

    def refresh(self):
        """Rebuild the ``name -> code`` reverse map from ``self.codes``."""
        self.reverse = {}
        set_into_modules(self.reverse, from_store=self)

    def get_error_msg(self, code):
        """Return the message for *code*, or '未知错误' when there is none.

        A string made of digits is converted to ``int`` before the lookup.
        """
        if isinstance(code, str) and code.isdigit():
            code = int(code)
        _, msg = self.codes.get(code, (None, None))
        return msg or '未知错误'

    def __getattr__(self, name):
        # Only called when normal lookup fails. The reverse map is read
        # through __dict__ so that a missing map cannot re-enter __getattr__
        # and recurse without end.
        reverse = self.__dict__.get('reverse', {})
        try:
            return reverse[name]
        except KeyError:
            # Attribute protocols (hasattr, getattr with default) expect this type.
            raise AttributeError(name)


def set_into_modules(target, from_store=None):
    """Copy every ``name -> code`` pair of a store into *target*.

    :param target: a dict, or a module whose namespace receives the names
    :param from_store: store to read; defaults to ``StatusCodeStore.DEFAULT_STORE``
    :raises TypeError: when *target* is neither a dict nor a module
    """
    from_store = StatusCodeStore.DEFAULT_STORE if from_store is None else from_store

    if isinstance(target, dict):
        target_dict = target
    elif isinstance(target, types.ModuleType):
        target_dict = target.__dict__
    else:
        raise TypeError('target must be a dict or a module, got %r' % (type(target),))
    for (code, (name, msg)) in from_store.codes.items():
        target_dict[name] = code


# Shared default store, also reachable as StatusCodeStore.DEFAULT_STORE
store = StatusCodeStore.DEFAULT_STORE = StatusCodeStore(DEFAULT_DICT)
  • 根据项目需要,可以自定义status_code.json文件,或者单独项目以供多个项目使用

# 2. 函数之间标识符

# Provider side
def func():
    """Return a ``(success, message)`` pair for the caller to branch on."""
    flag = True
    if not flag:
        return False, "msg"
    print("Flag is OK.")
    return True, None


# Caller side
code, msg = func()
# Later logic branches on `code`

# 3. 多人之间协议

from collections import namedtuple

# Basic account-opening information
OpenProtocol = namedtuple(
    "OpenProtocol",
    ["identity_card", "identity_type", "bank_number", "data"],
)

# 一方做本地的业务操作,一方做第三方连接操作,只要都满足OpenProtocol协议字段即可

# 数据结构 structures.py

dict类型不但在各类程序中广泛使用,它也是Python语言的基石。模块的命名空间、实例的属性和函数的关键参数都可以看到字典的身影。跟它有关的内置函数都在__builtins__.__dict__模块中。

—— 来自《流畅的Python》

# 背景理论

collections.abc模块中有Mapping和MutableMapping这两个抽象基类,它们的作用是为dict和其他类似的类型定义形式接口,即定义了构建一个映射类型所需要的最基本接口

# 源码分析

class CaseInsensitiveDict(MutableMapping):
    """Mapping whose lookups ignore key case while keeping the key as last set.

    Internally ``_store`` maps ``key.lower()`` to a ``(key, value)`` pair.
    """

    def __init__(self, data=None, **kwargs):
        # One internal ordered store backs the whole mapping.
        self._store = OrderedDict()
        # None is the sentinel for "no initial data".
        initial = {} if data is None else data
        self.update(initial, **kwargs)

    def __setitem__(self, key, value):
        # Stored shape: lowered key -> (key as given, value)
        self._store[key.lower()] = (key, value)

    def __getitem__(self, key):
        # D[key]: pick the value out of the stored (key, value) pair
        stored = self._store[key.lower()]
        return stored[1]

    def __delitem__(self, key):
        # del D[key]
        del self._store[key.lower()]

    def __iter__(self):
        # Lazily yields the keys in the case they were set with.
        return (pair[0] for pair in self._store.values())

    def __len__(self):
        return len(self._store)

    def lower_items(self):
        """Iterate over ``(lowered key, value)`` pairs."""
        return ((lowered, pair[1]) for lowered, pair in self._store.items())

    def __eq__(self, other):
        # object == object: only mappings are comparable.
        if not isinstance(other, Mapping):
            return NotImplemented
        other = CaseInsensitiveDict(other)
        # Compare on lowered keys so case differences do not matter.
        return dict(self.lower_items()) == dict(other.lower_items())

    def copy(self):
        # The stored (key, value) pairs are valid constructor input.
        return CaseInsensitiveDict(self._store.values())

    def __repr__(self):
        return str(dict(self.items()))
# Summary: an internal _store plus the mapping protocol

# An extension of dict
class LookupDict(dict):
    """Dict subclass whose item lookups read the instance attributes.

    Values set as attributes are readable with ``obj[key]`` and
    ``obj.get(key)``; a missing key gives ``None`` instead of ``KeyError``.
    """

    def __init__(self, name=None):
        self.name = name
        super(LookupDict, self).__init__()

    def __repr__(self):
        return '<lookup \'%s\'>' % (self.name)

    def __getitem__(self, key):
        # Lookups go to the attribute dictionary, not to the dict contents.
        return self.__dict__.get(key)

    def get(self, key, default=None):
        attributes = self.__dict__
        return attributes.get(key, default)
上次更新: 8/28/2022, 12:39:12 PM