Skip to content

Commit 655764d

Browse files
authored
gnd: Support multiple subgraphs, grafting, subgraph composition in dev mode (#6000)
* graph: Add clone_for_deployment to FileLinkResolver to create FileLinkResolver with the right base dir for a subgraph * graph: Add for_deployment to LinkResolverTrait * core, graph: use for_deployment to get properly scoped resolver * graph: Implement aliases for file link resolver * node: Make gnd work with multiple subgraphs * node: Support subgraph datasource in gnd * node: correct the default value for manifest * core, node, graph: Ignore graft base in dev mode * node: Allow providing a postgres url for gnd * node: Do not use pgtemp in windows * store: enable `vendored` feature for openssl crate * chain/ethereum: Return error when ipc is used in non unix platform * node: Refactor launcher * node/dev: Better error message when database directory doesn't exist * node: refactor watcher * core, node, graph: Manipulate raw manifest instead of passing ignore_graft_base This reverts commit b5bbf93. * node: Correct comments on `redeploy_all_subgraphs` * node/gnd: Deploy all subgraphs first before watching files * core, graph: Refactor LinkResolver trait
1 parent ae8d082 commit 655764d

File tree

22 files changed

+655
-154
lines changed

22 files changed

+655
-154
lines changed

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chain/ethereum/src/transport.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ impl Transport {
3232
.expect("Failed to connect to Ethereum IPC")
3333
}
3434

35+
#[cfg(not(unix))]
36+
pub async fn new_ipc(_ipc: &str) -> Self {
37+
panic!("IPC connections are not supported on non-Unix platforms")
38+
}
39+
3540
/// Creates a WebSocket transport.
3641
pub async fn new_ws(ws: &str) -> Self {
3742
ws::WebSocket::new(ws)

chain/substreams/src/data_source.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,10 @@ mod test {
705705
unimplemented!()
706706
}
707707

708+
fn for_manifest(&self, _manifest_path: &str) -> Result<Box<dyn LinkResolver>, Error> {
709+
unimplemented!()
710+
}
711+
708712
async fn cat(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
709713
Ok(gen_package().encode_to_vec())
710714
}

core/src/subgraph/instance_manager.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,12 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
287287
let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?;
288288

289289
// Allow for infinite retries for subgraph definition files.
290-
let link_resolver = Arc::from(self.link_resolver.with_retries());
290+
let link_resolver = Arc::from(
291+
self.link_resolver
292+
.for_manifest(&deployment.hash.to_string())
293+
.map_err(SubgraphRegistrarError::Unknown)?
294+
.with_retries(),
295+
);
291296

292297
// Make sure the `raw_yaml` is present on both this subgraph and the graft base.
293298
self.subgraph_store

core/src/subgraph/provider.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,12 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAss
8686
));
8787
}
8888

89-
let file_bytes = self
89+
let link_resolver = self
9090
.link_resolver
91+
.for_manifest(&loc.hash.to_string())
92+
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
93+
94+
let file_bytes = link_resolver
9195
.cat(&logger, &loc.hash.to_ipfs_link())
9296
.await
9397
.map_err(SubgraphAssignmentProviderError::ResolveError)?;

core/src/subgraph/registrar.rs

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ where
278278
start_block_override: Option<BlockPtr>,
279279
graft_block_override: Option<BlockPtr>,
280280
history_blocks: Option<i32>,
281+
ignore_graft_base: bool,
281282
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
282283
// We don't have a location for the subgraph yet; that will be
283284
// assigned when we deploy for real. For logging purposes, make up a
@@ -286,19 +287,33 @@ where
286287
.logger_factory
287288
.subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone()));
288289

