Skip to content

Commit

Permalink
feat: node client discovery
Browse files Browse the repository at this point in the history
refs akash-network/support#165

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Jan 9, 2024
1 parent b589d50 commit 8c7e0b6
Show file tree
Hide file tree
Showing 28 changed files with 311 additions and 568 deletions.
14 changes: 7 additions & 7 deletions balance_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmrpc "github.com/tendermint/tendermint/rpc/core/types"

aclient "github.com/akash-network/akash-api/go/node/client/v1beta2"
dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3"
mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4"

aclient "github.com/akash-network/node/client"

"github.com/akash-network/node/pubsub"
netutil "github.com/akash-network/node/util/network"
"github.com/akash-network/node/util/runner"
Expand Down Expand Up @@ -124,7 +123,7 @@ func (bc *balanceChecker) doEscrowCheck(ctx context.Context, lid mtypes.LeaseID,
}

var syncInfo *tmrpc.SyncInfo
syncInfo, resp.err = bc.session.Client().NodeSyncInfo(ctx)
syncInfo, resp.err = bc.session.Client().Node().SyncInfo(ctx)
if resp.err != nil {
return resp
}
Expand Down Expand Up @@ -187,14 +186,15 @@ func (bc *balanceChecker) doEscrowCheck(ctx context.Context, lid mtypes.LeaseID,
}

func (bc *balanceChecker) startWithdraw(ctx context.Context, lid mtypes.LeaseID) error {
ctx, cancel := context.WithTimeout(ctx, withdrawTimeout)
defer cancel()

msg := &mtypes.MsgWithdrawLease{
LeaseID: lid,
}

ctx, cancel := context.WithTimeout(ctx, withdrawTimeout)
defer cancel()

return bc.session.Client().Tx().Broadcast(ctx, msg)
_, err := bc.session.Client().Tx().Broadcast(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())
return err
}

