Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37625][python] Don't skip type validation for Rows made with positional arguments #26414

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

autophagy
Copy link
Contributor

@autophagy autophagy commented Apr 7, 2025

What is the purpose of the change

When creating a table using TableEnvironment.from_elements, the Table API skips type validation on any Row elements that were created using positional arguments, rather than keyword arguments.

For example, take a table with a single column, whose type is an array of Rows. These rows have 2 columns, a VARCHAR and b BOOLEAN. If we create a table with elements where one of these rows has columns with incorrect datatypes:

schema = DataTypes.ROW(
    [
        DataTypes.FIELD(
            "col",
            DataTypes.ARRAY(
                DataTypes.ROW(
                    [
                        DataTypes.FIELD("a", DataTypes.STRING()),
                        DataTypes.FIELD("b", DataTypes.BOOLEAN()),
                    ]
                )
            ),
        ),
    ]
) 
elements = [(
    [("pyflink", True), ("pyflink", False), (True, "pyflink")],
)] 
table = self.t_env.from_elements(elements, schema)
table_result = list(table.execute().collect())

This results in a type validation error:

TypeError: field a in element in array field col: VARCHAR can not accept object True in type <class 'bool'>

In an example where we use Row instead of tuples, but with column arguments:

elements = [(
    [Row(a="pyflink", b=True), Row(a="pyflink", b=False), Row(a=True, b="pyflink")],
)]

We also get the same type validation error. However, when we use Row with positional arguments:

elements = [(
    [Row("pyflink", True), Row("pyflink", False), Row(True, "pyflink")],
)]

the type validation is skipped, leading to an unpickling error when collecting:

>           data = pickle.loads(data)
E           EOFError: Ran out of input 

The type validator skips this by stating that the order in the row could be different to the order of the datatype fields, but I don't think this is true. Both rows made from tuples and lists are type verified positionally with the positions of the Datatype fields, and in the case of the Row class the order the row's internal values are preserved. Similarly, Row class equality in cases where both of the rows are created with positional arguments is also determined by the order of the row's internal values.

Brief change log

  • Change the type validation logic used by TableEnvironment.from_elements so that Rows constructed with positional arguments are not skipped.

Verifying this change

This change added tests and can be verified as follows:

  • Added a test to ensure consistent type validation behaviour with rows constructed from tuples, lists, Rows with keyword arguments and Rows with positional arguments

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

flinkbot commented Apr 7, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@@ -108,6 +108,7 @@ def __init__(self, *args, **kwargs):
self._from_dict = True
else:
self._values = list(args)
self._from_dict = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we do this outside the if with self._from_dict = kwargs;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would set self._from_dict to {'a': 'pyflink', 'b': True} rather than a boolean which feels a little unintuitive

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants