前后端JWT认证使用accessToken与refreshToken实现无感刷新

之前写过一篇介绍JWT以及如何使用refreshToken实现无感刷新的文章 → JWT实现refresh token与前端无感刷新。本文主要给出前后端实现的具体代码。

前端环境:Vue 2.7 + TypeScript 2.8.0 + Axios 0.27 + Vuex 3.6

后端环境:Python 3.11 + Sanic 23.3.0 + PyJWT 2.0.0


前端主要代码:axios.ts

import axios, { AxiosRequestConfig, AxiosResponse } from 'axios';

import { refreshToken } from '@/api';
import { getQueryString, validateJWT } from '@/utils/helpers';

/**
 * Request interceptor: attaches the access token stored in localStorage as a
 * Bearer `Authorization` header and URI-encodes the request URL.
 */
axios.interceptors.request.use(
  (config: AxiosRequestConfig<KubeRequest>): AxiosRequestConfig<KubeRequest> | Promise<any> => {
    // Read the token once and reuse the value for both the check and the header.
    const jwt = window.localStorage.getItem('JWT');
    if (jwt) {
      // `headers` is optional on AxiosRequestConfig; make sure it exists before writing to it.
      if (!config.headers) {
        config.headers = {};
      }
      config.headers.Authorization = `Bearer ${jwt}`;
    }
    ......
    // `url` is optional as well; encodeURI(undefined) would produce the literal string "undefined".
    if (config.url) {
      config.url = encodeURI(config.url);
    }
    return config;
  },
);

/** A request that received a 401 while a token refresh was already in flight. */
interface PendingTask {
  /** Axios config of the original request; it is replayed once the refresh succeeds. */
  config: AxiosRequestConfig;
  /** Resolver of the promise that was handed back to the original caller. */
  resolve: (value: unknown) => void;
}

// Flag: true while a token refresh request is in progress
let refreshing = false;
// Retry queue: each entry holds the config of a request waiting for the refresh
// and the resolver of the promise that was returned to its caller
const queue: PendingTask[] = [];

axios.interceptors.response.use(
  (response: AxiosResponse<KubeResponse<any, any>, KubeRequest>): Promise<KubeResponse<any, any>> => {
  ......
  },
  /**
   * Error handler.
   *
   * For a 401 response it tries to refresh the token pair once. 401s that
   * arrive while that refresh is in flight are parked in `queue` and replayed
   * after the new tokens have been stored. If the refresh is rejected, the
   * session is cleared and the user is sent to the login page.
   *
   * Paths that reach the end of this handler return a promise that never
   * settles, so the original caller is not notified of the handled error.
   */
  async (error: any): Promise<unknown> => {

    if (!(error && error.response)) {
    ......
    } else {
      switch (error.response.status) {
        case 400:
          .....
          break;
        case 401: {
          // Braces give the lexical declarations below their own scope inside the switch.
          const { config } = error.response;

          // A refresh is already running: park this request until it finishes.
          if (refreshing) {
            return new Promise((resolve) => {
              queue.push({
                config,
                resolve,
              });
            });
          }

          // Only refresh when the stored access token is missing/expired and
          // the current page is not one of the auth/error pages.
          if (
            !validateJWT(window.localStorage.getItem('JWT')) &&
            ['/login', '/refresh_token', '/403', '/404'].indexOf(window.location.pathname) === -1
          ) {
            refreshing = true;
            let res: { [key: string]: any };
            try {
              res = await refreshToken({ refresh_token: window.localStorage.getItem('refreshtoken') });
            } finally {
              // Reset the flag even if the refresh call throws, otherwise every
              // later 401 would be queued forever.
              refreshing = false;
            }
            // Take ownership of the parked requests and empty the shared queue,
            // so they are not replayed again by a later refresh.
            const pending = queue.splice(0, queue.length);
            if (res.code === 1) {
              const { token, refresh_token } = res;
              store.commit('SET_JWT', token);
              store.commit('SET_RefreshToken', refresh_token);
              error.config.headers.Authorization = `Bearer ${window.localStorage.getItem('JWT')}`;
              // Tokens are refreshed: replay every request that was parked meanwhile.
              pending.forEach((task) => {
                task.resolve(axios(task.config));
              });
              return axios(error.config);
            } else {
              // The refresh token was rejected. Parked requests are dropped
              // (their promises stay pending, like the other handled-error
              // paths of this handler) and the user has to log in again.
              console.log('refresh_token过期,重新登陆');
              store.commit('SET_SNACKBAR', {
                text: i18n.t('tip.401'),
                color: 'warning',
              });
              store.commit('CLEARALL');
              if (
                [
                  '/login',
                  '/refresh_token',
                  '/403',
                  '/404',
                  '/white/page',
                  '/white/tenant',
                  '/whitecluster/cluster',
                ].indexOf(window.location.pathname) === -1
              ) {
                router.push({
                  name: 'login',
                  query: {
                    redirect: `${window.location.pathname}${window.location.search}`,
                  },
                });
              }
            }
          }
          if (window.location.pathname.startsWith('/oauth/callback')) {
            router.push({
              name: 'login',
            });
          }
          break;
        }

        case 504:
          ......
          break;
        default:
          store.commit('SET_SNACKBAR', {
            text: i18n.t('tip.unknown_error'),
            color: 'error',
          });
          break;
      }
    }
    // Handled errors end in a promise that never settles.
    return new Promise(() => {
      return;
    });
  },
);

