Skip to content

Commit

Permalink
Merge remote-tracking branch 'alibaba/main' into go-config
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 12, 2024
2 parents 8d23744 + eaeeb4a commit 00ed039
Show file tree
Hide file tree
Showing 117 changed files with 4,399 additions and 4,247 deletions.
3 changes: 1 addition & 2 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ set(SUB_DIRECTORIES_LIST
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
prometheus prometheus/labels prometheus/schedulers prometheus/async
ebpf ebpf/observer ebpf/security ebpf/handler
parser sls_control sdk fuse
parser sls_control sdk
)
if (LINUX)
if (ENABLE_ENTERPRISE)
Expand Down Expand Up @@ -221,7 +221,6 @@ if (BUILD_LOGTAIL_SHARED_LIBRARY)
endif ()

# Generate independent libraries.
# add_subdirectory(helper)
add_subdirectory(go_pipeline)
add_subdirectory(common)

Expand Down
1 change: 0 additions & 1 deletion core/checkpoint/AdhocCheckpointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "common/Thread.h"
#include "fuse/ulogfslib_file.h"
#include "common/HashUtil.h"

DEFINE_FLAG_INT32(adhoc_checkpoint_dump_thread_wait_interval, "microseconds", 5 * 1000);
Expand Down
14 changes: 8 additions & 6 deletions core/common/HashUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
// limitations under the License.

#include "HashUtil.h"

#include <memory.h>

#include <boost/functional/hash.hpp>

#include "FileSystemUtil.h"
#include "murmurhash3.h"
#include "LogFileOperator.h"
#include <boost/functional/hash.hpp>
#include "murmurhash3.h"

