前言

我们公司在各地都有分公司,分公司下面基本都有一套属于自己的hadoop集群。这时候需要一个监控,来采集一些我们需要的信息,在发生异常时候可以告知我们

hdfs

本篇主要讲解下怎么监控hdfs文件夹大小,这个需求是因为我们会把采集的数据写进hive,有时候前一天的数据会特别少,可能只有几M,这时候就属于异常情况,我们不可能每天都登录服务器去执行命令查看,这时候怎么设计一款监控程序能够告诉我们前一天的数据到底是否正常。

实现思路

我们要监控的是hadoop生态圈的各种组件,不只是hdfs。所以我们要有多模块功能,并且这个模块是根据我们的需要来决定是否调用。然后我的目录结构设这样的

目录说明
config存放了程序需要的配置文件
deploy通过playbook脚本自动构建并上传到服务器
.vscode里面定义了快速执行playbook脚本
hadoopgo package ,里面用于存放本项目的各种包

入口程序

hadoop-go.go

package main

import (
	"fmt"
	"hadoop-go/hadoop"
	"io/ioutil"
	"log"
	"os"
	"time"

	"github.com/urfave/cli"
	"gopkg.in/yaml.v2"
)

var (
	hadoopConfig map[string]hadoop.HadoopConfig
	config_path  string

	ding hadoop.Ding
)

func init() {

}

func load_config() {
	log.Print("读取配置文件:", config_path)
	f, err := os.OpenFile(config_path, os.O_RDONLY, 0444)
	if err != nil {
		log.Panic("读取配置文件失败 ", err)
	}
	data, _ := ioutil.ReadAll(f)
	err = yaml.Unmarshal(data, &hadoopConfig)
	if err != nil {
		log.Panic("格式化配置文件失败", err)
	}
}

func action(c *cli.Context) error {
	load_config()
	// fix: error caused when modifying the configuration path
	if len(c.Args()) == 0 {
		println("请选择一个配置")
		i := 0
		for k := range hadoopConfig {
			i++
			println(i, k)
		}
		return nil
	}

	config, err := hadoopConfig[c.Args()[0]]
	if !err {
		log.Panic("没有", c.Args()[0], "的配置")
	}

	// 初始化dingding监听
	if config.Dingding != "" {
		log.Println("启用钉钉,token:", config.Dingding)
		ding = hadoop.Ding{Token: config.Dingding, Msg: make(chan string, 999)}
		go ding.Send()
	}

	// 循环遍历任务加到定时任务里面
	for n, t := range config.Tasks {
		log.Println("任务:", n)
		t.Ding = &ding
		switch t.Module {
		case "hdfs":
			hadoop.Hdfs{}.Select(t)
		default:
			log.Println("没有对应的模块")
		}

	}
	time.Sleep(10 * time.Second)

	return nil
}

func main() {
	app := cli.NewApp()
	app.Name = "hadoop-go"
	app.Version = "1.0.0"
	app.Usage = "hadoop监控"
	app.Flags = []cli.Flag{
		cli.StringFlag{
			Name:        "c",
			Usage:       "配置文件路径。default: ./config/hadoop.yml",
			Value:       "./config/hadoop.yml",
			Destination: &config_path,
		},
	}
	app.Action = action
	err := app.Run(os.Args)
	if err != nil {
		fmt.Println(err)
	}
}

配置文件

可以看到的是,在入口文件我们读取了一个yaml格式的文件,然后通过yaml.Unmarshal将配置文件转换成go struct(结构体)

key描述
chengdu/xian对应的分公司名称
dingding这里可以不设置,设置的话代表启用dingding
tasks在当前地域执行哪些任务
v_report任务名,可以自定义
module要调用哪个模块,这次用的是hdfs,还会有hive,hbase等
type调用模块的什么方法
args传给模块方法的参数
crond是否启用定时任务,秒 分 时 日 月 周
desc任务描述

yaml文件如下:

chengdu:
  dingding: dingding-token
  tasks:
    v_report: 
      module: hdfs
      type: CompareSize
      args: /user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}} gt 40282905455
      crond: 00 00 09 * * *
      desc: 成都{{.Yesterday}} v_report状态
