From bf38f129116959c6e3a3005346e62a745f8e7f28 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Sat, 25 Nov 2023 12:24:56 -0500 Subject: [PATCH] (go api: (xaction)): ref * "huge param" part six Signed-off-by: Alex Aizman --- ais/test/archive_test.go | 10 ++-- ais/test/bucket_test.go | 46 +++++++++---------- ais/test/common_test.go | 6 +-- ais/test/config_test.go | 6 +-- ais/test/cp_multiobj_test.go | 4 +- ais/test/downloader_test.go | 4 +- ais/test/dsort_test.go | 4 +- ais/test/ec_test.go | 20 ++++---- ais/test/etl_cp_multiobj_test.go | 2 +- ais/test/etl_stress_test.go | 14 +++--- ais/test/etl_test.go | 4 +- ais/test/fshc_test.go | 8 ++-- ais/test/integration_test.go | 38 +++++++-------- ais/test/maintain_test.go | 12 ++--- ais/test/multiproxy_test.go | 8 ++-- ais/test/object_test.go | 20 ++++---- ais/test/objprops_test.go | 2 +- ais/test/promote_test.go | 6 +-- ais/test/regression_test.go | 28 +++++------ ais/test/xaction_test.go | 6 +-- api/object.go | 9 +++- api/xaction.go | 28 +++++------ .../integration/act_bench_test.go | 4 +- bench/tools/aisloader/run.go | 4 +- cmd/cli/cli/cluster_hdlr.go | 6 +-- cmd/cli/cli/cpr.go | 2 +- cmd/cli/cli/downloader.go | 2 +- cmd/cli/cli/dsort.go | 2 +- cmd/cli/cli/job_hdlr.go | 10 ++-- cmd/cli/cli/reb_hdlr.go | 2 +- cmd/cli/cli/show_hdlr.go | 6 +-- cmd/cli/cli/storage_hdlr.go | 2 +- cmd/cli/cli/tcbtco.go | 2 +- cmd/cli/cli/xact.go | 12 ++--- cmd/cli/go.mod | 2 +- cmd/cli/go.sum | 4 +- tools/client.go | 19 ++++---- tools/node.go | 2 +- tools/tetl/etl.go | 12 ++--- 39 files changed, 193 insertions(+), 185 deletions(-) diff --git a/ais/test/archive_test.go b/ais/test/archive_test.go index a5fa10e5db..f21d331cd7 100644 --- a/ais/test/archive_test.go +++ b/ais/test/archive_test.go @@ -351,12 +351,12 @@ func testArch(t *testing.T, bck *meta.Bck) { if test.abrt { time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second) tlog.Logln("Aborting...") - api.AbortXaction(baseParams, flt) + api.AbortXaction(baseParams, &flt) } var lstToAppend *cmn.LsoResult for ii := 0; ii < 2; ii++ { - api.WaitForXactionIdle(baseParams, flt) + api.WaitForXactionIdle(baseParams, &flt) tlog.Logf("List %s\n", bckTo) msg := &apc.LsoMsg{Prefix: "test_"} @@ -410,7 +410,7 @@ func testArch(t *testing.T, bck *meta.Bck) { time.Sleep(10 * time.Second) flt := xact.ArgsMsg{Kind: apc.ActArchive, Bck: m.bck} - api.WaitForXactionIdle(baseParams, flt) + api.WaitForXactionIdle(baseParams, &flt) } var ( @@ -536,7 +536,7 @@ func TestAppendToArch(t *testing.T) { } wargs := xact.ArgsMsg{Kind: apc.ActArchive, Bck: m.bck} - api.WaitForXactionIdle(baseParams, wargs) + api.WaitForXactionIdle(baseParams, &wargs) lsmsg := &apc.LsoMsg{Prefix: "test_lst"} lsmsg.AddProps(apc.GetPropsName, apc.GetPropsSize) @@ -590,7 +590,7 @@ func TestAppendToArch(t *testing.T) { } if test.multi { wargs := xact.ArgsMsg{Kind: apc.ActArchive, Bck: m.bck} - api.WaitForXactionIdle(baseParams, wargs) + api.WaitForXactionIdle(baseParams, &wargs) } lsmsg.SetFlag(apc.LsArchDir) diff --git a/ais/test/bucket_test.go b/ais/test/bucket_test.go index 135285d398..fabb8cc4f3 100644 --- a/ais/test/bucket_test.go +++ b/ais/test/bucket_test.go @@ -475,7 +475,7 @@ func overwriteLomCache(mdwrite apc.WritePolicy, t *testing.T) { } // wait for pending writes (of the copies) args := xact.ArgsMsg{Kind: apc.ActPutCopies, Bck: m.bck} - api.WaitForXactionIdle(baseParams, args) + api.WaitForXactionIdle(baseParams, &args) tlog.Logf("List %s new versions\n", m.bck) msg = &apc.LsoMsg{} @@ -1827,7 +1827,7 @@ func testLocalMirror(t *testing.T, numCopies []int) { // Even though the bucket is empty, it can take a short while until the // xaction is propagated and finished. reqArgs := xact.ArgsMsg{ID: xid, Kind: apc.ActMakeNCopies, Bck: m.bck, Timeout: xactTimeout} - _, err = api.WaitForXactionIC(baseParams, reqArgs) + _, err = api.WaitForXactionIC(baseParams, &reqArgs) tassert.CheckFatal(t, err) } @@ -1843,7 +1843,7 @@ func testLocalMirror(t *testing.T, numCopies []int) { baseParams := tools.BaseAPIParams(m.proxyURL) xargs := xact.ArgsMsg{Kind: apc.ActPutCopies, Bck: m.bck, Timeout: xactTimeout} - _, _ = api.WaitForXactionIC(baseParams, xargs) + _, _ = api.WaitForXactionIC(baseParams, &xargs) for _, copies := range numCopies { makeNCopies(t, baseParams, m.bck, copies) @@ -1862,11 +1862,11 @@ func makeNCopies(t *testing.T, baseParams api.BaseParams, bck cmn.Bck, ncopies i tassert.CheckFatal(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActMakeNCopies} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) args = xact.ArgsMsg{Kind: apc.ActPutCopies, Bck: bck} - api.WaitForXactionIdle(baseParams, args) + api.WaitForXactionIdle(baseParams, &args) } func TestRemoteBucketMirror(t *testing.T) { @@ -1985,7 +1985,7 @@ func TestRenameBucketEmpty(t *testing.T) { tassert.CheckFatal(t, err) args := xact.ArgsMsg{ID: uuid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // Check if the new bucket appears in the list @@ -2050,7 +2050,7 @@ func TestRenameBucketNonEmpty(t *testing.T) { tassert.CheckFatal(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // Gets on renamed ais bucket @@ -2170,7 +2170,7 @@ func TestRenameBucketTwice(t *testing.T) { // Wait for rename to complete args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // Check if the new bucket appears in the list @@ -2257,7 +2257,7 @@ func TestRenameBucketWithBackend(t *testing.T) { tassert.CheckFatal(t, err) xargs := xact.ArgsMsg{ID: xid} - _, err = api.WaitForXactionIC(baseParams, xargs) + _, err = api.WaitForXactionIC(baseParams, &xargs) tassert.CheckFatal(t, err) exists, err := api.QueryBuckets(baseParams, cmn.QueryBcks(bck), apc.FltPresent) @@ -2490,11 +2490,11 @@ func TestCopyBucket(t *testing.T) { if test.evictRemoteSrc { // wait for TCO idle (different x-kind) args := xact.ArgsMsg{ID: uuid, Timeout: tools.CopyBucketTimeout} - err := api.WaitForXactionIdle(baseParams, args) + err := api.WaitForXactionIdle(baseParams, &args) tassert.CheckFatal(t, err) } else { args := xact.ArgsMsg{ID: uuid, Kind: apc.ActCopyBck, Timeout: tools.CopyBucketTimeout} - _, err := api.WaitForXactionIC(baseParams, args) + _, err := api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) } } @@ -2628,10 +2628,10 @@ func testCopyBucketStats(t *testing.T, srcBck cmn.Bck, m *ioContext) { }) args := xact.ArgsMsg{ID: xid, Kind: apc.ActCopyBck, Timeout: time.Minute} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) - snaps, err := api.QueryXactionSnaps(baseParams, xact.ArgsMsg{ID: xid}) + snaps, err := api.QueryXactionSnaps(baseParams, &xact.ArgsMsg{ID: xid}) tassert.CheckFatal(t, err) objs, outObjs, inObjs := snaps.ObjCounts(xid) tassert.Errorf(t, objs == int64(m.num), "expected %d objects copied, got (objs=%d, outObjs=%d, inObjs=%d)", @@ -2662,7 +2662,7 @@ func testCopyBucketPrepend(t *testing.T, srcBck cmn.Bck, m *ioContext) { tlog.Logf("Wating for x-%s[%s] %s => %s\n", apc.ActCopyBck, xid, srcBck, dstBck) args := xact.ArgsMsg{ID: xid, Kind: apc.ActCopyBck, Timeout: time.Minute} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) list, err := api.ListObjects(baseParams, dstBck, nil, api.ListArgs{}) @@ -2687,7 +2687,7 @@ func testCopyBucketPrefix(t *testing.T, srcBck cmn.Bck, m *ioContext, expected i tlog.Logf("Wating for x-%s[%s] %s => %s\n", apc.ActCopyBck, xid, srcBck, dstBck) args := xact.ArgsMsg{ID: xid, Kind: apc.ActCopyBck, Timeout: time.Minute} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) list, err := api.ListObjects(baseParams, dstBck, nil, api.ListArgs{}) @@ -2710,11 +2710,11 @@ func testCopyBucketAbort(t *testing.T, srcBck cmn.Bck, m *ioContext) { time.Sleep(time.Second) tlog.Logf("Aborting x-%s[%s]\n", apc.ActCopyBck, xid) - err = api.AbortXaction(baseParams, xact.ArgsMsg{ID: xid}) + err = api.AbortXaction(baseParams, &xact.ArgsMsg{ID: xid}) tassert.CheckError(t, err) time.Sleep(time.Second) - snaps, err := api.QueryXactionSnaps(baseParams, xact.ArgsMsg{ID: xid}) + snaps, err := api.QueryXactionSnaps(baseParams, &xact.ArgsMsg{ID: xid}) tassert.CheckError(t, err) aborted, err := snaps.IsAborted(xid) tassert.CheckError(t, err) @@ -2737,10 +2737,10 @@ func testCopyBucketDryRun(t *testing.T, srcBck cmn.Bck, m *ioContext) { tlog.Logf("Wating for x-%s[%s]\n", apc.ActCopyBck, xid) args := xact.ArgsMsg{ID: xid, Kind: apc.ActCopyBck, Timeout: time.Minute} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) - snaps, err := api.QueryXactionSnaps(baseParams, xact.ArgsMsg{ID: xid}) + snaps, err := api.QueryXactionSnaps(baseParams, &xact.ArgsMsg{ID: xid}) tassert.CheckFatal(t, err) locObjs, outObjs, inObjs := snaps.ObjCounts(xid) @@ -2808,7 +2808,7 @@ func TestRenameAndCopyBucket(t *testing.T) { tlog.Logf("Waiting for x-%s[%s] to finish\n", apc.ActMoveBck, xid) time.Sleep(2 * time.Second) args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) time.Sleep(time.Second) @@ -2896,7 +2896,7 @@ func TestCopyAndRenameBucket(t *testing.T) { // Wait for copy to complete args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // Check if the new bucket appears in the list @@ -3153,14 +3153,14 @@ func testWarmValidation(t *testing.T, cksumType string, mirrored, eced bool) { // wait for mirroring if mirrored { args := xact.ArgsMsg{Kind: apc.ActPutCopies, Bck: m.bck, Timeout: xactTimeout} - api.WaitForXactionIdle(baseParams, args) + api.WaitForXactionIdle(baseParams, &args) // NOTE: ref 1377 m.ensureNumCopies(baseParams, copyCnt, false /*greaterOk*/) } // wait for erasure-coding if eced { args := xact.ArgsMsg{Kind: apc.ActECPut, Bck: m.bck, Timeout: xactTimeout} - api.WaitForXactionIdle(baseParams, args) + api.WaitForXactionIdle(baseParams, &args) } // read all diff --git a/ais/test/common_test.go b/ais/test/common_test.go index 274f647d48..ece66e6634 100644 --- a/ais/test/common_test.go +++ b/ais/test/common_test.go @@ -593,7 +593,7 @@ func (m *ioContext) ensureNumCopies(baseParams api.BaseParams, expectedCopies in m.t.Helper() time.Sleep(time.Second) xargs := xact.ArgsMsg{Kind: apc.ActMakeNCopies, Bck: m.bck, Timeout: tools.RebalanceTimeout} - _, err := api.WaitForXactionIC(baseParams, xargs) + _, err := api.WaitForXactionIC(baseParams, &xargs) tassert.CheckFatal(m.t, err) // List Bucket - primarily for the copies @@ -694,7 +694,7 @@ func ensurePrevRebalanceIsFinished(baseParams api.BaseParams, err error) bool { tlog.Logln("Warning: wait for unfinished rebalance(?)") time.Sleep(5 * time.Second) args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) time.Sleep(5 * time.Second) return true } @@ -727,7 +727,7 @@ func (m *ioContext) stopMaintenance(target *meta.Snode) string { tassert.Fatalf(m.t, xact.IsValidRebID(rebID), "invalid reb ID %q", rebID) xargs := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: tools.RebalanceStartTimeout} - api.WaitForXactionNode(bp, xargs, xactSnapRunning) + api.WaitForXactionNode(bp, &xargs, xactSnapRunning) return rebID } diff --git a/ais/test/config_test.go b/ais/test/config_test.go index 5e8ccb7657..b896ac5876 100644 --- a/ais/test/config_test.go +++ b/ais/test/config_test.go @@ -157,7 +157,7 @@ func TestConfigSetGlobal(t *testing.T) { // wait for ec flt := xact.ArgsMsg{Kind: apc.ActECEncode} - _, _ = api.WaitForXactionIC(baseParams, flt) + _, _ = api.WaitForXactionIC(baseParams, &flt) } func TestConfigFailOverrideClusterOnly(t *testing.T) { @@ -180,7 +180,7 @@ func TestConfigFailOverrideClusterOnly(t *testing.T) { // wait for ec flt := xact.ArgsMsg{Kind: apc.ActECEncode} - _, _ = api.WaitForXactionIC(baseParams, flt) + _, _ = api.WaitForXactionIC(baseParams, &flt) } func TestConfigOverrideAndRestart(t *testing.T) { @@ -280,7 +280,7 @@ func TestConfigSyncToNewNode(t *testing.T) { // wait for ec flt := xact.ArgsMsg{Kind: apc.ActECEncode} - _, _ = api.WaitForXactionIC(baseParams, flt) + _, _ = api.WaitForXactionIC(baseParams, &flt) } func checkConfig(t *testing.T, smap *meta.Smap, check func(*meta.Snode, *cmn.Config)) { diff --git a/ais/test/cp_multiobj_test.go b/ais/test/cp_multiobj_test.go index c121476674..284dbeb044 100644 --- a/ais/test/cp_multiobj_test.go +++ b/ais/test/cp_multiobj_test.go @@ -85,7 +85,7 @@ func TestCopyMultiObjSimple(t *testing.T) { } wargs := xact.ArgsMsg{ID: xid, Kind: apc.ActCopyObjects} - api.WaitForXactionIdle(baseParams, wargs) + api.WaitForXactionIdle(baseParams, &wargs) tlog.Logln("prefix: test/") msg := &apc.LsoMsg{Prefix: "test/"} @@ -251,7 +251,7 @@ func testCopyMobj(t *testing.T, bck *meta.Bck) { tassert.CheckFatal(t, erv.Load().(error)) } wargs := xact.ArgsMsg{Kind: apc.ActCopyObjects, Bck: m.bck} - api.WaitForXactionIdle(baseParams, wargs) + api.WaitForXactionIdle(baseParams, &wargs) msg := &apc.LsoMsg{Prefix: m.prefix} msg.AddProps(apc.GetPropsName, apc.GetPropsSize) diff --git a/ais/test/downloader_test.go b/ais/test/downloader_test.go index af0ac2d354..c54e804778 100644 --- a/ais/test/downloader_test.go +++ b/ais/test/downloader_test.go @@ -527,7 +527,7 @@ func TestDownloadRemote(t *testing.T) { xid, err := api.EvictList(baseParams, test.srcBck, expectedObjs) tassert.CheckFatal(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) if test.dstBck.IsAIS() { @@ -559,7 +559,7 @@ func TestDownloadRemote(t *testing.T) { xid, err = api.EvictList(baseParams, test.srcBck, expectedObjs) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) if test.srcBck.Equal(&test.dstBck) { tassert.CheckFatal(t, err) } else { diff --git a/ais/test/dsort_test.go b/ais/test/dsort_test.go index 125759de49..7d57a422a2 100644 --- a/ais/test/dsort_test.go +++ b/ais/test/dsort_test.go @@ -1374,7 +1374,7 @@ func TestDsortAbort(t *testing.T) { if asXaction { tlog.Logf("aborting dsort[%s] via api.AbortXaction\n", df.managerUUID) - err = api.AbortXaction(df.baseParams, xact.ArgsMsg{ID: df.managerUUID}) + err = api.AbortXaction(df.baseParams, &xact.ArgsMsg{ID: df.managerUUID}) } else { tlog.Logf("aborting dsort[%s] via api.AbortDsort\n", df.managerUUID) err = api.AbortDsort(df.baseParams, df.managerUUID) @@ -1431,7 +1431,7 @@ func TestDsortAbortDuringPhases(t *testing.T) { var err error if asXaction { tlog.Logf("aborting dsort[%s] via api.AbortXaction\n", df.managerUUID) - err = api.AbortXaction(df.baseParams, xact.ArgsMsg{ID: df.managerUUID}) + err = api.AbortXaction(df.baseParams, &xact.ArgsMsg{ID: df.managerUUID}) } else { tlog.Logf("aborting dsort[%s] via api.AbortDsort\n", df.managerUUID) err = api.AbortDsort(df.baseParams, df.managerUUID) diff --git a/ais/test/ec_test.go b/ais/test/ec_test.go index 4907d2ed3b..c4abc88303 100644 --- a/ais/test/ec_test.go +++ b/ais/test/ec_test.go @@ -510,7 +510,7 @@ func clearAllECObjects(t *testing.T, bck cmn.Bck, failOnDelErr bool, o *ecOption } wg.Wait() reqArgs := xact.ArgsMsg{Kind: apc.ActECPut, Bck: bck} - api.WaitForXactionIdle(tools.BaseAPIParams(proxyURL), reqArgs) + api.WaitForXactionIdle(tools.BaseAPIParams(proxyURL), &reqArgs) } func objectsExist(t *testing.T, baseParams api.BaseParams, bck cmn.Bck, objPatt string, objCount int) { @@ -825,12 +825,12 @@ func TestECRestoreObjAndSliceRemote(t *testing.T) { defer func() { tlog.Logln("Wait for PUTs to finish...") args := xact.ArgsMsg{Kind: apc.ActECPut} - err := api.WaitForXactionIdle(baseParams, args) + err := api.WaitForXactionIdle(baseParams, &args) tassert.CheckError(t, err) clearAllECObjects(t, bck, true, o) reqArgs := xact.ArgsMsg{Kind: apc.ActECPut, Bck: bck} - err = api.WaitForXactionIdle(baseParams, reqArgs) + err = api.WaitForXactionIdle(baseParams, &reqArgs) tassert.CheckError(t, err) }() @@ -1207,7 +1207,7 @@ func TestECDisableEnableDuringLoad(t *testing.T) { }) tassert.CheckError(t, err) reqArgs := xact.ArgsMsg{Kind: apc.ActECEncode, Bck: bck} - _, err = api.WaitForXactionIC(baseParams, reqArgs) + _, err = api.WaitForXactionIC(baseParams, &reqArgs) tassert.CheckError(t, err) abortCh.Close() @@ -1654,7 +1654,7 @@ func TestECDestroyBucket(t *testing.T) { wg.Wait() tlog.Logf("EC put files resulted in error in %d out of %d files\n", errCnt.Load(), o.objCount) args := xact.ArgsMsg{Kind: apc.ActECPut} - api.WaitForXactionIC(baseParams, args) + api.WaitForXactionIC(baseParams, &args) // create bucket with the same name and check if puts are successful newLocalBckWithProps(t, baseParams, bck, bckProps, o) @@ -2031,7 +2031,7 @@ func TestECEmergencyMountpath(t *testing.T) { // Wait for ec to finish flt := xact.ArgsMsg{Kind: apc.ActECPut, Bck: bck} - _ = api.WaitForXactionIdle(baseParams, flt) + _ = api.WaitForXactionIdle(baseParams, &flt) } func TestECRebalance(t *testing.T) { @@ -2094,7 +2094,7 @@ func TestECMountpaths(t *testing.T) { } reqArgs := xact.ArgsMsg{Kind: apc.ActECPut, Bck: bck} - api.WaitForXactionIdle(tools.BaseAPIParams(proxyURL), reqArgs) + api.WaitForXactionIdle(tools.BaseAPIParams(proxyURL), &reqArgs) } // The test only checks that the number of object after rebalance equals @@ -2216,7 +2216,7 @@ func TestECBucketEncode(t *testing.T) { tlog.Logf("Wait for EC %s\n", m.bck) xargs := xact.ArgsMsg{Kind: apc.ActECEncode, Bck: m.bck, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, xargs) + _, err = api.WaitForXactionIC(baseParams, &xargs) tassert.CheckFatal(t, err) objList, err = api.ListObjects(baseParams, m.bck, nil, api.ListArgs{}) @@ -2583,10 +2583,10 @@ func ecAndRegularUnregisterWhileRebalancing(t *testing.T, o *ecOptions, bckEC cm } }() xargs := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: startTimeout} - err = api.WaitForXactionNode(baseParams, xargs, xactSnapRunning) + err = api.WaitForXactionNode(baseParams, &xargs, xactSnapRunning) tassert.CheckError(t, err) - err = api.AbortXaction(baseParams, xargs) + err = api.AbortXaction(baseParams, &xargs) tassert.CheckError(t, err) tools.WaitForRebalAndResil(t, baseParams) tassert.CheckError(t, err) diff --git a/ais/test/etl_cp_multiobj_test.go b/ais/test/etl_cp_multiobj_test.go index 8f972da690..cf4f01ce44 100644 --- a/ais/test/etl_cp_multiobj_test.go +++ b/ais/test/etl_cp_multiobj_test.go @@ -168,7 +168,7 @@ func testETLMultiObj(t *testing.T, etlName string, bckFrom, bckTo cmn.Bck, fileR tlog.Logf("Running x-etl[%s]: %s => %s ...\n", xid, bckFrom.Cname(""), bckTo.Cname("")) wargs := xact.ArgsMsg{ID: xid, Kind: apc.ActETLObjects} - err = api.WaitForXactionIdle(baseParams, wargs) + err = api.WaitForXactionIdle(baseParams, &wargs) tassert.CheckFatal(t, err) list, err := api.ListObjects(baseParams, bckTo, nil, api.ListArgs{}) diff --git a/ais/test/etl_stress_test.go b/ais/test/etl_stress_test.go index 31cb9cb255..a6cf167dba 100644 --- a/ais/test/etl_stress_test.go +++ b/ais/test/etl_stress_test.go @@ -87,13 +87,15 @@ func TestETLBucketAbort(t *testing.T) { } xid := etlPrepareAndStart(t, m, tetl.Echo, etl.Hpull) - args := xact.ArgsMsg{ID: xid, Kind: apc.ActETLBck} + time.Sleep(time.Duration(rand.Intn(5)) * time.Second) - tlog.Logf("Aborting ETL xaction %q\n", xid) - err := api.AbortXaction(baseParams, args) + tlog.Logf("Aborting etl[%s]\n", xid) + args := xact.ArgsMsg{ID: xid, Kind: apc.ActETLBck} + err := api.AbortXaction(baseParams, &args) tassert.CheckFatal(t, err) - err = tetl.WaitForAborted(baseParams, xid, apc.ActETLBck, 5*time.Minute) + + err = tetl.WaitForAborted(baseParams, xid, apc.ActETLBck, 2*time.Minute) tassert.CheckFatal(t, err) etls, err := api.ETLList(baseParams) tassert.CheckFatal(t, err) @@ -134,7 +136,7 @@ func TestETLTargetDown(t *testing.T) { m.waitAndCheckCluState() args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) tetl.CheckNoRunningETLContainers(t, baseParams) }) @@ -246,7 +248,7 @@ def transform(input_bytes): etlDoneCh.Close() tassert.CheckFatal(t, err) - snaps, err := api.QueryXactionSnaps(baseParams, xact.ArgsMsg{ID: xid}) + snaps, err := api.QueryXactionSnaps(baseParams, &xact.ArgsMsg{ID: xid}) tassert.CheckFatal(t, err) total, err := snaps.TotalRunningTime(xid) tassert.CheckFatal(t, err) diff --git a/ais/test/etl_test.go b/ais/test/etl_test.go index 5bb9d8409c..ec75bb078f 100644 --- a/ais/test/etl_test.go +++ b/ais/test/etl_test.go @@ -215,7 +215,7 @@ func testETLObjectCloud(t *testing.T, bck cmn.Bck, etlName string, onlyLong, cac // NOTE: BytesCount references number of bytes *before* the transformation. func checkETLStats(t *testing.T, xid string, expectedObjCnt int, expectedBytesCnt uint64, skipByteStats bool) { - snaps, err := api.QueryXactionSnaps(baseParams, xact.ArgsMsg{ID: xid}) + snaps, err := api.QueryXactionSnaps(baseParams, &xact.ArgsMsg{ID: xid}) tassert.CheckFatal(t, err) objs, outObjs, inObjs := snaps.ObjCounts(xid) @@ -704,7 +704,7 @@ func TestETLBucketDryRun(t *testing.T) { tassert.CheckFatal(t, err) args := xact.ArgsMsg{ID: xid, Timeout: time.Minute} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) exists, err := api.QueryBuckets(baseParams, cmn.QueryBcks(bckTo), apc.FltPresent) diff --git a/ais/test/fshc_test.go b/ais/test/fshc_test.go index 265cdf8ecd..ebff740d9a 100644 --- a/ais/test/fshc_test.go +++ b/ais/test/fshc_test.go @@ -521,7 +521,7 @@ func TestFSCheckerTargetDisableAllMountpaths(t *testing.T) { tlog.Logf("Wait for rebalance (triggered by %s leaving the cluster after having lost all mountpaths)\n", target.StringEx()) args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) tlog.Logf("Restoring target %s mountpaths\n", target.ID()) for _, mpath := range oldMpaths.Available { @@ -534,7 +534,7 @@ func TestFSCheckerTargetDisableAllMountpaths(t *testing.T) { tlog.Logf("Wait for rebalance (when target %s that has previously lost all mountpaths joins back)\n", target.StringEx()) args = xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) tools.WaitForResilvering(t, baseParams, nil) @@ -601,7 +601,7 @@ func TestFSAddMountpathRestartNode(t *testing.T) { } tlog.Logf("Wait for rebalance\n") args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) // Check if the node has newly added mountpath newMpaths, err = api.GetMountpaths(baseParams, target) @@ -678,7 +678,7 @@ func TestFSDisableAllExceptOneMountpathRestartNode(t *testing.T) { tassert.Fatalf(t, smap.GetTarget(target.ID()) != nil, "removed target didn't rejoin") args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) // Check if the the mountpaths are disabled after restart. newMpaths, err := api.GetMountpaths(baseParams, target) diff --git a/ais/test/integration_test.go b/ais/test/integration_test.go index c356e94dee..ceeb3a20ab 100644 --- a/ais/test/integration_test.go +++ b/ais/test/integration_test.go @@ -136,7 +136,7 @@ func TestProxyFailbackAndReRegisterInParallel(t *testing.T) { wg.Wait() xargs := xact.ArgsMsg{Kind: apc.ActRebalance, OnlyRunning: true, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, xargs) + _, _ = api.WaitForXactionIC(baseParams, &xargs) // Step 5. m.ensureNoGetErrors() @@ -682,7 +682,7 @@ func TestGetDuringResilver(t *testing.T) { tlog.Logf("Wait for rebalance (when target %s that has previously lost all mountpaths joins back)\n", target.StringEx()) args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) tools.WaitForResilvering(t, baseParams, nil) @@ -820,7 +820,7 @@ func TestMountpathDetachAll(t *testing.T) { time.Sleep(time.Second) tlog.Logf("Wait for rebalance (triggered by %s leaving the cluster after having lost all mountpaths)\n", tname) args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) // Check if mountpaths were actually removed mountpaths, err := api.GetMountpaths(baseParams, target) @@ -842,7 +842,7 @@ func TestMountpathDetachAll(t *testing.T) { time.Sleep(2 * time.Second) tlog.Logf("Wait for rebalance (when target %s that has previously lost all mountpaths joins back)\n", target.StringEx()) args = xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) tools.WaitForResilvering(t, baseParams, target) @@ -1045,7 +1045,7 @@ func TestMountpathDisableAll(t *testing.T) { tlog.Logf("Wait for rebalance (when target %s that has previously lost all mountpaths joins back)\n", tname) args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) tools.WaitForResilvering(t, baseParams, nil) } @@ -1059,7 +1059,7 @@ func TestMountpathDisableAll(t *testing.T) { time.Sleep(2 * time.Second) tlog.Logf("Wait for rebalance (triggered by %s leaving the cluster after having lost all mountpaths)\n", tname) args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) // Check if mountpaths were actually disabled time.Sleep(time.Second) @@ -1087,7 +1087,7 @@ func TestMountpathDisableAll(t *testing.T) { time.Sleep(2 * time.Second) tlog.Logf("Wait for rebalance (when target %s that has previously lost all mountpaths joins back)\n", target.StringEx()) args = xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) tools.WaitForResilvering(t, baseParams, target) @@ -1341,7 +1341,7 @@ func TestAtimePrefetch(t *testing.T) { xid, err := api.EvictList(baseParams, bck, objs) tassert.CheckFatal(t, err) args := xact.ArgsMsg{ID: xid, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) timeAfterPut := time.Now() @@ -1349,7 +1349,7 @@ func TestAtimePrefetch(t *testing.T) { xid, err = api.PrefetchList(baseParams, bck, objs) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) timeFormat := time.RFC3339Nano @@ -1508,7 +1508,7 @@ func TestRenewRebalance(t *testing.T) { // Step 4: Re-register target (triggers rebalance) m.stopMaintenance(target) xargs := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceStartTimeout} - err := api.WaitForXactionNode(baseParams, xargs, xactSnapRunning) + err := api.WaitForXactionNode(baseParams, &xargs, xactSnapRunning) tassert.CheckError(t, err) tlog.Logf("rebalance started\n") @@ -1528,14 +1528,14 @@ func TestRenewRebalance(t *testing.T) { <-m.controlCh // wait for half the GETs to complete - rebID, err = api.StartXaction(baseParams, xact.ArgsMsg{Kind: apc.ActRebalance}) + rebID, err = api.StartXaction(baseParams, &xact.ArgsMsg{Kind: apc.ActRebalance}) tassert.CheckFatal(t, err) tlog.Logf("manually initiated rebalance\n") }() wg.Wait() args := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckError(t, err) m.ensureNoGetErrors() @@ -1663,7 +1663,7 @@ func TestGetFromMirroredWithLostMountpathAllExceptOne(t *testing.T) { // Wait for async mirroring to finish flt := xact.ArgsMsg{Kind: apc.ActPutCopies, Bck: m.bck} - api.WaitForXactionIdle(baseParams, flt) + api.WaitForXactionIdle(baseParams, &flt) time.Sleep(time.Second) // pending writes // GET @@ -1777,11 +1777,11 @@ func TestICRebalance(t *testing.T) { baseParams := tools.BaseAPIParams(m.proxyURL) tlog.Logf("Manually initiated rebalance\n") - rebID, err = api.StartXaction(baseParams, xact.ArgsMsg{Kind: apc.ActRebalance}) + rebID, err = api.StartXaction(baseParams, &xact.ArgsMsg{Kind: apc.ActRebalance}) tassert.CheckFatal(t, err) xargs := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceStartTimeout} - api.WaitForXactionNode(baseParams, xargs, xactSnapRunning) + api.WaitForXactionNode(baseParams, &xargs, xactSnapRunning) tlog.Logf("Killing %s\n", icNode.StringEx()) // cmd and args are the original command line of how the proxy is started @@ -1805,7 +1805,7 @@ func TestICRebalance(t *testing.T) { tlog.Logf("Wait for rebalance: %s\n", rebID) args := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, _ = api.WaitForXactionIC(baseParams, args) + _, _ = api.WaitForXactionIC(baseParams, &args) m.waitAndCheckCluState() } @@ -1890,17 +1890,17 @@ func TestSingleResilver(t *testing.T) { // Start resilvering just on the target args := xact.ArgsMsg{Kind: apc.ActResilver, DaemonID: target.ID()} - id, err := api.StartXaction(baseParams, args) + id, err := api.StartXaction(baseParams, &args) tassert.CheckFatal(t, err) // Wait for specific resilvering x[id] args = xact.ArgsMsg{ID: id, Kind: apc.ActResilver, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // Make sure other nodes were not resilvered args = xact.ArgsMsg{ID: id} - snaps, err := api.QueryXactionSnaps(baseParams, args) + snaps, err := api.QueryXactionSnaps(baseParams, &args) tassert.CheckFatal(t, err) tassert.Errorf(t, len(snaps) == 1, "expected only 1 resilver") } diff --git a/ais/test/maintain_test.go b/ais/test/maintain_test.go index 43720e8d7d..34a9686123 100644 --- a/ais/test/maintain_test.go +++ b/ais/test/maintain_test.go @@ -99,7 +99,7 @@ func TestMaintenanceListObjects(t *testing.T) { _, err = tools.WaitForClusterState(proxyURL, "target is back", m.smap.Version, m.smap.CountActivePs(), m.smap.CountTargets()) args := xact.ArgsMsg{ID: rebID, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) }() @@ -143,7 +143,7 @@ func TestMaintenanceMD(t *testing.T) { t.Cleanup(func() { args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - api.WaitForXactionIC(baseParams, args) + api.WaitForXactionIC(baseParams, &args) }) tlog.Logf("Decommission %s\n", dcmTarget.StringEx()) @@ -270,7 +270,7 @@ func TestMaintenanceDecommissionRebalance(t *testing.T) { if dcm != nil { tlog.Logf("Canceling maintenance for %s\n", dcm.ID()) args := xact.ArgsMsg{Kind: apc.ActRebalance} - err = api.AbortXaction(baseParams, args) + err = api.AbortXaction(baseParams, &args) tassert.CheckError(t, err) val := &apc.ActValRmNode{DaemonID: dcm.ID()} rebID, err = api.StopMaintenance(baseParams, val) @@ -278,7 +278,7 @@ func TestMaintenanceDecommissionRebalance(t *testing.T) { tools.WaitForRebalanceByID(t, baseParams, rebID) } else { args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckError(t, err) } @@ -496,7 +496,7 @@ func testNodeShutdown(t *testing.T, nodeType string) { time.Sleep(time.Second) xargs := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} for i := 0; i < 3; i++ { - status, err := api.WaitForXactionIC(baseParams, xargs) + status, err := api.WaitForXactionIC(baseParams, &xargs) if err == nil { tlog.Logf("%v\n", status) break @@ -605,7 +605,7 @@ func TestShutdownListObjects(t *testing.T) { time.Sleep(time.Second) xargs := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} for i := 0; i < 3; i++ { - status, err := api.WaitForXactionIC(baseParams, xargs) + status, err := api.WaitForXactionIC(baseParams, &xargs) if err == nil { tlog.Logf("%v\n", status) break diff --git a/ais/test/multiproxy_test.go b/ais/test/multiproxy_test.go index 14ed0ac6c5..5d74318e36 100644 --- a/ais/test/multiproxy_test.go +++ b/ais/test/multiproxy_test.go @@ -1503,7 +1503,7 @@ func icSyncOwnershipTable(t *testing.T) { baseParams = tools.BaseAPIParams(newICNode.URL(cmn.NetPublic)) xargs := xact.ArgsMsg{ID: xid, Kind: apc.ActCopyBck} - _, err = api.GetOneXactionStatus(baseParams, xargs) + _, err = api.GetOneXactionStatus(baseParams, &xargs) tassert.CheckError(t, err) err = tools.RestoreNode(cmd, false, "proxy") @@ -1518,7 +1518,7 @@ func icSyncOwnershipTable(t *testing.T) { tassert.Fatalf(t, smap.IsIC(cmd.Node), "primary (%s) should be a IC member, (were: %s)", primary, smap.StrIC(primary)) baseParams = tools.BaseAPIParams(cmd.Node.URL(cmn.NetPublic)) - _, err = api.GetOneXactionStatus(baseParams, xargs) + _, err = api.GetOneXactionStatus(baseParams, &xargs) tassert.CheckError(t, err) } @@ -1574,7 +1574,7 @@ func icSinglePrimaryRevamp(t *testing.T) { tassert.CheckFatal(t, err) baseParams = tools.BaseAPIParams(cmd.Node.URL(cmn.NetPublic)) - _, err = api.GetOneXactionStatus(baseParams, xargs) + _, err = api.GetOneXactionStatus(baseParams, &xargs) tassert.CheckError(t, err) } } @@ -1634,7 +1634,7 @@ func startCPBckAndWait(t testing.TB, srcBck cmn.Bck, count int) *sync.WaitGroup wg.Done() }() xargs := xact.ArgsMsg{ID: xid, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, xargs) + _, err = api.WaitForXactionIC(baseParams, &xargs) tassert.CheckError(t, err) }(i) } diff --git a/ais/test/object_test.go b/ais/test/object_test.go index 7709b704bc..b5ef363fe4 100644 --- a/ais/test/object_test.go +++ b/ais/test/object_test.go @@ -351,28 +351,28 @@ func Test_SameLocalAndRemoteBckNameValidate(t *testing.T) { prefetchListID, err := api.PrefetchList(baseParams, bckRemote, files) tassert.CheckFatal(t, err) args := xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) tlog.Logf("PrefetchRange %s\n", objRange) prefetchRangeID, err := api.PrefetchRange(baseParams, bckRemote, objRange) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: prefetchRangeID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) tlog.Logf("EvictList %v\n", files) evictListID, err := api.EvictList(baseParams, bckRemote, files) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.Errorf(t, err != nil, "list iterator must produce not-found when not finding listed objects") tlog.Logf("EvictRange\n") evictRangeID, err := api.EvictRange(baseParams, bckRemote, objRange) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: evictRangeID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) tools.CreateBucket(t, proxyURL, bckLocal, nil, true /*cleanup*/) @@ -402,13 +402,13 @@ func Test_SameLocalAndRemoteBckNameValidate(t *testing.T) { prefetchListID, err = api.PrefetchList(baseParams, bckRemote, files) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) evictListID, err = api.EvictList(baseParams, bckRemote, files) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // Deleting from cloud bucket @@ -416,7 +416,7 @@ func Test_SameLocalAndRemoteBckNameValidate(t *testing.T) { deleteID, err := api.DeleteList(baseParams, bckRemote, files) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: deleteID, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // Deleting from ais bucket @@ -424,7 +424,7 @@ func Test_SameLocalAndRemoteBckNameValidate(t *testing.T) { deleteID, err = api.DeleteList(baseParams, bckLocal, files) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: deleteID, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) _, err = api.HeadObject(baseParams, bckLocal, fileName1, apc.FltPresent, false /*silent*/) @@ -906,7 +906,7 @@ func testEvictRemoteBucket(t *testing.T, bck cmn.Bck, keepMD bool) { // Wait for async mirroring to finish flt := xact.ArgsMsg{Kind: apc.ActMakeNCopies, Bck: m.bck} - api.WaitForXactionIdle(baseParams, flt) + api.WaitForXactionIdle(baseParams, &flt) time.Sleep(time.Second) err = api.EvictRemoteBucket(baseParams, m.bck, keepMD) @@ -1549,7 +1549,7 @@ func TestOperationsWithRanges(t *testing.T) { } args := xact.ArgsMsg{ID: xid, Kind: kind, Timeout: waitTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) totalFiles -= test.delta diff --git a/ais/test/objprops_test.go b/ais/test/objprops_test.go index 844662700a..abf84c9609 100644 --- a/ais/test/objprops_test.go +++ b/ais/test/objprops_test.go @@ -134,7 +134,7 @@ func propsEvict(t *testing.T, proxyURL string, bck cmn.Bck, objMap map[string]st t.Errorf("Failed to evict objects: %v\n", err) } args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) tlog.Logf("Reading object list...\n") diff --git a/ais/test/promote_test.go b/ais/test/promote_test.go index 2452af5d48..c2b2bf2cc9 100644 --- a/ais/test/promote_test.go +++ b/ais/test/promote_test.go @@ -287,7 +287,7 @@ func (test *prmTests) wait(t *testing.T, xid, tempdir string, target *meta.Snode // wait "cases" 1. through 3. if xid != "" && !test.singleTarget { // 1. cluster-wide xaction tlog.Logf("Waiting for global %s(%s=>%s)\n", xname, tempdir, m.bck) - notifStatus, err := api.WaitForXactionIC(baseParams, xargs) + notifStatus, err := api.WaitForXactionIC(baseParams, &xargs) tassert.CheckFatal(t, err) if notifStatus != nil && (notifStatus.AbortedX || notifStatus.ErrMsg != "") { tlog.Logf("Warning: notif-status: %+v\n", notifStatus) @@ -295,14 +295,14 @@ func (test *prmTests) wait(t *testing.T, xid, tempdir string, target *meta.Snode } else if xid != "" && test.singleTarget { // 2. single-target xaction xargs.DaemonID = target.ID() tlog.Logf("Waiting for %s(%s=>%s) at %s\n", xname, tempdir, m.bck, target.StringEx()) - err := api.WaitForXactionNode(baseParams, xargs, xactSnapNotRunning) + err := api.WaitForXactionNode(baseParams, &xargs, xactSnapNotRunning) tassert.CheckFatal(t, err) } else { // 3. synchronous execution tlog.Logf("Promoting without xaction (%s=>%s)\n", tempdir, m.bck) } // collect stats - xs, err := api.QueryXactionSnaps(baseParams, xargs) + xs, err := api.QueryXactionSnaps(baseParams, &xargs) tassert.CheckFatal(t, err) if xid != "" { locObjs, outObjs, inObjs = xs.ObjCounts(xid) diff --git a/ais/test/regression_test.go b/ais/test/regression_test.go index d3e6308328..ef28bb4f8f 100644 --- a/ais/test/regression_test.go +++ b/ais/test/regression_test.go @@ -366,7 +366,7 @@ func doBucketRegressionTest(t *testing.T, proxyURL string, rtd regressionTestDat func postRenameWaitAndCheck(t *testing.T, baseParams api.BaseParams, rtd regressionTestData, numPuts int, objNames []string, xid string) { xargs := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Bck: rtd.renamedBck, Timeout: tools.RebalanceTimeout} - _, err := api.WaitForXactionIC(baseParams, xargs) + _, err := api.WaitForXactionIC(baseParams, &xargs) if err != nil { if herr, ok := err.(*cmn.ErrHTTP); ok && herr.Status == http.StatusNotFound { smap := tools.GetClusterMap(t, proxyURL) @@ -700,11 +700,11 @@ func TestLRU(t *testing.T) { }) tlog.Logln("starting LRU...") - xid, err := api.StartXaction(baseParams, xact.ArgsMsg{Kind: apc.ActLRU}) + xid, err := api.StartXaction(baseParams, &xact.ArgsMsg{Kind: apc.ActLRU}) tassert.CheckFatal(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActLRU, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // Check results @@ -751,7 +751,7 @@ func TestPrefetchList(t *testing.T) { } args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // 3. Prefetch evicted objects @@ -761,12 +761,12 @@ func TestPrefetchList(t *testing.T) { } args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // 4. Ensure that all the prefetches occurred. xargs := xact.ArgsMsg{ID: xid, Timeout: tools.RebalanceTimeout} - snaps, err := api.QueryXactionSnaps(baseParams, xargs) + snaps, err := api.QueryXactionSnaps(baseParams, &xargs) tassert.CheckFatal(t, err) locObjs, _, _ := snaps.ObjCounts(xid) if locObjs != int64(m.num) { @@ -821,7 +821,7 @@ func TestDeleteList(t *testing.T) { tassert.CheckError(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // 3. Check to see that all the files have been deleted @@ -878,18 +878,18 @@ func TestPrefetchRange(t *testing.T) { xid, err := api.EvictRange(baseParams, bck, rng) tassert.CheckError(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) xid, err = api.PrefetchRange(baseParams, bck, rng) tassert.CheckError(t, err) args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // 4. Ensure all done xargs := xact.ArgsMsg{ID: xid, Timeout: tools.RebalanceTimeout} - snaps, err := api.QueryXactionSnaps(baseParams, xargs) + snaps, err := api.QueryXactionSnaps(baseParams, &xargs) tassert.CheckFatal(t, err) locObjs, _, _ := snaps.ObjCounts(xid) if locObjs != int64(len(files)) { @@ -957,7 +957,7 @@ func TestDeleteRange(t *testing.T) { xid, err := api.DeleteRange(baseParams, b, smallrange) tassert.CheckError(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // 3. Check to see that the correct files have been deleted @@ -986,7 +986,7 @@ func TestDeleteRange(t *testing.T) { xid, err = api.DeleteRange(baseParams, b, bigrange) tassert.CheckError(t, err) args = xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // 5. Check to see that all the files have been deleted @@ -1063,7 +1063,7 @@ func TestStressDeleteRange(t *testing.T) { xid, err := api.DeleteRange(baseParams, bck, partialRange) tassert.CheckError(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // 3. Check to see that correct objects have been deleted @@ -1095,7 +1095,7 @@ func TestStressDeleteRange(t *testing.T) { xid, err = api.DeleteRange(baseParams, bck, fullRange) tassert.CheckError(t, err) args = xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, args) + _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) // 5. Check to see that all files have been deleted diff --git a/ais/test/xaction_test.go b/ais/test/xaction_test.go index 728c225a10..a85ffde5d5 100644 --- a/ais/test/xaction_test.go +++ b/ais/test/xaction_test.go @@ -24,7 +24,7 @@ func TestXactionNotFound(t *testing.T) { proxyURL = tools.RandomProxyURL(t) baseParams = tools.BaseAPIParams(proxyURL) ) - _, err := api.QueryXactionSnaps(baseParams, xact.ArgsMsg{ID: "dummy-" + cos.GenUUID()}) + _, err := api.QueryXactionSnaps(baseParams, &xact.ArgsMsg{ID: "dummy-" + cos.GenUUID()}) tools.CheckErrIsNotFound(t, err) } @@ -45,7 +45,7 @@ func TestXactionAllStatus(t *testing.T) { _, xname := xact.GetKindName(kind) xargs.Kind = xname } - vec, err := api.GetAllXactionStatus(baseParams, xargs, test.force) + vec, err := api.GetAllXactionStatus(baseParams, &xargs, test.force) tassert.CheckFatal(t, err) if len(vec) == 0 { continue @@ -80,7 +80,7 @@ func TestXactionAllStatus(t *testing.T) { time.Sleep(2 * time.Second) xargs = xact.ArgsMsg{Kind: kind, OnlyRunning: false} - vec, err = api.GetAllXactionStatus(baseParams, xargs, test.force) + vec, err = api.GetAllXactionStatus(baseParams, &xargs, test.force) tassert.CheckFatal(t, err) for _, a := range aborted { found := false diff --git a/api/object.go b/api/object.go index 818b578a37..cd841e2308 100644 --- a/api/object.go +++ b/api/object.go @@ -504,6 +504,7 @@ func FlushObject(args *FlushArgs) error { var ( header http.Header q = make(url.Values, 4) + method = args.BaseParams.Method ) q.Set(apc.QparamAppendType, apc.FlushOp) q.Set(apc.QparamAppendHandle, args.Handle) @@ -524,6 +525,7 @@ func FlushObject(args *FlushArgs) error { } err := reqParams.DoRequest() FreeRp(reqParams) + args.BaseParams.Method = method return err } @@ -546,8 +548,10 @@ func RenameObject(bp BaseParams, bck cmn.Bck, oldName, newName string) error { // promote files and directories to ais objects func Promote(args *PromoteArgs) (xid string, err error) { - actMsg := apc.ActMsg{Action: apc.ActPromote, Name: args.SrcFQN} - actMsg.Value = &args.PromoteArgs + var ( + actMsg = apc.ActMsg{Action: apc.ActPromote, Name: args.SrcFQN, Value: &args.PromoteArgs} + method = args.BaseParams.Method + ) args.BaseParams.Method = http.MethodPost reqParams := AllocRp() { @@ -559,6 +563,7 @@ func Promote(args *PromoteArgs) (xid string, err error) { } _, err = reqParams.doReqStr(&xid) FreeRp(reqParams) + args.BaseParams.Method = method return } diff --git a/api/xaction.go b/api/xaction.go index a7bc2104b8..bc632363a4 100644 --- a/api/xaction.go +++ b/api/xaction.go @@ -20,7 +20,7 @@ import ( ) // Start xaction -func StartXaction(bp BaseParams, args xact.ArgsMsg) (xid string, err error) { +func StartXaction(bp BaseParams, args *xact.ArgsMsg) (xid string, err error) { if !xact.Table[args.Kind].Startable { return "", fmt.Errorf("xaction %q is not startable", args.Kind) } @@ -44,7 +44,7 @@ func StartXaction(bp BaseParams, args xact.ArgsMsg) (xid string, err error) { } // Abort ("stop") xactions -func AbortXaction(bp BaseParams, args xact.ArgsMsg) (err error) { +func AbortXaction(bp BaseParams, args *xact.ArgsMsg) (err error) { msg := apc.ActMsg{Action: apc.ActXactStop, Value: args} bp.Method = http.MethodPut reqParams := AllocRp() @@ -85,7 +85,7 @@ func GetAllRunningXactions(bp BaseParams, kindOrName string) (out []string, err // QueryXactionSnaps gets all xaction snaps based on the specified selection. // NOTE: args.Kind can be either xaction kind or name - here and elsewhere -func QueryXactionSnaps(bp BaseParams, args xact.ArgsMsg) (xs xact.MultiSnap, err error) { +func QueryXactionSnaps(bp BaseParams, args *xact.ArgsMsg) (xs xact.MultiSnap, err error) { msg := xact.QueryMsg{ID: args.ID, Kind: args.Kind, Bck: args.Bck} if args.OnlyRunning { msg.OnlyRunning = apc.Bool(true) @@ -113,7 +113,7 @@ func QueryXactionSnaps(bp BaseParams, args xact.ArgsMsg) (xs xact.MultiSnap, err // any matching xaction that's currently running, or - if nothing's running - // the one that's finished most recently, // if exists -func GetOneXactionStatus(bp BaseParams, args xact.ArgsMsg) (status *nl.Status, err error) { +func GetOneXactionStatus(bp BaseParams, args *xact.ArgsMsg) (status *nl.Status, err error) { status = &nl.Status{} q := url.Values{apc.QparamWhat: []string{apc.WhatOneXactStatus}} err = getxst(status, q, bp, args) @@ -121,7 +121,7 @@ func GetOneXactionStatus(bp BaseParams, args xact.ArgsMsg) (status *nl.Status, e } // same as above, except that it returns _all_ matching xactions -func GetAllXactionStatus(bp BaseParams, args xact.ArgsMsg, force bool) (matching nl.StatusVec, err error) { +func GetAllXactionStatus(bp BaseParams, args *xact.ArgsMsg, force bool) (matching nl.StatusVec, err error) { q := url.Values{apc.QparamWhat: []string{apc.WhatAllXactStatus}} if force { // (force just-in-time) @@ -134,7 +134,7 @@ func GetAllXactionStatus(bp BaseParams, args xact.ArgsMsg, force bool) (matching return } -func getxst(out any, q url.Values, bp BaseParams, args xact.ArgsMsg) (err error) { +func getxst(out any, q url.Values, bp BaseParams, args *xact.ArgsMsg) (err error) { bp.Method = http.MethodGet msg := xact.QueryMsg{ID: args.ID, Kind: args.Kind, Bck: args.Bck} if args.OnlyRunning { @@ -176,16 +176,18 @@ func (ci *consIdle) check(snaps xact.MultiSnap) (done, resetProbeFreq bool) { } // WaitForXactionIdle waits for a given on-demand xaction to be idle. -func WaitForXactionIdle(bp BaseParams, args xact.ArgsMsg) error { - ci := &consIdle{xid: args.ID} +func WaitForXactionIdle(bp BaseParams, args *xact.ArgsMsg) (err error) { + ci, running := &consIdle{xid: args.ID}, args.OnlyRunning args.OnlyRunning = true - return WaitForXactionNode(bp, args, ci.check) + err = WaitForXactionNode(bp, args, ci.check) + args.OnlyRunning = running + return err } // WaitForXactionIC waits for a given xaction to complete. // Use it only for global xactions // (those that execute on all targets and report their status to IC, e.g. rebalance). -func WaitForXactionIC(bp BaseParams, args xact.ArgsMsg) (status *nl.Status, err error) { +func WaitForXactionIC(bp BaseParams, args *xact.ArgsMsg) (status *nl.Status, err error) { return _waitx(bp, args, nil) } @@ -193,7 +195,7 @@ func WaitForXactionIC(bp BaseParams, args xact.ArgsMsg) (status *nl.Status, err // Use for xactions that do _not_ report their status to IC members, namely: // - xact.IdlesBeforeFinishing() // - x-resilver (as it usually runs on a single node) -func WaitForXactionNode(bp BaseParams, args xact.ArgsMsg, fn func(xact.MultiSnap) (bool, bool)) error { +func WaitForXactionNode(bp BaseParams, args *xact.ArgsMsg, fn func(xact.MultiSnap) (bool, bool)) error { debug.Assert(args.Kind != "" || xact.IsValidUUID(args.ID)) _, err := _waitx(bp, args, fn) return err @@ -201,7 +203,7 @@ func WaitForXactionNode(bp BaseParams, args xact.ArgsMsg, fn func(xact.MultiSnap // TODO: `status` is currently always nil when we wait with a (`fn`) callback // TODO: un-defer cancel() -func _waitx(bp BaseParams, args xact.ArgsMsg, fn func(xact.MultiSnap) (bool, bool)) (status *nl.Status, err error) { +func _waitx(bp BaseParams, args *xact.ArgsMsg, fn func(xact.MultiSnap) (bool, bool)) (status *nl.Status, err error) { var ( elapsed time.Duration begin = mono.NanoTime() @@ -240,7 +242,7 @@ func _waitx(bp BaseParams, args xact.ArgsMsg, fn func(xact.MultiSnap) (bool, boo } } -func _times(args xact.ArgsMsg) (time.Duration, time.Duration) { +func _times(args *xact.ArgsMsg) (time.Duration, time.Duration) { total := args.Timeout switch { case args.Timeout == 0: diff --git a/bench/microbenchmarks/integration/act_bench_test.go b/bench/microbenchmarks/integration/act_bench_test.go index f7b705e549..3966921a7c 100644 --- a/bench/microbenchmarks/integration/act_bench_test.go +++ b/bench/microbenchmarks/integration/act_bench_test.go @@ -87,7 +87,7 @@ func BenchmarkECEncode(b *testing.B) { tassert.CheckFatal(b, err) reqArgs := xact.ArgsMsg{Kind: apc.ActECEncode, Bck: bck, Timeout: ecTime} - _, err = api.WaitForXactionIC(baseParams, reqArgs) + _, err = api.WaitForXactionIC(baseParams, &reqArgs) tassert.CheckFatal(b, err) }) } @@ -138,7 +138,7 @@ func BenchmarkECRebalance(b *testing.B) { tassert.CheckFatal(b, err) reqArgs := xact.ArgsMsg{Kind: apc.ActECEncode, Bck: bck, Timeout: ecTime} - _, err = api.WaitForXactionIC(baseParams, reqArgs) + _, err = api.WaitForXactionIC(baseParams, &reqArgs) tassert.CheckFatal(b, err) args := &apc.ActValRmNode{DaemonID: tgtLost.ID()} diff --git a/bench/tools/aisloader/run.go b/bench/tools/aisloader/run.go index bf16b5850c..a8bc30fc6b 100644 --- a/bench/tools/aisloader/run.go +++ b/bench/tools/aisloader/run.go @@ -1086,7 +1086,7 @@ func cleanupObjs(objs []string, wg *sync.WaitGroup) { fmt.Println("delete err ", err) } args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects} - if _, err = api.WaitForXactionIC(runParams.bp, args); err != nil { + if _, err = api.WaitForXactionIC(runParams.bp, &args); err != nil { fmt.Println("wait for xaction err ", err) } } @@ -1097,7 +1097,7 @@ func cleanupObjs(objs []string, wg *sync.WaitGroup) { fmt.Println("delete err ", err) } args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects} - if _, err = api.WaitForXactionIC(runParams.bp, args); err != nil { + if _, err = api.WaitForXactionIC(runParams.bp, &args); err != nil { fmt.Println("wait for xaction err ", err) } } diff --git a/cmd/cli/cli/cluster_hdlr.go b/cmd/cli/cli/cluster_hdlr.go index 4bb6cbd59b..1fe32cd784 100644 --- a/cmd/cli/cli/cluster_hdlr.go +++ b/cmd/cli/cli/cluster_hdlr.go @@ -492,7 +492,7 @@ func startClusterRebalanceHandler(c *cli.Context) (err error) { func stopClusterRebalanceHandler(c *cli.Context) error { xargs := xact.ArgsMsg{Kind: apc.ActRebalance, OnlyRunning: true} - snap, err := getXactSnap(xargs) + snap, err := getXactSnap(&xargs) if err != nil { return err } @@ -501,7 +501,7 @@ func stopClusterRebalanceHandler(c *cli.Context) error { } xargs.ID, xargs.OnlyRunning = snap.ID, false - if err := api.AbortXaction(apiBP, xargs); err != nil { + if err := api.AbortXaction(apiBP, &xargs); err != nil { return V(err) } fmt.Fprintf(c.App.Writer, "Stopped %s[%s]\n", apc.ActRebalance, snap.ID) @@ -532,7 +532,7 @@ func showClusterRebalanceHandler(c *cli.Context) error { DaemonID: daemonID, OnlyRunning: !flagIsSet(c, allJobsFlag), } - _, err := xactList(c, xargs, false) + _, err := xactList(c, &xargs, false) return err } diff --git a/cmd/cli/cli/cpr.go b/cmd/cli/cli/cpr.go index 0933980527..bd32dd4967 100644 --- a/cmd/cli/cli/cpr.go +++ b/cmd/cli/cli/cpr.go @@ -154,7 +154,7 @@ outer: var ( size, objs int64 nrun int - xs, err = queryXactions(xargs) + xs, err = queryXactions(&xargs) ) if err != nil { if herr, ok := err.(*cmn.ErrHTTP); ok && herr.Status == http.StatusNotFound { diff --git a/cmd/cli/cli/downloader.go b/cmd/cli/cli/downloader.go index bba10ee303..f81d290dd5 100644 --- a/cmd/cli/cli/downloader.go +++ b/cmd/cli/cli/downloader.go @@ -372,7 +372,7 @@ func downloadJobsList(c *cli.Context, regex string, caption bool) (int, error) { return l, err } xargs := xact.ArgsMsg{ID: j.XactID, Kind: apc.ActDownload} - if _, err := xactList(c, xargs, false /*caption*/); err != nil { + if _, err := xactList(c, &xargs, false /*caption*/); err != nil { return l, err } fmt.Fprintln(c.App.Writer) diff --git a/cmd/cli/cli/dsort.go b/cmd/cli/cli/dsort.go index b848d90ab2..7ddb064090 100644 --- a/cmd/cli/cli/dsort.go +++ b/cmd/cli/cli/dsort.go @@ -639,7 +639,7 @@ func dsortJobsList(c *cli.Context, list []*dsort.JobInfo, usejs bool) error { // super-verbose if cliConfVerbose() { xargs := xact.ArgsMsg{ID: j.ID, Kind: apc.ActDsort} - if _, err := xactList(c, xargs, false /*caption*/); err != nil { + if _, err := xactList(c, &xargs, false /*caption*/); err != nil { return err } } diff --git a/cmd/cli/cli/job_hdlr.go b/cmd/cli/cli/job_hdlr.go index 2ec4877a5e..ddd0b63ec4 100644 --- a/cmd/cli/cli/job_hdlr.go +++ b/cmd/cli/cli/job_hdlr.go @@ -298,7 +298,7 @@ func startXaction(c *cli.Context, xname string, bck cmn.Bck, sid string) error { } xargs := xact.ArgsMsg{Kind: xname, Bck: bck, DaemonID: sid} - xid, err := api.StartXaction(apiBP, xargs) + xid, err := api.StartXaction(apiBP, &xargs) if err != nil { return V(err) } @@ -624,7 +624,7 @@ func startLRUHandler(c *cli.Context) (err error) { id string xargs = xact.ArgsMsg{Kind: apc.ActLRU, Buckets: buckets, Force: flagIsSet(c, forceFlag)} ) - if id, err = api.StartXaction(apiBP, xargs); err != nil { + if id, err = api.StartXaction(apiBP, &xargs); err != nil { return } @@ -764,7 +764,7 @@ func stopJobHandler(c *cli.Context) error { // query msg := formatXactMsg(xactID, xname, bck) xargs := xact.ArgsMsg{ID: xactID, Kind: xactKind} - snap, err := getXactSnap(xargs) + snap, err := getXactSnap(&xargs) if err != nil { return fmt.Errorf("cannot stop %s: %v", msg, err) } @@ -795,7 +795,7 @@ func stopJobHandler(c *cli.Context) error { // abort args := xact.ArgsMsg{ID: xactID, Kind: xactKind, Bck: snap.Bck} - if err := api.AbortXaction(apiBP, args); err != nil { + if err := api.AbortXaction(apiBP, &args); err != nil { return V(err) } actionDone(c, fmt.Sprintf("Stopped %s\n", msg)) @@ -819,7 +819,7 @@ func stopXactionKind(c *cli.Context, xactKind, xname string, bck cmn.Bck) error args = xact.ArgsMsg{ID: xactID, Kind: xactKind, Bck: bck} msg = formatXactMsg(xactID, xname, bck) ) - if err := api.AbortXaction(apiBP, args); err != nil { + if err := api.AbortXaction(apiBP, &args); err != nil { actionWarn(c, fmt.Sprintf("failed to stop %s: %v", msg, err)) } else { actionDone(c, "Stopped "+msg) diff --git a/cmd/cli/cli/reb_hdlr.go b/cmd/cli/cli/reb_hdlr.go index 6fa790d008..ab8c7c9b03 100644 --- a/cmd/cli/cli/reb_hdlr.go +++ b/cmd/cli/cli/reb_hdlr.go @@ -89,7 +89,7 @@ func showRebalanceHandler(c *cli.Context) error { ) longRun.init(c, true /*run once unless*/) for countdown := longRun.count; countdown > 0 || longRun.isForever(); countdown-- { - rebSnaps, err := api.QueryXactionSnaps(apiBP, xargs) + rebSnaps, err := api.QueryXactionSnaps(apiBP, &xargs) if err != nil { if herr, ok := err.(*cmn.ErrHTTP); ok { if herr.Status == http.StatusNotFound { diff --git a/cmd/cli/cli/show_hdlr.go b/cmd/cli/cli/show_hdlr.go index ab9b820088..acbd51e95b 100644 --- a/cmd/cli/cli/show_hdlr.go +++ b/cmd/cli/cli/show_hdlr.go @@ -325,7 +325,7 @@ func _showJobs(c *cli.Context, name, xid, daemonID string, bck cmn.Bck, caption return 0, nil } } - return xactList(c, xargs, caption) + return xactList(c, &xargs, caption) } } @@ -397,7 +397,7 @@ func showClusterHandler(c *cli.Context) error { return cluDaeStatus(c, smap, tstatusMap, pstatusMap, cluConfig, what) } -func xactList(c *cli.Context, xargs xact.ArgsMsg, caption bool) (int, error) { +func xactList(c *cli.Context, xargs *xact.ArgsMsg, caption bool) (int, error) { // override the caller's choice if explicitly identified if xargs.ID != "" { debug.Assert(xact.IsValidUUID(xargs.ID), xargs.ID) @@ -434,7 +434,7 @@ func xactList(c *cli.Context, xargs xact.ArgsMsg, caption bool) (int, error) { return ll, nil } -func xlistByKindID(c *cli.Context, xargs xact.ArgsMsg, caption bool, xs xact.MultiSnap) (int, error) { +func xlistByKindID(c *cli.Context, xargs *xact.ArgsMsg, caption bool, xs xact.MultiSnap) (int, error) { // first, extract snaps for: xargs.ID, Kind filteredXs := make(xact.MultiSnap, 8) for tid, snaps := range xs { diff --git a/cmd/cli/cli/storage_hdlr.go b/cmd/cli/cli/storage_hdlr.go index 25fc59eb1f..0bcbbe632b 100644 --- a/cmd/cli/cli/storage_hdlr.go +++ b/cmd/cli/cli/storage_hdlr.go @@ -218,7 +218,7 @@ func cleanupStorageHandler(c *cli.Context) (err error) { } } xargs := xact.ArgsMsg{Kind: apc.ActStoreCleanup, Bck: bck} - if id, err = api.StartXaction(apiBP, xargs); err != nil { + if id, err = api.StartXaction(apiBP, &xargs); err != nil { return } diff --git a/cmd/cli/cli/tcbtco.go b/cmd/cli/cli/tcbtco.go index 09554652ed..681e5110fe 100644 --- a/cmd/cli/cli/tcbtco.go +++ b/cmd/cli/cli/tcbtco.go @@ -194,7 +194,7 @@ func etlBucket(c *cli.Context, etlName string, bckFrom, bckTo cmn.Bck, allInclud } // [DRY-RUN] - snaps, err := api.QueryXactionSnaps(apiBP, xargs) + snaps, err := api.QueryXactionSnaps(apiBP, &xargs) if err != nil { return V(err) } diff --git a/cmd/cli/cli/xact.go b/cmd/cli/cli/xact.go index 79bc1a3992..54d0cac062 100644 --- a/cmd/cli/cli/xact.go +++ b/cmd/cli/cli/xact.go @@ -66,10 +66,10 @@ func waitXact(apiBP api.BaseParams, args xact.ArgsMsg) error { kind, xname := xact.GetKindName(args.Kind) debug.Assert(kind != "") // relying on it to decide between APIs if xact.IdlesBeforeFinishing(kind) { - return api.WaitForXactionIdle(apiBP, args) + return api.WaitForXactionIdle(apiBP, &args) } // otherwise, IC - status, err := api.WaitForXactionIC(apiBP, args) + status, err := api.WaitForXactionIC(apiBP, &args) if err != nil { return V(err) } @@ -81,7 +81,7 @@ func waitXact(apiBP api.BaseParams, args xact.ArgsMsg) error { func getKindNameForID(xid string, otherKind ...string) (kind, xname string, rerr error) { xargs := xact.ArgsMsg{ID: xid} - status, err := api.GetOneXactionStatus(apiBP, xargs) // via IC + status, err := api.GetOneXactionStatus(apiBP, &xargs) // via IC if err == nil { kind, xname = xact.GetKindName(status.Kind) return @@ -89,7 +89,7 @@ func getKindNameForID(xid string, otherKind ...string) (kind, xname string, rerr if herr, ok := err.(*cmn.ErrHTTP); ok && herr.Status == http.StatusNotFound { // 2nd attempt assuming xaction in question `IdlesBeforeFinishing` time.Sleep(time.Second) - xs, err := queryXactions(xargs) + xs, err := queryXactions(&xargs) if err != nil { rerr = err return @@ -185,7 +185,7 @@ func flattenXactStats(snap *cluster.Snap, units string) nvpairList { return props } -func getXactSnap(xargs xact.ArgsMsg) (*cluster.Snap, error) { +func getXactSnap(xargs *xact.ArgsMsg) (*cluster.Snap, error) { xs, err := api.QueryXactionSnaps(apiBP, xargs) if err != nil { return nil, V(err) @@ -198,7 +198,7 @@ func getXactSnap(xargs xact.ArgsMsg) (*cluster.Snap, error) { return nil, nil } -func queryXactions(xargs xact.ArgsMsg) (xs xact.MultiSnap, err error) { +func queryXactions(xargs *xact.ArgsMsg) (xs xact.MultiSnap, err error) { orig := apiBP.Client.Timeout if !xargs.OnlyRunning { apiBP.Client.Timeout = min(orig, longClientTimeout) diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod index 603f2e1185..5143ff2e48 100644 --- a/cmd/cli/go.mod +++ b/cmd/cli/go.mod @@ -4,7 +4,7 @@ go 1.21 // direct require ( - github.com/NVIDIA/aistore v1.3.22-0.20231125154206-22015e78a64d + github.com/NVIDIA/aistore v1.3.22-0.20231125164830-e00e95a6487e github.com/fatih/color v1.16.0 github.com/json-iterator/go v1.1.12 github.com/onsi/ginkgo v1.16.5 diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum index c5bad65c39..9874a3f65a 100644 --- a/cmd/cli/go.sum +++ b/cmd/cli/go.sum @@ -1,7 +1,7 @@ code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/NVIDIA/aistore v1.3.22-0.20231125154206-22015e78a64d h1:M2Wa2gSi3yAUz8flRUF1yHZBSUVAxLL437rBgXqoso0= -github.com/NVIDIA/aistore v1.3.22-0.20231125154206-22015e78a64d/go.mod h1:cOTgDt5fVCQOB+rnvYZgVFRF3dEzPqu8f22F3F+Yvtg= +github.com/NVIDIA/aistore v1.3.22-0.20231125164830-e00e95a6487e h1:Y1o9M5A7PB18VpTrgNR9pmKW/Or9Wuclsp956KW+DOo= +github.com/NVIDIA/aistore v1.3.22-0.20231125164830-e00e95a6487e/go.mod h1:cOTgDt5fVCQOB+rnvYZgVFRF3dEzPqu8f22F3F+Yvtg= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= diff --git a/tools/client.go b/tools/client.go index f0cbe0744c..84f494d669 100644 --- a/tools/client.go +++ b/tools/client.go @@ -237,7 +237,7 @@ func CleanupRemoteBucket(t *testing.T, proxyURL string, bck cmn.Bck, prefix stri xid, err := api.DeleteList(bp, bck, toDelete) tassert.CheckFatal(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: BucketCleanupTimeout} - _, err = api.WaitForXactionIC(bp, args) + _, err = api.WaitForXactionIC(bp, &args) tassert.CheckFatal(t, err) } @@ -498,7 +498,7 @@ func EvictObjects(t *testing.T, proxyURL string, bck cmn.Bck, objList []string) } args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: EvictPrefetchTimeout} - if _, err := api.WaitForXactionIC(bp, args); err != nil { + if _, err := api.WaitForXactionIC(bp, &args); err != nil { t.Errorf("Wait for xaction to finish failed, err = %v", err) } } @@ -535,7 +535,7 @@ func WaitForRebalAndResil(t testing.TB, bp api.BaseParams, timeouts ...time.Dura go func() { defer wg.Done() xargs := xact.ArgsMsg{Kind: apc.ActRebalance, OnlyRunning: true, Timeout: timeout} - if _, err := api.WaitForXactionIC(bp, xargs); err != nil { + if _, err := api.WaitForXactionIC(bp, &xargs); err != nil { if cmn.IsStatusNotFound(err) { return } @@ -546,7 +546,7 @@ func WaitForRebalAndResil(t testing.TB, bp api.BaseParams, timeouts ...time.Dura go func() { defer wg.Done() xargs := xact.ArgsMsg{Kind: apc.ActResilver, OnlyRunning: true, Timeout: timeout} - if _, err := api.WaitForXactionIC(bp, xargs); err != nil { + if _, err := api.WaitForXactionIC(bp, &xargs); err != nil { if cmn.IsStatusNotFound(err) { return } @@ -565,14 +565,14 @@ func WaitForRebalAndResil(t testing.TB, bp api.BaseParams, timeouts ...time.Dura // compare w/ `tools.WaitForResilvering` func _waitResil(t testing.TB, bp api.BaseParams, timeout time.Duration) { xargs := xact.ArgsMsg{Kind: apc.ActResilver, OnlyRunning: true, Timeout: timeout} - _, err := api.WaitForXactionIC(bp, xargs) + _, err := api.WaitForXactionIC(bp, &xargs) if err == nil { return } if herr, ok := err.(*cmn.ErrHTTP); ok { if herr.Status == http.StatusNotFound { // double check iff not found time.Sleep(xactPollSleep) - _, err = api.WaitForXactionIC(bp, xargs) + _, err = api.WaitForXactionIC(bp, &xargs) } } if err == nil { @@ -597,7 +597,7 @@ func WaitForRebalanceByID(t *testing.T, bp api.BaseParams, rebID string, timeout } tlog.Logf("Wait for rebalance %s\n", rebID) xargs := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, OnlyRunning: true, Timeout: timeout} - _, err := api.WaitForXactionIC(bp, xargs) + _, err := api.WaitForXactionIC(bp, &xargs) tassert.CheckFatal(t, err) } @@ -606,12 +606,11 @@ func _waitReToStart(bp api.BaseParams) { kinds = []string{apc.ActRebalance, apc.ActResilver} timeout = max(10*xactPollSleep, MaxCplaneTimeout) retries = int(timeout / xactPollSleep) - args = xact.ArgsMsg{Timeout: xactPollSleep, OnlyRunning: true} ) for i := 0; i < retries; i++ { for _, kind := range kinds { - args.Kind = kind - status, err := api.GetOneXactionStatus(bp, args) + args := xact.ArgsMsg{Timeout: xactPollSleep, OnlyRunning: true, Kind: kind} + status, err := api.GetOneXactionStatus(bp, &args) if err == nil { if !status.Finished() { return diff --git a/tools/node.go b/tools/node.go index 5991843558..8a58271dcb 100644 --- a/tools/node.go +++ b/tools/node.go @@ -312,7 +312,7 @@ func WaitForResilvering(t *testing.T, bp api.BaseParams, target *meta.Snode) { } return true, true } - err := api.WaitForXactionNode(bp, args, allFinished) + err := api.WaitForXactionNode(bp, &args, allFinished) tassert.CheckFatal(t, err) } diff --git a/tools/tetl/etl.go b/tools/tetl/etl.go index 8fe27a5284..1b9ac46d8e 100644 --- a/tools/tetl/etl.go +++ b/tools/tetl/etl.go @@ -188,7 +188,7 @@ func WaitForContainersStopped(t *testing.T, bp api.BaseParams) { func WaitForAborted(bp api.BaseParams, xid, kind string, timeout time.Duration) error { tlog.Logf("Waiting for ETL x-%s[%s] to abort...\n", kind, xid) args := xact.ArgsMsg{ID: xid, Kind: kind, Timeout: timeout /* total timeout */} - status, err := api.WaitForXactionIC(bp, args) + status, err := api.WaitForXactionIC(bp, &args) if err == nil { if !status.Aborted() { err = fmt.Errorf("expected ETL x-%s[%s] status to indicate 'abort', got: %+v", kind, xid, status) @@ -196,7 +196,7 @@ func WaitForAborted(bp api.BaseParams, xid, kind string, timeout time.Duration) return err } tlog.Logf("Aborting ETL x-%s[%s]\n", kind, xid) - if abortErr := api.AbortXaction(bp, args); abortErr != nil { + if abortErr := api.AbortXaction(bp, &args); abortErr != nil { tlog.Logf("Nested error: failed to abort upon api.wait failure: %v\n", abortErr) } return err @@ -208,15 +208,15 @@ func WaitForFinished(bp api.BaseParams, xid, kind string, timeout time.Duration) tlog.Logf("Waiting for ETL x-%s[%s] to finish...\n", kind, xid) args := xact.ArgsMsg{ID: xid, Kind: kind, Timeout: timeout /* total timeout */} if xact.IdlesBeforeFinishing(kind) { - err = api.WaitForXactionIdle(bp, args) + err = api.WaitForXactionIdle(bp, &args) } else { - _, err = api.WaitForXactionIC(bp, args) + _, err = api.WaitForXactionIC(bp, &args) } if err == nil { return } tlog.Logf("Aborting ETL x-%s[%s]\n", kind, xid) - if abortErr := api.AbortXaction(bp, args); abortErr != nil { + if abortErr := api.AbortXaction(bp, &args); abortErr != nil { tlog.Logf("Nested error: failed to abort upon api.wait failure: %v\n", abortErr) } return err @@ -233,7 +233,7 @@ func ReportXactionStatus(bp api.BaseParams, xid string, stopCh *cos.StopCh, inte select { case <-etlTicker.C: // Check number of objects transformed. - xs, err := api.QueryXactionSnaps(bp, xact.ArgsMsg{ID: xid}) + xs, err := api.QueryXactionSnaps(bp, &xact.ArgsMsg{ID: xid}) if err != nil { tlog.Logf("Failed to get x-etl[%s] stats: %v\n", xid, err) continue