目录
prometheus branch: master
1. 初始化rule manager
根据prometheus的evaluation_interval,规则文件,以及external_labels来更新。
1. 解析规则文件,循环遍历每个规则文件,读取groups。读取规则文件的group
2. 循环遍历group里面的规则,解析每个规则,如果规则有设置interval那么group interval取值为规则的interval,否则直接取evaluation_interval。
3. 分组(group)格式:规则文件名称;group名称,比如存在xx.yaml的规则文件,group name为yy,那么最终的分组为xx.yaml;yy
4. 循环遍历得到的group,先删掉manegr里面的该group。判断新旧group是否相等,相等则跳过,否则stop旧的group,并把旧group状态复制给新的group
5. 运行新的group,根据规则文件名称和group name以及evaluation_interval 得到evalTimestamp,再根据evalTimestamp评估告警规则。按照evaluationInterval间隔执行,每执行一次,计算missed=((now-evalTimestamp)/evaluationInterval) - 1, 重置evalTimestamp=(evalTimestamp+missed+1)*evaluationInterval, 这个直接影响告警规则评估Eval
6. 评估告警规则并发送评估group的规则
init rule manager
代码位置:初始化rule manager
queryEngine = promql.NewEngine(opts)
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
TSDB: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc: sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay),
})
rule manager状态更新
func(cfg *config.Config) error {
// Get all rule files matching the configuration paths.
var files []string
for _, pat := range cfg.RuleFiles {
fs, err := filepath.Glob(pat)
if err != nil {
// The only error can be a bad pattern.
return errors.Wrapf(err, "error retrieving rule files for %s", pat)
}
files = append(files, fs...)
}
return ruleManager.Update(
time.Duration(cfg.GlobalConfig.EvaluationInterval),
files,
cfg.GlobalConfig.ExternalLabels,
)
}
update
// Update the rule manager's state as the config requires. If
// loading the new rules failed the old rule set is restored.
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels) error {
m.mtx.Lock()
defer m.mtx.Unlock()
groups, errs := m.LoadGroups(interval, externalLabels, files...)
...
var wg sync.WaitGroup
for _, newg := range groups {
// If there is an old group with the same identifier,
// check if new group equals with the old group, if yes then skip it.
// If not equals, stop it and wait for it to finish the current iteration.
// Then copy it into the new group.
gn := groupKey(newg.file, newg.name)
oldg, ok := m.groups[gn]
delete(m.groups, gn)
if ok && oldg.Equals(newg) {
groups[gn] = oldg
continue
}
wg.Add(1)
go func(newg *Group) {
if ok {
oldg.stop()
newg.CopyState(oldg)
}
go func() {
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<-m.block
newg.run(m.opts.Context)
}()
wg.Done()
}(newg)
}
// Stop remaining old groups.
...
wg.Wait()
m.groups = groups
return nil
}
load groups
// LoadGroups reads groups from a list of files.
func (m *Manager) LoadGroups(
interval time.Duration, externalLabels labels.Labels, filenames ...string,
) (map[string]*Group, []error) {
groups := make(map[string]*Group)
shouldRestore := !m.restored
for _, fn := range filenames {
rgs, errs := rulefmt.ParseFile(fn)
if errs != nil {
return nil, errs
}
for _, rg := range rgs.Groups {
itv := interval
if rg.Interval != 0 {
itv = time.Duration(rg.Interval)
}
rules := make([]Rule, 0, len(rg.Rules))
for _, r := range rg.Rules {
expr, err := parser.ParseExpr(r.Expr.Value)
if err != nil {
return nil, []error{errors.Wrap(err, fn)}
}
if r.Alert.Value != "" {
rules = append(rules, NewAlertingRule(
r.Alert.Value,
expr,
time.Duration(r.For),
labels.FromMap(r.Labels),
labels.FromMap(r.Annotations),
externalLabels,
m.restored,
log.With(m.logger, "alert", r.Alert),
))
continue
}
...
}
groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
Name: rg.Name,
File: fn,
Interval: itv,
Rules: rules,
ShouldRestore: shouldRestore,
Opts: m.opts,
done: m.done,
})
}
}
return groups, nil
}
group key
// Group names need not be unique across filenames.
func groupKey(file, name string) string {
return file + ";" + name
}
run new group
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels) error {
...
wg.Add(1)
go func(newg *Group) {
if ok {
oldg.stop()
newg.CopyState(oldg)
}
go func() {
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<-m.block
newg.run(m.opts.Context)
}()
wg.Done()
}(newg)
...
}
evalTimestamp 最终的结果要比当前时间晚几秒或者几十秒,具体几秒取决于告警规则名称和group name。
time.Untils(evalTimestamp) 是evalTimestamp-当前时间
按照evaluationInterval间隔执行,每执行一次,计算missed=((now-evalTimestamp)/evaluationInterval) - 1
重置evalTimestamp=(evalTimestamp+missed+1)*evaluationInterval, 这个直接影响告警规则评估Eval
func (g *Group) run(ctx context.Context) {
defer close(g.terminated)
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.evalTimestamp().Add(g.interval)
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
return
}
ctx = promql.NewOriginContext(ctx, map[string]interface{}{
"ruleGroup": map[string]string{
"file": g.File(),
"name": g.Name(),
},
})
iter := func() {
g.metrics.iterationsScheduled.Inc()
start := time.Now()
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.metrics.iterationDuration.Observe(timeSinceStart.Seconds())
g.setEvaluationDuration(timeSinceStart)
g.setEvaluationTimestamp(start)
}
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.interval` occurrence.
tick := time.NewTicker(g.interval)
defer tick.Stop()
defer func() {
if !g.markStale {
return
}
go func(now time.Time) {
for _, rule := range g.seriesInPreviousEval {
for _, r := range rule {
g.staleSeries = append(g.staleSeries, r)
}
}
// That can be garbage collected at this point.
g.seriesInPreviousEval = nil
// Wait for 2 intervals to give the opportunity to renamed rules
// to insert new series in the tsdb. At this point if there is a
// renamed rule, it should already be started.
select {
case <-g.managerDone:
case <-time.After(2 * g.interval):
g.cleanupStaleSeries(now)
}
}(time.Now())
}()
iter()
if g.shouldRestore {
// If we have to restore, we wait for another Eval to finish.
// The reason behind this is, during first eval (or before it)
// we might not have enough data scraped, and recording rules would not
// have updated the latest values, on which some alerts might depend.
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.iterationsMissed.Add(float64(missed))
g.metrics.iterationsScheduled.Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
iter()
}
g.RestoreForState(time.Now())
g.shouldRestore = false
}
for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.iterationsMissed.Add(float64(missed))
g.metrics.iterationsScheduled.Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
iter()
}
}
}
}
计算evalTimestamp
通过规则group的file name和guoup name 取hash % evaluation_interval ,在evaluation_interval 固定的情况下,file name或者group name越长,得到的时间越大。
// evalTimestamp returns the immediately preceding consistently slotted evaluation time.
func (g *Group) evalTimestamp() time.Time {
var (
offset = int64(g.hash() % uint64(g.interval))
now = time.Now().UnixNano()
adjNow = now - offset
base = adjNow - (adjNow % int64(g.interval))
)
return time.Unix(0, base+offset).UTC()
}
评估规则并发送
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
for i, rule := range g.rules {
...
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
if err != nil {
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
if _, ok := err.(promql.ErrQueryCanceled); !ok {
level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
}
g.metrics.evalFailures.WithLabelValues(groupKey(g.File(), g.Name())).Inc()
return
}
...
}
评估
如果alert不是inactive,则修改为inactive并且给resolved赋值为evaluationInterval。
resolvedRetention=15m
通过query查询prometheus 里面的数据来进行评估。这里调用QueryFunc.
// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
res, err := query(ctx, r.vector.String(), ts)
if err != nil {
r.SetHealth(HealthBad)
r.SetLastError(err)
return nil, err
}
r.mtx.Lock()
defer r.mtx.Unlock()
// Create pending alerts for any new vector elements in the alert expression
// or update the expression value for existing elements.
resultFPs := map[uint64]struct{}{}
var vec promql.Vector
var alerts = make(map[uint64]*Alert, len(res))
for _, smpl := range res {
// Provide the alert information to the template.
l := make(map[string]string, len(smpl.Metric))
for _, lbl := range smpl.Metric {
l[lbl.Name] = lbl.Value
}
...
lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricName)
....
lbs := lb.Labels()
h := lbs.Hash()
resultFPs[h] = struct{}{}
if _, ok := alerts[h]; ok {
err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
// We have already acquired the lock above hence using SetHealth and
// SetLastError will deadlock.
r.health = HealthBad
r.lastError = err
return nil, err
}
alerts[h] = &Alert{
Labels: lbs,
Annotations: annotations,
ActiveAt: ts,
State: StatePending,
Value: smpl.V,
}
}
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.active[h]; ok && alert.State != StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
continue
}
r.active[h] = a
}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.active {
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
delete(r.active, fp)
}
if a.State != StateInactive {
a.State = StateInactive
a.ResolvedAt = ts
}
continue
}
...
}
// We have already acquired the lock above hence using SetHealth and
// SetLastError will deadlock.
r.health = HealthGood
r.lastError = err
return vec, nil
}
发送
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
...
if ar, ok := rule.(*AlertingRule); ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
...
}
调用NotifyFunc
这里就会调用manager的NotifyFunc。
代码里面的resendDelay对应的是prometheus的rules.alert.resend-delay参数,这个参数默认是1m。
也就是说如果没有修改过rules.alert.resend-delay的值,那么alert的ValidUntil 值等于evalTimestamp + 4*1m。ValidUntil最终赋值给了alert的EndsAt字段(如果当前alert不是resolved)。见main.go的sendAlerts()
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if alert.needsSending(ts, resendDelay) {
alert.LastSentAt = ts
// Allow for two Eval or Alertmanager send failures.
delta := resendDelay
if interval > resendDelay {
delta = interval
}
alert.ValidUntil = ts.Add(4 * delta)
anew := *alert
alerts = append(alerts, &anew)
}
})
notifyFunc(ctx, r.vector.String(), alerts...)
}
needsSending
func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
if a.State == StatePending {
return false
}
// if an alert has been resolved since the last send, resend it
if a.ResolvedAt.After(a.LastSentAt) {
return true
}
return a.LastSentAt.Add(resendDelay).Before(ts)
}
QueryFunc
QueryFunc,这个func最终会在评估告警规则的时候用来从prometheus tsdb查询metric.
// EngineQueryFunc returns a new query function that executes instant queries against
// the given engine.
// It converts scalar into vector results.
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
q, err := engine.NewInstantQuery(q, qs, t)
if err != nil {
return nil, err
}
res := q.Exec(ctx)
if res.Err != nil {
return nil, res.Err
}
switch v := res.Value.(type) {
case promql.Vector:
return v, nil
case promql.Scalar:
return promql.Vector{promql.Sample{
Point: promql.Point(v),
Metric: labels.Labels{},
}}, nil
default:
return nil, errors.New("rule result is not a vector or scalar")
}
}
}
exec query
QueryFunc里面的Exec就是根据表达式查询prometheus。
// exec executes the query.
//
// At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine.
func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, w storage.Warnings, err error) {
...
defer func() {
ng.queryLoggerLock.RLock()
if l := ng.queryLogger; l != nil {
params := make(map[string]interface{}, 4)
params["query"] = q.q
if eq, ok := q.Statement().(*parser.EvalStmt); ok {
params["start"] = formatDate(eq.Start)
params["end"] = formatDate(eq.End)
// The step provided by the user is in seconds.
params["step"] = int64(eq.Interval / (time.Second / time.Nanosecond))
}
f := []interface{}{"params", params}
if err != nil {
f = append(f, "error", err)
}
f = append(f, "stats", stats.NewQueryStats(q.Stats()))
if span := opentracing.SpanFromContext(ctx); span != nil {
if spanCtx, ok := span.Context().(jaeger.SpanContext); ok {
f = append(f, "spanID", spanCtx.SpanID())
}
}
if origin := ctx.Value(queryOrigin{}); origin != nil {
for k, v := range origin.(map[string]interface{}) {
f = append(f, k, v)
}
}
....
}
...
}()
...
// The base context might already be canceled on the first iteration (e.g. during shutdown).
if err := contextDone(ctx, env); err != nil {
return nil, nil, err
}
switch s := q.Statement().(type) {
case *parser.EvalStmt:
return ng.execEvalStmt(ctx, q, s)
case parser.TestStmt:
return nil, nil, s(ctx)
}
panic(errors.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement()))
}
NotifyFunc
NotifyFunc, 这个func最终会用来推送alerts到AlertManager.
如果resolved时间非0,那么EndsAt为resolved时间,否则EndsAt为VaildUntil.即evalTimestamp+4*delta。
sendAlerts返回一个NotifyFunc。
type sender interface {
Send(alerts ...*notifier.Alert)
}
// sendAlerts implements the rules.NotifyFunc for a Notifier.
func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
var res []*notifier.Alert
for _, alert := range alerts {
a := ¬ifier.Alert{
StartsAt: alert.FiredAt,
Labels: alert.Labels,
Annotations: alert.Annotations,
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
} else {
// ValidUntil = 4 * rules.alert.resend-delay(默认为1m)
a.EndsAt = alert.ValidUntil
}
res = append(res, a)
}
if len(alerts) > 0 {
s.Send(res...)
}
}
}
初始化notify manager
notifierManager = notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))
运行notify manager
notify manager通知AlertManager
// Notifier.
notifierManager.Run(discoveryManagerNotify.SyncCh())
notifier run
// Run dispatches notifications continuously.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
for {
select {
case <-n.ctx.Done():
return
case ts := <-tsets:
n.reload(ts)
case <-n.more:
}
alerts := n.nextBatch()
if !n.sendAll(alerts...) {
n.metrics.dropped.Add(float64(len(alerts)))
}
// If the queue still has items left, kick off the next iteration.
if n.queueLen() > 0 {
n.setMore()
}
}
}
send all
1. 判断alerts的长度,如果长度为0,则说明已经全部发送完了。
2. 根据prometheus的版本v1或者v2把alerts生成payload。
3. 循环每个AlertManager,调用sendOne发送post请求到alrtmanager。
// sendAll sends the alerts to all configured Alertmanagers concurrently.
// It returns true if the alerts could be sent successfully to at least one Alertmanager.
func (n *Manager) sendAll(alerts ...*Alert) bool {
if len(alerts) == 0 {
return true
}
begin := time.Now()
// v1Payload and v2Payload represent 'alerts' marshaled for Alertmanager API
// v1 or v2. Marshaling happens below. Reference here is for caching between
// for loop iterations.
...
for _, ams := range amSets {
var (
payload []byte
err error
)
ams.mtx.RLock()
switch ams.cfg.APIVersion {
case config.AlertmanagerAPIVersionV1:
{
...
payload = v1Payload
}
case config.AlertmanagerAPIVersionV2:
{
...
payload = v2Payload
}
....
for _, am := range ams.ams {
wg.Add(1)
ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout))
defer cancel()
go func(client *http.Client, url string) {
if err := n.sendOne(ctx, client, url, payload); err != nil {
level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err)
n.metrics.errors.WithLabelValues(url).Inc()
} else {
atomic.AddUint64(&numSuccess, 1)
}
....
}(ams.client, am.url().String())
}
ams.mtx.RUnlock()
}
wg.Wait()
return numSuccess > 0
}
sendOne
发送post请求到alertmanager
func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
req, err := http.NewRequest("POST", url, bytes.NewReader(b))
if err != nil {
return err
}
req.Header.Set("User-Agent", userAgent)
req.Header.Set("Content-Type", contentTypeJSON)
resp, err := n.opts.Do(ctx, c, req)
if err != nil {
return err
}
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
// Any HTTP status 2xx is OK.
if resp.StatusCode/100 != 2 {
return errors.Errorf("bad response status %s", resp.Status)
}
return nil
}