Skip to content

Commit bede0a0

Browse files
Add X-Ray daemon (#14)
1 parent 6fbdf2e commit bede0a0

File tree

7 files changed

+397
-48
lines changed

7 files changed

+397
-48
lines changed

cmd/localstack/awsutil.go

+5-23
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package main
88
import (
99
"context"
1010
"fmt"
11-
"github.com/jessevdk/go-flags"
1211
log "github.com/sirupsen/logrus"
1312
"go.amzn.com/lambda/interop"
1413
"go.amzn.com/lambda/rapidcore"
@@ -27,29 +26,12 @@ const (
2726
runtimeBootstrap = "/var/runtime/bootstrap"
2827
)
2928

30-
type options struct {
31-
LogLevel string `long:"log-level" default:"info" description:"log level"`
32-
InitCachingEnabled bool `long:"enable-init-caching" description:"Enable support for Init Caching"`
33-
}
34-
35-
func getCLIArgs() (options, []string) {
36-
var opts options
37-
parser := flags.NewParser(&opts, flags.IgnoreUnknown)
38-
args, err := parser.ParseArgs(os.Args)
39-
40-
if err != nil {
41-
log.WithError(err).Fatal("Failed to parse command line arguments:", os.Args)
42-
}
43-
44-
return opts, args
45-
}
46-
4729
func isBootstrapFileExist(filePath string) bool {
4830
file, err := os.Stat(filePath)
4931
return !os.IsNotExist(err) && !file.IsDir()
5032
}
5133

52-
func getBootstrap(args []string, opts options) (*rapidcore.Bootstrap, string) {
34+
func getBootstrap(args []string) (*rapidcore.Bootstrap, string) {
5335
var bootstrapLookupCmd []string
5436
var handler string
5537
currentWorkingDir := "/var/task" // default value
@@ -148,7 +130,7 @@ func resetListener(changeChannel <-chan bool, server *CustomInteropServer) {
148130

149131
func RunDNSRewriter(opts *LsOpts, ctx context.Context) {
150132
if opts.EnableDnsServer != "1" {
151-
log.Debugln("Dns server disabled")
133+
log.Debugln("DNS server disabled.")
152134
return
153135
}
154136
dnsForwarder, err := NewDnsForwarder(opts.LocalstackIP)
@@ -160,7 +142,7 @@ func RunDNSRewriter(opts *LsOpts, ctx context.Context) {
160142
dnsForwarder.Start()
161143

162144
<-ctx.Done()
163-
log.Debugln("Shutting down dns server")
145+
log.Debugln("DNS server stopped")
164146
}
165147

166148
func RunHotReloadingListener(server *CustomInteropServer, targetPaths []string, ctx context.Context) {
@@ -234,11 +216,11 @@ func InitHandler(sandbox Sandbox, functionVersion string, timeout int64) (time.T
234216
// pass to rapid
235217
sandbox.Init(&interop.Init{
236218
Handler: GetenvWithDefault("AWS_LAMBDA_FUNCTION_HANDLER", os.Getenv("_HANDLER")),
237-
CorrelationID: "initCorrelationID",
219+
CorrelationID: "initCorrelationID", // TODO
238220
AwsKey: os.Getenv("AWS_ACCESS_KEY_ID"),
239221
AwsSecret: os.Getenv("AWS_SECRET_ACCESS_KEY"),
240222
AwsSession: os.Getenv("AWS_SESSION_TOKEN"),
241-
XRayDaemonAddress: "0.0.0.0:0", // TODO
223+
XRayDaemonAddress: GetenvWithDefault("AWS_XRAY_DAEMON_ADDRESS", "127.0.0.1:2000"),
242224
FunctionName: GetenvWithDefault("AWS_LAMBDA_FUNCTION_NAME", "test_function"),
243225
FunctionVersion: functionVersion,
244226

cmd/localstack/custom_interop.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,15 @@ func (l *LocalStackAdapter) SendStatus(status LocalStackStatus) error {
4444
return nil
4545
}
4646

47+
// The InvokeRequest is sent by LocalStack to trigger an invocation
4748
type InvokeRequest struct {
4849
InvokeId string `json:"invoke-id"`
4950
InvokedFunctionArn string `json:"invoked-function-arn"`
5051
Payload string `json:"payload"`
52+
TraceId string `json:"trace-id"`
5153
}
5254

55+
// The ErrorResponse is sent TO LocalStack when encountering an error
5356
type ErrorResponse struct {
5457
ErrorMessage string `json:"errorMessage"`
5558
ErrorType string `json:"errorType,omitempty"`
@@ -95,10 +98,8 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, lo
9598
Payload: strings.NewReader(invokeR.Payload), // r.Body,
9699
NeedDebugLogs: true,
97100
CorrelationID: "invokeCorrelationID",
98-
// TODO: should we use the env _X_AMZN_TRACE_ID here or get the value from the request headers from the direct invoke?
99-
// for now we just set a "real" static value
100-
TraceID: "Root=1-53cfd31b-192638fa13e39d2c2bcea001;Parent=365fb4b15f2e3987;Sampled=0", // r.Header.Get("X-Amzn-Trace-Id"),
101-
//TraceID: GetEnvOrDie("_X_AMZN_TRACE_ID"), // r.Header.Get("X-Amzn-Trace-Id"),
101+
102+
TraceID: invokeR.TraceId,
102103
// TODO: set correct segment ID from request
103104
//LambdaSegmentID: "LambdaSegmentID", // r.Header.Get("X-Amzn-Segment-Id"),
104105
//CognitoIdentityID: "",

cmd/localstack/main.go

+43-13
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ type LsOpts struct {
2424
HotReloadingPaths []string
2525
EnableDnsServer string
2626
LocalstackIP string
27+
InitLogLevel string
28+
EdgePort string
2729
}
2830

2931
func GetEnvOrDie(env string) string {
@@ -36,12 +38,15 @@ func GetEnvOrDie(env string) string {
3638

3739
func InitLsOpts() *LsOpts {
3840
return &LsOpts{
41+
// required
3942
RuntimeEndpoint: GetEnvOrDie("LOCALSTACK_RUNTIME_ENDPOINT"),
4043
RuntimeId: GetEnvOrDie("LOCALSTACK_RUNTIME_ID"),
4144
// optional with default
4245
InteropPort: GetenvWithDefault("LOCALSTACK_INTEROP_PORT", "9563"),
4346
InitTracingPort: GetenvWithDefault("LOCALSTACK_RUNTIME_TRACING_PORT", "9564"),
4447
User: GetenvWithDefault("LOCALSTACK_USER", "sbx_user1051"),
48+
InitLogLevel: GetenvWithDefault("LOCALSTACK_INIT_LOG_LEVEL", "debug"),
49+
EdgePort: GetenvWithDefault("EDGE_PORT", "4566"),
4550
// optional or empty
4651
CodeArchives: os.Getenv("LOCALSTACK_CODE_ARCHIVES"),
4752
HotReloadingPaths: strings.Split(GetenvWithDefault("LOCALSTACK_HOT_RELOADING_PATHS", ""), ","),
@@ -62,6 +67,7 @@ func UnsetLsEnvs() {
6267
"LOCALSTACK_CODE_ARCHIVES",
6368
"LOCALSTACK_HOT_RELOADING_PATHS",
6469
"LOCALSTACK_ENABLE_DNS_SERVER",
70+
"LOCALSTACK_INIT_LOG_LEVEL",
6571
// Docker container ID
6672
"HOSTNAME",
6773
// User
@@ -78,22 +84,33 @@ func main() {
7884
// we're setting this to the same value as in the official RIE
7985
debug.SetGCPercent(33)
8086

87+
// configuration parsing
8188
lsOpts := InitLsOpts()
8289
UnsetLsEnvs()
8390

84-
// set up logging (logrus)
85-
//log.SetFormatter(&log.JSONFormatter{})
86-
//log.SetLevel(log.TraceLevel)
87-
log.SetLevel(log.DebugLevel)
91+
// set up logging
8892
log.SetReportCaller(true)
93+
switch lsOpts.InitLogLevel {
94+
case "debug":
95+
log.SetLevel(log.DebugLevel)
96+
case "trace":
97+
log.SetFormatter(&log.JSONFormatter{})
98+
log.SetLevel(log.TraceLevel)
99+
default:
100+
log.Fatal("Invalid value for LOCALSTACK_INIT_LOG_LEVEL")
101+
}
102+
103+
// enable dns server
104+
dnsServerContext, stopDnsServer := context.WithCancel(context.Background())
105+
go RunDNSRewriter(lsOpts, dnsServerContext)
89106

90107
// download code archive if env variable is set
91108
if err := DownloadCodeArchives(lsOpts.CodeArchives); err != nil {
92109
log.Fatal("Failed to download code archives")
93110
}
94-
// enable dns server
95-
dnsServerContext, stopDnsServer := context.WithCancel(context.Background())
96-
go RunDNSRewriter(lsOpts, dnsServerContext)
111+
112+
// parse CLI args
113+
bootstrap, handler := getBootstrap(os.Args)
97114

98115
// Switch to non-root user and drop root privileges
99116
if IsRootUser() && lsOpts.User != "" {
@@ -108,23 +125,36 @@ func main() {
108125
UserLogger().Debugln("Process running as non-root user.")
109126
}
110127

111-
// parse CLI args
112-
opts, args := getCLIArgs()
113-
bootstrap, handler := getBootstrap(args, opts)
114128
logCollector := NewLogCollector()
129+
130+
// file watcher for hot-reloading
115131
fileWatcherContext, cancelFileWatcher := context.WithCancel(context.Background())
132+
133+
// build sandbox
116134
sandbox := rapidcore.
117135
NewSandboxBuilder(bootstrap).
136+
//SetTracer(tracer).
118137
AddShutdownFunc(func() {
119-
log.Debugln("Closing contexts")
138+
log.Debugln("Stopping file watcher")
120139
cancelFileWatcher()
140+
log.Debugln("Stopping DNS server")
121141
stopDnsServer()
122142
}).
123-
AddShutdownFunc(func() { os.Exit(0) }).
124143
SetExtensionsFlag(true).
125144
SetInitCachingFlag(true).
126145
SetTailLogOutput(logCollector)
127146

147+
// xray daemon
148+
xrayConfig := initConfig("http://" + lsOpts.LocalstackIP + ":" + lsOpts.EdgePort)
149+
d := initDaemon(xrayConfig)
150+
sandbox.AddShutdownFunc(func() {
151+
log.Debugln("Shutting down xray daemon")
152+
d.stop()
153+
log.Debugln("Flushing segments in xray daemon")
154+
d.close()
155+
})
156+
runDaemon(d) // async
157+
128158
defaultInterop := sandbox.InteropServer()
129159
interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector)
130160
sandbox.SetInteropServer(interopServer)
@@ -136,7 +166,7 @@ func main() {
136166
go sandbox.Create()
137167

138168
// get timeout
139-
invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT")
169+
invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") // TODO: collect all AWS_* env parsing
140170
invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv)
141171
if err != nil {
142172
log.Fatalln(err)

0 commit comments

Comments
 (0)