refreshToken:

/**
 * Ask the backend for a new token pair.
 *
 * @param body - Request payload; the response interceptor passes `{ refresh_token }`.
 * @returns The promise produced by `axios.post` for the refresh endpoint.
 */
export const refreshToken = (body: { [key: string]: any } = {}): Promise<{ [key: string]: any }> => {
  const endpoint = 'api/v1/system/refresh_token/';
  return axios.post(endpoint, body);
};

/**
 * Report whether a JWT is present, decodable and not yet expired.
 *
 * Only the `exp` claim of the payload is inspected; the signature is NOT
 * verified here.
 *
 * @param jwt - Compact JWT string, or `null`/empty when nothing is stored.
 * @returns `true` when the `exp` claim lies in the future; `false` for a
 *          missing, malformed or expired token.
 */
export function validateJWT(jwt: string | null): boolean {
  if (!jwt) return false;
  const body = jwt.split('.')[1];
  if (!body) return false;
  try {
    // JWT segments are base64url-encoded. Map the URL-safe alphabet back to
    // standard base64 and restore the padding, because atob rejects '-' and '_'.
    const base64 = body.replace(/-/g, '+').replace(/_/g, '/');
    const padded = base64 + '='.repeat((4 - (base64.length % 4)) % 4);
    const payload: unknown = JSON.parse(window.atob(padded));
    if (typeof payload !== 'object' || payload === null) return false;
    const exp = (payload as { exp?: unknown }).exp;
    // `exp` is expressed in seconds since the epoch.
    return typeof exp === 'number' && exp > Date.now() / 1000;
  } catch {
    // Undecodable base64 or invalid JSON: treat the token as invalid.
    return false;
  }
}

然后就是在登录认证通过后设置accessToken和refreshToken,退出登录后清空所有token。

/**
 * Store the access token in the Vuex state and persist it to localStorage
 * under the key held by `JWTName`.
 */
SET_JWT(state: { [key: string]: any }, jwt: string): void {
  state.JWT = jwt;
  window.localStorage.setItem(JWTName, jwt);
},
/**
 * Store the refresh token in the Vuex state and persist it to localStorage
 * under the key held by `refreshToken`.
 */
