1.专门用来存储用户信息的g对象,g的全称的为global,g对象是全局的 2.g对象在一次请求中的所有的代码的地方,都是可以使用的,g对象在当次请求中一直有效
1.session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session, 2.但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次
from flask import Flask,g,request,session app = Flask(__name__) @app.before_request def first(): session['name']='dlrb' request.form='egon' g.name='lqz' @app.after_request def after(response): print('11111',g.name) return response @app.route('/') def hello_world(): print('00000',g.name) return 'Hello World!' if __name__ == '__main__': app.run()
作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy
安装:pip3 install flask-session
from flask import Flask,g,request,session from flask_session import RedisSessionInterface app = Flask(__name__) app.debug=True # 开启debug,没上线为True,方便查询错误 app.secret_key='asdfasdfasdf' # 密钥 # 方式一 from redis import Redis conn=Redis(host='127.0.0.1',port=6379) # 使用第三方查询RedisSessionInterface进行将session存入redis app.session_interface=RedisSessionInterface(redis=conn,key_prefix='flask_session') # redis : redis地址,端口(不填,默认本地) # key_prefix : 前缀 @app.route('/') def hello_world(): session['name']='lqz' return 'Hello World!' if __name__ == '__main__': app.run()
from flask_session import Session from redis import Redis app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_KEY_PREFIX']='flask_session' app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port='6379') Session(app) # 将app传入session内 @app.route('/') def hello_world(): session['name']='lqz' return 'Hello World!' if __name__ == '__main__': app.run()
#源码expires = self.get_expiration_time(app, session) 'PERMANENT_SESSION_LIFETIME': timedelta(days=31),#这个配置文件控制
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False) # permanent=False 的情况下就会关闭浏览器,cookie失效
from flask import Flask import time import pymysql app = Flask(__name__) app.debug=True app.secret_key='asdfasdfasdf' @app.route('/') def hello_world(): # pymysql连接数据库(指定数据库信息) conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='1', database='luffy') cursor = conn.cursor() # 获得游标对象 cursor.execute('select * from luffy_order') # 查询luffy_order表 time.sleep(1) print(cursor.fetchall()) # 获取所有 return 'Hello World!' if __name__ == '__main__': app.run()
# 问题: 1.如果使用全局连接对象,会导致数据错乱 # 问题二: 2.如果在视图函数中创建数据库连接对象,会导致连接数过多
使用数据库连接池 DBUtils
from dbutils.pooled_db import PooledDB import pymysql POOL=PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='1', database='luffy', charset='utf8') # 导入进程 from threading import Thread def task(): # 去池中获取连接 conn = POOL.connection() # 获取游标 cursor = conn.cursor() cursor.execute('select * from luffy_order') # 查询luffy_order表 print(cursor.fetchall()) # 获取所有 for i in range(100): # 循环100个进程 t=Thread(target=task) # 进程执行 t.start() # mysql可以看到当前有多少个连接数
# Flask框架中的信号基于blinker,其主要就是让开发者可是在flask执行过程中定制一些用户行为
# pip3 install blinker ## flask中有内置信号 # 什么时候触发的 request_started = _signals.signal('request-started') # 请求到来前执行 request_finished = _signals.signal('request-finished') # 请求结束后执行 before_render_template = _signals.signal('before-render-template') # 模板渲染前执行 template_rendered = _signals.signal('template-rendered') # 模板渲染后执行 got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行 request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否) appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否) appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行 appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行 message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
1.写一个函数 2.跟内置信号绑定 3.以后只要触发内置信号,函数就会执行
from flask import Flask,signals,render_template from flask.signals import _signals app = Flask(__name__) # 往信号中注册函数 def func(*args,**kwargs): print('触发型号',args,kwargs) # 信号一般用来记录日志 # signals信号.内置信号(请求到来前执行).connect(执行函数) signals.request_started.connect(func) # 给模板渲染前编写信号 def template_before(*args,**kwargs): print(args) print(kwargs) print('模板开始渲染了') # signals信号.内置信号(模板渲染前执行).connect(执行函数) signals.before_render_template.connect(template_before)
1 写一个信号 2 写一个函数 3 信号绑定函数 4 触发信号
# 自定义信号 # 自定制信号 = signals.signal('自定制信号名称') before_view = _signals.signal('before_view') # 写函数 def test(*args,**kwargs): print('我执行了') print(args) print(kwargs) # 绑定给信号 # before_view信号.connect(执行函数) before_view.connect(test) @app.route('/index',methods=['GET',"POST"]) def index1(): # 触发信号 # before_view信号.send发送(关键字,关键字) before_view.send(name='lqz',age=19) print('视图') return render_template('index.html',a='lqz') if __name__ == '__main__': app.run(port=8080) app.__call__
# 1.用于实现类似于django中 python3 manage.py runserver ...类似的命令
pip3 install flask-script
from flask_script import Manager from flask import Flask app = Flask(__name__) # 传入app生成flask_script对象 manager=Manager(app) if __name__ == '__main__': manager.run() #python3 manage.py runserver --help
以后执行(启动)直接:python3 manage.py runserver
@manager.command def custom(arg): """ 自定义命令 python manage.py custom 123 :param arg: :return: """ print(arg) @manager.option('-n', '--name', dest='name') @manager.option('-u', '--url', dest='url') def cmd(name, url): """ 自定义命令(-n也可以写成--name) 执行: python manage.py cmd -n lqz -u http://www.oldboyedu.com 执行: python manage.py cmd --name lqz --url http://www.oldboyedu.com :param name: :param url: :return: """ print(name, url)
1.可以把excel的数据导入数据库,定制个命令,去执行
1.orm框架SQLAlchemy,第三方,独立使用,集成到web框架中 2.django的orm框架
pip install SQLAlchemy
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Book,Hobby,Person # 指定create_engine对象,sqlalchemy指定数据库 engine = create_engine("mysql+pymysql://root:1@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5) # 连接池大小 # bind绑定engine # 生成Connection对象 Connection = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个Connection con = Connection() # ############# 执行ORM操作 ############# # 单表插入一条数据 # book=Book(name='金',price=11) # con.add(book) ## 一对多关系插入 # hobby=Hobby(caption='足球') # person=Person(name='lqz',hobby_id=1) # con.add(hobby) # con.add(person) # 一对多插入 # hobby=Hobby(caption='橄榄球') # person=Person(name='egon',hobby=hobby) # con.add(hobby) # con.add(person) # 查询egon # egon=con.query(Person).filter_by(name='egon').first() # print(egon.hobby_id) # print(egon.hobby.caption) # 拿到hobby对象 ,正向查询:字段名 glq=con.query(Hobby).filter_by(caption='橄榄球').first() pers=glq.pers #反向查询,按 backref='pers' print(pers) for p in pers: print(p.name) # 提交事务 con.commit() # 关闭session,其实是将连接放回连接池 con.close()
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index from sqlalchemy.orm import relationship Base = declarative_base() class Users(Base): # Base基类(相当于Django中的models.MODELS) __tablename__ = 'users' # 数据库表名称 id = Column(Integer, primary_key=True) # id 主键 name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空 email = Column(String(32), unique=True) #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 ctime = Column(DateTime, default=datetime.datetime.now) extra = Column(Text, nullable=True) __table_args__ = ( # id和name (联合唯一名称:uix_id_name) UniqueConstraint('id', 'name', name='uix_id_name'), # name和email是联合索引 索引名称(ix_id_name) Index('ix_id_name', 'name', 'email'), #索引 ) class Book(Base): # 表模型 __tablename__ = 'books' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) price=Column(Integer) # 一对多关系 class Hobby(Base): # 表模型 __tablename__ = 'hobby' id = Column(Integer, primary_key=True) caption = Column(String(50), default='篮球') class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) # hobby指的是tablename而不是类名,uselist=False hobby_id = Column(Integer, ForeignKey("hobby.id")) # 外键 # 跟数据库无关,不会新增字段,只用于快速链表操作 # 类名,backref用于反向查询 hobby = relationship('Hobby', backref='pers') def init_db(): """ 根据类创建数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:1@127.0.0.1:3306/aaa?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.create_all(engine) def drop_db(): """ 根据类删除数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:1@127.0.0.1:3306/aaa?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': # drop_db() # 删除表 init_db() # 创建表模型