Reformat with black

https://github.com/python/black
This commit is contained in:
David Baumgold 2019-05-06 15:39:41 -04:00
parent 5b309831ec
commit 025589ee76
63 changed files with 3784 additions and 3459 deletions

View file

@ -8,14 +8,14 @@ def create_app(test_config=None):
app = Flask(__name__, instance_relative_config=True)
app.config.from_mapping(
# a default secret that should be overridden by instance config
SECRET_KEY='dev',
SECRET_KEY="dev",
# store the database in the instance folder
DATABASE=os.path.join(app.instance_path, 'flaskr.sqlite'),
DATABASE=os.path.join(app.instance_path, "flaskr.sqlite"),
)
if test_config is None:
# load the instance config, if it exists, when not testing
app.config.from_pyfile('config.py', silent=True)
app.config.from_pyfile("config.py", silent=True)
else:
# load the test config if passed in
app.config.update(test_config)
@ -26,16 +26,18 @@ def create_app(test_config=None):
except OSError:
pass
@app.route('/hello')
@app.route("/hello")
def hello():
return 'Hello, World!'
return "Hello, World!"
# register the database commands
from flaskr import db
db.init_app(app)
# apply the blueprints to the app
from flaskr import auth, blog
app.register_blueprint(auth.bp)
app.register_blueprint(blog.bp)
@ -43,6 +45,6 @@ def create_app(test_config=None):
# in another app, you might define a separate main index here with
# app.route, while giving the blog blueprint a url_prefix, but for
# the tutorial the blog will be the main index
app.add_url_rule('/', endpoint='index')
app.add_url_rule("/", endpoint="index")
return app

View file

@ -1,21 +1,29 @@
import functools
from flask import (
Blueprint, flash, g, redirect, render_template, request, session, url_for
Blueprint,
flash,
g,
redirect,
render_template,
request,
session,
url_for,
)
from werkzeug.security import check_password_hash, generate_password_hash
from flaskr.db import get_db
bp = Blueprint('auth', __name__, url_prefix='/auth')
bp = Blueprint("auth", __name__, url_prefix="/auth")
def login_required(view):
"""View decorator that redirects anonymous users to the login page."""
@functools.wraps(view)
def wrapped_view(**kwargs):
if g.user is None:
return redirect(url_for('auth.login'))
return redirect(url_for("auth.login"))
return view(**kwargs)
@ -26,83 +34,84 @@ def login_required(view):
def load_logged_in_user():
"""If a user id is stored in the session, load the user object from
the database into ``g.user``."""
user_id = session.get('user_id')
user_id = session.get("user_id")
if user_id is None:
g.user = None
else:
g.user = get_db().execute(
'SELECT * FROM user WHERE id = ?', (user_id,)
).fetchone()
g.user = (
get_db().execute("SELECT * FROM user WHERE id = ?", (user_id,)).fetchone()
)
@bp.route('/register', methods=('GET', 'POST'))
@bp.route("/register", methods=("GET", "POST"))
def register():
"""Register a new user.
Validates that the username is not already taken. Hashes the
password for security.
"""
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
if request.method == "POST":
username = request.form["username"]
password = request.form["password"]
db = get_db()
error = None
if not username:
error = 'Username is required.'
error = "Username is required."
elif not password:
error = 'Password is required.'
elif db.execute(
'SELECT id FROM user WHERE username = ?', (username,)
).fetchone() is not None:
error = 'User {0} is already registered.'.format(username)
error = "Password is required."
elif (
db.execute("SELECT id FROM user WHERE username = ?", (username,)).fetchone()
is not None
):
error = "User {0} is already registered.".format(username)
if error is None:
# the name is available, store it in the database and go to
# the login page
db.execute(
'INSERT INTO user (username, password) VALUES (?, ?)',
(username, generate_password_hash(password))
"INSERT INTO user (username, password) VALUES (?, ?)",
(username, generate_password_hash(password)),
)
db.commit()
return redirect(url_for('auth.login'))
return redirect(url_for("auth.login"))
flash(error)
return render_template('auth/register.html')
return render_template("auth/register.html")
@bp.route('/login', methods=('GET', 'POST'))
@bp.route("/login", methods=("GET", "POST"))
def login():
"""Log in a registered user by adding the user id to the session."""
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
if request.method == "POST":
username = request.form["username"]
password = request.form["password"]
db = get_db()
error = None
user = db.execute(
'SELECT * FROM user WHERE username = ?', (username,)
"SELECT * FROM user WHERE username = ?", (username,)
).fetchone()
if user is None:
error = 'Incorrect username.'
elif not check_password_hash(user['password'], password):
error = 'Incorrect password.'
error = "Incorrect username."
elif not check_password_hash(user["password"], password):
error = "Incorrect password."
if error is None:
# store the user id in a new session and return to the index
session.clear()
session['user_id'] = user['id']
return redirect(url_for('index'))
session["user_id"] = user["id"]
return redirect(url_for("index"))
flash(error)
return render_template('auth/login.html')
return render_template("auth/login.html")
@bp.route('/logout')
@bp.route("/logout")
def logout():
"""Clear the current session, including the stored user id."""
session.clear()
return redirect(url_for('index'))
return redirect(url_for("index"))

