1.jdango和flask框架的认识? -对于请求的处理方式不一样:django是直接以参数的形式传入,flask则是上下文管理的机制 2.flask中上下文管理机制? -当一个请求过来之后 先通过wsgi处理 然后会执行app.__call__方法 该方法又会执行wsgi_app方法 会创建一个ctx而ctx又是一个RequestContext的对象里面含有request和session 之后会通过localstack将ctx放入local中以{线程id号:{'stactk':[ctx]}形式存储 取值时是基于localproxy类再调用一个函数让localstack从local中取出ctx中的request或者session 3.为什么把请求放到RequestContext中? -ctx = RequestContext()——> request, session 对于请求数据request和session会经常用放一起便于导入 4.Local对象的作用? -看过Local源码,和threading.local相似,但是又有不同 -Local中可以基于协程greenlet获取唯一标识,粒度更细。 5.LocalStack对象的作用? -对Local对象中的数据进行操作。 -将Local对象中的数据维护成了一个栈(先进后出) -local = { 1231:{stack:[ctx,ctx...]} } 6.上下文管理? -请求上下文:request/session -APP和g:app/g 7.什么是g? -一次请求周期内的全局变量在reques_before时定义,后面可以直接调用 8.获取session/g -LocalProxy 9.技术: -反射 -面向对象,封装 -__dict__ -线程(threading.local) -笔试:自己写一个类+列表实现栈。(LocalStack实例) 10.实例化的类可被for循环 -该类的__iter__方法返回一个可迭代对象
class Foo: def __iter__(self): # 返回迭代器 # return iter([11, 22, 33, 'xx']) # 返回生成器(特殊的迭代器) yield 11 yield 22 yield 33 yield 'xx' obj = Foo() for i in obj: print(i) >>>> 11 22 33 xx
from flask import Flask, request, session from flask_session import RedisSessionInterface from redis import Redis # 默认方式将session存入cookie返回到浏览器保存 # from flask.sessions import SecureCookieSessionInterface # app.session_interface = SecureCookieSessionInterface() # 方式一:redis保存session可以数据保存在后台 app.session_interface = RedisSessionInterface( redis=Redis(host='localhost', port=6379), key_prefix='flask_xx' # session中随即字符串的 flask_xx+uuid4 ) # 方式二:redis保存session from flask_session import Session from redis import Redis app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = Redis(host='localhost', port=6379) Session(app)
home.py
from flask import Blueprint, session home = Blueprint('home', __name__) @home.route('/index') def index(): # session['user_info'] = {'k1': 1, 'k2': 2} # 会调用__setitem__方法里面会执行on_update将modified改为True user_info = session.get('user_info') print("原来的值", user_info) session['user_info']['k1'] = 777 # 只执行了__getitem__方法此时modified还是为False,所有不会修改cookie中的session user_info = session.get('user_info') print("修改后的值", user_info) # 方式一:手动将modified改为True session['modified'] = True # 方式二:将SESSION_REFRESH_EACH_REQUEST在配置中设置为True(这个还能刷新session的超时时间)、同时还要在登录成功之后还要让session.permanent = True return 'index' @home.route('/test') def test(): user_info = session.get('user_info') print(user_info) return 'test'
account.py
from flask import Blueprint, redirect, render_template,request,session from uuid import uuid4 account = Blueprint('account', __name__) @account.route('/login', methods=['GET', 'POST']) def login(): error='' if request.method == 'POST': user = request.form.get('user') pwd = request.form.get('pwd') if user=='lem' and pwd == '123': uid = uuid4() session.permanent = True session['user_info'] = {'id':uid, 'name':user} return redirect('/index') else: error = '用户名或者密码错误' return render_template('login.html', error=error)
settings.py
from datetime import timedelta class Config: DEBUG = True SECRET_KEY = 'lem' # session加密的盐 PERMANENT_SESSION_LIFETIME = timedelta(minutes=30) # 超时时间30分钟失效 SESSION_REFRESH_EACH_REQUEST = True # 每次更新session重复保存以保证session超时时间改变 class DevelopmentConfig(Config): pass class ProductionConfig(Config): pass class TestingConfig(Config): pass
作用:将默认保存的签名cookie中的值保存到 redis/memcached 应用: -方式一: -配置 -app.config['SESSION_TYPE'] = 'redis' -app.config['SESSION_REDIS'] = Redis(host='localhost', port=6379) -替换 -from flask_session import Session -Session(app) -方式二: -from flask_session import RedisSessionInterface -app.session_interface = RedisSessionInterface(redis=Redis(host='localhost',port=6379), key_prefix='flask_xx' ) 注意:session中存储的是字典,修改字典内部元素时,会造成数据不更新。 -1.modified=True -2.SESSION_REFRESH_EACH_REQUEST = True and session.permanent = True
模式: -每个线程创建一个链接,关闭(默认不关闭),线程终止时,才关闭链接 -创建共享连接池 应用: -只要写原生SQL,就要用数据库链接池
import pymysql from dbutils.pooled_db import PooledDB POOL = PooledDB( creator=pymysql, # 使用连接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,连接池中至少创建的空闲的连接,0表示不创建 maxcached=5, # 连接池中最多闲置的连接,0和None不限制 # 连接池中最多共享的连接数量,0和None表示全部共享.PS:无用,因为pymysql和MYSQLdb等模块的threadsafety = 1,而threadsafety=1时_maxshared默认为0,所有永远都是所有连接都共享 maxshared=3, blocking=True, # 连接池中如果没有可用的连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个连接池最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to...", "set time zone..."] # ping Mysql服务端,检查是否服务可用。如:0=None=never,1=default=whenever it is requested,2=when a cursor is created, 4=when a query is executed,7=always ping=0, # 工作中一般都是0/4/7 host='localhost', port=3306, user='root', password='123456', database='flask', charset='utf8' ) def func(): """ 检测当前正在运行连接数是否小于最大连接数,如果不小于则:等待或者抛出raise TooManyConnections异常 否则优先去初始化时创建的连接中获取连接,SteadyDBConnection 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回 一旦关闭链接后,连接就返回到连接池让后续线程继续使用 :return: """ conn = POOL.connection() # print(th, "连接被拿走了", conn1._con) # print(th, "池子里目前有", pool.idle_cache,'\r\n') cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute("select * from user") res = cursor.fetchall() conn.close() return res
class Config: DEBUG = True SECRET_KEY = 'lem' # session加密的盐 PERMANENT_SESSION_LIFETIME = timedelta(minutes=30) # 超时时间30分钟失效 SESSION_REFRESH_EACH_REQUEST = True # 每次更新session重复保存以保证session超时时间改变 SESSION_TYPE = 'redis' class DevelopmentConfig(Config): ENV = 'development' SESSION_REDIS = Redis(host='192.168.11.213', port=6379) SESSION_KEY_PREFIX='lem' PYMYSQL_HOST = "localhost" PYMYSQL_PORT = 3306 PYMYSQL_USER = 'root' PYMYSQL_PASSWORD = '123456' PYMYSQL_DATABASE = 'flask' PYMYSQL_CHARSET = 'utf8'
类似于redis(注意app创建时间和连接池创建时调用app的配置信息的顺序)
pool.py
import pymysql from dbutils.pooled_db import PooledDB def init_pool(app): POOL = PooledDB( creator=pymysql, # 使用连接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,连接池中至少创建的空闲的连接,0表示不创建 maxcached=5, # 连接池中最多闲置的连接,0和None不限制 # 连接池中最多共享的连接数量,0和None表示全部共享.PS:无用,因为pymysql和MYSQLdb等模块的threadsafety = 1,而threadsafety=1时_maxshared默认为0,所有永远都是所有连接都共享 maxshared=3, blocking=True, # 连接池中如果没有可用的连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个连接池最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to...", "set time zone..."] # ping Mysql服务端,检查是否服务可用。如:0=None=never,1=default=whenever it is requested,2=when a cursor is created, 4=when a query is executed,7=always ping=0, # 工作中一般都是0/4/7 # 读取app的配置文件中的参数 host=app.config['PYMYSQL_HOST'], port=app.config['PYMYSQL_PORT'], user=app.config['PYMYSQL_USER'], password=app.config['PYMYSQL_PASSWORD'], database=app.config['PYMYSQL_DATABASE'], charset=app.config['PYMYSQL_CHARSET'] ) # 给app的上面的POOL添加到app的config中 app.config['PYMYSQL_POOL'] = POOL
__init__.py
from flask import Flask from .views import account, home import settings from flask_session import Session from apps.utils.mysql_pool import init_pool def create_user(): app = Flask(__name__) app.config.from_object(settings.DevelopmentConfig) app.register_blueprint(account.account) app.register_blueprint(home.home) # 将session替换成redis session # Session(app) init_pool(app) # 将app以参数的形式传到上面的init_pool中,该方法与上面Session(app)类似 return app
helper.py
import pymysql from flask import current_app # 创建一个操作sql的类,让需要执行sql的地方导入即可 class MysqlHelper: @staticmethod def open(): POOL = current_app.config['PYMYSQL_POOL'] conn = POOL.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) return conn, cursor @staticmethod def close(conn, cursor): conn.commit() cursor.close() conn.close() @classmethod def fetch_one(cls, sql, args): conn, cursor = cls.open() cursor.execute(sql, args) obj = cursor.fetchone() cls.close(conn, cursor) return obj @classmethod def fetch_all(cls, sql, args): conn, cursor = cls.open() cursor.execute(sql, args) obj = cursor.fetchall() cls.close(conn, cursor) return obj
作用:用于对python web框架做表单验证 使用: class MyForm(Form): user = 类(正则,插件) 字段 = 类(正则,插件) 字段 = 类(正则,插件) 字段 = 类(正则,插件) 字段 = 类(正则,插件) 字段 = 类(正则,插件) obj = MyForm() # 生成HTML标签 form.obj # 会调用类.__str__方法==> 插件.xx方法 # 验证 obj = MyForm(formdata=request.form) if form.validate(): # 内部找到所有的字段:user+用户发过来的数据=>正则校验
wtforms登录校验
from wtforms import Form from wtforms.fields import simple, core, html5 from wtforms import validators from wtforms import widgets # 前端提交post请求时不做校验 form表单加novalidate class LoginForm(Form): user = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.'), # validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'} ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), # validators.Length(min=8, message='用户名长度必须大于%(min)d'), # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", # message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @account.route('/login', methods=['GET', 'POST']) def login(): form = LoginForm() if request.method == 'POST': form = LoginForm(formdata=request.form) if form.validate(): print("用户提交数据通过校验,提交的数据为:", form.data) # {'user': 'hina', 'pwd': '123'} obj = MysqlHelper.fetch_one( "select id, username from user where username=%(username)s and password=%(password)s", form.data) if obj: session.permanent = True session['user_info'] = {'id': obj['id'], 'name': obj['username']} return redirect('/index') return render_template('login.html', form=form)
wtforms注册校验
from wtforms import Form, StringField, PasswordField, RadioField, SelectField, SelectMultipleField from wtforms.fields import simple, core, html5 from wtforms import validators from wtforms import widgets import email_validator class RegisterForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='hina' ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validators.DataRequired(message='重复密码不能为空.'), validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误'), ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int ) city = core.SelectField( label='城市', choices=MysqlHelper.fetch_all("select id, name from city", {}, None), # # ('bj', '北京'), # ('sh', '上海'), # ) coerce = int ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.city.choices = MysqlHelper.fetch_all("select id, name from city", {}, None) def validate_pwd_confirm(self, field): """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ # 最开始初始化时,self.data中已经有所有的值 if field.data != self.data['pwd']: # raise validators.ValidationError("密码不一致") # 继续后续验证 raise validators.StopValidation("密码不一致") # 不再继续后续验证 def validate_name(self, field): print(field.data) # 当前name传来的值 print(self.data) # 当前传来的所有值 name, gender .... obj = MysqlHelper.fetch_one("select id from user where username=%s", (field.data,)) if obj: # raise validators.ValidationError("用户名已存在") # 继续后续验证 raise validators.StopValidation("用户名已存在") # 不再继续后续验证 @account.route('/register', methods=['GET', 'POST']) def register(): form = RegisterForm() if request.method == 'POST': form = RegisterForm(formdata=request.form) if form.validate(): print(form.data) else: print(form.errors) return render_template('register.html', form=form)
# 由于在请求到来之前先要去数据库获取city的id和name所以又出现之前的获取不到current_app注册的配置PYMYSQL_POOL需要直接写在配置文件中导入 class DevelopmentConfig(Config): ENV = 'development' SESSION_REDIS = Redis(host='192.168.11.213', port=6379) SESSION_KEY_PREFIX = 'lem' PYMYSQL_HOST = "localhost" PYMYSQL_PORT = 3306 PYMYSQL_USER = 'root' PYMYSQL_PASSWORD = '123456' PYMYSQL_DATABASE = 'flask' PYMYSQL_CHARSET = 'utf8' PYMYSQL_CONFIG = {"host": 'localhost', "user": 'root', "password": '123456', 'port': 3306, 'db': 'flask', 'charset': 'utf8'} PYMYSQL_POOL = PooledDB( creator=pymysql, # 使用连接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,连接池中至少创建的空闲的连接,0表示不创建 maxcached=5, # 连接池中最多闲置的连接,0和None不限制 # 连接池中最多共享的连接数量,0和None表示全部共享.PS:无用,因为pymysql和MYSQLdb等模块的threadsafety = 1,而threadsafety=1时_maxshared默认为0,所有永远都是所有连接都共享 maxshared=0, blocking=True, # 连接池中如果没有可用的连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个连接池最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to...", "set time zone..."] # ping Mysql服务端,检查是否服务可用。如:0=None=never,1=default=whenever it is requested,2=when a cursor is created, 4=when a query is executed,7=always ping=0, # 工作中一般都是0/4/7 host="localhost", port=3306, user="root", password="123456", database="flask", charset="utf8", )
总结:
-上下文管理 -falsk-session -wtforms -name = simple.StringField() UnboundField(StringField, 计数器) -FormMeta.__call__