-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstress.py
134 lines (106 loc) · 3.46 KB
/
stress.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from multiprocessing import cpu_count
from subprocess import Popen, check_output
from time import time, sleep
from smbus import SMBus
import os, signal
# the file that data should be logged to
filename = '/home/pi/stress.csv'
# how many idle seconds should be logged before and after the stress test
idle = 60
# how many seconds should the stress test last
duration = 300
# the interval between data logging events.
interval = 1.0
class stress(object):
def __init__(self, filename, idle, duration, interval):
self.filename = filename
self.idle = idle
self.duration = duration
self.interval = interval
self.bus = SMBus(10)
self.address = 0X2F
self.fanPwmReg = 0x30
self.stress = None
self.fp = None
def getCpuTemp(self):
f = open('/sys/class/thermal/thermal_zone0/temp')
temp = float(f.read().strip()) / 1000
f.close()
return '{:.2f}'.format(round(temp,2))
def getCpuFreq(self):
out = check_output(["vcgencmd", "measure_clock arm"]).decode("utf-8")
return '{:.2f}'.format(round(float(out.split("=")[1]) / 1000000,2))
def getPwm(self):
pwm = self.bus.read_byte_data(self.address, self.fanPwmReg)
#pwm = 10
return '{:.2f}'.format(round(pwm / 255 * 100,2))
def openFile(self):
if self.fp is None:
self.fp = open(self.filename, 'w')
labels = [
'Time (s)',
'Temperature (C)',
'CPU Frequency (MHz)',
'PWM Duty Cycle (%)'
]
self.writeToFile(labels)
def closeFile(self):
if self.fp is not None:
self.fp.close()
self.fp = None
def writeToFile(self, args):
self.openFile()
self.fp.write(','.join(args) + "\n")
def logData(self, time):
args = [
str(round(time,2)),
self.getCpuTemp(),
self.getCpuFreq(),
self.getPwm()
]
self.writeToFile(args)
args = [
'{:>10}'.format('{:.2f}'.format(round(time,2)) + 's'),
'{:>7}'.format(self.getCpuTemp() + 'c'),
'{:>10}'.format(self.getCpuFreq() + 'MHz'),
'{:>7}'.format(self.getPwm() + '%')
]
print(' '.join(args))
def startStress(self):
if self.stress is None:
cpus = cpu_count()
self.stress = Popen(['stress', '-q', '-c', str(cpus), '-t', str(self.duration)], preexec_fn=os.setsid);
def stopStress(self):
if self.stress is not None:
os.killpg(self.stress.pid, signal.SIGINT)
self.stress = None
def run(self):
start = time()
now = time()
ref = self.idle
while now - start <= ref:
self.logData(now - start)
sleep(self.interval)
now = time()
self.startStress()
ref = self.idle + self.duration
while now - start <= ref:
self.logData(now - start)
sleep(self.interval)
now = time()
self.stopStress()
ref = self.idle + self.duration + self.idle
while now - start <= ref:
self.logData(now - start)
sleep(self.interval)
now = time()
try:
stress = stress(filename, idle, duration, interval)
stress.run()
except (Exception) as e:
print(e)
except KeyboardInterrupt:
pass
finally:
stress.closeFile()
stress.stopStress()