-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Handle serde for ScalarUDF #9395
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @thinkharderdev
I want to help #8706 in this PR but I have some questions.
I have put them in this review :)
fn to_bytes(&self) -> Result<Bytes> { | ||
let mut buffer = BytesMut::new(); | ||
let protobuf: protobuf::LogicalExprNode = self | ||
.try_into() | ||
let extension_codec = DefaultLogicalExtensionCodec {}; | ||
let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this be OK?
I think DefaultLogicalExtensionCodec
may need to implement some functions it didn't implement yet. I will also look into it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think this is fine. These extension methods can be used for vanilla serialization and then if user needs custom stuff with a custom codec they can use the lower-level apis
@@ -177,7 +178,8 @@ impl Serializeable for Expr { | |||
let protobuf = protobuf::LogicalExprNode::decode(bytes) | |||
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?; | |||
|
|||
logical_plan::from_proto::parse_expr(&protobuf, registry) | |||
let extension_codec = DefaultLogicalExtensionCodec {}; | |||
logical_plan::from_proto::parse_expr(&protobuf, registry, &extension_codec) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And also here
|
||
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>>; | ||
|
||
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan to add aggregate_udf and window_udf after scalar_udf.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I would include default implementations for all the new methods so it doesn't break backwards compatibility:
fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
Ok(())
}
// buf: &[u8], | ||
// ) -> Result<Arc<WindowUDF>>; | ||
|
||
// fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Plan to finish parts related to PhysicalExtensionCodec
after LogicalExtensionCodec
@@ -999,7 +1002,7 @@ pub fn parse_expr( | |||
let operands = binary_expr | |||
.operands | |||
.iter() | |||
.map(|expr| parse_expr(expr, registry)) | |||
.map(|expr| parse_expr(expr, registry, codec)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In from_proto.rs file, a lot of places need to change like this line.
I haven't finish it but submit this draft PR to verify the road.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good
|
||
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>>; | ||
|
||
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I would include default implementations for all the new methods so it doesn't break backwards compatibility:
fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
Ok(())
}
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> { | ||
not_impl_err!("LogicalExtensionCodec is not provided") | ||
} | ||
|
||
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> { | ||
not_impl_err!("LogicalExtensionCodec is not provided") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These shouldn't be neccessary if we add the default impl in the trait
@@ -133,6 +136,10 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync { | |||
node: Arc<dyn TableProvider>, | |||
buf: &mut Vec<u8>, | |||
) -> Result<()>; | |||
|
|||
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>>; | |
fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> { | |
not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}") | |
} |
fn to_bytes(&self) -> Result<Bytes> { | ||
let mut buffer = BytesMut::new(); | ||
let protobuf: protobuf::LogicalExprNode = self | ||
.try_into() | ||
let extension_codec = DefaultLogicalExtensionCodec {}; | ||
let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think this is fine. These extension methods can be used for vanilla serialization and then if user needs custom stuff with a custom codec they can use the lower-level apis
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @thinkharderdev
Thanks for your review and I think this can pass all CIs and is close to merge.
A question I have is when designing the test case, I put the details below.
pub struct ScalarUDFProto { | ||
#[prost(message, tag = "1")] | ||
pub expr: ::core::option::Option<datafusion_proto::protobuf::LogicalExprNode>, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is my test proto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not how this would work. Expressions would be passed to the udf as arguments. What would potentially get serialized into ScalarUDFProto
would just be some non-expr state. For example, a regex UDF might look like:
struct MyRegexUdf {
// compiled regex
regex: Regex,
// regex as original string
pattern: String,
}
impl ScalarUdfImpl for MyRegexUdf {
// implementation here, not important for example
}
This would take a Utf8
expression in as an argument when evaluated but that will be serialized and deserialized separately. In the proto we only need to serialized the regex. So the proto would be something like
message MyRegexUdfNode {
string pattern = 1;
}
Then in our custom codec we have
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
if let Ok(proto) = MyRegexUdfNode::decode(buf) {
let regex = Regex::new(&proto.pattern).map_err(|err| ...)?;
Ok(Arc::new(
ScalarUDF::new_from_impl(MyRegexUdf {
regex,
pattern: proto.pattern,
})
))
} else {
not_impl_err!("unrecognized scalar UDF implementation, cannot decode")
}
}
DataFusionError::Internal("Error decoding test table".to_string()) | ||
})?; | ||
if let Some(expr) = msg.expr.as_ref() { | ||
let node = from_proto::parse_expr(expr, ctx, self)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here if I want to convert a LogicalExprNode
to a Expr
, I have to call parse_expr
.
However, parse_expr
will call codec. try_decode_udf
, which lead circle call.
That makes me a little confused how to implement this method.
}); | ||
|
||
let proto = proto::ScalarUDFProto { | ||
expr: Some(serialize_expr(&func, self)?), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @thinkharderdev
Thanks for your guide and this PR can work now. :)
Can you review it when you are available?
datafusion/proto/Cargo.toml
Outdated
@@ -55,5 +55,6 @@ serde_json = { workspace = true, optional = true } | |||
|
|||
[dev-dependencies] | |||
doc-comment = { workspace = true } | |||
regex = "1.10.3" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think add this dependency for test is OK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the test works just fine without actually having regex
. We can just make the example like
struct MyRegexUdf {
pattern: String
}
so we don't need to add an extra dependency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think so.
Fixed it.
fn try_decode_udf(&self, _name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> { | ||
if let Ok(proto) = proto::MyRegexUdfNode::decode(buf) { | ||
let regex = Regex::new(&proto.pattern).map_err(|_err| ..); | ||
match regex { | ||
Ok(regex) => Ok(Arc::new(ScalarUDF::new_from_impl(MyRegexUdf::new( | ||
regex, | ||
proto.pattern, | ||
)))), | ||
Err(e) => internal_err!("unsupported regex pattern {e:?}"), | ||
} | ||
} else { | ||
not_impl_err!("unrecognized scalar UDF implementation, cannot decode") | ||
} | ||
} | ||
|
||
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> { | ||
let binding = node.inner(); | ||
let udf = binding.as_any().downcast_ref::<MyRegexUdf>().unwrap(); | ||
let proto = proto::MyRegexUdfNode { | ||
pattern: udf.pattern.clone(), | ||
}; | ||
proto.encode(buf).map_err(|e| { | ||
DataFusionError::Internal(format!("failed to encode udf: {e:?}")) | ||
})?; | ||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the encode and decode logic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One small suggestion on the test but this looks great. Really nice work!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yyy1000 We do still need to handle serde for physical expressions as well. This would follow the same pattern as for logical expressions.
We can either do that here or in a follow up PR. If you'd rather do it in follow-up PR then I wouldn't object to merging this one as is as I think being able to serialize the logical plan is a useful feature on it's own
Co-authored-by: Dan Harris <[email protected]>
Hi, @thinkharderdev
Thank you again for your guidance and review, it's so helpful! |
Yep makes sense. No worries! Thanks for your work on this! |
🎉 |
Which issue does this PR close?
Closes #8706 .
Rationale for this change
See the #8706
What changes are included in this PR?
Add a new field in
ScalarUDFExprNode
in protoExtend LogicalExtensionCodec to have methods like
try_decode_udf
andtry_encode_udf
Add a
serialize_expr
function which accept aLogicalExtensionCodec
param to serializeExpr
In
parse_expr
, add aLogicalExtensionCodec
param to serializeExpr
to deserializeAre these changes tested?
Are there any user-facing changes?