Update minitwit & improve testing for examples (#1954)
* Update minitwit & improve testing for examples * Related to #1945 * Re-works minitwit to be installed and run as: pip install --editable . export FLASK_APP=minitwit.minitwit export FLASK_DEBUG=1 flask initdb flask run * added flaskr and minitwit to norecursedirs * tests not properly run when using pytest standards * see: http://stackoverflow.com/questions/38313171/configuring-pytest-with-installable-examples-in-a-project * Both flaskr and minitwit now follow pytest standards. * Tests can for them as `py.test` or `python setup.py test` * Update minitwit readme * updates the instructions for running * Fixes for updating the minitwit example - This reverts the changes to the *docs/* (I will file separate PR). - Running the app is now: `export FLASK_APP=minitwit` & `flask run` (After installing the app) * Remove unnecessary comma from flaskr/setup.py
This commit is contained in:
parent
1e5746bb2b
commit
5f009374fd
17 changed files with 29 additions and 11 deletions
|
|
@ -1,256 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
MiniTwit
|
||||
~~~~~~~~
|
||||
|
||||
A microblogging application written with Flask and sqlite3.
|
||||
|
||||
:copyright: (c) 2015 by Armin Ronacher.
|
||||
:license: BSD, see LICENSE for more details.
|
||||
"""
|
||||
|
||||
import time
|
||||
from sqlite3 import dbapi2 as sqlite3
|
||||
from hashlib import md5
|
||||
from datetime import datetime
|
||||
from flask import Flask, request, session, url_for, redirect, \
|
||||
render_template, abort, g, flash, _app_ctx_stack
|
||||
from werkzeug import check_password_hash, generate_password_hash
|
||||
|
||||
|
||||
# configuration
|
||||
DATABASE = '/tmp/minitwit.db'
|
||||
PER_PAGE = 30
|
||||
DEBUG = True
|
||||
SECRET_KEY = 'development key'
|
||||
|
||||
# create our little application :)
|
||||
app = Flask(__name__)
|
||||
app.config.from_object(__name__)
|
||||
app.config.from_envvar('MINITWIT_SETTINGS', silent=True)
|
||||
|
||||
|
||||
def get_db():
|
||||
"""Opens a new database connection if there is none yet for the
|
||||
current application context.
|
||||
"""
|
||||
top = _app_ctx_stack.top
|
||||
if not hasattr(top, 'sqlite_db'):
|
||||
top.sqlite_db = sqlite3.connect(app.config['DATABASE'])
|
||||
top.sqlite_db.row_factory = sqlite3.Row
|
||||
return top.sqlite_db
|
||||
|
||||
|
||||
@app.teardown_appcontext
|
||||
def close_database(exception):
|
||||
"""Closes the database again at the end of the request."""
|
||||
top = _app_ctx_stack.top
|
||||
if hasattr(top, 'sqlite_db'):
|
||||
top.sqlite_db.close()
|
||||
|
||||
|
||||
def init_db():
|
||||
"""Initializes the database."""
|
||||
db = get_db()
|
||||
with app.open_resource('schema.sql', mode='r') as f:
|
||||
db.cursor().executescript(f.read())
|
||||
db.commit()
|
||||
|
||||
|
||||
@app.cli.command('initdb')
|
||||
def initdb_command():
|
||||
"""Creates the database tables."""
|
||||
init_db()
|
||||
print('Initialized the database.')
|
||||
|
||||
|
||||
def query_db(query, args=(), one=False):
|
||||
"""Queries the database and returns a list of dictionaries."""
|
||||
cur = get_db().execute(query, args)
|
||||
rv = cur.fetchall()
|
||||
return (rv[0] if rv else None) if one else rv
|
||||
|
||||
|
||||
def get_user_id(username):
|
||||
"""Convenience method to look up the id for a username."""
|
||||
rv = query_db('select user_id from user where username = ?',
|
||||
[username], one=True)
|
||||
return rv[0] if rv else None
|
||||
|
||||
|
||||
def format_datetime(timestamp):
|
||||
"""Format a timestamp for display."""
|
||||
return datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d @ %H:%M')
|
||||
|
||||
|
||||
def gravatar_url(email, size=80):
|
||||
"""Return the gravatar image for the given email address."""
|
||||
return 'http://www.gravatar.com/avatar/%s?d=identicon&s=%d' % \
|
||||
(md5(email.strip().lower().encode('utf-8')).hexdigest(), size)
|
||||
|
||||
|
||||
@app.before_request
|
||||
def before_request():
|
||||
g.user = None
|
||||
if 'user_id' in session:
|
||||
g.user = query_db('select * from user where user_id = ?',
|
||||
[session['user_id']], one=True)
|
||||
|
||||
|
||||
@app.route('/')
|
||||
def timeline():
|
||||
"""Shows a users timeline or if no user is logged in it will
|
||||
redirect to the public timeline. This timeline shows the user's
|
||||
messages as well as all the messages of followed users.
|
||||
"""
|
||||
if not g.user:
|
||||
return redirect(url_for('public_timeline'))
|
||||
return render_template('timeline.html', messages=query_db('''
|
||||
select message.*, user.* from message, user
|
||||
where message.author_id = user.user_id and (
|
||||
user.user_id = ? or
|
||||
user.user_id in (select whom_id from follower
|
||||
where who_id = ?))
|
||||
order by message.pub_date desc limit ?''',
|
||||
[session['user_id'], session['user_id'], PER_PAGE]))
|
||||
|
||||
|
||||
@app.route('/public')
|
||||
def public_timeline():
|
||||
"""Displays the latest messages of all users."""
|
||||
return render_template('timeline.html', messages=query_db('''
|
||||
select message.*, user.* from message, user
|
||||
where message.author_id = user.user_id
|
||||
order by message.pub_date desc limit ?''', [PER_PAGE]))
|
||||
|
||||
|
||||
@app.route('/<username>')
|
||||
def user_timeline(username):
|
||||
"""Display's a users tweets."""
|
||||
profile_user = query_db('select * from user where username = ?',
|
||||
[username], one=True)
|
||||
if profile_user is None:
|
||||
abort(404)
|
||||
followed = False
|
||||
if g.user:
|
||||
followed = query_db('''select 1 from follower where
|
||||
follower.who_id = ? and follower.whom_id = ?''',
|
||||
[session['user_id'], profile_user['user_id']],
|
||||
one=True) is not None
|
||||
return render_template('timeline.html', messages=query_db('''
|
||||
select message.*, user.* from message, user where
|
||||
user.user_id = message.author_id and user.user_id = ?
|
||||
order by message.pub_date desc limit ?''',
|
||||
[profile_user['user_id'], PER_PAGE]), followed=followed,
|
||||
profile_user=profile_user)
|
||||
|
||||
|
||||
@app.route('/<username>/follow')
|
||||
def follow_user(username):
|
||||
"""Adds the current user as follower of the given user."""
|
||||
if not g.user:
|
||||
abort(401)
|
||||
whom_id = get_user_id(username)
|
||||
if whom_id is None:
|
||||
abort(404)
|
||||
db = get_db()
|
||||
db.execute('insert into follower (who_id, whom_id) values (?, ?)',
|
||||
[session['user_id'], whom_id])
|
||||
db.commit()
|
||||
flash('You are now following "%s"' % username)
|
||||
return redirect(url_for('user_timeline', username=username))
|
||||
|
||||
|
||||
@app.route('/<username>/unfollow')
|
||||
def unfollow_user(username):
|
||||
"""Removes the current user as follower of the given user."""
|
||||
if not g.user:
|
||||
abort(401)
|
||||
whom_id = get_user_id(username)
|
||||
if whom_id is None:
|
||||
abort(404)
|
||||
db = get_db()
|
||||
db.execute('delete from follower where who_id=? and whom_id=?',
|
||||
[session['user_id'], whom_id])
|
||||
db.commit()
|
||||
flash('You are no longer following "%s"' % username)
|
||||
return redirect(url_for('user_timeline', username=username))
|
||||
|
||||
|
||||
@app.route('/add_message', methods=['POST'])
|
||||
def add_message():
|
||||
"""Registers a new message for the user."""
|
||||
if 'user_id' not in session:
|
||||
abort(401)
|
||||
if request.form['text']:
|
||||
db = get_db()
|
||||
db.execute('''insert into message (author_id, text, pub_date)
|
||||
values (?, ?, ?)''', (session['user_id'], request.form['text'],
|
||||
int(time.time())))
|
||||
db.commit()
|
||||
flash('Your message was recorded')
|
||||
return redirect(url_for('timeline'))
|
||||
|
||||
|
||||
@app.route('/login', methods=['GET', 'POST'])
|
||||
def login():
|
||||
"""Logs the user in."""
|
||||
if g.user:
|
||||
return redirect(url_for('timeline'))
|
||||
error = None
|
||||
if request.method == 'POST':
|
||||
user = query_db('''select * from user where
|
||||
username = ?''', [request.form['username']], one=True)
|
||||
if user is None:
|
||||
error = 'Invalid username'
|
||||
elif not check_password_hash(user['pw_hash'],
|
||||
request.form['password']):
|
||||
error = 'Invalid password'
|
||||
else:
|
||||
flash('You were logged in')
|
||||
session['user_id'] = user['user_id']
|
||||
return redirect(url_for('timeline'))
|
||||
return render_template('login.html', error=error)
|
||||
|
||||
|
||||
@app.route('/register', methods=['GET', 'POST'])
|
||||
def register():
|
||||
"""Registers the user."""
|
||||
if g.user:
|
||||
return redirect(url_for('timeline'))
|
||||
error = None
|
||||
if request.method == 'POST':
|
||||
if not request.form['username']:
|
||||
error = 'You have to enter a username'
|
||||
elif not request.form['email'] or \
|
||||
'@' not in request.form['email']:
|
||||
error = 'You have to enter a valid email address'
|
||||
elif not request.form['password']:
|
||||
error = 'You have to enter a password'
|
||||
elif request.form['password'] != request.form['password2']:
|
||||
error = 'The two passwords do not match'
|
||||
elif get_user_id(request.form['username']) is not None:
|
||||
error = 'The username is already taken'
|
||||
else:
|
||||
db = get_db()
|
||||
db.execute('''insert into user (
|
||||
username, email, pw_hash) values (?, ?, ?)''',
|
||||
[request.form['username'], request.form['email'],
|
||||
generate_password_hash(request.form['password'])])
|
||||
db.commit()
|
||||
flash('You were successfully registered and can login now')
|
||||
return redirect(url_for('login'))
|
||||
return render_template('register.html', error=error)
|
||||
|
||||
|
||||
@app.route('/logout')
|
||||
def logout():
|
||||
"""Logs the user out."""
|
||||
flash('You were logged out')
|
||||
session.pop('user_id', None)
|
||||
return redirect(url_for('public_timeline'))
|
||||
|
||||
|
||||
# add some filters to jinja
|
||||
app.jinja_env.filters['datetimeformat'] = format_datetime
|
||||
app.jinja_env.filters['gravatar'] = gravatar_url
|
||||
Loading…
Add table
Add a link
Reference in a new issue