Commit 9acbde1
update sftocf to not update allocation-wide usage
claire-peters committed Aug 13, 2024
1 parent 9cee55c

Showing 1 changed file with 154 additions and 141 deletions.

coldfront/plugins/sftocf/utils.py (+154 −141)
@@ -22,6 +22,7 @@
save_json,
log_missing,
determine_size_fmt,
allocation_reaching_capacity_operations,
id_present_missing_users,
locate_or_create_dirpath,
)
@@ -579,19 +580,13 @@ def __init__(self, volume=None):

self._allocations = None
# self.collection_filter = self.set_collection_parameters()
self.sf_user_data = self.collect_sf_user_data()
self.sf_usage_data = self.collect_sf_usage_data()
self._sf_user_data = None
self._sf_usage_data = None
self._allocationquerymatches = None

def return_connection_obj(self):
raise NotImplementedError

def collect_sf_user_data(self):
raise NotImplementedError

def collect_sf_usage_data(self):
raise NotImplementedError

@property
def allocations(self):
if self._allocations:
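
This hunk swaps the eager collect_sf_*_data() calls in __init__ for attributes that start as None and are filled on first property access. A minimal sketch of that lazy-caching pattern, with illustrative class and method names rather than the plugin's:

class UsageSource:
    """Defer an expensive collection step until it is first needed."""
    def __init__(self):
        self._user_data = None  # cache slot; populated lazily

    @property
    def user_data(self):
        if self._user_data is None:  # collect only once
            self._user_data = self._collect_user_data()
        return self._user_data

    def _collect_user_data(self):
        # stand-in for a slow API call
        return [{'username': 'jdoe', 'volume': 'tier0'}]

One design note: the plugin's new getters test "if not self._sf_user_data:", so an empty result is re-collected on every access; comparing against None, as above, caches empty results too.
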
@@ -604,7 +599,7 @@ def allocations(self):
return self._allocations

@property
def allocationquerymatches(self):
def allocationquerymatches(self, usage=False):
# limit allocations to those in the volumes collected
if self._allocationquerymatches:
return self._allocationquerymatches
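
The new signature adds usage=False while the @property decorator on the unchanged context line above remains. A Python property getter cannot receive extra arguments, so a call of the form self.allocationquerymatches(usage=usage), as seen in a later hunk, only works if the decorator is dropped and this becomes a plain cached method. A hedged sketch of that shape:

class ConnectorSketch:
    """Hypothetical shape of a cached lookup that accepts a flag."""
    def __init__(self):
        self._matches = None

    def allocationquerymatches(self, usage=False):
        if self._matches is None:
            # placeholder for the real pairing of allocations to usage rows
            self._matches = self._build_matches(collect_usage=usage)
        return self._matches

    def _build_matches(self, collect_usage):
        return []
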
@@ -613,13 +608,16 @@ def allocationquerymatches(self):
allocation_list = [
(a.get_parent_resource.name.split('/')[0], a.path) for a in allocations
]
total_sort_key = itemgetter('path','volume')
allocation_usage_grouped = return_dict_of_groupings(self.sf_usage_data, total_sort_key)
missing_allocations = [
(k,a) for k, a in allocation_usage_grouped if k not in allocation_list
]
print("missing_allocations:", missing_allocations)
logger.warning('starfish allocations missing in coldfront: %s', missing_allocations)
# if usage is true, collect and update usage
allocation_usage_grouped = {}
if usage:
total_sort_key = itemgetter('path','volume')
allocation_usage_grouped = return_dict_of_groupings(self.sf_usage_data, total_sort_key)
missing_allocations = [
(k,a) for k, a in allocation_usage_grouped if k not in allocation_list
]
print("missing_allocations:", missing_allocations)
logger.warning('starfish allocations missing in coldfront: %s', missing_allocations)

user_usage = [user for user in self.sf_user_data if user['path'] is not None]
user_sort_key = itemgetter('path','volume')
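
return_dict_of_groupings is a plugin helper imported elsewhere in utils.py; a hypothetical standard-library equivalent of the (path, volume) grouping it performs here:

from itertools import groupby
from operator import itemgetter

def group_by_keys(records, keys=('path', 'volume')):
    sort_key = itemgetter(*keys)
    ordered = sorted(records, key=sort_key)  # groupby requires sorted input
    return {key: list(group) for key, group in groupby(ordered, key=sort_key)}

usage_rows = [
    {'path': '/n/holylfs/lab1', 'volume': 'holylfs', 'total_size': 10},
    {'path': '/n/holylfs/lab1', 'volume': 'holylfs', 'total_size': 5},
]
grouped = group_by_keys(usage_rows)
# {('/n/holylfs/lab1', 'holylfs'): [<both rows>]}

Note that iterating the resulting dict directly, as the missing_allocations comprehension does, yields only its (path, volume) keys, so k and a unpack to path and volume there; .items() would be needed to also see the grouped rows.
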
@@ -647,7 +645,6 @@ def clean_collected_data(self):
user_usernames = {d['username'] for d in self.sf_user_data}
user_models, missing_usernames = id_present_missing_users(user_usernames)
missing_username_list = [d['username'] for d in missing_usernames]
logger.debug('allocation_usage:\n%s', self.sf_usage_data)

# identify and remove allocation users that are no longer in the AD group
for obj in self.allocationquerymatches:
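
id_present_missing_users is another plugin helper; judging from how its return values are used here (a list of user models and a list of dicts carrying a 'username' key), its contract is roughly the following. This is an assumed sketch, not the plugin's implementation:

from django.contrib.auth import get_user_model

def id_present_missing_users_sketch(usernames):
    """Assumed shape: split a set of usernames into found models and leftovers."""
    User = get_user_model()
    present = list(User.objects.filter(username__in=usernames))
    present_names = {u.username for u in present}
    # usernames is a set here, matching the set comprehension above
    missing = [{'username': name} for name in usernames - present_names]
    return present, missing
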
@@ -665,26 +662,30 @@
obj.user_usage_entries.remove(i)
return self.allocationquerymatches, user_models

def update_coldfront_objects(self, user_models):
"""update coldfront allocation objects"""
def update_coldfront_allocation_usage(self):
"""update coldfront allocation usage"""
allocation_attribute_types = AllocationAttributeType.objects.all()

quota_b_attrtype = allocation_attribute_types.get(name='Quota_In_Bytes')
quota_tb_attrtype = allocation_attribute_types.get(name='Storage Quota (TB)')
# 3. iterate across allocations
for obj in self.allocationquerymatches:
logger.debug('updating allocation %s %s (path %s)',
obj.lab, obj.volume, obj.allocation.path
)
allocation_reaching_capacity_operations(obj.allocation, obj.total_usage_entry['total_size'])
obj.update_usage_attr(quota_b_attrtype, obj.total_usage_entry['total_size'])
obj.update_usage_attr(quota_tb_attrtype, obj.total_usage_tb)

logger.info('allocation usage for allocation %s: %s bytes, %s terabytes',
obj.allocation.pk, obj.total_usage_entry['total_size'], obj.total_usage_tb
)

def update_coldfront_objects(self, user_models, usage=False):
"""update coldfront allocation objects"""
if usage:
self.update_coldfront_allocation_usage()
# 3. iterate across allocations
for obj in self.allocationquerymatches(usage=usage):
logger.debug('updating allocation %s %s (path %s)',
obj.lab, obj.volume, obj.allocation.path
)
# identify and remove allocation users that are no longer in the AD group
self.zero_out_absent_allocationusers(obj.query_usernames, obj.allocation)

for userdict in obj.user_usage_entries:
user = next(
u for u in user_models if userdict['username'].lower() == u.username.lower()
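
The split-out update_coldfront_allocation_usage writes the same figure twice, once raw via Quota_In_Bytes and once as obj.total_usage_tb. The terabyte property is defined outside this diff; assuming the usual binary conversion from the byte total (an assumption, not confirmed by the hunk), the arithmetic is:

def bytes_to_tb(n_bytes, ndigits=2):
    """Convert a byte count to binary terabytes for display."""
    return round(n_bytes / 1024**4, ndigits)

# e.g. 2750 GiB of recorded usage
assert bytes_to_tb(2750 * 1024**3) == 2.69  # 2750 / 1024 ≈ 2.69 TiB
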
@@ -717,25 +718,31 @@ def return_connection_obj(self):
# 1. grab data from redash
return StarFishRedash()

def collect_sf_user_data(self):
@property
def sf_user_data(self):
"""Collect starfish data using the Redash API. Return the results."""
user_usage = self.connection_obj.return_query_results(
query='path_usage_query', volumes=self.volumes
)
for d in user_usage:
d['username'] = d.pop('user_name')
d['volume'] = d.pop('vol_name')
d['path'] = d.pop('lab_path')
return user_usage

def collect_sf_usage_data(self):
allocation_usage = self.connection_obj.return_query_results(
query='subdirectory', volumes=self.volumes
)
for d in allocation_usage:
d['username'] = d.pop('user_name')
d['volume'] = d.pop('vol_name')
return allocation_usage
if not self._sf_user_data:
user_usage = self.connection_obj.return_query_results(
query='path_usage_query', volumes=self.volumes
)
for d in user_usage:
d['username'] = d.pop('user_name')
d['volume'] = d.pop('vol_name')
d['path'] = d.pop('lab_path')
self._sf_user_data = user_usage
return self._sf_user_data

@property
def sf_usage_data(self):
if not self._sf_usage_data:
allocation_usage = self.connection_obj.return_query_results(
query='subdirectory', volumes=self.volumes
)
for d in allocation_usage:
d['username'] = d.pop('user_name')
d['volume'] = d.pop('vol_name')
self._sf_usage_data = allocation_usage
return self._sf_usage_data

def collect_sf_data_for_lab(self, lab_name, volume_name):
"""Collect user-level and allocation-level usage data for a specific lab."""
Expand Down Expand Up @@ -839,105 +846,111 @@ def items_to_pop(self):
return ['size_sum_hum', 'rec_aggrs', 'physical_nlinks_size_sum',
'physical_nlinks_size_sum_hum', 'volume_display_name', 'count', 'fn']

def collect_sf_user_data(self):
@property
def sf_user_data(self):
"""Collect starfish data using the REST API. Return the results."""
# 1. produce dict of all labs to be collected & volumes on which their data is located
lab_res = self.produce_lab_dict()
# 2. produce list of files collected & list of lab/volume/filename tuples to collect
filepaths, to_collect = self.check_volume_collection(lab_res)
# 3. produce set of all volumes to be queried
vol_set = {i[1] for i in to_collect}
vols = [vol for vol in vol_set if vol in svp['volumes']]
for volume in vols:
projects = [t for t in to_collect if t[1] == volume]
logger.debug('vol: %s\nto_collect_subset: %s', volume, projects)

### OLD METHOD ###
for tup in projects:
p = tup[0]
filepath = tup[3]
lab_volpath = tup[2] #volumepath[0] if '_l3' not in p else volumepath[1]
logger.debug('filepath: %s lab: %s volpath: %s', filepath, p, lab_volpath)
usage_query = self.connection_obj.create_query(
f'groupname={p} type=f',
'volume,username,groupname',
f'{volume}:{lab_volpath}',
)
data = self.return_usage_query_data(usage_query.result)
if data:
contents = [d for d in data if d['username'] != 'root']
for entry in contents:
# entry['size_sum'] = entry['rec_aggrs']['size']
# entry['full_path'] = entry['parent_path']+'/'+entry['fn']
for item in self.items_to_pop:
entry.pop(item, None)
record = {
'server': self.connection_obj.name,
'volume': volume,
'path': lab_volpath,
'project': p,
'date': DATESTR,
'contents': contents,
}
save_json(filepath, record)
filepaths.append(filepath)

collected_data = []
for filepath in filepaths:
content = read_json(filepath)
for user in content['contents']:
user.update({
'volume': content['volume'],
'path': content['path'],
'project': content['project'],
})
collected_data.append(user)
return collected_data

def collect_sf_usage_data(self):
"""Collect usage data from starfish for all labs in the lab list."""
# 1. produce dict of all labs to be collected & volumes on which their data is located
lab_res = self.produce_lab_dict()
lab_res = [(k, i[0], i[1]) for k, v in lab_res.items() for i in v]
# 2. produce set of all volumes to be queried
vol_set = {i[1] for i in lab_res}
vols = [vol for vol in vol_set if vol in svp['volumes']]
entries = []
for volume in vols:
volumepath = svp['volumes'][volume]
projects = [t for t in lab_res if t[1] == volume]
logger.debug('vol: %s\nto_collect_subset: %s', volume, projects)

### OLD METHOD ###
for tup in projects:
p = tup[0]
lab_volpath = volumepath[0] if '_l3' not in p else volumepath[1]
logger.debug('lab: %s volpath: %s', p, lab_volpath)
usage_query = self.connection_obj.create_query(
f'groupname={p} type=d depth=1',
'volume,parent_path,groupname,rec_aggrs.size,fn',
f'{volume}:{lab_volpath}',
qformat='parent_path +aggrs.by_gid',
)
data = self.return_usage_query_data(usage_query.result)
if data:
if len(data) > 1:
logger.error('too many data entries for %s: %s', p, data)
continue
entry = data[0]
entry.update({
'size_sum': entry['rec_aggrs']['size'],
'full_path': entry['parent_path']+'/'+entry['fn'],
'server': self.connection_obj.name,
'volume': volume,
'path': lab_volpath,
'project': p,
'date': DATESTR,
if not self._sf_user_data:
# 1. produce dict of all labs to be collected & volumes on which their data is located
lab_res = self.produce_lab_dict()
# 2. produce list of files collected & list of lab/volume/filename tuples to collect
filepaths, to_collect = self.check_volume_collection(lab_res)
# 3. produce set of all volumes to be queried
vol_set = {i[1] for i in to_collect}
vols = [vol for vol in vol_set if vol in svp['volumes']]
for volume in vols:
projects = [t for t in to_collect if t[1] == volume]
logger.debug('vol: %s\nto_collect_subset: %s', volume, projects)

### OLD METHOD ###
for tup in projects:
p = tup[0]
filepath = tup[3]
lab_volpath = tup[2] #volumepath[0] if '_l3' not in p else volumepath[1]
logger.debug('filepath: %s lab: %s volpath: %s', filepath, p, lab_volpath)
usage_query = self.connection_obj.create_query(
f'groupname={p} type=f',
'volume,username,groupname',
f'{volume}:{lab_volpath}',
)
data = self.return_usage_query_data(usage_query.result)
if data:
contents = [d for d in data if d['username'] != 'root']
for entry in contents:
# entry['size_sum'] = entry['rec_aggrs']['size']
# entry['full_path'] = entry['parent_path']+'/'+entry['fn']
for item in self.items_to_pop:
entry.pop(item, None)
record = {
'server': self.connection_obj.name,
'volume': volume,
'path': lab_volpath,
'project': p,
'date': DATESTR,
'contents': contents,
}
save_json(filepath, record)
filepaths.append(filepath)

collected_data = []
for filepath in filepaths:
content = read_json(filepath)
for user in content['contents']:
user.update({
'volume': content['volume'],
'path': content['path'],
'project': content['project'],
})
for item in self.items_to_pop:
entry.pop(item)
entries.append(entry)
return entries
collected_data.append(user)
self._sf_user_data = collected_data
return self._sf_user_data

@property
def sf_usage_data(self):
"""Collect usage data from starfish for all labs in the lab list."""
if not self._sf_usage_data:
# 1. produce dict of all labs to be collected & volumes containing their data
lab_res = self.produce_lab_dict()
lab_res = [(k, i[0], i[1]) for k, v in lab_res.items() for i in v]
# 2. produce set of all volumes to be queried
vol_set = {i[1] for i in lab_res}
vols = [vol for vol in vol_set if vol in svp['volumes']]
entries = []
for volume in vols:
volumepath = svp['volumes'][volume]
projects = [t for t in lab_res if t[1] == volume]
logger.debug('vol: %s\nto_collect_subset: %s', volume, projects)

### OLD METHOD ###
for tup in projects:
p = tup[0]
lab_volpath = volumepath[0] if '_l3' not in p else volumepath[1]
logger.debug('lab: %s volpath: %s', p, lab_volpath)
usage_query = self.connection_obj.create_query(
f'groupname={p} type=d depth=1',
'volume,parent_path,groupname,rec_aggrs.size,fn',
f'{volume}:{lab_volpath}',
qformat='parent_path +aggrs.by_gid',
)
data = self.return_usage_query_data(usage_query.result)
if data:
if len(data) > 1:
logger.error('too many data entries for %s: %s', p, data)
continue
entry = data[0]
entry.update({
'size_sum': entry['rec_aggrs']['size'],
'full_path': entry['parent_path']+'/'+entry['fn'],
'server': self.connection_obj.name,
'volume': volume,
'path': lab_volpath,
'project': p,
'date': DATESTR,
})
for item in self.items_to_pop:
entry.pop(item)
entries.append(entry)
self._sf_usage_data = entries
return self._sf_usage_data

@receiver(allocation_activate)
def update_allocation(sender, **kwargs):
…
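
The REST path above collects per-lab query results into dated JSON files via save_json, then re-reads them with read_json to build the flat record list. Those helpers are imported from the plugin's utils; a self-contained sketch of the collect-then-cache flow they enable, with an illustrative file path and collect callable:

import json
import os

def save_json(path, record):
    with open(path, 'w') as fh:
        json.dump(record, fh, indent=2)

def read_json(path):
    with open(path) as fh:
        return json.load(fh)

def collect_with_cache(filepath, collect):
    """Run the expensive collection only if no cached file exists yet."""
    if not os.path.isfile(filepath):
        save_json(filepath, collect())
    return read_json(filepath)

record = collect_with_cache(
    '/tmp/lab1_holylfs_2024-08-13.json',
    lambda: {'project': 'lab1', 'volume': 'holylfs', 'contents': []},
)
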
