Skip to content

Commit

Permalink
XrdApps::JCache add a JournalManager and implement serving VectoRead
Browse files Browse the repository at this point in the history
also from the journal if all chunks are available
  • Loading branch information
Andreas Joachim Peters committed Jun 10, 2024
1 parent c9094f4 commit a8b3904
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 13 deletions.
39 changes: 38 additions & 1 deletion src/XrdApps/XrdClJCachePlugin/cache/Journal.hh
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
#include <stdint.h>
#include <string>
#include <mutex>
#include <map>
/*----------------------------------------------------------------------------*/

class Journal
{
static constexpr uint64_t JOURNAL_MAGIC = 0xcafecafecafecafe;
Expand Down Expand Up @@ -128,3 +128,40 @@ private:

};

class JournalManager {
private:
std::map<std::string, std::shared_ptr<Journal>> journals;
std::mutex jMutex;

public:
JournalManager() {}
virtual ~JournalManager() {}

// Attach method: creates or retrieves a Journal object by key
std::shared_ptr<Journal> attach(const std::string &key) {
std::lock_guard<std::mutex> guard(jMutex);
auto it = journals.find(key);
if (it == journals.end()) {
// Create a new Journal object if it doesn't exist
auto journal = std::make_shared<Journal>();
journals[key] = journal;
return journal;
} else {
// Return the existing Journal object
return it->second;
}
}

// Detach method: checks reference count and removes Journal object if necessary
void detach(const std::string &key) {
std::lock_guard<std::mutex> guard(jMutex);
auto it = journals.find(key);
if (it != journals.end()) {
if (it->second.use_count() == 1) {
// Only one reference exists, so erase the entry from the map
journals.erase(it);
}
// If more than one reference exists, do nothing
}
}
};
56 changes: 49 additions & 7 deletions src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
std::string XrdCl::JCacheFile::sCachePath="";
bool XrdCl::JCacheFile::sEnableJournalCache = true;
bool XrdCl::JCacheFile::sEnableVectorCache = true;
JournalManager XrdCl::JCacheFile::sJournalManager;

