Skip to content

Commit e6503ff

Browse files
committed
Much progress in A74 using DependencyManager
1 parent 9efd2f2 commit e6503ff

13 files changed

+1851
-1318
lines changed

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

+1,066-144
Large diffs are not rendered by default.

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

+2-778
Large diffs are not rendered by default.

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java

-152
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,13 @@
1616

1717
package io.grpc.xds;
1818

19-
import static com.google.common.base.Preconditions.checkNotNull;
20-
21-
import com.google.common.base.MoreObjects;
22-
import com.google.common.collect.ImmutableMap;
23-
import com.google.protobuf.Struct;
2419
import io.grpc.Internal;
2520
import io.grpc.LoadBalancer;
2621
import io.grpc.LoadBalancer.Helper;
2722
import io.grpc.LoadBalancerProvider;
2823
import io.grpc.NameResolver.ConfigOrError;
2924
import io.grpc.Status;
30-
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
31-
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
32-
import io.grpc.xds.client.Bootstrapper.ServerInfo;
33-
import java.util.List;
3425
import java.util.Map;
35-
import java.util.Objects;
36-
import javax.annotation.Nullable;
3726

3827
/**
3928
* The provider for the cluster_resolver load balancing policy. This class should not be directly
@@ -69,145 +58,4 @@ public LoadBalancer newLoadBalancer(Helper helper) {
6958
return new ClusterResolverLoadBalancer(helper);
7059
}
7160

72-
static final class ClusterResolverConfig {
73-
// Ordered list of clusters to be resolved.
74-
final List<DiscoveryMechanism> discoveryMechanisms;
75-
// GracefulSwitch configuration
76-
final Object lbConfig;
77-
78-
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig) {
79-
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
80-
this.lbConfig = checkNotNull(lbConfig, "lbConfig");
81-
}
82-
83-
@Override
84-
public int hashCode() {
85-
return Objects.hash(discoveryMechanisms, lbConfig);
86-
}
87-
88-
@Override
89-
public boolean equals(Object o) {
90-
if (this == o) {
91-
return true;
92-
}
93-
if (o == null || getClass() != o.getClass()) {
94-
return false;
95-
}
96-
ClusterResolverConfig that = (ClusterResolverConfig) o;
97-
return discoveryMechanisms.equals(that.discoveryMechanisms)
98-
&& lbConfig.equals(that.lbConfig);
99-
}
100-
101-
@Override
102-
public String toString() {
103-
return MoreObjects.toStringHelper(this)
104-
.add("discoveryMechanisms", discoveryMechanisms)
105-
.add("lbConfig", lbConfig)
106-
.toString();
107-
}
108-
109-
// Describes the mechanism for a specific cluster.
110-
static final class DiscoveryMechanism {
111-
// Name of the cluster to resolve.
112-
final String cluster;
113-
// Type of the cluster.
114-
final Type type;
115-
// Load reporting server info. Null if not enabled.
116-
@Nullable
117-
final ServerInfo lrsServerInfo;
118-
// Cluster-level max concurrent request threshold. Null if not specified.
119-
@Nullable
120-
final Long maxConcurrentRequests;
121-
// TLS context for connections to endpoints in the cluster.
122-
@Nullable
123-
final UpstreamTlsContext tlsContext;
124-
// Resource name for resolving endpoints via EDS. Only valid for EDS clusters.
125-
@Nullable
126-
final String edsServiceName;
127-
// Hostname for resolving endpoints via DNS. Only valid for LOGICAL_DNS clusters.
128-
@Nullable
129-
final String dnsHostName;
130-
@Nullable
131-
final OutlierDetection outlierDetection;
132-
final Map<String, Struct> filterMetadata;
133-
134-
enum Type {
135-
EDS,
136-
LOGICAL_DNS,
137-
}
138-
139-
private DiscoveryMechanism(String cluster, Type type, @Nullable String edsServiceName,
140-
@Nullable String dnsHostName, @Nullable ServerInfo lrsServerInfo,
141-
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
142-
Map<String, Struct> filterMetadata, @Nullable OutlierDetection outlierDetection) {
143-
this.cluster = checkNotNull(cluster, "cluster");
144-
this.type = checkNotNull(type, "type");
145-
this.edsServiceName = edsServiceName;
146-
this.dnsHostName = dnsHostName;
147-
this.lrsServerInfo = lrsServerInfo;
148-
this.maxConcurrentRequests = maxConcurrentRequests;
149-
this.tlsContext = tlsContext;
150-
this.filterMetadata = ImmutableMap.copyOf(checkNotNull(filterMetadata, "filterMetadata"));
151-
this.outlierDetection = outlierDetection;
152-
}
153-
154-
static DiscoveryMechanism forEds(String cluster, @Nullable String edsServiceName,
155-
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
156-
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
157-
OutlierDetection outlierDetection) {
158-
return new DiscoveryMechanism(cluster, Type.EDS, edsServiceName, null, lrsServerInfo,
159-
maxConcurrentRequests, tlsContext, filterMetadata, outlierDetection);
160-
}
161-
162-
static DiscoveryMechanism forLogicalDns(String cluster, String dnsHostName,
163-
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
164-
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) {
165-
return new DiscoveryMechanism(cluster, Type.LOGICAL_DNS, null, dnsHostName,
166-
lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null);
167-
}
168-
169-
@Override
170-
public int hashCode() {
171-
return Objects.hash(cluster, type, lrsServerInfo, maxConcurrentRequests, tlsContext,
172-
edsServiceName, dnsHostName, filterMetadata, outlierDetection);
173-
}
174-
175-
@Override
176-
public boolean equals(Object o) {
177-
if (this == o) {
178-
return true;
179-
}
180-
if (o == null || getClass() != o.getClass()) {
181-
return false;
182-
}
183-
DiscoveryMechanism that = (DiscoveryMechanism) o;
184-
return cluster.equals(that.cluster)
185-
&& type == that.type
186-
&& Objects.equals(edsServiceName, that.edsServiceName)
187-
&& Objects.equals(dnsHostName, that.dnsHostName)
188-
&& Objects.equals(lrsServerInfo, that.lrsServerInfo)
189-
&& Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests)
190-
&& Objects.equals(tlsContext, that.tlsContext)
191-
&& Objects.equals(filterMetadata, that.filterMetadata)
192-
&& Objects.equals(outlierDetection, that.outlierDetection);
193-
}
194-
195-
@Override
196-
public String toString() {
197-
MoreObjects.ToStringHelper toStringHelper =
198-
MoreObjects.toStringHelper(this)
199-
.add("cluster", cluster)
200-
.add("type", type)
201-
.add("edsServiceName", edsServiceName)
202-
.add("dnsHostName", dnsHostName)
203-
.add("lrsServerInfo", lrsServerInfo)
204-
// Exclude tlsContext as its string representation is cumbersome.
205-
.add("maxConcurrentRequests", maxConcurrentRequests)
206-
.add("filterMetadata", filterMetadata)
207-
// Exclude outlierDetection as its string representation is long.
208-
;
209-
return toStringHelper.toString();
210-
}
211-
}
212-
}
21361
}

xds/src/main/java/io/grpc/xds/RoutingUtils.java

+4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ private RoutingUtils() {
4242
*/
4343
@Nullable
4444
static VirtualHost findVirtualHostForHostName(List<VirtualHost> virtualHosts, String hostName) {
45+
if (virtualHosts == null || virtualHosts.isEmpty()) {
46+
return null;
47+
}
48+
4549
// Domain search order:
4650
// 1. Exact domain names: ``www.foo.com``.
4751
// 2. Suffix domain wildcards: ``*.foo.com`` or ``*-bar.foo.com``.

xds/src/main/java/io/grpc/xds/XdsAttributes.java

+7
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ final class XdsAttributes {
3636
static final Attributes.Key<ObjectPool<XdsClient>> XDS_CLIENT_POOL =
3737
Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsClientPool");
3838

39+
/**
40+
* Attribute key for passing around the XdsClient object pool across NameResolver/LoadBalancers.
41+
*/
42+
@NameResolver.ResolutionResultAttr
43+
static final Attributes.Key<XdsConfig> XDS_CONFIG =
44+
Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsConfig");
45+
3946
/**
4047
* Attribute key for obtaining the global provider that provides atomics for aggregating
4148
* outstanding RPCs sent to each cluster.

xds/src/main/java/io/grpc/xds/XdsConfig.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,21 @@ public ImmutableMap<String, StatusOr<XdsClusterConfig>> getClusters() {
100100
return clusters;
101101
}
102102

103+
public XdsConfigBuilder toBuilder() {
104+
XdsConfigBuilder builder = new XdsConfigBuilder()
105+
.setVirtualHost(getVirtualHost())
106+
.setRoute(getRoute())
107+
.setListener(getListener());
108+
109+
if (clusters != null) {
110+
for (Map.Entry<String, StatusOr<XdsClusterConfig>> entry : clusters.entrySet()) {
111+
builder.addCluster(entry.getKey(), entry.getValue());
112+
}
113+
}
114+
115+
return builder;
116+
}
117+
103118
static final class XdsClusterConfig {
104119
private final String clusterName;
105120
private final CdsUpdate clusterResource;
@@ -181,7 +196,6 @@ XdsConfigBuilder setVirtualHost(VirtualHost virtualHost) {
181196

182197
XdsConfig build() {
183198
checkNotNull(listener, "listener");
184-
checkNotNull(route, "route");
185199
return new XdsConfig(listener, route, clusters, virtualHost);
186200
}
187201
}

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

+52-17
Original file line numberDiff line numberDiff line change
@@ -288,16 +288,24 @@ XdsConfig buildConfig() {
288288
}
289289
}
290290

291-
resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream()
292-
.map(watcher -> (RdsWatcher) watcher)
293-
.forEach(watcher -> builder.setRoute(watcher.getData().getValue()));
291+
if (resourceWatchers.containsKey(XdsRouteConfigureResource.getInstance())) {
292+
resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream()
293+
.map(watcher -> (RdsWatcher) watcher)
294+
.forEach(watcher -> builder.setRoute(watcher.getData().getValue()));
295+
}
294296

295-
builder.setVirtualHost(activeVirtualHost);
297+
if (activeVirtualHost != null) {
298+
builder.setVirtualHost(activeVirtualHost);
299+
}
296300

297301
Map<String, ? extends XdsWatcherBase<?>> edsWatchers =
298-
resourceWatchers.get(ENDPOINT_RESOURCE).watchers;
302+
resourceWatchers.containsKey(ENDPOINT_RESOURCE)
303+
? resourceWatchers.get(ENDPOINT_RESOURCE).watchers
304+
: Collections.EMPTY_MAP;
299305
Map<String, ? extends XdsWatcherBase<?>> cdsWatchers =
300-
resourceWatchers.get(CLUSTER_RESOURCE).watchers;
306+
resourceWatchers.containsKey(CLUSTER_RESOURCE)
307+
? resourceWatchers.get(CLUSTER_RESOURCE).watchers
308+
: Collections.EMPTY_MAP;
301309

302310
// Iterate CDS watchers
303311
for (XdsWatcherBase<?> watcher : cdsWatchers.values()) {
@@ -450,28 +458,39 @@ public void onChanged(XdsListenerResource.LdsUpdate update) {
450458

451459
if (virtualHosts != null) {
452460
// No RDS watcher since we are getting RDS updates via LDS
453-
updateRoutes(virtualHosts, this, activeVirtualHost, this.rdsName == null);
461+
boolean updateSuccessful = updateRoutes(virtualHosts, this, activeVirtualHost, this.rdsName == null);
454462
this.rdsName = null;
463+
if (!updateSuccessful) {
464+
lastXdsConfig = null;
465+
return;
466+
}
467+
455468
} else if (changedRdsName) {
456-
cleanUpRdsWatcher();
469+
lastXdsConfig = null;
457470
this.rdsName = rdsName;
458471
addWatcher(new RdsWatcher(rdsName));
459472
logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
460473
}
461474

462475
setData(update);
463-
maybePublishConfig();
476+
if (virtualHosts != null || changedRdsName) {
477+
maybePublishConfig();
478+
}
464479
}
465480

466481
@Override
467482
public void onError(Status error) {
468483
super.onError(checkNotNull(error, "error"));
484+
lastXdsConfig = null; //When we get a good update, we will publish it
469485
xdsConfigWatcher.onError(toContextString(), error);
470486
}
471487

472488
@Override
473489
public void onResourceDoesNotExist(String resourceName) {
474490
handleDoesNotExist(resourceName);
491+
cleanUpRdsWatcher();
492+
rdsName = null;
493+
lastXdsConfig = null; // Publishing an empty result
475494
xdsConfigWatcher.onResourceDoesNotExist(toContextString());
476495
}
477496

@@ -518,20 +537,23 @@ public void onChanged(RdsUpdate update) {
518537
? RoutingUtils.findVirtualHostForHostName(oldData.virtualHosts, dataPlaneAuthority)
519538
: null;
520539
setData(update);
521-
updateRoutes(update.virtualHosts, this, oldVirtualHost, true);
522-
maybePublishConfig();
540+
if (updateRoutes(update.virtualHosts, this, oldVirtualHost, true)) {
541+
maybePublishConfig();
542+
}
523543
}
524544

525545
@Override
526546
public void onError(Status error) {
527547
super.onError(checkNotNull(error, "error"));
528548
xdsConfigWatcher.onError(toContextString(), error);
549+
lastXdsConfig = null; // will publish when we get a good update
529550
}
530551

531552
@Override
532553
public void onResourceDoesNotExist(String resourceName) {
533554
handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
534555
xdsConfigWatcher.onResourceDoesNotExist(toContextString());
556+
lastXdsConfig = null; // Published an empty result
535557
}
536558

537559
ImmutableList<String> getCdsNames() {
@@ -557,6 +579,12 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
557579
switch (update.clusterType()) {
558580
case EDS:
559581
setData(update);
582+
if (update.edsServiceName() == null) {
583+
Status error = Status.UNAVAILABLE.withDescription("EDS cluster missing edsServiceName");
584+
setDataAsStatus(error);
585+
maybePublishConfig();
586+
return;
587+
}
560588
if (!addEdsWatcher(update.edsServiceName(), this)) {
561589
maybePublishConfig();
562590
}
@@ -577,8 +605,13 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
577605
setDataAsStatus(error);
578606
}
579607
if (hasDataValue()) {
580-
Set<String> oldNames = new HashSet<>(getData().getValue().prioritizedClusterNames());
581-
Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
608+
ImmutableList<String> oldChildNames = getData().getValue().prioritizedClusterNames();
609+
Set<String> oldNames = oldChildNames != null
610+
? new HashSet<>(oldChildNames)
611+
: new HashSet<>();
612+
ImmutableList<String> newChildNames = update.prioritizedClusterNames();
613+
Set<String> newNames =
614+
newChildNames != null ? new HashSet<>(newChildNames) : new HashSet<>();
582615

583616

584617
Set<String> deletedClusters = Sets.difference(oldNames, newNames);
@@ -670,17 +703,17 @@ void addParentContext(CdsWatcher parentContext) {
670703
}
671704
}
672705

673-
private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
706+
private boolean updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
674707
VirtualHost oldVirtualHost, boolean sameParentContext) {
675708
VirtualHost virtualHost =
676709
RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
677710
if (virtualHost == null) {
678711
String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
679712
logger.log(XdsLogger.XdsLogLevel.WARNING, error);
680713
cleanUpRoutes();
681-
xdsConfigWatcher.onError(
682-
"xDS node ID:" + dataPlaneAuthority, Status.UNAVAILABLE.withDescription(error));
683-
return;
714+
Status errorStatus = Status.UNAVAILABLE.withDescription(error);
715+
xdsConfigWatcher.onError("xDS node ID:" + dataPlaneAuthority, errorStatus);
716+
return false;
684717
}
685718

686719
Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
@@ -697,6 +730,8 @@ private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContex
697730
} else {
698731
newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
699732
}
733+
734+
return true;
700735
}
701736

702737
private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {

0 commit comments

Comments
 (0)