## 创建虚拟环境 python3 -m venv venv # 安装flask pip install --upgrade pip pip install -U setuptools pip install flask pip install watchd # 非必须
# 编写main.py # filename: main.py from flask import Flask app = Flask(__name__) @app.route("/") def hello_world(): return "<p>Hello, World!</p>" # 进入虚拟环境 source ./venv/bin/activate # 退出虚拟环境 deactivate # 启动flask服务(指定host和端口) flask run --host=0.0.0.0 --port=8080
pip install uwsgi yum install nginx
说明: 使用python3.6版本安装uwsgi会报错,升级到最新版本(python3.10)后安装正常(重新编译)
# 如下命令试过,都无效,重新编译才OK yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel yum install -y gcc* pcre-devel openssl-devel yum install -y python-devel
# 编写 uwsgi 配置文件 [uwsgi] socket = 127.0.0.1:5000 # 如果不使用socket,直接用uwsgi启动程序,就是用http # http=0.0.0.0:5000 chdir = /data/proj/flask module = main:app processes = 2 threads = 2 master = true daemonize = /data/proj/flask/logs/uwsgi.log pidfile = uwsgi.pid virtualenv = /data/proj/flask/venv 各参数含义如下: - socket: 设定 Flask 的地址和端口号。 - chdir: 设定 Flask 应用的根目录 - module: 设定应用的入口文件及 Flask 对象, main为入口文件(main.py), app为文件中的Flask对象 - processes: 设定应用进程的数量。 - threads: 设定每个进程的线程数量。 - master: 设定是否启动主线程。 - daemonize: 设定日志的打印文件。 - pidfile: 设定主进程 pid 的写入文件。 - virtualenv: 设定虚拟环境的路径。 # 启动 uwsgi uwsgi --ini uwsgi.ini # 停止 uwsgi uwsgi --stop uwsgi.pid # 重启 uwsgi uwsgi --reload uwsgi.pid uwsgi --http-socket :8088 --wsgi-file test.py 或 uwsgi --http :8001 --wsgi-file test.py
server { # 监听端口 listen 80; # 监听ip 换成服务器公网IP server_name ***.***.***.***; #动态请求 location / { include uwsgi_params; uwsgi_pass 127.0.0.1:5000; } #静态请求 # location /static { # alias /root/face/server/static/; # } } # 启动 nginx nginx # 停止 nginx nginx -s stop # 重启 nginx nginx -s reload
本文只针对一些日常用到最基础的内容记录,更新参考官网,
@app.route("/me") def me_api(): user = get_current_user() return { "username": user.username, "theme": user.theme, "image": url_for("user_image", filename=user.image), } @app.route("/users") def users_api(): users = get_all_users() return jsonify([user.to_json() for user in users])
from markupsafe import escape @app.route('/user/<username>') def show_user_profile(username): # show the user profile for that user return f'User {escape(username)}' @app.route('/post/<int:post_id>') def show_post(post_id): # show the post with the given id, the id is an integer return f'Post {post_id}' @app.route('/path/<path:subpath>') def show_subpath(subpath): # show the subpath after /path/ return f'Subpath {escape(subpath)}'
转换器类型
from flask import request # 要操作 URL (如 ?key=value )中提交的参数可以使用 args 属性: searchword = request.args.get('key', '') # 处理json格式输入(如下为request 常见属性的打印) @app.route('/request', methods=['POST', 'GET']) def show_request(): return { "url": request.url, "method": request.method, "args": request.args, "headers": str(request.headers), "cookies": request.cookies "data": json.loads(request.data), "form": request.form, "files": request.files, } ## 操作 form 属性表单:request.form['username'] @app.route('/login', methods=['POST', 'GET']) def login(): error = None if request.method == 'POST': if valid_login(request.form['username'], request.form['password']): return log_the_user_in(request.form['username']) else: error = 'Invalid username/password' # the code below is executed if the request method # was GET or the credentials were invalid return render_template('login.html', error=error)
至此可实现一个简单的接受 json API请求,并返回json的后端服务
单纯一个flask后端服务,只能做简单的请求响应处理,但涉及到更广泛的应用诉求,就必须使用数据库。本文介绍 pymysql 的基础用法
pip3 install PyMySQL
如下为 简单示例,先在 Mysql 数据库中创建一个雇员的表格,并插入一条数据
mysql> DROP TABLE IF EXISTS EMPLOYEE; mysql> CREATE TABLE EMPLOYEE ( USER_NAME CHAR(20) NOT NULL, USER_ID CHAR(20) NOT NULL, AGE INT, SEX CHAR(1), INCOME FLOAT ); mysql> INSERT INTO EMPLOYEE(user_name, user_id, age, sex, income) VALUES ("coreylin", "coreylin", 10, M, 1000); mysql> describe EMPLOYEE; +-----------+----------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-----------+----------+------+-----+---------+-------+ | USER_NAME | char(20) | NO | | NULL | | | USER_ID | char(20) | NO | | NULL | | | AGE | int(11) | YES | | NULL | | | SEX | char(1) | YES | | NULL | | | INCOME | float | YES | | NULL | | +-----------+----------+------+-----+---------+-------+ 5 rows in set (0.00 sec)
并在flask的入口文件(如本项目中:main.py)中增加如下查询接口
import pymysql db = pymysql.connect( host = "127.0.0.1", user = "root", password = "", database = "flask") @app.route("/query", methods=['POST', 'GET']) def query_db(): data = json.loads(request.data) user_name = data.get("user_name", None) if user_name is None: return { "message": "user_name not found", "data": [] } sql = 'select * from EMPLOYEE where USER_NAME="{}"'.format(user_name) cursor = db.cursor() cursor.execute(sql) results = cursor.fetchall() ret = { "message": "success", 'data': [] } for row in results: ret['data'].append({ "user_name": row[0], "user_id": row[1], "age": row[2], "sex": row[3], "income": row[4], }) return ret
这里主要介绍如何在flask中引用数据库,并提供了一个简单的demo,不过分去介绍各种数据库的框架,lib,模块,已经如何做数据库封装。这些可以在自己先手写几个方法后,慢慢找对应的资料去提升。
如下是PyMySQL调用的几个方法
#!/usr/bin/python3 import pymysql class Database(): # 打开数据库连接 db = pymysql.connect(host='localhost', user='root', password='', database='flask') # 使用 cursor() 方法创建一个游标对象 cursor cursor = db.cursor() # 使用 execute() 方法执行 SQL 查询 cursor.execute("SELECT VERSION()") # 使用 fetchone() 方法获取单条数据. data = cursor.fetchone() print ("Database version : %s " % data) # 关闭数据库连接 db.close() # 使用 fetchall() 方法获取全部返回。 data = cursor.fetchall()
走到这里,已经能够使用 flask 做最简单的操作数据库 CRUD 的后端服务。
对于需要登录的服务,最关键的部分并非一个登录界面,而是保持用户的登录态信息,确保每次请求都经过鉴权,这跟之前的内容有很大不同,之前的服务是“无状态”的,而一旦需要登录,意味着服务会变成一个“有状态”的服务。这里,我们可以用 “session”来存储登录态。
用户登录的步骤:
如下为创建,删除 登录 session 的例子,这里的例子中为了简便,直接使用 用户名称 作为 session id,这样很容易被猜到,实际上 session id 是一个临时的,有一定私密性的字段,一般是后台服务器生成,再提供给认证过后的客户端,客户端的每次请求都需要带上这个session id,作为这个用户访问后端服务的一个临时凭证。
from flask import session from flask import redirect, url_for # Set the secret key to some random bytes. Keep this really secret! app.secret_key = b'_5#y2L"F4Q8z\n\xec]/' @app.route('/user/<username>') def show_user_profile(username): # show the user profile for that user # return f'User {escape(username)}' return 'Hello {}<br> <a href=/logout> logout</a>'.format(username) # login @app.route("/") def index(): if 'username' in session: return f'Logged in as {session["username"]}' return redirect(url_for('login')) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': print(request.form) session['username'] = request.form['username'] return redirect(url_for('show_user_profile', username=session['username'])) return ''' <form method="post"> <p><input type=text name=username> <p><input type=submit value=Login> </form> ''' @app.route('/logout') def logout(): # remove the username from the session if it's there session.pop('username', None) return redirect(url_for('login'))