Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/seasearch add wiki search sup #366

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from seafevents.seafevent_server.seafevent_server import SeafEventServer
from seafevents.app.config import ENABLE_METADATA_MANAGEMENT
from seafevents.seasearch.index_task.filename_index_updater import RepoFilenameIndexUpdater
from seafevents.seasearch.index_task.wiki_index_updater import WikiIndexUpdater


class App(object):
Expand Down Expand Up @@ -42,6 +43,7 @@ def __init__(self, config, ccnet_config, seafile_config,
self._index_worker = RepoMetadataIndexWorker(config)
self._slow_task_handler = SlowTaskHandler(config)
self._repo_filename_index_updater = RepoFilenameIndexUpdater(config)
self._wiki_index_updater = WikiIndexUpdater(config)

def serve_forever(self):
if self._fg_tasks_enabled:
Expand All @@ -66,3 +68,4 @@ def serve_forever(self):
self._index_worker.start()
self._slow_task_handler.start()
self._repo_filename_index_updater.start()
self._wiki_index_updater.start()
4 changes: 2 additions & 2 deletions repo_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ def _get_virtual_repo_in_repos(self, repo_ids):
if not repo_ids:
return []
try:
cmd = """SELECT repo_id from VirtualRepo WHERE repo_id IN {}""".format(tuple(repo_ids))
formatted_ids = ", ".join("'{}'".format(id) for id in repo_ids)
cmd = """SELECT repo_id from VirtualRepo WHERE repo_id IN ({})""".format(formatted_ids)
res = session.execute(text(cmd)).fetchall()
return res
except Exception as e:
Expand Down Expand Up @@ -142,5 +143,4 @@ def get_virtual_repo_in_repos(self, repo_ids):
logger.error(e)
return self._get_virtual_repo_in_repos(repo_ids)


repo_data = RepoData()
33 changes: 33 additions & 0 deletions seafevent_server/request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,36 @@ def search():
results = index_task_manager.keyword_search(query, repos, count, suffixes, search_path)

return {'results': results}, 200


@app.route('/wiki-search', methods=['POST'])
def search_wikis():
is_valid = check_auth_token(request)
if not is_valid:
return {'error_msg': 'Permission denied'}, 403

# Check seasearch is enable
if not index_task_manager.enabled:
return {'error_msg': 'Seasearch is not enabled by seafevents.conf'}
try:
data = json.loads(request.data)
except Exception as e:
logger.exception(e)
return {'error_msg': 'Bad request.'}, 400

query = data.get('query').strip()
wikis = data.get('wikis')

if not query:
return {'error_msg': 'query invalid.'}, 400
if not wikis:
return {'error_msg': 'wikis invalid.'}, 400

try:
count = int(data.get('count'))
except:
count = 20

results = index_task_manager.wiki_search(query, wikis, count)

return {'results': results}, 200
49 changes: 46 additions & 3 deletions seasearch/index_store/index_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

from seafevents.seasearch.utils import need_index_metadata_info
from seafevents.db import init_db_session_class
from seafevents.seasearch.utils.constants import ZERO_OBJ_ID, REPO_FILENAME_INDEX_PREFIX
from seafevents.seasearch.utils.constants import ZERO_OBJ_ID, REPO_FILENAME_INDEX_PREFIX, \
WIKI_INDEX_PREFIX
from seafevents.repo_metadata.metadata_server_api import MetadataServerAPI
from seafevents.repo_metadata.utils import METADATA_TABLE
from seafevents.utils import timestamp_to_isoformat_timestr
Expand Down Expand Up @@ -57,9 +58,9 @@ def update_library_filename_index(self, repo_id, commit_id, repo_filename_index,
commit_id = to_commit
time.sleep(1)

repo_status_filename_index.begin_update_repo(repo_id, commit_id, new_commit_id, metadata_last_updated_time)
repo_status_filename_index.begin_update_repo(repo_id, commit_id, new_commit_id, metadata_updated_time=metadata_last_updated_time)
repo_filename_index.update(index_name, repo_id, commit_id, new_commit_id, rows, self.metadata_server_api, need_index_metadata)
repo_status_filename_index.finish_update_repo(repo_id, new_commit_id, metadata_query_time)
repo_status_filename_index.finish_update_repo(repo_id, new_commit_id, metadata_updated_time=metadata_query_time)

logger.info('repo: %s, update repo filename index success', repo_id)

Expand All @@ -74,3 +75,45 @@ def delete_repo_filename_index(self, repo_id, repo_filename_index, repo_status_f

def keyword_search(self, query, repos, repo_filename_index, count, suffixes=None, search_path=None):
return repo_filename_index.search_files(repos, query, 0, count, suffixes, search_path)

def delete_wiki_index(self, wiki_id, wiki_index, wiki_status_index):
# first delete wiki_index
wiki_index_name = WIKI_INDEX_PREFIX + wiki_id
wiki_index.delete_index_by_index_name(wiki_index_name)
wiki_status_index.delete_documents_by_repo(wiki_id)

def wiki_search(self, query, wikis, wiki_index, count):
return wiki_index.search_wikis(wikis, query, 0, count)

def update_wiki_index(self, wiki_id, commit_id, wiki_index, wiki_status_index):
try:
new_commit_id = commit_id
index_name = WIKI_INDEX_PREFIX + wiki_id

wiki_index.create_index_if_missing(index_name)

wiki_status = wiki_status_index.get_repo_status_by_id(wiki_id)
from_commit = wiki_status.from_commit
to_commit = wiki_status.to_commit

if new_commit_id == from_commit:
return

if not from_commit:
commit_id = ZERO_OBJ_ID
else:
commit_id = from_commit

if wiki_status.need_recovery():
logger.warning('%s: wiki index inrecovery', wiki_id)
wiki_index.update(index_name, wiki_id, commit_id, to_commit)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

按照现在的逻辑这里还能正常recovery吗?

commit_id = to_commit
time.sleep(1)
wiki_status_index.begin_update_repo(wiki_id, commit_id, new_commit_id)
wiki_index.update(index_name, wiki_id, commit_id, new_commit_id)
wiki_status_index.finish_update_repo(wiki_id, new_commit_id)

logger.info('wiki: %s, update wiki index success', wiki_id)

except Exception as e:
logger.exception('wiki_id: %s, update wiki index error: %s.', wiki_id, e)
2 changes: 1 addition & 1 deletion seasearch/index_store/repo_file_name_index.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import os
import logging

Expand Down Expand Up @@ -159,6 +158,7 @@ def search_files(self, repos, keyword, start=0, size=10, suffixes=None, search_p
bulk_search_params.append(data)
search_path = None


results = self.seasearch_api.m_search(bulk_search_params)
files = []

Expand Down
98 changes: 59 additions & 39 deletions seasearch/index_store/repo_status_index.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from seafevents.seasearch.utils.constants import REPO_STATUS_FILENAME_INDEX_NAME


class RepoStatus(object):
def __init__(self, repo_id, from_commit, to_commit, metadata_updated_time):
def __init__(self, repo_id, from_commit, to_commit, **kwargs):
self.repo_id = repo_id
self.from_commit = from_commit
self.to_commit = to_commit
self.metadata_updated_time = metadata_updated_time
if 'metadata_updated_time' in kwargs:
self.metadata_updated_time = kwargs['metadata_updated_time']

def need_recovery(self):
return self.to_commit is not None
Expand Down Expand Up @@ -35,9 +39,6 @@ class RepoStatusIndex(object):
'updatingto': {
'type': 'keyword'
},
'metadata_updated_time': {
'type': 'keyword'
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里把之前加的属性给删了,别的功能无法工作了

},
}

Expand All @@ -46,8 +47,15 @@ def __init__(self, seasearch_api, index_name):
self.seasearch_api = seasearch_api
self.create_index_if_missing()

def is_status_filename_index(self):
return self.index_name.startswith(REPO_STATUS_FILENAME_INDEX_NAME)

def create_index_if_missing(self):
if not self.seasearch_api.check_index_mapping(self.index_name).get('is_exist'):
if self.is_status_filename_index():
self.mapping['properties']['metadata_updated_time'] = {
'type': 'keyword'
}
data = {
'mappings': self.mapping,
}
Expand All @@ -56,35 +64,45 @@ def create_index_if_missing(self):
def check_repo_status(self, repo_id):
return self.seasearch_api.check_document_by_id(self.index_name, repo_id).get('is_exist')

def add_repo_status(self, repo_id, commit_id, updatingto, metadata_updated_time):
date = {
def add_repo_status(self, repo_id, commit_id, updatingto, **kwargs):
data = {
'repo_id': repo_id,
'commit_id': commit_id,
'updatingto': updatingto,
'metadata_updated_time': metadata_updated_time,
}

if 'metadata_updated_time' in kwargs:
data.update(metadata_updated_time=kwargs['metadata_updated_time'])

doc_id = repo_id
self.seasearch_api.create_document_by_id(self.index_name, doc_id, date)
self.seasearch_api.create_document_by_id(self.index_name, doc_id, data)

def begin_update_repo(self, repo_id, old_commit_id, new_commit_id, metadata_updated_time):
self.add_repo_status(repo_id, old_commit_id, new_commit_id, metadata_updated_time)
def begin_update_repo(self, repo_id, old_commit_id, new_commit_id, **kwargs):
self.add_repo_status(repo_id, old_commit_id, new_commit_id, **kwargs)

def finish_update_repo(self, repo_id, commit_id, metadata_updated_time):
self.add_repo_status(repo_id, commit_id, None, metadata_updated_time)
def finish_update_repo(self, repo_id, commit_id, **kwargs):
self.add_repo_status(repo_id, commit_id, None, **kwargs)

def delete_documents_by_repo(self, repo_id):
return self.seasearch_api.delete_document_by_id(self.index_name, repo_id)

def get_repo_status_by_id(self, repo_id):
doc = self.seasearch_api.get_document_by_id(self.index_name, repo_id)
if doc.get('error'):
return RepoStatus(repo_id, None, None, None)
if self.is_status_filename_index():
return RepoStatus(repo_id, None, None, metadata_updated_time=None)
else:
return RepoStatus(repo_id, None, None)

commit_id = doc['_source']['commit_id']
updatingto = doc['_source']['updatingto']
metadata_updated_time = doc['_source']['metadata_updated_time']
repo_id = doc['_source']['repo_id']

return RepoStatus(repo_id, commit_id, updatingto, metadata_updated_time)
if self.is_status_filename_index():
metadata_updated_time = doc['_source']['metadata_updated_time']
return RepoStatus(repo_id, commit_id, updatingto, metadata_updated_time=metadata_updated_time)

return RepoStatus(repo_id, commit_id, updatingto)

def update_repo_status_by_id(self, doc_id, data):
self.seasearch_api.update_document_by_id(self.index_name, doc_id, data)
Expand All @@ -93,27 +111,28 @@ def get_repo_status_by_time(self, check_time):
per_size = 2000
start = 0
repo_head_list = []
while True:
query_params = {
"query": {
"bool": {
"must": [
{"range":
{"@timestamp":
{
"lt": check_time
}
query_params = {
"query": {
"bool": {
"must": [
{"range":
{"@timestamp":
{
"lt": check_time
}
}
]
}
},
"_source": ["commit_id", "updatingto", "metadata_updated_time"],
"from": start,
"size": per_size,
"sort": ["-@timestamp"],
}

}
]
}
},
"_source": ["commit_id", "updatingto"],
"from": start,
"size": per_size,
"sort": ["-@timestamp"],
}
if self.is_status_filename_index():
query_params['_source'].append('metadata_updated_time')
while True:
repo_heads, total = self._repo_head_search(query_params)
repo_head_list.extend(repo_heads)
start += per_size
Expand Down Expand Up @@ -151,13 +170,14 @@ def _repo_head_search(self, query_params):
repo_id = hit['_id']
commit_id = hit.get('_source').get('commit_id')
updatingto = hit.get('_source').get('updatingto')
metadata_updated_time = hit.get('_source').get('metadata_updated_time')
repo_heads.append({
repo_head = {
'repo_id': repo_id,
'commit_id': commit_id,
'updatingto': updatingto,
'metadata_updated_time': metadata_updated_time,
})
}
if 'metadata_updated_time' in hit.get('_source', {}):
repo_head['metadata_updated_time'] = hit.get('_source').get('metadata_updated_time')
repo_heads.append(repo_head)
return repo_heads, total

def delete_index_by_index_name(self):
Expand Down
Loading