289-
let raw: serde_yaml::Mapping = {
290-
let file_bytes = self
291-
.resolver
292-
.cat(&logger, &hash.to_ipfs_link())
293-
.await
294-
.map_err(|e| {
295-
SubgraphRegistrarError::ResolveError(
296-
SubgraphManifestResolveError::ResolveError(e),
297-
)
298-
})?;
299-
300-
serde_yaml::from_slice(&file_bytes)
301-
.map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))?
290+
let resolver: Arc<dyn LinkResolver> = Arc::from(
291+
self.resolver
292+
.for_manifest(&hash.to_string())
293+
.map_err(SubgraphRegistrarError::Unknown)?,
294+
);
295+
296+
let raw = {
297+
let mut raw: serde_yaml::Mapping = {
298+
let file_bytes =
299+
resolver
300+
.cat(&logger, &hash.to_ipfs_link())
301+
.await
302+
.map_err(|e| {
303+
SubgraphRegistrarError::ResolveError(
304+
SubgraphManifestResolveError::ResolveError(e),
305+
)
306+
})?;
307+
308+
serde_yaml::from_slice(&file_bytes)
309+
.map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))?
310+
};
311+
312+
if ignore_graft_base {
313+
raw.remove("graft");
314+
}
315+
316+
raw
302317
};
303318

304319
let kind = BlockchainKind::from_manifest(&raw).map_err(|e| {
@@ -323,7 +338,7 @@ where
323338
node_id,
324339
debug_fork,
325340
self.version_switching_mode,
326-
&self.resolver,
341+
&resolver,
327342
history_blocks,
328343
)
329344
.await?
@@ -341,7 +356,7 @@ where
341356
node_id,
342357
debug_fork,
343358
self.version_switching_mode,
344-
&self.resolver,
359+
&resolver,
345360
history_blocks,
346361
)
347362
.await?
@@ -359,7 +374,7 @@ where
359374
node_id,
360375
debug_fork,
361376
self.version_switching_mode,
362-
&self.resolver,
377+
&resolver,
363378
history_blocks,
364379
)
365380
.await?
@@ -377,7 +392,7 @@ where
377392
node_id,
378393
debug_fork,
379394
self.version_switching_mode,
380-
&self.resolver,
395+
&resolver,
381396
history_blocks,
382397
)
383398
.await?
@@ -567,10 +582,11 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
567582
history_blocks_override: Option<i32>,
568583
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
569584
let raw_string = serde_yaml::to_string(&raw).unwrap();
585+
570586
let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(
571587
deployment.clone(),
572588
raw,
573-
resolver,
589+
&resolver,
574590
logger,
575591
ENV_VARS.max_spec_version.clone(),
576592
)

graph/src/components/link_resolver/file.rs

Lines changed: 122 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::HashMap;
12
use std::path::{Path, PathBuf};
23
use std::time::Duration;
34

@@ -12,16 +13,29 @@ use crate::prelude::{Error, JsonValueStream, LinkResolver as LinkResolverTrait};
1213
pub struct FileLinkResolver {
1314
base_dir: Option<PathBuf>,
1415
timeout: Duration,
16+
// This is a hashmap that maps the alias name to the path of the file that is aliased
17+
aliases: HashMap<String, PathBuf>,
18+
}
19+
20+
impl Default for FileLinkResolver {
21+
fn default() -> Self {
22+
Self {
23+
base_dir: None,
24+
timeout: Duration::from_secs(30),
25+
aliases: HashMap::new(),
26+
}
27+
}
1528
}
1629

