Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the number of entries per folder. Use xrootdpyfs instead of system calls #250

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 142 additions & 82 deletions cms-2016-simulated-datasets/code/eos_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@

import json
import os
import re
import subprocess
import sys

from xrootdpyfs import XRootDPyFS

from utils import (get_dataset_format, get_dataset_name, get_dataset_runperiod,
                   get_dataset_version, get_dataset_year)

# xrootd endpoint serving the CERN Open Data EOS area.
XROOTD_URI_BASE = "root://eospublic.cern.ch/"

# Root directory of the open-data namespace on EOS.
XROOTD_DIR_BASE = "/eos/opendata/"

# Subdirectory holding Monte Carlo datasets.
MCDIR_BASE = "mc"

EXPERIMENT = "cms"

# When True, `eos cp` commands are printed instead of executed.
DEBUG = True

Expand All @@ -41,9 +41,13 @@ def check_datasets_in_eos_dir(datasets, eos_dir):
dataset_full_names = []
for dataset in datasets:
dataset_index_file_base = get_dataset_index_file_base(dataset)
if subprocess.call('ls ' + eos_dir + ' | grep -q ' + dataset_index_file_base, shell=True):
print('[ERROR] Missing EOS information, ignoring dataset ' + dataset,
file=sys.stderr)
if subprocess.call(
"ls " + eos_dir + " | grep -q " + dataset_index_file_base, shell=True
):
print(
"[ERROR] Missing EOS information, ignoring dataset " + dataset,
file=sys.stderr,
)
else:
dataset_full_names.append(dataset)

Expand All @@ -52,106 +56,162 @@ def check_datasets_in_eos_dir(datasets, eos_dir):

def get_dataset_index_file_base(dataset):
    """Return the index file base name for the given dataset.

    Joins experiment, MC directory, run period, dataset name, format
    and version with underscores, e.g.
    ``CMS_mc_<runperiod>_<name>_<format>_<version>``.
    """
    filebase = (
        EXPERIMENT.upper()
        + "_"
        + MCDIR_BASE
        + "_"
        + get_dataset_runperiod(dataset)
        + "_"
        + get_dataset_name(dataset)
        + "_"
        + get_dataset_format(dataset)
        + "_"
        + get_dataset_version(dataset)
    )
    return filebase


def get_dataset_location(dataset):
    """Return the EOS directory location of the dataset.

    Builds ``<XROOTD_DIR_BASE><experiment>/mc/<runperiod>/<name>/<format>/<version>``
    from the dataset name via the ``utils`` helpers.
    """
    return (
        XROOTD_DIR_BASE
        + EXPERIMENT
        + "/"
        + MCDIR_BASE
        + "/"
        + get_dataset_runperiod(dataset)
        + "/"
        + get_dataset_name(dataset)
        + "/"
        + get_dataset_format(dataset)
        + "/"
        + get_dataset_version(dataset)
    )


def get_dataset_volumes(fs, dataset):
    """Return the list of volume directories for the given dataset.

    Lists the dataset's EOS location through the xrootdpyfs handle
    ``fs`` and keeps every subdirectory except the special
    ``file-indexes`` one.
    """
    dataset_location = get_dataset_location(dataset)

    volumes = []
    for entry in fs.listdir(dataset_location, dirs_only=True):
        if entry != "file-indexes":
            volumes.append(entry)
    return volumes


def get_dataset_volume_files(fs, dataset, volume):
    "Return file list with information about name, size, location for the given dataset and volume."
    dataset_location = get_dataset_location(dataset)

    # Breadth-first walk of the volume tree: `all_dirs` grows while we
    # iterate it, so every discovered subdirectory is visited exactly once.
    all_dirs = [f"{dataset_location}/{volume}"]
    all_files = []
    for my_dir in all_dirs:
        all_dirs += fs.listdir(my_dir, dirs_only=True, absolute=True)
        all_files += fs.listdir(my_dir, files_only=True, absolute=True)

    all_entries = []
    for entry in all_files:
        # NOTE(review): reaches into the private xrootd client of
        # XRootDPyFS; stat() appears to return (status, statinfo) — confirm
        # against the xrootdpyfs/XRootD API.
        status, stat = fs._client.stat(entry)
        # xrd_checksum() presumably yields (type, value), producing
        # e.g. "adler32:deadbeef" — verify against xrootdpyfs docs.
        checksum = ":".join(fs.xrd_checksum(entry))
        all_entries.append(
            {
                "filename": os.path.basename(entry),
                "size": stat.size,
                "checksum": checksum,
                "uri": XROOTD_URI_BASE + entry,
            }
        )

    return all_entries


def create_index_file(filebase, files, eos_dir, style, volume_dir):
    """Create an index file in the given style format (txt, json).

    Writes the file list to ``<eos_dir>/<volume_dir>/<filebase>.<style>``:
    one URI per line for ``txt``, a pretty-printed dump for ``json``.
    Returns the created file name, or None when writing fails.
    """
    filename = filebase + "." + style
    try:
        # Context manager guarantees the descriptor is closed even on error.
        with open(f"{eos_dir}/{str(volume_dir)}/{filename}", "w") as fdesc:
            if style == "txt":
                for afile in files:
                    fdesc.write(afile["uri"])
                    fdesc.write("\n")
            elif style == "json":
                fdesc.write(json.dumps(files, indent=2, sort_keys=True))
                fdesc.write("\n")
    except Exception as exc:
        print("Error doing the file '", filename, "': ", exc)
        return None
    return filename


def copy_index_file(dataset, volume, filename, eos_dir, volume_dir):
    """Copy index file ``filename`` to its final destination on EOS.

    Builds an ``eos cp`` command from the local numbered folder
    ``<eos_dir>/<volume_dir>`` to the dataset's ``file-indexes``
    directory.  In DEBUG mode the command is only printed, not run.
    ``volume`` is unused but kept for signature compatibility.
    """
    dataset_location = get_dataset_location(dataset)
    cmd = (
        "eos cp "
        + eos_dir
        + "/"
        + str(volume_dir)
        + "/"
        + filename
        + " "
        + dataset_location
        + "/file-indexes/"
        + filename
    )
    if DEBUG:
        print(cmd)
    else:
        os.system(cmd)


def create_index_files(fs, dataset, volume, eos_dir, volume_dir):
    """Create and upload txt and json index files for the given dataset volume."""
    files = get_dataset_volume_files(fs, dataset, volume)
    filebase = get_dataset_index_file_base(dataset) + "_" + volume + "_" + "file_index"

    for output_type in ["txt", "json"]:
        filename = create_index_file(filebase, files, eos_dir, output_type, volume_dir)
        # Only upload indexes that were actually written.
        if filename:
            copy_index_file(dataset, volume, filename, eos_dir, volume_dir)


def main(datasets=None, eos_dir="./inputs/eos-file-indexes/"):
    """Create EOS index files for all given datasets.

    Index files are spread over numbered subdirectories of ``eos_dir``
    (at most 1000 volumes per subdirectory) so no single folder grows
    unbounded.  Returns -1 when the xrootdpyfs instance cannot be
    created or a dataset has no volumes; None otherwise.
    """
    # Avoid the mutable-default-argument pitfall.
    datasets = [] if datasets is None else datasets

    # Local output folders are numbered 0, 1, 2, ...
    volume_dir = 0
    volume_counter = 0
    if not os.path.isdir(f"{eos_dir}/{str(volume_dir)}"):
        os.makedirs(f"{eos_dir}/{str(volume_dir)}")

    try:
        fs = XRootDPyFS(f"{XROOTD_URI_BASE}/")
    except Exception as my_exc:
        print("We can't get the xrootdpyfs instance:", my_exc)
        return -1

    dataset_counter = 1
    for dataset in datasets:
        print(f"Doing {dataset} ({dataset_counter}/{len(datasets)})")
        dataset_counter += 1
        volumes = get_dataset_volumes(fs, dataset)
        if not volumes:
            print(f"Error with the dataset '{dataset}'!")
            return -1
        for volume in volumes:
            create_index_files(fs, dataset, volume, eos_dir, volume_dir)
            volume_counter += 1
            if volume_counter > 999:
                # Start a fresh numbered folder after 1000 volumes.
                volume_counter = 0
                volume_dir += 1
                if not os.path.isdir(f"{eos_dir}/{str(volume_dir)}"):
                    os.makedirs(f"{eos_dir}/{str(volume_dir)}")


if __name__ == "__main__":
    main()
Loading