Skip to content

Commit 835971c

Browse files
committed
Extract daemon statsCollector to its own package
Signed-off-by: Vincent Demeester <[email protected]>
1 parent 6129e6c commit 835971c

11 files changed

+205
-175
lines changed

daemon/daemon.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,13 @@ import (
2626
"github.com/docker/docker/container"
2727
"github.com/docker/docker/daemon/events"
2828
"github.com/docker/docker/daemon/exec"
29-
"github.com/docker/docker/daemon/initlayer"
30-
"github.com/docker/docker/dockerversion"
31-
"github.com/docker/docker/plugin"
32-
"github.com/docker/libnetwork/cluster"
3329
// register graph drivers
3430
_ "github.com/docker/docker/daemon/graphdriver/register"
31+
"github.com/docker/docker/daemon/initlayer"
32+
"github.com/docker/docker/daemon/stats"
3533
dmetadata "github.com/docker/docker/distribution/metadata"
3634
"github.com/docker/docker/distribution/xfer"
35+
"github.com/docker/docker/dockerversion"
3736
"github.com/docker/docker/image"
3837
"github.com/docker/docker/layer"
3938
"github.com/docker/docker/libcontainerd"
@@ -46,13 +45,15 @@ import (
4645
"github.com/docker/docker/pkg/sysinfo"
4746
"github.com/docker/docker/pkg/system"
4847
"github.com/docker/docker/pkg/truncindex"
48+
"github.com/docker/docker/plugin"
4949
"github.com/docker/docker/reference"
5050
"github.com/docker/docker/registry"
5151
"github.com/docker/docker/runconfig"
5252
volumedrivers "github.com/docker/docker/volume/drivers"
5353
"github.com/docker/docker/volume/local"
5454
"github.com/docker/docker/volume/store"
5555
"github.com/docker/libnetwork"
56+
"github.com/docker/libnetwork/cluster"
5657
nwconfig "github.com/docker/libnetwork/config"
5758
"github.com/docker/libtrust"
5859
"github.com/pkg/errors"
@@ -82,7 +83,7 @@ type Daemon struct {
8283
trustKey libtrust.PrivateKey
8384
idIndex *truncindex.TruncIndex
8485
configStore *Config
85-
statsCollector *statsCollector
86+
statsCollector *stats.Collector
8687
defaultLogConfig containertypes.LogConfig
8788
RegistryService registry.Service
8889
EventsService *events.Events
@@ -106,6 +107,8 @@ type Daemon struct {
106107
clusterProvider cluster.Provider
107108
cluster Cluster
108109

110+
machineMemory uint64
111+
109112
seccompProfile []byte
110113
seccompProfilePath string
111114
}

daemon/daemon_unix.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1125,8 +1125,8 @@ func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) {
11251125
Limit: mem.Limit,
11261126
}
11271127
// if the container does not set memory limit, use the machineMemory
1128-
if mem.Limit > daemon.statsCollector.machineMemory && daemon.statsCollector.machineMemory > 0 {
1129-
s.MemoryStats.Limit = daemon.statsCollector.machineMemory
1128+
if mem.Limit > daemon.machineMemory && daemon.machineMemory > 0 {
1129+
s.MemoryStats.Limit = daemon.machineMemory
11301130
}
11311131
if cgs.PidsStats != nil {
11321132
s.PidsStats = types.PidsStats{

daemon/delete.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, forceRemo
8989

9090
// stop collection of stats for the container regardless
9191
// if stats are currently getting collected.
92-
daemon.statsCollector.stopCollection(container)
92+
daemon.statsCollector.StopCollection(container)
9393

9494
if err = daemon.containerStop(container, 3); err != nil {
9595
return err

daemon/stats.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,11 @@ func (daemon *Daemon) ContainerStats(ctx context.Context, prefixOrName string, c
133133
}
134134

135135
func (daemon *Daemon) subscribeToContainerStats(c *container.Container) chan interface{} {
136-
return daemon.statsCollector.collect(c)
136+
return daemon.statsCollector.Collect(c)
137137
}
138138

139139
func (daemon *Daemon) unsubscribeToContainerStats(c *container.Container, ch chan interface{}) {
140-
daemon.statsCollector.unsubscribe(c, ch)
140+
daemon.statsCollector.Unsubscribe(c, ch)
141141
}
142142

143143
// GetContainerStats collects all the stats published by a container

daemon/stats/collector.go

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// +build !solaris
2+
3+
package stats
4+
5+
import (
6+
"time"
7+
8+
"github.com/Sirupsen/logrus"
9+
"github.com/docker/docker/container"
10+
"github.com/docker/docker/pkg/pubsub"
11+
)
12+
13+
// Collect registers the container with the collector and adds it to
14+
// the event loop for collection on the specified interval returning
15+
// a channel for the subscriber to receive on.
16+
func (s *Collector) Collect(c *container.Container) chan interface{} {
17+
s.m.Lock()
18+
defer s.m.Unlock()
19+
publisher, exists := s.publishers[c]
20+
if !exists {
21+
publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
22+
s.publishers[c] = publisher
23+
}
24+
return publisher.Subscribe()
25+
}
26+
27+
// StopCollection closes the channels for all subscribers and removes
28+
// the container from metrics collection.
29+
func (s *Collector) StopCollection(c *container.Container) {
30+
s.m.Lock()
31+
if publisher, exists := s.publishers[c]; exists {
32+
publisher.Close()
33+
delete(s.publishers, c)
34+
}
35+
s.m.Unlock()
36+
}
37+
38+
// Unsubscribe removes a specific subscriber from receiving updates for a container's stats.
39+
func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) {
40+
s.m.Lock()
41+
publisher := s.publishers[c]
42+
if publisher != nil {
43+
publisher.Evict(ch)
44+
if publisher.Len() == 0 {
45+
delete(s.publishers, c)
46+
}
47+
}
48+
s.m.Unlock()
49+
}
50+
51+
// Run starts the collectors and will indefinitely collect stats from the supervisor
52+
func (s *Collector) Run() {
53+
type publishersPair struct {
54+
container *container.Container
55+
publisher *pubsub.Publisher
56+
}
57+
// we cannot determine the capacity here.
58+
// it will grow enough in first iteration
59+
var pairs []publishersPair
60+
61+
for range time.Tick(s.interval) {
62+
// it does not make sense in the first iteration,
63+
// but saves allocations in further iterations
64+
pairs = pairs[:0]
65+
66+
s.m.Lock()
67+
for container, publisher := range s.publishers {
68+
// copy pointers here to release the lock ASAP
69+
pairs = append(pairs, publishersPair{container, publisher})
70+
}
71+
s.m.Unlock()
72+
if len(pairs) == 0 {
73+
continue
74+
}
75+
76+
systemUsage, err := s.getSystemCPUUsage()
77+
if err != nil {
78+
logrus.Errorf("collecting system cpu usage: %v", err)
79+
continue
80+
}
81+
82+
for _, pair := range pairs {
83+
stats, err := s.supervisor.GetContainerStats(pair.container)
84+
if err != nil {
85+
if _, ok := err.(notRunningErr); !ok {
86+
logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
87+
}
88+
continue
89+
}
90+
// FIXME: move to containerd on Linux (not Windows)
91+
stats.CPUStats.SystemUsage = systemUsage
92+
93+
pair.publisher.Publish(*stats)
94+
}
95+
}
96+
}
97+
98+
type notRunningErr interface {
99+
error
100+
ContainerIsRunning() bool
101+
}

daemon/stats/collector_solaris.go

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package stats
2+
3+
import (
4+
"github.com/docker/docker/container"
5+
)
6+
7+
// platformNewStatsCollector performs platform specific initialisation of the
8+
// Collector structure. This is a no-op on Windows.
9+
func platformNewStatsCollector(s *Collector) {
10+
}
11+
12+
// Collect registers the container with the collector and adds it to
13+
// the event loop for collection on the specified interval returning
14+
// a channel for the subscriber to receive on.
15+
// Currently not supported on Solaris
16+
func (s *Collector) Collect(c *container.Container) chan interface{} {
17+
return nil
18+
}
19+
20+
// StopCollection closes the channels for all subscribers and removes
21+
// the container from metrics collection.
22+
// Currently not supported on Solaris
23+
func (s *Collector) StopCollection(c *container.Container) {
24+
}
25+
26+
// Unsubscribe removes a specific subscriber from receiving updates for a container's stats.
27+
// Currently not supported on Solaris
28+
func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) {
29+
}

daemon/stats_collector_unix.go renamed to daemon/stats/collector_unix.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,20 @@
11
// +build !windows,!solaris
22

3-
package daemon
3+
package stats
44

55
import (
66
"fmt"
77
"os"
88
"strconv"
99
"strings"
1010

11-
sysinfo "github.com/docker/docker/pkg/system"
1211
"github.com/opencontainers/runc/libcontainer/system"
1312
)
1413

1514
// platformNewStatsCollector performs platform specific initialisation of the
16-
// statsCollector structure.
17-
func platformNewStatsCollector(s *statsCollector) {
15+
// Collector structure.
16+
func platformNewStatsCollector(s *Collector) {
1817
s.clockTicksPerSecond = uint64(system.GetClockTicks())
19-
meminfo, err := sysinfo.ReadMemInfo()
20-
if err == nil && meminfo.MemTotal > 0 {
21-
s.machineMemory = uint64(meminfo.MemTotal)
22-
}
2318
}
2419

2520
const nanoSecondsPerSecond = 1e9
@@ -32,7 +27,7 @@ const nanoSecondsPerSecond = 1e9
3227
// statistics line and then sums up the first seven fields
3328
// provided. See `man 5 proc` for details on specific field
3429
// information.
35-
func (s *statsCollector) getSystemCPUUsage() (uint64, error) {
30+
func (s *Collector) getSystemCPUUsage() (uint64, error) {
3631
var line string
3732
f, err := os.Open("/proc/stat")
3833
if err != nil {
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
// +build windows
22

3-
package daemon
3+
package stats
44

55
// platformNewStatsCollector performs platform specific initialisation of the
6-
// statsCollector structure. This is a no-op on Windows.
7-
func platformNewStatsCollector(s *statsCollector) {
6+
// Collector structure. This is a no-op on Windows.
7+
func platformNewStatsCollector(s *Collector) {
88
}
99

1010
// getSystemCPUUsage returns the host system's cpu usage in
1111
// nanoseconds. An error is returned if the format of the underlying
1212
// file does not match. This is a no-op on Windows.
13-
func (s *statsCollector) getSystemCPUUsage() (uint64, error) {
13+
func (s *Collector) getSystemCPUUsage() (uint64, error) {
1414
return 0, nil
1515
}

daemon/stats/types.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package stats
2+
3+
import (
4+
"bufio"
5+
"sync"
6+
"time"
7+
8+
"github.com/docker/docker/api/types"
9+
"github.com/docker/docker/container"
10+
"github.com/docker/docker/pkg/pubsub"
11+
)
12+
13+
type supervisor interface {
14+
// GetContainerStats collects all the stats related to a container
15+
GetContainerStats(container *container.Container) (*types.StatsJSON, error)
16+
}
17+
18+
// NewCollector creates a stats collector that will poll the supervisor with the specified interval
19+
func NewCollector(supervisor supervisor, interval time.Duration) *Collector {
20+
s := &Collector{
21+
interval: interval,
22+
supervisor: supervisor,
23+
publishers: make(map[*container.Container]*pubsub.Publisher),
24+
bufReader: bufio.NewReaderSize(nil, 128),
25+
}
26+
27+
platformNewStatsCollector(s)
28+
29+
return s
30+
}
31+
32+
// Collector manages and provides container resource stats
33+
type Collector struct {
34+
m sync.Mutex
35+
supervisor supervisor
36+
interval time.Duration
37+
publishers map[*container.Container]*pubsub.Publisher
38+
bufReader *bufio.Reader
39+
40+
// The following fields are not set on Windows currently.
41+
clockTicksPerSecond uint64
42+
}

0 commit comments

Comments
 (0)