Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable rebuilding scalers in post-processing engine #322

Merged
merged 24 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions bin/run-clara
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#!/bin/bash -f

# Raise the per-user process limit (CLARA/JVM spawn many threads);
# errors are ignored if the limit cannot be raised.
ulimit -u 49152 >& /dev/null
# Keep the JVM from aborting on VM options it does not recognize.
export JAVA_OPTS="${JAVA_OPTS} -XX:+IgnoreUnrecognizedVMOptions"

# Help text, printed by -h and by error().  The embedded \n escapes are
# expanded later by `echo -e`; do not change these strings.
usage="Usage: run-clara [-i IN] [-o OUT] [-c CLARA] [-t #] [-n #] YAML"
info='\nOptions:\n
YAML - path to CLARA YAML steering file\n
-i input HIPO file or directory of *.hipo files (default=.)\n
-o output directory (default=.)\n
-c CLARA installation (default=$CLARA_HOME)\n
-t number of threads (default=2)\n
-n number of events (default=-1)\n\n
Defaults will use $CLARA_HOME to read all *.hipo files in $PWD,\n
with all output written to $PWD.'

function error() {
    # Print the usage banner followed by an error message, then abort
    # the whole script with a non-zero status.
    echo -e "\n${usage}\n\nERROR: $@."
    exit 1
}
function abspath() {
    # Print the absolute path of a directory (first case) or of a readable
    # file (second case); return 1 if the argument is neither.
    # FIX: all expansions are now quoted so paths containing whitespace or
    # glob characters do not word-split or expand.
    [ -d "$1" ] && echo "$(cd "$1" && pwd)" && return 0
    [ -r "$1" ] && echo "$(cd "$(dirname "$1")" && pwd)/$(basename "$1")" && return 0
    return 1
}
# Print the host's primary IPv4 address.  Tries `ip route` first, then
# falls back to parsing `ifconfig`.  Returns 0 and prints an address on
# success, 1 if neither tool yields one.
function get_host_ip() {
if command -v ip >/dev/null 2>&1
then
# `ip route get 1`: the 7th field of the first output line is the
# source address the kernel would use to reach 1.0.0.0.
ip route get 1 | awk '{print $7; exit}' && return 0
elif command -v ifconfig >/dev/null 2>&1
then
# Split each ifconfig line on ':', space, and tab into the array `line`.
while IFS=$': \t' read -r -a line
do
# True when the line's first token is exactly "inet" (suffix-strip test).
if [ -z "${line%inet}" ]
then
# Pick field 1 or 2 depending on the length of field 1 — this papers
# over the differing `inet addr:x.x.x.x` vs `inet x.x.x.x` formats.
# NOTE(review): assumes BSD/Linux ifconfig layouts — confirm on target OS.
ip=${line[${#line[1]}>4?1:2]}
# NOTE(review): this test's exit status is unused, so loopback
# (127.0.0.1) is NOT actually skipped — confirm whether that was intended.
[ "${ip#127.0.0.1}" ]
echo $ip && return 0
fi
done< <(LANG=C ifconfig)
fi
return 1
}
function get_dpe_port() {
    # Find a free CLARA DPE port in 7000..8000 (step 20) by probing the
    # corresponding control port (port+2) with a bash /dev/tcp connection.
    # Prints the first port whose control port is NOT in use; returns 1 if
    # every candidate is busy.
    local ports port ctrl_port
    ports=$(seq 7000 20 8000)
    # Randomize the probe order when shuf is available, to reduce the
    # chance of two concurrent invocations picking the same port.
    command -v shuf >/dev/null 2>&1 && ports=$(echo "$ports" | shuf)
    for port in $ports
    do
        ctrl_port=$((port + 2))
        if ! eval "exec 6<>/dev/tcp/127.0.0.1/$ctrl_port" 2> /dev/null
        then
            # Nothing listening on the control port: this DPE port is free.
            echo $port
            return 0
        fi
        # FIX: the probe succeeded, which leaves fd 6 connected to the
        # busy port; close it so we do not leak a descriptor/connection
        # for every occupied port we scan.
        exec 6>&- 2>/dev/null
    done
    return 1
}

set -e

# Check user command-line options:
input=.
output=.
threads=2
while getopts i:o:c:t:n:h opt
do
    case $opt in
        i) input=$OPTARG ;;
        o) output=$OPTARG ;;
        c) CLARA_HOME=$OPTARG ;;
        t) threads=$OPTARG && echo $threads | grep -q -E '^[0-9]+$' || error "-t must be an integer, threads" ;;
        n) nevents="-e $OPTARG" && echo $nevents | grep -q -E '^-e [0-9]+$' || error "-n must be an integer, events" ;;
        h) echo -e $usage && echo -e $info && exit 0 ;;
    esac
