initial commit

This commit is contained in:
2020-07-05 17:29:15 +03:00
commit 84ccfd720e
16 changed files with 565 additions and 0 deletions

32
website/__init__.py Normal file
View File

@@ -0,0 +1,32 @@
from flask import Flask
from flask_migrate import Migrate
from flask_login import LoginManager
from flask_sqlalchemy import SQLAlchemy
from .routes import bp
from .models import db, User
from .oauth2 import config_oauth
import config
migrate = Migrate()
login_manager = LoginManager()
login_manager.login_view = 'home.login'
def create_app(cfg):
app = Flask(__name__)
app.config.from_object(cfg)
setup_app(app)
return app
def setup_app(app):
db.init_app(app)
config_oauth(app)
migrate.init_app(app, db)
login_manager.init_app(app)
app.register_blueprint(bp, url_prefix='')
@login_manager.user_loader
def load_user(user_id):
return db.session.query(User).get(user_id)

15
website/forms.py Normal file
View File

@@ -0,0 +1,15 @@
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField, BooleanField, PasswordField
from wtforms.validators import DataRequired
class LoginForm(FlaskForm):
username = StringField("Username", validators=[DataRequired()])
password = PasswordField("Password", validators=[DataRequired()])
remember = BooleanField("Remember Me")
submit = SubmitField()
class ConfirmAccessForm(FlaskForm):
confirm = BooleanField("Confirm")
submit = SubmitField()

68
website/models.py Normal file
View File

@@ -0,0 +1,68 @@
import time
from werkzeug.security import generate_password_hash, check_password_hash
from flask_sqlalchemy import SQLAlchemy
from authlib.integrations.sqla_oauth2 import (
OAuth2ClientMixin,
OAuth2AuthorizationCodeMixin,
OAuth2TokenMixin,
)
from flask_login import UserMixin
db = SQLAlchemy()
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(40), unique=True)
name = db.Column(db.String(100))
email = db.Column(db.String(100), nullable=False, unique=True)
password_hash = db.Column(db.String(100), nullable=False)
clients = db.relationship('OAuth2Client')
auth_codes = db.relationship('OAuth2AuthorizationCode')
tokens = db.relationship('OAuth2Token')
def __repr__(self):
return "<{}:{}>".format(self.id, self.username)
def get_user_id(self):
return self.id
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
class OAuth2Client(db.Model, OAuth2ClientMixin):
__tablename__ = 'oauth2_client'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(
db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
user = db.relationship('User')
class OAuth2AuthorizationCode(db.Model, OAuth2AuthorizationCodeMixin):
__tablename__ = 'oauth2_code'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(
db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
user = db.relationship('User')
class OAuth2Token(db.Model, OAuth2TokenMixin):
__tablename__ = 'oauth2_token'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(
db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
user = db.relationship('User')
def is_refresh_token_active(self):
if self.revoked:
return False
expires_at = self.issued_at + self.expires_in * 2
return expires_at >= time.time()

102
website/oauth2.py Normal file
View File

@@ -0,0 +1,102 @@
from authlib.integrations.flask_oauth2 import (
AuthorizationServer,
ResourceProtector,
)
from authlib.integrations.sqla_oauth2 import (
create_query_client_func,
create_save_token_func,
create_revocation_endpoint,
create_bearer_token_validator,
)
from authlib.oauth2.rfc6749 import grants
from authlib.oauth2.rfc7636 import CodeChallenge
from werkzeug.security import gen_salt
from .models import db, User
from .models import OAuth2Client, OAuth2AuthorizationCode, OAuth2Token
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
TOKEN_ENDPOINT_AUTH_METHODS = [
'client_secret_basic',
'client_secret_post',
'none',
]
def save_authorization_code(self, code, request):
code_challenge = request.data.get('code_challenge')
code_challenge_method = request.data.get('code_challenge_method')
auth_code = OAuth2AuthorizationCode(
code=code,
client_id=request.client.client_id,
redirect_uri=request.redirect_uri,
scope=request.scope,
user_id=request.user.id,
code_challenge=code_challenge,
code_challenge_method=code_challenge_method,
)
db.session.add(auth_code)
db.session.commit()
return auth_code
def query_authorization_code(self, code, client):
auth_code = OAuth2AuthorizationCode.query.filter_by(
code=code, client_id=client.client_id).first()
if auth_code and not auth_code.is_expired():
return auth_code
def delete_authorization_code(self, authorization_code):
db.session.delete(authorization_code)
db.session.commit()
def authenticate_user(self, authorization_code):
return User.query.get(authorization_code.user_id)
class PasswordGrant(grants.ResourceOwnerPasswordCredentialsGrant):
def authenticate_user(self, username, password):
user = User.query.filter_by(username=username).first()
if user is not None and user.check_password(password):
return user
class RefreshTokenGrant(grants.RefreshTokenGrant):
def authenticate_refresh_token(self, refresh_token):
token = OAuth2Token.query.filter_by(refresh_token=refresh_token).first()
if token and token.is_refresh_token_active():
return token
def authenticate_user(self, credential):
return User.query.get(credential.user_id)
def revoke_old_credential(self, credential):
credential.revoked = True
db.session.add(credential)
db.session.commit()
query_client = create_query_client_func(db.session, OAuth2Client)
save_token = create_save_token_func(db.session, OAuth2Token)
authorization = AuthorizationServer(
query_client=query_client,
save_token=save_token,
)
require_oauth = ResourceProtector()
def config_oauth(app):
authorization.init_app(app)
# support all grants
authorization.register_grant(grants.ImplicitGrant)
authorization.register_grant(grants.ClientCredentialsGrant)
authorization.register_grant(AuthorizationCodeGrant, [CodeChallenge(required=True)])
authorization.register_grant(PasswordGrant)
authorization.register_grant(RefreshTokenGrant)
# support revocation
revocation_cls = create_revocation_endpoint(db.session, OAuth2Token)
authorization.register_endpoint(revocation_cls)
# protect resource
bearer_cls = create_bearer_token_validator(db.session, OAuth2Token)
require_oauth.register_token_validator(bearer_cls())

85
website/routes.py Normal file
View File

@@ -0,0 +1,85 @@
from flask import Blueprint, Flask, request, render_template, redirect, url_for, flash, jsonify, make_response, session
from flask_login import login_required, login_user, current_user, logout_user
from authlib.integrations.flask_oauth2 import current_token
from authlib.oauth2 import OAuth2Error
from .models import User, OAuth2Client, db
from .forms import LoginForm, ConfirmAccessForm
from .oauth2 import authorization, require_oauth
from werkzeug.exceptions import abort
bp = Blueprint('home', __name__)
@bp.route('/')
def home():
user = None
if current_user.is_authenticated:
clients = current_user.clients
user = current_user.name
else:
clients = []
return render_template('home.html', user=user, clients=clients)
@bp.route('/login/', methods=['post', 'get'])
def login():
if current_user.is_authenticated:
return redirect(url_for('.home'))
form = LoginForm()
if form.validate_on_submit():
user = db.session.query(User).filter(User.username == form.username.data).first()
if user and user.check_password(form.password.data):
login_user(user, remember=form.remember.data)
nextpage = request.args.get('next', url_for('.home'))
return redirect(nextpage)
else:
flash("Invalid username/password", 'error')
return render_template('login.html', form=form)
@bp.route('/logout/')
@login_required
def logout():
logout_user()
flash("You have been logged out.")
return redirect(url_for('.home'))
@bp.route('/oauth/token', methods=['POST'])
def issue_token():
return authorization.create_token_response()
@bp.route('/oauth/revoke', methods=['POST'])
def revoke_token():
return authorization.create_endpoint_response('revocation')
@bp.route('/oauth/authorize', methods=['GET', 'POST'])
@login_required
def authorize():
user = current_user
grant_user = None
form = ConfirmAccessForm()
if request.method == 'GET':
try:
grant = authorization.validate_consent_request(end_user=user)
except OAuth2Error as error:
return error.error
return render_template('authorize.html', user=user, grant=grant, form=form)
if form.validate_on_submit():
if form.confirm.data:
grant_user = user
return authorization.create_authorization_response(grant_user=grant_user)
@bp.route('/api/me')
@require_oauth('profile')
def api_me():
user = current_token.user
return jsonify(id=user.id, username=user.username)

View File

@@ -0,0 +1,34 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
{% for category, message in get_flashed_messages(with_categories=true) %}
<spam class="{{ category }}">{{ message }}</spam>
{% endfor %}<br/>
<div>Logged in as <strong>{{user.name}}</strong> (<a href="{{ url_for('.logout') }}">Log Out</a>)</div><br/>
<p>The application <strong>{{grant.client.client_name}}</strong> is requesting:
<strong>{{ grant.request.scope }}</strong>
</p>
<p>
from You - a.k.a. <strong>{{ user.username }}</strong>
</p>
<form action="" method="post">
{{ form.csrf_token }}
<p>
{{ form.confirm.label() }}
{{ form.confirm() }}
</p>
<p>
{{ form.submit() }}
</p>
</form>
</body>
</html>

View File

@@ -0,0 +1,32 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
{% for category, message in get_flashed_messages(with_categories=true) %}
<spam class="{{ category }}">{{ message }}</spam>
{% endfor %}<br/>
{% if user %}
<style>pre{white-space:wrap}</style>
<div>Logged in as <strong>{{user.name}}</strong> (<a href="{{ url_for('.logout') }}">Log Out</a>)</div>
<br/><div><h3>Clients:</h3>
{% for client in clients %}
<pre>
{{ client.client_info|tojson }}
{{ client.client_metadata|tojson }}
</pre>
<hr>
{% endfor %}
<br/></div>
{% else %}
<br><p>Please, <a href="{{ url_for('.login') }}">Login</a></p>
{% endif %}
</body>
</html>

View File

@@ -0,0 +1,43 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Login</title>
</head>
<body>
{% for category, message in get_flashed_messages(with_categories=true) %}
<spam class="{{ category }}">{{ message }}</spam>
{% endfor %}
<form action="" method="post">
{{ form.csrf_token }}
<p>
{{ form.username.label() }}
{{ form.username() }}
{% if form.username.errors %}
{% for error in form.username.errors %}
{{ error }}
{% endfor %}
{% endif %}
</p>
<p>
{{ form.password.label() }}
{{ form.password() }}
{% if form.password.errors %}
{% for error in form.password.errors %}
{{ error }}
{% endfor %}
{% endif %}
</p>
<p>
{{ form.remember.label() }}
{{ form.remember() }}
</p>
<p>
{{ form.submit() }}
</p>
</form>
</body>
</html>