[search/save] Run bystro-stats on saved annotations (#321)
* Add running bystro-stats on annotation output
* Add tests for search/utils/annotation
* Fix startup.yml and ensure proteomics server is defined in beanstalkd.yml

Partially addresses #314
akotlar authored Oct 25, 2023
1 parent a2db727 commit 9a7584a
Showing 12 changed files with 583 additions and 112 deletions.
3 changes: 3 additions & 0 deletions config/beanstalk.clean.yml
@@ -17,3 +17,6 @@ beanstalkd:
ancestry:
submission: ancestry
events: ancestry_events
proteomics:
submission: proteomics
events: proteomics_events
2 changes: 1 addition & 1 deletion python/pyproject.toml
@@ -1,5 +1,5 @@
[build-system]
requires = ["maturin>=0.14,<0.15", "setuptools", "wheel", "Cython"]
requires = ["maturin>=1.3.0,<1.4.0", "setuptools", "wheel", "Cython"]
build-backend = "maturin"

[project]
14 changes: 7 additions & 7 deletions python/python/bystro/search/index/bystro_file.pyx
@@ -30,14 +30,14 @@ cdef class ReadAnnotationTarball:
list paths
int id

def __cinit__(self, str index_name, dict delimiters, str tar_path, str annotation_name = 'annotation.tsv.gz', int chunk_size=500):
def __cinit__(self, str index_name, object delimiters, str tar_path, str annotation_name = 'annotation.tsv.gz', int chunk_size=500):
self.index_name = index_name
self.chunk_size = chunk_size
self.field_separator = delimiters['field']
self.position_delimiter = delimiters['position']
self.overlap_delimiter = delimiters['overlap']
self.value_delimiter = delimiters['value']
self.empty_field_char = delimiters['empty_field']
self.field_separator = delimiters.field
self.position_delimiter = delimiters.position
self.overlap_delimiter = delimiters.overlap
self.value_delimiter = delimiters.value
self.empty_field_char = delimiters.empty_field

t = tarfile.open(tar_path)
for member in t.getmembers():
@@ -117,5 +117,5 @@ cdef class ReadAnnotationTarball:
def get_header_fields(self):
return self.header_fields

cpdef ReadAnnotationTarball read_annotation_tarball(str index_name, dict delimiters, str tar_path, str annotation_name = 'annotation.tsv.gz', int chunk_size=500):
cpdef ReadAnnotationTarball read_annotation_tarball(str index_name, object delimiters, str tar_path, str annotation_name = 'annotation.tsv.gz', int chunk_size=500):
return ReadAnnotationTarball(index_name, delimiters, tar_path, annotation_name, chunk_size)
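For reference, the attribute names the reader now expects on the delimiters object, together with the example characters noted in the tests below, suggest a shape roughly like the following sketch. This is illustrative only, not the actual DelimitersConfig definition, and the empty_field default in particular is a guess:

from dataclasses import dataclass

@dataclass(frozen=True)
class DelimitersConfigSketch:
    field: str = "\t"        # separates columns
    position: str = "|"      # separates positions within a column
    value: str = ";"         # separates values at a single position
    overlap: str = chr(31)   # separates overlapping values (ASCII unit separator)
    empty_field: str = "NA"  # placeholder for missing data; not shown in this diff, so a guess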
18 changes: 12 additions & 6 deletions python/python/bystro/search/index/handler.py
@@ -14,9 +14,9 @@

from bystro.beanstalkd.worker import ProgressPublisher, get_progress_reporter
from bystro.search.index.bystro_file import ( # type: ignore # pylint: disable=no-name-in-module,import-error # noqa: E501
read_annotation_tarball,
read_annotation_tarball,
)
from bystro.search.utils.annotation import get_delimiters
from bystro.search.utils.annotation import DelimitersConfig
from bystro.search.utils.opensearch import gather_opensearch_args

ray.init(ignore_reinit_error=True, address="auto")
@@ -47,7 +47,9 @@ async def index(self, data):
self.counter += resp[0]

if self.counter >= self.reporter_batch:
await asyncio.to_thread(self.progress_tracker.increment.remote, self.counter)
await asyncio.to_thread(
self.progress_tracker.increment.remote, self.counter
)
self.counter = 0

return resp
@@ -96,7 +98,9 @@ async def go(

if not index_body["settings"].get("number_of_shards"):
file_size = os.path.getsize(tar_path)
index_body["settings"]["number_of_shards"] = ceil(float(file_size) / float(1e10))
index_body["settings"]["number_of_shards"] = ceil(
float(file_size) / float(1e10)
)

try:
await client.indices.create(index_name, body=index_body)
@@ -106,7 +110,7 @@
data = read_annotation_tarball(
index_name=index_name,
tar_path=tar_path,
delimiters=get_delimiters(),
delimiters=DelimitersConfig(),
chunk_size=paralleleism_chunk_size,
)

@@ -154,7 +158,9 @@ async def go(

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Process some config files.")
parser.add_argument("--tar", type=str, help="Path to the tarball containing the annotation")
parser.add_argument(
"--tar", type=str, help="Path to the tarball containing the annotation"
)

parser.add_argument(
"--search_conf",
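As a standalone illustration of the shard-count heuristic in the hunk above (roughly one primary shard per 10 GB of annotation tarball), with no assumptions beyond what the diff shows:

import os
from math import ceil

def shards_for(tar_path: str) -> int:
    # Mirrors index_body["settings"]["number_of_shards"] = ceil(file_size / 1e10)
    return ceil(float(os.path.getsize(tar_path)) / float(1e10))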
17 changes: 10 additions & 7 deletions python/python/bystro/search/index/tests/test_bystro_file.py
@@ -6,7 +6,7 @@
from bystro.search.index.bystro_file import ( # type: ignore # pylint: disable=no-name-in-module,import-error # noqa: E501
read_annotation_tarball,
)
from bystro.search.utils.annotation import get_delimiters
from bystro.search.utils.annotation import DelimitersConfig


def create_mock_tarball(annotation_content):
@@ -27,11 +27,11 @@ def create_mock_tarball(annotation_content):


def test_read_annotation_tarball():
delims = get_delimiters()
delim_v = delims["value"] # e.g. ;
delim_f = delims["field"] # e.g. \t
delim_o = delims["overlap"] # e.g. chr(31)
delim_p = delims["position"] # e.g. |
delims = DelimitersConfig()
delim_v = delims.value # e.g. ;
delim_f = delims.field # e.g. \t
delim_o = delims.overlap # e.g. chr(31)
delim_p = delims.position # e.g. |

header = f"field1{delim_f}field2{delim_f}field3\n"
field1val = f"value1a{delim_v}value1b{delim_p}value2aa{delim_o}value2ab{delim_v}value2b{delim_f}"
@@ -70,9 +70,12 @@ def test_read_annotation_tarball():
},
}
]

import numpy as np
import json
result_data = next(reader)
assert result_data == expected_data
np.testing.assert_equal(result_data, expected_data)
print(json.dumps(expected_data, indent=4))

# Test the end of the data
with pytest.raises(StopIteration):
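The switch from a plain == assertion to np.testing.assert_equal presumably lets the test compare nested dicts and lists that may contain numpy arrays, since assert_equal recurses through containers and compares arrays element-wise. A minimal standalone illustration, not taken from the test data above:

import numpy as np

# Passes: dicts and lists are descended into recursively, and an ndarray
# compares equal to a plain list with the same contents.
np.testing.assert_equal(
    {"a": [np.array([1, 2])], "b": "x"},
    {"a": [[1, 2]], "b": "x"},
)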
66 changes: 48 additions & 18 deletions python/python/bystro/search/save/handler.py
@@ -7,21 +7,30 @@
# TODO 2023-05-08: concatenate chunks in a different ray worker

import gzip
import logging
import math
import os
import pathlib
import subprocess
import traceback
from typing import Any

import numpy as np
import ray

from opensearchpy import OpenSearch

from bystro.beanstalkd.worker import ProgressPublisher, get_progress_reporter
from bystro.search.utils.annotation import AnnotationOutputs, get_delimiters
from bystro.search.utils.annotation import (
AnnotationOutputs,
DelimitersConfig,
)
from bystro.search.utils.messages import SaveJobData
from bystro.search.utils.opensearch import gather_opensearch_args
from bystro.utils.compress import GZIP_EXECUTABLE
from bystro.utils.tar import GNU_TAR_EXECUTABLE_NAME

logger = logging.getLogger(__name__)

ray.init(ignore_reinit_error=True, address="auto")

@@ -57,8 +66,8 @@ def _get_header(field_names):
return parents, children


def _populate_data(field_path, data_for_end_of_path):
if not isinstance(field_path, list) or data_for_end_of_path is None:
def _populate_data(field_path: list[str] | str, data_for_end_of_path: Any):
if not isinstance(data_for_end_of_path, dict):
return data_for_end_of_path

for child_field in field_path:
@@ -70,8 +79,8 @@ def _populate_data(field_path, data_for_end_of_path):
return data_for_end_of_path


def _make_output_string(rows: list, delims: dict):
empty_field_char = delims["empty_field"]
def _make_output_string(rows: list, delims: DelimitersConfig):
empty_field_char = delims.empty_field
for row_idx, row in enumerate(rows): # pylint:disable=too-many-nested-blocks
# Some fields may just be missing; we won't store even the alt/pos [[]] structure for those
for i, column in enumerate(row):
@@ -99,18 +108,23 @@ def _make_output_string(rows: list, delims: dict):

if isinstance(sub, list):
inner_values.append(
delims["overlap"].join(
map(lambda x: str(x) if x is not None else empty_field_char, sub)
delims.overlap.join(
map(
lambda x: str(x)
if x is not None
else empty_field_char,
sub,
)
)
)
else:
inner_values.append(str(sub))

column[j] = delims["value"].join(inner_values)
column[j] = delims.value.join(inner_values)

row[i] = delims["position"].join(column)
row[i] = delims.position.join(column)

rows[row_idx] = delims["field"].join(row)
rows[row_idx] = delims.field.join(row)

return bytes("\n".join(rows) + "\n", encoding="utf-8")

@@ -143,7 +157,9 @@ def _process_query(
for doc in resp["hits"]["hits"]:
row = np.empty(len(field_names), dtype=object)
for y in range(len(field_names)):
row[y] = _populate_data(child_fields[y], doc["_source"].get(parent_fields[y]))
row[y] = _populate_data(
child_fields[y], doc["_source"].get(parent_fields[y])
)

if row[discordant_idx][0][0] is False:
row[discordant_idx][0][0] = 0
@@ -191,7 +207,7 @@ def go( # pylint:disable=invalid-name
output_dir = os.path.dirname(job_data.outputBasePath)
basename = os.path.basename(job_data.outputBasePath)
pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
outputs = AnnotationOutputs.from_path(output_dir, basename, True)
outputs, stats = AnnotationOutputs.from_path(output_dir, basename, compress=True)

written_chunks = [os.path.join(output_dir, f"{job_data.indexName}_header")]

@@ -203,7 +219,9 @@ def go( # pylint:disable=invalid-name
client = OpenSearch(**search_client_args)

query = _clean_query(job_data.queryBody)
num_slices = _get_num_slices(client, job_data.indexName, max_query_size, max_slices, query)
num_slices = _get_num_slices(
client, job_data.indexName, max_query_size, max_slices, query
)
pit_id = client.create_point_in_time(index=job_data.indexName, params={"keep_alive": keep_alive})["pit_id"] # type: ignore # noqa: E501
try:
reporter = get_progress_reporter(publisher)
@@ -212,7 +230,9 @@ def go( # pylint:disable=invalid-name

reqs = []
for slice_id in range(num_slices):
written_chunks.append(os.path.join(output_dir, f"{job_data.indexName}_{slice_id}"))
written_chunks.append(
os.path.join(output_dir, f"{job_data.indexName}_{slice_id}")
)
body = query.copy()
if num_slices > 1:
# Slice queries require max > 1
@@ -223,7 +243,7 @@ def go( # pylint:disable=invalid-name
job_data.fieldNames,
written_chunks[-1],
reporter,
get_delimiters(),
DelimitersConfig(),
)
reqs.append(res)
results_processed = ray.get(reqs)
@@ -234,14 +254,24 @@ def go( # pylint:disable=invalid-name
all_chunks = " ".join(written_chunks)

annotation_path = os.path.join(output_dir, outputs.annotation)
ret = subprocess.call(f"cat {all_chunks} > {annotation_path}; rm {all_chunks}", shell=True)
ret = subprocess.call(
f"cat {all_chunks} > {annotation_path}; rm {all_chunks}", shell=True
)
if ret != 0:
raise IOError(f"Failed to write {annotation_path}")

tarball_name = os.path.basename(outputs.archived)
ret = subprocess.call(
f"{GZIP_EXECUTABLE} -d -c {annotation_path} | {stats.stdin_cli_stats_command}",
shell=True,
)
if ret != 0:
raise IOError(f"Failed to write statistics for {annotation_path}")

tarball_name = os.path.basename(outputs.archived)
# Webserver requires the output to have top-level statistics data,
# but annotation data will be too large to want to store 2 copies of
ret = subprocess.call(
f'cd {output_dir}; tar --exclude ".*" --exclude={tarball_name} -cf {tarball_name} * --remove-files', # noqa: E501
f'cd {output_dir}; {GNU_TAR_EXECUTABLE_NAME} --exclude ".*" --exclude={tarball_name} -cf {tarball_name} * && rm {annotation_path}', # noqa: E501
shell=True,
)
if ret != 0:
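The new statistics step above streams the decompressed annotation into the stats program over stdin. A simplified standalone sketch of that step; the path and command arguments here are placeholders, whereas the handler derives them from AnnotationOutputs.from_path(..., compress=True) and the stats object it returns:

import subprocess

def write_stats(annotation_path: str, stats_command: str, gzip_exe: str = "gzip") -> None:
    # Equivalent in spirit to: gzip -d -c <annotation> | <stats command>
    ret = subprocess.call(
        f"{gzip_exe} -d -c {annotation_path} | {stats_command}", shell=True
    )
    if ret != 0:
        raise IOError(f"Failed to write statistics for {annotation_path}")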