Skip to content

Commit

Permalink
deterministic processing order, cache structure fixed, labels per mode
Browse files — browse the repository at this point in the history
  • Loading branch information
PJDude committed May 17, 2024
1 parent 80d2788 commit da7c296
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 54 deletions.
65 changes: 35 additions & 30 deletions src/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
if os_name=='nt':
from subprocess import CREATE_NO_WINDOW

from sys import exit as sys_exit
from pickle import dumps,loads
from zstandard import ZstdCompressor,ZstdDecompressor

Expand Down Expand Up @@ -567,10 +568,6 @@ def crc_cache_write(self):
info_speed=0
info_threads='?'

#images_hashes_cache={}
#def image_hash_cache_file(self,dev,hash_size):
# return sep.join([self.cache_dir,f'{dev}.{hash_size}.ih.dat'])

def images_hashes_cache_read(self):
self.info='image hashes cache read'

Expand Down Expand Up @@ -622,7 +619,7 @@ def my_hash_combo(file,hash_size):

return tuple(seq_hash)

for index_tuple,fullpath in source_dict.items():
for index_tuple,fullpath in sorted(source_dict.items(), key = lambda x : x[0][7], reverse=True):
if self.abort_action:
break

Expand All @@ -636,8 +633,9 @@ def my_hash_combo(file,hash_size):
continue

if all_rotations:
file_rotate = file.rotate
try:
result_dict[index_tuple]=( my_hash_combo(file,hash_size),my_hash_combo(file.rotate(90),hash_size),my_hash_combo(file.rotate(180),hash_size),my_hash_combo(file.rotate(270),hash_size) )
result_dict[index_tuple]=( my_hash_combo(file,hash_size),my_hash_combo(file_rotate(90),hash_size),my_hash_combo(file_rotate(180),hash_size),my_hash_combo(file_rotate(270),hash_size) )

except Exception as e:
self.log.error(f'hashing file: {fullpath} error: {e}.')
Expand All @@ -650,7 +648,7 @@ def my_hash_combo(file,hash_size):
self.log.error(f'hashing file: {fullpath} error: {e}.')
continue

#sys.exit(0)
sys_exit() #thread

info=''
def image_hashing(self,hash_size,all_rotations):
Expand All @@ -669,15 +667,14 @@ def image_hashing(self,hash_size,all_rotations):
anything_new=False

self.scan_results_images_hashes={}
#self.hashes_to_calculate=set()

max_threads = cpu_count()

imagehash_threads_sets_source = {i:{} for i in range(max_threads)}
imagehash_threads_sets_results = {i:{} for i in range(max_threads)}
imagehash_threads = {i:Thread(target=lambda iloc=i: self.imagehsh_calc_in_thread(iloc,hash_size,all_rotations,imagehash_threads_sets_source[iloc],imagehash_threads_sets_results[iloc]),daemon=True) for i in range(max_threads)}

set_index=0
thread_index=0

images_quantity_cache_read=0
images_quantity_need_to_calculate=0
Expand All @@ -686,30 +683,29 @@ def image_hashing(self,hash_size,all_rotations):
size_to_calculate = 0

rotations_list = (0,1,2,3) if all_rotations else (0,)
for pathnr,path,file_name,mtime,ctime,dev,inode,size in self.scan_results_images:

for pathnr,path,file_name,mtime,ctime,dev,inode,size in sorted(self.scan_results_images, key = lambda x : x[7], reverse=True):
all_rotations_from_cache = True
for rotation in rotations_list:
dict_key = (dev,inode,mtime,rotation)
dict_key = (dev,inode,mtime,hash_size,rotation)
if dict_key in self.images_hashes_cache:
if val := self.images_hashes_cache[dict_key]:
#print('read from cache:',dict_key,val)
self.scan_results_images_hashes[(pathnr,path,file_name,mtime,ctime,dev,inode,size,rotation)] = val
if rotation==0:
images_quantity_cache_read+=1
size_from_cache += size
continue
#else:
# self.images_hashes_cache[dict_key]={}
else:
all_rotations_from_cache = False
break

fullpath=self.get_full_path_to_scan(pathnr,path,file_name)
if all_rotations_from_cache:
images_quantity_cache_read+=1
size_from_cache += size
else:
fullpath=self.get_full_path_to_scan(pathnr,path,file_name)

imagehash_threads_sets_source[set_index][(pathnr,path,file_name,mtime,ctime,dev,inode,size)]=fullpath
imagehash_threads_sets_source[thread_index][(pathnr,path,file_name,mtime,ctime,dev,inode,size)]=fullpath

