日志采集分析存储

项目代码地址 #

项目地址

项目分析 #

  1. 读取日志文件
  2. 解析文件
  3. 存储解析后存储到influxdb库
  4. 通过grafana实时大屏幕

代码解析 #

入口main.go #

package main

import (
	"example/pkg/db"
	"example/pkg/logger"
	"example/pkg/process"
	"os"
	"os/signal"
)


func main()  {
	r := logger.NewLogger("log/access.log")
	w := db.NewDB("http://192.168.99.100:8086@admin@12345678@mydb@s")
	logProc := process.NewLogProcess(r,w)

	go logProc.Reader.Read(logProc.Rc)
	go logProc.Process()
	go logProc.Writer.Write(logProc.Wc)

	sig := make(chan os.Signal)
	signal.Notify(sig,os.Interrupt)
	<-sig
}

1、创建日志文件对象

2、创建数据库对象

3、创建处理程序对象

except包 #

异常处理包

pkg/except
code.go

package except

const (
	SUCCESS        = 1
	ERROR          = -1
)

pkg/except
message.go

var MsgFlags = map[int]string{
	SUCCESS:                        "ok",
	ERROR:                          "fail",
}

func GetMsg(code int) string {
	msg, ok := MsgFlags[code]
	if ok {
		return msg
	}

	return MsgFlags[ERROR]
}

func CheckError(err error)  {
	if err != nil {
		panic(err)
	}
}

logger包 #

读取日志文件包

pkg/logger
manage.go

package logger

func NewLogger(p string) logger {
	return logger{path:p}
}

逐条读取日志文件,写入读取字符串数据写入logprocess的rc 信道

pkg/logger
model.go

package logger

import (
	"bufio"
	"example/pkg/except"
	"io"
	"os"
	"time"
)

type Reader interface {
	Read(rc chan string)
}


type logger struct {
	path string
}

func (l logger) Read(rc chan string)  {
	fp, err := os.Open(l.path)
	except.CheckError(err)

	_, _ = fp.Seek(0, 2)
    rd := bufio.NewReader(fp)

    for {
		line,err := rd.ReadBytes('\n')
		if err == io.EOF {
			time.Sleep(time.Millisecond * 500)
			continue
		}
		except.CheckError(err)

		rc <- string(line[:len(line)-1])
	}

}

database包 #

安装influxdb,通过docker安装,注意不能安装lastest,要安装1.x版本

D:\site\gostudy> docker pull influxdb:1.7.8
D:\site\gostudy> docker run -d --name influxdb -p 8086:8086 -p 8083:8083 -v c:/docker/influxdb:/var/lib/influxdb influxdb:1.7.8

操作influxdb的包

pkg/db
manage.go

package db

func NewDB(d string) influxdb {
	return influxdb{dns:d}
}

接收logprocess的channel wc,并写入数据库

pkg/db
model.go

package db

import (
	"example/pkg/message"
	"fmt"
	"github.com/influxdata/influxdb1-client/v2"
	"log"
	"strings"
	"time"
)

type Writer interface {
	Write(wc chan *message.Message)
}

type influxdb struct {
	dns string
}

func (i influxdb) Write(wc chan *message.Message) {
	insli := strings.Split(i.dns,"@")
	fmt.Println(insli)
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr: insli[0],
		Username: insli[1],
		Password: insli[2],
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	for v := range wc {
		// Create a new point batch
		bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
			Database:  insli[3],
			//Precision: "infinite",
			Precision: insli[4],
		})
		if err != nil {
			fmt.Println("Error: ", err.Error())
		}
		fmt.Println(wc)
		tags := map[string]string{
			"Path": v.Path,
			"Method": v.Method,
			"Status": v.Status,
			"Scheme": v.Scheme,
			"Brower": v.Browser,
			"Os": v.Os,
		}
		fields := map[string]interface{}{
			"Address": v.Address,
			"Size":   v.Size,
			"LocalTime": v.TimeLocal,
		}
		fmt.Println(tags,fields)
		pt, err := client.NewPoint("nginx_log", tags, fields, time.Now())
		if err != nil {
			fmt.Println("Error: ", err.Error())
		}
		bp.AddPoint(pt)

		err = c.Write(bp)
		if err != nil {
			fmt.Println("Error: ", err.Error())
		}
	}

}

message包 #

pkg/message
mode.go

package message

import "time"

type Message struct {
	TimeLocal time.Time
	Size int
	Address string
	Path string
	Method string
	Scheme string
	Status string
	Os string
	Browser string
}

通过正则表达式解析读取日志文件,分析出日志相应的列

pkg/message
manage.go

package message

import (
	"errors"
	"regexp"
	"strconv"
	"time"
)

func ParseMessage(m string) (*Message,error) {
	msg := &Message{}
	p := `([\d\.]*)\s+[^ ]+\s+[^ ]+\s+\[([^\]]+)\]\s+\"([^\ ]+)\s+([^ ]+)\s+([^\"]+)\"\s+(\d+)\s+(\d+)\s+\"[^\"]+\"\s+\"[^\(]*\s+\(([^\)]+)\)\s+[^ ]+\s+\([^\)]+\)\s+([^\"]+)`
	r := regexp.MustCompile(p)
	ret := r.FindStringSubmatch(m)

	if len(ret) != 10 {
		return nil,errors.New("regexp complie err")
	}


	loc, _ := time.LoadLocation("Local")
	t,_ := time.ParseInLocation("02/Jan/2006:15:04:05 +0800",ret[2],loc)
	msg.TimeLocal = t


	msg.Address = ret[1]
	msg.Method = ret[3]
	msg.Path = ret[4]
	msg.Scheme = ret[5]
	msg.Status = ret[6]

	msg.Size, _ = strconv.Atoi(ret[7])

	msg.Os = ret[8]
	msg.Browser = ret[9]

	return msg,nil

}

process包 #

pkg/process
manage.go

package process

import (
	"example/pkg/db"
	"example/pkg/logger"
	"example/pkg/message"
)

func NewLogProcess(r logger.Reader,w db.Writer) *LogProcess {
	return &LogProcess{
		Rc:         make(chan string),
		Wc:         make(chan *message.Message),
		Reader:    r,
		Writer: w,
	}
}

pkg/process
model.go

package process

import (
	"example/pkg/db"
	"example/pkg/logger"
	"example/pkg/message"
	"fmt"
	"strings"
)

type Reader interface {
	Read()
}

type LogProcess struct {
	Rc     chan string           //读取到解析传递数据
	Wc     chan *message.Message //解析到写入模块
	Reader logger.Reader
	Writer db.Writer
}

func (l *LogProcess)Process()  {
	for v := range l.Rc {

		b := strings.Trim(v, "")
		if  b== "" {
			continue
		}

		msg,err :=  message.ParseMessage(v)
		if err != nil {
			fmt.Println(err)
			continue
		}


		l.Wc <- msg
	}
}

运行 #

配置influxdb #

docker run --name influxdb -p 8086:8086 influxdb:1.7
docker exec -it influxdb bash
influx
> create database mydb //创建数据库
> show databases  //显示库
> show measurements //显示表
> CREATE USER admin WITH PASSWORD '12345678' WITH ALL PRIVILEGES //授权
> select * from nginx_log //查询数据


go build main.go