Skip to content

Commit

Permalink
fix: Infinite logs in case of non-existent stream logs (#17004)
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 authored Oct 19, 2024
1 parent a34e22c commit aba477f
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 2 deletions.
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,8 +864,8 @@ ORDER BY

if stream.Id > streamLog.StreamId {
s.Logger().Warningf("Found stream log for nonexistent stream: %+v", streamLog)
// This can happen on manual/failed workflow cleanup so keep going.
continue
// This can happen on manual/failed workflow cleanup so move to the next log.
break
}

// stream.Id == streamLog.StreamId
Expand Down
59 changes: 59 additions & 0 deletions go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1833,3 +1833,62 @@ func createReadVReplicationWorkflowFunc(t *testing.T, workflowType binlogdatapb.
}, nil
}
}

// Test checks that we don't include logs from non-existent streams in the result.
// Ensures that we just skip the logs from non-existent streams and include the rest.
func TestGetWorkflowsStreamLogs(t *testing.T) {
ctx := context.Background()

sourceKeyspace := "source_keyspace"
targetKeyspace := "target_keyspace"
workflow := "test_workflow"

sourceShards := []string{"-"}
targetShards := []string{"-"}

te := newTestMaterializerEnv(t, ctx, &vtctldatapb.MaterializeSettings{
SourceKeyspace: sourceKeyspace,
TargetKeyspace: targetKeyspace,
Workflow: workflow,
TableSettings: []*vtctldatapb.TableMaterializeSettings{
{
TargetTable: "table1",
SourceExpression: fmt.Sprintf("select * from %s", "table1"),
},
{
TargetTable: "table2",
SourceExpression: fmt.Sprintf("select * from %s", "table2"),
},
},
}, sourceShards, targetShards)

logResult := sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|vrepl_id|type|state|message|created_at|updated_at|`count`", "int64|int64|varchar|varchar|varchar|varchar|varchar|int64"),
"1|0|State Change|Running|test message for non-existent 1|2006-01-02 15:04:05|2006-01-02 15:04:05|1",
"2|0|State Change|Stopped|test message for non-existent 2|2006-01-02 15:04:06|2006-01-02 15:04:06|1",
"3|1|State Change|Running|log message|2006-01-02 15:04:07|2006-01-02 15:04:07|1",
)

te.tmc.expectVRQuery(200, "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", &sqltypes.Result{})
te.tmc.expectVRQuery(200, "select id from _vt.vreplication where db_name = 'vt_target_keyspace' and workflow = 'test_workflow'", &sqltypes.Result{})
te.tmc.expectVRQuery(200, "select id, vrepl_id, type, state, message, created_at, updated_at, `count` from _vt.vreplication_log where vrepl_id in (1) order by vrepl_id asc, id asc", logResult)

res, err := te.ws.GetWorkflows(ctx, &vtctldatapb.GetWorkflowsRequest{
Keyspace: targetKeyspace,
Workflow: workflow,
IncludeLogs: true,
})
require.NoError(t, err)

assert.Len(t, res.Workflows, 1)
assert.NotNil(t, res.Workflows[0].ShardStreams["-/cell-0000000200"])
assert.Len(t, res.Workflows[0].ShardStreams["-/cell-0000000200"].Streams, 1)

gotLogs := res.Workflows[0].ShardStreams["-/cell-0000000200"].Streams[0].Logs

// The non-existent stream logs shouldn't be part of the result
assert.Len(t, gotLogs, 1)
assert.Equal(t, gotLogs[0].Message, "log message")
assert.Equal(t, gotLogs[0].State, "Running")
assert.Equal(t, gotLogs[0].Id, int64(3))
}

0 comments on commit aba477f

Please sign in to comment.