Skip to content

Commit

Permalink
Update tenantQuerierAssignments to tenantQuerierSharding
Browse files Browse the repository at this point in the history
  • Loading branch information
chencs committed Oct 18, 2024
1 parent 2e87e0a commit 9718425
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,9 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) {
)
require.NoError(t, err)

// NewRequestQueue constructor does not allow passing in a tree or tenantQuerierAssignments
// NewRequestQueue constructor does not allow passing in a tree or tenantQuerierSharding
// so we have to override here to use the same structures as the test case
queue.queueBroker.tenantQuerierAssignments = &tenantQuerierAssignments{
queue.queueBroker.tenantQuerierAssignments = &tenantQuerierSharding{
querierIDsSorted: make([]tree.QuerierID, 0),
tenantsByID: make(map[string]*queueTenant),
queuingAlgorithm: scenario.tqa,
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/queue/queue_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type tenantRequest struct {
type queueBroker struct {
tree tree.Tree

tenantQuerierAssignments *tenantQuerierAssignments
tenantQuerierAssignments *tenantQuerierSharding
querierConnections *querierConnections

maxTenantQueueSize int
Expand Down Expand Up @@ -178,8 +178,8 @@ func (qb *queueBroker) dequeueRequestForQuerier(
return request, tenant, qb.tenantQuerierAssignments.queuingAlgorithm.TenantOrderIndex(), nil
}

// below methods simply pass through to the queueBroker's tenantQuerierAssignments; this layering could be skipped
// but there is no reason to make consumers know that they need to call through to the tenantQuerierAssignments.
// below methods simply pass through to the queueBroker's tenantQuerierSharding; this layering could be skipped
// but there is no reason to make consumers know that they need to call through to the tenantQuerierSharding.

func (qb *queueBroker) addQuerierWorkerConn(conn *QuerierWorkerConn) (resharded bool) {
// if conn is for a new querier, we need to recompute tenant querier relationship; otherwise, we don't reshard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s querierIDSlice) Search(x tree.QuerierID) int {
return sort.Search(len(s), func(i int) bool { return s[i] >= x })
}

// tenantQuerierAssignments maintains information about tenants and connected queriers, and uses
// tenantQuerierSharding maintains information about tenants and connected queriers, and uses
// this information to update a mapping of queriers to tenants in tree.TenantQuerierQueuingAlgorithm.
// This supports dequeuing from an appropriate tenant for a given querier when shuffle-sharding is enabled.
//
Expand All @@ -55,20 +55,21 @@ func (s querierIDSlice) Search(x tree.QuerierID) int {
// - a querier connection is added or removed
// - it is detected during request enqueueing that a tenant's queriers were calculated from
// an outdated max-queriers-per-tenant value
type tenantQuerierAssignments struct {
type tenantQuerierSharding struct {
// Sorted list of querier ids, used when shuffle-sharding queriers for tenant
querierIDsSorted querierIDSlice

// Tenant information used in shuffle-sharding
tenantsByID map[string]*queueTenant

// State used by the tree queue to dequeue queries, including a tenant-querier mapping
// State used by the tree queue to dequeue queries, including a tenant-querier mapping which
// will be updated by tenantQuerierSharding when the mapping is changed.
queuingAlgorithm *tree.TenantQuerierQueuingAlgorithm
}

func newTenantQuerierAssignments() *tenantQuerierAssignments {
func newTenantQuerierAssignments() *tenantQuerierSharding {
tqqa := tree.NewTenantQuerierQueuingAlgorithm()
return &tenantQuerierAssignments{
return &tenantQuerierSharding{
querierIDsSorted: nil,
tenantsByID: map[string]*queueTenant{},
queuingAlgorithm: tqqa,
Expand All @@ -79,7 +80,7 @@ func newTenantQuerierAssignments() *tenantQuerierAssignments {
//
// New tenants are added to tenantsByID and the queuing algorithm state, and tenant-querier shards are shuffled if needed.
// Existing tenants have the tenant-querier shards shuffled only if their maxQueriers has changed.
func (tqa *tenantQuerierAssignments) createOrUpdateTenant(tenantID string, maxQueriers int) error {
func (tqa *tenantQuerierSharding) createOrUpdateTenant(tenantID string, maxQueriers int) error {
if tenantID == "" {
// empty tenantID is not allowed; "" is used for free spot
return ErrInvalidTenantID
Expand Down Expand Up @@ -119,25 +120,25 @@ func (tqa *tenantQuerierAssignments) createOrUpdateTenant(tenantID string, maxQu

// removeTenant deletes a *queueTenant from tenantsByID. Any updates to remove a tenant from queuingAlgorithm
// are managed by a tree.Tree during dequeue.
func (tqa *tenantQuerierAssignments) removeTenant(tenantID string) {
func (tqa *tenantQuerierSharding) removeTenant(tenantID string) {
tenant := tqa.tenantsByID[tenantID]
if tenant == nil {
return
}
delete(tqa.tenantsByID, tenantID)
}

// addQuerier adds the given querierID to tenantQuerierAssignments' querierIDsSorted. It does not do any checks to
// addQuerier adds the given querierID to tenantQuerierSharding' querierIDsSorted. It does not do any checks to
// validate that a querier connection matching this ID exists. That logic is handled by querierConnections,
// and coordinated by the queueBroker.
func (tqa *tenantQuerierAssignments) addQuerier(querierID string) {
func (tqa *tenantQuerierSharding) addQuerier(querierID string) {
tqa.querierIDsSorted = append(tqa.querierIDsSorted, tree.QuerierID(querierID))
sort.Sort(tqa.querierIDsSorted)
}

// removeQueriers deletes an arbitrary number of queriers from the querier connection manager, and returns true if
// the tenant-querier sharding was recomputed.
func (tqa *tenantQuerierAssignments) removeQueriers(querierIDs ...string) (resharded bool) {
func (tqa *tenantQuerierSharding) removeQueriers(querierIDs ...string) (resharded bool) {
for _, querierID := range querierIDs {
ix := tqa.querierIDsSorted.Search(tree.QuerierID(querierID))
if ix >= len(tqa.querierIDsSorted) || tqa.querierIDsSorted[ix] != tree.QuerierID(querierID) {
Expand All @@ -149,7 +150,7 @@ func (tqa *tenantQuerierAssignments) removeQueriers(querierIDs ...string) (resha
return tqa.recomputeTenantQueriers()
}

func (tqa *tenantQuerierAssignments) recomputeTenantQueriers() (resharded bool) {
func (tqa *tenantQuerierSharding) recomputeTenantQueriers() (resharded bool) {
var scratchpad []tree.QuerierID
for tenantID, tenant := range tqa.tenantsByID {
if tenant.maxQueriers > 0 && tenant.maxQueriers < len(tqa.querierIDsSorted) && scratchpad == nil {
Expand All @@ -164,7 +165,7 @@ func (tqa *tenantQuerierAssignments) recomputeTenantQueriers() (resharded bool)
return resharded
}

func (tqa *tenantQuerierAssignments) shuffleTenantQueriers(tenantID string, scratchpad []tree.QuerierID) (resharded bool) {
func (tqa *tenantQuerierSharding) shuffleTenantQueriers(tenantID string, scratchpad []tree.QuerierID) (resharded bool) {
tenant := tqa.tenantsByID[tenantID]
if tenant == nil {
return false
Expand Down Expand Up @@ -196,6 +197,6 @@ func (tqa *tenantQuerierAssignments) shuffleTenantQueriers(tenantID string, scra
return true
}

func (tqa *tenantQuerierAssignments) queriersForTenant(tenantID string) map[tree.QuerierID]struct{} {
func (tqa *tenantQuerierSharding) queriersForTenant(tenantID string) map[tree.QuerierID]struct{} {
return tqa.queuingAlgorithm.QueriersForTenant(tenantID)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

func TestShuffleQueriers(t *testing.T) {
allQueriers := []tree.QuerierID{"a", "b", "c", "d", "e"}
tqs := &tenantQuerierAssignments{
tqs := &tenantQuerierSharding{
querierIDsSorted: allQueriers,
tenantsByID: map[string]*queueTenant{
"team-a": {
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestShuffleQueriersCorrectness(t *testing.T) {
}
slices.Sort(allSortedQueriers)

tqs := tenantQuerierAssignments{
tqs := tenantQuerierSharding{
querierIDsSorted: allSortedQueriers,
tenantsByID: map[string]*queueTenant{
"team-a": {
Expand Down

0 comments on commit 9718425

Please sign in to comment.