Skip to content

Commit d86e1f5

Browse files
ben-schwenaitap
andauthored
add fread file connection support (#7422)
* add fread connection support * fix testnum * make linterse happy * make linters even more happy * remove read bytes %d since this can overflow * add coverage * be fully experimental API compliant * more coverage * update error message for nrow and mmap * add wording changes Co-authored-by: aitap <krylov.r00t@gmail.com> * add connections guard Co-authored-by: aitap <krylov.r00t@gmail.com> * add strerrors Co-authored-by: aitap <krylov.r00t@gmail.com> * add errno lib * add reopen_connection generic * close con on exit * adjust doc * update conncection info * reopen connection * change modes * update docs * add nocov * use R_ExecWithCleanup to clean up on errors * add test for consuming before fread * use factory pattern * add aliases for S3 methods * capture print in test * fix namespace * rename to binary_reopener * More #ifdef wrapping for connections API Wrap the helper functions too. Avoid double negatives. * R_FINITE will always be true for a size_t argument * Fail when nrow_limit exceeds SIZE_MAX Otherwise truncation occurs silently, possibly setting the limit to something like 100. * Use translateChar() for native encoding string CHAR() could in theory return Latin-1 or UTF-8 text. translateChar() checks the encoding bits and only converts if needed, releasing the memory upon return from the .Call(). * update NEWS * clean up merge * more clean up merge * move extra args * update NEWS * make linter happy * Actually compile with R_CONNNECTIONS_VERSION != 1 * Rename Rd file to match \name{} * Register methods for exported binary_reopener() --------- Co-authored-by: aitap <krylov.r00t@gmail.com>
1 parent 625e497 commit d86e1f5

10 files changed

Lines changed: 309 additions & 11 deletions

File tree

NAMESPACE

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ export(rbindlist)
2020
export(fifelse)
2121
export(fcase)
2222
export(fread)
23+
export(binary_reopener)
24+
S3method(binary_reopener, bzfile)
25+
S3method(binary_reopener, default)
26+
S3method(binary_reopener, file)
27+
S3method(binary_reopener, gzfile)
28+
S3method(binary_reopener, pipe)
29+
S3method(binary_reopener, unz)
30+
S3method(binary_reopener, url)
2331
export(fwrite)
2432
export(foverlaps)
2533
export(shift)

NEWS.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444

4545
6. By-reference sub-assignments of strings to factor columns now _actually_ match the levels in UTF-8 when required and now don't result in invalid factors being created, [#7648](https://github.com/Rdatatable/data.table/issues/7648), amending a previous incomplete fix to [#6886](https://github.com/Rdatatable/data.table/issues/6886) in v1.17.2. Thanks @BASS-JN for the report and @aitap for the fix.
4646

47+
7. `fread()` can now read from connections directly by spilling to a temporary file first, [#561](https://github.com/Rdatatable/data.table/issues/561). For the best throughput, point `tmpdir=` (or the global temp directory) to fast storage like an SSD or RAM. Thanks to Chris Neff for the report and @ben-schwen for the implementation.
48+
4749
### Notes
4850

4951
1. {data.table} now depends on R 3.5.0 (2018).
@@ -377,7 +379,8 @@ See [#2611](https://github.com/Rdatatable/data.table/issues/2611) for details. T
377379
# user system elapsed
378380
# 0.028 0.000 0.005
379381
```
380-
20. `fread()` now supports the `comment.char` argument to skip trailing comments or comment-only lines, consistent with `read.table()`, [#856](https://github.com/Rdatatable/data.table/issues/856). The default remains `comment.char = ""` (no comment parsing) for backward compatibility and performance, in contrast to `read.table(comment.char = "#")`. Thanks to @arunsrinivasan and many others for the suggestion and @ben-schwen for the implementation.
382+
383+
20. `fread()` now supports the `comment.char` argument to skip trailing comments or comment-only lines, consistent with `read.table()`, [#856](https://github.com/Rdatatable/data.table/issues/856). The default remains `comment.char = ""` (no comment parsing) for backward compatibility and performance, in contrast to `read.table(comment.char = "#")`. Thanks to @arunsrinivasan and many others for the suggestion and @ben-schwen for the implementation.
381384

382385
### BUG FIXES
383386

R/fread.R

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,39 @@
1+
# nocov start
2+
# S3 generic that returns a function to open connections in binary mode
3+
binary_reopener = function(con, ...) {
4+
UseMethod("binary_reopener")
5+
}
6+
7+
binary_reopener.default = function(con, ...) {
8+
con_class = class1(con)
9+
stopf("Don't know how to reopen connection type '%s'. Need a connection opened in binary mode to continue.", con_class)
10+
}
11+
12+
binary_reopener.file = function(con, ...) {
13+
function(description) file(description, "rb", ...)
14+
}
15+
16+
binary_reopener.gzfile = function(con, ...) {
17+
function(description) gzfile(description, "rb", ...)
18+
}
19+
20+
binary_reopener.bzfile = function(con, ...) {
21+
function(description) bzfile(description, "rb", ...)
22+
}
23+
24+
binary_reopener.url = function(con, ...) {
25+
function(description) url(description, "rb", ...)
26+
}
27+
28+
binary_reopener.unz = function(con, ...) {
29+
function(description) unz(description, "rb", ...)
30+
}
31+
32+
binary_reopener.pipe = function(con, ...) {
33+
function(description) pipe(description, "rb", ...)
34+
}
35+
# nocov end
36+
137
fread = function(
238
input="", file=NULL, text=NULL, cmd=NULL, sep="auto", sep2="auto", dec="auto", quote="\"", nrows=Inf, header="auto",
339
na.strings=getOption("datatable.na.strings","NA"), stringsAsFactors=FALSE, verbose=getOption("datatable.verbose",FALSE),
@@ -55,7 +91,16 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC")
5591
input = text
5692
}
5793
}
58-
else if (is.null(cmd)) {
94+
# Check if input is a connection and read it into memory
95+
input_is_con = FALSE
96+
if (!missing(input) && inherits(input, "connection")) {
97+
input_is_con = TRUE
98+
} else if (!is.null(file) && inherits(file, "connection")) {
99+
input = file
100+
input_is_con = TRUE
101+
file = NULL
102+
}
103+
if (!input_is_con && is.null(cmd) && is.null(text)) {
59104
if (!is.character(input) || length(input)!=1L) {
60105
stopf("input= must be a single character string containing a file name, a system command containing at least one space, a URL starting 'http[s]://', 'ftp[s]://' or 'file://', or, the input data itself containing at least one \\n or \\r")
61106
}
@@ -81,6 +126,51 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC")
81126
}
82127
file = tmpFile
83128
}
129+
connection_spill_info = NULL
130+
if (input_is_con) {
131+
if (verbose) {
132+
catf("[00] Spill connection to tempfile\n Connection class: %s\n Reading connection into a temporary file... ", toString(class(input)))
133+
flush.console()
134+
}
135+
spill_started.at = proc.time()
136+
con_open = isOpen(input)
137+
138+
needs_reopen = FALSE
139+
if (con_open) {
140+
con_summary = summary(input)
141+
binary_modes = c("rb", "r+b")
142+
if (!con_summary$mode %chin% binary_modes) needs_reopen = TRUE
143+
}
144+
145+
close_con = NULL
146+
147+
if (needs_reopen) {
148+
close(input)
149+
input = binary_reopener(input)(con_summary$description)
150+
close_con = input
151+
} else if (!con_open) {
152+
open(input, "rb")
153+
close_con = input
154+
}
155+
if (!is.null(close_con)) on.exit(close(close_con), add=TRUE)
156+
tmpFile = tempfile(tmpdir=tmpdir)
157+
on.exit(unlink(tmpFile), add=TRUE)
158+
bytes_copied = .Call(CspillConnectionToFile, input, tmpFile, as.numeric(nrows))
159+
spill_elapsed = (proc.time() - spill_started.at)[["elapsed"]]
160+
161+
if (bytes_copied == 0) {
162+
warningf("Connection has size 0. Returning a NULL %s.", if (data.table) 'data.table' else 'data.frame')
163+
return(if (data.table) data.table(NULL) else data.frame(NULL))
164+
}
165+
166+
if (verbose) {
167+
catf("done in %s\n", timetaken(spill_started.at))
168+
flush.console()
169+
}
170+
connection_spill_info = c(spill_elapsed, bytes_copied)
171+
input = tmpFile
172+
file = tmpFile
173+
}
84174
if (!is.null(file)) {
85175
if (!is.character(file) || length(file)!=1L)
86176
stopf("file= must be a single character string containing a filename, or URL starting 'http[s]://', 'ftp[s]://' or 'file://'")
@@ -264,7 +354,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC")
264354
tz="UTC"
265355
}
266356
ans = .Call(CfreadR,input,identical(input,file),sep,dec,quote,header,nrows,skip,na.strings,strip.white,blank.lines.skip,comment.char,
267-
fill,showProgress,nThread,verbose,warnings2errors,logical01,logicalYN,select,drop,colClasses,integer64,encoding,keepLeadingZeros,tz=="UTC")
357+
fill,showProgress,nThread,verbose,warnings2errors,logical01,logicalYN,select,drop,colClasses,integer64,encoding,keepLeadingZeros,tz=="UTC",connection_spill_info)
268358
if (!length(ans)) return(null.data.table()) # test 1743.308 drops all columns
269359
nr = length(ans[[1L]])
270360
require_bit64_if_needed(ans)

inst/tests/tests.Rraw

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2631,11 +2631,13 @@ if (test_bit64) {
26312631
# getwd() has been set by test.data.table() to the location of this tests.Rraw file. Test files should be in the same directory.
26322632
if (test_R.utils) {
26332633
f = testDir("ch11b.dat.bz2") # http://www.stats.ox.ac.uk/pub/datasets/csb/ch11b.dat
2634-
test(900.1, fread(f, logical01=FALSE), as.data.table(read.table(f)))
2634+
test(900.1, DT<-fread(f, logical01=FALSE), as.data.table(read.table(f)))
2635+
test(900.15, fread(file(f), logical01=FALSE), DT)
26352636
test(900.2, fread(f, logical01=TRUE), as.data.table(read.table(f))[,V5:=as.logical(V5)])
26362637

26372638
f = testDir("1206FUT.txt.bz2") # a CRLF line ending file (DOS)
26382639
test(901.1, DT<-fread(f,strip.white=FALSE), setDT(read.table(f,sep="\t",header=TRUE,colClasses=as.vector(sapply(DT,class)))))
2640+
test(901.15, fread(file(f), strip.white=FALSE), DT)
26392641
test(901.2, DT<-fread(f), setDT(read.table(f,sep="\t",header=TRUE,colClasses=as.vector(sapply(DT,class)),strip.white=TRUE)))
26402642
}
26412643

@@ -6459,8 +6461,10 @@ if (test_bit64 && test_R.utils) {
64596461
ZBJBLOAJAQI = c("LHCYS AYE ZLEMYA IFU HEI JG FEYE", "", ""),
64606462
JKCRUUBAVQ = c("", ".\\YAPCNXJ\\004570_850034_757\\VWBZSS_848482_600874_487_PEKT-6-KQTVIL-7_30\\IRVQT\\HUZWLBSJYHZ\\XFWPXQ-WSPJHC-00-0770000855383.KKZ", "")
64616463
)
6462-
test(1449.1, fread(testDir("quoted_multiline.csv.bz2"))[c(1L, 43:44), c(1L, 22:24)], DT)
6463-
test(1449.2, fread(testDir("quoted_multiline.csv.bz2"), integer64='character', select = 'GPMLHTLN')[c(1L, 43:44)][[1L]], DT[ , as.character(GPMLHTLN)])
6464+
f = testDir("quoted_multiline.csv.bz2")
6465+
test(1449.1, fread(f)[c(1L, 43:44), c(1L, 22:24)], DT)
6466+
test(1449.15, fread(file(f))[c(1L, 43:44), c(1L, 22:24)], DT)
6467+
test(1449.2, fread(f, integer64='character', select = 'GPMLHTLN')[c(1L, 43:44)][[1L]], DT[ , as.character(GPMLHTLN)])
64646468
}
64656469

64666470
# Fix for #927
@@ -21559,3 +21563,17 @@ xenv_empty = new.env()
2155921563
test(2366.5, tables(env=xenv_empty, depth=1L), invisible(data.table(NULL)))
2156021564
test(2366.6, tables(env=xenv_empty), invisible(data.table(NULL)))
2156121565
rm(xenv_empty)
21566+
21567+
# fread supports connections #561
21568+
f = testDir("russellCRLF.csv")
21569+
test(2367.1, fread(file=file(f, "r"), verbose=TRUE), fread(f), output="Spill connection to tempfile")
21570+
test(2367.2, fread(file(f, "r"), nrows=0L), fread(f, nrows=0L))
21571+
test(2367.3, fread(file(f, "r"), nrows=5), fread(f, nrows=5))
21572+
test(2367.4, fread(file(f, "r"), nrows=5, header=FALSE), fread(f, nrows=5, header=FALSE))
21573+
# test with open connection consuming part of the connection before fread
21574+
con = file(f, "rb")
21575+
test(2367.5, {readLines(con, n=3); fread(con)}, fread(f, skip=3L))
21576+
close(con)
21577+
file.create(f <- tempfile())
21578+
test(2367.6, fread(file(f)), data.table(), warning="Connection has size 0.")
21579+
unlink(f)

man/binary_reopener.Rd

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
\name{binary_reopener}
2+
\alias{binary_reopener}
3+
\alias{binary_reopener.default}
4+
\alias{binary_reopener.file}
5+
\alias{binary_reopener.gzfile}
6+
\alias{binary_reopener.bzfile}
7+
\alias{binary_reopener.url}
8+
\alias{binary_reopener.unz}
9+
\alias{binary_reopener.pipe}
10+
\title{ Create a function to open connections in binary mode }
11+
\description{
12+
S3 generic that returns a function to open a connection in binary read mode. Used internally by \code{fread}. Exported so packages with custom connection classes can define methods.
13+
}
14+
\usage{
15+
binary_reopener(con, ...)
16+
}
17+
\arguments{
18+
\item{con}{ A connection object. }
19+
\item{...}{ Additional arguments passed to the connection constructor. }
20+
}
21+
\details{
22+
Returns a function that accepts a description argument and opens a connection in binary read mode (\code{"rb"}). Methods are provided for \code{file}, \code{gzfile}, \code{bzfile}, \code{url}, \code{unz} and \code{pipe} connections.
23+
24+
To support custom connection types with \code{fread}, define a method for your connection class that returns an opener function.
25+
}
26+
\value{
27+
A function that accepts a description argument and returns a connection object opened in binary read mode.
28+
}
29+
\examples{
30+
\dontrun{
31+
# Define a method for a custom connection class
32+
binary_reopener.my_con = function(con, ...) {
33+
function(description) my_con(description, mode = "rb", ...)
34+
}
35+
}
36+
}
37+
\seealso{
38+
\code{\link{fread}}
39+
}
40+
\keyword{ data }

src/data.table.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,8 @@ SEXP setcharvec(SEXP, SEXP, SEXP);
412412
SEXP chmatch_R(SEXP, SEXP, SEXP);
413413
SEXP chmatchdup_R(SEXP, SEXP, SEXP);
414414
SEXP chin_R(SEXP, SEXP);
415-
SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
415+
SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
416+
SEXP spillConnectionToFile(SEXP, SEXP, SEXP);
416417
SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
417418
SEXP rbindlist(SEXP, SEXP, SEXP, SEXP, SEXP);
418419
SEXP setlistelt(SEXP, SEXP, SEXP);

src/fread.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1576,9 +1576,16 @@ int freadMain(freadMainArgs _args)
15761576
CloseHandle(hFile); // see https://msdn.microsoft.com/en-us/library/windows/desktop/aa366537(v=vs.85).aspx
15771577
if (mmp == NULL) {
15781578
#endif
1579-
int nbit = 8 * sizeof(char*); // #nocov
1580-
STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. %s."), filesize_to_str(fileSize), nbit, // # nocov
1581-
nbit <= 32 ? _("Please upgrade to 64bit") : _("There is probably not enough contiguous virtual memory available")); // # nocov
1579+
// # nocov start
1580+
int nbit = 8 * sizeof(char*);
1581+
if (nrowLimit < INT64_MAX) {
1582+
STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. Since you specified nrows=%"PRId64", try wrapping the file in a connection: fread(file('filename'), nrows=%"PRId64")."),
1583+
filesize_to_str(fileSize), nbit, nrowLimit, nrowLimit);
1584+
} else {
1585+
STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. %s."), filesize_to_str(fileSize), nbit,
1586+
nbit <= 32 ? _("Please upgrade to 64bit") : _("There is probably not enough contiguous virtual memory available"));
1587+
}
1588+
// # nocov end
15821589
}
15831590
sof = (const char*) mmp;
15841591
if (verbose) DTPRINT(_(" Memory mapped ok\n"));
@@ -2970,7 +2977,10 @@ int freadMain(freadMainArgs _args)
29702977

29712978
if (verbose) {
29722979
DTPRINT("=============================\n"); // # notranslate
2980+
tTot = tTot + (args.connectionSpillActive ? args.connectionSpillSeconds : 0.0);
29732981
if (tTot < 0.000001) tTot = 0.000001; // to avoid nan% output in some trivially small tests where tot==0.000s
2982+
if (args.connectionSpillActive)
2983+
DTPRINT(_("%8.3fs (%3.0f%%) Spill connection to tempfile (%.3fGiB)\n"), args.connectionSpillSeconds, 100.0 * args.connectionSpillSeconds / tTot, args.connectionSpillBytes / (1024.0 * 1024.0 * 1024.0));
29742984
DTPRINT(_("%8.3fs (%3.0f%%) Memory map %.3fGiB file\n"), tMap - t0, 100.0 * (tMap - t0) / tTot, 1.0 * fileSize / (1024 * 1024 * 1024));
29752985
DTPRINT(_("%8.3fs (%3.0f%%) sep="), tLayout - tMap, 100.0 * (tLayout - tMap) / tTot);
29762986
DTPRINT(sep == '\t' ? "'\\t'" : (sep == '\n' ? "'\\n'" : "'%c'"), sep); // # notranslate

src/fread.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@ typedef struct freadMainArgs
174174
// Any additional implementation-specific parameters.
175175
bool oldNoDateTime;
176176

177+
// Connection spill tracking
178+
bool connectionSpillActive;
179+
double connectionSpillSeconds;
180+
double connectionSpillBytes;
181+
177182
} freadMainArgs;
178183

179184

0 commit comments

Comments
 (0)