Part 1 of SERVER-1097: Make mongoimport adhere to CSV spec.

This commit is contained in:
Spencer T Brody
2011-06-30 17:22:27 -04:00
parent eaf7961e83
commit f44433b04c
3 changed files with 157 additions and 77 deletions

View File

@@ -27,6 +27,7 @@
#include <iostream>
#include <boost/program_options.hpp>
#include <boost/algorithm/string.hpp>
using namespace mongo;
@@ -46,21 +47,66 @@ class Import : public Tool {
vector<string> _upsertFields;
static const int BUF_SIZE = 1024 * 1024 * 4;
string stripLeadingWhitespace(string str) {
int i = 0;
while (isspace(str[i])) { ++i; }; // Finds index of first non-whitespace character
return str.substr(i, str.size() - i);
string trimWhitespace(const string& str) {
int begin = 0;
int end = str.size() - 1;
while (isspace(str[begin])) { ++begin; } // Finds index of first non-whitespace character
while (isspace(str[end])) { --end; } // Finds index of last non-whitespace character
return str.substr(begin, end - begin + 1);
}
void csvTokenizeRow(const string& row, vector<string>& tokens) {
bool inQuotes = false;
bool prevWasQuote = false;
bool tokenQuoted = false;
string curtoken = "";
for (string::const_iterator it = row.begin(); it != row.end(); ++it) {
char element = *it;
if (element == '"') {
if (!inQuotes) {
inQuotes = true;
tokenQuoted = true;
curtoken = "";
} else {
if (prevWasQuote) {
curtoken += "\"";
prevWasQuote = false;
} else {
prevWasQuote = true;
}
}
} else {
if (inQuotes && prevWasQuote) {
inQuotes = false;
prevWasQuote = false;
tokens.push_back(curtoken);
}
if (element == ',' && !inQuotes) {
if (!tokenQuoted) { // If token was quoted, it's already been added
tokens.push_back(trimWhitespace(curtoken));
}
curtoken = "";
tokenQuoted = false;
} else {
curtoken += element;
}
}
}
if (!tokenQuoted || (inQuotes && prevWasQuote)) {
tokens.push_back(trimWhitespace(curtoken));
}
}
void _append( BSONObjBuilder& b , const string& fieldName , const string& data ) {
if ( b.appendAsNumber( fieldName , data ) )
return;
if ( _ignoreBlanks && data.size() == 0 )
return;
if ( b.appendAsNumber( fieldName , data ) )
return;
// TODO: other types?
b.append( fieldName , data );
b.append ( fieldName , data );
}
/*
@@ -118,88 +164,75 @@ class Import : public Tool {
* Returns a true if a BSONObj was successfully created and false if not.
*/
bool parseRow(istream* in, BSONObj& o, int& numBytesRead) {
boost::scoped_array<char> line(new char[BUF_SIZE+2]);
char* buf = line.get();
boost::scoped_array<char> buffer(new char[BUF_SIZE+2]);
char* line = buffer.get();
numBytesRead = getLine(in, buf);
buf += numBytesRead;
numBytesRead = getLine(in, line);
line += numBytesRead;
while ((_type != TSV || buf[0] != '\t') && isspace( buf[0] )) {
numBytesRead++;
buf++;
}
if (buf[0] == '\0') {
if (line[0] == '\0') {
return false;
}
numBytesRead += strlen( buf );
numBytesRead += strlen( line );
if (_type == JSON) {
// Strip out trailing whitespace
char * end = ( buf + strlen( buf ) ) - 1;
char * end = ( line + strlen( line ) ) - 1;
while ( isspace(*end) ) {
*end = 0;
end--;
}
o = fromjson( buf );
o = fromjson( line );
return true;
}
BSONObjBuilder b;
vector<string> tokens;
if (_type == CSV) {
string row;
bool inside_quotes = false;
size_t last_quote = 0;
while (true) {
string lineStr(line);
// Deal with line breaks in quoted strings
last_quote = lineStr.find_first_of('"');
while (last_quote != string::npos) {
inside_quotes = !inside_quotes;
last_quote = lineStr.find_first_of('"', last_quote+1);
}
unsigned int pos=0;
while ( buf[0] ) {
bool done = false;
string data;
char * end;
if ( _type == CSV && buf[0] == '"' ) {
buf++; //skip first '"'
row.append(lineStr);
while (true) {
end = strchr( buf , '"' );
if (!end) {
data += buf;
done = true;
break;
}
else if (end[1] == '"') {
// two '"'s get appended as one
data.append(buf, end-buf+1); //include '"'
buf = end+2; //skip both '"'s
}
else if (end[-1] == '\\') {
// "\\\"" gets appended as '"'
data.append(buf, end-buf-1); //exclude '\\'
data.append("\"");
buf = end+1; //skip the '"'
}
else {
data.append(buf, end-buf);
buf = end+2; //skip '"' and ','
break;
}
if (inside_quotes) {
row.append("\n");
int num = getLine(in, line);
line += num;
numBytesRead += num;
uassert (15854, "CSV file ends while inside quoted field", line[0] != '\0');
numBytesRead += strlen( line );
} else {
break;
}
}
else {
end = strstr( buf , _sep );
if ( ! end ) {
done = true;
data = string( buf );
}
else {
data = string( buf , end - buf );
buf = end+1;
}
// now 'row' is string corresponding to one row of the CSV file
// (which may span multiple lines) and represents one BSONObj
csvTokenizeRow(row, tokens);
}
else { // _type == TSV
while (line[0] != '\t' && isspace(line[0])) { // Strip leading whitespace, but not tabs
line++;
}
tokens.push_back(data);
if ( done )
break;
boost::split(tokens, line, boost::is_any_of(_sep));
}
for (vector<string>::iterator token = tokens.begin(); token != tokens.end(); ++token) {
// Now that the row is tokenized, create a BSONObj out of it.
BSONObjBuilder b;
unsigned int pos=0;
for (vector<string>::iterator it = tokens.begin(); it != tokens.end(); ++it) {
string token = *it;
if ( _headerLine ) {
_fields.push_back(stripLeadingWhitespace(*token));
_fields.push_back(token);
}
else {
string name;
@@ -213,10 +246,9 @@ class Import : public Tool {
}
pos++;
_append( b , name , *token );
_append( b , name , token );
}
}
o = b.obj();
return true;
}
@@ -342,26 +374,26 @@ public:
int num = 0;
int errors = 0;
int len = 0;
// buf and line are only used when parsing a jsonArray
boost::scoped_array<char> line(new char[BUF_SIZE+2]);
char* buf = line.get();
// buffer and line are only used when parsing a jsonArray
boost::scoped_array<char> buffer(new char[BUF_SIZE+2]);
char* line = buffer.get();
while ( _jsonArray || in->rdstate() == 0 ) {
try {
BSONObj o;
if (_jsonArray) {
int bytesProcessed = 0;
if (buf == line.get()) { // Only read on first pass - the whole array must be on one line.
bytesProcessed = getLine(in, buf);
buf += bytesProcessed;
if (line == buffer.get()) { // Only read on first pass - the whole array must be on one line.
bytesProcessed = getLine(in, line);
line += bytesProcessed;
len += bytesProcessed;
}
if ((bytesProcessed = parseJSONArray(buf, o)) < 0) {
if ((bytesProcessed = parseJSONArray(line, o)) < 0) {
len += bytesProcessed;
break;
}
len += bytesProcessed;
buf += len;
line += len;
}
else {
if (!parseRow(in, o, len)) {
@@ -398,7 +430,7 @@ public:
}
catch ( std::exception& e ) {
cout << "exception:" << e.what() << endl;
cout << buf << endl;
cout << line << endl;
errors++;
if (hasParam("stopOnError") || _jsonArray)