feat/seasearch: add wiki index script
cir9no committed Aug 26, 2024
1 parent 4f828d5 commit 9f1909d
Showing 5 changed files with 320 additions and 16 deletions.
8 changes: 7 additions & 1 deletion seasearch/index_store/index_manager.py
@@ -49,13 +49,19 @@ def delete_repo_filename_index(self, repo_id, repo_filename_index, repo_status_filename_index):
         repo_filename_index.delete_index_by_index_name(repo_filename_index_name)
         repo_status_filename_index.delete_documents_by_repo(repo_id)

+    def delete_wiki_index(self, wiki_id, wiki_index, wiki_status_index):
+        # first delete wiki_index
+        wiki_index_name = WIKI_INDEX_PREFIX + wiki_id
+        wiki_index.delete_index_by_index_name(wiki_index_name)
+        wiki_status_index.delete_documents_by_repo(wiki_id)
+
     def keyword_search(self, query, repos, repo_filename_index, count, suffixes=None):
         return repo_filename_index.search_files(repos, query, 0, count, suffixes)

     def wiki_search(self, query, wikis, wiki_index, count):
         return wiki_index.search_wikis(wikis, query, 0, count)

-    def update_wiki_page_index(self, wiki_id, commit_id, wiki_index, wiki_status_index):
+    def update_wiki_index(self, wiki_id, commit_id, wiki_index, wiki_status_index):
         try:
             new_commit_id = commit_id
             index_name = WIKI_INDEX_PREFIX + wiki_id
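With this hunk, IndexManager exposes update, search, and delete entry points for wikis alongside the existing repo-filename ones. A minimal sketch of how a caller might drive them; the wiring is hypothetical (only the method signatures come from this diff), and wiki_index / wiki_status_index are assumed to be already-constructed WikiIndex / RepoStatusIndex instances:

from seafevents.seasearch.index_store.index_manager import IndexManager

def reindex_search_drop(wiki_id, commit_id, wiki_index, wiki_status_index):
    # Hypothetical wiring; only the IndexManager signatures are from the diff.
    manager = IndexManager()
    # Bring the per-wiki index up to the given commit.
    manager.update_wiki_index(wiki_id, commit_id, wiki_index, wiki_status_index)
    # Search the given wikis (whatever shape search_wikis expects).
    hits = manager.wiki_search('deployment notes', [wiki_id], wiki_index, 10)
    # Drop the per-wiki index and its status-index record.
    manager.delete_wiki_index(wiki_id, wiki_index, wiki_status_index)
    return hits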
7 changes: 5 additions & 2 deletions seasearch/index_store/wiki_index.py
@@ -138,7 +138,7 @@ def get_wiki_conf(self, wiki_id):
         return json.loads(f.get_content().decode())

     def extract_doc_uuids(self, config):
-        '''Extract the uuid of the undeleted wikis'''
+        """Extract the uuid of the undeleted wiki pages not in the recycle bin"""
         def extract_ids_from_navigation(navigation_items, navigation_ids):
             for item in navigation_items:
                 navigation_ids.add(item['id'])
@@ -151,7 +151,7 @@ def extract_ids_from_navigation(navigation_items, navigation_ids):
         return doc_uuids

     def extract_deleted_doc_uuids(self, config):
-        """Extract the uuid of the deleted wikis"""
+        """Extract the uuid of the deleted wiki pages in the recycle bin"""
         def extract_ids_from_navigation(navigation_items, navigation_ids):
             for item in navigation_items:
                 navigation_ids.add(item['id'])
@@ -312,3 +312,6 @@ def search_wikis(self, wikis, keyword, start=0, size=10):

         query_match.extend(res_wikis)
         return query_match
+
+    def delete_index_by_index_name(self, index_name):
+        self.seasearch_api.delete_index_by_name(index_name)
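The added delete_index_by_index_name is a thin delegate to seasearch_api.delete_index_by_name, giving IndexManager.delete_wiki_index (above) a single call for dropping a per-wiki index.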
26 changes: 13 additions & 13 deletions seasearch/index_task/wiki_index_updater.py
@@ -78,19 +78,19 @@ def start(self):
         ).start()


-def clear_deleted_repo(wiki_status_index, wiki_index, index_manager, repos):
-    logger.info("start to clear wiki index deleted repo")
+def clear_deleted_wiki(wiki_status_index, wiki_index, index_manager, wikis):
+    logger.info("start to clear deleted wiki indexes")

-    repo_list = wiki_status_index.get_all_repos_from_index()
-    repo_all = [e.get('repo_id') for e in repo_list]
+    wiki_list = wiki_status_index.get_all_repos_from_index()
+    wiki_all = [e.get('repo_id') for e in wiki_list]

-    repo_deleted = set(repo_all) - set(repos)
+    wiki_deleted = set(wiki_all) - set(wikis)

-    logger.info("wiki index %d repos need to be deleted." % len(repo_deleted))
-    for repo_id in repo_deleted:
-        index_manager.delete_repo_filename_index(repo_id, wiki_index, wiki_status_index)
-        logger.info('Repo %s has been deleted from wiki index.' % repo_id)
-    logger.info("wiki index deleted repo has been cleared")
+    logger.info("%d wikis need to be deleted from the wiki index." % len(wiki_deleted))
+    for wiki_id in wiki_deleted:
+        index_manager.delete_wiki_index(wiki_id, wiki_index, wiki_status_index)
+        logger.info('Wiki %s has been deleted from the wiki index.' % wiki_id)
+    logger.info("deleted wikis have been cleared from the wiki index")


 def update_wiki_indexes(wiki_status_index, wiki_index, index_manager, repo_data):
@@ -109,11 +109,11 @@ def update_wiki_indexes(wiki_status_index, wiki_index, index_manager, repo_data)
     for wiki_id, commit_id in wiki_commits:
         all_wikis.append(wiki_id)

-        index_manager.update_wiki_page_index(wiki_id, commit_id, wiki_index, wiki_status_index)
+        index_manager.update_wiki_index(wiki_id, commit_id, wiki_index, wiki_status_index)

     logger.info("Finish update wiki index")

-    clear_deleted_repo(wiki_status_index, wiki_index, index_manager, all_wikis)
+    clear_deleted_wiki(wiki_status_index, wiki_index, index_manager, all_wikis)


 class WikiIndexUpdaterTimer(Thread):
@@ -128,7 +128,7 @@ def run(self):
         sched = GeventScheduler()
         logging.info('Start to update wiki index...')
         try:
-            sched.add_job(update_wiki_indexes, CronTrigger(second='*/30'),
+            sched.add_job(update_wiki_indexes, CronTrigger(minute='*/15'),
                           args=(self.wiki_status_index, self.wiki_index, self.index_manager, self.repo_data))
         except Exception as e:
             logging.exception('periodical update wiki index error: %s', e)
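The scheduling change above is easy to miss: the updater timer now fires every 15 minutes instead of every 30 seconds. For reference, a minimal standalone sketch of the same APScheduler pattern, assuming apscheduler and gevent are installed; the refresh job is a stand-in, not the real update_wiki_indexes:

from apscheduler.schedulers.gevent import GeventScheduler
from apscheduler.triggers.cron import CronTrigger

def refresh():
    # stand-in for update_wiki_indexes(...)
    print('refreshing wiki indexes...')

sched = GeventScheduler()
# minute='*/15' fires at minutes 0, 15, 30 and 45 of every hour;
# the old second='*/30' trigger fired every 30 seconds.
sched.add_job(refresh, CronTrigger(minute='*/15'))
greenlet = sched.start()  # GeventScheduler.start() returns a greenlet
greenlet.join()           # keep the process alive so jobs keep running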
11 changes: 11 additions & 0 deletions seasearch/script/wiki_index.sh.template
@@ -0,0 +1,11 @@
export CCNET_CONF_DIR=$CONF_PATH
export SEAFILE_CONF_DIR=$CONF_PATH/seafile-data
export EVENTS_CONFIG_FILE=$CONF_PATH/seafevents.conf
export PYTHONPATH=$COMPILE_PATH:$CONF_PATH:$PYTHONPATH:/usr/lib/python3.8/dist-packages:/usr/lib/python3.8/site-packages:/usr/local/lib/python3.8/dist-packages:/usr/local/lib/python3.8/site-packages:/data/dev/seahub/thirdpart:/data/dev/pyes/pyes:/data/dev/portable-python-libevent/libevent:/data/dev/seafobj:/data/dev/seahub/seahub/:/data/dev/
export SEAHUB_DIR=/data/dev/seahub/

if [[ $# == 1 && $1 == "clear" ]]; then
python -m seafevents.seasearch.script.wiki_index_local clear
else
python -m seafevents.seasearch.script.wiki_index_local update
fi
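As the template shows (with $CONF_PATH and $COMPILE_PATH presumably substituted at install time), a single clear argument drops every wiki index via wiki_index_local clear, while any other invocation runs an update pass.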
284 changes: 284 additions & 0 deletions seasearch/script/wiki_index_local.py
@@ -0,0 +1,284 @@
import os
import sys
import time
import queue
import logging
import argparse
import threading

from seafobj import commit_mgr, fs_mgr, block_mgr
from seafevents.utils import get_opt_from_conf_or_env
from seafevents.app.config import get_config
from seafevents.seasearch.utils import init_logging
from seafevents.repo_data import repo_data
from seafevents.seasearch.index_store.index_manager import IndexManager
from seafevents.seasearch.utils.seasearch_api import SeaSearchAPI
from seafevents.seasearch.index_store.repo_status_index import RepoStatusIndex
from seafevents.seasearch.utils.constants import WIKI_INDEX_PREFIX, WIKI_STATUS_INDEX_NAME
from seafevents.seasearch.index_store.wiki_index import WikiIndex

logger = logging.getLogger('seasearch')

UPDATE_FILE_LOCK = os.path.join(os.path.dirname(__file__), 'update.lock')
lockfile = None
NO_TASKS = False


class WikiIndexLocal(object):
    """Independently update the wiki page index."""

    def __init__(self, index_manager, wiki_status_index, wiki_index, repo_data, workers=3):
        self.index_manager = index_manager
        self.wiki_status_index = wiki_status_index
        self.wiki_index = wiki_index
        self.repo_data = repo_data
        self.error_counter = 0
        self.worker_list = []
        self.workers = workers

    def clear_worker(self):
        for th in self.worker_list:
            th.join()
        logger.info("All worker threads have stopped.")

    def run(self):
        time_start = time.time()
        wikis_queue = queue.Queue(0)  # maxsize=0: an unbounded queue
        for i in range(self.workers):
            thread_name = "worker" + str(i)
            logger.info("starting worker thread %s for wiki indexing" % thread_name)
            t = threading.Thread(target=self.thread_task, args=(wikis_queue, ), name=thread_name)
            t.start()
            self.worker_list.append(t)

        start, per_size = 0, 1000
        wikis = {}
        while True:
            global NO_TASKS
            try:
                wiki_commits = self.repo_data.get_wiki_id_commit_id(start, per_size)
            except Exception as e:
                logger.error("Error: %s" % e)
                NO_TASKS = True
                self.clear_worker()
                return
            else:
                if len(wiki_commits) == 0:
                    NO_TASKS = True
                    break
                for wiki_id, commit_id in wiki_commits:
                    wikis_queue.put((wiki_id, commit_id))
                    wikis[wiki_id] = commit_id
                start += per_size

        self.clear_worker()
        logger.info("wiki index updated, total time %s seconds" % str(time.time() - time_start))
        try:
            self.clear_deleted_wiki(list(wikis.keys()))
        except Exception as e:
            logger.exception('Delete Wiki Error: %s' % e)
            self.incr_error()

    def thread_task(self, wikis_queue):
        while True:
            try:
                queue_data = wikis_queue.get(False)
            except queue.Empty:
                if NO_TASKS:
                    logger.debug(
                        "Queue is empty, %s worker thread stops"
                        % (threading.currentThread().getName())
                    )
                    break
                else:
                    time.sleep(2)
            else:
                wiki_id = queue_data[0]
                commit_id = queue_data[1]
                try:
                    self.index_manager.update_wiki_index(wiki_id, commit_id, self.wiki_index, self.wiki_status_index)
                except Exception as e:
                    logger.exception('Wiki index error: %s, wiki_id: %s' % (e, wiki_id), exc_info=True)
                    self.incr_error()

        logger.info(
            "%s worker finished updating at %s"
            % (threading.currentThread().getName(),
               time.strftime("%Y-%m-%d %H:%M", time.localtime(time.time())))
        )
        logger.info(
            "%s worker hit %s errors"
            % (threading.currentThread().getName(),
               str(self.error_counter))
        )

    def clear_deleted_wiki(self, wikis):
        logger.info("start to clear deleted wikis")
        wiki_all = [e.get('repo_id') for e in self.wiki_status_index.get_all_repos_from_index()]

        wiki_deleted = set(wiki_all) - set(wikis)
        logger.info("%d wikis need to be deleted from the wiki index." % len(wiki_deleted))
        for wiki_id in wiki_deleted:
            self.delete_wiki_index(wiki_id)
            logger.info('Wiki %s has been deleted from index.' % wiki_id)
        logger.info("deleted wikis have been cleared")

    def incr_error(self):
        self.error_counter += 1

    def delete_wiki_index(self, wiki_id):
        # wiki ids are 36-character UUID strings; skip anything else
        if len(wiki_id) != 36:
            return
        self.index_manager.delete_wiki_index(wiki_id, self.wiki_index, self.wiki_status_index)


def start_index_local():
    if not check_concurrent_update():
        return
    section_name = 'SEASEARCH'
    seafevents_conf = os.environ.get('EVENTS_CONFIG_FILE')
    config = get_config(seafevents_conf)
    seasearch_url = get_opt_from_conf_or_env(
        config, section_name, 'seasearch_url'
    )
    seasearch_token = get_opt_from_conf_or_env(
        config, section_name, 'seasearch_token'
    )

    index_manager = IndexManager()
    seasearch_api = SeaSearchAPI(seasearch_url, seasearch_token)
    wiki_status_index = RepoStatusIndex(seasearch_api, WIKI_STATUS_INDEX_NAME)
    wiki_index = WikiIndex(seasearch_api, repo_data, shard_num=1)

    try:
        index_local = WikiIndexLocal(index_manager, wiki_status_index, wiki_index, repo_data)
    except Exception as e:
        logger.error("Index wiki process init error: %s." % e)
        return

    logger.info("Index wiki process initialized.")
    index_local.run()

    logger.info('\n\nWiki index updated, statistic report:\n')
    logger.info('[commit read] %s', commit_mgr.read_count())


def delete_indices():
    section_name = 'SEASEARCH'
    conf_path = os.environ.get('CONF_PATH') or os.environ.get('SEAFILE_CENTRAL_CONF_DIR')
    seafevents_conf = os.path.join(conf_path, 'seafevents.conf')
    config = get_config(seafevents_conf)
    seasearch_url = get_opt_from_conf_or_env(
        config, section_name, 'seasearch_url'
    )
    seasearch_token = get_opt_from_conf_or_env(
        config, section_name, 'seasearch_token'
    )

    seasearch_api = SeaSearchAPI(seasearch_url, seasearch_token)
    wiki_status_index = RepoStatusIndex(seasearch_api, WIKI_STATUS_INDEX_NAME)
    wiki_index = WikiIndex(seasearch_api, repo_data, shard_num=1)

    start, count = 0, 1000
    while True:
        try:
            wiki_commits = repo_data.get_wiki_id_commit_id(start, count)
        except Exception as e:
            logger.error("Error: %s" % e)
            return
        start += count

        if len(wiki_commits) == 0:
            break

        for wiki_id, commit_id in wiki_commits:
            wiki_index_name = WIKI_INDEX_PREFIX + wiki_id
            wiki_index.delete_index_by_index_name(wiki_index_name)

    wiki_status_index.delete_index_by_index_name()


def main():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(title='subcommands', description='')

    parser.add_argument(
        '--logfile',
        default=sys.stdout,
        type=argparse.FileType('a'),
        help='log file')

    parser.add_argument(
        '--loglevel',
        default='info',
        help='log level')

    # update index
    parser_update = subparsers.add_parser('update', help='update seafile wiki index')
    parser_update.set_defaults(func=start_index_local)

    # clear
    parser_clear = subparsers.add_parser('clear', help='clear all wiki indices')
    parser_clear.set_defaults(func=delete_indices)

    if len(sys.argv) == 1:
        print(parser.format_help())
        return

    args = parser.parse_args()
    init_logging(args)

    logger.info('storage: using ' + commit_mgr.get_backend_name())

    args.func()


def do_lock(fn):
    if os.name == 'nt':
        return do_lock_win32(fn)
    else:
        return do_lock_linux(fn)


def do_lock_win32(fn):
    import ctypes

    CreateFileW = ctypes.windll.kernel32.CreateFileW
    GENERIC_WRITE = 0x40000000
    OPEN_ALWAYS = 4

    def lock_file(path):
        lock_file_handle = CreateFileW(path, GENERIC_WRITE, 0, None, OPEN_ALWAYS, 0, None)
        return lock_file_handle

    global lockfile

    lockfile = lock_file(fn)

    return lockfile != -1


def do_lock_linux(fn):
    from seafevents.seasearch.script import portalocker
    global lockfile
    lockfile = open(fn, 'w')
    try:
        portalocker.lock(lockfile, portalocker.LOCK_NB | portalocker.LOCK_EX)
        return True
    except portalocker.LockException:
        return False


def check_concurrent_update():
    """Use a lock file to ensure only one task can be running"""
    if not do_lock(UPDATE_FILE_LOCK):
        logger.error('another index task is running, quit now')
        return False

    return True


if __name__ == "__main__":
    main()
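This script is meant to be driven by the shell template above, i.e. python -m seafevents.seasearch.script.wiki_index_local update (or clear). check_concurrent_update takes a lock on the update.lock file beside the script so only one indexing run can be active at a time, and the module-level NO_TASKS flag lets the worker threads drain the queue and exit cleanly once the producer has enqueued the last batch or hit an error.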
