-
-
Notifications
You must be signed in to change notification settings - Fork 300
/
Copy paththreadregular.go
142 lines (123 loc) · 3.42 KB
/
threadregular.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package frankenphp
import (
"sync"
)
// regularThread is the handler of a non-worker PHP thread.
// It executes PHP scripts in a web context and implements
// the threadHandler interface.
type regularThread struct {
	// state is the state object of the underlying thread (taken from thread.state on conversion)
	state *threadState
	// thread is the PHP thread this handler is installed on
	thread *phpThread
	// requestContext is the request currently being handled; nil while no request is assigned
	requestContext *frankenPHPContext
}
// package-level bookkeeping shared by all regular (non-worker) PHP threads
var (
	// regularThreads lists the threads currently attached as regular threads;
	// it is modified only while holding regularThreadMu
	regularThreads []*phpThread
	// regularThreadMu guards regularThreads
	regularThreadMu = new(sync.RWMutex)
	// regularRequestChan carries incoming requests to whichever regular thread is waiting
	regularRequestChan chan *frankenPHPContext
)
func convertToRegularThread(thread *phpThread) {
thread.setHandler(®ularThread{
thread: thread,
state: thread.state,
})
attachRegularThread(thread)
}
// beforeScriptExecution decides what the thread does next, based on its current state.
// It returns the name of the script to execute, or an empty string on shutdown.
// Any state not handled below causes a panic.
func (handler *regularThread) beforeScriptExecution() string {
	switch handler.state.get() {
	case stateTransitionRequested:
		// the thread is handed over to a new handler, so it stops being a regular thread
		detachRegularThread(handler.thread)
		return handler.thread.transitionToNewHandler()
	case stateTransitionComplete:
		// mark the thread as ready, then wait for a request exactly like a ready thread
		handler.state.set(stateReady)
		fallthrough
	case stateReady:
		return handler.waitForRequest()
	case stateShuttingDown:
		detachRegularThread(handler.thread)
		// an empty script name is the signal to stop
		return ""
	default:
		panic("unexpected state: " + handler.state.name())
	}
}
// afterScriptExecution finishes the current request by delegating to afterRequest.
// The exitStatus argument is ignored and nothing is returned.
func (handler *regularThread) afterScriptExecution(exitStatus int) {
handler.afterRequest()
}
// getRequestContext returns the context of the request this handler is currently
// processing; it is nil while no request is assigned.
func (handler *regularThread) getRequestContext() *frankenPHPContext {
return handler.requestContext
}
// name returns a fixed, human-readable label for this kind of handler.
func (handler *regularThread) name() string {
return "Regular PHP Thread"
}
// waitForRequest blocks until a request arrives on regularRequestChan or the
// thread's drainChan fires. For a request it returns the script filename to
// execute; in every other case it hands control back to beforeScriptExecution
// and returns whatever that returns.
func (handler *regularThread) waitForRequest() string {
	// clear any previously sandboxed env
	clearSandboxedEnv(handler.thread)
	handler.state.markAsWaiting(true)

	var request *frankenPHPContext
	select {
	case request = <-regularRequestChan:
		// a request was received, it is processed below
	case <-handler.thread.drainChan:
		// go back to beforeScriptExecution
		return handler.beforeScriptExecution()
	}

	handler.requestContext = request
	handler.state.markAsWaiting(false)

	err := updateServerContext(handler.thread, request, false)
	if err == nil {
		// set the scriptFilename that should be executed
		return request.scriptFilename
	}

	// the server context could not be updated: reject the request as a bad
	// request, finish it, and go back to beforeScriptExecution
	request.rejectBadRequest(err.Error())
	handler.afterRequest()
	handler.thread.Unpin()
	return handler.beforeScriptExecution()
}
// afterRequest closes the context of the current request and then clears it
// from the handler, so that getRequestContext returns nil afterwards.
func (handler *regularThread) afterRequest() {
handler.requestContext.closeContext()
handler.requestContext = nil
}
// handleRequestWithRegularPHPThreads hands the request to a regular PHP thread
// and blocks until the request is done or has been rejected.
//
// If a thread is already waiting, the request is dispatched right away.
// Otherwise it is counted as queued and offered to the threads again until one
// takes it. While queued, it is also offered to scaleChan, and it is rejected
// with a 504 once timeoutChan(maxWaitTime) fires.
func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) {
	metrics.StartRequest()
	select {
	case regularRequestChan <- fc:
		// a thread was available to handle the request immediately
		<-fc.done
		metrics.StopRequest()
		return
	default:
		// no thread was available
	}

	// if no thread was available, mark the request as queued and keep offering it to the threads
	metrics.QueuedRequest()
	for {
		select {
		case regularRequestChan <- fc:
			metrics.DequeuedRequest()
			<-fc.done
			metrics.StopRequest()
			return
		case scaleChan <- fc:
			// the request has triggered scaling, continue to wait for a thread
			// NOTE(review): timeoutChan is re-evaluated each time the select is entered,
			// so this branch presumably restarts the wait - confirm that this is intended
		case <-timeoutChan(maxWaitTime):
			// the request has timed out stalling; it leaves the queue on this path as
			// well, so balance the QueuedRequest call above
			metrics.DequeuedRequest()
			fc.reject(504, "Gateway Timeout")
			// NOTE(review): StartRequest is not matched by StopRequest on this path,
			// unlike the two paths above - confirm whether that is intended
			return
		}
	}
}
// attachRegularThread appends the thread to the list of regular threads
// while holding the write lock.
func attachRegularThread(thread *phpThread) {
	regularThreadMu.Lock()
	defer regularThreadMu.Unlock()

	regularThreads = append(regularThreads, thread)
}
// detachRegularThread removes the first occurrence of the thread from the list
// of regular threads while holding the write lock. It does nothing if the
// thread is not in the list.
func detachRegularThread(thread *phpThread) {
	regularThreadMu.Lock()
	defer regularThreadMu.Unlock()

	for idx := range regularThreads {
		if regularThreads[idx] != thread {
			continue
		}
		// cut the element out by shifting the tail one position to the left
		regularThreads = append(regularThreads[:idx], regularThreads[idx+1:]...)
		return
	}
}