1730
impl FileLinkResolver {
1831
/// Create a new FileLinkResolver
1932
///
2033
/// Aliases are resolved first; other paths are joined onto `base_dir` when one is given and are used as-is otherwise.
21-
pub fn new() -> Self {
34+
pub fn new(base_dir: Option<PathBuf>, aliases: HashMap<String, PathBuf>) -> Self {
2235
Self {
23-
base_dir: None,
36+
base_dir: base_dir,
2437
timeout: Duration::from_secs(30),
38+
aliases,
2539
}
2640
}
2741

@@ -33,18 +47,59 @@ impl FileLinkResolver {
3347
Self {
3448
base_dir: Some(base_dir.as_ref().to_owned()),
3549
timeout: Duration::from_secs(30),
50+
aliases: HashMap::new(),
3651
}
3752
}
3853

3954
fn resolve_path(&self, link: &str) -> PathBuf {
4055
let path = Path::new(link);
4156

57+
// If the path is an alias, use the aliased path
58+
if let Some(aliased) = self.aliases.get(link) {
59+
return aliased.clone();
60+
}
61+
4262
// Return the path as is if base_dir is None, or join with base_dir if present.
4363
// if "link" is an absolute path, join will simply return that path.
4464
self.base_dir
4565
.as_ref()
4666
.map_or_else(|| path.to_owned(), |base_dir| base_dir.join(link))
4767
}
68+
69+
/// This method creates a new resolver that is scoped to a specific subgraph
70+
/// It will set the base directory to the parent directory of the manifest path
71+
/// This is required because paths mentioned in the subgraph manifest are relative paths
72+
/// and we need a new resolver with the right base directory for the specific subgraph
73+
fn clone_for_manifest(&self, manifest_path_str: &str) -> Result<Self, Error> {
74+
let mut resolver = self.clone();
75+
76+
// Create a path to the manifest based on the current resolver's
77+
// base directory or default to using the deployment string as path
78+
// If the deployment string is an alias, use the aliased path
79+
let manifest_path = if let Some(aliased) = self.aliases.get(&manifest_path_str.to_string())
80+
{
81+
aliased.clone()
82+
} else {
83+
match &resolver.base_dir {
84+
Some(dir) => dir.join(&manifest_path_str),
85+
None => PathBuf::from(manifest_path_str),
86+
}
87+
};
88+
89+
let canonical_manifest_path = manifest_path
90+
.canonicalize()
91+
.map_err(|e| Error::from(anyhow!("Failed to canonicalize manifest path: {}", e)))?;
92+
93+
// The manifest path is the path of the subgraph manifest file in the build directory
94+
// We use the parent directory as the base directory for the new resolver
95+
let base_dir = canonical_manifest_path
96+
.parent()
97+
.ok_or_else(|| Error::from(anyhow!("Manifest path has no parent directory")))?
98+
.to_path_buf();
99+
100+
resolver.base_dir = Some(base_dir);
101+
Ok(resolver)
102+
}
48103
}
49104

50105
pub fn remove_prefix(link: &str) -> &str {
@@ -86,6 +141,10 @@ impl LinkResolverTrait for FileLinkResolver {
86141
}
87142
}
88143

144+
fn for_manifest(&self, manifest_path: &str) -> Result<Box<dyn LinkResolverTrait>, Error> {
145+
Ok(Box::new(self.clone_for_manifest(manifest_path)?))
146+
}
147+
89148
async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
90149
Err(anyhow!("get_block is not implemented for FileLinkResolver").into())
91150
}
@@ -117,7 +176,7 @@ mod tests {
117176
file.write_all(test_content).unwrap();
118177

119178
// Create a resolver without a base directory
120-
let resolver = FileLinkResolver::new();
179+
let resolver = FileLinkResolver::default();
121180
let logger = slog::Logger::root(slog::Discard, slog::o!());
122181

123182
// Test valid path resolution
@@ -185,4 +244,64 @@ mod tests {
185244
let _ = fs::remove_file(test_file_path);
186245
let _ = fs::remove_dir(temp_dir);
187246
}
247+
248+
#[tokio::test]
249+
async fn test_file_resolver_with_aliases() {
250+
// Create a temporary directory for test files
251+
let temp_dir = env::temp_dir().join("file_resolver_test_aliases");
252+
let _ = fs::create_dir_all(&temp_dir);
253+
254+
// Create two test files with different content
255+
let test_file1_path = temp_dir.join("file.txt");
256+
let test_content1 = b"This is the file content";
257+
let mut file1 = fs::File::create(&test_file1_path).unwrap();
258+
file1.write_all(test_content1).unwrap();
259+
260+
let test_file2_path = temp_dir.join("another_file.txt");
261+
let test_content2 = b"This is another file content";
262+
let mut file2 = fs::File::create(&test_file2_path).unwrap();
263+
file2.write_all(test_content2).unwrap();
264+
265+
// Create aliases mapping
266+
let mut aliases = HashMap::new();
267+
aliases.insert("alias1".to_string(), test_file1_path.clone());
268+
aliases.insert("alias2".to_string(), test_file2_path.clone());
269+
aliases.insert("deployment-id".to_string(), test_file1_path.clone());
270+
271+
// Create resolver with aliases
272+
let resolver = FileLinkResolver::new(Some(temp_dir.clone()), aliases);
273+
let logger = slog::Logger::root(slog::Discard, slog::o!());
274+
275+
// Test resolving by aliases
276+
let link1 = Link {
277+
link: "alias1".to_string(),
278+
};
279+
let result1 = resolver.cat(&logger, &link1).await.unwrap();
280+
assert_eq!(result1, test_content1);
281+
282+
let link2 = Link {
283+
link: "alias2".to_string(),
284+
};
285+
let result2 = resolver.cat(&logger, &link2).await.unwrap();
286+
assert_eq!(result2, test_content2);
287+
288+
// Test that the alias works in clone_for_manifest as well
289+
let deployment_resolver = resolver.clone_for_manifest("deployment-id").unwrap();
290+
291+
let expected_dir = test_file1_path.parent().unwrap();
292+
let deployment_base_dir = deployment_resolver.base_dir.clone().unwrap();
293+
294+
let canonical_expected_dir = expected_dir.canonicalize().unwrap();
295+
let canonical_deployment_dir = deployment_base_dir.canonicalize().unwrap();
296+
297+
assert_eq!(
298+
canonical_deployment_dir, canonical_expected_dir,
299+
"Build directory paths don't match"
300+
);
301+
302+
// Clean up
303+
let _ = fs::remove_file(test_file1_path);
304+
let _ = fs::remove_file(test_file2_path);
305+
let _ = fs::remove_dir(temp_dir);
306+
}
188307
}

graph/src/components/link_resolver/ipfs.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ impl LinkResolverTrait for IpfsResolver {
7474
Box::new(s)
7575
}
7676

77+
fn for_manifest(&self, _manifest_path: &str) -> Result<Box<dyn LinkResolverTrait>, Error> {
78+
Ok(Box::new(self.cheap_clone()))
79+
}
80+
7781
async fn cat(&self, logger: &Logger, link: &Link) -> Result<Vec<u8>, Error> {
7882
let path = ContentPath::new(&link.link)?;
7983
let timeout = self.timeout;

graph/src/components/link_resolver/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,20 @@ pub trait LinkResolver: Send + Sync + 'static + Debug {
3030
/// Fetches the IPLD block contents as bytes.
3131
async fn get_block(&self, logger: &Logger, link: &Link) -> Result<Vec<u8>, Error>;
3232

33+
/// Creates a new resolver scoped to a specific subgraph manifest.
34+
///
35+
/// For FileLinkResolver, this sets the base directory to the manifest's parent directory
36+
/// so that relative paths referenced in the manifest (schema, mappings, etc.) resolve properly.
37+
/// Note that the manifest here is the one in the build directory, not the one in the source directory.
38+
/// For other resolvers (IPFS/Arweave), this simply returns a clone since they use
39+
/// absolute content identifiers.
40+
///
41+
/// The `manifest_path` parameter can be a filesystem path or an alias. Aliases are used
42+
/// in development environments (via `gnd --sources`) to map user-defined
43+
/// aliases to actual subgraph paths, enabling local development with file-based
44+
/// subgraphs that reference each other.
45+
fn for_manifest(&self, manifest_path: &str) -> Result<Box<dyn LinkResolver>, Error>;
46+
3347
/// Read the contents of `link` and deserialize them into a stream of JSON
3448
/// values. The values must each be on a single line; newlines are significant
3549
/// as they are used to split the file contents and each line is deserialized

0 commit comments

Comments
 (0)