Skip to content

Commit

Permalink
Server-side restore improvements, client-side UI improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
georgweiss committed May 17, 2024
1 parent 9e59351 commit 779a76d
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 117 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,35 @@
import org.phoebus.framework.preferences.PropertyPreferenceLoader;
import org.phoebus.pv.PV;
import org.phoebus.pv.PVPool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class SnapshotRestorer {

private final Logger LOG = Logger.getLogger(SnapshotRestorer.class.getName());

@Value("${connection.timeout:5000}")
private int connectionTimeout;

@Value("${write.timeout:5000}")
private int writeTimeout;

@Autowired
private ExecutorService executorService;

public SnapshotRestorer() {
final File site_settings = new File("settings.ini");
if (site_settings.canRead()) {
Expand All @@ -44,48 +60,96 @@ public SnapshotRestorer() {
* @param snapshotItems {@link SnapshotItem}
*/
public synchronized List<RestoreResult> restorePVValues(List<SnapshotItem> snapshotItems) {
// Attempt to connect to all PVs before trying to write/restore.
List<PV> connectedPvs = connectPVs(snapshotItems);
List<RestoreResult> failedPvs = new ArrayList<>();
final CountDownLatch countDownLatch = new CountDownLatch(snapshotItems.size());
snapshotItems.forEach(item -> {
String pvName = item.getConfigPv().getPvName();
Optional<PV> pvOptional = null;
try {
// Check if PV is connected. If not, do not even try to write/restore.
pvOptional = connectedPvs.stream().filter(pv -> pv.getName().equals(pvName)).findFirst();
if (pvOptional.isPresent()) {
pvOptional.get().write(VTypeHelper.toObject(item.getValue()));
} else {
RestoreResult restoreResult = new RestoreResult();
restoreResult.setSnapshotItem(item);
restoreResult.setErrorMsg("PV disconnected");
failedPvs.add(restoreResult);
}
} catch (Exception e) {
LOG.log(Level.WARNING, "Failed to restore PV " + pvName);
RestoreResult restoreResult = new RestoreResult();
restoreResult.setSnapshotItem(item);
restoreResult.setErrorMsg(e.getMessage());
failedPvs.add(restoreResult);
} finally {
if (pvOptional != null && pvOptional.isPresent()) {
PVPool.releasePV(pvOptional.get());
}
countDownLatch.countDown();
}
});

try {
countDownLatch.await(writeTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.log(Level.INFO, "Encountered InterruptedException", e);
}

return failedPvs;
}

var futures = snapshotItems.stream().filter(
(snapshot_item) -> snapshot_item.getConfigPv().getPvName() != null)
.map((snapshotItem) -> {
var pvName = snapshotItem.getConfigPv().getPvName();
var pvValue = snapshotItem.getValue();
Object rawValue = VTypeHelper.toObject(pvValue);
PV pv;
CompletableFuture<?> future;
try {
pv = PVPool.getPV(pvName);
future = pv.asyncWrite(rawValue);
} catch (Exception e) {
var restoreResult = new RestoreResult();
var errorMsg = e.getMessage();
restoreResult.setSnapshotItem(snapshotItem);
restoreResult.setErrorMsg(errorMsg);
LOG.warning(String.format("Error writing to channel %s %s", pvName, errorMsg));
return CompletableFuture.completedFuture(restoreResult);
}
return future.handle((result, ex) -> {
String errorMsg;
if (ex != null) {
errorMsg = ex.getMessage();
LOG.warning(String.format("Error writing to channel %s %s", pvName, errorMsg));
} else {
errorMsg = null;
/**
* Attempts to connect to all PVs using {@link PVPool}. A connection is considered successful once an
* event is received that does not indicate disconnection.
*
* <p>
* A timeout of {@link #connectionTimeout} ms is used to wait for a PV to supply a value message indicating
* successful connection.
* </p>
* <p>
* An {@link ExecutorService} is used to run connection attempts concurrently. However, no timeout is employed
* for the overall execution of all connection attempts.
* </p>
*
* @param snapshotItems List of {@link SnapshotItem}s in a snapshot.
* @return A {@link List} of {@link PV}s for which connection succeeded. Ideally this should be all
* PVs as listed in the input argument.
*/
private List<PV> connectPVs(List<SnapshotItem> snapshotItems) {
List<PV> connectedPvs = new ArrayList<>();

List<? extends Callable<Void>> callables = snapshotItems.stream().map(snapshotItem -> {
return (Callable<Void>) () -> {
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
PV pv = PVPool.getPV(snapshotItem.getConfigPv().getPvName());
pv.onValueEvent().subscribe(value -> {
if (!PV.isDisconnected(value)) {
connectedPvs.add(pv);
countDownLatch.countDown();
}
var restoreResult = new RestoreResult();
restoreResult.setSnapshotItem(snapshotItem);
restoreResult.setErrorMsg(errorMsg);
return restoreResult;
});
})
.toList();
// Wait for a value message indicating connection
countDownLatch.await(connectionTimeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.log(Level.WARNING, "Failed to connect to PV " + snapshotItem.getConfigPv().getPvName(), e);
countDownLatch.countDown();
}
return null;
};
}).toList();

CompletableFuture<Void> all_done = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

// Wait on the futures concurrently
all_done.join();
try {
executorService.invokeAll(callables);
} catch (InterruptedException e) {
LOG.log(Level.WARNING, "Got exception waiting for all tasks to finish", e);
// Return empty list here?
return Collections.emptyList();
}

// Joins should not block as all the futures should be completed.
return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
return connectedPvs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/**
* {@link Configuration} class setting up beans for {@link org.springframework.stereotype.Controller} classes.
*/
Expand Down Expand Up @@ -56,4 +60,9 @@ public AcceptHeaderResolver acceptHeaderResolver() {
public SnapshotRestorer snapshotRestorer(){
return new SnapshotRestorer();
}

@Bean
public ExecutorService executorService(){
return Executors.newCachedThreadPool();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,6 @@ authorization.permitall = true
role.user=sar-user
role.admin=sar-admin

############## EPICS related #################
connection.timeout=5000
write.timout=5000

0 comments on commit 779a76d

Please sign in to comment.