Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(views): Add schema support for views & MVs #621

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 182 additions & 41 deletions sqlalchemy_utils/view.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,156 @@
from typing import Any, Dict, List, Optional, TYPE_CHECKING

import sqlalchemy as sa
from sqlalchemy.ext import compiler
from sqlalchemy.schema import DDLElement, PrimaryKeyConstraint

from sqlalchemy_utils.functions import get_columns

if TYPE_CHECKING:
from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy.orm import Session
from sqlalchemy.sql import Selectable
from sqlalchemy.sql.compiler import SQLCompiler


def _prepare_view_identifier(
dialect: 'DefaultDialect',
view_name: str,
schema: Optional[str] = None,
) -> str:
quoted_view_name = dialect.identifier_preparer.quote(view_name)
if schema:
return dialect.identifier_preparer.quote_schema(schema) + '.' + quoted_view_name

return quoted_view_name


class CreateView(DDLElement):
def __init__(self, name, selectable, materialized=False):
def __init__(
self,
name: str,
selectable: 'Selectable',
schema: Optional[str] = None,
):
self.name = name
self.selectable = selectable
self.materialized = materialized
self.schema = schema


@compiler.compiles(CreateView)
def compile_create_materialized_view(element, compiler, **kw):
return 'CREATE {}VIEW {} AS {}'.format(
'MATERIALIZED ' if element.materialized else '',
compiler.dialect.identifier_preparer.quote(element.name),
compiler.sql_compiler.process(element.selectable, literal_binds=True),
def compile_create_view(
element: 'CreateView',
compiler: 'SQLCompiler',
**kw: Any,
) -> str:
view_identifier = _prepare_view_identifier(
compiler.dialect, element.name, element.schema
)
compiled_selectable = compiler.sql_compiler.process(
element.selectable, literal_binds=True
)
return f'CREATE VIEW {view_identifier} AS {compiled_selectable}'


class DropView(DDLElement):
def __init__(self, name, materialized=False, cascade=True):
def __init__(
self,
name: str,
schema: Optional[str] = None,
cascade: Optional[bool] = None,
):
self.name = name
self.materialized = materialized
self.schema = schema
self.cascade = cascade


@compiler.compiles(DropView)
def compile_drop_materialized_view(element, compiler, **kw):
return 'DROP {}VIEW IF EXISTS {} {}'.format(
'MATERIALIZED ' if element.materialized else '',
compiler.dialect.identifier_preparer.quote(element.name),
'CASCADE' if element.cascade else ''
def compile_drop_view(element: 'DropView', compiler: 'SQLCompiler', **kw: Any) -> str:
view_identifier = _prepare_view_identifier(
compiler.dialect, element.name, element.schema
)

stmt = f'DROP VIEW IF EXISTS {view_identifier}'
if element.cascade is True:
stmt += ' CASCADE'
elif element.cascade is False:
stmt += ' RESTRICT'
return stmt


class CreateMaterializedView(DDLElement):
def __init__(
self,
name: str,
selectable: 'Selectable',
schema: Optional[str] = None,
populate: Optional[bool] = None,
):
self.name = name
self.selectable = selectable
self.schema = schema
self.populate = populate


@compiler.compiles(CreateMaterializedView)
def compile_create_materialized_view(
element: 'CreateMaterializedView',
compiler: 'SQLCompiler',
**kw: Any,
) -> str:
view_identifier = _prepare_view_identifier(
dialect=compiler.dialect, view_name=element.name, schema=element.schema
)
compiled_selectable = compiler.sql_compiler.process(
element.selectable, literal_binds=True
)

stmt = f'CREATE MATERIALIZED VIEW {view_identifier} AS {compiled_selectable}'
if element.populate is True:
stmt += ' WITH DATA'
elif element.populate is False:
stmt += ' WITH NO DATA'
return stmt


class DropMaterializedView(DDLElement):
def __init__(
self,
name: str,
schema: Optional[str] = None,
cascade: Optional[bool] = None,
):
self.name = name
self.schema = schema
self.cascade = cascade


@compiler.compiles(DropMaterializedView)
def compile_drop_materialized_view(
element: 'DropMaterializedView',
compiler: 'SQLCompiler',
**kw: Any,
) -> str:
view_identifier = _prepare_view_identifier(
dialect=compiler.dialect, view_name=element.name, schema=element.schema
)
stmt = f'DROP MATERIALIZED VIEW IF EXISTS {view_identifier}'
if element.cascade is True:
stmt += ' CASCADE'
elif element.cascade is False:
stmt += ' RESTRICT'
return stmt


def create_table_from_selectable(
name,
selectable,
indexes=None,
metadata=None,
aliases=None,
**kwargs
):
name: str,
selectable: 'Selectable',
indexes: Optional[List[sa.Index]] = None,
metadata: Optional[sa.MetaData] = None,
aliases: Optional[Dict[str, str]] = None,
schema: Optional[str] = None,
**kwargs: Any,
) -> sa.Table:
if indexes is None:
indexes = []
if metadata is None:
Expand All @@ -60,7 +166,7 @@ def create_table_from_selectable(
)
for c in get_columns(selectable)
] + indexes
table = sa.Table(name, metadata, *args, **kwargs)
table = sa.Table(name, metadata, *args, schema=schema, **kwargs)

