Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.8.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gordon Smith <[email protected]>

# Conflicts:
#	commons-hpcc/pom.xml
#	dfsclient/pom.xml
#	pom.xml
#	wsclient/pom.xml
  • Loading branch information
GordonSmith committed Jul 5, 2024
2 parents 520d0c0 + 2cfbaad commit 1812eb1
Show file tree
Hide file tree
Showing 8 changed files with 523 additions and 51 deletions.
19 changes: 15 additions & 4 deletions .github/workflows/Jirabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,17 @@ jobs:
import time
import sys
import json
import subprocess
from email.utils import parseaddr
from atlassian.jira import Jira
def sanitizeInput(input: str, inputType: str) -> str:
if inputType.lower() == 'email':
# Return the email address only, returns '' if not valid or found
return parseaddr(input)[1]
else:
return ''
def updateIssue(jira, issue, prAuthor : str, propertyMap: dict, pull_url: str) -> str:
result = ''
Expand Down Expand Up @@ -83,8 +92,12 @@ jobs:
assigneeId = assignee['accountId']
assigneeEmail = assignee["emailAddress"]
assigneeEmail = sanitizeInput(assigneeEmail, 'email')
prAuthorId = prAuthor["accountId"]
prAuthorEmail = prAuthor["emailAddress"]
prAuthorEmail = sanitizeInput(prAuthorEmail, 'email')
if assigneeId is None or assigneeId == '':
jira.assign_issue(issueName, prAuthorId)
result += 'Assigning user: ' + prAuthorEmail + '\n'
Expand All @@ -104,7 +117,6 @@ jobs:
github_token = os.environ['GITHUB_TOKEN']
comments_url = os.environ['COMMENTS_URL']
print("%s %s %s" % (title, prAuthor, comments_url))
result = ''
issuem = re.search("(HPCC4J|JAPI)-[0-9]+", title)
if issuem:
Expand All @@ -126,7 +138,7 @@ jobs:
if userSearchResults and len(userSearchResults) > 0:
jiraUser = userSearchResults[0]
else:
print('Error: Unable to find Jira user: ' + prAuthor + ' continuing without assigning')
print('Error: Unable to map GitHub user to Jira user, continuing without assigning')
if not jira.issue_exists(issue_name):
sys.exit('Error: Unable to find Jira issue: ' + issue_name)
Expand All @@ -148,8 +160,7 @@ jobs:
# Escape the result for JSON
result = json.dumps(result)
curlCommand = 'curl -X POST %s -H "Content-Type: application/json" -H "Authorization: token %s" --data \'{ "body": %s }\'' % ( comments_url, github_token, result )
os.system(curlCommand)
subprocess.run(['curl', '-X', 'POST', comments_url, '-H', 'Content-Type: application/json', '-H', f'Authorization: token {github_token}', '--data', f'{{ "body": {result} }}'], check=True)
else:
print('Unable to find Jira issue name in title')
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/JirabotMerge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ jobs:
comments_url = os.environ['COMMENTS_URL']
project_name = 'HPCC4J'
print("Attempting to close out Jira issue: %s %s %s" % (title, user, comments_url))
result = ''
issuem = re.search("(" + project_name + ")-[0-9]+", title, re.IGNORECASE)
if issuem:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@

import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;

import org.hpccsystems.dfs.client.RowServiceOutputStream;
import org.hpccsystems.dfs.client.Utils;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand All @@ -37,6 +44,9 @@ public class HPCCRemoteFileWriter<T>
private long recordsWritten = 0;
private long openTimeMs = 0;

private Span writeSpan = null;
private String writeSpanName = null;