func (bc *balanceChecker) run(startCh chan<- error) {
Expand Down
9 changes: 6 additions & 3 deletions bidengine/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"regexp"
"time"

aclient "github.com/akash-network/akash-api/go/node/client/v1beta2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand Down Expand Up @@ -404,7 +405,7 @@ loop:
// Begin submitting fulfillment
msg = mtypes.NewMsgCreateBid(o.orderID, o.session.Provider().Address(), price, o.cfg.Deposit, offer)
bidch = runner.Do(func() runner.Result {
return runner.NewResult(nil, o.session.Client().Tx().Broadcast(ctx, msg))
return runner.NewResult(o.session.Client().Tx().Broadcast(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()))
})

case result := <-bidch:
Expand Down Expand Up @@ -456,9 +457,11 @@ loop:
if bidPlaced {
o.log.Debug("closing bid", "order-id", o.orderID)

err := o.session.Client().Tx().Broadcast(ctx, &mtypes.MsgCloseBid{
msg := &mtypes.MsgCloseBid{
BidID: mtypes.MakeBidID(o.orderID, o.session.Provider().Address()),
})
}

_, err := o.session.Client().Tx().Broadcast(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())
if err != nil {
o.log.Error("closing bid", "err", err)
bidCounter.WithLabelValues("close", metricsutils.FailLabel).Inc()
Expand Down
107 changes: 59 additions & 48 deletions bidengine/order_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
"github.com/stretchr/testify/require"

audittypes "github.com/akash-network/akash-api/go/node/audit/v1beta3"
clientmocks "github.com/akash-network/akash-api/go/node/client/v1beta2/mocks"
dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3"
mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4"
ptypes "github.com/akash-network/akash-api/go/node/provider/v1beta3"
atypes "github.com/akash-network/akash-api/go/node/types/v1beta3"
broadcastmocks "github.com/akash-network/node/client/broadcaster/mocks"
clientmocks "github.com/akash-network/node/client/mocks"

"github.com/akash-network/node/pubsub"
"github.com/akash-network/node/testutil"

Expand All @@ -32,19 +32,17 @@ import (
)

type orderTestScaffold struct {
orderID mtypes.OrderID
groupID dtypes.GroupID
testBus pubsub.Bus
testAddr sdk.AccAddress
deploymentID dtypes.DeploymentID
bidID *mtypes.BidID

queryClient *clientmocks.QueryClient
client *clientmocks.Client
txClient *broadcastmocks.Client

orderID mtypes.OrderID
groupID dtypes.GroupID
testBus pubsub.Bus
testAddr sdk.AccAddress
deploymentID dtypes.DeploymentID
bidID *mtypes.BidID
client *clientmocks.Client
queryClient *clientmocks.QueryClient
txClient *clientmocks.TxClient
cluster *clustermocks.Cluster
broadcasts chan sdk.Msg
broadcasts chan []sdk.Msg
reserveCallNotify chan int
}

Expand Down Expand Up @@ -98,11 +96,12 @@ func makeMocks(s *orderTestScaffold) {
queryClientMock.On("Orders", mock.Anything, mock.Anything).Return(&mtypes.QueryOrdersResponse{}, nil)
queryClientMock.On("Provider", mock.Anything, mock.Anything).Return(&ptypes.QueryProviderResponse{}, nil)

txMocks := &broadcastmocks.Client{}
s.broadcasts = make(chan sdk.Msg, 1)
txMocks.On("Broadcast", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
s.broadcasts <- args.Get(1).(sdk.Msg)
}).Return(nil)
txMocks := &clientmocks.TxClient{}
s.broadcasts = make(chan []sdk.Msg, 1)

txMocks.On("Broadcast", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
s.broadcasts <- args.Get(1).([]sdk.Msg)
}).Return(&sdk.Result{}, nil)

clientMocks := &clientmocks.Client{}
clientMocks.On("Query").Return(queryClientMock)
Expand Down Expand Up @@ -214,16 +213,26 @@ func makeOrderForTest(
return order, scaffold, reservationFulfilledNotify
}

func requireMsgType[T any](t *testing.T, res interface{}) T {
t.Helper()

require.IsType(t, []sdk.Msg{}, res)

msgs := res.([]sdk.Msg)
require.Len(t, msgs, 1)
require.IsType(t, *new(T), msgs[0])

return msgs[0].(T)
}

func Test_BidOrderAndUnreserve(t *testing.T) {
order, scaffold, _ := makeOrderForTest(t, false, mtypes.BidStateInvalid, nil, nil, testBidCreatedAt)

broadcast := testutil.ChannelWaitForValue(t, scaffold.broadcasts)
// Should have called reserve once
scaffold.cluster.AssertCalled(t, "Reserve", scaffold.orderID, mock.Anything)

require.IsType(t, &mtypes.MsgCreateBid{}, broadcast)

createBidMsg := broadcast.(*mtypes.MsgCreateBid)
createBidMsg := requireMsgType[*mtypes.MsgCreateBid](t, broadcast)

require.Equal(t, createBidMsg.Order, scaffold.orderID)

Expand Down Expand Up @@ -251,9 +260,8 @@ func Test_BidOrderAndUnreserveOnTimeout(t *testing.T) {
// Should have called reserve once
scaffold.cluster.AssertCalled(t, "Reserve", scaffold.orderID, mock.Anything)

require.IsType(t, &mtypes.MsgCreateBid{}, broadcast)
createBidMsg := requireMsgType[*mtypes.MsgCreateBid](t, broadcast)

createBidMsg := broadcast.(*mtypes.MsgCreateBid)
require.Equal(t, createBidMsg.Order, scaffold.orderID)

priceDenom := createBidMsg.Price.Denom
Expand All @@ -266,7 +274,8 @@ func Test_BidOrderAndUnreserveOnTimeout(t *testing.T) {
// After the broadcast call the timeout should take effect
// and then close the bid, unreserving capacity in the process
broadcast = testutil.ChannelWaitForValue(t, scaffold.broadcasts)
require.IsType(t, &mtypes.MsgCloseBid{}, broadcast)

_ = requireMsgType[*mtypes.MsgCloseBid](t, broadcast)

// After the broadcast call shut down happens automatically
order.lc.Shutdown(nil)
Expand Down Expand Up @@ -371,9 +380,8 @@ func Test_BidOrderAndThenLeaseCreated(t *testing.T) {

// Wait for first broadcast
broadcast := testutil.ChannelWaitForValue(t, scaffold.broadcasts)
require.IsType(t, &mtypes.MsgCreateBid{}, broadcast)
createBidMsg := requireMsgType[*mtypes.MsgCreateBid](t, broadcast)

createBidMsg := broadcast.(*mtypes.MsgCreateBid)
require.Equal(t, createBidMsg.Order, scaffold.orderID)
priceDenom := createBidMsg.Price.Denom
require.Equal(t, testutil.CoinDenom, priceDenom)
Expand Down Expand Up @@ -414,9 +422,9 @@ func Test_BidOrderAndThenLeaseCreatedForDifferentDeployment(t *testing.T) {

// Should have called reserve once
scaffold.cluster.AssertCalled(t, "Reserve", scaffold.orderID, mock.Anything)
require.IsType(t, &mtypes.MsgCreateBid{}, broadcast)

createBidMsg := broadcast.(*mtypes.MsgCreateBid)
createBidMsg := requireMsgType[*mtypes.MsgCreateBid](t, broadcast)

require.Equal(t, createBidMsg.Order, scaffold.orderID)

otherOrderID := scaffold.orderID
Expand Down Expand Up @@ -451,9 +459,8 @@ func Test_BidOrderAndThenLeaseCreatedForDifferentDeployment(t *testing.T) {
require.NotEqual(t, 0, len(txCalls))
lastBroadcast := txCalls[len(txCalls)-1]
require.Equal(t, "Broadcast", lastBroadcast.Method)
msg := lastBroadcast.Arguments[1]
require.IsType(t, &mtypes.MsgCloseBid{}, msg)
closeBidMsg := msg.(*mtypes.MsgCloseBid)

closeBidMsg := requireMsgType[*mtypes.MsgCloseBid](t, lastBroadcast.Arguments[1])

expectedBidID := mtypes.MakeBidID(order.orderID, scaffold.testAddr)
require.Equal(t, closeBidMsg.BidID, expectedBidID)
Expand Down Expand Up @@ -504,15 +511,13 @@ func Test_ShouldNotBidWhenAlreadySet(t *testing.T) {
// Should have called unreserve during shutdown
scaffold.cluster.AssertCalled(t, "Unreserve", scaffold.orderID, mock.Anything)

var broadcast sdk.Msg
var broadcast []sdk.Msg
select {
case broadcast = <-scaffold.broadcasts:
default:
}
// Should have broadcast
require.IsType(t, &mtypes.MsgCloseBid{}, broadcast)

closeBid := broadcast.(*mtypes.MsgCloseBid)
closeBid := requireMsgType[*mtypes.MsgCloseBid](t, broadcast)

require.Equal(t, closeBid.BidID, *scaffold.bidID)
}
Expand All @@ -535,9 +540,11 @@ func Test_ShouldCloseBidWhenAlreadySetAndOld(t *testing.T) {
scaffold.cluster.AssertNotCalled(t, "Reserve", scaffold.orderID, mock.Anything)

// Should have closed the bid
scaffold.txClient.AssertCalled(t, "Broadcast", mock.Anything, &mtypes.MsgCloseBid{
expMsgs := []sdk.Msg{&mtypes.MsgCloseBid{
BidID: mtypes.MakeBidID(order.orderID, scaffold.testAddr),
})
}}

scaffold.txClient.AssertCalled(t, "Broadcast", mock.Anything, expMsgs, mock.Anything)
}

func Test_ShouldExitWhenAlreadySetAndLost(t *testing.T) {
Expand All @@ -558,9 +565,11 @@ func Test_ShouldExitWhenAlreadySetAndLost(t *testing.T) {
scaffold.cluster.AssertNotCalled(t, "Reserve", scaffold.orderID, mock.Anything)

// Should not have closed the bid
scaffold.txClient.AssertNotCalled(t, "Broadcast", mock.Anything, &mtypes.MsgCloseBid{
expMsgs := &mtypes.MsgCloseBid{
BidID: mtypes.MakeBidID(order.orderID, scaffold.testAddr),
})
}

scaffold.txClient.AssertNotCalled(t, "Broadcast", mock.Anything, expMsgs, mock.Anything)
}
func Test_ShouldCloseBidWhenAlreadySetAndThenTimeout(t *testing.T) {
pricing, err := MakeRandomRangePricing()
Expand All @@ -580,9 +589,12 @@ func Test_ShouldCloseBidWhenAlreadySetAndThenTimeout(t *testing.T) {
scaffold.cluster.AssertCalled(t, "Reserve", scaffold.orderID, mock.Anything)

// Should have closed the bid
scaffold.txClient.AssertCalled(t, "Broadcast", mock.Anything, &mtypes.MsgCloseBid{
BidID: mtypes.MakeBidID(order.orderID, scaffold.testAddr),
})
expMsgs := []sdk.Msg{
&mtypes.MsgCloseBid{
BidID: mtypes.MakeBidID(order.orderID, scaffold.testAddr),
},
}
scaffold.txClient.AssertCalled(t, "Broadcast", mock.Anything, expMsgs, mock.Anything)

// Should have called unreserve
scaffold.cluster.AssertCalled(t, "Unreserve", scaffold.orderID)
Expand Down Expand Up @@ -617,7 +629,7 @@ func Test_ShouldRecognizeLeaseCreatedIfBiddingIsSkipped(t *testing.T) {
// Should not have called unreserve during shutdown
scaffold.cluster.AssertNotCalled(t, "Unreserve", mock.Anything, mock.Anything)

var broadcast sdk.Msg
var broadcast []sdk.Msg

select {
case broadcast = <-scaffold.broadcasts:
Expand All @@ -638,9 +650,8 @@ func Test_BidOrderUsesBidPricingStrategy(t *testing.T) {
order, scaffold, _ := makeOrderForTest(t, false, mtypes.BidStateInvalid, pricing, nil, testBidCreatedAt)

broadcast := testutil.ChannelWaitForValue(t, scaffold.broadcasts)
require.IsType(t, &mtypes.MsgCreateBid{}, broadcast)
createBidMsg := requireMsgType[*mtypes.MsgCreateBid](t, broadcast)

createBidMsg := broadcast.(*mtypes.MsgCreateBid)
require.Equal(t, createBidMsg.Order, scaffold.orderID)

priceDenom := createBidMsg.Price.Denom
Expand Down Expand Up @@ -673,7 +684,7 @@ func Test_BidOrderFailsAndAborts(t *testing.T) {
// Should have called reserve once
scaffold.cluster.AssertCalled(t, "Reserve", scaffold.orderID, mock.Anything)

var broadcast sdk.Msg
var broadcast []sdk.Msg

select {
case broadcast = <-scaffold.broadcasts:
Expand Down Expand Up @@ -701,7 +712,7 @@ func Test_ShouldntBidIfOrderAttrsDontMatch(t *testing.T) {
// Should not have called reserve ever
scaffold.cluster.AssertNotCalled(t, "Reserve", scaffold.orderID, mock.Anything)

var broadcast sdk.Msg
var broadcast []sdk.Msg

select {
case broadcast = <-scaffold.broadcasts:
Expand Down
2 changes: 1 addition & 1 deletion bidengine/provider_attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
ptypes "github.com/akash-network/akash-api/go/node/provider/v1beta3"
akashtypes "github.com/akash-network/akash-api/go/node/types/v1beta3"

clientmocks "github.com/akash-network/node/client/mocks"
clientmocks "github.com/akash-network/akash-api/go/node/client/v1beta2/mocks"
"github.com/akash-network/node/pubsub"
"github.com/akash-network/node/testutil"

Expand Down
Loading

0 comments on commit 8c7e0b6

Please sign in to comment.