View file

@ -1,24 +1,22 @@
from flask import (
Blueprint, flash, g, redirect, render_template, request, url_for
)
from flask import Blueprint, flash, g, redirect, render_template, request, url_for
from werkzeug.exceptions import abort
from flaskr.auth import login_required
from flaskr.db import get_db
bp = Blueprint('blog', __name__)
bp = Blueprint("blog", __name__)
@bp.route('/')
@bp.route("/")
def index():
"""Show all the posts, most recent first."""
db = get_db()
posts = db.execute(
'SELECT p.id, title, body, created, author_id, username'
' FROM post p JOIN user u ON p.author_id = u.id'
' ORDER BY created DESC'
"SELECT p.id, title, body, created, author_id, username"
" FROM post p JOIN user u ON p.author_id = u.id"
" ORDER BY created DESC"
).fetchall()
return render_template('blog/index.html', posts=posts)
return render_template("blog/index.html", posts=posts)
def get_post(id, check_author=True):
@ -33,78 +31,80 @@ def get_post(id, check_author=True):
:raise 404: if a post with the given id doesn't exist
:raise 403: if the current user isn't the author
"""
post = get_db().execute(
'SELECT p.id, title, body, created, author_id, username'
' FROM post p JOIN user u ON p.author_id = u.id'
' WHERE p.id = ?',
(id,)
).fetchone()
post = (
get_db()
.execute(
"SELECT p.id, title, body, created, author_id, username"
" FROM post p JOIN user u ON p.author_id = u.id"
" WHERE p.id = ?",
(id,),
)
.fetchone()
)
if post is None:
abort(404, "Post id {0} doesn't exist.".format(id))
if check_author and post['author_id'] != g.user['id']:
if check_author and post["author_id"] != g.user["id"]:
abort(403)
return post
@bp.route('/create', methods=('GET', 'POST'))
@bp.route("/create", methods=("GET", "POST"))
@login_required
def create():
"""Create a new post for the current user."""
if request.method == 'POST':
title = request.form['title']
body = request.form['body']
if request.method == "POST":
title = request.form["title"]
body = request.form["body"]
error = None
if not title:
error = 'Title is required.'
error = "Title is required."
if error is not None:
flash(error)
else:
db = get_db()
db.execute(
'INSERT INTO post (title, body, author_id)'
' VALUES (?, ?, ?)',
(title, body, g.user['id'])
"INSERT INTO post (title, body, author_id)" " VALUES (?, ?, ?)",
(title, body, g.user["id"]),
)
db.commit()
return redirect(url_for('blog.index'))
return redirect(url_for("blog.index"))
return render_template('blog/create.html')
return render_template("blog/create.html")
@bp.route('/<int:id>/update', methods=('GET', 'POST'))
@bp.route("/<int:id>/update", methods=("GET", "POST"))
@login_required
def update(id):
"""Update a post if the current user is the author."""
post = get_post(id)
if request.method == 'POST':
title = request.form['title']
body = request.form['body']
if request.method == "POST":
title = request.form["title"]
body = request.form["body"]
error = None
if not title:
error = 'Title is required.'
error = "Title is required."
if error is not None:
flash(error)
else:
db = get_db()
db.execute(
'UPDATE post SET title = ?, body = ? WHERE id = ?',
(title, body, id)
"UPDATE post SET title = ?, body = ? WHERE id = ?", (title, body, id)
)
db.commit()
return redirect(url_for('blog.index'))
return redirect(url_for("blog.index"))
return render_template('blog/update.html', post=post)
return render_template("blog/update.html", post=post)
@bp.route('/<int:id>/delete', methods=('POST',))
@bp.route("/<int:id>/delete", methods=("POST",))
@login_required
def delete(id):
"""Delete a post.
@ -114,6 +114,6 @@ def delete(id):
"""
get_post(id)
db = get_db()
db.execute('DELETE FROM post WHERE id = ?', (id,))
db.execute("DELETE FROM post WHERE id = ?", (id,))
db.commit()
return redirect(url_for('blog.index'))
return redirect(url_for("blog.index"))

View file

@ -10,10 +10,9 @@ def get_db():
is unique for each request and will be reused if this is called
again.
"""
if 'db' not in g:
if "db" not in g:
g.db = sqlite3.connect(
current_app.config['DATABASE'],
detect_types=sqlite3.PARSE_DECLTYPES
current_app.config["DATABASE"], detect_types=sqlite3.PARSE_DECLTYPES
)
g.db.row_factory = sqlite3.Row
@ -24,7 +23,7 @@ def close_db(e=None):
"""If this request connected to the database, close the
connection.
"""
db = g.pop('db', None)
db = g.pop("db", None)
if db is not None:
db.close()
@ -34,16 +33,16 @@ def init_db():
"""Clear existing data and create new tables."""
db = get_db()
with current_app.open_resource('schema.sql') as f:
db.executescript(f.read().decode('utf8'))
with current_app.open_resource("schema.sql") as f:
db.executescript(f.read().decode("utf8"))
@click.command('init-db')
@click.command("init-db")
@with_appcontext
def init_db_command():
"""Clear existing data and create new tables."""
init_db()
click.echo('Initialized the database.')
click.echo("Initialized the database.")
def init_app(app):

