项目代码地址 #
项目分析 #
- 读取日志文件
- 解析文件
- 存储解析后存储到influxdb库
- 通过grafana实时大屏幕
代码解析 #
入口main.go #
package main
import (
	"example/pkg/db"
	"example/pkg/logger"
	"example/pkg/process"
	"os"
	"os/signal"
)
func main()  {
	r := logger.NewLogger("log/access.log")
	w := db.NewDB("http://192.168.99.100:8086@admin@12345678@mydb@s")
	logProc := process.NewLogProcess(r,w)
	go logProc.Reader.Read(logProc.Rc)
	go logProc.Process()
	go logProc.Writer.Write(logProc.Wc)
	sig := make(chan os.Signal)
	signal.Notify(sig,os.Interrupt)
	<-sig
}
1、创建日志文件对象
2、创建数据库对象
3、创建处理程序对象
except包 #
异常处理包
pkg/except
code.go
package except
const (
	SUCCESS        = 1
	ERROR          = -1
)
pkg/except
message.go
var MsgFlags = map[int]string{
	SUCCESS:                        "ok",
	ERROR:                          "fail",
}
func GetMsg(code int) string {
	msg, ok := MsgFlags[code]
	if ok {
		return msg
	}
	return MsgFlags[ERROR]
}
func CheckError(err error)  {
	if err != nil {
		panic(err)
	}
}
logger包 #
读取日志文件包
pkg/logger
manage.go
package logger
func NewLogger(p string) logger {
	return logger{path:p}
}
逐条读取日志文件,写入读取字符串数据写入logprocess的rc 信道
pkg/logger
model.go
package logger
import (
	"bufio"
	"example/pkg/except"
	"io"
	"os"
	"time"
)
type Reader interface {
	Read(rc chan string)
}
type logger struct {
	path string
}
func (l logger) Read(rc chan string)  {
	fp, err := os.Open(l.path)
	except.CheckError(err)
	_, _ = fp.Seek(0, 2)
    rd := bufio.NewReader(fp)
    for {
		line,err := rd.ReadBytes('\n')
		if err == io.EOF {
			time.Sleep(time.Millisecond * 500)
			continue
		}
		except.CheckError(err)
		rc <- string(line[:len(line)-1])
	}
}
database包 #
安装influxdb,通过docker安装,注意不能安装lastest,要安装1.x版本
D:\site\gostudy> docker pull influxdb:1.7.8
D:\site\gostudy> docker run -d --name influxdb -p 8086:8086 -p 8083:8083 -v c:/docker/influxdb:/var/lib/influxdb influxdb:1.7.8
操作influxdb的包
pkg/db
manage.go
package db
func NewDB(d string) influxdb {
	return influxdb{dns:d}
}
接收logprocess的channel wc,并写入数据库
pkg/db
model.go
package db
import (
	"example/pkg/message"
	"fmt"
	"github.com/influxdata/influxdb1-client/v2"
	"log"
	"strings"
	"time"
)
type Writer interface {
	Write(wc chan *message.Message)
}
type influxdb struct {
	dns string
}
func (i influxdb) Write(wc chan *message.Message) {
	insli := strings.Split(i.dns,"@")
	fmt.Println(insli)
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr: insli[0],
		Username: insli[1],
		Password: insli[2],
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()
	for v := range wc {
		// Create a new point batch
		bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
			Database:  insli[3],
			//Precision: "infinite",
			Precision: insli[4],
		})
		if err != nil {
			fmt.Println("Error: ", err.Error())
		}
		fmt.Println(wc)
		tags := map[string]string{
			"Path": v.Path,
			"Method": v.Method,
			"Status": v.Status,
			"Scheme": v.Scheme,
			"Brower": v.Browser,
			"Os": v.Os,
		}
		fields := map[string]interface{}{
			"Address": v.Address,
			"Size":   v.Size,
			"LocalTime": v.TimeLocal,
		}
		fmt.Println(tags,fields)
		pt, err := client.NewPoint("nginx_log", tags, fields, time.Now())
		if err != nil {
			fmt.Println("Error: ", err.Error())
		}
		bp.AddPoint(pt)
		err = c.Write(bp)
		if err != nil {
			fmt.Println("Error: ", err.Error())
		}
	}
}
message包 #
pkg/message
mode.go
package message
import "time"
type Message struct {
	TimeLocal time.Time
	Size int
	Address string
	Path string
	Method string
	Scheme string
	Status string
	Os string
	Browser string
}
通过正则表达式解析读取日志文件,分析出日志相应的列
pkg/message
manage.go
package message
import (
	"errors"
	"regexp"
	"strconv"
	"time"
)
func ParseMessage(m string) (*Message,error) {
	msg := &Message{}
	p := `([\d\.]*)\s+[^ ]+\s+[^ ]+\s+\[([^\]]+)\]\s+\"([^\ ]+)\s+([^ ]+)\s+([^\"]+)\"\s+(\d+)\s+(\d+)\s+\"[^\"]+\"\s+\"[^\(]*\s+\(([^\)]+)\)\s+[^ ]+\s+\([^\)]+\)\s+([^\"]+)`
	r := regexp.MustCompile(p)
	ret := r.FindStringSubmatch(m)
	if len(ret) != 10 {
		return nil,errors.New("regexp complie err")
	}
	loc, _ := time.LoadLocation("Local")
	t,_ := time.ParseInLocation("02/Jan/2006:15:04:05 +0800",ret[2],loc)
	msg.TimeLocal = t
	msg.Address = ret[1]
	msg.Method = ret[3]
	msg.Path = ret[4]
	msg.Scheme = ret[5]
	msg.Status = ret[6]
	msg.Size, _ = strconv.Atoi(ret[7])
	msg.Os = ret[8]
	msg.Browser = ret[9]
	return msg,nil
}
process包 #
pkg/process
manage.go
package process
import (
	"example/pkg/db"
	"example/pkg/logger"
	"example/pkg/message"
)
func NewLogProcess(r logger.Reader,w db.Writer) *LogProcess {
	return &LogProcess{
		Rc:         make(chan string),
		Wc:         make(chan *message.Message),
		Reader:    r,
		Writer: w,
	}
}
pkg/process
model.go
package process
import (
	"example/pkg/db"
	"example/pkg/logger"
	"example/pkg/message"
	"fmt"
	"strings"
)
type Reader interface {
	Read()
}
type LogProcess struct {
	Rc     chan string           //读取到解析传递数据
	Wc     chan *message.Message //解析到写入模块
	Reader logger.Reader
	Writer db.Writer
}
func (l *LogProcess)Process()  {
	for v := range l.Rc {
		b := strings.Trim(v, "")
		if  b== "" {
			continue
		}
		msg,err :=  message.ParseMessage(v)
		if err != nil {
			fmt.Println(err)
			continue
		}
		l.Wc <- msg
	}
}
运行 #
配置influxdb #
docker run --name influxdb -p 8086:8086 influxdb:1.7
docker exec -it influxdb bash
influx
> create database mydb //创建数据库
> show databases  //显示库
> show measurements //显示表
> CREATE USER admin WITH PASSWORD '12345678' WITH ALL PRIVILEGES //授权
> select * from nginx_log //查询数据
go build main.go