Date: Wed, 19 Aug 2020 13:46:40 -0700
Subject: [PATCH 39/47] Code cleanup, adjust logging levels
---
.../edu/ucsb/nceas/mdqengine/DataONE.java | 42 ++++++++-------
.../edu/ucsb/nceas/mdqengine/MDQconfig.java | 18 +------
.../java/edu/ucsb/nceas/mdqengine/Worker.java | 13 -----
.../mdqengine/scheduler/JobScheduler.java | 4 +-
.../mdqengine/scheduler/RequestReportJob.java | 20 ++------
.../mdqengine/scheduler/RequestScorerJob.java | 39 +++++---------
.../ucsb/nceas/mdqengine/scorer/Scorer.java | 8 ---
.../nceas/mdqengine/store/DatabaseStore.java | 35 ++++++-------
.../ucsb/nceas/mdqengine/store/MDQStore.java | 51 +++++++++----------
9 files changed, 80 insertions(+), 150 deletions(-)
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/DataONE.java b/src/main/java/edu/ucsb/nceas/mdqengine/DataONE.java
index 561ff9ee..17fea9e1 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/DataONE.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/DataONE.java
@@ -4,11 +4,12 @@
import org.apache.commons.logging.LogFactory;
import edu.ucsb.nceas.mdqengine.exception.MetadigProcessException;
import org.dataone.client.auth.AuthTokenSession;
+import org.dataone.client.rest.DefaultHttpMultipartRestClient;
+import org.dataone.client.rest.HttpMultipartRestClient;
import org.dataone.client.rest.MultipartRestClient;
import org.dataone.client.v2.impl.MultipartD1Node;
import org.dataone.service.types.v1.Session;
import edu.ucsb.nceas.mdqengine.exception.MetadigException;
-import org.dataone.client.rest.DefaultHttpMultipartRestClient;
import org.dataone.client.v2.impl.MultipartCNode;
import org.dataone.client.v2.impl.MultipartMNode;
import org.dataone.service.types.v1.Subject;
@@ -36,7 +37,7 @@ public class DataONE {
public static SubjectInfo getSubjectInfo(Subject rightsHolder, MultipartCNode CNnode,
Session session) throws MetadigProcessException {
- log.debug("Getting subject info for: " + rightsHolder.getValue());
+ log.trace("Getting subject info for: " + rightsHolder.getValue());
//MultipartCNode cnNode = null;
MetadigProcessException metadigException = null;
SubjectInfo subjectInfo = null;
@@ -68,7 +69,7 @@ public static MultipartD1Node getMultipartD1Node(Session session, String service
// First create an HTTP client
try {
- mrc = new DefaultHttpMultipartRestClient();
+ mrc = new HttpMultipartRestClient();
} catch (Exception ex) {
log.error("Error creating rest client: " + ex.getMessage());
metadigException = new MetadigProcessException("Unable to get collection pids");
@@ -80,10 +81,10 @@ public static MultipartD1Node getMultipartD1Node(Session session, String service
// Now create a DataONE object that uses the rest client
if (isCN) {
- log.debug("creating cn MultipartMNode" + ", subjectId: " + session.getSubject().getValue());
+ log.debug("creating cn MultipartMNode");
d1Node = new MultipartCNode(mrc, serviceUrl, session);
} else {
- log.debug("creating mn MultipartMNode" + ", subjectId: " + session.getSubject().getValue());
+ log.debug("creating mn MultipartMNode");
d1Node = new MultipartMNode(mrc, serviceUrl, session);
}
return d1Node;
@@ -98,9 +99,6 @@ public static MultipartD1Node getMultipartD1Node(Session session, String service
* @return an XML document containing the query result
* @throws Exception
*/
- //public static Document querySolr(String queryStr, int startPos, int countRequested, MultipartCNode cnNode,
- // MultipartMNode mnNode, Boolean isCN,
- // Session session) throws MetadigProcessException {
public static Document querySolr(String queryStr, int startPos, int countRequested, MultipartD1Node d1Node,
Session session) throws MetadigProcessException {
@@ -110,10 +108,10 @@ public static Document querySolr(String queryStr, int startPos, int countRequest
InputStream qis = null;
MetadigProcessException metadigException = null;
- log.debug("Sending query: " + queryStr);
+ log.trace("Sending query: " + queryStr);
try {
qis = d1Node.query(session, "solr", queryStr);
- log.debug("Sent query");
+ log.trace("Sent query");
} catch (Exception e) {
log.error("Error retrieving pids: " + e.getMessage());
metadigException = new MetadigProcessException("Unable to query dataone node: " + e.getMessage());
@@ -121,19 +119,19 @@ public static Document querySolr(String queryStr, int startPos, int countRequest
throw metadigException;
}
- log.debug("Creating xml doc with results");
+ log.trace("Creating xml doc with results");
Document xmldoc = null;
DocumentBuilder builder = null;
try {
// If results were returned, create an XML document from them
- log.debug("qis available: " + qis.available());
+ log.trace("qis available: " + qis.available());
if (qis.available() > 0) {
try {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
builder = factory.newDocumentBuilder();
xmldoc = builder.parse(new InputSource(qis));
- log.debug("Created xml doc: " + xmldoc.toString());
+ log.trace("Created xml doc: " + xmldoc.toString());
} catch (Exception e) {
log.error("Unable to create w3c Document from input stream", e);
e.printStackTrace();
@@ -145,13 +143,13 @@ public static Document querySolr(String queryStr, int startPos, int countRequest
qis.close();
}
} catch (IOException ioe) {
- log.debug("IO exception: " + ioe.getMessage());
+ log.trace("IO exception: " + ioe.getMessage());
metadigException = new MetadigProcessException("Unable prepare query result xml document: " + ioe.getMessage());
metadigException.initCause(ioe);
throw metadigException;
}
- log.debug("Created results xml doc");
+ log.trace("Created results xml doc");
return xmldoc;
}
@@ -169,10 +167,10 @@ public static Session getSession(String subjectId, String authToken) {
// query Solr - either the member node or cn, for the project 'solrquery' field
if (authToken == null || authToken.isEmpty()) {
- log.debug("Creating public sessioni");
+ log.trace("Creating public session");
session = new Session();
} else {
- log.debug("Creating authentication session for subjectId: " + subjectId + ", token: " + authToken.substring(0, 5) + "...");
+ log.trace("Creating authentication session for subjectId: " + subjectId + ", token: " + authToken.substring(0, 5) + "...");
session = new AuthTokenSession(authToken);
}
@@ -180,7 +178,7 @@ public static Session getSession(String subjectId, String authToken) {
Subject subject = new Subject();
subject.setValue(subjectId);
session.setSubject(subject);
- log.debug("Set session subjectId to: " + session.getSubject().getValue());
+ log.trace("Set session subjectId to: " + session.getSubject().getValue());
}
return session;
@@ -199,18 +197,18 @@ public static Boolean isCN(String nodeStr) {
if (nodeStr.matches("^\\s*urn:node:.*")) {
if (nodeStr.matches("^\\s*urn:node:CN.*$|^\\s*urn:node:cn.*$")) {
isCN = true;
- log.debug("The nodeId is for a CN: " + nodeStr);
+ log.trace("The nodeId is for a CN: " + nodeStr);
} else {
- log.debug("The nodeId is not for a CN: " + nodeStr);
+ log.trace("The nodeId is not for a CN: " + nodeStr);
isCN = false;
}
} else {
// match cn service url e.g. "https://cn.dataone.org/cn"
if (nodeStr.matches("^\\s*https*://cn.*?\\.dataone\\.org.*$|https*://cn.*?\\.test\\.dataone\\.org.*$")) {
isCN = true;
- log.debug("The service URL is for a CN: " + nodeStr);
+ log.trace("The service URL is for a CN: " + nodeStr);
} else {
- log.debug("The service URL is not for a CN: " + nodeStr);
+ log.trace("The service URL is not for a CN: " + nodeStr);
isCN = false;
}
}
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/MDQconfig.java b/src/main/java/edu/ucsb/nceas/mdqengine/MDQconfig.java
index c2840e01..b3e7de4a 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/MDQconfig.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/MDQconfig.java
@@ -20,19 +20,7 @@ public class MDQconfig {
public static Configuration config;
public MDQconfig () throws ConfigurationException, IOException {
- // Check if we are running in a servlet
boolean inServlet = false;
- /*
- try {
- Class servletClass = Class.forName("javax.servlet.http.HttpServlet");
- inServlet = true;
- log.debug("Loaded javax.servlet.http.HttpServlet - running in servlet environment.");
- //} catch (ClassNotFoundException ex) {
- } catch (Exception e) {
- log.debug("Unable to load javax.servlet.http.HttpServlet - not running in servlet environment.");
- inServlet = false;
- }
- */
// If running in a servlet, have to get the config info from the webapp context, as we can't
// read from external dirs on disk.
@@ -41,15 +29,13 @@ public MDQconfig () throws ConfigurationException, IOException {
InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("/metadig.properties");
String TMP_DIR = System.getProperty("java.io.tmpdir");
File tempFile = new File(TMP_DIR + "/metadig.properties");
- log.debug("Reading config properties in servlet from: " + tempFile);
+ log.trace("Reading config properties in servlet from: " + tempFile);
FileOutputStream out = new FileOutputStream(tempFile);
IOUtils.copy(inputStream, out);
config = configs.properties(tempFile);
- log.debug("Successfully read properties from: " + tempFile);
} else {
- log.debug("Reading config properties from: " + configFilePath);
+ log.trace("Reading config properties from: " + configFilePath);
config = configs.properties(new File(configFilePath));
- log.debug("Successfully read properties from: " + configFilePath);
}
}
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/Worker.java b/src/main/java/edu/ucsb/nceas/mdqengine/Worker.java
index 7cd516bb..ecbbb554 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/Worker.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/Worker.java
@@ -477,19 +477,6 @@ public Run processReport(QueueEntry message) throws InterruptedException, Except
} catch (Throwable thrown) {
log.error("Error while waiting for group lookup thread completion");
}
- // Wait for a few seconds for the 'accounts'
-// for (int i = 0; i < 5; i++) {
-// try {
-// groups = future.get();
-// } catch (Throwable thrown) {
-// log.error("Error while waiting for thread completion");
-// }
-// // Sleep for 1 second
-//
-// if (groups.size() > 0 ) break;
-// log.debug("Waiting 1 second for DataONE group lookup");
-// Thread.sleep(1000);
-// }
if (groups != null) {
smm.setGroups(groups);
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java
index 90efc13f..3f9612a3 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java
@@ -85,6 +85,7 @@ public static void main(String[] argv) throws Exception {
cronSchedule = record.get("cron-schedule").trim();
params = record.get("params").trim();
log.debug("Task type: " + taskType);
+ log.debug("Task name: " + taskName);
log.debug("cronSchedule: " + cronSchedule);
params = params.startsWith("\"") ? params.substring(1) : params;
params = params.endsWith("\"") ? params.substring(0, params.length()-1) : params;
@@ -182,7 +183,6 @@ public static void main(String[] argv) throws Exception {
}
try {
- log.debug("Setting task");
// Currently there is only taskType="quality", but there could be more in the future!
JobDetail job = null;
if(taskType.equals("quality")) {
@@ -223,13 +223,11 @@ public static void main(String[] argv) throws Exception {
.build();
}
- log.debug("Setting trigger");
CronTrigger trigger = newTrigger()
.withIdentity(taskName + "-trigger", taskGroup)
.withSchedule(cronSchedule(cronSchedule))
.build();
- log.debug("Scheduling task");
scheduler.scheduleJob(job, trigger);
} catch (SchedulerException se) {
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
index 6a11c68c..19fdc7ea 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
@@ -124,21 +124,13 @@ public void execute(JobExecutionContext context)
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
String taskName = dataMap.getString("taskName");
- log.debug("taskName: " + taskName);
String taskType = dataMap.getString("taskType");
- log.debug("taskType: " + taskType);
String pidFilter = dataMap.getString("pidFilter");
- log.debug("pidFilter: " + pidFilter);
String suiteId = dataMap.getString("suiteId");
- log.debug("suiteId: " + suiteId);
String nodeId = dataMap.getString("nodeId");
- log.debug("nodeId: " + nodeId);
String startHarvestDatetimeStr = dataMap.getString("startHarvestDatetime");
- log.debug("startHavestDatetimeStr: " + startHarvestDatetimeStr);
int harvestDatetimeInc = dataMap.getInt("harvestDatetimeInc");
- log.debug("harvestDatetimeInc: " + harvestDatetimeInc);
int countRequested = dataMap.getInt("countRequested");
- log.debug("countRequested: " + countRequested);
MultipartRestClient mrc = null;
MultipartMNode mnNode = null;
MultipartCNode cnNode = null;
@@ -162,7 +154,7 @@ public void execute(JobExecutionContext context)
throw jee;
}
- log.debug("Executing task for node: " + nodeId + ", suiteId: " + suiteId);
+ log.info("Executing task " + taskType + ", " + taskName + " for node: " + nodeId + ", suiteId: " + suiteId);
try {
mrc = new HttpMultipartRestClient();
@@ -183,7 +175,7 @@ public void execute(JobExecutionContext context)
mnNode = new MultipartMNode(mrc, nodeServiceUrl, session);
}
- // Don't know node type yet from the id, so have to manually check if it's a CN
+ // Get a connection to the database
MDQStore store = null;
try {
@@ -208,13 +200,9 @@ public void execute(JobExecutionContext context)
DateTime currentDT = new DateTime(DateTimeZone.UTC);
DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SS'Z'");
String currentDatetimeStr = dtfOut.print(currentDT);
-
DateTime startDateTimeRange = null;
DateTime endDateTimeRange = null;
-
String lastHarvestDateStr = null;
- //edu.ucsb.nceas.mdqengine.model.Node node;
- //node = store.getNode(nodeId, jobName);
Task task;
task = store.getTask(taskName, taskType);
@@ -349,8 +337,8 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode,
Date endDate = new Date(msSinceEpoch);
try {
- // Even though MultipartMNode and MultipartCNode have the same parent class, their interfaces are differnt, so polymorphism
- // isn't happening here.
+ // Even though MultipartMNode and MultipartCNode have the same parent class D1Node, the interface for D1Node doesn't
+ // include listObjects (it should), so we have to maintain a cnNode and mnNode.
if(isCN) {
objList = cnNode.listObjects(session, startDate, endDate, formatId, nodeRef, identifier, startCount, countRequested);
} else {
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
index 11d965fe..fe908c2d 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
@@ -3,6 +3,7 @@
import edu.ucsb.nceas.mdqengine.Controller;
import edu.ucsb.nceas.mdqengine.MDQconfig;
import edu.ucsb.nceas.mdqengine.DataONE;
+import edu.ucsb.nceas.mdqengine.exception.MetadigException;
import edu.ucsb.nceas.mdqengine.exception.MetadigProcessException;
import edu.ucsb.nceas.mdqengine.exception.MetadigStoreException;
import edu.ucsb.nceas.mdqengine.model.Task;
@@ -16,11 +17,7 @@
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
-import org.dataone.client.rest.DefaultHttpMultipartRestClient;
-import org.dataone.client.rest.MultipartRestClient;
-import org.dataone.client.v2.impl.MultipartCNode;
import org.dataone.client.v2.impl.MultipartD1Node;
-import org.dataone.client.v2.impl.MultipartMNode;
import org.dataone.service.types.v1.*;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -33,8 +30,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
*
@@ -74,7 +69,7 @@ Integer getResultCount() {
}
// Since Quartz will re-instantiate a class every time it
- // gets executed, members non-static member variables can
+ // gets executed, non-static member variables can
// not be used to maintain state!
/**
@@ -146,13 +141,12 @@ public void execute(JobExecutionContext context)
try {
cfg = new MDQconfig();
qualityServiceUrl = cfg.getString("quality.serviceUrl");
- log.debug("nodeId from request: " + nodeId);
+ log.trace("nodeId from request: " + nodeId);
String nodeAbbr = nodeId.replace("urn:node:", "");
authToken = cfg.getString(nodeAbbr + ".authToken");
subjectId = cfg.getString(nodeAbbr + ".subjectId");
- // TODO: Cache the node values from the CN listNode service
nodeServiceUrl = cfg.getString(nodeAbbr + ".serviceUrl");
- log.debug("nodeServiceUrl: " + nodeServiceUrl);
+ log.trace("nodeServiceUrl: " + nodeServiceUrl);
} catch (ConfigurationException | IOException ce) {
JobExecutionException jee = new JobExecutionException("Error executing task.");
jee.initCause(ce);
@@ -271,12 +265,11 @@ public void execute(JobExecutionContext context)
throw jee;
}
} else {
- log.debug("Getting portal pids to process...");
+ Integer allIds = 0;
boolean morePids = true;
while (morePids) {
ArrayList pidsToProcess = null;
- log.debug("startCount: " + startCount);
- log.debug("countRequested:" + countRequested);
+ log.trace("Getting portal pids to process, startCount: " + startCount + ", countRequested: " + countRequested);
try {
result = getPidsToProcess(d1Node, session, pidFilter, startDTRstr, endDTRstr, startCount, countRequested);
@@ -288,7 +281,7 @@ public void execute(JobExecutionContext context)
throw jee;
}
- log.info("Found " + resultCount + " seriesIds" + " for date: " + startDTRstr + " at servierUrl: " + nodeServiceUrl);
+ log.trace(taskName + ": found " + resultCount + " seriesIds" + " for date: " + startDTRstr + " at serviceUrl: " + nodeServiceUrl);
for (String pidStr : pidsToProcess) {
try {
submitScorerRequest(qualityServiceUrl, pidStr, suiteId, nodeId, formatFamily);
@@ -304,15 +297,12 @@ public void execute(JobExecutionContext context)
if (resultCount >= countRequested) {
morePids = true;
startCount = startCount + resultCount;
- log.info("Paging through more results, current start is " + startCount);
+ log.trace("Paging through more results, current start is " + startCount);
} else {
morePids = false;
// Record the new "last harvested" date
task.setLastHarvestDatetime(endDTRstr);
- log.debug("taskName: " + task.getTaskName());
- log.debug("taskType: " + task.getTaskType());
- log.debug("lastharvestdate: " + task.getLastHarvestDatetime());
try {
store.saveTask(task);
@@ -360,7 +350,7 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
String queryStr = "?q=formatId:" + pidFilter + "+-obsoletedBy:*" + "+dateUploaded:[" + startHarvestDatetimeStr + "%20TO%20"
+ endHarvestDatetimeStr + "]"
+ "&fl=seriesId&q.op=AND";
- log.debug("query: " + queryStr);
+ log.trace("query: " + queryStr);
// Send the query to DataONE Solr to retrieve portal seriesIds for a given time frame
@@ -370,7 +360,7 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
int thisResultLength;
// Now setup the xpath to retrieve the ids returned from the collection query.
try {
- log.debug("Compiling xpath for seriesId");
+ log.trace("Compiling xpath for seriesId");
// Extract the collection query from the Solr result XML
XPathFactory xPathfactory = XPathFactory.newInstance();
xpath = xPathfactory.newXPath();
@@ -384,9 +374,7 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
// Loop through the Solr result. As the result may be large, page through the results, accumulating
// the pids returned into a ListResult object.
-
- //log.debug("Getting portal seriesIds from Solr using subjectId: " + subjectId + ", servicerUrl: " + serviceUrl);
- log.debug("Getting portal seriesIds from Solr " );
+ log.trace("Getting portal seriesIds from Solr " );
int startPos = startCount;
do {
@@ -408,13 +396,13 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
}
String currentPid = null;
thisResultLength = xpathResult.getLength();
- log.debug("Got " + thisResultLength + " pids this query");
+ log.trace("Got " + thisResultLength + " pids this query");
if(thisResultLength == 0) break;
for (int index = 0; index < xpathResult.getLength(); index++) {
node = xpathResult.item(index);
currentPid = node.getTextContent();
pids.add(currentPid);
- log.debug("adding pid: " + currentPid);
+ log.trace("adding pid: " + currentPid);
}
startPos += thisResultLength;
@@ -453,7 +441,6 @@ public void submitScorerRequest(String qualityServiceUrl, String collectionId, S
// send to service
log.debug("submitting scores request : " + scorerServiceUrl);
- //post.setEntity((HttpEntity) entity);
CloseableHttpClient client = HttpClients.createDefault();
CloseableHttpResponse response = client.execute(post);
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scorer/Scorer.java b/src/main/java/edu/ucsb/nceas/mdqengine/scorer/Scorer.java
index e61bbfcd..23ea5697 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scorer/Scorer.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scorer/Scorer.java
@@ -99,14 +99,6 @@ void setResult(ArrayList result) {
ArrayList getResult() {
return this.result;
}
-
-// void setResultCount(Integer count) {
-// this.resultCount = count;
-// }
-//
-// Integer getResultCount() {
-// return this.resultCount;
-// }
}
public static void main(String[] argv) throws Exception {
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/store/DatabaseStore.java b/src/main/java/edu/ucsb/nceas/mdqengine/store/DatabaseStore.java
index 3fcca606..9958136c 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/store/DatabaseStore.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/store/DatabaseStore.java
@@ -48,7 +48,7 @@ public class DatabaseStore implements MDQStore {
private DataSource dataSource = null;
public DatabaseStore () throws MetadigStoreException {
- log.debug("Initializing a new DatabaseStore to " + dbUrl + ".");
+ log.trace("Initializing a new DatabaseStore to " + dbUrl + ".");
this.init();
}
@@ -57,7 +57,7 @@ public DatabaseStore () throws MetadigStoreException {
*/
private void init() throws MetadigStoreException {
- log.debug("initializing connection");
+ log.trace("initializing connection");
String additionalDir = null;
try {
MDQconfig cfg = new MDQconfig();
@@ -90,7 +90,7 @@ private void init() throws MetadigStoreException {
throw(mse);
}
- log.debug("Connection initialized");
+ log.trace("Connection initialized");
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
@@ -111,7 +111,6 @@ private void init() throws MetadigStoreException {
Suite suite = null;
try {
URL url = resource.getURL();
- //log.debug("Loading suite found at: " + url.toString());
String xml = IOUtils.toString(url.openStream(), "UTF-8");
suite = (Suite) XmlMarshaller.fromXml(xml, Suite.class);
} catch (JAXBException | IOException | SAXException e) {
@@ -123,7 +122,7 @@ private void init() throws MetadigStoreException {
}
}
if(this.isAvailable()) {
- log.debug("Initialized database store: opened database successfully");
+ log.trace("Initialized database store: opened database successfully");
} else {
throw new MetadigStoreException("Error initializing database, connection not available");
}
@@ -153,13 +152,13 @@ public Run getRun(String metadataId, String suiteId) throws MetadigStoreExceptio
MetadigStoreException me = new MetadigStoreException("Unable get quality report to the datdabase.");
// Select records from the 'runs' table
try {
- log.debug("preparing statement for query");
+ log.trace("preparing statement for query");
String sql = "select * from runs where metadata_id = ? and suite_id = ?";
stmt = conn.prepareStatement(sql);
stmt.setString(1, metadataId);
stmt.setString(2, suiteId);
- log.debug("issuing query: " + sql);
+ log.trace("issuing query: " + sql);
ResultSet rs = stmt.executeQuery();
if(rs.next()) {
mId = rs.getString("metadata_id");
@@ -176,9 +175,9 @@ public Run getRun(String metadataId, String suiteId) throws MetadigStoreExceptio
// have to be manually added after the JAXB marshalling has created the run object.
run.setSequenceId(seqId);
run.setIsLatest(isLatest);
- log.debug("Retrieved run successfully for metadata id: " + run.getObjectIdentifier());
+ log.trace("Retrieved run successfully for metadata id: " + run.getObjectIdentifier());
} else {
- log.debug("Run not found for metadata id: " + metadataId + ", suiteId: " + suiteId);
+ log.trace("Run not found for metadata id: " + metadataId + ", suiteId: " + suiteId);
}
} catch ( Exception e ) {
log.error( e.getClass().getName()+": "+ e.getMessage());
@@ -210,8 +209,6 @@ public void saveRun(Run run) throws MetadigStoreException {
String sequenceId = run.getSequenceId();
Boolean isLatest = run.getIsLatest();
String resultStr = null;
- //DateTime now = new DateTime();
- //OffsetDateTime dateTime = OffsetDateTime.now();
Timestamp dateTime = Timestamp.from(Instant.now());
run.setTimestamp(dateTime);
@@ -288,7 +285,7 @@ public void saveRun(Run run) throws MetadigStoreException {
}
// Next, insert a record into the child table ('runs')
- log.debug("Records created successfully");
+ log.trace("Records created successfully");
}
/*
@@ -296,7 +293,7 @@ public void saveRun(Run run) throws MetadigStoreException {
*/
public boolean isAvailable() {
boolean reachable = false;
- log.debug("Checking if store (i.e. sql connection) is available.");
+ log.trace("Checking if store (i.e. sql connection) is available.");
try {
reachable = conn.isValid(10);
} catch (Exception e ) {
@@ -310,7 +307,7 @@ public boolean isAvailable() {
*/
public void renew() throws MetadigStoreException {
if(!this.isAvailable()) {
- log.debug("Renewing connection to database");
+ log.trace("Renewing connection to database");
this.init();
}
}
@@ -319,7 +316,7 @@ public void shutdown() {
try {
conn.close();
- log.debug("Successfully closed database");
+ log.trace("Successfully closed database");
} catch ( java.sql.SQLException e) {
log.error("Error closing database: " + e.getMessage());
}
@@ -355,7 +352,7 @@ public void saveTask(Task task) throws MetadigStoreException {
}
// Next, insert a record into the child table ('runs')
- log.debug("Records created successfully");
+ log.trace("Records created successfully");
}
public Task getTask(String taskName, String taskType) {
@@ -368,13 +365,13 @@ public Task getTask(String taskName, String taskType) {
// Select records from the 'nodes' table
try {
- log.debug("preparing statement for query");
+ log.trace("preparing statement for query");
String sql = "select * from tasks where task_name = ? and task_type = ?";
stmt = conn.prepareStatement(sql);
stmt.setString(1, taskName);
stmt.setString(2, taskType);
- log.debug("issuing query: " + sql);
+ log.trace("issuing query: " + sql);
ResultSet rs = stmt.executeQuery();
if(rs.next()) {
task.setTaskName(rs.getString("task_name"));
@@ -383,7 +380,7 @@ public Task getTask(String taskName, String taskType) {
rs.close();
stmt.close();
} else {
- log.debug("No results returned from query");
+ log.trace("No results returned from query");
}
} catch ( Exception e ) {
log.error( e.getClass().getName()+": "+ e.getMessage());
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/store/MDQStore.java b/src/main/java/edu/ucsb/nceas/mdqengine/store/MDQStore.java
index c573803d..b9796c29 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/store/MDQStore.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/store/MDQStore.java
@@ -7,33 +7,30 @@
public interface MDQStore {
- public Collection listSuites();
- public Suite getSuite(String id);
- public void createSuite(Suite suite);
- public void updateSuite(Suite suite);
- public void deleteSuite(Suite suite);
-
- public Collection listChecks();
- public Check getCheck(String id);
- public void createCheck(Check check);
- public void updateCheck(Check check);
- public void deleteCheck(Check check);
+ Collection listSuites();
+ Suite getSuite(String id);
+ void createSuite(Suite suite);
+ void updateSuite(Suite suite);
+ void deleteSuite(Suite suite);
+
+ Collection listChecks();
+ Check getCheck(String id);
+ void createCheck(Check check);
+ void updateCheck(Check check);
+ void deleteCheck(Check check);
- public Collection listRuns();
- public Run getRun(String suite, String id ) throws MetadigStoreException;
- public void saveRun(Run run) throws MetadigStoreException;
- public void createRun(Run run);
- public void deleteRun(Run run);
-
- public void shutdown();
-
- public boolean isAvailable();
- public void renew() throws MetadigStoreException;
-//
-// public Node getNode(String nodeId, String jobName);
-// public void saveNode(Node node) throws MetadigStoreException;
-
- public Task getTask(String taskName, String taskType);
- public void saveTask(Task task) throws MetadigStoreException;
+ Collection listRuns();
+ Run getRun(String suite, String id ) throws MetadigStoreException;
+ void saveRun(Run run) throws MetadigStoreException;
+ void createRun(Run run);
+ void deleteRun(Run run);
+
+ void shutdown();
+
+ boolean isAvailable();
+ void renew() throws MetadigStoreException;
+
+ Task getTask(String taskName, String taskType);
+ void saveTask(Task task) throws MetadigStoreException;
}
From ef01e2639accf142080b7cb8559b667ec21d1c76 Mon Sep 17 00:00:00 2001
From: gothub
Date: Wed, 19 Aug 2020 13:50:54 -0700
Subject: [PATCH 40/47] CN harvesting is missing some pids bug (#267)
---
.../mdqengine/scheduler/RequestReportJob.java | 177 +++++++++---------
1 file changed, 86 insertions(+), 91 deletions(-)
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
index 19fdc7ea..43ebc9e0 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
@@ -37,8 +37,6 @@
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
*
@@ -56,37 +54,44 @@ public class RequestReportJob implements Job {
private Log log = LogFactory.getLog(RequestReportJob.class);
class ListResult {
- // The total result count returned from DataONE
- Integer totalResultCount;
+ // The total result count for all object types returned from DataONE. This is the count of all object types
+ // that were retrieved for a given request. The DataONE 'listObjects' service does not provide
+ // parameters to filter by formatId wildcard, so we have to retrieve all pids for a time range
+ // and filter the result list.
+ private Integer totalResultCount = 0;
// The filtered result count returned from DataONE.
// The DataONE listObjects service returns all new pids for all formatIds
// but we are typically only interested in a subset of those, i.e. EML metadata pids,
// so this is the count of pids from the result that we are actually interested in.
- Integer filteredResultCount;
- ArrayList result = new ArrayList<>();
+ private Integer filteredResultCount = 0;
+ private ArrayList result = new ArrayList<>();
+
+ // The scheduler keeps track of the sysmeta 'dateSystemMetadataModified' of the last pid harvested,
+ // which will be used as the starting time of the next harvest.
+ private DateTime lastDateModifiedDT = null;
void setResult(ArrayList result) {
this.result = result;
}
- ArrayList getResult() {
+ public ArrayList getResult() {
return this.result;
}
void setTotalResultCount(Integer count) {
this.totalResultCount = count;
}
- void setFilteredResultCount(Integer count) {
- this.filteredResultCount = count;
+ void setFilteredResultCount(Integer count) { this.filteredResultCount = count; }
+ void setLastDateModified(DateTime date) {
+ log.debug("Setter last modified date, date: " + date.toString());
+ this.lastDateModifiedDT = date;
}
- Integer getTotalResultCount() {
- return this.totalResultCount;
- }
+ public Integer getTotalResultCount() { return this.totalResultCount; }
- Integer getFilteredResultCount() {
- return this.filteredResultCount;
- }
+ public Integer getFilteredResultCount() { return this.filteredResultCount; }
+
+ public DateTime getLastDateModified() { return this.lastDateModifiedDT; }
}
// Since Quartz will re-instantiate a class every time it
@@ -198,7 +203,7 @@ public void execute(JobExecutionContext context)
// Get current datetime, which may be used for start time range.
DateTimeZone.setDefault(DateTimeZone.UTC);
DateTime currentDT = new DateTime(DateTimeZone.UTC);
- DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SS'Z'");
+ DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
String currentDatetimeStr = dtfOut.print(currentDT);
DateTime startDateTimeRange = null;
DateTime endDateTimeRange = null;
@@ -219,58 +224,63 @@ public void execute(JobExecutionContext context)
lastHarvestDateStr = task.getLastHarvestDatetime();
}
- DateTime lastHarvestDate = new DateTime(lastHarvestDateStr);
+ DateTime lastHarvestDateDT = new DateTime(lastHarvestDateStr);
// Set the search start datetime to the last harvest datetime, unless it is in the
// future. (This can happen when the previous time range end was for the current day,
// as the end datetime range for the previous task run will have been stored as the
// new lastharvestDateTime.
- DateTime startDTR = null;
- if(lastHarvestDate.isAfter(currentDT.toInstant())) {
- startDTR = currentDT;
+ DateTime startDT = null;
+ if(lastHarvestDateDT.isAfter(currentDT.toInstant())) {
+ startDT = currentDT;
} else {
- startDTR = new DateTime(lastHarvestDate);
+ startDT = new DateTime(lastHarvestDateDT);
}
- DateTime endDTR = new DateTime(startDTR);
- endDTR = endDTR.plusDays(harvestDatetimeInc);
- if(endDTR.isAfter(currentDT.toInstant())) {
- endDTR = currentDT;
+ DateTime endDT = new DateTime(startDT);
+ endDT = endDT.plusDays(harvestDatetimeInc);
+ if(endDT.isAfter(currentDT.toInstant())) {
+ endDT = currentDT;
}
- // If the start and end harvest dates are the same (happends for a new node), then
- // tweek the start so that DataONE listObjects doesn't complain.
- if(startDTR == endDTR ) {
- startDTR = startDTR.minusMinutes(1);
+ // If the start and end harvest dates are the same (happens for a new node), then
+ // tweak the start so that DataONE listObjects doesn't complain.
+ if(startDT == endDT ) {
+ startDT = startDT.minusMinutes(1);
+ log.debug("Reset start back 1 minute to: " + startDT);
}
- String startDTRstr = dtfOut.print(startDTR);
- String endDTRstr = dtfOut.print(endDTR);
+ // Track the sysmeta dateUploaded of the latest harvested pid. This will become the starting time of
+ // the next harvest.
+ DateTime lastDateModifiedDT = startDT;
+
+ String startDTstr = dtfOut.print(startDT);
+ String endDTstr = dtfOut.print(endDT);
Integer startCount = new Integer(0);
ListResult result = null;
- Integer totalResultCount = null;
- Integer filteredResultCount = null;
+ Integer totalResultCount = 0;
+ Integer filteredResultCount = 0;
+ Integer allPidsCnt = 0;
boolean morePids = true;
while(morePids) {
ArrayList pidsToProcess = null;
- log.info("Getting pids for node: " + nodeId + ", suiteId: " + suiteId + ", harvest start: " + startDTRstr);
-
try {
- result = getPidsToProcess(cnNode, mnNode, isCN, session, suiteId, nodeId, pidFilter, startDTRstr, endDTRstr, startCount, countRequested);
+ result = getPidsToProcess(cnNode, mnNode, isCN, session, suiteId, nodeId, pidFilter, startDTstr, endDTstr, startCount, countRequested, lastDateModifiedDT);
pidsToProcess = result.getResult();
totalResultCount = result.getTotalResultCount();
filteredResultCount = result.getFilteredResultCount();
+ lastDateModifiedDT = result.getLastDateModified();
} catch (Exception e) {
JobExecutionException jee = new JobExecutionException("Unable to get pids to process", e);
jee.setRefireImmediately(false);
throw jee;
}
- log.info("Found " + filteredResultCount + " pids" + " for node: " + nodeId);
+ allPidsCnt = pidsToProcess.size();
for (String pidStr : pidsToProcess) {
try {
- log.info("submitting pid: " + pidStr);
+ log.debug("submitting pid: " + pidStr);
submitReportRequest(cnNode, mnNode, isCN, session, qualityServiceUrl, pidStr, suiteId);
} catch (org.dataone.service.exceptions.NotFound nfe) {
log.error("Unable to process pid: " + pidStr + nfe.getMessage());
@@ -278,16 +288,24 @@ public void execute(JobExecutionContext context)
} catch (Exception e) {
log.error("Unable to process pid: " + pidStr + " - " + e.getMessage());
continue;
- //JobExecutionException jee = new JobExecutionException("Unable to submit request to create new quality reports", e);
- //jee.setRefireImmediately(false);
- //throw jee;
}
}
- task.setLastHarvestDatetime(endDTRstr);
- log.debug("taskName: " + task.getTaskName());
- log.debug("taskType: " + task.getTaskType());
- log.debug("lastharvestdate: " + task.getLastHarvestDatetime());
+ // Check if DataONE returned the max number of results. If so, we have to request more by paging through
+ // the results returned pidsToProcess (i.e. DataONE listObjects service). If the returned result is
+ // less than the requested result, then all pids have been retrieved.
+ if(totalResultCount >= countRequested) {
+ morePids = true;
+ startCount = startCount + totalResultCount;
+ log.trace("Paging through more results, current start is " + startCount);
+ } else {
+ morePids = false;
+ }
+ }
+ // Don't update the lastHarvestDateDT if no pids were found.
+ if (allPidsCnt > 0) {
+ task.setLastHarvestDatetime(dtfOut.print(lastDateModifiedDT));
+ log.debug("Saving lastHarvestDate: " + dtfOut.print(lastDateModifiedDT));
try {
store.saveTask(task);
} catch (MetadigStoreException mse) {
@@ -296,24 +314,15 @@ public void execute(JobExecutionContext context)
jee.setRefireImmediately(false);
throw jee;
}
-
- // Check if DataONE returned the max number of results. If so, we have to request more by paging through
- // the results returned pidsToProcess (i.e. DataONE listObjects service).
- if(totalResultCount >= countRequested) {
- morePids = true;
- startCount = startCount + totalResultCount;
- log.info("Paging through more results, current start is " + startCount);
- } else {
- morePids = false;
- }
}
+ log.info(taskName + ": Found " + allPidsCnt + " pids for start: " + startDTstr + ", end: " + endDTstr + " at servierUrl: " + nodeServiceUrl);
store.shutdown();
}
public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode, Boolean isCN, Session session,
String suiteId, String nodeId, String pidFilter, String startHarvestDatetimeStr,
String endHarvestDatetimeStr, int startCount,
- int countRequested) throws Exception {
+ int countRequested, DateTime lastDateModifiedDT) throws Exception {
ArrayList pids = new ArrayList();
InputStream qis = null;
@@ -353,15 +362,16 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode,
String thisFormatId = null;
String thisPid = null;
int pidCount = 0;
+ Date thisDateModified;
if (objList.getCount() > 0) {
for(ObjectInfo oi: objList.getObjectInfoList()) {
thisFormatId = oi.getFormatId().getValue();
thisPid = oi.getIdentifier().getValue();
- log.debug("Checking pid: " + thisPid + ", format: " + thisFormatId);
+ log.trace("Checking pid: " + thisPid + ", format: " + thisFormatId);
- // Check all pid filters. There could be multiple wildcard filters, which are separated
- // by ','.
+ // Check all pid filters to see if this pids's format was found in the list of desired formats.
+ // There could be multiple wildcard filters, which are separated by ','.
String [] filters = pidFilter.split("\\|");
Boolean found = false;
for(String thisFilter:filters) {
@@ -378,7 +388,16 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode,
// if (!runExists(thisPid, suiteId, store)) {
pidCount = pidCount++;
pids.add(thisPid);
- log.info("adding pid " + thisPid + ", formatId: " + thisFormatId);
+ log.trace("adding pid " + thisPid + ", formatId: " + thisFormatId);
+ // If this pid's modified date is after the stored latest encountered modified date, then update
+ // the lastModified date
+ DateTime thisDateModifiedDT = new DateTime(oi.getDateSysMetadataModified());
+ // Add a millisecond to lastDateModfiedDT so that this pid won't be harvested again (in the event
+ // that this is the last pid to be harvested in this round.
+ if (thisDateModifiedDT.isAfter(lastDateModifiedDT)) {
+ lastDateModifiedDT = thisDateModifiedDT.plusMillis(1) ;
+ log.debug("Updated lastDateMoidifed: " + lastDateModifiedDT.toString());
+ }
// }
}
}
@@ -390,6 +409,8 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode,
// Set the count for the total number of pids returned from DataONE (all formatIds) for this query
result.setTotalResultCount(objList.getCount());
result.setResult(pids);
+ // Return the sysmeta 'dateSystemMetadataModified' of the last pid harvested.
+ result.setLastDateModified(lastDateModifiedDT);
return result;
}
@@ -445,45 +466,19 @@ public void submitReportRequest(MultipartCNode cnNode, MultipartMNode mnNode, Bo
} else {
objectIS = mnNode.get(session, pid);
}
- log.debug("Retrieved metadata object for pid: " + pidStr);
+ log.trace("Retrieved metadata object for pid: " + pidStr);
} catch (NotAuthorized na) {
- log.error("Not authorized to read pid: " + pid + ", continuing with next pid...");
+ log.error("Not authorized to read pid: " + pid + ", unable to retrieve metadata, continuing with next pid...");
return;
- } catch (Exception e) {
- throw(e);
}
// quality suite service url, i.e. "http://docke-ucsb-1.dataone.org:30433/quality/suites/knb.suite.1/run
qualityServiceUrl = qualityServiceUrl + "/suites/" + suiteId + "/run";
HttpPost post = new HttpPost(qualityServiceUrl);
- try {
- // add document
- SimpleMultipartEntity entity = new SimpleMultipartEntity();
- entity.addFilePart("document", objectIS);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- TypeMarshaller.marshalTypeToOutputStream(sysmeta, baos);
- entity.addFilePart("systemMetadata", new ByteArrayInputStream(baos.toByteArray()));
-
- // make sure we get XML back
- post.addHeader("Accept", "application/xml");
-
- // send to service
- log.trace("submitting: " + qualityServiceUrl);
- post.setEntity((HttpEntity) entity);
- CloseableHttpClient client = HttpClients.createDefault();
- CloseableHttpResponse response = client.execute(post);
-
- // retrieve results
- HttpEntity reponseEntity = response.getEntity();
- if (reponseEntity != null) {
- runResultIS = reponseEntity.getContent();
- }
- } catch (Exception e) {
- throw(e);
- }
- }
+ // add document
+ SimpleMultipartEntity entity = new SimpleMultipartEntity();
+ entity.addFilePart("document", objectIS);
private Boolean isCN(String serviceUrl) {
From 7e557579f60961b82224b27480bd97eb90593b88 Mon Sep 17 00:00:00 2001
From: gothub
Date: Wed, 19 Aug 2020 13:52:20 -0700
Subject: [PATCH 41/47] Detect D1 client connection type (CN or MN) (#265)
---
.../mdqengine/scheduler/RequestReportJob.java | 33 ++++++++++---------
1 file changed, 18 insertions(+), 15 deletions(-)
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
index 43ebc9e0..3900ac12 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
@@ -173,7 +173,7 @@ public void execute(JobExecutionContext context)
Session session = DataONE.getSession(subjectId, authToken);
// Don't know node type yet from the id, so have to manually check if it's a CN
- Boolean isCN = isCN(nodeServiceUrl);
+ Boolean isCN = DataONE.isCN(nodeServiceUrl);
if(isCN) {
cnNode = new MultipartCNode(mrc, nodeServiceUrl, session);
} else {
@@ -418,6 +418,7 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode,
public boolean runExists(String pid, String suiteId, MDQStore store) throws MetadigStoreException {
boolean found = false;
+ Date runDateSystemMetadataModified = null;
if(!store.isAvailable()) {
try {
@@ -480,21 +481,23 @@ public void submitReportRequest(MultipartCNode cnNode, MultipartMNode mnNode, Bo
SimpleMultipartEntity entity = new SimpleMultipartEntity();
entity.addFilePart("document", objectIS);
- private Boolean isCN(String serviceUrl) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ TypeMarshaller.marshalTypeToOutputStream(sysmeta, baos);
+ entity.addFilePart("systemMetadata", new ByteArrayInputStream(baos.toByteArray()));
- Boolean isCN = false;
- // Identity node as either a CN or MN based on the serviceUrl
- String pattern = "https*://cn.*?\\.dataone\\.org|https*://cn.*?\\.test\\.dataone\\.org";
- Pattern r = Pattern.compile(pattern);
- Matcher m = r.matcher(serviceUrl);
- if (m.find()) {
- isCN = true;
- log.debug("service URL is for a CN: " + serviceUrl);
- } else {
- log.debug("service URL is not for a CN: " + serviceUrl);
- isCN = false;
- }
+ // make sure we get XML back
+ post.addHeader("Accept", "application/xml");
- return isCN;
+ // send to service
+ log.trace("submitting: " + qualityServiceUrl);
+ post.setEntity((HttpEntity) entity);
+ CloseableHttpClient client = HttpClients.createDefault();
+ CloseableHttpResponse response = client.execute(post);
+
+ // retrieve results
+ HttpEntity reponseEntity = response.getEntity();
+ if (reponseEntity != null) {
+ runResultIS = reponseEntity.getContent();
+ }
}
}
From 594f4b8709a276e55325a7b34c1a532d29e98fc6 Mon Sep 17 00:00:00 2001
From: gothub
Date: Wed, 19 Aug 2020 13:53:33 -0700
Subject: [PATCH 42/47] Reuse CN clients when possible (#264)
---
.../mdqengine/scheduler/RequestScorerJob.java | 46 ++++++-------------
1 file changed, 13 insertions(+), 33 deletions(-)
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
index fe908c2d..7c099f31 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
@@ -103,9 +103,6 @@ public void execute(JobExecutionContext context)
throws JobExecutionException {
String qualityServiceUrl = null;
- String CNsubjectId = null;
- String CNauthToken = null;
- String CNserviceUrl = null;
MDQconfig cfg = null;
JobKey key = context.getJobDetail().getKey();
@@ -123,20 +120,17 @@ public void execute(JobExecutionContext context)
// Number of pids to get each query (this number of pids will be fetched each query until all pids are obtained)
int countRequested = dataMap.getInt("countRequested");
String requestType = null;
- if (taskType.equalsIgnoreCase("score")) {
- requestType = dataMap.getString("requestType");
- }
- // TODO: add formatFamily to scheduler request
String formatFamily = null;
- MultipartRestClient mrc = null;
- MultipartMNode mnNode = null;
- MultipartCNode cnNode = null;
-
+ MultipartD1Node d1Node = null;
String authToken = null;
String subjectId = null;
String nodeServiceUrl = null;
- log.info("Executing task: " + taskName + ", taskType: " + taskType);
+ if (taskType.equalsIgnoreCase("score")) {
+ requestType = dataMap.getString("requestType");
+ }
+
+ log.info("Executing task " + taskType + ", " + taskName + " for node: " + nodeId + ", suiteId: " + suiteId);
try {
cfg = new MDQconfig();
@@ -153,33 +147,19 @@ public void execute(JobExecutionContext context)
throw jee;
}
- try {
- mrc = new DefaultHttpMultipartRestClient();
- } catch (Exception e) {
- log.error("Error creating rest client: " + e.getMessage());
- JobExecutionException jee = new JobExecutionException(e);
- jee.setRefireImmediately(false);
- throw jee;
- }
-
Session session = DataONE.getSession(subjectId, authToken);
- // Don't know node type yet from the id, so have to manually check if it's a CN
- Boolean isCN = DataONE.isCN(nodeServiceUrl);
-
- MultipartD1Node d1Node = null;
- if(isCN) {
- //cnNode = new MultipartCNode(mrc, nodeServiceUrl, session);
- d1Node = new MultipartCNode(mrc, nodeServiceUrl, session);
- log.debug("Created cnNode for serviceUrl: " + nodeServiceUrl);
- } else {
- //mnNode = new MultipartMNode(mrc, nodeServiceUrl, session);
- d1Node = new MultipartMNode(mrc, nodeServiceUrl, session);
- log.debug("Created mnNode for serviceUrl: " + nodeServiceUrl);
+ // Get a connection to the DataONE node (CN or MN)
+ try {
+ d1Node = DataONE.getMultipartD1Node(session, nodeServiceUrl);
+ } catch (MetadigException mpe) {
+ mpe.printStackTrace();
+ throw new JobExecutionException(taskName + ": unable to create connection to service URL " + nodeServiceUrl , mpe);
}
MDQStore store = null;
+ // Get stored task info from the last task execution
try {
store = new DatabaseStore();
} catch (Exception e) {
From bc176ed3cb7933942cafd3c987d9b55fe2509b27 Mon Sep 17 00:00:00 2001
From: gothub
Date: Wed, 19 Aug 2020 13:55:51 -0700
Subject: [PATCH 43/47] Reuse CN clients when possible (#264)
---
.../ucsb/nceas/mdqengine/scorer/Scorer.java | 72 ++++++-------------
1 file changed, 20 insertions(+), 52 deletions(-)
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scorer/Scorer.java b/src/main/java/edu/ucsb/nceas/mdqengine/scorer/Scorer.java
index 23ea5697..fede5a0f 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scorer/Scorer.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scorer/Scorer.java
@@ -21,11 +21,9 @@
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.util.ClientUtils;
-import org.dataone.client.rest.DefaultHttpMultipartRestClient;
import org.dataone.client.rest.MultipartRestClient;
import org.dataone.client.v2.impl.MultipartCNode;
-import org.dataone.client.v2.impl.MultipartD1Node; // Don't include org.dataone.client.rest.MultipartD1Node (this is what IDEA selects)
-import org.dataone.client.v2.impl.MultipartMNode;
+import org.dataone.client.v2.impl.MultipartD1Node;
import org.dataone.service.types.v1.Session;
import org.dataone.service.types.v1.Subject;
import org.dataone.service.types.v1.Group;
@@ -148,9 +146,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
String nodeServiceUrl = null;
String label = null;
String title = null;
- MultipartRestClient mrc = null;
- MultipartMNode mnNode = null;
- MultipartCNode cnNode = null;
+ //MultipartRestClient mrc = null;
+ MultipartD1Node d1Node = null;
GraphType graphType = null;
//long startTime = System.nanoTime();
@@ -201,9 +198,6 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
// Pids associated with a collection, based on query results using 'collectionQuery' field in solr.
ArrayList collectionPids = null;
- // The harvesting and evaluation of the collectionQuery is based on the nodeId that is passed in, i.e.
- // If an MN is specified, then the collection (portal) Solr entry will be obtained from the MN, and the
- // collectionQuery string will also be evaluated on that node.
String nodeAbbr = nodeId.replace("urn:node:", "");
authToken = cfg.getString(nodeAbbr + ".authToken");
subjectId = cfg.getString(nodeAbbr + ".subjectId");
@@ -211,45 +205,20 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
nodeServiceUrl = cfg.getString(nodeAbbr + ".serviceUrl");
HashMap variables = new HashMap<>();
- // Create the graph.
- // Two types of graphs are currently supported:
- // - a graph for all pids included in a DataONE collection (portal), and a specified suite id
- // - a graph for specified filters: member node, suite id, metadata format
+
MetadigFile mdFile = new MetadigFile();
Graph graph = new Graph();
- // If creating a graph for a collection, get the set of pids associated with the collection.
- // Only scores for these pids will be included in the graph.
-
- try {
- mrc = new DefaultHttpMultipartRestClient();
- } catch (Exception e) {
- log.error("Error creating rest client: " + e.getMessage());
- JobExecutionException jee = new JobExecutionException(e);
- jee.setRefireImmediately(false);
- throw jee;
- }
-
Session session = DataONE.getSession(subjectId, authToken);
- // Don't know node type yet from the id, so have to manually check if it's a CN
- Boolean isCN = DataONE.isCN(nodeServiceUrl);
+ d1Node = DataONE.getMultipartD1Node(session, nodeServiceUrl);
- MultipartD1Node d1Node = null;
- if(isCN) {
- //cnNode = new MultipartCNode(mrc, nodeServiceUrl, session);
- d1Node = new MultipartCNode(mrc, nodeServiceUrl, session);
- log.debug("Created cnNode for serviceUrl: " + nodeServiceUrl);
- } else {
- //mnNode = new MultipartMNode(mrc, nodeServiceUrl, session);
- d1Node = new MultipartMNode(mrc, nodeServiceUrl, session);
- log.debug("Created mnNode for serviceUrl: " + nodeServiceUrl);
- }
-
- // Check if this is a "node" collection. For "node" collections, all scores for a member node
- // are used to create the assessment graph, so we don't need to get the collection pids as is
- // done for portals (by evaluating the Solr collectionQuery). Therefor, getCollectionPids doesn't
- // need to be called and we can proceed directly to getting the quality scores from the quality
- // Solr server.
+ // Quality scores must be retrieved from the quality Solr server from which a graph is created.
+ // There are two cases: "node" collections and portal collections.
+ // Check if this is a "node" collection. For "node" collections, all scores from the quality
+ // Solr server with 'datasource' = nodeId are used to create the assessment graph, so we don't need
+ // to get the collection pids. However, this is done for portals (by evaluating the DataONE Solr collectionQuery).
+ // Therefore, for a "node" collection, getCollectionPids doesn't need to be called and we can proceed directly
+ // to getting the quality scores from the quality Solr server.
if (collectionId.matches("^\\s*urn:node:.*")) {
graphType = GraphType.CUMULATIVE;
log.debug("Processing a member node request, skipping step of getting collection pids (not required).");
@@ -290,6 +259,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
log.info("# of quality scores returned: " + scores.size());
}
+ // Create the data file used by the graphing method
File scoreFile = gfr.createScoreFile(scores);
log.debug("Created score file: " + scoreFile.getPath());
@@ -304,13 +274,11 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
// Generate a temporary graph file based on the quality scores
log.debug("Creating graph for collection id: " + collectionId);
- //String filePath = graph.create(GraphType.CUMULATIVE, title, scoreFile.getPath());
String filePath = graph.create(graphType, title, scoreFile.getPath());
+
// Now save the graphics file to permanent storage
String outfile;
-
DateTime createDateTime = DateTime.now();
-
mdFile.setCreationDatetime(createDateTime);
mdFile.setPid(collectionId);
mdFile.setSuiteId(suiteId);
@@ -425,17 +393,17 @@ which will be used to query DataONE Solr for all the pids associated with that p
org.w3c.dom.Node node = null;
String label = null;
String rightsHolder = null;
- MultipartRestClient mrc = null;
- MultipartCNode CNnode = null;
+ //MultipartRestClient mrc = null;
+ MultipartCNode cnNode = null;
Session CNsession = null;
try {
CNsession = DataONE.getSession(CNsubjectId, CNauthToken);
- // // Only CNs can call the 'subjectInfo' service (aka accounts), so we have to use
+ // Only CNs can call the 'subjectInfo' service (aka accounts), so we have to use
// a MultipartCNode instance here.
try {
- CNnode = (MultipartCNode) DataONE.getMultipartD1Node(CNsession, CNserviceUrl);
+ cnNode = (MultipartCNode) DataONE.getMultipartD1Node(CNsession, CNserviceUrl);
} catch (Exception ex) {
metadigException = new MetadigProcessException("Unable to create multipart D1 node: " + ex.getMessage());
metadigException.initCause(ex);
@@ -523,7 +491,7 @@ which will be used to query DataONE Solr for all the pids associated with that p
subject.setValue(rightsHolder);
// The subject info can only be obtained from a CN, so use the CN auth info for the current DataONE environment,
// which should be configured in the metadig.properties file
- SubjectInfo subjectInfo = DataONE.getSubjectInfo(subject, CNnode, CNsession);
+ SubjectInfo subjectInfo = DataONE.getSubjectInfo(subject, cnNode, CNsession);
String groupStr = null;
groupStr = "(readPermission:" + "\"" + rightsHolder
@@ -584,7 +552,7 @@ which will be used to query DataONE Solr for all the pids associated with that p
do {
//TODO: check that a result was returned
// Note: the collectionQuery is always evaluated on the CN, so that the entire DataONE network is queried.
- xmldoc = DataONE.querySolr(queryStr, startPos, countRequested, CNnode, CNsession);
+ xmldoc = DataONE.querySolr(queryStr, startPos, countRequested, cnNode, CNsession);
if(xmldoc == null) {
log.info("no values returned from query");
break;
From d1f5a97d271ee2c0a666f69dfc8af81030679d78 Mon Sep 17 00:00:00 2001
From: gothub
Date: Thu, 20 Aug 2020 11:16:46 -0700
Subject: [PATCH 44/47] CN harvesting is missing some pids (#267)
---
.../mdqengine/scheduler/RequestReportJob.java | 29 ++--
.../mdqengine/scheduler/RequestScorerJob.java | 129 +++++++++++++-----
2 files changed, 107 insertions(+), 51 deletions(-)
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
index 3900ac12..22540674 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
@@ -83,7 +83,6 @@ void setTotalResultCount(Integer count) {
}
void setFilteredResultCount(Integer count) { this.filteredResultCount = count; }
void setLastDateModified(DateTime date) {
- log.debug("Setter last modified date, date: " + date.toString());
this.lastDateModifiedDT = date;
}
@@ -91,11 +90,13 @@ void setLastDateModified(DateTime date) {
public Integer getFilteredResultCount() { return this.filteredResultCount; }
- public DateTime getLastDateModified() { return this.lastDateModifiedDT; }
+ public DateTime getLastDateModified() {
+ return this.lastDateModifiedDT;
+ }
}
// Since Quartz will re-instantiate a class every time it
- // gets executed, members non-static member variables can
+ // gets executed, non-static member variables can
// not be used to maintain state!
/**
@@ -236,17 +237,17 @@ public void execute(JobExecutionContext context)
startDT = new DateTime(lastHarvestDateDT);
}
- DateTime endDT = new DateTime(startDT);
- endDT = endDT.plusDays(harvestDatetimeInc);
- if(endDT.isAfter(currentDT.toInstant())) {
- endDT = currentDT;
- }
+// DateTime endDT = new DateTime(startDT);
+// endDT = endDT.plusDays(harvestDatetimeInc);
+// if(endDT.isAfter(currentDT.toInstant())) {
+// endDT = currentDT;
+// }
+ DateTime endDT = new DateTime(currentDT);
// If the start and end harvest dates are the same (happens for a new node), then
// tweak the start so that DataONE listObjects doesn't complain.
if(startDT == endDT ) {
startDT = startDT.minusMinutes(1);
- log.debug("Reset start back 1 minute to: " + startDT);
}
// Track the sysmeta dateUploaded of the latest harvested pid. This will become the starting time of
@@ -266,7 +267,7 @@ public void execute(JobExecutionContext context)
while(morePids) {
ArrayList pidsToProcess = null;
try {
- result = getPidsToProcess(cnNode, mnNode, isCN, session, suiteId, nodeId, pidFilter, startDTstr, endDTstr, startCount, countRequested, lastDateModifiedDT);
+ result = getPidsToProcess(cnNode, mnNode, isCN, session, suiteId, pidFilter, startDTstr, endDTstr, startCount, countRequested, lastDateModifiedDT);
pidsToProcess = result.getResult();
totalResultCount = result.getTotalResultCount();
filteredResultCount = result.getFilteredResultCount();
@@ -280,7 +281,7 @@ public void execute(JobExecutionContext context)
allPidsCnt = pidsToProcess.size();
for (String pidStr : pidsToProcess) {
try {
- log.debug("submitting pid: " + pidStr);
+ log.debug(taskName + ": submitting pid: " + pidStr);
submitReportRequest(cnNode, mnNode, isCN, session, qualityServiceUrl, pidStr, suiteId);
} catch (org.dataone.service.exceptions.NotFound nfe) {
log.error("Unable to process pid: " + pidStr + nfe.getMessage());
@@ -362,7 +363,7 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode,
String thisFormatId = null;
String thisPid = null;
int pidCount = 0;
- Date thisDateModified;
+ DateTime thisDateModifiedDT;
if (objList.getCount() > 0) {
for(ObjectInfo oi: objList.getObjectInfoList()) {
@@ -391,11 +392,11 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode,
log.trace("adding pid " + thisPid + ", formatId: " + thisFormatId);
// If this pid's modified date is after the stored latest encountered modified date, then update
// the lastModified date
- DateTime thisDateModifiedDT = new DateTime(oi.getDateSysMetadataModified());
+ thisDateModifiedDT = new DateTime(oi.getDateSysMetadataModified());
// Add a millisecond to lastDateModfiedDT so that this pid won't be harvested again (in the event
// that this is the last pid to be harvested in this round.
if (thisDateModifiedDT.isAfter(lastDateModifiedDT)) {
- lastDateModifiedDT = thisDateModifiedDT.plusMillis(1) ;
+ lastDateModifiedDT = thisDateModifiedDT.plusMillis(1);
log.debug("Updated lastDateMoidifed: " + lastDateModifiedDT.toString());
}
// }
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
index 7c099f31..31dcea61 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
@@ -51,6 +51,10 @@ class ListResult {
Integer resultCount;
ArrayList result = new ArrayList<>();
+ // The scheduler keeps track of Solr 'dateModified' of the last pid harvested,
+ // which will be used as the starting time of the next harvest.
+ private DateTime lastDateModifiedDT = null;
+
void setResult(ArrayList result) {
this.result = result;
}
@@ -66,6 +70,12 @@ void setResultCount(Integer count) {
Integer getResultCount() {
return this.resultCount;
}
+
+ void setLastDateModified(DateTime date) {
+ this.lastDateModifiedDT = date;
+ }
+
+ public DateTime getLastDateModified() { return this.lastDateModifiedDT; }
}
// Since Quartz will re-instantiate a class every time it
@@ -180,10 +190,7 @@ public void execute(JobExecutionContext context)
// Get current datetime, which may be used for start time range.
DateTimeZone.setDefault(DateTimeZone.UTC);
DateTime currentDT = new DateTime(DateTimeZone.UTC);
- DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SS'Z'");
- String currentDatetimeStr = dtfOut.print(currentDT);
- DateTime startDateTimeRange = null;
- DateTime endDateTimeRange = null;
+ DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
String lastHarvestDateStr = null;
Task task;
@@ -202,41 +209,54 @@ public void execute(JobExecutionContext context)
lastHarvestDateStr = task.getLastHarvestDatetime();
}
- DateTime lastHarvestDate = new DateTime(lastHarvestDateStr);
+ DateTime lastHarvestDateDT = new DateTime(lastHarvestDateStr);
// Set the search start datetime to the last harvest datetime, unless it is in the
// future. (This can happen when the previous time range end was for the current day,
// as the end datetime range for the previous task run will have been stored as the
// new lastharvestDateTime.
- DateTime startDTR = null;
- if(lastHarvestDate.isAfter(currentDT.toInstant())) {
- startDTR = currentDT;
+ DateTime startDT = null;
+ if(lastHarvestDateDT.isAfter(currentDT.toInstant())) {
+ startDT = currentDT;
} else {
- startDTR = new DateTime(lastHarvestDate);
+ startDT = new DateTime(lastHarvestDateDT);
}
- DateTime endDTR = new DateTime(startDTR);
- endDTR = endDTR.plusDays(harvestDatetimeInc);
- if(endDTR.isAfter(currentDT.toInstant())) {
- endDTR = currentDT;
- }
+// DateTime endDT = new DateTime(startDT);
+// endDT = endDT.plusDays(harvestDatetimeInc);
+// if(endDT.isAfter(currentDT.toInstant())) {
+// endDT = currentDT;
+// }
+
+ DateTime endDT = new DateTime(currentDT);
// If the start and end harvest dates are the same (happends for a new node), then
// tweek the start so that DataONE listObjects doesn't complain.
- if(startDTR == endDTR ) {
- startDTR = startDTR.minusMinutes(1);
+ if(startDT == endDT ) {
+ startDT = startDT.minusMinutes(1);
}
- String startDTRstr = dtfOut.print(startDTR);
- String endDTRstr = dtfOut.print(endDTR);
+ // Track the Solr dateModified of the latest harvested pid. This will become the starting time of
+ // the next harvest.
+ DateTime lastDateModifiedDT = startDT;
+
+ String startDTstr = dtfOut.print(startDT);
+ String endDTstr = dtfOut.print(endDT);
int startCount = 0;
RequestScorerJob.ListResult result = null;
- Integer resultCount = null;
+ Integer resultCount = 0;
+ // Two types of score requests can be processed - a "node" request that will get score info for an
+ // entire repository (e.g. urn:node:ARCTIC) or a "portal" request that will get scores for a
+ // specific portal (from the Solr portal entry collectionQuery).
if(requestType != null && requestType.equalsIgnoreCase("node")) {
try {
// For a 'node' scores request, the 'collection' is the entire node, so specify
- // the nodeId as the collectionid.
+ // the nodeId as the collectionid. It is not necessary to retrieve a collectionQuery for this
+ // 'node' portal, as there is no Solr entry for this type collection. All quality scores available
+ // in the quality Solr server will be directly retrieved, filtering on the 'nodeId' (datasource)
+ log.info("TaskName: " + taskName + ", taskType: " + taskType + " submitting node request for nodeId: "
+ + nodeId + ", suiteId: " + suiteId + "formatFamily: " + formatFamily);
submitScorerRequest(qualityServiceUrl, nodeId, suiteId, nodeId, formatFamily);
} catch (Exception e) {
JobExecutionException jee = new JobExecutionException("Unable to submit request to create new node ("
@@ -248,22 +268,26 @@ public void execute(JobExecutionContext context)
Integer allIds = 0;
boolean morePids = true;
while (morePids) {
+ // Get a list of pids selected by a collection (portal) search filter (collectionQuery) and get
+ // the quality scores (from the quality Solr server) for that list of pids.
ArrayList pidsToProcess = null;
log.trace("Getting portal pids to process, startCount: " + startCount + ", countRequested: " + countRequested);
try {
- result = getPidsToProcess(d1Node, session, pidFilter, startDTRstr, endDTRstr, startCount, countRequested);
+ result = getPidsToProcess(d1Node, session, pidFilter, startDTstr, endDTstr, startCount, countRequested, lastDateModifiedDT);
pidsToProcess = result.getResult();
resultCount = result.getResultCount();
+ lastDateModifiedDT = result.getLastDateModified();
} catch (Exception e) {
JobExecutionException jee = new JobExecutionException("Unable to get pids to process", e);
jee.setRefireImmediately(false);
throw jee;
}
- log.trace(taskName + ": found " + resultCount + " seriesIds" + " for date: " + startDTRstr + " at servierUrl: " + nodeServiceUrl);
+ log.trace(taskName + ": found " + resultCount + " seriesIds" + " for date: " + startDTstr + " at servierUrl: " + nodeServiceUrl);
for (String pidStr : pidsToProcess) {
try {
+ log.debug(taskName + ": submitting seriesId: " + pidStr);
submitScorerRequest(qualityServiceUrl, pidStr, suiteId, nodeId, formatFamily);
} catch (Exception e) {
JobExecutionException jee = new JobExecutionException("Unable to submit request to create new score graph/data file", e);
@@ -274,6 +298,7 @@ public void execute(JobExecutionContext context)
// Check if DataONE returned the max number of results. If so, we have to request more by paging through
// the results.
+ allIds += pidsToProcess.size();
if (resultCount >= countRequested) {
morePids = true;
startCount = startCount + resultCount;
@@ -281,19 +306,23 @@ public void execute(JobExecutionContext context)
} else {
morePids = false;
- // Record the new "last harvested" date
- task.setLastHarvestDatetime(endDTRstr);
+ }
+ }
- try {
- store.saveTask(task);
- } catch (MetadigStoreException mse) {
- log.error("Error saving task: " + task.getTaskName());
- JobExecutionException jee = new JobExecutionException("Unable to save new harvest date", mse);
- jee.setRefireImmediately(false);
- throw jee;
- }
+ if (allIds > 0) {
+ // Record the new "last harvested" date
+ task.setLastHarvestDatetime(dtfOut.print(lastDateModifiedDT));
+ log.debug("Saving lastHarvestDate: " + dtfOut.print(lastDateModifiedDT));
+ try {
+ store.saveTask(task);
+ } catch (MetadigStoreException mse) {
+ log.error("Error saving task: " + task.getTaskName());
+ JobExecutionException jee = new JobExecutionException("Unable to save new harvest date", mse);
+ jee.setRefireImmediately(false);
+ throw jee;
}
}
+ log.info(taskName + ": found " + allIds + " seriesIds" + " for start: " + startDTstr + ", end: " + endDTstr + " at servierUrl: " + nodeServiceUrl);
}
store.shutdown();
}
@@ -322,14 +351,15 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
org.w3c.dom.NodeList xpathResult = null;
XPathExpression fieldXpath = null;
+ XPathExpression dateModifiedXpath = null;
XPath xpath = null;
org.w3c.dom.Node node = null;
ArrayList pids = new ArrayList();
Document xmldoc = null;
- String queryStr = "?q=formatId:" + pidFilter + "+-obsoletedBy:*" + "+dateUploaded:[" + startHarvestDatetimeStr + "%20TO%20"
+ String queryStr = "?q=formatId:" + pidFilter + "+-obsoletedBy:*" + "+dateModified:[" + startHarvestDatetimeStr + "%20TO%20"
+ endHarvestDatetimeStr + "]"
- + "&fl=seriesId&q.op=AND";
+ + "&fl=seriesId,dateModified&q.op=AND";
log.trace("query: " + queryStr);
// Send the query to DataONE Solr to retrieve portal seriesIds for a given time frame
@@ -345,6 +375,7 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
XPathFactory xPathfactory = XPathFactory.newInstance();
xpath = xPathfactory.newXPath();
fieldXpath = xpath.compile("//result/doc/str[@name='seriesId']/text()");
+ dateModifiedXpath = xpath.compile("//result/doc/date[@name='dateModified']/text()");
} catch (XPathExpressionException xpe) {
log.error("Error extracting id from solr result doc: " + xpe.getMessage());
metadigException = new MetadigProcessException("Unable to get collection pids: " + xpe.getMessage());
@@ -358,16 +389,13 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
int startPos = startCount;
do {
- //xmldoc = DataONE.querySolr(queryStr, startPos, countRequested, cnNode, mnNode, isCN, session);
xmldoc = DataONE.querySolr(queryStr, startPos, countRequested, d1Node, session);
if(xmldoc == null) {
log.info("no values returned from query");
break;
}
try {
- log.debug("processing xpathresult...");
xpathResult = (org.w3c.dom.NodeList) fieldXpath.evaluate(xmldoc, XPathConstants.NODESET);
- log.debug("processed xpathResult");
} catch (XPathExpressionException xpe) {
log.error("Error extracting seriesId from solr result doc: " + xpe.getMessage());
metadigException = new MetadigProcessException("Unable to get collection pids: " + xpe.getMessage());
@@ -385,12 +413,39 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
log.trace("adding pid: " + currentPid);
}
+ // Get dateModified for the returned seriesIds
+ try {
+ xpathResult = (org.w3c.dom.NodeList) dateModifiedXpath.evaluate(xmldoc, XPathConstants.NODESET);
+ } catch (XPathExpressionException xpe) {
+ log.error("Error extracting dateModified from solr result doc: " + xpe.getMessage());
+ metadigException = new MetadigProcessException("Unable to get collection pids: " + xpe.getMessage());
+ metadigException.initCause(xpe);
+ throw metadigException;
+ }
+
+ DateTime thisDateModified;
+ thisResultLength = xpathResult.getLength();
+ if(thisResultLength == 0) break;
+ for (int index = 0; index < xpathResult.getLength(); index++) {
+ node = xpathResult.item(index);
+ String dateStr = node.getTextContent();
+ log.debug("Checking date str: " + dateStr);
+ thisDateModified = DateTime.parse(dateStr,
+ DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"));
+ if(thisDateModified.isAfter(lastDateModifiedDT)) {
+ lastDateModifiedDT = thisDateModified.plusMillis(1);
+ log.debug("Updated lastDateModified to " + lastDateModifiedDT);
+ }
+ }
+
startPos += thisResultLength;
} while (thisResultLength > 0);
RequestScorerJob.ListResult result = new RequestScorerJob.ListResult();
result.setResultCount(pids.size());
result.setResult(pids);
+ // Return the sysmeta 'dateSystemMetadataModified' of the last pid harvested.
+ result.setLastDateModified(lastDateModifiedDT);
return result;
}
From 5bfd7a780b380b3feb6ddac493448e2f4aa64d29 Mon Sep 17 00:00:00 2001
From: gothub
Date: Thu, 20 Aug 2020 11:17:40 -0700
Subject: [PATCH 45/47] Improve javadocs; code cleanup
---
.../mdqengine/scheduler/JobScheduler.java | 8 ++-
.../mdqengine/scheduler/RequestReportJob.java | 57 +++++++++++++++++--
.../mdqengine/scheduler/RequestScorerJob.java | 35 ++++++++----
.../ucsb/nceas/mdqengine/scorer/Scorer.java | 14 ++---
4 files changed, 89 insertions(+), 25 deletions(-)
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java
index 3f9612a3..dd72f43b 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java
@@ -241,12 +241,18 @@ public static void main(String[] argv) throws Exception {
public JobScheduler () {
}
+ /**
+ * Read a single parameter from the quality engine parameter file
+ * @param paramName the parameter to read from the config file
+ * @throws ConfigurationException if there is an exception while reading the config file
+ * @throws IOException if there is an exception while reading the config file
+ */
public String readConfig (String paramName) throws ConfigurationException, IOException {
String paramValue = null;
try {
MDQconfig cfg = new MDQconfig();
paramValue = cfg.getString(paramName);
- } catch (Exception e) {
+ } catch (ConfigurationException | IOException e) {
log.error("Could not read configuration for param: " + paramName + ": " + e.getMessage());
throw e;
}
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
index 22540674..27a7458b 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
@@ -320,8 +320,25 @@ public void execute(JobExecutionContext context)
store.shutdown();
}
+ /**
+ * Query a DataONE CN or MN to obtain a list of persistent identifiers (pids) for metadata objects have been
+ * added to the system during a specific time period.
+ * @param cnNode a DataONE CN connection client object
+ * @param mnNode a DataONE MN connection client object
+ * @param isCN a logical indicating whether a CN or MN object is being used
+ * @param session a DataONE authentication session
+ * @param suiteId the quality suite to check (if this pid has already been processed)
+ * @param pidFilter the DataONE format identifiers to filter for
+ * @param startHarvestDatetimeStr the starting date to harvest pids from
+ * @param endHarvestDatetimeStr the ending date to harvest pids from
+ * @param startCount the start count for paging results from DataONE, for large results
+ * @param countRequested the number of items to get from DataONE on each request
+ * @param lastDateModifiedDT the sysmeta 'dateSystemMetadataModified' value of the last harvested pid
+ * @throws Exception if there is an exception while executing the job.
+ * @return a ListResult object containing the matching pids
+ */
public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode, Boolean isCN, Session session,
- String suiteId, String nodeId, String pidFilter, String startHarvestDatetimeStr,
+ String suiteId, String pidFilter, String startHarvestDatetimeStr,
String endHarvestDatetimeStr, int startCount,
int countRequested, DateTime lastDateModifiedDT) throws Exception {
@@ -331,7 +348,6 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode,
ObjectFormatIdentifier formatId = null;
NodeReference nodeRef = null;
- //nodeRef.setValue(nodeId);
Identifier identifier = null;
Boolean replicaStatus = false;
@@ -356,7 +372,7 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode,
}
//log.info("Got " + objList.getCount() + " pids for format: " + formatId.getValue() + " pids.");
} catch (Exception e) {
- log.error("Error retrieving pids for node " + nodeId + ": " + e.getMessage());
+ log.error("Error retrieving pids: " + e.getMessage());
throw e;
}
@@ -416,7 +432,24 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode,
return result;
}
- public boolean runExists(String pid, String suiteId, MDQStore store) throws MetadigStoreException {
+
+ /**
+ * Check if the specified quality suite has already been run for a pid.
+ *
+ * An additional check is made to see if the system metadata in the
+ * run is older than the passed in date. Because the quality engine
+ * uses fields from sysmeta (obsoletes, obsoletedBy), a run may need
+ * to be performed on an existing run in order to update the sysmeta, as
+ * the sysmeta is stored in the run object, and this run object is
+ * parsed when the run is inserted into the Solr index.
+ *
+ * @param pid the pid to check
+ * @param suiteId the suite identifier to check (e.g. "FAIR-suite-0.3.1")
+ * @param store the DataStore object to send the check request to.
+ * @throws MetadigStoreException
+ *
+ */
+ public boolean runExists(String pid, String suiteId, MDQStore store, Date dateSystemMetadataModified) throws MetadigStoreException {
boolean found = false;
Date runDateSystemMetadataModified = null;
@@ -440,6 +473,22 @@ public boolean runExists(String pid, String suiteId, MDQStore store) throws Meta
return found;
}
+ /**
+ * Submit a request to the metadig controller to run a quality suite for the specified pid.
+ *
+ * The system metadata for a pid is also obtained and sent with the request
+ *
+ *
+ * @param cnNode a DataONE CN connection client object
+ * @param mnNode a DataONE MN connection client object
+ * @param isCN a logical indicating whether a CN or MN object
+ * @param session a DataONE authentication session
+ * @param qualityServiceUrl the URL of the MetaDIG quality service
+ * @param pidStr the pid to submit the request for
+ * @param suiteId the suite identifier to submit the request for
+ *
+ * @throws Exception
+ */
public void submitReportRequest(MultipartCNode cnNode, MultipartMNode mnNode, Boolean isCN, Session session, String qualityServiceUrl, String pidStr, String suiteId) throws Exception {
SystemMetadata sysmeta = null;
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
index 31dcea61..1abb1dce 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
@@ -330,22 +330,21 @@ public void execute(JobExecutionContext context)
/**
* Query a DataONE CN or MN object store for a list of object that match the time range and formatId filters provided.
*
- * //@param cnNode
- * //@param mnNode
- * //@param isCN
- * @param session
- * @param pidFilter
- * @param startHarvestDatetimeStr
- * @param endHarvestDatetimeStr
- * @param startCount
- * @param countRequested
+ * @param d1Node a DataONE CN or MN connection client object
+ * @param session a DataONE authentication session
+ * @param pidFilter the DataONE format identifiers to filter for
+ * @param startHarvestDatetimeStr the starting date to harvest pids from
+ * @param endHarvestDatetimeStr the ending date to harvest pids from
+ * @param startCount the start count for paging results from DataONE, for large results
+ * @param countRequested the number of items to get from DataONE on each request
+ * @param lastDateModifiedDT the sysmeta 'dateSystemMetadataModified' value of the last harvested pid
+ * @throws Exception if there is an exception while executing the job.
* @return a ListResult object containing the matching pids
* @throws Exception
*/
- //public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode, Boolean isCN, Session session,
public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
String pidFilter, String startHarvestDatetimeStr, String endHarvestDatetimeStr,
- int startCount, int countRequested) throws Exception {
+ int startCount, int countRequested, DateTime lastDateModifiedDT) throws Exception {
MetadigProcessException metadigException = null;
@@ -450,6 +449,18 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
return result;
}
+ /**
+ * Submit a request to the metadig controller to get quality score info and create a graph for the specified collection.
+ *
+ * @param qualityServiceUrl
+ * @param collectionId
+ * @param suiteId
+ * @param nodeId
+ * @param formatFamily
+ *
+ * @throws Exception
+ *
+ */
public void submitScorerRequest(String qualityServiceUrl, String collectionId, String suiteId, String nodeId, String formatFamily) throws Exception {
InputStream runResultIS = null;
@@ -475,7 +486,7 @@ public void submitScorerRequest(String qualityServiceUrl, String collectionId, S
post.addHeader("Accept", "application/xml");
// send to service
- log.debug("submitting scores request : " + scorerServiceUrl);
+ log.trace("submitting scores request : " + scorerServiceUrl);
CloseableHttpClient client = HttpClients.createDefault();
CloseableHttpResponse response = client.execute(post);
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scorer/Scorer.java b/src/main/java/edu/ucsb/nceas/mdqengine/scorer/Scorer.java
index fede5a0f..df56654d 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scorer/Scorer.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scorer/Scorer.java
@@ -129,6 +129,7 @@ public static void main(String[] argv) throws Exception {
* A set of quality scores are retrieved from the Quality Solr Server and a quality graph and csv file are created from
* them. For DataONE collections, the 'collectionQuery' is retrieved from Solr to determine the set of pids to be
* included.
+ *
*
*/
final Consumer consumer = new DefaultConsumer(inProcessChannel) {
@@ -330,6 +331,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
}
};
+ // Initialize the RabbitMQ queue for scorer requests sent by the controller
inProcessChannel.basicConsume(SCORER_QUEUE_NAME, false, consumer);
}
@@ -337,18 +339,16 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
* Retrieve pids associated with a DataONE collection.
*
* First the 'collectionQuery' field is retrieved from DataONE Solr for the collection
- * Next, a query is issued with the query from collectionQuery field, to retrieve all Solr docs for the collection ids./p>
+ *
Next, a query is issued with the query from the collectionQuery field, to retrieve all Solr docs for the collection ids./p>
*
*
Note that in the current design, the collection query is always obtained by querying the node specified in the taskList.csv file,
* which is usually an MN, but the collectionQuery is always evaluated on the CN
*
* @param collectionId a DataONE project id to fetch scores for, e.g. urn:uuid:f137095e-4266-4474-aa5f-1e1fcaa5e2dc
- * @param d1Node
- * @param session
+ * @param d1Node the DataONE connection object for a node
+ * @param session the DataONE authentication session
* @return a List of quality scores fetched from Solr
*/
- //private ScorerResult getCollectionPids(String collectionId, MultipartCNode cnNode, MultipartMNode mnNode,
- // Boolean isCN, Session session) throws MetadigProcessException {
private ScorerResult getCollectionPids(String collectionId, MultipartD1Node d1Node, Session session) throws MetadigProcessException {
Document xmldoc = null;
@@ -363,11 +363,9 @@ which will be used to query DataONE Solr for all the pids associated with that p
*/
ArrayList pids = new ArrayList<>();
queryStr = "?q=seriesId:" + escapeSpecialChars(collectionId) + "+-obsoletedBy:*" + "&fl=collectionQuery,label,rightsHolder&q.op=AND";
- //queryStr = "?q=seriesId:" + encodeValue(collectionId) + "+-obsoletedBy:*" + "&fl=collectionQuery,label,rightsHolder&q.op=AND";
- //queryStr = "?q=seriesId:" + collectionId + "+-obsoletedBy:*&fl=collectionQuery,label,rightsHolder&q.op=AND";
startPos = 0;
- // Just getting 1 row
+ // Just getting 1 row (for the collectionQuery field)
countRequested = 10;
// Get the collectionQuery from Solr
From bc9b37ad577e78479797b42837283c7df2634f86 Mon Sep 17 00:00:00 2001
From: gothub
Date: Wed, 2 Sep 2020 15:45:35 -0700
Subject: [PATCH 46/47] CN harvesting is missing some pids #267
---
.../edu/ucsb/nceas/mdqengine/model/Task.java | 16 +-
.../mdqengine/scheduler/JobScheduler.java | 17 ++
.../nceas/mdqengine/scheduler/NodeList.java | 168 +++++++++++
.../mdqengine/scheduler/RequestReportJob.java | 267 +++++++++++-------
.../mdqengine/scheduler/RequestScorerJob.java | 51 ++--
.../nceas/mdqengine/store/DatabaseStore.java | 252 ++++++++++++++++-
.../nceas/mdqengine/store/InMemoryStore.java | 15 +-
.../ucsb/nceas/mdqengine/store/MDQStore.java | 12 +-
.../ucsb/nceas/mdqengine/store/MNStore.java | 21 +-
src/main/resources/sql/quality-v2.3.0.sql | 26 +-
10 files changed, 682 insertions(+), 163 deletions(-)
create mode 100644 src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/model/Task.java b/src/main/java/edu/ucsb/nceas/mdqengine/model/Task.java
index 5e174d42..f2290b28 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/model/Task.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/model/Task.java
@@ -1,10 +1,12 @@
package edu.ucsb.nceas.mdqengine.model;
+import java.util.HashMap;
+
public class Task {
private String taskName;
private String taskType;
- private String lastHarvestDatetime;
+ private HashMap lastHarvestDatetimes = new HashMap<>();
public void setTaskName(String name) {
this.taskName = name;
@@ -18,10 +20,16 @@ public String getTaskName() {
public String getTaskType() { return taskType; }
- public void setLastHarvestDatetime(String lastHarvestDatetime) {
- this.lastHarvestDatetime = lastHarvestDatetime;
+ public void setLastHarvestDatetimes(HashMap lastHarvestDatetimes) {
+ this.lastHarvestDatetimes = lastHarvestDatetimes;
+ }
+
+ public void setLastHarvestDatetime(String lastHarvestDatetime, String nodeId) {
+ this.lastHarvestDatetimes.put(nodeId, lastHarvestDatetime);
}
- public String getLastHarvestDatetime() { return lastHarvestDatetime; }
+ public String getLastHarvestDatetime(String nodeId) {
+ return this.lastHarvestDatetimes.get(nodeId);
+ }
}
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java
index dd72f43b..c38e8d1f 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/JobScheduler.java
@@ -180,6 +180,16 @@ public static void main(String[] argv) throws Exception {
log.debug("fileExcludeMatch: " + fileExcludeMatch);
logFile = splitted[++icnt].trim();
log.debug("log file: " + logFile);
+ } else if (taskType.equals("nodelist")) {
+ log.debug("Scheduling nodelist update from DataONE, task name: " + taskName + ", task group: " + taskGroup);
+ String[] splitted = Arrays.stream(params.split(";"))
+ .map(String::trim)
+ .toArray(String[]::new);
+
+ int icnt = -1;
+ log.debug("Split length: " + splitted.length);
+ nodeId = splitted[++icnt].trim();
+ log.debug("nodeId: " + nodeId);
}
try {
@@ -221,6 +231,13 @@ public static void main(String[] argv) throws Exception {
.usingJobData("fileExcludeMatch", fileExcludeMatch)
.usingJobData("logFile", logFile)
.build();
+ } else if (taskType.equalsIgnoreCase("nodelist")) {
+ job = newJob(NodeList.class)
+ .withIdentity(taskName, taskGroup)
+ .usingJobData("taskName", taskName)
+ .usingJobData("taskType", taskType)
+ .usingJobData("nodeId", nodeId)
+ .build();
}
CronTrigger trigger = newTrigger()
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java
new file mode 100644
index 00000000..5eecc2cd
--- /dev/null
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/NodeList.java
@@ -0,0 +1,168 @@
+package edu.ucsb.nceas.mdqengine.scheduler;
+
+import edu.ucsb.nceas.mdqengine.DataONE;
+import edu.ucsb.nceas.mdqengine.MDQconfig;
+import edu.ucsb.nceas.mdqengine.exception.MetadigStoreException;
+import edu.ucsb.nceas.mdqengine.store.DatabaseStore;
+import edu.ucsb.nceas.mdqengine.store.MDQStore;
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.dataone.client.rest.HttpMultipartRestClient;
+import org.dataone.client.rest.MultipartRestClient;
+import org.dataone.client.v2.impl.MultipartCNode;
+import org.dataone.service.exceptions.NotImplemented;
+import org.dataone.service.exceptions.ServiceFailure;
+import org.dataone.service.types.v1.*;
+import org.dataone.service.types.v2.Node;
+import org.dataone.service.types.v2.Property;
+import org.quartz.*;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.TimeZone;
+
+/**
+ *
+ * Run a MetaDIG Quality Engine Scheduler task that retrieves the
+ * DataONE node list from a CN and saves each synchronized member
+ * node entry to the local store.
+ *
+ *
+ * @author Peter Slaughter
+ */
+@PersistJobDataAfterExecution
+@DisallowConcurrentExecution
+public class NodeList implements Job {
+
+ private Log log = LogFactory.getLog(NodeList.class);
+
+ // Since Quartz will re-instantiate a class every time it
+ // gets executed, non-static member variables can
+ // not be used to maintain state!
+
+ /**
+ *
+ * Called by the {@link org.quartz.Scheduler}
when a
+ * {@link org.quartz.Trigger}
fires that is associated with
+ * the Job
.
+ *
+ *
+ * @throws JobExecutionException if there is an exception while executing the job.
+ */
+ public void execute(JobExecutionContext context)
+ throws JobExecutionException {
+
+ Log log = LogFactory.getLog(NodeList.class);
+ JobKey key = context.getJobDetail().getKey();
+ JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+
+ String taskName = dataMap.getString("taskName");
+ String taskType = dataMap.getString("taskType");
+ String nodeId = dataMap.getString("nodeId");
+ MultipartRestClient mrc = null;
+ MultipartCNode cnNode = null;
+
+ String nodeServiceUrl = null;
+
+ try {
+ MDQconfig cfg = new MDQconfig();
+ String nodeAbbr = nodeId.replace("urn:node:", "");
+ // TODO: Cache the node values from the CN listNode service
+ nodeServiceUrl = cfg.getString(nodeAbbr + ".serviceUrl");
+ } catch (ConfigurationException | IOException ce) {
+ JobExecutionException jee = new JobExecutionException(taskName + ": error executing task.");
+ jee.initCause(ce);
+ throw jee;
+ }
+
+ log.debug("Executing task " + taskType + ", " + taskName + " for node: " + nodeId);
+
+ Session session = DataONE.getSession(null, null);
+
+ try {
+ mrc = new HttpMultipartRestClient();
+ } catch (Exception e) {
+ log.error(taskName + ": error creating rest client: " + e.getMessage());
+ JobExecutionException jee = new JobExecutionException(e);
+ jee.setRefireImmediately(false);
+ throw jee;
+ }
+
+ cnNode = new MultipartCNode(mrc, nodeServiceUrl, session);
+ org.dataone.service.types.v2.NodeList nodeList = null;
+
+ try {
+ nodeList = cnNode.listNodes();
+ } catch (NotImplemented | ServiceFailure e) {
+ e.printStackTrace();
+ throw new JobExecutionException(taskName + ": cannot renew store, unable to schedule job", e);
+ }
+
+ // Get a connection to the database
+ MDQStore store = null;
+
+ try {
+ store = new DatabaseStore();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new JobExecutionException(taskName + ": cannot create store, unable to schedule job", e);
+ }
+
+ if (!store.isAvailable()) {
+ try {
+ store.renew();
+ } catch (MetadigStoreException e) {
+ e.printStackTrace();
+ throw new JobExecutionException(taskName + ": cannot renew store, unable to schedule job", e);
+ }
+ }
+
+ Property property = null;
+ ArrayList plist = null;
+ for (Node node : nodeList.getNodeList()) {
+ log.debug("node: " + node.getName());
+ log.debug("type: " + node.getType().toString());
+ log.debug("id: " + node.getIdentifier().getValue());
+ log.debug("state: " + node.getState().toString());
+ log.debug("is synchonized: " + node.isSynchronize());
+
+ if (! node.isSynchronize()) {
+ log.debug(taskName + ": Skipping unsynchronized node " + node.getIdentifier().getValue());
+ continue;
+ } else if (node.getType().toString().equalsIgnoreCase("MN")) {
+ log.debug(taskName + ": saving node " + node.getIdentifier().getValue());
+ try {
+ store.saveNode(node);
+ } catch (MetadigStoreException mse) {
+ mse.printStackTrace();
+ throw new JobExecutionException("Cannot save node " + node.getIdentifier().getValue() + " to store", mse);
+ }
+ } else {
+ log.debug(taskName + ": skipping CN node: " + node.getIdentifier().getValue());
+ }
+ }
+
+ // For debugging purposes: retrieve and print out all node entries if trace logging is enabled.
+ if (log.isTraceEnabled()) {
+ log.trace("Retrieving and printing out all saved node harvest dates...");
+
+ ArrayList nodes = store.getNodes();
+ for (Node node : nodes) {
+ log.trace("identifier: " + node.getIdentifier().getValue());
+
+ DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
+ String lastHarvestDatetimeStr = dateFormat.format(node.getSynchronization().getLastHarvested());
+
+ log.trace("harvest: " + lastHarvestDatetimeStr);
+ log.trace("synchronize: " + node.isSynchronize());
+ log.trace("state: " + node.getState().toString());
+ log.trace("baseURL: " + node.getBaseURL());
+ }
+ }
+ }
+}
+
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
index 27a7458b..acbecf1c 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestReportJob.java
@@ -19,6 +19,7 @@
import org.dataone.client.rest.MultipartRestClient;
import org.dataone.client.v2.impl.MultipartCNode;
import org.dataone.client.v2.impl.MultipartMNode;
+import org.dataone.service.types.v2.Node;
import org.dataone.mimemultipart.SimpleMultipartEntity;
import org.dataone.service.exceptions.NotAuthorized;
import org.dataone.service.types.v1.*;
@@ -155,17 +156,17 @@ public void execute(JobExecutionContext context)
// TODO: Cache the node values from the CN listNode service
nodeServiceUrl = cfg.getString(nodeAbbr + ".serviceUrl");
} catch (ConfigurationException | IOException ce) {
- JobExecutionException jee = new JobExecutionException("Error executing task.");
+ JobExecutionException jee = new JobExecutionException(taskName + ": error executing task.");
jee.initCause(ce);
throw jee;
}
- log.info("Executing task " + taskType + ", " + taskName + " for node: " + nodeId + ", suiteId: " + suiteId);
+ log.debug("Executing task " + taskType + ", " + taskName + " for node: " + nodeId + ", suiteId: " + suiteId);
try {
mrc = new HttpMultipartRestClient();
} catch (Exception e) {
- log.error("Error creating rest client: " + e.getMessage());
+ log.error(taskName + ": error creating rest client: " + e.getMessage());
JobExecutionException jee = new JobExecutionException(e);
jee.setRefireImmediately(false);
throw jee;
@@ -200,123 +201,170 @@ public void execute(JobExecutionContext context)
}
}
- // Set UTC as the default time zone for all DateTime operations.
- // Get current datetime, which may be used for start time range.
- DateTimeZone.setDefault(DateTimeZone.UTC);
- DateTime currentDT = new DateTime(DateTimeZone.UTC);
- DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
- String currentDatetimeStr = dtfOut.print(currentDT);
- DateTime startDateTimeRange = null;
- DateTime endDateTimeRange = null;
- String lastHarvestDateStr = null;
-
- Task task;
- task = store.getTask(taskName, taskType);
- // If a 'task' entry has not been saved for this task name yet, then a 'lastHarvested'
- // DataTime will not be available, in which case the 'startHarvestDataTime' from the
- // config file will be used.
- if(task.getLastHarvestDatetime() == null) {
- task = new Task();
- task.setTaskName(taskName);
- task.setTaskType(taskType);
- lastHarvestDateStr = startHarvestDatetimeStr;
- task.setLastHarvestDatetime(lastHarvestDateStr);
- } else {
- lastHarvestDateStr = task.getLastHarvestDatetime();
- }
+ ArrayList nodes = new ArrayList<>();
- DateTime lastHarvestDateDT = new DateTime(lastHarvestDateStr);
- // Set the search start datetime to the last harvest datetime, unless it is in the
- // future. (This can happen when the previous time range end was for the current day,
- // as the end datetime range for the previous task run will have been stored as the
- // new lastharvestDateTime.
- DateTime startDT = null;
- if(lastHarvestDateDT.isAfter(currentDT.toInstant())) {
- startDT = currentDT;
+ if (isCN) {
+ nodes = store.getNodes();
} else {
- startDT = new DateTime(lastHarvestDateDT);
+ Node node = store.getNode(nodeId);
+ if (node.getIdentifier().getValue() == null) {
+ String msg = ("Node entry not found for node: " + nodeId);
+ log.error(msg);
+ JobExecutionException jee = new JobExecutionException(msg);
+ jee.setRefireImmediately(false);
+ throw jee;
+ } else {
+ log.trace("Got node " + node.getIdentifier().getValue());
+ nodes.add(node);
+ }
}
-// DateTime endDT = new DateTime(startDT);
-// endDT = endDT.plusDays(harvestDatetimeInc);
-// if(endDT.isAfter(currentDT.toInstant())) {
-// endDT = currentDT;
-// }
- DateTime endDT = new DateTime(currentDT);
-
- // If the start and end harvest dates are the same (happens for a new node), then
- // tweak the start so that DataONE listObjects doesn't complain.
- if(startDT == endDT ) {
- startDT = startDT.minusMinutes(1);
- }
+ String harvestNodeId = null;
+ for (Node node : nodes) {
- // Track the sysmeta dateUploaded of the latest harvested pid. This will become the starting time of
- // the next harvest.
- DateTime lastDateModifiedDT = startDT;
+ harvestNodeId = node.getIdentifier().getValue();
+ // If processing a CN, check each MN to see if it is being synchronized and if it
+ // is up.
+ if (isCN) {
- String startDTstr = dtfOut.print(startDT);
- String endDTstr = dtfOut.print(endDT);
+ // The NodeList task doesn't save CN entries from the DataONE 'listNodes()' service, but check
+ // just in case.
+ if (node.getType().equals(NodeType.CN)) {
+ log.debug("Harvesting from CN, skipping CN entry from node list for " + node.getIdentifier().getValue());
+ continue;
+ }
- Integer startCount = new Integer(0);
- ListResult result = null;
- Integer totalResultCount = 0;
- Integer filteredResultCount = 0;
- Integer allPidsCnt = 0;
+ if (! node.isSynchronize() || ! node.getState().equals(NodeState.UP)) {
+ log.trace("Skipping disabled node: " + node.getIdentifier().getValue() + ", sync: " + node.isSynchronize()
+ + ", status: " + node.getState().toString());
+ continue;
+ }
- boolean morePids = true;
- while(morePids) {
- ArrayList pidsToProcess = null;
- try {
- result = getPidsToProcess(cnNode, mnNode, isCN, session, suiteId, pidFilter, startDTstr, endDTstr, startCount, countRequested, lastDateModifiedDT);
- pidsToProcess = result.getResult();
- totalResultCount = result.getTotalResultCount();
- filteredResultCount = result.getFilteredResultCount();
- lastDateModifiedDT = result.getLastDateModified();
- } catch (Exception e) {
- JobExecutionException jee = new JobExecutionException("Unable to get pids to process", e);
- jee.setRefireImmediately(false);
- throw jee;
- }
+ DateTime mnLastHarvestDT = new DateTime(node.getSynchronization().getLastHarvested(), DateTimeZone.UTC);
+ DateTime oneMonthAgoDT = new DateTime(DateTimeZone.UTC).minusMonths(1);
- allPidsCnt = pidsToProcess.size();
- for (String pidStr : pidsToProcess) {
- try {
- log.debug(taskName + ": submitting pid: " + pidStr);
- submitReportRequest(cnNode, mnNode, isCN, session, qualityServiceUrl, pidStr, suiteId);
- } catch (org.dataone.service.exceptions.NotFound nfe) {
- log.error("Unable to process pid: " + pidStr + nfe.getMessage());
- continue;
- } catch (Exception e) {
- log.error("Unable to process pid: " + pidStr + " - " + e.getMessage());
+ if (mnLastHarvestDT.isBefore(oneMonthAgoDT.toInstant())) {
+ DateTimeZone.setDefault(DateTimeZone.UTC);
+ DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ log.trace("Skipping node " + node.getIdentifier().getValue() + " that hasn't been sync'd since " + dtfOut.print(mnLastHarvestDT));
continue;
}
}
- // Check if DataONE returned the max number of results. If so, we have to request more by paging through
- // the results returned pidsToProcess (i.e. DataONE listObjects service). If the returned result is
- // less than the requested result, then all pids have been retrieved.
- if(totalResultCount >= countRequested) {
- morePids = true;
- startCount = startCount + totalResultCount;
- log.trace("Paging through more results, current start is " + startCount);
+ log.trace("Harvesting node: " + node.getIdentifier().getValue());
+
+ // Set UTC as the default time zone for all DateTime operations.
+ // Get current datetime, which may be used for start time range.
+ DateTimeZone.setDefault(DateTimeZone.UTC);
+ DateTime currentDT = new DateTime(DateTimeZone.UTC);
+ DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ String lastHarvestDateStr = null;
+
+ Task task;
+ task = store.getTask(taskName, taskType, harvestNodeId);
+ // If a 'task' entry has not been saved for this task name yet, then a 'lastHarvested'
+ // DateTime will not be available, in which case the 'startHarvestDataTime' from the
+ // config file will be used.
+ if (task.getLastHarvestDatetime(harvestNodeId) == null) {
+ task.setTaskName(taskName);
+ task.setTaskType(taskType);
+ lastHarvestDateStr = startHarvestDatetimeStr;
+ task.setLastHarvestDatetime(lastHarvestDateStr, harvestNodeId);
} else {
- morePids = false;
+ lastHarvestDateStr = task.getLastHarvestDatetime(harvestNodeId);
}
- }
- // Don't update the lastHarvestDateDT if no pids were found.
- if (allPidsCnt > 0) {
- task.setLastHarvestDatetime(dtfOut.print(lastDateModifiedDT));
- log.debug("Saving lastHarvestDate: " + dtfOut.print(lastDateModifiedDT));
- try {
- store.saveTask(task);
- } catch (MetadigStoreException mse) {
- log.error("Error saving task: " + task.getTaskName());
- JobExecutionException jee = new JobExecutionException("Unable to save new harvest date", mse);
- jee.setRefireImmediately(false);
- throw jee;
+
+ DateTime lastHarvestDateDT = new DateTime(lastHarvestDateStr);
+ // Set the search start datetime to the last harvest datetime, unless it is in the
+ // future. (This can happen when the previous time range end was for the current day,
+ // as the end datetime range for the previous task run will have been stored as the
+ // new lastHarvestDatetime.)
+ DateTime startDT = null;
+ if (lastHarvestDateDT.isAfter(currentDT.toInstant())) {
+ startDT = currentDT;
+ } else {
+ startDT = new DateTime(lastHarvestDateDT);
+ }
+
+ DateTime endDT = new DateTime(currentDT);
+
+ // If the start and end harvest dates are the same (happens for a new node), then
+ // tweak the start so that DataONE listObjects doesn't complain.
+ if (startDT.equals(endDT)) {
+ startDT = startDT.minusMinutes(1);
+ }
+
+ // Track the sysmeta dateUploaded of the latest harvested pid. This will become the starting time of
+ // the next harvest.
+ DateTime lastDateModifiedDT = startDT;
+
+ String startDTstr = dtfOut.print(startDT);
+ String endDTstr = dtfOut.print(endDT);
+
+ log.trace("start time: " + startDTstr);
+
+ Integer startCount = new Integer(0);
+ ListResult result = null;
+ Integer totalResultCount = 0;
+ Integer filteredResultCount = 0;
+ Integer allPidsCnt = 0;
+
+ log.trace("Getting pids for nodeId: " + harvestNodeId);
+ boolean morePids = true;
+ while (morePids) {
+ ArrayList pidsToProcess = null;
+ try {
+ result = getPidsToProcess(cnNode, mnNode, isCN, session, suiteId, pidFilter, startDTstr, endDTstr, startCount, countRequested, lastDateModifiedDT, harvestNodeId, taskName);
+ pidsToProcess = result.getResult();
+ totalResultCount = result.getTotalResultCount();
+ filteredResultCount = result.getFilteredResultCount();
+ lastDateModifiedDT = result.getLastDateModified();
+ } catch (Exception e) {
+ JobExecutionException jee = new JobExecutionException("Unable to get pids to process", e);
+ jee.setRefireImmediately(false);
+ throw jee;
+ }
+
+ allPidsCnt = pidsToProcess.size();
+ for (String pidStr : pidsToProcess) {
+ try {
+ log.debug(taskName + ": submitting pid: " + pidStr);
+ submitReportRequest(cnNode, mnNode, isCN, session, qualityServiceUrl, pidStr, suiteId);
+ } catch (org.dataone.service.exceptions.NotFound nfe) {
+ log.error("Unable to process pid: " + pidStr + nfe.getMessage());
+ continue;
+ } catch (Exception e) {
+ log.error("Unable to process pid: " + pidStr + " - " + e.getMessage());
+ continue;
+ }
+ }
+
+ // Check if DataONE returned the max number of results. If so, we have to request more by paging through
+ // the results returned pidsToProcess (i.e. DataONE listObjects service). If the returned result is
+ // less than the requested result, then all pids have been retrieved.
+ if (totalResultCount >= countRequested) {
+ morePids = true;
+ startCount = startCount + totalResultCount;
+ log.trace("Paging through more results, current start is " + startCount);
+ } else {
+ morePids = false;
+ }
+ }
+ // Don't update the lastHarvestDateDT if no pids were found.
+ if (allPidsCnt > 0) {
+ task.setLastHarvestDatetime(dtfOut.print(lastDateModifiedDT), harvestNodeId);
+ log.trace("Saving lastHarvestDate: " + dtfOut.print(lastDateModifiedDT) + " for node: " + harvestNodeId);
+ try {
+ store.saveTask(task, harvestNodeId);
+ } catch (MetadigStoreException mse) {
+ log.error("Error saving task: " + task.getTaskName());
+ JobExecutionException jee = new JobExecutionException("Unable to save new harvest date", mse);
+ jee.setRefireImmediately(false);
+ throw jee;
+ }
+ log.info(taskName + ": found " + allPidsCnt + " pids for nodeId: " + harvestNodeId + ", start: " + startDTstr + ", end: " + endDTstr + ", serviceUrl: " + nodeServiceUrl);
}
}
- log.info(taskName + ": Found " + allPidsCnt + " pids for start: " + startDTstr + ", end: " + endDTstr + " at servierUrl: " + nodeServiceUrl);
store.shutdown();
}
@@ -334,13 +382,14 @@ public void execute(JobExecutionContext context)
* @param startCount the start count for paging results from DataONE, for large results
* @param countRequested the number of items to get from DataONE on each request
* @param lastDateModifiedDT the sysmeta 'dateSystemMetadataModified' value of the last harvested pid
+ * @param nodeIdFilter filter results for this nodeId (applies only to CN)
* @throws Exception if there is an exception while executing the job.
* @return a ListResult object containing the matching pids
*/
public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode, Boolean isCN, Session session,
String suiteId, String pidFilter, String startHarvestDatetimeStr,
String endHarvestDatetimeStr, int startCount,
- int countRequested, DateTime lastDateModifiedDT) throws Exception {
+ int countRequested, DateTime lastDateModifiedDT, String nodeIdFilter, String taskName) throws Exception {
ArrayList pids = new ArrayList();
InputStream qis = null;
@@ -364,15 +413,19 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode,
try {
// Even though MultipartMNode and MultipartCNode have the same parent class D1Node, the interface for D1Node doesn't
- // include listObjects (it should), so we have to maintain a cnNode and mnNode.
+ // include listObjects, as the parameters differ from CN to MN, so we have to use a different object for each.
if(isCN) {
+ log.trace("Getting pids for cn, for nodeid: " + nodeIdFilter);
+ nodeRef = new NodeReference();
+ nodeRef.setValue(nodeIdFilter);
objList = cnNode.listObjects(session, startDate, endDate, formatId, nodeRef, identifier, startCount, countRequested);
} else {
+ log.trace("Getting pids for mn");
objList = mnNode.listObjects(session, startDate, endDate, formatId, identifier, replicaStatus, startCount, countRequested);
}
//log.info("Got " + objList.getCount() + " pids for format: " + formatId.getValue() + " pids.");
} catch (Exception e) {
- log.error("Error retrieving pids: " + e.getMessage());
+ log.error(taskName + ": error retrieving pids: " + e.getMessage());
throw e;
}
@@ -413,7 +466,7 @@ public ListResult getPidsToProcess(MultipartCNode cnNode, MultipartMNode mnNode,
// that this is the last pid to be harvested in this round.
if (thisDateModifiedDT.isAfter(lastDateModifiedDT)) {
lastDateModifiedDT = thisDateModifiedDT.plusMillis(1);
- log.debug("Updated lastDateMoidifed: " + lastDateModifiedDT.toString());
+ log.debug("New value for lastDateModified: " + lastDateModifiedDT.toString());
}
// }
}
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
index 1abb1dce..b98fbd4c 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/scheduler/RequestScorerJob.java
@@ -140,7 +140,7 @@ public void execute(JobExecutionContext context)
requestType = dataMap.getString("requestType");
}
- log.info("Executing task " + taskType + ", " + taskName + " for node: " + nodeId + ", suiteId: " + suiteId);
+ log.debug("Executing task " + taskType + ", " + taskName + " for node: " + nodeId + ", suiteId: " + suiteId);
try {
cfg = new MDQconfig();
@@ -152,11 +152,16 @@ public void execute(JobExecutionContext context)
nodeServiceUrl = cfg.getString(nodeAbbr + ".serviceUrl");
log.trace("nodeServiceUrl: " + nodeServiceUrl);
} catch (ConfigurationException | IOException ce) {
- JobExecutionException jee = new JobExecutionException("Error executing task.");
+ JobExecutionException jee = new JobExecutionException(taskName + ": Error executing task: " + ce.getMessage());
jee.initCause(ce);
throw jee;
}
+ if(nodeServiceUrl == null) {
+ String msg = taskName + ": Unable to read serviceUrl from config file for: " + nodeId;
+ throw new JobExecutionException(msg);
+ }
+
Session session = DataONE.getSession(subjectId, authToken);
// Get a connection to the DataONE node (CN or MN)
@@ -194,19 +199,19 @@ public void execute(JobExecutionContext context)
String lastHarvestDateStr = null;
Task task;
- task = store.getTask(taskName, taskType);
+ task = store.getTask(taskName, taskType, nodeId);
// If a 'task' entry has not been saved for this task name yet, then a 'lastHarvested'
// DataTime will not be available, in which case the 'startHarvestDataTime' from the
// config file will be used.
- if(task.getLastHarvestDatetime() == null) {
+ if(task.getLastHarvestDatetime(nodeId) == null) {
task = new Task();
task.setTaskName(taskName);
task.setTaskType(taskType);
lastHarvestDateStr = startHarvestDatetimeStr;
- task.setLastHarvestDatetime(lastHarvestDateStr);
+ task.setLastHarvestDatetime(lastHarvestDateStr, nodeId);
} else {
- lastHarvestDateStr = task.getLastHarvestDatetime();
+ lastHarvestDateStr = task.getLastHarvestDatetime(nodeId);
}
DateTime lastHarvestDateDT = new DateTime(lastHarvestDateStr);
@@ -221,12 +226,6 @@ public void execute(JobExecutionContext context)
startDT = new DateTime(lastHarvestDateDT);
}
-// DateTime endDT = new DateTime(startDT);
-// endDT = endDT.plusDays(harvestDatetimeInc);
-// if(endDT.isAfter(currentDT.toInstant())) {
-// endDT = currentDT;
-// }
-
DateTime endDT = new DateTime(currentDT);
// If the start and end harvest dates are the same (happends for a new node), then
@@ -274,7 +273,7 @@ public void execute(JobExecutionContext context)
log.trace("Getting portal pids to process, startCount: " + startCount + ", countRequested: " + countRequested);
try {
- result = getPidsToProcess(d1Node, session, pidFilter, startDTstr, endDTstr, startCount, countRequested, lastDateModifiedDT);
+ result = getPidsToProcess(d1Node, session, pidFilter, startDTstr, endDTstr, startCount, countRequested, lastDateModifiedDT, taskName);
pidsToProcess = result.getResult();
resultCount = result.getResultCount();
lastDateModifiedDT = result.getLastDateModified();
@@ -311,18 +310,18 @@ public void execute(JobExecutionContext context)
if (allIds > 0) {
// Record the new "last harvested" date
- task.setLastHarvestDatetime(dtfOut.print(lastDateModifiedDT));
+ task.setLastHarvestDatetime(dtfOut.print(lastDateModifiedDT), nodeId);
log.debug("Saving lastHarvestDate: " + dtfOut.print(lastDateModifiedDT));
try {
- store.saveTask(task);
+ store.saveTask(task, nodeId);
} catch (MetadigStoreException mse) {
log.error("Error saving task: " + task.getTaskName());
JobExecutionException jee = new JobExecutionException("Unable to save new harvest date", mse);
jee.setRefireImmediately(false);
throw jee;
}
+ log.info(taskName + ": found " + allIds + " seriesIds" + " for start: " + startDTstr + ", end: " + endDTstr + " at serviceUrl: " + nodeServiceUrl);
}
- log.info(taskName + ": found " + allIds + " seriesIds" + " for start: " + startDTstr + ", end: " + endDTstr + " at servierUrl: " + nodeServiceUrl);
}
store.shutdown();
}
@@ -344,7 +343,7 @@ public void execute(JobExecutionContext context)
*/
public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
String pidFilter, String startHarvestDatetimeStr, String endHarvestDatetimeStr,
- int startCount, int countRequested, DateTime lastDateModifiedDT) throws Exception {
+ int startCount, int countRequested, DateTime lastDateModifiedDT, String taskName) throws Exception {
MetadigProcessException metadigException = null;
@@ -376,7 +375,7 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
fieldXpath = xpath.compile("//result/doc/str[@name='seriesId']/text()");
dateModifiedXpath = xpath.compile("//result/doc/date[@name='dateModified']/text()");
} catch (XPathExpressionException xpe) {
- log.error("Error extracting id from solr result doc: " + xpe.getMessage());
+ log.error(taskName + ": error extracting id from solr result doc: " + xpe.getMessage());
metadigException = new MetadigProcessException("Unable to get collection pids: " + xpe.getMessage());
metadigException.initCause(xpe);
throw metadigException;
@@ -396,7 +395,7 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
try {
xpathResult = (org.w3c.dom.NodeList) fieldXpath.evaluate(xmldoc, XPathConstants.NODESET);
} catch (XPathExpressionException xpe) {
- log.error("Error extracting seriesId from solr result doc: " + xpe.getMessage());
+ log.error(taskName + ": error extracting seriesId from solr result doc: " + xpe.getMessage());
metadigException = new MetadigProcessException("Unable to get collection pids: " + xpe.getMessage());
metadigException.initCause(xpe);
throw metadigException;
@@ -416,7 +415,7 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
try {
xpathResult = (org.w3c.dom.NodeList) dateModifiedXpath.evaluate(xmldoc, XPathConstants.NODESET);
} catch (XPathExpressionException xpe) {
- log.error("Error extracting dateModified from solr result doc: " + xpe.getMessage());
+ log.error(taskName + ": error extracting dateModified from solr result doc: " + xpe.getMessage());
metadigException = new MetadigProcessException("Unable to get collection pids: " + xpe.getMessage());
metadigException.initCause(xpe);
throw metadigException;
@@ -450,13 +449,13 @@ public ListResult getPidsToProcess(MultipartD1Node d1Node, Session session,
}
/**
- * Submit a requst to the metadig controller to get qualiry score info and create a graph for the specified collection.
+ * Submit a request to the metadig controller to get quality score info and create a graph for the specified collection.
*
- * @param qualityServiceUrl
- * @param collectionId
- * @param suiteId
- * @param nodeId
- * @param formatFamily
+ * @param qualityServiceUrl the URL of the MetaDIG quality service
+ * @param collectionId the DataONE collection (portal) seriesId
+ * @param suiteId the quality suite to run for the collection
+ * @param nodeId the DataONE node identifier that the collection is hosted on
+ * @param formatFamily the format identifier family (e.g. "eml" for all EML format identifier versions)
*
* @throws Exception
*
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/store/DatabaseStore.java b/src/main/java/edu/ucsb/nceas/mdqengine/store/DatabaseStore.java
index 9958136c..8a83abce 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/store/DatabaseStore.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/store/DatabaseStore.java
@@ -9,6 +9,8 @@
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.dataone.service.types.v1.*;
+import org.dataone.service.types.v2.Node;
import org.dataone.service.util.TypeMarshaller;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
@@ -22,11 +24,11 @@
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.sql.*;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.time.Instant;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
+import java.util.Date;
/**
* Persistent storage for quality runs.
@@ -322,27 +324,24 @@ public void shutdown() {
}
}
- public void saveTask(Task task) throws MetadigStoreException {
+ public void saveTask(Task task, String nodeId) throws MetadigStoreException {
PreparedStatement stmt = null;
// Perform an 'upsert' on the 'nodes' table - if a record exists for the 'metadata_id, suite_id' already,
// then update the record with the incoming data.
try {
- String sql = "INSERT INTO tasks (task_name, task_type, last_harvest_datetime) VALUES (?, ?, ?)"
+ String sql = "INSERT INTO tasks (task_name, task_type) VALUES (?, ?)"
+ " ON CONFLICT ON CONSTRAINT task_name_task_type"
- + " DO UPDATE SET (task_name, task_type, last_harvest_datetime) = (?, ?, ?);";
+ + " DO NOTHING";
stmt = conn.prepareStatement(sql);
stmt.setString(1, task.getTaskName());
stmt.setString(2, task.getTaskType());
- stmt.setString(3, task.getLastHarvestDatetime());
- stmt.setString(4, task.getTaskName());
- stmt.setString(5, task.getTaskType());
- stmt.setString(6, task.getLastHarvestDatetime());
stmt.executeUpdate();
stmt.close();
conn.commit();
+ saveNodeHarvest(task, nodeId);
//conn.close();
} catch (SQLException e) {
log.error( e.getClass().getName()+": "+ e.getMessage());
@@ -355,7 +354,7 @@ public void saveTask(Task task) throws MetadigStoreException {
log.trace("Records created successfully");
}
- public Task getTask(String taskName, String taskType) {
+ public Task getTask(String taskName, String taskType, String nodeId) {
//return runs.get(id);
Result result = new Result();
@@ -376,12 +375,13 @@ public Task getTask(String taskName, String taskType) {
if(rs.next()) {
task.setTaskName(rs.getString("task_name"));
task.setTaskType(rs.getString("task_type"));
- task.setLastHarvestDatetime(rs.getString("last_harvest_datetime"));
rs.close();
stmt.close();
} else {
log.trace("No results returned from query");
}
+
+ task.setLastHarvestDatetimes(getNodeHarvestDatetimes(task.getTaskName(), task.getTaskType(), nodeId));
} catch ( Exception e ) {
log.error( e.getClass().getName()+": "+ e.getMessage());
}
@@ -389,6 +389,232 @@ public Task getTask(String taskName, String taskType) {
return(task);
}
+ public HashMap getNodeHarvestDatetimes(String taskName, String taskType, String nodeId) {
+
+ //return runs.get(id);
+ Result result = new Result();
+ PreparedStatement stmt = null;
+ String lastDT = null;
+ Task task = new Task();
+
+ HashMap nodeHarvestDates = new HashMap<>();
+ // Select records from the 'nodes' table
+ try {
+ String sql = "select * from node_harvest where task_name = ? and task_type = ? and node_id = ?";
+ stmt = conn.prepareStatement(sql);
+ stmt.setString(1, taskName);
+ stmt.setString(2, taskType);
+ stmt.setString(3, nodeId);
+
+ log.trace("issuing query: " + sql);
+ ResultSet rs = stmt.executeQuery();
+ while (rs.next()) {
+ nodeHarvestDates.put(nodeId, rs.getString("last_harvest_datetime"));
+ }
+ rs.close();
+ stmt.close();
+ } catch ( Exception e ) {
+ log.error( e.getClass().getName()+": "+ e.getMessage());
+ }
+
+ return(nodeHarvestDates);
+ }
+
+
+ public void saveNodeHarvest(Task task, String nodeId) throws MetadigStoreException {
+
+ PreparedStatement stmt = null;
+
+ // Perform an 'upsert' on the 'nodes' table - if a record exists for the 'metadata_id, suite_id' already,
+ // then update the record with the incoming data.
+ try {
+ String sql = "INSERT INTO node_harvest (task_name, task_type, node_id, last_harvest_datetime) VALUES (?, ?, ?, ?)"
+ + " ON CONFLICT ON CONSTRAINT node_harvest_task_name_task_type_node_id_uc"
+ + " DO UPDATE SET (task_name, task_type, node_id, last_harvest_datetime) = (?, ?, ?, ?);";
+
+ stmt = conn.prepareStatement(sql);
+ stmt.setString(1, task.getTaskName());
+ stmt.setString(2, task.getTaskType());
+ stmt.setString(3, nodeId);
+ stmt.setString(4, task.getLastHarvestDatetime(nodeId));
+ stmt.setString(5, task.getTaskName());
+ stmt.setString(6, task.getTaskType());
+ stmt.setString(7, nodeId);
+ stmt.setString(8, task.getLastHarvestDatetime(nodeId));
+ stmt.executeUpdate();
+ stmt.close();
+ conn.commit();
+ //conn.close();
+ } catch (SQLException e) {
+ log.error( e.getClass().getName()+": "+ e.getMessage());
+ MetadigStoreException me = new MetadigStoreException("Unable to save last harvest date to the database.");
+ me.initCause(e);
+ throw(me);
+ }
+
+ // Next, insert a record into the child table ('runs')
+ log.trace("Records created successfully");
+ }
+
+ public void saveNode(Node node) throws MetadigStoreException {
+
+ PreparedStatement stmt = null;
+
+ // Perform an 'upsert' on the 'nodes' table - if a record exists for the 'metadata_id, suite_id' already,
+ // then update the record with the incoming data.
+ try {
+ String sql = "INSERT INTO nodes " +
+ " (identifier, name, type, state, synchronize, last_harvest, baseURL) VALUES (?, ?, ?, ?, ?, ?, ?) " +
+ " ON CONFLICT ON CONSTRAINT node_id_pk DO UPDATE SET " +
+ " (identifier, name, type, state, synchronize, last_harvest, baseURL) = (?, ?, ?, ?, ?, ?, ?);";
+
+ DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
+ String lastHarvestDatetimeStr = dateFormat.format(node.getSynchronization().getLastHarvested());
+
+ stmt = conn.prepareStatement(sql);
+ stmt.setString(1, node.getIdentifier().getValue());
+ stmt.setString(2, node.getName());
+ stmt.setString(3, node.getType().toString());
+ stmt.setString(4, node.getState().toString());
+ stmt.setBoolean(5, node.isSynchronize());
+ stmt.setString(6, lastHarvestDatetimeStr);
+ stmt.setString(7, node.getBaseURL());
+ stmt.setString(8, node.getIdentifier().getValue());
+ stmt.setString(9, node.getName());
+ stmt.setString(10, node.getType().toString());
+ stmt.setString(11, node.getState().toString());
+ stmt.setBoolean(12, node.isSynchronize());
+ stmt.setString(13, lastHarvestDatetimeStr);
+ stmt.setString(14, node.getBaseURL());
+ stmt.executeUpdate();
+ stmt.close();
+ conn.commit();
+ } catch (SQLException e) {
+ log.error( e.getClass().getName()+": "+ e.getMessage());
+ MetadigStoreException me = new MetadigStoreException("Unable to save node " + node.getIdentifier().getValue() + " to database.");
+ me.initCause(e);
+ throw(me);
+ }
+
+ // Next, insert a record into the child table ('runs')
+ log.trace("Records created successfully");
+ }
+
+ public Node getNode(String nodeId) {
+
+ Result result = new Result();
+ PreparedStatement stmt = null;
+ Node node = new Node();
+
+ // Select records from the 'nodes' table
+ try {
+ log.trace("preparing statement for query");
+ String sql = "select * from nodes where identifier = ? ";
+ stmt = conn.prepareStatement(sql);
+ stmt.setString(1, nodeId);
+
+ log.trace("issuing query: " + sql);
+ ResultSet rs = stmt.executeQuery();
+ if(rs.next()) {
+ node = extractNodeFields(rs);
+ rs.close();
+ stmt.close();
+ } else {
+ log.trace("No results returned for nodeId: " + nodeId);
+ }
+ } catch ( Exception e ) {
+ log.error( e.getClass().getName()+": "+ e.getMessage());
+ }
+
+ return(node);
+ }
+
+ public ArrayList getNodes() {
+
+ Result result = new Result();
+ PreparedStatement stmt = null;
+
+ ArrayList nodes = new ArrayList<> ();
+ ResultSet rs = null;
+ Node node;
+ // Select records from the 'nodes' table
+ try {
+ log.trace("preparing statement for query");
+ String sql = "select * from nodes; ";
+ stmt = conn.prepareStatement(sql);
+
+ log.trace("issuing query: " + sql);
+ rs = stmt.executeQuery();
+ while(rs.next()) {
+ node = extractNodeFields(rs);
+ nodes.add(node);
+ }
+ } catch ( Exception e ) {
+ log.error(e.getClass().getName() + ": " + e.getMessage());
+ }
+
+ try {
+ rs.close();
+ stmt.close();
+ } catch (Exception e) {
+ log.error("Error closing node database: " + e.getMessage());
+ }
+
+ log.trace(nodes.size() + " nodes found in node table.");
+
+ return(nodes);
+ }
+
+ public Node extractNodeFields (ResultSet resultSet) {
+
+ Node node = new Node();
+ try {
+ NodeReference nodeReference = new NodeReference();
+ nodeReference.setValue(resultSet.getString("identifier"));
+ node.setIdentifier(nodeReference);
+ node.setName(resultSet.getString("name"));
+ switch (resultSet.getString("type")) {
+ case "CN":
+ node.setType(NodeType.CN);
+ break;
+ case "MN":
+ node.setType(NodeType.MN);
+ break;
+ case "MONITOR":
+ node.setType(NodeType.MONITOR);
+ break;
+ }
+
+ switch (resultSet.getString("state")) {
+ case "UP":
+ node.setState(NodeState.UP);
+ break;
+ case "DOWN":
+ node.setState(NodeState.DOWN);
+ break;
+ default:
+ node.setState(NodeState.UNKNOWN);
+ break;
+ }
+
+ node.setSynchronize(resultSet.getBoolean("synchronize"));
+
+ Synchronization synchronization = new Synchronization();
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ formatter.setTimeZone(TimeZone.getTimeZone("GMT"));
+ Date lastHarvestDate = formatter.parse(resultSet.getString("last_harvest"));
+ synchronization.setLastHarvested(lastHarvestDate);
+ node.setSynchronization(synchronization);
+
+ node.setBaseURL(resultSet.getString("baseURL"));
+ } catch (java.sql.SQLException | java.text.ParseException e) {
+ log.error("Error retrieving node from database: " + e);
+ }
+
+ return node;
+ }
+
@Override
public void createRun(Run run) {
runs.put(run.getId(), run);
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/store/InMemoryStore.java b/src/main/java/edu/ucsb/nceas/mdqengine/store/InMemoryStore.java
index af7637a0..e3f47e7a 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/store/InMemoryStore.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/store/InMemoryStore.java
@@ -9,6 +9,7 @@
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.dataone.service.types.v2.Node;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.xml.sax.SAXException;
@@ -16,6 +17,7 @@
import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -210,13 +212,22 @@ public void deleteRun(Run run) {
// public void saveNode(Node node) throws MetadigStoreException { }
@Override
- public Task getTask(String taskName, String taskType) { return new Task(); }
+ public Task getTask(String taskName, String taskType, String nodeId) { return new Task(); }
@Override
- public void saveTask(Task task) throws MetadigStoreException { }
+ public void saveTask(Task task, String nodeId) throws MetadigStoreException { }
@Override
public void shutdown() {};
+ @Override
+ public Node getNode (String nodeId) { return new Node(); };
+
+ @Override
+ public void saveNode(Node node) throws MetadigStoreException {};
+
+ @Override
+ public ArrayList getNodes() { return new ArrayList<> (); };
+
}
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/store/MDQStore.java b/src/main/java/edu/ucsb/nceas/mdqengine/store/MDQStore.java
index b9796c29..ad64d726 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/store/MDQStore.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/store/MDQStore.java
@@ -2,8 +2,11 @@
import edu.ucsb.nceas.mdqengine.exception.MetadigStoreException;
import edu.ucsb.nceas.mdqengine.model.*;
+import org.dataone.service.types.v2.Node;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
public interface MDQStore {
@@ -30,7 +33,12 @@ public interface MDQStore {
boolean isAvailable();
void renew() throws MetadigStoreException;
- Task getTask(String taskName, String taskType);
- void saveTask(Task task) throws MetadigStoreException;
+ Task getTask(String taskName, String taskType, String nodeId);
+ void saveTask(Task task, String nodeId) throws MetadigStoreException;
+
+ Node getNode (String nodeId);
+ void saveNode(Node node) throws MetadigStoreException;
+
+ ArrayList getNodes();
}
diff --git a/src/main/java/edu/ucsb/nceas/mdqengine/store/MNStore.java b/src/main/java/edu/ucsb/nceas/mdqengine/store/MNStore.java
index 4613577e..593e3e36 100644
--- a/src/main/java/edu/ucsb/nceas/mdqengine/store/MNStore.java
+++ b/src/main/java/edu/ucsb/nceas/mdqengine/store/MNStore.java
@@ -19,6 +19,7 @@
import org.dataone.service.types.v1.Session;
import org.dataone.service.types.v1.Subject;
import org.dataone.service.types.v1.util.ChecksumUtil;
+import org.dataone.service.types.v2.Node;
import org.dataone.service.types.v2.SystemMetadata;
import javax.xml.bind.JAXBException;
@@ -328,17 +329,11 @@ public void deleteRun(Run run) {
@Override
public void renew() {}
-// @Override
-// public Node getNode(String nodeId, String jobName) { return new Node(); }
-//
-// @Override
-// public void saveNode(Node node) throws MetadigStoreException { }
-
@Override
- public Task getTask(String taskName, String taskType) { return new Task(); }
+ public Task getTask(String taskName, String taskType, String nodeId) { return new Task(); }
@Override
- public void saveTask(Task task) throws MetadigStoreException { }
+ public void saveTask(Task task, String nodeId) throws MetadigStoreException { }
@Override
public void shutdown() {};
@@ -346,4 +341,14 @@ public void saveTask(Task task) throws MetadigStoreException { }
@Override
public void saveRun(Run run) {}
+ @Override
+ public Node getNode (String nodeId) { return new Node(); };
+
+ @Override
+ public void saveNode(Node node) throws MetadigStoreException {};
+
+ @Override
+ public ArrayList getNodes() { return new ArrayList<> (); };
+
+
}
diff --git a/src/main/resources/sql/quality-v2.3.0.sql b/src/main/resources/sql/quality-v2.3.0.sql
index 3c4e7dfb..45a26865 100644
--- a/src/main/resources/sql/quality-v2.3.0.sql
+++ b/src/main/resources/sql/quality-v2.3.0.sql
@@ -24,12 +24,22 @@ alter table identifiers owner to metadig;
create table tasks (
task_name TEXT not null,
task_type TEXT not null,
- last_harvest_datetime TEXT not null,
CONSTRAINT task_name_task_type PRIMARY KEY (task_name, task_type)
);
alter table tasks owner to metadig;
+create table node_harvest (
+ task_name TEXT not null,
+ task_type TEXT not null,
+ node_id TEXT not null,
+ last_harvest_datetime TEXT not null,
+ CONSTRAINT node_harvest_task_name_task_type_fk FOREIGN KEY (task_name, task_type) REFERENCES tasks (task_name, task_type),
+ CONSTRAINT node_harvest_task_name_task_type_node_id_uc UNIQUE (task_name, task_type, node_id)
+);
+
+alter table node_harvest owner to metadig;
+
create TABLE runs (
metadata_id TEXT not null,
suite_id TEXT not null,
@@ -62,3 +72,17 @@ create TABLE filestore (
alter table filestore owner to metadig;
+create TABLE nodes (
+ identifier TEXT not null,
+ name TEXT not null,
+ type TEXT not NULL,
+ state TEXT not NULL,
+ synchronize boolean not null,
+ last_harvest TEXT not null,
+ baseURL TEXT not null,
+ CONSTRAINT node_id_pk PRIMARY KEY (identifier)
+);
+
+alter table nodes owner to metadig;
+
+
From 54a1c4efa885c4435787f1185540bd5df8618352 Mon Sep 17 00:00:00 2001
From: gothub
Date: Wed, 2 Sep 2020 16:02:18 -0700
Subject: [PATCH 47/47] Add portal harvest task for mn-ucsb-1 (#256)
This is the current taskList.csv, which includes add'l entries to mn-ucsb-1
---
src/main/resources/configuration/taskList.csv | 51 +++++++++++++++----
1 file changed, 40 insertions(+), 11 deletions(-)
diff --git a/src/main/resources/configuration/taskList.csv b/src/main/resources/configuration/taskList.csv
index e1351e9a..80976d46 100644
--- a/src/main/resources/configuration/taskList.csv
+++ b/src/main/resources/configuration/taskList.csv
@@ -1,9 +1,8 @@
task-type,task-name,task-group,cron-schedule,params
-# task type, task name, task group, cron schedule, "formatId filter (regex); suite id; node id; D1 node base url; harvest begin date; harvest increment (days);requestCount"
-# - task type: currently 'quality' and 'score' task are supported.
-# - task name: any unique string, i.e. 'quality-knb'
-# - task group: currently only 'metadig' is used
-# - nodeId
+# task type, job name, job group, cron schedule, "formatId filter (regex); suite id; node id; D1 node base url; harvest begin date; harvest increment (days);requestCount"
+# - task type: the kind of scheduled task; 'quality', 'score', 'filestore' and 'nodelist' tasks are used below
+# - job name: a unique name for the scheduled job, e.g. 'quality-knb'
+# - job group: the scheduler job group; currently only 'metadig' is used
# - cron schedule:
# - seconds, minutes, hours, day of month, month, day of week, year
# - params
@@ -11,10 +10,40 @@ task-type,task-name,task-group,cron-schedule,params
# - suite id: the metadig suite id
# - node id: a DataONE node URN - data will be filtered using this (DataONE sysmeta "datasource")
# - D1 node base url: the base service URL for an MN or CN that will be used to query for pids to be processed
-# - harvest begin date: the first date to use for the DataONE 'listObjects' service
-# - harvest increment (days): the time span for each search
+# - harvest begin date: the first date to use for the DataONE 'listObjects' service
+# - harvest increment (days): the time span for each search
# - requestCount: the number of itmes to request from DataONE listObjects
-score,score-DataONE-fair,metadig,35 0/1 * * * ?,".*portal.*;FAIR.suite.1;urn:node:CN;2019-12-01T00:00:00.00Z;1;100;refresh"
-quality,quality-arctic,metadig,20 0/1 * * * ?,"^eml.*|^http.*eml.*;arctic.data.center.suite.1;urn:node:ARCTIC;1;100"
-filestore,ingest,metadig,0 0/1 * * * ?,"stage;;*.*;README.txt;filestore-ingest.log"
-
+# - requestType: for score tasks, determine type of portal processing ("portal" or "node")
+#
+# Dataset quality scoring tasks
+quality,quality-knb,metadig,0 0/1 * * * ?,"^eml.*|^http.*eml.*;knb.suite.1;urn:node:KNB;2020-08-28T14:05:48.764Z;1;1000"
+quality,quality-arctic,metadig,5 0/1 * * * ?,"^eml.*|^http.*eml.*;arctic.data.center.suite.1;urn:node:ARCTIC;2020-08-27T00:00:00.000Z;1;1000"
+quality,quality-dataone-fair,metadig,10 0/1 * * * ?,"^eml.*|^http.*eml.*|.*www.isotc211.org.*;FAIR-suite-0.3.1;urn:node:CN;2020-08-28T00:00:00.000Z;1;1000"
+quality,quality-ess-dive,metadig,15 0/1 * * * ?,"^eml.*|^http.*eml.*;ess-dive.data.center.suite.1;urn:node:ESS_DIVE;2020-08-27T20:38:19.953Z;1;1000;"
+#
+# Portal scoring tasks
+score,portal-KNB-FAIR,metadig,5 0/1 * * * ?,"*portals*;FAIR-suite-0.3.1;urn:node:KNB;2020-08-28T00:00:00.00Z;1;100;portal"
+score,portal-ARCTIC-FAIR,metadig,10 0/1 * * * ?,"*portals*;FAIR-suite-0.3.1;urn:node:ARCTIC;2020-08-28T00:00:00.00Z;1;100;portal"
+score,portal-mnUCSB1-FAIR,metadig,15 0/1 * * * ?,"*portals*;FAIR-suite-0.3.1;urn:node:mnUCSB1;2020-08-28T00:00:00.00Z;1;100;portal"
+#
+# Note: Portal harvesting for DataONE portals created on search.dataone.org will be performed on mnUCSB1, as MetacatUI sends create and
+# update requests performed on search.dataone.org to this host. We want to harvest them as soon as they are created, and not have to wait for mnUCSB1 to
+# sync to the CN, and then the CN index it, so the following entry is obsolete, and no longer used.
+# # score,portal-CN-FAIR,metadig,35 0/1 * * * ?,"*portals*;FAIR.suite-0.3.1;urn:node:CN;2020-08-24T00:00:00.00Z;1;100;portal"
+#
+# Task for creating member node metadata assessment graphs
+score,mn-portal-ARCTIC-FAIR,metadig,0 0 2 * * ?,";FAIR-suite-0.3.1;urn:node:ARCTIC;2020-08-28T00:00:00.00Z;1;1000;node"
+score,mn-portal-KNB-FAIR,metadig,0 1 2 * * ?,";FAIR-suite-0.3.1;urn:node:KNB;2020-08-28T00:00:00.00Z;1;1000;node"
+score,mn-portal-ESS-DIVE-FAIR,metadig,0 2 2 * * ?,";FAIR-suite-0.3.1;urn:node:ESS_DIVE;2020-08-28T00:00:00.00Z;1;1000;node"
+score,mn-portal-CA_OPC-FAIR,metadig,0 3 2 * * ?,";FAIR-suite-0.3.1;urn:node:CA_OPC;2020-08-28T00:00:00.00Z;1;1000;node"
+score,mn-portal-DataONE-FAIR,metadig,0 4 2 * * ?,";FAIR-suite-0.3.1;urn:node:CN;2020-08-28T00:00:00.00Z;1;1000;node"
+#
+# Task for ingesting files into the file store from /data/metadig/store/stage/{code,data,graph,metadata}
+# filestore,ingest,metadig,0 0/1 * * * ?,"stage;;*.*;README.txt;filestore-ingest.log"
+#
+# Admin NOTE: it appears that DataONE HttpMultipartRestClient can't handle two clients being created at the same time, even if they are by different threads. This needs to be
+# investigated further and potentially a bug needs to be logged in redmine for this. Until then, an easy workaround is to ensure that no two tasks are started
+# at the same time, so adjust the cron schedule accordingly.
+#
+# Node list from DataONE
+nodelist,MN-NODE-LIST,metadig,0 0 0/1 * * ?,"urn:node:CN"
\ No newline at end of file