oauth/oauth.py
author durandn
Wed, 05 Oct 2016 14:24:32 +0200
changeset 124 983e72b4bc45
parent 58 c56ca9e06cc8
permissions -rw-r--r--
Added tag 00.00.21 for changeset 15724968d6e6

# coding: utf-8

from datetime import datetime, timedelta
from flask import Flask
from flask import session, request
from flask import render_template, redirect, jsonify
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import gen_salt
from flask_oauthlib.provider import OAuth2Provider
from settings.oauth_settings import OAuthSettings
import uuid

# Application wiring: Flask app, SQLAlchemy handle and the OAuth2 provider.
app = Flask(__name__, template_folder='templates')
# NOTE(review): debug mode and a hard-coded secret key are only acceptable
# for a local/mock server; do not deploy with these values.
app.debug = True
app.secret_key = 'secret'
# Load settings from OAuthSettings first, then force the database URI below.
# The __main__ block reads the RENKAN_* and MOCK_GED_* keys from app.config;
# presumably they are defined by OAuthSettings -- confirm.
app.config.from_object(OAuthSettings)
app.config.update({
    'SQLALCHEMY_DATABASE_URI': 'sqlite:///db.sqlite',
})
db = SQLAlchemy(app)
oauth = OAuth2Provider(app)


class User(db.Model):
    """Account known to this provider (resource owner or client owner)."""
    # String primary key; home() fills it with str(uuid.uuid4()).
    id = db.Column(db.String(256), primary_key=True)
    # Login name typed on the home page; unique across users.
    username = db.Column(db.String(40), unique=True)
    # Returned by user_info() as "ENTPersonStructRattachUAI".
    uai = db.Column(db.String(40), default="uaidefault")

class Client(db.Model):
    """OAuth2 client application registered with this provider.

    Redirect URIs and default scopes are persisted as whitespace-separated
    text and exposed as lists through the properties below.
    """
    client_id = db.Column(db.String(40), primary_key=True)
    client_secret = db.Column(db.String(55), nullable=False)
    client_type = db.Column(db.String(12), nullable=False, default='public')

    # User owning this client.
    user_id = db.Column(db.ForeignKey('user.id'))
    user = db.relationship('User')

    # Raw whitespace-separated storage; read through the properties.
    _redirect_uris = db.Column(db.Text)
    _default_scopes = db.Column(db.Text)

    @property
    def redirect_uris(self):
        """Return the stored redirect URIs as a list (empty if unset)."""
        raw = self._redirect_uris
        return raw.split() if raw else []

    @property
    def default_redirect_uri(self):
        """Return the first redirect URI.

        Raises IndexError when no redirect URI is stored.
        """
        uris = self.redirect_uris
        return uris[0]

    @property
    def default_scopes(self):
        """Return the stored default scopes as a list (empty if unset)."""
        raw = self._default_scopes
        return raw.split() if raw else []


class Grant(db.Model):
    """Authorization grant (code) issued to a client on behalf of a user.

    Scopes are persisted as whitespace-separated text in ``_scopes`` and
    exposed as a list through ``scopes``.
    """
    id = db.Column(db.Integer, primary_key=True)

    # User.id is a String(256) primary key holding UUID strings, so the
    # referencing column must use the same type (it was declared Integer).
    user_id = db.Column(
        db.String(256), db.ForeignKey('user.id', ondelete='CASCADE')
    )
    user = db.relationship('User')

    client_id = db.Column(
        db.String(40), db.ForeignKey('client.client_id'),
        nullable=False,
    )
    client = db.relationship('Client')

    code = db.Column(db.String(255), index=True, nullable=False)

    redirect_uri = db.Column(db.String(255))
    expires = db.Column(db.DateTime)

    # Raw whitespace-separated storage; read through ``scopes``.
    _scopes = db.Column(db.Text)

    def delete(self):
        """Delete this grant, commit the session and return the grant."""
        db.session.delete(self)
        db.session.commit()
        return self

    @property
    def scopes(self):
        """Return the stored scopes as a list (empty if unset)."""
        raw = self._scopes
        return raw.split() if raw else []