namespace XrdCl
{
Expand Down Expand Up @@ -128,7 +129,7 @@ JCacheFile::Close(ResponseHandler* handler,
st = XRootDStatus(stOK, 0);
}
if (sEnableJournalCache) {
pJournal.detach();
pJournal->detach();
}
} else {
st = XRootDStatus(stOK, 0);
Expand Down Expand Up @@ -172,7 +173,7 @@ JCacheFile::Read(uint64_t offset,

if (pFile) {
if (sEnableJournalCache && AttachForRead()) {
auto rb = pJournal.pread(buffer, size, offset);
auto rb = pJournal->pread(buffer, size, offset);
if (rb == size) {
pStats.bytesCached += rb;
pStats.readOps++;
Expand All @@ -187,7 +188,7 @@ JCacheFile::Read(uint64_t offset,
}
}

auto jhandler = new JCacheReadHandler(handler, &pStats.bytesRead,sEnableJournalCache?&pJournal:nullptr);
auto jhandler = new JCacheReadHandler(handler, &pStats.bytesRead,sEnableJournalCache?pJournal.get():nullptr);
pStats.readOps++;
st = pFile->Read(offset, size, buffer, jhandler, timeout);
} else {
Expand Down Expand Up @@ -231,7 +232,7 @@ JCacheFile::PgRead( uint64_t offset,
XRootDStatus st;
if (pFile) {
if (sEnableJournalCache && AttachForRead()) {
auto rb = pJournal.pread(buffer, size, offset);
auto rb = pJournal->pread(buffer, size, offset);
if (rb == size) {
pStats.bytesCached += rb;
pStats.readOps++;
Expand All @@ -246,7 +247,7 @@ JCacheFile::PgRead( uint64_t offset,
}
}

auto jhandler = new JCachePgReadHandler(handler, &pStats.bytesRead,sEnableJournalCache?&pJournal:nullptr);
auto jhandler = new JCachePgReadHandler(handler, &pStats.bytesRead,sEnableJournalCache?pJournal.get():nullptr);
pStats.readOps++;
st = pFile->PgRead(offset, size, buffer, jhandler, timeout);
} else {
Expand Down Expand Up @@ -347,11 +348,50 @@ JCacheFile::VectorRead(const ChunkList& chunks,
vResp = chunks;
obj->Set(vReadInfo);
handler->HandleResponse(ret_st, obj);
pStats.readVOps++;
pStats.readVreadOps += chunks.size();
pStats.bytesCachedV += len;
return st;
}
} else {
if (sEnableJournalCache) {
bool inJournal = true;
size_t len = 0;
// try to get chunks from journal cache
for (auto it = chunks.begin(); it != chunks.end(); ++it) {
auto rb = pJournal->pread(it->buffer, it->length, it->offset);
if (rb != it->length) {
// interrupt if we miss a piece and go remote
inJournal = false;
break;
} else {
len += it->length;
}
}
if (inJournal) {
// we found everything in the journal
pStats.readVOps++;
pStats.readVreadOps += chunks.size();
pStats.bytesCachedV += len;
XRootDStatus* ret_st = new XRootDStatus(st);
*ret_st = XRootDStatus(stOK, 0);
AnyObject* obj = new AnyObject();
VectorReadInfo* vReadInfo = new VectorReadInfo();
vReadInfo->SetSize(len);
ChunkList& vResp = vReadInfo->GetChunks();
vResp = chunks;
obj->Set(vReadInfo);
handler->HandleResponse(ret_st, obj);
pStats.readVOps++;
pStats.readVreadOps += chunks.size();
pStats.bytesCachedV += len;
return st;
}
}
}


auto jhandler = new JCacheReadVHandler(handler, &pStats.bytesReadV,sEnableJournalCache?&pJournal:nullptr, buffer?(char*)buffer:(char*)(chunks.begin()->buffer), sEnableVectorCache?sCachePath:"", pUrl);
auto jhandler = new JCacheReadVHandler(handler, &pStats.bytesReadV,sEnableJournalCache?pJournal.get():nullptr, buffer?(char*)buffer:(char*)(chunks.begin()->buffer), sEnableVectorCache?sCachePath:"", pUrl);
pStats.readVOps++;
pStats.readVreadOps += chunks.size();

Expand Down Expand Up @@ -453,10 +493,12 @@ JCacheFile::AttachForRead()
if ((mFlags & OpenFlags::Flags::Read) == OpenFlags::Flags::Read) {
// attach to a cache
if (sEnableJournalCache && pFile) {
mLog->Info(1, "JCache : attaching via journalmanager to '%s'", pUrl.c_str());
pJournal = sJournalManager.attach(pUrl);
StatInfo* sinfo = 0;
auto st = pFile->Stat(false, sinfo);
if (sinfo) {
if (pJournal.attach(pJournalPath,sinfo->GetSize(),sinfo->GetModTime(),0)) {
if (pJournal->attach(pJournalPath,sinfo->GetSize(),sinfo->GetModTime(),0)) {
mLog->Error(1, "JCache : failed to attach to cache directory: %s", pJournalPath.c_str());
mAttachedForRead = true;
return false;
Expand Down
17 changes: 12 additions & 5 deletions src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ public:
static std::string sCachePath;
static bool sEnableVectorCache;
static bool sEnableJournalCache;

static JournalManager sJournalManager;

//----------------------------------------------------------------------------
//! @brief log cache hit statistics
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -286,13 +287,19 @@ public:
{}

double HitRate() {
return 100.0*(this->bytesCached.load()+1) /(this->bytesCached.load()+this->bytesRead.load()+1);
auto n = this->bytesCached.load()+this->bytesRead.load();
if (!n) return 100.0;
return 100.0*(this->bytesCached.load()) / n;
}
double HitRateV() {
return 100.0*(this->bytesCachedV.load()+1) /(this->bytesCachedV.load()+this->bytesReadV.load()+1);
auto n = this->bytesCachedV.load()+this->bytesReadV.load();
if (!n) return 100.0;
return 100.0*(this->bytesCachedV.load()) / n;
}
double CombinedHitRate() {
return 100.0*(this->bytesCached.load()+this->bytesCachedV.load()+1) /(this->bytesCached.load()+this->bytesRead.load()+this->bytesCachedV.load()+this->bytesReadV.load()+1);
auto n = (this->bytesCached.load()+this->bytesRead.load()+this->bytesCachedV.load()+this->bytesReadV.load());
if (!n) return 100.0;
return 100.0*(this->bytesCached.load()+this->bytesCachedV.load()) / n;
}

std::atomic<uint64_t> bytesRead;
Expand Down Expand Up @@ -321,7 +328,7 @@ private:
//! @brief URL of the remote file
std::string pUrl;
//! @brief instance of a local journal for this file
Journal pJournal;
std::shared_ptr<Journal> pJournal;
//! @brief path to the journal of this file
std::string pJournalPath;
//! @brief pointer to logging object
Expand Down

0 comments on commit a8b3904

Please sign in to comment.