下面我能看下cadence的helloword例子的源码,它包含两个文件,第一个文件是启动程序,第二个定义了workflow和activity
package mainimport ("flag""time""github.com/pborman/uuid""go.uber.org/cadence/client""go.uber.org/cadence/worker"common "exp1/common")// This needs to be done as part of a bootstrap step when the process starts.// The workers are supposed to be long running.func startWorkers(h *common.SampleHelper) {// Configure worker options.workerOptions := worker.Options{MetricsScope: h.WorkerMetricScope,Logger: h.Logger,FeatureFlags: client.FeatureFlags{WorkflowExecutionAlreadyCompletedErrorEnabled: true,},}h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)}func startShadower(h *common.SampleHelper) {workerOptions := worker.Options{MetricsScope: h.WorkerMetricScope,Logger: h.Logger,EnableShadowWorker: true,ShadowOptions: worker.ShadowOptions{WorkflowTypes: []string{helloWorldWorkflowName},WorkflowStatus: []string{"Completed"},ExitCondition: worker.ShadowExitCondition{ShadowCount: 10,},},}h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)}func startWorkflow(h *common.SampleHelper) {workflowOptions := client.StartWorkflowOptions{ID: "helloworld_" + uuid.New(),TaskList: ApplicationName,ExecutionStartToCloseTimeout: time.Minute,DecisionTaskStartToCloseTimeout: time.Minute,}h.StartWorkflow(workflowOptions, helloWorldWorkflowName, "Cadence")}func registerWorkflowAndActivity(h *common.SampleHelper,) {h.RegisterWorkflowWithAlias(helloWorldWorkflow, helloWorldWorkflowName)h.RegisterActivity(helloWorldActivity)}func main() {var mode stringflag.StringVar(&mode, "m", "trigger", "Mode is worker, trigger or shadower.")flag.Parse()var h common.SampleHelperh.SetupServiceConfig()switch mode {case "worker":registerWorkflowAndActivity(&h)startWorkers(&h)// The workers are supposed to be long running process that should not exit.// Use select{} to block indefinitely for samples, you can quit by CMD+C.select {}case "shadower":registerWorkflowAndActivity(&h)startShadower(&h)select {}case "trigger":startWorkflow(&h)}}
package mainimport ("context""time""go.uber.org/cadence/activity""go.uber.org/cadence/workflow""go.uber.org/zap")/*** This is the hello world workflow sample.*/// ApplicationName is the task list for this sampleconst ApplicationName = "helloWorldGroup"const helloWorldWorkflowName = "helloWorldWorkflow"// helloWorkflow workflow deciderfunc helloWorldWorkflow(ctx workflow.Context, name string) error {ao := workflow.ActivityOptions{ScheduleToStartTimeout: time.Minute,StartToCloseTimeout: time.Minute,HeartbeatTimeout: time.Second * 20,}ctx = workflow.WithActivityOptions(ctx, ao)logger := workflow.GetLogger(ctx)logger.Info("helloworld workflow started")var helloworldResult stringerr := workflow.ExecuteActivity(ctx, helloWorldActivity, name).Get(ctx, &helloworldResult)if err != nil {logger.Error("Activity failed.", zap.Error(err))return err}// Adding a new activity to the workflow will result in a non-determinstic change for the workflow// Please check https://cadenceworkflow.io/docs/go-client/workflow-versioning/ for more information//// Un-commenting the following code and the TestReplayWorkflowHistoryFromFile in replay_test.go// will fail due to the non-determinstic change//// If you have a completed workflow execution without the following code and run the// TestWorkflowShadowing in shadow_test.go or start the worker in shadow mode (using -m shadower)// those two shadowing check will also fail due to the non-deterministic change//// err := workflow.ExecuteActivity(ctx, helloWorldActivity, name).Get(ctx, &helloworldResult)// if err != nil {// logger.Error("Activity failed.", zap.Error(err))// return err// }logger.Info("Workflow completed.", zap.String("Result", helloworldResult))return nil}func helloWorldActivity(ctx context.Context, name string) (string, error) {logger := activity.GetLogger(ctx)logger.Info("helloworld activity started")return "Hello " + name + "!", nil}
启动文件里首先定义了相关的各种配置,包括日志配和监控配置,监控采用的是prometheus。设置配置的函数是h.SetupServiceConfig(),位于common/sample_helper.go
type (// SampleHelper class for workflow sample helper.SampleHelper struct {Service workflowserviceclient.InterfaceWorkerMetricScope tally.ScopeServiceMetricScope tally.ScopeLogger *zap.LoggerConfig ConfigurationBuilder *WorkflowClientBuilderDataConverter encoded.DataConverterCtxPropagators []workflow.ContextPropagatorworkflowRegistries []registryOptionactivityRegistries []registryOptionTracer opentracing.TracerconfigFile string}
// Configuration for running samples.Configuration struct {DomainName string `yaml:"domain"`ServiceName string `yaml:"service"`HostNameAndPort string `yaml:"host"`Prometheus *prometheus.Configuration `yaml:"prometheus"`}
registryOption struct {registry interface{}alias string}
其中配置文件使用的是config/development.yaml
func (h *SampleHelper) SetupServiceConfig() {if h.configFile == "" {h.configFile = defaultConfigFileif err := yaml.Unmarshal(configData, &h.Config); err != nil {logger, err := zap.NewDevelopment()if h.Config.Prometheus != nil {reporter, err := h.Config.Prometheus.NewReporter(prometheus.ConfigurationOptions{Registry: prom.NewRegistry(),h.Builder = NewBuilder(logger).SetHostPort(h.Config.HostNameAndPort).SetDomain(h.Config.DomainName).SetMetricsScope(h.ServiceMetricScope).SetDataConverter(h.DataConverter).SetTracer(h.Tracer).SetContextPropagators(h.CtxPropagators)service, err := h.Builder.BuildServiceClient()h.Service = servicedomainClient, _ := h.Builder.BuildCadenceDomainClient()_, err = domainClient.Describe(context.Background(), h.Config.DomainName)h.workflowRegistries = make([]registryOption, 0, 1)h.activityRegistries = make([]registryOption, 0, 1)
const (defaultConfigFile = "config/development.yaml"
然后是注册workflow和activity 即registerWorkflowAndActivity(&h)
h.RegisterWorkflowWithAlias(helloWorldWorkflow, helloWorldWorkflowName)
common/sample_helper.go
func (h *SampleHelper) RegisterWorkflowWithAlias(workflow interface{}, alias string) {registryOption := registryOption{registry: workflow,alias: alias,}h.workflowRegistries = append(h.workflowRegistries, registryOption)h.RegisterActivity(helloWorldActivity)
func (h *SampleHelper) RegisterActivity(activity interface{}) {h.RegisterActivityWithAlias(activity, "")}
func (h *SampleHelper) RegisterActivityWithAlias(activity interface{}, alias string) {registryOption := registryOption{registry: activity,alias: alias,}h.activityRegistries = append(h.activityRegistries, registryOption)
前面workflow.go里定义我们的workflow和activity
func helloWorldWorkflow(ctx workflow.Context, name string) error {ao := workflow.ActivityOptions{ScheduleToStartTimeout: time.Minute,StartToCloseTimeout: time.Minute,HeartbeatTimeout: time.Second * 20,}err := workflow.ExecuteActivity(ctx, helloWorldActivity, name).Get(ctx, &helloworldResult)
func helloWorldActivity(ctx context.Context, name string) (string, error) {
go.uber.org/cadence@v0.19.1/workflow/workflow.go
func ExecuteActivity(ctx Context, activity interface{}, args ...interface{}) Future {return internal.ExecuteActivity(ctx, activity, args...)}
指定完成后就可以启动任务了startWorkers(&h)
workflowOptions := client.StartWorkflowOptions{ID: "helloworld_" + uuid.New(),TaskList: ApplicationName,ExecutionStartToCloseTimeout: time.Minute,DecisionTaskStartToCloseTimeout: time.Minute,}h.StartWorkflow(workflowOptions, helloWorldWorkflowName, "Cadence")
func startWorkers(h *common.SampleHelper) {workerOptions := worker.Options{MetricsScope: h.WorkerMetricScope,Logger: h.Logger,FeatureFlags: client.FeatureFlags{WorkflowExecutionAlreadyCompletedErrorEnabled: true,},}h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)
common/sample_helper.go
func (h *SampleHelper) StartWorkers(domainName string, groupName string, options worker.Options) {worker := worker.New(h.Service, domainName, groupName, options)h.registerWorkflowAndActivity(worker)err := worker.Start()
func (h *SampleHelper) registerWorkflowAndActivity(worker worker.Worker) {for _, w := range h.workflowRegistries {if len(w.alias) == 0 {worker.RegisterWorkflow(w.registry)} else {worker.RegisterWorkflowWithOptions(w.registry, workflow.RegisterOptions{Name: w.alias})}}for _, act := range h.activityRegistries {if len(act.alias) == 0 {worker.RegisterActivity(act.registry)} else {worker.RegisterActivityWithOptions(act.registry, activity.RegisterOptions{Name: act.alias})}}
go.uber.org/cadence@v0.19.1/worker/worker.go
func New(service workflowserviceclient.Interface,domain string,taskList string,options Options,) Worker {return internal.NewWorker(service, domain, taskList, options)}
type (Worker hosts workflow and activity implementations.// Use worker.New(...) to create an instance.Worker interface {Registry// Start starts the worker in a non-blocking fashionStart() error// Run is a blocking start and cleans up resources when killed// returns error only if it fails to start the workerRun() error// Stop cleans up any resources opened by workerStop()}