forked from orbit-oss/flask
This takes a popular API whereby instead of passing the HTTP method as
an argument to route it is instead used as the method name i.e.
@app.route("/", methods=["POST"])
is now writeable as,
@app.post("/")
This is simply syntatic sugar, it doesn't do anything else, but makes
it slightly easier for users.
I've included all the methods that are relevant and aren't auto
generated i.e. not connect, head, options, and trace.
2029 lines
55 KiB
Python
2029 lines
55 KiB
Python
import gc
|
|
import re
|
|
import sys
|
|
import time
|
|
import uuid
|
|
import weakref
|
|
from datetime import datetime
|
|
from platform import python_implementation
|
|
from threading import Thread
|
|
|
|
import pytest
|
|
import werkzeug.serving
|
|
from werkzeug.exceptions import BadRequest
|
|
from werkzeug.exceptions import Forbidden
|
|
from werkzeug.exceptions import NotFound
|
|
from werkzeug.http import parse_date
|
|
from werkzeug.routing import BuildError
|
|
|
|
import flask
|
|
|
|
|
|
require_cpython_gc = pytest.mark.skipif(
|
|
python_implementation() != "CPython",
|
|
reason="Requires CPython GC behavior",
|
|
)
|
|
|
|
|
|
def test_options_work(app, client):
|
|
@app.route("/", methods=["GET", "POST"])
|
|
def index():
|
|
return "Hello World"
|
|
|
|
rv = client.open("/", method="OPTIONS")
|
|
assert sorted(rv.allow) == ["GET", "HEAD", "OPTIONS", "POST"]
|
|
assert rv.data == b""
|
|
|
|
|
|
def test_options_on_multiple_rules(app, client):
|
|
@app.route("/", methods=["GET", "POST"])
|
|
def index():
|
|
return "Hello World"
|
|
|
|
@app.route("/", methods=["PUT"])
|
|
def index_put():
|
|
return "Aha!"
|
|
|
|
rv = client.open("/", method="OPTIONS")
|
|
assert sorted(rv.allow) == ["GET", "HEAD", "OPTIONS", "POST", "PUT"]
|
|
|
|
|
|
@pytest.mark.parametrize("method", ["get", "post", "put", "delete", "patch"])
|
|
def test_method_route(app, client, method):
|
|
method_route = getattr(app, method)
|
|
client_method = getattr(client, method)
|
|
|
|
@method_route("/")
|
|
def hello():
|
|
return "Hello"
|
|
|
|
assert client_method("/").data == b"Hello"
|
|
|
|
|
|
def test_method_route_no_methods(app):
|
|
with pytest.raises(TypeError):
|
|
app.get("/", methods=["GET", "POST"])
|
|
|
|
|
|
def test_provide_automatic_options_attr():
|
|
app = flask.Flask(__name__)
|
|
|
|
def index():
|
|
return "Hello World!"
|
|
|
|
index.provide_automatic_options = False
|
|
app.route("/")(index)
|
|
rv = app.test_client().open("/", method="OPTIONS")
|
|
assert rv.status_code == 405
|
|
|
|
app = flask.Flask(__name__)
|
|
|
|
def index2():
|
|
return "Hello World!"
|
|
|
|
index2.provide_automatic_options = True
|
|
app.route("/", methods=["OPTIONS"])(index2)
|
|
rv = app.test_client().open("/", method="OPTIONS")
|
|
assert sorted(rv.allow) == ["OPTIONS"]
|
|
|
|
|
|
def test_provide_automatic_options_kwarg(app, client):
|
|
def index():
|
|
return flask.request.method
|
|
|
|
def more():
|
|
return flask.request.method
|
|
|
|
app.add_url_rule("/", view_func=index, provide_automatic_options=False)
|
|
app.add_url_rule(
|
|
"/more",
|
|
view_func=more,
|
|
methods=["GET", "POST"],
|
|
provide_automatic_options=False,
|
|
)
|
|
assert client.get("/").data == b"GET"
|
|
|
|
rv = client.post("/")
|
|
assert rv.status_code == 405
|
|
assert sorted(rv.allow) == ["GET", "HEAD"]
|
|
|
|
rv = client.open("/", method="OPTIONS")
|
|
assert rv.status_code == 405
|
|
|
|
rv = client.head("/")
|
|
assert rv.status_code == 200
|
|
assert not rv.data # head truncates
|
|
assert client.post("/more").data == b"POST"
|
|
assert client.get("/more").data == b"GET"
|
|
|
|
rv = client.delete("/more")
|
|
assert rv.status_code == 405
|
|
assert sorted(rv.allow) == ["GET", "HEAD", "POST"]
|
|
|
|
rv = client.open("/more", method="OPTIONS")
|
|
assert rv.status_code == 405
|
|
|
|
|
|
def test_request_dispatching(app, client):
|
|
@app.route("/")
|
|
def index():
|
|
return flask.request.method
|
|
|
|
@app.route("/more", methods=["GET", "POST"])
|
|
def more():
|
|
return flask.request.method
|
|
|
|
assert client.get("/").data == b"GET"
|
|
rv = client.post("/")
|
|
assert rv.status_code == 405
|
|
assert sorted(rv.allow) == ["GET", "HEAD", "OPTIONS"]
|
|
rv = client.head("/")
|
|
assert rv.status_code == 200
|
|
assert not rv.data # head truncates
|
|
assert client.post("/more").data == b"POST"
|
|
assert client.get("/more").data == b"GET"
|
|
rv = client.delete("/more")
|
|
assert rv.status_code == 405
|
|
assert sorted(rv.allow) == ["GET", "HEAD", "OPTIONS", "POST"]
|
|
|
|
|
|
def test_disallow_string_for_allowed_methods(app):
|
|
with pytest.raises(TypeError):
|
|
|
|
@app.route("/", methods="GET POST")
|
|
def index():
|
|
return "Hey"
|
|
|
|
|
|
def test_url_mapping(app, client):
|
|
random_uuid4 = "7eb41166-9ebf-4d26-b771-ea3f54f8b383"
|
|
|
|
def index():
|
|
return flask.request.method
|
|
|
|
def more():
|
|
return flask.request.method
|
|
|
|
def options():
|
|
return random_uuid4
|
|
|
|
app.add_url_rule("/", "index", index)
|
|
app.add_url_rule("/more", "more", more, methods=["GET", "POST"])
|
|
|
|
# Issue 1288: Test that automatic options are not added
|
|
# when non-uppercase 'options' in methods
|
|
app.add_url_rule("/options", "options", options, methods=["options"])
|
|
|
|
assert client.get("/").data == b"GET"
|
|
rv = client.post("/")
|
|
assert rv.status_code == 405
|
|
assert sorted(rv.allow) == ["GET", "HEAD", "OPTIONS"]
|
|
rv = client.head("/")
|
|
assert rv.status_code == 200
|
|
assert not rv.data # head truncates
|
|
assert client.post("/more").data == b"POST"
|
|
assert client.get("/more").data == b"GET"
|
|
rv = client.delete("/more")
|
|
assert rv.status_code == 405
|
|
assert sorted(rv.allow) == ["GET", "HEAD", "OPTIONS", "POST"]
|
|
rv = client.open("/options", method="OPTIONS")
|
|
assert rv.status_code == 200
|
|
assert random_uuid4 in rv.data.decode("utf-8")
|
|
|
|
|
|
def test_werkzeug_routing(app, client):
|
|
from werkzeug.routing import Submount, Rule
|
|
|
|
app.url_map.add(
|
|
Submount("/foo", [Rule("/bar", endpoint="bar"), Rule("/", endpoint="index")])
|
|
)
|
|
|
|
def bar():
|
|
return "bar"
|
|
|
|
def index():
|
|
return "index"
|
|
|
|
app.view_functions["bar"] = bar
|
|
app.view_functions["index"] = index
|
|
|
|
assert client.get("/foo/").data == b"index"
|
|
assert client.get("/foo/bar").data == b"bar"
|
|
|
|
|
|
def test_endpoint_decorator(app, client):
|
|
from werkzeug.routing import Submount, Rule
|
|
|
|
app.url_map.add(
|
|
Submount("/foo", [Rule("/bar", endpoint="bar"), Rule("/", endpoint="index")])
|
|
)
|
|
|
|
@app.endpoint("bar")
|
|
def bar():
|
|
return "bar"
|
|
|
|
@app.endpoint("index")
|
|
def index():
|
|
return "index"
|
|
|
|
assert client.get("/foo/").data == b"index"
|
|
assert client.get("/foo/bar").data == b"bar"
|
|
|
|
|
|
def test_session(app, client):
|
|
@app.route("/set", methods=["POST"])
|
|
def set():
|
|
assert not flask.session.accessed
|
|
assert not flask.session.modified
|
|
flask.session["value"] = flask.request.form["value"]
|
|
assert flask.session.accessed
|
|
assert flask.session.modified
|
|
return "value set"
|
|
|
|
@app.route("/get")
|
|
def get():
|
|
assert not flask.session.accessed
|
|
assert not flask.session.modified
|
|
v = flask.session.get("value", "None")
|
|
assert flask.session.accessed
|
|
assert not flask.session.modified
|
|
return v
|
|
|
|
assert client.post("/set", data={"value": "42"}).data == b"value set"
|
|
assert client.get("/get").data == b"42"
|
|
|
|
|
|
def test_session_using_server_name(app, client):
|
|
app.config.update(SERVER_NAME="example.com")
|
|
|
|
@app.route("/")
|
|
def index():
|
|
flask.session["testing"] = 42
|
|
return "Hello World"
|
|
|
|
rv = client.get("/", "http://example.com/")
|
|
assert "domain=.example.com" in rv.headers["set-cookie"].lower()
|
|
assert "httponly" in rv.headers["set-cookie"].lower()
|
|
|
|
|
|
def test_session_using_server_name_and_port(app, client):
|
|
app.config.update(SERVER_NAME="example.com:8080")
|
|
|
|
@app.route("/")
|
|
def index():
|
|
flask.session["testing"] = 42
|
|
return "Hello World"
|
|
|
|
rv = client.get("/", "http://example.com:8080/")
|
|
assert "domain=.example.com" in rv.headers["set-cookie"].lower()
|
|
assert "httponly" in rv.headers["set-cookie"].lower()
|
|
|
|
|
|
def test_session_using_server_name_port_and_path(app, client):
|
|
app.config.update(SERVER_NAME="example.com:8080", APPLICATION_ROOT="/foo")
|
|
|
|
@app.route("/")
|
|
def index():
|
|
flask.session["testing"] = 42
|
|
return "Hello World"
|
|
|
|
rv = client.get("/", "http://example.com:8080/foo")
|
|
assert "domain=example.com" in rv.headers["set-cookie"].lower()
|
|
assert "path=/foo" in rv.headers["set-cookie"].lower()
|
|
assert "httponly" in rv.headers["set-cookie"].lower()
|
|
|
|
|
|
def test_session_using_application_root(app, client):
|
|
class PrefixPathMiddleware:
|
|
def __init__(self, app, prefix):
|
|
self.app = app
|
|
self.prefix = prefix
|
|
|
|
def __call__(self, environ, start_response):
|
|
environ["SCRIPT_NAME"] = self.prefix
|
|
return self.app(environ, start_response)
|
|
|
|
app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, "/bar")
|
|
app.config.update(APPLICATION_ROOT="/bar")
|
|
|
|
@app.route("/")
|
|
def index():
|
|
flask.session["testing"] = 42
|
|
return "Hello World"
|
|
|
|
rv = client.get("/", "http://example.com:8080/")
|
|
assert "path=/bar" in rv.headers["set-cookie"].lower()
|
|
|
|
|
|
def test_session_using_session_settings(app, client):
|
|
app.config.update(
|
|
SERVER_NAME="www.example.com:8080",
|
|
APPLICATION_ROOT="/test",
|
|
SESSION_COOKIE_DOMAIN=".example.com",
|
|
SESSION_COOKIE_HTTPONLY=False,
|
|
SESSION_COOKIE_SECURE=True,
|
|
SESSION_COOKIE_SAMESITE="Lax",
|
|
SESSION_COOKIE_PATH="/",
|
|
)
|
|
|
|
@app.route("/")
|
|
def index():
|
|
flask.session["testing"] = 42
|
|
return "Hello World"
|
|
|
|
rv = client.get("/", "http://www.example.com:8080/test/")
|
|
cookie = rv.headers["set-cookie"].lower()
|
|
assert "domain=.example.com" in cookie
|
|
assert "path=/" in cookie
|
|
assert "secure" in cookie
|
|
assert "httponly" not in cookie
|
|
assert "samesite" in cookie
|
|
|
|
@app.route("/clear")
|
|
def clear():
|
|
flask.session.pop("testing", None)
|
|
return "Goodbye World"
|
|
|
|
rv = client.get("/clear", "http://www.example.com:8080/test/")
|
|
cookie = rv.headers["set-cookie"].lower()
|
|
assert "session=;" in cookie
|
|
assert "domain=.example.com" in cookie
|
|
assert "path=/" in cookie
|
|
assert "secure" in cookie
|
|
assert "samesite" in cookie
|
|
|
|
|
|
def test_session_using_samesite_attribute(app, client):
|
|
@app.route("/")
|
|
def index():
|
|
flask.session["testing"] = 42
|
|
return "Hello World"
|
|
|
|
app.config.update(SESSION_COOKIE_SAMESITE="invalid")
|
|
|
|
with pytest.raises(ValueError):
|
|
client.get("/")
|
|
|
|
app.config.update(SESSION_COOKIE_SAMESITE=None)
|
|
rv = client.get("/")
|
|
cookie = rv.headers["set-cookie"].lower()
|
|
assert "samesite" not in cookie
|
|
|
|
app.config.update(SESSION_COOKIE_SAMESITE="Strict")
|
|
rv = client.get("/")
|
|
cookie = rv.headers["set-cookie"].lower()
|
|
assert "samesite=strict" in cookie
|
|
|
|
app.config.update(SESSION_COOKIE_SAMESITE="Lax")
|
|
rv = client.get("/")
|
|
cookie = rv.headers["set-cookie"].lower()
|
|
assert "samesite=lax" in cookie
|
|
|
|
|
|
def test_session_localhost_warning(recwarn, app, client):
|
|
app.config.update(SERVER_NAME="localhost:5000")
|
|
|
|
@app.route("/")
|
|
def index():
|
|
flask.session["testing"] = 42
|
|
return "testing"
|
|
|
|
rv = client.get("/", "http://localhost:5000/")
|
|
assert "domain" not in rv.headers["set-cookie"].lower()
|
|
w = recwarn.pop(UserWarning)
|
|
assert "'localhost' is not a valid cookie domain" in str(w.message)
|
|
|
|
|
|
def test_session_ip_warning(recwarn, app, client):
|
|
app.config.update(SERVER_NAME="127.0.0.1:5000")
|
|
|
|
@app.route("/")
|
|
def index():
|
|
flask.session["testing"] = 42
|
|
return "testing"
|
|
|
|
rv = client.get("/", "http://127.0.0.1:5000/")
|
|
assert "domain=127.0.0.1" in rv.headers["set-cookie"].lower()
|
|
w = recwarn.pop(UserWarning)
|
|
assert "cookie domain is an IP" in str(w.message)
|
|
|
|
|
|
def test_missing_session(app):
|
|
app.secret_key = None
|
|
|
|
def expect_exception(f, *args, **kwargs):
|
|
e = pytest.raises(RuntimeError, f, *args, **kwargs)
|
|
assert e.value.args and "session is unavailable" in e.value.args[0]
|
|
|
|
with app.test_request_context():
|
|
assert flask.session.get("missing_key") is None
|
|
expect_exception(flask.session.__setitem__, "foo", 42)
|
|
expect_exception(flask.session.pop, "foo")
|
|
|
|
|
|
def test_session_expiration(app, client):
|
|
permanent = True
|
|
|
|
@app.route("/")
|
|
def index():
|
|
flask.session["test"] = 42
|
|
flask.session.permanent = permanent
|
|
return ""
|
|
|
|
@app.route("/test")
|
|
def test():
|
|
return str(flask.session.permanent)
|
|
|
|
rv = client.get("/")
|
|
assert "set-cookie" in rv.headers
|
|
match = re.search(r"(?i)\bexpires=([^;]+)", rv.headers["set-cookie"])
|
|
expires = parse_date(match.group())
|
|
expected = datetime.utcnow() + app.permanent_session_lifetime
|
|
assert expires.year == expected.year
|
|
assert expires.month == expected.month
|
|
assert expires.day == expected.day
|
|
|
|
rv = client.get("/test")
|
|
assert rv.data == b"True"
|
|
|
|
permanent = False
|
|
rv = client.get("/")
|
|
assert "set-cookie" in rv.headers
|
|
match = re.search(r"\bexpires=([^;]+)", rv.headers["set-cookie"])
|
|
assert match is None
|
|
|
|
|
|
def test_session_stored_last(app, client):
|
|
@app.after_request
|
|
def modify_session(response):
|
|
flask.session["foo"] = 42
|
|
return response
|
|
|
|
@app.route("/")
|
|
def dump_session_contents():
|
|
return repr(flask.session.get("foo"))
|
|
|
|
assert client.get("/").data == b"None"
|
|
assert client.get("/").data == b"42"
|
|
|
|
|
|
def test_session_special_types(app, client):
|
|
now = datetime.utcnow().replace(microsecond=0)
|
|
the_uuid = uuid.uuid4()
|
|
|
|
@app.route("/")
|
|
def dump_session_contents():
|
|
flask.session["t"] = (1, 2, 3)
|
|
flask.session["b"] = b"\xff"
|
|
flask.session["m"] = flask.Markup("<html>")
|
|
flask.session["u"] = the_uuid
|
|
flask.session["d"] = now
|
|
flask.session["t_tag"] = {" t": "not-a-tuple"}
|
|
flask.session["di_t_tag"] = {" t__": "not-a-tuple"}
|
|
flask.session["di_tag"] = {" di": "not-a-dict"}
|
|
return "", 204
|
|
|
|
with client:
|
|
client.get("/")
|
|
s = flask.session
|
|
assert s["t"] == (1, 2, 3)
|
|
assert type(s["b"]) == bytes
|
|
assert s["b"] == b"\xff"
|
|
assert type(s["m"]) == flask.Markup
|
|
assert s["m"] == flask.Markup("<html>")
|
|
assert s["u"] == the_uuid
|
|
assert s["d"] == now
|
|
assert s["t_tag"] == {" t": "not-a-tuple"}
|
|
assert s["di_t_tag"] == {" t__": "not-a-tuple"}
|
|
assert s["di_tag"] == {" di": "not-a-dict"}
|
|
|
|
|
|
def test_session_cookie_setting(app):
|
|
is_permanent = True
|
|
|
|
@app.route("/bump")
|
|
def bump():
|
|
rv = flask.session["foo"] = flask.session.get("foo", 0) + 1
|
|
flask.session.permanent = is_permanent
|
|
return str(rv)
|
|
|
|
@app.route("/read")
|
|
def read():
|
|
return str(flask.session.get("foo", 0))
|
|
|
|
def run_test(expect_header):
|
|
with app.test_client() as c:
|
|
assert c.get("/bump").data == b"1"
|
|
assert c.get("/bump").data == b"2"
|
|
assert c.get("/bump").data == b"3"
|
|
|
|
rv = c.get("/read")
|
|
set_cookie = rv.headers.get("set-cookie")
|
|
assert (set_cookie is not None) == expect_header
|
|
assert rv.data == b"3"
|
|
|
|
is_permanent = True
|
|
app.config["SESSION_REFRESH_EACH_REQUEST"] = True
|
|
run_test(expect_header=True)
|
|
|
|
is_permanent = True
|
|
app.config["SESSION_REFRESH_EACH_REQUEST"] = False
|
|
run_test(expect_header=False)
|
|
|
|
is_permanent = False
|
|
app.config["SESSION_REFRESH_EACH_REQUEST"] = True
|
|
run_test(expect_header=False)
|
|
|
|
is_permanent = False
|
|
app.config["SESSION_REFRESH_EACH_REQUEST"] = False
|
|
run_test(expect_header=False)
|
|
|
|
|
|
def test_session_vary_cookie(app, client):
|
|
@app.route("/set")
|
|
def set_session():
|
|
flask.session["test"] = "test"
|
|
return ""
|
|
|
|
@app.route("/get")
|
|
def get():
|
|
return flask.session.get("test")
|
|
|
|
@app.route("/getitem")
|
|
def getitem():
|
|
return flask.session["test"]
|
|
|
|
@app.route("/setdefault")
|
|
def setdefault():
|
|
return flask.session.setdefault("test", "default")
|
|
|
|
@app.route("/vary-cookie-header-set")
|
|
def vary_cookie_header_set():
|
|
response = flask.Response()
|
|
response.vary.add("Cookie")
|
|
flask.session["test"] = "test"
|
|
return response
|
|
|
|
@app.route("/vary-header-set")
|
|
def vary_header_set():
|
|
response = flask.Response()
|
|
response.vary.update(("Accept-Encoding", "Accept-Language"))
|
|
flask.session["test"] = "test"
|
|
return response
|
|
|
|
@app.route("/no-vary-header")
|
|
def no_vary_header():
|
|
return ""
|
|
|
|
def expect(path, header_value="Cookie"):
|
|
rv = client.get(path)
|
|
|
|
if header_value:
|
|
# The 'Vary' key should exist in the headers only once.
|
|
assert len(rv.headers.get_all("Vary")) == 1
|
|
assert rv.headers["Vary"] == header_value
|
|
else:
|
|
assert "Vary" not in rv.headers
|
|
|
|
expect("/set")
|
|
expect("/get")
|
|
expect("/getitem")
|
|
expect("/setdefault")
|
|
expect("/vary-cookie-header-set")
|
|
expect("/vary-header-set", "Accept-Encoding, Accept-Language, Cookie")
|
|
expect("/no-vary-header", None)
|
|
|
|
|
|
def test_flashes(app, req_ctx):
|
|
assert not flask.session.modified
|
|
flask.flash("Zap")
|
|
flask.session.modified = False
|
|
flask.flash("Zip")
|
|
assert flask.session.modified
|
|
assert list(flask.get_flashed_messages()) == ["Zap", "Zip"]
|
|
|
|
|
|
def test_extended_flashing(app):
|
|
# Be sure app.testing=True below, else tests can fail silently.
|
|
#
|
|
# Specifically, if app.testing is not set to True, the AssertionErrors
|
|
# in the view functions will cause a 500 response to the test client
|
|
# instead of propagating exceptions.
|
|
|
|
@app.route("/")
|
|
def index():
|
|
flask.flash("Hello World")
|
|
flask.flash("Hello World", "error")
|
|
flask.flash(flask.Markup("<em>Testing</em>"), "warning")
|
|
return ""
|
|
|
|
@app.route("/test/")
|
|
def test():
|
|
messages = flask.get_flashed_messages()
|
|
assert list(messages) == [
|
|
"Hello World",
|
|
"Hello World",
|
|
flask.Markup("<em>Testing</em>"),
|
|
]
|
|
return ""
|
|
|
|
@app.route("/test_with_categories/")
|
|
def test_with_categories():
|
|
messages = flask.get_flashed_messages(with_categories=True)
|
|
assert len(messages) == 3
|
|
assert list(messages) == [
|
|
("message", "Hello World"),
|
|
("error", "Hello World"),
|
|
("warning", flask.Markup("<em>Testing</em>")),
|
|
]
|
|
return ""
|
|
|
|
@app.route("/test_filter/")
|
|
def test_filter():
|
|
messages = flask.get_flashed_messages(
|
|
category_filter=["message"], with_categories=True
|
|
)
|
|
assert list(messages) == [("message", "Hello World")]
|
|
return ""
|
|
|
|
@app.route("/test_filters/")
|
|
def test_filters():
|
|
messages = flask.get_flashed_messages(
|
|
category_filter=["message", "warning"], with_categories=True
|
|
)
|
|
assert list(messages) == [
|
|
("message", "Hello World"),
|
|
("warning", flask.Markup("<em>Testing</em>")),
|
|
]
|
|
return ""
|
|
|
|
@app.route("/test_filters_without_returning_categories/")
|
|
def test_filters2():
|
|
messages = flask.get_flashed_messages(category_filter=["message", "warning"])
|
|
assert len(messages) == 2
|
|
assert messages[0] == "Hello World"
|
|
assert messages[1] == flask.Markup("<em>Testing</em>")
|
|
return ""
|
|
|
|
# Create new test client on each test to clean flashed messages.
|
|
|
|
client = app.test_client()
|
|
client.get("/")
|
|
client.get("/test_with_categories/")
|
|
|
|
client = app.test_client()
|
|
client.get("/")
|
|
client.get("/test_filter/")
|
|
|
|
client = app.test_client()
|
|
client.get("/")
|
|
client.get("/test_filters/")
|
|
|
|
client = app.test_client()
|
|
client.get("/")
|
|
client.get("/test_filters_without_returning_categories/")
|
|
|
|
|
|
def test_request_processing(app, client):
|
|
evts = []
|
|
|
|
@app.before_request
|
|
def before_request():
|
|
evts.append("before")
|
|
|
|
@app.after_request
|
|
def after_request(response):
|
|
response.data += b"|after"
|
|
evts.append("after")
|
|
return response
|
|
|
|
@app.route("/")
|
|
def index():
|
|
assert "before" in evts
|
|
assert "after" not in evts
|
|
return "request"
|
|
|
|
assert "after" not in evts
|
|
rv = client.get("/").data
|
|
assert "after" in evts
|
|
assert rv == b"request|after"
|
|
|
|
|
|
def test_request_preprocessing_early_return(app, client):
|
|
evts = []
|
|
|
|
@app.before_request
|
|
def before_request1():
|
|
evts.append(1)
|
|
|
|
@app.before_request
|
|
def before_request2():
|
|
evts.append(2)
|
|
return "hello"
|
|
|
|
@app.before_request
|
|
def before_request3():
|
|
evts.append(3)
|
|
return "bye"
|
|
|
|
@app.route("/")
|
|
def index():
|
|
evts.append("index")
|
|
return "damnit"
|
|
|
|
rv = client.get("/").data.strip()
|
|
assert rv == b"hello"
|
|
assert evts == [1, 2]
|
|
|
|
|
|
def test_after_request_processing(app, client):
|
|
@app.route("/")
|
|
def index():
|
|
@flask.after_this_request
|
|
def foo(response):
|
|
response.headers["X-Foo"] = "a header"
|
|
return response
|
|
|
|
return "Test"
|
|
|
|
resp = client.get("/")
|
|
assert resp.status_code == 200
|
|
assert resp.headers["X-Foo"] == "a header"
|
|
|
|
|
|
def test_teardown_request_handler(app, client):
|
|
called = []
|
|
|
|
@app.teardown_request
|
|
def teardown_request(exc):
|
|
called.append(True)
|
|
return "Ignored"
|
|
|
|
@app.route("/")
|
|
def root():
|
|
return "Response"
|
|
|
|
rv = client.get("/")
|
|
assert rv.status_code == 200
|
|
assert b"Response" in rv.data
|
|
assert len(called) == 1
|
|
|
|
|
|
def test_teardown_request_handler_debug_mode(app, client):
|
|
called = []
|
|
|
|
@app.teardown_request
|
|
def teardown_request(exc):
|
|
called.append(True)
|
|
return "Ignored"
|
|
|
|
@app.route("/")
|
|
def root():
|
|
return "Response"
|
|
|
|
rv = client.get("/")
|
|
assert rv.status_code == 200
|
|
assert b"Response" in rv.data
|
|
assert len(called) == 1
|
|
|
|
|
|
def test_teardown_request_handler_error(app, client):
|
|
called = []
|
|
app.testing = False
|
|
|
|
@app.teardown_request
|
|
def teardown_request1(exc):
|
|
assert type(exc) == ZeroDivisionError
|
|
called.append(True)
|
|
# This raises a new error and blows away sys.exc_info(), so we can
|
|
# test that all teardown_requests get passed the same original
|
|
# exception.
|
|
try:
|
|
raise TypeError()
|
|
except Exception:
|
|
pass
|
|
|
|
@app.teardown_request
|
|
def teardown_request2(exc):
|
|
assert type(exc) == ZeroDivisionError
|
|
called.append(True)
|
|
# This raises a new error and blows away sys.exc_info(), so we can
|
|
# test that all teardown_requests get passed the same original
|
|
# exception.
|
|
try:
|
|
raise TypeError()
|
|
except Exception:
|
|
pass
|
|
|
|
@app.route("/")
|
|
def fails():
|
|
1 // 0
|
|
|
|
rv = client.get("/")
|
|
assert rv.status_code == 500
|
|
assert b"Internal Server Error" in rv.data
|
|
assert len(called) == 2
|
|
|
|
|
|
def test_before_after_request_order(app, client):
|
|
called = []
|
|
|
|
@app.before_request
|
|
def before1():
|
|
called.append(1)
|
|
|
|
@app.before_request
|
|
def before2():
|
|
called.append(2)
|
|
|
|
@app.after_request
|
|
def after1(response):
|
|
called.append(4)
|
|
return response
|
|
|
|
@app.after_request
|
|
def after2(response):
|
|
called.append(3)
|
|
return response
|
|
|
|
@app.teardown_request
|
|
def finish1(exc):
|
|
called.append(6)
|
|
|
|
@app.teardown_request
|
|
def finish2(exc):
|
|
called.append(5)
|
|
|
|
@app.route("/")
|
|
def index():
|
|
return "42"
|
|
|
|
rv = client.get("/")
|
|
assert rv.data == b"42"
|
|
assert called == [1, 2, 3, 4, 5, 6]
|
|
|
|
|
|
def test_error_handling(app, client):
|
|
app.testing = False
|
|
|
|
@app.errorhandler(404)
|
|
def not_found(e):
|
|
return "not found", 404
|
|
|
|
@app.errorhandler(500)
|
|
def internal_server_error(e):
|
|
return "internal server error", 500
|
|
|
|
@app.errorhandler(Forbidden)
|
|
def forbidden(e):
|
|
return "forbidden", 403
|
|
|
|
@app.route("/")
|
|
def index():
|
|
flask.abort(404)
|
|
|
|
@app.route("/error")
|
|
def error():
|
|
1 // 0
|
|
|
|
@app.route("/forbidden")
|
|
def error2():
|
|
flask.abort(403)
|
|
|
|
rv = client.get("/")
|
|
assert rv.status_code == 404
|
|
assert rv.data == b"not found"
|
|
rv = client.get("/error")
|
|
assert rv.status_code == 500
|
|
assert b"internal server error" == rv.data
|
|
rv = client.get("/forbidden")
|
|
assert rv.status_code == 403
|
|
assert b"forbidden" == rv.data
|
|
|
|
|
|
def test_error_handler_unknown_code(app):
|
|
with pytest.raises(KeyError) as exc_info:
|
|
app.register_error_handler(999, lambda e: ("999", 999))
|
|
|
|
assert "Use a subclass" in exc_info.value.args[0]
|
|
|
|
|
|
def test_error_handling_processing(app, client):
|
|
app.testing = False
|
|
|
|
@app.errorhandler(500)
|
|
def internal_server_error(e):
|
|
return "internal server error", 500
|
|
|
|
@app.route("/")
|
|
def broken_func():
|
|
1 // 0
|
|
|
|
@app.after_request
|
|
def after_request(resp):
|
|
resp.mimetype = "text/x-special"
|
|
return resp
|
|
|
|
resp = client.get("/")
|
|
assert resp.mimetype == "text/x-special"
|
|
assert resp.data == b"internal server error"
|
|
|
|
|
|
def test_baseexception_error_handling(app, client):
|
|
app.testing = False
|
|
|
|
@app.route("/")
|
|
def broken_func():
|
|
raise KeyboardInterrupt()
|
|
|
|
with pytest.raises(KeyboardInterrupt):
|
|
client.get("/")
|
|
|
|
ctx = flask._request_ctx_stack.top
|
|
assert ctx.preserved
|
|
assert type(ctx._preserved_exc) is KeyboardInterrupt
|
|
|
|
|
|
def test_before_request_and_routing_errors(app, client):
|
|
@app.before_request
|
|
def attach_something():
|
|
flask.g.something = "value"
|
|
|
|
@app.errorhandler(404)
|
|
def return_something(error):
|
|
return flask.g.something, 404
|
|
|
|
rv = client.get("/")
|
|
assert rv.status_code == 404
|
|
assert rv.data == b"value"
|
|
|
|
|
|
def test_user_error_handling(app, client):
|
|
class MyException(Exception):
|
|
pass
|
|
|
|
@app.errorhandler(MyException)
|
|
def handle_my_exception(e):
|
|
assert isinstance(e, MyException)
|
|
return "42"
|
|
|
|
@app.route("/")
|
|
def index():
|
|
raise MyException()
|
|
|
|
assert client.get("/").data == b"42"
|
|
|
|
|
|
def test_http_error_subclass_handling(app, client):
|
|
class ForbiddenSubclass(Forbidden):
|
|
pass
|
|
|
|
@app.errorhandler(ForbiddenSubclass)
|
|
def handle_forbidden_subclass(e):
|
|
assert isinstance(e, ForbiddenSubclass)
|
|
return "banana"
|
|
|
|
@app.errorhandler(403)
|
|
def handle_403(e):
|
|
assert not isinstance(e, ForbiddenSubclass)
|
|
assert isinstance(e, Forbidden)
|
|
return "apple"
|
|
|
|
@app.route("/1")
|
|
def index1():
|
|
raise ForbiddenSubclass()
|
|
|
|
@app.route("/2")
|
|
def index2():
|
|
flask.abort(403)
|
|
|
|
@app.route("/3")
|
|
def index3():
|
|
raise Forbidden()
|
|
|
|
assert client.get("/1").data == b"banana"
|
|
assert client.get("/2").data == b"apple"
|
|
assert client.get("/3").data == b"apple"
|
|
|
|
|
|
def test_errorhandler_precedence(app, client):
|
|
class E1(Exception):
|
|
pass
|
|
|
|
class E2(Exception):
|
|
pass
|
|
|
|
class E3(E1, E2):
|
|
pass
|
|
|
|
@app.errorhandler(E2)
|
|
def handle_e2(e):
|
|
return "E2"
|
|
|
|
@app.errorhandler(Exception)
|
|
def handle_exception(e):
|
|
return "Exception"
|
|
|
|
@app.route("/E1")
|
|
def raise_e1():
|
|
raise E1
|
|
|
|
@app.route("/E3")
|
|
def raise_e3():
|
|
raise E3
|
|
|
|
rv = client.get("/E1")
|
|
assert rv.data == b"Exception"
|
|
|
|
rv = client.get("/E3")
|
|
assert rv.data == b"E2"
|
|
|
|
|
|
def test_trapping_of_bad_request_key_errors(app, client):
|
|
@app.route("/key")
|
|
def fail():
|
|
flask.request.form["missing_key"]
|
|
|
|
@app.route("/abort")
|
|
def allow_abort():
|
|
flask.abort(400)
|
|
|
|
rv = client.get("/key")
|
|
assert rv.status_code == 400
|
|
assert b"missing_key" not in rv.data
|
|
rv = client.get("/abort")
|
|
assert rv.status_code == 400
|
|
|
|
app.debug = True
|
|
with pytest.raises(KeyError) as e:
|
|
client.get("/key")
|
|
assert e.errisinstance(BadRequest)
|
|
assert "missing_key" in e.value.get_description()
|
|
rv = client.get("/abort")
|
|
assert rv.status_code == 400
|
|
|
|
app.debug = False
|
|
app.config["TRAP_BAD_REQUEST_ERRORS"] = True
|
|
with pytest.raises(KeyError):
|
|
client.get("/key")
|
|
with pytest.raises(BadRequest):
|
|
client.get("/abort")
|
|
|
|
|
|
def test_trapping_of_all_http_exceptions(app, client):
|
|
app.config["TRAP_HTTP_EXCEPTIONS"] = True
|
|
|
|
@app.route("/fail")
|
|
def fail():
|
|
flask.abort(404)
|
|
|
|
with pytest.raises(NotFound):
|
|
client.get("/fail")
|
|
|
|
|
|
def test_error_handler_after_processor_error(app, client):
|
|
app.testing = False
|
|
|
|
@app.before_request
|
|
def before_request():
|
|
if _trigger == "before":
|
|
1 // 0
|
|
|
|
@app.after_request
|
|
def after_request(response):
|
|
if _trigger == "after":
|
|
1 // 0
|
|
return response
|
|
|
|
@app.route("/")
|
|
def index():
|
|
return "Foo"
|
|
|
|
@app.errorhandler(500)
|
|
def internal_server_error(e):
|
|
return "Hello Server Error", 500
|
|
|
|
for _trigger in "before", "after":
|
|
rv = client.get("/")
|
|
assert rv.status_code == 500
|
|
assert rv.data == b"Hello Server Error"
|
|
|
|
|
|
def test_enctype_debug_helper(app, client):
|
|
from flask.debughelpers import DebugFilesKeyError
|
|
|
|
app.debug = True
|
|
|
|
@app.route("/fail", methods=["POST"])
|
|
def index():
|
|
return flask.request.files["foo"].filename
|
|
|
|
# with statement is important because we leave an exception on the
|
|
# stack otherwise and we want to ensure that this is not the case
|
|
# to not negatively affect other tests.
|
|
with client:
|
|
with pytest.raises(DebugFilesKeyError) as e:
|
|
client.post("/fail", data={"foo": "index.txt"})
|
|
assert "no file contents were transmitted" in str(e.value)
|
|
assert "This was submitted: 'index.txt'" in str(e.value)
|
|
|
|
|
|
def test_response_types(app, client):
|
|
@app.route("/text")
|
|
def from_text():
|
|
return "Hällo Wörld"
|
|
|
|
@app.route("/bytes")
|
|
def from_bytes():
|
|
return "Hällo Wörld".encode()
|
|
|
|
@app.route("/full_tuple")
|
|
def from_full_tuple():
|
|
return (
|
|
"Meh",
|
|
400,
|
|
{"X-Foo": "Testing", "Content-Type": "text/plain; charset=utf-8"},
|
|
)
|
|
|
|
@app.route("/text_headers")
|
|
def from_text_headers():
|
|
return "Hello", {"X-Foo": "Test", "Content-Type": "text/plain; charset=utf-8"}
|
|
|
|
@app.route("/text_status")
|
|
def from_text_status():
|
|
return "Hi, status!", 400
|
|
|
|
@app.route("/response_headers")
|
|
def from_response_headers():
|
|
return (
|
|
flask.Response(
|
|
"Hello world", 404, {"Content-Type": "text/html", "X-Foo": "Baz"}
|
|
),
|
|
{"Content-Type": "text/plain", "X-Foo": "Bar", "X-Bar": "Foo"},
|
|
)
|
|
|
|
@app.route("/response_status")
|
|
def from_response_status():
|
|
return app.response_class("Hello world", 400), 500
|
|
|
|
@app.route("/wsgi")
|
|
def from_wsgi():
|
|
return NotFound()
|
|
|
|
@app.route("/dict")
|
|
def from_dict():
|
|
return {"foo": "bar"}, 201
|
|
|
|
assert client.get("/text").data == "Hällo Wörld".encode()
|
|
assert client.get("/bytes").data == "Hällo Wörld".encode()
|
|
|
|
rv = client.get("/full_tuple")
|
|
assert rv.data == b"Meh"
|
|
assert rv.headers["X-Foo"] == "Testing"
|
|
assert rv.status_code == 400
|
|
assert rv.mimetype == "text/plain"
|
|
|
|
rv = client.get("/text_headers")
|
|
assert rv.data == b"Hello"
|
|
assert rv.headers["X-Foo"] == "Test"
|
|
assert rv.status_code == 200
|
|
assert rv.mimetype == "text/plain"
|
|
|
|
rv = client.get("/text_status")
|
|
assert rv.data == b"Hi, status!"
|
|
assert rv.status_code == 400
|
|
assert rv.mimetype == "text/html"
|
|
|
|
rv = client.get("/response_headers")
|
|
assert rv.data == b"Hello world"
|
|
assert rv.content_type == "text/plain"
|
|
assert rv.headers.getlist("X-Foo") == ["Bar"]
|
|
assert rv.headers["X-Bar"] == "Foo"
|
|
assert rv.status_code == 404
|
|
|
|
rv = client.get("/response_status")
|
|
assert rv.data == b"Hello world"
|
|
assert rv.status_code == 500
|
|
|
|
rv = client.get("/wsgi")
|
|
assert b"Not Found" in rv.data
|
|
assert rv.status_code == 404
|
|
|
|
rv = client.get("/dict")
|
|
assert rv.json == {"foo": "bar"}
|
|
assert rv.status_code == 201
|
|
|
|
|
|
def test_response_type_errors():
|
|
app = flask.Flask(__name__)
|
|
app.testing = True
|
|
|
|
@app.route("/none")
|
|
def from_none():
|
|
pass
|
|
|
|
@app.route("/small_tuple")
|
|
def from_small_tuple():
|
|
return ("Hello",)
|
|
|
|
@app.route("/large_tuple")
|
|
def from_large_tuple():
|
|
return "Hello", 234, {"X-Foo": "Bar"}, "???"
|
|
|
|
@app.route("/bad_type")
|
|
def from_bad_type():
|
|
return True
|
|
|
|
@app.route("/bad_wsgi")
|
|
def from_bad_wsgi():
|
|
return lambda: None
|
|
|
|
c = app.test_client()
|
|
|
|
with pytest.raises(TypeError) as e:
|
|
c.get("/none")
|
|
assert "returned None" in str(e.value)
|
|
assert "from_none" in str(e.value)
|
|
|
|
with pytest.raises(TypeError) as e:
|
|
c.get("/small_tuple")
|
|
assert "tuple must have the form" in str(e.value)
|
|
|
|
pytest.raises(TypeError, c.get, "/large_tuple")
|
|
|
|
with pytest.raises(TypeError) as e:
|
|
c.get("/bad_type")
|
|
assert "it was a bool" in str(e.value)
|
|
|
|
pytest.raises(TypeError, c.get, "/bad_wsgi")
|
|
|
|
|
|
def test_make_response(app, req_ctx):
|
|
rv = flask.make_response()
|
|
assert rv.status_code == 200
|
|
assert rv.data == b""
|
|
assert rv.mimetype == "text/html"
|
|
|
|
rv = flask.make_response("Awesome")
|
|
assert rv.status_code == 200
|
|
assert rv.data == b"Awesome"
|
|
assert rv.mimetype == "text/html"
|
|
|
|
rv = flask.make_response("W00t", 404)
|
|
assert rv.status_code == 404
|
|
assert rv.data == b"W00t"
|
|
assert rv.mimetype == "text/html"
|
|
|
|
|
|
def test_make_response_with_response_instance(app, req_ctx):
|
|
rv = flask.make_response(flask.jsonify({"msg": "W00t"}), 400)
|
|
assert rv.status_code == 400
|
|
assert rv.data == b'{"msg":"W00t"}\n'
|
|
assert rv.mimetype == "application/json"
|
|
|
|
rv = flask.make_response(flask.Response(""), 400)
|
|
assert rv.status_code == 400
|
|
assert rv.data == b""
|
|
assert rv.mimetype == "text/html"
|
|
|
|
rv = flask.make_response(
|
|
flask.Response("", headers={"Content-Type": "text/html"}),
|
|
400,
|
|
[("X-Foo", "bar")],
|
|
)
|
|
assert rv.status_code == 400
|
|
assert rv.headers["Content-Type"] == "text/html"
|
|
assert rv.headers["X-Foo"] == "bar"
|
|
|
|
|
|
def test_jsonify_no_prettyprint(app, req_ctx):
|
|
app.config.update({"JSONIFY_PRETTYPRINT_REGULAR": False})
|
|
compressed_msg = b'{"msg":{"submsg":"W00t"},"msg2":"foobar"}\n'
|
|
uncompressed_msg = {"msg": {"submsg": "W00t"}, "msg2": "foobar"}
|
|
|
|
rv = flask.make_response(flask.jsonify(uncompressed_msg), 200)
|
|
assert rv.data == compressed_msg
|
|
|
|
|
|
def test_jsonify_prettyprint(app, req_ctx):
|
|
app.config.update({"JSONIFY_PRETTYPRINT_REGULAR": True})
|
|
compressed_msg = {"msg": {"submsg": "W00t"}, "msg2": "foobar"}
|
|
pretty_response = (
|
|
b'{\n "msg": {\n "submsg": "W00t"\n }, \n "msg2": "foobar"\n}\n'
|
|
)
|
|
|
|
rv = flask.make_response(flask.jsonify(compressed_msg), 200)
|
|
assert rv.data == pretty_response
|
|
|
|
|
|
def test_jsonify_mimetype(app, req_ctx):
|
|
app.config.update({"JSONIFY_MIMETYPE": "application/vnd.api+json"})
|
|
msg = {"msg": {"submsg": "W00t"}}
|
|
rv = flask.make_response(flask.jsonify(msg), 200)
|
|
assert rv.mimetype == "application/vnd.api+json"
|
|
|
|
|
|
@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires Python >= 3.7")
|
|
def test_json_dump_dataclass(app, req_ctx):
|
|
from dataclasses import make_dataclass
|
|
|
|
Data = make_dataclass("Data", [("name", str)])
|
|
value = flask.json.dumps(Data("Flask"), app=app)
|
|
value = flask.json.loads(value, app=app)
|
|
assert value == {"name": "Flask"}
|
|
|
|
|
|
def test_jsonify_args_and_kwargs_check(app, req_ctx):
|
|
with pytest.raises(TypeError) as e:
|
|
flask.jsonify("fake args", kwargs="fake")
|
|
assert "behavior undefined" in str(e.value)
|
|
|
|
|
|
def test_url_generation(app, req_ctx):
|
|
@app.route("/hello/<name>", methods=["POST"])
|
|
def hello():
|
|
pass
|
|
|
|
assert flask.url_for("hello", name="test x") == "/hello/test%20x"
|
|
assert (
|
|
flask.url_for("hello", name="test x", _external=True)
|
|
== "http://localhost/hello/test%20x"
|
|
)
|
|
|
|
|
|
def test_build_error_handler(app):
|
|
# Test base case, a URL which results in a BuildError.
|
|
with app.test_request_context():
|
|
pytest.raises(BuildError, flask.url_for, "spam")
|
|
|
|
# Verify the error is re-raised if not the current exception.
|
|
try:
|
|
with app.test_request_context():
|
|
flask.url_for("spam")
|
|
except BuildError as err:
|
|
error = err
|
|
try:
|
|
raise RuntimeError("Test case where BuildError is not current.")
|
|
except RuntimeError:
|
|
pytest.raises(BuildError, app.handle_url_build_error, error, "spam", {})
|
|
|
|
# Test a custom handler.
|
|
def handler(error, endpoint, values):
|
|
# Just a test.
|
|
return "/test_handler/"
|
|
|
|
app.url_build_error_handlers.append(handler)
|
|
with app.test_request_context():
|
|
assert flask.url_for("spam") == "/test_handler/"
|
|
|
|
|
|
def test_build_error_handler_reraise(app):
|
|
# Test a custom handler which reraises the BuildError
|
|
def handler_raises_build_error(error, endpoint, values):
|
|
raise error
|
|
|
|
app.url_build_error_handlers.append(handler_raises_build_error)
|
|
|
|
with app.test_request_context():
|
|
pytest.raises(BuildError, flask.url_for, "not.existing")
|
|
|
|
|
|
def test_url_for_passes_special_values_to_build_error_handler(app):
|
|
@app.url_build_error_handlers.append
|
|
def handler(error, endpoint, values):
|
|
assert values == {
|
|
"_external": False,
|
|
"_anchor": None,
|
|
"_method": None,
|
|
"_scheme": None,
|
|
}
|
|
return "handled"
|
|
|
|
with app.test_request_context():
|
|
flask.url_for("/")
|
|
|
|
|
|
def test_static_files(app, client):
|
|
rv = client.get("/static/index.html")
|
|
assert rv.status_code == 200
|
|
assert rv.data.strip() == b"<h1>Hello World!</h1>"
|
|
with app.test_request_context():
|
|
assert flask.url_for("static", filename="index.html") == "/static/index.html"
|
|
rv.close()
|
|
|
|
|
|
def test_static_url_path():
|
|
app = flask.Flask(__name__, static_url_path="/foo")
|
|
app.testing = True
|
|
rv = app.test_client().get("/foo/index.html")
|
|
assert rv.status_code == 200
|
|
rv.close()
|
|
|
|
with app.test_request_context():
|
|
assert flask.url_for("static", filename="index.html") == "/foo/index.html"
|
|
|
|
|
|
def test_static_url_path_with_ending_slash():
|
|
app = flask.Flask(__name__, static_url_path="/foo/")
|
|
app.testing = True
|
|
rv = app.test_client().get("/foo/index.html")
|
|
assert rv.status_code == 200
|
|
rv.close()
|
|
|
|
with app.test_request_context():
|
|
assert flask.url_for("static", filename="index.html") == "/foo/index.html"
|
|
|
|
|
|
def test_static_url_empty_path(app):
|
|
app = flask.Flask(__name__, static_folder="", static_url_path="")
|
|
rv = app.test_client().open("/static/index.html", method="GET")
|
|
assert rv.status_code == 200
|
|
rv.close()
|
|
|
|
|
|
def test_static_url_empty_path_default(app):
|
|
app = flask.Flask(__name__, static_folder="")
|
|
rv = app.test_client().open("/static/index.html", method="GET")
|
|
assert rv.status_code == 200
|
|
rv.close()
|
|
|
|
|
|
@pytest.mark.skipif(sys.version_info < (3, 6), reason="requires Python >= 3.6")
|
|
def test_static_folder_with_pathlib_path(app):
|
|
from pathlib import Path
|
|
|
|
app = flask.Flask(__name__, static_folder=Path("static"))
|
|
rv = app.test_client().open("/static/index.html", method="GET")
|
|
assert rv.status_code == 200
|
|
rv.close()
|
|
|
|
|
|
def test_static_folder_with_ending_slash():
|
|
app = flask.Flask(__name__, static_folder="static/")
|
|
|
|
@app.route("/<path:path>")
|
|
def catch_all(path):
|
|
return path
|
|
|
|
rv = app.test_client().get("/catch/all")
|
|
assert rv.data == b"catch/all"
|
|
|
|
|
|
def test_static_route_with_host_matching():
|
|
app = flask.Flask(__name__, host_matching=True, static_host="example.com")
|
|
c = app.test_client()
|
|
rv = c.get("http://example.com/static/index.html")
|
|
assert rv.status_code == 200
|
|
rv.close()
|
|
with app.test_request_context():
|
|
rv = flask.url_for("static", filename="index.html", _external=True)
|
|
assert rv == "http://example.com/static/index.html"
|
|
# Providing static_host without host_matching=True should error.
|
|
with pytest.raises(Exception):
|
|
flask.Flask(__name__, static_host="example.com")
|
|
# Providing host_matching=True with static_folder
|
|
# but without static_host should error.
|
|
with pytest.raises(Exception):
|
|
flask.Flask(__name__, host_matching=True)
|
|
# Providing host_matching=True without static_host
|
|
# but with static_folder=None should not error.
|
|
flask.Flask(__name__, host_matching=True, static_folder=None)
|
|
|
|
|
|
def test_request_locals():
|
|
assert repr(flask.g) == "<LocalProxy unbound>"
|
|
assert not flask.g
|
|
|
|
|
|
def test_server_name_subdomain():
|
|
app = flask.Flask(__name__, subdomain_matching=True)
|
|
client = app.test_client()
|
|
|
|
@app.route("/")
|
|
def index():
|
|
return "default"
|
|
|
|
@app.route("/", subdomain="foo")
|
|
def subdomain():
|
|
return "subdomain"
|
|
|
|
app.config["SERVER_NAME"] = "dev.local:5000"
|
|
rv = client.get("/")
|
|
assert rv.data == b"default"
|
|
|
|
rv = client.get("/", "http://dev.local:5000")
|
|
assert rv.data == b"default"
|
|
|
|
rv = client.get("/", "https://dev.local:5000")
|
|
assert rv.data == b"default"
|
|
|
|
app.config["SERVER_NAME"] = "dev.local:443"
|
|
rv = client.get("/", "https://dev.local")
|
|
|
|
# Werkzeug 1.0 fixes matching https scheme with 443 port
|
|
if rv.status_code != 404:
|
|
assert rv.data == b"default"
|
|
|
|
app.config["SERVER_NAME"] = "dev.local"
|
|
rv = client.get("/", "https://dev.local")
|
|
assert rv.data == b"default"
|
|
|
|
# suppress Werkzeug 1.0 warning about name mismatch
|
|
with pytest.warns(None):
|
|
rv = client.get("/", "http://foo.localhost")
|
|
assert rv.status_code == 404
|
|
|
|
rv = client.get("/", "http://foo.dev.local")
|
|
assert rv.data == b"subdomain"
|
|
|
|
|
|
@pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning")
|
|
@pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning")
|
|
def test_exception_propagation(app, client):
|
|
def apprunner(config_key):
|
|
@app.route("/")
|
|
def index():
|
|
1 // 0
|
|
|
|
if config_key is not None:
|
|
app.config[config_key] = True
|
|
with pytest.raises(Exception):
|
|
client.get("/")
|
|
else:
|
|
assert client.get("/").status_code == 500
|
|
|
|
# we have to run this test in an isolated thread because if the
|
|
# debug flag is set to true and an exception happens the context is
|
|
# not torn down. This causes other tests that run after this fail
|
|
# when they expect no exception on the stack.
|
|
for config_key in "TESTING", "PROPAGATE_EXCEPTIONS", "DEBUG", None:
|
|
t = Thread(target=apprunner, args=(config_key,))
|
|
t.start()
|
|
t.join()
|
|
|
|
|
|
@pytest.mark.parametrize("debug", [True, False])
|
|
@pytest.mark.parametrize("use_debugger", [True, False])
|
|
@pytest.mark.parametrize("use_reloader", [True, False])
|
|
@pytest.mark.parametrize("propagate_exceptions", [None, True, False])
|
|
def test_werkzeug_passthrough_errors(
|
|
monkeypatch, debug, use_debugger, use_reloader, propagate_exceptions, app
|
|
):
|
|
rv = {}
|
|
|
|
# Mocks werkzeug.serving.run_simple method
|
|
def run_simple_mock(*args, **kwargs):
|
|
rv["passthrough_errors"] = kwargs.get("passthrough_errors")
|
|
|
|
monkeypatch.setattr(werkzeug.serving, "run_simple", run_simple_mock)
|
|
app.config["PROPAGATE_EXCEPTIONS"] = propagate_exceptions
|
|
app.run(debug=debug, use_debugger=use_debugger, use_reloader=use_reloader)
|
|
|
|
|
|
def test_max_content_length(app, client):
|
|
app.config["MAX_CONTENT_LENGTH"] = 64
|
|
|
|
@app.before_request
|
|
def always_first():
|
|
flask.request.form["myfile"]
|
|
AssertionError()
|
|
|
|
@app.route("/accept", methods=["POST"])
|
|
def accept_file():
|
|
flask.request.form["myfile"]
|
|
AssertionError()
|
|
|
|
@app.errorhandler(413)
|
|
def catcher(error):
|
|
return "42"
|
|
|
|
rv = client.post("/accept", data={"myfile": "foo" * 100})
|
|
assert rv.data == b"42"
|
|
|
|
|
|
def test_url_processors(app, client):
|
|
@app.url_defaults
|
|
def add_language_code(endpoint, values):
|
|
if flask.g.lang_code is not None and app.url_map.is_endpoint_expecting(
|
|
endpoint, "lang_code"
|
|
):
|
|
values.setdefault("lang_code", flask.g.lang_code)
|
|
|
|
@app.url_value_preprocessor
|
|
def pull_lang_code(endpoint, values):
|
|
flask.g.lang_code = values.pop("lang_code", None)
|
|
|
|
@app.route("/<lang_code>/")
|
|
def index():
|
|
return flask.url_for("about")
|
|
|
|
@app.route("/<lang_code>/about")
|
|
def about():
|
|
return flask.url_for("something_else")
|
|
|
|
@app.route("/foo")
|
|
def something_else():
|
|
return flask.url_for("about", lang_code="en")
|
|
|
|
assert client.get("/de/").data == b"/de/about"
|
|
assert client.get("/de/about").data == b"/foo"
|
|
assert client.get("/foo").data == b"/en/about"
|
|
|
|
|
|
def test_inject_blueprint_url_defaults(app):
|
|
bp = flask.Blueprint("foo.bar.baz", __name__, template_folder="template")
|
|
|
|
@bp.url_defaults
|
|
def bp_defaults(endpoint, values):
|
|
values["page"] = "login"
|
|
|
|
@bp.route("/<page>")
|
|
def view(page):
|
|
pass
|
|
|
|
app.register_blueprint(bp)
|
|
|
|
values = dict()
|
|
app.inject_url_defaults("foo.bar.baz.view", values)
|
|
expected = dict(page="login")
|
|
assert values == expected
|
|
|
|
with app.test_request_context("/somepage"):
|
|
url = flask.url_for("foo.bar.baz.view")
|
|
expected = "/login"
|
|
assert url == expected
|
|
|
|
|
|
def test_nonascii_pathinfo(app, client):
|
|
@app.route("/киртест")
|
|
def index():
|
|
return "Hello World!"
|
|
|
|
rv = client.get("/киртест")
|
|
assert rv.data == b"Hello World!"
|
|
|
|
|
|
def test_debug_mode_complains_after_first_request(app, client):
|
|
app.debug = True
|
|
|
|
@app.route("/")
|
|
def index():
|
|
return "Awesome"
|
|
|
|
assert not app.got_first_request
|
|
assert client.get("/").data == b"Awesome"
|
|
with pytest.raises(AssertionError) as e:
|
|
|
|
@app.route("/foo")
|
|
def broken():
|
|
return "Meh"
|
|
|
|
assert "A setup function was called" in str(e.value)
|
|
|
|
app.debug = False
|
|
|
|
@app.route("/foo")
|
|
def working():
|
|
return "Meh"
|
|
|
|
assert client.get("/foo").data == b"Meh"
|
|
assert app.got_first_request
|
|
|
|
|
|
def test_before_first_request_functions(app, client):
|
|
got = []
|
|
|
|
@app.before_first_request
|
|
def foo():
|
|
got.append(42)
|
|
|
|
client.get("/")
|
|
assert got == [42]
|
|
client.get("/")
|
|
assert got == [42]
|
|
assert app.got_first_request
|
|
|
|
|
|
def test_before_first_request_functions_concurrent(app, client):
|
|
got = []
|
|
|
|
@app.before_first_request
|
|
def foo():
|
|
time.sleep(0.2)
|
|
got.append(42)
|
|
|
|
def get_and_assert():
|
|
client.get("/")
|
|
assert got == [42]
|
|
|
|
t = Thread(target=get_and_assert)
|
|
t.start()
|
|
get_and_assert()
|
|
t.join()
|
|
assert app.got_first_request
|
|
|
|
|
|
def test_routing_redirect_debugging(app, client):
|
|
app.debug = True
|
|
|
|
@app.route("/foo/", methods=["GET", "POST"])
|
|
def foo():
|
|
return "success"
|
|
|
|
with client:
|
|
with pytest.raises(AssertionError) as e:
|
|
client.post("/foo", data={})
|
|
assert "http://localhost/foo/" in str(e.value)
|
|
assert "Make sure to directly send your POST-request to this URL" in str(
|
|
e.value
|
|
)
|
|
|
|
rv = client.get("/foo", data={}, follow_redirects=True)
|
|
assert rv.data == b"success"
|
|
|
|
app.debug = False
|
|
with client:
|
|
rv = client.post("/foo", data={}, follow_redirects=True)
|
|
assert rv.data == b"success"
|
|
|
|
|
|
def test_route_decorator_custom_endpoint(app, client):
|
|
app.debug = True
|
|
|
|
@app.route("/foo/")
|
|
def foo():
|
|
return flask.request.endpoint
|
|
|
|
@app.route("/bar/", endpoint="bar")
|
|
def for_bar():
|
|
return flask.request.endpoint
|
|
|
|
@app.route("/bar/123", endpoint="123")
|
|
def for_bar_foo():
|
|
return flask.request.endpoint
|
|
|
|
with app.test_request_context():
|
|
assert flask.url_for("foo") == "/foo/"
|
|
assert flask.url_for("bar") == "/bar/"
|
|
assert flask.url_for("123") == "/bar/123"
|
|
|
|
assert client.get("/foo/").data == b"foo"
|
|
assert client.get("/bar/").data == b"bar"
|
|
assert client.get("/bar/123").data == b"123"
|
|
|
|
|
|
def test_preserve_only_once(app, client):
|
|
app.debug = True
|
|
|
|
@app.route("/fail")
|
|
def fail_func():
|
|
1 // 0
|
|
|
|
for _x in range(3):
|
|
with pytest.raises(ZeroDivisionError):
|
|
client.get("/fail")
|
|
|
|
assert flask._request_ctx_stack.top is not None
|
|
assert flask._app_ctx_stack.top is not None
|
|
# implicit appctx disappears too
|
|
flask._request_ctx_stack.top.pop()
|
|
assert flask._request_ctx_stack.top is None
|
|
assert flask._app_ctx_stack.top is None
|
|
|
|
|
|
def test_preserve_remembers_exception(app, client):
|
|
app.debug = True
|
|
errors = []
|
|
|
|
@app.route("/fail")
|
|
def fail_func():
|
|
1 // 0
|
|
|
|
@app.route("/success")
|
|
def success_func():
|
|
return "Okay"
|
|
|
|
@app.teardown_request
|
|
def teardown_handler(exc):
|
|
errors.append(exc)
|
|
|
|
# After this failure we did not yet call the teardown handler
|
|
with pytest.raises(ZeroDivisionError):
|
|
client.get("/fail")
|
|
assert errors == []
|
|
|
|
# But this request triggers it, and it's an error
|
|
client.get("/success")
|
|
assert len(errors) == 2
|
|
assert isinstance(errors[0], ZeroDivisionError)
|
|
|
|
# At this point another request does nothing.
|
|
client.get("/success")
|
|
assert len(errors) == 3
|
|
assert errors[1] is None
|
|
|
|
|
|
def test_get_method_on_g(app_ctx):
|
|
assert flask.g.get("x") is None
|
|
assert flask.g.get("x", 11) == 11
|
|
flask.g.x = 42
|
|
assert flask.g.get("x") == 42
|
|
assert flask.g.x == 42
|
|
|
|
|
|
def test_g_iteration_protocol(app_ctx):
|
|
flask.g.foo = 23
|
|
flask.g.bar = 42
|
|
assert "foo" in flask.g
|
|
assert "foos" not in flask.g
|
|
assert sorted(flask.g) == ["bar", "foo"]
|
|
|
|
|
|
def test_subdomain_basic_support():
|
|
app = flask.Flask(__name__, subdomain_matching=True)
|
|
app.config["SERVER_NAME"] = "localhost.localdomain"
|
|
client = app.test_client()
|
|
|
|
@app.route("/")
|
|
def normal_index():
|
|
return "normal index"
|
|
|
|
@app.route("/", subdomain="test")
|
|
def test_index():
|
|
return "test index"
|
|
|
|
rv = client.get("/", "http://localhost.localdomain/")
|
|
assert rv.data == b"normal index"
|
|
|
|
rv = client.get("/", "http://test.localhost.localdomain/")
|
|
assert rv.data == b"test index"
|
|
|
|
|
|
def test_subdomain_matching():
|
|
app = flask.Flask(__name__, subdomain_matching=True)
|
|
client = app.test_client()
|
|
app.config["SERVER_NAME"] = "localhost.localdomain"
|
|
|
|
@app.route("/", subdomain="<user>")
|
|
def index(user):
|
|
return f"index for {user}"
|
|
|
|
rv = client.get("/", "http://mitsuhiko.localhost.localdomain/")
|
|
assert rv.data == b"index for mitsuhiko"
|
|
|
|
|
|
def test_subdomain_matching_with_ports():
|
|
app = flask.Flask(__name__, subdomain_matching=True)
|
|
app.config["SERVER_NAME"] = "localhost.localdomain:3000"
|
|
client = app.test_client()
|
|
|
|
@app.route("/", subdomain="<user>")
|
|
def index(user):
|
|
return f"index for {user}"
|
|
|
|
rv = client.get("/", "http://mitsuhiko.localhost.localdomain:3000/")
|
|
assert rv.data == b"index for mitsuhiko"
|
|
|
|
|
|
@pytest.mark.parametrize("matching", (False, True))
|
|
def test_subdomain_matching_other_name(matching):
|
|
app = flask.Flask(__name__, subdomain_matching=matching)
|
|
app.config["SERVER_NAME"] = "localhost.localdomain:3000"
|
|
client = app.test_client()
|
|
|
|
@app.route("/")
|
|
def index():
|
|
return "", 204
|
|
|
|
# suppress Werkzeug 0.15 warning about name mismatch
|
|
with pytest.warns(None):
|
|
# ip address can't match name
|
|
rv = client.get("/", "http://127.0.0.1:3000/")
|
|
assert rv.status_code == 404 if matching else 204
|
|
|
|
# allow all subdomains if matching is disabled
|
|
rv = client.get("/", "http://www.localhost.localdomain:3000/")
|
|
assert rv.status_code == 404 if matching else 204
|
|
|
|
|
|
def test_multi_route_rules(app, client):
|
|
@app.route("/")
|
|
@app.route("/<test>/")
|
|
def index(test="a"):
|
|
return test
|
|
|
|
rv = client.open("/")
|
|
assert rv.data == b"a"
|
|
rv = client.open("/b/")
|
|
assert rv.data == b"b"
|
|
|
|
|
|
def test_multi_route_class_views(app, client):
|
|
class View:
|
|
def __init__(self, app):
|
|
app.add_url_rule("/", "index", self.index)
|
|
app.add_url_rule("/<test>/", "index", self.index)
|
|
|
|
def index(self, test="a"):
|
|
return test
|
|
|
|
_ = View(app)
|
|
rv = client.open("/")
|
|
assert rv.data == b"a"
|
|
rv = client.open("/b/")
|
|
assert rv.data == b"b"
|
|
|
|
|
|
def test_run_defaults(monkeypatch, app):
|
|
rv = {}
|
|
|
|
# Mocks werkzeug.serving.run_simple method
|
|
def run_simple_mock(*args, **kwargs):
|
|
rv["result"] = "running..."
|
|
|
|
monkeypatch.setattr(werkzeug.serving, "run_simple", run_simple_mock)
|
|
app.run()
|
|
assert rv["result"] == "running..."
|
|
|
|
|
|
def test_run_server_port(monkeypatch, app):
|
|
rv = {}
|
|
|
|
# Mocks werkzeug.serving.run_simple method
|
|
def run_simple_mock(hostname, port, application, *args, **kwargs):
|
|
rv["result"] = f"running on {hostname}:{port} ..."
|
|
|
|
monkeypatch.setattr(werkzeug.serving, "run_simple", run_simple_mock)
|
|
hostname, port = "localhost", 8000
|
|
app.run(hostname, port, debug=True)
|
|
assert rv["result"] == f"running on {hostname}:{port} ..."
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"host,port,server_name,expect_host,expect_port",
|
|
(
|
|
(None, None, "pocoo.org:8080", "pocoo.org", 8080),
|
|
("localhost", None, "pocoo.org:8080", "localhost", 8080),
|
|
(None, 80, "pocoo.org:8080", "pocoo.org", 80),
|
|
("localhost", 80, "pocoo.org:8080", "localhost", 80),
|
|
("localhost", 0, "localhost:8080", "localhost", 0),
|
|
(None, None, "localhost:8080", "localhost", 8080),
|
|
(None, None, "localhost:0", "localhost", 0),
|
|
),
|
|
)
|
|
def test_run_from_config(
|
|
monkeypatch, host, port, server_name, expect_host, expect_port, app
|
|
):
|
|
def run_simple_mock(hostname, port, *args, **kwargs):
|
|
assert hostname == expect_host
|
|
assert port == expect_port
|
|
|
|
monkeypatch.setattr(werkzeug.serving, "run_simple", run_simple_mock)
|
|
app.config["SERVER_NAME"] = server_name
|
|
app.run(host, port)
|
|
|
|
|
|
def test_max_cookie_size(app, client, recwarn):
|
|
app.config["MAX_COOKIE_SIZE"] = 100
|
|
|
|
# outside app context, default to Werkzeug static value,
|
|
# which is also the default config
|
|
response = flask.Response()
|
|
default = flask.Flask.default_config["MAX_COOKIE_SIZE"]
|
|
assert response.max_cookie_size == default
|
|
|
|
# inside app context, use app config
|
|
with app.app_context():
|
|
assert flask.Response().max_cookie_size == 100
|
|
|
|
@app.route("/")
|
|
def index():
|
|
r = flask.Response("", status=204)
|
|
r.set_cookie("foo", "bar" * 100)
|
|
return r
|
|
|
|
client.get("/")
|
|
assert len(recwarn) == 1
|
|
w = recwarn.pop()
|
|
assert "cookie is too large" in str(w.message)
|
|
|
|
app.config["MAX_COOKIE_SIZE"] = 0
|
|
|
|
client.get("/")
|
|
assert len(recwarn) == 0
|
|
|
|
|
|
@require_cpython_gc
|
|
def test_app_freed_on_zero_refcount():
|
|
# A Flask instance should not create a reference cycle that prevents CPython
|
|
# from freeing it when all external references to it are released (see #3761).
|
|
gc.disable()
|
|
try:
|
|
app = flask.Flask(__name__)
|
|
assert app.view_functions["static"]
|
|
weak = weakref.ref(app)
|
|
assert weak() is not None
|
|
del app
|
|
assert weak() is None
|
|
finally:
|
|
gc.enable()
|