# 源码分析-django
# 环境搭建 env
# 工具版本
- macOS High Sierra 10.13.6
- Python 3.8.3
- Django 3.1.5
- PyCharm 2020.2 (Community Edition)
PyCharm的断点调试特别方便
# 代码环境
- fork 官方django源码,拉取到本地,新建分支
read_django_source
- 新建虚拟环境
dj_source_env
,软连接本地django源码到虚拟环境的site-packages
中 - 进入虚拟环境,新建调试项目
mysite
, 那么所依赖的django环境为https://github.com/ni-ning/django.git
,从而动态调试django源码
# 建立当前用户学习目录 learning
cd && mkdir learning
# 拉取fork源码,建立注解分支 read_django_source
git clone https://github.com/ni-ning/django.git
git fetch origin stable/3.1.x
git checkout stable/3.1.x
git checkout -b read_django_source
# 建立虚拟环境
cd ~/learning && mkdir envs
cd ~/learning/envs
virtualenv -p /Library/Frameworks/Python.framework/Versions/3.8/bin/python3 dj_source_env
source ~/learning/envs/dj_source_env/bin/activate
# django的注解分支 映射到 虚拟环境中方便断点调试
ln -s ~/learning/django/django ~/learning/envs/dj_source_env/lib/python3.8/site-packages/django
# 创建测试项目 mysite
python ~/learning/django/django/bin/django-admin.py startproject mysite
# 创建项目时需要额外安装依赖
pip install asgiref
pip install pytz
pip install sqlparse
# 建立快捷命令 vim ~/.bash_profile
alias dsource='source /Users/nining/learning/envs/dj_source_env/bin/activate'
alias cddjango='cd /Users/nining/learning/django/'
alias cdmysite='cd /Users/nining/learning/mysite/'
# 代码结构 structure
外层项目结构可以打包、测试、文档等,重点是内部的源码结构是我们重点研究的对象
# 项目结构
根据名称就可以推断目录的作用,可总结一下打包发布到官方pypi
流程
- setup.py 打包
distuils是标准库中负责建立Python第三方库的安装器,对于简单的分发很有用,但功能缺少。 _setuptools_是distuitls增强版, 不包括在标准库中, 是一个优秀的, 可靠的Python包安装与分发工具
本地开发者 --> 打包上传pypi服务器 --> 使用者直接 pip install <package>
# 1. 在用户目录下创建.pypirc文件。
# 添加权限:chmod 600 ~/.pypirc
[distutils]
index-servers = pypi
[pypi]
repository=https://upload.pypi.org/legacy/
username=******
password=******
# 2. 安装twine
pip install twine
# 3. 在项目根目录执行
python setup.py sdist bdist_wheel
# 4. 上传打包文件
twine upload dist/*
- setup.py 常用配置
详细说明参考官文文档 (opens new window)
PS:被忽视的攻击面:Python package 钓鱼,https://paper.seebug.org/326/ (opens new window)
# 源码结构
django/django
- apps # django的思想是把app作为独立单元来运作的,对外接口AppConfig和apps
- bin # django-admin -> management.execute_from_command_line()
- conf # app_template和project_template创建app和project模块;global_settings.py 全局默认配置;可以重点研究一下 __init__中的LazySettings
- contrib # 可插拔
|- admin # resister
|- auth # authenticate,login,logout,get_user_model,get_user
|- sessions # Session
|- ....
- core
|- cache # 缓存 内容变化不快 但访问量大的场景,如首页
|- checks # 如检查数据库配置是否可以正常连接
|- files # 文件上传相关
|- handlers # wsgi实现
|- mail # 邮件
|- management # python manage.py <命令>
|- serializers # 序列化相关 queryset甚至可以为yaml
|- servers # HTTP server
|- exceptions.py # 全局自定义异常
|- paginator.py # 分页实现逻辑
|- signals.py # 全局信号
|- signing.py # 签名
|- validators.py # 相关通用正则验证
|- wsig.py # wsig 接口 get_wsgi_application
- db # 数据库相关 重点研究
|- backends # 策略支持不同数据库
|- models # 重点研究
- dispathch # 如信号具体实现
- forms # form表单映射
- http # HttpRequest, HttpResponse, Http404, SimpleCookie 重点研究
- middleware # 重点研究
- template
- templatetags
- test # 单元测试
- urls # 现代化的path, re_path, include
- utils # 实用工具 重点研究
- views # 视图 重点研究 catch csrf Class-based View
- shortcuts.py # 快捷键
- pip install本质是把django/django源码文件拷贝到site-package中
# 项目创建 project
创建项目 django-admin startproject mysite
和创建应用django-admin startapp app
基本流程
# django/bin/django-admin.py 命令脚本实际执行逻辑
from django.core import management
management.execute_from_command_line()
from django.core.management import execute_from_command_line
当这行代码开始执行时,首先运行django.core.management.__init__.py
整个文件,找到execute_from_command_line
函数并将其导入当前程序的命名空间中;- 不可忽略的是
djanog.core.management.__init__.py
也包含import语句
djanog.core.management.__init__.py
...
import django # 这里
from django.apps import apps # 这里
from django.conf import settings
class ManagementUtility:
def __init__(self, argv=None):
pass
def main_help_text(self):
# 帮助信息
def fetch_command(self, subcommand):
pass
def autocomplete(self):
# 自动补全
def execute(self):
# 执行入口函数
def execute_from_command_line(argv=None):
utility = ManagementUtility(argv)
utility.execute()
import django
运行了文件django.__init__.py
from django.apps import apps
运行了文件django.apps.__init__.py
整个django的开端就从这里开始
execute_from_command_line
工厂函数,负责指挥ManagementUtility
类利用execute
方法来解析参数和启动wsgi
服务,重点在于self.fetch_command(subcommand).run_from_argv(self.argv)
self.fetch_command
利用django内置的命令管理区去匹配到具体的模块,self.fetch_command('startproject')
相当于django.core.management.commands.startproject.Command()
run_from_argv(self.argv)
app_or_project 从django/conf/模板
经过Context
渲染,然后shutil.copyfile
# 本地启动 runserver
python manage.py runserver
Django框架的整个数据流向、服务启动、端口监听等基础核心功能都是按照WSGI标准进行开发的
- WSGI, Web Server Gateway Interface, Web服务器和Web框架或应用的协议接口
- 主要作用是接收客户端(浏览器/软件/APP/调试程序)发送的请求和转发请求给上层服务(例如Django)
- 采用select网络模型进行数据的接收和分发,可以利用操作系统的非堵塞和线程池等特性,因此非常高效,如开源项目uWSGI是C语言代码实现
- Django采用wsgi是纯python代码实现,实际项目部署使用 uWSGI 或者 Gunicorn
# checks 校验
一个完整的功能设计可包括:
- 功能定义 如
django/core/checks/registry.py
中定义 数据字典class Tags
, 定义类并实例化为功能函数,作为对外提供的接口 - 功能使用 如
django/core/checks/database.py
中 使用 register,且要符合功能定义时规范,如返回列表issues - 嵌入到整体中,成为一部分,如self.check(display_num_erros)、self.check_migrations() 启动初始化校验
# 内部流程
self.fetch_command
利用django内置的命令管理区去匹配到具体的模块,self.fetch_command('runserver')
相当于django.contrib.staticfiles.management.commands.runserver.Command()
,django中的命令工具代码组织采用的是策略模式+接口模式,也就是说django.core.management.commands这个目录下面存在各种命令工具,每个工具下面都有一个Command接口,当匹配到’runserver’时调用’runserver’命令工具的Command接口,当匹配到’migrate’时调用’migrate’命令工具的Command接口run_from_argv(self.argv)
初始化中间件、启动服务,也就是拉起wgsi
# 先捋清楚 继承关系链
WSGIServer -> simple_server.WSGIServer -> http.server.HTTPServer -> scokectserver.TCPServer -> socketserver.BaseServer
小结:对于开发测试环境runserver而言,Django统一实现的 WSGIHandler + SocketServer + App 对外提供服务;实际部署时nginx + uwsgi承担WSGI,而Django只是作为App
# runserver.py中 def inner_run中代码
try:
handler = self.get_handler(*args, **options)
run(self.addr, int(self.port), handler,
ipv6=self.use_ipv6, threading=threading, server_cls=self.server_cls)
except OSError as e:
pass
- 初始化WSGIHandler时调用self.load_middleware()
- WSGIHandler中会处理Request和Response所有逻辑,其中就涉及到url解析
PS:模型示例
- 内部实现原理可用下面模型总结 基于IO多路复用的socket的服务端
# coding: utf-8
import socket
import select
class Handler(object):
""" 业务功能代码 """
def __init__(self, conn):
self.conn = conn
def execute(self):
"""
处理客户端的请求
:return: False,断开连接;True,继续执行当前客户端发来的请求
"""
data = self.conn.recv(1024)
content = data.decode('utf-8')
if content.upper() == "Q":
return False
# 1.处理登录
# "login linda"
# 2.注册
# "register linda"
self.conn.sendall(b'OK')
return True
class Server(object):
""" 基于普通socket的服务端 """
def run(self, handler_class):
# socket服务端
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("IP", "端口"))
sock.listen(5)
while True:
# 等待,客户端发来连接
conn, address = sock.accept()
# 新客户端到来。 Handler对象
instance = handler_class(conn)
# 处理客户端的请求。 Handler对象.execute
while True:
result = instance.execute()
if not result:
break
conn.close()
sock.close()
class SelectServer(object):
""" 基于IO多路复用的socket的服务端 """
def run(self, handler_class):
server_object = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_object.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_object.setblocking(False)
server_object.bind(("IP", "端口"))
server_object.listen(5)
socket_object_list = [server_object, "客户端socket对象1", "客户端socket对象2"]
conn_handler_map = {}
# conn_handler_map = {
# "客户端socket对象1": Handler(conn1),
# "客户端socket对象2": Handler(conn2),
# }
while True:
# r = ["客户端socket对象4", ]
r, w, e = select.select(socket_object_list, [], [], 0.05)
for sock in r:
# sock="客户端socket对象4"
# 新连接到来,执行 handler的 __init__ 方法
if sock == server_object:
print("新客户端来连接")
conn, address = server_object.accept()
socket_object_list.append(conn)
# 实例化handler类,即:类(conn)
conn_handler_map[conn] = handler_class(conn)
continue
# 一旦有请求发来,找到相关的 handler对象,执行他的 execute方法。
# execute方法返回False,则意味着此客户端要断开连接。
handler_object = conn_handler_map[sock] # 自己Handler(conn1),
# 找到execute去处理各自的业务逻辑
result = handler_object.execute()
if not result:
socket_object_list.remove(sock)
del conn_handler_map[sock]
sock.close()
if __name__ == '__main__':
# server = Server()
server = SelectServer()
server.run(Handler)
从服务端代码角度,socket可以分为自己的服务端socket和客户端socket
- 服务端socket有个额外功能,可以接收accept客户端socket
- 服务端socket和客户端socket统一交给select.select进行选择判断,看看哪个socket准备好了
- 准备好了的意思是说:
- 判定服务端socket可以接收新的请求
- 判定客户端socket可以收发数据
- 每次socket的切换上下文的保存,也就是如何再定位到原先的socket,细节是由select.select实现的,我们只要实现socket收发业务逻辑就行
# 解析django.setup()
调用apps.populate(settings.INSTALLED_APPS)
- 加载app模块
- 加载app下的models模块
- 调用加载完成的ready事件
# 调试工具 pdb
pdb - The Python Debugger
print --> Python 自带的Debug工具:pdb
- 非侵入式方法(不用额外修改源代码,在命令行下直接运行就能调试)
python -m pdb filename.py
- 侵入式方法(需要在被调试的代码中添加一行代码然后再正常运行代码
import pdb; pdb.set_trace()
当你在命令行看到下面这个提示符时,说明已经正确打开了pdb
(Pdb) h or help command
l(ist) [first [,last] | .] # List source code for the current file.
ll # List the whole source code for the current funtion or frame.
b/tbreak/cl # 添加/临时/清除断点
p expression # Print the value of the expression
# 逐行调试命令 - 执行下一行
s # 能够进入函数体
n # 不会进入函数体
r # 在函数中时会直接执行到函数返回处
# 非逐行调试命令
c # 持续执行下去,直到遇到一个断点
unt lineno # 持续执行直到运行到指定行(或遇到断点)
j lineno # 直接跳转到指定行(注意,被跳过的代码不执行)
a # 在函数中时打印函数的参数和参数的值
whatis expression # 打印表达式的类型,常用来打印变量值
interact # 启动交互式解释器
w # 打印堆栈信息
enter # 执行最近命令
PS:pip install ipdb (opens new window),基于ipython的pdb,带颜色
# 路由匹配 url
全局搜索配置文件中 ROOT_URLCONF
,定位于 class BaseHandler中的 def _get_response
def _get_response(self, request):
"""
Resolve and call the view, then apply view, exception, and
template_response middleware. This method is everything that happens
inside the request/response middleware.
"""
response = None
callback, callback_args, callback_kwargs = self.resolve_request(request)
# Apply view middleware
for middleware_method in self._view_middleware:
response = middleware_method(request, callback, callback_args, callback_kwargs)
if response:
break
if response is None:
wrapped_callback = self.make_view_atomic(callback)
# If it is an asynchronous view, run it in a subthread.
if asyncio.iscoroutinefunction(wrapped_callback):
wrapped_callback = async_to_sync(wrapped_callback)
try:
response = wrapped_callback(request, *callback_args, **callback_kwargs)
except Exception as e:
response = self.process_exception_by_middleware(e, request)
if response is None:
raise
# Complain if the view returned None (a common error).
self.check_response(response, callback)
# If the response supports deferred rendering, apply template
# response middleware and then render the response
if hasattr(response, 'render') and callable(response.render):
for middleware_method in self._template_response_middleware:
response = middleware_method(request, response)
# Complain if the template response middleware returned None (a common error).
self.check_response(
response,
middleware_method,
name='%s.process_template_response' % (
middleware_method.__self__.__class__.__name__,
)
)
try:
response = response.render()
except Exception as e:
response = self.process_exception_by_middleware(e, request)
if response is None:
raise
return response
- callback, callback_args, callback_kwargs = self.resolve_request(request)
# 数据模型 models
django/db
- django/db
|- backends # 支持多种数据库驱动
|- models # 模型相关,如manager.py、query.py等
|- init.py # 对外导入接口
|- transaction.py # 事务相关操作
|- utils.py # 连接connection相关操作
# 基本使用
# 基础理论
- ORM 实现数据模型与数据库的解耦,即数据模型的设计不依赖于特定的数据库
- ORM,Object Relation Mapping, "对象-关系-映射"
# 创建表
create table employee(
id int primary key auto_increment,
name varchar(32) not null,
gender bool default 1 not null,
birthday date,
department varcher(32) not null,
salary decimal(8, 2)
);
insert employee (name, gender, birthday, department, salary) values ('linda', 0, '2000-01-01', 'Sale', 8000.00);
select * from employee where name = 'linda';
update employ set salary = 10000.00 where name = 'lidna';
delete from employ where name = 'linda';
# python类
class Employee(models.Model):
id = models.AutoField(primary_key=True)
name = models.CharField(max_length=32)
gener = models.BooleanField(default=True)
birthday = models.DateField(null=True, blank=True)
department = models.CharField(max_length=32)
salary = models.DecimalField(max_digits=8, decimal_places=2, default=0)
emp = Employee(name='linda', gender=False, birthday='Sale', salary=8000.00)
Employee.objects.filter(name='linda')
Employee.objects.filter(name='linda').update(salary=10000.00)
Employ.objects.filter(name='linda').delete()
- 名称,见名识意
- 类型,数值型 字符型 日期型
- 约束,主键 外键 默认值 唯一性 非空
- 设计的字段是用来确定唯一性的标识字段,还是唯一性基础上的属性说明字段
- 关联关系的 往上字典 往下列表
A = models.ForeignKey(B, related_name='a_foreigh_key_b')
- 正向查询: A --------> B
- 反向查询: B --------> A
# 多表分组查询
分组查询SQL: 查询每一个出版社的名称以及出版的书籍个数
select publish.name, count('title')
from book inner join publish
on book.publish_id = publish.id
group by publish.id, publish.name, publish.addr, publish.email # 主键publish.id唯一非空,效果相同
Models查询:查询每一个出版社的名称以及出版的书籍个数
# select id from publish,即按照select 语句进行分组
# <QuerySet [{'id': 1, 'book_count': 100}, {'id': 2, 'book_count': 200}]> 有几个字典代表对应出版社个数
# values('其实可展示 Publish 对象字段 和 聚合值 book_count')
# Count('book__title') 反向查询表名小写__字段名
Publish.objects.values('id').annotate(book_count=Count('book__title')).values('name', 'email', 'book_count')
# <QuerySet [<Publish: 人民出版社>, <Publish: 南京出版社>]>
Publish.objects.all()
# <QuerySet [<Publish: 人民出版社>, <Publish: 南京出版社>]> 显示对象
Publish.objects.all().annotate(book_count=Count('book__title'))
# select id, name, publish, addr, email 与 id 结果相同
Publish.objects.all().annotate(book_count=Count('book__title')).values('name', 'email', 'book_count')
Publish.objects.annotate(book_count=Count('book__title')).values('name', 'email', 'book_count')
# .filter(book_count__gt=1) having book_count > 1 统计完了之后再过滤 having
Publish.objects.annotate(book_count=Count('book__title')).values('name', 'email', 'book_count').filter(book_count__gt=1)
总结 跨表的分组查询的模型:
- 每一个后的表模型.objects.values('pk').annotate(聚合函数(关联表__统计字段)).values('表模型的所有字段以及统计字段')
- 每一个后的表模型.objects.annotate(聚合函数(关联表__统计字段)).values('表模型的所有字段以及统计字段')
# aggregate 和 annotate
annotate()可以得到一些我们手动计算得到的值,并将其作为Queryset中Item的一个属性来调用
aggregate()可以使得查询的返回值由一个Queryset变成一个dict,每个key和对应的value由自己计算得到。
Case/When
Staff.objects.annotate(
age_tag = Case(
When(
age__lt=18,
then=1,
),
When(
age__lt=30,
then=2,
)
When(
age__lt=60,
then=3,
)
default=0,
output_field=IntegerField(),
)
)
# F查询 与 Q查询
model进行查询时通常是 某个字段和一个常量
对比,涉及到变量时,可用F查询
from django.db.models import F, Q
# 评论数大于阅读数
SQL: select * from book where comment_num > read_num
Model: Book.objects.filter(comment__num__gt=F('read_num'))
# 每一本书价格加10
Book.objects.all().update(price=F('price') + 10)
# 且关系 等同于 Q('红楼梦') & Q(price=100)
Book.objects.filter(title='红楼梦', price=100)
# 或关系
Book.objects.filter(Q(title='红楼梦') | Q(price=100))
# 非关系
Book.objects.filter(~Q(title='红楼梦')
# 注意顺序 先 Q,后字段
Book.objects.filter(Q(title='红楼梦') | Q(price=100), comment_num__gte=100)
组合技
import operator
from django.db.models import Q
q1 = reduce(operator.and_, Q(gender='F'))
q2 = reduce(operator.or_, Q(gender='M'))
# 最佳实践
- 当需要同时写多张表的时候,为了保证数据的一致性,需要使用事务
# 定义事务时间点
sid = transaction.savepoint(using=settings.WRITE_DATABASE)
# 成功提交
transaction.savepoint_commit(sid, using=settings.WRITE_DATABASE)
# 失败回滚
transaction.savepoint_rollback(sid, using=settings.WRITE_DATABASE)
# with 上下文形式
with transaction.atomic(using=settings.WRITE_DATABASE):
pass
- default=None -> isnull=True
# 定义MySQL数据字段类型
ALTER TABLE `teacher` ADD COLUMN first_class_time DATETIME DEFAULT NULL;
# 定义Model字段
first_class_time = models.DateTimeField(help_text='老师的第一节课', default=None)
# 使用Model字段查询
models.Teacher.objects.filter(first_class_time__isnull=False).count()
排序 主键索引 order_by('-id')
collections.Counter()
course_ids = list(orders.values_list('course_id', flat=True))
times_of_course_info = collections.Counter(course_ids)
times_of_course_info = dict(times_of_course_info.most_common(10))
# backends
- backends
|- base
|- dummy
|- mysql
|- base.py # 统一定义 class DatabaseWrapper
|- oracle
|- postgresql
|- sqlite3
- 规定好目录接口,对外暴露统一的接口,实现支持多种数据库,而使用方感知不到
- backends/base中的 BaseDatabaseWrapper 定义connect函数,而具体的mysql/base中的 def get_new_connection(self, conn_params): 进行实际的MySQLdb.connect(**conn_params)
# models
query_set = Book.objects.all()
print(query_set.count())
print(len(query_set))
Book.objects.all().select_related('category')
- 查询也是可以放到Model上来处理,但是这样会导致Model操作功能较多,所以objects为Manager实例,单独来处理数据库操作
- Manager是QeurySet完整代理
# transaction.py
- 当需要同时写多张表的时候,为了保证数据的一致性,需要使用事务
# 定义事务时间点
sid = transaction.savepoint(using=settings.WRITE_DATABASE)
# 成功提交
transaction.savepoint_commit(sid, using=settings.WRITE_DATABASE)
# 失败回滚
transaction.savepoint_rollback(sid, using=settings.WRITE_DATABASE)
# with 上下文形式
with transaction.atomic(using=settings.WRITE_DATABASE):
pass
# utils.py
def load_backend(backend_name):
# setttings中配置策略匹配到 backends中 DatabaseWrapper
try:
return import_module('%s.base' % backend_name)
except ImportError as e_user:
raise
# 自定义Handler类,让使用起来类似一个字典
class ConnectionHandler:
def __init__(self, databases=None):
self._databases = databases
self._connections = Local(thread_critical=True)
@cached_property
def databases(self):
if self._databases is None:
self._databases = settings.DATABASES
if self._databases == {}:
self._databases = {
DEFAULT_DB_ALIAS: {
'ENGINE': 'django.db.backends.dummy',
},
}
if DEFAULT_DB_ALIAS not in self._databases:
raise ImproperlyConfigured("You must define a '%s' database." % DEFAULT_DB_ALIAS)
if self._databases[DEFAULT_DB_ALIAS] == {}:
self._databases[DEFAULT_DB_ALIAS]['ENGINE'] = 'django.db.backends.dummy'
return self._databases
def __getitem__(self, alias):
if hasattr(self._connections, alias):
return getattr(self._connections, alias)
self.ensure_defaults(alias)
self.prepare_test_settings(alias)
db = self.databases[alias]
backend = load_backend(db['ENGINE'])
conn = backend.DatabaseWrapper(db, alias)
setattr(self._connections, alias, conn)
return conn
# 信号系统 signal
高内聚 低耦合
- Django 内置信号使用
from django.db import models
from django.db.models.signals import post_save
from django.dispatch import receiver, Signal
class Category(models.Model):
name = models.CharField(max_length=128, verbose_name='类名')
updated_time = models.DateTimeField(auto_now=True)
created_time = models.DateTimeField(auto_now_add=True)
class Book(models.Model):
title = models.CharField(max_length=128, verbose_name='书名')
category = models.ForeignKey(Category, on_delete=models.DO_NOTHING)
updated_time = models.DateTimeField(auto_now=True)
created_time = models.DateTimeField(auto_now_add=True)
# django下的已定义好的post_save
@receiver(post_save, sender=Book)
def notify_thirty(sender, **kwargs):
print(sender, kwargs)
- 自定义信号使用
from django.dispatch import receiver, Signal
# 定义signal方式
pizza_done = Signal(providing_args=['size'])
# 触发操作点
def handler(self, *arg, **kw):
sender = self.__class__
pizza_done.send(sender=sender, size=10)
# 触发回调函数
@receiver(pizza_done)
def launch(sender, **kw):
print(sender, kw)
# or 触发回调函数
# pizza_done.connect(launch)
- 源码所在模块
django/dispatch/dispatcher.py
不是很复杂,可以精读一下,观察者模式
# 多级缓存 caches
源码位置
django/core/cache
根据settings.CACHES封装对外的caches、cachedjango/middleware/cache.py
实现页面级别缓存
MIDDLEWARE = [
'django.middleware.cache.UpdateCacheMiddleware',
...
'django.middleware.cache.FetchFromCacheMiddleware'
]
# 认证权限 auth
你是谁,能干啥
源码位置
MIDDLEWARE = [
'django.contrib.sessions.middleware.SessionMiddleware',
...
'django.contrib.auth.middleware.AuthenticationMiddleware',
]