-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpwm.py
206 lines (165 loc) · 6.56 KB
/
pwm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
from collections import deque
from time import sleep

from smbus import SMBus
# Fan curve: each key is a cpu temperature and each value is the desired fan
# duty cycle in % at that temperature.
#   - keys and values must be numeric
#   - keys and values must lie between 0 and 100
#   - keys must be listed in ascending order
#
# A measured cpu temperature below the first key sets the pwm duty cycle to
# 0% (off); one above the last key sets it to 100% (max).
curve = {
    30: 30,
    45: 32,
    47: 34,
    53: 55,
    55: 57,
    70: 59,
}

# The time in seconds between cpu temperature readings.
interval = 0.1

# The number of readings used to calculate a moving temperature average.
# Averaging multiple values helps reduce noise and thus fan hunting.
readings = 30

# Even though the fan curve duty cycle is a float value between 0 and 100, the
# EMC2301 is only an 8 bit device. After the duty cycle is calculated in % it
# is therefore scaled and rounded to a discrete integer value (0-255).
# The minimum step size is a second level of noise reduction: if the current
# value is not at least minStep away from the previous value, the duty cycle
# is not changed to the current value.
minStep = 3
class pwmFan(object):
    """Control a fan through an EMC2301 from the measured cpu temperature.

    The EMC2301 is addressed at 0x2F on SMBus 10. ``run`` samples the cpu
    temperature every ``interval`` seconds, averages the last ``readings``
    samples, maps the average onto ``curve`` and writes the resulting 8 bit
    duty cycle to the fan pwm register.
    """

    def __init__(self, curve, interval, readings, minStep):
        """Validate the settings, open the bus and precompute the curve data.

        Args:
            curve: dict mapping temperature to duty cycle in %, with keys in
                ascending order.
            interval: seconds between cpu temperature readings.
            readings: number of samples in the moving average.
            minStep: smallest change of the 8 bit pwm value that is applied.

        Raises:
            Exception: if a setting is rejected by ``validate``.
        """
        self.curve = curve
        self.interval = interval
        self.readings = readings
        self.minStep = minStep
        self.validate()
        # connect to bus
        self.bus = SMBus(10)
        # EMC2301 address
        self.address = 0X2F
        # fan max step size used internally by the EMC2301
        # valid decimal values range between 0 and 63
        self.fanMaxStepReg = 0x37
        self.fanMaxStepVal = 0x0A
        # fan configuration 1 - is used to set the EMC2301 drive mode and
        # update interval. In direct drive mode valid values are as follows
        # 0x00 100ms update interval
        # 0x01 200ms update interval
        # 0x02 300ms update interval
        # 0x03 400ms update interval
        # 0x04 500ms update interval
        # 0x05 800ms update interval
        # 0x06 1200ms update interval
        # 0x07 1600ms update interval
        self.fanConOneReg = 0x32
        self.fanConOneVal = 0x00
        # fan configuration 2 - does not need to be changed when working in
        # direct drive mode as it is only needed to enable ramp control.
        # ramp control uses max step size & update interval to limit how fast
        # the duty cycle can be changed
        self.fanConTwoReg = 0x33
        self.fanConTwoVal = 0x40
        # fan PWM register
        self.fanPwmReg = 0x30
        # the most recently applied pwm value (0 until the first write)
        self.previousPwm = 0
        # the readings used to calculate the moving average; the deque is
        # bounded, so appending beyond `readings` drops the oldest sample
        self.samples = deque(maxlen=self.readings)
        # the temperature values from the fan curve
        self.temps = list(self.curve)
        # the number of line segments in the fan curve
        self.segments = len(self.temps) - 1
        # The minimum temperature in the fan curve
        self.min = self.temps[0]
        # The maximum temperature in the fan curve
        self.max = self.temps[-1]

    def updateReg(self, reg, val):
        """Write the byte ``val`` to register ``reg`` of the EMC2301."""
        self.bus.write_byte_data(self.address, reg, val)

    def getCpuTemp(self):
        """Return the cpu temperature read from thermal zone 0.

        The raw sysfs value (millidegrees Celsius) is divided by 1000.
        """
        # `with` closes the file even if reading or parsing fails
        with open('/sys/class/thermal/thermal_zone0/temp') as f:
            return float(f.read().strip()) / 1000

    def getPwm(self, temp):
        """Map a temperature onto the fan curve as an 8 bit pwm value.

        Args:
            temp: temperature to look up, in the unit of the curve keys.

        Returns:
            0 if ``temp`` is below the first curve point, 255 if it is above
            the last one, otherwise the linearly interpolated duty cycle
            scaled from 0-100 % to 0-255 and rounded to an integer.
        """
        if temp < self.min:
            # turn the fan off
            return 0
        if temp > self.max:
            # run the fan at 100%
            return 255
        # walk the curve segment by segment; the first segment containing
        # temp wins, so a temperature on a shared point uses the lower segment
        for x1, x2 in zip(self.temps, self.temps[1:]):
            if x1 <= temp <= x2:
                # linear interpolation of 2 points
                y1 = self.curve[x1]
                y2 = self.curve[x2]
                # force float division so integer curve points never truncate
                m = float(y2 - y1) / (x2 - x1)
                b = y1 - m * x1
                y = m * temp + b
                # the pwm register takes an integer between 0 and 255
                return int(round(y / 100 * 255))
        # default to 100% (reached only if no segment matched, e.g. NaN)
        return 255

    def _shouldApply(self, pwm):
        """Return True if ``pwm`` should be written to the fan pwm register.

        The extremes 0 (off) and 255 (100%) are always applied so the fan can
        reach them regardless of ``minStep``; any other value is applied only
        when it is at least ``minStep`` away from the previously applied one.
        """
        if pwm == 0 or pwm == 255:
            return True
        return abs(self.previousPwm - pwm) >= self.minStep

    def run(self):
        """Configure the EMC2301, then regulate the fan in an endless loop.

        This method does not return; it ends only when an exception (for
        example KeyboardInterrupt) propagates out of the loop.
        """
        self.updateReg(self.fanMaxStepReg, self.fanMaxStepVal)
        self.updateReg(self.fanConOneReg, self.fanConOneVal)
        self.updateReg(self.fanConTwoReg, self.fanConTwoVal)
        while True:
            self.samples.append(self.getCpuTemp())
            avg = sum(self.samples) / len(self.samples)
            pwm = self.getPwm(avg)
            if self._shouldApply(pwm):
                self.previousPwm = pwm
                self.updateReg(self.fanPwmReg, pwm)
            sleep(self.interval)

    def validate(self):
        """Check the settings and raise on the first invalid one.

        Raises:
            Exception: if ``curve`` is not a dict with at least two numeric
                key:value pairs in the range 0-100 whose keys ascend, if
                ``interval`` is not a non-negative number, if ``readings`` is
                not an integer of at least 1, or if ``minStep`` is not an
                integer.
        """
        if not isinstance(self.curve, dict):
            raise Exception('Error - curve must be a dict')
        if not isinstance(self.interval, (int, float)):
            raise Exception('Error - interval is not numeric')
        if self.interval < 0:
            # sleep() rejects negative values; fail here with a clear message
            raise Exception('Error - interval must not be negative')
        if not isinstance(self.readings, int):
            raise Exception('Error - readings must be an integer')
        if self.readings < 1:
            # an empty averaging window would divide by zero in run()
            raise Exception('Error - readings must be at least 1')
        if not isinstance(self.minStep, int):
            raise Exception('Error - minStep must be an integer')
        if len(self.curve) < 2:
            raise Exception('Error - curve must contain at least 2 key:value pairs')
        # keys are >= 0, so -1 is below every valid first key
        previous = -1
        for key, val in self.curve.items():
            if not isinstance(key, (int, float)):
                raise Exception('Error - a curve key is not numeric')
            if not isinstance(val, (int, float)):
                raise Exception('Error - a curve value is not numeric')
            if key < 0 or key > 100:
                raise Exception('Error - a curve key is out of range')
            if val < 0 or val > 100:
                raise Exception('Error - a curve value is out of range')
            if key <= previous:
                raise Exception('Error - curve keys are not sorted in ascending order')
            previous = key
# Build the fan controller from the module-level settings and run its control
# loop until it fails or is interrupted from the keyboard.
try:
    fan = pwmFan(curve, interval, readings, minStep)
    fan.run()
except KeyboardInterrupt:
    # a keyboard interrupt ends the program silently
    pass
except Exception as e:
    # any other failure (e.g. a rejected setting) is printed before exiting
    print(e)