diff --git a/fileserver/option/option.go b/fileserver/option/option.go index 0943fa28..9e89b2d0 100644 --- a/fileserver/option/option.go +++ b/fileserver/option/option.go @@ -40,6 +40,7 @@ var ( WindowsEncoding string SkipBlockHash bool FsCacheLimit int64 + VerifyClientBlocks bool // general options CloudMode bool @@ -79,6 +80,7 @@ func initDefaultOptions() { ClusterSharedTempFileMode = 0600 DefaultQuota = InfiniteQuota FsCacheLimit = 2 << 30 + VerifyClientBlocks = true FsIdListRequestTimeout = -1 DBOpTimeout = 60 * time.Second } @@ -211,6 +213,9 @@ func parseFileServerSection(section *ini.Section) { FsIdListRequestTimeout = fsIdListRequestTimeout } } + if key, err := section.GetKey("verify_client_blocks_after_sync"); err == nil { + VerifyClientBlocks, _ = key.Bool() + } } func parseQuota(quotaStr string) int64 { diff --git a/fileserver/sync_api.go b/fileserver/sync_api.go index 9351ff67..3fa3f15a 100644 --- a/fileserver/sync_api.go +++ b/fileserver/sync_api.go @@ -1055,6 +1055,12 @@ func putUpdateBranchCB(rsp http.ResponseWriter, r *http.Request) *appError { return &appError{nil, msg, seafHTTPResNoQuota} } + if option.VerifyClientBlocks { + if body, err := checkBlocks(r.Context(), repo, base, newCommit); err != nil { + return &appError{nil, body, seafHTTPResBlockMissing} + } + } + token := r.Header.Get("Seafile-Repo-Token") if token == "" { token = utils.GetAuthorizationToken(r.Header) @@ -1072,6 +1078,81 @@ func putUpdateBranchCB(rsp http.ResponseWriter, r *http.Request) *appError { return nil } +type checkBlockAux struct { + storeID string + version int + fileList []string +} + +func checkBlocks(ctx context.Context, repo *repomgr.Repo, base, remote *commitmgr.Commit) (string, error) { + aux := new(checkBlockAux) + aux.storeID = repo.StoreID + aux.version = repo.Version + opt := &diff.DiffOptions{ + FileCB: checkFileBlocks, + DirCB: checkDirCB, + Ctx: ctx, + RepoID: repo.StoreID} + opt.Data = aux + + trees := []string{base.RootID, remote.RootID} + if err := diff.DiffTrees(trees, opt); err != nil { + return "", err + } + + if len(aux.fileList) == 0 { + return "", nil + } + + body, _ := json.Marshal(aux.fileList) + + return string(body), fmt.Errorf("block is missing") +} + +func checkFileBlocks(ctx context.Context, baseDir string, files []*fsmgr.SeafDirent, data interface{}) error { + select { + case <-ctx.Done(): + return context.Canceled + default: + } + + file1 := files[0] + file2 := files[1] + + aux, ok := data.(*checkBlockAux) + if !ok { + err := fmt.Errorf("failed to assert results") + return err + } + + if file2 == nil || file2.ID == emptySHA1 || (file1 != nil && file1.ID == file2.ID) { + return nil + } + + file, err := fsmgr.GetSeafile(aux.storeID, file2.ID) + if err != nil { + return err + } + for _, blkID := range file.BlkIDs { + if !blockmgr.Exists(aux.storeID, blkID) { + aux.fileList = append(aux.fileList, file2.Name) + return nil + } + } + + return nil +} + +func checkDirCB(ctx context.Context, baseDir string, dirs []*fsmgr.SeafDirent, data interface{}, recurse *bool) error { + select { + case <-ctx.Done(): + return context.Canceled + default: + } + + return nil +} + func getHeadCommit(rsp http.ResponseWriter, r *http.Request) *appError { vars := mux.Vars(r) repoID := vars["repoid"] diff --git a/server/http-server.c b/server/http-server.c index c7999908..12eb1fcc 100644 --- a/server/http-server.c +++ b/server/http-server.c @@ -152,6 +152,7 @@ load_http_config (HttpServerStruct *htp_server, SeafileSession *session) int worker_threads; char *encoding; char *cluster_shared_temp_file_mode = NULL; + gboolean verify_client_blocks; host = fileserver_config_get_string (session->config, HOST, &error); if (!error) { @@ -194,6 +195,18 @@ load_http_config (HttpServerStruct *htp_server, SeafileSession *session) } seaf_message ("fileserver: worker_threads = %d\n", htp_server->worker_threads); + verify_client_blocks = fileserver_config_get_boolean (session->config, + "verify_client_blocks_after_sync", + &error); + if (error) { + htp_server->verify_client_blocks = TRUE; + g_clear_error(&error); + } else { + htp_server->verify_client_blocks = verify_client_blocks; + } + seaf_message ("fileserver: verify_client_blocks = %d\n", + htp_server->verify_client_blocks); + cluster_shared_temp_file_mode = fileserver_config_get_string (session->config, "cluster_shared_temp_file_mode", &error); @@ -1090,6 +1103,95 @@ fast_forward_or_merge (const char *repo_id, return ret; } +typedef struct CheckBlockAux { + GList *file_list; + const char *store_id; + int version; +} CheckBlockAux; + +static int +check_file_blocks (int n, const char *basedir, SeafDirent *files[], void *data) +{ + Seafile *file = NULL; + char *block_id; + int i = 0; + SeafDirent *file1 = files[0]; + SeafDirent *file2 = files[1]; + CheckBlockAux *aux = (CheckBlockAux*)data; + + if (!file2 || strcmp (file2->id, EMPTY_SHA1) == 0 || (file1 && strcmp (file1->id, file2->id) == 0)) { + return 0; + } + + file = seaf_fs_manager_get_seafile (seaf->fs_mgr, aux->store_id, aux->version, file2->id); + if (!file) { + return -1; + } + + for (i = 0; i < file->n_blocks; ++i) { + block_id = file->blk_sha1s[i]; + if (!seaf_block_manager_block_exists (seaf->block_mgr, aux->store_id, aux->version, block_id)) { + aux->file_list = g_list_prepend (aux->file_list, g_strdup (file2->name)); + goto out; + } + } + +out: + seafile_unref (file); + return 0; +} + +static int +check_dir_cb (int n, const char *basedir, SeafDirent *dirs[], void *data, + gboolean *recurse) +{ + return 0; +} + +static int +check_blocks (SeafRepo *repo, SeafCommit *base, SeafCommit *remote, char **ret_body) { + DiffOptions opts; + memset (&opts, 0, sizeof(opts)); + memcpy (opts.store_id, repo->store_id, 36); + opts.version = repo->version; + + opts.file_cb = check_file_blocks; + opts.dir_cb = check_dir_cb; + + CheckBlockAux aux; + memset (&aux, 0, sizeof(aux)); + aux.store_id = repo->store_id; + aux.version = repo->version; + opts.data = &aux; + + const char *trees[2]; + trees[0] = base->root_id; + trees[1] = remote->root_id; + + if (diff_trees (2, trees, &opts) < 0) { + seaf_warning ("Failed to diff base and remote head for repo %.8s.\n", + repo->id); + return -1; + } + + if (!aux.file_list) { + return 0; + } + + json_t *obj_array = json_array (); + GList *ptr; + for (ptr = aux.file_list; ptr; ptr = ptr->next) { + json_array_append_new (obj_array, json_string (ptr->data)); + g_free (ptr->data); + } + g_list_free (aux.file_list); + + *ret_body = json_dumps (obj_array, JSON_COMPACT); + json_decref (obj_array); + + return -1; +} + static void put_update_branch_cb (evhtp_request_t *req, void *arg) { @@ -1154,6 +1256,19 @@ put_update_branch_cb (evhtp_request_t *req, void *arg) token = get_auth_token (req); + if (seaf->http_server->verify_client_blocks) { + char *ret_body = NULL; + int rc = check_blocks(repo, base, new_commit, &ret_body); + if (rc < 0) { + if (ret_body) { + evbuffer_add (req->buffer_out, ret_body, strlen (ret_body)); + } + evhtp_send_reply (req, SEAF_HTTP_RES_BLOCK_MISSING); + g_free (ret_body); + goto out; + } + } + if (fast_forward_or_merge (repo_id, base, new_commit, token) < 0) { seaf_warning ("Fast forward merge for repo %s is failed.\n", repo_id); evhtp_send_reply (req, EVHTP_RES_SERVERR); diff --git a/server/http-server.h b/server/http-server.h index bcff4e56..ecd4246a 100644 --- a/server/http-server.h +++ b/server/http-server.h @@ -19,6 +19,8 @@ struct _HttpServerStruct { char *windows_encoding; int worker_threads; int cluster_shared_temp_file_mode; + + gboolean verify_client_blocks; }; typedef struct _HttpServerStruct HttpServerStruct;