Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

Commit 9c0909f

Browse files
authored
Search backend: Unify job tracing in observe.go (#35685)
* Add Tags() method to jobs without traceable fields Some jobs have only a single field which is their child job tree. We want these jobs to continue to implement observableJob so that we can pass them to job.StartSpan(). This commit adds a Tags() method to these jobs which returns an empty slice, in preparation for adding Tags() to the observableJob interface. * Add Tags() to observableJob interface and add fields to spans in job.StartSpan() All jobs now have their fields added to their spans in job.StartSpan(). * Merge observableJob into Job Add Tags() method on the Job interface instead of having observableJob as a separate interface. This more clearly indicates that all jobs should be observable.
1 parent 7533ada commit 9c0909f

17 files changed

+155
-30
lines changed

internal/search/commit/commit.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,8 @@ type GitserverClient interface {
5050
}
5151

5252
func (j *CommitSearchJob) Run(ctx context.Context, clients job.RuntimeClients, stream streaming.Sender) (alert *search.Alert, err error) {
53-
tr, ctx, stream, finish := job.StartSpan(ctx, stream, j)
53+
_, ctx, stream, finish := job.StartSpan(ctx, stream, j)
5454
defer func() { finish(alert, err) }()
55-
tr.TagFields(trace.LazyFields(j.Tags))
5655

5756
if err := j.ExpandUsernames(ctx, clients.DB); err != nil {
5857
return nil, err

internal/search/job/job.go

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88

99
"github.com/google/zoekt"
10+
"github.com/opentracing/opentracing-go/log"
1011

1112
"github.com/sourcegraph/sourcegraph/internal/database"
1213
"github.com/sourcegraph/sourcegraph/internal/endpoint"
@@ -23,6 +24,7 @@ import (
2324
type Job interface {
2425
Run(context.Context, RuntimeClients, streaming.Sender) (*search.Alert, error)
2526
Name() string
27+
Tags() []log.Field
2628
}
2729

2830
type RuntimeClients struct {

internal/search/job/jobutil/alert.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,8 @@ type alertJob struct {
3434
}
3535

3636
func (j *alertJob) Run(ctx context.Context, clients job.RuntimeClients, stream streaming.Sender) (alert *search.Alert, err error) {
37-
tr, ctx, stream, finish := job.StartSpan(ctx, stream, j)
37+
_, ctx, stream, finish := job.StartSpan(ctx, stream, j)
3838
defer func() { finish(alert, err) }()
39-
tr.TagFields(trace.LazyFields(j.Tags))
4039

4140
start := time.Now()
4241
countingStream := streaming.NewResultCountingStream(stream)

internal/search/job/jobutil/combinators.go

+14-4
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ func (s *SequentialJob) Name() string {
3636
return "SequentialJob"
3737
}
3838

39+
func (s *SequentialJob) Tags() []log.Field {
40+
return []log.Field{}
41+
}
42+
3943
func (s *SequentialJob) Run(ctx context.Context, clients job.RuntimeClients, stream streaming.Sender) (alert *search.Alert, err error) {
4044
var maxAlerter search.MaxAlerter
4145
var errs errors.MultiError
@@ -73,6 +77,10 @@ func (p *ParallelJob) Name() string {
7377
return "ParallelJob"
7478
}
7579

80+
func (p *ParallelJob) Tags() []log.Field {
81+
return []log.Field{}
82+
}
83+
7684
func (p *ParallelJob) Run(ctx context.Context, clients job.RuntimeClients, s streaming.Sender) (alert *search.Alert, err error) {
7785
_, ctx, s, finish := job.StartSpan(ctx, s, p)
7886
defer func() { finish(alert, err) }()
@@ -110,9 +118,8 @@ type TimeoutJob struct {
110118
}
111119

112120
func (t *TimeoutJob) Run(ctx context.Context, clients job.RuntimeClients, s streaming.Sender) (alert *search.Alert, err error) {
113-
tr, ctx, s, finish := job.StartSpan(ctx, s, t)
121+
_, ctx, s, finish := job.StartSpan(ctx, s, t)
114122
defer func() { finish(alert, err) }()
115-
tr.TagFields(trace.LazyFields(t.Tags))
116123

117124
ctx, cancel := context.WithTimeout(ctx, t.timeout)
118125
defer cancel()
@@ -150,9 +157,8 @@ type LimitJob struct {
150157
}
151158

152159
func (l *LimitJob) Run(ctx context.Context, clients job.RuntimeClients, s streaming.Sender) (alert *search.Alert, err error) {
153-
tr, ctx, s, finish := job.StartSpan(ctx, s, l)
160+
_, ctx, s, finish := job.StartSpan(ctx, s, l)
154161
defer func() { finish(alert, err) }()
155-
tr.TagFields(trace.LazyFields(l.Tags))
156162

157163
ctx, s, cancel := streaming.WithLimit(ctx, s, l.limit)
158164
defer cancel()
@@ -187,3 +193,7 @@ func (e *NoopJob) Run(context.Context, job.RuntimeClients, streaming.Sender) (*s
187193
}
188194

189195
func (e *NoopJob) Name() string { return "NoopJob" }
196+
197+
func (e *NoopJob) Tags() []log.Field {
198+
return []log.Field{}
199+
}

internal/search/job/jobutil/expression_job.go

+9
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package jobutil
33
import (
44
"context"
55

6+
"github.com/opentracing/opentracing-go/log"
67
"go.uber.org/atomic"
78
"golang.org/x/sync/semaphore"
89

@@ -79,6 +80,10 @@ func (a *AndJob) Name() string {
7980
return "AndJob"
8081
}
8182

83+
func (a *AndJob) Tags() []log.Field {
84+
return []log.Field{}
85+
}
86+
8287
// NewAndJob creates a job that will run each of its child jobs and stream
8388
// deduplicated matches that were streamed by at least one of the jobs.
8489
func NewOrJob(children ...job.Job) job.Job {
@@ -171,3 +176,7 @@ func (j *OrJob) Run(ctx context.Context, clients job.RuntimeClients, stream stre
171176
func (j *OrJob) Name() string {
172177
return "OrJob"
173178
}
179+
180+
func (j *OrJob) Tags() []log.Field {
181+
return []log.Field{}
182+
}

internal/search/job/jobutil/repo_pager_job.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,8 @@ func setRepos(job job.Job, indexed *zoekt.IndexedRepoRevs, unindexed []*search.R
6060
}
6161

6262
func (p *repoPagerJob) Run(ctx context.Context, clients job.RuntimeClients, stream streaming.Sender) (alert *search.Alert, err error) {
63-
tr, ctx, stream, finish := job.StartSpan(ctx, stream, p)
63+
_, ctx, stream, finish := job.StartSpan(ctx, stream, p)
6464
defer func() { finish(alert, err) }()
65-
tr.TagFields(trace.LazyFields(p.Tags))
6665

6766
var maxAlerter search.MaxAlerter
6867

internal/search/job/jobutil/select.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@ type selectJob struct {
2424
}
2525

2626
func (j *selectJob) Run(ctx context.Context, clients job.RuntimeClients, stream streaming.Sender) (alert *search.Alert, err error) {
27-
tr, ctx, stream, finish := job.StartSpan(ctx, stream, j)
27+
_, ctx, stream, finish := job.StartSpan(ctx, stream, j)
2828
defer func() { finish(alert, err) }()
29-
tr.TagFields(trace.LazyFields(j.Tags))
3029

3130
selectingStream := streaming.WithSelect(stream, j.path)
3231
return j.child.Run(ctx, clients, selectingStream)

internal/search/job/jobutil/sub_repo_perms_job.go

+5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"sync"
66

77
"github.com/inconshreveable/log15"
8+
"github.com/opentracing/opentracing-go/log"
89

910
"github.com/sourcegraph/sourcegraph/internal/actor"
1011
"github.com/sourcegraph/sourcegraph/internal/authz"
@@ -58,6 +59,10 @@ func (s *subRepoPermsFilterJob) Name() string {
5859
return "SubRepoPermsFilterJob"
5960
}
6061

62+
func (s *subRepoPermsFilterJob) Tags() []log.Field {
63+
return []log.Field{}
64+
}
65+
6166
// applySubRepoFiltering filters a set of matches using the provided
6267
// authz.SubRepoPermissionChecker
6368
func applySubRepoFiltering(ctx context.Context, checker authz.SubRepoPermissionChecker, matches []result.Match) ([]result.Match, error) {

internal/search/job/mockjob/job_mock.go

+115
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/search/job/observe.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,11 @@ import (
1111
"github.com/sourcegraph/sourcegraph/internal/trace"
1212
)
1313

14-
type observableJob interface {
15-
Name() string
16-
}
17-
1814
type finishSpanFunc func(*search.Alert, error)
1915

20-
func StartSpan(ctx context.Context, stream streaming.Sender, job observableJob) (*trace.Trace, context.Context, streaming.Sender, finishSpanFunc) {
16+
func StartSpan(ctx context.Context, stream streaming.Sender, job Job) (*trace.Trace, context.Context, streaming.Sender, finishSpanFunc) {
2117
tr, ctx := trace.New(ctx, job.Name(), "")
18+
tr.TagFields(trace.LazyFields(job.Tags))
2219

2320
observingStream := newObservingStream(tr, stream)
2421

internal/search/repos/excluded_job.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@ type ComputeExcludedReposJob struct {
1616
}
1717

1818
func (c *ComputeExcludedReposJob) Run(ctx context.Context, clients job.RuntimeClients, s streaming.Sender) (alert *search.Alert, err error) {
19-
tr, ctx, s, finish := job.StartSpan(ctx, s, c)
19+
_, ctx, s, finish := job.StartSpan(ctx, s, c)
2020
defer func() { finish(alert, err) }()
21-
tr.TagFields(trace.LazyFields(c.Tags))
2221

2322
excluded, err := computeExcludedRepos(ctx, clients.DB, c.RepoOpts)
2423
if err != nil {

internal/search/run/repository.go

-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ type RepoSearchJob struct {
2727
func (s *RepoSearchJob) Run(ctx context.Context, clients job.RuntimeClients, stream streaming.Sender) (alert *search.Alert, err error) {
2828
tr, ctx, stream, finish := job.StartSpan(ctx, stream, s)
2929
defer func() { finish(alert, err) }()
30-
tr.TagFields(trace.LazyFields(s.Tags))
3130

3231
repos := &searchrepos.Resolver{DB: clients.DB, Opts: s.RepoOpts}
3332
err = repos.Paginate(ctx, func(page *searchrepos.Resolved) error {

internal/search/searcher/search.go

-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ type SearcherJob struct {
4949
func (s *SearcherJob) Run(ctx context.Context, clients job.RuntimeClients, stream streaming.Sender) (alert *search.Alert, err error) {
5050
tr, ctx, stream, finish := job.StartSpan(ctx, stream, s)
5151
defer func() { finish(alert, err) }()
52-
tr.TagFields(trace.LazyFields(s.Tags))
5352

5453
var fetchTimeout time.Duration
5554
if len(s.Repos) == 1 || s.UseFullDeadline {

internal/search/searcher/symbol_search_job.go

-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ type SymbolSearcherJob struct {
3333
func (s *SymbolSearcherJob) Run(ctx context.Context, clients job.RuntimeClients, stream streaming.Sender) (alert *search.Alert, err error) {
3434
tr, ctx, stream, finish := job.StartSpan(ctx, stream, s)
3535
defer func() { finish(alert, err) }()
36-
tr.TagFields(trace.LazyFields(s.Tags))
3736

3837
ctx, cancel := context.WithCancel(ctx)
3938
defer cancel()

internal/search/structural/structural.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,8 @@ type StructuralSearchJob struct {
161161
}
162162

163163
func (s *StructuralSearchJob) Run(ctx context.Context, clients job.RuntimeClients, stream streaming.Sender) (alert *search.Alert, err error) {
164-
tr, ctx, stream, finish := job.StartSpan(ctx, stream, s)
164+
_, ctx, stream, finish := job.StartSpan(ctx, stream, s)
165165
defer func() { finish(alert, err) }()
166-
tr.TagFields(trace.LazyFields(s.Tags))
167166

168167
repos := &searchrepos.Resolver{DB: clients.DB, Opts: s.RepoOpts}
169168
return nil, repos.Paginate(ctx, func(page *searchrepos.Resolved) error {

internal/search/zoekt/indexed_search.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -527,9 +527,8 @@ type ZoektRepoSubsetSearchJob struct {
527527

528528
// ZoektSearch is a job that searches repositories using zoekt.
529529
func (z *ZoektRepoSubsetSearchJob) Run(ctx context.Context, clients job.RuntimeClients, stream streaming.Sender) (alert *search.Alert, err error) {
530-
tr, ctx, stream, finish := job.StartSpan(ctx, stream, z)
530+
_, ctx, stream, finish := job.StartSpan(ctx, stream, z)
531531
defer func() { finish(alert, err) }()
532-
tr.TagFields(trace.LazyFields(z.Tags))
533532

534533
if z.Repos == nil {
535534
return nil, nil
@@ -572,9 +571,8 @@ type ZoektGlobalSearchJob struct {
572571
}
573572

574573
func (t *ZoektGlobalSearchJob) Run(ctx context.Context, clients job.RuntimeClients, stream streaming.Sender) (alert *search.Alert, err error) {
575-
tr, ctx, stream, finish := job.StartSpan(ctx, stream, t)
574+
_, ctx, stream, finish := job.StartSpan(ctx, stream, t)
576575
defer func() { finish(alert, err) }()
577-
tr.TagFields(trace.LazyFields(t.Tags))
578576

579577
userPrivateRepos := searchrepos.PrivateReposForActor(ctx, clients.DB, t.RepoOpts)
580578
t.GlobalZoektQuery.ApplyPrivateFilter(userPrivateRepos)

0 commit comments

Comments
 (0)