if not any([c.primary_key for c in get_columns(selectable)]):
table.append_constraint(
Expand All @@ -70,12 +176,16 @@ def create_table_from_selectable(


def create_materialized_view(
name,
selectable,
metadata,
indexes=None,
aliases=None
):
name: str,
selectable: 'Selectable',
metadata: sa.MetaData,
indexes: Optional[List[sa.Index]] = None,
aliases: Optional[Dict[str, str]] = None,
*,
schema: Optional[str] = None,
populate: Optional[bool] = None,
cascade_on_drop: Optional[bool] = None,
) -> sa.Table:
""" Create a view on a given metadata

:param name: The name of the view to create.
Expand All @@ -87,6 +197,17 @@ def create_materialized_view(
:param aliases:
An optional dictionary containing with keys as column names and values
as column aliases.
:param schema: The name of the schema where the view will be created (optional).
:param populate:
Set ``populate=True`` to create the view with ``WITH DATA``.
Set ``populate=False`` to create the view with ``WITH NO DATA``.
Default to ``None`` for no flags.
See also: https://www.postgresql.org/docs/current/sql-createview.html
:param cascade_on_drop:
Set ``cascade_on_drop=True`` to drop the view with ``CASCADE``.
Set ``cascade_on_drop=False`` to create the view with ``RESTRICT``.
Default to ``None`` for no flags.
See also: https://www.postgresql.org/docs/current/sql-dropmaterializedview.html

Same as for ``create_view`` except that a ``CREATE MATERIALIZED VIEW``
statement is emitted instead of a ``CREATE VIEW``.
Expand All @@ -97,13 +218,14 @@ def create_materialized_view(
selectable=selectable,
indexes=indexes,
metadata=None,
aliases=aliases
aliases=aliases,
schema=schema,
)

sa.event.listen(
metadata,
'after_create',
CreateView(name, selectable, materialized=True)
CreateMaterializedView(name, selectable, schema=schema, populate=populate)
)

@sa.event.listens_for(metadata, 'after_create')
Expand All @@ -114,24 +236,31 @@ def create_indexes(target, connection, **kw):
sa.event.listen(
metadata,
'before_drop',
DropView(name, materialized=True)
DropMaterializedView(name, schema=schema, cascade=cascade_on_drop)
)
return table


def create_view(
name,
selectable,
metadata,
cascade_on_drop=True
):
name: str,
selectable: 'Selectable',
metadata: sa.MetaData,
*,
schema: Optional[str] = None,
cascade_on_drop: Optional[str] = None,
) -> sa.Table:
""" Create a view on a given metadata

:param name: The name of the view to create.
:param selectable: An SQLAlchemy selectable e.g. a select() statement.
:param metadata:
An SQLAlchemy Metadata instance that stores the features of the
database being described.
:param schema: The name of the schema where the view will be created (optional).
:param cascade_on_drop:
Set ``cascade_on_drop=True`` to drop the view with ``CASCADE``.
Set ``cascade_on_drop=False`` to create the view with ``RESTRICT``.
Default to ``None`` for no flags.

The process for creating a view is similar to the standard way that a
table is constructed, except that a selectable is provided instead of
Expand Down Expand Up @@ -160,10 +289,15 @@ def create_view(
table = create_table_from_selectable(
name=name,
selectable=selectable,
metadata=None
metadata=None,
schema=schema,
)

sa.event.listen(metadata, 'after_create', CreateView(name, selectable))
sa.event.listen(
metadata,
'after_create',
CreateView(name, selectable, schema=schema),
)

@sa.event.listens_for(metadata, 'after_create')
def create_indexes(target, connection, **kw):
Expand All @@ -173,26 +307,33 @@ def create_indexes(target, connection, **kw):
sa.event.listen(
metadata,
'before_drop',
DropView(name, cascade=cascade_on_drop)
DropView(name, schema=schema, cascade=cascade_on_drop)
)
return table


def refresh_materialized_view(session, name, concurrently=False):
def refresh_materialized_view(
session: 'Session',
name: str,
concurrently: bool = False,
*,
schema: Optional[str] = None,
) -> None:
""" Refreshes an already existing materialized view

:param session: An SQLAlchemy Session instance.
:param name: The name of the materialized view to refresh.
:param concurrently:
Optional flag that causes the ``CONCURRENTLY`` parameter
to be specified when the materialized view is refreshed.
:param schema: The schema of the view to be refreshed (optional).
"""
# Since session.execute() bypasses autoflush, we must manually flush in
# order to include newly-created/modified objects in the refresh.
session.flush()
session.execute(
sa.text('REFRESH MATERIALIZED VIEW {}{}'.format(
'CONCURRENTLY ' if concurrently else '',
session.bind.engine.dialect.identifier_preparer.quote(name)
_prepare_view_identifier(session.bind.engine.dialect, name, schema),
))
)
Loading