View file

@ -2,28 +2,21 @@ import io
from setuptools import find_packages, setup
with io.open('README.rst', 'rt', encoding='utf8') as f:
with io.open("README.rst", "rt", encoding="utf8") as f:
readme = f.read()
setup(
name='flaskr',
version='1.0.0',
url='http://flask.pocoo.org/docs/tutorial/',
license='BSD',
maintainer='Pallets team',
maintainer_email='contact@palletsprojects.com',
description='The basic blog app built in the Flask tutorial.',
name="flaskr",
version="1.0.0",
url="http://flask.pocoo.org/docs/tutorial/",
license="BSD",
maintainer="Pallets team",
maintainer_email="contact@palletsprojects.com",
description="The basic blog app built in the Flask tutorial.",
long_description=readme,
packages=find_packages(),
include_package_data=True,
zip_safe=False,
install_requires=[
'flask',
],
extras_require={
'test': [
'pytest',
'coverage',
],
},
install_requires=["flask"],
extras_require={"test": ["pytest", "coverage"]},
)

View file

@ -6,8 +6,8 @@ from flaskr import create_app
from flaskr.db import get_db, init_db
# read in SQL for populating test data
with open(os.path.join(os.path.dirname(__file__), 'data.sql'), 'rb') as f:
_data_sql = f.read().decode('utf8')
with open(os.path.join(os.path.dirname(__file__), "data.sql"), "rb") as f:
_data_sql = f.read().decode("utf8")
@pytest.fixture
@ -16,10 +16,7 @@ def app():
# create a temporary file to isolate the database for each test
db_fd, db_path = tempfile.mkstemp()
# create the app with common test config
app = create_app({
'TESTING': True,
'DATABASE': db_path,
})
app = create_app({"TESTING": True, "DATABASE": db_path})
# create the database and load test data
with app.app_context():
@ -49,14 +46,13 @@ class AuthActions(object):
def __init__(self, client):
self._client = client
def login(self, username='test', password='test'):
def login(self, username="test", password="test"):
return self._client.post(
'/auth/login',
data={'username': username, 'password': password}
"/auth/login", data={"username": username, "password": password}
)
def logout(self):
return self._client.get('/auth/logout')
return self._client.get("/auth/logout")
@pytest.fixture

