之前写过一篇介绍 JWT 以及如何使用 refreshToken 实现无感刷新的文章 →《JWT实现refresh token与前端无感刷新》。本文主要给出前后端实现的具体代码。
前端环境:Vue 2.7 + TypeScript 2.8.0 + Axios 0.27 + Vuex 3.6
后端环境:Python 3.11 + Sanic 23.3.0 + PyJWT 2.0.0
前端主要代码:axios.ts
import axios, { AxiosRequestConfig, AxiosResponse } from 'axios';
import { refreshToken } from '@/api';
import { getQueryString, validateJWT } from '@/utils/helpers';

/**
 * Request interceptor: attaches the access token stored under the
 * localStorage key 'JWT' as a Bearer Authorization header and URI-encodes
 * the request URL.
 */
axios.interceptors.request.use(
  (config: AxiosRequestConfig<KubeRequest>): AxiosRequestConfig<KubeRequest> | Promise<any> => {
    // Read the token once so the check and the header use the same value.
    const jwt = window.localStorage.getItem('JWT');
    if (jwt) {
      config.headers.Authorization = `Bearer ${jwt}`;
    }
    // ...... (elided in the original article)
    // encodeURI(undefined) would yield the literal string "undefined", so only
    // encode when a URL is actually present.
    // NOTE(review): a retried request passes through this interceptor again, so
    // its URL appears to be encoded a second time — confirm this is harmless
    // for URLs that contain characters needing escaping.
    if (config.url) {
      config.url = encodeURI(config.url);
    }
    return config;
  },
);

/** A request that received a 401 while a token refresh was already in flight. */
interface PendingTask {
  /** Request config to replay once the refresh has finished. */
  config: AxiosRequestConfig;
  /** Settles the promise that was handed back to the original caller. */
  resolve: (value: unknown) => void;
}

// Whether a token refresh is currently in progress.
let refreshing = false;
// Requests waiting for the in-flight refresh; replayed after it succeeds.
const queue: PendingTask[] = [];

/**
 * Response interceptor. On a 401 with an expired access token it refreshes the
 * token pair once, replays the failed request together with every request that
 * was queued meanwhile, and redirects to the login page when the refresh is
 * refused. Handled errors end in a promise that never settles, so callers do
 * not see them.
 */
axios.interceptors.response.use(
  (response: AxiosResponse<KubeResponse<any, any>, KubeRequest>): Promise<KubeResponse<any, any>> => {
    // ...... (elided in the original article)
  },
  async (error: any): Promise<unknown> => {
    if (!(error && error.response)) {
      // ...... (elided in the original article)
    } else {
      switch (error.response.status) {
        case 400:
          // ..... (elided in the original article)
          break;
        case 401: {
          // Braces give this case its own scope for the declarations below.
          const { config } = error.response;
          if (refreshing) {
            // A refresh is already running: park this request until it is done.
            return new Promise((resolve) => {
              queue.push({
                config,
                resolve,
              });
            });
          }
          if (
            !validateJWT(window.localStorage.getItem('JWT')) &&
            ['/login', '/refresh_token', '/403', '/404'].indexOf(window.location.pathname) === -1
          ) {
            let res: { [key: string]: any };
            let pending: PendingTask[] = [];
            refreshing = true;
            try {
              res = await refreshToken({ refresh_token: window.localStorage.getItem('refreshtoken') });
            } finally {
              // Always release the flag, otherwise one failed refresh would make
              // every later 401 wait in the queue forever.
              refreshing = false;
              // Take ownership of the waiting requests and empty the shared
              // queue so they can never be replayed by a later refresh.
              pending = queue.splice(0, queue.length);
            }
            if (res.code === 1) {
              const { token, refresh_token } = res;
              store.commit('SET_JWT', token);
              store.commit('SET_RefreshToken', refresh_token);
              error.config.headers.Authorization = `Bearer ${window.localStorage.getItem('JWT')}`;
              // The token has been refreshed: retry every queued request.
              pending.forEach((task) => {
                task.resolve(axios(task.config));
              });
              return axios(error.config);
            } else {
              // Refresh refused: the queued requests are dropped (their promises
              // stay pending, like every other handled error in this handler).
              console.log('refresh_token过期,重新登陆');
              store.commit('SET_SNACKBAR', {
                text: i18n.t('tip.401'),
                color: 'warning',
              });
              store.commit('CLEARALL');
              if (
                [
                  '/login',
                  '/refresh_token',
                  '/403',
                  '/404',
                  '/white/page',
                  '/white/tenant',
                  '/whitecluster/cluster',
                ].indexOf(window.location.pathname) === -1
              ) {
                router.push({
                  name: 'login',
                  query: {
                    redirect: `${window.location.pathname}${window.location.search}`,
                  },
                });
              }
            }
          }
          if (window.location.pathname.startsWith('/oauth/callback')) {
            router.push({
              name: 'login',
            });
          }
          break;
        }
        case 504:
          // ...... (elided in the original article)
          break;
        default:
          store.commit('SET_SNACKBAR', {
            text: i18n.t('tip.unknown_error'),
            color: 'error',
          });
          break;
      }
    }
    // Handled errors resolve to a promise that never settles.
    return new Promise(() => {
      return;
    });
  },
);
refreshToken:
/**
 * Refresh the token: POSTs `body` to the `api/v1/system/refresh_token/`
 * endpoint and returns the resulting promise.
 *
 * @param body Request payload; the response interceptor passes
 *   `{ refresh_token }` here.
 */
