在前面文章中介绍了golang开发rpc命令行工具,今天继续后续功能的介绍。

场景描述:

1,客户端运行一个daemon程序,执行文件上传任务,使用boltdb数据库记录任务执行状态。支持继续上传未完成文件。

2,客户端命令行通过rpc调用客户端daemon程序API,进行文件上传任务管理,包括任务创建,查看,启停。

通过这个功能开发,我们将golang并发编程的goroutine协程,channel通道,context上下文, Mutex互斥锁, WaitGroup协程同步等技术得到应用。

基本知识:

WaiGroup

等待一组协程执行完成后继续向下执行,WaitGroup内部有一个计数器,从0开始计数,有3个方法:Add(),Done(), Wait()。Add()添加计数,Done()减掉一个计数,Wait()执行阻塞,直到WaitGroup数量变成0。

Select

select和channel配合使用,通过select可以监听多个channel的I/O读写事件。

如果没有default分支,select会阻塞在多个channel上,对多个channel进行监控。如果有default分支,多个channel都没有满足,则执行default分支。

Context

Golang的Context称之为上下文,用来跟踪goroutine关系链,传递通知,达到控制他们的目的。主要用法是,传递取消信号,传递数据。

下面是一个传递取消信号的使用过程,首先context.Background()返回一个空的Context,一般用于整个context tree的根节点。context.WithCancel()返回一个ctx可取消的Sub Context,作为Run的参数传入goroutine,这样可以使用ctx跟踪这个Goroutine。cancel()调用Sub Context的取消函数,向关联的Goroutine发送一个"取消"通知。在Run函数中,接收ctx.Done的cancel通知,做相关清理后退出。

Mutext互斥锁

Mutext互斥锁在同一时间只被一个goroutine访问,不区分读写。有两个方法:Lock()和Unlock()。当一个goroutine申请了Lock(),那么另一个goroutine申请Lock()时会阻塞等待直到Unlock()释放锁。

multipart/form请求

multipart/form请求是http Post方法,可以发送文件和消息,在请求的Header中包含一个特殊头信息Content-Type: multipart/form-data; boundary=,boundary的值为随机计算生成的值,用于分隔上传多个form-data的间隔。

工作池

worker pool就是线程池thread pool,在go中对应的就是goroutine协程。在线程池模型中,包括:任务队列,已完成任务队列和线程池。完成任务队列,根据实际情况判断是否需要。

代码

当命令行执行daemon命令时,执行server方法,启动daemon进程。

可以看到:

1,context.Background()作为协程树的根

2,context.WithCancel创建ctx一个可取消的sub context,

3,go pool.Run(ctx)开启一个协程,入参ctx。

4,监听进程信号sigs,关闭时执行cancel()取消函数,所有的goroutine都会同步收到这一取消信号。

启动线程池

Pool中包含:mutex互斥锁,tasks任务队列,WaitGroup用于等待所有线程池结束。taskQueue记录线程池中的任务。stopTaskID保存停止的任务。

在Run方法中,开启定时器,每10秒发送一个信号。开启workersCount数量的线程池,每开启一个waitGroup计数器加1。监听ctx.Done()取消信号,监听定时器信号,定时查询boltdb中的任务执行分发任务。

分发任务

申请排斥锁,防止多次分发相同任务,task发送给pool的tasks缓冲channel,同时在taskQueue中记录任务。

线程池

当线程退出时,执行p.wg.Done()减少waitGroup计数器数量。

这里监听四个channel:

1,p.stopCmd,当命令行停止任务时,在p.stopTaskID记录停止任务的任务ID

2,p.tasks,读取任务队列中一条任务进行处理。

3,p.results,读取任务返回的结果消息进行处理。这里申请了一个p.Lock()锁,保证前一个结果消息处理结束,再处理下一个结果消息,防止结果数据更新错误。

4,ctx.Done(),当根ctx执行取消,接收取消信号,当前线程能出。

任务处理

这里可以看到:

1,每一个业务处理过程中,都有一个判断ctx.Done(),是否协程被取消。取消则退出。

2,每一个业务处理过程中,都有一个判断sliceutil.HasUint64(p.stopTaskID, task.ID),任务是否在停止任务的slice中,有则退出。因为我们可以通过命令行暂停某一任务的执行,而在pool中无法知道某一任务是哪一个goroutine协程在执行的,所以这里通过查询并判断是否是自己在执行任务。

3,ProcessTask中会查询并等待task.StartKey有数据时再继续业务逻辑,因为LoadTaskData执行后发送到results channel消息处理会有时间延时,所以等待LoadTaskData results中的startkey更新后再继续。

4,在ProcessTask中,读取Boltdb中的图片数据时,因为通过startKey, endKey来进行seek偏移读取数据的,而在boltdb中,key保存的是字节码,所以我们根据ID读取会存在数据变多的情况。比如我们读取整型的749-751会返回749,75,750。

5,批量上传文件PostFiles

批量上传

使用mime/multipart库上传文件的基本过程

1,创建http client,ai.Requester.Client = http.DefaultClient

2,生成请求体body,通过multipart.NewWriter创建一个multipart写接口,将formdata数据和文件写入到body缓冲区中。CreateFormFile在字段名为"file"字段中添加一个文件,io.copy将源文件数据写入到CreateFormFile的文件中。writer.WriteField将k/v写入上传数据中。

2,创建http request,req, err = http.NewRequest(ar.Method, URL.String(), body)

3,添加request头信息,req.Header.Set("Content-Type", writer.FormDataContentType()),writer会自动生成文件之间的分隔字符。

4,发送请求,response, err := r.Client.Do(req)

执行