Skip to content

Commit 3642c14

Browse files
authored
Allow PARQUET format for uploading data. (#609)
1 parent 7a624ea commit 3642c14

File tree

7 files changed

+87
-75
lines changed

7 files changed

+87
-75
lines changed

.github/workflows/R-CMD-check.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ jobs:
2525
- {os: macos-latest, r: 'release'}
2626

2727
- {os: windows-latest, r: 'release'}
28-
# Use 3.6 to trigger usage of RTools35
29-
- {os: windows-latest, r: '3.6'}
3028
# use 4.1 to check with rtools40's older compiler
3129
- {os: windows-latest, r: '4.1'}
3230

DESCRIPTION

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ License: MIT + file LICENSE
1313
URL: https://bigrquery.r-dbi.org, https://github.com/r-dbi/bigrquery
1414
BugReports: https://github.com/r-dbi/bigrquery/issues
1515
Depends:
16-
R (>= 3.6)
16+
R (>= 4.0)
1717
Imports:
1818
bit64,
1919
brio,
@@ -28,8 +28,9 @@ Imports:
2828
methods,
2929
prettyunits,
3030
rlang (>= 1.1.0),
31-
tibble
32-
Suggests:
31+
tibble,
32+
nanoparquet
33+
Suggests:
3334
blob,
3435
covr,
3536
dbplyr (>= 2.4.0),

NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,11 @@ importFrom(httr,DELETE)
159159
importFrom(httr,GET)
160160
importFrom(httr,PATCH)
161161
importFrom(httr,POST)
162+
importFrom(httr,PUT)
162163
importFrom(httr,add_headers)
163164
importFrom(httr,config)
164165
importFrom(httr,content)
166+
importFrom(httr,headers)
165167
importFrom(httr,http_status)
166168
importFrom(httr,parse_media)
167169
importFrom(httr,status_code)

NEWS.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# bigrquery (development version)
22

3+
* `bq_perform_upload()` gains a `source_format` argument that selects the format used to send data to BigQuery: `"NEWLINE_DELIMITED_JSON"` (the default) or `"PARQUET"` (@apalacio9502, #608).
4+
* bigrquery now requires R 4.0, in line with our version support principles.
5+
36
# bigrquery 1.5.1
47

58
* Forward compatibility with upcoming dbplyr release (#601).

R/bq-perform.R

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ bq_perform_extract <- function(x,
9191
#' @export
9292
#' @name api-perform
9393
#' @param values Data frame of values to insert.
94+
#' @param source_format The format in which `values` is serialised for upload:
95+
#' * For newline-delimited JSON (the default), specify "NEWLINE_DELIMITED_JSON".
96+
#' * For parquet, specify "PARQUET".
9497
#' @param create_disposition Specifies whether the job is allowed to create
9598
#' new tables.
9699
#'
@@ -110,6 +113,7 @@ bq_perform_extract <- function(x,
110113
#' 'duplicate' error is returned in the job result.
111114
bq_perform_upload <- function(x, values,
112115
fields = NULL,
116+
source_format = c("NEWLINE_DELIMITED_JSON", "PARQUET"),
113117
create_disposition = "CREATE_IF_NEEDED",
114118
write_disposition = "WRITE_EMPTY",
115119
...,
@@ -121,12 +125,13 @@ bq_perform_upload <- function(x, values,
121125
cli::cli_abort("{.arg values} must be a data frame.")
122126
}
123127
fields <- as_bq_fields(fields)
128+
arg_match(source_format)
124129
check_string(create_disposition)
125130
check_string(write_disposition)
126131
check_string(billing)
127132

128133
load <- list(
129-
sourceFormat = unbox("NEWLINE_DELIMITED_JSON"),
134+
sourceFormat = unbox(source_format),
130135
destinationTable = tableReference(x),
131136
createDisposition = unbox(create_disposition),
132137
writeDisposition = unbox(write_disposition)
@@ -139,22 +144,30 @@ bq_perform_upload <- function(x, values,
139144
load$autodetect <- unbox(TRUE)
140145
}
141146

142-
config <- list(configuration = list(load = load))
143-
config <- bq_body(config, ...)
144-
config_part <- part(
145-
c("Content-type" = "application/json; charset=UTF-8"),
146-
jsonlite::toJSON(config, pretty = TRUE)
147+
metadata <- list(configuration = list(load = load))
148+
metadata <- bq_body(metadata, ...)
149+
metadata <- list(
150+
"type" = "application/json; charset=UTF-8",
151+
"content" = jsonlite::toJSON(metadata, pretty = TRUE)
147152
)
148153

149-
data_part <- part(
150-
c("Content-type" = "application/json; charset=UTF-8"),
151-
export_json(values)
152-
)
154+
if (source_format == "NEWLINE_DELIMITED_JSON") {
155+
media <- list(
156+
"type" = "application/json; charset=UTF-8",
157+
"content" = export_json(values)
158+
)
159+
} else {
160+
media <- list(
161+
"type" = "application/vnd.apache.parquet",
162+
"content" = export_parquet(values)
163+
)
164+
}
153165

154166
url <- bq_path(billing, jobs = "")
155167
res <- bq_upload(
156168
url,
157-
parts = c(config_part, data_part),
169+
metadata,
170+
media,
158171
query = list(fields = "jobReference")
159172
)
160173
as_bq_job(res$jobReference)
@@ -186,6 +199,21 @@ export_json <- function(values) {
186199
rawToChar(rawConnectionValue(con))
187200
}
188201

202+
# https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet
203+
export_parquet <- function(values) {
204+
205+
tmpfile <- tempfile(fileext = ".parquet")
206+
207+
defer(unlink(tmpfile))
208+
209+
# write to disk
210+
nanoparquet::write_parquet(values, tmpfile)
211+
212+
# read back results
213+
readBin(tmpfile, what = "raw", n = file.info(tmpfile)$size)
214+
215+
}
216+
189217
#' @export
190218
#' @name api-perform
191219
#' @param source_uris The fully-qualified URIs that point to your data in

R/bq-request.R

Lines changed: 28 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -147,17 +147,36 @@ bq_patch <- function(url, body, ..., query = NULL, token = bq_token()) {
147147
process_request(req)
148148
}
149149

150-
#' @importFrom httr POST add_headers config
151-
bq_upload <- function(url, parts, ..., query = list(), token = bq_token()) {
152-
url <- paste0(upload_url, url)
153-
req <- POST_multipart_related(
154-
url,
155-
parts = parts,
156-
token,
150+
#' @importFrom httr POST PUT add_headers headers config status_code
151+
# https://cloud.google.com/bigquery/docs/reference/api-uploads
152+
bq_upload <- function(url, metadata, media, query = list(), token = bq_token()) {
153+
154+
query <- utils::modifyList(list(fields = "jobReference",uploadType = "resumable"), query)
155+
config <- add_headers("Content-Type" = metadata[["type"]])
156+
157+
req <- POST(
158+
paste0(upload_url, url),
159+
body = metadata[["content"]],
157160
httr::user_agent(bq_ua()),
158-
...,
159-
query = prepare_bq_query(query)
161+
token,
162+
config,
163+
query = query
160164
)
165+
166+
if (status_code(req) == 200) {
167+
168+
config <- add_headers("Content-Type" = media[["type"]])
169+
170+
req <- PUT(
171+
headers(req)$location,
172+
body = media[["content"]],
173+
httr::user_agent(bq_ua()),
174+
token,
175+
config
176+
)
177+
178+
}
179+
161180
process_request(req)
162181
}
163182

@@ -242,43 +261,3 @@ gargle_abort <- function(reason, message, status, call = caller_env()) {
242261
cli::cli_abort(message, class = class, call = call)
243262
}
244263

245-
# Multipart/related ------------------------------------------------------------
246-
247-
248-
# http://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
249-
POST_multipart_related <- function(url, config = NULL, parts = NULL,
250-
query = list(), ...,
251-
boundary = random_boundary(),
252-
handle = NULL) {
253-
if (is.null(config)) config <- config()
254-
255-
sep <- paste0("\n--", boundary, "\n")
256-
end <- paste0("\n--", boundary, "--\n")
257-
258-
body <- paste0(sep, paste0(parts, collapse = sep), end)
259-
260-
type <- paste0("multipart/related; boundary=", boundary)
261-
config <- c(config, add_headers("Content-Type" = type))
262-
263-
query <- utils::modifyList(list(uploadType = "multipart"), query)
264-
265-
POST(url, config = config, body = body, query = query, ..., handle = handle)
266-
}
267-
268-
part <- function(headers, body) {
269-
if (length(headers) == 0) {
270-
header <- "\n"
271-
} else {
272-
header <- paste0(names(headers), ": ", headers, "\n", collapse = "")
273-
}
274-
body <- paste0(body, collapse = "\n")
275-
276-
paste0(header, "\n", body)
277-
}
278-
279-
random_boundary <- function() {
280-
valid <- c(LETTERS, letters, 0:9) # , "'", "(", ")", "+", ",", "-", ".", "/",
281-
# ":", "?")
282-
paste0(sample(valid, 50, replace = TRUE), collapse = "")
283-
}
284-

man/api-perform.Rd

Lines changed: 11 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)