/**
* A remote file writer.
*
Expand Down Expand Up @@ -105,9 +115,24 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso

this.recordAccessor = recordAccessor;

this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart();
this.writeSpan = Utils.createSpan(writeSpanName);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
if (dp.getCopyCount() > 1)
{
secondaryIP = dp.getCopyIP(1);
}

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.primary.address"), primaryIP,
AttributeKey.stringKey("server.secondary.address"), secondaryIP,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()));
writeSpan.setAllAttributes(attributes);

this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(),
dataPartition.getFileAccessBlob(), this.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0),
fileCompression, connectTimeoutMs, socketOpTimeoutMs);
fileCompression, connectTimeoutMs, socketOpTimeoutMs, this.writeSpan);

this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream);
this.binaryRecordWriter.initialize(this.recordAccessor);
Expand Down Expand Up @@ -161,6 +186,8 @@ public void close() throws Exception
this.report();
this.binaryRecordWriter.finalize();

this.writeSpan.end();

long closeTimeMs = System.currentTimeMillis();
double writeTimeS = (closeTimeMs - openTimeMs) / 1000.0;
log.info("HPCCRemoteFileWriter: Closing file part: " + dataPartition.getThisPart()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@
*******************************************************************************/
package org.hpccsystems.dfs.client;

import org.hpccsystems.dfs.client.Utils;

import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
import org.hpccsystems.commons.errors.HpccFileException;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

Expand Down Expand Up @@ -48,6 +56,9 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
private long openTimeMs = 0;
private long recordsRead = 0;

private Span readSpan = null;
private String readSpanName = null;

public static final int NO_RECORD_LIMIT = -1;
public static final int DEFAULT_READ_SIZE_OPTION = -1;
public static final int DEFAULT_CONNECT_TIMEOUT_OPTION = -1;
Expand Down Expand Up @@ -204,6 +215,22 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
this.createPrefetchThread = createPrefetchThread;
this.socketOpTimeoutMs = socketOpTimeoutMs;

this.readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dp.getFileName() + "_" + dp.getThisPart();
this.readSpan = Utils.createSpan(readSpanName);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
if (dp.getCopyCount() > 1)
{
secondaryIP = dp.getCopyIP(1);
}

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.primary.address"), primaryIP,
AttributeKey.stringKey("server.secondary.address"), secondaryIP,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()),
AttributeKey.longKey("read.size"), Long.valueOf(readSizeKB*1000));
readSpan.setAllAttributes(attributes);

if (connectTimeout < 1)
{
connectTimeout = RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS;
Expand All @@ -212,18 +239,24 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde

if (this.originalRecordDef == null)
{
throw new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required.");
Exception e = new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required.");
this.readSpan.recordException(e);
this.readSpan.end();
throw e;
}

FieldDef projectedRecordDefinition = recBuilder.getRecordDefinition();
if (projectedRecordDefinition == null)
{
throw new Exception("IRecordBuilder does not have a valid record definition.");
Exception e = new Exception("IRecordBuilder does not have a valid record definition.");
this.readSpan.recordException(e);
this.readSpan.end();
throw e;
}

if (resumeInfo == null)
{
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs);
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs, readSpan);
this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
this.binaryRecordReader.initialize(this.recordBuilder);

Expand All @@ -238,11 +271,14 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
restartInfo.streamPos = resumeInfo.inputStreamPos;
restartInfo.tokenBin = resumeInfo.tokenBin;

this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs);
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs, this.readSpan);

long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
throw new Exception("Unable to restart unexpected stream pos in record reader.");
Exception e = new Exception("Unable to restart read stream, unexpected stream position in record reader.");
this.readSpan.recordException(e);
this.readSpan.end();
}
this.inputStream.skip(bytesToSkip);

Expand Down Expand Up @@ -279,9 +315,11 @@ private boolean retryRead()

try
{
this.readSpan = Utils.createSpan(readSpanName);
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef,
this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread,
this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs);
this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs, this.readSpan);

long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Expand All @@ -294,6 +332,8 @@ private boolean retryRead()
}
catch (Exception e)
{
this.readSpan.recordException(e);
this.readSpan.end();
log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e);
return false;
}
Expand Down Expand Up @@ -499,7 +539,9 @@ public void close() throws Exception
return;
}

this.readSpan.end();
report();

this.inputStream.close();
isClosed = true;

Expand Down
Loading

0 comments on commit 1812eb1

Please sign in to comment.