JWT 认证及其在 FastAPI 中的使用

本文最后更新于 2 年前,文中所描述的信息可能已发生改变。

什么是 JWT

JWT: JSON Web Tokens,它是一种将 JSON 对象编码为没有空格,且难以理解的长字符串的标准。在具体上,它就是一段字符串,下面就是 FastAPI 文档中给出的例子

txt
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

抽象地把它看成是这样的

txt
aaaaaaaaaaaa.bbbbbbbbbbbb.cccccccccccc

jwt 由三个部分组成,这三个部分之间用点.拼接起来,这三个部分分别是

  • Header — 头,定义了 jwt 使用的算法等信息
  • Payload — 载荷,存储有效信息的地方
  • Signature — 签名,用于校验 jwt 令牌

这三个部分中,只有第三个部分是无法还原的,前两个部分只是简单的进行了 base64 编码,把上面例子中的前两部分解码之后,看起来是这样的:

Header:

json
{
    "alg":"HS256",
    "typ":"JWT"
}

alg 即为算法,这里使用了 HS256 算法

typ 即为类型,指明这里是一个 JWT 字符串

Payload

json
{
    "sub":"1234567890",
    "name":"John Doe",
    "iat":1516239022
}

可以看到 Payload 中存储了主要的信息

提示提示

根据 JWT 规范, Payload 中的 sub 是一个预定义的声明(Claim),表示JWT的主题(Subject)。它指定了JWT所代表的实体或主题,通常是用户或客户端应用程序。 sub声明的值是字符串类型,通常是唯一标识用户或客户端应用程序的ID。

而 Signature 则是通过加盐加密得到的,加密由服务端实现

关于具体如何实现关于具体如何实现

参考下图

图 1

即,把 header 和 payload 部分进行 base64编码,用点拼接起来,得到结果,把这个结果用我们设置的密钥和算法进行加盐加密,就得到了 Signature

JWT 有什么用

其实它与 token ,cookie 的作用类似,就是用来免密码认证客户端

比如你去上学,学校是服务端,你是客户端.你完成学籍注册之后,学校发给你一个学生证,学生证就是你的 jwt 令牌.

你使用学校的很多服务,都要出示学生证.

而在实际业务中,jwt 令牌是有过期时间的,过期之后,需要客户端再次登录,获取新的 jwt

FastAPI 实现 Bearer JWT 令牌验证

需要安装以下几个包

python-jose,在 Python 中生成和校验 JWT 令牌

bash
pip install python-jose[cryptography]

Passlib ,处理密码哈希

bash
pip install passlib[bcrypt]

WARNING注意

密码哈希和 jwt 的加密不是同一个问题.

哈希是指把特定内容(本例中为密码)转换为乱码形式的字节序列(其实就是字符串)。

每次传入完全相同的内容时(比如,完全相同的密码),返回的都是完全相同的乱码。

但这个乱码无法转换回传入的密码。

把密码进行哈希并存储是必要的安全措施

先不考虑用户注册的问题,假设我们已经有了一个在数据库中的用户,那么这里的逻辑是这样的:

  1. 该用户通过用户名和密码登录
  2. 服务端对其进行校验和认证
  3. 完成认证后,用户拿到 jwt 令牌(token)
  4. 用户使用 jwt 令牌(token)访问其他服务

所以,我们需要:

  1. 一个存储了用户信息的数据库,其中密码存储的是哈希值
  2. 用户和 jwt 令牌(token)的模型
  3. 用于从数据库中获得用户信息,并验证的函数. 包括获取用户,哈希传入的密码,验证用户等
  4. 用于创建 jwt 令牌的函数
  5. 给用户登录并获取 jwt 令牌(token)的接口
  6. 验证用户的 jwt 是否正确和有效

数据库与模型

呃,为了简便和偷懒,就用 FastAPI 文档中的假数据库吧

python
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}

定义之后要用到的模型

python
from pydantic import BaseModel
from typing import Union

class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str

Token 模型用于在登录接口中指定 response_model

TokenData 用于从 token 中获取数据

User 即用户模型,供外部使用

UserInDB 是用户在数据库中的模型,它继承自 User 添加了 hashed_password 属性

