diff --git a/cmd/filebeat-keeper/cmd.go b/cmd/filebeat-keeper/cmd.go new file mode 100644 index 00000000..bf166193 --- /dev/null +++ b/cmd/filebeat-keeper/cmd.go @@ -0,0 +1,65 @@ +package main + +import ( + "os/exec" + "syscall" + "time" + + "github.com/caicloud/nirvana/log" +) + +const ( + waitingTime = 60 +) + +type AsyncCmd struct { + cmd *exec.Cmd + waitDone chan struct{} + finished bool +} + +func WrapCmd(cmd *exec.Cmd) *AsyncCmd { + return &AsyncCmd{ + cmd: cmd, + waitDone: make(chan struct{}), + finished: false, + } +} + +func (ac *AsyncCmd) Start() error { + if err := ac.cmd.Start(); err != nil { + return err + } + + go func(ac *AsyncCmd) { + ac.cmd.Wait() + close(ac.waitDone) + ac.finished = true + }(ac) + + return nil +} + +func (ac *AsyncCmd) Stop() error { + log.Infoln("Send TERM signal") + if err := ac.cmd.Process.Signal(syscall.SIGTERM); err != nil { + return err + } + + select { + case <-ac.waitDone: + return nil + case <-time.After(waitingTime * time.Second): + log.Infoln("Kill Process") + if err := ac.cmd.Process.Kill(); err != nil { + return err + } + } + + <-ac.waitDone + return nil +} + +func (ac *AsyncCmd) Exited() bool { + return ac.finished +} diff --git a/cmd/filebeat-keeper/main.go b/cmd/filebeat-keeper/main.go index d2279ca6..a6453ca1 100644 --- a/cmd/filebeat-keeper/main.go +++ b/cmd/filebeat-keeper/main.go @@ -1,15 +1,18 @@ package main import ( + "crypto/sha256" "flag" "fmt" + "io" "io/ioutil" "os" "os/exec" "path/filepath" + "strconv" "sync" - "syscall" "text/template" + "time" "gopkg.in/yaml.v2" @@ -17,59 +20,105 @@ import ( "github.com/caicloud/logging-admin/pkg/util/osutil" "github.com/caicloud/nirvana/log" - "gopkg.in/fsnotify/fsnotify.v1" +) + +const ( + HeatlthCheckInterval = "HEATLTH_CHECK_INTERVAL" + ConfigCheckInterval = "CONFIG_CHECK_INTERVAL" ) var ( filebeatExecutablePath = osutil.Getenv("FB_EXE_PATH", "filebeat") srcConfigPath = osutil.Getenv("SRC_CONFIG_PATH", "/config/filebeat-output.yml") dstConfigPath = osutil.Getenv("DST_CONFIG_PATH", "/etc/filebeat/filebeat.yml") + heatlthCheckInterval = int64(10) + configCheckInterval = int64(600) ) -// When configmap being created for the first time, following events received: -// INFO 1206-09:38:39.496+00 main.go:41 | Event: "/config/..2018_12_06_09_38_39.944532540": CREATE -// INFO 1206-09:38:39.496+00 main.go:41 | Event: "/config/..2018_12_06_09_38_39.944532540": CHMOD -// INFO 1206-09:38:39.497+00 main.go:41 | Event: "/config/filebeat-output.yml": CREATE -// INFO 1206-09:38:39.497+00 main.go:41 | Event: "/config/..data_tmp": RENAME -// INFO 1206-09:38:39.497+00 main.go:41 | Event: "/config/..data": CREATE -// INFO 1206-09:38:39.497+00 main.go:41 | Event: "/config/..2018_12_06_09_37_32.878326343": REMOVE -// When configmap being modified, following events received: -// INFO 1206-09:42:56.488+00 main.go:41 | Event: "/config/..2018_12_06_09_42_56.160544363": CREATE -// INFO 1206-09:42:56.488+00 main.go:41 | Event: "/config/..2018_12_06_09_42_56.160544363": CHMOD -// INFO 1206-09:42:56.488+00 main.go:41 | Event: "/config/..data_tmp": RENAME -// INFO 1206-09:42:56.488+00 main.go:41 | Event: "/config/..data": CREATE -// INFO 1206-09:42:56.488+00 main.go:41 | Event: "/config/..2018_12_06_09_38_39.944532540": REMOVE -func watchFileChange(path string, reloadCh chan<- struct{}) error { - w, err := fsnotify.NewWatcher() +func init() { + sec, err := strconv.ParseInt(osutil.Getenv(HeatlthCheckInterval, + strconv.FormatInt(heatlthCheckInterval, 10)), 10, 64) + if err != nil || sec < 0 { + log.Warningf("%s is Invalid, use default value %d", HeatlthCheckInterval, heatlthCheckInterval) + } else { + heatlthCheckInterval = sec + } + + sec, err = strconv.ParseInt(osutil.Getenv(ConfigCheckInterval, + strconv.FormatInt(configCheckInterval, 10)), 10, 64) + if err != nil || sec < 0 { + log.Warningf("%s is Invalid, use default value %d", ConfigCheckInterval, configCheckInterval) + } else { + configCheckInterval = sec + } +} + +func hashFile(path string) (string, error) { + f, err := os.Open(path) if err != nil { - return err + return "", err } - if err := w.Add(path); err != nil { - return err + defer f.Close() + + h := sha256.New() + if _, err := io.Copy(h, f); err != nil { + return "", err } - for { - select { - case ev := <-w.Events: - log.Infoln("Event:", ev.String()) - if ev.Op&fsnotify.Create == fsnotify.Create { - if filepath.Base(ev.Name) == "..data" { - log.Infoln("Configmap updated") - reloadCh <- struct{}{} - } - } - case err := <-w.Errors: - log.Errorf("Watch error: %v", err) + return string(h.Sum(nil)), nil +} + +func newFileChecker(path string, notify func()) func() { + var ( + curHash string + mtx sync.Mutex + err error + ) + + curHash, err = hashFile(path) + if err != nil { + log.Warningln(err) + } + + return func() { + mtx.Lock() + defer mtx.Unlock() + + h, err := hashFile(path) + if err != nil { + log.Warningln(err) + return + } + + if curHash != h { + log.Infof("file need reload, old: %x, new: %x", curHash, h) + curHash = h + notify() } } } +func watchFileChange(path string, reloadCh chan<- struct{}) { + checker := newFileChecker(path, func() { reloadCh <- struct{}{} }) + + //watch CM + go watchConfigMapUpdate(filepath.Dir(path), checker) + + //定时监测 + go func(checkFile func()) { + check := time.Tick(time.Duration(configCheckInterval) * time.Second) + for range check { + checkFile() + } + }(checker) +} + func run(stopCh <-chan struct{}) error { reloadCh := make(chan struct{}, 1) started := false cmd := newCmd() - go watchFileChange(filepath.Dir(srcConfigPath), reloadCh) + watchFileChange(srcConfigPath, reloadCh) if err := applyChange(); err == nil { reloadCh <- struct{}{} @@ -78,11 +127,12 @@ func run(stopCh <-chan struct{}) error { log.Infoln("Filebeat will not start until configmap being updated") } + check := time.Tick(time.Duration(heatlthCheckInterval) * time.Second) for { select { case <-stopCh: log.Infoln("Wait filebeat shutdown") - if err := cmd.Wait(); err != nil { + if err := cmd.Stop(); err != nil { return fmt.Errorf("filebeat quit with error: %v", err) } return nil @@ -100,11 +150,7 @@ func run(stopCh <-chan struct{}) error { log.Infoln("Filebeat start") started = true } else { - log.Infoln("Send TERM signal") - if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { - return fmt.Errorf("error send signal: %v", err) - } - if err := cmd.Wait(); err != nil { + if err := cmd.Stop(); err != nil { return fmt.Errorf("filebeat quit with error: %v", err) } log.Infoln("Filebeat quit") @@ -114,6 +160,13 @@ func run(stopCh <-chan struct{}) error { return fmt.Errorf("error run filebeat: %v", err) } } + case <-check: + if started { + if cmd != nil && cmd.Exited() { + log.Fatalln("Filebeat has unexpectedly exited") + os.Exit(1) + } + } } } } @@ -158,12 +211,13 @@ var ( fbArgs []string ) -func newCmd() *exec.Cmd { +func newCmd() *AsyncCmd { log.Infof("Will run filebeat with command: %v %v", filebeatExecutablePath, fbArgs) cmd := exec.Command(filebeatExecutablePath, fbArgs...) cmd.Stderr = os.Stderr cmd.Stdout = os.Stdout - return cmd + + return WrapCmd(cmd) } func main() { diff --git a/cmd/filebeat-keeper/watch.go b/cmd/filebeat-keeper/watch.go new file mode 100644 index 00000000..8533f782 --- /dev/null +++ b/cmd/filebeat-keeper/watch.go @@ -0,0 +1,43 @@ +package main + +import ( + "path/filepath" + + "github.com/caicloud/nirvana/log" + "gopkg.in/fsnotify/fsnotify.v1" +) + +const ( + dataDirName = "..data" +) + +// ref_link [https://github.com/jimmidyson/configmap-reload/issues/6#issuecomment-355203620] +// ConfigMap volumes use an atomic writer. You could familarize yourself with +// the mechanic how atomic writes are implemented. In the end you could check +// if the actual change you do in your ConfigMap results in the rename of the +// ..data-symlink (step 9). +// ref_link [https://github.com/kubernetes/kubernetes/blob/6d98cdbbfb055757a9846dee97dafd4177d9a222/pkg/volume/util/atomic_writer.go#L56] +func watchConfigMapUpdate(path string, update func()) error { + w, err := fsnotify.NewWatcher() + if err != nil { + return err + } + if err := w.Add(path); err != nil { + return err + } + + for { + select { + case ev := <-w.Events: + log.Infoln("Event:", ev.String()) + if ev.Op&fsnotify.Create == fsnotify.Create { + if filepath.Base(ev.Name) == dataDirName { + log.Infoln("Configmap updated") + update() + } + } + case err := <-w.Errors: + log.Errorf("Watch error: %v", err) + } + } +} diff --git a/release/logging-filebeat.yaml b/release/logging-filebeat.yaml index b2fb436c..700f11af 100644 --- a/release/logging-filebeat.yaml +++ b/release/logging-filebeat.yaml @@ -78,12 +78,15 @@ _config: - name: varlog path: /opt/filebeat subpath: filebeat + propagation: HostToContainer - name: varlibdocker path: /var/lib/docker readonly: true + propagation: HostToContainer - name: docker-sock path: /var/run/docker.sock readonly: true + propagation: HostToContainer - image: '[[ registry_release ]]/beat-exporter:v0.1.2' imagePullPolicy: Always resources: