diff --git a/python/lsst/daf/butler/remote_butler/server/handlers/_external_query.py b/python/lsst/daf/butler/remote_butler/server/handlers/_external_query.py index b5b73e04b6..f7ad755efb 100644 --- a/python/lsst/daf/butler/remote_butler/server/handlers/_external_query.py +++ b/python/lsst/daf/butler/remote_butler/server/handlers/_external_query.py @@ -71,12 +71,15 @@ def query_execute( with ExitStack() as exit_stack: ctx = exit_stack.enter_context(_get_query_context(factory, request.query)) spec = request.result_spec.to_result_spec(ctx.driver.universe) + print("before execute") response_pages = ctx.driver.execute(spec, ctx.tree) # We write the response incrementally, one page at a time, as # newline-separated chunks of JSON. This allows clients to start # reading results earlier and prevents the server from exhausting # all its memory buffering rows from large queries. + + print("before stream") output_generator = _stream_query_pages( # Transfer control of the context manager to # _stream_query_pages. @@ -114,11 +117,13 @@ async def _stream_query_pages( When it takes longer than 15 seconds to get a response from the DB, sends a keep-alive message to prevent clients from timing out. """ + print("in stream") # Ensure that the database connection is cleaned up by taking control of # exit_stack. async with contextmanager_in_threadpool(exit_stack): iterator = iterate_in_threadpool(serialize_query_pages(spec, pages)) done = False + print("starting iteration") while not done: # Read the next value from the iterator, possibly with some # additional keep-alive messages if it takes a long time. @@ -140,12 +145,15 @@ async def _fetch_next_with_keepalives(iterator: AsyncIterator[str]) -> AsyncIter future = asyncio.ensure_future(anext(iterator, None)) ready = False while not ready: + print("waiting for task") (finished_task, pending_task) = await asyncio.wait([future], timeout=15) if pending_task: + print("timeout") # Hit the timeout, send a keep-alive and keep waiting. yield QueryKeepAliveModel().model_dump_json() else: # The next value from the iterator is ready to read. + print("ready") ready = True finally: # Even if we get cancelled above, we need to wait for this iteration to