SELECT titel, 2011 - jahr AS alt, 'Jahre alt' AS Text
FROM buch
WHERE jahr > 1997
ORDER BY alt DESC, titel;

SELECT B.buchid, B.titel, V.name, V.ort, B.jahr
FROM buch B NATURAL JOIN verlag V
WHERE V.name = 'Springer' AND B.jahr >= 1990
ORDER BY V.ort;

SELECT DISTINCT A.nachname, A.vornamen, A.autorid
FROM autor A NATURAL JOIN buch_aut BA
    NATURAL JOIN buch_sw BS NATURAL JOIN schlagwort SW
WHERE SW.schlagwort = 'Datenbank'
ORDER BY A.nachname;

SELECT buchid, titel, jahr
FROM buch
WHERE jahr = (SELECT MIN(jahr) FROM buch);
“Parallel” Queries...
sort of...
not really...
gevent
Fast event loop
Lightweight execution units
Monkey patching utility
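Monkey patching is what lets existing blocking code cooperate with the event loop; a minimal sketch of the idiom (note that psycopg2 talks to PostgreSQL through libpq's own C-level sockets, so patch_all() does not cover it, which is why the wait callback on the next slides is needed):

from gevent import monkey
monkey.patch_all()  # swap blocking stdlib primitives (socket, time, ...) for cooperative ones

# Pure-Python network libraries now yield to the gevent hub instead of blocking.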
Coloring PostgreSQL Green
import psycopg2
from psycopg2 import extensions
from gevent.socket import wait_read, wait_write

def gevent_wait_callback(conn, timeout=None):
    """A wait callback useful to allow gevent to work with Psycopg."""
    while True:
        state = conn.poll()
        if state == extensions.POLL_OK:
            break
        elif state == extensions.POLL_READ:
            # Yield to the hub until the connection's socket is readable.
            wait_read(conn.fileno(), timeout=timeout)
        elif state == extensions.POLL_WRITE:
            # Yield to the hub until the socket is writable.
            wait_write(conn.fileno(), timeout=timeout)
        else:
            raise psycopg2.OperationalError(
                "Bad result from poll: %r" % state)
Coloring PostgreSQL Green (cont)
def make_psycopg_green():
    """Configure Psycopg to be used with gevent in non-blocking way."""
    if not hasattr(extensions, 'set_wait_callback'):
        raise ImportError(
            "support for coroutines not available in this Psycopg version (%s)"
            % psycopg2.__version__)
    extensions.set_wait_callback(gevent_wait_callback)
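Once the callback is registered, every psycopg2 connection yields to the gevent hub while waiting on the server, so independent queries overlap on a single thread. A usage sketch (the DSN and query are placeholders):

make_psycopg_green()

import gevent

def fetch(dsn, sql):
    conn = psycopg2.connect(dsn)
    try:
        cur = conn.cursor()
        cur.execute(sql)
        return cur.fetchall()
    finally:
        conn.close()

# Both sleeps overlap: this takes about 1 second, not 2.
jobs = [gevent.spawn(fetch, "dbname=test", "SELECT pg_sleep(1)")
        for _ in range(2)]
gevent.joinall(jobs)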
Building a Query Pool (Executor)
def executor(self, number):
    # Drain tasks until the shared queue is empty.
    while not self.tasks.empty():
        query = self.tasks.get()
        try:
            results = self.__query(query)
            self.output_queue.put(results)
        except Exception as exc_info:
            print(exc_info)
            print('Query failed :(')
        self.tasks.task_done()
Building a Query Pool (Overseer)
def overseer(self):
    for query in self.queries:
        self.tasks.put(query)
Building a Query Pool (Runner)
def run(self):
    self.running = []
    gevent.spawn(self.overseer).join()
    for i in range(self.POOL_MAX):
        # gevent.spawn() already starts the greenlet; no extra start() call needed.
        runner = gevent.spawn(self.executor, i)
        self.running.append(runner)
    self.tasks.join()
    for runner in self.running:
        runner.kill()
    # Drain the queue explicitly; iterating a gevent Queue would block forever.
    results = []
    while not self.output_queue.empty():
        results.append(self.output_queue.get())
    return results
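These methods assume a small class around them; the constructor below is a sketch of the plumbing they rely on (the class name QueryPool, the POOL_MAX value, and the per-query connection handling in __query are assumptions, not taken from the slides):

import psycopg2
import gevent
from gevent.queue import JoinableQueue, Queue

class QueryPool(object):
    POOL_MAX = 5  # assumed concurrency limit

    def __init__(self, queries, dsn):
        self.queries = queries
        self.dsn = dsn
        self.tasks = JoinableQueue()  # task_done()/join() require a JoinableQueue
        self.output_queue = Queue()

    def __query(self, query):
        # One short-lived connection per query keeps the sketch simple.
        conn = psycopg2.connect(self.dsn)
        try:
            cur = conn.cursor()
            cur.execute(query)
            return cur.fetchall()
        finally:
            conn.close()

    # executor(), overseer(), and run() from the slides above live here.

Calling QueryPool(["SELECT 1", "SELECT 2"], "dbname=test").run() then returns the collected result sets.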
CachingQuery (Getter)
def get_value(self, merge=True, createfunc=None,
              expiration_time=None, ignore_expiration=False):
    dogpile_region, cache_key = self._get_cache_plus_key()
    assert not ignore_expiration or not createfunc, \
        "Can't ignore expiration and also provide createfunc"
    if ignore_expiration or not createfunc:
        cached_value = dogpile_region.get(
            cache_key,
            expiration_time=expiration_time,
            ignore_expiration=ignore_expiration
        )
CachingQuery (Getter - cont)
    else:
        try:
            cached_value = dogpile_region.get_or_create(
                cache_key,
                createfunc,
                expiration_time=expiration_time
            )
        except ConnectionError:
            # Cache backend down: fall through to the database directly.
            logger.error('Cannot connect to query caching backend!')
            cached_value = createfunc()
    if cached_value is NO_VALUE:
        raise KeyError(cache_key)
    if merge:
        cached_value = self.merge_result(cached_value, load=False)
    return cached_value
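get_or_create() is the dogpile.cache call doing the heavy lifting here: it serves fresh values, regenerates stale ones, and prevents a dogpile of concurrent regenerations. A region like the one _get_cache_plus_key() returns could be configured as below (the Redis backend and timings are assumptions; the ConnectionError handled above also hints at redis.exceptions.ConnectionError):

from dogpile.cache import make_region
from dogpile.cache.api import NO_VALUE  # sentinel checked in get_value() above

regions = {
    "default": make_region().configure(
        "dogpile.cache.redis",
        expiration_time=300,  # assumed default TTL in seconds
        arguments={"host": "localhost", "port": 6379},
    )
}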
def _key_from_query(query, qualifier=None):
    stmt = query.with_labels().statement
    compiled = stmt.compile()
    params = compiled.params
    return " ".join([str(compiled)] +
                    [str(params[k]) for k in sorted(params)])
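The cache key is therefore the compiled, labeled SELECT text followed by its bind-parameter values in sorted-name order, so the same query shape with different values gets a distinct key. A hypothetical illustration (model and column names assumed):

# key for session.query(Buch).filter(Buch.jahr > 1997) looks roughly like:
# "SELECT buch.buchid AS buch_buchid, ... WHERE buch.jahr > %(jahr_1)s 1997"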
SQLAlchemy Options (FromCache)
from sqlalchemy.orm.interfaces import MapperOption

class FromCache(MapperOption):
    """Specifies that a Query should load results from a cache."""

    propagate_to_loaders = False

    def __init__(self, region="default", cache_key=None, cache_prefix=None):
        self.region = region
        self.cache_key = cache_key
        self.cache_prefix = cache_prefix

    def process_query(self, query):
        query._cache_region = self
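process_query() stashes the option on the query itself; a CachingQuery checks _cache_region at execution time and routes through get_value(). Usage is the standard options() call (the Buch model here is a placeholder):

books = (session.query(Buch)
         .filter(Buch.jahr > 1997)
         .options(FromCache("default"))
         .all())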