create a JWT when an altcha is verified
This commit is contained in:
@@ -16,14 +16,21 @@ from fastapi import Body, Depends, FastAPI, Form, HTTPException
|
|||||||
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
|
||||||
|
from pydantic import AfterValidator, Base64Str
|
||||||
|
|
||||||
from sqlalchemy import create_engine
|
from sqlalchemy import create_engine
|
||||||
|
|
||||||
from pwdlib import PasswordHash
|
from pwdlib import PasswordHash
|
||||||
|
|
||||||
from altcha import ChallengeOptions, create_challenge, verify_solution
|
from altcha import ChallengeOptions, create_challenge
|
||||||
|
|
||||||
|
import jwt
|
||||||
|
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
|
|
||||||
from slopserver.models import Domain, Path, User
|
from slopserver.models import Domain, Path, User
|
||||||
from slopserver.models import SlopReport, SignupForm
|
from slopserver.models import SlopReport, SignupForm, altcha_validator
|
||||||
from slopserver.db import select_slop, insert_slop, get_user, create_user
|
from slopserver.db import select_slop, insert_slop, get_user, create_user
|
||||||
from slopserver.common import TEMP_HMAC_KEY
|
from slopserver.common import TEMP_HMAC_KEY
|
||||||
|
|
||||||
@@ -92,7 +99,7 @@ async def signup_form(form_data: Annotated[SignupForm, Form()]):
|
|||||||
# create user
|
# create user
|
||||||
create_user(form_data.email, get_password_hash(form_data.password), TEMP_ENGINE)
|
create_user(form_data.email, get_password_hash(form_data.password), TEMP_ENGINE)
|
||||||
|
|
||||||
@app.get("/challenge")
|
@app.get("/altcha-challenge")
|
||||||
async def altcha_challenge():
|
async def altcha_challenge():
|
||||||
options = ChallengeOptions(
|
options = ChallengeOptions(
|
||||||
expires=datetime.now() + timedelta(minutes=10),
|
expires=datetime.now() + timedelta(minutes=10),
|
||||||
@@ -102,5 +109,26 @@ async def altcha_challenge():
|
|||||||
challenge = create_challenge(options)
|
challenge = create_challenge(options)
|
||||||
return challenge
|
return challenge
|
||||||
|
|
||||||
|
@app.post("/altcha-challenge")
|
||||||
|
async def altcha_verify(payload: Annotated[Base64Str, AfterValidator(altcha_validator)]):
|
||||||
|
# if verified, return a JWT for anonymous API access
|
||||||
|
expiration = datetime.now() + timedelta(days=30)
|
||||||
|
uuid = uuid4()
|
||||||
|
bearer_token = {
|
||||||
|
"iss": "slopserver",
|
||||||
|
"exp": int(expiration.timestamp()),
|
||||||
|
"aud": "slopserver",
|
||||||
|
"sub": str(uuid),
|
||||||
|
"client_id": str(uuid),
|
||||||
|
"iat": int(datetime.now().timestamp()),
|
||||||
|
"jti": str(uuid)
|
||||||
|
}
|
||||||
|
|
||||||
|
encoded_jwt = jwt.encode(bearer_token, TEMP_SECRET, ALGO)
|
||||||
|
|
||||||
|
return encoded_jwt
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||||
Reference in New Issue
Block a user