From 8c7e0b6dda2f9c846d011169bf52f718e7a9a3d2 Mon Sep 17 00:00:00 2001 From: Artur Troian Date: Wed, 20 Dec 2023 10:48:19 -0800 Subject: [PATCH] feat: node client discovery refs akash-network/support#165 Signed-off-by: Artur Troian --- balance_checker.go | 14 +- bidengine/order.go | 9 +- bidengine/order_test.go | 107 ++--- bidengine/provider_attributes_test.go | 2 +- client/broadcaster/serial.go | 381 ------------------ client/client.go | 57 +++ cluster/monitor.go | 9 +- cmd/provider-services/cmd/helpers.go | 8 +- cmd/provider-services/cmd/leaseEvents.go | 15 +- cmd/provider-services/cmd/leaseLogs.go | 15 +- cmd/provider-services/cmd/leaseStatus.go | 11 +- cmd/provider-services/cmd/manifest.go | 13 +- .../cmd/migrate_endpoints.go | 11 +- .../cmd/migrate_hostnames.go | 11 +- cmd/provider-services/cmd/run.go | 69 ++-- cmd/provider-services/cmd/serviceStatus.go | 11 +- cmd/provider-services/cmd/shell.go | 17 +- cmd/provider-services/cmd/status.go | 11 +- gateway/rest/client.go | 18 +- gateway/rest/integration_test.go | 2 +- gateway/rest/router_test.go | 2 +- go.mod | 11 +- go.sum | 19 +- manifest/watchdog.go | 8 +- manifest/watchdog_test.go | 31 +- service.go | 11 +- session/session.go | 2 +- testutil/rest/restserver.go | 4 +- 28 files changed, 311 insertions(+), 568 deletions(-) delete mode 100644 client/broadcaster/serial.go create mode 100644 client/client.go diff --git a/balance_checker.go b/balance_checker.go index de910dc2..51a45ece 100644 --- a/balance_checker.go +++ b/balance_checker.go @@ -11,11 +11,10 @@ import ( "github.com/tendermint/tendermint/libs/log" tmrpc "github.com/tendermint/tendermint/rpc/core/types" + aclient "github.com/akash-network/akash-api/go/node/client/v1beta2" dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" - aclient "github.com/akash-network/node/client" - "github.com/akash-network/node/pubsub" netutil "github.com/akash-network/node/util/network" "github.com/akash-network/node/util/runner" @@ -124,7 +123,7 @@ func (bc *balanceChecker) doEscrowCheck(ctx context.Context, lid mtypes.LeaseID, } var syncInfo *tmrpc.SyncInfo - syncInfo, resp.err = bc.session.Client().NodeSyncInfo(ctx) + syncInfo, resp.err = bc.session.Client().Node().SyncInfo(ctx) if resp.err != nil { return resp } @@ -187,14 +186,15 @@ func (bc *balanceChecker) doEscrowCheck(ctx context.Context, lid mtypes.LeaseID, } func (bc *balanceChecker) startWithdraw(ctx context.Context, lid mtypes.LeaseID) error { + ctx, cancel := context.WithTimeout(ctx, withdrawTimeout) + defer cancel() + msg := &mtypes.MsgWithdrawLease{ LeaseID: lid, } - ctx, cancel := context.WithTimeout(ctx, withdrawTimeout) - defer cancel() - - return bc.session.Client().Tx().Broadcast(ctx, msg) + _, err := bc.session.Client().Tx().Broadcast(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()) + return err } func (bc *balanceChecker) run(startCh chan<- error) { diff --git a/bidengine/order.go b/bidengine/order.go index cdafbf78..70411bbb 100644 --- a/bidengine/order.go +++ b/bidengine/order.go @@ -6,6 +6,7 @@ import ( "regexp" "time" + aclient "github.com/akash-network/akash-api/go/node/client/v1beta2" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -404,7 +405,7 @@ loop: // Begin submitting fulfillment msg = mtypes.NewMsgCreateBid(o.orderID, o.session.Provider().Address(), price, o.cfg.Deposit, offer) bidch = runner.Do(func() runner.Result { - return runner.NewResult(nil, o.session.Client().Tx().Broadcast(ctx, msg)) + return runner.NewResult(o.session.Client().Tx().Broadcast(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())) }) case result := <-bidch: @@ -456,9 +457,11 @@ loop: if bidPlaced { o.log.Debug("closing bid", "order-id", o.orderID) - err := o.session.Client().Tx().Broadcast(ctx, &mtypes.MsgCloseBid{ + msg := &mtypes.MsgCloseBid{ BidID: mtypes.MakeBidID(o.orderID, o.session.Provider().Address()), - }) + } + + _, err := o.session.Client().Tx().Broadcast(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()) if err != nil { o.log.Error("closing bid", "err", err) bidCounter.WithLabelValues("close", metricsutils.FailLabel).Inc() diff --git a/bidengine/order_test.go b/bidengine/order_test.go index c339b8bd..b329f6f0 100644 --- a/bidengine/order_test.go +++ b/bidengine/order_test.go @@ -18,12 +18,12 @@ import ( "github.com/stretchr/testify/require" audittypes "github.com/akash-network/akash-api/go/node/audit/v1beta3" + clientmocks "github.com/akash-network/akash-api/go/node/client/v1beta2/mocks" dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" ptypes "github.com/akash-network/akash-api/go/node/provider/v1beta3" atypes "github.com/akash-network/akash-api/go/node/types/v1beta3" - broadcastmocks "github.com/akash-network/node/client/broadcaster/mocks" - clientmocks "github.com/akash-network/node/client/mocks" + "github.com/akash-network/node/pubsub" "github.com/akash-network/node/testutil" @@ -32,19 +32,17 @@ import ( ) type orderTestScaffold struct { - orderID mtypes.OrderID - groupID dtypes.GroupID - testBus pubsub.Bus - testAddr sdk.AccAddress - deploymentID dtypes.DeploymentID - bidID *mtypes.BidID - - queryClient *clientmocks.QueryClient - client *clientmocks.Client - txClient *broadcastmocks.Client - + orderID mtypes.OrderID + groupID dtypes.GroupID + testBus pubsub.Bus + testAddr sdk.AccAddress + deploymentID dtypes.DeploymentID + bidID *mtypes.BidID + client *clientmocks.Client + queryClient *clientmocks.QueryClient + txClient *clientmocks.TxClient cluster *clustermocks.Cluster - broadcasts chan sdk.Msg + broadcasts chan []sdk.Msg reserveCallNotify chan int } @@ -98,11 +96,12 @@ func makeMocks(s *orderTestScaffold) { queryClientMock.On("Orders", mock.Anything, mock.Anything).Return(&mtypes.QueryOrdersResponse{}, nil) queryClientMock.On("Provider", mock.Anything, mock.Anything).Return(&ptypes.QueryProviderResponse{}, nil) - txMocks := &broadcastmocks.Client{} - s.broadcasts = make(chan sdk.Msg, 1) - txMocks.On("Broadcast", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { - s.broadcasts <- args.Get(1).(sdk.Msg) - }).Return(nil) + txMocks := &clientmocks.TxClient{} + s.broadcasts = make(chan []sdk.Msg, 1) + + txMocks.On("Broadcast", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + s.broadcasts <- args.Get(1).([]sdk.Msg) + }).Return(&sdk.Result{}, nil) clientMocks := &clientmocks.Client{} clientMocks.On("Query").Return(queryClientMock) @@ -214,6 +213,18 @@ func makeOrderForTest( return order, scaffold, reservationFulfilledNotify } +func requireMsgType[T any](t *testing.T, res interface{}) T { + t.Helper() + + require.IsType(t, []sdk.Msg{}, res) + + msgs := res.([]sdk.Msg) + require.Len(t, msgs, 1) + require.IsType(t, *new(T), msgs[0]) + + return msgs[0].(T) +} + func Test_BidOrderAndUnreserve(t *testing.T) { order, scaffold, _ := makeOrderForTest(t, false, mtypes.BidStateInvalid, nil, nil, testBidCreatedAt) @@ -221,9 +232,7 @@ func Test_BidOrderAndUnreserve(t *testing.T) { // Should have called reserve once scaffold.cluster.AssertCalled(t, "Reserve", scaffold.orderID, mock.Anything) - require.IsType(t, &mtypes.MsgCreateBid{}, broadcast) - - createBidMsg := broadcast.(*mtypes.MsgCreateBid) + createBidMsg := requireMsgType[*mtypes.MsgCreateBid](t, broadcast) require.Equal(t, createBidMsg.Order, scaffold.orderID) @@ -251,9 +260,8 @@ func Test_BidOrderAndUnreserveOnTimeout(t *testing.T) { // Should have called reserve once scaffold.cluster.AssertCalled(t, "Reserve", scaffold.orderID, mock.Anything) - require.IsType(t, &mtypes.MsgCreateBid{}, broadcast) + createBidMsg := requireMsgType[*mtypes.MsgCreateBid](t, broadcast) - createBidMsg := broadcast.(*mtypes.MsgCreateBid) require.Equal(t, createBidMsg.Order, scaffold.orderID) priceDenom := createBidMsg.Price.Denom @@ -266,7 +274,8 @@ func Test_BidOrderAndUnreserveOnTimeout(t *testing.T) { // After the broadcast call the timeout should take effect // and then close the bid, unreserving capacity in the process broadcast = testutil.ChannelWaitForValue(t, scaffold.broadcasts) - require.IsType(t, &mtypes.MsgCloseBid{}, broadcast) + + _ = requireMsgType[*mtypes.MsgCloseBid](t, broadcast) // After the broadcast call shut down happens automatically order.lc.Shutdown(nil) @@ -371,9 +380,8 @@ func Test_BidOrderAndThenLeaseCreated(t *testing.T) { // Wait for first broadcast broadcast := testutil.ChannelWaitForValue(t, scaffold.broadcasts) - require.IsType(t, &mtypes.MsgCreateBid{}, broadcast) + createBidMsg := requireMsgType[*mtypes.MsgCreateBid](t, broadcast) - createBidMsg := broadcast.(*mtypes.MsgCreateBid) require.Equal(t, createBidMsg.Order, scaffold.orderID) priceDenom := createBidMsg.Price.Denom require.Equal(t, testutil.CoinDenom, priceDenom) @@ -414,9 +422,9 @@ func Test_BidOrderAndThenLeaseCreatedForDifferentDeployment(t *testing.T) { // Should have called reserve once scaffold.cluster.AssertCalled(t, "Reserve", scaffold.orderID, mock.Anything) - require.IsType(t, &mtypes.MsgCreateBid{}, broadcast) - createBidMsg := broadcast.(*mtypes.MsgCreateBid) + createBidMsg := requireMsgType[*mtypes.MsgCreateBid](t, broadcast) + require.Equal(t, createBidMsg.Order, scaffold.orderID) otherOrderID := scaffold.orderID @@ -451,9 +459,8 @@ func Test_BidOrderAndThenLeaseCreatedForDifferentDeployment(t *testing.T) { require.NotEqual(t, 0, len(txCalls)) lastBroadcast := txCalls[len(txCalls)-1] require.Equal(t, "Broadcast", lastBroadcast.Method) - msg := lastBroadcast.Arguments[1] - require.IsType(t, &mtypes.MsgCloseBid{}, msg) - closeBidMsg := msg.(*mtypes.MsgCloseBid) + + closeBidMsg := requireMsgType[*mtypes.MsgCloseBid](t, lastBroadcast.Arguments[1]) expectedBidID := mtypes.MakeBidID(order.orderID, scaffold.testAddr) require.Equal(t, closeBidMsg.BidID, expectedBidID) @@ -504,15 +511,13 @@ func Test_ShouldNotBidWhenAlreadySet(t *testing.T) { // Should have called unreserve during shutdown scaffold.cluster.AssertCalled(t, "Unreserve", scaffold.orderID, mock.Anything) - var broadcast sdk.Msg + var broadcast []sdk.Msg select { case broadcast = <-scaffold.broadcasts: default: } - // Should have broadcast - require.IsType(t, &mtypes.MsgCloseBid{}, broadcast) - closeBid := broadcast.(*mtypes.MsgCloseBid) + closeBid := requireMsgType[*mtypes.MsgCloseBid](t, broadcast) require.Equal(t, closeBid.BidID, *scaffold.bidID) } @@ -535,9 +540,11 @@ func Test_ShouldCloseBidWhenAlreadySetAndOld(t *testing.T) { scaffold.cluster.AssertNotCalled(t, "Reserve", scaffold.orderID, mock.Anything) // Should have closed the bid - scaffold.txClient.AssertCalled(t, "Broadcast", mock.Anything, &mtypes.MsgCloseBid{ + expMsgs := []sdk.Msg{&mtypes.MsgCloseBid{ BidID: mtypes.MakeBidID(order.orderID, scaffold.testAddr), - }) + }} + + scaffold.txClient.AssertCalled(t, "Broadcast", mock.Anything, expMsgs, mock.Anything) } func Test_ShouldExitWhenAlreadySetAndLost(t *testing.T) { @@ -558,9 +565,11 @@ func Test_ShouldExitWhenAlreadySetAndLost(t *testing.T) { scaffold.cluster.AssertNotCalled(t, "Reserve", scaffold.orderID, mock.Anything) // Should not have closed the bid - scaffold.txClient.AssertNotCalled(t, "Broadcast", mock.Anything, &mtypes.MsgCloseBid{ + expMsgs := &mtypes.MsgCloseBid{ BidID: mtypes.MakeBidID(order.orderID, scaffold.testAddr), - }) + } + + scaffold.txClient.AssertNotCalled(t, "Broadcast", mock.Anything, expMsgs, mock.Anything) } func Test_ShouldCloseBidWhenAlreadySetAndThenTimeout(t *testing.T) { pricing, err := MakeRandomRangePricing() @@ -580,9 +589,12 @@ func Test_ShouldCloseBidWhenAlreadySetAndThenTimeout(t *testing.T) { scaffold.cluster.AssertCalled(t, "Reserve", scaffold.orderID, mock.Anything) // Should have closed the bid - scaffold.txClient.AssertCalled(t, "Broadcast", mock.Anything, &mtypes.MsgCloseBid{ - BidID: mtypes.MakeBidID(order.orderID, scaffold.testAddr), - }) + expMsgs := []sdk.Msg{ + &mtypes.MsgCloseBid{ + BidID: mtypes.MakeBidID(order.orderID, scaffold.testAddr), + }, + } + scaffold.txClient.AssertCalled(t, "Broadcast", mock.Anything, expMsgs, mock.Anything) // Should have called unreserve scaffold.cluster.AssertCalled(t, "Unreserve", scaffold.orderID) @@ -617,7 +629,7 @@ func Test_ShouldRecognizeLeaseCreatedIfBiddingIsSkipped(t *testing.T) { // Should not have called unreserve during shutdown scaffold.cluster.AssertNotCalled(t, "Unreserve", mock.Anything, mock.Anything) - var broadcast sdk.Msg + var broadcast []sdk.Msg select { case broadcast = <-scaffold.broadcasts: @@ -638,9 +650,8 @@ func Test_BidOrderUsesBidPricingStrategy(t *testing.T) { order, scaffold, _ := makeOrderForTest(t, false, mtypes.BidStateInvalid, pricing, nil, testBidCreatedAt) broadcast := testutil.ChannelWaitForValue(t, scaffold.broadcasts) - require.IsType(t, &mtypes.MsgCreateBid{}, broadcast) + createBidMsg := requireMsgType[*mtypes.MsgCreateBid](t, broadcast) - createBidMsg := broadcast.(*mtypes.MsgCreateBid) require.Equal(t, createBidMsg.Order, scaffold.orderID) priceDenom := createBidMsg.Price.Denom @@ -673,7 +684,7 @@ func Test_BidOrderFailsAndAborts(t *testing.T) { // Should have called reserve once scaffold.cluster.AssertCalled(t, "Reserve", scaffold.orderID, mock.Anything) - var broadcast sdk.Msg + var broadcast []sdk.Msg select { case broadcast = <-scaffold.broadcasts: @@ -701,7 +712,7 @@ func Test_ShouldntBidIfOrderAttrsDontMatch(t *testing.T) { // Should not have called reserve ever scaffold.cluster.AssertNotCalled(t, "Reserve", scaffold.orderID, mock.Anything) - var broadcast sdk.Msg + var broadcast []sdk.Msg select { case broadcast = <-scaffold.broadcasts: diff --git a/bidengine/provider_attributes_test.go b/bidengine/provider_attributes_test.go index 187dd44e..916bf89e 100644 --- a/bidengine/provider_attributes_test.go +++ b/bidengine/provider_attributes_test.go @@ -14,7 +14,7 @@ import ( ptypes "github.com/akash-network/akash-api/go/node/provider/v1beta3" akashtypes "github.com/akash-network/akash-api/go/node/types/v1beta3" - clientmocks "github.com/akash-network/node/client/mocks" + clientmocks "github.com/akash-network/akash-api/go/node/client/v1beta2/mocks" "github.com/akash-network/node/pubsub" "github.com/akash-network/node/testutil" diff --git a/client/broadcaster/serial.go b/client/broadcaster/serial.go deleted file mode 100644 index 52ef3fae..00000000 --- a/client/broadcaster/serial.go +++ /dev/null @@ -1,381 +0,0 @@ -package broadcaster - -import ( - "context" - "encoding/hex" - "errors" - "strings" - "time" - "unsafe" - - "github.com/boz/go-lifecycle" - sdkclient "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/client/tx" - "github.com/cosmos/cosmos-sdk/crypto/keyring" - sdk "github.com/cosmos/cosmos-sdk/types" - sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" - authtx "github.com/cosmos/cosmos-sdk/x/auth/tx" - "github.com/tendermint/tendermint/libs/log" - ttypes "github.com/tendermint/tendermint/types" - - mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" - abroadcaster "github.com/akash-network/node/client/broadcaster" -) - -const ( - broadcastBlockRetryPeriod = time.Second - sequenceSyncTimeout = 30 * time.Second -) - -var ( - ErrNotRunning = errors.New("not running") - ErrSyncTimedOut = errors.New("serial broadcast: timed-out waiting for sequence sync") - - // sadface. - // Only way to detect the timeout error. - // https://github.com/tendermint/tendermint/blob/46e06c97320bc61c4d98d3018f59d47ec69863c9/rpc/core/mempool.go#L124 - timeoutErrorMessage = "timed out waiting for tx to be included in a block" - - // Only way to check for tx not found error. - // https://github.com/tendermint/tendermint/blob/46e06c97320bc61c4d98d3018f59d47ec69863c9/rpc/core/tx.go#L31-L33 - notFoundErrorMessageSuffix = ") not found" -) - -type SerialClient interface { - abroadcaster.Client - Close() -} - -type broadcastRequest struct { - id uintptr - responsech chan<- error - msgs []sdk.Msg -} - -type seqreq struct { - curr uint64 - ch chan<- uint64 -} - -type broadcast struct { - donech chan<- error - respch chan<- error - msgs []sdk.Msg -} - -type serialBroadcaster struct { - ctx context.Context - cctx sdkclient.Context - info keyring.Info - broadcastTimeout time.Duration - reqch chan broadcastRequest - broadcastch chan broadcast - seqreqch chan seqreq - lc lifecycle.Lifecycle - log log.Logger -} - -func NewSerialClient(ctx context.Context, log log.Logger, cctx sdkclient.Context, timeout time.Duration, txf tx.Factory, info keyring.Info) (SerialClient, error) { - // populate account number, current sequence number - poptxf, err := abroadcaster.PrepareFactory(cctx, txf) - if err != nil { - return nil, err - } - - poptxf = poptxf.WithSimulateAndExecute(true) - client := &serialBroadcaster{ - ctx: ctx, - cctx: cctx, - info: info, - broadcastTimeout: timeout, - lc: lifecycle.New(), - reqch: make(chan broadcastRequest, 1), - broadcastch: make(chan broadcast, 1), - seqreqch: make(chan seqreq), - log: log.With("cmp", "client/broadcaster"), - } - - go client.lc.WatchContext(ctx) - go client.run() - go client.broadcaster(poptxf) - - return client, nil -} - -func (c *serialBroadcaster) Close() { - c.lc.Shutdown(nil) -} - -func (c *serialBroadcaster) Broadcast(ctx context.Context, msgs ...sdk.Msg) error { - responsech := make(chan error, 1) - request := broadcastRequest{ - responsech: responsech, - msgs: msgs, - } - - request.id = uintptr(unsafe.Pointer(&request)) - - select { - case c.reqch <- request: - case <-ctx.Done(): - return ctx.Err() - case <-c.lc.ShuttingDown(): - return ErrNotRunning - } - - select { - case err := <-responsech: - return err - case <-ctx.Done(): - return ctx.Err() - case <-c.lc.ShuttingDown(): - return ErrNotRunning - } -} - -func (c *serialBroadcaster) run() { - defer c.lc.ShutdownCompleted() - - donech := make(chan struct{}) - - go func() { - defer close(donech) - c.sequenceSync() - }() - - defer func() { <-donech }() - - var pending []broadcastRequest - var pendingBids []broadcastRequest - - // var broadcastDoneCh chan error - var broadcastDoneCh chan error - - signalCh := make(chan struct{}, 1) - signal := signalCh - - trySignal := func() { - if (len(pendingBids) == 0) && (len(pending) == 0) { - return - } - - select { - case signal <- struct{}{}: - default: - } - } - -loop: - for { - select { - case err := <-c.lc.ShutdownRequest(): - c.lc.ShutdownInitiated(err) - break loop - case req := <-c.reqch: - if _, ok := req.msgs[0].(*mtypes.MsgCreateBid); ok { - pendingBids = append(pendingBids, req) - } else { - pending = append(pending, req) - } - - trySignal() - case <-signal: - signal = nil - - var req broadcastRequest - - if len(pendingBids) > 0 { - req, pendingBids = pendingBids[len(pendingBids)-1], pendingBids[:len(pendingBids)-1] - } else { - req, pending = pending[len(pending)-1], pending[:len(pending)-1] - } - - broadcastDoneCh = make(chan error, 1) - c.broadcastch <- broadcast{ - donech: broadcastDoneCh, - respch: req.responsech, - msgs: req.msgs, - } - case err := <-broadcastDoneCh: - broadcastDoneCh = nil - signal = signalCh - - if err != nil { - c.log.Error("unable to broadcast messages", "error", err) - } - trySignal() - } - } -} - -func (c *serialBroadcaster) broadcaster(txf tx.Factory) { - for { - select { - case <-c.lc.ShuttingDown(): - return - case req := <-c.broadcastch: - // broadcast the messages - var err error - txf, err = c.broadcast(txf, false, req.msgs...) - // send response to the broadcast caller - req.respch <- err - - if err != nil { - c.log.Error("transaction broadcast failed", "err", err) - - if _, valid := err.(sdkerrors.Error); valid { - // attempt to sync account sequence - rSeq, err := c.syncAccountSequence(txf.Sequence()) - if err == nil { - txf = txf.WithSequence(rSeq + 1) - } else { - c.log.Error("failed to sync account sequence number", "err", err) - } - } - } - - req.donech <- err - } - } -} - -func (c *serialBroadcaster) sequenceSync() { - for { - select { - case <-c.lc.ShuttingDown(): - return - case req := <-c.seqreqch: - // query sequence number - _, seq, err := c.cctx.AccountRetriever.GetAccountNumberSequence(c.cctx, c.info.GetAddress()) - - if err != nil { - c.log.Error("error requesting account", "err", err) - seq = req.curr - } - - select { - case req.ch <- seq: - case <-c.lc.ShuttingDown(): - } - } - } -} - -func (c *serialBroadcaster) broadcast(txf tx.Factory, retry bool, msgs ...sdk.Msg) (tx.Factory, error) { - var err error - - if !retry { - txf, err = abroadcaster.AdjustGas(c.cctx, txf, msgs...) - if err != nil { - return txf, err - } - } - - response, err := c.doBroadcast(c.cctx, txf, c.broadcastTimeout, c.info.GetName(), msgs...) - if err != nil { - return txf, err - } - - if response.Code == 0 { - txf = txf.WithSequence(txf.Sequence() + 1) - return txf, nil - } - - if response.Code != sdkerrors.ErrWrongSequence.ABCICode() || retry { - return txf, sdkerrors.ABCIError(response.Codespace, response.Code, response.RawLog) - } - - // transaction has failed, perform the query of account sequence to make sure correct one is used - // for the next transaction - - c.log.Info("account sequence mismatch. querying current value") - rSeq, err := c.syncAccountSequence(txf.Sequence()) - if err != nil { - return txf, err - } - - txf.WithSequence(rSeq + 1) - - return c.broadcast(txf, retry, msgs...) -} - -func (c *serialBroadcaster) syncAccountSequence(lSeq uint64) (uint64, error) { - ch := make(chan uint64, 1) - - c.seqreqch <- seqreq{ - curr: lSeq, - ch: ch, - } - - ctx, cancel := context.WithTimeout(c.ctx, sequenceSyncTimeout) - defer cancel() - - select { - case rSeq := <-ch: - return rSeq, nil - case <-ctx.Done(): - return lSeq, ErrSyncTimedOut - case <-c.lc.ShuttingDown(): - return lSeq, ErrNotRunning - } -} - -func (c *serialBroadcaster) doBroadcast(cctx sdkclient.Context, txf tx.Factory, timeout time.Duration, keyName string, msgs ...sdk.Msg) (*sdk.TxResponse, error) { - txn, err := tx.BuildUnsignedTx(txf, msgs...) - if err != nil { - return nil, err - } - - txn.SetFeeGranter(cctx.GetFeeGranterAddress()) - err = tx.Sign(txf, keyName, txn, true) - if err != nil { - return nil, err - } - - bytes, err := cctx.TxConfig.TxEncoder()(txn.GetTx()) - if err != nil { - return nil, err - } - - txb := ttypes.Tx(bytes) - hash := hex.EncodeToString(txb.Hash()) - - // broadcast-mode=block - // submit with mode commit/block - cres, err := cctx.BroadcastTxCommit(txb) - if err == nil { - // good job - return cres, nil - } else if !strings.HasSuffix(err.Error(), timeoutErrorMessage) { - return cres, err - } - - // timeout error, continue on to retry - - // loop - lctx, cancel := context.WithTimeout(c.ctx, timeout) - defer cancel() - - for lctx.Err() == nil { - // wait up to one second - select { - case <-lctx.Done(): - return cres, err - case <-time.After(broadcastBlockRetryPeriod): - } - - // check transaction - // https://github.com/cosmos/cosmos-sdk/pull/8734 - res, err := authtx.QueryTx(cctx, hash) - if err == nil { - return res, nil - } - - // if it's not a "not found" error, return - if !strings.HasSuffix(err.Error(), notFoundErrorMessageSuffix) { - return res, err - } - } - - return cres, lctx.Err() -} diff --git a/client/client.go b/client/client.go new file mode 100644 index 00000000..bd6b54f7 --- /dev/null +++ b/client/client.go @@ -0,0 +1,57 @@ +package client + +import ( + "errors" + "fmt" + "reflect" + + "github.com/spf13/pflag" + "golang.org/x/net/context" + + sdkclient "github.com/cosmos/cosmos-sdk/client" + + aclient "github.com/akash-network/akash-api/go/node/client" + "github.com/akash-network/akash-api/go/node/client/v1beta2" +) + +var ( + ErrInvalidClient = errors.New("invalid client") +) + +func DiscoverQueryClient(ctx context.Context, cctx sdkclient.Context) (v1beta2.QueryClient, error) { + var cl v1beta2.QueryClient + err := aclient.DiscoverQueryClient(ctx, cctx, func(i interface{}) error { + var valid bool + + if cl, valid = i.(v1beta2.QueryClient); !valid { + return fmt.Errorf("%w: expected %s, actual %s", ErrInvalidClient, reflect.TypeOf(cl), reflect.TypeOf(i)) + } + + return nil + }) + + if err != nil { + return nil, err + } + + return cl, nil +} + +func DiscoverClient(ctx context.Context, cctx sdkclient.Context, flags *pflag.FlagSet) (v1beta2.Client, error) { + var cl v1beta2.Client + err := aclient.DiscoverClient(ctx, cctx, flags, func(i interface{}) error { + var valid bool + + if cl, valid = i.(v1beta2.Client); !valid { + return fmt.Errorf("%w: expected %s, actual %s", ErrInvalidClient, reflect.TypeOf(cl), reflect.TypeOf(i)) + } + + return nil + }) + + if err != nil { + return nil, err + } + + return cl, nil +} diff --git a/cluster/monitor.go b/cluster/monitor.go index 745a4ee1..bd034e43 100644 --- a/cluster/monitor.go +++ b/cluster/monitor.go @@ -5,6 +5,8 @@ import ( "math/rand" "time" + aclient "github.com/akash-network/akash-api/go/node/client/v1beta2" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -200,15 +202,16 @@ func (m *deploymentMonitor) doCheck(ctx context.Context) (bool, error) { func (m *deploymentMonitor) runCloseLease(ctx context.Context) <-chan runner.Result { return runner.Do(func() runner.Result { // TODO: retry, timeout - err := m.session.Client().Tx().Broadcast(ctx, &mtypes.MsgCloseBid{ + msg := &mtypes.MsgCloseBid{ BidID: m.deployment.LeaseID().BidID(), - }) + } + res, err := m.session.Client().Tx().Broadcast(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()) if err != nil { m.log.Error("closing deployment", "err", err) } else { m.log.Info("bidding on lease closed") } - return runner.NewResult(nil, err) + return runner.NewResult(res, err) }) } diff --git a/cmd/provider-services/cmd/helpers.go b/cmd/provider-services/cmd/helpers.go index de1e9094..173ec0ba 100644 --- a/cmd/provider-services/cmd/helpers.go +++ b/cmd/provider-services/cmd/helpers.go @@ -7,7 +7,7 @@ import ( "fmt" "net/url" - "github.com/cosmos/cosmos-sdk/client" + aclient "github.com/akash-network/akash-api/go/node/client/v1beta2" "github.com/cosmos/cosmos-sdk/client/flags" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/spf13/cobra" @@ -16,7 +16,6 @@ import ( dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" "github.com/akash-network/node/app" - akashclient "github.com/akash-network/node/client" ) const ( @@ -94,7 +93,7 @@ func providerFromFlags(flags *pflag.FlagSet) (sdk.Address, error) { return addr, nil } -func leasesForDeployment(ctx context.Context, cctx client.Context, flags *pflag.FlagSet, did dtypes.DeploymentID) ([]mtypes.LeaseID, error) { +func leasesForDeployment(ctx context.Context, cl aclient.QueryClient, flags *pflag.FlagSet, did dtypes.DeploymentID) ([]mtypes.LeaseID, error) { filter := mtypes.LeaseFilters{ Owner: did.Owner, DSeq: did.DSeq, @@ -118,8 +117,7 @@ func leasesForDeployment(ctx context.Context, cctx client.Context, flags *pflag. filter.OSeq = val } - cclient := akashclient.NewQueryClientFromCtx(cctx) - resp, err := cclient.Leases(ctx, &mtypes.QueryLeasesRequest{ + resp, err := cl.Leases(ctx, &mtypes.QueryLeasesRequest{ Filters: filter, }) if err != nil { diff --git a/cmd/provider-services/cmd/leaseEvents.go b/cmd/provider-services/cmd/leaseEvents.go index 7235e5fb..810ed781 100644 --- a/cmd/provider-services/cmd/leaseEvents.go +++ b/cmd/provider-services/cmd/leaseEvents.go @@ -10,10 +10,10 @@ import ( dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" - akashclient "github.com/akash-network/node/client" cmdcommon "github.com/akash-network/node/cmd/common" cutils "github.com/akash-network/node/x/cert/utils" + aclient "github.com/akash-network/provider/client" cltypes "github.com/akash-network/provider/cluster/types/v1beta3" gwrest "github.com/akash-network/provider/gateway/rest" ) @@ -42,6 +42,13 @@ func doLeaseEvents(cmd *cobra.Command) error { return err } + ctx := cmd.Context() + + cl, err := aclient.DiscoverQueryClient(ctx, cctx) + if err != nil { + return err + } + cert, err := cutils.LoadAndQueryCertificateForAccount(cmd.Context(), cctx, nil) if err != nil { return markRPCServerError(err) @@ -52,7 +59,7 @@ func doLeaseEvents(cmd *cobra.Command) error { return err } - leases, err := leasesForDeployment(cmd.Context(), cctx, cmd.Flags(), dtypes.DeploymentID{ + leases, err := leasesForDeployment(cmd.Context(), cl, cmd.Flags(), dtypes.DeploymentID{ Owner: cctx.GetFromAddress().String(), DSeq: dseq, }) @@ -70,8 +77,6 @@ func doLeaseEvents(cmd *cobra.Command) error { return err } - ctx := cmd.Context() - type result struct { lid mtypes.LeaseID error error @@ -83,7 +88,7 @@ func doLeaseEvents(cmd *cobra.Command) error { for _, lid := range leases { stream := result{lid: lid} prov, _ := sdk.AccAddressFromBech32(lid.Provider) - gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + gclient, err := gwrest.NewClient(cl, prov, []tls.Certificate{cert}) if err == nil { stream.stream, stream.error = gclient.LeaseEvents(ctx, lid, svcs, follow) } else { diff --git a/cmd/provider-services/cmd/leaseLogs.go b/cmd/provider-services/cmd/leaseLogs.go index fcfa2f96..cecf4924 100644 --- a/cmd/provider-services/cmd/leaseLogs.go +++ b/cmd/provider-services/cmd/leaseLogs.go @@ -12,10 +12,10 @@ import ( dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" - akashclient "github.com/akash-network/node/client" cmdcommon "github.com/akash-network/node/cmd/common" cutils "github.com/akash-network/node/x/cert/utils" + aclient "github.com/akash-network/provider/client" gwrest "github.com/akash-network/provider/gateway/rest" ) @@ -45,6 +45,13 @@ func doLeaseLogs(cmd *cobra.Command) error { return err } + ctx := cmd.Context() + + cl, err := aclient.DiscoverQueryClient(ctx, cctx) + if err != nil { + return err + } + cert, err := cutils.LoadAndQueryCertificateForAccount(cmd.Context(), cctx, nil) if err != nil { return markRPCServerError(err) @@ -55,7 +62,7 @@ func doLeaseLogs(cmd *cobra.Command) error { return err } - leases, err := leasesForDeployment(cmd.Context(), cctx, cmd.Flags(), dtypes.DeploymentID{ + leases, err := leasesForDeployment(cmd.Context(), cl, cmd.Flags(), dtypes.DeploymentID{ Owner: cctx.GetFromAddress().String(), DSeq: dseq, }) @@ -99,12 +106,10 @@ func doLeaseLogs(cmd *cobra.Command) error { streams := make([]result, 0, len(leases)) - ctx := cmd.Context() - for _, lid := range leases { stream := result{lid: lid} prov, _ := sdk.AccAddressFromBech32(lid.Provider) - gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + gclient, err := gwrest.NewClient(cl, prov, []tls.Certificate{cert}) if err == nil { stream.stream, stream.error = gclient.LeaseLogs(ctx, lid, svcs, follow, tailLines) } else { diff --git a/cmd/provider-services/cmd/leaseStatus.go b/cmd/provider-services/cmd/leaseStatus.go index bf7306bf..954fc424 100644 --- a/cmd/provider-services/cmd/leaseStatus.go +++ b/cmd/provider-services/cmd/leaseStatus.go @@ -6,12 +6,12 @@ import ( sdkclient "github.com/cosmos/cosmos-sdk/client" "github.com/spf13/cobra" - akashclient "github.com/akash-network/node/client" cmdcommon "github.com/akash-network/node/cmd/common" cutils "github.com/akash-network/node/x/cert/utils" dcli "github.com/akash-network/node/x/deployment/client/cli" mcli "github.com/akash-network/node/x/market/client/cli" + aclient "github.com/akash-network/provider/client" gwrest "github.com/akash-network/provider/gateway/rest" ) @@ -37,6 +37,13 @@ func doLeaseStatus(cmd *cobra.Command) error { return err } + ctx := cmd.Context() + + cl, err := aclient.DiscoverQueryClient(ctx, cctx) + if err != nil { + return err + } + prov, err := providerFromFlags(cmd.Flags()) if err != nil { return err @@ -52,7 +59,7 @@ func doLeaseStatus(cmd *cobra.Command) error { return markRPCServerError(err) } - gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + gclient, err := gwrest.NewClient(cl, prov, []tls.Certificate{cert}) if err != nil { return err } diff --git a/cmd/provider-services/cmd/manifest.go b/cmd/provider-services/cmd/manifest.go index 4d247db9..56379dfe 100644 --- a/cmd/provider-services/cmd/manifest.go +++ b/cmd/provider-services/cmd/manifest.go @@ -14,10 +14,10 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" - akashclient "github.com/akash-network/node/client" "github.com/akash-network/node/sdl" cutils "github.com/akash-network/node/x/cert/utils" + aclient "github.com/akash-network/provider/client" gwrest "github.com/akash-network/provider/gateway/rest" ) @@ -51,6 +51,13 @@ func doSendManifest(cmd *cobra.Command, sdlpath string) error { return err } + ctx := cmd.Context() + + cl, err := aclient.DiscoverQueryClient(ctx, cctx) + if err != nil { + return err + } + sdl, err := sdl.ReadFile(sdlpath) if err != nil { return err @@ -72,7 +79,7 @@ func doSendManifest(cmd *cobra.Command, sdlpath string) error { } // owner address in FlagFrom has already been validated thus save to just pull its value as string - leases, err := leasesForDeployment(cmd.Context(), cctx, cmd.Flags(), dtypes.DeploymentID{ + leases, err := leasesForDeployment(cmd.Context(), cl, cmd.Flags(), dtypes.DeploymentID{ Owner: cctx.GetFromAddress().String(), DSeq: dseq, }) @@ -93,7 +100,7 @@ func doSendManifest(cmd *cobra.Command, sdlpath string) error { for i, lid := range leases { prov, _ := sdk.AccAddressFromBech32(lid.Provider) - gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + gclient, err := gwrest.NewClient(cl, prov, []tls.Certificate{cert}) if err != nil { return err } diff --git a/cmd/provider-services/cmd/migrate_endpoints.go b/cmd/provider-services/cmd/migrate_endpoints.go index 1b1e2dc0..cd7408db 100644 --- a/cmd/provider-services/cmd/migrate_endpoints.go +++ b/cmd/provider-services/cmd/migrate_endpoints.go @@ -8,9 +8,9 @@ import ( sdkclient "github.com/cosmos/cosmos-sdk/client" - akashclient "github.com/akash-network/node/client" cutils "github.com/akash-network/node/x/cert/utils" + aclient "github.com/akash-network/provider/client" gwrest "github.com/akash-network/provider/gateway/rest" ) @@ -27,6 +27,13 @@ func migrateEndpoints(cmd *cobra.Command, args []string) error { return err } + ctx := cmd.Context() + + cl, err := aclient.DiscoverQueryClient(ctx, cctx) + if err != nil { + return err + } + prov, err := providerFromFlags(cmd.Flags()) if err != nil { return err @@ -37,7 +44,7 @@ func migrateEndpoints(cmd *cobra.Command, args []string) error { return markRPCServerError(err) } - gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + gclient, err := gwrest.NewClient(cl, prov, []tls.Certificate{cert}) if err != nil { return err } diff --git a/cmd/provider-services/cmd/migrate_hostnames.go b/cmd/provider-services/cmd/migrate_hostnames.go index 9b10feb2..4f388911 100644 --- a/cmd/provider-services/cmd/migrate_hostnames.go +++ b/cmd/provider-services/cmd/migrate_hostnames.go @@ -8,9 +8,9 @@ import ( sdkclient "github.com/cosmos/cosmos-sdk/client" - akashclient "github.com/akash-network/node/client" cutils "github.com/akash-network/node/x/cert/utils" + aclient "github.com/akash-network/provider/client" gwrest "github.com/akash-network/provider/gateway/rest" ) @@ -26,6 +26,13 @@ func migrateHostnames(cmd *cobra.Command, args []string) error { return err } + ctx := cmd.Context() + + cl, err := aclient.DiscoverQueryClient(ctx, cctx) + if err != nil { + return err + } + prov, err := providerFromFlags(cmd.Flags()) if err != nil { return err @@ -36,7 +43,7 @@ func migrateHostnames(cmd *cobra.Command, args []string) error { return markRPCServerError(err) } - gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + gclient, err := gwrest.NewClient(cl, prov, []tls.Certificate{cert}) if err != nil { return err } diff --git a/cmd/provider-services/cmd/run.go b/cmd/provider-services/cmd/run.go index b81f2f07..04942522 100644 --- a/cmd/provider-services/cmd/run.go +++ b/cmd/provider-services/cmd/run.go @@ -11,6 +11,7 @@ import ( "strings" "time" + sdkclient "github.com/cosmos/cosmos-sdk/client" "github.com/pkg/errors" "github.com/shopspring/decimal" "github.com/spf13/cobra" @@ -19,26 +20,22 @@ import ( "github.com/tendermint/tendermint/libs/log" - sdkclient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" - "github.com/cosmos/cosmos-sdk/client/tx" sdk "github.com/cosmos/cosmos-sdk/types" ctypes "github.com/akash-network/akash-api/go/node/cert/v1beta3" mparams "github.com/akash-network/akash-api/go/node/market/v1beta4" ptypes "github.com/akash-network/akash-api/go/node/provider/v1beta3" - "github.com/akash-network/node/client" "github.com/akash-network/node/cmd/common" "github.com/akash-network/node/events" "github.com/akash-network/node/pubsub" "github.com/akash-network/node/sdl" - cmodule "github.com/akash-network/node/x/cert" cutils "github.com/akash-network/node/x/cert/utils" config2 "github.com/akash-network/node/x/provider/config" "github.com/akash-network/provider" "github.com/akash-network/provider/bidengine" - "github.com/akash-network/provider/client/broadcaster" + "github.com/akash-network/provider/client" "github.com/akash-network/provider/cluster" "github.com/akash-network/provider/cluster/kube" "github.com/akash-network/provider/cluster/kube/builder" @@ -453,7 +450,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { dockerImagePullSecretsName := viper.GetString(FlagDockerImagePullSecretsName) strategy := viper.GetString(FlagBidPricingStrategy) deploymentIngressExposeLBHosts := viper.GetBool(FlagDeploymentIngressExposeLBHosts) - from := viper.GetString(flags.FlagFrom) + // from := viper.GetString(flags.FlagFrom) overcommitPercentStorage := 1.0 + float64(viper.GetUint64(FlagOvercommitPercentStorage)/100.0) overcommitPercentCPU := 1.0 + float64(viper.GetUint64(FlagOvercommitPercentCPU)/100.0) // no GPU overcommit @@ -469,7 +466,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { cachedResultMaxAge := viper.GetDuration(FlagCachedResultMaxAge) rpcQueryTimeout := viper.GetDuration(FlagRPCQueryTimeout) enableIPOperator := viper.GetBool(FlagEnableIPOperator) - txTimeout := viper.GetDuration(FlagTxBroadcastTimeout) + // txTimeout := viper.GetDuration(FlagTxBroadcastTimeout) pricing, err := createBidPricingStrategy(strategy) if err != nil { @@ -487,25 +484,23 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { metricsRouter = makeMetricsRouter() } - cctx := sdkclient.GetClientContextFromCmd(cmd) + clGroup, clCtx := errgroup.WithContext(ctx) - _, _, _, err = sdkclient.GetFromFields(cctx, cctx.Keyring, from) + cctx, err := sdkclient.GetClientTxContext(cmd) if err != nil { return err } - cctx, err = sdkclient.GetClientTxContext(cmd) + cl, err := client.DiscoverClient(clCtx, cctx, cmd.Flags()) if err != nil { return err } - txFactory := tx.NewFactoryCLI(cctx, cmd.Flags()).WithTxConfig(cctx.TxConfig).WithAccountRetriever(cctx.AccountRetriever) - - keyname := cctx.GetFromName() - info, err := txFactory.Keybase().Key(keyname) - if err != nil { - return err - } + // keyname := cctx.GetFromName() + // info, err := txFactory.Keybase().Key(keyname) + // if err != nil { + // return err + // } gwaddr := viper.GetString(FlagGatewayListenAddress) @@ -514,7 +509,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { certFromFlag = bytes.NewBufferString(val) } - kpm, err := cutils.NewKeyPairManager(cctx, cctx.FromAddress) + kpm, err := cutils.NewKeyPairManager(cl.ClientContext(), cctx.FromAddress) if err != nil { return err } @@ -525,8 +520,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { } // Check that the certificate exists on chain and is not revoked - cquery := cmodule.AppModuleBasic{}.GetQueryClient(cctx) - cresp, err := cquery.Certificates(cmd.Context(), &ctypes.QueryCertificatesRequest{ + cresp, err := cl.Query().Certificates(cmd.Context(), &ctypes.QueryCertificatesRequest{ Filter: ctypes.CertificateFilter{ Owner: cctx.FromAddress.String(), Serial: x509cert.SerialNumber.String(), @@ -541,23 +535,9 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { return errors.Errorf("no valid found on chain certificate for account %s", cctx.FromAddress) } - broadcasterInstance, err := broadcaster.NewSerialClient(cmd.Context(), logger, cctx, txTimeout, txFactory, info) - if err != nil { - return err - } - - aclient := client.NewClientWithBroadcaster( - logger, - cctx, - txFactory, - info, - client.NewQueryClientFromCtx(cctx), - broadcasterInstance, - ) - - res, err := aclient.Query().Provider( + res, err := cl.Query().Provider( cmd.Context(), - &ptypes.QueryProviderRequest{Owner: info.GetAddress().String()}, + &ptypes.QueryProviderRequest{Owner: cctx.FromAddress.String()}, ) if err != nil { return err @@ -597,7 +577,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { return err } currentBlockHeight := statusResult.SyncInfo.LatestBlockHeight - session := session.New(logger, aclient, pinfo, currentBlockHeight) + session := session.New(logger, cl, pinfo, currentBlockHeight) if err := cctx.Client.Start(); err != nil { return err @@ -606,7 +586,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { bus := pubsub.NewBus() defer bus.Close() - group, ctx := errgroup.WithContext(ctx) + group, ctx := errgroup.WithContext(clCtx) // Provider service creation config := provider.NewDefaultConfig() @@ -682,7 +662,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { operatorWaiter := waiter.NewOperatorWaiter(cmd.Context(), logger, waitClients...) - service, err := provider.NewService(ctx, cctx, info.GetAddress(), session, bus, cclient, ipOperatorClient, operatorWaiter, config) + service, err := provider.NewService(ctx, cctx, cctx.FromAddress, session, bus, cclient, ipOperatorClient, operatorWaiter, config) if err != nil { return err } @@ -691,7 +671,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { ctx, logger, service, - cquery, + cl.Query(), ipOperatorClient, gwaddr, cctx.FromAddress, @@ -702,6 +682,10 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { return err } + clGroup.Go(func() error { + return group.Wait() + }) + group.Go(func() error { return events.Publish(ctx, cctx.Client, "provider-cli", bus) }) @@ -738,11 +722,12 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { }) } - err = group.Wait() - broadcasterInstance.Close() + err = clGroup.Wait() + if ipOperatorClient != nil { ipOperatorClient.Stop() } + hostnameOperatorClient.Stop() if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) { return err diff --git a/cmd/provider-services/cmd/serviceStatus.go b/cmd/provider-services/cmd/serviceStatus.go index 4f09e5cf..c64fe67c 100644 --- a/cmd/provider-services/cmd/serviceStatus.go +++ b/cmd/provider-services/cmd/serviceStatus.go @@ -6,12 +6,12 @@ import ( sdkclient "github.com/cosmos/cosmos-sdk/client" "github.com/spf13/cobra" - akashclient "github.com/akash-network/node/client" cmdcommon "github.com/akash-network/node/cmd/common" cutils "github.com/akash-network/node/x/cert/utils" dcli "github.com/akash-network/node/x/deployment/client/cli" mcli "github.com/akash-network/node/x/market/client/cli" + aclient "github.com/akash-network/provider/client" gwrest "github.com/akash-network/provider/gateway/rest" ) @@ -40,6 +40,13 @@ func doServiceStatus(cmd *cobra.Command) error { return err } + ctx := cmd.Context() + + cl, err := aclient.DiscoverQueryClient(ctx, cctx) + if err != nil { + return err + } + svcName, err := cmd.Flags().GetString(FlagService) if err != nil { return err @@ -60,7 +67,7 @@ func doServiceStatus(cmd *cobra.Command) error { return markRPCServerError(err) } - gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + gclient, err := gwrest.NewClient(cl, prov, []tls.Certificate{cert}) if err != nil { return err } diff --git a/cmd/provider-services/cmd/shell.go b/cmd/provider-services/cmd/shell.go index c885e036..a1bf1436 100644 --- a/cmd/provider-services/cmd/shell.go +++ b/cmd/provider-services/cmd/shell.go @@ -19,11 +19,11 @@ import ( sdkclient "github.com/cosmos/cosmos-sdk/client" - akashclient "github.com/akash-network/node/client" cutils "github.com/akash-network/node/x/cert/utils" dcli "github.com/akash-network/node/x/deployment/client/cli" mcli "github.com/akash-network/node/x/market/client/cli" + aclient "github.com/akash-network/provider/client" gwrest "github.com/akash-network/provider/gateway/rest" ) @@ -34,7 +34,7 @@ const ( ) var ( - errTerminalNotATty = errors.New("Input is not a terminal, cannot setup TTY") + errTerminalNotATty = errors.New("input is not a terminal, cannot setup TTY") ) func LeaseShellCmd() *cobra.Command { @@ -107,6 +107,13 @@ func doLeaseShell(cmd *cobra.Command, args []string) error { return err } + ctx := cmd.Context() + + cl, err := aclient.DiscoverQueryClient(ctx, cctx) + if err != nil { + return err + } + prov, err := providerFromFlags(cmd.Flags()) if err != nil { return err @@ -118,12 +125,12 @@ func doLeaseShell(cmd *cobra.Command, args []string) error { } lID := bidID.LeaseID() - cert, err := cutils.LoadAndQueryCertificateForAccount(cmd.Context(), cctx, nil) + cert, err := cutils.LoadAndQueryCertificateForAccount(ctx, cctx, nil) if err != nil { return markRPCServerError(err) } - gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), prov, []tls.Certificate{cert}) + gclient, err := gwrest.NewClient(cl, prov, []tls.Certificate{cert}) if err != nil { return err } @@ -133,7 +140,7 @@ func doLeaseShell(cmd *cobra.Command, args []string) error { var terminalResizes chan remotecommand.TerminalSize wg := &sync.WaitGroup{} - ctx, cancel := context.WithCancel(cmd.Context()) + ctx, cancel := context.WithCancel(ctx) if tsq != nil { terminalResizes = make(chan remotecommand.TerminalSize, 1) diff --git a/cmd/provider-services/cmd/status.go b/cmd/provider-services/cmd/status.go index e5664ad3..272bded9 100644 --- a/cmd/provider-services/cmd/status.go +++ b/cmd/provider-services/cmd/status.go @@ -5,9 +5,9 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/spf13/cobra" - akashclient "github.com/akash-network/node/client" cmdcommon "github.com/akash-network/node/cmd/common" + aclient "github.com/akash-network/provider/client" gwrest "github.com/akash-network/provider/gateway/rest" ) @@ -36,7 +36,14 @@ func doStatus(cmd *cobra.Command, addr sdk.Address) error { return err } - gclient, err := gwrest.NewClient(akashclient.NewQueryClientFromCtx(cctx), addr, nil) + ctx := cmd.Context() + + cl, err := aclient.DiscoverQueryClient(ctx, cctx) + if err != nil { + return err + } + + gclient, err := gwrest.NewClient(cl, addr, nil) if err != nil { return err } diff --git a/gateway/rest/client.go b/gateway/rest/client.go index 2cdaaa66..871aca2c 100644 --- a/gateway/rest/client.go +++ b/gateway/rest/client.go @@ -16,6 +16,7 @@ import ( "sync" "time" + aclient "github.com/akash-network/akash-api/go/node/client/v1beta2" "github.com/golang-jwt/jwt/v4" "github.com/gorilla/websocket" "github.com/pkg/errors" @@ -30,7 +31,6 @@ import ( mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" ptypes "github.com/akash-network/akash-api/go/node/provider/v1beta3" - akashclient "github.com/akash-network/node/client" cutils "github.com/akash-network/node/x/cert/utils" "github.com/akash-network/provider" @@ -86,7 +86,7 @@ type ServiceLogs struct { } // NewClient returns a new Client -func NewClient(qclient akashclient.QueryClient, addr sdk.Address, certs []tls.Certificate) (Client, error) { +func NewClient(qclient aclient.QueryClient, addr sdk.Address, certs []tls.Certificate) (Client, error) { res, err := qclient.Provider(context.Background(), &ptypes.QueryProviderRequest{Owner: addr.String()}) if err != nil { return nil, err @@ -100,7 +100,7 @@ func NewClient(qclient akashclient.QueryClient, addr sdk.Address, certs []tls.Ce return newClient(qclient, addr, certs, uri), nil } -func newClient(qclient akashclient.QueryClient, addr sdk.Address, certs []tls.Certificate, uri *url.URL) *client { +func newClient(qclient aclient.QueryClient, addr sdk.Address, certs []tls.Certificate, uri *url.URL) *client { cl := &client{ host: uri, addr: addr, @@ -164,12 +164,12 @@ func (cd *ClientDirectory) GetClient(providerAddr sdk.Address) (Client, error) { return client, nil } - client, err := NewClient(akashclient.NewQueryClientFromCtx(cd.cosmosContext), providerAddr, []tls.Certificate{cd.clientCert}) - if err != nil { - return nil, err - } - - cd.clients[providerAddr.String()] = client // Store the client + // client, err := NewClient(akashclient.NewQueryClientFromCtx(cd.cosmosContext), providerAddr, []tls.Certificate{cd.clientCert}) + // if err != nil { + // return nil, err + // } + // + // cd.clients[providerAddr.String()] = client // Store the client return client, nil } diff --git a/gateway/rest/integration_test.go b/gateway/rest/integration_test.go index c7ee5fe1..e7977e4e 100644 --- a/gateway/rest/integration_test.go +++ b/gateway/rest/integration_test.go @@ -13,11 +13,11 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" akashmanifest "github.com/akash-network/akash-api/go/manifest/v2beta2" + qmock "github.com/akash-network/akash-api/go/node/client/v1beta2/mocks" dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" providertypes "github.com/akash-network/akash-api/go/node/provider/v1beta3" - qmock "github.com/akash-network/node/client/mocks" "github.com/akash-network/node/testutil" "github.com/akash-network/provider" diff --git a/gateway/rest/router_test.go b/gateway/rest/router_test.go index d1179ab7..de769dc8 100644 --- a/gateway/rest/router_test.go +++ b/gateway/rest/router_test.go @@ -20,11 +20,11 @@ import ( "github.com/cosmos/cosmos-sdk/version" manifestValidation "github.com/akash-network/akash-api/go/manifest/v2beta2" + qmock "github.com/akash-network/akash-api/go/node/client/v1beta2/mocks" dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" types "github.com/akash-network/akash-api/go/node/market/v1beta4" - qmock "github.com/akash-network/node/client/mocks" "github.com/akash-network/node/sdl" "github.com/akash-network/node/testutil" diff --git a/go.mod b/go.mod index 7cbc2bf7..922a044f 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/akash-network/provider go 1.21 require ( - github.com/akash-network/akash-api v0.0.33 - github.com/akash-network/node v0.28.2 + github.com/akash-network/akash-api v0.0.41 + github.com/akash-network/node v0.30.1-rc3 github.com/avast/retry-go/v4 v4.5.0 github.com/boz/go-lifecycle v0.1.1 github.com/cosmos/cosmos-sdk v0.45.16 @@ -29,6 +29,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tendermint/tendermint v0.34.27 go.uber.org/zap v1.24.0 + golang.org/x/net v0.14.0 golang.org/x/sync v0.3.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.26.1 @@ -70,7 +71,6 @@ require ( github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d // indirect github.com/DataDog/zstd v1.5.0 // indirect github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect - github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8 // indirect github.com/Workiva/go-datastructures v1.0.53 // indirect github.com/alessio/shellescape v1.4.1 // indirect github.com/armon/go-metrics v0.4.1 // indirect @@ -82,7 +82,6 @@ require ( github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/cloudflare/circl v1.3.3 // indirect github.com/cockroachdb/errors v1.9.1 // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect github.com/cockroachdb/pebble v0.0.0-20220817183557-09c6e030a677 // indirect @@ -110,6 +109,7 @@ require ( github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/dvsekhvalnov/jose2go v1.5.0 // indirect + github.com/edwingeng/deque/v2 v2.1.1 // indirect github.com/emicklei/go-restful/v3 v3.10.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect @@ -130,7 +130,7 @@ require ( github.com/google/btree v1.1.2 // indirect github.com/google/gnostic v0.6.9 // indirect github.com/google/go-cmp v0.5.9 // indirect - github.com/google/go-github/v53 v53.2.0 // indirect + github.com/google/go-github/v56 v56.0.0 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/orderedcode v0.0.1 // indirect @@ -236,7 +236,6 @@ require ( golang.org/x/crypto v0.12.0 // indirect golang.org/x/exp v0.0.0-20221019170559-20944726eadf // indirect golang.org/x/mod v0.12.0 // indirect - golang.org/x/net v0.14.0 // indirect golang.org/x/oauth2 v0.11.0 // indirect golang.org/x/sys v0.11.0 // indirect golang.org/x/term v0.11.0 // indirect diff --git a/go.sum b/go.sum index f768a3bd..b3cf62aa 100644 --- a/go.sum +++ b/go.sum @@ -173,8 +173,6 @@ github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEV github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8 h1:wPbRQzjjwFc0ih8puEVAOFGELsn1zoIIYdxvML7mDxA= -github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8/go.mod h1:I0gYDMZ6Z5GRU7l58bNFSkPTFN6Yl12dsUlAZ8xy98g= github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= @@ -200,16 +198,16 @@ github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= -github.com/akash-network/akash-api v0.0.33 h1:SvOht1F3BDz3el8XKC7lbRzc6D++kERs5wFRisMWzCY= -github.com/akash-network/akash-api v0.0.33/go.mod h1:pW5NnJNhxynCOUGEcgxa338GSU2qzMkpn3MMYcAY6O4= +github.com/akash-network/akash-api v0.0.41 h1:N3NUF0ZZNU/ypHEUyd8U7wY9+OcR4pTgA2HkeLz7NOI= +github.com/akash-network/akash-api v0.0.41/go.mod h1:aeB/9lti2LegbrOm0fSRNB0iKoE8JmMD8ou9EXJ8QGY= github.com/akash-network/cometbft v0.34.27-akash h1:V1dApDOr8Ee7BJzYyQ7Z9VBtrAul4+baMeA6C49dje0= github.com/akash-network/cometbft v0.34.27-akash/go.mod h1:BcCbhKv7ieM0KEddnYXvQZR+pZykTKReJJYf7YC7qhw= github.com/akash-network/ledger-go v0.14.3 h1:LCEFkTfgGA2xFMN2CtiKvXKE7dh0QSM77PJHCpSkaAo= github.com/akash-network/ledger-go v0.14.3/go.mod h1:NfsjfFvno9Kaq6mfpsKz4sqjnAVVEsVsnBJfKB4ueAs= github.com/akash-network/ledger-go/cosmos v0.14.3 h1:bEI9jLHM+Lm55idi4RfJlDez4/rVJs7E1MT0U2whYqI= github.com/akash-network/ledger-go/cosmos v0.14.3/go.mod h1:SjAfheQTE4rWk0ir+wjbOWxwj8nc8E4AZ08NdsvYG24= -github.com/akash-network/node v0.28.2 h1:I+8GOegN2l3GsR1P1bmEMz0Ej2buLd1E44F1ZiXpnUc= -github.com/akash-network/node v0.28.2/go.mod h1:03sIQZlCoOmP/VIEKZ9m4ptvaadHgyk5wnfypeLsYyI= +github.com/akash-network/node v0.30.1-rc3 h1:GZ0Ox7pVkH2IKD6sTTRrhXk8SXX/9f9coxI8hwXrbFw= +github.com/akash-network/node v0.30.1-rc3/go.mod h1:Wx/R2O/mSyZEKmDsY2vOHcSETyHeAFw/vJ3fzrrnDsk= github.com/alecthomas/participle/v2 v2.0.0-alpha7 h1:cK4vjj0VSgb3lN1nuKA5F7dw+1s1pWBe5bx7nNCnN+c= github.com/alecthomas/participle/v2 v2.0.0-alpha7/go.mod h1:NumScqsC42o9x+dGj8/YqsIfhrIQjFEOFovxotbBirA= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -354,9 +352,6 @@ github.com/circonus-labs/circonusllhist v0.1.3 h1:TJH+oke8D16535+jHExHj4nQvzlZrj github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudflare/circl v1.1.0/go.mod h1:prBCrKB9DV4poKZY1l9zBXg2QJY7mvgRvtMxxK7fi4I= -github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEMs= -github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA= github.com/cloudflare/cloudflare-go v0.14.0/go.mod h1:EnwdgGMaFOruiPZRFSgn+TsQ3hQ7C/YWzIGLeu5c304= github.com/cloudfoundry-community/go-cfclient v0.0.0-20190201205600-f136f9222381/go.mod h1:e5+USP2j8Le2M0Jo3qKPFnNhuo1wueU4nWHCXBOfQ14= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -533,6 +528,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= +github.com/edwingeng/deque/v2 v2.1.1 h1:+xjC3TnaeMPLZMi7QQf9jN2K00MZmTwruApqplbL9IY= +github.com/edwingeng/deque/v2 v2.1.1/go.mod h1:HukI8CQe9KDmZCcURPZRYVYjH79Zy2tIjTF9sN3Bgb0= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= @@ -859,8 +856,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= -github.com/google/go-github/v53 v53.2.0 h1:wvz3FyF53v4BK+AsnvCmeNhf8AkTaeh2SoYu/XUvTtI= -github.com/google/go-github/v53 v53.2.0/go.mod h1:XhFRObz+m/l+UCm9b7KSIC3lT3NWSXGt7mOsAWEloao= +github.com/google/go-github/v56 v56.0.0 h1:TysL7dMa/r7wsQi44BjqlwaHvwlFlqkK8CtBWCX3gb4= +github.com/google/go-github/v56 v56.0.0/go.mod h1:D8cdcX98YWJvi7TLo7zM4/h8ZTx6u6fwGEkCdisopo0= github.com/google/go-metrics-stackdriver v0.2.0 h1:rbs2sxHAPn2OtUj9JdR/Gij1YKGl0BTVD0augB+HEjE= github.com/google/go-metrics-stackdriver v0.2.0/go.mod h1:KLcPyp3dWJAFD+yHisGlJSZktIsTjb50eB72U2YZ9K0= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= diff --git a/manifest/watchdog.go b/manifest/watchdog.go index 0be3c2fc..c4dd4114 100644 --- a/manifest/watchdog.go +++ b/manifest/watchdog.go @@ -4,7 +4,9 @@ import ( "context" "time" + aclient "github.com/akash-network/akash-api/go/node/client/v1beta2" "github.com/boz/go-lifecycle" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/tendermint/tendermint/libs/log" @@ -67,9 +69,11 @@ func (wd *watchdog) run() { wd.log.Info("watchdog closing bid") runch = runner.Do(func() runner.Result { - return runner.NewResult(nil, wd.sess.Client().Tx().Broadcast(wd.ctx, &types.MsgCloseBid{ + msg := &types.MsgCloseBid{ BidID: types.MakeBidID(wd.leaseID.OrderID(), wd.sess.Provider().Address()), - })) + } + + return runner.NewResult(wd.sess.Client().Tx().Broadcast(wd.ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())) }) case err = <-wd.lc.ShutdownRequest(): } diff --git a/manifest/watchdog_test.go b/manifest/watchdog_test.go index adb8a7a3..f9c465e2 100644 --- a/manifest/watchdog_test.go +++ b/manifest/watchdog_test.go @@ -9,12 +9,11 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" + clientmocks "github.com/akash-network/akash-api/go/node/client/v1beta2/mocks" dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" types "github.com/akash-network/akash-api/go/node/market/v1beta4" ptypes "github.com/akash-network/akash-api/go/node/provider/v1beta3" - broadcastmocks "github.com/akash-network/node/client/broadcaster/mocks" - clientmocks "github.com/akash-network/node/client/mocks" "github.com/akash-network/node/testutil" "github.com/akash-network/provider/session" @@ -24,7 +23,7 @@ type watchdogTestScaffold struct { client *clientmocks.Client parentCh chan struct{} doneCh chan dtypes.DeploymentID - broadcasts chan sdk.Msg + broadcasts chan []sdk.Msg leaseID types.LeaseID provider ptypes.Provider } @@ -33,16 +32,15 @@ func makeWatchdogTestScaffold(t *testing.T, timeout time.Duration) (*watchdog, * scaffold := &watchdogTestScaffold{} scaffold.parentCh = make(chan struct{}) scaffold.doneCh = make(chan dtypes.DeploymentID, 1) - scaffold.broadcasts = make(chan sdk.Msg) scaffold.provider = testutil.Provider(t) scaffold.leaseID = testutil.LeaseID(t) scaffold.leaseID.Provider = scaffold.provider.Owner - scaffold.broadcasts = make(chan sdk.Msg, 1) + scaffold.broadcasts = make(chan []sdk.Msg, 1) - txClientMock := &broadcastmocks.Client{} - txClientMock.On("Broadcast", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { - scaffold.broadcasts <- args.Get(1).(sdk.Msg) - }).Return(nil) + txClientMock := &clientmocks.TxClient{} + txClientMock.On("Broadcast", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + scaffold.broadcasts <- args.Get(1).([]sdk.Msg) + }).Return(&sdk.Result{}, nil) scaffold.client = &clientmocks.Client{} scaffold.client.On("Tx").Return(txClientMock) @@ -55,7 +53,7 @@ func makeWatchdogTestScaffold(t *testing.T, timeout time.Duration) (*watchdog, * return wd, scaffold } -func TestWatchdogTimesout(t *testing.T) { +func TestWatchdogTimeout(t *testing.T) { wd, scaffold := makeWatchdogTestScaffold(t, 3*time.Second) select { @@ -65,10 +63,15 @@ func TestWatchdogTimesout(t *testing.T) { } // Check that close bid was sent - msg := testutil.ChannelWaitForValue(t, scaffold.broadcasts) - closeBid, ok := msg.(*types.MsgCloseBid) - require.True(t, ok) - require.Equal(t, closeBid.BidID.LeaseID(), scaffold.leaseID) + broadcasts := testutil.ChannelWaitForValue(t, scaffold.broadcasts) + require.IsType(t, []sdk.Msg{}, broadcasts) + + msgs := broadcasts.([]sdk.Msg) + require.Len(t, msgs, 1) + require.IsType(t, &types.MsgCloseBid{}, msgs[0]) + + msg := msgs[0].(*types.MsgCloseBid) + require.Equal(t, scaffold.leaseID, msg.BidID.LeaseID()) deploymentID := testutil.ChannelWaitForValue(t, scaffold.doneCh) require.Equal(t, deploymentID, scaffold.leaseID.DeploymentID()) diff --git a/service.go b/service.go index 11a4054f..b297f6bc 100644 --- a/service.go +++ b/service.go @@ -11,10 +11,10 @@ import ( bankTypes "github.com/cosmos/cosmos-sdk/x/bank/types" dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" - aclient "github.com/akash-network/node/client" "github.com/akash-network/node/pubsub" "github.com/akash-network/provider/bidengine" + aclient "github.com/akash-network/provider/client" "github.com/akash-network/provider/cluster" "github.com/akash-network/provider/cluster/operatorclients" ctypes "github.com/akash-network/provider/cluster/types/v1beta3" @@ -56,7 +56,6 @@ type Service interface { // NewService creates and returns new Service instance // Simple wrapper around various services needed for running a provider. - func NewService(ctx context.Context, cctx client.Context, accAddr sdk.AccAddress, @@ -82,7 +81,13 @@ func NewService(ctx context.Context, clusterConfig.DeploymentIngressDomain = cfg.DeploymentIngressDomain clusterConfig.ClusterSettings = cfg.ClusterSettings - bc, err := newBalanceChecker(ctx, bankTypes.NewQueryClient(cctx), aclient.NewQueryClientFromCtx(cctx), accAddr, session, bus, cfg.BalanceCheckerCfg) + cl, err := aclient.DiscoverQueryClient(ctx, cctx) + if err != nil { + cancel() + return nil, err + } + + bc, err := newBalanceChecker(ctx, bankTypes.NewQueryClient(cctx), cl, accAddr, session, bus, cfg.BalanceCheckerCfg) if err != nil { session.Log().Error("starting balance checker", "err", err) cancel() diff --git a/session/session.go b/session/session.go index 741576a5..e2890098 100644 --- a/session/session.go +++ b/session/session.go @@ -3,8 +3,8 @@ package session import ( "github.com/tendermint/tendermint/libs/log" + aclient "github.com/akash-network/akash-api/go/node/client/v1beta2" ptypes "github.com/akash-network/akash-api/go/node/provider/v1beta3" - aclient "github.com/akash-network/node/client" ) // Session interface wraps Log, Client, Provider and ForModule methods diff --git a/testutil/rest/restserver.go b/testutil/rest/restserver.go index 2afc6e34..2bc68fb2 100644 --- a/testutil/rest/restserver.go +++ b/testutil/rest/restserver.go @@ -7,12 +7,12 @@ import ( "net/http/httptest" "testing" - akashclient "github.com/akash-network/node/client" + aclient "github.com/akash-network/akash-api/go/node/client/v1beta2" gwutils "github.com/akash-network/provider/gateway/utils" ) -func NewServer(t testing.TB, qclient akashclient.QueryClient, handler http.Handler, certs []tls.Certificate) *httptest.Server { +func NewServer(t testing.TB, qclient aclient.QueryClient, handler http.Handler, certs []tls.Certificate) *httptest.Server { t.Helper() ts := httptest.NewUnstartedServer(handler)