set_index += 1
set_index %= max_threads
thread_index = (thread_index+1) % max_threads

images_quantity_need_to_calculate += 1
size_to_calculate += size
images_quantity_need_to_calculate += 1
size_to_calculate += size

#self.images_quantity_need_to_calculate = images_quantity_need_to_calculate
#self.images_quantity_cache_read = images_quantity_cache_read
Expand All @@ -734,14 +730,13 @@ def image_hashing(self,hash_size,all_rotations):
sto_by_self_info_total = 100.0/self.info_total
sto_by_self_sum_size = 100.0/self.sum_size

self.info = f'Threads:{max_threads}'
#self.info = f'Threads:{max_threads}'

while True:
all_dead=True
for i in range(max_threads):
if imagehash_threads[i].is_alive():
all_dead=False
sleep(0.02)

if all_dead:
break
Expand All @@ -751,6 +746,7 @@ def image_hashing(self,hash_size,all_rotations):

self.info_size_done_perc = sto_by_self_sum_size*self.info_size_done
self.info_files_done_perc = sto_by_self_info_total*self.info_files_done
sleep(0.02)

for i in range(max_threads):
imagehash_threads[i].join()
Expand All @@ -761,19 +757,23 @@ def image_hashing(self,hash_size,all_rotations):
for rotation,ihash in enumerate(ihash_rotations):
if (rotation in rotations_list) and ihash:
self.scan_results_images_hashes[(pathnr,path,file_name,mtime,ctime,dev,inode,size,rotation)]=numpy_array(ihash)
self.images_hashes_cache[(dev,inode,mtime,rotation)]=ihash
self.images_hashes_cache[(dev,inode,mtime,hash_size,rotation)]=ihash

anything_new=True

if anything_new:
self.info = self.info_line = 'Writing cache ...'
self.images_hashes_cache_write()

sys_exit() #thread

def similarity_clustering(self,hash_size,distance,all_rotations):
pool = []
keys = []

for key,imagehash in self.scan_results_images_hashes.items():
self.info_line = self.info = 'Preparing data pool ...'

for key,imagehash in sorted(self.scan_results_images_hashes.items(), key=lambda x :x[0][7],reverse = True) :
pool.append(imagehash)
keys.append( key )

Expand All @@ -782,14 +782,15 @@ def similarity_clustering(self,hash_size,distance,all_rotations):
self.info_line = self.info = 'Clustering ...'

model = DBSCAN(eps=de_norm_distance, min_samples=2,n_jobs=-1)

fit = model.fit(pool)

labels = fit.labels_

unique_labels = set(labels)
groups = defaultdict(set)

self.info_line = self.info = 'Separating groups ...'

for label,key in zip(labels,keys):
if label!=-1:
groups[label].add(key)
Expand All @@ -801,6 +802,10 @@ def similarity_clustering(self,hash_size,distance,all_rotations):
for key in lab_keys:
self_files_of_images_groups_str_label_add(key)

del model

sys_exit() #thread

def crc_calc(self):
self.crc_cache_read()

Expand Down
74 changes: 50 additions & 24 deletions src/dude.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ def __init__(self,cwd,paths_to_add=None,exclude=None,exclude_regexp=None,norun=N

hg_indices=('01','02','03','04','05','06','07','08', '11','12','13','14','15','16','17','18', '21','22','23','24','25','26','27','28', '31','32','33','34','35','36','37','38',)
self.hg_ico={ i:self_ico[str('hg'+j)] for i,j in enumerate(hg_indices) }
self.hg_ico_len = len(self.hg_ico)

self.icon_softlink_target=self_ico['softlink_target']
self.icon_softlink_dir_target=self_ico['softlink_dir_target']
Expand Down Expand Up @@ -1185,6 +1186,13 @@ def help_cascade_post():

self_main.mainloop()
#######################################################################

hg_index = 0

def get_hg_ico(self):
self.hg_index=(self.hg_index+1) % self.hg_ico_len
return self.hg_ico[self.hg_index]

def similarity_mode_change(self):

if self.similarity_mode_var.get():
Expand Down Expand Up @@ -1814,7 +1822,11 @@ def show_tooltip_groups(self,event):
self.tooltip_deiconify()
else:
crc=item
self.tooltip_lab_configure(text='CRC: %s' % crc )
if self.similarity_mode:
self.tooltip_lab_configure(text='GROUP: %s' % crc )
else:
self.tooltip_lab_configure(text='CRC: %s' % crc )

self.tooltip_deiconify()

elif col:
Expand Down Expand Up @@ -3064,8 +3076,6 @@ def scan(self):

time_without_busy_sign=0

hr_index=0

self_progress_dialog_on_scan_progr1var.set(0)
self_progress_dialog_on_scan_lab_r1_config(text='- - - -')
self_progress_dialog_on_scan_progr2var.set(0)
Expand All @@ -3082,8 +3092,7 @@ def scan(self):

scan_thread_is_alive = scan_thread.is_alive

self_hg_ico = self.hg_ico
len_self_hg_ico = len(self_hg_ico)
self_get_hg_ico = self.get_hg_ico

local_bytes_to_str = bytes_to_str

Expand Down Expand Up @@ -3120,8 +3129,7 @@ def scan(self):
self_progress_dialog_on_scan_lab[2].configure(image=self.ico_empty)
else :
if now>time_without_busy_sign+1.0:
self_progress_dialog_on_scan_lab[2].configure(image=self_hg_ico[hr_index],text = '', compound='left')
hr_index=(hr_index+1) % len_self_hg_ico
self_progress_dialog_on_scan_lab[2].configure(image=self_get_hg_ico(),text = '', compound='left')

self_tooltip_message[str_self_progress_dialog_on_scan_abort_button]='currently scanning:\n%s...' % dude_core.info_line
self_configure_tooltip(str_self_progress_dialog_on_scan_abort_button)
Expand Down Expand Up @@ -3159,6 +3167,8 @@ def scan(self):
prev_progress_size=0
prev_progress_quant=0

self_get_hg_ico = self.get_hg_ico

if similarity_mode:
self_progress_dialog_on_scan_lab[0].configure(image='',text='')
self_progress_dialog_on_scan_lab[1].configure(text='')
Expand Down Expand Up @@ -3208,17 +3218,16 @@ def scan(self):

if dude_core.can_abort:
if self.action_abort:
self_progress_dialog_on_scan_lab[0].configure(image='',text='Images hashing aborted')
self_progress_dialog_on_scan_lab[1].configure(text='')
self_progress_dialog_on_scan_lab[0].configure(text='',image='')
self_progress_dialog_on_scan_lab[1].configure(image='',text='Images hashing aborted')
self_progress_dialog_on_scan_lab[2].configure(text='')
self_progress_dialog_on_scan_lab[3].configure(text='')
self_progress_dialog_on_scan_lab[4].configure(text='')
self_progress_dialog_on_scan_area_main_update()
dude_core.abort()
break

self_progress_dialog_on_scan_lab[0].configure(image=self_hg_ico[hr_index],text='')
hr_index=(hr_index+1) % len_self_hg_ico
self_progress_dialog_on_scan_lab[0].configure(image=self_get_hg_ico(),text='')

self_status(dude_core.info)

Expand Down Expand Up @@ -3250,8 +3259,7 @@ def scan(self):
self_progress_dialog_on_scan.abort_button.configure(state='disabled',text='',image='')

while sc_thread_is_alive():
self_progress_dialog_on_scan_lab[0].configure(image=self_hg_ico[hr_index],text='')
hr_index=(hr_index+1) % len_self_hg_ico
self_progress_dialog_on_scan_lab[0].configure(image=self_get_hg_ico(),text='')

self_main_after(50,lambda : wait_var_set(not wait_var_get()))
self_main_wait_variable(wait_var)
Expand All @@ -3277,8 +3285,6 @@ def scan(self):
self_progress_dialog_on_scan.widget.update()
self.main.focus_set()

ih_thread.join()

else:
self_status('Calculating CRC ...')

Expand Down Expand Up @@ -3352,8 +3358,7 @@ def scan(self):
self_progress_dialog_on_scan_lab[0].configure(image=self.ico_empty)
else :
if now>time_without_busy_sign+1.0:
self_progress_dialog_on_scan_lab[0].configure(image=self_hg_ico[hr_index],text='')
hr_index=(hr_index+1) % len_self_hg_ico
self_progress_dialog_on_scan_lab[0].configure(image=self_get_hg_ico(),text='')

self_tooltip_message[str_self_progress_dialog_on_scan_abort_button]='crc calculating:\n%s...' % dude_core.info_line
self_configure_tooltip(str_self_progress_dialog_on_scan_abort_button)
Expand Down Expand Up @@ -3753,9 +3758,14 @@ def initial_focus(self):
def groups_show(self):
#self.menu_disable()


if self.similarity_mode:
self_idfunc=self.idfunc = (lambda i,d,r : '%s-%s-$s' % (i,d,r)) if len(dude_core.devs)>1 else (lambda i,d,r : '%s-%s' % (i,r))
self.groups_tree.heading('#0',text='GROUP/Scan Path',anchor='w')
self.folder_tree.heading('#0',text='GROUP',anchor='w')
self_idfunc=self.idfunc = (lambda i,d,r : '%s-%s-%s' % (i,d,r)) if len(dude_core.devs)>1 else (lambda i,d,r : '%s-%s' % (i,r))
else:
self.groups_tree.heading('#0',text='CRC/Scan Path',anchor='w')
self.folder_tree.heading('#0',text='CRC',anchor='w')
self_idfunc=self.idfunc = (lambda i,d,r=0 : '%s-%s' % (i,d)) if len(dude_core.devs)>1 else (lambda i,d,r=0 : str(i))

self_status=self.status
Expand Down Expand Up @@ -4746,8 +4756,12 @@ def process_files_confirm(self,action,processed_items,remaining_items,scope_titl

size=self.crc_to_size[crc]

if cfg_show_crc_size:
message_append('CRC:' + crc + ' size:' + bytes_to_str(size) + '|GRAY')
if self.similarity_mode:
if cfg_show_crc_size:
message_append('size:' + bytes_to_str(size) + '|GRAY')
else:
if cfg_show_crc_size:
message_append('CRC:' + crc + ' size:' + bytes_to_str(size) + '|GRAY')

for index,item in items_dict.items():
size_sum += size
Expand Down Expand Up @@ -4893,7 +4907,10 @@ def process_files_core(self,action,processed_items,remaining_items):
size = self.crc_to_size[crc]

self.process_files_core_info0 = f'size:{bytes_to_str(size)}'
self.process_files_core_info1 = f'crc:{crc}'
if self.similarity_mode:
self.process_files_core_info1 = f'group:{crc}'
else:
self.process_files_core_info1 = f'crc:{crc}'

for item in items_dict.values():
index_tuple=self_groups_tree_item_to_data[item][3]
Expand Down Expand Up @@ -4961,7 +4978,10 @@ def process_files_core(self,action,processed_items,remaining_items):
self.process_files_size_sum+=size

self.process_files_core_info0 = f'size:{bytes_to_str(size)}'
self.process_files_core_info1 = f'crc:{crc}'
if self.similarity_mode:
self.process_files_core_info1 = f'group:{crc}'
else:
self.process_files_core_info1 = f'crc:{crc}'

if resmsg:=dude_core_link_wrapper(SOFTLINK, do_rel_symlink, size,crc, index_tuple_ref, [self_groups_tree_item_to_data[item][3] for item in items_dict.values() ],to_trash,self.file_remove_callback,self.crc_remove_callback ):
l_error(resmsg)
Expand All @@ -4987,7 +5007,10 @@ def process_files_core(self,action,processed_items,remaining_items):
self.process_files_size_sum+=size

self.process_files_core_info0 = f'size:{bytes_to_str(size)}'
self.process_files_core_info1 = f'crc:{crc}'
if self.similarity_mode:
self.process_files_core_info1 = f'group:{crc}'
else:
self.process_files_core_info1 = f'crc:{crc}'

if resmsg:=dude_core_link_wrapper(WIN_LNK, False, size,crc, index_tuple_ref, [self_groups_tree_item_to_data[item][3] for item in items_dict.values() ],to_trash,self.file_remove_callback,self.crc_remove_callback ):
l_error(resmsg)
Expand All @@ -5010,7 +5033,10 @@ def process_files_core(self,action,processed_items,remaining_items):
self.process_files_size_sum+=size

self.process_files_core_info0 = f'size:{bytes_to_str(size)}'
self.process_files_core_info1 = f'crc:{crc}'
if self.similarity_mode:
self.process_files_core_info1 = f'group:{crc}'
else:
self.process_files_core_info1 = f'crc:{crc}'

if resmsg:=dude_core_link_wrapper(HARDLINK, False, size,crc, index_tuple_ref, [self_groups_tree_item_to_data[item][3] for index,item in items_dict.items() if index!=0 ],to_trash,self.file_remove_callback,self.crc_remove_callback ):
l_error(resmsg)
Expand Down

0 comments on commit da7c296

Please sign in to comment.