Flask Update.

Flask Update, works for CPython 3.10.0 Beta 3.
This commit is contained in:
Fernando Ochoa O 2021-06-24 14:43:08 -05:00
parent aac67289e5
commit 529962d6b5
41 changed files with 2882 additions and 2092 deletions

26
.gitignore vendored
View file

@ -1,26 +0,0 @@
.DS_Store
.env
.flaskenv
*.pyc
*.pyo
env/
venv/
.venv/
env*
dist/
build/
*.egg
*.egg-info/
_mailinglist
.tox/
.cache/
.pytest_cache/
.idea/
docs/_build/
.vscode
# Coverage reports
htmlcov/
.coverage
.coverage.*
*,cover

View file

@ -1,46 +1,60 @@
from markupsafe import escape
from markupsafe import Markup
from werkzeug.exceptions import abort as abort
from werkzeug.utils import redirect as redirect
# -*- coding: utf-8 -*-
"""
flask
~~~~~
from . import json as json
from .app import Flask as Flask
from .app import Request as Request
from .app import Response as Response
from .blueprints import Blueprint as Blueprint
from .config import Config as Config
from .ctx import after_this_request as after_this_request
from .ctx import copy_current_request_context as copy_current_request_context
from .ctx import has_app_context as has_app_context
from .ctx import has_request_context as has_request_context
from .globals import _app_ctx_stack as _app_ctx_stack
from .globals import _request_ctx_stack as _request_ctx_stack
from .globals import current_app as current_app
from .globals import g as g
from .globals import request as request
from .globals import session as session
from .helpers import flash as flash
from .helpers import get_flashed_messages as get_flashed_messages
from .helpers import get_template_attribute as get_template_attribute
from .helpers import make_response as make_response
from .helpers import safe_join as safe_join
from .helpers import send_file as send_file
from .helpers import send_from_directory as send_from_directory
from .helpers import stream_with_context as stream_with_context
from .helpers import url_for as url_for
from .json import jsonify as jsonify
from .signals import appcontext_popped as appcontext_popped
from .signals import appcontext_pushed as appcontext_pushed
from .signals import appcontext_tearing_down as appcontext_tearing_down
from .signals import before_render_template as before_render_template
from .signals import got_request_exception as got_request_exception
from .signals import message_flashed as message_flashed
from .signals import request_finished as request_finished
from .signals import request_started as request_started
from .signals import request_tearing_down as request_tearing_down
from .signals import signals_available as signals_available
from .signals import template_rendered as template_rendered
from .templating import render_template as render_template
from .templating import render_template_string as render_template_string
A microframework based on Werkzeug. It's extensively documented
and follows best practice patterns.
__version__ = "2.1.0.dev0"
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
# utilities we import from Werkzeug and Jinja2 that are unused
# in the module but are exported as public interface.
from jinja2 import escape
from jinja2 import Markup
from werkzeug.exceptions import abort
from werkzeug.utils import redirect
from . import json
from ._compat import json_available
from .app import Flask
from .app import Request
from .app import Response
from .blueprints import Blueprint
from .config import Config
from .ctx import after_this_request
from .ctx import copy_current_request_context
from .ctx import has_app_context
from .ctx import has_request_context
from .globals import _app_ctx_stack
from .globals import _request_ctx_stack
from .globals import current_app
from .globals import g
from .globals import request
from .globals import session
from .helpers import flash
from .helpers import get_flashed_messages
from .helpers import get_template_attribute
from .helpers import make_response
from .helpers import safe_join
from .helpers import send_file
from .helpers import send_from_directory
from .helpers import stream_with_context
from .helpers import url_for
from .json import jsonify
from .signals import appcontext_popped
from .signals import appcontext_pushed
from .signals import appcontext_tearing_down
from .signals import before_render_template
from .signals import got_request_exception
from .signals import message_flashed
from .signals import request_finished
from .signals import request_started
from .signals import request_tearing_down
from .signals import signals_available
from .signals import template_rendered
from .templating import render_template
from .templating import render_template_string
__version__ = "1.1.4"

View file

@ -1,3 +1,15 @@
from .cli import main
# -*- coding: utf-8 -*-
"""
flask.__main__
~~~~~~~~~~~~~~
main()
Alias for flask.run for the command line.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
if __name__ == "__main__":
from .cli import main
main(as_module=True)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

145
src/flask/_compat.py Normal file
View file

