diff --git a/python/pydantic_core/core_schema.py b/python/pydantic_core/core_schema.py index c7ca7cfd8..3fc307ee3 100644 --- a/python/pydantic_core/core_schema.py +++ b/python/pydantic_core/core_schema.py @@ -3494,6 +3494,120 @@ def arguments_schema( ) +class ArgumentsV3Parameter(TypedDict, total=False): + name: Required[str] + schema: Required[CoreSchema] + mode: Literal[ + 'positional_only', + 'positional_or_keyword', + 'keyword_only', + 'var_args', + 'var_kwargs_uniform', + 'var_kwargs_unpacked_typed_dict', + ] # default positional_or_keyword + alias: Union[str, list[Union[str, int]], list[list[Union[str, int]]]] + + +def arguments_v3_parameter( + name: str, + schema: CoreSchema, + *, + mode: Literal[ + 'positional_only', + 'positional_or_keyword', + 'keyword_only', + 'var_args', + 'var_kwargs_uniform', + 'var_kwargs_unpacked_typed_dict', + ] + | None = None, + alias: str | list[str | int] | list[list[str | int]] | None = None, +) -> ArgumentsV3Parameter: + """ + Returns a schema that matches an argument parameter, e.g.: + + ```py + from pydantic_core import SchemaValidator, core_schema + + param = core_schema.arguments_v3_parameter( + name='a', schema=core_schema.str_schema(), mode='positional_only' + ) + schema = core_schema.arguments_v3_schema([param]) + v = SchemaValidator(schema) + assert v.validate_python({'a': 'hello'}) == (('hello',), {}) + ``` + + Args: + name: The name to use for the argument parameter + schema: The schema to use for the argument parameter + mode: The mode to use for the argument parameter + alias: The alias to use for the argument parameter + """ + return _dict_not_none(name=name, schema=schema, mode=mode, alias=alias) + + +class ArgumentsV3Schema(TypedDict, total=False): + type: Required[Literal['arguments-v3']] + arguments_schema: Required[list[ArgumentsV3Parameter]] + validate_by_name: bool + validate_by_alias: bool + extra_behavior: Literal['forbid', 'ignore'] # 'allow' doesn't make sense here. + ref: str + metadata: dict[str, Any] + serialization: SerSchema + + +def arguments_v3_schema( + arguments: list[ArgumentsV3Parameter], + *, + validate_by_name: bool | None = None, + validate_by_alias: bool | None = None, + extra_behavior: Literal['forbid', 'ignore'] | None = None, + ref: str | None = None, + metadata: dict[str, Any] | None = None, + serialization: SerSchema | None = None, +) -> ArgumentsV3Schema: + """ + Returns a schema that matches an arguments schema, e.g.: + + ```py + from pydantic_core import SchemaValidator, core_schema + + param_a = core_schema.arguments_v3_parameter( + name='a', schema=core_schema.str_schema(), mode='positional_only' + ) + param_b = core_schema.arguments_v3_parameter( + name='kwargs', schema=core_schema.bool_schema(), mode='var_kwargs_uniform' + ) + schema = core_schema.arguments_v3_schema([param_a, param_b]) + v = SchemaValidator(schema) + assert v.validate_python({'a': 'hi', 'kwargs': {'b': True}}) == (('hi',), {'b': True}) + ``` + + This schema is currently not used by other Pydantic components. In V3, it will most likely + become the default arguments schema for the `'call'` schema. + + Args: + arguments: The arguments to use for the arguments schema. + validate_by_name: Whether to populate by the parameter names, defaults to `False`. + validate_by_alias: Whether to populate by the parameter aliases, defaults to `True`. + extra_behavior: The extra behavior to use. + ref: optional unique identifier of the schema, used to reference the schema in other places. + metadata: Any other information you want to include with the schema, not used by pydantic-core. + serialization: Custom serialization schema. + """ + return _dict_not_none( + type='arguments-v3', + arguments_schema=arguments, + validate_by_name=validate_by_name, + validate_by_alias=validate_by_alias, + extra_behavior=extra_behavior, + ref=ref, + metadata=metadata, + serialization=serialization, + ) + + class CallSchema(TypedDict, total=False): type: Required[Literal['call']] arguments_schema: Required[CoreSchema] @@ -3921,6 +4035,7 @@ def definition_reference_schema( DataclassArgsSchema, DataclassSchema, ArgumentsSchema, + ArgumentsV3Schema, CallSchema, CustomErrorSchema, JsonSchema, @@ -3978,6 +4093,7 @@ def definition_reference_schema( 'dataclass-args', 'dataclass', 'arguments', + 'arguments-v3', 'call', 'custom-error', 'json', diff --git a/src/input/input_abstract.rs b/src/input/input_abstract.rs index aa9c6eb16..119c862ac 100644 --- a/src/input/input_abstract.rs +++ b/src/input/input_abstract.rs @@ -81,6 +81,8 @@ pub trait Input<'py>: fmt::Debug { fn validate_args(&self) -> ValResult>; + fn validate_args_v3(&self) -> ValResult>; + fn validate_dataclass_args<'a>(&'a self, dataclass_name: &str) -> ValResult>; fn validate_str(&self, strict: bool, coerce_numbers_to_str: bool) -> ValMatch>; @@ -265,6 +267,7 @@ pub trait ValidatedList<'py> { pub trait ValidatedTuple<'py> { type Item: BorrowInput<'py>; fn len(&self) -> Option; + fn try_for_each(self, f: impl FnMut(PyResult) -> ValResult<()>) -> ValResult<()>; fn iterate(self, consumer: impl ConsumeIterator, Output = R>) -> ValResult; } @@ -313,6 +316,9 @@ impl<'py> ValidatedTuple<'py> for Never { fn len(&self) -> Option { unreachable!() } + fn try_for_each(self, _f: impl FnMut(PyResult) -> ValResult<()>) -> ValResult<()> { + unreachable!() + } fn iterate(self, _consumer: impl ConsumeIterator, Output = R>) -> ValResult { unreachable!() } diff --git a/src/input/input_json.rs b/src/input/input_json.rs index 6479357cc..4de3fd4cf 100644 --- a/src/input/input_json.rs +++ b/src/input/input_json.rs @@ -85,6 +85,11 @@ impl<'py, 'data> Input<'py> for JsonValue<'data> { } } + #[cfg_attr(has_coverage_attribute, coverage(off))] + fn validate_args_v3(&self) -> ValResult> { + Err(ValError::new(ErrorTypeDefaults::ArgumentsType, self)) + } + fn validate_dataclass_args<'a>(&'a self, class_name: &str) -> ValResult> { match self { JsonValue::Object(object) => Ok(JsonArgs::new(None, Some(object))), @@ -383,6 +388,11 @@ impl<'py> Input<'py> for str { Err(ValError::new(ErrorTypeDefaults::ArgumentsType, self)) } + #[cfg_attr(has_coverage_attribute, coverage(off))] + fn validate_args_v3(&self) -> ValResult { + Err(ValError::new(ErrorTypeDefaults::ArgumentsType, self)) + } + #[cfg_attr(has_coverage_attribute, coverage(off))] fn validate_dataclass_args(&self, class_name: &str) -> ValResult { let class_name = class_name.to_string(); @@ -579,6 +589,12 @@ impl<'a, 'data> ValidatedTuple<'_> for &'a JsonArray<'data> { fn len(&self) -> Option { Some(Vec::len(self)) } + fn try_for_each(self, mut f: impl FnMut(PyResult) -> ValResult<()>) -> ValResult<()> { + for item in self.iter() { + f(Ok(item))?; + } + Ok(()) + } fn iterate(self, consumer: impl ConsumeIterator, Output = R>) -> ValResult { Ok(consumer.consume_iterator(self.iter().map(Ok))) } diff --git a/src/input/input_python.rs b/src/input/input_python.rs index ea6eab054..e82cbaed7 100644 --- a/src/input/input_python.rs +++ b/src/input/input_python.rs @@ -117,6 +117,16 @@ impl<'py> Input<'py> for Bound<'py, PyAny> { } } + fn validate_args_v3(&self) -> ValResult> { + if let Ok(args_kwargs) = self.extract::() { + let args = args_kwargs.args.into_bound(self.py()); + let kwargs = args_kwargs.kwargs.map(|d| d.into_bound(self.py())); + Ok(PyArgs::new(Some(args), kwargs)) + } else { + Err(ValError::new(ErrorTypeDefaults::ArgumentsType, self)) + } + } + fn validate_dataclass_args<'a>(&'a self, class_name: &str) -> ValResult> { if let Ok(dict) = self.downcast::() { Ok(PyArgs::new(None, Some(dict.clone()))) @@ -915,7 +925,15 @@ impl<'py> PySequenceIterable<'_, 'py> { PySequenceIterable::Iterator(iter) => iter.len().ok(), } } - + fn generic_try_for_each(self, f: impl FnMut(PyResult>) -> ValResult<()>) -> ValResult<()> { + match self { + PySequenceIterable::List(iter) => iter.iter().map(Ok).try_for_each(f), + PySequenceIterable::Tuple(iter) => iter.iter().map(Ok).try_for_each(f), + PySequenceIterable::Set(iter) => iter.iter().map(Ok).try_for_each(f), + PySequenceIterable::FrozenSet(iter) => iter.iter().map(Ok).try_for_each(f), + PySequenceIterable::Iterator(mut iter) => iter.try_for_each(f), + } + } fn generic_iterate( self, consumer: impl ConsumeIterator>, Output = R>, @@ -951,6 +969,9 @@ impl<'py> ValidatedTuple<'py> for PySequenceIterable<'_, 'py> { fn len(&self) -> Option { self.generic_len() } + fn try_for_each(self, f: impl FnMut(PyResult) -> ValResult<()>) -> ValResult<()> { + self.generic_try_for_each(f) + } fn iterate(self, consumer: impl ConsumeIterator, Output = R>) -> ValResult { self.generic_iterate(consumer) } diff --git a/src/input/input_string.rs b/src/input/input_string.rs index a50b3cff2..0ab4ad014 100644 --- a/src/input/input_string.rs +++ b/src/input/input_string.rs @@ -89,6 +89,11 @@ impl<'py> Input<'py> for StringMapping<'py> { Err(ValError::new(ErrorTypeDefaults::ArgumentsType, self)) } + fn validate_args_v3(&self) -> ValResult> { + // do we want to support this? + Err(ValError::new(ErrorTypeDefaults::ArgumentsType, self)) + } + fn validate_dataclass_args<'a>(&'a self, _dataclass_name: &str) -> ValResult> { match self { StringMapping::String(_) => Err(ValError::new(ErrorTypeDefaults::ArgumentsType, self)), diff --git a/src/validators/arguments_v3.rs b/src/validators/arguments_v3.rs new file mode 100644 index 000000000..22c9c6d0f --- /dev/null +++ b/src/validators/arguments_v3.rs @@ -0,0 +1,782 @@ +use std::str::FromStr; + +use pyo3::intern; +use pyo3::prelude::*; +use pyo3::types::{PyDict, PyList, PyString, PyTuple}; + +use ahash::AHashSet; +use pyo3::IntoPyObjectExt; + +use crate::build_tools::py_schema_err; +use crate::build_tools::{schema_or_config_same, ExtraBehavior}; +use crate::errors::LocItem; +use crate::errors::{ErrorTypeDefaults, ValError, ValLineError, ValResult}; +use crate::input::ConsumeIterator; +use crate::input::{ + Arguments, BorrowInput, Input, KeywordArgs, PositionalArgs, ValidatedDict, ValidatedTuple, ValidationMatch, +}; +use crate::lookup_key::LookupKeyCollection; +use crate::tools::SchemaDict; + +use super::validation_state::ValidationState; +use super::{build_validator, BuildValidator, CombinedValidator, DefinitionsBuilder, Validator}; + +#[derive(Debug, PartialEq)] +enum ParameterMode { + PositionalOnly, + PositionalOrKeyword, + VarArgs, + KeywordOnly, + VarKwargsUniform, + VarKwargsUnpackedTypedDict, +} + +impl FromStr for ParameterMode { + type Err = PyErr; + + fn from_str(s: &str) -> Result { + match s { + "positional_only" => Ok(Self::PositionalOnly), + "positional_or_keyword" => Ok(Self::PositionalOrKeyword), + "var_args" => Ok(Self::VarArgs), + "keyword_only" => Ok(Self::KeywordOnly), + "var_kwargs_uniform" => Ok(Self::VarKwargsUniform), + "var_kwargs_unpacked_typed_dict" => Ok(Self::VarKwargsUnpackedTypedDict), + s => py_schema_err!("Invalid var_kwargs mode: `{}`", s), + } + } +} + +#[derive(Debug)] +struct Parameter { + name: String, + mode: ParameterMode, + lookup_key_collection: LookupKeyCollection, + validator: CombinedValidator, +} + +impl Parameter { + fn is_variadic(&self) -> bool { + matches!( + self.mode, + ParameterMode::VarArgs | ParameterMode::VarKwargsUniform | ParameterMode::VarKwargsUnpackedTypedDict + ) + } +} + +#[derive(Debug)] +pub struct ArgumentsV3Validator { + parameters: Vec, + positional_params_count: usize, + loc_by_alias: bool, + extra: ExtraBehavior, + validate_by_alias: Option, + validate_by_name: Option, +} + +impl BuildValidator for ArgumentsV3Validator { + const EXPECTED_TYPE: &'static str = "arguments-v3"; + + fn build( + schema: &Bound<'_, PyDict>, + config: Option<&Bound<'_, PyDict>>, + definitions: &mut DefinitionsBuilder, + ) -> PyResult { + let py = schema.py(); + + let arguments_schema: Bound<'_, PyList> = schema.get_as_req(intern!(py, "arguments_schema"))?; + let mut parameters: Vec = Vec::with_capacity(arguments_schema.len()); + + let mut had_default_arg = false; + let mut had_positional_or_keyword = false; + let mut had_var_args = false; + let mut had_keyword_only = false; + let mut had_var_kwargs = false; + + let mut names: AHashSet = AHashSet::with_capacity(arguments_schema.len()); + + for arg in arguments_schema.iter() { + let arg = arg.downcast::()?; + + let py_name: Bound = arg.get_as_req(intern!(py, "name"))?; + let name = py_name.to_string(); + if !names.insert(name.clone()) { + return py_schema_err!("Duplicate parameter '{}'", name); + } + + let py_mode = arg.get_as::>(intern!(py, "mode"))?; + let py_mode = py_mode + .as_ref() + .map(|py_str| py_str.to_str()) + .transpose()? + .unwrap_or("positional_or_keyword"); + + let mode = ParameterMode::from_str(py_mode)?; + + match mode { + ParameterMode::PositionalOnly => { + if had_positional_or_keyword || had_var_args || had_keyword_only || had_var_kwargs { + return py_schema_err!( + "Positional only parameter '{}' cannot follow other parameter kinds", + name + ); + } + } + ParameterMode::PositionalOrKeyword => { + if had_var_args || had_keyword_only || had_var_kwargs { + return py_schema_err!( + "Positional or keyword parameter '{}' cannot follow variadic or keyword only parameters", + name + ); + } + had_positional_or_keyword = true; + } + ParameterMode::VarArgs => { + if had_var_args { + return py_schema_err!("Duplicate variadic positional parameter '{}'", name); + } + if had_keyword_only || had_var_kwargs { + return py_schema_err!( + "Variadic positional parameter '{}' cannot follow variadic or keyword only parameters", + name + ); + } + had_var_args = true; + } + ParameterMode::KeywordOnly => { + if had_var_kwargs { + return py_schema_err!( + "Keyword only parameter '{}' cannot follow variadic keyword only parameter", + name + ); + } + had_keyword_only = true; + } + ParameterMode::VarKwargsUniform | ParameterMode::VarKwargsUnpackedTypedDict => { + if had_var_kwargs { + return py_schema_err!("Duplicate variadic keyword parameter '{}'", name); + } + had_var_kwargs = true; + } + } + + let schema = arg.get_as_req(intern!(py, "schema"))?; + + let validator = match build_validator(&schema, config, definitions) { + Ok(v) => v, + Err(err) => return py_schema_err!("Parameter '{}':\n {}", name, err), + }; + + let has_default = match validator { + CombinedValidator::WithDefault(ref v) => { + if v.omit_on_error() { + return py_schema_err!("Parameter '{}': omit_on_error cannot be used with arguments", name); + } + v.has_default() + } + _ => false, + }; + + if had_default_arg && !has_default && !had_keyword_only { + return py_schema_err!("Required parameter '{}' follows parameter with default", name); + } else if has_default { + had_default_arg = true; + } + + let validation_alias = arg.get_item(intern!(py, "alias"))?; + let lookup_key_collection = LookupKeyCollection::new(py, validation_alias, name.as_str())?; + + parameters.push(Parameter { + name, + mode, + lookup_key_collection, + validator, + }); + } + + let positional_params_count = parameters + .iter() + .filter(|p| { + matches!( + p.mode, + ParameterMode::PositionalOnly | ParameterMode::PositionalOrKeyword + ) + }) + .count(); + + Ok(Self { + parameters, + positional_params_count, + loc_by_alias: config.get_as(intern!(py, "loc_by_alias"))?.unwrap_or(true), + extra: ExtraBehavior::from_schema_or_config(py, schema, config, ExtraBehavior::Forbid)?, + validate_by_alias: schema_or_config_same(schema, config, intern!(py, "validate_by_alias"))?, + validate_by_name: schema_or_config_same(schema, config, intern!(py, "validate_by_name"))?, + } + .into()) + } +} + +impl_py_gc_traverse!(Parameter { validator }); + +impl_py_gc_traverse!(ArgumentsV3Validator { parameters }); + +impl ArgumentsV3Validator { + /// Validate the arguments from a mapping: + /// ```py + /// def func(a: int, /, *, b: str, **kwargs: int) -> None: + /// ... + /// + /// valid_mapping = {'a': 1, 'b': 'test', 'kwargs': {'c': 1, 'd': 2}} + /// ``` + fn validate_from_mapping<'py>( + &self, + py: Python<'py>, + original_input: &(impl Input<'py> + ?Sized), + mapping: impl ValidatedDict<'py>, + state: &mut ValidationState<'_, 'py>, + ) -> ValResult { + let mut output_args: Vec = Vec::with_capacity(self.positional_params_count); + let output_kwargs = PyDict::new(py); + let mut errors: Vec = Vec::new(); + + let validate_by_alias = state.validate_by_alias_or(self.validate_by_alias); + let validate_by_name = state.validate_by_name_or(self.validate_by_name); + + // Keep track of used keys for extra behavior: + let mut used_keys: Option> = if self.extra == ExtraBehavior::Ignore || mapping.is_py_get_attr() { + None + } else { + Some(AHashSet::with_capacity(self.parameters.len())) + }; + + for parameter in &self.parameters { + let lookup_key = parameter + .lookup_key_collection + .select(validate_by_alias, validate_by_name)?; + + // A value is present in the mapping: + if let Some((lookup_path, dict_value)) = mapping.get_item(lookup_key)? { + if let Some(ref mut used_keys) = used_keys { + // key is "used" whether or not validation passes, since we want to skip this key in + // extra logic either way + used_keys.insert(lookup_path.first_key()); + } + + match parameter.mode { + ParameterMode::PositionalOnly | ParameterMode::PositionalOrKeyword => { + match parameter.validator.validate(py, dict_value.borrow_input(), state) { + Ok(value) => output_args.push(value), + Err(ValError::LineErrors(line_errors)) => { + errors.extend( + line_errors.into_iter().map(|err| { + lookup_path.apply_error_loc(err, self.loc_by_alias, ¶meter.name) + }), + ); + } + Err(err) => return Err(err), + } + } + ParameterMode::VarArgs => match dict_value.borrow_input().validate_tuple(false) { + Ok(tuple) => { + let mut i: i64 = 0; + tuple.unpack(state).try_for_each(|v| { + match parameter.validator.validate(py, v.unwrap().borrow_input(), state) { + Ok(tuple_value) => { + output_args.push(tuple_value); + i += 1; + Ok(()) + } + Err(ValError::LineErrors(line_errors)) => { + errors.extend(line_errors.into_iter().map(|err| { + lookup_path.apply_error_loc( + err.with_outer_location(i), + self.loc_by_alias, + ¶meter.name, + ) + })); + i += 1; + Ok(()) + } + Err(err) => { + i += 1; + Err(err) + } + } + })?; + } + Err(_) => { + let val_error = ValLineError::new(ErrorTypeDefaults::TupleType, dict_value.borrow_input()); + errors.push(lookup_path.apply_error_loc(val_error, self.loc_by_alias, ¶meter.name)); + } + }, + ParameterMode::KeywordOnly => { + match parameter.validator.validate(py, dict_value.borrow_input(), state) { + Ok(value) => { + output_kwargs.set_item(PyString::new(py, parameter.name.as_str()).unbind(), value)?; + } + Err(ValError::LineErrors(line_errors)) => { + errors.extend( + line_errors.into_iter().map(|err| { + lookup_path.apply_error_loc(err, self.loc_by_alias, ¶meter.name) + }), + ); + } + Err(err) => return Err(err), + } + } + ParameterMode::VarKwargsUniform => match dict_value.borrow_input().as_kwargs(py) { + // We will validate that keys are strings, and values match the validator: + Some(value) => { + for (dict_key, dict_value) in value { + // Validate keys are strings: + match dict_key.validate_str(true, false).map(ValidationMatch::into_inner) { + Ok(_) => (), + Err(ValError::LineErrors(line_errors)) => { + for err in line_errors { + errors.push( + err.with_outer_location(dict_key.clone()) + .with_outer_location(¶meter.name) + .with_type(ErrorTypeDefaults::InvalidKey), + ); + } + continue; + } + Err(err) => return Err(err), + } + // Validate values: + match parameter.validator.validate(py, dict_value.borrow_input(), state) { + Ok(value) => output_kwargs.set_item(dict_key, value)?, + Err(ValError::LineErrors(line_errors)) => { + errors.extend(line_errors.into_iter().map(|err| { + lookup_path.apply_error_loc( + err.with_outer_location(dict_key.clone()), + self.loc_by_alias, + ¶meter.name, + ) + })); + } + Err(err) => return Err(err), + } + } + } + None => { + let val_error = ValLineError::new(ErrorTypeDefaults::DictType, dict_value); + errors.push(lookup_path.apply_error_loc(val_error, self.loc_by_alias, ¶meter.name)); + } + }, + ParameterMode::VarKwargsUnpackedTypedDict => { + match parameter.validator.validate(py, dict_value.borrow_input(), state) { + Ok(value) => { + output_kwargs.update(value.downcast_bound::(py).unwrap().as_mapping())?; + } + Err(ValError::LineErrors(line_errors)) => { + errors.extend( + line_errors.into_iter().map(|err| { + lookup_path.apply_error_loc(err, self.loc_by_alias, ¶meter.name) + }), + ); + } + Err(err) => return Err(err), + } + } + } + // No value is present in the mapping... + } else { + match parameter.mode { + // ... fallback to the default value (and error if no default): + ParameterMode::PositionalOnly | ParameterMode::PositionalOrKeyword | ParameterMode::KeywordOnly => { + if let Some(value) = + parameter + .validator + .default_value(py, Some(parameter.name.as_str()), state)? + { + if matches!( + parameter.mode, + ParameterMode::PositionalOnly | ParameterMode::PositionalOrKeyword + ) { + output_args.push(value); + } else { + output_kwargs.set_item(PyString::new(py, parameter.name.as_str()).unbind(), value)?; + } + } else { + let error_type = match parameter.mode { + ParameterMode::PositionalOnly => ErrorTypeDefaults::MissingPositionalOnlyArgument, + ParameterMode::PositionalOrKeyword => ErrorTypeDefaults::MissingArgument, + ParameterMode::KeywordOnly => ErrorTypeDefaults::MissingKeywordOnlyArgument, + _ => unreachable!(), + }; + + errors.push(lookup_key.error( + error_type, + original_input, + self.loc_by_alias, + ¶meter.name, + )); + } + } + // ... validate the unpacked kwargs against an empty dict: + ParameterMode::VarKwargsUnpackedTypedDict => { + match parameter.validator.validate(py, PyDict::new(py).borrow_input(), state) { + Ok(value) => { + output_kwargs.update(value.downcast_bound::(py).unwrap().as_mapping())?; + } + Err(ValError::LineErrors(line_errors)) => { + errors.extend( + line_errors + .into_iter() + .map(|err| err.with_outer_location(¶meter.name)), + ); + } + Err(err) => return Err(err), + } + } + // Variadic args/uniform kwargs can be empty by definition: + _ => (), + } + } + } + + if let Some(used_keys) = used_keys { + struct ValidateExtra<'a> { + used_keys: AHashSet<&'a str>, + errors: &'a mut Vec, + extra_behavior: ExtraBehavior, + } + + impl<'py, Key, Value> ConsumeIterator> for ValidateExtra<'_> + where + Key: BorrowInput<'py> + Clone + Into, + Value: BorrowInput<'py>, + { + type Output = ValResult<()>; + fn consume_iterator(self, iterator: impl Iterator>) -> Self::Output { + for item_result in iterator { + let (raw_key, value) = item_result?; + let either_str = match raw_key + .borrow_input() + .validate_str(true, false) + .map(ValidationMatch::into_inner) + { + Ok(k) => k, + Err(ValError::LineErrors(line_errors)) => { + for err in line_errors { + self.errors.push( + err.with_outer_location(raw_key.clone()) + .with_type(ErrorTypeDefaults::InvalidKey), + ); + } + continue; + } + Err(err) => return Err(err), + }; + let cow = either_str.as_cow()?; + if self.used_keys.contains(cow.as_ref()) { + continue; + } + + let value = value.borrow_input(); + + if self.extra_behavior == ExtraBehavior::Forbid { + self.errors.push(ValLineError::new_with_loc( + ErrorTypeDefaults::ExtraForbidden, + value, + raw_key.clone(), + )); + } + } + + Ok(()) + } + } + + mapping.iterate(ValidateExtra { + used_keys, + errors: &mut errors, + extra_behavior: self.extra, + })??; + } + + if !errors.is_empty() { + Err(ValError::LineErrors(errors)) + } else { + Ok((PyTuple::new(py, output_args)?, output_kwargs).into_py_any(py)?) + } + } + + /// Validate the arguments from an [`ArgsKwargs`][crate::argument_markers::ArgsKwargs] instance: + /// ```py + /// def func(a: int, /, *, b: str, **kwargs: int) -> None: + /// ... + /// + /// valid_argskwargs = ArgsKwargs((1,), {'b': 'test', 'c': 1, 'd': 2}) + /// ``` + fn validate_from_argskwargs<'py>( + &self, + py: Python<'py>, + original_input: &(impl Input<'py> + ?Sized), + args_kwargs: impl Arguments<'py>, + state: &mut ValidationState<'_, 'py>, + ) -> ValResult { + let mut output_args: Vec = Vec::with_capacity(self.positional_params_count); + let output_kwargs = PyDict::new(py); + let mut errors: Vec = Vec::new(); + let mut used_kwargs: AHashSet<&str> = AHashSet::with_capacity(self.parameters.len()); + + let validate_by_alias = state.validate_by_alias_or(self.validate_by_alias); + let validate_by_name = state.validate_by_name_or(self.validate_by_name); + + // go through non variadic parameters, getting the value from args or kwargs and validating it + for (index, parameter) in self.parameters.iter().filter(|p| !p.is_variadic()).enumerate() { + let lookup_key = parameter + .lookup_key_collection + .select(validate_by_alias, validate_by_name)?; + + let mut pos_value = None; + if let Some(args) = args_kwargs.args() { + if matches!( + parameter.mode, + ParameterMode::PositionalOnly | ParameterMode::PositionalOrKeyword + ) { + pos_value = args.get_item(index); + } + } + + let mut kw_value = None; + if let Some(kwargs) = args_kwargs.kwargs() { + if matches!( + parameter.mode, + ParameterMode::PositionalOrKeyword | ParameterMode::KeywordOnly + ) { + if let Some((lookup_path, value)) = kwargs.get_item(lookup_key)? { + used_kwargs.insert(lookup_path.first_key()); + kw_value = Some((lookup_path, value)); + } + } + } + + match (pos_value, kw_value) { + (Some(_), Some((_, kw_value))) => { + errors.push(ValLineError::new_with_loc( + ErrorTypeDefaults::MultipleArgumentValues, + kw_value.borrow_input(), + parameter.name.clone(), + )); + } + (Some(pos_value), None) => match parameter.validator.validate(py, pos_value.borrow_input(), state) { + Ok(value) => output_args.push(value), + Err(ValError::LineErrors(line_errors)) => { + errors.extend(line_errors.into_iter().map(|err| err.with_outer_location(index))); + } + Err(err) => return Err(err), + }, + (None, Some((lookup_path, kw_value))) => { + match parameter.validator.validate(py, kw_value.borrow_input(), state) { + Ok(value) => { + output_kwargs.set_item(PyString::new(py, parameter.name.as_str()).unbind(), value)?; + } + Err(ValError::LineErrors(line_errors)) => { + errors.extend( + line_errors + .into_iter() + .map(|err| lookup_path.apply_error_loc(err, self.loc_by_alias, ¶meter.name)), + ); + } + Err(err) => return Err(err), + } + } + (None, None) => { + if let Some(value) = parameter + .validator + .default_value(py, Some(parameter.name.as_str()), state)? + { + if parameter.mode == ParameterMode::PositionalOnly { + output_args.push(value); + } else { + output_kwargs.set_item(PyString::new(py, parameter.name.as_str()).unbind(), value)?; + } + } else { + // Required and no default, error: + match parameter.mode { + ParameterMode::PositionalOnly => { + errors.push(ValLineError::new_with_loc( + ErrorTypeDefaults::MissingPositionalOnlyArgument, + original_input, + index, + )); + } + ParameterMode::PositionalOrKeyword => { + errors.push(lookup_key.error( + ErrorTypeDefaults::MissingArgument, + original_input, + self.loc_by_alias, + ¶meter.name, + )); + } + ParameterMode::KeywordOnly => { + errors.push(lookup_key.error( + ErrorTypeDefaults::MissingKeywordOnlyArgument, + original_input, + self.loc_by_alias, + ¶meter.name, + )); + } + _ => unreachable!(), + } + } + } + } + } + + // if there are args check any where index > positional_params_count since they won't have been checked yet + if let Some(args) = args_kwargs.args() { + let len = args.len(); + if len > self.positional_params_count { + if let Some(var_args_param) = self.parameters.iter().find(|p| p.mode == ParameterMode::VarArgs) { + for (index, item) in args.iter().enumerate().skip(self.positional_params_count) { + match var_args_param.validator.validate(py, item.borrow_input(), state) { + Ok(value) => output_args.push(value), + Err(ValError::LineErrors(line_errors)) => { + errors.extend(line_errors.into_iter().map(|err| err.with_outer_location(index))); + } + Err(err) => return Err(err), + } + } + } else { + for (index, item) in args.iter().enumerate().skip(self.positional_params_count) { + errors.push(ValLineError::new_with_loc( + ErrorTypeDefaults::UnexpectedPositionalArgument, + item, + index, + )); + } + } + } + } + + let remaining_kwargs = PyDict::new(py); + + // if there are kwargs check any that haven't been processed yet + if let Some(kwargs) = args_kwargs.kwargs() { + if kwargs.len() > used_kwargs.len() { + for result in kwargs.iter() { + let (raw_key, value) = result?; + let either_str = match raw_key + .borrow_input() + .validate_str(true, false) + .map(ValidationMatch::into_inner) + { + Ok(k) => k, + Err(ValError::LineErrors(line_errors)) => { + for err in line_errors { + errors.push( + err.with_outer_location(raw_key.clone()) + .with_type(ErrorTypeDefaults::InvalidKey), + ); + } + continue; + } + Err(err) => return Err(err), + }; + if !used_kwargs.contains(either_str.as_cow()?.as_ref()) { + let maybe_var_kwargs_parameter = self.parameters.iter().find(|p| { + matches!( + p.mode, + ParameterMode::VarKwargsUniform | ParameterMode::VarKwargsUnpackedTypedDict + ) + }); + + match maybe_var_kwargs_parameter { + None => { + if self.extra == ExtraBehavior::Forbid { + errors.push(ValLineError::new_with_loc( + ErrorTypeDefaults::UnexpectedKeywordArgument, + value, + raw_key.clone(), + )); + } + } + Some(var_kwargs_parameter) => { + match var_kwargs_parameter.mode { + ParameterMode::VarKwargsUniform => { + match var_kwargs_parameter.validator.validate(py, value.borrow_input(), state) { + Ok(value) => { + output_kwargs + .set_item(either_str.as_py_string(py, state.cache_str()), value)?; + } + Err(ValError::LineErrors(line_errors)) => { + for err in line_errors { + errors.push(err.with_outer_location(raw_key.clone())); + } + } + Err(err) => return Err(err), + } + } + ParameterMode::VarKwargsUnpackedTypedDict => { + // Save to the remaining kwargs, we will validate as a single dict: + remaining_kwargs.set_item( + either_str.as_py_string(py, state.cache_str()), + value.borrow_input().to_object(py)?, + )?; + } + _ => unreachable!(), + } + } + } + } + } + } + } + + let maybe_var_kwargs_parameter = self + .parameters + .iter() + .find(|p| p.mode == ParameterMode::VarKwargsUnpackedTypedDict); + + if let Some(var_kwargs_parameter) = maybe_var_kwargs_parameter { + match var_kwargs_parameter + .validator + .validate(py, remaining_kwargs.as_any(), state) + { + Ok(value) => { + output_kwargs.update(value.downcast_bound::(py).unwrap().as_mapping())?; + } + Err(ValError::LineErrors(line_errors)) => { + errors.extend(line_errors); + } + Err(err) => return Err(err), + } + } + + if !errors.is_empty() { + Err(ValError::LineErrors(errors)) + } else { + Ok((PyTuple::new(py, output_args)?, output_kwargs).into_py_any(py)?) + } + } +} + +impl Validator for ArgumentsV3Validator { + fn validate<'py>( + &self, + py: Python<'py>, + input: &(impl Input<'py> + ?Sized), + state: &mut ValidationState<'_, 'py>, + ) -> ValResult { + // this validator does not yet support partial validation, disable it to avoid incorrect results + state.allow_partial = false.into(); + + let args_dict = input.validate_dict(false); + + // Validation from a dictionary, mapping parameter names to the values: + if let Ok(dict) = args_dict { + self.validate_from_mapping(py, input, dict, state) + } else { + let args = input.validate_args_v3()?; + self.validate_from_argskwargs(py, input, args, state) + } + } + + fn get_name(&self) -> &str { + Self::EXPECTED_TYPE + } +} diff --git a/src/validators/mod.rs b/src/validators/mod.rs index 4e40bf080..f105e1854 100644 --- a/src/validators/mod.rs +++ b/src/validators/mod.rs @@ -21,6 +21,7 @@ pub(crate) use config::ValBytesMode; mod any; mod arguments; +mod arguments_v3; mod bool; mod bytes; mod call; @@ -636,6 +637,7 @@ pub fn build_validator( callable::CallableValidator, // arguments arguments::ArgumentsValidator, + arguments_v3::ArgumentsV3Validator, // default value with_default::WithDefaultValidator, // chain validators @@ -802,6 +804,7 @@ pub enum CombinedValidator { Callable(callable::CallableValidator), // arguments Arguments(arguments::ArgumentsValidator), + ArgumentsV3(arguments_v3::ArgumentsV3Validator), // default value WithDefault(with_default::WithDefaultValidator), // chain validators diff --git a/tests/conftest.py b/tests/conftest.py index 56d94a0c1..226ace018 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,7 +14,7 @@ import hypothesis import pytest -from pydantic_core import ArgsKwargs, SchemaValidator, ValidationError, validate_core_schema +from pydantic_core import ArgsKwargs, CoreSchema, SchemaValidator, ValidationError, validate_core_schema from pydantic_core.core_schema import CoreConfig __all__ = 'Err', 'PyAndJson', 'plain_repr', 'infinite_generator' @@ -52,7 +52,11 @@ def json_default(obj): class PyAndJsonValidator: def __init__( - self, schema, config: CoreConfig | None = None, *, validator_type: Literal['json', 'python'] | None = None + self, + schema: CoreSchema, + config: CoreConfig | None = None, + *, + validator_type: Literal['json', 'python'] | None = None, ): self.validator = SchemaValidator(validate_core_schema(schema), config) self.validator_type = validator_type diff --git a/tests/test_schema_functions.py b/tests/test_schema_functions.py index a15adfca5..a9cb358ff 100644 --- a/tests/test_schema_functions.py +++ b/tests/test_schema_functions.py @@ -225,6 +225,24 @@ def args(*args, **kwargs): 'serialization': {'type': 'format', 'formatting_string': 'd'}, }, ), + ( + core_schema.arguments_v3_schema, + args( + [ + core_schema.arguments_v3_parameter('foo', core_schema.int_schema()), + core_schema.arguments_v3_parameter('bar', core_schema.str_schema()), + ], + serialization=core_schema.format_ser_schema('d'), + ), + { + 'type': 'arguments-v3', + 'arguments_schema': [ + {'name': 'foo', 'schema': {'type': 'int'}}, + {'name': 'bar', 'schema': {'type': 'str'}}, + ], + 'serialization': {'type': 'format', 'formatting_string': 'd'}, + }, + ), ( core_schema.call_schema, args(core_schema.arguments_schema([core_schema.arguments_parameter('foo', {'type': 'int'})]), val_function), diff --git a/tests/validators/arguments_v3/__init__.py b/tests/validators/arguments_v3/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/validators/arguments_v3/test_alias.py b/tests/validators/arguments_v3/test_alias.py new file mode 100644 index 000000000..0f365c353 --- /dev/null +++ b/tests/validators/arguments_v3/test_alias.py @@ -0,0 +1,158 @@ +from __future__ import annotations + +import re + +import pytest + +from pydantic_core import ArgsKwargs, SchemaValidator, ValidationError +from pydantic_core import core_schema as cs + +from ...conftest import Err, PyAndJson + + +@pytest.mark.parametrize( + ['input_value', 'expected'], + ( + [ArgsKwargs((1,)), ((1,), {})], + [ArgsKwargs((), {'Foo': 1}), ((), {'a': 1})], + [ArgsKwargs((), {'a': 1}), Err('Foo\n Missing required argument [type=missing_argument,')], + [{'Foo': 1}, ((1,), {})], + [{'a': 1}, Err('Foo\n Missing required argument [type=missing_argument,')], + ), + ids=repr, +) +def test_alias(py_and_json: PyAndJson, input_value, expected) -> None: + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), alias='Foo', mode='positional_or_keyword'), + ] + ) + ) + if isinstance(expected, Err): + with pytest.raises(ValidationError, match=re.escape(expected.message)): + v.validate_test(input_value) + else: + assert v.validate_test(input_value) == expected + + +@pytest.mark.parametrize( + ['input_value', 'expected'], + ( + [ArgsKwargs((1,)), ((1,), {})], + [ArgsKwargs((), {'Foo': 1}), ((), {'a': 1})], + [ArgsKwargs((), {'a': 1}), ((), {'a': 1})], + [ArgsKwargs((), {'a': 1, 'b': 2}), Err('b\n Unexpected keyword argument [type=unexpected_keyword_argument,')], + [ + ArgsKwargs((), {'a': 1, 'Foo': 2}), + Err('a\n Unexpected keyword argument [type=unexpected_keyword_argument,'), + ], + [{'Foo': 1}, ((1,), {})], + [{'a': 1}, ((1,), {})], + ), + ids=repr, +) +def test_alias_validate_by_name(py_and_json: PyAndJson, input_value, expected): + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), alias='Foo', mode='positional_or_keyword'), + ], + validate_by_name=True, + ) + ) + if isinstance(expected, Err): + with pytest.raises(ValidationError, match=re.escape(expected.message)): + v.validate_test(input_value) + else: + assert v.validate_test(input_value) == expected + + +def test_only_validate_by_name(py_and_json) -> None: + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter( + name='a', schema=cs.str_schema(), alias='FieldA', mode='positional_or_keyword' + ), + ], + validate_by_name=True, + validate_by_alias=False, + ) + ) + + assert v.validate_test(ArgsKwargs((), {'a': 'hello'})) == ((), {'a': 'hello'}) + assert v.validate_test({'a': 'hello'}) == (('hello',), {}) + + with pytest.raises(ValidationError, match=r'a\n +Missing required argument \[type=missing_argument,'): + assert v.validate_test(ArgsKwargs((), {'FieldA': 'hello'})) + with pytest.raises(ValidationError, match=r'a\n +Missing required argument \[type=missing_argument,'): + assert v.validate_test({'FieldA': 'hello'}) + + +def test_only_allow_alias(py_and_json) -> None: + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter( + name='a', schema=cs.str_schema(), alias='FieldA', mode='positional_or_keyword' + ), + ], + validate_by_name=False, + validate_by_alias=True, + ) + ) + assert v.validate_test(ArgsKwargs((), {'FieldA': 'hello'})) == ((), {'a': 'hello'}) + assert v.validate_test({'FieldA': 'hello'}) == (('hello',), {}) + + with pytest.raises(ValidationError, match=r'FieldA\n +Missing required argument \[type=missing_argument,'): + assert v.validate_test(ArgsKwargs((), {'a': 'hello'})) + with pytest.raises(ValidationError, match=r'FieldA\n +Missing required argument \[type=missing_argument,'): + assert v.validate_test({'a': 'hello'}) + + +@pytest.mark.parametrize('config_by_alias', [None, True, False]) +@pytest.mark.parametrize('config_by_name', [None, True, False]) +@pytest.mark.parametrize('runtime_by_alias', [None, True, False]) +@pytest.mark.parametrize('runtime_by_name', [None, True, False]) +def test_by_alias_and_name_config_interaction( + config_by_alias: bool | None, + config_by_name: bool | None, + runtime_by_alias: bool | None, + runtime_by_name: bool | None, +) -> None: + """This test reflects the priority that applies for config vs runtime validation alias configuration. + + Runtime values take precedence over config values, when set. + By default, by_alias is True and by_name is False. + """ + + if config_by_alias is False and config_by_name is False and runtime_by_alias is False and runtime_by_name is False: + pytest.skip("Can't have both by_alias and by_name as effectively False") + + schema = cs.arguments_v3_schema( + arguments=[ + cs.arguments_v3_parameter(name='my_field', schema=cs.int_schema(), alias='my_alias'), + ], + **({'validate_by_alias': config_by_alias} if config_by_alias is not None else {}), + **({'validate_by_name': config_by_name} if config_by_name is not None else {}), + ) + s = SchemaValidator(schema) + + alias_allowed = next(x for x in (runtime_by_alias, config_by_alias, True) if x is not None) + name_allowed = next(x for x in (runtime_by_name, config_by_name, False) if x is not None) + + if alias_allowed: + assert s.validate_python( + ArgsKwargs((), {'my_alias': 1}), by_alias=runtime_by_alias, by_name=runtime_by_name + ) == ( + (), + {'my_field': 1}, + ) + if name_allowed: + assert s.validate_python( + ArgsKwargs((), {'my_field': 1}), by_alias=runtime_by_alias, by_name=runtime_by_name + ) == ( + (), + {'my_field': 1}, + ) diff --git a/tests/validators/arguments_v3/test_build_errors.py b/tests/validators/arguments_v3/test_build_errors.py new file mode 100644 index 000000000..a81244577 --- /dev/null +++ b/tests/validators/arguments_v3/test_build_errors.py @@ -0,0 +1,87 @@ +import pytest + +from pydantic_core import SchemaError, SchemaValidator +from pydantic_core import core_schema as cs + + +def test_build_non_default_follows_default() -> None: + with pytest.raises(SchemaError, match="Required parameter 'b' follows parameter with default"): + SchemaValidator( + schema=cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter( + name='a', + schema=cs.with_default_schema(schema=cs.int_schema(), default_factory=lambda: 42), + mode='positional_or_keyword', + ), + cs.arguments_v3_parameter(name='b', schema=cs.int_schema(), mode='positional_or_keyword'), + ] + ) + ) + + +def test_duplicate_parameter_name() -> None: + with pytest.raises(SchemaError, match="Duplicate parameter 'test'"): + SchemaValidator( + schema=cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='test', schema=cs.int_schema()), + cs.arguments_v3_parameter(name='a', schema=cs.int_schema()), + cs.arguments_v3_parameter(name='test', schema=cs.int_schema()), + ] + ) + ) + + +def test_invalid_positional_only_parameter_position() -> None: + with pytest.raises(SchemaError, match="Positional only parameter 'test' cannot follow other parameter kinds"): + SchemaValidator( + schema=cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='var_args'), + cs.arguments_v3_parameter(name='test', schema=cs.int_schema(), mode='positional_only'), + ] + ) + ) + + +def test_invalid_positional_or_keyword_parameter_position() -> None: + with pytest.raises( + SchemaError, match="Positional or keyword parameter 'test' cannot follow variadic or keyword only parameters" + ): + SchemaValidator( + schema=cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='var_args'), + cs.arguments_v3_parameter(name='test', schema=cs.int_schema(), mode='positional_or_keyword'), + ] + ) + ) + + +def test_invalid_var_args_parameter_position() -> None: + with pytest.raises( + SchemaError, match="Variadic positional parameter 'test' cannot follow variadic or keyword only parameters" + ): + SchemaValidator( + schema=cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='keyword_only'), + cs.arguments_v3_parameter(name='test', schema=cs.int_schema(), mode='var_args'), + ] + ) + ) + + +def test_invalid_keyword_only_parameter_position() -> None: + with pytest.raises( + SchemaError, match="Keyword only parameter 'test' cannot follow variadic keyword only parameter" + ): + SchemaValidator( + schema=cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='var_kwargs_uniform'), + cs.arguments_v3_parameter(name='test', schema=cs.int_schema(), mode='keyword_only'), + ] + ) + ) diff --git a/tests/validators/arguments_v3/test_extra.py b/tests/validators/arguments_v3/test_extra.py new file mode 100644 index 000000000..749c3f342 --- /dev/null +++ b/tests/validators/arguments_v3/test_extra.py @@ -0,0 +1,57 @@ +import pytest + +from pydantic_core import ArgsKwargs, ValidationError +from pydantic_core import core_schema as cs + +from ...conftest import PyAndJson + + +@pytest.mark.parametrize( + ['input_value', 'err_type'], + ( + [ArgsKwargs((), {'a': 1, 'b': 2, 'c': 3}), 'unexpected_keyword_argument'], + [ArgsKwargs((), {'a': 1, 'c': 3, 'extra': 'value'}), 'unexpected_keyword_argument'], + [{'a': 1, 'b': 2, 'c': 3}, 'extra_forbidden'], + [{'a': 1, 'c': 3, 'extra': 'value'}, 'extra_forbidden'], + ), +) +def test_extra_forbid(py_and_json: PyAndJson, input_value, err_type) -> None: + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema()), + cs.arguments_v3_parameter(name='b', schema=cs.int_schema(), alias='c'), + ], + extra_behavior='forbid', + ), + ) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test(input_value) + + error = exc_info.value.errors()[0] + + assert error['type'] == err_type + + +@pytest.mark.parametrize( + 'input_value', + [ + ArgsKwargs((), {'a': 1, 'b': 2, 'c': 3}), + ArgsKwargs((), {'a': 1, 'c': 3, 'extra': 'value'}), + {'a': 1, 'b': 2, 'c': 3}, + {'a': 1, 'c': 3, 'extra': 'value'}, + ], +) +def test_extra_ignore(py_and_json: PyAndJson, input_value) -> None: + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='keyword_only'), + cs.arguments_v3_parameter(name='b', schema=cs.int_schema(), alias='c', mode='keyword_only'), + ], + extra_behavior='ignore', + ), + ) + + assert v.validate_test(input_value) == ((), {'a': 1, 'b': 3}) diff --git a/tests/validators/arguments_v3/test_general.py b/tests/validators/arguments_v3/test_general.py new file mode 100644 index 000000000..bc2e91436 --- /dev/null +++ b/tests/validators/arguments_v3/test_general.py @@ -0,0 +1,131 @@ +import pytest + +from pydantic_core import ArgsKwargs, SchemaValidator, ValidationError +from pydantic_core import core_schema as cs + +from ...conftest import Err, PyAndJson, plain_repr + + +@pytest.mark.parametrize( + ['input_value', 'expected'], + ( + [ArgsKwargs(()), ((), {})], + [{}, ((), {})], + [ArgsKwargs((1,)), Err('', [{'type': 'unexpected_positional_argument'}])], + [ArgsKwargs((), {'a': 1}), Err('', [{'type': 'unexpected_keyword_argument'}])], + ), +) +def test_no_args(py_and_json: PyAndJson, input_value, expected) -> None: + v = py_and_json(cs.arguments_v3_schema([])) + + if isinstance(expected, Err): + with pytest.raises(ValidationError) as exc_info: + v.validate_test(input_value) + + error = exc_info.value.errors()[0] + + assert error['type'] == expected.errors[0]['type'] + else: + assert v.validate_test(input_value) == expected + + +@pytest.mark.parametrize( + ['input_value', 'expected'], + [ + [ArgsKwargs((1, 2)), ((1, 2), {})], + [ArgsKwargs((1,)), ((1,), {'b': 42})], + [ArgsKwargs((1,), {'b': 3}), ((1,), {'b': 3})], + [ArgsKwargs((), {'a': 1}), ((), {'a': 1, 'b': 42})], + [{'a': 1, 'b': 2}, ((1, 2), {})], + [{'a': 1}, ((1, 42), {})], + ], + ids=repr, +) +def test_default_factory(py_and_json: PyAndJson, input_value, expected) -> None: + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='positional_or_keyword'), + cs.arguments_v3_parameter( + name='b', + schema=cs.with_default_schema(schema=cs.int_schema(), default_factory=lambda: 42), + mode='positional_or_keyword', + ), + ] + ) + ) + + assert v.validate_test(input_value) == expected + + +def double_or_bust(input_value): + if input_value == 1: + raise RuntimeError('bust') + return input_value * 2 + + +def test_internal_error(py_and_json: PyAndJson) -> None: + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='positional_only'), + cs.arguments_v3_parameter( + name='b', schema=cs.no_info_plain_validator_function(double_or_bust), mode='positional_only' + ), + ] + ) + ) + + assert v.validate_test(ArgsKwargs((1, 2))) == ((1, 4), {}) + with pytest.raises(RuntimeError, match='bust'): + v.validate_test(ArgsKwargs((1, 1))) + + +def test_repr() -> None: + v = SchemaValidator( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='positional_or_keyword'), + cs.arguments_v3_parameter( + name='b', + schema=cs.with_default_schema(schema=cs.int_schema(), default_factory=lambda: 42), + mode='keyword_only', + ), + ] + ) + ) + assert 'positional_params_count:1,' in plain_repr(v) + + +@pytest.mark.parametrize( + ['input_value', 'expected'], + ( + [ArgsKwargs((1, 't', 2, 3, 4), {'c': True, 'other': 1}), ((1, 't', 2, 3, 4), {'c': True, 'other': 1})], + [ + {'aa': 1, 'b': 't', 'args': [2, 3, 4], 'c': True, 'kwargs': {'other': 1}}, + ((1, 't', 2, 3, 4), {'c': True, 'other': 1}), + ], + ), +) +def test_full(py_and_json: PyAndJson, input_value, expected) -> None: + """Test inputs against all parameter types: + + ```python + def func(a: Annotated[int, Field(alias='aa')], /, b: str, *args: int, c: bool, **kwargs: int): + ... + ``` + """ + + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), alias='aa', mode='positional_only'), + cs.arguments_v3_parameter(name='b', schema=cs.str_schema(), mode='positional_or_keyword'), + cs.arguments_v3_parameter(name='args', schema=cs.int_schema(), mode='var_args'), + cs.arguments_v3_parameter(name='c', schema=cs.bool_schema(), mode='keyword_only'), + cs.arguments_v3_parameter(name='kwargs', schema=cs.int_schema(), mode='var_kwargs_uniform'), + ] + ) + ) + + assert v.validate_test(input_value) == expected diff --git a/tests/validators/arguments_v3/test_keyword_only.py b/tests/validators/arguments_v3/test_keyword_only.py new file mode 100644 index 000000000..5a7f00a05 --- /dev/null +++ b/tests/validators/arguments_v3/test_keyword_only.py @@ -0,0 +1,99 @@ +import pytest + +from pydantic_core import ArgsKwargs, ValidationError +from pydantic_core import core_schema as cs + +from ...conftest import PyAndJson + + +@pytest.mark.parametrize( + 'input_value', + [ + ArgsKwargs((), {'a': 1, 'b': True}), + ArgsKwargs((), {'a': 1}), + {'a': 1, 'b': True}, + {'a': 1}, + ], +) +def test_keyword_only(py_and_json: PyAndJson, input_value) -> None: + """Test valid inputs against keyword-only parameters: + + ```python + def func(*, a: int, b: bool = True): + ... + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='keyword_only'), + cs.arguments_v3_parameter( + name='b', schema=cs.with_default_schema(cs.bool_schema(), default=True), mode='keyword_only' + ), + ] + ) + ) + + assert v.validate_test(input_value) == ((), {'a': 1, 'b': True}) + + +@pytest.mark.parametrize( + 'input_value', + [ArgsKwargs((), {'a': 'not_an_int'}), {'a': 'not_an_int'}], +) +def test_keyword_only_validation_error(py_and_json: PyAndJson, input_value) -> None: + """Test invalid inputs against keyword-only parameters: + + ```python + def func(*, a: int): + ... + + func('not_an_int') + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='keyword_only'), + ] + ) + ) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test(input_value) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'int_parsing' + assert error['loc'] == ('a',) + + +@pytest.mark.parametrize( + 'input_value', + [ArgsKwargs((), {}), {}], +) +def test_keyword_only_error_required(py_and_json: PyAndJson, input_value) -> None: + """Test missing inputs against keyword-only parameters: + + ```python + def func(*, a: int): + ... + + func() + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='keyword_only'), + ] + ) + ) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test(input_value) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'missing_keyword_only_argument' + assert error['loc'] == ('a',) diff --git a/tests/validators/arguments_v3/test_positional_only.py b/tests/validators/arguments_v3/test_positional_only.py new file mode 100644 index 000000000..5f6d4ba95 --- /dev/null +++ b/tests/validators/arguments_v3/test_positional_only.py @@ -0,0 +1,107 @@ +import pytest + +from pydantic_core import ArgsKwargs, ValidationError +from pydantic_core import core_schema as cs + +from ...conftest import PyAndJson + + +@pytest.mark.parametrize( + 'input_value', + [ + ArgsKwargs((1, True)), + ArgsKwargs((1,)), + {'a': 1, 'b': True}, + {'a': 1}, + ], +) +def test_positional_only(py_and_json: PyAndJson, input_value) -> None: + """Test valid inputs against positional-only parameters: + + ```python + def func(a: int, b: bool = True, /): + ... + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='positional_only'), + cs.arguments_v3_parameter( + name='b', schema=cs.with_default_schema(cs.bool_schema(), default=True), mode='positional_only' + ), + ] + ) + ) + + assert v.validate_test(input_value) == ((1, True), {}) + + +def test_positional_only_validation_error(py_and_json: PyAndJson) -> None: + """Test invalid inputs against positional-only parameters: + + ```python + def func(a: int, /): + ... + + func('not_an_int') + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='positional_only'), + ] + ) + ) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test(ArgsKwargs(('not_an_int',), {})) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'int_parsing' + assert error['loc'] == (0,) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test({'a': 'not_an_int'}) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'int_parsing' + assert error['loc'] == ('a',) + + +def test_positional_only_error_required(py_and_json: PyAndJson) -> None: + """Test missing inputs against positional-only parameters: + + ```python + def func(a: int, /): + ... + + func() + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='positional_only'), + ] + ) + ) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test(ArgsKwargs((), {})) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'missing_positional_only_argument' + assert error['loc'] == (0,) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test({}) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'missing_positional_only_argument' + assert error['loc'] == ('a',) diff --git a/tests/validators/arguments_v3/test_positional_or_keyword.py b/tests/validators/arguments_v3/test_positional_or_keyword.py new file mode 100644 index 000000000..40d0fa55c --- /dev/null +++ b/tests/validators/arguments_v3/test_positional_or_keyword.py @@ -0,0 +1,107 @@ +import pytest + +from pydantic_core import ArgsKwargs, ValidationError +from pydantic_core import core_schema as cs + +from ...conftest import PyAndJson + + +@pytest.mark.parametrize( + ['input_value', 'expected'], + ( + [ArgsKwargs((1, True)), ((1, True), {})], + [ArgsKwargs((1,)), ((1,), {'b': True})], + [ArgsKwargs((1,), {'b': True}), ((1,), {'b': True})], + [ArgsKwargs((), {'a': 1, 'b': True}), ((), {'a': 1, 'b': True})], + [{'a': 1, 'b': True}, ((1, True), {})], + [{'a': 1}, ((1, True), {})], + ), +) +def test_positional_or_keyword(py_and_json: PyAndJson, input_value, expected) -> None: + """Test valid inputs against positional-or-keyword parameters: + + ```python + def func(a: int, b: bool = True): + ... + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='positional_or_keyword'), + cs.arguments_v3_parameter( + name='b', + schema=cs.with_default_schema(cs.bool_schema(), default=True), + mode='positional_or_keyword', + ), + ] + ) + ) + + assert v.validate_test(input_value) == expected + + +@pytest.mark.parametrize( + ['input_value', 'err_loc'], + ( + [ArgsKwargs(('not_an_int',), {}), (0,)], + [ArgsKwargs((), {'a': 'not_an_int'}), ('a',)], + [{'a': 'not_an_int'}, ('a',)], + ), +) +def test_positional_or_keyword_validation_error(py_and_json: PyAndJson, input_value, err_loc) -> None: + """Test invalid inputs against positional-or-keyword parameters: + + ```python + def func(a: int): + ... + + func('not_an_int') + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='positional_or_keyword'), + ] + ) + ) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test(input_value) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'int_parsing' + assert error['loc'] == err_loc + + +@pytest.mark.parametrize( + 'input_value', + [ArgsKwargs((), {}), {}], +) +def test_positional_only_error_required(py_and_json: PyAndJson, input_value) -> None: + """Test missing inputs against positional-or-keyword parameters: + + ```python + def func(a: int): + ... + + func() + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='a', schema=cs.int_schema(), mode='positional_or_keyword'), + ] + ) + ) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test(input_value) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'missing_argument' + assert error['loc'] == ('a',) diff --git a/tests/validators/arguments_v3/test_var_args.py b/tests/validators/arguments_v3/test_var_args.py new file mode 100644 index 000000000..5fa2a5eb3 --- /dev/null +++ b/tests/validators/arguments_v3/test_var_args.py @@ -0,0 +1,100 @@ +import pytest + +from pydantic_core import ArgsKwargs, ValidationError +from pydantic_core import core_schema as cs + +from ...conftest import PyAndJson + + +@pytest.mark.parametrize( + ['input_value', 'expected'], + ( + [ArgsKwargs(()), ((), {})], + [ArgsKwargs((1, 2, 3)), ((1, 2, 3), {})], + [{'args': ()}, ((), {})], + [{'args': (1, 2, 3)}, ((1, 2, 3), {})], + # Also validates against other sequence types, as long as it is + # possible to validate it as a tuple: + [{'args': [1, 2, 3]}, ((1, 2, 3), {})], + ), +) +def test_var_args(py_and_json: PyAndJson, input_value, expected) -> None: + """Test valid inputs against var-args parameters: + + ```python + def func(*args: int): + ... + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='args', schema=cs.int_schema(), mode='var_args'), + ] + ) + ) + + assert v.validate_test(input_value) == expected + + +@pytest.mark.parametrize( + ['input_value', 'err_loc'], + ( + [ArgsKwargs(('not_an_int',)), (0,)], + [ + ArgsKwargs( + ( + 1, + 'not_an_int', + ) + ), + (1,), + ], + [{'args': ['not_an_int']}, ('args', 0)], + [{'args': [1, 'not_an_int']}, ('args', 1)], + ), +) +def test_var_args_validation_error(py_and_json: PyAndJson, input_value, err_loc) -> None: + """Test invalid inputs against var-args parameters: + + ```python + def func(*args: int): + ... + + func(1, 'not_an_int') + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='args', schema=cs.int_schema(), mode='var_args'), + ] + ) + ) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test(input_value) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'int_parsing' + assert error['loc'] == err_loc + + +def test_var_args_invalid_tuple(py_and_json: PyAndJson) -> None: + """Test invalid tuple-like input against var-args parameters in mapping validation mode.""" + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='args', schema=cs.int_schema(), mode='var_args'), + ] + ) + ) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test({'args': 'not_a_tuple'}) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'tuple_type' + assert error['loc'] == ('args',) diff --git a/tests/validators/arguments_v3/test_var_kwargs_uniform.py b/tests/validators/arguments_v3/test_var_kwargs_uniform.py new file mode 100644 index 000000000..ed44721d6 --- /dev/null +++ b/tests/validators/arguments_v3/test_var_kwargs_uniform.py @@ -0,0 +1,89 @@ +import pytest + +from pydantic_core import ArgsKwargs, ValidationError +from pydantic_core import core_schema as cs + +from ...conftest import PyAndJson + + +@pytest.mark.parametrize( + ['input_value', 'expected'], + ( + [ArgsKwargs(()), ((), {})], + [ArgsKwargs((), {'a': 1, 'b': 2}), ((), {'a': 1, 'b': 2})], + [{}, ((), {})], + [{'kwargs': {'a': 1, 'b': 2}}, ((), {'a': 1, 'b': 2})], + ), +) +def test_var_kwargs(py_and_json: PyAndJson, input_value, expected) -> None: + """Test valid inputs against var-args parameters (uniform): + + ```python + def func(**kwargs: int): + ... + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='kwargs', schema=cs.int_schema(), mode='var_kwargs_uniform'), + ] + ) + ) + + assert v.validate_test(input_value) == expected + + +@pytest.mark.parametrize( + ['input_value', 'err_loc'], + ( + [ArgsKwargs((), {'a': 'not_an_int'}), ('a',)], + [ArgsKwargs((), {'a': 1, 'b': 'not_an_int'}), ('b',)], + [{'kwargs': {'a': 'not_an_int'}}, ('kwargs', 'a')], + [{'kwargs': {'a': 1, 'b': 'not_an_int'}}, ('kwargs', 'b')], + ), +) +def test_var_kwargs_validation_error(py_and_json: PyAndJson, input_value, err_loc) -> None: + """Test invalid inputs against var-args parameters (uniform): + + ```python + def func(**kwargs: int): + ... + + func(a='not_an_int') + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='kwargs', schema=cs.int_schema(), mode='var_kwargs_uniform'), + ] + ) + ) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test(input_value) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'int_parsing' + assert error['loc'] == err_loc + + +def test_var_kwargs_invalid_dict(py_and_json: PyAndJson) -> None: + """Test invalid dict-like input against var-kwargs parameters in mapping validation mode.""" + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter(name='kwargs', schema=cs.int_schema(), mode='var_kwargs_uniform'), + ] + ) + ) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test({'kwargs': 'not_a_dict'}) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'dict_type' + assert error['loc'] == ('kwargs',) diff --git a/tests/validators/arguments_v3/test_var_kwargs_unpacked_typed_dict.py b/tests/validators/arguments_v3/test_var_kwargs_unpacked_typed_dict.py new file mode 100644 index 000000000..6ffb90a1e --- /dev/null +++ b/tests/validators/arguments_v3/test_var_kwargs_unpacked_typed_dict.py @@ -0,0 +1,120 @@ +import pytest + +from pydantic_core import ArgsKwargs, ValidationError +from pydantic_core import core_schema as cs + +from ...conftest import Err, PyAndJson + + +@pytest.mark.parametrize( + ['input_value', 'expected'], + ( + [ArgsKwargs((), {'x': 1}), ((), {'x': 1})], + [ArgsKwargs((), {'x': 1, 'z': True}), ((), {'x': 1, 'y': True})], + [ArgsKwargs((), {}), Err('', [{'type': 'missing', 'loc': ('x',)}])], + [ArgsKwargs((), {'x': 'not_an_int'}), Err('', [{'type': 'int_parsing', 'loc': ('x',)}])], + [ArgsKwargs((), {'x': 1, 'y': True}), Err('', [{'type': 'extra_forbidden', 'loc': ('y',)}])], + [{'kwargs': {'x': 1}}, ((), {'x': 1})], + [{'kwargs': {'x': 1, 'z': True}}, ((), {'x': 1, 'y': True})], + [{'kwargs': {}}, Err('', [{'type': 'missing', 'loc': ('kwargs', 'x')}])], + [{}, Err('', [{'type': 'missing', 'loc': ('kwargs', 'x')}])], + [ + {'kwargs': {'x': 'not_an_int'}}, + Err( + '', + [ + { + 'type': 'int_parsing', + 'loc': ( + 'kwargs', + 'x', + ), + } + ], + ), + ], + [ + {'kwargs': {'x': 1, 'y': True}}, + Err( + '', + [ + { + 'type': 'extra_forbidden', + 'loc': ( + 'kwargs', + 'y', + ), + } + ], + ), + ], + ), +) +def test_var_kwargs(py_and_json: PyAndJson, input_value, expected) -> None: + """Test (in)valid inputs against var-args parameters (unpacked typed dict): + + ```python + class TD(TypedDict, total=false): + x: Required[int] + y: Annotated[bool, Field(validation_alias='z')] + + def func(**kwargs: Unpack[TD]): + ... + ``` + """ + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter( + name='kwargs', + schema=cs.typed_dict_schema( + { + 'x': cs.typed_dict_field( + schema=cs.int_schema(), + required=True, + ), + 'y': cs.typed_dict_field( + schema=cs.bool_schema(), + required=False, + validation_alias='z', + ), + }, + extra_behavior='forbid', + ), + mode='var_kwargs_unpacked_typed_dict', + ), + ] + ) + ) + + if isinstance(expected, Err): + with pytest.raises(ValidationError) as exc_info: + v.validate_test(input_value) + + error = exc_info.value.errors()[0] + + assert error['type'] == expected.errors[0]['type'] + assert error['loc'] == expected.errors[0]['loc'] + else: + assert v.validate_test(input_value) == expected + + +def test_var_kwargs_invalid_dict(py_and_json: PyAndJson) -> None: + """Test invalid dict-like input against var-kwargs parameters in mapping validation mode.""" + v = py_and_json( + cs.arguments_v3_schema( + [ + cs.arguments_v3_parameter( + name='kwargs', schema=cs.typed_dict_schema({}), mode='var_kwargs_unpacked_typed_dict' + ), + ] + ) + ) + + with pytest.raises(ValidationError) as exc_info: + v.validate_test({'kwargs': 'not_a_dict'}) + + error = exc_info.value.errors()[0] + + assert error['type'] == 'dict_type' + assert error['loc'] == ('kwargs',)