done

# Validate positionals and environment:
shift $((OPTIND-1))
[ $# -gt 1 ] && error "Extra arguments: ${@:2} (options must come before positionals)"
[ $# -lt 1 ] && error "YAML file argument is required"
# FIX: the error message previously interpolated $yaml, which is still unset
# at this point (it always printed an empty path); report $1 instead.
[ -r "$1" ] && yaml=$1 || error "YAML file does not exist: $1"
[ -z ${CLARA_HOME+x} ] && error "-c must be specified or \$CLARA_HOME set"
[ -d "$CLARA_HOME" ] || error "Invalid CLARA_HOME: $CLARA_HOME"
# -t 0 means "use all cores".  NOTE(review): /proc/cpuinfo exists only on
# Linux; -t 0 on macOS would fail here — confirm supported platforms.
[ $threads -eq 0 ] && threads=`grep -c ^processor /proc/cpuinfo`
test -e "$input" && test -r "$input" || error "Invalid inputs -i: $input"

# Create the environment variables and directories required by CLARA:
[ -e "$output" ] && echo "WARNING: Using existing directory: $output."
mkdir -p -v "$output" || error "Cannot create -o output directory: $output"
mkdir -p "$output"/log "$output"/config "$output"/data/output
export CLARA_USER_DATA=$output
unset CLARA_MONITOR_FE

# Normalize all paths:
input=$(abspath "$input")
output=$(abspath "$output")
yaml=$(abspath "$yaml")
export CLARA_HOME=$(abspath "$CLARA_HOME")
export CLAS12DIR=$CLARA_HOME/plugins/clas12

# Generate the file for CLARA containing a file list (of relative paths, not absolute):
test -d "$input" && find "$input" -maxdepth 1 -name '*.hipo' -exec basename {} \; > $CLARA_USER_DATA/filelist.txt
test -f "$input" && echo $(basename "$input") > $CLARA_USER_DATA/filelist.txt
[ $(cat $CLARA_USER_DATA/filelist.txt | wc -l) -gt 0 ] || error "Found no input files."

# Finally, run CLARA.  CLARA wants the input *directory*, so strip a
# file/symlink down to its parent:
[ -f "$input" ] || [ -h "$input" ] && input=$(dirname "$input")
if [ $(uname) == "Darwin" ]
then
    # macOS has no lib/clara/run-clara launcher: start a DPE in the
    # background, give it time to come up, then drive it with the
    # orchestrator.
    ip=$(get_host_ip) || error "Unknown IP address"
    port=$(get_dpe_port) || error "Unknown DPE port"
    $CLARA_HOME/bin/j_dpe \
        --host $ip --port $port \
        --session recon --max-cores $threads \
        --max-sockets 5120 --report 5 \
        2>&1 | tee $CLARA_USER_DATA/log/dpe.log &
    echo "Sleeping 7 ......." && sleep 7
    unset JAVA_OPTS
    $CLARA_HOME/bin/clara-orchestrator \
        -F -f ${ip}%${port}_java -s recon \
        -i $input -o $output -z rec_ \
        -p $threads -t $threads \
        $yaml $CLARA_USER_DATA/filelist.txt
else
    # $nevents is intentionally unquoted: it is either empty or the two
    # words "-e N".
    $CLARA_HOME/lib/clara/run-clara \
        -i $input \
        -o $CLARA_USER_DATA \
        -z rec_ \
        -x $CLARA_USER_DATA/log \
        -t $threads \
        $nevents \
        -s recon \
        $yaml $CLARA_USER_DATA/filelist.txt
fi

Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
*/
public class Processor {

static final Logger logger = Logger.getLogger(Util.class.getName());

public static final String CCDB_TABLES[] = {"/runcontrol/fcup","/runcontrol/slm",
"/runcontrol/helicity","/daq/config/scalers/dsc1","/runcontrol/hwp"};
public static final String DEF_PRELOAD_GLOB = "*.{hipo,h5}";
Expand All @@ -45,32 +43,29 @@ public class Processor {
private DaqScalersSequence chargeSequence = null;
private HelicitySequenceDelayed helicitySequence = null;

public Processor(File file, boolean restream) {
configure(restream, Arrays.asList(file.getAbsolutePath()));
public Processor(File file, boolean restream, boolean rebuild) {
configure(Arrays.asList(file.getAbsolutePath()), restream, rebuild);
}

public Processor(String dir, boolean restream) {
configure(restream, findPreloadFiles(dir,DEF_PRELOAD_GLOB));
public Processor(String dir, boolean restream, boolean rebuild) {
configure(findPreloadFiles(dir,DEF_PRELOAD_GLOB), restream, rebuild);
}

public Processor(String dir, String glob, boolean restream) {
configure(restream, findPreloadFiles(dir,glob));
public Processor(String dir, String glob, boolean restream, boolean rebuild) {
configure(findPreloadFiles(dir,glob), restream, rebuild);
}

private void configure(boolean restream, List<String> preloadFiles) {
if (preloadFiles.isEmpty()) {
logger.warning("<<<< No preload files found, postprocessing disabled.");
initialized = false;
} else {
private void configure(List<String> preloadFiles, boolean restream, boolean rebuild) {
if (!preloadFiles.isEmpty()) {
HipoReader r = new HipoReader();
r.open(preloadFiles.get(0));
conman = new ConstantsManager();
conman.init(CCDB_TABLES);
schemaFactory = r.getSchemaFactory();
helicitySequence = Util.getHelicity(preloadFiles, schemaFactory, restream, conman);
chargeSequence = DaqScalersSequence.readSequence(preloadFiles);
if (rebuild) chargeSequence = DaqScalersSequence.rebuildSequence(1, conman, preloadFiles);
else chargeSequence = DaqScalersSequence.readSequence(preloadFiles);
r.close();
initialized = true;
}
}

Expand Down Expand Up @@ -223,17 +218,9 @@ private static void replace(Map<String,String> files) {
}
}

/**
* Replace preload files with rebuilt ones.
*/
private void rebuildAndReplace(List<String> preloadFiles) {
replace(rebuild(".",preloadFiles));
}

public static void main(String args[]) {
DefaultLogger.debug();
Processor p = new Processor(System.getenv("HOME")+"/tmp","r*.hipo",false);
//p.rebuildAndReplace();
Processor p = new Processor(System.getenv("HOME")+"/tmp","r*.hipo",false,false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ public void initialize(HipoReader reader) {
}

public void initialize(List<String> filenames) {
LOGGER.info("HelicitySequence: Reading sequence from "+String.join(",",filenames));
for (String filename : filenames) {
HipoReader reader = new HipoReader();
reader.setTags(1);
Expand Down Expand Up @@ -526,7 +527,14 @@ else if (!s.equals(this.states.get(this.states.size()-1))) {
this.integrityCheck();
}

/**
* Append helicity states restreamed from the given HIPO files to this sequence.
*
* @param schema schema factory providing the RUN::config and HEL::adc bank definitions
* @param conman constants manager used for run-condition lookups
* @param filenames paths of the HIPO files to restream
*/
public void addStream(SchemaFactory schema, ConstantsManager conman, List<String> filenames) {
LOGGER.info("HelicitySequence: Restreaming sequence from "+String.join(",",filenames));
Bank runConfigBank = new Bank(schema.getSchema("RUN::config"));
Bank helAdcBank = new Bank(schema.getSchema("HEL::adc"));
TreeSet<HelicityState> stream = new TreeSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ public class DaqScalers {
public StruckScalers struck=null;

private long timestamp=0;
public void setTimestamp(long timestamp) { this.timestamp=timestamp; }
/**
* Set the TI timestamp associated with these scaler readings.
*
* @param timestamp the TI timestamp
* @return this instance, to allow call chaining
*/
public DaqScalers setTimestamp(long timestamp) {
this.timestamp=timestamp;
return this;
}
public long getTimestamp(){ return this.timestamp; }

/**
Expand Down Expand Up @@ -137,6 +140,37 @@ public static DaqScalers create(Bank rawScalerBank,IndexedTable fcupTable,Indexe
return DaqScalers.create(rawScalerBank,fcupTable,slmTable,helTable,dsc2.getGatedClockSeconds());
}

/**
 * Create a DaqScalers from a raw scaler bank, looking up the run's
 * calibration tables through the given ConstantsManager.  The DSC2 clock
 * frequency decides the normalization path: a slow clock uses the DSC2
 * tables directly, otherwise the RCDB run start time plus the event's
 * unix time are used.
 *
 * @param conman constants manager for CCDB/RCDB lookups
 * @param runConfig RUN::config bank (run number, timestamp, unixtime)
 * @param rawScaler RAW::scaler bank to convert
 * @return the new DaqScalers, or null if the required tables or RCDB
 *         constants are unavailable
 */
public static DaqScalers create(ConstantsManager conman, Bank runConfig, Bank rawScaler) {
    int run = runConfig.getInt("run", 0);
    IndexedTable fcup = conman.getConstants(run, "/runcontrol/fcup");
    IndexedTable slm = conman.getConstants(run, "/runcontrol/slm");
    IndexedTable hel = conman.getConstants(run, "/runcontrol/helicity");
    IndexedTable dsc = conman.getConstants(run, "/daq/config/scalers/dsc1");
    if (fcup!=null && dsc!=null) {
        if (dsc.getIntValue("frequency",0,0,0) < Dsc2Scaler.MAX_DSC2_CLOCK_FREQ) {
            // Slow DSC2 clock: use the DSC2 tables directly.
            return DaqScalers.create(rawScaler, fcup, slm, hel, dsc)
                .setTimestamp(runConfig.getLong("timestamp", 0));
        }
        else {
            try {
                // Fast clock: normalize via RCDB run start time + event unix time.
                Time rst = (Time)conman.getRcdbConstant(run,"run_start_time").getValue();
                Date uet = new Date(runConfig.getInt("unixtime",0)*1000L);
                return DaqScalers.create(rawScaler, fcup, slm, hel, rst, uet)
                    .setTimestamp(runConfig.getLong("timestamp", 0));
            }
            catch (Exception ignored) {
                // Best-effort: the RCDB lookup can fail (e.g. constant
                // missing for this run); fall through and return null so
                // callers can skip this event.
            }
        }
    }
    return null;
}

/**
* @param schema bank schema
* @return RUN::scaler banks
Expand Down Expand Up @@ -253,7 +287,7 @@ public static List<Bank> createBanks(int runnumber, SchemaFactory schema, Event
IndexedTable hel = conman.getConstants(runnumber, "/runcontrol/helicity");
IndexedTable dsc = conman.getConstants(runnumber, "/daq/config/scalers/dsc1");

if (dsc.getIntValue("frequency", 0,0,0) < 2e5) {
if (dsc.getIntValue("frequency", 0,0,0) < Dsc2Scaler.MAX_DSC2_CLOCK_FREQ) {
ret.addAll(createBanks(schema,rawScaler,fcup, slm, hel, dsc));
}
else {
Expand All @@ -274,5 +308,6 @@ public static List<Bank> createBanks(int runnumber, SchemaFactory schema, Event

return ret;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.logging.Logger;
import org.jlab.detector.calib.utils.ConstantsManager;

import org.jlab.jnp.hipo4.io.HipoReader;
import org.jlab.jnp.hipo4.data.Event;
Expand All @@ -27,6 +29,8 @@ public class DaqScalersSequence implements Comparator<DaqScalers> {

private Bank rcfgBank=null;

static final Logger logger = Logger.getLogger(DaqScalersSequence.class.getName());

public static class Interval {
private DaqScalers previous = null;
private DaqScalers next = null;
Expand Down Expand Up @@ -187,6 +191,7 @@ public Interval getInterval(Event event1, Event event2) {
* @return sequence
*/
public static DaqScalersSequence readSequence(List<String> filenames) {
logger.info("DaqScalersSequence:: Reading scaler sequence from "+String.join(",", filenames));

DaqScalersSequence seq=new DaqScalersSequence();

Expand All @@ -201,13 +206,12 @@ public static DaqScalersSequence readSequence(List<String> filenames) {
}

SchemaFactory schema = reader.getSchemaFactory();

Event event=new Event();
Bank scalerBank=new Bank(schema.getSchema("RUN::scaler"));
Bank configBank=new Bank(schema.getSchema("RUN::config"));

while (reader.hasNext()) {

Event event=new Event();
Bank scalerBank=new Bank(schema.getSchema("RUN::scaler"));
Bank configBank=new Bank(schema.getSchema("RUN::config"));

reader.nextEvent(event);
event.read(scalerBank);
event.read(configBank);
Expand All @@ -229,6 +233,38 @@ public static DaqScalersSequence readSequence(List<String> filenames) {

return seq;
}

/**
 * Build a scaler sequence by recomputing scalers from the RAW::scaler
 * banks in the given files, rather than reading prebuilt RUN::scaler
 * banks (compare {@link #readSequence}).
 *
 * @param tags HIPO event tags to read (e.g. 1 for scaler events)
 * @param conman constants manager used to translate raw scalers
 * @param filenames HIPO files to read
 * @return the rebuilt sequence
 */
public static DaqScalersSequence rebuildSequence(int tags, ConstantsManager conman, List<String> filenames) {
    logger.info("DaqScalersSequence:: Rebuilding scaler sequence from "+String.join(",", filenames));
    DaqScalersSequence seq=new DaqScalersSequence();
    for (String filename : filenames) {
        HipoReader reader = new HipoReader();
        reader.setTags(tags);
        reader.open(filename);
        SchemaFactory schema = reader.getSchemaFactory();
        if (seq.rcfgBank==null)
            seq.rcfgBank = new Bank(schema.getSchema("RUN::config"));
        // Allocate once per file, not once per event (consistent with readSequence):
        Event event = new Event();
        Bank scaler = new Bank(schema.getSchema("RAW::scaler"));
        Bank config = new Bank(schema.getSchema("RUN::config"));
        while (reader.hasNext()) {
            reader.nextEvent(event);
            event.read(scaler);
            event.read(config);
            if (scaler.getRows()<1 || config.getRows()<1) continue;
            // FIX: DaqScalers.create can return null (missing tables or
            // failed RCDB lookup); skip such events instead of inserting
            // null into the sequence.
            DaqScalers ds = DaqScalers.create(conman, config, scaler);
            if (ds != null) seq.add(ds);
        }
        reader.close();
    }
    return seq;
}

public static void main(String[] args) {

Expand Down Expand Up @@ -288,4 +324,4 @@ public static void main(String[] args) {

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
public class Dsc2Scaler extends DaqScaler{

public static final long MAX_DSC2_CLOCK_FREQ = (long)2e5;

private static final boolean GATEINVERTED=true;

// DSC has TRG and TDC thresholds, we use only TDC here:
Expand Down
Loading