diff --git a/coldfront/plugins/sftocf/utils.py b/coldfront/plugins/sftocf/utils.py
index 43be0b84c..305802340 100644
--- a/coldfront/plugins/sftocf/utils.py
+++ b/coldfront/plugins/sftocf/utils.py
@@ -22,6 +22,7 @@
     save_json,
     log_missing,
     determine_size_fmt,
+    allocation_reaching_capacity_operations,
     id_present_missing_users,
     locate_or_create_dirpath,
 )
@@ -579,19 +580,13 @@ def __init__(self, volume=None):
 
         self._allocations = None
         # self.collection_filter = self.set_collection_parameters()
-        self.sf_user_data = self.collect_sf_user_data()
-        self.sf_usage_data = self.collect_sf_usage_data()
+        self._sf_user_data = None
+        self._sf_usage_data = None
         self._allocationquerymatches = None
 
     def return_connection_obj(self):
         raise NotImplementedError
 
-    def collect_sf_user_data(self):
-        raise NotImplementedError
-
-    def collect_sf_usage_data(self):
-        raise NotImplementedError
-
     @property
     def allocations(self):
         if self._allocations:
@@ -604,7 +599,6 @@ def allocations(self):
         return self._allocations
 
-    @property
-    def allocationquerymatches(self):
+    def allocationquerymatches(self, usage=False):
         # limit allocations to those in the volumes collected
         if self._allocationquerymatches:
             return self._allocationquerymatches
@@ -613,13 +607,16 @@ def allocationquerymatches(self):
         allocation_list = [
             (a.get_parent_resource.name.split('/')[0], a.path) for a in allocations
         ]
-        total_sort_key = itemgetter('path','volume')
-        allocation_usage_grouped = return_dict_of_groupings(self.sf_usage_data, total_sort_key)
-        missing_allocations = [
-            (k,a) for k, a in allocation_usage_grouped if k not in allocation_list
-        ]
-        print("missing_allocations:", missing_allocations)
-        logger.warning('starfish allocations missing in coldfront: %s', missing_allocations)
+        # if usage is true, collect and update usage
+        allocation_usage_grouped = {}
+        if usage:
+            total_sort_key = itemgetter('path','volume')
+            allocation_usage_grouped = return_dict_of_groupings(self.sf_usage_data, total_sort_key)
+            missing_allocations = [
+                (k,a) for k, a in allocation_usage_grouped if k not in allocation_list
+            ]
+            print("missing_allocations:", missing_allocations)
+            logger.warning('starfish allocations missing in coldfront: %s', missing_allocations)
 
         user_usage = [user for user in self.sf_user_data if user['path'] is not None]
         user_sort_key = itemgetter('path','volume')
@@ -647,7 +644,6 @@ def clean_collected_data(self):
         user_usernames = {d['username'] for d in self.sf_user_data}
         user_models, missing_usernames = id_present_missing_users(user_usernames)
         missing_username_list = [d['username'] for d in missing_usernames]
-        logger.debug('allocation_usage:\n%s', self.sf_usage_data)
 
         # identify and remove allocation users that are no longer in the AD group
-        for obj in self.allocationquerymatches:
+        for obj in self.allocationquerymatches():
@@ -665,26 +661,31 @@ def clean_collected_data(self):
                     obj.user_usage_entries.remove(i)
-        return self.allocationquerymatches, user_models
+        return self.allocationquerymatches(), user_models
 
-    def update_coldfront_objects(self, user_models):
-        """update coldfront allocation objects"""
+    def update_coldfront_allocation_usage(self):
+        """update coldfront allocation usage"""
         allocation_attribute_types = AllocationAttributeType.objects.all()
         quota_b_attrtype = allocation_attribute_types.get(name='Quota_In_Bytes')
         quota_tb_attrtype = allocation_attribute_types.get(name='Storage Quota (TB)')
-        # 3. iterate across allocations
-        for obj in self.allocationquerymatches:
-            logger.debug('updating allocation %s %s (path %s)',
-                obj.lab, obj.volume, obj.allocation.path
-            )
+        for obj in self.allocationquerymatches(usage=True):
+            allocation_reaching_capacity_operations(obj.allocation, obj.total_usage_entry['total_size'])
             obj.update_usage_attr(quota_b_attrtype, obj.total_usage_entry['total_size'])
             obj.update_usage_attr(quota_tb_attrtype, obj.total_usage_tb)
-
             logger.info('allocation usage for allocation %s: %s bytes, %s terabytes',
                 obj.allocation.pk, obj.total_usage_entry['total_size'], obj.total_usage_tb
             )
+
+    def update_coldfront_objects(self, user_models, usage=False):
+        """update coldfront allocation objects"""
+        if usage:
+            self.update_coldfront_allocation_usage()
+        # 3. iterate across allocations
+        for obj in self.allocationquerymatches(usage=usage):
+            logger.debug('updating allocation %s %s (path %s)',
+                obj.lab, obj.volume, obj.allocation.path
+            )
             # identify and remove allocation users that are no longer in the AD group
             self.zero_out_absent_allocationusers(obj.query_usernames, obj.allocation)
-
             for userdict in obj.user_usage_entries:
                 user = next(
                     u for u in user_models if userdict['username'].lower() == u.username.lower()
                 )
@@ -717,25 +718,31 @@ def return_connection_obj(self):
         # 1. grab data from redash
         return StarFishRedash()
 
-    def collect_sf_user_data(self):
+    @property
+    def sf_user_data(self):
         """Collect starfish data using the Redash API. Return the results."""
-        user_usage = self.connection_obj.return_query_results(
-            query='path_usage_query', volumes=self.volumes
-        )
-        for d in user_usage:
-            d['username'] = d.pop('user_name')
-            d['volume'] = d.pop('vol_name')
-            d['path'] = d.pop('lab_path')
-        return user_usage
-
-    def collect_sf_usage_data(self):
-        allocation_usage = self.connection_obj.return_query_results(
-            query='subdirectory', volumes=self.volumes
-        )
-        for d in allocation_usage:
-            d['username'] = d.pop('user_name')
-            d['volume'] = d.pop('vol_name')
-        return allocation_usage
+        if not self._sf_user_data:
+            user_usage = self.connection_obj.return_query_results(
+                query='path_usage_query', volumes=self.volumes
+            )
+            for d in user_usage:
+                d['username'] = d.pop('user_name')
+                d['volume'] = d.pop('vol_name')
+                d['path'] = d.pop('lab_path')
+            self._sf_user_data = user_usage
+        return self._sf_user_data
+
+    @property
+    def sf_usage_data(self):
+        if not self._sf_usage_data:
+            allocation_usage = self.connection_obj.return_query_results(
+                query='subdirectory', volumes=self.volumes
+            )
+            for d in allocation_usage:
+                d['username'] = d.pop('user_name')
+                d['volume'] = d.pop('vol_name')
+            self._sf_usage_data = allocation_usage
+        return self._sf_usage_data
 
     def collect_sf_data_for_lab(self, lab_name, volume_name):
         """Collect user-level and allocation-level usage data for a specific lab."""
@@ -839,105 +846,111 @@ def items_to_pop(self):
         return ['size_sum_hum', 'rec_aggrs', 'physical_nlinks_size_sum',
             'physical_nlinks_size_sum_hum', 'volume_display_name', 'count', 'fn']
 
-    def collect_sf_user_data(self):
+    @property
+    def sf_user_data(self):
         """Collect starfish data using the REST API. Return the results."""
-        # 1. produce dict of all labs to be collected & volumes on which their data is located
-        lab_res = self.produce_lab_dict()
-        # 2. produce list of files collected & list of lab/volume/filename tuples to collect
-        filepaths, to_collect = self.check_volume_collection(lab_res)
-        # 3. produce set of all volumes to be queried
-        vol_set = {i[1] for i in to_collect}
-        vols = [vol for vol in vol_set if vol in svp['volumes']]
-        for volume in vols:
-            projects = [t for t in to_collect if t[1] == volume]
-            logger.debug('vol: %s\nto_collect_subset: %s', volume, projects)
-
-            ### OLD METHOD ###
-            for tup in projects:
-                p = tup[0]
-                filepath = tup[3]
-                lab_volpath = tup[2] #volumepath[0] if '_l3' not in p else volumepath[1]
-                logger.debug('filepath: %s lab: %s volpath: %s', filepath, p, lab_volpath)
-                usage_query = self.connection_obj.create_query(
-                    f'groupname={p} type=f',
-                    'volume,username,groupname',
-                    f'{volume}:{lab_volpath}',
-                )
-                data = self.return_usage_query_data(usage_query.result)
-                if data:
-                    contents = [d for d in data if d['username'] != 'root']
-                    for entry in contents:
-                        # entry['size_sum'] = entry['rec_aggrs']['size']
-                        # entry['full_path'] = entry['parent_path']+'/'+entry['fn']
-                        for item in self.items_to_pop:
-                            entry.pop(item, None)
-                    record = {
-                        'server': self.connection_obj.name,
-                        'volume': volume,
-                        'path': lab_volpath,
-                        'project': p,
-                        'date': DATESTR,
-                        'contents': contents,
-                    }
-                    save_json(filepath, record)
-                    filepaths.append(filepath)
-
-        collected_data = []
-        for filepath in filepaths:
-            content = read_json(filepath)
-            for user in content['contents']:
-                user.update({
-                    'volume': content['volume'],
-                    'path': content['path'],
-                    'project': content['project'],
-                })
-                collected_data.append(user)
-        return collected_data
-
-    def collect_sf_usage_data(self):
-        """Collect usage data from starfish for all labs in the lab list."""
-        # 1. produce dict of all labs to be collected & volumes on which their data is located
-        lab_res = self.produce_lab_dict()
-        lab_res = [(k, i[0], i[1]) for k, v in lab_res.items() for i in v]
-        # 2. produce set of all volumes to be queried
-        vol_set = {i[1] for i in lab_res}
-        vols = [vol for vol in vol_set if vol in svp['volumes']]
-        entries = []
-        for volume in vols:
-            volumepath = svp['volumes'][volume]
-            projects = [t for t in lab_res if t[1] == volume]
-            logger.debug('vol: %s\nto_collect_subset: %s', volume, projects)
-
-            ### OLD METHOD ###
-            for tup in projects:
-                p = tup[0]
-                lab_volpath = volumepath[0] if '_l3' not in p else volumepath[1]
-                logger.debug('lab: %s volpath: %s', p, lab_volpath)
-                usage_query = self.connection_obj.create_query(
-                    f'groupname={p} type=d depth=1',
-                    'volume,parent_path,groupname,rec_aggrs.size,fn',
-                    f'{volume}:{lab_volpath}',
-                    qformat='parent_path +aggrs.by_gid',
-                )
-                data = self.return_usage_query_data(usage_query.result)
-                if data:
-                    if len(data) > 1:
-                        logger.error('too many data entries for %s: %s', p, data)
-                        continue
-                    entry = data[0]
-                    entry.update({
-                        'size_sum': entry['rec_aggrs']['size'],
-                        'full_path': entry['parent_path']+'/'+entry['fn'],
-                        'server': self.connection_obj.name,
-                        'volume': volume,
-                        'path': lab_volpath,
-                        'project': p,
-                        'date': DATESTR,
-                    })
-                    for item in self.items_to_pop:
-                        entry.pop(item)
-                    entries.append(entry)
-        return entries
+        if not self._sf_user_data:
+            # 1. produce dict of all labs to be collected & volumes on which their data is located
+            lab_res = self.produce_lab_dict()
+            # 2. produce list of files collected & list of lab/volume/filename tuples to collect
+            filepaths, to_collect = self.check_volume_collection(lab_res)
+            # 3. produce set of all volumes to be queried
+            vol_set = {i[1] for i in to_collect}
+            vols = [vol for vol in vol_set if vol in svp['volumes']]
+            for volume in vols:
+                projects = [t for t in to_collect if t[1] == volume]
+                logger.debug('vol: %s\nto_collect_subset: %s', volume, projects)
+
+                ### OLD METHOD ###
+                for tup in projects:
+                    p = tup[0]
+                    filepath = tup[3]
+                    lab_volpath = tup[2] #volumepath[0] if '_l3' not in p else volumepath[1]
+                    logger.debug('filepath: %s lab: %s volpath: %s', filepath, p, lab_volpath)
+                    usage_query = self.connection_obj.create_query(
+                        f'groupname={p} type=f',
+                        'volume,username,groupname',
+                        f'{volume}:{lab_volpath}',
+                    )
+                    data = self.return_usage_query_data(usage_query.result)
+                    if data:
+                        contents = [d for d in data if d['username'] != 'root']
+                        for entry in contents:
+                            # entry['size_sum'] = entry['rec_aggrs']['size']
+                            # entry['full_path'] = entry['parent_path']+'/'+entry['fn']
+                            for item in self.items_to_pop:
+                                entry.pop(item, None)
+                        record = {
+                            'server': self.connection_obj.name,
+                            'volume': volume,
+                            'path': lab_volpath,
+                            'project': p,
+                            'date': DATESTR,
+                            'contents': contents,
+                        }
+                        save_json(filepath, record)
+                        filepaths.append(filepath)
+
+            collected_data = []
+            for filepath in filepaths:
+                content = read_json(filepath)
+                for user in content['contents']:
+                    user.update({
+                        'volume': content['volume'],
+                        'path': content['path'],
+                        'project': content['project'],
+                    })
+                    collected_data.append(user)
+            self._sf_user_data = collected_data
+        return self._sf_user_data
+
+    @property
+    def sf_usage_data(self):
+        """Collect usage data from starfish for all labs in the lab list."""
+        if not self._sf_usage_data:
+            # 1. produce dict of all labs to be collected & volumes containing their data
+            lab_res = self.produce_lab_dict()
+            lab_res = [(k, i[0], i[1]) for k, v in lab_res.items() for i in v]
+            # 2. produce set of all volumes to be queried
+            vol_set = {i[1] for i in lab_res}
+            vols = [vol for vol in vol_set if vol in svp['volumes']]
+            entries = []
+            for volume in vols:
+                volumepath = svp['volumes'][volume]
+                projects = [t for t in lab_res if t[1] == volume]
+                logger.debug('vol: %s\nto_collect_subset: %s', volume, projects)
+
+                ### OLD METHOD ###
+                for tup in projects:
+                    p = tup[0]
+                    lab_volpath = volumepath[0] if '_l3' not in p else volumepath[1]
+                    logger.debug('lab: %s volpath: %s', p, lab_volpath)
+                    usage_query = self.connection_obj.create_query(
+                        f'groupname={p} type=d depth=1',
+                        'volume,parent_path,groupname,rec_aggrs.size,fn',
+                        f'{volume}:{lab_volpath}',
+                        qformat='parent_path +aggrs.by_gid',
+                    )
+                    data = self.return_usage_query_data(usage_query.result)
+                    if data:
+                        if len(data) > 1:
+                            logger.error('too many data entries for %s: %s', p, data)
+                            continue
+                        entry = data[0]
+                        entry.update({
+                            'size_sum': entry['rec_aggrs']['size'],
+                            'full_path': entry['parent_path']+'/'+entry['fn'],
+                            'server': self.connection_obj.name,
+                            'volume': volume,
+                            'path': lab_volpath,
+                            'project': p,
+                            'date': DATESTR,
+                        })
+                        for item in self.items_to_pop:
+                            entry.pop(item)
+                        entries.append(entry)
+            self._sf_usage_data = entries
+        return self._sf_usage_data
 
 @receiver(allocation_activate)
 def update_allocation(sender, **kwargs):
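Note: the snippet below is a minimal sketch of the call flow this patch enables, not part of the patch itself. The sync_starfish driver and its pipeline argument are hypothetical; clean_collected_data, update_coldfront_objects, and the lazy sf_user_data/sf_usage_data properties are the ones patched above, with allocationquerymatches amended to a plain method so it can accept the usage flag.

# Hypothetical driver; `pipeline` is an instance of the Redash- or REST-backed
# subclass patched above.
def sync_starfish(pipeline, update_usage=False):
    # First access of the sf_user_data property inside clean_collected_data()
    # runs the user-level query; the result is cached on _sf_user_data, so
    # repeated access does not re-query Starfish.
    _, user_models = pipeline.clean_collected_data()
    # With usage=True, update_coldfront_objects() first calls
    # update_coldfront_allocation_usage(), which touches the cached
    # sf_usage_data property and runs allocation_reaching_capacity_operations()
    # per matched allocation; with usage=False the subdirectory query is skipped.
    pipeline.update_coldfront_objects(user_models, usage=update_usage)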