Skip to content

Commit

Permalink
CN harvesting is missing some pids #267
Browse files Browse the repository at this point in the history
  • Loading branch information
gothub committed Sep 2, 2020
1 parent 5bfd7a7 commit bc9b37a
Show file tree
Hide file tree
Showing 10 changed files with 682 additions and 163 deletions.
16 changes: 12 additions & 4 deletions src/main/java/edu/ucsb/nceas/mdqengine/model/Task.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package edu.ucsb.nceas.mdqengine.model;

import java.util.HashMap;

public class Task {

private String taskName;
private String taskType;
private String lastHarvestDatetime;
private HashMap<String, String> lastHarvestDatetimes = new HashMap<>();

public void setTaskName(String name) {
this.taskName = name;
Expand All @@ -18,10 +20,16 @@ public String getTaskName() {

public String getTaskType() { return taskType; }

public void setLastHarvestDatetime(String lastHarvestDatetime) {
this.lastHarvestDatetime = lastHarvestDatetime;
public void setLastHarvestDatetimes(HashMap<String, String> lastHarvestDatetimes) {
this.lastHarvestDatetimes = lastHarvestDatetimes;
}

public void setLastHarvestDatetime(String lastHarvestDatetime, String nodeId) {
this.lastHarvestDatetimes.put(nodeId, lastHarvestDatetime);
}

public String getLastHarvestDatetime() { return lastHarvestDatetime; }
public String getLastHarvestDatetime(String nodeId) {
return this.lastHarvestDatetimes.get(nodeId);
}

}
17 changes: 17 additions & 0 deletions src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,16 @@ public static void main(String[] argv) throws Exception {
log.debug("fileExcludeMatch: " + fileExcludeMatch);
logFile = splitted[++icnt].trim();
log.debug("log file: " + logFile);
} else if (taskType.equals("nodelist")) {
log.debug("Scheduling nodelist update from DataONE, task name: " + taskName + ", task group: " + taskGroup);
String[] splitted = Arrays.stream(params.split(";"))
.map(String::trim)
.toArray(String[]::new);

int icnt = -1;
log.debug("Split length: " + splitted.length);
nodeId = splitted[++icnt].trim();
log.debug("nodeId: " + nodeId);
}

try {
Expand Down Expand Up @@ -221,6 +231,13 @@ public static void main(String[] argv) throws Exception {
.usingJobData("fileExcludeMatch", fileExcludeMatch)
.usingJobData("logFile", logFile)
.build();
} else if (taskType.equalsIgnoreCase("nodelist")) {
job = newJob(NodeList.class)
.withIdentity(taskName, taskGroup)
.usingJobData("taskName", taskName)
.usingJobData("taskType", taskType)
.usingJobData("nodeId", nodeId)
.build();
}

CronTrigger trigger = newTrigger()
Expand Down
168 changes: 168 additions & 0 deletions src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package edu.ucsb.nceas.mdqengine.scheduler;

import edu.ucsb.nceas.mdqengine.DataONE;
import edu.ucsb.nceas.mdqengine.MDQconfig;
import edu.ucsb.nceas.mdqengine.exception.MetadigStoreException;
import edu.ucsb.nceas.mdqengine.store.DatabaseStore;
import edu.ucsb.nceas.mdqengine.store.MDQStore;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.dataone.client.rest.HttpMultipartRestClient;
import org.dataone.client.rest.MultipartRestClient;
import org.dataone.client.v2.impl.MultipartCNode;
import org.dataone.service.exceptions.NotImplemented;
import org.dataone.service.exceptions.ServiceFailure;
import org.dataone.service.types.v1.*;
import org.dataone.service.types.v2.Node;
import org.dataone.service.types.v2.Property;
import org.quartz.*;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.TimeZone;

/**
* <p>
* Run a MetaDIG Quality Engine Scheduler task, for example,
* query a member node for new pids and request that a quality
* report is created for each one.
* </p>
*
* @author Peter Slaughter
*/
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class NodeList implements Job {

private Log log = LogFactory.getLog(NodeList.class);

// Since Quartz will re-instantiate a class every time it
// gets executed, non-static member variables can
// not be used to maintain state!

/**
* <p>
* Called by the <code>{@link org.quartz.Scheduler}</code> when a
* <code>{@link org.quartz.Trigger}</code> fires that is associated with
* the <code>Job</code>.
* </p>
*
* @throws JobExecutionException if there is an exception while executing the job.
*/
public void execute(JobExecutionContext context)
throws JobExecutionException {

Log log = LogFactory.getLog(NodeList.class);
JobKey key = context.getJobDetail().getKey();
JobDataMap dataMap = context.getJobDetail().getJobDataMap();

String taskName = dataMap.getString("taskName");
String taskType = dataMap.getString("taskType");
String nodeId = dataMap.getString("nodeId");
MultipartRestClient mrc = null;
MultipartCNode cnNode = null;

String nodeServiceUrl = null;

try {
MDQconfig cfg = new MDQconfig();
String nodeAbbr = nodeId.replace("urn:node:", "");
// TODO: Cache the node values from the CN listNode service
nodeServiceUrl = cfg.getString(nodeAbbr + ".serviceUrl");
} catch (ConfigurationException | IOException ce) {
JobExecutionException jee = new JobExecutionException(taskName + ": error executing task.");
jee.initCause(ce);
throw jee;
}

log.debug("Executing task " + taskType + ", " + taskName + " for node: " + nodeId);

Session session = DataONE.getSession(null, null);

try {
mrc = new HttpMultipartRestClient();
} catch (Exception e) {
log.error(taskName + ": error creating rest client: " + e.getMessage());
JobExecutionException jee = new JobExecutionException(e);
jee.setRefireImmediately(false);
throw jee;
}

cnNode = new MultipartCNode(mrc, nodeServiceUrl, session);
org.dataone.service.types.v2.NodeList nodeList = null;

try {
nodeList = cnNode.listNodes();
} catch (NotImplemented | ServiceFailure e) {
e.printStackTrace();
throw new JobExecutionException(taskName + ": cannot renew store, unable to schedule job", e);
}

// Get a connection to the database
MDQStore store = null;

try {
store = new DatabaseStore();
} catch (Exception e) {
e.printStackTrace();
throw new JobExecutionException(taskName + ": cannot create store, unable to schedule job", e);
}

if (!store.isAvailable()) {
try {
store.renew();
} catch (MetadigStoreException e) {
e.printStackTrace();
throw new JobExecutionException(taskName + ": cannot renew store, unable to schedule job", e);
}
}

Property property = null;
ArrayList<Property> plist = null;
for (Node node : nodeList.getNodeList()) {
log.debug("node: " + node.getName());
log.debug("type: " + node.getType().toString());
log.debug("id: " + node.getIdentifier().getValue());
log.debug("state: " + node.getState().toString());
log.debug("is synchonized: " + node.isSynchronize());

if (! node.isSynchronize()) {
log.debug(taskName + ": Skipping unsynchronized node " + node.getIdentifier().getValue());
continue;
} else if (node.getType().toString().equalsIgnoreCase("MN")) {
log.debug(taskName + ": saving node " + node.getIdentifier().getValue());
try {
store.saveNode(node);
} catch (MetadigStoreException mse) {
mse.printStackTrace();
throw new JobExecutionException("Cannot save node " + node.getIdentifier().getValue() + " to store", mse);
}
} else {
log.debug(taskName + ": skipping CN node: " + node.getIdentifier().getValue());
}
}

// For debugging purposes: retrieve and print out all node entries if trace logging is enabled.
if (log.isTraceEnabled()) {
log.trace("Retrieving and printing out all saved node harvest dates...");

ArrayList<Node> nodes = store.getNodes();
for (Node node : nodes) {
log.trace("identifier: " + node.getIdentifier().getValue());

DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
String lastHarvestDatetimeStr = dateFormat.format(node.getSynchronization().getLastHarvested());

log.trace("harvest: " + lastHarvestDatetimeStr);
log.trace("synchronize: " + node.isSynchronize());
log.trace("state: " + node.getState().toString());
log.trace("baseURL: " + node.getBaseURL());
}
}
}
}

Loading

0 comments on commit bc9b37a

Please sign in to comment.