port remaining shell util functions except for fork

This commit is contained in:
Aaron
2009-05-14 11:42:52 -04:00
parent 3200bd7872
commit 397e5b13e8
3 changed files with 258 additions and 3 deletions

View File

@@ -23,6 +23,9 @@ namespace mongo {
namespace shellUtils {
const char *argv0 = 0;
void RecordMyLocation( const char *_argv0 ) { argv0 = _argv0; }
// helpers
BSONObj makeUndefined() {
@@ -101,6 +104,11 @@ namespace mongo {
}
#ifndef _WIN32
#include <signal.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/wait.h>
BSONObj AllocatePorts( const BSONObj &args ) {
uassert( "allocatePorts takes exactly 1 argument", args.nFields() == 1 );
uassert( "allocatePorts needs to be passed an integer", args.firstElement().isNumber() );
@@ -132,14 +140,253 @@ namespace mongo {
b.append( "", ports );
return b.obj();
}
map< int, pair< pid_t, int > > dbs;
char *copyString( const char *original ) {
char *ret = reinterpret_cast< char * >( malloc( strlen( original ) + 1 ) );
strcpy( ret, original );
return ret;
}
boost::mutex &mongoProgramOutputMutex( *( new boost::mutex ) );
stringstream mongoProgramOutput_;
void writeMongoProgramOutputLine( int port, const char *line ) {
boost::mutex::scoped_lock lk( mongoProgramOutputMutex );
stringstream buf;
buf << "m" << port << "| " << line;
cout << buf.str() << endl;
mongoProgramOutput_ << buf.str() << endl;
}
BSONObj RawMongoProgramOutput( const BSONObj &args ) {
boost::mutex::scoped_lock lk( mongoProgramOutputMutex );
return BSON( "" << mongoProgramOutput_.str() );
}
class MongoProgramRunner {
char **argv_;
int port_;
int pipe_;
public:
MongoProgramRunner( const BSONObj &args ) {
assert( args.nFields() > 0 );
string program( args.firstElement().valuestrsafe() );
assert( !program.empty() );
boost::filesystem::path programPath = ( boost::filesystem::path( argv0 ) ).branch_path() / program;
massert( "couldn't find " + programPath.native_file_string(), boost::filesystem::exists( programPath ) );
port_ = -1;
argv_ = new char *[ args.nFields() + 1 ];
{
string s = programPath.native_file_string();
if ( s == program )
s = "./" + s;
argv_[ 0 ] = copyString( s.c_str() );
}
BSONObjIterator j( args );
j.next();
for( int i = 1; i < args.nFields(); ++i ) {
BSONElement e = j.next();
string str;
if ( e.isNumber() ) {
stringstream ss;
ss << e.number();
str = ss.str();
} else {
assert( e.type() == mongo::String );
str = e.valuestr();
}
char *s = copyString( str.c_str() );
if ( string( "--port" ) == s )
port_ = -2;
else if ( port_ == -2 )
port_ = strtol( s, 0, 10 );
argv_[ i ] = s;
}
argv_[ args.nFields() ] = 0;
assert( port_ > 0 );
if ( dbs.count( port_ ) != 0 ){
cerr << "count for port: " << port_ << " is not 0 is: " << dbs.count( port_ ) << endl;
assert( dbs.count( port_ ) == 0 );
}
}
void start() {
int pipeEnds[ 2 ];
assert( pipe( pipeEnds ) != -1 );
fflush( 0 );
pid_t pid = fork();
assert( pid != -1 );
if ( pid == 0 ) {
assert( dup2( pipeEnds[ 1 ], STDOUT_FILENO ) != -1 );
assert( dup2( pipeEnds[ 1 ], STDERR_FILENO ) != -1 );
execvp( argv_[ 0 ], argv_ );
assert( "Unable to start program" == 0 );
}
cout << "shell: started mongo program";
int i = 0;
while( argv_[ i ] )
cout << " " << argv_[ i++ ];
cout << endl;
i = 0;
while( argv_[ i ] )
free( argv_[ i++ ] );
free( argv_ );
dbs.insert( make_pair( port_, make_pair( pid, pipeEnds[ 1 ] ) ) );
pipe_ = pipeEnds[ 0 ];
}
// Continue reading output
void operator()() {
// This assumes there aren't any 0's in the mongo program output.
// Hope that's ok.
char buf[ 1024 ];
char temp[ 1024 ];
char *start = buf;
while( 1 ) {
int lenToRead = 1023 - ( start - buf );
int ret = read( pipe_, (void *)start, lenToRead );
assert( ret != -1 );
start[ ret ] = '\0';
if ( strlen( start ) != unsigned( ret ) )
writeMongoProgramOutputLine( port_, "WARNING: mongod wrote null bytes to output" );
char *last = buf;
for( char *i = strchr( buf, '\n' ); i; last = i + 1, i = strchr( last, '\n' ) ) {
*i = '\0';
writeMongoProgramOutputLine( port_, last );
}
if ( ret == 0 ) {
if ( *last )
writeMongoProgramOutputLine( port_, last );
close( pipe_ );
break;
}
if ( last != buf ) {
strcpy( temp, last );
strcpy( buf, temp );
} else {
assert( strlen( buf ) < 1023 );
}
start = buf + strlen( buf );
}
}
};
BSONObj StartMongoProgram( const BSONObj &a ) {
MongoProgramRunner r( a );
r.start();
boost::thread t( r );
return undefined_;
}
BSONObj ResetDbpath( const BSONObj &a ) {
assert( a.nFields() == 1 );
string path = a.firstElement().valuestrsafe();
assert( !path.empty() );
if ( boost::filesystem::exists( path ) )
boost::filesystem::remove_all( path );
boost::filesystem::create_directory( path );
return undefined_;
}
void killDb( int port, int signal ) {
if( dbs.count( port ) != 1 ) {
cout << "No db started on port: " << port << endl;
return;
}
pid_t pid = dbs[ port ].first;
assert( 0 == kill( pid, signal ) );
int i = 0;
for( ; i < 65; ++i ) {
if ( i == 5 ) {
char now[64];
time_t_to_String(time(0), now);
now[ 20 ] = 0;
cout << now << " process on port " << port << ", with pid " << pid << " not terminated, sending sigkill" << endl;
assert( 0 == kill( pid, SIGKILL ) );
}
int temp;
int ret = waitpid( pid, &temp, WNOHANG );
if ( ret == pid )
break;
sleepms( 1000 );
}
if ( i == 65 ) {
char now[64];
time_t_to_String(time(0), now);
now[ 20 ] = 0;
cout << now << " failed to terminate process on port " << port << ", with pid " << pid << endl;
assert( "Failed to terminate process" == 0 );
}
close( dbs[ port ].second );
dbs.erase( port );
if ( i > 4 || signal == SIGKILL ) {
sleepms( 4000 ); // allow operating system to reclaim resources
}
}
BSONObj StopMongoProgram( const BSONObj &a ) {
assert( a.nFields() == 1 || a.nFields() == 2 );
assert( a.firstElement().isNumber() );
int port = int( a.firstElement().number() );
int signal = SIGTERM;
if ( a.nFields() == 2 ) {
BSONObjIterator i( a );
i.next();
BSONElement e = i.next();
assert( e.isNumber() );
signal = int( e.number() );
}
killDb( port, signal );
cout << "shell: stopped mongo program on port " << port << endl;
return undefined_;
}
void KillMongoProgramInstances() {
vector< int > ports;
for( map< int, pair< pid_t, int > >::iterator i = dbs.begin(); i != dbs.end(); ++i )
ports.push_back( i->first );
for( vector< int >::iterator i = ports.begin(); i != ports.end(); ++i )
killDb( *i, SIGTERM );
}
MongoProgramScope::~MongoProgramScope() {
try {
KillMongoProgramInstances();
} catch ( ... ) {
assert( false );
}
}
#else
MongoProgramScope::~MongoProgramScope() {}
void KillMongoProgramInstances() {}
#endif
void installShellUtils( Scope& scope ){
void installShellUtils( Scope& scope ){
scope.injectNative( "listFiles" , listFiles );
scope.injectNative( "sleep" , JSSleep );
scope.injectNative( "quit", Quit );
#if !defined(_WIN32)
scope.injectNative( "allocatePorts", AllocatePorts );
scope.injectNative( "_startMongoProgram", StartMongoProgram );
scope.injectNative( "stopMongod", StopMongoProgram );
scope.injectNative( "stopMongoProgram", StopMongoProgram );
scope.injectNative( "resetDbpath", ResetDbpath );
scope.injectNative( "rawMongoProgramOutput", RawMongoProgramOutput );
#endif
}