export const refreshToken = (body: { [key: string]: any } = {}): Promise<{ [key: string]: any }> =>
  axios.post('api/v1/system/refresh_token/', body);

/**
 * validateJWT: report whether a JWT is present, decodable and not yet expired.
 *
 * Only the `exp` claim is inspected; the signature is NOT verified, so this is
 * a client-side freshness check, not an authenticity check.
 *
 * @param jwt Compact JWT string; `null`/`undefined`/empty (e.g. a missing
 *   localStorage entry) is treated as invalid.
 * @returns `true` only when the payload decodes to an object whose numeric
 *   `exp` (seconds since the epoch) lies in the future; malformed tokens
 *   return `false` instead of throwing.
 */
export function validateJWT(jwt: string | null | undefined): boolean {
  if (!jwt) return false;
  const payloadSegment = jwt.split('.')[1];
  if (!payloadSegment) return false;
  try {
    // JWT segments are base64url-encoded without padding, whereas atob() only
    // understands the standard base64 alphabet.
    const base64 = payloadSegment.replace(/-/g, '+').replace(/_/g, '/');
    const padded = base64 + '='.repeat((4 - (base64.length % 4)) % 4);
    const claims: unknown = JSON.parse(atob(padded));
    if (typeof claims !== 'object' || claims === null) return false;
    const exp = (claims as { exp?: unknown }).exp;
    return typeof exp === 'number' && exp > Date.now() / 1000;
  } catch {
    // Invalid base64 or invalid JSON: treat the token as not valid.
    return false;
  }
}
然后就是在登录认证通过后设置 accessToken 和 refreshToken,退出登录后清空所有 token。
// Store mutations for the token pair (excerpt: the enclosing object and the
// tail of CLEARALL are not shown in the article).

/** Stores the access token in the state and in localStorage under `JWTName`. */
SET_JWT(state: { [key: string]: any }, jwt: string): void {
  state.JWT = jwt;
  window.localStorage.setItem(JWTName, jwt);
},
/**
 * Stores the refresh token in the state and in localStorage.
 * NOTE(review): the storage key is the identifier `refreshToken`, presumably a
 * constant like `JWTName`; the axios interceptor reads the literal keys 'JWT'
 * and 'refreshtoken', so confirm both constants hold exactly those values.
 */
