之前写过一篇介绍JWT以及使用refreshToken实现无感刷新的文章 → 《JWT实现refresh token与前端无感刷新》。本文主要给出前后端实现的具体代码。
前端环境:Vue 2.7 + TypeScript 2.8.0 + Axios 0.27 + Vuex 3.6
后端环境:Python 3.11 + Sanic 23.3.0 + PyJWT 2.0.0
前端主要代码:axios.ts
import axios, { AxiosRequestConfig, AxiosResponse } from 'axios';
import { refreshToken } from '@/api';
import { getQueryString, validateJWT } from '@/utils/helpers';
// Request interceptor: adjusts every outgoing request config before it is sent.
axios.interceptors.request.use(
(config: AxiosRequestConfig<KubeRequest>): AxiosRequestConfig<KubeRequest> | Promise<any> => {
// If a token is stored under the 'JWT' localStorage key, send it as a Bearer header.
if (window.localStorage.getItem('JWT')) {
config.headers.Authorization = `Bearer ${window.localStorage.getItem('JWT')}`;
}
// (code omitted by the author)
......
// NOTE(review): a config that passes through this interceptor a second time
// (e.g. a request re-issued with axios(config)) is run through encodeURI again;
// confirm this is harmless for URLs that already contain escaped characters.
config.url = encodeURI(config.url);
return config;
},
);
/** A request that got a 401 while a token refresh was already in flight. */
interface PendingTask {
  /** Axios config of the original request; replayed once the refresh finishes. */
  config: AxiosRequestConfig;
  /** Resolver of the promise handed back to the caller; receives the retried request. */
  resolve: (value: unknown) => void;
}
// Flag: true while a token-refresh request is in flight.
let refreshing = false;
// Retry queue: each entry holds a request config and the resolver of the promise returned to its caller.
const queue: PendingTask[] = [];
// Response interceptor. On a 401 with an expired access token the error branch
// refreshes the token pair once, then replays the failed request together with
// every request that was rejected while the refresh was in flight.
axios.interceptors.response.use(
  (response: AxiosResponse<KubeResponse<any, any>, KubeRequest>): Promise<KubeResponse<any, any>> => {
    ......
  },
  async (error: any): Promise<unknown> => {
    if (!(error && error.response)) {
      ......
    } else {
      switch (error.response.status) {
        case 400:
          .....
          break;
        case 401: {
          // Braces give the declarations below their own scope inside the switch.
          const { config } = error.response;
          if (refreshing) {
            // A refresh is already running: park this request until it completes.
            return new Promise((resolve) => {
              queue.push({ config, resolve });
            });
          }
          if (
            !validateJWT(window.localStorage.getItem('JWT')) &&
            ['/login', '/refresh_token', '/403', '/404'].indexOf(window.location.pathname) === -1
          ) {
            refreshing = true;
            let res: { [key: string]: any };
            try {
              res = await refreshToken({ refresh_token: window.localStorage.getItem('refreshtoken') });
            } catch (refreshError) {
              // The parked requests can no longer be replayed with a valid token.
              queue.length = 0;
              throw refreshError;
            } finally {
              // Always release the flag, otherwise every later 401 would be parked forever.
              refreshing = false;
            }
            if (res.code === 1) {
              const { token, refresh_token } = res;
              store.commit('SET_JWT', token);
              store.commit('SET_RefreshToken', refresh_token);
              error.config.headers.Authorization = `Bearer ${window.localStorage.getItem('JWT')}`;
              // Token refreshed: replay every parked request. splice(0) empties the
              // queue so the same requests are not replayed again by a later refresh.
              queue.splice(0).forEach((task) => {
                task.resolve(axios(task.config));
              });
              return axios(error.config);
            } else {
              // Refresh token rejected: drop the parked requests so they are not
              // replayed after the user logs in again.
              queue.length = 0;
              console.log('refresh_token过期,重新登陆');
              store.commit('SET_SNACKBAR', {
                text: i18n.t('tip.401'),
                color: 'warning',
              });
              store.commit('CLEARALL');
              if (
                [
                  '/login',
                  '/refresh_token',
                  '/403',
                  '/404',
                  '/white/page',
                  '/white/tenant',
                  '/whitecluster/cluster',
                ].indexOf(window.location.pathname) === -1
              ) {
                // Send the user to the login page and remember where to return to.
                router.push({
                  name: 'login',
                  query: {
                    redirect: `${window.location.pathname}${window.location.search}`,
                  },
                });
              }
            }
          }
          if (window.location.pathname.startsWith('/oauth/callback')) {
            router.push({
              name: 'login',
            });
          }
          break;
        }
        case 504:
          ......
          break;
        default:
          store.commit('SET_SNACKBAR', {
            text: i18n.t('tip.unknown_error'),
            color: 'error',
          });
          break;
      }
    }
    // Handled errors hand back a promise that never settles.
    return new Promise(() => {
      return;
    });
  },
);
refreshToken:
// Refresh the token pair: POST `body` to the refresh endpoint.
export const refreshToken = (body: Record<string, any> = {}): Promise<Record<string, any>> => {
  return axios.post('api/v1/system/refresh_token/', body);
};
// validateJWT: return true when `jwt` is a decodable token whose `exp` claim is
// still in the future; any missing or malformed token yields false.
export function validateJWT(jwt: string | null): boolean {
  if (!jwt) return false;
  try {
    // JWT segments are base64url-encoded, but atob only accepts the standard
    // alphabet: map '-' and '_' back to '+' and '/' and restore the padding.
    const segment: string = (jwt.split('.')[1] || '').replace(/-/g, '+').replace(/_/g, '/');
    const padded: string = segment + '==='.slice((segment.length + 3) % 4);
    const jwtInfo: any = JSON.parse(window.atob(padded));
    // Current time in whole seconds, the unit of the `exp` claim.
    const now: number = Math.floor(Date.now() / 1000);
    return jwtInfo.exp > now;
  } catch (e) {
    // Not a well-formed JWT (bad base64, bad JSON, null payload): treat as invalid.
    return false;
  }
}
然后就是在登陆认证通过后设置accessToken和refreshToken,退出后清空所有token。
// Store the access token in the Vuex state and persist it in localStorage
// under the key held by `JWTName`.
SET_JWT(state: Record<string, any>, jwt: string): void {
  state.JWT = jwt;
  const { localStorage: storage } = window;
  storage.setItem(JWTName, jwt);
},
// Store the refresh token in the Vuex state and persist it in localStorage
// under the key held by the `refreshToken` identifier (defined outside this snippet).
SET_RefreshToken(state: Record<string, any>, refreshtoken: string): void {
  state.RefreshToken = refreshtoken;
  const { localStorage: storage } = window;
  storage.setItem(refreshToken, refreshtoken);
},
// Reset the client-side session: cookies, localStorage and the auth-related
// fields of the store state.
CLEARALL(state: { [key: string]: any }): void {
delAllCookie();
// Read the stored locale (default 'zh-Hans') before localStorage is cleared;
// its later use is in the part the author omitted below.
const locale = window.localStorage.getItem(Locale) || 'zh-Hans';
window.localStorage.clear();
state.JWT = '';
state.RefreshToken = '';
state.User = {};
state.Username = "";
......后端主要代码:jwt2.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" jwt operators """
import base64
import json
import time
import cnf
from cnf.system import SystemCnf
import datetime
import jwt
def gen_token(username, access_token_expire_in, refresh_token_expire_in):
    """Create a signed access/refresh JWT pair for *username*.

    Both tokens are HS256-signed with ``cnf.JWT_SECRET_KEY`` and carry the
    ``exp``, ``iat``, ``iss`` and ``username`` claims; they differ only in
    their expiry time.

    Args:
        username: Value stored in the ``username`` claim.
        access_token_expire_in: Access-token lifetime in minutes.
        refresh_token_expire_in: Refresh-token lifetime in minutes.

    Returns:
        ``(True, access_token, refresh_token)`` on success, or
        ``(False, None, None)`` when the tokens could not be created, so
        callers can always unpack three values.
    """
    import logging  # function-scope import keeps this block self-contained

    try:
        # One timezone-aware timestamp shared by both tokens.
        issued_at = datetime.datetime.now(datetime.timezone.utc)
        access_expiry = issued_at + datetime.timedelta(minutes=access_token_expire_in)
        refresh_expiry = issued_at + datetime.timedelta(minutes=refresh_token_expire_in)

        access_payload = {
            'exp': access_expiry,
            'iat': issued_at,
            'iss': 'ken',
            'username': username,
        }
        refresh_payload = {
            'exp': refresh_expiry,
            'iat': issued_at,
            'iss': 'ken',
            'username': username,
        }
        access_token = jwt.encode(access_payload, cnf.JWT_SECRET_KEY, algorithm='HS256')
        refresh_token = jwt.encode(refresh_payload, cnf.JWT_SECRET_KEY, algorithm='HS256')
    except Exception:
        # Report failure through the success flag instead of returning the
        # exception object, which callers cannot unpack.
        logging.getLogger(__name__).exception('Failed to generate JWT pair')
        return False, None, None
    return True, access_token, refresh_token
def re_gen_token(request):
    """Issue a fresh access/refresh token pair from a valid refresh token.

    Args:
        request: Request whose JSON body carries the ``refresh_token`` field.

    Returns:
        ``(True, {"code": 1, "token": ..., "refresh_token": ...})`` on
        success, otherwise ``(False, <error message>)``.
    """
    body = request.json
    # The parsed body may be None (no JSON sent) or a non-object value; both
    # are treated like a missing token instead of raising.
    refresh_token = body.get('refresh_token', '') if isinstance(body, dict) else ''
    if not refresh_token:
        return False, 'Invalid refresh token'

    try:
        payload = jwt.decode(refresh_token, cnf.JWT_SECRET_KEY, algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        return False, 'Refresh token has expired'
    except jwt.InvalidTokenError:
        return False, 'Invalid refresh token'

    # A correctly signed token without a username cannot be renewed.
    username = payload.get('username')
    if not username:
        return False, 'Invalid refresh token'

    # Generate a new access token and a new refresh token. Anything other than
    # a successful 3-tuple from gen_token is reported as a failure.
    result = gen_token(username, SystemCnf.token_plus, SystemCnf.refresh_plus)
    if not (isinstance(result, tuple) and len(result) == 3 and result[0]):
        return False, 'Failed to create token'
    _, token, new_refresh_token = result
    return True, {"code": 1, "token": token, "refresh_token": new_refresh_token}
# Expiry settings for the access token and the refresh token.
# Each value is passed to gen_token as a lifetime in minutes; `xxx` is
# presumably a placeholder for the real value - TODO confirm.
class SystemCnf:
# Lifetimes of the token pair issued at login.
access_token_expires_in = xxx
refresh_token_expires_in = xxx
# Lifetimes of the token pair issued when a refresh token is exchanged.
token_plus = xxx
refresh_plus = xxx登陆相关操作:login.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" 登录相关的操作 """
import datetime
from sanic.response import HTTPResponse
from sanic.request import Request
from sanic import Blueprint, response
......
from libs.common.jwt2 import gen_token, re_gen_token
from cnf.system import SystemCnf
# Blueprint that groups the login and refresh_token routes under the api/v1/system URL prefix.
bp = Blueprint('system-login', url_prefix='api/v1/system')
@bp.route('/login', name='login', methods=['POST'])
async def login(request: Request) -> HTTPResponse:
""" Log in and return an access/refresh token pair.

Reads ``username`` and ``password`` from the JSON body. Responds with
``code`` 1 plus the tokens, permissions and roles on success, or with
``code`` 0 and a message when a field is empty or token creation fails.
"""
username = request.json.get("username", "")
password = request.json.get("password", "")
if not username or not password:
return response.json({"code": 0, "message": "用户名密码不能为空"})
# Authenticate the credentials (omitted by the author; `permissions` and
# `roles` used below presumably come from this step - TODO confirm).
......
# Token lifetimes for a login come from the SystemCnf *_expires_in settings.
success, token, refresh_token = gen_token(username, SystemCnf.access_token_expires_in, SystemCnf.refresh_token_expires_in)
if success:
return response.json({"code": 1, "token": token, "refresh_token": refresh_token, "permissions": permissions, "roles": roles})
else:
return response.json({"code": 0, 'message': 'Failed to create token'})
@bp.route('/refresh_token', name='refresh_token', methods=['POST'])
async def refresh_token(request: Request) -> HTTPResponse:
""" 刷新token """
success, resp = re_gen_token(request)
if success:
return response.json(resp)
else:
return response.json({"code": 0, 'message': resp})登陆拦截放开/refresh_token接口并使用JWT认证:
import jwt
@app.middleware('request')
def setup_context(request) -> None:
""" token校验 """
if request.path not in cnf.EXCLUDE_PATH and not request.path.startswith("/api/v1/xxx/"):
try:
data = jwt.decode(request.token, cnf.JWT_SECRET_KEY, algorithms=['HS256'])
except jwt.ExpiredSignatureError:
return response.json({"message": "token过期了"}, status=401)
except Exception as e:
return response.json(
{
"message": "Do not access the platform with a token that is modified privately"
},
status=401
)
else:
request.app.ctx.username = data["username"]查看效果:

