Uncovertruth feature/support dax #785

Open · wants to merge 35 commits into base: master
2fbdc0b  add dax client (Feb 26, 2018)
cb32fdb  add dax_endpoints params (Feb 27, 2018)
d54c2d1  fix connection (opapy, Feb 27, 2018)
6899e9f  rmeove connection ppling (opapy, Feb 27, 2018)
a81605b  fix rebase problem (opapy, Mar 1, 2018)
eb6fe2b  rebase (opapy, Mar 1, 2018)
c67a34a  keep connection pool (opapy, Mar 1, 2018)
11ceb25  fix setup.py (opapy, Mar 5, 2018)
a891206  add write and read endpoints (opapy, Mar 6, 2018)
b878b50  fix write and read endpoints parameter (opapy, Mar 6, 2018)
33f08d6  fix TableConnection (opapy, Mar 6, 2018)
850e8e0  add write and read client (opapy, Mar 6, 2018)
c72e107  add OP_READ (opapy, Mar 6, 2018)
da357ca  fix merge dict (opapy, Mar 6, 2018)
09fd44c  fix get client (opapy, Mar 8, 2018)
45f6645  remove debug message (opapy, Mar 8, 2018)
fc4ad4f  performance tunning (opapy, Mar 8, 2018)
b9aa012  fix operation name map (opapy, Mar 8, 2018)
7eb7a20  remove session argument (opapy, Mar 12, 2018)
cc7435d  remove session arguments (opapy, Mar 12, 2018)
65d672e  move api call method (opapy, Mar 22, 2018)
52afadf  add docs (opapy, Mar 22, 2018)
7d72267  add amazon-dax-client for requirements-dev.txt (opapy, Mar 22, 2018)
fb649df  minor-fix to support fallback to dynamodb (vedavidhbudimuri, May 5, 2018)
950e0bd  only install amazon-dax-cient when python_version>=2.7 and python_ver… (opapy, Jul 10, 2018)
ac8e6f1  add amazon-dax-client>=1.0.5 (opapy, Jul 10, 2018)
d8eff6d  amazon-dax-client==1.0.5 (opapy, Jan 28, 2019)
da4aa23  amazon-dax-client==1.0.5 (opapy, Jan 28, 2019)
bfca679  upgrade dynamodb-dax-clioent (opapy, Jan 28, 2019)
5ed46cc  remove amazon-dax-client when py26 test (opapy, Jan 28, 2019)
de2e01e  goodbye python2.6 (opapy, Jan 28, 2019)
38cd04c  goodbye python2.6 (opapy, Jan 28, 2019)
51e8ee7  Merge pull request #1 from pynamodb/master (vedavidhbudimuri, May 11, 2019)
d74b70f  Merge branch 'feature/support-dax' of https://github.com/uncovertruth… (vedavidhbudimuri, May 17, 2020)
e8c17a9  updated constants.pyi (vedavidhbudimuri, May 17, 2020)
18 changes: 18 additions & 0 deletions docs/settings.rst
@@ -81,6 +81,24 @@ capacity information in responses. If ``False``, scans will fail
should the server not return consumed capacity information in an
effort to prevent unintentional capacity usage.

dax_write_endpoints
-------------------

Default: ``[]``

Connect to these DAX endpoints for write operations.

The following operations are routed to DAX: PutItem, DeleteItem, UpdateItem, and BatchWriteItem.

dax_read_endpoints
------------------

Default: ``[]``

Connect to these DAX endpoints for read operations.

The following operations are routed to DAX: GetItem, Scan, BatchGetItem, and Query.
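Assuming the per-model ``Meta`` attributes this PR wires up (``dax_write_endpoints`` and ``dax_read_endpoints``), a table could opt in roughly like this; the cluster address is a placeholder, not a real endpoint:

```python
from pynamodb.attributes import UnicodeAttribute
from pynamodb.models import Model

# Placeholder DAX discovery endpoint; substitute your cluster's address.
DAX_ENDPOINT = 'my-cluster.abc123.dax-clusters.us-east-1.amazonaws.com:8111'


class Thread(Model):
    class Meta:
        table_name = 'Thread'
        # Route supported writes and reads through DAX.
        dax_write_endpoints = [DAX_ENDPOINT]
        dax_read_endpoints = [DAX_ENDPOINT]

    forum_name = UnicodeAttribute(hash_key=True)
```

Models that leave both lists empty keep talking to DynamoDB directly, since the defaults are ``[]``.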

Overriding settings
~~~~~~~~~~~~~~~~~~~

102 changes: 82 additions & 20 deletions pynamodb/connection/base.py
@@ -22,29 +22,47 @@
from botocore.exceptions import BotoCoreError
from botocore.session import get_session
from botocore.vendored import requests
from botocore.vendored.requests import Request
from six.moves import range

from pynamodb.compat import NullHandler
from pynamodb.connection.util import pythonic
from pynamodb.connection.dax import (
OP_READ,
OP_WRITE,
DaxClient,
)
from pynamodb.constants import (
RETURN_CONSUMED_CAPACITY_VALUES, RETURN_ITEM_COLL_METRICS_VALUES, COMPARISON_OPERATOR_VALUES,
RETURN_ITEM_COLL_METRICS, RETURN_CONSUMED_CAPACITY, RETURN_VALUES_VALUES, ATTR_UPDATE_ACTIONS,
COMPARISON_OPERATOR, EXCLUSIVE_START_KEY, SCAN_INDEX_FORWARD, SCAN_FILTER_VALUES, ATTR_DEFINITIONS,
BATCH_WRITE_ITEM, CONSISTENT_READ, ATTR_VALUE_LIST, DESCRIBE_TABLE, KEY_CONDITION_EXPRESSION,
BATCH_GET_ITEM, DELETE_REQUEST, SELECT_VALUES, RETURN_VALUES, REQUEST_ITEMS, ATTR_UPDATES,
PROJECTION_EXPRESSION, SERVICE_NAME, DELETE_ITEM, PUT_REQUEST, UPDATE_ITEM, SCAN_FILTER, TABLE_NAME,
INDEX_NAME, KEY_SCHEMA, ATTR_NAME, ATTR_TYPE, TABLE_KEY, EXPECTED, KEY_TYPE, GET_ITEM, UPDATE,
PUT_ITEM, SELECT, ACTION, EXISTS, VALUE, LIMIT, QUERY, SCAN, ITEM, LOCAL_SECONDARY_INDEXES,
KEYS, KEY, EQ, SEGMENT, TOTAL_SEGMENTS, CREATE_TABLE, PROVISIONED_THROUGHPUT, READ_CAPACITY_UNITS,
WRITE_CAPACITY_UNITS, GLOBAL_SECONDARY_INDEXES, PROJECTION, EXCLUSIVE_START_TABLE_NAME, TOTAL,
DELETE_TABLE, UPDATE_TABLE, LIST_TABLES, GLOBAL_SECONDARY_INDEX_UPDATES, ATTRIBUTES,
CONSUMED_CAPACITY, CAPACITY_UNITS, QUERY_FILTER, QUERY_FILTER_VALUES, CONDITIONAL_OPERATOR,
RETURN_CONSUMED_CAPACITY_VALUES, RETURN_ITEM_COLL_METRICS_VALUES,
COMPARISON_OPERATOR_VALUES,
RETURN_ITEM_COLL_METRICS, RETURN_CONSUMED_CAPACITY, RETURN_VALUES_VALUES,
ATTR_UPDATE_ACTIONS,
COMPARISON_OPERATOR, EXCLUSIVE_START_KEY, SCAN_INDEX_FORWARD,
SCAN_FILTER_VALUES, ATTR_DEFINITIONS,
BATCH_WRITE_ITEM, CONSISTENT_READ, ATTR_VALUE_LIST, DESCRIBE_TABLE,
BATCH_GET_ITEM, DELETE_REQUEST, SELECT_VALUES, RETURN_VALUES,
REQUEST_ITEMS, ATTR_UPDATES,
SERVICE_NAME, DELETE_ITEM, PUT_REQUEST, UPDATE_ITEM, SCAN_FILTER,
TABLE_NAME,
INDEX_NAME, KEY_SCHEMA, ATTR_NAME, ATTR_TYPE, TABLE_KEY, EXPECTED,
KEY_TYPE, GET_ITEM, UPDATE,
PUT_ITEM, SELECT, ACTION, EXISTS, VALUE, LIMIT, QUERY, SCAN, ITEM,
LOCAL_SECONDARY_INDEXES,
KEYS, KEY, EQ, SEGMENT, TOTAL_SEGMENTS, CREATE_TABLE,
PROVISIONED_THROUGHPUT, READ_CAPACITY_UNITS,
WRITE_CAPACITY_UNITS, GLOBAL_SECONDARY_INDEXES, PROJECTION,
EXCLUSIVE_START_TABLE_NAME, TOTAL,
DELETE_TABLE, UPDATE_TABLE, LIST_TABLES, GLOBAL_SECONDARY_INDEX_UPDATES,
ATTRIBUTES,
CONSUMED_CAPACITY, CAPACITY_UNITS, QUERY_FILTER, QUERY_FILTER_VALUES,
CONDITIONAL_OPERATOR,
CONDITIONAL_OPERATORS, NULL, NOT_NULL, SHORT_ATTR_TYPES, DELETE, PUT,
ITEMS, DEFAULT_ENCODING, BINARY_SHORT, BINARY_SET_SHORT, LAST_EVALUATED_KEY, RESPONSES, UNPROCESSED_KEYS,
UNPROCESSED_ITEMS, STREAM_SPECIFICATION, STREAM_VIEW_TYPE, STREAM_ENABLED, UPDATE_EXPRESSION,
EXPRESSION_ATTRIBUTE_NAMES, EXPRESSION_ATTRIBUTE_VALUES, KEY_CONDITION_OPERATOR_MAP,
CONDITION_EXPRESSION, FILTER_EXPRESSION, FILTER_EXPRESSION_OPERATOR_MAP, NOT_CONTAINS, AND)
ITEMS, DEFAULT_ENCODING, BINARY_SHORT, BINARY_SET_SHORT,
LAST_EVALUATED_KEY, RESPONSES, UNPROCESSED_KEYS,
UNPROCESSED_ITEMS, STREAM_SPECIFICATION, STREAM_VIEW_TYPE, STREAM_ENABLED,
NOT_CONTAINS, AND, CONDITION_EXPRESSION, UPDATE_EXPRESSION,
EXPRESSION_ATTRIBUTE_NAMES, EXPRESSION_ATTRIBUTE_VALUES,
PROJECTION_EXPRESSION, FILTER_EXPRESSION, KEY_CONDITION_OPERATOR_MAP,
KEY_CONDITION_EXPRESSION, FILTER_EXPRESSION_OPERATOR_MAP)
from pynamodb.exceptions import (
TableError, QueryError, PutError, DeleteError, UpdateError, GetError, ScanError, TableDoesNotExist,
VerboseClientError
@@ -227,14 +245,23 @@ class Connection(object):
A higher level abstraction over botocore
"""

def __init__(self, region=None, host=None,
def __init__(self, region=None, host=None, max_retry_attempts=None,
base_backoff_ms=None, dax_write_endpoints=None, dax_read_endpoints=None,
fall_back_to_dynamodb=False,
read_timeout_seconds=None, connect_timeout_seconds=None,
max_retry_attempts=None, base_backoff_ms=None,
max_pool_connections=None, extra_headers=None):
self._tables = {}
self.host = host
if not dax_write_endpoints:
dax_write_endpoints = []
if not dax_read_endpoints:
dax_read_endpoints = []
self.dax_write_endpoints = dax_write_endpoints
self.dax_read_endpoints = dax_read_endpoints
self._local = local()
self._client = None
self._dax_write_client = None
self._dax_read_client = None
if region:
self.region = region
else:
@@ -260,6 +287,11 @@ def __init__(self, region=None, host=None,
else:
self._base_backoff_ms = get_settings_value('base_backoff_ms')

if fall_back_to_dynamodb is not None:
self._fall_back_to_dynamodb = fall_back_to_dynamodb
else:
self._fall_back_to_dynamodb = get_settings_value('fall_back_to_dynamodb')

if max_pool_connections is not None:
self._max_pool_connections = max_pool_connections
else:
@@ -313,7 +345,9 @@ def dispatch(self, operation_name, operation_kwargs):
req_uuid = uuid.uuid4()

self.send_pre_boto_callback(operation_name, req_uuid, table_name)

data = self._make_api_call(operation_name, operation_kwargs)

self.send_post_boto_callback(operation_name, req_uuid, table_name)

if data and CONSUMED_CAPACITY in data:
@@ -341,10 +375,20 @@ def _make_api_call(self, operation_name, operation_kwargs):
1. It's faster to avoid using botocore's response parsing
2. It provides a place to monkey patch requests for unit testing
"""
from amazondax.DaxError import DaxClientError
try:
if operation_name in OP_WRITE.keys() and self.dax_write_endpoints:
return self.dax_write_client.dispatch(operation_name, operation_kwargs)
elif operation_name in OP_READ.keys() and self.dax_read_endpoints:
return self.dax_read_client.dispatch(operation_name, operation_kwargs)
except DaxClientError as err:
if not self._fall_back_to_dynamodb:
raise err

operation_model = self.client._service_model.operation_model(operation_name)
request_dict = self.client._convert_to_request_dict(
operation_kwargs,
operation_model,
operation_model
)

for i in range(0, self._max_retry_attempts_exception + 1):
@@ -507,6 +551,24 @@ def client(self):
self._client = self.session.create_client(SERVICE_NAME, self.region, endpoint_url=self.host, config=config)
return self._client

@property
def dax_write_client(self):
if self._dax_write_client is None:
self._dax_write_client = DaxClient(
endpoints=self.dax_write_endpoints,
region_name=self.region
)
return self._dax_write_client

@property
def dax_read_client(self):
if self._dax_read_client is None:
self._dax_read_client = DaxClient(
endpoints=self.dax_read_endpoints,
region_name=self.region
)
return self._dax_read_client

def get_meta_table(self, table_name, refresh=False):
"""
Returns a MetaTable
33 changes: 33 additions & 0 deletions pynamodb/connection/dax.py
@@ -0,0 +1,33 @@
# coding: utf-8
from amazondax import AmazonDaxClient


OP_WRITE = {
'PutItem': 'put_item',
'DeleteItem': 'delete_item',
'UpdateItem': 'update_item',
'BatchWriteItem': 'batch_write_item',
}

OP_READ = {
'GetItem': 'get_item',
'Scan': 'scan',
'BatchGetItem': 'batch_get_item',
'Query': 'query',
}

OP_NAME_TO_METHOD = OP_WRITE.copy()
OP_NAME_TO_METHOD.update(OP_READ)


class DaxClient(object):

def __init__(self, endpoints, region_name):
self.connection = AmazonDaxClient(
endpoints=endpoints,
region_name=region_name
)

def dispatch(self, operation_name, kwargs):
method = getattr(self.connection, OP_NAME_TO_METHOD[operation_name])
return method(**kwargs)
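A minimal, dependency-free sketch of how ``Connection._make_api_call`` uses these maps to route an operation (stand-in return values instead of real clients; it assumes the endpoint-list semantics shown in ``base.py`` above):

```python
# Operation-name maps copied from pynamodb/connection/dax.py in this PR.
OP_WRITE = {
    'PutItem': 'put_item',
    'DeleteItem': 'delete_item',
    'UpdateItem': 'update_item',
    'BatchWriteItem': 'batch_write_item',
}

OP_READ = {
    'GetItem': 'get_item',
    'Scan': 'scan',
    'BatchGetItem': 'batch_get_item',
    'Query': 'query',
}


def route(operation_name, dax_write_endpoints, dax_read_endpoints):
    """Mirror the branch in Connection._make_api_call: writes and reads
    each go to DAX only when their endpoint list is non-empty."""
    if operation_name in OP_WRITE and dax_write_endpoints:
        return 'dax-write'
    if operation_name in OP_READ and dax_read_endpoints:
        return 'dax-read'
    return 'dynamodb'
```

An operation only goes to DAX when it appears in the matching map *and* that map's endpoint list is non-empty; everything else (control-plane calls such as ``CreateTable``, for example) falls through to the regular DynamoDB client.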
15 changes: 13 additions & 2 deletions pynamodb/connection/table.py
@@ -21,7 +21,15 @@ def __init__(self,
max_pool_connections=None,
extra_headers=None,
aws_access_key_id=None,
aws_secret_access_key=None):
aws_secret_access_key=None,
dax_write_endpoints=None,
dax_read_endpoints=None,
fall_back_to_dynamodb=False):
if not dax_read_endpoints:
dax_read_endpoints = []
if not dax_write_endpoints:
dax_write_endpoints = []

self._hash_keyname = None
self._range_keyname = None
self.table_name = table_name
@@ -32,7 +40,10 @@ def __init__(self,
max_retry_attempts=max_retry_attempts,
base_backoff_ms=base_backoff_ms,
max_pool_connections=max_pool_connections,
extra_headers=extra_headers)
extra_headers=extra_headers,
dax_write_endpoints=dax_write_endpoints,
dax_read_endpoints=dax_read_endpoints,
fall_back_to_dynamodb=fall_back_to_dynamodb)

if aws_access_key_id and aws_secret_access_key:
self.connection.session.set_credentials(aws_access_key_id,
9 changes: 9 additions & 0 deletions pynamodb/constants.pyi
@@ -165,3 +165,12 @@ CONDITIONAL_OPERATOR: str
AND: str
OR: str
CONDITIONAL_OPERATORS: Any
CONDITION_EXPRESSION: str
UPDATE_EXPRESSION: str
EXPRESSION_ATTRIBUTE_NAMES: str
EXPRESSION_ATTRIBUTE_VALUES: str
PROJECTION_EXPRESSION: str
FILTER_EXPRESSION: str
KEY_CONDITION_OPERATOR_MAP: Any
KEY_CONDITION_EXPRESSION: str
FILTER_EXPRESSION_OPERATOR_MAP: Any
8 changes: 7 additions & 1 deletion pynamodb/models.py
@@ -194,6 +194,10 @@ def __init__(cls, name, bases, attrs):
setattr(attr_obj, 'aws_access_key_id', None)
if not hasattr(attr_obj, 'aws_secret_access_key'):
setattr(attr_obj, 'aws_secret_access_key', None)
if not hasattr(attr_obj, 'dax_write_endpoints'):
setattr(attr_obj, 'dax_write_endpoints', get_settings_value('dax_write_endpoints'))
if not hasattr(attr_obj, 'dax_read_endpoints'):
setattr(attr_obj, 'dax_read_endpoints', get_settings_value('dax_read_endpoints'))
elif issubclass(attr_obj.__class__, (Index, )):
attr_obj.Meta.model = cls
if not hasattr(attr_obj.Meta, "index_name"):
@@ -1301,7 +1305,9 @@ def _get_connection(cls):
max_pool_connections=cls.Meta.max_pool_connections,
extra_headers=cls.Meta.extra_headers,
aws_access_key_id=cls.Meta.aws_access_key_id,
aws_secret_access_key=cls.Meta.aws_secret_access_key)
aws_secret_access_key=cls.Meta.aws_secret_access_key,
dax_write_endpoints=cls.Meta.dax_write_endpoints,
dax_read_endpoints=cls.Meta.dax_read_endpoints)
return cls._connection

def _deserialize(self, attrs):
3 changes: 3 additions & 0 deletions pynamodb/settings.py
@@ -17,6 +17,9 @@
'max_pool_connections': 10,
'extra_headers': None,
'allow_rate_limited_scan_without_consumed_capacity': False,
'dax_write_endpoints': [],
'dax_read_endpoints': [],
'fall_back_to_dynamodb': False
}

OVERRIDE_SETTINGS_PATH = getenv('PYNAMODB_CONFIG', '/etc/pynamodb/global_default_settings.py')
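Given these defaults, a global override file at ``OVERRIDE_SETTINGS_PATH`` (or wherever ``PYNAMODB_CONFIG`` points) can set the new keys; the endpoint value below is a hypothetical placeholder:

```python
# /etc/pynamodb/global_default_settings.py
# Hypothetical endpoint; replace with your DAX cluster's discovery address.
dax_write_endpoints = ['my-cluster.abc123.dax-clusters.us-east-1.amazonaws.com:8111']
dax_read_endpoints = ['my-cluster.abc123.dax-clusters.us-east-1.amazonaws.com:8111']
fall_back_to_dynamodb = True
```

With ``fall_back_to_dynamodb`` enabled, a ``DaxClientError`` makes the connection retry the call against DynamoDB instead of raising, as shown in ``_make_api_call`` above.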
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -6,6 +6,7 @@
# we didn't want to bump the actual dependency of the library for consumers as it would effectively
# be a breaking change. As a result, we use the 1.6.0 dependency for development here for the
# purpose of integration tests, even though requirements.txt still has 1.2.0.
amazon-dax-client==1.0.6
botocore==1.11.4
six==1.10.0
coverage==4.5.3
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,2 +1,3 @@
amazon-dax-client==1.0.6
botocore==1.2.0
six==1.9.0
1 change: 1 addition & 0 deletions setup.py
@@ -24,6 +24,7 @@ def find_stubs(package):

install_requires = [
'six',
'amazon-dax-client>=1.0.6;python_version>="2.7" and python_version>="3.5"',
'botocore>=1.11.0',
'python-dateutil>=2.1,<3.0.0',
]
2 changes: 1 addition & 1 deletion tox.ini
@@ -2,7 +2,7 @@
exclude = .tox,docs,build

[tox]
envlist = py26,py27,py33,py34,py35,py36,pypy
envlist = py27,py33,py34,py35,py36,pypy

[testenv]
deps = -rrequirements-dev.txt