class Token(db.Model):
    """Access token issued to a client for a user.

    Scopes are persisted as whitespace-separated text in ``_scopes`` and
    exposed as a list through ``scopes``.
    """
    id = db.Column(db.Integer, primary_key=True)
    client_id = db.Column(
        db.String(40), db.ForeignKey('client.client_id'),
        nullable=False,
    )
    client = db.relationship('Client')

    # User.id is a String(256) primary key holding UUID strings, so the
    # referencing column must use the same type (it was declared Integer).
    user_id = db.Column(
        db.String(256), db.ForeignKey('user.id')
    )
    user = db.relationship('User')

    # currently only bearer is supported
    token_type = db.Column(db.String(40))

    access_token = db.Column(db.String(255), unique=True)
    refresh_token = db.Column(db.String(255), unique=True, nullable=True)
    expires = db.Column(db.DateTime)
    # Raw whitespace-separated storage; read through ``scopes``.
    _scopes = db.Column(db.Text)

    @property
    def scopes(self):
        """Return the stored scopes as a list (empty if unset)."""
        raw = self._scopes
        return raw.split() if raw else []


def current_user():
    """Return the User stored in the session, or None if nobody is logged in.

    When the session carries no 'id', the session content is printed
    (debug aid) before returning None.
    """
    if 'id' not in session:
        print(session)
        return None
    return User.query.get(session['id'])


@app.route('/', methods=('GET', 'POST'))
def home():
    """Home page: show the current user (GET) or log a user in (POST).

    A POST looks up the submitted username, creates the user with a fresh
    UUID id when it does not exist yet, stores the id in the session and
    redirects back to '/'.
    """
    if request.method != 'POST':
        logged_in = current_user()
        return render_template('oauth/home.html', user=logged_in)

    name = request.form.get('username')
    account = User.query.filter_by(username=name).first()
    if not account:
        # First login with this name: create the account on the fly.
        account = User(id=str(uuid.uuid4()), username=name)
        db.session.add(account)
        db.session.commit()
    session['id'] = account.id
    return redirect('/')

@oauth.clientgetter
def load_client(client_id):
    """Return the Client with the given client_id, or None if unknown."""
    matching = Client.query.filter_by(client_id=client_id)
    return matching.first()


@oauth.grantgetter
def load_grant(client_id, code):
    """Return the Grant matching both client_id and code, or None."""
    criteria = {'client_id': client_id, 'code': code}
    return Grant.query.filter_by(**criteria).first()


@oauth.grantsetter
def save_grant(client_id, code, request, *args, **kwargs):
    """Persist a new Grant for the current session user and return it.

    The grant expires 100 seconds after creation; redirect URI and scopes
    are taken from the incoming OAuth request.
    """
    # decide the expires time yourself
    expires_at = datetime.utcnow() + timedelta(seconds=100)
    fields = {
        'client_id': client_id,
        'code': code['code'],
        'redirect_uri': request.redirect_uri,
        '_scopes': ' '.join(request.scopes),
        'user': current_user(),
        'expires': expires_at,
    }
    new_grant = Grant(**fields)
    db.session.add(new_grant)
    db.session.commit()
    return new_grant


@oauth.tokengetter
def load_token(access_token=None, refresh_token=None):
    """Look a Token up by access token, else by refresh token.

    Returns None when neither value is given or no token matches.
    """
    if access_token:
        criteria = {'access_token': access_token}
    elif refresh_token:
        criteria = {'refresh_token': refresh_token}
    else:
        return None
    return Token.query.filter_by(**criteria).first()


@oauth.tokensetter
def save_token(token, request, *args, **kwargs):
    """Persist a freshly issued token, replacing any previous one.

    Args:
        token: dict describing the issued token; 'access_token',
            'token_type', 'scope' and 'expires_in' are read, and
            'refresh_token' is stored when present.
        request: OAuth request carrying the ``client`` and ``user`` the
            token is issued for.

    Returns:
        The stored Token row.

    Raises:
        KeyError: if a required key is missing from ``token``.
    """
    client_id = request.client.client_id
    user_id = request.user.id

    # make sure that every client has only one token connected to a user
    stale_tokens = Token.query.filter_by(client_id=client_id, user_id=user_id)
    for stale in stale_tokens:
        db.session.delete(stale)

    # Read 'expires_in' without popping it: the dict belongs to the caller
    # and must keep this field.
    expires = datetime.utcnow() + timedelta(seconds=token['expires_in'])

    tok = Token(
        access_token=token['access_token'],
        # Persist the refresh token when one was issued; load_token()
        # looks tokens up by it, which can never match if it is dropped.
        refresh_token=token.get('refresh_token'),
        token_type=token['token_type'],
        _scopes=token['scope'],
        expires=expires,
        client_id=client_id,
        user_id=user_id,
    )
    db.session.add(tok)
    db.session.commit()
    return tok


