Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add online gc #706

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion common/branch-mgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,19 +368,70 @@ on_branch_updated (SeafBranchManager *mgr, SeafBranch *branch)
publish_repo_update_event (branch->repo_id, branch->commit_id);
}

static gboolean
get_gc_id (SeafDBRow *row, void *data)
{
char **out_gc_id = data;

*out_gc_id = g_strdup(seaf_db_row_get_column_text (row, 0));

return FALSE;
}

int
seaf_branch_manager_test_and_update_branch (SeafBranchManager *mgr,
SeafBranch *branch,
const char *old_commit_id)
const char *old_commit_id,
gboolean check_gc,
const char *last_gc_id,
const char *origin_repo_id,
gboolean *gc_conflict)
{
SeafDBTrans *trans;
char *sql;
char commit_id[41] = { 0 };
char *gc_id = NULL;

if (check_gc)
*gc_conflict = FALSE;

trans = seaf_db_begin_transaction (mgr->seaf->db);
if (!trans)
return -1;

if (check_gc) {
sql = "SELECT gc_id FROM GCID WHERE repo_id = ? FOR UPDATE";
if (!origin_repo_id) {
if (seaf_db_trans_foreach_selected_row (trans, sql,
get_gc_id, &gc_id,
1, "string", branch->repo_id) < 0) {
seaf_db_rollback (trans);
seaf_db_trans_close (trans);
return -1;
}
}
else {
if (seaf_db_trans_foreach_selected_row (trans, sql,
get_gc_id, &gc_id,
1, "string", origin_repo_id) < 0) {
seaf_db_rollback (trans);
seaf_db_trans_close (trans);
return -1;
}
}

if (g_strcmp0 (last_gc_id, gc_id) != 0) {
seaf_warning ("Head branch update for repo %s conflicts with GC.\n",
branch->repo_id);
seaf_db_rollback (trans);
seaf_db_trans_close (trans);
*gc_conflict = TRUE;
g_free (gc_id);
return -1;
}
g_free (gc_id);
}

switch (seaf_db_type (mgr->seaf->db)) {
case SEAF_DB_TYPE_MYSQL:
case SEAF_DB_TYPE_PGSQL:
Expand Down
6 changes: 5 additions & 1 deletion common/branch-mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ seaf_branch_manager_update_branch (SeafBranchManager *mgr,
int
seaf_branch_manager_test_and_update_branch (SeafBranchManager *mgr,
SeafBranch *branch,
const char *old_commit_id);
const char *old_commit_id,
gboolean check_gc,
const char *last_gc_id,
const char *origin_repo_id,
gboolean *gc_conflict);
#endif

SeafBranch *
Expand Down
13 changes: 12 additions & 1 deletion common/rpc-service.c
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,8 @@ seafile_change_repo_passwd (const char *repo_id,
seaf_branch_set_commit (repo->head, commit->commit_id);
if (seaf_branch_manager_test_and_update_branch (seaf->branch_mgr,
repo->head,
parent->commit_id) < 0) {
parent->commit_id,
FALSE, NULL, NULL, NULL) < 0) {
seaf_repo_unref (repo);
seaf_commit_unref (commit);
seaf_commit_unref (parent);
Expand Down Expand Up @@ -1365,6 +1366,16 @@ seafile_get_repo_history_limit (const char *repo_id,
return seaf_repo_manager_get_repo_history_limit (seaf->repo_mgr, repo_id);
}

int
seafile_set_repo_valid_since (const char *repo_id,
gint64 timestamp,
GError **error)
{
return seaf_repo_manager_set_repo_valid_since (seaf->repo_mgr,
repo_id,
timestamp);
}

int
seafile_repo_set_access_property (const char *repo_id, const char *ap, GError **error)
{
Expand Down
112 changes: 82 additions & 30 deletions fileserver/fileop.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ func mkdirWithParents(repoID, parentDir, newDirPath, user string) error {
}

buf := fmt.Sprintf("Added directory \"%s\"", relativeDirCan)
_, err = genNewCommit(repo, headCommit, rootID, user, buf, true)
_, err = genNewCommit(repo, headCommit, rootID, user, buf, true, "", false)
if err != nil {
err := fmt.Errorf("failed to generate new commit: %v", err)
return err
Expand Down Expand Up @@ -1714,7 +1714,13 @@ func postMultiFiles(rsp http.ResponseWriter, r *http.Request, repoID, parentDir,
}
}

retStr, err := postFilesAndGenCommit(fileNames, repo.ID, user, canonPath, replace, ids, sizes, lastModify)
gcID, err := repomgr.GetCurrentGCID(repo.StoreID)
if err != nil {
err := fmt.Errorf("failed to get current gc id for repo %s: %v", repoID, err)
return &appError{err, "", http.StatusInternalServerError}
}

retStr, err := postFilesAndGenCommit(fileNames, repo.ID, user, canonPath, replace, ids, sizes, lastModify, gcID)
if err != nil {
err := fmt.Errorf("failed to post files and gen commit: %v", err)
return &appError{err, "", http.StatusInternalServerError}
Expand Down Expand Up @@ -1770,7 +1776,7 @@ func checkFilesWithSameName(repo *repomgr.Repo, canonPath string, fileNames []st
return false
}

func postFilesAndGenCommit(fileNames []string, repoID string, user, canonPath string, replace bool, ids []string, sizes []int64, lastModify int64) (string, error) {
func postFilesAndGenCommit(fileNames []string, repoID string, user, canonPath string, replace bool, ids []string, sizes []int64, lastModify int64, lastGCID string) (string, error) {
handleConncurrentUpdate := true
if !replace {
handleConncurrentUpdate = false
Expand Down Expand Up @@ -1816,7 +1822,7 @@ retry:
buf = fmt.Sprintf("Added \"%s\".", fileNames[0])
}

_, err = genNewCommit(repo, headCommit, rootID, user, buf, handleConncurrentUpdate)
_, err = genNewCommit(repo, headCommit, rootID, user, buf, handleConncurrentUpdate, lastGCID, true)
if err != nil {
if err != ErrConflict {
err := fmt.Errorf("failed to generate new commit: %v", err)
Expand Down Expand Up @@ -1880,7 +1886,7 @@ func getCanonPath(p string) string {

var ErrConflict = fmt.Errorf("Concurent upload conflict")

func genNewCommit(repo *repomgr.Repo, base *commitmgr.Commit, newRoot, user, desc string, handleConncurrentUpdate bool) (string, error) {
func genNewCommit(repo *repomgr.Repo, base *commitmgr.Commit, newRoot, user, desc string, handleConncurrentUpdate bool, lastGCID string, checkGC bool) (string, error) {
var retryCnt int
repoID := repo.ID
commit := commitmgr.NewCommit(repoID, base.CommitID, newRoot, user, desc)
Expand All @@ -1895,7 +1901,7 @@ func genNewCommit(repo *repomgr.Repo, base *commitmgr.Commit, newRoot, user, des
maxRetryCnt := 10

for {
retry, err := genCommitNeedRetry(repo, base, commit, newRoot, user, handleConncurrentUpdate, &commitID)
retry, err := genCommitNeedRetry(repo, base, commit, newRoot, user, handleConncurrentUpdate, &commitID, lastGCID, checkGC)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -1925,10 +1931,19 @@ func genNewCommit(repo *repomgr.Repo, base *commitmgr.Commit, newRoot, user, des
return commitID, nil
}

func fastForwardOrMerge(user string, repo *repomgr.Repo, base, newCommit *commitmgr.Commit) error {
func fastForwardOrMerge(user, token string, repo *repomgr.Repo, base, newCommit *commitmgr.Commit) error {
var retryCnt int
checkGC, err := repomgr.HasLastGCID(repo.ID, token)
if err != nil {
return err
}
var lastGCID string
if checkGC {
lastGCID, _ = repomgr.GetLastGCID(repo.ID, token)
repomgr.RemoveLastGCID(repo.ID, token)
}
for {
retry, err := genCommitNeedRetry(repo, base, newCommit, newCommit.RootID, user, true, nil)
retry, err := genCommitNeedRetry(repo, base, newCommit, newCommit.RootID, user, true, nil, lastGCID, checkGC)
if err != nil {
return err
}
Expand All @@ -1948,7 +1963,7 @@ func fastForwardOrMerge(user string, repo *repomgr.Repo, base, newCommit *commit
return nil
}

func genCommitNeedRetry(repo *repomgr.Repo, base *commitmgr.Commit, commit *commitmgr.Commit, newRoot, user string, handleConncurrentUpdate bool, commitID *string) (bool, error) {
func genCommitNeedRetry(repo *repomgr.Repo, base *commitmgr.Commit, commit *commitmgr.Commit, newRoot, user string, handleConncurrentUpdate bool, commitID *string, lastGCID string, checkGC bool) (bool, error) {
var secondParentID string
repoID := repo.ID
var mergeDesc string
Expand Down Expand Up @@ -2001,7 +2016,10 @@ func genCommitNeedRetry(repo *repomgr.Repo, base *commitmgr.Commit, commit *comm
mergedCommit = commit
}

err = updateBranch(repoID, mergedCommit.CommitID, currentHead.CommitID, secondParentID)
gcConflict, err := updateBranch(repoID, repo.StoreID, mergedCommit.CommitID, currentHead.CommitID, secondParentID, checkGC, lastGCID)
if gcConflict {
return false, err
}
if err != nil {
return true, nil
}
Expand All @@ -2024,56 +2042,80 @@ func genMergeDesc(repo *repomgr.Repo, mergedRoot, p1Root, p2Root string) string
return desc
}

func updateBranch(repoID, newCommitID, oldCommitID, secondParentID string) error {
func updateBranch(repoID, originRepoID, newCommitID, oldCommitID, secondParentID string, checkGC bool, lastGCID string) (gcConflict bool, err error) {
ctx, cancel := context.WithTimeout(context.Background(), option.DBOpTimeout)
defer cancel()
trans, err := seafileDB.BeginTx(ctx, nil)
if err != nil {
err := fmt.Errorf("failed to start transaction: %v", err)
return false, err
}

var row *sql.Row
var sqlStr string
if checkGC {
sqlStr = "SELECT gc_id FROM GCID WHERE repo_id = ? FOR UPDATE"
if originRepoID == "" {
row = trans.QueryRowContext(ctx, sqlStr, repoID)
} else {
row = trans.QueryRowContext(ctx, sqlStr, originRepoID)
}
var gcID sql.NullString
if err := row.Scan(&gcID); err != nil {
if err != sql.ErrNoRows {
trans.Rollback()
return false, err
}
}

if lastGCID != gcID.String {
err = fmt.Errorf("Head branch update for repo %s conflicts with GC.", repoID)
trans.Rollback()
return true, err
}
}

var commitID string
name := "master"
var sqlStr string
if strings.EqualFold(dbType, "mysql") {
sqlStr = "SELECT commit_id FROM Branch WHERE name = ? AND repo_id = ? FOR UPDATE"
} else {
sqlStr = "SELECT commit_id FROM Branch WHERE name = ? AND repo_id = ?"
}

ctx, cancel := context.WithTimeout(context.Background(), option.DBOpTimeout)
defer cancel()
trans, err := seafileDB.BeginTx(ctx, nil)
if err != nil {
err := fmt.Errorf("failed to start transaction: %v", err)
return err
}
row := trans.QueryRowContext(ctx, sqlStr, name, repoID)
row = trans.QueryRowContext(ctx, sqlStr, name, repoID)
if err := row.Scan(&commitID); err != nil {
if err != sql.ErrNoRows {
trans.Rollback()
return err
return false, err
}
}
if oldCommitID != commitID {
trans.Rollback()
err := fmt.Errorf("head commit id has changed")
return err
return false, err
}

sqlStr = "UPDATE Branch SET commit_id = ? WHERE name = ? AND repo_id = ?"
_, err = trans.ExecContext(ctx, sqlStr, newCommitID, name, repoID)
if err != nil {
trans.Rollback()
return err
return false, err
}

trans.Commit()

if secondParentID != "" {
if err := onBranchUpdated(repoID, secondParentID, false); err != nil {
return err
return false, err
}
}

if err := onBranchUpdated(repoID, newCommitID, true); err != nil {
return err
return false, err
}

return nil
return false, nil
}

func onBranchUpdated(repoID string, commitID string, updateRepoInfo bool) error {
Expand Down Expand Up @@ -2726,7 +2768,7 @@ func updateDir(repoID, dirPath, newDirID, user, headID string) (string, error) {
if commitDesc == "" {
commitDesc = "Auto merge by system"
}
newCommitID, err := genNewCommit(repo, headCommit, newDirID, user, commitDesc, true)
newCommitID, err := genNewCommit(repo, headCommit, newDirID, user, commitDesc, true, "", false)
if err != nil {
err := fmt.Errorf("failed to generate new commit: %v", err)
return "", err
Expand Down Expand Up @@ -2767,7 +2809,7 @@ func updateDir(repoID, dirPath, newDirID, user, headID string) (string, error) {
commitDesc = "Auto merge by system"
}

newCommitID, err := genNewCommit(repo, headCommit, rootID, user, commitDesc, true)
newCommitID, err := genNewCommit(repo, headCommit, rootID, user, commitDesc, true, "", false)
if err != nil {
err := fmt.Errorf("failed to generate new commit: %v", err)
return "", err
Expand Down Expand Up @@ -3166,8 +3208,13 @@ func putFile(rsp http.ResponseWriter, r *http.Request, repoID, parentDir, user,
return &appError{err, "", http.StatusInternalServerError}
}

gcID, err := repomgr.GetCurrentGCID(repo.StoreID)
if err != nil {
err := fmt.Errorf("failed to get current gc id: %v", err)
return &appError{err, "", http.StatusInternalServerError}
}
desc := fmt.Sprintf("Modified \"%s\"", fileName)
_, err = genNewCommit(repo, headCommit, rootID, user, desc, true)
_, err = genNewCommit(repo, headCommit, rootID, user, desc, true, gcID, true)
if err != nil {
err := fmt.Errorf("failed to generate new commit: %v", err)
return &appError{err, "", http.StatusInternalServerError}
Expand Down Expand Up @@ -3398,8 +3445,13 @@ func commitFileBlocks(repoID, parentDir, fileName, blockIDsJSON, user string, fi
return "", &appError{err, "", http.StatusInternalServerError}
}

gcID, err := repomgr.GetCurrentGCID(repo.StoreID)
if err != nil {
err := fmt.Errorf("failed to get current gc id: %v", err)
return "", &appError{err, "", http.StatusInternalServerError}
}
desc := fmt.Sprintf("Added \"%s\"", fileName)
_, err = genNewCommit(repo, headCommit, rootID, user, desc, true)
_, err = genNewCommit(repo, headCommit, rootID, user, desc, true, gcID, true)
if err != nil {
err := fmt.Errorf("failed to generate new commit: %v", err)
return "", &appError{err, "", http.StatusInternalServerError}
Expand Down
Loading
Loading