xian:
  dingding: dingding-token
  tasks:
    v_report: 
      module: hdfs
      type: CompareSize
      args: /user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}} gt 20282905455
      crond: 00 00 09 * * *
      desc: 西安{{.Yesterday}} v_report状态

go结构体

type HadoopConfig struct {
	Dingding string
	Tasks    map[string]Task
}
type Task struct {
	Module string
	Type   string
	Args   string
	Crond  string
	Desc   string
	Ding   *Ding
}

CompareSize方法

在执行命令的时候会对命令进行一次render,主要就是替换掉里面的变量。我们比较的是昨天的,所以要将/user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}}中的{.Yesterday}}替换成昨天的日期,这里写了一个方法,以后可以通过修改这个方法来添加变量

func TempFunc() TempArgs {
	ta := TempArgs{}
	ta.Yesterday = time.Now().AddDate(0, 0, -1).Format("2006-01-02")
	return ta
}
func CompareSizeTemp(s string) string {
	/* 用来生成对应的执行命令 */

	// 用来存放生成后的模板
	var buff bytes.Buffer

	// 利用模板动态解析
	ta := TempFunc()
	tmpl, err := template.New("test").Parse(s)
	if err != nil {
		log.Panic(err)
	}

	err = tmpl.Execute(&buff, ta)
	if err != nil {
		log.Panic("模板解析错误", err)
	}
	rs := buff.String()
	log.Print("Parse before:", s, "Parse after:", rs)
	return rs

}

下面就是这CompareSize的完整内容

func (h Hdfs) CompareSize(t Task) {
	var rs bool
	var color string

	cmd := strings.Split(CompareSizeTemp(t.Args), " ")
	r := "hadoop fs -du -s " + cmd[0]

	o, err := Exec(r)
	if err != nil {
		log.Println("执行 ", r, " 失败.", err)
	}
	if len(cmd) == 3 {
		log.Println("执行命令: ", r, "运算符:", cmd[1], "对比值:", cmd[2])
		v, err := strconv.ParseInt(cmd[2], 10, 64)
		if err != nil {
			log.Panic("CompareSize类型转换错误:", cmd[2], "->", err)
		}
		rs, color = FormatCompareInfo(o, cmd[1], v)

	} else {
		log.Println("执行命令: /opt/zcsy/hadoop/bin/hadoop fs -du -s ", cmd[0])
		rs, color = FormatCompareInfo(o, "", 0)

	}

	title := CompareSizeTemp(t.Desc)
	s := `# <font color=#` + color + `>` + title + `正常</font>
		- 返回信息:  ` + o + ``
	if !rs {
		s = `# <font color=#` + color + `>` + title + `异常</font>
		- 返回信息:  ` + o + ``
	}

	// 通过chan来进行异步发送
	md_msg := Msg_MD(title, s)
	t.Ding.Msg <- md_msg

}

Exec方法是调用bash去执行render后的命令

func Exec(cmd string) (string, error) {
	log.Print("执行命令: ", cmd)
	c := exec.Command("bash", "-c", cmd)
	c.Stderr = os.Stderr
	o, err := c.Output()
	if err != nil {
		return "", err
	}
	return string(o), nil
}

FormatCompareInfo是为了格式化调用Exec方法执行后的结果为预期格式。

func FormatCompareInfo(s string, c string, v int64) (bool, string) {
	ss := strings.Split(s, " ")
	color := "00A600"
	r := true
	log.Println("分割后的比较信息", ss)
	i, err := strconv.ParseInt(ss[0], 10, 64)
	if err != nil {
		log.Panic("类型转换错误:", ss[0], "->", err)
	}
	log.Println("进行比较", i, c, v)
	switch c {
	case "gt":
		if i < v {
			color = "FF0000"
			r = false
		}
	case "lt":
		if i > v {
			color = "FF0000"
			r = false
		}
	case "eq":
		if i > v {
			color = "FF0000"
			r = false
		}
	}

	return r, color

}

这些都完成之后我们通过channel 将消息发给dingding 协程。完成本次发送。
最后的实现效果
异常情况
在这里插入图片描述
正常情况:
在这里插入图片描述
这样就实现了一个对hdfs的文件夹大小监控了。十分的简单方便。后续我会更新怎么监控hadoop其他组件,以及定时任务实现。就不在本篇介绍了。