自定义channel队列池

定义执行接口和Task #

package task


type Job interface {
	Do()
}

type Task struct {
	Idx int
	JobQueue chan Job
}

//门面模式构造Task对象
func NewTask(idx int) *Task  {
	return &Task{JobQueue:make(chan Job),Idx:idx}
}

//运行task,引用chan *Task信道并写入task对象
func (t *Task)Run(wq chan *Task)  {
	go func() {
		for  {
			//发送到工作池通道
			wq <- t
			select {
                //接收来自Task.JobQueue的通道
				case job := <- t.JobQueue:
					job.Do()
			}
		}
	}()
}

定义工作池对象 #

package task

type Worker struct {
	TaskQueueLen int
	TaskQueue chan *Task
	JobQueue chan Job
}

//构造工作池
func NewWork(l int) *Worker  {
	return &Worker{
		TaskQueueLen: l,
		TaskQueue:    make(chan *Task,l),
		JobQueue:     make(chan Job),
	}
}

//运行工作池
func (w *Worker)Run()  {
    //启动TaskQueueLen个goroutine
	for i := 0; i < w.TaskQueueLen ; i++  {
		t := NewTask(i)
		t.Run(w.TaskQueue)
	}

	go func() {
		for{
			select {
                //接收主函数写入信道,传递的是执行Do方法的对象
				case job := <- w.JobQueue:
                	//上面的t.Run写入信道,从这里读出,这样task就是*Task的指针对象
					task := <- w.TaskQueue
                	//把执行do方法的对象写入到Task.JobQueue信道中,Task中的JobQueue接收执行
					task.JobQueue <- job
			}
		}
	}()
}

优化修改 #

修改worker,taskQueue为一个chan chan job的类型

type Worker struct {
	TaskQueueLen int
	TaskQueue chan chan Job //不定义channel Task,而是定义了一个chan chan job的信道
	JobQueue chan Job
}

修改task中Run方法,写入TaskQueue中不在是Task对象,而是Task对象的JobQueue属性

func (t *Task)Run(wq chan chan Job)  {
	go func() {
		for  {
			//发送到工作池通道
			wq <- t.JobQueue
worker -> Run -> goroutine

for{
			select {
                //接收主函数写入信道,传递的是执行Do方法的对象
				case job := <- w.JobQueue:
                	//上面的t.Run写入信道,从这里读出,这样task就是*Task的指针对象
					task := <- w.TaskQueue
                	//把执行do方法的对象写入到Task.JobQueue信道中,Task中的JobQueue接收执行
					task <- job
			}

主函数 #

package main

import (
	"charper02/task"
	"fmt"
	"runtime"
	"strconv"
	"time"
)

// 操作对象student
type Student struct {
	Name string
}

// 继承接口Job
func (s *Student)Do()  {
	fmt.Println("do " + s.Name)
}


func main()  {
    // 跟踪go tool trace trace.out
	// fp, _  := os.Create("trace.out")
	// _ = trace.Start(fp)
	
    // 统计运行时间
	start := time.Now()
    // 标志运行结束
	done := make(chan bool)

    // 启动10个goroutine处理task
	wk := task.NewWork(10)
	wk.Run()

	go func() {
        // 运行3000个执行操作
		for i := 0; i < 30000 ; i++  {
			s := &Student{Name:"z" + strconv.Itoa(i)}
			wk.JobQueue <- s
		}
        // 完成标注done为true
		done <- true
	}()

	select {
		case <-done:
	}

	defer func() {
		fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
		fmt.Printf("cost[%s]\n",time.Since(start))
		//trace.Stop()
		//_ = fp.Close()
	}()
}