namespace logtail {

Expand Down Expand Up @@ -335,9 +338,9 @@ bool CheckAndUpdateSignature(const std::string& signature, uint64_t& sigHash, ui
return rst;
}

bool CheckFileSignature(const std::string& filePath, uint64_t sigHash, uint32_t sigSize, bool fuseMode) {
bool CheckFileSignature(const std::string& filePath, uint64_t sigHash, uint32_t sigSize) {
LogFileOperator logFileOp;
logFileOp.Open(filePath.c_str(), fuseMode);
logFileOp.Open(filePath.c_str());
if (!logFileOp.IsOpen()) {
return false;
}
Expand Down Expand Up @@ -369,10 +372,9 @@ int64_t HashSignatureString(const char* str, size_t strLen) {
return *(int64_t*)hashVal;
}

void HashCombine(size_t &seed, size_t value) {
void HashCombine(size_t& seed, size_t value) {
boost::hash_combine(seed, value);
}



} // namespace logtail
4 changes: 2 additions & 2 deletions core/common/HashUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ void DoMd5(const uint8_t* poolIn, const uint64_t inputBytesNum, uint8_t md5[16])

bool SignatureToHash(const std::string& signature, uint64_t& sigHash, uint32_t& sigSize);
bool CheckAndUpdateSignature(const std::string& signature, uint64_t& sigHash, uint32_t& sigSize);
bool CheckFileSignature(const std::string& filePath, uint64_t sigHash, uint32_t sigSize, bool fuseMode = false);
bool CheckFileSignature(const std::string& filePath, uint64_t sigHash, uint32_t sigSize);

int64_t HashString(const std::string& str);
int64_t HashSignatureString(const char* str, size_t strLen);

void HashCombine(size_t &seed, size_t value);
void HashCombine(size_t& seed, size_t value);

} // namespace logtail
194 changes: 67 additions & 127 deletions core/common/LogFileOperator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,167 +14,115 @@

#include "LogFileOperator.h"
#if defined(_MSC_VER)
#include <io.h>
#include <fcntl.h>
#include <io.h>
#endif
#include "FileSystemUtil.h"
#include "fuse/ulogfslib_file.h"

namespace logtail {

int LogFileOperator::Open(const char* path, bool fuseMode) {
int LogFileOperator::Open(const char* path) {
if (!path || IsOpen()) {
return -1;
}
mFuseMode = fuseMode;

if (mFuseMode) {
mFd = ulogfs_open(path);
} else {
#if defined(_MSC_VER)
auto hFile = CreateFile(path,
GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
NULL);
if (INVALID_HANDLE_VALUE == hFile) {
return -1;
}
mFile = hFile;
// Might conflict, but can make sure that mFd >= 0.
mFd = (reinterpret_cast<std::intptr_t>(hFile)) & std::numeric_limits<int>::max();
auto hFile = CreateFile(path,
GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
NULL);
if (INVALID_HANDLE_VALUE == hFile) {
return -1;
}
mFile = hFile;
// Might conflict, but can make sure that mFd >= 0.
mFd = (reinterpret_cast<std::intptr_t>(hFile)) & std::numeric_limits<int>::max();
#else
mFd = open(path, O_RDONLY);
mFd = open(path, O_RDONLY);
#endif
}
return mFd;
}

int64_t LogFileOperator::Seek(int64_t offset, int whence) {
if (!IsOpen()) {
return -1;
}
if (mFuseMode) {
return ulogfs_seek(mFd, offset, whence);
} else {
#if defined(_MSC_VER)
switch (whence) {
case SEEK_CUR:
whence = FILE_CURRENT;
break;
case SEEK_SET:
whence = FILE_BEGIN;
break;
case SEEK_END:
whence = FILE_END;
break;
default:
return -1;
}
LARGE_INTEGER liPos;
liPos.QuadPart = offset;
LARGE_INTEGER liNewPos{0};
if (FALSE == SetFilePointerEx(mFile, liPos, &liNewPos, whence)) {
switch (whence) {
case SEEK_CUR:
whence = FILE_CURRENT;
break;
case SEEK_SET:
whence = FILE_BEGIN;
break;
case SEEK_END:
whence = FILE_END;
break;
default:
return -1;
}
return liNewPos.QuadPart;
}
LARGE_INTEGER liPos;
liPos.QuadPart = offset;
LARGE_INTEGER liNewPos{0};
if (FALSE == SetFilePointerEx(mFile, liPos, &liNewPos, whence)) {
return -1;
}
return liNewPos.QuadPart;
#else
return lseek(mFd, offset, whence);
return lseek(mFd, offset, whence);
#endif
}
}

int LogFileOperator::Stat(fsutil::PathStat& ps) const {
if (!IsOpen()) {
return -1;
}

if (mFuseMode) {
return ulogfs_stat(mFd, ps.GetRawStat());
} else {
#if defined(_MSC_VER)
return fsutil::PathStat::fstat(mFile, ps) ? 0 : -1;
return fsutil::PathStat::fstat(mFile, ps) ? 0 : -1;
#else
return fsutil::PathStat::fstat(mFd, ps) ? 0 : -1;
return fsutil::PathStat::fstat(mFd, ps) ? 0 : -1;
#endif
}
}

int LogFileOperator::Pread(void* ptr, size_t size, size_t count, int64_t offset) {
if (!ptr || !size || !count || !IsOpen()) {
return 0;
}

if (mFuseMode) {
// datadir is NULL, ulogfs will get real datadir from env
return ulogfs_pread2(mFd, NULL, ptr, size * count, (off_t*)&offset);
} else {
#if defined(_MSC_VER)
LARGE_INTEGER liPos;
liPos.QuadPart = offset;
if (FALSE == SetFilePointerEx(mFile, liPos, NULL, FILE_BEGIN)) {
return 0;
}
DWORD dwRead = 0;
if (FALSE == ::ReadFile(mFile, ptr, size * count, &dwRead, NULL)) {
return 0;
}
return static_cast<int>(dwRead);
#else
return pread(mFd, ptr, size * count, offset);
#endif
}
}

size_t LogFileOperator::SkipHoleRead(void* ptr, size_t size, size_t count, int64_t* offset) {
if (!mFuseMode || !ptr || !size || !count || !IsOpen()) {
LARGE_INTEGER liPos;
liPos.QuadPart = offset;
if (FALSE == SetFilePointerEx(mFile, liPos, NULL, FILE_BEGIN)) {
return 0;
}

int64_t off = *offset;
int nBytes = ulogfs_pread2(mFd, NULL, ptr, (int)(size * count), (off_t*)&off);
if (nBytes <= 0) {
return nBytes;
}

auto readBytes = (size_t)nBytes;

// if off == *offset, no hole no extra handle
// if off > *offset, there is a hole
if (off > *offset) {
if (off > *offset + nBytes) {
readBytes = 0;
} else {
readBytes = *offset + nBytes - off;
memmove(ptr, ((char*)ptr + (off - *offset)), readBytes);
}

*offset = off;
DWORD dwRead = 0;
if (FALSE == ::ReadFile(mFile, ptr, size * count, &dwRead, NULL)) {
return 0;
}

return readBytes;
return static_cast<int>(dwRead);
#else
return pread(mFd, ptr, size * count, offset);
#endif
}

int64_t LogFileOperator::GetFileSize() const {
if (!IsOpen()) {
return -1;
}

if (mFuseMode) {
return static_cast<int64_t>(ulogfs_tell(mFd));
} else {
#if defined(_MSC_VER)
LARGE_INTEGER liSize{0};
if (FALSE == GetFileSizeEx(mFile, &liSize)) {
return -1;
}
return static_cast<int64_t>(liSize.QuadPart);
LARGE_INTEGER liSize{0};
if (FALSE == GetFileSizeEx(mFile, &liSize)) {
return -1;
}
return static_cast<int64_t>(liSize.QuadPart);
#else
return static_cast<int64_t>(lseek(mFd, 0, SEEK_END));
return static_cast<int64_t>(lseek(mFd, 0, SEEK_END));
#endif
}
}

bool LogFileOperator::IsOpen() const {
Expand All @@ -187,16 +135,12 @@ int LogFileOperator::Close() {
}

int ret = 0;
if (mFuseMode) {
ret = ulogfs_close(mFd);
} else {
#if defined(_MSC_VER)
ret = (TRUE == CloseHandle(mFile)) ? 0 : -1;
mFile = INVALID_HANDLE_VALUE;
ret = (TRUE == CloseHandle(mFile)) ? 0 : -1;
mFile = INVALID_HANDLE_VALUE;
#else
ret = close(mFd);
ret = close(mFd);
#endif
}
mFd = -1;
return ret;
}
Expand All @@ -210,23 +154,19 @@ std::string LogFileOperator::GetFilePath() const {
return "";
}

if (mFuseMode) {
return GetFdPath(mFd);
} else {
#if defined(_MSC_VER)
char filePath[MAX_PATH + 1];
auto ret = GetFinalPathNameByHandle(mFile, filePath, MAX_PATH + 1, VOLUME_NAME_DOS);
if (ret > MAX_PATH || ret <= 0) {
return "";
}
if (0 == memcmp(filePath, "\\\\?\\", 4)) {
return std::string(filePath + 4);
}
return std::string(filePath);
char filePath[MAX_PATH + 1];
auto ret = GetFinalPathNameByHandle(mFile, filePath, MAX_PATH + 1, VOLUME_NAME_DOS);
if (ret > MAX_PATH || ret <= 0) {
return "";
}
if (0 == memcmp(filePath, "\\\\?\\", 4)) {
return std::string(filePath + 4);
}
return std::string(filePath);
#else
return GetFdPath(mFd);
return GetFdPath(mFd);
#endif
}
}

} // namespace logtail
8 changes: 2 additions & 6 deletions core/common/LogFileOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,19 @@ namespace fsutil {

class LogFileOperator {
public:
LogFileOperator(bool fuseMode = false) : mFuseMode(fuseMode) {}
LogFileOperator() = default;
~LogFileOperator() { Close(); }

// @return file descriptor on Linux.
// A positive identifier is returned on Windows.
int Open(const char* path, bool fuseMode = false);
int Open(const char* path);

int64_t Seek(int64_t offset, int whence);

int Stat(fsutil::PathStat& ps) const;

int Pread(void* ptr, size_t size, size_t count, int64_t offset);

// For FUSE only.
size_t SkipHoleRead(void* ptr, size_t size, size_t count, int64_t* offset);

// GetFileSize gets the size of current file.
int64_t GetFileSize() const;

Expand All @@ -77,7 +74,6 @@ class LogFileOperator {
HANDLE mFile = INVALID_HANDLE_VALUE;
#endif
int mFd = -1;
bool mFuseMode;

#ifdef APSARA_UNIT_TEST_MAIN
friend class LogFileOperatorUnittest;
Expand Down
Loading

0 comments on commit 00ed039

Please sign in to comment.