Skip to content

Commit

Permalink
Merge pull request #307 from NigelWu95/dev
Browse files Browse the repository at this point in the history
fix procedure log process.
  • Loading branch information
吴炳亨 authored Nov 25, 2020
2 parents cedba91 + 76ca00f commit 0846876
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=8.4.7
version=8.4.8

package_no_test:
mvn clean package -Dmaven.test.skip=true
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.qiniu</groupId>
<artifactId>qsuits</artifactId>
<version>8.4.7</version>
<version>8.4.8</version>
<name>qsuits</name>
<description>qiniu-suits is a efficient tools for qiniu api implemented by java8.</description>
<url>https://github.com/NigelWu95/qiniu-suits-java</url>
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/com/qiniu/datasource/CloudStorageContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,7 @@ void listing(IStorageLister<E> lister) {
processorMap.put(orderStr, lineProcessor);
}
export(lister, saver, lineProcessor);
procedureLogger.info("{}-|-", lister.getPrefix());
progressMap.remove(lister.getPrefix()); // 只有 export 成功情况下才移除 record
removeRecordedPrefix(lister.getPrefix()); // 只有 export 成功情况下才移除 record
} catch (QiniuException e) {
try { FileUtils.createIfNotExists(errorLogFile); } catch (IOException ignored) {}
errorLogger.error("{}: {}, {}", lister.getPrefix(), progressMap.get(lister.getPrefix()), e.error(), e);
Expand Down Expand Up @@ -383,7 +382,7 @@ private List<IStorageLister<E>> filteredListerByPrefixes(Stream<String> prefixes
if (generated == null) return false;
else if (generated.currents().size() > 0 || generated.hasNext()) return true;
else {
progressMap.remove(generated.getPrefix());
removeRecordedPrefix(generated.getPrefix());
generated.close();
return false;
}
Expand Down Expand Up @@ -412,7 +411,7 @@ private void processNodeLister(IStorageLister<E> lister) {
if (lister.currents().size() > 0 || lister.hasNext()) {
executorPool.execute(() -> listing(lister));
} else {
progressMap.remove(lister.getPrefix());
removeRecordedPrefix(lister.getPrefix());
lister.close();
}
}
Expand Down Expand Up @@ -483,12 +482,13 @@ private List<String> checkListerInPool(List<IStorageLister<E>> listerList, int c
// lister 的 prefix 为 final 对象,不能因为 truncate 的操作之后被修改
prefix = lister.getPrefix();
nextMarker = lister.truncate();
// 防止 truncate 过程中原来的线程中丢失了 prefixAndEndedMap 的操作,这里再判断一次
endMap = prefixAndEndedMap.get(prefix);
prefixMap = new HashMap<>();
if (endMap == null) {
// 做标记,endMap 为空说明该 prefix 本就不是尾结点,重新组合列举时不能因为排序最后而加入到 prefixEndedMap 中去
prefixMap.put("remove", "remove");
} else {
// 防止 truncate 过程中原来的线程中丢失了 prefixAndEndedMap 的操作,这里再判断一次 endKey 进行更新
start = lister.currentEndKey();
if (start != null) endMap.put("start", start);
}
Expand Down Expand Up @@ -645,7 +645,7 @@ public void export() throws Exception {
if (startLister.currents().size() > 0 || startLister.hasNext()) {
listing(startLister);
} else {
progressMap.remove(startLister.getPrefix());
removeRecordedPrefix(startLister.getPrefix());
startLister.close();
}
} else {
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/qiniu/datasource/DatasourceActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,20 @@ void recordLister(String key, String record) {
procedureLogger.info("{}-|-{}", key, record);
}

void removeRecordedPrefix(String prefix) {
procedureLogger.info("{}-|-", prefix);
progressMap.remove(prefix);
}

void refreshRecordAndStatistics() {
rootLogger.info("finished count: {}.", statistics.get());
if (procedureLogFile.length() > 536870912) { // 超过 512M 就处理一次
try {
breakpointSaver.clear(breakpointFileName);
// StringBuilder record = new StringBuilder("{\"");
// progressMap.forEach((key, m) -> record.append(key).append("\":").append(m).append(",\""));
// record.delete(record.length() - 2, record.length()).append("}");
// breakpointSaver.writeToKey(breakpointFileName, record.toString().replace("\\", "\\\\"), true);
breakpointSaver.writeToKey(breakpointFileName, JsonUtils.toJsonWithoutUrlEscape(progressMap), true);
procedureLogFile.delete();
} catch (IOException e) {
Expand All @@ -125,6 +134,10 @@ void endAction() throws IOException {
}
String record = "{}";
if (progressMap.size() > 0) {
// StringBuilder re = new StringBuilder("{\"");
// progressMap.forEach((key, m) -> re.append(key).append("\":").append(m).append(",\""));
// re.delete(re.length() - 2, re.length()).append("}");
// record = re.toString().replace("\\", "\\\\");
record = JsonUtils.toJsonWithoutUrlEscape(progressMap);
breakpointSaver.writeToKey(breakpointFileName, record, true);
breakpointSaver.closeWriters();
Expand Down
88 changes: 48 additions & 40 deletions src/main/java/com/qiniu/entry/CommonParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -715,51 +715,51 @@ private void setPrivateType() throws IOException {
}

private void fromProcedureLog(String logFile, boolean withMarker, boolean withEnd) throws IOException {
String lastLine = FileUtils.lastLineOfFile(logFile);
if (lastLine != null && !"".equals(lastLine)) {
try {
try {
String lastLine = FileUtils.lastLineOfFile(logFile);
if (lastLine != null && !"".equals(lastLine)) {
parseConfigMapFromJson(JsonUtils.toJsonObject(lastLine), withMarker, withEnd);
} catch (Exception e) {
File file = new File(logFile);
FileReader fileReader = new FileReader(file);
BufferedReader bufferedReader = new BufferedReader(fileReader);
int index;
String line;
String value;
Map<String, String> map = new HashMap<>();
while ((line = bufferedReader.readLine()) != null) {
index = line.indexOf("-|-");
if (index < 0) {
try {
parseConfigMapFromJson(JsonUtils.toJsonObject(line), withMarker, withEnd);
return;
} catch (Exception exception) {
exception.printStackTrace();
}
} else {
map.put(line.substring(0, index), line.substring(index));
}
} catch (Exception e) {
File file = new File(logFile);
FileReader fileReader = new FileReader(file);
BufferedReader bufferedReader = new BufferedReader(fileReader);
int index;
String line;
String value;
Map<String, String> map = new HashMap<>();
while ((line = bufferedReader.readLine()) != null) {
index = line.indexOf("-|-");
if (index < 0) {
try {
parseConfigMapFromJson(JsonUtils.toJsonObject(line), withMarker, withEnd);
return;
} catch (Exception exception) {
exception.printStackTrace();
}
} else {
map.put(line.substring(0, index), line.substring(index + 3));
}
Map<String, String> configMap;
for (String key : map.keySet()) {
value = map.get(key);
if (!"".equals(value)) {
try {
configMap = JsonUtils.fromJson(value, map.getClass());
} catch (Exception e1) {
e1.printStackTrace();
continue;
}
pathConfigMap.put(key, configMap);
}
Map<String, String> configMap;
for (String key : map.keySet()) {
value = map.get(key);
if (!"".equals(value)) {
try {
configMap = JsonUtils.fromJson(value, map.getClass());
} catch (Exception e1) {
e1.printStackTrace();
continue;
}
pathConfigMap.put(key, configMap);
}
try {
bufferedReader.close();
fileReader.close();
} catch (IOException ioe) {
bufferedReader = null;
fileReader = null;
}
}
try {
bufferedReader.close();
fileReader.close();
} catch (IOException ioe) {
bufferedReader = null;
fileReader = null;
}
}
}
Expand Down Expand Up @@ -1489,6 +1489,10 @@ public void setBucket(String bucket) {
this.bucket = bucket;
}

public void setLogFilepath(String logFilepath) {
this.logFilepath = logFilepath;
}

public void setPathConfigMap(Map<String, Map<String, String>> pathConfigMap) {
this.pathConfigMap = pathConfigMap;
}
Expand Down Expand Up @@ -1689,6 +1693,10 @@ public String getBucket() {
return bucket;
}

public String getLogFilepath() {
return logFilepath;
}

public Map<String, Map<String, String>> getPathConfigMap() {
return pathConfigMap;
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/qiniu/util/JsonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public static String toJsonWithoutUrlEscape(Object srcObject) {
.replace("\"{}\"", "{}")
.replace("\"{\\\"", "{\"")
.replace("\\\"}\"", "\"}")
.replace("\\\":\\\"", "\":\"");
.replace("\\\":\\\"", "\":\"")
.replace("\\\",\\\"", "\",\"");
// .replace("\\\\", "\\");
}

Expand Down
8 changes: 8 additions & 0 deletions src/test/java/com/qiniu/entry/CommonParamsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.qiniu.interfaces.IEntryParam;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;

Expand All @@ -26,4 +27,11 @@ public void test() throws Exception {
Map<String,String> map = commonParams.getIndexMap();
System.out.println(map);
}

@Test
public void testFromProcedureLog() throws Exception {
CommonParams commonParams = new CommonParams();
commonParams.setLogFilepath("/Users/wubingheng/Downloads/procedure0.log.txt");
// commonParams.setPathConfigMap("", null, true, true);
}
}
2 changes: 1 addition & 1 deletion version.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=8.4.7
version=8.4.8

0 comments on commit 0846876

Please sign in to comment.