View file

@ -5,54 +5,55 @@ from flaskr.db import get_db
def test_register(client, app):
# test that viewing the page renders without template errors
assert client.get('/auth/register').status_code == 200
assert client.get("/auth/register").status_code == 200
# test that successful registration redirects to the login page
response = client.post(
'/auth/register', data={'username': 'a', 'password': 'a'}
)
assert 'http://localhost/auth/login' == response.headers['Location']
response = client.post("/auth/register", data={"username": "a", "password": "a"})
assert "http://localhost/auth/login" == response.headers["Location"]
# test that the user was inserted into the database
with app.app_context():
assert get_db().execute(
"select * from user where username = 'a'",
).fetchone() is not None
assert (
get_db().execute("select * from user where username = 'a'").fetchone()
is not None
)
@pytest.mark.parametrize(('username', 'password', 'message'), (
('', '', b'Username is required.'),
('a', '', b'Password is required.'),
('test', 'test', b'already registered'),
))
@pytest.mark.parametrize(
("username", "password", "message"),
(
("", "", b"Username is required."),
("a", "", b"Password is required."),
("test", "test", b"already registered"),
),
)
def test_register_validate_input(client, username, password, message):
response = client.post(
'/auth/register',
data={'username': username, 'password': password}
"/auth/register", data={"username": username, "password": password}
)
assert message in response.data
def test_login(client, auth):
# test that viewing the page renders without template errors
assert client.get('/auth/login').status_code == 200
assert client.get("/auth/login").status_code == 200
# test that successful login redirects to the index page
response = auth.login()
assert response.headers['Location'] == 'http://localhost/'
assert response.headers["Location"] == "http://localhost/"
# login request set the user_id in the session
# check that the user is loaded from the session
with client:
client.get('/')
assert session['user_id'] == 1
assert g.user['username'] == 'test'
client.get("/")
assert session["user_id"] == 1
assert g.user["username"] == "test"
@pytest.mark.parametrize(('username', 'password', 'message'), (
('a', 'test', b'Incorrect username.'),
('test', 'a', b'Incorrect password.'),
))
@pytest.mark.parametrize(
("username", "password", "message"),
(("a", "test", b"Incorrect username."), ("test", "a", b"Incorrect password.")),
)
def test_login_validate_input(auth, username, password, message):
response = auth.login(username, password)
assert message in response.data
@ -63,4 +64,4 @@ def test_logout(client, auth):
with client:
auth.logout()
assert 'user_id' not in session
assert "user_id" not in session

View file

