Skip to content

feat(worker): Assign WorkerGroup via worker configuration #332

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions vermeer/apps/master/bl/grpc_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (h *ServerHandler) SayHelloMaster(ctx context.Context, req *pb.HelloMasterR
}
}

reqWorker, err := workerMgr.CreateWorker(ip+":"+port, ip, req.Version)
reqWorker, err := workerMgr.CreateWorker(ip+":"+port, ip, req.Version, req.WorkerGroup)
if err != nil {
logrus.Errorf("failed to create a WorkerClient, error: %s", err)
return &pb.HelloMasterResp{WorkerId: -1, WorkerName: reqWorker.Name}, err
Expand All @@ -104,7 +104,7 @@ func (h *ServerHandler) SayHelloMaster(ctx context.Context, req *pb.HelloMasterR
return &pb.HelloMasterResp{}, err
}

logrus.Infof("worker say hello name: %s, client: %s", reqWorker.Name, p.Addr.String())
logrus.Infof("worker say hello name: %s and set to workgroup: %s, client: %s", reqWorker.Name, reqWorker.Group, p.Addr.String())

resp := pb.HelloMasterResp{
WorkerId: reqWorker.Id,
Expand Down
14 changes: 12 additions & 2 deletions vermeer/apps/master/workers/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (wm *workerManager) Init() {
// CreateWorker Build a WorkerClient without an ID, and it'll receive one upon joining the WorkerManager.
// The new WokerClient instance will be assigned a same name with the old one added to the WorkerManager,
// which has the same workerPeer property.
func (wm *workerManager) CreateWorker(workerPeer string, ipAddr string, version string) (*WorkerClient, error) {
func (wm *workerManager) CreateWorker(workerPeer string, ipAddr string, version string, workerGroup string) (*WorkerClient, error) {
if workerPeer == "" {
return nil, fmt.Errorf("the argument 'workerPeer' is invalid")
}
Expand All @@ -115,12 +115,17 @@ func (wm *workerManager) CreateWorker(workerPeer string, ipAddr string, version
return nil, fmt.Errorf("the argument 'version' is invalid")
}

// must check if workerGroup is valid
if workerGroup == "" {
workerGroup = "$"
}

worker := &WorkerClient{
GrpcPeer: workerPeer,
IpAddr: ipAddr,
Version: version,
LaunchTime: time.Now(),
Group: "$",
Group: workerGroup,
}

workerInDB := wm.retrieveWorker(workerPeer)
Expand All @@ -133,6 +138,11 @@ func (wm *workerManager) CreateWorker(workerPeer string, ipAddr string, version
worker.InitTime = worker.LaunchTime
}

// if workerGroup in workerInDB is different from the one in worker, give a warning to the user
if workerGroup != "$" && worker.Group != workerGroup {
logrus.Warnf("worker manager, worker group mismatch: given %s, but found %s in db for worker %s", workerGroup, worker.Group, worker.Name)
}

return worker, nil
}

Expand Down
Loading
Loading