SET_RefreshToken(state: { [key: string]: any }, refreshtoken: string): void {
  state.RefreshToken = refreshtoken;
  // NOTE(review): `refreshToken` is used as the storage key here; presumably it is a
  // constant defined elsewhere. The response interceptor reads the literal key
  // 'refreshtoken' - confirm both resolve to the same key.
  window.localStorage.setItem(refreshToken, refreshtoken);
},
CLEARALL(state: { [key: string]: any }): void {
  delAllCookie();
  const locale = window.localStorage.getItem(Locale) || 'zh-Hans';
  window.localStorage.clear();
  state.JWT = '';
  state.RefreshToken = '';
  state.User = {};
  state.Username = "";
  ......

后端主要代码:jwt2.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

""" jwt operators """

import base64
import json
import time
import cnf
from cnf.system import SystemCnf
import datetime
import jwt


def gen_token(username, access_token_expire_in, refresh_token_expire_in):
    """Create an access token / refresh token pair for ``username``.

    Both tokens are HS256-signed with ``cnf.JWT_SECRET_KEY`` and carry the
    ``exp``, ``iat``, ``iss`` and ``username`` claims.

    Args:
        username: Value stored in the ``username`` claim of both tokens.
        access_token_expire_in: Access-token lifetime, in minutes.
        refresh_token_expire_in: Refresh-token lifetime, in minutes.

    Returns:
        ``(True, access_token, refresh_token)`` on success, or
        ``(False, None, None)`` when token creation fails. A 3-tuple is
        always returned because callers unpack the result into three names.
    """
    try:
        # Take a single timestamp so ``iat`` and both ``exp`` values share the
        # same base. An aware UTC datetime replaces the deprecated ``utcnow()``.
        issued_at = datetime.datetime.now(datetime.timezone.utc)
        payload = {
            'exp': issued_at + datetime.timedelta(minutes=access_token_expire_in),
            'iat': issued_at,
            'iss': 'ken',
            'username': username,
            # "permissions": permissions,
            # "roles": roles
        }
        access_token = jwt.encode(
            payload,
            cnf.JWT_SECRET_KEY,
            algorithm='HS256'
        )
        payload2 = {
            'exp': issued_at + datetime.timedelta(minutes=refresh_token_expire_in),
            'iat': issued_at,
            'iss': 'ken',
            'username': username,
        }
        # Generate the refresh token
        refresh_token = jwt.encode(payload2, cnf.JWT_SECRET_KEY, algorithm='HS256')
        return True, access_token, refresh_token
    except Exception:
        # Record the failure instead of handing the exception object back:
        # returning a bare exception breaks the callers' tuple unpacking.
        import logging
        logging.getLogger(__name__).exception('Failed to generate JWT tokens')
        return False, None, None

def re_gen_token(request):
    """Issue a fresh access/refresh token pair from a refresh token.

    Args:
        request: Incoming request whose JSON body carries ``refresh_token``.

    Returns:
        ``(True, {"code": 1, "token": ..., "refresh_token": ...})`` on
        success, otherwise ``(False, message)`` with a short error string.
    """
    body = request.json
    # The body may be missing or not a JSON object; treat that as "no token".
    refresh_token = body.get('refresh_token', '') if isinstance(body, dict) else ''
    if not refresh_token or not isinstance(refresh_token, str):
        return False, 'Invalid refresh token'

    # NOTE(review): any token signed with JWT_SECRET_KEY is accepted here; the
    # payload carries nothing that distinguishes a refresh token from an
    # access token. Consider adding a token-type claim.
    try:
        payload = jwt.decode(refresh_token, cnf.JWT_SECRET_KEY, algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        return False, 'Refresh token has expired'
    except jwt.InvalidTokenError:
        return False, 'Invalid refresh token'

    username = payload.get('username')
    if not username:
        # A correctly signed token without a username cannot be renewed.
        return False, 'Invalid refresh token'

    # Generate a new access token and refresh token. The result is checked
    # defensively so a failure never reaches the caller as a success payload.
    result = gen_token(username, SystemCnf.token_plus, SystemCnf.refresh_plus)
    if not isinstance(result, tuple) or not result[0]:
        return False, 'Failed to create token'
    _, token, new_refresh_token = result
    return True, {"code": 1, "token": token, "refresh_token": new_refresh_token}

# Expiry settings for the access token and the refresh token
class SystemCnf:
    # Lifetimes handed to gen_token() by the login handler; gen_token() treats
    # its expiry arguments as minutes.
    access_token_expires_in = xxx
    refresh_token_expires_in = xxx

    # Lifetimes handed to gen_token() when re_gen_token() re-issues the tokens.
    token_plus = xxx
    refresh_plus = xxx

登录相关操作:login.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

""" 登录相关的操作 """

import datetime
from sanic.response import HTTPResponse
from sanic.request import Request
from sanic import Blueprint, response
......
from libs.common.jwt2 import gen_token, re_gen_token
from cnf.system import SystemCnf
bp = Blueprint('system-login', url_prefix='api/v1/system')

@bp.route('/login', name='login', methods=['POST'])
async def login(request: Request) -> HTTPResponse:
    """ 登录 """
    username = request.json.get("username", "")
    password = request.json.get("password", "")
    if not username or not password:
        return response.json({"code": 0, "message": "用户名密码不能为空"})

    # 登陆认证
    ......

    success, token, refresh_token = gen_token(username, SystemCnf.access_token_expires_in, SystemCnf.refresh_token_expires_in)
    if success:
        return response.json({"code": 1, "token": token, "refresh_token": refresh_token, "permissions": permissions, "roles": roles})
    else:
        return response.json({"code": 0, 'message': 'Failed to create token'})

@bp.route('/refresh_token', name='refresh_token', methods=['POST'])
async def refresh_token(request: Request) -> HTTPResponse:
    """Exchange a refresh token for a new token pair.

    Delegates to ``re_gen_token``; on failure the error text is wrapped in a
    ``{"code": 0, "message": ...}`` body.
    """
    ok, result = re_gen_token(request)
    body = result if ok else {"code": 0, 'message': result}
    return response.json(body)

登录拦截中间件放行/refresh_token接口,其余接口使用JWT进行校验:

import jwt

@app.middleware('request')
def setup_context(request) -> None:
    """Validate the JWT of every request outside the excluded paths.

    Returns a 401 JSON response when the token is expired, cannot be decoded,
    or lacks a ``username`` claim. Otherwise the username is recorded on the
    request context and ``None`` is returned so that handling continues.

    NOTE(review): the ``-> None`` annotation is kept unchanged although the
    error paths return a response object.
    """
    # Excluded paths skip token validation entirely.
    if request.path in cnf.EXCLUDE_PATH or request.path.startswith("/api/v1/xxx/"):
        return None

    try:
        data = jwt.decode(request.token, cnf.JWT_SECRET_KEY, algorithms=['HS256'])
        # Read the claim inside the try block so a token without ``username``
        # is answered with 401 instead of an unhandled KeyError.
        username = data["username"]
    except jwt.ExpiredSignatureError:
        return response.json({"message": "token过期了"}, status=401)
    except Exception:
        return response.json(
            {
                "message": "Do not access the platform with a token that is modified privately"
            },
            status=401
        )

    # Per-request storage: safe when several requests are handled concurrently.
    request.ctx.username = username
    # app.ctx is shared by all requests, so concurrent requests can overwrite
    # each other's value. It is still written for existing readers; new code
    # should read request.ctx.username instead.
    request.app.ctx.username = username
    return None

查看效果:

前后端JWT认证使用accessToken与refreshToken实现无感刷新

前后端JWT认证使用accessToken与refreshToken实现无感刷新


参考:Python 高性能异步框架 Sanic

anzhihe 安志合个人博客,版权所有 丨 如未注明,均为原创 丨 转载请注明转自:https://chegva.com/6042.html | ☆★★每天进步一点点,加油!★★☆ | 

您可能还感兴趣的文章!

发表评论

电子邮件地址不会被公开。 必填项已用*标注