pip install django==2.0
pip install djangorestframework==3.10.0
pip install pymysql==1.0.2
from django.db import models
from werkzeug.security import generate_password_hash, check_password_hash


class User(models.Model):
    """Back-office user account whose password is stored only as a hash.

    The plain-text password is write-only: assigning ``user.password``
    stores a werkzeug hash in ``password_hash``; reading it raises
    ``AttributeError``.
    """

    id = models.AutoField(primary_key=True)
    username = models.CharField(max_length=30)
    password_hash = models.TextField()
    root = models.IntegerField()
    email = models.EmailField(max_length=30, default='')
    mobile = models.CharField(max_length=11, default='')
    status = models.IntegerField(default=0)

    class Meta:
        # Django only honours ``db_table`` when it is declared on ``Meta``;
        # as a plain class attribute it was silently ignored.
        db_table = 'adminuser_user'

    @property
    def password(self):
        """Refuse to expose the password; only the hash is kept."""
        raise AttributeError('Can not read password!')

    @password.setter
    def password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        """Return True when ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)
from django.urls import path, re_path

from . import views, loginView

# URL namespace for this app.
app_name = 'Admin'

urlpatterns = [
    # Login endpoint
    path('admin/login', loginView.LoginView.as_view()),
    path('admin/users', views.UserView.as_view()),
]
# -------------------------------------------loginView.py-------------------------------------------
from .models import User
import datetime
from rest_framework.response import Response
from rest_framework.views import APIView
import jwt
from PetHome import settings


class LoginView(APIView):
    """Login endpoint: exchange a username/password pair for a JWT."""

    def post(self, request):
        """Validate the credentials in ``request.data`` and issue a token.

        Always answers with the ``{"data": ..., "meta": ...}`` envelope.
        On success ``data`` carries ``token``, ``username`` and ``user_id``;
        on any failure ``meta.status`` is 500 with a generic message so the
        caller cannot tell an unknown user from a wrong password.
        """
        ret = {
            "data": {},
            "meta": {
                "status": 200,
                "message": ""
            }
        }
        try:
            username = request.data["username"]
            password = request.data["password"]
            user = None
            if username and password:
                # One query; ``first()`` yields None when nobody matches.
                user = User.objects.filter(username=username).first()
            if user is not None and user.verify_password(password):
                # PyJWT reads datetimes as UTC, so the claims must be built
                # from an aware UTC timestamp, not naive local time.
                issued_at = datetime.datetime.now(tz=datetime.timezone.utc)
                payload = {
                    "exp": issued_at + datetime.timedelta(days=1),  # expiry time
                    "iat": issued_at,  # issue time
                    "id": user.id,
                    "username": user.username,
                    "mobile": user.mobile
                }
                token = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
                ret["data"]["token"] = token
                ret["data"]["username"] = user.username
                ret["data"]["user_id"] = user.id
                ret["meta"]["status"] = 200
                ret["meta"]["message"] = "登录成功"
                return Response(ret)
        except Exception as error:
            # Missing keys, DB errors, etc. all collapse into the generic
            # failure below; the credentials themselves are never printed.
            print(error)
        # Reached for empty fields, unknown user, wrong password or an error,
        # so the view always returns a Response.
        ret["meta"]["status"] = 500
        ret["meta"]["message"] = "用户不存在或密码错误"
        return Response(ret)


# -------------------------------------------views.py-------------------------------------------
from django.db.models import Q
from werkzeug.security import generate_password_hash
from .TokenAuthtication import TokenAuthtication
from rest_framework.response import Response
from rest_framework.views import APIView
from adminuser.serializer.userSerializers import *
from rest_framework.pagination import PageNumberPagination
from collections import OrderedDict
from .models import User


class UserView(APIView):
    """User listing endpoint protected by ``TokenAuthtication``."""

    authentication_classes = [TokenAuthtication, ]

    def get(self, request, *args, **kwargs):
        """Return a paginated user list, or one user when ``pk`` is given.

        :param request: DRF request; the optional ``query`` GET parameter
            filters by mobile, username or email (substring match).
        :param args: unused positional URL arguments.
        :param kwargs: URL keyword arguments; ``pk`` selects a single row.
        :return: the paginator's response for lists, otherwise a plain
            ``Response`` wrapping the ``data``/``meta`` envelope.
        """
        pk = kwargs.get('pk')  # id of a single record, if the route captures one
        query = request.GET.get('query')
        print(query)
        ret = {
            "data": {
                "users": []
            },
            "meta": {
                "status": 200,
                "message": ""
            }
        }
        if not pk:
            queryset = User.objects.all()
            if query:
                queryset = queryset.filter(
                    Q(mobile__contains=query)
                    | Q(username__contains=query)
                    | Q(email__contains=query)
                )
            queryset = queryset.order_by("-id")
            page = LargeResultsSetPagination()
            page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
            ser = UserSerializer(instance=page_user, many=True)
            ret["data"]["users"] = ser.data
            return page.get_paginated_response(ret)

        # Single lookup instead of an existence check followed by a re-fetch.
        user_obj = User.objects.filter(pk=pk).first()
        if user_obj is not None:
            ser = UserSerializer(instance=user_obj, many=False)
            ret["data"]["users"] = ser.data
        else:
            ret["meta"]["status"] = 500
            ret["meta"]["message"] = "系统报错"
        return Response(ret)


# -------------------------------------------TokenAuthtication.py-------------------------------------------
from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions
import time
from PetHome import settings
import jwt
from .models import User


class TokenAuthtication(BaseAuthentication):
    """Authenticate requests by the JWT carried in the ``Token`` header."""

    def authenticate(self, request):
        """Decode and validate the token sent with the request.

        The token is read from ``request.META["HTTP_TOKEN"]``. Signature and
        expiry are verified by ``jwt.decode``; the ``username`` claim must
        then match an existing user.

        :param request: incoming request.
        :return: ``(user, token)`` as the DRF authentication contract expects.
        :raises AuthenticationFailed: token missing, invalid, expired, or
            issued for a user that no longer exists.
        """
        token = request.META.get("HTTP_TOKEN")
        if not token:
            raise exceptions.AuthenticationFailed("用户未登录,请登录")
        try:
            # ``algorithms`` must be an explicit allow-list passed by keyword.
            data = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        except jwt.ExpiredSignatureError as error:
            raise exceptions.AuthenticationFailed("登录超时,请重新登录") from error
        except jwt.InvalidTokenError as error:
            raise exceptions.AuthenticationFailed("用户未登录,请登录") from error

        user = User.objects.filter(username=data.get("username")).first()
        if user is None:
            raise exceptions.AuthenticationFailed("认证用户不存在")
        return user, token
http://127.0.0.1:8089/admin/login
http://127.0.0.1:8089/admin/users
1、用户未登录时 (请求headers未携带token)
2、用户已经登录 (请求headers携带token)
# Django REST framework configuration: authenticate every API view with the
# project's token authenticator unless a view overrides authentication_classes.
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'adminuser.TokenAuthtication.TokenAuthtication',
    ],
}
1、全局配置用户认证后,所有的接口类都不需要添加 authentication_classes = [TokenAuthtication, ] 属性,默认所有接口都会进行登录校验
------------------------------------------例子1:views.py-------------------------------------------
from django.db.models import Q
from werkzeug.security import generate_password_hash
from .TokenAuthtication import TokenAuthtication
from rest_framework.response import Response
from rest_framework.views import APIView
from adminuser.serializer.userSerializers import *
from rest_framework.pagination import PageNumberPagination
from collections import OrderedDict
from .models import User


class UserView(APIView):
    """User listing endpoint; declares no ``authentication_classes`` of its own."""

    def get(self, request, *args, **kwargs):
        """Return a paginated user list, or one user when ``pk`` is given.

        :param request: DRF request; the optional ``query`` GET parameter
            filters by mobile, username or email (substring match).
        :param args: unused positional URL arguments.
        :param kwargs: URL keyword arguments; ``pk`` selects a single row.
        :return: the paginator's response for lists, otherwise a plain
            ``Response`` wrapping the ``data``/``meta`` envelope.
        """
        pk = kwargs.get('pk')  # id of a single record, if the route captures one
        query = request.GET.get('query')
        print(query)
        ret = {
            "data": {
                "users": []
            },
            "meta": {
                "status": 200,
                "message": ""
            }
        }
        if not pk:
            queryset = User.objects.all()
            if query:
                queryset = queryset.filter(
                    Q(mobile__contains=query)
                    | Q(username__contains=query)
                    | Q(email__contains=query)
                )
            queryset = queryset.order_by("-id")
            page = LargeResultsSetPagination()
            page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
            ser = UserSerializer(instance=page_user, many=True)
            ret["data"]["users"] = ser.data
            return page.get_paginated_response(ret)

        # Single lookup instead of an existence check followed by a re-fetch.
        user_obj = User.objects.filter(pk=pk).first()
        if user_obj is not None:
            ser = UserSerializer(instance=user_obj, many=False)
            ret["data"]["users"] = ser.data
        else:
            ret["meta"]["status"] = 500
            ret["meta"]["message"] = "系统报错"
        return Response(ret)
2、为解决部分接口不需要登录验证问题,只需要在该类中设置 authentication_classes = [] ,[] 表示不需要认证
------------------------------------------例子2:views.py-------------------------------------------
from django.db.models import Q
from werkzeug.security import generate_password_hash
from .TokenAuthtication import TokenAuthtication
from rest_framework.response import Response
from rest_framework.views import APIView
from adminuser.serializer.userSerializers import *
from rest_framework.pagination import PageNumberPagination
from collections import OrderedDict
from .models import User


class UserView(APIView):
    """User listing endpoint that opts out of authentication."""

    # An empty list disables every authenticator for this view.
    authentication_classes = []

    def get(self, request, *args, **kwargs):
        """Return a paginated user list, or one user when ``pk`` is given.

        :param request: DRF request; the optional ``query`` GET parameter
            filters by mobile, username or email (substring match).
        :param args: unused positional URL arguments.
        :param kwargs: URL keyword arguments; ``pk`` selects a single row.
        :return: the paginator's response for lists, otherwise a plain
            ``Response`` wrapping the ``data``/``meta`` envelope.
        """
        pk = kwargs.get('pk')  # id of a single record, if the route captures one
        query = request.GET.get('query')
        print(query)
        ret = {
            "data": {
                "users": []
            },
            "meta": {
                "status": 200,
                "message": ""
            }
        }
        if not pk:
            queryset = User.objects.all()
            if query:
                queryset = queryset.filter(
                    Q(mobile__contains=query)
                    | Q(username__contains=query)
                    | Q(email__contains=query)
                )
            queryset = queryset.order_by("-id")
            page = LargeResultsSetPagination()
            page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
            ser = UserSerializer(instance=page_user, many=True)
            ret["data"]["users"] = ser.data
            return page.get_paginated_response(ret)

        # Single lookup instead of an existence check followed by a re-fetch.
        user_obj = User.objects.filter(pk=pk).first()
        if user_obj is not None:
            ser = UserSerializer(instance=user_obj, many=False)
            ret["data"]["users"] = ser.data
        else:
            ret["meta"]["status"] = 500
            ret["meta"]["message"] = "系统报错"
        return Response(ret)
不传 token 也能返回数据啦