Skip to content

Commit

Permalink
fix: Resolve issue with ProcessorParseDelimiterNative capturing next …
Browse files Browse the repository at this point in the history
…line data (#1250)

* std::search
This fixes ProcessorParseDelimiterNative mistakenly pulling content from the following line. The root cause was that `strstr` takes no length argument and keeps scanning until it reaches a NUL terminator, so it could find a separator that lies beyond the end of the current line. The lookup now uses `std::search` bounded to the current line's range, so only the intended content is parsed.
  • Loading branch information
quzard authored and yyuuttaaoo committed Dec 7, 2023
1 parent 5a9fcc2 commit 4bfdccd
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 10 deletions.
13 changes: 10 additions & 3 deletions core/processor/ProcessorParseDelimiterNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
*/

#include "processor/ProcessorParseDelimiterNative.h"

#include "common/Constants.h"
#include "models/LogEvent.h"
#include "monitor/MetricConstants.h"
#include "parser/LogParser.h"
#include "plugin/instance/ProcessorInstance.h"
#include "monitor/MetricConstants.h"


namespace logtail {
Expand Down Expand Up @@ -247,8 +248,14 @@ bool ProcessorParseDelimiterNative::SplitString(
size_t pos = begIdx;
size_t top = endIdx - d_size;
while (pos <= top) {
const char* pch = strstr(buffer + pos, mSeparator.c_str());
size_t pos2 = pch == NULL ? endIdx : (pch - buffer);
const char* pch = std::search(buffer + pos, buffer + endIdx, mSeparator.begin(), mSeparator.end());
size_t pos2;
// if not found, pos2 = endIdx
if (pch == buffer + endIdx) {
pos2 = endIdx;
} else {
pos2 = pch - buffer;
}
if (pos2 != pos) {
colBegIdxs.push_back(pos);
colLens.push_back(pos2 - pos);
Expand Down
6 changes: 4 additions & 2 deletions core/processor/ProcessorSplitLogStringNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
* limitations under the License.
*/

#include "plugin/interface/Processor.h"
#include <string>
#include <boost/regex.hpp>
#include <string>

#include "plugin/interface/Processor.h"

namespace logtail {

Expand All @@ -41,6 +42,7 @@ class ProcessorSplitLogStringNative : public Processor {
bool mEnableLogPositionMeta = false;
#ifdef APSARA_UNIT_TEST_MAIN
friend class ProcessorRegexStringNativeUnittest;
friend class ProcessorParseDelimiterNativeUnittest;
#endif
};

Expand Down
91 changes: 86 additions & 5 deletions core/unittest/processor/ProcessorParseDelimiterNativeUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
// limitations under the License.

#include <cstdlib>
#include "unittest/Unittest.h"

#include "common/JsonUtil.h"
#include "config/Config.h"
#include "processor/ProcessorParseDelimiterNative.h"
#include "models/LogEvent.h"
#include "plugin/instance/ProcessorInstance.h"
#include "processor/ProcessorParseDelimiterNative.h"
#include "processor/ProcessorSplitLogStringNative.h"
#include "unittest/Unittest.h"

namespace logtail {

Expand All @@ -34,6 +35,7 @@ class ProcessorParseDelimiterNativeUnittest : public ::testing::Test {
}

void TestInit();
void TestMultipleLines();
void TestProcessWholeLine();
void TestProcessQuote();
void TestProcessKeyOverwritten();
Expand All @@ -46,6 +48,7 @@ class ProcessorParseDelimiterNativeUnittest : public ::testing::Test {
};

UNIT_TEST_CASE(ProcessorParseDelimiterNativeUnittest, TestInit);
UNIT_TEST_CASE(ProcessorParseDelimiterNativeUnittest, TestMultipleLines);
UNIT_TEST_CASE(ProcessorParseDelimiterNativeUnittest, TestProcessWholeLine);
UNIT_TEST_CASE(ProcessorParseDelimiterNativeUnittest, TestProcessQuote);
UNIT_TEST_CASE(ProcessorParseDelimiterNativeUnittest, TestProcessKeyOverwritten);
Expand All @@ -70,6 +73,84 @@ void ProcessorParseDelimiterNativeUnittest::TestInit() {
APSARA_TEST_TRUE_FATAL(processorInstance.Init(componentConfig, mContext));
}

void ProcessorParseDelimiterNativeUnittest::TestMultipleLines() {
// make config
Config config;
config.mLogType = DELIMITER_LOG;
config.mLogBeginReg = "";
config.mAdvancedConfig.mEnableLogPositionMeta = false;
config.mSeparator = "@@";
config.mQuote = '\000';
config.mColumnKeys = {"a", "b", "c"};
config.mDiscardUnmatch = true;
config.mUploadRawLog = false;
config.mAdvancedConfig.mRawLogTag = "__raw__";
// make events
auto sourceBuffer = std::make_shared<SourceBuffer>();
PipelineEventGroup eventGroup(sourceBuffer);
std::string inJson = R"({
"events" :
[
{
"contents" :
{
"content" : "123@@456@@789
012@@345@@678
",
"log.file.offset": "0"
},
"timestamp" : 12345678901,
"type" : 1
}
]
})";
eventGroup.FromJsonString(inJson);
// run function ProcessorSplitLogStringNative
ProcessorSplitLogStringNative processorSplitLogStringNative;
processorSplitLogStringNative.SetContext(mContext);
std::string pluginId = "testID";
ComponentConfig componentConfig(pluginId, config);
APSARA_TEST_TRUE_FATAL(processorSplitLogStringNative.Init(componentConfig));
processorSplitLogStringNative.Process(eventGroup);
// run function ProcessorParseDelimiterNative
ProcessorParseDelimiterNative& processorDelimiterNative = *(new ProcessorParseDelimiterNative);
ProcessorInstance processorInstance(&processorDelimiterNative, pluginId);
APSARA_TEST_TRUE_FATAL(processorInstance.Init(componentConfig, mContext));
processorDelimiterNative.Process(eventGroup);
std::string expectJson = R"({
"events" :
[
{
"contents" :
{
"a": "123",
"b": "456",
"c": "789",
"log.file.offset": "0"
},
"timestamp" : 12345678901,
"timestampNanosecond": 0,
"type" : 1
},
{
"contents" :
{
"a": "012",
"b": "345",
"c": "678",
"log.file.offset": "0"
},
"timestamp" : 12345678901,
"timestampNanosecond": 0,
"type" : 1
}
]
})";
// judge result
std::string outJson = eventGroup.ToJsonString();
APSARA_TEST_STREQ_FATAL(CompactJson(expectJson).c_str(), CompactJson(outJson).c_str());
}

void ProcessorParseDelimiterNativeUnittest::TestProcessWholeLine() {
// make config
Config config;
Expand Down Expand Up @@ -558,10 +639,10 @@ void ProcessorParseDelimiterNativeUnittest::TestProcessEventKeepUnmatch() {
APSARA_TEST_EQUAL_FATAL(count, processor.GetContext().GetProcessProfile().parseFailures);
APSARA_TEST_EQUAL_FATAL(count, processorInstance.mProcInRecordsTotal->GetValue());
std::string expectValue = "value1";
APSARA_TEST_EQUAL_FATAL((expectValue.length())*count, processor.mProcParseInSizeBytes->GetValue());
APSARA_TEST_EQUAL_FATAL((expectValue.length()) * count, processor.mProcParseInSizeBytes->GetValue());
APSARA_TEST_EQUAL_FATAL(count, processorInstance.mProcOutRecordsTotal->GetValue());
expectValue = "__raw_log__value1";
APSARA_TEST_EQUAL_FATAL((expectValue.length())*count, processor.mProcParseOutSizeBytes->GetValue());
APSARA_TEST_EQUAL_FATAL((expectValue.length()) * count, processor.mProcParseOutSizeBytes->GetValue());

APSARA_TEST_EQUAL_FATAL(0, processor.mProcDiscardRecordsTotal->GetValue());

Expand Down Expand Up @@ -646,7 +727,7 @@ void ProcessorParseDelimiterNativeUnittest::TestProcessEventDiscardUnmatch() {
APSARA_TEST_EQUAL_FATAL(count, processor.GetContext().GetProcessProfile().parseFailures);
APSARA_TEST_EQUAL_FATAL(count, processorInstance.mProcInRecordsTotal->GetValue());
std::string expectValue = "value1";
APSARA_TEST_EQUAL_FATAL((expectValue.length())*count, processor.mProcParseInSizeBytes->GetValue());
APSARA_TEST_EQUAL_FATAL((expectValue.length()) * count, processor.mProcParseInSizeBytes->GetValue());
// discard unmatch, so output is 0
APSARA_TEST_EQUAL_FATAL(0, processorInstance.mProcOutRecordsTotal->GetValue());
APSARA_TEST_EQUAL_FATAL(0, processor.mProcParseOutSizeBytes->GetValue());
Expand Down

0 comments on commit 4bfdccd

Please sign in to comment.