Commit
Merge branch 'main' into planwindows
Cathyhjj authored Jun 5, 2024
2 parents ae23181 + 4206fd3 commit 3bda789
Showing 15 changed files with 598 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .readthedocs.yaml
@@ -13,7 +13,7 @@ formats:
build:
os: ubuntu-22.04
tools:
python: "3.10"
python: "3.9"
# You can also specify other tool versions:
# nodejs: "16"
# rust: "1.55"
19 changes: 12 additions & 7 deletions src/haven/preprocessors.py
@@ -6,13 +6,14 @@
import socket
import warnings
from collections import ChainMap
-from typing import Iterable, Sequence, Union
+from typing import Sequence, Union  # , Iterable

import epics
import pkg_resources
from bluesky.preprocessors import baseline_wrapper as bluesky_baseline_wrapper
from bluesky.preprocessors import finalize_wrapper, msg_mutator
-from bluesky.suspenders import SuspendBoolLow
+
+# from bluesky.suspenders import SuspendBoolLow
from bluesky.utils import Msg, make_decorator

from ._iconfig import load_config
@@ -153,11 +154,15 @@ def shutter_suspend_wrapper(plan, shutter_signals=None):
shutter_signals = [s.pss_state for s in shutters]
# Create a suspender for each shutter
suspenders = []
-for sig in shutter_signals:
-    suspender = SuspendBoolLow(sig, sleep=3.0)
-    suspenders.append(suspender)
-if not isinstance(suspenders, Iterable):
-    suspenders = [suspenders]
+
+###################################################
+# Temporarily disabled for technical commissioning
+###################################################
+# for sig in shutter_signals:
+#     suspender = SuspendBoolLow(sig, sleep=3.0)
+#     suspenders.append(suspender)
+# if not isinstance(suspenders, Iterable):
+#     suspenders = [suspenders]

def _install():
for susp in suspenders:
2 changes: 2 additions & 0 deletions src/haven/tests/test_preprocessors.py
@@ -1,5 +1,6 @@
from unittest.mock import MagicMock

+import pytest
from bluesky import plans as bp
from bluesky.callbacks import CallbackBase
from ophyd.sim import SynAxis, det, instantiate_fake_device
@@ -9,6 +10,7 @@
from haven.preprocessors import shutter_suspend_decorator, shutter_suspend_wrapper


+@pytest.mark.xfail
def test_shutter_suspend_wrapper(aps, shutters, sim_registry):
# Check that the run engine does not have any shutter suspenders
# Currently this test is fragile since we might add non-shutter
166 changes: 166 additions & 0 deletions src/queueserver/README.rst
@@ -0,0 +1,166 @@
Queueserver Configuration
=========================

Configuration and setup for running a bluesky-queueserver and related
tools (kafka, redis, mongo_consumer).

Full setup of the spectroscopy group's queueserver stack involves the
following parts:

- **redis:** In-memory database holding queueserver history.
- **queueserver:** Runs the bluesky plans.
- **kafka:** Scalable database holds documents produced by queueserver.
- **zookeeper:** Manager for controlling the kafka server.
- **mongo_consumer:** Reads the documents from kafka and stores them in mongoDB.

The command ``haven_config`` is used in several places to retrieve
configuration from a single .toml file. For non-Haven deployments,
replace ``haven_config <key>`` with the appropriate values.
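
The same configuration can also be read directly from Python. A
minimal sketch, assuming ``haven._iconfig.load_config`` is importable
(it is what Haven itself uses internally) and using a hypothetical
``queueserver`` section as the key:

.. code:: python

   # Sketch: read Haven's iconfig .toml file from Python.
   # The "queueserver"/"control_port" keys are hypothetical examples;
   # use the keys defined in your own configuration file.
   from haven._iconfig import load_config

   config = load_config()
   print(config["queueserver"]["control_port"])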

Systemd Units
-------------

Starting and stopping these services is done through systemd
units. The mongo_consumer is the top-level unit, and so **starting
mongo_consumer.service is enough to start the remaining services**.

1. Copy the contents of ``systemd-units`` into ``~/.config/systemd/user/``
2. Modify the units as described in the sections below.
3. Reload the modified unit files: ``systemctl --user daemon-reload``
4. Start mongo_consumer: ``systemctl --user start mongo_consumer``
5. [Optional] View console output of all services: ``journalctl -xef --user``
6. [Optional] View console output of a specific unit (e.g. kafka): ``journalctl -xef --user --unit=kafka.service``
7. [Optional] Enable mongo_consumer to start on boot: ``systemctl --user enable mongo_consumer``

A useful pattern is to set environment variables in each systemd
unit file, and read these environment variables in the various
Python and shell scripts. This allows, for example, multiple
queueserver instances to be run with different ZMQ ports.
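
For example, a Python launcher started by a systemd unit could pick
up its ZMQ port like this (a sketch; ``QSERVER_ZMQ_PORT`` is a
hypothetical variable name, not a Haven convention):

.. code:: python

   # Sketch: read a ZMQ port set via an Environment= line in the unit file.
   # "QSERVER_ZMQ_PORT" is a hypothetical name used only for illustration.
   import os

   zmq_port = int(os.environ.get("QSERVER_ZMQ_PORT", "60615"))
   print(f"Starting queueserver on ZMQ control port {zmq_port}")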

The systemd unit files **assume** the various bluesky tools are
installed in a micromamba environment named *haven*, and that this
repository is cloned to ``~/src/queueserver``. **Modify the systemd
unit files to use the correct environment and repository location.**

The systemd unit files are also capable of **setting PVs based on the
service's state**. For example, by uncommenting the lines
``ExecStopPost=...`` and ``ExecStartPost=`` in each systemd unit file,
EPICS PVs can be toggled when the units start and stop, possibly
alerting staff or users that the queueserver pipeline is not
functioning.
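
Such an ``ExecStartPost=`` (or ``ExecStopPost=``) line could invoke a
short Python command; a sketch using pyepics, where the PV name is a
made-up example:

.. code:: python

   # Sketch: set a status PV when the unit starts or stops.
   # "S25ID:QS:Running" is a made-up PV name; substitute a real one.
   from epics import caput

   caput("S25ID:QS:Running", 1)  # use 0 in ExecStopPost to signal "stopped"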

Multiple QueueServers
---------------------

It is possible to have multiple queueservers running, for example if
multiple branches are operating at a single beamline. In this case,
independent instances of queueserver will be started and will need
unique ZeroMQ ports. These ports are set in ``queueserver.sh``, so
modify that file as needed to read ZMQ ports from config files or
environment variables. Make one copy of the systemd unit
(``queueserver.service``) per instance, each with the corresponding
environment variables.

Each queueserver will have a unique kafka topic. One kafka server will
run with multiple topics, and one (or more, at scale) mongo_consumer
instances will run with a dictionary of topic <=> mongodb-catalog
mappings.
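
A sketch of such a mapping as it might appear in ``mongo_consumer.py``
(the topic and catalog names below are illustrative only):

.. code:: python

   # Sketch: one kafka topic per queueserver instance, each mapped to
   # its own mongodb catalog.  All names here are illustrative.
   topic_database_map = {
       "s25idc_queueserver": "25idc",
       "s25idd_queueserver": "25idd",
   }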

Redis
-----

Redis should be installed by AES-IT on the target system. The
system-wide systemd unit will not be used since it requires root
access to start and stop. No modification of the systemd unit file is
necessary.

Queueserver
-----------

1. Install haven in a micromamba environment.
2. Modify ``.config/systemd/user/queueserver.service`` to use the
correct conda environment, and point to the location of this repo
and the correct bluesky directory (if needed).

Kafka/Zookeeper
---------------

Kafka tutorial taken from https://linuxconfig.org/how-to-install-kafka-on-redhat-8

We will run a pre-built binary of Kafka and zookeeper. System-wide
installation from RHEL package repos might be preferred to take
advantage of automatic upgrades.

In this tutorial, the kafka installation will be in ``/local/s25staff/``.

1. ``cd /local/s25staff``
2. Download the latest binary from https://kafka.apache.org/downloads
3. Confirm download integrity (e.g. ``sha512sum kafka_2.13-3.3.1.tgz``) and compare against the SHA512 sum listed on the downloads page.
4. Unpack the archive: ``tar -zxvf kafka_2.13-3.3.1.tgz``
5. Create a symbolic link (makes upgrades easier): ``ln -s kafka_2.13-3.3.1 kafka``
6. Modify ``~/.config/systemd/user/zookeeper.service`` to point to the
correct kafka directory, and also the config file in this
repository.
7. Modify ``~/.config/systemd/user/kafka.service`` to point to the
correct kafka directory, and also the config file in this
repository.
8. Reload systemd units: ``systemctl --user daemon-reload``
9. Start kafka and zookeeper: ``systemctl --user start kafka``

To confirm it is running, check the output of ``systemctl --user
status kafka`` and
``journalctl -xef --user --unit=kafka.service``. Possibly also check
the output of ``lsof -i :9092`` to ensure kafka is listening on the
correct port.

Once the kafka server is running, we will **create a topic** that will
be used by both the queueserver and any consumers
(e.g. mongo_consumer). In this tutorial, we will use
"s25idc_queueserver"; **modify as needed**. More than one topic is
allowed; you should have one topic for each instance of
queueserver. These options are suitable for small scale at a single
beamline; more partitions and/or replication may be necessary at
scale.

10. ``cd /local/s25staff/kafka``
11. Create the kafka topic ``./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic s25idc_queueserver``
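
The topic can also be created from Python; a sketch using the
third-party ``kafka-python`` package (not part of Haven; install it
separately if you prefer this route):

.. code:: python

   # Sketch: create the queueserver topic with kafka-python instead of
   # the bundled shell script.  Adjust the topic name and server as needed.
   from kafka.admin import KafkaAdminClient, NewTopic

   admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
   admin.create_topics(
       [NewTopic(name="s25idc_queueserver", num_partitions=1, replication_factor=1)]
   )
   admin.close()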

[Optional] Confirm that messages can be successfully passed through
the Kafka server. We will not use these tools in production, but they
can be helpful for troubleshooting.

12. ``cd /local/s25staff/kafka``
13. Launch a consumer to watch the topic: ``./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic s25idc_queueserver``
14. In a second terminal, launch a producer to post messages: ``./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic s25idc_queueserver``
15. Type a message into the producer prompt and verify it appears in the consumer.
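
The same round-trip check can be done from Python with
``kafka-python`` (a sketch; server and topic names as above):

.. code:: python

   # Sketch: publish a test message to the queueserver topic and read
   # it back, as a Python alternative to the console scripts above.
   from kafka import KafkaConsumer, KafkaProducer

   producer = KafkaProducer(bootstrap_servers="localhost:9092")
   producer.send("s25idc_queueserver", b"hello from kafka-python")
   producer.flush()

   consumer = KafkaConsumer(
       "s25idc_queueserver",
       bootstrap_servers="localhost:9092",
       auto_offset_reset="earliest",
       consumer_timeout_ms=5000,
   )
   for message in consumer:
       print(message.value)
       break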


Mongo Consumer
--------------

The mongo consumer polls the kafka topics and saves the documents to
the mongodb database.

1. Modify ``mongo_consumer.py`` in this repository (see the sketch after this list):

1. Set the correct database URI *mongo_uri*.
2. Modify *topics* to be a list of topics to listen on.
3. Set *topic_database_map* to map kafka topics to mongo database catalogs.

2. Modify ``.config/systemd/user/mongo_consumer.service`` to use the
correct conda environment and point to this source repo.
3. Start mongo_consumer: ``systemctl --user start mongo_consumer``
4. [Optional] Enable mongo_consumer to start on boot: ``systemctl --user enable mongo_consumer``
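
A sketch of how those three settings might look inside
``mongo_consumer.py`` (the URI, topic, and catalog names are
placeholders; use your own values):

.. code:: python

   # Sketch: the three settings described in step 1 above.
   # All values shown are placeholders, not working defaults.
   mongo_uri = "mongodb://username:password@mongodb.example.gov:27017"
   topics = ["s25idc_queueserver"]
   topic_database_map = {"s25idc_queueserver": "25idc"}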

Bluesky Kafka Python Client
---------------------------

To receive queueserver documents from the kafka server in Python, use
the bluesky-kafka Python library. For example, to print the documents
to the console from a client computer:

.. code:: python

   from bluesky_kafka import BlueskyConsumer

   consumer = BlueskyConsumer(
       ["s25idc_queueserver"],
       bootstrap_servers="myserver.xray.aps.anl.gov:9092",
       group_id="print.document.group",
       process_document=lambda consumer, topic, name, doc: print([name, doc]),
   )
   consumer.start()

138 changes: 138 additions & 0 deletions src/queueserver/kafka_server.properties
@@ -0,0 +1,138 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://0.0.0.0:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://fedorov.xray.aps.anl.gov:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma-separated list of host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0