@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-
"""
flask._compat
~~~~~~~~~~~~~
Some py2/py3 compatibility support based on a stripped down
version of six so we don't have to depend on a specific version
of it.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
import sys
PY2 = sys.version_info[0] == 2
_identity = lambda x: x
try: # Python 2
text_type = unicode
string_types = (str, unicode)
integer_types = (int, long)
except NameError: # Python 3
text_type = str
string_types = (str,)
integer_types = (int,)
if not PY2:
iterkeys = lambda d: iter(d.keys())
itervalues = lambda d: iter(d.values())
iteritems = lambda d: iter(d.items())
from inspect import getfullargspec as getargspec
from io import StringIO
import collections.abc as collections_abc
def reraise(tp, value, tb=None):
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
raise value
implements_to_string = _identity
else:
iterkeys = lambda d: d.iterkeys()
itervalues = lambda d: d.itervalues()
iteritems = lambda d: d.iteritems()
from inspect import getargspec
from cStringIO import StringIO
import collections as collections_abc
exec("def reraise(tp, value, tb=None):\n raise tp, value, tb")
def implements_to_string(cls):
cls.__unicode__ = cls.__str__
cls.__str__ = lambda x: x.__unicode__().encode("utf-8")
return cls
def with_metaclass(meta, *bases):
"""Create a base class with a metaclass."""
# This requires a bit of explanation: the basic idea is to make a
# dummy metaclass for one level of class instantiation that replaces
# itself with the actual metaclass.
class metaclass(type):
def __new__(metacls, name, this_bases, d):
return meta(name, bases, d)
return type.__new__(metaclass, "temporary_class", (), {})
# Certain versions of pypy have a bug where clearing the exception stack
# breaks the __exit__ function in a very peculiar way. The second level of
# exception blocks is necessary because pypy seems to forget to check if an
# exception happened until the next bytecode instruction?
#
# Relevant PyPy bugfix commit:
# https://bitbucket.org/pypy/pypy/commits/77ecf91c635a287e88e60d8ddb0f4e9df4003301
# According to ronan on #pypy IRC, it is released in PyPy2 2.3 and later
# versions.
#
# Ubuntu 14.04 has PyPy 2.2.1, which does exhibit this bug.
BROKEN_PYPY_CTXMGR_EXIT = False
if hasattr(sys, "pypy_version_info"):
class _Mgr(object):
def __enter__(self):
return self
def __exit__(self, *args):
if hasattr(sys, "exc_clear"):
# Python 3 (PyPy3) doesn't have exc_clear
sys.exc_clear()
try:
try:
with _Mgr():
raise AssertionError()
except: # noqa: B001
# We intentionally use a bare except here. See the comment above
# regarding a pypy bug as to why.
raise
except TypeError:
BROKEN_PYPY_CTXMGR_EXIT = True
except AssertionError:
pass
try:
from os import fspath
except ImportError:
# Backwards compatibility as proposed in PEP 0519:
# https://www.python.org/dev/peps/pep-0519/#backwards-compatibility
def fspath(path):
return path.__fspath__() if hasattr(path, "__fspath__") else path
class _DeprecatedBool(object):
def __init__(self, name, version, value):
self.message = "'{}' is deprecated and will be removed in version {}.".format(
name, version
)
self.value = value
def _warn(self):
import warnings
warnings.warn(self.message, DeprecationWarning, stacklevel=2)
def __eq__(self, other):
self._warn()
return other == self.value
def __ne__(self, other):
self._warn()
return other != self.value
def __bool__(self):
self._warn()
return self.value
__nonzero__ = __bool__
json_available = _DeprecatedBool("flask.json_available", "2.0.0", True)

File diff suppressed because it is too large Load diff

View file

@ -1,42 +1,31 @@
import typing as t
from collections import defaultdict
# -*- coding: utf-8 -*-
"""
flask.blueprints
~~~~~~~~~~~~~~~~
Blueprints are the recommended way to implement larger or more
pluggable applications in Flask 0.7 and later.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
from functools import update_wrapper
from .scaffold import _endpoint_from_view_func
from .scaffold import _sentinel
from .scaffold import Scaffold
from .typing import AfterRequestCallable
from .typing import BeforeFirstRequestCallable
from .typing import BeforeRequestCallable
from .typing import ErrorHandlerCallable
from .typing import TeardownCallable
from .typing import TemplateContextProcessorCallable
from .typing import TemplateFilterCallable
from .typing import TemplateGlobalCallable
from .typing import TemplateTestCallable
from .typing import URLDefaultCallable
from .typing import URLValuePreprocessorCallable
from .helpers import _endpoint_from_view_func
from .helpers import _PackageBoundObject
if t.TYPE_CHECKING:
from .app import Flask
DeferredSetupFunction = t.Callable[["BlueprintSetupState"], t.Callable]
# a singleton sentinel value for parameter defaults
_sentinel = object()
class BlueprintSetupState:
class BlueprintSetupState(object):
"""Temporary holder object for registering a blueprint with the
application. An instance of this class is created by the
:meth:`~flask.Blueprint.make_setup_state` method and later passed
to all register callback functions.
"""
def __init__(
self,
blueprint: "Blueprint",
app: "Flask",
options: t.Any,
first_registration: bool,
) -> None:
def __init__(self, blueprint, app, options, first_registration):
#: a reference to the current application
self.app = app
@ -68,21 +57,12 @@ class BlueprintSetupState:
#: blueprint.
self.url_prefix = url_prefix
self.name = self.options.get("name", blueprint.name)
self.name_prefix = self.options.get("name_prefix", "")
#: A dictionary with URL defaults that is added to each and every
#: URL that was defined with the blueprint.
self.url_defaults = dict(self.blueprint.url_values_defaults)
self.url_defaults.update(self.options.get("url_defaults", ()))
def add_url_rule(
self,
rule: str,
endpoint: t.Optional[str] = None,
view_func: t.Optional[t.Callable] = None,
**options: t.Any,
) -> None:
def add_url_rule(self, rule, endpoint=None, view_func=None, **options):
"""A helper method to register a rule (and optionally a view function)
to the application. The endpoint is automatically prefixed with the
blueprint's name.
@ -94,21 +74,20 @@ class BlueprintSetupState:
rule = self.url_prefix
options.setdefault("subdomain", self.subdomain)
if endpoint is None:
endpoint = _endpoint_from_view_func(view_func) # type: ignore
endpoint = _endpoint_from_view_func(view_func)
defaults = self.url_defaults
if "defaults" in options:
defaults = dict(defaults, **options.pop("defaults"))
self.app.add_url_rule(
rule,
f"{self.name_prefix}.{self.name}.{endpoint}".lstrip("."),
"%s.%s" % (self.blueprint.name, endpoint),
view_func,
defaults=defaults,
**options,
**options
)
class Blueprint(Scaffold):
class Blueprint(_PackageBoundObject):
"""Represents a blueprint, a collection of routes and other
app-related functions that can be registered on a real application
later.
@ -122,7 +101,14 @@ class Blueprint(Scaffold):
that is called with :class:`~flask.blueprints.BlueprintSetupState`
when the blueprint is registered on an application.
See :doc:`/blueprints` for more information.
See :ref:`blueprints` for more information.
.. versionchanged:: 1.1.0
Blueprints have a ``cli`` group to register nested CLI commands.
The ``cli_group`` parameter controls the name of the group under
the ``flask`` command.
.. versionadded:: 0.7
:param name: The name of the blueprint. Will be prepended to each
endpoint name.
@ -148,69 +134,65 @@ class Blueprint(Scaffold):
default.
:param url_defaults: A dict of default values that blueprint routes
will receive by default.
:param root_path: By default, the blueprint will automatically set
this based on ``import_name``. In certain situations this
automatic detection can fail, so the path can be specified
manually instead.
.. versionchanged:: 1.1.0
Blueprints have a ``cli`` group to register nested CLI commands.
The ``cli_group`` parameter controls the name of the group under
the ``flask`` command.
.. versionadded:: 0.7
:param root_path: By default, the blueprint will automatically this
based on ``import_name``. In certain situations this automatic
detection can fail, so the path can be specified manually
instead.
"""
warn_on_modifications = False
_got_registered_once = False
#: Blueprint local JSON encoder class to use. Set to ``None`` to use
#: the app's :class:`~flask.Flask.json_encoder`.
#: Blueprint local JSON decoder class to use.
#: Set to ``None`` to use the app's :class:`~flask.app.Flask.json_encoder`.
json_encoder = None
#: Blueprint local JSON decoder class to use. Set to ``None`` to use
#: the app's :class:`~flask.Flask.json_decoder`.
#: Blueprint local JSON decoder class to use.
#: Set to ``None`` to use the app's :class:`~flask.app.Flask.json_decoder`.
json_decoder = None
# TODO remove the next three attrs when Sphinx :inherited-members: works
# https://github.com/sphinx-doc/sphinx/issues/741
#: The name of the package or module that this app belongs to. Do not
#: change this once it is set by the constructor.
import_name = None
#: Location of the template files to be added to the template lookup.
#: ``None`` if templates should not be added.
template_folder = None
#: Absolute path to the package on the filesystem. Used to look up
#: resources contained in the package.
root_path = None
def __init__(
self,
name: str,
import_name: str,
static_folder: t.Optional[str] = None,
static_url_path: t.Optional[str] = None,
template_folder: t.Optional[str] = None,
url_prefix: t.Optional[str] = None,
subdomain: t.Optional[str] = None,
url_defaults: t.Optional[dict] = None,
root_path: t.Optional[str] = None,
cli_group: t.Optional[str] = _sentinel, # type: ignore
name,
import_name,
static_folder=None,
static_url_path=None,
template_folder=None,
url_prefix=None,
subdomain=None,
url_defaults=None,
root_path=None,
cli_group=_sentinel,
):
super().__init__(
import_name=import_name,
static_folder=static_folder,
static_url_path=static_url_path,
template_folder=template_folder,
root_path=root_path,
_PackageBoundObject.__init__(
self, import_name, template_folder, root_path=root_path
)
if "." in name:
raise ValueError("'name' may not contain a dot '.' character.")
self.name = name
self.url_prefix = url_prefix
self.subdomain = subdomain
self.deferred_functions: t.List[DeferredSetupFunction] = []
self.static_folder = static_folder
self.static_url_path = static_url_path
self.deferred_functions = []
if url_defaults is None:
url_defaults = {}
self.url_values_defaults = url_defaults
self.cli_group = cli_group
self._blueprints: t.List[t.Tuple["Blueprint", dict]] = []
def _is_setup_finished(self) -> bool:
return self.warn_on_modifications and self._got_registered_once
def record(self, func: t.Callable) -> None:
def record(self, func):
"""Registers a function that is called when the blueprint is
registered on the application. This function is called with the
state as argument as returned by the :meth:`make_setup_state`
@ -221,212 +203,114 @@ class Blueprint(Scaffold):
warn(
Warning(
"The blueprint was already registered once but is"
" getting modified now. These changes will not show"
" up."
"The blueprint was already registered once "
"but is getting modified now. These changes "
"will not show up."
)
)
self.deferred_functions.append(func)
def record_once(self, func: t.Callable) -> None:
def record_once(self, func):
"""Works like :meth:`record` but wraps the function in another
function that will ensure the function is only called once. If the
blueprint is registered a second time on the application, the
function passed is not called.
"""
def wrapper(state: BlueprintSetupState) -> None:
def wrapper(state):
if state.first_registration:
func(state)
return self.record(update_wrapper(wrapper, func))
def make_setup_state(
self, app: "Flask", options: dict, first_registration: bool = False
) -> BlueprintSetupState:
def make_setup_state(self, app, options, first_registration=False):
"""Creates an instance of :meth:`~flask.blueprints.BlueprintSetupState`
object that is later passed to the register callback functions.
Subclasses can override this to return a subclass of the setup state.
"""
return BlueprintSetupState(self, app, options, first_registration)
def register_blueprint(self, blueprint: "Blueprint", **options: t.Any) -> None:
"""Register a :class:`~flask.Blueprint` on this blueprint. Keyword
arguments passed to this method will override the defaults set
on the blueprint.
def register(self, app, options, first_registration=False):
"""Called by :meth:`Flask.register_blueprint` to register all views
and callbacks registered on the blueprint with the application. Creates
a :class:`.BlueprintSetupState` and calls each :meth:`record` callback
with it.
.. versionchanged:: 2.0.1
The ``name`` option can be used to change the (pre-dotted)
name the blueprint is registered with. This allows the same
blueprint to be registered multiple times with unique names
for ``url_for``.
.. versionadded:: 2.0
"""
if blueprint is self:
raise ValueError("Cannot register a blueprint on itself")
self._blueprints.append((blueprint, options))
def register(self, app: "Flask", options: dict) -> None:
"""Called by :meth:`Flask.register_blueprint` to register all
views and callbacks registered on the blueprint with the
application. Creates a :class:`.BlueprintSetupState` and calls
each :meth:`record` callback with it.
:param app: The application this blueprint is being registered
with.
:param app: The application this blueprint is being registered with.
:param options: Keyword arguments forwarded from
:meth:`~Flask.register_blueprint`.
.. versionchanged:: 2.0.1
Nested blueprints are registered with their dotted name.
This allows different blueprints with the same name to be
nested at different locations.
.. versionchanged:: 2.0.1
The ``name`` option can be used to change the (pre-dotted)
name the blueprint is registered with. This allows the same
blueprint to be registered multiple times with unique names
for ``url_for``.
.. versionchanged:: 2.0.1
Registering the same blueprint with the same name multiple
times is deprecated and will become an error in Flask 2.1.
:param first_registration: Whether this is the first time this
blueprint has been registered on the application.
"""
first_registration = not any(bp is self for bp in app.blueprints.values())
name_prefix = options.get("name_prefix", "")
self_name = options.get("name", self.name)
name = f"{name_prefix}.{self_name}".lstrip(".")
if name in app.blueprints:
existing_at = f" '{name}'" if self_name != name else ""
if app.blueprints[name] is not self:
raise ValueError(
f"The name '{self_name}' is already registered for"
f" a different blueprint{existing_at}. Use 'name='"
" to provide a unique name."
)
else:
import warnings
warnings.warn(
f"The name '{self_name}' is already registered for"
f" this blueprint{existing_at}. Use 'name=' to"
" provide a unique name. This will become an error"
" in Flask 2.1.",
stacklevel=4,
)
app.blueprints[name] = self
self._got_registered_once = True
state = self.make_setup_state(app, options, first_registration)
if self.has_static_folder:
state.add_url_rule(
f"{self.static_url_path}/<path:filename>",
self.static_url_path + "/<path:filename>",
view_func=self.send_static_file,
endpoint="static",
)
# Merge blueprint data into parent.
if first_registration:
def extend(bp_dict, parent_dict):
for key, values in bp_dict.items():
key = name if key is None else f"{name}.{key}"
parent_dict[key].extend(values)
for key, value in self.error_handler_spec.items():
key = name if key is None else f"{name}.{key}"
value = defaultdict(
dict,
{
code: {
exc_class: func for exc_class, func in code_values.items()
}
for code, code_values in value.items()
},
)
app.error_handler_spec[key] = value
for endpoint, func in self.view_functions.items():
app.view_functions[endpoint] = func
extend(self.before_request_funcs, app.before_request_funcs)
extend(self.after_request_funcs, app.after_request_funcs)
extend(
self.teardown_request_funcs,
app.teardown_request_funcs,
)
extend(self.url_default_functions, app.url_default_functions)
extend(self.url_value_preprocessors, app.url_value_preprocessors)
extend(self.template_context_processors, app.template_context_processors)
for deferred in self.deferred_functions:
deferred(state)
cli_resolved_group = options.get("cli_group", self.cli_group)
if self.cli.commands:
if cli_resolved_group is None:
app.cli.commands.update(self.cli.commands)
elif cli_resolved_group is _sentinel:
self.cli.name = name
app.cli.add_command(self.cli)
else:
self.cli.name = cli_resolved_group
app.cli.add_command(self.cli)
if not self.cli.commands:
return
for blueprint, bp_options in self._blueprints:
bp_options = bp_options.copy()
bp_url_prefix = bp_options.get("url_prefix")
if cli_resolved_group is None:
app.cli.commands.update(self.cli.commands)
elif cli_resolved_group is _sentinel:
self.cli.name = self.name
app.cli.add_command(self.cli)
else:
self.cli.name = cli_resolved_group
app.cli.add_command(self.cli)
if bp_url_prefix is None:
bp_url_prefix = blueprint.url_prefix
def route(self, rule, **options):
"""Like :meth:`Flask.route` but for a blueprint. The endpoint for the
:func:`url_for` function is prefixed with the name of the blueprint.
"""
if state.url_prefix is not None and bp_url_prefix is not None:
bp_options["url_prefix"] = (
state.url_prefix.rstrip("/") + "/" + bp_url_prefix.lstrip("/")
)
elif bp_url_prefix is not None:
bp_options["url_prefix"] = bp_url_prefix
elif state.url_prefix is not None:
bp_options["url_prefix"] = state.url_prefix
def decorator(f):
endpoint = options.pop("endpoint", f.__name__)
self.add_url_rule(rule, endpoint, f, **options)
return f
bp_options["name_prefix"] = name
blueprint.register(app, bp_options)
return decorator
def add_url_rule(
self,
rule: str,
endpoint: t.Optional[str] = None,
view_func: t.Optional[t.Callable] = None,
provide_automatic_options: t.Optional[bool] = None,
**options: t.Any,
) -> None:
def add_url_rule(self, rule, endpoint=None, view_func=None, **options):
"""Like :meth:`Flask.add_url_rule` but for a blueprint. The endpoint for
the :func:`url_for` function is prefixed with the name of the blueprint.
"""
if endpoint and "." in endpoint:
raise ValueError("'endpoint' may not contain a dot '.' character.")
if endpoint:
assert "." not in endpoint, "Blueprint endpoints should not contain dots"
if view_func and hasattr(view_func, "__name__"):
assert (
"." not in view_func.__name__
), "Blueprint view function name should not contain dots"
self.record(lambda s: s.add_url_rule(rule, endpoint, view_func, **options))
if view_func and hasattr(view_func, "__name__") and "." in view_func.__name__:
raise ValueError("'view_func' name may not contain a dot '.' character.")
def endpoint(self, endpoint):
"""Like :meth:`Flask.endpoint` but for a blueprint. This does not
prefix the endpoint with the blueprint name, this has to be done
explicitly by the user of this method. If the endpoint is prefixed
with a `.` it will be registered to the current blueprint, otherwise
it's an application independent endpoint.
"""
self.record(
lambda s: s.add_url_rule(
rule,
endpoint,
view_func,
provide_automatic_options=provide_automatic_options,
**options,
)
)
def decorator(f):
def register_endpoint(state):
state.app.view_functions[endpoint] = f
def app_template_filter(
self, name: t.Optional[str] = None
) -> t.Callable[[TemplateFilterCallable], TemplateFilterCallable]:
self.record_once(register_endpoint)
return f
return decorator
def app_template_filter(self, name=None):
"""Register a custom template filter, available application wide. Like
:meth:`Flask.template_filter` but for a blueprint.
@ -434,15 +318,13 @@ class Blueprint(Scaffold):
function name will be used.
"""
def decorator(f: TemplateFilterCallable) -> TemplateFilterCallable:
def decorator(f):
self.add_app_template_filter(f, name=name)
return f
return decorator
def add_app_template_filter(
self, f: TemplateFilterCallable, name: t.Optional[str] = None
) -> None:
def add_app_template_filter(self, f, name=None):
"""Register a custom template filter, available application wide. Like
:meth:`Flask.add_template_filter` but for a blueprint. Works exactly
like the :meth:`app_template_filter` decorator.
@ -451,14 +333,12 @@ class Blueprint(Scaffold):
function name will be used.
"""
def register_template(state: BlueprintSetupState) -> None:
def register_template(state):
state.app.jinja_env.filters[name or f.__name__] = f
self.record_once(register_template)
def app_template_test(
self, name: t.Optional[str] = None
) -> t.Callable[[TemplateTestCallable], TemplateTestCallable]:
def app_template_test(self, name=None):
"""Register a custom template test, available application wide. Like
:meth:`Flask.template_test` but for a blueprint.
@ -468,15 +348,13 @@ class Blueprint(Scaffold):
function name will be used.
"""
def decorator(f: TemplateTestCallable) -> TemplateTestCallable:
def decorator(f):
self.add_app_template_test(f, name=name)
return f
return decorator
def add_app_template_test(
self, f: TemplateTestCallable, name: t.Optional[str] = None
) -> None:
def add_app_template_test(self, f, name=None):
"""Register a custom template test, available application wide. Like
:meth:`Flask.add_template_test` but for a blueprint. Works exactly
like the :meth:`app_template_test` decorator.
@ -487,14 +365,12 @@ class Blueprint(Scaffold):
function name will be used.
"""
def register_template(state: BlueprintSetupState) -> None:
def register_template(state):
state.app.jinja_env.tests[name or f.__name__] = f
self.record_once(register_template)
def app_template_global(
self, name: t.Optional[str] = None
) -> t.Callable[[TemplateGlobalCallable], TemplateGlobalCallable]:
def app_template_global(self, name=None):
"""Register a custom template global, available application wide. Like
:meth:`Flask.template_global` but for a blueprint.
@ -504,15 +380,13 @@ class Blueprint(Scaffold):
function name will be used.
"""
def decorator(f: TemplateGlobalCallable) -> TemplateGlobalCallable:
def decorator(f):
self.add_app_template_global(f, name=name)
return f
return decorator
def add_app_template_global(
self, f: TemplateGlobalCallable, name: t.Optional[str] = None
) -> None:
def add_app_template_global(self, f, name=None):
"""Register a custom template global, available application wide. Like
:meth:`Flask.add_template_global` but for a blueprint. Works exactly
like the :meth:`app_template_global` decorator.
@ -523,12 +397,22 @@ class Blueprint(Scaffold):
function name will be used.
"""
def register_template(state: BlueprintSetupState) -> None:
def register_template(state):
state.app.jinja_env.globals[name or f.__name__] = f
self.record_once(register_template)
def before_app_request(self, f: BeforeRequestCallable) -> BeforeRequestCallable:
def before_request(self, f):
"""Like :meth:`Flask.before_request` but for a blueprint. This function
is only executed before each request that is handled by a function of
that blueprint.
"""
self.record_once(
lambda s: s.app.before_request_funcs.setdefault(self.name, []).append(f)
)
return f
def before_app_request(self, f):
"""Like :meth:`Flask.before_request`. Such a function is executed
before each request, even if outside of a blueprint.
"""
@ -537,16 +421,24 @@ class Blueprint(Scaffold):
)
return f
def before_app_first_request(
self, f: BeforeFirstRequestCallable
) -> BeforeFirstRequestCallable:
def before_app_first_request(self, f):
"""Like :meth:`Flask.before_first_request`. Such a function is
executed before the first request to the application.
"""
self.record_once(lambda s: s.app.before_first_request_funcs.append(f))
return f
def after_app_request(self, f: AfterRequestCallable) -> AfterRequestCallable:
def after_request(self, f):
"""Like :meth:`Flask.after_request` but for a blueprint. This function
is only executed after each request that is handled by a function of
that blueprint.
"""
self.record_once(
lambda s: s.app.after_request_funcs.setdefault(self.name, []).append(f)
)
return f
def after_app_request(self, f):
"""Like :meth:`Flask.after_request` but for a blueprint. Such a function
is executed after each request, even if outside of the blueprint.
"""
@ -555,7 +447,19 @@ class Blueprint(Scaffold):
)
return f
def teardown_app_request(self, f: TeardownCallable) -> TeardownCallable:
def teardown_request(self, f):
"""Like :meth:`Flask.teardown_request` but for a blueprint. This
function is only executed when tearing down requests handled by a
function of that blueprint. Teardown request functions are executed
when the request context is popped, even when no actual request was
performed.
"""
self.record_once(
lambda s: s.app.teardown_request_funcs.setdefault(self.name, []).append(f)
)
return f
def teardown_app_request(self, f):
"""Like :meth:`Flask.teardown_request` but for a blueprint. Such a
function is executed when tearing down each request, even if outside of
the blueprint.
@ -565,9 +469,18 @@ class Blueprint(Scaffold):
)
return f
def app_context_processor(
self, f: TemplateContextProcessorCallable
) -> TemplateContextProcessorCallable:
def context_processor(self, f):
"""Like :meth:`Flask.context_processor` but for a blueprint. This
function is only executed for requests handled by a blueprint.
"""
self.record_once(
lambda s: s.app.template_context_processors.setdefault(
self.name, []
).append(f)
)
return f
def app_context_processor(self, f):
"""Like :meth:`Flask.context_processor` but for a blueprint. Such a
function is executed each request, even if outside of the blueprint.
"""
@ -576,29 +489,81 @@ class Blueprint(Scaffold):
)
return f
def app_errorhandler(self, code: t.Union[t.Type[Exception], int]) -> t.Callable:
def app_errorhandler(self, code):
"""Like :meth:`Flask.errorhandler` but for a blueprint. This
handler is used for all requests, even if outside of the blueprint.
"""
def decorator(f: ErrorHandlerCallable) -> ErrorHandlerCallable:
def decorator(f):
self.record_once(lambda s: s.app.errorhandler(code)(f))
return f
return decorator
def app_url_value_preprocessor(
self, f: URLValuePreprocessorCallable
) -> URLValuePreprocessorCallable:
"""Same as :meth:`url_value_preprocessor` but application wide."""
def url_value_preprocessor(self, f):
"""Registers a function as URL value preprocessor for this
blueprint. It's called before the view functions are called and
can modify the url values provided.
"""
self.record_once(
lambda s: s.app.url_value_preprocessors.setdefault(self.name, []).append(f)
)
return f
def url_defaults(self, f):
"""Callback function for URL defaults for this blueprint. It's called
with the endpoint and values and should update the values passed
in place.
"""
self.record_once(
lambda s: s.app.url_default_functions.setdefault(self.name, []).append(f)
)
return f
def app_url_value_preprocessor(self, f):
"""Same as :meth:`url_value_preprocessor` but application wide.
"""
self.record_once(
lambda s: s.app.url_value_preprocessors.setdefault(None, []).append(f)
)
return f
def app_url_defaults(self, f: URLDefaultCallable) -> URLDefaultCallable:
"""Same as :meth:`url_defaults` but application wide."""
def app_url_defaults(self, f):
"""Same as :meth:`url_defaults` but application wide.
"""
self.record_once(
lambda s: s.app.url_default_functions.setdefault(None, []).append(f)
)
return f
def errorhandler(self, code_or_exception):
"""Registers an error handler that becomes active for this blueprint
only. Please be aware that routing does not happen local to a
blueprint so an error handler for 404 usually is not handled by
a blueprint unless it is caused inside a view function. Another
special case is the 500 internal server error which is always looked
up from the application.
Otherwise works as the :meth:`~flask.Flask.errorhandler` decorator
of the :class:`~flask.Flask` object.
"""
def decorator(f):
self.record_once(
lambda s: s.app._register_error_handler(self.name, code_or_exception, f)
)
return f
return decorator
def register_error_handler(self, code_or_exception, f):
"""Non-decorator version of the :meth:`errorhandler` error attach
function, akin to the :meth:`~flask.Flask.register_error_handler`
application-wide function of the :class:`~flask.Flask` object but
for error handlers limited to this blueprint.
.. versionadded:: 0.11
"""
self.record_once(
lambda s: s.app._register_error_handler(self.name, code_or_exception, f)
)

View file

