Commit 84fc4b3

Make linter happy
visch committed Feb 21, 2024
1 parent 685a2e9 commit 84fc4b3
Showing 3 changed files with 93 additions and 58 deletions.
2 changes: 1 addition & 1 deletion plugins/loaders/target-jsonl--andyh1203.lock
@@ -31,4 +31,4 @@
"description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n"
}
]
-}
+}
127 changes: 78 additions & 49 deletions tap_mysql/client.py
@@ -2,7 +2,6 @@
from __future__ import annotations

import datetime
-from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Iterable

import singer_sdk.helpers._typing
@@ -25,9 +24,9 @@ def patched_conform(
elem: Any, # noqa: ANN401
property_schema: dict,
) -> Any: # noqa: ANN401
"""Override Singer SDK type conformance to prevent dates turning into datetimes.
"""Override type conformance to prevent dates turning into datetimes.
Converts a primitive (i.e. not object or array) to a json compatible type.
Converts a primitive to a json compatible type.
Returns:
The appropriate json compatible type.
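
For context on how this override takes effect: the module swaps the Singer SDK's private conformance hook at import time. A minimal sketch of that wiring, assuming the SDK's _conform_primitive_property helper and using the datetime and singer_sdk.helpers._typing imports shown above (the saved original_conform name is illustrative, not part of this diff):

    # Sketch: capture the SDK's original hook, then install the override.
    original_conform = singer_sdk.helpers._typing._conform_primitive_property

    def patched_conform(elem, property_schema):
        # Keep plain dates as ISO-8601 date strings instead of letting the
        # SDK coerce them into datetimes.
        if isinstance(elem, datetime.date) and not isinstance(elem, datetime.datetime):
            return elem.isoformat()
        return original_conform(elem, property_schema)

    singer_sdk.helpers._typing._conform_primitive_property = patched_conform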
@@ -42,29 +41,43 @@ def patched_conform(

class MySQLConnector(SQLConnector):
"""Connects to the MySQL SQL source."""

-    def __init__(
-        self,
-        *args,
-        **kwargs
-    ) -> None:
+    def __init__(self, *args: tuple[Any, ...], **kwargs: dict[str, Any]) -> None:
"""Initialize the SQL connector.
+        This method initializes the SQL connector with the provided arguments.
+        It can accept variable-length arguments and keyword arguments to
+        customize the connection settings.

        Args:
-            is_vitess: Is this a vitess instance?
-            sqlalchemy_url: Optional URL for the connection.
+            *args: Variable length argument list.
+            **kwargs: Arbitrary keyword arguments.
"""
super().__init__(*args, **kwargs)
-        # Check if we are using PlanetScale, if so we need to let our connector know
-        # Ideally we'd just check to see if we're on Vitess, but I don't know how to do that quickly
+        # Check if we are using PlanetScale,
+        # if so we need to let our connector know
+        # Ideally we'd just check to see if we're on Vitess,
+        # but I don't know how to do that quickly
self.is_vitess = False

if self.config.get("is_vitess") is None:
self.logger.info("No is_vitess configuration provided, dynamically checking if we are using a Vitess instance.")
self.logger.info(
"No is_vitess configuration provided, dynamically checking if "
"we are using a Vitess instance."
)
with self._connect() as conn:
output = conn.execute("select variable_value from performance_schema.global_variables where variable_name='version_comment' and variable_value like 'PlanetScale%%'")
output = conn.execute(
"select variable_value from "
"performance_schema.global_variables where "
"variable_name='version_comment' and variable_value like "
"'PlanetScale%%'"
)
if output.rowcount > 0:
self.logger.info("Instance has been detected to be a Vitess (PlanetScale) instance, using Vitess configuration.")
self.logger.info(
"Instance has been detected to be a "
"Vitess (PlanetScale) instance, using Vitess "
"configuration."
)
self.is_vitess = True
output.close()
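
As a usage note, the probe above only runs when is_vitess is absent from the config. A hedged sketch of opting out (URL and values are illustrative):

    from tap_mysql.client import MySQLConnector

    connector = MySQLConnector(
        config={"is_vitess": False},  # explicit value skips the PlanetScale probe
        sqlalchemy_url="pymysql://root:secret@localhost:3306/mydb",
    )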

@@ -79,15 +92,16 @@ def to_jsonschema_type(
Overridden from SQLConnector to correctly handle JSONB and Arrays.
-    By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy
-    types.
+    By default will call `typing.to_jsonschema_type()` for strings and
+    SQLAlchemy types.
Args:
sql_type: The string representation of the SQL type, a SQLAlchemy
TypeEngine class or object, or a custom-specified object.
Raises:
-        ValueError: If the type received could not be translated to jsonschema.
+        ValueError: If the type received could not be translated to
+            jsonschema.
Returns:
The JSON Schema representation of the provided type.
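
A hedged illustration of the base behavior this method falls back to (the connector instance is assumed, and the exact output shape depends on the SDK version):

    connector.to_jsonschema_type(sqlalchemy.types.VARCHAR(97))
    # -> roughly {"type": ["string"]}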
@@ -196,17 +210,17 @@ def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
if "filter_schemas" in self.config and len(self.config["filter_schemas"]) != 0:
return self.config["filter_schemas"]
return super().get_schema_names(engine, inspected)
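
As a usage note on the lines above: a config entry such as {"filter_schemas": ["analytics"]} (an illustrative value) restricts discovery to the named schemas, while an absent or empty list falls through to the SDK's default schema listing.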
-    def discover_catalog_entry(
+
+    def discover_catalog_entry(  # noqa: PLR0913
self,
-        engine: Engine,  # noqa: ARG002
+        engine: Engine,
inspected: Inspector,
schema_name: str,
table_name: str,
is_view: bool, # noqa: FBT001
) -> CatalogEntry:
"""Overrode to support Vitess as DESCRIBE is not supported for views.
Create `CatalogEntry` object for the given table or a view.
Args:
@@ -220,9 +234,11 @@ def discover_catalog_entry(
`CatalogEntry` object for the given table or a view
"""
if self.is_vitess is False or is_view is False:
-            return super().discover_catalog_entry(engine, inspected, schema_name, table_name, is_view)
-        # For vitess views, we can't use DESCRIBE as it's not supported for views so we do the below
-        # Initialize unique stream name
+            return super().discover_catalog_entry(
+                engine, inspected, schema_name, table_name, is_view
+            )
+        # For vitess views, we can't use DESCRIBE as it's not supported for
+        # views so we do the below.
unique_stream_id = self.get_fully_qualified_name(
db_name=None,
schema_name=schema_name,
Expand All @@ -232,7 +248,6 @@ def discover_catalog_entry(

# Detect key properties
possible_primary_keys: list[list[str]] = []
-        #pk_def = inspected.get_pk_constraint(table_name, schema=schema_name)
pk_def = False
if pk_def and "constrained_columns" in pk_def:
possible_primary_keys.append(pk_def["constrained_columns"])
@@ -241,7 +256,7 @@ def discover_catalog_entry(
# returned in the ``expressions`` list of the reflected index.
possible_primary_keys.extend(
index_def["column_names"] # type: ignore[misc]
-            for index_def in [] #inspected.get_indexes(table_name, schema=schema_name)
+            for index_def in []  # inspected.get_indexes(table_name, schema=schema_name)
if index_def.get("unique", False)
)

@@ -295,43 +310,52 @@ def discover_catalog_entry(
replication_key=None, # Must be defined by user
)

-    def get_sqlalchemy_type(self, col_meta_type):
+    def get_sqlalchemy_type(self, col_meta_type: str) -> sqlalchemy.Column:
"""Return a SQLAlchemy type object for the given SQL type.
Used ischema_names so we don't have to manually map all types.
"""
        dialect = sqlalchemy.dialects.mysql.base.dialect()
ischema_names = dialect.ischema_names
# Example varchar(97)
-        type_info = col_meta_type.split('(')
-        base_type_name = type_info[0].split(' ')[0]  # bigint unsigned
-        type_args = type_info[1].split(' ')[0].rstrip(')') if len(type_info) > 1 else None #decimal(25,4) unsigned should work
+        type_info = col_meta_type.split("(")
+        base_type_name = type_info[0].split(" ")[0]  # bigint unsigned
+        type_args = (
+            type_info[1].split(" ")[0].rstrip(")") if len(type_info) > 1 else None
+        )  # decimal(25,4) unsigned should work

if base_type_name in {"enum", "set"}:
self.logger.warning(f"Enum and Set types not supported for {col_meta_type=}. Using varchar instead.")
self.logger.warning(
"Enum and Set types not supported for col_meta_type=%s. "
"Using varchar instead.",
col_meta_type,
)
base_type_name = "varchar"
type_args = None

type_class = ischema_names.get(base_type_name.lower())

        try:
            if type_class:
                # Create an instance of the type class with parameters if they exist
                if type_args:
-                    return type_class(*map(int, type_args.split(','))) #Want to create a varchar(97) if asked for
-                else:
-                    return type_class()
-        except Exception as e:
-            self.logger.error(f"Error creating sqlalchemy type for {col_meta_type=}")
-            raise e
+                    return type_class(
+                        *map(int, type_args.split(","))
+                    )  # Want to create a varchar(97) if asked for
+                return type_class()
+        except Exception:
+            self.logger.exception(
+                "Error creating sqlalchemy type for col_meta_type=%s", col_meta_type
+            )
+            raise
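
To make the parsing concrete, a hedged usage sketch (the connector instance and input values are illustrative; only get_sqlalchemy_type appears in this diff):

    # "decimal(25,4) unsigned" splits into base_type_name="decimal" and
    # type_args="25,4", resolving through the dialect's ischema_names to
    # roughly sqlalchemy.dialects.mysql.DECIMAL(25, 4).
    numeric_type = connector.get_sqlalchemy_type("decimal(25,4) unsigned")

    # "enum('a','b')" hits the warning branch above and falls back to varchar.
    fallback_type = connector.get_sqlalchemy_type("enum('a','b')")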

def get_table_columns(
self,
full_table_name: str,
column_names: list[str] | None = None,
) -> dict[str, sqlalchemy.Column]:
"""Overrode to support Vitess as DESCRIBE is not supported for views.
Return a list of table columns.
Args:
@@ -343,16 +367,19 @@ def get_table_columns(
"""
if self.is_vitess is False:
            return super().get_table_columns(full_table_name, column_names)
-        # If Vitess Instance then we can't use DESCRIBE as it's not supported for views so we do below
+        # If Vitess Instance then we can't use DESCRIBE as it's not supported
+        # for views so we do below
if full_table_name not in self._table_cols_cache:
_, schema_name, table_name = self.parse_full_table_name(full_table_name)
with self._connect() as conn:
columns = conn.execute(f"SHOW columns from `{schema_name}`.`{table_name}`")
columns = conn.execute(
f"SHOW columns from `{schema_name}`.`{table_name}`"
)
self._table_cols_cache[full_table_name] = {
col_meta["Field"]: sqlalchemy.Column(
col_meta["Field"],
self.get_sqlalchemy_type(col_meta["Type"]),
nullable=col_meta["Null"] == "YES"
nullable=col_meta["Null"] == "YES",
)
for col_meta in columns
if not column_names
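
For illustration, one SHOW columns row and the cache entry it becomes (the row values are made up):

    row = {"Field": "id", "Type": "bigint unsigned", "Null": "NO"}
    column = sqlalchemy.Column(
        row["Field"],
        connector.get_sqlalchemy_type(row["Type"]),  # resolves to mysql BIGINT
        nullable=row["Null"] == "YES",  # False here
    )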
@@ -411,8 +438,10 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
if start_val:
query = query.filter(replication_key_col >= start_val)

-        with self.connector._connect() as conn:
+        with self.connector.connect() as conn:
if self.connector.is_vitess:
conn.exec_driver_sql("set workload=olap") # See https://github.com/planetscale/discussion/discussions/190
conn.exec_driver_sql(
"set workload=olap"
) # See https://github.com/planetscale/discussion/discussions/190
for row in conn.execute(query):
yield dict(row)
22 changes: 14 additions & 8 deletions tap_mysql/tap.py
@@ -35,15 +35,17 @@ def __init__(
See https://github.com/MeltanoLabs/tap-postgres/issues/141
"""
super().__init__(*args, **kwargs)
-        assert (self.config.get("sqlalchemy_url") is not None) or (  # noqa: S101
+        if (self.config.get("sqlalchemy_url") is None) and not (
            self.config.get("host") is not None
            and self.config.get("port") is not None
            and self.config.get("user") is not None
            and self.config.get("password") is not None
-        ), (
-            "Need either the sqlalchemy_url to be set or host, port, user,"
-            " and password to be set"
-        )
+        ):
+            msg = (
+                "Need either the sqlalchemy_url to be set or host, port, "
+                "user, and password to be set"
+            )
+            raise ValueError(msg)
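
In practice, either connection style satisfies this check. A hedged example (the TapMySQL class name and all values are assumed, not shown in this hunk):

    from tap_mysql.tap import TapMySQL

    # Option 1: a full SQLAlchemy URL.
    TapMySQL(config={"sqlalchemy_url": "pymysql://user:pass@localhost:3306/db"})

    # Option 2: discrete credentials; the tap assembles the URL itself.
    TapMySQL(config={
        "host": "localhost", "port": 3306, "user": "user", "password": "pass",
    })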

config_jsonschema = th.PropertiesList(
th.Property(
@@ -91,15 +93,19 @@ def __init__(
"sqlalchemy_options",
th.ObjectType(additional_properties=th.StringType),
description=(
"sqlalchemy_url options (also called the query), to connect to PlanetScale you must turn on SSL see PlanetScale information below. Note if sqlalchemy_url is set this will be ignored."
"sqlalchemy_url options (also called the query), to connect to "
"PlanetScale you must turn on SSL see PlanetScale information "
"below. Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
"sqlalchemy_url",
th.StringType,
secret=True,
description=(
"Example pymysql://[username]:[password]@localhost:3306/[db_name][?options] see https://docs.sqlalchemy.org/en/20/dialects/mysql.html#module-sqlalchemy.dialects.mysql.pymysql for more information"
"Example pymysql://[username]:[password]@localhost:3306/[db_name][?options] " # noqa: E501
"see https://docs.sqlalchemy.org/en/20/dialects/mysql.html#module-sqlalchemy.dialects.mysql.pymysql " # noqa: E501
"for more information"
),
),
th.Property(
@@ -215,7 +221,7 @@ def connector(self) -> MySQLConnector:

return MySQLConnector(
config=dict(self.config),
-            sqlalchemy_url=url.render_as_string(hide_password=False)
+            sqlalchemy_url=url.render_as_string(hide_password=False),
)
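
The hide_password=False argument matters here: SQLAlchemy masks the password by default when rendering a URL, which would hand the connector an unusable connection string. A short sketch (values illustrative):

    from sqlalchemy.engine import URL

    url = URL.create(
        drivername="mysql+pymysql",
        username="user",
        password="secret",
        host="localhost",
        port=3306,
        database="db",
    )
    url.render_as_string()                     # 'mysql+pymysql://user:***@localhost:3306/db'
    url.render_as_string(hide_password=False)  # '...user:secret@localhost:3306/db'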

def guess_key_type(self, key_data: str) -> paramiko.PKey:
