Skip to content

Commit

Permalink
Add filterFilesInRucio function
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznet committed Jan 21, 2021
1 parent 93f268e commit 65f68bf
Showing 1 changed file with 38 additions and 1 deletion.
39 changes: 38 additions & 1 deletion services/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,9 @@ func files4dbRunsSite(dasquery dasql.DASQuery) []mongo.DASRecord {
}
// check files in Phedex for give site (should take it form spec)
site := spec["site"].(string)
for _, fname := range filterFiles(fileList, site) {
dataset := spec["dataset"].(string)
// for _, fname := range filterFiles(fileList, site) {
for _, fname := range filterFilesInRucio(dasquery, fileList, dataset, site) {
row := make(mongo.DASRecord)
// put into file das record, internal type must be list
row["file"] = []mongo.DASRecord{{"name": fname}}
Expand All @@ -645,3 +647,38 @@ func (LocalAPIs) Files4DatasetRunsSite(dasquery dasql.DASQuery) []mongo.DASRecor
func (LocalAPIs) Files4BlockRunsSite(dasquery dasql.DASQuery) []mongo.DASRecord {
return files4dbRunsSite(dasquery)
}

type RucioRecordRSE struct {
DIDs []map[string]string `json:"dids"`
Domain string `json:"domain"`
RSE string `json:"rse_expression"`
}

// helper function to filter files which belong to given site using Rucio API
func filterFilesInRucio(dasquery dasql.DASQuery, files []string, dataset, site string) []string {
var out []string
rec := make(map[string]string)
rec["name"] = dataset
rec["scope"] = "cms"
var dids []map[string]string
dids = append(dids, rec)
spec := RucioRecordRSE{DIDs: dids, Domain: "all", RSE: site}
// make POST request to Rucio to obtain list of files for given RSE request record
args, err := json.Marshal(spec)
if err != nil {
log.Printf("ERROR: unable to unmarshal spec %+v, error %v\n", spec, err)
return out
}
furl := fmt.Sprintf("%s/replicas/list", RucioUrl())
resp := utils.FetchResponse(furl, string(args)) // POST request
records := RucioUnmarshal(dasquery, "full_record", resp.Data)
for _, r := range records {
if v, ok := r["name"]; ok {
fname := v.(string)
if utils.FindInList(fname, files) {
out = append(out, fname)
}
}
}
return out
}

0 comments on commit 65f68bf

Please sign in to comment.