这些模型在之后会用到

对用户名和密码的校验与认证

这部分比较简单,就是把用户输入的密码哈希之后和数据库中的对比就好了

python
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password, hashed_password):
    """
    校验密码是否正确
    将传入的明文密码与数据库中的哈希值进行比对
    """
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """获取传入的密码的哈希值"""
    return pwd_context.hash(password)


def get_user(db, username: str):
    """根据用户名从指定数据库中获取用户"""
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    """身份验证, 返回用户对象或者 False"""
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

创建 jwt token

完成认证之后,就可以给用户创建 token 了

首先,定义 jwt 的加密密钥,加密算法和过期时间

python
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

注意注意

在实际项目中,jwt 的密钥不应该直接写在代码中,而是应该从环境变量中读取

例如,从环境变量中获取 SECRET_KEY

python
import os
SECRET_KEY = os.environ.get("SECRET_KEY")

然后写创建 token 的函数

python
from datetime import datetime, timedelta
from jose import JWTError, jwt

def create_access_token(data: dict, expires_delta=None):
    """创建 jwt token"""
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    # 注意这里就用到了上面定义的 SECRET_KEY 和 ALGORITHM
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

上面这个函数把传入的用于获取 jwt 的数据进行拷贝,并添加过期时间,然后 encode 得到 jwt token 并返回

至此,我们完成了对用户的校验和认证,以及创建 jwt token 的功能,下面我们要在接口中使用它们

提供登录并创建token的接口

python
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """登录并获取 token"""
    # 通过用户名和密码获取用户, 如果用户不存在或者密码错误, 则抛出异常
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)

    if not user:
        raise HTTPException(
            # 这部分是规范, 401 表示未授权
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # 上面已经相当于进行了用户身份验证, 所以这里可以直接创建 token
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    # 这里如何返回也是规范, 我们已在上面已经定义好了 response_model=Token
    return {"access_token": access_token, "token_type": "bearer"}

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") 声明了用户应该从哪个路由获取 token (相对路径),但是并未实现具体的功能

而我们自己写的 login_for_access_token 则是实现.

现在用户成功登录后,就拿到了 jwt token ,访问其他服务时,浏览器会携带 jwt token 一起发送请求

我们只需要在其他接口中,对用户传来的 jwt token 进行验证即可

在其他接口中验证jwt token

比如我们有一个获取用户信息的接口 read_users_me ,那么,我们需要:

  1. 获取用户传来的 token
  2. 验证 token 是否正确和有效
  3. 通过 token 获取当前的用户信息

首先,解密 jwt token ,获取当前用户:

python
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """依赖函数, 用于获取当前用户"""
    # 定义一个异常, 用于在用户身份验证失败时抛出
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # 从 payload 中获取 username
        # 关于 sub 的解释, 可以参考文档
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """依赖函数, 用于获取当前活跃的用户,用到了上面的依赖函数 get_current_user
    为什么 current_user 可以是 User 类型而不是 get_current_user 返回的 UserInDB 类型呢?
    因为 UserInDB 类型是 User 类型的子类, 所以可以直接赋值给 User 类型的变量
    """
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

在 数据库与模型 中, 我们定义的 TokenData 就是这里要用的

因为只是演示,所以这个 TokenData 类显得有点多此一举.

但是如果你需要传递多个 JWT 数据,或者有多个函数需要访问 JWT 数据,那么 TokenData 类可以方便地将这些数据打包在一起,并在函数之间传递它们。

之后就可以在接口中利用依赖注入,验证用户请求

python
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """获取当前用户信息"""
    return current_user

总结

JWT 是一种认证方式,它的本质是一段带有签名的字符串.

在 Python 中,我们可以使用 jwt 模块来创建和解析 jwt token.

在 FastAPI 中,我们可以使用 OAuth2PasswordBearer 来获取用户传来的 token,然后使用 jwt 模块来解析 token,从而获取用户信息.

参考

JWT 官方文档

FastAPI 官方文档 - 安全性

FastAPI 官方文档 - 依赖注入

Debian/Linux安装最新Python版本及多版本环境管理
ArchLinux on Y9000P2022十分好用,就是有点难用