From aba477f42aa0026c5071c59937bb1acd33fd7ddf Mon Sep 17 00:00:00 2001 From: Noble Mittal <62551163+beingnoble03@users.noreply.github.com> Date: Sat, 19 Oct 2024 15:16:25 +0530 Subject: [PATCH] fix: Infinite logs in case of non-existent stream logs (#17004) Signed-off-by: Noble Mittal --- go/vt/vtctl/workflow/server.go | 4 +- go/vt/vtctl/workflow/server_test.go | 59 +++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index a61f31a8241..80c8569978c 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -864,8 +864,8 @@ ORDER BY if stream.Id > streamLog.StreamId { s.Logger().Warningf("Found stream log for nonexistent stream: %+v", streamLog) - // This can happen on manual/failed workflow cleanup so keep going. - continue + // This can happen on manual/failed workflow cleanup so move to the next log. + break } // stream.Id == streamLog.StreamId diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 8b1f09b8fde..401ed625be5 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -1833,3 +1833,62 @@ func createReadVReplicationWorkflowFunc(t *testing.T, workflowType binlogdatapb. }, nil } } + +// Test checks that we don't include logs from non-existent streams in the result. +// Ensures that we just skip the logs from non-existent streams and include the rest. +func TestGetWorkflowsStreamLogs(t *testing.T) { + ctx := context.Background() + + sourceKeyspace := "source_keyspace" + targetKeyspace := "target_keyspace" + workflow := "test_workflow" + + sourceShards := []string{"-"} + targetShards := []string{"-"} + + te := newTestMaterializerEnv(t, ctx, &vtctldatapb.MaterializeSettings{ + SourceKeyspace: sourceKeyspace, + TargetKeyspace: targetKeyspace, + Workflow: workflow, + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "table1", + SourceExpression: fmt.Sprintf("select * from %s", "table1"), + }, + { + TargetTable: "table2", + SourceExpression: fmt.Sprintf("select * from %s", "table2"), + }, + }, + }, sourceShards, targetShards) + + logResult := sqltypes.MakeTestResult( + sqltypes.MakeTestFields("id|vrepl_id|type|state|message|created_at|updated_at|`count`", "int64|int64|varchar|varchar|varchar|varchar|varchar|int64"), + "1|0|State Change|Running|test message for non-existent 1|2006-01-02 15:04:05|2006-01-02 15:04:05|1", + "2|0|State Change|Stopped|test message for non-existent 2|2006-01-02 15:04:06|2006-01-02 15:04:06|1", + "3|1|State Change|Running|log message|2006-01-02 15:04:07|2006-01-02 15:04:07|1", + ) + + te.tmc.expectVRQuery(200, "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", &sqltypes.Result{}) + te.tmc.expectVRQuery(200, "select id from _vt.vreplication where db_name = 'vt_target_keyspace' and workflow = 'test_workflow'", &sqltypes.Result{}) + te.tmc.expectVRQuery(200, "select id, vrepl_id, type, state, message, created_at, updated_at, `count` from _vt.vreplication_log where vrepl_id in (1) order by vrepl_id asc, id asc", logResult) + + res, err := te.ws.GetWorkflows(ctx, &vtctldatapb.GetWorkflowsRequest{ + Keyspace: targetKeyspace, + Workflow: workflow, + IncludeLogs: true, + }) + require.NoError(t, err) + + assert.Len(t, res.Workflows, 1) + assert.NotNil(t, res.Workflows[0].ShardStreams["-/cell-0000000200"]) + assert.Len(t, res.Workflows[0].ShardStreams["-/cell-0000000200"].Streams, 1) + + gotLogs := res.Workflows[0].ShardStreams["-/cell-0000000200"].Streams[0].Logs + + // The non-existent stream logs shouldn't be part of the result + assert.Len(t, gotLogs, 1) + assert.Equal(t, gotLogs[0].Message, "log message") + assert.Equal(t, gotLogs[0].State, "Running") + assert.Equal(t, gotLogs[0].Id, int64(3)) +}