SET_RefreshToken(state: { [key: string]: any }, refreshtoken: string): void {
  state.RefreshToken = refreshtoken;
  window.localStorage.setItem(refreshToken, refreshtoken);
},
/**
 * Clears cookies (via `delAllCookie`), wipes localStorage and resets the
 * token/user fields of the state.
 */
CLEARALL(state: { [key: string]: any }): void {
  delAllCookie();
  // Read the locale before localStorage is cleared; its later use is in the
  // part of the function that the article elides.
  const locale = window.localStorage.getItem(Locale) || 'zh-Hans';
  window.localStorage.clear();
  state.JWT = '';
  state.RefreshToken = '';
  state.User = {};
  state.Username = "";
  ......
后端主要代码:jwt2.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
jwt operators
"""
import base64
import datetime
import json
import logging
import time

import jwt

import cnf
from cnf.system import SystemCnf

logger = logging.getLogger(__name__)


def _build_payload(username, expire_minutes):
    """Build the claim set for a token that expires `expire_minutes` from now."""
    # Timezone-aware "now"; datetime.utcnow() is deprecated in newer Pythons.
    now = datetime.datetime.now(datetime.timezone.utc)
    return {
        'exp': now + datetime.timedelta(minutes=expire_minutes),
        'iat': now,
        'iss': 'ken',
        'username': username,
        # "permissions": permissions,
        # "roles": roles
    }


def gen_token(username, access_token_expire_in, refresh_token_expire_in):
    """
    Generate an access token and a refresh token for `username`.

    Both tokens are HS256-signed with `cnf.JWT_SECRET_KEY` and carry the same
    claims; only their lifetime differs.

    NOTE(review): because the two tokens are otherwise identical, nothing in
    this module stops an access token from being presented as a refresh token
    (or vice versa). Consider adding a distinguishing claim.

    :param username: value stored in the `username` claim.
    :param access_token_expire_in: access-token lifetime in minutes.
    :param refresh_token_expire_in: refresh-token lifetime in minutes.
    :return: `(True, access_token, refresh_token)` on success, or
        `(False, None, None)` on failure. Always a 3-tuple, because every
        caller unpacks three values.
    """
    try:
        access_token = jwt.encode(
            _build_payload(username, access_token_expire_in),
            cnf.JWT_SECRET_KEY,
            algorithm='HS256'
        )
        # Generate the refresh token.
        refresh_token = jwt.encode(
            _build_payload(username, refresh_token_expire_in),
            cnf.JWT_SECRET_KEY,
            algorithm='HS256'
        )
    except Exception:
        # Keep the cause in the log instead of handing the exception object
        # back to callers that expect a 3-tuple.
        logger.exception("failed to generate jwt tokens")
        return False, None, None
    return True, access_token, refresh_token


def re_gen_token(request):
    """
    Refresh the token pair.

    Reads `refresh_token` from the JSON body of `request`, verifies it and
    issues a new access/refresh token pair for the same user.

    :param request: request object whose `.json` holds the parsed body.
    :return: `(True, {"code": 1, "token": ..., "refresh_token": ...})` on
        success, otherwise `(False, <error message>)`.
    """
    body = request.json
    # A missing or non-object JSON body is treated like a missing token.
    refresh_token = body.get('refresh_token', '') if isinstance(body, dict) else ''
    try:
        payload = jwt.decode(refresh_token, cnf.JWT_SECRET_KEY, algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        return False, 'Refresh token has expired'
    except jwt.InvalidTokenError:
        return False, 'Invalid refresh token'

    username = payload.get('username')
    if not username:
        # A correctly signed token without a username cannot be renewed.
        return False, 'Invalid refresh token'

    # Generate a new access token and a new refresh token.
    success, token, new_refresh_token = gen_token(username, SystemCnf.token_plus, SystemCnf.refresh_plus)
    if not success:
        return False, 'Failed to create token'
    return True, {"code": 1, "token": token, "refresh_token": new_refresh_token}


# Access-token and refresh-token lifetime settings. All four values are passed
# to gen_token and are therefore minutes. The module above imports this class
# from cnf.system; it is reproduced here for reference, with placeholder values.
class SystemCnf:
    access_token_expires_in = xxx
    refresh_token_expires_in = xxx
    token_plus = xxx
    refresh_plus = xxx
登录相关操作:login.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Login-related operations
"""
import datetime

from sanic.response import HTTPResponse
from sanic.request import Request
from sanic import Blueprint, response
# ...... (further imports elided in the original article)
from libs.common.jwt2 import gen_token, re_gen_token
from cnf.system import SystemCnf

bp = Blueprint('system-login', url_prefix='api/v1/system')


@bp.route('/login', name='login', methods=['POST'])
async def login(request: Request) -> HTTPResponse:
    """
    Log in.

    Expects a JSON object with `username` and `password`. On success responds
    with `code` 1 plus the access token, refresh token, permissions and roles;
    otherwise responds with `code` 0 and a message.
    """
    # request.json is None for an empty body and may be a non-object JSON
    # value; treat both as "no credentials" instead of failing on .get().
    body = request.json if isinstance(request.json, dict) else {}
    username = body.get("username", "")
    password = body.get("password", "")
    if not username or not password:
        return response.json({"code": 0, "message": "用户名密码不能为空"})
    # Authenticate the user.
    # ...... (authentication logic elided in the original article; it is
    # expected to define `permissions` and `roles`)
    result = gen_token(username, SystemCnf.access_token_expires_in, SystemCnf.refresh_token_expires_in)
    # gen_token may hand back a bare exception instead of a 3-tuple when
    # signing fails, so check the shape before unpacking.
    if not isinstance(result, tuple) or not result[0]:
        return response.json({"code": 0, 'message': 'Failed to create token'})
    _, token, refresh_token = result
    return response.json({"code": 1, "token": token, "refresh_token": refresh_token,
                          "permissions": permissions, "roles": roles})


@bp.route('/refresh_token', name='refresh_token', methods=['POST'])
async def refresh_token(request: Request) -> HTTPResponse:
    """
    Refresh the token.

    Delegates to `re_gen_token`; responds with its payload on success, or with
    `code` 0 and the error message otherwise.
    """
    success, resp = re_gen_token(request)
    if success:
        return response.json(resp)
    return response.json({"code": 0, 'message': resp})
在登录拦截中放行 /refresh_token 接口,其余接口使用 JWT 进行认证:
import jwt


@app.middleware('request')
def setup_context(request):
    """
    Token verification.

    Requests whose path is listed in `cnf.EXCLUDE_PATH` or starts with
    "/api/v1/xxx/" pass through unchecked. Every other request must carry a
    valid HS256 JWT; its `username` claim is then stored on the request
    context.

    :return: `None` to let the request continue, or a 401 JSON response when
        the token is expired, invalid, missing, or has no `username` claim.
    """
    if request.path in cnf.EXCLUDE_PATH or request.path.startswith("/api/v1/xxx/"):
        return None

    try:
        data = jwt.decode(request.token, cnf.JWT_SECRET_KEY, algorithms=['HS256'])
        # Read the claim inside the try block so that a token without a
        # username is rejected with 401 instead of raising KeyError.
        username = data["username"]
    except jwt.ExpiredSignatureError:
        return response.json({"message": "token过期了"}, status=401)
    except Exception:
        return response.json(
            {
                "message": "Do not access the platform with a token that is modified privately"
            },
            status=401
        )

    # Per-request state belongs on request.ctx: app.ctx is shared by every
    # request the worker is handling, so concurrent requests overwrite each
    # other's value there.
    request.ctx.username = username
    # Kept for backward compatibility with code that still reads app.ctx.
    # NOTE(review): migrate readers to request.ctx.username and remove this.
    request.app.ctx.username = username
    return None
查看效果: