// syncclusterconnection.cpp /* * Copyright 2010 10gen Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "stdafx.h" #include "syncclusterconnection.h" // error codes 8000-8009 namespace mongo { SyncClusterConnection::SyncClusterConnection( string commaSeperated ){ string::size_type idx; while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ){ string h = commaSeperated.substr( 0 , idx ); commaSeperated = commaSeperated.substr( idx + 1 ); _connect( h ); } _connect( commaSeperated ); uassert( 8004 , "SyncClusterConnection needs 3 servers" , _conns.size() == 3 ); } SyncClusterConnection::SyncClusterConnection( string a , string b , string c ){ // connect to all even if not working _connect( a ); _connect( b ); _connect( c ); } SyncClusterConnection::~SyncClusterConnection(){ for ( size_t i=0; i<_conns.size(); i++ ) delete _conns[i]; _conns.clear(); } bool SyncClusterConnection::prepare( string& errmsg ){ return fsync( errmsg ); } bool SyncClusterConnection::fsync( string& errmsg ){ bool ok = true; errmsg = ""; for ( size_t i=0; i<_conns.size(); i++ ){ BSONObj res; try { if ( _conns[i]->simpleCommand( "admin" , 0 , "fsync" ) ) continue; } catch ( std::exception& e ){ errmsg += e.what(); } catch ( ... ){ } ok = false; errmsg += _conns[i]->toString() + ":" + res.toString(); } return ok; } void SyncClusterConnection::_checkLast(){ vector all; vector errors; for ( size_t i=0; i<_conns.size(); i++ ){ BSONObj res; string err; try { if ( ! _conns[i]->runCommand( "admin" , BSON( "getlasterror" << 1 << "fsync" << 1 ) , res ) ) err = "cmd failed: "; } catch ( std::exception& e ){ err += e.what(); } catch ( ... ){ err += "unknown failure"; } all.push_back( res ); errors.push_back( err ); } assert( all.size() == errors.size() && all.size() == _conns.size() ); stringstream err; bool ok = true; for ( size_t i = 0; i<_conns.size(); i++ ){ BSONObj res = all[i]; if ( res["ok"].trueValue() && res["fsyncFiles"].numberInt() > 0 ) continue; ok = false; err << _conns[i]->toString() << ": " << res << " " << errors[i]; } if ( ok ) return; throw UserException( 8001 , (string)"SyncClusterConnection write op failed: " + err.str() ); } void SyncClusterConnection::_connect( string host ){ log() << "SyncClusterConnection connecting to: " << host << endl; DBClientConnection * c = new DBClientConnection( true ); string errmsg; if ( ! c->connect( host , errmsg ) ) log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl; _conns.push_back( c ); } auto_ptr SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions){ uassert( 10021 , "$cmd not support yet in SyncClusterConnection::query" , ns.find( "$cmd" ) == string::npos ); for ( size_t i=0; i<_conns.size(); i++ ){ try { auto_ptr cursor = _conns[i]->query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions ); if ( cursor.get() ) return cursor; log() << "query failed to: " << _conns[i]->toString() << " no data" << endl; } catch ( ... ){ log() << "query failed to: " << _conns[i]->toString() << " exception" << endl; } } throw UserException( 8002 , "all servers down!" ); } auto_ptr SyncClusterConnection::getMore( const string &ns, long long cursorId, int nToReturn, int options ){ uassert( 10022 , "SyncClusterConnection::getMore not supported yet" , 0); auto_ptr c; return c; } void SyncClusterConnection::insert( const string &ns, BSONObj obj ){ string errmsg; if ( ! prepare( errmsg ) ) throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg ); for ( size_t i=0; i<_conns.size(); i++ ){ _conns[i]->insert( ns , obj ); } _checkLast(); } void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v ){ uassert( 10023 , "SyncClusterConnection bulk insert not implemented" , 0); } void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){ assert(0); } void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){ assert(0); } string SyncClusterConnection::toString(){ stringstream ss; ss << "SyncClusterConnection ["; for ( size_t i=0; i<_conns.size(); i++ ){ if ( i > 0 ) ss << ","; ss << _conns[i]->toString(); } ss << "]"; return ss.str(); } }