在前面文章中介绍了golang开发rpc命令行工具,今天继续后续功能的介绍。
场景描述:
1,客户端运行一个daemon程序,执行文件上传任务,使用boltdb数据库记录任务执行状态。支持继续上传未完成文件。
2,客户端命令行通过rpc调用客户端daemon程序API,进行文件上传任务管理,包括任务创建,查看,启停。
通过这个功能开发,我们将golang并发编程的goroutine协程,channel通道,context上下文, Mutex互斥锁, WaitGroup协程同步等技术得到应用。
基本知识:
WaiGroup
等待一组协程执行完成后继续向下执行,WaitGroup内部有一个计数器,从0开始计数,有3个方法:Add(),Done(), Wait()。Add()添加计数,Done()减掉一个计数,Wait()执行阻塞,直到WaitGroup数量变成0。
Select
select和channel配合使用,通过select可以监听多个channel的I/O读写事件。
如果没有default分支,select会阻塞在多个channel上,对多个channel进行监控。如果有default分支,多个channel都没有满足,则执行default分支。
Context
Golang的Context称之为上下文,用来跟踪goroutine关系链,传递通知,达到控制他们的目的。主要用法是,传递取消信号,传递数据。
下面是一个传递取消信号的使用过程,首先context.Background()返回一个空的Context,一般用于整个context tree的根节点。context.WithCancel()返回一个ctx可取消的Sub Context,作为Run的参数传入goroutine,这样可以使用ctx跟踪这个Goroutine。cancel()调用Sub Context的取消函数,向关联的Goroutine发送一个"取消"通知。在Run函数中,接收ctx.Done的cancel通知,做相关清理后退出。
Mutext互斥锁
Mutext互斥锁在同一时间只被一个goroutine访问,不区分读写。有两个方法:Lock()和Unlock()。当一个goroutine申请了Lock(),那么另一个goroutine申请Lock()时会阻塞等待直到Unlock()释放锁。
multipart/form请求
multipart/form请求是http Post方法,可以发送文件和消息,在请求的Header中包含一个特殊头信息Content-Type: multipart/form-data; boundary=,boundary的值为随机计算生成的值,用于分隔上传多个form-data的间隔。
工作池
worker pool就是线程池thread pool,在go中对应的就是goroutine协程。在线程池模型中,包括:任务队列,已完成任务队列和线程池。完成任务队列,根据实际情况判断是否需要。
代码
当命令行执行daemon命令时,执行server方法,启动daemon进程。
可以看到:
1,context.Background()作为协程树的根
2,context.WithCancel创建ctx一个可取消的sub context,
3,go pool.Run(ctx)开启一个协程,入参ctx。
4,监听进程信号sigs,关闭时执行cancel()取消函数,所有的goroutine都会同步收到这一取消信号。
启动线程池
Pool中包含:mutex互斥锁,tasks任务队列,WaitGroup用于等待所有线程池结束。taskQueue记录线程池中的任务。stopTaskID保存停止的任务。
在Run方法中,开启定时器,每10秒发送一个信号。开启workersCount数量的线程池,每开启一个waitGroup计数器加1。监听ctx.Done()取消信号,监听定时器信号,定时查询boltdb中的任务执行分发任务。
分发任务
申请排斥锁,防止多次分发相同任务,task发送给pool的tasks缓冲channel,同时在taskQueue中记录任务。
线程池
当线程退出时,执行p.wg.Done()减少waitGroup计数器数量。
这里监听四个channel:
1,p.stopCmd,当命令行停止任务时,在p.stopTaskID记录停止任务的任务ID
2,p.tasks,读取任务队列中一条任务进行处理。
3,p.results,读取任务返回的结果消息进行处理。这里申请了一个p.Lock()锁,保证前一个结果消息处理结束,再处理下一个结果消息,防止结果数据更新错误。
4,ctx.Done(),当根ctx执行取消,接收取消信号,当前线程能出。
任务处理
这里可以看到:
1,每一个业务处理过程中,都有一个判断ctx.Done(),是否协程被取消。取消则退出。
2,每一个业务处理过程中,都有一个判断sliceutil.HasUint64(p.stopTaskID, task.ID),任务是否在停止任务的slice中,有则退出。因为我们可以通过命令行暂停某一任务的执行,而在pool中无法知道某一任务是哪一个goroutine协程在执行的,所以这里通过查询并判断是否是自己在执行任务。
3,ProcessTask中会查询并等待task.StartKey有数据时再继续业务逻辑,因为LoadTaskData执行后发送到results channel消息处理会有时间延时,所以等待LoadTaskData results中的startkey更新后再继续。
4,在ProcessTask中,读取Boltdb中的图片数据时,因为通过startKey, endKey来进行seek偏移读取数据的,而在boltdb中,key保存的是字节码,所以我们根据ID读取会存在数据变多的情况。比如我们读取整型的749-751会返回749,75,750。
5,批量上传文件PostFiles
批量上传
使用mime/multipart库上传文件的基本过程
1,创建http client,ai.Requester.Client = http.DefaultClient
2,生成请求体body,通过multipart.NewWriter创建一个multipart写接口,将formdata数据和文件写入到body缓冲区中。CreateFormFile在字段名为"file"字段中添加一个文件,io.copy将源文件数据写入到CreateFormFile的文件中。writer.WriteField将k/v写入上传数据中。
2,创建http request,req, err = http.NewRequest(ar.Method, URL.String(), body)
3,添加request头信息,req.Header.Set("Content-Type", writer.FormDataContentType()),writer会自动生成文件之间的分隔字符。
4,发送请求,response, err := r.Client.Do(req)
执行