@app.route('/oauth/oauth2/token', methods=['GET', 'POST'])
@oauth.token_handler
def access_token():
    """Token endpoint; the work is delegated to ``oauth.token_handler``.

    Returns None. NOTE(review): Flask-OAuthlib appears to treat the return
    value as optional extra credentials for the token response -- confirm
    against the library documentation.
    """
    return None


@app.route('/oauth/oauth2/authorize', methods=['GET', 'POST'])
@oauth.authorize_handler
def authorize(*args, **kwargs):
    """Authorization endpoint.

    Anonymous visitors are redirected to '/'. On GET the confirmation page
    is rendered with the requesting client and the current user; on any
    other method the result is True only if the form field 'confirm'
    equals 'yes'.
    """
    user = current_user()
    if not user:
        return redirect('/')

    if request.method != 'GET':
        answer = request.form.get('confirm', 'no')
        return answer == 'yes'

    requesting_client = Client.query.filter_by(
        client_id=kwargs.get('client_id')
    ).first()
    context = dict(kwargs, client=requesting_client, user=user)
    return render_template('oauth/authorize.html', **context)


@app.route('/rest/user/InfoComplete')
@oauth.require_oauth()
def user_info():
    """Return id, display name and UAI of the token's user as JSON."""
    account = request.oauth.user
    payload = {
        'id': account.id,
        'displayName': account.username,
        'ENTPersonStructRattachUAI': account.uai,
    }
    return jsonify(**payload)

@app.route('/rest/oauth/validate/<token>')
def validate_token(token):
    """Report as JSON whether ``token`` is a known access token.

    The response echoes the token and carries error "0" with the token's
    scopes and its client's redirect URIs when it is found, or error "1"
    with a description and empty strings otherwise.
    """
    record = Token.query.filter_by(access_token=token).first()
    found = record is not None

    # Empty strings (not lists) are the placeholders for an unknown token.
    scopes = record.scopes if found else ""
    uris = record.client.redirect_uris if found else ""

    is_valid = found and record.access_token == token
    if is_valid:
        error_code, description = "0", ""
    else:
        error_code, description = "1", "token not found in db?"

    return jsonify(
        access_token=token,
        uriredirect=uris,
        error=error_code,
        description=description,
        scope=scopes,
    )

@app.route('/ws/resource/', methods=["POST", "PUT"])
@oauth.require_oauth()
def reference_resource():
    """Print the request headers and body, then acknowledge with 200."""
    separator = "#########################"
    print(separator)
    print(request.headers)
    print(separator)
    print(request.data)
    return "Request is valid", 200

def init_client(client_id, client_secret, redirect_uris, client_owner, confidential=False):
    """Create the client (and its owner) unless it is already registered.

    Args:
        client_id: identifier of the client to register.
        client_secret: secret of the client to register.
        redirect_uris: iterable of redirect URI strings, stored joined by
            single spaces.
        client_owner: username of the owning user; the user is created
            when it does not exist yet.
        confidential: store the client as "confidential" when true,
            "public" otherwise.

    Nothing happens when a client with this id and secret already exists.
    """
    existing = Client.query.filter_by(
        client_id=client_id, client_secret=client_secret
    ).first()
    if existing:
        return

    print("Creating client for "+client_owner)
    owner = User.query.filter_by(username=client_owner).first()
    if not owner:
        # User.id is a string primary key with no default, so an id has to
        # be supplied explicitly (same scheme as home()).
        owner = User(id=str(uuid.uuid4()), username=client_owner)
        db.session.add(owner)
        db.session.commit()

    # Named client_type rather than ``type`` to avoid shadowing the builtin.
    client_type = "confidential" if confidential else "public"
    client = Client(
        client_id=client_id,
        client_secret=client_secret,
        _redirect_uris=' '.join(redirect_uris),
        _default_scopes='basic',
        user_id=owner.id,
        client_type=client_type,
    )
    db.session.add(client)
    db.session.commit()

if __name__ == '__main__':
    # Create the tables, register the two configured clients (Renkan and
    # the mock GED) as confidential clients, then start the server.
    db.create_all()
    for prefix in ("RENKAN", "MOCK_GED"):
        init_client(
            client_id=app.config[prefix + "_CLIENT_ID"],
            client_secret=app.config[prefix + "_CLIENT_SECRET"],
            redirect_uris=app.config[prefix + "_REDIRECT_URIS"],
            client_owner=app.config[prefix + "_SERVER_USER"],
            confidential=True,
        )
    app.run()