Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions common/monitoring/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ func (metric *Metric) AddTag(tagName string, value string) {
metric.tags = append(metric.tags, Tag{name: tagName, value: value})
}

// Values for the "result" tag that AddResult attaches to a metric.
const (
	// SUCCESS marks a measured operation that completed without error.
	SUCCESS = "success"
	// ERROR marks a measured operation that failed.
	ERROR = "error"
	// TIMEOUT presumably marks an operation that did not finish in time.
	// NOTE(review): no caller is visible here; confirm the intended use.
	TIMEOUT = "timeout"
	// CANCELLED marks a measured operation that was cancelled before it
	// could report its own outcome.
	CANCELLED = "cancelled"
)

// AddResult records the outcome of the measured operation on the metric by
// adding a "result" tag whose value is result. Callers normally pass one of
// the result constants (ERROR, SUCCESS, CANCELLED, TIMEOUT), but any string
// is accepted and forwarded unchanged to AddTag.
func (metric *Metric) AddResult(result string) {
	const resultTagName = "result"

	metric.AddTag(resultTagName, result)
}

func (metric *Metric) setField(fieldName string, field any) {
if metric.fields == nil {
metric.fields = make(FieldsType)
Expand Down
10 changes: 9 additions & 1 deletion core/environment/transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ package environment
import (
"errors"

"github.com/AliceO2Group/Control/core/protos"
"github.com/AliceO2Group/Control/common/monitoring"
pb "github.com/AliceO2Group/Control/core/protos"
"github.com/AliceO2Group/Control/core/task"
)

Expand Down Expand Up @@ -74,3 +75,10 @@ func (t baseTransition) check() (err error) {
// eventName returns the transition's name, i.e. the value stored in t.name.
func (t baseTransition) eventName() string {
return t.name
}

// transitionDoMetric builds the "transition_do" metric for this transition.
// The metric carries two tags: "transition" with the transition's name and
// "envId" with the string form of env's ID. The metric is returned by value;
// callers take its address to time the transition and add a result tag.
//
// env must be usable for env.Id(); no nil check is performed here.
func (t baseTransition) transitionDoMetric(env *Environment) monitoring.Metric {
	m := monitoring.NewMetric("transition_do")
	m.AddTag("transition", t.name)

	envID := env.Id().String()
	m.AddTag("envId", envID)

	return m
}
6 changes: 6 additions & 0 deletions core/environment/transition_configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/AliceO2Group/Control/core/workflow"

"github.com/AliceO2Group/Control/common/event"
"github.com/AliceO2Group/Control/common/monitoring"
"github.com/AliceO2Group/Control/core/task"
"github.com/AliceO2Group/Control/core/task/taskop"
)
Expand All @@ -52,6 +53,9 @@ func (t ConfigureTransition) do(env *Environment) (err error) {
return errors.New("cannot transition in NIL environment")
}

metric := t.transitionDoMetric(env)
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

wf := env.Workflow()

activeTasks := workflow.GetActiveTasks(wf)
Expand All @@ -64,9 +68,11 @@ func (t ConfigureTransition) do(env *Environment) (err error) {
incomingEv := <-env.stateChangedCh
// If some tasks failed to transition
if tasksStateErrors := incomingEv.GetTasksStateChangedError(); tasksStateErrors != nil {
metric.AddResult(monitoring.ERROR)
return tasksStateErrors
}

env.sendEnvironmentEvent(&event.EnvironmentEvent{EnvironmentID: env.Id().String(), State: "CONFIGURED"})
metric.AddResult(monitoring.SUCCESS)
return
}
6 changes: 6 additions & 0 deletions core/environment/transition_deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/AliceO2Group/Control/common/event"
"github.com/AliceO2Group/Control/common/logger/infologger"
"github.com/AliceO2Group/Control/common/monitoring"
"github.com/AliceO2Group/Control/core/task"
"github.com/AliceO2Group/Control/core/task/sm"
"github.com/AliceO2Group/Control/core/task/taskop"
Expand Down Expand Up @@ -66,6 +67,9 @@ func (t DeployTransition) do(env *Environment) (err error) {
return errors.New("cannot transition in NIL environment")
}

metric := t.transitionDoMetric(env)
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

wf := env.Workflow()

// Skip cleanup for anything other than readout-dataflow
Expand Down Expand Up @@ -347,10 +351,12 @@ func (t DeployTransition) do(env *Environment) (err error) {
log.WithField("level", infologger.IL_Ops).
WithField("partition", env.Id().String()).
Error(err)
metric.AddResult(monitoring.ERROR)
return
}

env.sendEnvironmentEvent(&event.EnvironmentEvent{EnvironmentID: env.Id().String(), State: "DEPLOYED"})
metric.AddResult(monitoring.SUCCESS)
return
}

Expand Down
4 changes: 4 additions & 0 deletions core/environment/transition_goerror.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package environment

import (
"github.com/AliceO2Group/Control/common/monitoring"
"github.com/AliceO2Group/Control/core/controlcommands"
"github.com/AliceO2Group/Control/core/task"
"github.com/AliceO2Group/Control/core/task/sm"
Expand All @@ -44,6 +45,8 @@ type GoErrorTransition struct {
}

func (t GoErrorTransition) do(env *Environment) (err error) {
metric := t.transitionDoMetric(env)
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

// we stop all tasks which are in RUNNING
toStop := env.Workflow().GetTasks().Filtered(func(t *task.Task) bool {
Expand Down Expand Up @@ -72,5 +75,6 @@ func (t GoErrorTransition) do(env *Environment) (err error) {
<-env.stateChangedCh
}

metric.AddResult(monitoring.SUCCESS)
return
}
6 changes: 6 additions & 0 deletions core/environment/transition_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"errors"

"github.com/AliceO2Group/Control/common/event"
"github.com/AliceO2Group/Control/common/monitoring"
"github.com/AliceO2Group/Control/core/task"
"github.com/AliceO2Group/Control/core/task/sm"
"github.com/AliceO2Group/Control/core/workflow"
Expand All @@ -51,6 +52,9 @@ func (t ResetTransition) do(env *Environment) (err error) {
return errors.New("cannot transition in NIL environment")
}

metric := t.transitionDoMetric(env)
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

taskmanMessage := task.NewTransitionTaskMessage(
workflow.GetActiveTasks(env.Workflow()),
sm.CONFIGURED.String(),
Expand All @@ -64,9 +68,11 @@ func (t ResetTransition) do(env *Environment) (err error) {
incomingEv := <-env.stateChangedCh
// If some tasks failed to transition
if tasksStateErrors := incomingEv.GetTasksStateChangedError(); tasksStateErrors != nil {
metric.AddResult(monitoring.ERROR)
return tasksStateErrors
}

env.sendEnvironmentEvent(&event.EnvironmentEvent{EnvironmentID: env.Id().String(), State: "RESET"})
metric.AddResult(monitoring.SUCCESS)
return
}
6 changes: 6 additions & 0 deletions core/environment/transition_startactivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/AliceO2Group/Control/common/event"
"github.com/AliceO2Group/Control/common/logger/infologger"
"github.com/AliceO2Group/Control/common/monitoring"
"github.com/AliceO2Group/Control/core/controlcommands"
"github.com/AliceO2Group/Control/core/task"
"github.com/iancoleman/strcase"
Expand Down Expand Up @@ -72,6 +73,9 @@ func (t StartActivityTransition) do(env *Environment) (err error) {
return errors.New("cannot transition in NIL environment")
}

metric := t.transitionDoMetric(env)
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

runNumber := env.currentRunNumber

log.WithField(infologger.Run, runNumber).
Expand Down Expand Up @@ -120,6 +124,7 @@ func (t StartActivityTransition) do(env *Environment) (err error) {
incomingEv := <-env.stateChangedCh
// If some tasks failed to transition
if tasksStateErrors := incomingEv.GetTasksStateChangedError(); tasksStateErrors != nil {
metric.AddResult(monitoring.ERROR)
return tasksStateErrors
}

Expand All @@ -133,5 +138,6 @@ func (t StartActivityTransition) do(env *Environment) (err error) {
Run: env.currentRunNumber,
})

metric.AddResult(monitoring.SUCCESS)
return
}
6 changes: 6 additions & 0 deletions core/environment/transition_stopactivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/AliceO2Group/Control/common/event"
"github.com/AliceO2Group/Control/common/logger/infologger"
"github.com/AliceO2Group/Control/common/monitoring"
"github.com/AliceO2Group/Control/core/controlcommands"
"github.com/AliceO2Group/Control/core/task"
"github.com/AliceO2Group/Control/core/task/sm"
Expand Down Expand Up @@ -63,6 +64,9 @@ func (t StopActivityTransition) do(env *Environment) (err error) {
return errors.New("cannot transition in NIL environment")
}

metric := t.transitionDoMetric(env)
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

log.WithField(infologger.Run, env.currentRunNumber).
WithField("partition", env.Id().String()).
WithField(infologger.Level, infologger.IL_Support).
Expand Down Expand Up @@ -98,6 +102,7 @@ func (t StopActivityTransition) do(env *Environment) (err error) {
incomingEv := <-env.stateChangedCh
// If some tasks failed to transition
if tasksStateErrors := incomingEv.GetTasksStateChangedError(); tasksStateErrors != nil {
metric.AddResult(monitoring.ERROR)
return tasksStateErrors
}
env.sendEnvironmentEvent(&event.EnvironmentEvent{EnvironmentID: env.Id().String(), State: "CONFIGURED"})
Expand All @@ -107,5 +112,6 @@ func (t StopActivityTransition) do(env *Environment) (err error) {
WithField(infologger.Level, infologger.IL_Support).
Info("run stopped")

metric.AddResult(monitoring.SUCCESS)
return
}
2 changes: 1 addition & 1 deletion core/environment/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type WorkflowPublicInfo struct {
func parseWorkflowPublicInfo(workflowExpr string) (WorkflowPublicInfo, error) {
repoManager := the.RepoManager()

resolvedWorkflowPath, _, err := repoManager.GetWorkflow(workflowExpr) //Will fail if repo unknown
resolvedWorkflowPath, _, err := repoManager.GetWorkflow(workflowExpr) // Will fail if repo unknown
if err != nil {
return WorkflowPublicInfo{}, err
}
Expand Down
34 changes: 22 additions & 12 deletions core/workflow/callable/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,23 @@ func (s Calls) AwaitAll() map[*Call]error {
return errs
}

// callableMetric creates a metric called name and tags it with the details
// that identify this call: its run type ("runtype"), its name ("name"), the
// trigger from its traits ("trigger") and the environment ID reported by its
// parent role ("envId"). The metric is returned by value.
func (c *Call) callableMetric(name string) monitoring.Metric {
	m := monitoring.NewMetric(name)

	runType := c.getRunTypeTag()
	m.AddTag("runtype", runType)

	callName := c.GetName()
	m.AddTag("name", callName)

	trigger := c.GetTraits().Trigger
	m.AddTag("trigger", trigger)

	envID := c.parentRole.GetEnvironmentId().String()
	m.AddTag("envId", envID)

	return m
}

func (c *Call) Call() error {
log.WithField("trigger", c.Traits.Trigger).
WithField("await", c.Traits.Await).
WithField("partition", c.parentRole.GetEnvironmentId().String()).
WithField("level", infologger.IL_Devel).
Debugf("calling hook function %s", c.Func)

metric := c.newMetric("callablecall")
metric := c.callableMetric("callablecall")
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

the.EventWriterWithTopic(topic.Call).WriteEvent(&evpb.Ev_CallEvent{
Expand Down Expand Up @@ -178,6 +187,7 @@ func (c *Call) Call() error {
EnvironmentId: c.parentRole.GetEnvironmentId().String(),
})

metric.AddResult(monitoring.ERROR)
return err
}
if len(returnVar) > 0 {
Expand Down Expand Up @@ -206,6 +216,7 @@ func (c *Call) Call() error {
EnvironmentId: c.parentRole.GetEnvironmentId().String(),
})

metric.AddResult(monitoring.ERROR)
return errors.New(errMsg)
}

Expand All @@ -224,32 +235,31 @@ func (c *Call) Call() error {
EnvironmentId: c.parentRole.GetEnvironmentId().String(),
})

metric.AddResult(monitoring.SUCCESS)
return nil
}

// newMetric creates a metric called name and tags it with this call's run
// type ("runtype"), name ("name"), trigger ("trigger") and the environment ID
// reported by its parent role ("envId"). The metric is returned by value.
func (c *Call) newMetric(name string) monitoring.Metric {
	m := monitoring.NewMetric(name)

	runType := c.getRunTypeTag()
	m.AddTag("runtype", runType)

	callName := c.GetName()
	m.AddTag("name", callName)

	trigger := c.GetTraits().Trigger
	m.AddTag("trigger", trigger)

	envID := c.parentRole.GetEnvironmentId().String()
	m.AddTag("envId", envID)

	return m
}

func (c *Call) Start() {
c.await = make(chan error)
ctx, cancel := context.WithCancel(context.Background())
c.awaitCancel = cancel
go func() {
metric := c.newMetric("callablewrapped")
metric := c.callableMetric("callablewrapped")
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

callId := fmt.Sprintf("hook:%s:%s", c.GetTraits().Trigger, c.GetName())
log.Debugf("%s started", callId)
defer utils.TimeTrack(time.Now(), callId, log.WithPrefix("callable"))
err := c.Call()
select {
case c.await <- c.Call():
case c.await <- err:
if err == nil {
metric.AddResult(monitoring.SUCCESS)
} else {
metric.AddResult(monitoring.ERROR)
}
case <-ctx.Done():
metric.AddResult(monitoring.CANCELLED)
log.Debugf("%s cancelled", callId)
}
close(c.await)
Expand Down
Loading