Skip to content

Commit

Permalink
[release-1.2-starbucks]chore(*): cherrypick stability fix from master (
Browse files Browse the repository at this point in the history
…#49)

* fix(*): monitor subprocess status (#43)

* refactor subprocess start and stop

* refactor watch /config/filebeat-output.yml

* fix bug

* fix bug

(cherry picked from commit cae2680)

* chore(*): add mount propagation (#45)

(cherry picked from commit f0b561a)
  • Loading branch information
lichuan0620 authored and caicloud-bot committed Sep 7, 2019
1 parent 931a1de commit 354326a
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 41 deletions.
65 changes: 65 additions & 0 deletions cmd/filebeat-keeper/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package main

import (
"os/exec"
"syscall"
"time"

"github.com/caicloud/nirvana/log"
)

const (
waitingTime = 60
)

type AsyncCmd struct {
cmd *exec.Cmd
waitDone chan struct{}
finished bool
}

func WrapCmd(cmd *exec.Cmd) *AsyncCmd {
return &AsyncCmd{
cmd: cmd,
waitDone: make(chan struct{}),
finished: false,
}
}

func (ac *AsyncCmd) Start() error {
if err := ac.cmd.Start(); err != nil {
return err
}

go func(ac *AsyncCmd) {
ac.cmd.Wait()
close(ac.waitDone)
ac.finished = true
}(ac)

return nil
}

func (ac *AsyncCmd) Stop() error {
log.Infoln("Send TERM signal")
if err := ac.cmd.Process.Signal(syscall.SIGTERM); err != nil {
return err
}

select {
case <-ac.waitDone:
return nil
case <-time.After(waitingTime * time.Second):
log.Infoln("Kill Process")
if err := ac.cmd.Process.Kill(); err != nil {
return err
}
}

<-ac.waitDone
return nil
}

func (ac *AsyncCmd) Exited() bool {
return ac.finished
}
136 changes: 95 additions & 41 deletions cmd/filebeat-keeper/main.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,124 @@
package main

import (
"crypto/sha256"
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"syscall"
"text/template"
"time"

"gopkg.in/yaml.v2"

"github.com/caicloud/logging-admin/pkg/util/graceful"
"github.com/caicloud/logging-admin/pkg/util/osutil"

"github.com/caicloud/nirvana/log"
"gopkg.in/fsnotify/fsnotify.v1"
)

const (
HeatlthCheckInterval = "HEATLTH_CHECK_INTERVAL"
ConfigCheckInterval = "CONFIG_CHECK_INTERVAL"
)

var (
filebeatExecutablePath = osutil.Getenv("FB_EXE_PATH", "filebeat")
srcConfigPath = osutil.Getenv("SRC_CONFIG_PATH", "/config/filebeat-output.yml")
dstConfigPath = osutil.Getenv("DST_CONFIG_PATH", "/etc/filebeat/filebeat.yml")
heatlthCheckInterval = int64(10)
configCheckInterval = int64(600)
)

// When configmap being created for the first time, following events received:
// INFO 1206-09:38:39.496+00 main.go:41 | Event: "/config/..2018_12_06_09_38_39.944532540": CREATE
// INFO 1206-09:38:39.496+00 main.go:41 | Event: "/config/..2018_12_06_09_38_39.944532540": CHMOD
// INFO 1206-09:38:39.497+00 main.go:41 | Event: "/config/filebeat-output.yml": CREATE
// INFO 1206-09:38:39.497+00 main.go:41 | Event: "/config/..data_tmp": RENAME
// INFO 1206-09:38:39.497+00 main.go:41 | Event: "/config/..data": CREATE
// INFO 1206-09:38:39.497+00 main.go:41 | Event: "/config/..2018_12_06_09_37_32.878326343": REMOVE
// When configmap being modified, following events received:
// INFO 1206-09:42:56.488+00 main.go:41 | Event: "/config/..2018_12_06_09_42_56.160544363": CREATE
// INFO 1206-09:42:56.488+00 main.go:41 | Event: "/config/..2018_12_06_09_42_56.160544363": CHMOD
// INFO 1206-09:42:56.488+00 main.go:41 | Event: "/config/..data_tmp": RENAME
// INFO 1206-09:42:56.488+00 main.go:41 | Event: "/config/..data": CREATE
// INFO 1206-09:42:56.488+00 main.go:41 | Event: "/config/..2018_12_06_09_38_39.944532540": REMOVE
func watchFileChange(path string, reloadCh chan<- struct{}) error {
w, err := fsnotify.NewWatcher()
func init() {
sec, err := strconv.ParseInt(osutil.Getenv(HeatlthCheckInterval,
strconv.FormatInt(heatlthCheckInterval, 10)), 10, 64)
if err != nil || sec < 0 {
log.Warningf("%s is Invalid, use default value %d", HeatlthCheckInterval, heatlthCheckInterval)
} else {
heatlthCheckInterval = sec
}

sec, err = strconv.ParseInt(osutil.Getenv(ConfigCheckInterval,
strconv.FormatInt(configCheckInterval, 10)), 10, 64)
if err != nil || sec < 0 {
log.Warningf("%s is Invalid, use default value %d", ConfigCheckInterval, configCheckInterval)
} else {
configCheckInterval = sec
}
}

func hashFile(path string) (string, error) {
f, err := os.Open(path)
if err != nil {
return err
return "", err
}
if err := w.Add(path); err != nil {
return err
defer f.Close()

h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
return "", err
}

for {
select {
case ev := <-w.Events:
log.Infoln("Event:", ev.String())
if ev.Op&fsnotify.Create == fsnotify.Create {
if filepath.Base(ev.Name) == "..data" {
log.Infoln("Configmap updated")
reloadCh <- struct{}{}
}
}
case err := <-w.Errors:
log.Errorf("Watch error: %v", err)
return string(h.Sum(nil)), nil
}

func newFileChecker(path string, notify func()) func() {
var (
curHash string
mtx sync.Mutex
err error
)

curHash, err = hashFile(path)
if err != nil {
log.Warningln(err)
}

return func() {
mtx.Lock()
defer mtx.Unlock()

h, err := hashFile(path)
if err != nil {
log.Warningln(err)
return
}

if curHash != h {
log.Infof("file need reload, old: %x, new: %x", curHash, h)
curHash = h
notify()
}
}
}

func watchFileChange(path string, reloadCh chan<- struct{}) {
checker := newFileChecker(path, func() { reloadCh <- struct{}{} })

//watch CM
go watchConfigMapUpdate(filepath.Dir(path), checker)

//定时监测
go func(checkFile func()) {
check := time.Tick(time.Duration(configCheckInterval) * time.Second)
for range check {
checkFile()
}
}(checker)
}

func run(stopCh <-chan struct{}) error {
reloadCh := make(chan struct{}, 1)
started := false
cmd := newCmd()

go watchFileChange(filepath.Dir(srcConfigPath), reloadCh)
watchFileChange(srcConfigPath, reloadCh)

if err := applyChange(); err == nil {
reloadCh <- struct{}{}
Expand All @@ -78,11 +127,12 @@ func run(stopCh <-chan struct{}) error {
log.Infoln("Filebeat will not start until configmap being updated")
}

check := time.Tick(time.Duration(heatlthCheckInterval) * time.Second)
for {
select {
case <-stopCh:
log.Infoln("Wait filebeat shutdown")
if err := cmd.Wait(); err != nil {
if err := cmd.Stop(); err != nil {
return fmt.Errorf("filebeat quit with error: %v", err)
}
return nil
Expand All @@ -100,11 +150,7 @@ func run(stopCh <-chan struct{}) error {
log.Infoln("Filebeat start")
started = true
} else {
log.Infoln("Send TERM signal")
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
return fmt.Errorf("error send signal: %v", err)
}
if err := cmd.Wait(); err != nil {
if err := cmd.Stop(); err != nil {
return fmt.Errorf("filebeat quit with error: %v", err)
}
log.Infoln("Filebeat quit")
Expand All @@ -114,6 +160,13 @@ func run(stopCh <-chan struct{}) error {
return fmt.Errorf("error run filebeat: %v", err)
}
}
case <-check:
if started {
if cmd != nil && cmd.Exited() {
log.Fatalln("Filebeat has unexpectedly exited")
os.Exit(1)
}
}
}
}
}
Expand Down Expand Up @@ -158,12 +211,13 @@ var (
fbArgs []string
)

func newCmd() *exec.Cmd {
func newCmd() *AsyncCmd {
log.Infof("Will run filebeat with command: %v %v", filebeatExecutablePath, fbArgs)
cmd := exec.Command(filebeatExecutablePath, fbArgs...)
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout
return cmd

return WrapCmd(cmd)
}

func main() {
Expand Down
43 changes: 43 additions & 0 deletions cmd/filebeat-keeper/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"path/filepath"

"github.com/caicloud/nirvana/log"
"gopkg.in/fsnotify/fsnotify.v1"
)

const (
dataDirName = "..data"
)

// ref_link [https://github.com/jimmidyson/configmap-reload/issues/6#issuecomment-355203620]
// ConfigMap volumes use an atomic writer. You could familarize yourself with
// the mechanic how atomic writes are implemented. In the end you could check
// if the actual change you do in your ConfigMap results in the rename of the
// ..data-symlink (step 9).
// ref_link [https://github.com/kubernetes/kubernetes/blob/6d98cdbbfb055757a9846dee97dafd4177d9a222/pkg/volume/util/atomic_writer.go#L56]
func watchConfigMapUpdate(path string, update func()) error {
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
if err := w.Add(path); err != nil {
return err
}

for {
select {
case ev := <-w.Events:
log.Infoln("Event:", ev.String())
if ev.Op&fsnotify.Create == fsnotify.Create {
if filepath.Base(ev.Name) == dataDirName {
log.Infoln("Configmap updated")
update()
}
}
case err := <-w.Errors:
log.Errorf("Watch error: %v", err)
}
}
}
3 changes: 3 additions & 0 deletions release/logging-filebeat.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,15 @@ _config:
- name: varlog
path: /opt/filebeat
subpath: filebeat
propagation: HostToContainer
- name: varlibdocker
path: /var/lib/docker
readonly: true
propagation: HostToContainer
- name: docker-sock
path: /var/run/docker.sock
readonly: true
propagation: HostToContainer
- image: '[[ registry_release ]]/beat-exporter:v0.1.2'
imagePullPolicy: Always
resources:
Expand Down

0 comments on commit 354326a

Please sign in to comment.