-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgossub.go
106 lines (90 loc) · 2.63 KB
/
gossub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package gossub
import (
"os"
"os/exec"
"path"
"runtime"
)
// String constants used when configuring a Spark submission. Each name maps
// to the literal text shown; the names and values are part of the package's
// exported surface and are kept exactly as declared.
const (
	// Keys selecting the master and the submit deploy mode.
	SPARK_MASTER = "spark.master"
	DEPLOY_MODE  = "spark.submit.deployMode"

	// Keys under the spark.driver.* namespace.
	DRIVER_MEMORY               = "spark.driver.memory"
	DRIVER_EXTRA_CLASSPATH      = "spark.driver.extraClassPath"
	DRIVER_DEFAULT_JAVA_OPTIONS = "spark.driver.defaultJavaOptions"
	DRIVER_EXTRA_JAVA_OPTIONS   = "spark.driver.extraJavaOptions"
	DRIVER_EXTRA_LIBRARY_PATH   = "spark.driver.extraLibraryPath"

	// Keys under the spark.executor.* namespace.
	EXECUTOR_MEMORY               = "spark.executor.memory"
	EXECUTOR_EXTRA_CLASSPATH      = "spark.executor.extraClassPath"
	EXECUTOR_DEFAULT_JAVA_OPTIONS = "spark.executor.defaultJavaOptions"
	EXECUTOR_EXTRA_JAVA_OPTIONS   = "spark.executor.extraJavaOptions"
	EXECUTOR_EXTRA_LIBRARY_PATH   = "spark.executor.extraLibraryPath"
	EXECUTOR_CORES                = "spark.executor.cores"

	// Keys under spark.pyspark.* and spark.r.*.
	PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python"
	PYSPARK_PYTHON        = "spark.pyspark.python"
	SPARKR_R_SHELL        = "spark.r.shell.command"

	// Keys under the spark.launcher.* namespace.
	CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName"
	// NOTE(review): the value is spelled "childConectionTimeout" (single
	// "n"). It is reproduced unchanged; confirm the key Spark expects before
	// altering it.
	CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout"

	// NO_RESOURCE is not a dotted configuration key. Presumably it is a
	// placeholder application resource; verify against callers.
	NO_RESOURCE = "spark-internal"
)
// Launcher lists the operations for configuring and starting a Spark
// submission.
//
// NOTE(review): judging only by this file, *SparkLauncher does not satisfy
// Launcher: it declares no SetConfig or Directory method, and its Launch
// returns (*os.Process, error) rather than *os.Process. Confirm whether other
// files in the package supply the missing pieces or whether this interface is
// out of date.
type Launcher interface {
// SetConfig takes a key/value pair and returns a *SparkLauncher.
// NOTE(review): overlaps with SetConf below; confirm which one is intended.
SetConfig(k, v string) *SparkLauncher
// Directory takes an os.File by value and returns a *SparkLauncher.
// NOTE(review): the sibling methods take *os.File; confirm the by-value
// parameter is deliberate.
Directory(file os.File) *SparkLauncher
// RedirectError takes the file that should receive the error stream.
RedirectError(file *os.File)
// RedirectOutput takes the file that should receive the output stream.
RedirectOutput(file *os.File)
// SetConf takes a key/value pair and returns a *SparkLauncher.
SetConf(k, v string) *SparkLauncher
// AddSparkArgs takes one argument string and returns a *SparkLauncher.
AddSparkArgs(arg string) *SparkLauncher
// Launch returns the *os.Process of the launched submission.
Launch() *os.Process
}
// SparkLauncher holds the settings for one spark-submit invocation together
// with the prepared command.
//
// The exported fields are filled in by the caller. The unexported fields are
// managed by the methods on *SparkLauncher.
type SparkLauncher struct {
// cmd is the prepared command. It is assigned by Init and used by
// RedirectError, RedirectOutput and Launch.
cmd *exec.Cmd
// conf collects key/value entries written by SetConf.
conf map[string]string
// sparkArgs collects the arguments appended by AddSparkArgs.
sparkArgs []string
// NOTE(review): JavaHome, Pwd, AppName, Master, DeployMode, AppResource,
// PyFile and Verbose are not read by any code visible in this file;
// confirm where (or whether) they are consumed.
JavaHome string
// SparkHome is the directory under which findSparkSubmit looks for
// bin/spark-submit.
SparkHome string
Pwd string
AppName string
Master string
DeployMode string
AppResource string
// MainClass is passed to spark-submit after "--class" by Init.
MainClass string
// Jar is passed to spark-submit as the final argument by Init.
Jar string
PyFile string
Verbose bool
}
// Init prepares the spark-submit command and stores it in sl.cmd, replacing
// any command left by an earlier call.
//
// The resulting command line is:
//
//	<spark-submit path> --class <MainClass> <Jar>
//
// Only MainClass, Jar and SparkHome (through findSparkSubmit) are consulted;
// entries gathered by SetConf and AddSparkArgs are not added to the command
// line here.
func (sl *SparkLauncher) Init() {
	submit := sl.findSparkSubmit()
	args := []string{"--class", sl.MainClass, sl.Jar}
	sl.cmd = exec.Command(submit, args...)
}
// RedirectError makes the prepared command write its standard error to file.
//
// Passing a nil file clears the redirection. os/exec treats a nil Stderr as
// "connect to the null device", but that only works when the interface value
// itself is nil, so a nil *os.File must not be stored directly.
//
// Init must be called first: sl.cmd is nil until then and this method would
// panic.
func (sl *SparkLauncher) RedirectError(file *os.File) {
	if file == nil {
		// A nil *os.File assigned to the io.Writer field would produce a
		// non-nil interface wrapping a nil pointer.
		sl.cmd.Stderr = nil
		return
	}
	sl.cmd.Stderr = file
}
// RedirectOutput makes the prepared command write its standard output to
// file.
//
// Passing a nil file clears the redirection. os/exec treats a nil Stdout as
// "connect to the null device", but that only works when the interface value
// itself is nil, so a nil *os.File must not be stored directly.
//
// Init must be called first: sl.cmd is nil until then and this method would
// panic.
func (sl *SparkLauncher) RedirectOutput(file *os.File) {
	if file == nil {
		// A nil *os.File assigned to the io.Writer field would produce a
		// non-nil interface wrapping a nil pointer.
		sl.cmd.Stdout = nil
		return
	}
	sl.cmd.Stdout = file
}
// SetConf records the configuration entry k=v on the launcher, overwriting
// any previous value stored under k, and returns the launcher so calls can be
// chained.
//
// The backing map is allocated on first use, so SetConf is safe on a
// zero-value or struct-literal SparkLauncher.
func (sl *SparkLauncher) SetConf(k, v string) *SparkLauncher {
	if sl.conf == nil {
		// Writing to a nil map panics; allocate lazily instead.
		sl.conf = make(map[string]string)
	}
	sl.conf[k] = v
	return sl
}
// AddSparkArgs appends arg to the launcher's list of extra arguments and
// returns the launcher so calls can be chained. Arguments keep the order in
// which they were added.
func (sl *SparkLauncher) AddSparkArgs(arg string) *SparkLauncher {
	extended := append(sl.sparkArgs, arg)
	sl.sparkArgs = extended
	return sl
}
// findSparkSubmit returns the location of the spark-submit launcher script:
// SparkHome joined with "bin" and the script name. On Windows the script name
// carries a ".cmd" suffix.
//
// The result is built with path.Join, so it is cleaned and always uses
// forward slashes as separators, regardless of operating system.
func (sl *SparkLauncher) findSparkSubmit() string {
	name := "spark-submit"
	if runtime.GOOS == "windows" {
		name += ".cmd"
	}
	return path.Join(sl.SparkHome, "bin", name)
}
// Launch runs the command prepared by Init and waits for it to finish.
//
// It returns the command's Process together with the error reported by Run.
// Because Run blocks until the child exits, the returned process has already
// terminated when Launch returns successfully. Process may be non-nil even
// when err is non-nil (for example when the child exits with a failure
// status).
//
// If Init has not been called, Launch returns os.ErrInvalid instead of
// dereferencing the nil command.
func (sl *SparkLauncher) Launch() (*os.Process, error) {
	if sl.cmd == nil {
		return nil, os.ErrInvalid
	}
	err := sl.cmd.Run()
	return sl.cmd.Process, err
}