@ -1,3 +1,15 @@
# -*- coding: utf-8 -*-
"""
flask.cli
~~~~~~~~~
A simple command line application to run flask apps.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
from __future__ import print_function
import ast
import inspect
import os
@ -5,7 +17,6 @@ import platform
import re
import sys
import traceback
import warnings
from functools import update_wrapper
from operator import attrgetter
from threading import Lock
@ -14,6 +25,10 @@ from threading import Thread
import click
from werkzeug.utils import import_string
from ._compat import getargspec
from ._compat import itervalues
from ._compat import reraise
from ._compat import text_type
from .globals import current_app
from .helpers import get_debug_flag
from .helpers import get_env
@ -27,7 +42,7 @@ except ImportError:
try:
import ssl
except ImportError:
ssl = None # type: ignore
ssl = None
class NoAppException(click.UsageError):
@ -48,15 +63,15 @@ def find_best_app(script_info, module):
return app
# Otherwise find the only object that is a Flask instance.
matches = [v for v in module.__dict__.values() if isinstance(v, Flask)]
matches = [v for v in itervalues(module.__dict__) if isinstance(v, Flask)]
if len(matches) == 1:
return matches[0]
elif len(matches) > 1:
raise NoAppException(
"Detected multiple Flask applications in module"
f" {module.__name__!r}. Use 'FLASK_APP={module.__name__}:name'"
f" to specify the correct one."
'Detected multiple Flask applications in module "{module}". Use '
'"FLASK_APP={module}:name" to specify the correct '
"one.".format(module=module.__name__)
)
# Search for app factory functions.
@ -73,140 +88,109 @@ def find_best_app(script_info, module):
if not _called_with_wrong_args(app_factory):
raise
raise NoAppException(
f"Detected factory {attr_name!r} in module {module.__name__!r},"
" but could not call it without arguments. Use"
f" \"FLASK_APP='{module.__name__}:{attr_name}(args)'\""
" to specify arguments."
'Detected factory "{factory}" in module "{module}", but '
"could not call it without arguments. Use "
"\"FLASK_APP='{module}:{factory}(args)'\" to specify "
"arguments.".format(factory=attr_name, module=module.__name__)
)
raise NoAppException(
"Failed to find Flask application or factory in module"
f" {module.__name__!r}. Use 'FLASK_APP={module.__name__}:name'"
" to specify one."
'Failed to find Flask application or factory in module "{module}". '
'Use "FLASK_APP={module}:name to specify one.'.format(module=module.__name__)
)
def call_factory(script_info, app_factory, args=None, kwargs=None):
def call_factory(script_info, app_factory, arguments=()):
"""Takes an app factory, a ``script_info` object and optionally a tuple
of arguments. Checks for the existence of a script_info argument and calls
the app_factory depending on that and the arguments provided.
"""
sig = inspect.signature(app_factory)
args = [] if args is None else args
kwargs = {} if kwargs is None else kwargs
args_spec = getargspec(app_factory)
arg_names = args_spec.args
arg_defaults = args_spec.defaults
if "script_info" in sig.parameters:
warnings.warn(
"The 'script_info' argument is deprecated and will not be"
" passed to the app factory function in Flask 2.1.",
DeprecationWarning,
)
kwargs["script_info"] = script_info
if "script_info" in arg_names:
return app_factory(*arguments, script_info=script_info)
elif arguments:
return app_factory(*arguments)
elif not arguments and len(arg_names) == 1 and arg_defaults is None:
return app_factory(script_info)
if (
not args
and len(sig.parameters) == 1
and next(iter(sig.parameters.values())).default is inspect.Parameter.empty
):
warnings.warn(
"Script info is deprecated and will not be passed as the"
" single argument to the app factory function in Flask"
" 2.1.",
DeprecationWarning,
)
args.append(script_info)
return app_factory(*args, **kwargs)
return app_factory()
def _called_with_wrong_args(f):
def _called_with_wrong_args(factory):
"""Check whether calling a function raised a ``TypeError`` because
the call failed or because something in the factory raised the
error.
:param f: The function that was called.
:return: ``True`` if the call failed.
:param factory: the factory function that was called
:return: true if the call failed
"""
tb = sys.exc_info()[2]
try:
while tb is not None:
if tb.tb_frame.f_code is f.__code__:
# In the function, it was called successfully.
if tb.tb_frame.f_code is factory.__code__:
# in the factory, it was called successfully
return False
tb = tb.tb_next
# Didn't reach the function.
# didn't reach the factory
return True
finally:
# Delete tb to break a circular reference.
# explicitly delete tb as it is circular referenced
# https://docs.python.org/2/library/sys.html#sys.exc_info
del tb
def find_app_by_string(script_info, module, app_name):
"""Check if the given string is a variable name or a function. Call
a function to get the app instance, or return the variable directly.
"""Checks if the given string is a variable name or a function. If it is a
function, it checks for specified arguments and whether it takes a
``script_info`` argument and calls the function with the appropriate
arguments.
"""
from . import Flask
# Parse app_name as a single expression to determine if it's a valid
# attribute name or function call.
try:
expr = ast.parse(app_name.strip(), mode="eval").body
except SyntaxError:
match = re.match(r"^ *([^ ()]+) *(?:\((.*?) *,? *\))? *$", app_name)
if not match:
raise NoAppException(
f"Failed to parse {app_name!r} as an attribute name or function call."
'"{name}" is not a valid variable name or function '
"expression.".format(name=app_name)
)
if isinstance(expr, ast.Name):
name = expr.id
args = kwargs = None
elif isinstance(expr, ast.Call):
# Ensure the function name is an attribute name only.
if not isinstance(expr.func, ast.Name):
raise NoAppException(
f"Function reference must be a simple name: {app_name!r}."
)
name = expr.func.id
# Parse the positional and keyword arguments as literals.
try:
args = [ast.literal_eval(arg) for arg in expr.args]
kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in expr.keywords}
except ValueError:
# literal_eval gives cryptic error messages, show a generic
# message with the full expression instead.
raise NoAppException(
f"Failed to parse arguments as literal values: {app_name!r}."
)
else:
raise NoAppException(
f"Failed to parse {app_name!r} as an attribute name or function call."
)
name, args = match.groups()
try:
attr = getattr(module, name)
except AttributeError:
raise NoAppException(
f"Failed to find attribute {name!r} in {module.__name__!r}."
)
except AttributeError as e:
raise NoAppException(e.args[0])
# If the attribute is a function, call it with any args and kwargs
# to get the real application.
if inspect.isfunction(attr):
if args:
try:
args = ast.literal_eval("({args},)".format(args=args))
except (ValueError, SyntaxError) as e:
raise NoAppException(
"Could not parse the arguments in "
'"{app_name}".'.format(e=e, app_name=app_name)
)
else:
args = ()
try:
app = call_factory(script_info, attr, args, kwargs)
except TypeError:
app = call_factory(script_info, attr, args)
except TypeError as e:
if not _called_with_wrong_args(attr):
raise
raise NoAppException(
f"The factory {app_name!r} in module"
f" {module.__name__!r} could not be called with the"
" specified arguments."
'{e}\nThe factory "{app_name}" in module "{module}" could not '
"be called with the specified arguments.".format(
e=e, app_name=app_name, module=module.__name__
)
)
else:
app = attr
@ -215,8 +199,8 @@ def find_app_by_string(script_info, module, app_name):
return app
raise NoAppException(
"A valid Flask application was not obtained from"
f" '{module.__name__}:{app_name}'."
"A valid Flask application was not obtained from "
'"{module}:{app_name}".'.format(module=module.__name__, app_name=app_name)
)
@ -257,13 +241,13 @@ def locate_app(script_info, module_name, app_name, raise_if_not_found=True):
except ImportError:
# Reraise the ImportError if it occurred within the imported module.
# Determine this by checking whether the trace has a depth > 1.
if sys.exc_info()[2].tb_next:
if sys.exc_info()[-1].tb_next:
raise NoAppException(
f"While importing {module_name!r}, an ImportError was"
f" raised:\n\n{traceback.format_exc()}"
'While importing "{name}", an ImportError was raised:'
"\n\n{tb}".format(name=module_name, tb=traceback.format_exc())
)
elif raise_if_not_found:
raise NoAppException(f"Could not import {module_name!r}.")
raise NoAppException('Could not import "{name}".'.format(name=module_name))
else:
return
@ -282,10 +266,14 @@ def get_version(ctx, param, value):
import werkzeug
from . import __version__
message = "Python %(python)s\nFlask %(flask)s\nWerkzeug %(werkzeug)s"
click.echo(
f"Python {platform.python_version()}\n"
f"Flask {__version__}\n"
f"Werkzeug {werkzeug.__version__}",
message
% {
"python": platform.python_version(),
"flask": __version__,
"werkzeug": werkzeug.__version__,
},
color=ctx.color,
)
ctx.exit()
@ -301,22 +289,18 @@ version_option = click.Option(
)
class DispatchingApp:
class DispatchingApp(object):
"""Special application that dispatches to a Flask application which
is imported by name in a background thread. If an error happens
it is recorded and shown as part of the WSGI handling which in case
of the Werkzeug debugger means that it shows up in the browser.
"""
def __init__(self, loader, use_eager_loading=None):
def __init__(self, loader, use_eager_loading=False):
self.loader = loader
self._app = None
self._lock = Lock()
self._bg_loading_exc_info = None
if use_eager_loading is None:
use_eager_loading = os.environ.get("WERKZEUG_RUN_MAIN") != "true"
if use_eager_loading:
self._load_unlocked()
else:
@ -339,7 +323,7 @@ class DispatchingApp:
exc_info = self._bg_loading_exc_info
if exc_info is not None:
self._bg_loading_exc_info = None
raise exc_info
reraise(*exc_info)
def _load_unlocked(self):
__traceback_hide__ = True # noqa: F841
@ -360,7 +344,7 @@ class DispatchingApp:
return rv(environ, start_response)
class ScriptInfo:
class ScriptInfo(object):
"""Helper object to deal with Flask applications. This is usually not
necessary to interface with as it's used internally in the dispatching
to click. In future versions of Flask this object will most likely play
@ -391,6 +375,8 @@ class ScriptInfo:
if self._loaded_app is not None:
return self._loaded_app
app = None
if self.create_app is not None:
app = call_factory(self, self.create_app)
else:
@ -478,7 +464,9 @@ class FlaskGroup(AppGroup):
loading more commands from the configured Flask app. Normally a
developer does not have to interface with this class but there are
some very advanced use cases for which it makes sense to create an
instance of this. see :ref:`custom-scripts`.
instance of this.
For information as of why this is useful see :ref:`custom-scripts`.
:param add_default_commands: if this is True then the default run and
shell commands will be added.
@ -503,7 +491,7 @@ class FlaskGroup(AppGroup):
add_version_option=True,
load_dotenv=True,
set_debug_flag=True,
**extra,
**extra
):
params = list(extra.pop("params", None) or ())
@ -537,41 +525,43 @@ class FlaskGroup(AppGroup):
def get_command(self, ctx, name):
self._load_plugin_commands()
# Look up built-in and plugin commands, which should be
# available even if the app fails to load.
rv = super().get_command(ctx, name)
# We load built-in commands first as these should always be the
# same no matter what the app does. If the app does want to
# override this it needs to make a custom instance of this group
# and not attach the default commands.
#
# This also means that the script stays functional in case the
# application completely fails.
rv = AppGroup.get_command(self, ctx, name)
if rv is not None:
return rv
info = ctx.ensure_object(ScriptInfo)
# Look up commands provided by the app, showing an error and
# continuing if the app couldn't be loaded.
try:
return info.load_app().cli.get_command(ctx, name)
except NoAppException as e:
click.secho(f"Error: {e.format_message()}\n", err=True, fg="red")
rv = info.load_app().cli.get_command(ctx, name)
if rv is not None:
return rv
except NoAppException:
pass
def list_commands(self, ctx):
self._load_plugin_commands()
# Start with the built-in and plugin commands.
rv = set(super().list_commands(ctx))
info = ctx.ensure_object(ScriptInfo)
# Add commands provided by the app, showing an error and
# continuing if the app couldn't be loaded.
# The commands available is the list of both the application (if
# available) plus the builtin commands.
rv = set(click.Group.list_commands(self, ctx))
info = ctx.ensure_object(ScriptInfo)
try:
rv.update(info.load_app().cli.list_commands(ctx))
except NoAppException as e:
# When an app couldn't be loaded, show the error message
# without the traceback.
click.secho(f"Error: {e.format_message()}\n", err=True, fg="red")
except Exception:
# When any other errors occurred during loading, show the
# full traceback.
click.secho(f"{traceback.format_exc()}\n", err=True, fg="red")
# Here we intentionally swallow all exceptions as we don't
# want the help page to break if the app does not exist.
# If someone attempts to use the command we try to create
# the app again and this will give us the error.
# However, we will not do so silently because that would confuse
# users.
traceback.print_exc()
return sorted(rv)
def main(self, *args, **kwargs):
@ -593,7 +583,7 @@ class FlaskGroup(AppGroup):
kwargs["obj"] = obj
kwargs.setdefault("auto_envvar_prefix", "FLASK")
return super().main(*args, **kwargs)
return super(FlaskGroup, self).main(*args, **kwargs)
def _path_is_ancestor(path, other):
@ -609,6 +599,10 @@ def load_dotenv(path=None):
If an env var is already set it is not overwritten, so earlier files in the
list are preferred over later files.
Changes the current working directory to the location of the first file
found, with the assumption that it is in the top level project directory
and will be where the Python path should import local packages from.
This is a no-op if `python-dotenv`_ is not installed.
.. _python-dotenv: https://github.com/theskumar/python-dotenv#readme
@ -620,9 +614,6 @@ def load_dotenv(path=None):
Returns ``False`` when python-dotenv is not installed, or when
the given path isn't a file.
.. versionchanged:: 2.0
When loading the env files, set the default encoding to UTF-8.
.. versionadded:: 1.0
"""
if dotenv is None:
@ -640,7 +631,7 @@ def load_dotenv(path=None):
# else False
if path is not None:
if os.path.isfile(path):
return dotenv.load_dotenv(path, encoding="utf-8")
return dotenv.load_dotenv(path)
return False
@ -655,7 +646,10 @@ def load_dotenv(path=None):
if new_dir is None:
new_dir = os.path.dirname(path)
dotenv.load_dotenv(path, encoding="utf-8")
dotenv.load_dotenv(path)
if new_dir and os.getcwd() != new_dir:
os.chdir(new_dir)
return new_dir is not None # at least one file was located and loaded
@ -668,25 +662,25 @@ def show_server_banner(env, debug, app_import_path, eager_loading):
return
if app_import_path is not None:
message = f" * Serving Flask app {app_import_path!r}"
message = ' * Serving Flask app "{0}"'.format(app_import_path)
if not eager_loading:
message += " (lazy loading)"
click.echo(message)
click.echo(f" * Environment: {env}")
click.echo(" * Environment: {0}".format(env))
if env == "production":
click.secho(
" WARNING: This is a development server. Do not use it in"
" a production deployment.",
" WARNING: This is a development server. "
"Do not use it in a production deployment.",
fg="red",
)
click.secho(" Use a production WSGI server instead.", dim=True)
if debug is not None:
click.echo(f" * Debug mode: {'on' if debug else 'off'}")
click.echo(" * Debug mode: {0}".format("on" if debug else "off"))
class CertParamType(click.ParamType):
@ -715,20 +709,22 @@ class CertParamType(click.ParamType):
if value == "adhoc":
try:
import cryptography # noqa: F401
import OpenSSL # noqa: F401
except ImportError:
raise click.BadParameter(
"Using ad-hoc certificates requires the cryptography library.",
ctx,
param,
"Using ad-hoc certificates requires pyOpenSSL.", ctx, param
)
return value
obj = import_string(value, silent=True)
if isinstance(obj, ssl.SSLContext):
return obj
if sys.version_info < (2, 7, 9):
if obj:
return obj
else:
if isinstance(obj, ssl.SSLContext):
return obj
raise
@ -739,7 +735,11 @@ def _validate_key(ctx, param, value):
"""
cert = ctx.params.get("cert")
is_adhoc = cert == "adhoc"
is_context = ssl and isinstance(cert, ssl.SSLContext)
if sys.version_info < (2, 7, 9):
is_context = cert and not isinstance(cert, (text_type, bytes))
else:
is_context = isinstance(cert, ssl.SSLContext)
if value is not None:
if is_adhoc:
@ -772,7 +772,7 @@ class SeparatedPathType(click.Path):
def convert(self, value, param, ctx):
items = self.split_envvar_value(value)
super_convert = super().convert
super_convert = super(SeparatedPathType, self).convert
return [super_convert(item, param, ctx) for item in items]
@ -802,7 +802,7 @@ class SeparatedPathType(click.Path):
"is active if debug is enabled.",
)
@click.option(
"--eager-loading/--lazy-loading",
"--eager-loading/--lazy-loader",
default=None,
help="Enable or disable eager loading. By default eager "
"loading is enabled if the reloader is disabled.",
@ -818,7 +818,7 @@ class SeparatedPathType(click.Path):
type=SeparatedPathType(),
help=(
"Extra files that trigger a reload on change. Multiple paths"
f" are separated by {os.path.pathsep!r}."
" are separated by '{}'.".format(os.path.pathsep)
),
)
@pass_script_info
@ -841,6 +841,9 @@ def run_command(
if debugger is None:
debugger = debug
if eager_loading is None:
eager_loading = not reload
show_server_banner(get_env(), debug, info.app_import_path, eager_loading)
app = DispatchingApp(info.load_app, use_eager_loading=eager_loading)
@ -860,10 +863,10 @@ def run_command(
@click.command("shell", short_help="Run a shell in the app context.")
@with_appcontext
def shell_command() -> None:
def shell_command():
"""Run an interactive Python shell in the context of a given
Flask application. The application will populate the default
namespace of this shell according to its configuration.
namespace of this shell according to it's configuration.
This is useful for executing small snippets of management code
without having to manually configure the application.
@ -872,40 +875,24 @@ def shell_command() -> None:
from .globals import _app_ctx_stack
app = _app_ctx_stack.top.app
banner = (
f"Python {sys.version} on {sys.platform}\n"
f"App: {app.import_name} [{app.env}]\n"
f"Instance: {app.instance_path}"
banner = "Python %s on %s\nApp: %s [%s]\nInstance: %s" % (
sys.version,
sys.platform,
app.import_name,
app.env,
app.instance_path,
)
ctx: dict = {}
ctx = {}
# Support the regular Python interpreter startup script if someone
# is using it.
startup = os.environ.get("PYTHONSTARTUP")
if startup and os.path.isfile(startup):
with open(startup) as f:
with open(startup, "r") as f:
eval(compile(f.read(), startup, "exec"), ctx)
ctx.update(app.make_shell_context())
# Site, customize, or startup script can set a hook to call when
# entering interactive mode. The default one sets up readline with
# tab and history completion.
interactive_hook = getattr(sys, "__interactivehook__", None)
if interactive_hook is not None:
try:
import readline
from rlcompleter import Completer
except ImportError:
pass
else:
# rlcompleter uses __main__.__dict__ by default, which is
# flask.__main__. Use the shell context instead.
readline.set_completer(Completer(ctx).complete)
interactive_hook()
code.interact(banner=banner, local=ctx)
@ -922,7 +909,7 @@ def shell_command() -> None:
)
@click.option("--all-methods", is_flag=True, help="Show HEAD and OPTIONS methods.")
@with_appcontext
def routes_command(sort: str, all_methods: bool) -> None:
def routes_command(sort, all_methods):
"""Show all registered routes with endpoints and methods."""
rules = list(current_app.url_map.iter_rules())
@ -935,12 +922,9 @@ def routes_command(sort: str, all_methods: bool) -> None:
if sort in ("endpoint", "rule"):
rules = sorted(rules, key=attrgetter(sort))
elif sort == "methods":
rules = sorted(rules, key=lambda rule: sorted(rule.methods)) # type: ignore
rules = sorted(rules, key=lambda rule: sorted(rule.methods))
rule_methods = [
", ".join(sorted(rule.methods - ignored_methods)) # type: ignore
for rule in rules
]
rule_methods = [", ".join(sorted(rule.methods - ignored_methods)) for rule in rules]
headers = ("Endpoint", "Methods", "Rule")
widths = (
@ -978,17 +962,10 @@ debug mode.
)
def main() -> None:
if int(click.__version__[0]) < 8:
warnings.warn(
"Using the `flask` cli with Click 7 is deprecated and"
" will not be supported starting with Flask 2.1."
" Please upgrade to Click 8 as soon as possible.",
DeprecationWarning,
)
def main(as_module=False):
# TODO omit sys.argv once https://github.com/pallets/click/issues/536 is fixed
cli.main(args=sys.argv[1:])
cli.main(args=sys.argv[1:], prog_name="python -m flask" if as_module else None)
if __name__ == "__main__":
main()
main(as_module=True)

View file

@ -1,19 +1,32 @@
# -*- coding: utf-8 -*-
"""
flask.config
~~~~~~~~~~~~
Implements the configuration related objects.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
import errno
import os
import types
import typing as t
from werkzeug.utils import import_string
from . import json
from ._compat import iteritems
from ._compat import string_types
class ConfigAttribute:
class ConfigAttribute(object):
"""Makes an attribute forward to the config"""
def __init__(self, name: str, get_converter: t.Optional[t.Callable] = None) -> None:
def __init__(self, name, get_converter=None):
self.__name__ = name
self.get_converter = get_converter
def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
def __get__(self, obj, type=None):
if obj is None:
return self
rv = obj.config[self.__name__]
@ -21,7 +34,7 @@ class ConfigAttribute:
rv = self.get_converter(rv)
return rv
def __set__(self, obj: t.Any, value: t.Any) -> None:
def __set__(self, obj, value):
obj.config[self.__name__] = value
@ -69,11 +82,11 @@ class Config(dict):
:param defaults: an optional dictionary of default values
"""
def __init__(self, root_path: str, defaults: t.Optional[dict] = None) -> None:
def __init__(self, root_path, defaults=None):
dict.__init__(self, defaults or {})
self.root_path = root_path
def from_envvar(self, variable_name: str, silent: bool = False) -> bool:
def from_envvar(self, variable_name, silent=False):
"""Loads a configuration from an environment variable pointing to
a configuration file. This is basically just a shortcut with nicer
error messages for this line of code::
@ -90,14 +103,14 @@ class Config(dict):
if silent:
return False
raise RuntimeError(
f"The environment variable {variable_name!r} is not set"
" and as such configuration could not be loaded. Set"
" this variable and make it point to a configuration"
" file"
"The environment variable %r is not set "
"and as such configuration could not be "
"loaded. Set this variable and make it "
"point to a configuration file" % variable_name
)
return self.from_pyfile(rv, silent=silent)
def from_pyfile(self, filename: str, silent: bool = False) -> bool:
def from_pyfile(self, filename, silent=False):
"""Updates the values in the config from a Python file. This function
behaves as if the file was imported as module with the
:meth:`from_object` function.
@ -117,15 +130,15 @@ class Config(dict):
try:
with open(filename, mode="rb") as config_file:
exec(compile(config_file.read(), filename, "exec"), d.__dict__)
except OSError as e:
except IOError as e:
if silent and e.errno in (errno.ENOENT, errno.EISDIR, errno.ENOTDIR):
return False
e.strerror = f"Unable to load configuration file ({e.strerror})"
e.strerror = "Unable to load configuration file (%s)" % e.strerror
raise
self.from_object(d)
return True
def from_object(self, obj: t.Union[object, str]) -> None:
def from_object(self, obj):
"""Updates the values from the given object. An object can be of one
of the following two types:
@ -157,96 +170,61 @@ class Config(dict):
:param obj: an import name or object
"""
if isinstance(obj, str):
if isinstance(obj, string_types):
obj = import_string(obj)
for key in dir(obj):
if key.isupper():
self[key] = getattr(obj, key)
def from_file(
self,
filename: str,
load: t.Callable[[t.IO[t.Any]], t.Mapping],
silent: bool = False,
) -> bool:
"""Update the values in the config from a file that is loaded
using the ``load`` parameter. The loaded data is passed to the
:meth:`from_mapping` method.
def from_json(self, filename, silent=False):
"""Updates the values in the config from a JSON file. This function
behaves as if the JSON object was a dictionary and passed to the
:meth:`from_mapping` function.
.. code-block:: python
:param filename: the filename of the JSON file. This can either be an
absolute filename or a filename relative to the
root path.
:param silent: set to ``True`` if you want silent failure for missing
files.
import toml
app.config.from_file("config.toml", load=toml.load)
:param filename: The path to the data file. This can be an
absolute path or relative to the config root path.
:param load: A callable that takes a file handle and returns a
mapping of loaded data from the file.
:type load: ``Callable[[Reader], Mapping]`` where ``Reader``
implements a ``read`` method.
:param silent: Ignore the file if it doesn't exist.
.. versionadded:: 2.0
.. versionadded:: 0.11
"""
filename = os.path.join(self.root_path, filename)
try:
with open(filename) as f:
obj = load(f)
except OSError as e:
with open(filename) as json_file:
obj = json.loads(json_file.read())
except IOError as e:
if silent and e.errno in (errno.ENOENT, errno.EISDIR):
return False
e.strerror = f"Unable to load configuration file ({e.strerror})"
e.strerror = "Unable to load configuration file (%s)" % e.strerror
raise
return self.from_mapping(obj)
def from_json(self, filename: str, silent: bool = False) -> bool:
"""Update the values in the config from a JSON file. The loaded
data is passed to the :meth:`from_mapping` method.
:param filename: The path to the JSON file. This can be an
absolute path or relative to the config root path.
:param silent: Ignore the file if it doesn't exist.
.. deprecated:: 2.0.0
Will be removed in Flask 2.1. Use :meth:`from_file` instead.
This was removed early in 2.0.0, was added back in 2.0.1.
.. versionadded:: 0.11
"""
import warnings
from . import json
warnings.warn(
"'from_json' is deprecated and will be removed in Flask"
" 2.1. Use 'from_file(path, json.load)' instead.",
DeprecationWarning,
stacklevel=2,
)
return self.from_file(filename, json.load, silent=silent)
def from_mapping(
self, mapping: t.Optional[t.Mapping[str, t.Any]] = None, **kwargs: t.Any
) -> bool:
def from_mapping(self, *mapping, **kwargs):
"""Updates the config like :meth:`update` ignoring items with non-upper
keys.
.. versionadded:: 0.11
"""
mappings: t.Dict[str, t.Any] = {}
if mapping is not None:
mappings.update(mapping)
mappings.update(kwargs)
for key, value in mappings.items():
if key.isupper():
self[key] = value
mappings = []
if len(mapping) == 1:
if hasattr(mapping[0], "items"):
mappings.append(mapping[0].items())
else:
mappings.append(mapping[0])
elif len(mapping) > 1:
raise TypeError(
"expected at most 1 positional argument, got %d" % len(mapping)
)
mappings.append(kwargs.items())
for mapping in mappings:
for (key, value) in mapping:
if key.isupper():
self[key] = value
return True
def get_namespace(
self, namespace: str, lowercase: bool = True, trim_namespace: bool = True
) -> t.Dict[str, t.Any]:
def get_namespace(self, namespace, lowercase=True, trim_namespace=True):
"""Returns a dictionary containing a subset of configuration options
that match the specified namespace/prefix. Example usage::
@ -275,7 +253,7 @@ class Config(dict):
.. versionadded:: 0.11
"""
rv = {}
for k, v in self.items():
for k, v in iteritems(self):
if not k.startswith(namespace):
continue
if trim_namespace:
@ -287,5 +265,5 @@ class Config(dict):
rv[key] = v
return rv
def __repr__(self) -> str:
return f"<{type(self).__name__} {dict.__repr__(self)}>"
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, dict.__repr__(self))

View file

@ -1,27 +1,31 @@
# -*- coding: utf-8 -*-
"""
flask.ctx
~~~~~~~~~
Implements the objects required to keep the context.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
import sys
import typing as t
from functools import update_wrapper
from types import TracebackType
from werkzeug.exceptions import HTTPException
from ._compat import BROKEN_PYPY_CTXMGR_EXIT
from ._compat import reraise
from .globals import _app_ctx_stack
from .globals import _request_ctx_stack
from .signals import appcontext_popped
from .signals import appcontext_pushed
from .typing import AfterRequestCallable
if t.TYPE_CHECKING:
from .app import Flask
from .sessions import SessionMixin
from .wrappers import Request
# a singleton sentinel value for parameter defaults
_sentinel = object()
class _AppCtxGlobals:
class _AppCtxGlobals(object):
"""A plain object. Used as a namespace for storing data during an
application context.
@ -41,25 +45,7 @@ class _AppCtxGlobals:
.. versionadded:: 0.10
"""
# Define attr methods to let mypy know this is a namespace object
# that has arbitrary attributes.
def __getattr__(self, name: str) -> t.Any:
try:
return self.__dict__[name]
except KeyError:
raise AttributeError(name) from None
def __setattr__(self, name: str, value: t.Any) -> None:
self.__dict__[name] = value
def __delattr__(self, name: str) -> None:
try:
del self.__dict__[name]
except KeyError:
raise AttributeError(name) from None
def get(self, name: str, default: t.Optional[t.Any] = None) -> t.Any:
def get(self, name, default=None):
"""Get an attribute by name, or a default value. Like
:meth:`dict.get`.
@ -70,12 +56,12 @@ class _AppCtxGlobals:
"""
return self.__dict__.get(name, default)
def pop(self, name: str, default: t.Any = _sentinel) -> t.Any:
def pop(self, name, default=_sentinel):
"""Get and remove an attribute by name. Like :meth:`dict.pop`.
:param name: Name of attribute to pop.
:param default: Value to return if the attribute is not present,
instead of raising a ``KeyError``.
instead of raise a ``KeyError``.
.. versionadded:: 0.11
"""
@ -84,32 +70,32 @@ class _AppCtxGlobals:
else:
return self.__dict__.pop(name, default)
def setdefault(self, name: str, default: t.Any = None) -> t.Any:
def setdefault(self, name, default=None):
"""Get the value of an attribute if it is present, otherwise
set and return a default value. Like :meth:`dict.setdefault`.
:param name: Name of attribute to get.
:param default: Value to set and return if the attribute is not
:param: default: Value to set and return if the attribute is not
present.
.. versionadded:: 0.11
"""
return self.__dict__.setdefault(name, default)
def __contains__(self, item: str) -> bool:
def __contains__(self, item):
return item in self.__dict__
def __iter__(self) -> t.Iterator[str]:
def __iter__(self):
return iter(self.__dict__)
def __repr__(self) -> str:
def __repr__(self):
top = _app_ctx_stack.top
if top is not None:
return f"<flask.g of {top.app.name!r}>"
return "<flask.g of %r>" % top.app.name
return object.__repr__(self)
def after_this_request(f: AfterRequestCallable) -> AfterRequestCallable:
def after_this_request(f):
"""Executes a function after this request. This is useful to modify
response objects. The function is passed the response object and has
to return the same or a new one.
@ -134,7 +120,7 @@ def after_this_request(f: AfterRequestCallable) -> AfterRequestCallable:
return f
def copy_current_request_context(f: t.Callable) -> t.Callable:
def copy_current_request_context(f):
"""A helper function that decorates a function to retain the current
request context. This is useful when working with greenlets. The moment
the function is decorated a copy of the request context is created and
@ -174,7 +160,7 @@ def copy_current_request_context(f: t.Callable) -> t.Callable:
return update_wrapper(wrapper, f)
def has_request_context() -> bool:
def has_request_context():
"""If you have code that wants to test if a request context is there or
not this function can be used. For instance, you may want to take advantage
of request information if the request object is available, but fail
@ -206,7 +192,7 @@ def has_request_context() -> bool:
return _request_ctx_stack.top is not None
def has_app_context() -> bool:
def has_app_context():
"""Works like :func:`has_request_context` but for the application
context. You can also just do a boolean check on the
:data:`current_app` object instead.
@ -216,7 +202,7 @@ def has_app_context() -> bool:
return _app_ctx_stack.top is not None
class AppContext:
class AppContext(object):
"""The application context binds an application object implicitly
to the current thread or greenlet, similar to how the
:class:`RequestContext` binds request information. The application
@ -225,7 +211,7 @@ class AppContext:
context.
"""
def __init__(self, app: "Flask") -> None:
def __init__(self, app):
self.app = app
self.url_adapter = app.create_url_adapter(None)
self.g = app.app_ctx_globals_class()
@ -234,13 +220,15 @@ class AppContext:
# but there a basic "refcount" is enough to track them.
self._refcnt = 0
def push(self) -> None:
def push(self):
"""Binds the app context to the current context."""
self._refcnt += 1
if hasattr(sys, "exc_clear"):
sys.exc_clear()
_app_ctx_stack.push(self)
appcontext_pushed.send(self.app)
def pop(self, exc: t.Optional[BaseException] = _sentinel) -> None: # type: ignore
def pop(self, exc=_sentinel):
"""Pops the app context."""
try:
self._refcnt -= 1
@ -250,20 +238,21 @@ class AppContext:
self.app.do_teardown_appcontext(exc)
finally:
rv = _app_ctx_stack.pop()
assert rv is self, f"Popped wrong app context. ({rv!r} instead of {self!r})"
assert rv is self, "Popped wrong app context. (%r instead of %r)" % (rv, self)
appcontext_popped.send(self.app)
def __enter__(self) -> "AppContext":
def __enter__(self):
self.push()
return self
def __exit__(
self, exc_type: type, exc_value: BaseException, tb: TracebackType
) -> None:
def __exit__(self, exc_type, exc_value, tb):
self.pop(exc_value)
if BROKEN_PYPY_CTXMGR_EXIT and exc_type is not None:
reraise(exc_type, exc_value, tb)
class RequestContext:
class RequestContext(object):
"""The request context contains all request relevant information. It is
created at the beginning of the request and pushed to the
`_request_ctx_stack` and removed at the end of it. It will create the
@ -293,13 +282,7 @@ class RequestContext:
that situation, otherwise your unittests will leak memory.
"""
def __init__(
self,
app: "Flask",
environ: dict,
request: t.Optional["Request"] = None,
session: t.Optional["SessionMixin"] = None,
) -> None:
def __init__(self, app, environ, request=None, session=None):
self.app = app
if request is None:
request = app.request_class(environ)
@ -316,7 +299,7 @@ class RequestContext:
# other request contexts. Now only if the last level is popped we
# get rid of them. Additionally if an application context is missing
# one is created implicitly so for each level we add this information
self._implicit_app_ctx_stack: t.List[t.Optional["AppContext"]] = []
self._implicit_app_ctx_stack = []
# indicator if the context was preserved. Next time another context
# is pushed the preserved context is popped.
@ -329,17 +312,17 @@ class RequestContext:
# Functions that should be executed after the request on the response
# object. These will be called before the regular "after_request"
# functions.
self._after_request_functions: t.List[AfterRequestCallable] = []
self._after_request_functions = []
@property
def g(self) -> AppContext:
def g(self):
return _app_ctx_stack.top.g
@g.setter
def g(self, value: AppContext) -> None:
def g(self, value):
_app_ctx_stack.top.g = value
def copy(self) -> "RequestContext":
def copy(self):
"""Creates a copy of this request context with the same request object.
This can be used to move a request context to a different greenlet.
Because the actual request object is the same this cannot be used to
@ -359,17 +342,17 @@ class RequestContext:
session=self.session,
)
def match_request(self) -> None:
def match_request(self):
"""Can be overridden by a subclass to hook into the matching
of the request.
"""
try:
result = self.url_adapter.match(return_rule=True) # type: ignore
self.request.url_rule, self.request.view_args = result # type: ignore
result = self.url_adapter.match(return_rule=True)
self.request.url_rule, self.request.view_args = result
except HTTPException as e:
self.request.routing_exception = e
def push(self) -> None:
def push(self):
"""Binds the request context to the current context."""
# If an exception occurs in debug mode or if context preservation is
# activated under exception situations exactly one context stays
@ -393,6 +376,9 @@ class RequestContext:
else:
self._implicit_app_ctx_stack.append(None)
if hasattr(sys, "exc_clear"):
sys.exc_clear()
_request_ctx_stack.push(self)
# Open the session at the moment that the request context is available.
@ -406,12 +392,10 @@ class RequestContext:
if self.session is None:
self.session = session_interface.make_null_session(self.app)
# Match the request URL after loading the session, so that the
# session is available in custom URL converters.
if self.url_adapter is not None:
self.match_request()
def pop(self, exc: t.Optional[BaseException] = _sentinel) -> None: # type: ignore
def pop(self, exc=_sentinel):
"""Pops the request context and unbinds it by doing that. This will
also trigger the execution of functions registered by the
:meth:`~flask.Flask.teardown_request` decorator.
@ -420,9 +404,9 @@ class RequestContext:
Added the `exc` argument.
"""
app_ctx = self._implicit_app_ctx_stack.pop()
clear_request = False
try:
clear_request = False
if not self._implicit_app_ctx_stack:
self.preserved = False
self._preserved_exc = None
@ -430,6 +414,13 @@ class RequestContext:
exc = sys.exc_info()[1]
self.app.do_teardown_request(exc)
# If this interpreter supports clearing the exception information
# we do that now. This will only go into effect on Python 2.x,
# on 3.x it disappears automatically at the end of the exception
# stack.
if hasattr(sys, "exc_clear"):
sys.exc_clear()
request_close = getattr(self.request, "close", None)
if request_close is not None:
request_close()
@ -446,26 +437,25 @@ class RequestContext:
if app_ctx is not None:
app_ctx.pop(exc)
assert (
rv is self
), f"Popped wrong request context. ({rv!r} instead of {self!r})"
assert rv is self, "Popped wrong request context. (%r instead of %r)" % (
rv,
self,
)
def auto_pop(self, exc: t.Optional[BaseException]) -> None:
def auto_pop(self, exc):
if self.request.environ.get("flask._preserve_context") or (
exc is not None and self.app.preserve_context_on_exception
):
self.preserved = True
self._preserved_exc = exc # type: ignore
self._preserved_exc = exc
else:
self.pop(exc)
def __enter__(self) -> "RequestContext":
def __enter__(self):
self.push()
return self
def __exit__(
self, exc_type: type, exc_value: BaseException, tb: TracebackType
) -> None:
def __exit__(self, exc_type, exc_value, tb):
# do not pop the request stack if we are in debug mode and an
# exception happened. This will allow the debugger to still
# access the request object in the interactive shell. Furthermore
@ -473,8 +463,13 @@ class RequestContext:
# See flask.testing for how this works.
self.auto_pop(exc_value)
def __repr__(self) -> str:
return (
f"<{type(self).__name__} {self.request.url!r}"
f" [{self.request.method}] of {self.app.name}>"
if BROKEN_PYPY_CTXMGR_EXIT and exc_type is not None:
reraise(exc_type, exc_value, tb)
def __repr__(self):
return "<%s '%s' [%s] of %s>" % (
self.__class__.__name__,
self.request.url,
self.request.method,
self.app.name,
)

View file

@ -1,7 +1,18 @@
# -*- coding: utf-8 -*-
"""
flask.debughelpers
~~~~~~~~~~~~~~~~~~
Various helpers to make the development experience better.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
import os
import typing as t
from warnings import warn
from ._compat import implements_to_string
from ._compat import text_type
from .app import Flask
from .blueprints import Blueprint
from .globals import _request_ctx_stack
@ -13,6 +24,7 @@ class UnexpectedUnicodeError(AssertionError, UnicodeError):
"""
@implements_to_string
class DebugFilesKeyError(KeyError, AssertionError):
"""Raised from request.files during debugging. The idea is that it can
provide a better error message than just a generic KeyError/BadRequest.
@ -21,18 +33,17 @@ class DebugFilesKeyError(KeyError, AssertionError):
def __init__(self, request, key):
form_matches = request.form.getlist(key)
buf = [
f"You tried to access the file {key!r} in the request.files"
" dictionary but it does not exist. The mimetype for the"
f" request is {request.mimetype!r} instead of"
" 'multipart/form-data' which means that no file contents"
" were transmitted. To fix this error you should provide"
' enctype="multipart/form-data" in your form.'
'You tried to access the file "%s" in the request.files '
"dictionary but it does not exist. The mimetype for the request "
'is "%s" instead of "multipart/form-data" which means that no '
"file contents were transmitted. To fix this error you should "
'provide enctype="multipart/form-data" in your form.'
% (key, request.mimetype)
]
if form_matches:
names = ", ".join(repr(x) for x in form_matches)
buf.append(
"\n\nThe browser instead transmitted some file names. "
f"This was submitted: {names}"
"This was submitted: %s" % ", ".join('"%s"' % x for x in form_matches)
)
self.msg = "".join(buf)
@ -49,24 +60,24 @@ class FormDataRoutingRedirect(AssertionError):
def __init__(self, request):
exc = request.routing_exception
buf = [
f"A request was sent to this URL ({request.url}) but a"
" redirect was issued automatically by the routing system"
f" to {exc.new_url!r}."
"A request was sent to this URL (%s) but a redirect was "
'issued automatically by the routing system to "%s".'
% (request.url, exc.new_url)
]
# In case just a slash was appended we can be extra helpful
if f"{request.base_url}/" == exc.new_url.split("?")[0]:
if request.base_url + "/" == exc.new_url.split("?")[0]:
buf.append(
" The URL was defined with a trailing slash so Flask"
" will automatically redirect to the URL with the"
" trailing slash if it was accessed without one."
" The URL was defined with a trailing slash so "
"Flask will automatically redirect to the URL "
"with the trailing slash if it was accessed "
"without one."
)
buf.append(
" Make sure to directly send your"
f" {request.method}-request to this URL since we can't make"
" browsers or HTTP clients redirect with form data reliably"
" or without user interaction."
" Make sure to directly send your %s-request to this URL "
"since we can't make browsers or HTTP clients redirect "
"with form data reliably or without user interaction." % request.method
)
buf.append("\n\nNote: this exception is only raised in debug mode")
AssertionError.__init__(self, "".join(buf).encode("utf-8"))
@ -93,26 +104,26 @@ def attach_enctype_error_multidict(request):
request.files.__class__ = newcls
def _dump_loader_info(loader) -> t.Generator:
yield f"class: {type(loader).__module__}.{type(loader).__name__}"
def _dump_loader_info(loader):
yield "class: %s.%s" % (type(loader).__module__, type(loader).__name__)
for key, value in sorted(loader.__dict__.items()):
if key.startswith("_"):
continue
if isinstance(value, (tuple, list)):
if not all(isinstance(x, str) for x in value):
if not all(isinstance(x, (str, text_type)) for x in value):
continue
yield f"{key}:"
yield "%s:" % key
for item in value:
yield f" - {item}"
yield " - %s" % item
continue
elif not isinstance(value, (str, int, float, bool)):
elif not isinstance(value, (str, text_type, int, float, bool)):
continue
yield f"{key}: {value!r}"
yield "%s: %r" % (key, value)
def explain_template_loading_attempts(app: Flask, template, attempts) -> None:
def explain_template_loading_attempts(app, template, attempts):
"""This should help developers understand what failed"""
info = [f"Locating template {template!r}:"]
info = ['Locating template "%s":' % template]
total_found = 0
blueprint = None
reqctx = _request_ctx_stack.top
@ -121,23 +132,23 @@ def explain_template_loading_attempts(app: Flask, template, attempts) -> None:
for idx, (loader, srcobj, triple) in enumerate(attempts):
if isinstance(srcobj, Flask):
src_info = f"application {srcobj.import_name!r}"
src_info = 'application "%s"' % srcobj.import_name
elif isinstance(srcobj, Blueprint):
src_info = f"blueprint {srcobj.name!r} ({srcobj.import_name})"
src_info = 'blueprint "%s" (%s)' % (srcobj.name, srcobj.import_name)
else:
src_info = repr(srcobj)
info.append(f"{idx + 1:5}: trying loader of {src_info}")
info.append("% 5d: trying loader of %s" % (idx + 1, src_info))
for line in _dump_loader_info(loader):
info.append(f" {line}")
info.append(" %s" % line)
if triple is None:
detail = "no match"
else:
detail = f"found ({triple[1] or '<string>'!r})"
detail = "found (%r)" % (triple[1] or "<string>")
total_found += 1
info.append(f" -> {detail}")
info.append(" -> %s" % detail)
seems_fishy = False
if total_found == 0:
@ -149,23 +160,24 @@ def explain_template_loading_attempts(app: Flask, template, attempts) -> None:
if blueprint is not None and seems_fishy:
info.append(
" The template was looked up from an endpoint that belongs"
f" to the blueprint {blueprint!r}."
" The template was looked up from an endpoint that "
'belongs to the blueprint "%s".' % blueprint
)
info.append(" Maybe you did not place a template in the right folder?")
info.append(" See https://flask.palletsprojects.com/blueprints/#templates")
info.append(" See http://flask.pocoo.org/docs/blueprints/#templates")
app.logger.info("\n".join(info))
def explain_ignored_app_run() -> None:
def explain_ignored_app_run():
if os.environ.get("WERKZEUG_RUN_MAIN") != "true":
warn(
Warning(
"Silently ignoring app.run() because the application is"
" run from the flask command line executable. Consider"
' putting app.run() behind an if __name__ == "__main__"'
" guard to silence this warning."
"Silently ignoring app.run() because the "
"application is run from the flask command line "
"executable. Consider putting app.run() behind an "
'if __name__ == "__main__" guard to silence this '
"warning."
),
stacklevel=3,
)

View file

@ -1,14 +1,19 @@
import typing as t
# -*- coding: utf-8 -*-
"""
flask.globals
~~~~~~~~~~~~~
Defines all the global objects that are proxies to the current
active context.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
from functools import partial
from werkzeug.local import LocalProxy
from werkzeug.local import LocalStack
if t.TYPE_CHECKING:
from .app import Flask
from .ctx import _AppCtxGlobals
from .sessions import SessionMixin
from .wrappers import Request
_request_ctx_err_msg = """\
Working outside of request context.
@ -51,9 +56,7 @@ def _find_app():
# context locals
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
current_app: "Flask" = LocalProxy(_find_app) # type: ignore
request: "Request" = LocalProxy(partial(_lookup_req_object, "request")) # type: ignore
session: "SessionMixin" = LocalProxy( # type: ignore
partial(_lookup_req_object, "session")
)
g: "_AppCtxGlobals" = LocalProxy(partial(_lookup_app_object, "g")) # type: ignore
current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, "request"))
session = LocalProxy(partial(_lookup_req_object, "session"))
g = LocalProxy(partial(_lookup_app_object, "g"))

File diff suppressed because it is too large Load diff

View file

@ -1,335 +1,357 @@
import io
import json as _json
import typing as t
import uuid
import warnings
from datetime import date
# -*- coding: utf-8 -*-
"""
flask.json
~~~~~~~~~~
from jinja2.utils import htmlsafe_json_dumps as _jinja_htmlsafe_dumps
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
import codecs
import io
import uuid
from datetime import date
from datetime import datetime
from itsdangerous import json as _json
from jinja2 import Markup
from werkzeug.http import http_date
from .._compat import PY2
from .._compat import text_type
from ..globals import current_app
from ..globals import request
if t.TYPE_CHECKING:
from ..app import Flask
from ..wrappers import Response
try:
import dataclasses
except ImportError:
# Python < 3.7
dataclasses = None # type: ignore
dataclasses = None
# Figure out if simplejson escapes slashes. This behavior was changed
# from one version to another without reason.
_slash_escape = "\\/" not in _json.dumps("/")
__all__ = [
"dump",
"dumps",
"load",
"loads",
"htmlsafe_dump",
"htmlsafe_dumps",
"JSONDecoder",
"JSONEncoder",
"jsonify",
]
def _wrap_reader_for_text(fp, encoding):
if isinstance(fp.read(0), bytes):
fp = io.TextIOWrapper(io.BufferedReader(fp), encoding)
return fp
def _wrap_writer_for_text(fp, encoding):
try:
fp.write("")
except TypeError:
fp = io.TextIOWrapper(fp, encoding)
return fp
class JSONEncoder(_json.JSONEncoder):
"""The default JSON encoder. Handles extra types compared to the
built-in :class:`json.JSONEncoder`.
"""The default Flask JSON encoder. This one extends the default
encoder by also supporting ``datetime``, ``UUID``, ``dataclasses``,
and ``Markup`` objects.
- :class:`datetime.datetime` and :class:`datetime.date` are
serialized to :rfc:`822` strings. This is the same as the HTTP
date format.
- :class:`uuid.UUID` is serialized to a string.
- :class:`dataclasses.dataclass` is passed to
:func:`dataclasses.asdict`.
- :class:`~markupsafe.Markup` (or any object with a ``__html__``
method) will call the ``__html__`` method to get a string.
``datetime`` objects are serialized as RFC 822 datetime strings.
This is the same as the HTTP date format.
Assign a subclass of this to :attr:`flask.Flask.json_encoder` or
:attr:`flask.Blueprint.json_encoder` to override the default.
In order to support more data types, override the :meth:`default`
method.
"""
def default(self, o: t.Any) -> t.Any:
"""Convert ``o`` to a JSON serializable type. See
:meth:`json.JSONEncoder.default`. Python does not support
overriding how basic types like ``str`` or ``list`` are
serialized, they are handled before this method.
def default(self, o):
"""Implement this method in a subclass such that it returns a
serializable object for ``o``, or calls the base implementation (to
raise a :exc:`TypeError`).
For example, to support arbitrary iterators, you could implement
default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
return JSONEncoder.default(self, o)
"""
if isinstance(o, datetime):
return http_date(o.utctimetuple())
if isinstance(o, date):
return http_date(o)
return http_date(o.timetuple())
if isinstance(o, uuid.UUID):
return str(o)
if dataclasses and dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
if hasattr(o, "__html__"):
return str(o.__html__())
return super().default(o)
return text_type(o.__html__())
return _json.JSONEncoder.default(self, o)
class JSONDecoder(_json.JSONDecoder):
"""The default JSON decoder.
This does not change any behavior from the built-in
:class:`json.JSONDecoder`.
Assign a subclass of this to :attr:`flask.Flask.json_decoder` or
:attr:`flask.Blueprint.json_decoder` to override the default.
"""The default JSON decoder. This one does not change the behavior from
the default simplejson decoder. Consult the :mod:`json` documentation
for more information. This decoder is not only used for the load
functions of this module but also :attr:`~flask.Request`.
"""
def _dump_arg_defaults(
kwargs: t.Dict[str, t.Any], app: t.Optional["Flask"] = None
) -> None:
def _dump_arg_defaults(kwargs, app=None):
"""Inject default arguments for dump functions."""
if app is None:
app = current_app
if app:
cls = app.json_encoder
bp = app.blueprints.get(request.blueprint) if request else None # type: ignore
if bp is not None and bp.json_encoder is not None:
cls = bp.json_encoder
bp = app.blueprints.get(request.blueprint) if request else None
kwargs.setdefault(
"cls", bp.json_encoder if bp and bp.json_encoder else app.json_encoder
)
if not app.config["JSON_AS_ASCII"]:
kwargs.setdefault("ensure_ascii", False)
kwargs.setdefault("cls", cls)
kwargs.setdefault("ensure_ascii", app.config["JSON_AS_ASCII"])
kwargs.setdefault("sort_keys", app.config["JSON_SORT_KEYS"])
else:
kwargs.setdefault("sort_keys", True)
kwargs.setdefault("cls", JSONEncoder)
def _load_arg_defaults(
kwargs: t.Dict[str, t.Any], app: t.Optional["Flask"] = None
) -> None:
def _load_arg_defaults(kwargs, app=None):
"""Inject default arguments for load functions."""
if app is None:
app = current_app
if app:
cls = app.json_decoder
bp = app.blueprints.get(request.blueprint) if request else None # type: ignore
if bp is not None and bp.json_decoder is not None:
cls = bp.json_decoder
kwargs.setdefault("cls", cls)
bp = app.blueprints.get(request.blueprint) if request else None
kwargs.setdefault(
"cls", bp.json_decoder if bp and bp.json_decoder else app.json_decoder
)
else:
kwargs.setdefault("cls", JSONDecoder)
def dumps(obj: t.Any, app: t.Optional["Flask"] = None, **kwargs: t.Any) -> str:
"""Serialize an object to a string of JSON.
def detect_encoding(data):
"""Detect which UTF codec was used to encode the given bytes.
Takes the same arguments as the built-in :func:`json.dumps`, with
some defaults from application configuration.
The latest JSON standard (:rfc:`8259`) suggests that only UTF-8 is
accepted. Older documents allowed 8, 16, or 32. 16 and 32 can be big
or little endian. Some editors or libraries may prepend a BOM.
:param data: Bytes in unknown UTF encoding.
:return: UTF encoding name
"""
head = data[:4]
if head[:3] == codecs.BOM_UTF8:
return "utf-8-sig"
if b"\x00" not in head:
return "utf-8"
if head in (codecs.BOM_UTF32_BE, codecs.BOM_UTF32_LE):
return "utf-32"
if head[:2] in (codecs.BOM_UTF16_BE, codecs.BOM_UTF16_LE):
return "utf-16"
if len(head) == 4:
if head[:3] == b"\x00\x00\x00":
return "utf-32-be"
if head[::2] == b"\x00\x00":
return "utf-16-be"
if head[1:] == b"\x00\x00\x00":
return "utf-32-le"
if head[1::2] == b"\x00\x00":
return "utf-16-le"
if len(head) == 2:
return "utf-16-be" if head.startswith(b"\x00") else "utf-16-le"
return "utf-8"
def dumps(obj, app=None, **kwargs):
"""Serialize ``obj`` to a JSON-formatted string. If there is an
app context pushed, use the current app's configured encoder
(:attr:`~flask.Flask.json_encoder`), or fall back to the default
:class:`JSONEncoder`.
Takes the same arguments as the built-in :func:`json.dumps`, and
does some extra configuration based on the application. If the
simplejson package is installed, it is preferred.
:param obj: Object to serialize to JSON.
:param app: Use this app's config instead of the active app context
or defaults.
:param app: App instance to use to configure the JSON encoder.
Uses ``current_app`` if not given, and falls back to the default
encoder when not in an app context.
:param kwargs: Extra arguments passed to :func:`json.dumps`.
.. versionchanged:: 2.0
``encoding`` is deprecated and will be removed in Flask 2.1.
.. versionchanged:: 1.0.3
``app`` can be passed directly, rather than requiring an app
context for configuration.
"""
_dump_arg_defaults(kwargs, app=app)
encoding = kwargs.pop("encoding", None)
rv = _json.dumps(obj, **kwargs)
if encoding is not None:
warnings.warn(
"'encoding' is deprecated and will be removed in Flask 2.1.",
DeprecationWarning,
stacklevel=2,
)
if isinstance(rv, str):
return rv.encode(encoding) # type: ignore
if encoding is not None and isinstance(rv, text_type):
rv = rv.encode(encoding)
return rv
def dump(
obj: t.Any, fp: t.IO[str], app: t.Optional["Flask"] = None, **kwargs: t.Any
) -> None:
"""Serialize an object to JSON written to a file object.
Takes the same arguments as the built-in :func:`json.dump`, with
some defaults from application configuration.
:param obj: Object to serialize to JSON.
:param fp: File object to write JSON to.
:param app: Use this app's config instead of the active app context
or defaults.
:param kwargs: Extra arguments passed to :func:`json.dump`.
.. versionchanged:: 2.0
Writing to a binary file, and the ``encoding`` argument, is
deprecated and will be removed in Flask 2.1.
"""
def dump(obj, fp, app=None, **kwargs):
"""Like :func:`dumps` but writes into a file object."""
_dump_arg_defaults(kwargs, app=app)
encoding = kwargs.pop("encoding", None)
show_warning = encoding is not None
try:
fp.write("")
except TypeError:
show_warning = True
fp = io.TextIOWrapper(fp, encoding or "utf-8") # type: ignore
if show_warning:
warnings.warn(
"Writing to a binary file, and the 'encoding' argument, is"
" deprecated and will be removed in Flask 2.1.",
DeprecationWarning,
stacklevel=2,
)
if encoding is not None:
fp = _wrap_writer_for_text(fp, encoding)
_json.dump(obj, fp, **kwargs)
def loads(s: str, app: t.Optional["Flask"] = None, **kwargs: t.Any) -> t.Any:
"""Deserialize an object from a string of JSON.
def loads(s, app=None, **kwargs):
"""Deserialize an object from a JSON-formatted string ``s``. If
there is an app context pushed, use the current app's configured
decoder (:attr:`~flask.Flask.json_decoder`), or fall back to the
default :class:`JSONDecoder`.
Takes the same arguments as the built-in :func:`json.loads`, with
some defaults from application configuration.
Takes the same arguments as the built-in :func:`json.loads`, and
does some extra configuration based on the application. If the
simplejson package is installed, it is preferred.
:param s: JSON string to deserialize.
:param app: Use this app's config instead of the active app context
or defaults.
:param app: App instance to use to configure the JSON decoder.
Uses ``current_app`` if not given, and falls back to the default
encoder when not in an app context.
:param kwargs: Extra arguments passed to :func:`json.loads`.
.. versionchanged:: 2.0
``encoding`` is deprecated and will be removed in Flask 2.1. The
data must be a string or UTF-8 bytes.
.. versionchanged:: 1.0.3
``app`` can be passed directly, rather than requiring an app
context for configuration.
"""
_load_arg_defaults(kwargs, app=app)
encoding = kwargs.pop("encoding", None)
if encoding is not None:
warnings.warn(
"'encoding' is deprecated and will be removed in Flask 2.1."
" The data must be a string or UTF-8 bytes.",
DeprecationWarning,
stacklevel=2,
)
if isinstance(s, bytes):
s = s.decode(encoding)
if isinstance(s, bytes):
encoding = kwargs.pop("encoding", None)
if encoding is None:
encoding = detect_encoding(s)
s = s.decode(encoding)
return _json.loads(s, **kwargs)
def load(fp: t.IO[str], app: t.Optional["Flask"] = None, **kwargs: t.Any) -> t.Any:
"""Deserialize an object from JSON read from a file object.
Takes the same arguments as the built-in :func:`json.load`, with
some defaults from application configuration.
:param fp: File object to read JSON from.
:param app: Use this app's config instead of the active app context
or defaults.
:param kwargs: Extra arguments passed to :func:`json.load`.
.. versionchanged:: 2.0
``encoding`` is deprecated and will be removed in Flask 2.1. The
file must be text mode, or binary mode with UTF-8 bytes.
"""
def load(fp, app=None, **kwargs):
"""Like :func:`loads` but reads from a file object."""
_load_arg_defaults(kwargs, app=app)
encoding = kwargs.pop("encoding", None)
if encoding is not None:
warnings.warn(
"'encoding' is deprecated and will be removed in Flask 2.1."
" The file must be text mode, or binary mode with UTF-8"
" bytes.",
DeprecationWarning,
stacklevel=2,
)
if isinstance(fp.read(0), bytes):
fp = io.TextIOWrapper(fp, encoding) # type: ignore
if not PY2:
fp = _wrap_reader_for_text(fp, kwargs.pop("encoding", None) or "utf-8")
return _json.load(fp, **kwargs)
def htmlsafe_dumps(obj: t.Any, **kwargs: t.Any) -> str:
"""Serialize an object to a string of JSON with :func:`dumps`, then
replace HTML-unsafe characters with Unicode escapes and mark the
result safe with :class:`~markupsafe.Markup`.
def htmlsafe_dumps(obj, **kwargs):
"""Works exactly like :func:`dumps` but is safe for use in ``<script>``
tags. It accepts the same arguments and returns a JSON string. Note that
this is available in templates through the ``|tojson`` filter which will
also mark the result as safe. Due to how this function escapes certain
characters this is safe even if used outside of ``<script>`` tags.
This is available in templates as the ``|tojson`` filter.
The following characters are escaped in strings:
The returned string is safe to render in HTML documents and
``<script>`` tags. The exception is in HTML attributes that are
double quoted; either use single quotes or the ``|forceescape``
filter.
- ``<``
- ``>``
- ``&``
- ``'``
.. versionchanged:: 2.0
Uses :func:`jinja2.utils.htmlsafe_json_dumps`. The returned
value is marked safe by wrapping in :class:`~markupsafe.Markup`.
This makes it safe to embed such strings in any place in HTML with the
notable exception of double quoted attributes. In that case single
quote your attributes or HTML escape it in addition.
.. versionchanged:: 0.10
Single quotes are escaped, making this safe to use in HTML,
``<script>`` tags, and single-quoted attributes without further
escaping.
This function's return value is now always safe for HTML usage, even
if outside of script tags or if used in XHTML. This rule does not
hold true when using this function in HTML attributes that are double
quoted. Always single quote attributes if you use the ``|tojson``
filter. Alternatively use ``|tojson|forceescape``.
"""
return _jinja_htmlsafe_dumps(obj, dumps=dumps, **kwargs)
rv = (
dumps(obj, **kwargs)
.replace(u"<", u"\\u003c")
.replace(u">", u"\\u003e")
.replace(u"&", u"\\u0026")
.replace(u"'", u"\\u0027")
)
if not _slash_escape:
rv = rv.replace("\\/", "/")
return rv
def htmlsafe_dump(obj: t.Any, fp: t.IO[str], **kwargs: t.Any) -> None:
"""Serialize an object to JSON written to a file object, replacing
HTML-unsafe characters with Unicode escapes. See
:func:`htmlsafe_dumps` and :func:`dumps`.
"""
fp.write(htmlsafe_dumps(obj, **kwargs))
def htmlsafe_dump(obj, fp, **kwargs):
"""Like :func:`htmlsafe_dumps` but writes into a file object."""
fp.write(text_type(htmlsafe_dumps(obj, **kwargs)))
def jsonify(*args: t.Any, **kwargs: t.Any) -> "Response":
"""Serialize data to JSON and wrap it in a :class:`~flask.Response`
with the :mimetype:`application/json` mimetype.
def jsonify(*args, **kwargs):
"""This function wraps :func:`dumps` to add a few enhancements that make
life easier. It turns the JSON output into a :class:`~flask.Response`
object with the :mimetype:`application/json` mimetype. For convenience, it
also converts multiple arguments into an array or multiple keyword arguments
into a dict. This means that both ``jsonify(1,2,3)`` and
``jsonify([1,2,3])`` serialize to ``[1,2,3]``.
Uses :func:`dumps` to serialize the data, but ``args`` and
``kwargs`` are treated as data rather than arguments to
:func:`json.dumps`.
For clarity, the JSON serialization behavior has the following differences
from :func:`dumps`:
1. Single argument: Treated as a single value.
2. Multiple arguments: Treated as a list of values.
``jsonify(1, 2, 3)`` is the same as ``jsonify([1, 2, 3])``.
3. Keyword arguments: Treated as a dict of values.
``jsonify(data=data, errors=errors)`` is the same as
``jsonify({"data": data, "errors": errors})``.
4. Passing both arguments and keyword arguments is not allowed as
it's not clear what should happen.
1. Single argument: Passed straight through to :func:`dumps`.
2. Multiple arguments: Converted to an array before being passed to
:func:`dumps`.
3. Multiple keyword arguments: Converted to a dict before being passed to
:func:`dumps`.
4. Both args and kwargs: Behavior undefined and will throw an exception.
.. code-block:: python
Example usage::
from flask import jsonify
@app.route("/users/me")
@app.route('/_get_current_user')
def get_current_user():
return jsonify(
username=g.user.username,
email=g.user.email,
id=g.user.id,
)
return jsonify(username=g.user.username,
email=g.user.email,
id=g.user.id)
Will return a JSON response like this:
.. code-block:: javascript
This will send a JSON response like this to the browser::
{
"username": "admin",
"email": "admin@localhost",
"id": 42
"username": "admin",
"email": "admin@localhost",
"id": 42
}
The default output omits indents and spaces after separators. In
debug mode or if :data:`JSONIFY_PRETTYPRINT_REGULAR` is ``True``,
the output will be formatted to be easier to read.
.. versionchanged:: 0.11
Added support for serializing top-level arrays. This introduces
a security risk in ancient browsers. See :ref:`security-json`.
Added support for serializing top-level arrays. This introduces a
security risk in ancient browsers. See :ref:`json-security` for details.
This function's response will be pretty printed if the
``JSONIFY_PRETTYPRINT_REGULAR`` config parameter is set to True or the
Flask app is running in debug mode. Compressed (not pretty) formatting
currently means no indents and no spaces after separators.
.. versionadded:: 0.2
"""
indent = None
separators = (",", ":")
@ -345,6 +367,10 @@ def jsonify(*args: t.Any, **kwargs: t.Any) -> "Response":
data = args or kwargs
return current_app.response_class(
f"{dumps(data, indent=indent, separators=separators)}\n",
dumps(data, indent=indent, separators=separators) + "\n",
mimetype=current_app.config["JSONIFY_MIMETYPE"],
)
def tojson_filter(obj, **kwargs):
return Markup(htmlsafe_dumps(obj, **kwargs))

Binary file not shown.

Binary file not shown.

View file

@ -1,11 +1,12 @@
# -*- coding: utf-8 -*-
"""
Tagged JSON
~~~~~~~~~~~
A compact representation for lossless serialization of non-standard JSON
types. :class:`~flask.sessions.SecureCookieSessionInterface` uses this
to serialize the session data, but it may be useful in other places. It
can be extended to support other types.
A compact representation for lossless serialization of non-standard JSON types.
:class:`~flask.sessions.SecureCookieSessionInterface` uses this to serialize
the session data, but it may be useful in other places. It can be extended to
support other types.
.. autoclass:: TaggedJSONSerializer
:members:
@ -13,15 +14,12 @@ can be extended to support other types.
.. autoclass:: JSONTag
:members:
Let's see an example that adds support for
:class:`~collections.OrderedDict`. Dicts don't have an order in JSON, so
to handle this we will dump the items as a list of ``[key, value]``
pairs. Subclass :class:`JSONTag` and give it the new key ``' od'`` to
identify the type. The session serializer processes dicts first, so
insert the new tag at the front of the order since ``OrderedDict`` must
be processed before ``dict``.
.. code-block:: python
Let's seen an example that adds support for :class:`~collections.OrderedDict`.
Dicts don't have an order in Python or JSON, so to handle this we will dump
the items as a list of ``[key, value]`` pairs. Subclass :class:`JSONTag` and
give it the new key ``' od'`` to identify the type. The session serializer
processes dicts first, so insert the new tag at the front of the order since
``OrderedDict`` must be processed before ``dict``. ::
from flask.json.tag import JSONTag
@ -39,49 +37,53 @@ be processed before ``dict``.
return OrderedDict(value)
app.session_interface.serializer.register(TagOrderedDict, index=0)
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
import typing as t
from base64 import b64decode
from base64 import b64encode
from datetime import datetime
from uuid import UUID
from markupsafe import Markup
from jinja2 import Markup
from werkzeug.http import http_date
from werkzeug.http import parse_date
from .._compat import iteritems
from .._compat import text_type
from ..json import dumps
from ..json import loads
class JSONTag:
class JSONTag(object):
"""Base class for defining type tags for :class:`TaggedJSONSerializer`."""
__slots__ = ("serializer",)
#: The tag to mark the serialized object with. If ``None``, this tag is
#: only used as an intermediate step during tagging.
key: t.Optional[str] = None
key = None
def __init__(self, serializer: "TaggedJSONSerializer") -> None:
def __init__(self, serializer):
"""Create a tagger for the given serializer."""
self.serializer = serializer
def check(self, value: t.Any) -> bool:
def check(self, value):
"""Check if the given value should be tagged by this tag."""
raise NotImplementedError
def to_json(self, value: t.Any) -> t.Any:
def to_json(self, value):
"""Convert the Python object to an object that is a valid JSON type.
The tag will be added later."""
raise NotImplementedError
def to_python(self, value: t.Any) -> t.Any:
def to_python(self, value):
"""Convert the JSON representation back to the correct type. The tag
will already be removed."""
raise NotImplementedError
def tag(self, value: t.Any) -> t.Any:
def tag(self, value):
"""Convert the value to a valid JSON type and add the tag structure
around it."""
return {self.key: self.to_json(value)}
@ -97,18 +99,18 @@ class TagDict(JSONTag):
__slots__ = ()
key = " di"
def check(self, value: t.Any) -> bool:
def check(self, value):
return (
isinstance(value, dict)
and len(value) == 1
and next(iter(value)) in self.serializer.tags
)
def to_json(self, value: t.Any) -> t.Any:
def to_json(self, value):
key = next(iter(value))
return {f"{key}__": self.serializer.tag(value[key])}
return {key + "__": self.serializer.tag(value[key])}
def to_python(self, value: t.Any) -> t.Any:
def to_python(self, value):
key = next(iter(value))
return {key[:-2]: value[key]}
@ -116,13 +118,13 @@ class TagDict(JSONTag):
class PassDict(JSONTag):
__slots__ = ()
def check(self, value: t.Any) -> bool:
def check(self, value):
return isinstance(value, dict)
def to_json(self, value: t.Any) -> t.Any:
def to_json(self, value):
# JSON objects may only have string keys, so don't bother tagging the
# key here.
return {k: self.serializer.tag(v) for k, v in value.items()}
return dict((k, self.serializer.tag(v)) for k, v in iteritems(value))
tag = to_json
@ -131,23 +133,23 @@ class TagTuple(JSONTag):
__slots__ = ()
key = " t"
def check(self, value: t.Any) -> bool:
def check(self, value):
return isinstance(value, tuple)
def to_json(self, value: t.Any) -> t.Any:
def to_json(self, value):
return [self.serializer.tag(item) for item in value]
def to_python(self, value: t.Any) -> t.Any:
def to_python(self, value):
return tuple(value)
class PassList(JSONTag):
__slots__ = ()
def check(self, value: t.Any) -> bool:
def check(self, value):
return isinstance(value, list)
def to_json(self, value: t.Any) -> t.Any:
def to_json(self, value):
return [self.serializer.tag(item) for item in value]
tag = to_json
@ -157,31 +159,31 @@ class TagBytes(JSONTag):
__slots__ = ()
key = " b"
def check(self, value: t.Any) -> bool:
def check(self, value):
return isinstance(value, bytes)
def to_json(self, value: t.Any) -> t.Any:
def to_json(self, value):
return b64encode(value).decode("ascii")
def to_python(self, value: t.Any) -> t.Any:
def to_python(self, value):
return b64decode(value)
class TagMarkup(JSONTag):
"""Serialize anything matching the :class:`~markupsafe.Markup` API by
"""Serialize anything matching the :class:`~flask.Markup` API by
having a ``__html__`` method to the result of that method. Always
deserializes to an instance of :class:`~markupsafe.Markup`."""
deserializes to an instance of :class:`~flask.Markup`."""
__slots__ = ()
key = " m"
def check(self, value: t.Any) -> bool:
def check(self, value):
return callable(getattr(value, "__html__", None))
def to_json(self, value: t.Any) -> t.Any:
return str(value.__html__())
def to_json(self, value):
return text_type(value.__html__())
def to_python(self, value: t.Any) -> t.Any:
def to_python(self, value):
return Markup(value)
@ -189,13 +191,13 @@ class TagUUID(JSONTag):
__slots__ = ()
key = " u"
def check(self, value: t.Any) -> bool:
def check(self, value):
return isinstance(value, UUID)
def to_json(self, value: t.Any) -> t.Any:
def to_json(self, value):
return value.hex
def to_python(self, value: t.Any) -> t.Any:
def to_python(self, value):
return UUID(value)
@ -203,17 +205,17 @@ class TagDateTime(JSONTag):
__slots__ = ()
key = " d"
def check(self, value: t.Any) -> bool:
def check(self, value):
return isinstance(value, datetime)
def to_json(self, value: t.Any) -> t.Any:
def to_json(self, value):
return http_date(value)
def to_python(self, value: t.Any) -> t.Any:
def to_python(self, value):
return parse_date(value)
class TaggedJSONSerializer:
class TaggedJSONSerializer(object):
"""Serializer that uses a tag system to compactly represent objects that
are not JSON types. Passed as the intermediate serializer to
:class:`itsdangerous.Serializer`.
@ -223,7 +225,7 @@ class TaggedJSONSerializer:
* :class:`dict`
* :class:`tuple`
* :class:`bytes`
* :class:`~markupsafe.Markup`
* :class:`~flask.Markup`
* :class:`~uuid.UUID`
* :class:`~datetime.datetime`
"""
@ -243,19 +245,14 @@ class TaggedJSONSerializer:
TagDateTime,
]
def __init__(self) -> None:
self.tags: t.Dict[str, JSONTag] = {}
self.order: t.List[JSONTag] = []
def __init__(self):
self.tags = {}
self.order = []
for cls in self.default_tags:
self.register(cls)
def register(
self,
tag_class: t.Type[JSONTag],
force: bool = False,
index: t.Optional[int] = None,
) -> None:
def register(self, tag_class, force=False, index=None):
"""Register a new tag with this serializer.
:param tag_class: tag class to register. Will be instantiated with this
@ -274,7 +271,7 @@ class TaggedJSONSerializer:
if key is not None:
if not force and key in self.tags:
raise KeyError(f"Tag '{key}' is already registered.")
raise KeyError("Tag '{0}' is already registered.".format(key))
self.tags[key] = tag
@ -283,7 +280,7 @@ class TaggedJSONSerializer:
else:
self.order.insert(index, tag)
def tag(self, value: t.Any) -> t.Dict[str, t.Any]:
def tag(self, value):
"""Convert a value to a tagged representation if necessary."""
for tag in self.order:
if tag.check(value):
@ -291,7 +288,7 @@ class TaggedJSONSerializer:
return value
def untag(self, value: t.Dict[str, t.Any]) -> t.Any:
def untag(self, value):
"""Convert a tagged representation back to the original type."""
if len(value) != 1:
return value
@ -303,10 +300,10 @@ class TaggedJSONSerializer:
return self.tags[key].to_python(value[key])
def dumps(self, value: t.Any) -> str:
def dumps(self, value):
"""Tag the value and dump it to a compact JSON string."""
return dumps(self.tag(value), separators=(",", ":"))
def loads(self, value: str) -> t.Any:
def loads(self, value):
"""Load data from a JSON string and deserialized any tagged objects."""
return loads(value, object_hook=self.untag)

View file

@ -1,17 +1,24 @@
# -*- coding: utf-8 -*-
"""
flask.logging
~~~~~~~~~~~~~
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
from __future__ import absolute_import
import logging
import sys
import typing as t
import warnings
from werkzeug.local import LocalProxy
from .globals import request
if t.TYPE_CHECKING:
from .app import Flask
@LocalProxy
def wsgi_errors_stream() -> t.TextIO:
def wsgi_errors_stream():
"""Find the most appropriate error stream for the application. If a request
is active, log to ``wsgi.errors``, otherwise use ``sys.stderr``.
@ -23,7 +30,7 @@ def wsgi_errors_stream() -> t.TextIO:
return request.environ["wsgi.errors"] if request else sys.stderr
def has_level_handler(logger: logging.Logger) -> bool:
def has_level_handler(logger):
"""Check if there is a handler in the logging chain that will handle the
given logger's :meth:`effective level <~logging.Logger.getEffectiveLevel>`.
"""
@ -37,21 +44,35 @@ def has_level_handler(logger: logging.Logger) -> bool:
if not current.propagate:
break
current = current.parent # type: ignore
current = current.parent
return False
#: Log messages to :func:`~flask.logging.wsgi_errors_stream` with the format
#: ``[%(asctime)s] %(levelname)s in %(module)s: %(message)s``.
default_handler = logging.StreamHandler(wsgi_errors_stream) # type: ignore
default_handler = logging.StreamHandler(wsgi_errors_stream)
default_handler.setFormatter(
logging.Formatter("[%(asctime)s] %(levelname)s in %(module)s: %(message)s")
)
def create_logger(app: "Flask") -> logging.Logger:
"""Get the Flask app's logger and configure it if needed.
def _has_config(logger):
"""Decide if a logger has direct configuration applied by checking
its properties against the defaults.
:param logger: The :class:`~logging.Logger` to inspect.
"""
return (
logger.level != logging.NOTSET
or logger.handlers
or logger.filters
or not logger.propagate
)
def create_logger(app):
"""Get the the Flask apps's logger and configure it if needed.
The logger name will be the same as
:attr:`app.import_name <flask.Flask.name>`.
@ -65,6 +86,20 @@ def create_logger(app: "Flask") -> logging.Logger:
"""
logger = logging.getLogger(app.name)
# 1.1.0 changes name of logger, warn if config is detected for old
# name and not new name
for old_name in ("flask.app", "flask"):
old_logger = logging.getLogger(old_name)
if _has_config(old_logger) and not _has_config(logger):
warnings.warn(
"'app.logger' is named '{name}' for this application,"
" but configuration was found for '{old_name}', which"
" no longer has an effect. The logging configuration"
" should be moved to '{name}'.".format(name=app.name, old_name=old_name)
)
break
if app.debug and not logger.level:
logger.setLevel(logging.DEBUG)

View file

@ -1,32 +1,37 @@
# -*- coding: utf-8 -*-
"""
flask.sessions
~~~~~~~~~~~~~~
Implements cookie based sessions based on itsdangerous.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
import hashlib
import typing as t
import warnings
from collections.abc import MutableMapping
from datetime import datetime
from itsdangerous import BadSignature
from itsdangerous import URLSafeTimedSerializer
from werkzeug.datastructures import CallbackDict
from ._compat import collections_abc
from .helpers import is_ip
from .helpers import total_seconds
from .json.tag import TaggedJSONSerializer
if t.TYPE_CHECKING:
import typing_extensions as te
from .app import Flask
from .wrappers import Request, Response
class SessionMixin(MutableMapping):
class SessionMixin(collections_abc.MutableMapping):
"""Expands a basic dictionary with session attributes."""
@property
def permanent(self) -> bool:
def permanent(self):
"""This reflects the ``'_permanent'`` key in the dict."""
return self.get("_permanent", False)
@permanent.setter
def permanent(self, value: bool) -> None:
def permanent(self, value):
self["_permanent"] = bool(value)
#: Some implementations can detect whether a session is newly
@ -67,24 +72,24 @@ class SecureCookieSession(CallbackDict, SessionMixin):
#: different users.
accessed = False
def __init__(self, initial: t.Any = None) -> None:
def on_update(self) -> None:
def __init__(self, initial=None):
def on_update(self):
self.modified = True
self.accessed = True
super().__init__(initial, on_update)
super(SecureCookieSession, self).__init__(initial, on_update)
def __getitem__(self, key: str) -> t.Any:
def __getitem__(self, key):
self.accessed = True
return super().__getitem__(key)
return super(SecureCookieSession, self).__getitem__(key)
def get(self, key: str, default: t.Any = None) -> t.Any:
def get(self, key, default=None):
self.accessed = True
return super().get(key, default)
return super(SecureCookieSession, self).get(key, default)
def setdefault(self, key: str, default: t.Any = None) -> t.Any:
def setdefault(self, key, default=None):
self.accessed = True
return super().setdefault(key, default)
return super(SecureCookieSession, self).setdefault(key, default)
class NullSession(SecureCookieSession):
@ -93,18 +98,18 @@ class NullSession(SecureCookieSession):
but fail on setting.
"""
def _fail(self, *args: t.Any, **kwargs: t.Any) -> "te.NoReturn":
def _fail(self, *args, **kwargs):
raise RuntimeError(
"The session is unavailable because no secret "
"key was set. Set the secret_key on the "
"application to something unique and secret."
)
__setitem__ = __delitem__ = clear = pop = popitem = update = setdefault = _fail # type: ignore # noqa: B950
__setitem__ = __delitem__ = clear = pop = popitem = update = setdefault = _fail
del _fail
class SessionInterface:
class SessionInterface(object):
"""The basic interface you have to implement in order to replace the
default session interface which uses werkzeug's securecookie
implementation. The only methods you have to implement are
@ -147,7 +152,7 @@ class SessionInterface:
#: .. versionadded:: 0.10
pickle_based = False
def make_null_session(self, app: "Flask") -> NullSession:
def make_null_session(self, app):
"""Creates a null session which acts as a replacement object if the
real session support could not be loaded due to a configuration
error. This mainly aids the user experience because the job of the
@ -159,7 +164,7 @@ class SessionInterface:
"""
return self.null_session_class()
def is_null_session(self, obj: object) -> bool:
def is_null_session(self, obj):
"""Checks if a given object is a null session. Null sessions are
not asked to be saved.
@ -168,14 +173,7 @@ class SessionInterface:
"""
return isinstance(obj, self.null_session_class)
def get_cookie_name(self, app: "Flask") -> str:
"""Returns the name of the session cookie.
Uses ``app.session_cookie_name`` which is set to ``SESSION_COOKIE_NAME``
"""
return app.session_cookie_name
def get_cookie_domain(self, app: "Flask") -> t.Optional[str]:
def get_cookie_domain(self, app):
"""Returns the domain that should be set for the session cookie.
Uses ``SESSION_COOKIE_DOMAIN`` if it is configured, otherwise
@ -204,13 +202,13 @@ class SessionInterface:
rv = rv.rsplit(":", 1)[0].lstrip(".")
if "." not in rv:
# Chrome doesn't allow names without a '.'. This should only
# come up with localhost. Hack around this by not setting
# the name, and show a warning.
# Chrome doesn't allow names without a '.'
# this should only come up with localhost
# hack around this by not setting the name, and show a warning
warnings.warn(
f"{rv!r} is not a valid cookie domain, it must contain"
" a '.'. Add an entry to your hosts file, for example"
f" '{rv}.localdomain', and use that instead."
'"{rv}" is not a valid cookie domain, it must contain a ".".'
" Add an entry to your hosts file, for example"
' "{rv}.localdomain", and use that instead.'.format(rv=rv)
)
app.config["SESSION_COOKIE_DOMAIN"] = False
return None
@ -228,12 +226,12 @@ class SessionInterface:
# if this is not an ip and app is mounted at the root, allow subdomain
# matching by adding a '.' prefix
if self.get_cookie_path(app) == "/" and not ip:
rv = f".{rv}"
rv = "." + rv
app.config["SESSION_COOKIE_DOMAIN"] = rv
return rv
def get_cookie_path(self, app: "Flask") -> str:
def get_cookie_path(self, app):
"""Returns the path for which the cookie should be valid. The
default implementation uses the value from the ``SESSION_COOKIE_PATH``
config var if it's set, and falls back to ``APPLICATION_ROOT`` or
@ -241,29 +239,27 @@ class SessionInterface:
"""
return app.config["SESSION_COOKIE_PATH"] or app.config["APPLICATION_ROOT"]
def get_cookie_httponly(self, app: "Flask") -> bool:
def get_cookie_httponly(self, app):
"""Returns True if the session cookie should be httponly. This
currently just returns the value of the ``SESSION_COOKIE_HTTPONLY``
config var.
"""
return app.config["SESSION_COOKIE_HTTPONLY"]
def get_cookie_secure(self, app: "Flask") -> bool:
def get_cookie_secure(self, app):
"""Returns True if the cookie should be secure. This currently
just returns the value of the ``SESSION_COOKIE_SECURE`` setting.
"""
return app.config["SESSION_COOKIE_SECURE"]
def get_cookie_samesite(self, app: "Flask") -> str:
def get_cookie_samesite(self, app):
"""Return ``'Strict'`` or ``'Lax'`` if the cookie should use the
``SameSite`` attribute. This currently just returns the value of
the :data:`SESSION_COOKIE_SAMESITE` setting.
"""
return app.config["SESSION_COOKIE_SAMESITE"]
def get_expiration_time(
self, app: "Flask", session: SessionMixin
) -> t.Optional[datetime]:
def get_expiration_time(self, app, session):
"""A helper method that returns an expiration date for the session
or ``None`` if the session is linked to the browser session. The
default implementation returns now + the permanent session
@ -271,9 +267,8 @@ class SessionInterface:
"""
if session.permanent:
return datetime.utcnow() + app.permanent_session_lifetime
return None
def should_set_cookie(self, app: "Flask", session: SessionMixin) -> bool:
def should_set_cookie(self, app, session):
"""Used by session backends to determine if a ``Set-Cookie`` header
should be set for this session cookie for this response. If the session
has been modified, the cookie is set. If the session is permanent and
@ -289,9 +284,7 @@ class SessionInterface:
session.permanent and app.config["SESSION_REFRESH_EACH_REQUEST"]
)
def open_session(
self, app: "Flask", request: "Request"
) -> t.Optional[SessionMixin]:
def open_session(self, app, request):
"""This method has to be implemented and must either return ``None``
in case the loading failed because of a configuration error or an
instance of a session object which implements a dictionary like
@ -299,9 +292,7 @@ class SessionInterface:
"""
raise NotImplementedError()
def save_session(
self, app: "Flask", session: SessionMixin, response: "Response"
) -> None:
def save_session(self, app, session, response):
"""This is called for actual sessions returned by :meth:`open_session`
at the end of the request. This is still called during a request
context so if you absolutely need access to the request you can do
@ -332,9 +323,7 @@ class SecureCookieSessionInterface(SessionInterface):
serializer = session_json_serializer
session_class = SecureCookieSession
def get_signing_serializer(
self, app: "Flask"
) -> t.Optional[URLSafeTimedSerializer]:
def get_signing_serializer(self, app):
if not app.secret_key:
return None
signer_kwargs = dict(
@ -347,37 +336,30 @@ class SecureCookieSessionInterface(SessionInterface):
signer_kwargs=signer_kwargs,
)
def open_session(
self, app: "Flask", request: "Request"
) -> t.Optional[SecureCookieSession]:
def open_session(self, app, request):
s = self.get_signing_serializer(app)
if s is None:
return None
val = request.cookies.get(self.get_cookie_name(app))
val = request.cookies.get(app.session_cookie_name)
if not val:
return self.session_class()
max_age = int(app.permanent_session_lifetime.total_seconds())
max_age = total_seconds(app.permanent_session_lifetime)
try:
data = s.loads(val, max_age=max_age)
return self.session_class(data)
except BadSignature:
return self.session_class()
def save_session(
self, app: "Flask", session: SessionMixin, response: "Response"
) -> None:
name = self.get_cookie_name(app)
def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
secure = self.get_cookie_secure(app)
samesite = self.get_cookie_samesite(app)
# If the session is modified to be empty, remove the cookie.
# If the session is empty, return without setting the cookie.
if not session:
if session.modified:
response.delete_cookie(
name, domain=domain, path=path, secure=secure, samesite=samesite
app.session_cookie_name, domain=domain, path=path
)
return
@ -390,11 +372,13 @@ class SecureCookieSessionInterface(SessionInterface):
return
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
samesite = self.get_cookie_samesite(app)
expires = self.get_expiration_time(app, session)
val = self.get_signing_serializer(app).dumps(dict(session)) # type: ignore
val = self.get_signing_serializer(app).dumps(dict(session))
response.set_cookie(
name,
val, # type: ignore
app.session_cookie_name,
val,
expires=expires,
httponly=httponly,
domain=domain,

View file

@ -1,5 +1,14 @@
import typing as t
# -*- coding: utf-8 -*-
"""
flask.signals
~~~~~~~~~~~~~
Implements signals based on blinker if available, otherwise
falls silently back to a noop.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
try:
from blinker import Namespace
@ -7,25 +16,25 @@ try:
except ImportError:
signals_available = False
class Namespace: # type: ignore
def signal(self, name: str, doc: t.Optional[str] = None) -> "_FakeSignal":
class Namespace(object):
def signal(self, name, doc=None):
return _FakeSignal(name, doc)
class _FakeSignal:
class _FakeSignal(object):
"""If blinker is unavailable, create a fake class with the same
interface that allows sending of signals but will fail with an
error on anything else. Instead of doing anything on send, it
will just ignore the arguments and do nothing instead.
"""
def __init__(self, name: str, doc: t.Optional[str] = None) -> None:
def __init__(self, name, doc=None):
self.name = name
self.__doc__ = doc
def send(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
def send(self, *args, **kwargs):
pass
def _fail(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
def _fail(self, *args, **kwargs):
raise RuntimeError(
"Signalling support is unavailable because the blinker"
" library is not installed."

View file

@ -1,8 +1,15 @@
import typing as t
# -*- coding: utf-8 -*-
"""
flask.templating
~~~~~~~~~~~~~~~~
Implements the bridge to Jinja2.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
from jinja2 import BaseLoader
from jinja2 import Environment as BaseEnvironment
from jinja2 import Template
from jinja2 import TemplateNotFound
from .globals import _app_ctx_stack
@ -10,12 +17,8 @@ from .globals import _request_ctx_stack
from .signals import before_render_template
from .signals import template_rendered
if t.TYPE_CHECKING:
from .app import Flask
from .scaffold import Scaffold
def _default_template_ctx_processor() -> t.Dict[str, t.Any]:
def _default_template_ctx_processor():
"""Default template context processor. Injects `request`,
`session` and `g`.
"""
@ -36,7 +39,7 @@ class Environment(BaseEnvironment):
name of the blueprint to referenced templates if necessary.
"""
def __init__(self, app: "Flask", **options: t.Any) -> None:
def __init__(self, app, **options):
if "loader" not in options:
options["loader"] = app.create_global_jinja_loader()
BaseEnvironment.__init__(self, **options)
@ -48,24 +51,17 @@ class DispatchingJinjaLoader(BaseLoader):
the blueprint folders.
"""
def __init__(self, app: "Flask") -> None:
def __init__(self, app):
self.app = app
def get_source( # type: ignore
self, environment: Environment, template: str
) -> t.Tuple[str, t.Optional[str], t.Optional[t.Callable]]:
def get_source(self, environment, template):
if self.app.config["EXPLAIN_TEMPLATE_LOADING"]:
return self._get_source_explained(environment, template)
return self._get_source_fast(environment, template)
def _get_source_explained(
self, environment: Environment, template: str
) -> t.Tuple[str, t.Optional[str], t.Optional[t.Callable]]:
def _get_source_explained(self, environment, template):
attempts = []
rv: t.Optional[t.Tuple[str, t.Optional[str], t.Optional[t.Callable[[], bool]]]]
trv: t.Optional[
t.Tuple[str, t.Optional[str], t.Optional[t.Callable[[], bool]]]
] = None
trv = None
for srcobj, loader in self._iter_loaders(template):
try:
@ -84,9 +80,7 @@ class DispatchingJinjaLoader(BaseLoader):
return trv
raise TemplateNotFound(template)
def _get_source_fast(
self, environment: Environment, template: str
) -> t.Tuple[str, t.Optional[str], t.Optional[t.Callable]]:
def _get_source_fast(self, environment, template):
for _srcobj, loader in self._iter_loaders(template):
try:
return loader.get_source(environment, template)
@ -94,9 +88,7 @@ class DispatchingJinjaLoader(BaseLoader):
continue
raise TemplateNotFound(template)
def _iter_loaders(
self, template: str
) -> t.Generator[t.Tuple["Scaffold", BaseLoader], None, None]:
def _iter_loaders(self, template):
loader = self.app.jinja_loader
if loader is not None:
yield self.app, loader
@ -106,7 +98,7 @@ class DispatchingJinjaLoader(BaseLoader):
if loader is not None:
yield blueprint, loader
def list_templates(self) -> t.List[str]:
def list_templates(self):
result = set()
loader = self.app.jinja_loader
if loader is not None:
@ -121,7 +113,7 @@ class DispatchingJinjaLoader(BaseLoader):
return list(result)
def _render(template: Template, context: dict, app: "Flask") -> str:
def _render(template, context, app):
"""Renders the template and fires the signal"""
before_render_template.send(app, template=template, context=context)
@ -130,9 +122,7 @@ def _render(template: Template, context: dict, app: "Flask") -> str:
return rv
def render_template(
template_name_or_list: t.Union[str, t.List[str]], **context: t.Any
) -> str:
def render_template(template_name_or_list, **context):
"""Renders a template from the template folder with the given
context.
@ -151,7 +141,7 @@ def render_template(
)
def render_template_string(source: str, **context: t.Any) -> str:
def render_template_string(source, **context):
"""Renders a template from the given template source string
with the given context. Template variables will be autoescaped.

View file

@ -1,22 +1,25 @@
import typing as t
# -*- coding: utf-8 -*-
"""
flask.testing
~~~~~~~~~~~~~
Implements test support helpers. This module is lazily imported
and usually not used in production environments.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
import warnings
from contextlib import contextmanager
from copy import copy
from types import TracebackType
import werkzeug.test
from click.testing import CliRunner
from werkzeug.test import Client
from werkzeug.urls import url_parse
from werkzeug.wrappers import Request as BaseRequest
from . import _request_ctx_stack
from .cli import ScriptInfo
from .json import dumps as json_dumps
from .sessions import SessionMixin
if t.TYPE_CHECKING:
from .app import Flask
from .wrappers import Response
class EnvironBuilder(werkzeug.test.EnvironBuilder):
@ -43,14 +46,14 @@ class EnvironBuilder(werkzeug.test.EnvironBuilder):
def __init__(
self,
app: "Flask",
path: str = "/",
base_url: t.Optional[str] = None,
subdomain: t.Optional[str] = None,
url_scheme: t.Optional[str] = None,
*args: t.Any,
**kwargs: t.Any,
) -> None:
app,
path="/",
base_url=None,
subdomain=None,
url_scheme=None,
*args,
**kwargs
):
assert not (base_url or subdomain or url_scheme) or (
base_url is not None
) != bool(
@ -62,15 +65,16 @@ class EnvironBuilder(werkzeug.test.EnvironBuilder):
app_root = app.config["APPLICATION_ROOT"]
if subdomain:
http_host = f"{subdomain}.{http_host}"
http_host = "{0}.{1}".format(subdomain, http_host)
if url_scheme is None:
url_scheme = app.config["PREFERRED_URL_SCHEME"]
url = url_parse(path)
base_url = (
f"{url.scheme or url_scheme}://{url.netloc or http_host}"
f"/{app_root.lstrip('/')}"
base_url = "{scheme}://{netloc}/{path}".format(
scheme=url.scheme or url_scheme,
netloc=url.netloc or http_host,
path=app_root.lstrip("/"),
)
path = url.path
@ -79,9 +83,9 @@ class EnvironBuilder(werkzeug.test.EnvironBuilder):
path += sep + url.query
self.app = app
super().__init__(path, base_url, *args, **kwargs)
super(EnvironBuilder, self).__init__(path, base_url, *args, **kwargs)
def json_dumps(self, obj: t.Any, **kwargs: t.Any) -> str: # type: ignore
def json_dumps(self, obj, **kwargs):
"""Serialize ``obj`` to a JSON-formatted string.
The serialization will be configured according to the config associated
@ -91,6 +95,23 @@ class EnvironBuilder(werkzeug.test.EnvironBuilder):
return json_dumps(obj, **kwargs)
def make_test_environ_builder(*args, **kwargs):
"""Create a :class:`flask.testing.EnvironBuilder`.
.. deprecated: 1.1
Will be removed in 2.0. Construct
``flask.testing.EnvironBuilder`` directly instead.
"""
warnings.warn(
DeprecationWarning(
'"make_test_environ_builder()" is deprecated and will be'
' removed in 2.0. Construct "flask.testing.EnvironBuilder"'
" directly instead."
)
)
return EnvironBuilder(*args, **kwargs)
class FlaskClient(Client):
"""Works like a regular Werkzeug test client but has some knowledge about
how Flask works to defer the cleanup of the request context stack to the
@ -103,23 +124,20 @@ class FlaskClient(Client):
set after instantiation of the `app.test_client()` object in
`client.environ_base`.
Basic usage is outlined in the :doc:`/testing` chapter.
Basic usage is outlined in the :ref:`testing` chapter.
"""
application: "Flask"
preserve_context = False
def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
super().__init__(*args, **kwargs)
def __init__(self, *args, **kwargs):
super(FlaskClient, self).__init__(*args, **kwargs)
self.environ_base = {
"REMOTE_ADDR": "127.0.0.1",
"HTTP_USER_AGENT": f"werkzeug/{werkzeug.__version__}",
"HTTP_USER_AGENT": "werkzeug/" + werkzeug.__version__,
}
@contextmanager
def session_transaction(
self, *args: t.Any, **kwargs: t.Any
) -> t.Generator[SessionMixin, None, None]:
def session_transaction(self, *args, **kwargs):
"""When used in combination with a ``with`` statement this opens a
session transaction. This can be used to modify the session that
the test client uses. Once the ``with`` block is left the session is
@ -171,64 +189,51 @@ class FlaskClient(Client):
headers = resp.get_wsgi_headers(c.request.environ)
self.cookie_jar.extract_wsgi(c.request.environ, headers)
def open( # type: ignore
self,
*args: t.Any,
as_tuple: bool = False,
buffered: bool = False,
follow_redirects: bool = False,
**kwargs: t.Any,
) -> "Response":
# Same logic as super.open, but apply environ_base and preserve_context.
request = None
def open(self, *args, **kwargs):
as_tuple = kwargs.pop("as_tuple", False)
buffered = kwargs.pop("buffered", False)
follow_redirects = kwargs.pop("follow_redirects", False)
def copy_environ(other):
return {
**self.environ_base,
**other,
"flask._preserve_context": self.preserve_context,
}
if (
not kwargs
and len(args) == 1
and isinstance(args[0], (werkzeug.test.EnvironBuilder, dict))
):
environ = self.environ_base.copy()
if not kwargs and len(args) == 1:
arg = args[0]
if isinstance(args[0], werkzeug.test.EnvironBuilder):
environ.update(args[0].get_environ())
else:
environ.update(args[0])
if isinstance(arg, werkzeug.test.EnvironBuilder):
builder = copy(arg)
builder.environ_base = copy_environ(builder.environ_base or {})
request = builder.get_request()
elif isinstance(arg, dict):
request = EnvironBuilder.from_environ(
arg, app=self.application, environ_base=copy_environ({})
).get_request()
elif isinstance(arg, BaseRequest):
request = copy(arg)
request.environ = copy_environ(request.environ)
if request is None:
kwargs["environ_base"] = copy_environ(kwargs.get("environ_base", {}))
environ["flask._preserve_context"] = self.preserve_context
else:
kwargs.setdefault("environ_overrides", {})[
"flask._preserve_context"
] = self.preserve_context
kwargs.setdefault("environ_base", self.environ_base)
builder = EnvironBuilder(self.application, *args, **kwargs)
try:
request = builder.get_request()
environ = builder.get_environ()
finally:
builder.close()
return super().open( # type: ignore
request,
return Client.open(
self,
environ,
as_tuple=as_tuple,
buffered=buffered,
follow_redirects=follow_redirects,
)
def __enter__(self) -> "FlaskClient":
def __enter__(self):
if self.preserve_context:
raise RuntimeError("Cannot nest client invocations")
self.preserve_context = True
return self
def __exit__(
self, exc_type: type, exc_value: BaseException, tb: TracebackType
) -> None:
def __exit__(self, exc_type, exc_value, tb):
self.preserve_context = False
# Normally the request context is preserved until the next
@ -250,13 +255,11 @@ class FlaskCliRunner(CliRunner):
:meth:`~flask.Flask.test_cli_runner`. See :ref:`testing-cli`.
"""
def __init__(self, app: "Flask", **kwargs: t.Any) -> None:
def __init__(self, app, **kwargs):
self.app = app
super().__init__(**kwargs)
super(FlaskCliRunner, self).__init__(**kwargs)
def invoke( # type: ignore
self, cli: t.Any = None, args: t.Any = None, **kwargs: t.Any
) -> t.Any:
def invoke(self, cli=None, args=None, **kwargs):
"""Invokes a CLI command in an isolated environment. See
:meth:`CliRunner.invoke <click.testing.CliRunner.invoke>` for
full method documentation. See :ref:`testing-cli` for examples.
@ -277,4 +280,4 @@ class FlaskCliRunner(CliRunner):
if "obj" not in kwargs:
kwargs["obj"] = ScriptInfo(create_app=lambda: self.app)
return super().invoke(cli, args, **kwargs)
return super(FlaskCliRunner, self).invoke(cli, args, **kwargs)

View file

@ -1,7 +1,15 @@
import typing as t
# -*- coding: utf-8 -*-
"""
flask.views
~~~~~~~~~~~
This module provides class-based views inspired by the ones in Django.
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
from ._compat import with_metaclass
from .globals import request
from .typing import ResponseReturnValue
http_method_funcs = frozenset(
@ -9,7 +17,7 @@ http_method_funcs = frozenset(
)
class View:
class View(object):
"""Alternative way to use view functions. A subclass has to implement
:meth:`dispatch_request` which is called with the view arguments from
the URL routing system. If :attr:`methods` is provided the methods
@ -20,7 +28,7 @@ class View:
methods = ['GET']
def dispatch_request(self, name):
return f"Hello {name}!"
return 'Hello %s!' % name
app.add_url_rule('/hello/<name>', view_func=MyView.as_view('myview'))
@ -42,10 +50,10 @@ class View:
"""
#: A list of methods this view can handle.
methods: t.Optional[t.List[str]] = None
methods = None
#: Setting this disables or force-enables the automatic options handling.
provide_automatic_options: t.Optional[bool] = None
provide_automatic_options = None
#: The canonical way to decorate class-based views is to decorate the
#: return value of as_view(). However since this moves parts of the
@ -56,9 +64,9 @@ class View:
#: view function is created the result is automatically decorated.
#:
#: .. versionadded:: 0.8
decorators: t.List[t.Callable] = []
decorators = ()
def dispatch_request(self) -> ResponseReturnValue:
def dispatch_request(self):
"""Subclasses have to override this method to implement the
actual view function code. This method is called with all
the arguments from the URL rule.
@ -66,9 +74,7 @@ class View:
raise NotImplementedError()
@classmethod
def as_view(
cls, name: str, *class_args: t.Any, **class_kwargs: t.Any
) -> t.Callable:
def as_view(cls, name, *class_args, **class_kwargs):
"""Converts the class into an actual view function that can be used
with the routing system. Internally this generates a function on the
fly which will instantiate the :class:`View` on each request and call
@ -78,8 +84,8 @@ class View:
constructor of the class.
"""
def view(*args: t.Any, **kwargs: t.Any) -> ResponseReturnValue:
self = view.view_class(*class_args, **class_kwargs) # type: ignore
def view(*args, **kwargs):
self = view.view_class(*class_args, **class_kwargs)
return self.dispatch_request(*args, **kwargs)
if cls.decorators:
@ -93,12 +99,12 @@ class View:
# view this thing came from, secondly it's also used for instantiating
# the view class so you can actually replace it with something else
# for testing purposes and debugging.
view.view_class = cls # type: ignore
view.view_class = cls
view.__name__ = name
view.__doc__ = cls.__doc__
view.__module__ = cls.__module__
view.methods = cls.methods # type: ignore
view.provide_automatic_options = cls.provide_automatic_options # type: ignore
view.methods = cls.methods
view.provide_automatic_options = cls.provide_automatic_options
return view
@ -108,7 +114,7 @@ class MethodViewType(type):
"""
def __init__(cls, name, bases, d):
super().__init__(name, bases, d)
super(MethodViewType, cls).__init__(name, bases, d)
if "methods" not in d:
methods = set()
@ -129,7 +135,7 @@ class MethodViewType(type):
cls.methods = methods
class MethodView(View, metaclass=MethodViewType):
class MethodView(with_metaclass(MethodViewType, View)):
"""A class-based view that dispatches request methods to the corresponding
class methods. For example, if you implement a ``get`` method, it will be
used to handle ``GET`` requests. ::
@ -145,7 +151,7 @@ class MethodView(View, metaclass=MethodViewType):
app.add_url_rule('/counter', view_func=CounterAPI.as_view('counter'))
"""
def dispatch_request(self, *args: t.Any, **kwargs: t.Any) -> ResponseReturnValue:
def dispatch_request(self, *args, **kwargs):
meth = getattr(self, request.method.lower(), None)
# If the request method is HEAD and we don't have a handler for it
@ -153,5 +159,5 @@ class MethodView(View, metaclass=MethodViewType):
if meth is None and request.method == "HEAD":
meth = getattr(self, "get", None)
assert meth is not None, f"Unimplemented method {request.method!r}"
assert meth is not None, "Unimplemented method %r" % request.method
return meth(*args, **kwargs)

View file

@ -1,19 +1,33 @@
import typing as t
# -*- coding: utf-8 -*-
"""
flask.wrappers
~~~~~~~~~~~~~~
Implements the WSGI wrappers (request and response).
:copyright: 2010 Pallets
:license: BSD-3-Clause
"""
from werkzeug.exceptions import BadRequest
from werkzeug.wrappers import Request as RequestBase
from werkzeug.wrappers import Response as ResponseBase
from werkzeug.wrappers.json import JSONMixin as _JSONMixin
from . import json
from .globals import current_app
from .helpers import _split_blueprint_path
if t.TYPE_CHECKING:
import typing_extensions as te
from werkzeug.routing import Rule
class Request(RequestBase):
class JSONMixin(_JSONMixin):
json_module = json
def on_json_loading_failed(self, e):
if current_app and current_app.debug:
raise BadRequest("Failed to decode JSON object: {0}".format(e))
raise BadRequest()
class Request(RequestBase, JSONMixin):
"""The request object used by default in Flask. Remembers the
matched endpoint and view arguments.
@ -26,8 +40,6 @@ class Request(RequestBase):
specific ones.
"""
json_module = json
#: The internal URL rule that matched the request. This can be
#: useful to inspect which methods are allowed for the URL from
#: a before/after handler (``request.url_rule.methods``) etc.
@ -38,78 +50,41 @@ class Request(RequestBase):
#: because the request was never internally bound.
#:
#: .. versionadded:: 0.6
url_rule: t.Optional["Rule"] = None
url_rule = None
#: A dict of view arguments that matched the request. If an exception
#: happened when matching, this will be ``None``.
view_args: t.Optional[t.Dict[str, t.Any]] = None
view_args = None
#: If matching the URL failed, this is the exception that will be
#: raised / was raised as part of the request handling. This is
#: usually a :exc:`~werkzeug.exceptions.NotFound` exception or
#: something similar.
routing_exception: t.Optional[Exception] = None
routing_exception = None
@property
def max_content_length(self) -> t.Optional[int]: # type: ignore
def max_content_length(self):
"""Read-only view of the ``MAX_CONTENT_LENGTH`` config key."""
if current_app:
return current_app.config["MAX_CONTENT_LENGTH"]
else:
return None
@property
def endpoint(self) -> t.Optional[str]:
"""The endpoint that matched the request URL.
This will be ``None`` if matching failed or has not been
performed yet.
This in combination with :attr:`view_args` can be used to
reconstruct the same URL or a modified URL.
def endpoint(self):
"""The endpoint that matched the request. This in combination with
:attr:`view_args` can be used to reconstruct the same or a
modified URL. If an exception happened when matching, this will
be ``None``.
"""
if self.url_rule is not None:
return self.url_rule.endpoint
return None
@property
def blueprint(self) -> t.Optional[str]:
"""The registered name of the current blueprint.
def blueprint(self):
"""The name of the current blueprint"""
if self.url_rule and "." in self.url_rule.endpoint:
return self.url_rule.endpoint.rsplit(".", 1)[0]
This will be ``None`` if the endpoint is not part of a
blueprint, or if URL matching failed or has not been performed
yet.
This does not necessarily match the name the blueprint was
created with. It may have been nested, or registered with a
different name.
"""
endpoint = self.endpoint
if endpoint is not None and "." in endpoint:
return endpoint.rpartition(".")[0]
return None
@property
def blueprints(self) -> t.List[str]:
"""The registered names of the current blueprint upwards through
parent blueprints.
This will be an empty list if there is no current blueprint, or
if URL matching failed.
.. versionadded:: 2.0.1
"""
name = self.blueprint
if name is None:
return []
return _split_blueprint_path(name)
def _load_form_data(self) -> None:
def _load_form_data(self):
RequestBase._load_form_data(self)
# In debug mode we're replacing the files multidict with an ad-hoc
@ -124,14 +99,8 @@ class Request(RequestBase):
attach_enctype_error_multidict(self)
def on_json_loading_failed(self, e: Exception) -> "te.NoReturn":
if current_app and current_app.debug:
raise BadRequest(f"Failed to decode JSON object: {e}")
raise BadRequest()
class Response(ResponseBase):
class Response(ResponseBase, JSONMixin):
"""The response object that is used by default in Flask. Works like the
response object from Werkzeug but is set to have an HTML mimetype by
default. Quite often you don't have to create this object yourself because
@ -151,17 +120,18 @@ class Response(ResponseBase):
default_mimetype = "text/html"
json_module = json
def _get_data_for_json(self, cache):
return self.get_data()
@property
def max_cookie_size(self) -> int: # type: ignore
def max_cookie_size(self):
"""Read-only view of the :data:`MAX_COOKIE_SIZE` config key.
See :attr:`~werkzeug.wrappers.Response.max_cookie_size` in
See :attr:`~werkzeug.wrappers.BaseResponse.max_cookie_size` in
Werkzeug's docs.
"""
if current_app:
return current_app.config["MAX_COOKIE_SIZE"]
# return Werkzeug's default when not in an app context
return super().max_cookie_size
return super(Response, self).max_cookie_size