@ -3,47 +3,40 @@ from flaskr.db import get_db
def test_index(client, auth):
response = client.get('/')
response = client.get("/")
assert b"Log In" in response.data
assert b"Register" in response.data
auth.login()
response = client.get('/')
assert b'test title' in response.data
assert b'by test on 2018-01-01' in response.data
assert b'test\nbody' in response.data
response = client.get("/")
assert b"test title" in response.data
assert b"by test on 2018-01-01" in response.data
assert b"test\nbody" in response.data
assert b'href="/1/update"' in response.data
@pytest.mark.parametrize('path', (
'/create',
'/1/update',
'/1/delete',
))
@pytest.mark.parametrize("path", ("/create", "/1/update", "/1/delete"))
def test_login_required(client, path):
response = client.post(path)
assert response.headers['Location'] == 'http://localhost/auth/login'
assert response.headers["Location"] == "http://localhost/auth/login"
def test_author_required(app, client, auth):
# change the post author to another user
with app.app_context():
db = get_db()
db.execute('UPDATE post SET author_id = 2 WHERE id = 1')
db.execute("UPDATE post SET author_id = 2 WHERE id = 1")
db.commit()
auth.login()
# current user can't modify other user's post
assert client.post('/1/update').status_code == 403
assert client.post('/1/delete').status_code == 403
assert client.post("/1/update").status_code == 403
assert client.post("/1/delete").status_code == 403
# current user doesn't see edit link
assert b'href="/1/update"' not in client.get('/').data
assert b'href="/1/update"' not in client.get("/").data
@pytest.mark.parametrize('path', (
'/2/update',
'/2/delete',
))
@pytest.mark.parametrize("path", ("/2/update", "/2/delete"))
def test_exists_required(client, auth, path):
auth.login()
assert client.post(path).status_code == 404
@ -51,42 +44,39 @@ def test_exists_required(client, auth, path):
def test_create(client, auth, app):
auth.login()
assert client.get('/create').status_code == 200
client.post('/create', data={'title': 'created', 'body': ''})
assert client.get("/create").status_code == 200
client.post("/create", data={"title": "created", "body": ""})
with app.app_context():
db = get_db()
count = db.execute('SELECT COUNT(id) FROM post').fetchone()[0]
count = db.execute("SELECT COUNT(id) FROM post").fetchone()[0]
assert count == 2
def test_update(client, auth, app):
auth.login()
assert client.get('/1/update').status_code == 200
client.post('/1/update', data={'title': 'updated', 'body': ''})
assert client.get("/1/update").status_code == 200
client.post("/1/update", data={"title": "updated", "body": ""})
with app.app_context():
db = get_db()
post = db.execute('SELECT * FROM post WHERE id = 1').fetchone()
assert post['title'] == 'updated'
post = db.execute("SELECT * FROM post WHERE id = 1").fetchone()
assert post["title"] == "updated"
@pytest.mark.parametrize('path', (
'/create',
'/1/update',
))
@pytest.mark.parametrize("path", ("/create", "/1/update"))
def test_create_update_validate(client, auth, path):
auth.login()
response = client.post(path, data={'title': '', 'body': ''})
assert b'Title is required.' in response.data
response = client.post(path, data={"title": "", "body": ""})
assert b"Title is required." in response.data
def test_delete(client, auth, app):
auth.login()
response = client.post('/1/delete')
assert response.headers['Location'] == 'http://localhost/'
response = client.post("/1/delete")
assert response.headers["Location"] == "http://localhost/"
with app.app_context():
db = get_db()
post = db.execute('SELECT * FROM post WHERE id = 1').fetchone()
post = db.execute("SELECT * FROM post WHERE id = 1").fetchone()
assert post is None

View file

@ -10,9 +10,9 @@ def test_get_close_db(app):
assert db is get_db()
with pytest.raises(sqlite3.ProgrammingError) as e:
db.execute('SELECT 1')
db.execute("SELECT 1")
assert 'closed' in str(e)
assert "closed" in str(e)
def test_init_db_command(runner, monkeypatch):
@ -22,7 +22,7 @@ def test_init_db_command(runner, monkeypatch):
def fake_init_db():
Recorder.called = True
monkeypatch.setattr('flaskr.db.init_db', fake_init_db)
result = runner.invoke(args=['init-db'])
assert 'Initialized' in result.output
monkeypatch.setattr("flaskr.db.init_db", fake_init_db)
result = runner.invoke(args=["init-db"])
assert "Initialized" in result.output
assert Recorder.called

View file

@ -4,9 +4,9 @@ from flaskr import create_app
def test_config():
"""Test create_app without passing test config."""
assert not create_app().testing
assert create_app({'TESTING': True}).testing
assert create_app({"TESTING": True}).testing
def test_hello(client):
response = client.get('/hello')
assert response.data == b'Hello, World!'
response = client.get("/hello")
assert response.data == b"Hello, World!"