diff --git a/app/save-and-restore/model/src/main/java/org/phoebus/applications/saveandrestore/model/SaveAndRestorePv.java b/app/save-and-restore/model/src/main/java/org/phoebus/applications/saveandrestore/model/SaveAndRestorePv.java deleted file mode 100644 index 06c053ed21..0000000000 --- a/app/save-and-restore/model/src/main/java/org/phoebus/applications/saveandrestore/model/SaveAndRestorePv.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (C) 2020 European Spallation Source ERIC. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - * - */ - -package org.phoebus.applications.saveandrestore.model; - -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import org.epics.vtype.VType; -import org.phoebus.applications.saveandrestore.model.json.VTypeDeserializer; -import org.phoebus.applications.saveandrestore.model.json.VTypeSerializer; - -public class SaveAndRestorePv { - - private String pvName; - - @JsonSerialize(using = VTypeSerializer.class) - @JsonDeserialize(using = VTypeDeserializer.class) - private VType value; - - public String getPvName() { - return pvName; - } - - public void setPvName(String pvName) { - this.pvName = pvName; - } - - public VType getValue() { - return value; - } - - public void setValue(VType value) { - this.value = value; - } - - public static Builder builder(){ - return new Builder(); - } - - public static class Builder{ - private SaveAndRestorePv saveAndRestorePv; - - private Builder(){ - saveAndRestorePv = new SaveAndRestorePv(); - } - - public Builder pvName(String pvName){ - saveAndRestorePv.setPvName(pvName); - return this; - } - - public Builder value(VType value){ - saveAndRestorePv.setValue(value); - return this; - } - - public SaveAndRestorePv build(){ - return saveAndRestorePv; - } - } -} diff --git a/services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/epics/SnapshotRestorer.java b/services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/epics/SnapshotRestorer.java index 41c0f62c51..3968a5e9b3 100644 --- a/services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/epics/SnapshotRestorer.java +++ b/services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/epics/SnapshotRestorer.java @@ -6,19 +6,35 @@ import org.phoebus.framework.preferences.PropertyPreferenceLoader; import org.phoebus.pv.PV; import org.phoebus.pv.PVPool; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import java.io.File; import java.io.FileInputStream; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; -import java.util.stream.Collectors; public class SnapshotRestorer { private final Logger LOG = Logger.getLogger(SnapshotRestorer.class.getName()); + @Value("${connection.timeout:5000}") + private int connectionTimeout; + + @Value("${write.timeout:5000}") + private int writeTimeout; + + @Autowired + private ExecutorService executorService; + public SnapshotRestorer() { final File site_settings = new File("settings.ini"); if (site_settings.canRead()) { @@ -44,48 +60,96 @@ public SnapshotRestorer() { * @param snapshotItems {@link SnapshotItem} */ public synchronized List restorePVValues(List snapshotItems) { + // Attempt to connect to all PVs before trying to write/restore. + List connectedPvs = connectPVs(snapshotItems); + List failedPvs = new ArrayList<>(); + final CountDownLatch countDownLatch = new CountDownLatch(snapshotItems.size()); + snapshotItems.forEach(item -> { + String pvName = item.getConfigPv().getPvName(); + Optional pvOptional = null; + try { + // Check if PV is connected. If not, do not even try to write/restore. + pvOptional = connectedPvs.stream().filter(pv -> pv.getName().equals(pvName)).findFirst(); + if (pvOptional.isPresent()) { + pvOptional.get().write(VTypeHelper.toObject(item.getValue())); + } else { + RestoreResult restoreResult = new RestoreResult(); + restoreResult.setSnapshotItem(item); + restoreResult.setErrorMsg("PV disconnected"); + failedPvs.add(restoreResult); + } + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to restore PV " + pvName); + RestoreResult restoreResult = new RestoreResult(); + restoreResult.setSnapshotItem(item); + restoreResult.setErrorMsg(e.getMessage()); + failedPvs.add(restoreResult); + } finally { + if (pvOptional != null && pvOptional.isPresent()) { + PVPool.releasePV(pvOptional.get()); + } + countDownLatch.countDown(); + } + }); + + try { + countDownLatch.await(writeTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.log(Level.INFO, "Encountered InterruptedException", e); + } + + return failedPvs; + } - var futures = snapshotItems.stream().filter( - (snapshot_item) -> snapshot_item.getConfigPv().getPvName() != null) - .map((snapshotItem) -> { - var pvName = snapshotItem.getConfigPv().getPvName(); - var pvValue = snapshotItem.getValue(); - Object rawValue = VTypeHelper.toObject(pvValue); - PV pv; - CompletableFuture future; - try { - pv = PVPool.getPV(pvName); - future = pv.asyncWrite(rawValue); - } catch (Exception e) { - var restoreResult = new RestoreResult(); - var errorMsg = e.getMessage(); - restoreResult.setSnapshotItem(snapshotItem); - restoreResult.setErrorMsg(errorMsg); - LOG.warning(String.format("Error writing to channel %s %s", pvName, errorMsg)); - return CompletableFuture.completedFuture(restoreResult); - } - return future.handle((result, ex) -> { - String errorMsg; - if (ex != null) { - errorMsg = ex.getMessage(); - LOG.warning(String.format("Error writing to channel %s %s", pvName, errorMsg)); - } else { - errorMsg = null; + /** + * Attempts to connect to all PVs using {@link PVPool}. A connection is considered successful once an + * event is received that does not indicate disconnection. + * + *

+ * A timeout of {@link #connectionTimeout} ms is used to wait for a PV to supply a value message indicating + * successful connection. + *

+ *

+ * An {@link ExecutorService} is used to run connection attempts concurrently. However, no timeout is employed + * for the overall execution of all connection attempts. + *

+ * + * @param snapshotItems List of {@link SnapshotItem}s in a snapshot. + * @return A {@link List} of {@link PV}s for which connection succeeded. Ideally this should be all + * PVs as listed in the input argument. + */ + private List connectPVs(List snapshotItems) { + List connectedPvs = new ArrayList<>(); + + List> callables = snapshotItems.stream().map(snapshotItem -> { + return (Callable) () -> { + CountDownLatch countDownLatch = new CountDownLatch(1); + try { + PV pv = PVPool.getPV(snapshotItem.getConfigPv().getPvName()); + pv.onValueEvent().subscribe(value -> { + if (!PV.isDisconnected(value)) { + connectedPvs.add(pv); + countDownLatch.countDown(); } - var restoreResult = new RestoreResult(); - restoreResult.setSnapshotItem(snapshotItem); - restoreResult.setErrorMsg(errorMsg); - return restoreResult; }); - }) - .toList(); + // Wait for a value message indicating connection + countDownLatch.await(connectionTimeout, TimeUnit.MILLISECONDS); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to connect to PV " + snapshotItem.getConfigPv().getPvName(), e); + countDownLatch.countDown(); + } + return null; + }; + }).toList(); - CompletableFuture all_done = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); - - // Wait on the futures concurrently - all_done.join(); + try { + executorService.invokeAll(callables); + } catch (InterruptedException e) { + LOG.log(Level.WARNING, "Got exception waiting for all tasks to finish", e); + // Return empty list here? + return Collections.emptyList(); + } - // Joins should not block as all the futures should be completed. - return futures.stream().map(CompletableFuture::join).collect(Collectors.toList()); + return connectedPvs; } } diff --git a/services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/web/config/WebConfiguration.java b/services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/web/config/WebConfiguration.java index 358721fd70..48087c22fa 100644 --- a/services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/web/config/WebConfiguration.java +++ b/services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/web/config/WebConfiguration.java @@ -24,6 +24,10 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + /** * {@link Configuration} class setting up beans for {@link org.springframework.stereotype.Controller} classes. */ @@ -56,4 +60,9 @@ public AcceptHeaderResolver acceptHeaderResolver() { public SnapshotRestorer snapshotRestorer(){ return new SnapshotRestorer(); } + + @Bean + public ExecutorService executorService(){ + return Executors.newCachedThreadPool(); + } } diff --git a/services/save-and-restore/src/main/resources/application.properties b/services/save-and-restore/src/main/resources/application.properties index 525552efcb..6ad52e731e 100644 --- a/services/save-and-restore/src/main/resources/application.properties +++ b/services/save-and-restore/src/main/resources/application.properties @@ -92,3 +92,6 @@ authorization.permitall = true role.user=sar-user role.admin=sar-admin +############## EPICS related ################# +connection.timeout=5000 +write.timout=5000