From b333af96fe96b8ff5fa116a6c5f0506643ad3fef Mon Sep 17 00:00:00 2001 From: Gordon Smith Date: Thu, 9 May 2024 16:55:23 +0100 Subject: [PATCH 1/4] Split off 9.6.12 Signed-off-by: Gordon Smith --- commons-hpcc/pom.xml | 2 +- dfsclient/pom.xml | 2 +- pom.xml | 2 +- wsclient/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/commons-hpcc/pom.xml b/commons-hpcc/pom.xml index c17cd4a58..e6a661543 100644 --- a/commons-hpcc/pom.xml +++ b/commons-hpcc/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.6.11-0-SNAPSHOT + 9.6.13-0-SNAPSHOT diff --git a/dfsclient/pom.xml b/dfsclient/pom.xml index 0084a8c71..6556e45f5 100644 --- a/dfsclient/pom.xml +++ b/dfsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.6.11-0-SNAPSHOT + 9.6.13-0-SNAPSHOT diff --git a/pom.xml b/pom.xml index d81d3bff5..26bf719ae 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.hpccsystems hpcc4j - 9.6.11-0-SNAPSHOT + 9.6.13-0-SNAPSHOT pom HPCC Systems Java Projects https://hpccsystems.com diff --git a/wsclient/pom.xml b/wsclient/pom.xml index c4d5c7581..80045f0bc 100644 --- a/wsclient/pom.xml +++ b/wsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.6.11-0-SNAPSHOT + 9.6.13-0-SNAPSHOT From cfbc2a03530ae6c9b769425833590460280f20f4 Mon Sep 17 00:00:00 2001 From: Gordon Smith Date: Thu, 9 May 2024 16:56:42 +0100 Subject: [PATCH 2/4] Split off 9.4.60 Signed-off-by: Gordon Smith --- commons-hpcc/pom.xml | 2 +- dfsclient/pom.xml | 2 +- pom.xml | 2 +- wsclient/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/commons-hpcc/pom.xml b/commons-hpcc/pom.xml index 0101e144c..0e45ac4bc 100644 --- a/commons-hpcc/pom.xml +++ b/commons-hpcc/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.4.59-0-SNAPSHOT + 9.4.61-0-SNAPSHOT diff --git a/dfsclient/pom.xml b/dfsclient/pom.xml index fc4607753..32a4ac47d 100644 --- a/dfsclient/pom.xml +++ b/dfsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.4.59-0-SNAPSHOT + 9.4.61-0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 53209c449..481f5be93 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.hpccsystems hpcc4j - 9.4.59-0-SNAPSHOT + 9.4.61-0-SNAPSHOT pom HPCC Systems Java Projects https://hpccsystems.com diff --git a/wsclient/pom.xml b/wsclient/pom.xml index c80c18516..49762d707 100644 --- a/wsclient/pom.xml +++ b/wsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.4.59-0-SNAPSHOT + 9.4.61-0-SNAPSHOT From b9ce161f5cf22c99d47ba5de42b50323b384984c Mon Sep 17 00:00:00 2001 From: James McMullan Date: Wed, 15 May 2024 13:22:41 -0400 Subject: [PATCH 3/4] HPCC4J-599 Halt all JUnits on init failure (#708) - Updated init logic in BaseRemoteTest to automatically fail tests if init fails Signed-off-by: James McMullan James.McMullan@lexisnexis.com Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .github/workflows/JAPIPRBuildAction.yml | 10 ++-- .../ws/client/BaseRemoteInitTest.java | 38 +++++++++++++ .../hpccsystems/ws/client/BaseRemoteTest.java | 54 +++++++++++++++---- 3 files changed, 85 insertions(+), 17 deletions(-) create mode 100644 wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteInitTest.java diff --git a/.github/workflows/JAPIPRBuildAction.yml b/.github/workflows/JAPIPRBuildAction.yml index 56d2121c0..4b238b99d 100644 --- a/.github/workflows/JAPIPRBuildAction.yml +++ b/.github/workflows/JAPIPRBuildAction.yml @@ -12,12 +12,6 @@ jobs: with: ref: ${{ github.event.pull_request.head.sha }} fetch-depth: 0 - - name: Rebase - run: | - git config user.email 'hpccsystems@lexisnexisrisk.com' - git config user.name 'hpccsystems development' - git rebase origin/${{ github.event.pull_request.base.ref }} - git log --pretty=one -n 15 - name: Set up JDK 11 uses: actions/setup-java@v1 @@ -34,3 +28,7 @@ jobs: - name: Build with Maven run: mvn -B package --file pom.xml + + # Expect a failure here, verifying that the test suite fails early on init issues + - name: Test Suite Verification + run: "! mvn test --activate-profiles jenkins-on-demand -Dhpccconn=https://bad_host:8010" diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteInitTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteInitTest.java new file mode 100644 index 000000000..abca7b7ae --- /dev/null +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteInitTest.java @@ -0,0 +1,38 @@ +/*############################################################################## + + HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +package org.hpccsystems.ws.client; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(org.hpccsystems.commons.annotations.RemoteTests.class) +public class BaseRemoteInitTest +{ + @Test + public void initTest() throws Exception + { + String exceptionMessage = ""; + if (BaseRemoteTest.initializationException != null) + { + exceptionMessage = BaseRemoteTest.initializationException.getMessage(); + } + + Assert.assertTrue("Error initializing test suite: " + exceptionMessage, BaseRemoteTest.initializationException == null); + } +} diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java index 402e2782d..08716ab48 100644 --- a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java @@ -17,9 +17,6 @@ HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®. package org.hpccsystems.ws.client; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.net.InetAddress; @@ -40,7 +37,9 @@ HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®. import org.hpccsystems.ws.client.platform.Platform; import org.hpccsystems.ws.client.utils.Connection; import org.hpccsystems.ws.client.wrappers.gen.wstopology.TpGroupWrapper; +import org.junit.Assume; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.experimental.categories.Category; import java.net.URL; @@ -52,6 +51,7 @@ HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®. @Category(org.hpccsystems.commons.annotations.RemoteTests.class) public abstract class BaseRemoteTest { + public static Exception initializationException = null; protected static Platform platform; protected static HPCCWsClient wsclient; @@ -85,6 +85,31 @@ public abstract class BaseRemoteTest public static final String DEFAULTHPCCSUPERFILENAME = "benchmark::all_types::superfile"; static + { + try + { + initialize(); + } + catch (Exception e) + { + initializationException = e; + } + } + + @BeforeClass + public static void initCheck() + { + String exceptionMessage = ""; + if (initializationException != null) + { + exceptionMessage = initializationException.getLocalizedMessage(); + initializationException.printStackTrace(); + } + + Assume.assumeTrue("Error initializing test suite: " + exceptionMessage, initializationException == null); + } + + public static void initialize() throws Exception { // This allows testing against locally created self signed certs to work. // In production certs will need to be created valid hostnames @@ -146,10 +171,14 @@ public boolean verify(String hostname,javax.net.ssl.SSLSession sslSession) } catch (MalformedURLException e) { - fail("Could not acquire connection object based on: '" + connString + "' - " + e.getLocalizedMessage()); + throw new Exception("Could not acquire connection object based on: '" + connString + "' - " + e.getLocalizedMessage()); + } + + if (connection == null) + { + throw new Exception("Could not acquire connection object based on: '" + connString + "'"); } - Assert.assertNotNull("Could not acquire connection object", connection); connection.setCredentials(hpccUser, hpccPass); if (connTO != null) @@ -159,8 +188,10 @@ public boolean verify(String hostname,javax.net.ssl.SSLSession sslSession) connection.setSocketTimeoutMilli(Integer.valueOf(sockTO)); platform = Platform.get(connection); - - Assert.assertNotNull("Could not acquire platform object", platform); + if (platform == null) + { + throw new Exception("Could not acquire platform object"); + } } try @@ -200,10 +231,11 @@ public boolean verify(String hostname,javax.net.ssl.SSLSession sslSession) } catch (Exception e) { - fail("Could not acquire wsclient object: " + e.getMessage() ); + throw new Exception("Could not acquire wsclient object: " + e.getMessage() ); } - Assert.assertNotNull("Could not acquire wsclient object", wsclient); + if (wsclient == null) + throw new Exception("Could not acquire wsclient object"); // Run the generate-datasets.ecl script if present in the project resources try @@ -212,7 +244,7 @@ public boolean verify(String hostname,javax.net.ssl.SSLSession sslSession) } catch (Exception e) { - Assert.fail("Error executing test data generation scripts with error: " + e.getMessage()); + throw new Exception("Error executing test data generation scripts with error: " + e.getMessage()); } } @@ -255,7 +287,7 @@ static public void executeMultiThreadedTask(Callable callableTask, int t { try { - assertTrue(futures.get(threadIndex).get().isEmpty()); + Assert.assertTrue(futures.get(threadIndex).get().isEmpty()); } catch (InterruptedException | ExecutionException e) { From 3c2441589e7bd9b125ab54a891636f407dd8cee6 Mon Sep 17 00:00:00 2001 From: drealeed Date: Thu, 16 May 2024 13:57:42 -0400 Subject: [PATCH 4/4] HPCC4J-595 Enhance log messages --- .../dfs/client/BinaryRecordReader.java | 12 +-- .../hpccsystems/dfs/client/DataPartition.java | 76 +++++++++++++++++-- .../org/hpccsystems/dfs/client/HPCCFile.java | 2 +- .../dfs/client/HpccRemoteFileReader.java | 14 ++-- .../dfs/client/RowServiceInputStream.java | 76 ++++++++++--------- .../dfs/client/RowServiceOutputStream.java | 2 +- .../dfs/client/DFSReadWriteTest.java | 13 +++- .../ws/client/HPCCWsTopologyClient.java | 2 +- .../ws/client/HPCCWsWorkUnitsClient.java | 2 +- .../hpccsystems/ws/client/BaseRemoteTest.java | 1 + 10 files changed, 141 insertions(+), 59 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java index 58470fdd8..b1ab06fe5 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java @@ -271,7 +271,7 @@ public boolean hasNext() throws HpccFileException { if (this.rootRecordBuilder == null) { - throw new HpccFileException("RecordReader must be initialized before being used."); + throw new HpccFileException("BinaryRecordReader.hasNext(): RecordReader must be initialized before being used. rootRecordBuilder is null, hasNext() failed."); } int nextByte = -1; @@ -299,7 +299,7 @@ public boolean hasNext() throws HpccFileException } catch (IOException e) { - throw new HpccFileException(e); + throw new HpccFileException("BinaryRecordReader.hasNext(): failed to peek at the next byte in the input stream:" + e.getMessage(),e); } return nextByte >= 0; @@ -314,7 +314,7 @@ public Object getNext() throws HpccFileException { if (this.rootRecordBuilder == null) { - throw new HpccFileException("RecordReader must be initialized before being used."); + throw new HpccFileException("BinaryRecordReader.getNext(): RecordReader must be initialized before being used, rootRecordBuilder is null."); } if (!this.hasNext()) throw new NoSuchElementException("No next record!"); @@ -325,13 +325,13 @@ public Object getNext() throws HpccFileException record = parseRecord(this.rootRecordDefinition, this.rootRecordBuilder, this.defaultLE); if (record == null) { - throw new HpccFileException("RecordContent not found, or invalid record structure. Check logs for more information."); + throw new HpccFileException("BinaryRecordReader.getNext(): RecordContent not found, or invalid record structure. Check logs for more information."); } } catch (Exception e) { - throw new HpccFileException("Failed to parse next record: " + e.getMessage(), e); + throw new HpccFileException("BinaryRecordReader.getNext(): Failed to parse next record: " + e.getMessage(), e); } this.streamPosAfterLastRecord = this.inputStream.getStreamPosition(); @@ -370,7 +370,7 @@ private Object parseFlatField(FieldDef fd, boolean isLittleEndian) throws Unpars if (fd.isFixed() && fd.getDataLen() > Integer.MAX_VALUE) { - throw new UnparsableContentException("Data length: " + fd.getDataLen() + " exceeds max supported length: " + Integer.MAX_VALUE); + throw new UnparsableContentException("BinaryRecordReader.parseFlatField(): Data length: " + fd.getDataLen() + " exceeds max supported length: " + Integer.MAX_VALUE); } // Embedded field lengths are little endian diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java index a846969e0..75eb5c1e0 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java @@ -43,6 +43,7 @@ public class DataPartition implements Serializable private String fileAccessBlob; private FileType fileType; private boolean isTLK; + private String fileName; public static enum FileType { @@ -197,13 +198,42 @@ private DataPartition(String[] copyLocations, String[] copyPaths, int partNum, i * the file type */ private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter, - String fileAccessBlob, FileType fileType) + String fileAccessBlob, FileType fileType) { + this(copylocations,copyPaths,this_part,num_parts,clearport,sslport,filter,fileAccessBlob,fileType,null); + } + /** + * Construct the data part, used by makeParts. + * + * @param copylocations + * locations of all copies of this file part + * @param copyPaths + * the copy paths + * @param this_part + * part number + * @param num_parts + * number of parts + * @param clearport + * port number of clear communications + * @param sslport + * port number of ssl communications + * @param filter + * the file filter object + * @param fileAccessBlob + * file access token + * @param fileType + * the file type + * @param fileName + * the file name + */ + private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter, + String fileAccessBlob, FileType fileType, String fileName) { this.this_part = this_part; this.num_parts = num_parts; this.rowservicePort = clearport; this.useSSL = sslport; this.fileFilter = filter; + this.fileName=fileName; if (this.fileFilter == null) { this.fileFilter = new FileFilter(); @@ -348,6 +378,16 @@ public boolean getUseSsl() return useSSL; } + /** + * File name being read + * + * @return filename + */ + public String getFileName() + { + return fileName; + } + /** * Copy Path. * @@ -415,8 +455,7 @@ public DataPartition setFilter(FileFilter filter) public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(this.getThisPart()); - sb.append(" copy locations: {"); + sb.append("part ").append(this.getThisPart()).append(", copy locations: {"); for (int copyindex = 0; copyindex < getCopyCount(); copyindex++) { if (copyindex > 0) sb.append(", "); @@ -471,6 +510,31 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl return createPartitions(dfuparts, clusterremapper, max_parts, FileFilter.nullFilter(), fileAccessBlob, FileType.FLAT); } + + /** + * Creates the partitions. + * + * @param dfuparts + * the dfuparts + * @param clusterremapper + * the clusterremapper + * @param max_parts + * the max parts + * @param filter + * the filter + * @param fileAccessBlob + * the file access blob + * @param fileType + * the file type + * @return the data partition[] + * @throws HpccFileException + * the hpcc file exception + */ + public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter, + String fileAccessBlob, FileType fileType) throws HpccFileException { + return createPartitions(dfuparts,clusterremapper,max_parts,filter,fileAccessBlob,fileType,null); + } + /** * Creates the partitions. * @@ -486,12 +550,14 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl * the file access blob * @param fileType * the file type + * @param fileName + * the file name * @return the data partition[] * @throws HpccFileException * the hpcc file exception */ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter, - String fileAccessBlob, FileType fileType) throws HpccFileException + String fileAccessBlob, FileType fileType, String fileName) throws HpccFileException { DataPartition[] rslt = new DataPartition[dfuparts.length]; @@ -508,7 +574,7 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl DataPartition new_dp = new DataPartition(clusterremapper.reviseIPs(dfuparts[i].getCopies()), copyPaths, dfuparts[i].getPartIndex(), dfuparts.length, clusterremapper.revisePort(null), clusterremapper.getUsesSSLConnection(null), filter, fileAccessBlob, - fileType); + fileType,fileName); new_dp.isTLK = dfuparts[i].isTopLevelKey(); rslt[i] = new_dp; diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java index 8df2ba73e..98bf4d47d 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java @@ -469,7 +469,7 @@ private void createDataParts() throws HpccFileException { ClusterRemapper clusterremapper = ClusterRemapper.makeMapper(clusterRemapInfo, fileinfoforread); this.dataParts = DataPartition.createPartitions(fileinfoforread.getFileParts(), clusterremapper, - /* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType); + /* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType,this.getFileName()); // Check to see if this file has a TLK. The TLK will always be the last partition. // If we do have a TLK remove it from the standard list of data partitions. diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index 1a4d0dc54..a1d161fe3 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -193,7 +193,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * @throws Exception * general exception */ - public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception + public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception { this.handlePrefetch = createPrefetchThread; this.originalRecordDef = originalRD; @@ -280,8 +280,8 @@ private boolean retryRead() try { this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, - this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread, - this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs); + this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread, + this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) { @@ -434,7 +434,7 @@ public boolean hasNext() if (!retryRead()) { canReadNext = false; - log.error("Read failure for " + this.dataPartition.toString(), e); + log.error("Read failure for " + this.dataPartition.toString() +":" + e.getMessage(),e); java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage()); exception.initCause(e); throw exception; @@ -505,7 +505,7 @@ public void close() throws Exception long closeTimeMs = System.currentTimeMillis(); double readTimeS = (closeTimeMs - openTimeMs) / 1000.0; - log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart() + log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart() + " for " + dataPartition.getFileName() + " read time: " + readTimeS + "s " + " records read: " + recordsRead); } @@ -550,8 +550,8 @@ public void report() { if (getRemoteReadMessageCount() > 0) { - log.warn("DataPartition '" + this.dataPartition + "' read operation messages:\n"); + log.warn("DataPartition '" + this.dataPartition + "' read operation messages for " + dataPartition.getFileName() + ":\n"); log.warn(getRemoteReadMessages()); } } -} +} \ No newline at end of file diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index ab3c38a6e..2399ef1e6 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -61,7 +61,6 @@ public class RowServiceInputStream extends InputStream implements IProfilable private String projectedJsonRecordDefinition = null; private java.io.DataInputStream dis = null; private java.io.DataOutputStream dos = null; - private String rowServiceVersion = ""; private int filePartCopyIndexPointer = 0; //pointer into the prioritizedCopyIndexes struct @@ -370,7 +369,8 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.tokenBin = restartInfo.tokenBin; this.streamPos = restartInfo.streamPos; this.streamPosOfFetchStart = this.streamPos; - } + } + String prefix = "RowServiceInputStream constructor, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (inFetchingMode == false) { @@ -389,7 +389,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co } catch (Exception e) { - prefetchException = new HpccFileException("Error while batch fetch warm starting: " + e.getMessage()); + prefetchException = new HpccFileException(prefix + "Error while batch fetch warm starting: " + e.getMessage()); } blockingRequestFinished.set(true); @@ -734,6 +734,8 @@ private int startFetch() { return -1; } + String prefix = "RowServiceInputStream.startFetch(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + //------------------------------------------------------------------------------ // If we haven't made the connection active, activate it now and send the @@ -779,7 +781,7 @@ private int startFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Failure sending read ahead transaction", e); + prefetchException = new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e); try { close(); @@ -814,7 +816,7 @@ private int startFetch() if (response.errorCode != RFCCodes.RFCStreamNoError) { - prefetchException = new HpccFileException(response.errorMessage); + prefetchException = new HpccFileException(prefix + response.errorMessage); try { close(); @@ -834,7 +836,7 @@ private int startFetch() } catch (IOException e) { - prefetchException = new HpccFileException(e.getMessage()); + prefetchException = new HpccFileException(prefix + "response length was < 0; error closing file:" + e.getMessage()); } return -1; } @@ -858,13 +860,13 @@ private int startFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Failed on remote read read retry", e); + prefetchException = new HpccFileException(prefix + "Failed on remote read read retry:" + e.getMessage(), e); return -1; } } else if (this.handle == 0) { - prefetchException = new HpccFileException("Read retry failed"); + prefetchException = new HpccFileException(prefix + "response.handle was null, Read retry failed"); try { close(); @@ -898,7 +900,7 @@ else if (this.handle == 0) } catch (IOException e) { - prefetchException = new HpccFileException("Error during read block", e); + prefetchException = new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e); try { close(); @@ -911,6 +913,7 @@ else if (this.handle == 0) private void readDataInFetch() { + String prefix = "RowServiceInputStream.readDataInFetch(), file " + dataPart.getFileName() + "part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (this.closed.get()) { return; @@ -948,7 +951,7 @@ private void readDataInFetch() bytesToRead = this.dis.available(); if (bytesToRead < 0) { - throw new IOException("Encountered unexpected end of stream mid fetch."); + throw new IOException(prefix + "Encountered unexpected end of stream mid fetch, this.dis.available() returned " + bytesToRead + " bytes."); } // Either due to a bug in the JVM or due to a design issue @@ -966,7 +969,7 @@ private void readDataInFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Error during read block", e); + prefetchException = new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e); try { close(); @@ -990,6 +993,7 @@ private void readDataInFetch() private void finishFetch() { + String prefix = "RowServiceInputStream.finishFetch(), file " + dataPart.getFileName() + "part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (this.closed.get()) { return; @@ -1026,7 +1030,7 @@ private void finishFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Error during read block", e); + prefetchException = new HpccFileException(prefix + "Error during finish request read block: " + e.getMessage(), e); try { close(); @@ -1053,7 +1057,7 @@ private void finishFetch() } catch (IOException e) { - prefetchException = new HpccFileException("Failure sending read ahead transaction", e); + prefetchException = new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e); try { close(); @@ -1203,12 +1207,14 @@ private void compactBuffer() @Override public int available() throws IOException { + String prefix = "RowServiceInputStream.available(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + // Do the check for closed first here to avoid data races if (this.closed.get()) { if (this.prefetchException != null) { - throw new IOException("Prefetch thread exited early exception.", this.prefetchException); + throw new IOException("Prefetch thread exited early exception:" + prefetchException.getMessage(), this.prefetchException); } int bufferLen = this.readBufferDataLen.get(); @@ -1216,7 +1222,7 @@ public int available() throws IOException if (availBytes == 0) { // this.bufferWriteMutex.release(); - throw new IOException("End of input stream."); + throw new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0"); } } @@ -1338,7 +1344,7 @@ public int read() throws IOException { if (this.prefetchException != null) { - throw new IOException(this.prefetchException.getMessage()); + throw new IOException(this.prefetchException.getMessage(),this.prefetchException); } // We are waiting on a single byte so hot loop @@ -1426,7 +1432,7 @@ public int read(byte[] b, int off, int len) throws IOException { if (this.prefetchException != null) { - throw new IOException(this.prefetchException.getMessage()); + throw new IOException(this.prefetchException.getMessage(),prefetchException); } int available = 0; @@ -1466,7 +1472,7 @@ public void reset() throws IOException { if (this.prefetchException != null) { - throw new IOException(this.prefetchException.getMessage()); + throw new IOException(this.prefetchException.getMessage(),prefetchException); } if (this.markPos < 0) @@ -1490,7 +1496,7 @@ public long skip(long n) throws IOException { if (this.prefetchException != null) { - throw new IOException(this.prefetchException.getMessage()); + throw new IOException(this.prefetchException.getMessage(),prefetchException); } // Have to read the data if we need to skip @@ -1550,6 +1556,7 @@ private void makeActive() throws HpccFileException { this.active.set(false); this.handle = 0; + String prefix = "RowServiceInputStream.makeActive, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; boolean needsRetry = false; do @@ -1597,11 +1604,11 @@ private void makeActive() throws HpccFileException } catch (java.net.UnknownHostException e) { - throw new HpccFileException("Bad file part IP address or host name: " + this.getIP(), e); + throw new HpccFileException(prefix + "Bad file part IP address or host name: " + e.getMessage(),e); } catch (java.io.IOException e) { - throw new HpccFileException(e); + throw new HpccFileException(prefix + " error making part active:" + e.getMessage(),e); } try @@ -1611,7 +1618,7 @@ private void makeActive() throws HpccFileException } catch (java.io.IOException e) { - throw new HpccFileException("Failed to create streams", e); + throw new HpccFileException(prefix + " Failed to make streams for datapart:" + e.getMessage(), e); } //------------------------------------------------------------------------------ @@ -1629,7 +1636,7 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Failed on initial remote read trans", e); + throw new HpccFileException(prefix+ " Failed on initial remote read transfer: " + e.getMessage(),e); } RowServiceResponse response = readResponse(); @@ -1648,7 +1655,7 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Error while attempting to read version response.", e); + throw new HpccFileException(prefix + "Error while attempting to read version response:" + e.getMessage(), e); } rowServiceVersion = new String(versionBytes, HPCCCharSet); @@ -1678,7 +1685,7 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Failed on initial remote read read trans", e); + throw new HpccFileException(prefix + " Failed on initial remote read read trans:" + e.getMessage(), e); } if (CompileTimeConstants.PROFILE_CODE) @@ -1690,14 +1697,12 @@ private void makeActive() throws HpccFileException } catch (Exception e) { - log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() - + "'"); - log.error(e.getMessage()); + log.error(prefix + ": Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + ":" + e.getMessage(),e); needsRetry = true; if (!setNextFilePartCopy()) { - throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e); + throw new HpccFileException(prefix + " Unsuccessfuly attempted to connect to all file part copies", e); } } } while (needsRetry); @@ -2111,6 +2116,8 @@ private String makeCloseHandleRequest() private void sendCloseFileRequest() throws IOException { + String prefix = "RowServiceInputStream.sendCloseFileRequest(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + if (useOldProtocol) { return; @@ -2129,7 +2136,7 @@ private void sendCloseFileRequest() throws IOException } catch (IOException e) { - throw new IOException("Failed on close file with error: ", e); + throw new IOException(prefix + " Failed on close file with error: " + e.getMessage(), e); } try @@ -2138,13 +2145,14 @@ private void sendCloseFileRequest() throws IOException } catch (HpccFileException e) { - throw new IOException("Failed to close file. Unable to read response with error: ", e); + throw new IOException(prefix + "Failed to close file. Unable to read response with error: " + e.getMessage(), e); } } private RowServiceResponse readResponse() throws HpccFileException { RowServiceResponse response = new RowServiceResponse(); + String prefix="RowServiceInputStream.readResponse(): , file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ": "; try { response.len = dis.readInt(); @@ -2187,7 +2195,7 @@ private RowServiceResponse readResponse() throws HpccFileException sb.append("\nInvalid file access expiry reported - change File Access Expiry (HPCCFile) and retry"); break; case RFCCodes.DAFSERR_cmdstream_authexpired: - sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile)"); + sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile) to something greater than " + this.socketOpTimeoutMs); break; default: break; @@ -2200,7 +2208,7 @@ private RowServiceResponse readResponse() throws HpccFileException if (response.len < 4) { - throw new HpccFileException("Early data termination, no handle"); + throw new HpccFileException(prefix + "Early data termination, no handle. response length < 4"); } response.handle = dis.readInt(); @@ -2208,7 +2216,7 @@ private RowServiceResponse readResponse() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Error while attempting to read row service response: ", e); + throw new HpccFileException(prefix + "Error while attempting to read row service response: " + e.getMessage(), e); } return response; diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java index f6ea288cd..be85d89e8 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java @@ -438,7 +438,7 @@ private RowServiceResponse readResponse() throws HpccFileException sb.append("\nInvalid file access expiry reported - change File Access Expiry (HPCCFile) and retry"); break; case RFCCodes.DAFSERR_cmdstream_authexpired: - sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile)"); + sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile) to something greater than " + this.sockOpTimeoutMs); break; default: break; diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java index 896fa35c0..7da3d72aa 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java @@ -57,6 +57,8 @@ import org.junit.runners.MethodSorters; import org.junit.experimental.categories.Category; +import static org.hpccsystems.dfs.client.HpccRemoteFileReader.DEFAULT_READ_SIZE_OPTION; +import static org.hpccsystems.dfs.client.HpccRemoteFileReader.NO_RECORD_LIMIT; import static org.junit.Assert.*; @Category(org.hpccsystems.commons.annotations.RemoteTests.class) @@ -64,7 +66,11 @@ public class DFSReadWriteTest extends BaseRemoteTest { private static final String[] datasets = { "~benchmark::integer::20kb", "~unit_test::all_types::thor", "~unit_test::all_types::xml", "~unit_test::all_types::json", "~unit_test::all_types::csv" }; - private static final int[] expectedCounts = { 1250, 10000, 10000, 10000, 10000, 10000}; + private static final int[] expectedCounts = { 1250, 5600, 10000, 10000, 10000, 10000}; + + //use until standard test is working + // private static final String[] datasets = { "~benchmark::integer::20kb", "~benchmark::all_types::200kb"};//, "~unit_test::all_types::xml", "~unit_test::all_types::json", "~unit_test::all_types::csv" }; + // private static final int[] expectedCounts = { 1250, 5600 };//, 10000, 10000, 10000, 10000}; private static final Version newProtocolVersion = new Version(8,12,10); @@ -301,7 +307,8 @@ public void readResumeTest() throws Exception for (int i = 0; i < resumeInfo.size(); i++) { HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition()); - HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[resumeFilePart.get(i)], originalRD, recordBuilder, -1, -1, true, readSizeKB, resumeInfo.get(i)); + HpccRemoteFileReader fileReader = new HpccRemoteFileReader( + fileParts[resumeFilePart.get(i)], originalRD, recordBuilder, 1000000, -1, true, readSizeKB, resumeInfo.get(i)); if (fileReader.hasNext()) { @@ -1407,7 +1414,7 @@ public List readFile(HPCCFile file, Integer connectTimeoutMillis, bo try { HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition()); - HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[i], originalRD, recordBuilder); + HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[i], originalRD, recordBuilder, RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS, NO_RECORD_LIMIT, true, DEFAULT_READ_SIZE_OPTION,null,RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS); fileReader.getRecordReader().setUseDecimalForUnsigned8(useDecimalForUnsigned8); fileReader.getRecordReader().setStringProcessingFlags(stringProcessingFlags); fileReaders.add(fileReader); diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java index 1e3cd5859..f01883489 100644 --- a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java +++ b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsTopologyClient.java @@ -319,7 +319,7 @@ public List getTopologyGroups(String kind) throws HpccContaineri } catch (RemoteException e) { - throw new Exception("HPCCWsTopologyClient.getTopologyGroups(kind) encountered RemoteException.", e); + throw new Exception("HPCCWsTopologyClient.getTopologyGroups(kind) encountered RemoteException for topology kind " + kind, e); } if (response == null) diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java index 3c7b74424..829644a84 100644 --- a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java +++ b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java @@ -271,7 +271,7 @@ public void initWsWorkUnitsClientStub(Connection conn) } catch (AxisFault e) { - initErrMessage += "\nHPCCWsWorkUnitsClient: Could not retrieve version appropriate stub objct"; + initErrMessage += "\nHPCCWsWorkUnitsClient: Could not retrieve version appropriate stub object:" + e.getMessage(); } } else diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java index 402e2782d..ac571bb24 100644 --- a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java @@ -212,6 +212,7 @@ public boolean verify(String hostname,javax.net.ssl.SSLSession sslSession) } catch (Exception e) { + e.printStackTrace(); Assert.fail("Error executing test data generation scripts with error: " + e.getMessage()); } }