Add scan option to build packages in subdirectories #139
base: main
Changes from commits: 3904c24, 605745d, d782a87, 4e6a718, b5dd2e6
Modified file:
@@ -19,10 +19,10 @@
 from pathlib import Path
 import shutil
 import subprocess
+import sys

 from jinja2 import Template
 import setuptools
-from sphinx.cmd.build import main as sphinx_main

 from ..builder import Builder
 from ..collect_inventory_files import collect_inventory_files
@@ -523,7 +523,8 @@ def build(self, *, doc_build_folder, output_staging_directory):
         logger.info(
             f"Running sphinx-apidoc: '{' '.join(cmd)}' in '{wrapped_sphinx_directory}'"
         )
-        completed_process = subprocess.run(cmd, cwd=wrapped_sphinx_directory)
+        completed_process = subprocess.run(cmd, cwd=wrapped_sphinx_directory,
+                                           stdout=sys.stdout, stderr=sys.stderr)

Review comment: The behavior here looks the same as the default.
Review comment: ditto

         msg = f"sphinx-apidoc exited with return code '{completed_process.returncode}'"
         if completed_process.returncode == 0:
             logger.debug(msg)
@@ -533,12 +534,18 @@ def build(self, *, doc_build_folder, output_staging_directory):
         # Invoke Sphinx-build.
         sphinx_output_dir = os.path.abspath(
             os.path.join(wrapped_sphinx_directory, 'sphinx_output'))
+        cmd = [
+            'sphinx-build',
+            wrapped_sphinx_directory,
+            sphinx_output_dir,
+        ]
         logger.info(
-            f"Running sphinx_build with: [{wrapped_sphinx_directory}, '{sphinx_output_dir}]'"
+            f"Running Sphinx-build: '{' '.join(cmd)}' in '{wrapped_sphinx_directory}'"
         )
-        returncode = sphinx_main([wrapped_sphinx_directory, sphinx_output_dir])
-        msg = f"sphinx_build exited with return code '{returncode}'"
-        if returncode == 0:
+        completed_process = subprocess.run(cmd, cwd=wrapped_sphinx_directory,
+                                           stdout=sys.stdout, stderr=sys.stderr)

Review comment: uncaptured here too

+        msg = f"Sphinx-build exited with return code '{completed_process.returncode}'"
+        if completed_process.returncode == 0:
             logger.info(msg)
         else:
             raise RuntimeError(msg)
New file:
@@ -0,0 +1,29 @@
# Copyright 2024 R. Kent James <[email protected]>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .impl import main
from .impl import prepare_arguments

__all__ = [
    'entry_point_data',
]

entry_point_data = {
    'verb': 'scan',
    'description': 'Scan subdirectories looking for packages, then build those packages.',
    # Called for execution, given the parsed arguments object
    'main': main,
    # Called first to set up argparse, given the argparse parser
    'prepare_arguments': prepare_arguments,
}
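
To make the verb-registration contract concrete, here is a minimal sketch of how a dispatcher might consume entry_point_data. This is an illustration only; the actual rosdoc2 entry-point wiring may differ.

    import argparse
    import sys

    from rosdoc2.verbs.scan import entry_point_data

    parser = argparse.ArgumentParser(prog='rosdoc2')
    subparsers = parser.add_subparsers(dest='verb', required=True)

    # Register the 'scan' verb: 'prepare_arguments' populates its options and
    # 'main' is stored as the callback to invoke after parsing.
    scan_parser = subparsers.add_parser(
        entry_point_data['verb'], description=entry_point_data['description'])
    entry_point_data['prepare_arguments'](scan_parser)
    scan_parser.set_defaults(main=entry_point_data['main'])

    options = parser.parse_args()
    sys.exit(options.main(options))

With wiring like this, the new verb would be invoked as, for example, rosdoc2 scan --package-path ./src.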
New file:
@@ -0,0 +1,217 @@
# Copyright 2024 R. Kent James <[email protected]>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import multiprocessing as mp
import os
import signal
import sys
import threading
import time

from catkin_pkg.packages import find_packages_allowing_duplicates
from rosdoc2.verbs.build.impl import main_impl as build_main_impl
from rosdoc2.verbs.build.impl import prepare_arguments as build_prepare_arguments

logging.basicConfig(
    format='[%(name)s] [%(levelname)s] %(message)s', level=logging.INFO)
logger_scan = logging.getLogger('rosdoc2.scan')

goptions = None
# Setting BATCH_SIZE or MAX_PACKAGES to a smaller value may be useful when
# debugging this module, to reduce run time or to isolate sections that cause hangs.
BATCH_SIZE = 10000
MAX_PACKAGES = 10000
WATCHDOG_TIMEOUT = 15 * 60  # Seconds


def main(options):
    """Execute the program, catching errors."""
    try:
        return main_impl(options)
    except Exception as e:  # noqa: B902
        if options.debug:
            raise
        else:
            sys.exit(str(e))


class Struct:
    """Wrap argparse options to allow copies."""

    def __init__(self, **entries):
        """Create a dictionary from option entries."""
        self.__dict__.update(entries)


def prepare_arguments(parser):
    """Add command-line arguments to the argparse object."""

Review comment: It would be good to expose BATCH_SIZE here too; otherwise it's a magic number.
Reply: As discussed below, I don't think this is useful to the typical user, so I'll just remove batching instead.

    # Wrap the builder arguments to include their choices.
    build_prepare_arguments(parser)

    # Additional options for scan
    parser.add_argument(
        '--timeout',
        '-t',
        default=WATCHDOG_TIMEOUT,
        help='maximum time in seconds allowed per package',
    )
    parser.add_argument(
        '--max-packages',
        '-m',
        default=MAX_PACKAGES,
        help='maximum number of packages to process',
    )
    return parser


def main_impl(options):
    """Execute the program."""
    global goptions
    goptions = options

    if options.install_directory is not None:
        # Check that the install directory exists.
        if not os.path.exists(options.install_directory):
            sys.exit(
                f"Error: given install directory '{options.install_directory}' does not exist")

    # Locate the packages to document, converting the dict returned by
    # find_packages_allowing_duplicates to a list so it can be sliced and popped.
    packages = list(find_packages_allowing_duplicates(options.package_path).values())
    if len(packages) == 0:
        logger_scan.error(f'No packages found in subdirectories of {options.package_path}')
        sys.exit(1)
    max_packages = int(options.max_packages)
    if len(packages) > max_packages:
        packages = packages[0:max_packages]

    packages_total = len(packages)
    packages_done = 0
    logger_scan.info(f'Processing {packages_total} packages')
    batch_packages = []
    batch_no = 0
    failed_packages = []
    while len(packages) > 0:
        batch_no += 1
        batch_packages.clear()
        for i in range(len(packages)):
            batch_packages.append(packages.pop())
            if len(batch_packages) >= BATCH_SIZE:
                break

Review comment: Taking a broader view, what is the value of the batching here? It basically forces the mp.Pool to be reset periodically, and with the current default it's never expected to be hit. Might it make sense to reduce the complexity and not batch things before the pool? If someone wants to debug a build/scan, they can point the argument at the specific subfolder where the problem is encountered, which is more intuitive than trying to tune the batch size.
Reply: The batching originally got added during development, when things were dying and it was really hard to figure out where and why. With small batch sizes, the problem could be isolated to a small number of packages to examine. As a debugging tool, it was easy enough to just change the value in code to enable batching, and I don't really see the value except in debugging, hence it was not exposed to the user. I have not had issues lately with it dying, though, so I think I've caught all of the problems with other tools (adding a timeout, running Sphinx as a subprocess again, better logging of individual package outputs), but I left it in since it was useful during initial debugging. I don't feel strongly either way, as --max-packages and --timeout mostly serve that need, but I would rather remove it than expose it to the user, since it is really of no value except to the rosdoc2 developer. So unless you have another viewpoint, I'll remove the batching.
Reply: Yeah, I think now that it's working well, it would be better to remove it and keep the much simpler flow: build the list and execute it in a pool.
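
As a hypothetical illustration of the batch-free flow agreed on above (reusing names from this module; the actual follow-up commit may differ):

    # One pool over all packages; no BATCH_SIZE, no outer while loop.
    packages = list(
        find_packages_allowing_duplicates(options.package_path).values())
    failed_packages = []
    with mp.Pool(maxtasksperchild=1) as pool:  # terminate() is called on exit
        for package, returns, message in pool.imap_unordered(
                package_impl, packages):
            if returns == 0:
                logger_scan.info(f'{package.name} successful')
            else:
                failed_packages.append((package, returns, message))
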
        logger_scan.info(f'Begin batch # {batch_no}')
        for package in batch_packages:
            logger_scan.info(f'Adding {package.name} for processing')
        pool = mp.Pool(maxtasksperchild=1)

Review comment: We should definitely allow the ability to control the pool size as an argument. Otherwise some machines could easily be overrun if their memory-to-CPU-core ratios differ from what we usually use.
Reply: Wilco.
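
A sketch of what exposing the pool size could look like, using a hypothetical --jobs option (the option name is assumed, not taken from the PR):

    # In prepare_arguments:
    parser.add_argument(
        '--jobs', '-j',
        type=int,
        default=os.cpu_count(),
        help='number of worker processes used to build packages in parallel',
    )

    # In main_impl, size the pool from the option:
    pool = mp.Pool(processes=options.jobs, maxtasksperchild=1)
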
        pool_results = pool.imap_unordered(package_impl, batch_packages)
        while True:
            try:
                (package, returns, message) = pool_results.next()
                packages_done += 1
                if returns != 0:
                    logger_scan.warning(f'{package.name} ({packages_done}/{packages_total})'
                                        f' returned {returns}: {message}')
                    failed_packages.append((package, returns, message))
                else:
                    logger_scan.info(
                        f'{package.name} successful ({packages_done}/{packages_total})')
            except StopIteration:
                break
            except BaseException as e:  # noqa: B902
                logger_scan.error(f'Unexpected error in scan: {type(e).__name__ + " " + str(e)}')
                break
        logger_scan.info(f'Finished batch {batch_no}')
        # I'd prefer close() then join(), but that seems to sometimes hang.
        pool.terminate()

    logger_scan.info('scan complete')
    if len(failed_packages) > 0:
        print(f'{len(failed_packages)} packages failed:')
        for failed in failed_packages:
            (package, returns, message) = failed
            print(f'{package.name}: retval={returns}: {message}')
    else:
        print('All packages succeeded')


def _clocktime():
    return time.strftime('%H:%M:%S')


def package_impl(package):
    """Build the documentation for a single package."""
    global goptions
    options = Struct(**goptions.__dict__)
    package_path = os.path.dirname(package.filename)
    options.package_path = package_path
    return_value = 100
    message = 'Unknown error'
    start = time.time()
    had_timeout = threading.Event()

    def watchdog():
        """Kill the process after a timeout."""
        time.sleep(float(options.timeout))
        had_timeout.set()
        os.kill(os.getpid(), signal.SIGINT)
    threading.Thread(target=watchdog, daemon=True).start()

    # Generate the doc build directory.
    os.makedirs(options.doc_build_directory, exist_ok=True)

    print(f'{_clocktime()} Begin processing {package.name}', flush=True)
    # Remap output so that this package's logs are captured in their own file.
    outfile = open(os.path.join(options.doc_build_directory, f'{package.name}.txt'), 'w')
    old_stdout = sys.stdout
    old_stderr = sys.stderr
    sys.stdout = outfile
    sys.stderr = outfile
    logging.basicConfig(
        format='%(asctime)s [%(name)s] [%(levelname)s] %(message)s',
        level=logging.INFO, stream=outfile, force=True)
    logger = logging.getLogger('rosdoc2')
    logger.info(f'Processing package build at {package_path}')
    try:
        # Run rosdoc2 for the package.
        build_main_impl(options)
        return_value = 0
        message = 'OK'
    except RuntimeError as e:
        return_value = 1
        message = type(e).__name__ + ' ' + str(e)
    except KeyboardInterrupt as e:
        return_value = 2
        if had_timeout.is_set():
            e = TimeoutError(f'runtime {"{:.3f}".format(time.time() - start)} seconds')
        message = type(e).__name__ + ' ' + str(e)
    except BaseException as e:  # noqa: B902
        return_value = 3
        message = type(e).__name__ + ' ' + str(e)
    finally:
        sys.stdout = old_stdout
        sys.stderr = old_stderr
    elapsed_time = '{:.3f}'.format(time.time() - start)
    if return_value != 0:
        print(f'{_clocktime()} Package at {package_path} failed {return_value}: {message}',
              flush=True)
        logger.error(f'Package at {package_path} failed {return_value}: {message}')
    else:
        print(f'{_clocktime()} Package {package.name} succeeded '
              f'in {elapsed_time} seconds', flush=True)
        logger.info(f'Completed rosdoc2 build for {package_path} '
                    f'in {elapsed_time} seconds')
    if not outfile.closed:
        outfile.close()
    return (package, return_value, message)

Review comment: Isn't this doing the default behavior with more complexity? The default is that subprocess.run doesn't capture the output, which means it goes to the console.

Reply: The reason I did this is so that I can remap sys.stdout and sys.stderr to a file and capture the output of individual package runs. IIRC from my tests, without this I could not remap to capture the output.
I was definitely following my "Programming is an Experimental Science"™ philosophy here, trying things until something worked. But I'll look this over again and confirm that I really need to do this. It seems strange to me as well that this is needed, but it seems to work.

Reply: If you do find that this helps with that extra bit of debuggability, it's fine to keep it. It would be great if you could add a comment to explain it so others can also find this benefit.

Reply: I tried this again, and I can confirm that leaving
    completed_process = subprocess.run(cmd, cwd=working_directory)
without adding stdout=sys.stdout, stderr=sys.stderr does not work. The outputs go to the console, which is not what we want; we want to capture them to a file. I tried looking at pytest to see how they do it, but got lost in the levels of abstraction. Pytest has a lot of code around capturing, but I am sure they deal with a lot of edge cases that we don't care about.
If I look at the subprocess code, the default for stderr is not sys.stderr but None. We want to capture the output to our own file object, which we store in sys.stderr. We would not have to store that file object in sys.stderr for the subprocess; we could define a separate variable for it. But by storing it in sys.stderr we also capture output generated by, for example, print(f"[rosdoc2] adding markdown parser", file=sys.stderr).
Reply: Sorry, I missed that you're changing sys.stdout/err in the per-package scan code here: https://github.com/ros-infrastructure/rosdoc2/pull/139/files#diff-4d67c84fa514f2538371532c7f2d3979b42da8852649e61ed9cac143e83f1c55R169-R180
That captures the output differently than capturing it through subprocess's native mechanisms. Instead of changing sys.stderr at the global level, I think it would make sense to extend main_impl to accept an alternative output file and, if that output file is set, pass it through to the subprocess invocation. This avoids worries about race conditions from redirecting globally, for potential other implementations of parallel threading.

Reply: I'm having a hard time understanding and implementing what you want here.
First, I don't see any mechanism within Pool to control capture of stdout and stderr, unlike the basic subprocess.run(). Do you know how to do that?
Once I get into package_impl, output is going to the normal sys.stdout and sys.stderr, and we have to fix that (there is no subprocess.run involved with build_main_impl). I fixed it via sys.stdout = outfile; I don't see what else to do. I was uncertain whether that was the correct approach, but then I ran across contextlib.redirect_stdout in the standard library, and it does exactly what package_impl does: save the old value of sys.stdout, change sys.stdout to the new value, then restore it when done.
So I need more clarity about what you want here.
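
For reference, a sketch of the stdlib pattern mentioned above. Note that contextlib.redirect_stdout swaps the same process-global sys.stdout, so it carries the same cross-thread caveat raised earlier, and a child process still needs the handles passed through explicitly; the helper name run_captured is hypothetical.

    import contextlib
    import subprocess
    import sys

    def run_captured(cmd, logfile_path):
        """Run cmd with Python-level and child output captured to one file."""
        with open(logfile_path, 'w') as outfile, \
                contextlib.redirect_stdout(outfile), \
                contextlib.redirect_stderr(outfile):
            print('python-side message')  # captured via the redirected sys.stdout
            # The child does not see the Python-level swap, so pass the
            # redirected streams to subprocess explicitly.
            return subprocess.run(cmd, stdout=sys.stdout, stderr=sys.stderr)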