diff --git a/phoebus-product/pom.xml b/phoebus-product/pom.xml index ab311a58af..69d1195cef 100644 --- a/phoebus-product/pom.xml +++ b/phoebus-product/pom.xml @@ -1,3 +1,4 @@ + 4.0.0 product diff --git a/services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/epics/SnapshotUtil.java b/services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/epics/SnapshotUtil.java index 9257e0817e..7b4cd53765 100644 --- a/services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/epics/SnapshotUtil.java +++ b/services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/epics/SnapshotUtil.java @@ -74,38 +74,10 @@ public synchronized List restore(List snapshotItems List cleanedSnapshotItems = cleanSnapshotItems(snapshotItems); List restoreResultList = new ArrayList<>(); - List> callables = new ArrayList<>(); + List callables = new ArrayList<>(); for (SnapshotItem si : cleanedSnapshotItems) { - Callable writePvCallable = () -> { - CountDownLatch countDownLatch = new CountDownLatch(1); - PV pv; - try { - pv = PVPool.getPV(si.getConfigPv().getPvName()); - pv.onValueEvent().throttleLatest(1000, TimeUnit.MILLISECONDS).subscribe(value -> { - if (!PV.isDisconnected(value)) { - pv.write(VTypeHelper.toObject(si.getValue())); - PVPool.releasePV(pv); - } - countDownLatch.countDown(); - }); - if (!countDownLatch.await(connectionTimeout, TimeUnit.MILLISECONDS)) { - RestoreResult restoreResult = new RestoreResult(); - restoreResult.setSnapshotItem(si); - restoreResult.setErrorMsg("No monitor event from PV " + si.getConfigPv().getPvName()); - restoreResultList.add(restoreResult); - } - } catch (Exception e) { - LOG.log(Level.WARNING, "Failed to write to PV " + si.getConfigPv().getPvName(), e); - RestoreResult restoreResult = new RestoreResult(); - restoreResult.setSnapshotItem(si); - restoreResult.setErrorMsg(e.getMessage()); - restoreResultList.add(restoreResult); - countDownLatch.countDown(); - } - - return null; - }; - callables.add(writePvCallable); + RestoreCallable restoreCallable = new RestoreCallable(si, restoreResultList); + callables.add(restoreCallable); } try { @@ -114,6 +86,8 @@ public synchronized List restore(List snapshotItems LOG.log(Level.WARNING, "Got exception waiting for all tasks to finish", e); // Return empty list here? return Collections.emptyList(); + } finally { + callables.forEach(RestoreCallable::release); } return restoreResultList; @@ -225,4 +199,55 @@ public List takeSnapshot(ConfigurationData configurationData) { private List cleanSnapshotItems(List snapshotItems) { return snapshotItems.stream().filter(si -> !si.getConfigPv().isReadOnly()).toList(); } + + /** + * Wraps PV functionality such that client code may release PV back to the pool once + * write/restore operation has succeeded (or failed). + */ + private class RestoreCallable implements Callable { + + private final List restoreResultList; + private PV pv; + private final SnapshotItem snapshotItem; + + public RestoreCallable(SnapshotItem snapshotItem, List restoreResultList) { + this.snapshotItem = snapshotItem; + this.restoreResultList = restoreResultList; + } + + @Override + public Void call() { + CountDownLatch countDownLatch = new CountDownLatch(1); + try { + pv = PVPool.getPV(snapshotItem.getConfigPv().getPvName()); + pv.onValueEvent().subscribe(value -> { + if (!PV.isDisconnected(value)) { + countDownLatch.countDown(); + } + }); + if (!countDownLatch.await(connectionTimeout, TimeUnit.MILLISECONDS)) { + RestoreResult restoreResult = new RestoreResult(); + restoreResult.setSnapshotItem(snapshotItem); + restoreResult.setErrorMsg("No monitor event from PV " + snapshotItem.getConfigPv().getPvName()); + restoreResultList.add(restoreResult); + } else { + pv.write(VTypeHelper.toObject(snapshotItem.getValue())); + } + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to write to PV " + snapshotItem.getConfigPv().getPvName(), e); + RestoreResult restoreResult = new RestoreResult(); + restoreResult.setSnapshotItem(snapshotItem); + restoreResult.setErrorMsg(e.getMessage()); + restoreResultList.add(restoreResult); + countDownLatch.countDown(); + } + return null; + } + + public void release() { + if (pv != null) { + PVPool.releasePV(pv); + } + } + } }