Skip to content

Commit

Permalink
(go api: (xaction)): ref
Browse files Browse the repository at this point in the history
* "huge param" part six

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 25, 2023
1 parent b78e3e3 commit bf38f12
Show file tree
Hide file tree
Showing 39 changed files with 193 additions and 185 deletions.
10 changes: 5 additions & 5 deletions ais/test/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,12 @@ func testArch(t *testing.T, bck *meta.Bck) {
if test.abrt {
time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second)
tlog.Logln("Aborting...")
api.AbortXaction(baseParams, flt)
api.AbortXaction(baseParams, &flt)
}

var lstToAppend *cmn.LsoResult
for ii := 0; ii < 2; ii++ {
api.WaitForXactionIdle(baseParams, flt)
api.WaitForXactionIdle(baseParams, &flt)

tlog.Logf("List %s\n", bckTo)
msg := &apc.LsoMsg{Prefix: "test_"}
Expand Down Expand Up @@ -410,7 +410,7 @@ func testArch(t *testing.T, bck *meta.Bck) {

time.Sleep(10 * time.Second)
flt := xact.ArgsMsg{Kind: apc.ActArchive, Bck: m.bck}
api.WaitForXactionIdle(baseParams, flt)
api.WaitForXactionIdle(baseParams, &flt)
}

var (
Expand Down Expand Up @@ -536,7 +536,7 @@ func TestAppendToArch(t *testing.T) {
}

wargs := xact.ArgsMsg{Kind: apc.ActArchive, Bck: m.bck}
api.WaitForXactionIdle(baseParams, wargs)
api.WaitForXactionIdle(baseParams, &wargs)

lsmsg := &apc.LsoMsg{Prefix: "test_lst"}
lsmsg.AddProps(apc.GetPropsName, apc.GetPropsSize)
Expand Down Expand Up @@ -590,7 +590,7 @@ func TestAppendToArch(t *testing.T) {
}
if test.multi {
wargs := xact.ArgsMsg{Kind: apc.ActArchive, Bck: m.bck}
api.WaitForXactionIdle(baseParams, wargs)
api.WaitForXactionIdle(baseParams, &wargs)
}

lsmsg.SetFlag(apc.LsArchDir)
Expand Down
46 changes: 23 additions & 23 deletions ais/test/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func overwriteLomCache(mdwrite apc.WritePolicy, t *testing.T) {
}
// wait for pending writes (of the copies)
args := xact.ArgsMsg{Kind: apc.ActPutCopies, Bck: m.bck}
api.WaitForXactionIdle(baseParams, args)
api.WaitForXactionIdle(baseParams, &args)

tlog.Logf("List %s new versions\n", m.bck)
msg = &apc.LsoMsg{}
Expand Down Expand Up @@ -1827,7 +1827,7 @@ func testLocalMirror(t *testing.T, numCopies []int) {
// Even though the bucket is empty, it can take a short while until the
// xaction is propagated and finished.
reqArgs := xact.ArgsMsg{ID: xid, Kind: apc.ActMakeNCopies, Bck: m.bck, Timeout: xactTimeout}
_, err = api.WaitForXactionIC(baseParams, reqArgs)
_, err = api.WaitForXactionIC(baseParams, &reqArgs)
tassert.CheckFatal(t, err)
}

Expand All @@ -1843,7 +1843,7 @@ func testLocalMirror(t *testing.T, numCopies []int) {
baseParams := tools.BaseAPIParams(m.proxyURL)

xargs := xact.ArgsMsg{Kind: apc.ActPutCopies, Bck: m.bck, Timeout: xactTimeout}
_, _ = api.WaitForXactionIC(baseParams, xargs)
_, _ = api.WaitForXactionIC(baseParams, &xargs)

for _, copies := range numCopies {
makeNCopies(t, baseParams, m.bck, copies)
Expand All @@ -1862,11 +1862,11 @@ func makeNCopies(t *testing.T, baseParams api.BaseParams, bck cmn.Bck, ncopies i
tassert.CheckFatal(t, err)

args := xact.ArgsMsg{ID: xid, Kind: apc.ActMakeNCopies}
_, err = api.WaitForXactionIC(baseParams, args)
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

args = xact.ArgsMsg{Kind: apc.ActPutCopies, Bck: bck}
api.WaitForXactionIdle(baseParams, args)
api.WaitForXactionIdle(baseParams, &args)
}

func TestRemoteBucketMirror(t *testing.T) {
Expand Down Expand Up @@ -1985,7 +1985,7 @@ func TestRenameBucketEmpty(t *testing.T) {
tassert.CheckFatal(t, err)

args := xact.ArgsMsg{ID: uuid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, args)
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

// Check if the new bucket appears in the list
Expand Down Expand Up @@ -2050,7 +2050,7 @@ func TestRenameBucketNonEmpty(t *testing.T) {
tassert.CheckFatal(t, err)

args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, args)
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

// Gets on renamed ais bucket
Expand Down Expand Up @@ -2170,7 +2170,7 @@ func TestRenameBucketTwice(t *testing.T) {

// Wait for rename to complete
args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, args)
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

// Check if the new bucket appears in the list
Expand Down Expand Up @@ -2257,7 +2257,7 @@ func TestRenameBucketWithBackend(t *testing.T) {

tassert.CheckFatal(t, err)
xargs := xact.ArgsMsg{ID: xid}
_, err = api.WaitForXactionIC(baseParams, xargs)
_, err = api.WaitForXactionIC(baseParams, &xargs)
tassert.CheckFatal(t, err)

exists, err := api.QueryBuckets(baseParams, cmn.QueryBcks(bck), apc.FltPresent)
Expand Down Expand Up @@ -2490,11 +2490,11 @@ func TestCopyBucket(t *testing.T) {
if test.evictRemoteSrc {
// wait for TCO idle (different x-kind)
args := xact.ArgsMsg{ID: uuid, Timeout: tools.CopyBucketTimeout}
err := api.WaitForXactionIdle(baseParams, args)
err := api.WaitForXactionIdle(baseParams, &args)
tassert.CheckFatal(t, err)
} else {
args := xact.ArgsMsg{ID: uuid, Kind: apc.ActCopyBck, Timeout: tools.CopyBucketTimeout}
_, err := api.WaitForXactionIC(baseParams, args)
_, err := api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
}
}
Expand Down Expand Up @@ -2628,10 +2628,10 @@ func testCopyBucketStats(t *testing.T, srcBck cmn.Bck, m *ioContext) {
})

args := xact.ArgsMsg{ID: xid, Kind: apc.ActCopyBck, Timeout: time.Minute}
_, err = api.WaitForXactionIC(baseParams, args)
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

snaps, err := api.QueryXactionSnaps(baseParams, xact.ArgsMsg{ID: xid})
snaps, err := api.QueryXactionSnaps(baseParams, &xact.ArgsMsg{ID: xid})
tassert.CheckFatal(t, err)
objs, outObjs, inObjs := snaps.ObjCounts(xid)
tassert.Errorf(t, objs == int64(m.num), "expected %d objects copied, got (objs=%d, outObjs=%d, inObjs=%d)",
Expand Down Expand Up @@ -2662,7 +2662,7 @@ func testCopyBucketPrepend(t *testing.T, srcBck cmn.Bck, m *ioContext) {

tlog.Logf("Wating for x-%s[%s] %s => %s\n", apc.ActCopyBck, xid, srcBck, dstBck)
args := xact.ArgsMsg{ID: xid, Kind: apc.ActCopyBck, Timeout: time.Minute}
_, err = api.WaitForXactionIC(baseParams, args)
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

list, err := api.ListObjects(baseParams, dstBck, nil, api.ListArgs{})
Expand All @@ -2687,7 +2687,7 @@ func testCopyBucketPrefix(t *testing.T, srcBck cmn.Bck, m *ioContext, expected i

tlog.Logf("Wating for x-%s[%s] %s => %s\n", apc.ActCopyBck, xid, srcBck, dstBck)
args := xact.ArgsMsg{ID: xid, Kind: apc.ActCopyBck, Timeout: time.Minute}
_, err = api.WaitForXactionIC(baseParams, args)
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

list, err := api.ListObjects(baseParams, dstBck, nil, api.ListArgs{})
Expand All @@ -2710,11 +2710,11 @@ func testCopyBucketAbort(t *testing.T, srcBck cmn.Bck, m *ioContext) {
time.Sleep(time.Second)

tlog.Logf("Aborting x-%s[%s]\n", apc.ActCopyBck, xid)
err = api.AbortXaction(baseParams, xact.ArgsMsg{ID: xid})
err = api.AbortXaction(baseParams, &xact.ArgsMsg{ID: xid})
tassert.CheckError(t, err)

time.Sleep(time.Second)
snaps, err := api.QueryXactionSnaps(baseParams, xact.ArgsMsg{ID: xid})
snaps, err := api.QueryXactionSnaps(baseParams, &xact.ArgsMsg{ID: xid})
tassert.CheckError(t, err)
aborted, err := snaps.IsAborted(xid)
tassert.CheckError(t, err)
Expand All @@ -2737,10 +2737,10 @@ func testCopyBucketDryRun(t *testing.T, srcBck cmn.Bck, m *ioContext) {

tlog.Logf("Wating for x-%s[%s]\n", apc.ActCopyBck, xid)
args := xact.ArgsMsg{ID: xid, Kind: apc.ActCopyBck, Timeout: time.Minute}
_, err = api.WaitForXactionIC(baseParams, args)
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

snaps, err := api.QueryXactionSnaps(baseParams, xact.ArgsMsg{ID: xid})
snaps, err := api.QueryXactionSnaps(baseParams, &xact.ArgsMsg{ID: xid})
tassert.CheckFatal(t, err)

locObjs, outObjs, inObjs := snaps.ObjCounts(xid)
Expand Down Expand Up @@ -2808,7 +2808,7 @@ func TestRenameAndCopyBucket(t *testing.T) {
tlog.Logf("Waiting for x-%s[%s] to finish\n", apc.ActMoveBck, xid)
time.Sleep(2 * time.Second)
args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, args)
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

time.Sleep(time.Second)
Expand Down Expand Up @@ -2896,7 +2896,7 @@ func TestCopyAndRenameBucket(t *testing.T) {

// Wait for copy to complete
args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, args)
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

// Check if the new bucket appears in the list
Expand Down Expand Up @@ -3153,14 +3153,14 @@ func testWarmValidation(t *testing.T, cksumType string, mirrored, eced bool) {
// wait for mirroring
if mirrored {
args := xact.ArgsMsg{Kind: apc.ActPutCopies, Bck: m.bck, Timeout: xactTimeout}
api.WaitForXactionIdle(baseParams, args)
api.WaitForXactionIdle(baseParams, &args)
// NOTE: ref 1377
m.ensureNumCopies(baseParams, copyCnt, false /*greaterOk*/)
}
// wait for erasure-coding
if eced {
args := xact.ArgsMsg{Kind: apc.ActECPut, Bck: m.bck, Timeout: xactTimeout}
api.WaitForXactionIdle(baseParams, args)
api.WaitForXactionIdle(baseParams, &args)
}

// read all
Expand Down
6 changes: 3 additions & 3 deletions ais/test/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func (m *ioContext) ensureNumCopies(baseParams api.BaseParams, expectedCopies in
m.t.Helper()
time.Sleep(time.Second)
xargs := xact.ArgsMsg{Kind: apc.ActMakeNCopies, Bck: m.bck, Timeout: tools.RebalanceTimeout}
_, err := api.WaitForXactionIC(baseParams, xargs)
_, err := api.WaitForXactionIC(baseParams, &xargs)
tassert.CheckFatal(m.t, err)

// List Bucket - primarily for the copies
Expand Down Expand Up @@ -694,7 +694,7 @@ func ensurePrevRebalanceIsFinished(baseParams api.BaseParams, err error) bool {
tlog.Logln("Warning: wait for unfinished rebalance(?)")
time.Sleep(5 * time.Second)
args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout}
_, _ = api.WaitForXactionIC(baseParams, args)
_, _ = api.WaitForXactionIC(baseParams, &args)
time.Sleep(5 * time.Second)
return true
}
Expand Down Expand Up @@ -727,7 +727,7 @@ func (m *ioContext) stopMaintenance(target *meta.Snode) string {
tassert.Fatalf(m.t, xact.IsValidRebID(rebID), "invalid reb ID %q", rebID)

xargs := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: tools.RebalanceStartTimeout}
api.WaitForXactionNode(bp, xargs, xactSnapRunning)
api.WaitForXactionNode(bp, &xargs, xactSnapRunning)

return rebID
}
Expand Down
6 changes: 3 additions & 3 deletions ais/test/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestConfigSetGlobal(t *testing.T) {

// wait for ec
flt := xact.ArgsMsg{Kind: apc.ActECEncode}
_, _ = api.WaitForXactionIC(baseParams, flt)
_, _ = api.WaitForXactionIC(baseParams, &flt)
}

func TestConfigFailOverrideClusterOnly(t *testing.T) {
Expand All @@ -180,7 +180,7 @@ func TestConfigFailOverrideClusterOnly(t *testing.T) {

// wait for ec
flt := xact.ArgsMsg{Kind: apc.ActECEncode}
_, _ = api.WaitForXactionIC(baseParams, flt)
_, _ = api.WaitForXactionIC(baseParams, &flt)
}

func TestConfigOverrideAndRestart(t *testing.T) {
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestConfigSyncToNewNode(t *testing.T) {

// wait for ec
flt := xact.ArgsMsg{Kind: apc.ActECEncode}
_, _ = api.WaitForXactionIC(baseParams, flt)
_, _ = api.WaitForXactionIC(baseParams, &flt)
}

func checkConfig(t *testing.T, smap *meta.Smap, check func(*meta.Snode, *cmn.Config)) {
Expand Down
4 changes: 2 additions & 2 deletions ais/test/cp_multiobj_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestCopyMultiObjSimple(t *testing.T) {
}

wargs := xact.ArgsMsg{ID: xid, Kind: apc.ActCopyObjects}
api.WaitForXactionIdle(baseParams, wargs)
api.WaitForXactionIdle(baseParams, &wargs)

tlog.Logln("prefix: test/")
msg := &apc.LsoMsg{Prefix: "test/"}
Expand Down Expand Up @@ -251,7 +251,7 @@ func testCopyMobj(t *testing.T, bck *meta.Bck) {
tassert.CheckFatal(t, erv.Load().(error))
}
wargs := xact.ArgsMsg{Kind: apc.ActCopyObjects, Bck: m.bck}
api.WaitForXactionIdle(baseParams, wargs)
api.WaitForXactionIdle(baseParams, &wargs)

msg := &apc.LsoMsg{Prefix: m.prefix}
msg.AddProps(apc.GetPropsName, apc.GetPropsSize)
Expand Down
4 changes: 2 additions & 2 deletions ais/test/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ func TestDownloadRemote(t *testing.T) {
xid, err := api.EvictList(baseParams, test.srcBck, expectedObjs)
tassert.CheckFatal(t, err)
args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, args)
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

if test.dstBck.IsAIS() {
Expand Down Expand Up @@ -559,7 +559,7 @@ func TestDownloadRemote(t *testing.T) {
xid, err = api.EvictList(baseParams, test.srcBck, expectedObjs)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, args)
_, err = api.WaitForXactionIC(baseParams, &args)
if test.srcBck.Equal(&test.dstBck) {
tassert.CheckFatal(t, err)
} else {
Expand Down
4 changes: 2 additions & 2 deletions ais/test/dsort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ func TestDsortAbort(t *testing.T) {

if asXaction {
tlog.Logf("aborting dsort[%s] via api.AbortXaction\n", df.managerUUID)
err = api.AbortXaction(df.baseParams, xact.ArgsMsg{ID: df.managerUUID})
err = api.AbortXaction(df.baseParams, &xact.ArgsMsg{ID: df.managerUUID})
} else {
tlog.Logf("aborting dsort[%s] via api.AbortDsort\n", df.managerUUID)
err = api.AbortDsort(df.baseParams, df.managerUUID)
Expand Down Expand Up @@ -1431,7 +1431,7 @@ func TestDsortAbortDuringPhases(t *testing.T) {
var err error
if asXaction {
tlog.Logf("aborting dsort[%s] via api.AbortXaction\n", df.managerUUID)
err = api.AbortXaction(df.baseParams, xact.ArgsMsg{ID: df.managerUUID})
err = api.AbortXaction(df.baseParams, &xact.ArgsMsg{ID: df.managerUUID})
} else {
tlog.Logf("aborting dsort[%s] via api.AbortDsort\n", df.managerUUID)
err = api.AbortDsort(df.baseParams, df.managerUUID)
Expand Down
Loading

0 comments on commit bf38f12

Please sign in to comment.