
journal compression

dwight 2011-07-21 17:41:30 -04:00
parent 97e6514ff2
commit 9a30ac81c4
12 changed files with 157 additions and 76 deletions

View File

@@ -504,12 +504,11 @@ namespace mongo {
commitJob.notifyCommitted();
return true;
}
PREPLOGBUFFER();
RWLockRecursive::Shared lk3(MongoFile::mmmutex);
unsigned abLen = commitJob._ab.len();
unsigned abLen = commitJob._ab.len() + sizeof(JSectFooter);
commitJob.reset(); // must be reset before allowing anyone to write
DEV assert( !commitJob.hasWritten() );
@@ -517,7 +516,6 @@ namespace mongo {
lk1.reset();
// ****** now other threads can do writes ******
WRITETOJOURNAL(commitJob._ab);
assert( abLen == commitJob._ab.len() ); // a check that no one touched the builder while we were doing work. if so, our locking is wrong.
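A note on the check above: journal() (see the next file) appends exactly one JSectFooter to the uncompressed builder after the caller snapshots its length, which is why abLen is computed as _ab.len() + sizeof(JSectFooter) before the lock is released. A minimal, self-contained sketch of the invariant, using stub types and hypothetical names:

    #include <cassert>

    struct FooterStub { unsigned sentinel; unsigned char hash[16]; };

    struct BuilderStub {
        unsigned _len;
        unsigned len() const { return _len; }
        void appendFooter() { _len += sizeof(FooterStub); } // what journal() will do
    };

    int main() {
        BuilderStub b; b._len = 4096;
        unsigned abLen = b.len() + sizeof(FooterStub); // expected final size, incl. footer
        // ... lock released here; other threads may queue new writes ...
        b.appendFooter();                              // done inside WRITETOJOURNAL/journal()
        assert( abLen == b.len() );                    // fires if anyone else touched the builder
    }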

View File

@@ -95,6 +95,11 @@ namespace mongo {
assert(false);
}
JSectFooter::JSectFooter() {
memset(this, 0, sizeof(*this));
sentinel = JEntry::OpCode_Footer;
magic[0] = magic[1] = magic[2] = magic[3] = '\n'; // checkMagic() expects 0x0a0a0a0a
}
JSectFooter::JSectFooter(const void* begin, int len) { // needs buffer to compute hash
sentinel = JEntry::OpCode_Footer;
reserved = 0;
@@ -629,24 +634,52 @@ namespace mongo {
}
}
/** write to journal
/** write to journal (called by WRITETOJOURNAL)
*/
void journal(const AlignedBuilder& b) {
void journal(AlignedBuilder& b) {
j.journal(b);
}
void Journal::journal(const AlignedBuilder& _b) {
#if defined(_NOCOMPRESS)
void Journal::journal(AlignedBuilder& _b) {
RACECHECK
#if 0
unsigned w = _b.len();
const AlignedBuilder& b = _b;
#else
static AlignedBuilder compressed(32*1024*1024);
compressed.reset( maxCompressedLength(_b.len()) );
size_t compressedLength = 0;
rawCompress(_b.buf(), _b.len(), compressed.atOfs(0), &compressedLength);
unsigned w = (compressedLength+8191)&(~8191);
const AlignedBuilder& b = compressed;
#endif
static AlignedBuilder cb(32*1024*1024);
const unsigned headTailSize = sizeof(JSectHeader) + sizeof(JSectFooter);
cb.reset( maxCompressedLength(_b.len()) + headTailSize );
{
JSectHeader *h = (JSectHeader *) _b.buf();
dassert( h->sectionLen() == (unsigned) 0xffffffff );
cb.appendStruct(*h);
}
size_t compressedLength = 0;
rawCompress(_b.buf() + sizeof(JSectHeader), _b.len() - sizeof(JSectHeader), cb.cur(), &compressedLength); // compress only the bytes after the header; the footer is appended below
assert( compressedLength < 0xffffffff );
cb.skip(compressedLength);
// footer
unsigned L = 0xffffffff;
{
// pad to alignment, and set the total section length in the JSectHeader
assert( 0xffffe000 == (~(Alignment-1)) );
unsigned lenUnpadded = cb.len() + sizeof(JSectFooter);
L = (lenUnpadded + Alignment-1) & (~(Alignment-1));
dassert( L >= lenUnpadded );
((JSectHeader*)cb.atOfs(0))->setSectionLen(lenUnpadded);
JSectFooter f(cb.buf(), cb.len()); // computes checksum
cb.appendStruct(f);
// need a footer for the uncompressed buffer too so that WRITETODATAFILES is happy.
// done this way as we do not need a checksum for that.
JSectFooter uncompressedFooter;
_b.appendStruct(uncompressedFooter);
((JSectHeader*)_b.atOfs(0))->setSectionLen(_b.len());
}
try {
mutex::scoped_lock lk(_curLogFileMutex);
@@ -655,9 +688,11 @@ namespace mongo {
assert( _curLogFile );
stats.curr->_uncompressedBytes += _b.len();
stats.curr->_journaledBytes += w;
unsigned w = cb.len();
_written += w;
_curLogFile->synchronousAppend((void *) b.buf(), w);
assert( w <= L );
stats.curr->_journaledBytes += L;
_curLogFile->synchronousAppend((void *) cb.buf(), L);
_rotate();
}
catch(std::exception& e) {
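Taking stock of the new write path: the JSectHeader is copied uncompressed into a second builder, the bytes after the header are snappy-compressed, a checksummed JSectFooter is appended, and the whole section is rounded up to the 8KB Alignment before the synchronous append. A standalone sketch of that round trip, on the assumption that maxCompressedLength/rawCompress/uncompress in util/compress.h are thin wrappers over snappy:

    #include <snappy.h>
    #include <cassert>
    #include <string>
    #include <vector>

    const unsigned Alignment = 8192;

    // round an unpadded section length up to the 8KB journal alignment
    unsigned padToAlignment(unsigned lenUnpadded) {
        return (lenUnpadded + Alignment - 1) & ~(Alignment - 1);
    }

    int main() {
        std::string body(100000, 'x'); // stand-in for the entries between header and footer
        std::vector<char> out(snappy::MaxCompressedLength(body.size()));
        size_t compressedLength = 0;
        snappy::RawCompress(body.data(), body.size(), &out[0], &compressedLength);

        unsigned lenUnpadded = (unsigned) compressedLength; // + header/footer sizes in the real code
        unsigned L = padToAlignment(lenUnpadded);
        assert( L >= lenUnpadded && L % Alignment == 0 );   // mirrors the dasserts above

        std::string back; // recovery side
        assert( snappy::Uncompress(&out[0], compressedLength, &back) && back == body );
    }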

View File

@@ -41,7 +41,7 @@ namespace mongo {
@param buf - a buffer that will be written to the journal.
will not return until on disk
*/
void journal(const AlignedBuilder& buf);
void journal(AlignedBuilder& buf);
/** flag that something has gone wrong during writing to the journal
(not for recovery mode)

View File

@@ -22,6 +22,8 @@ namespace mongo {
namespace dur {
const unsigned Alignment = 8192;
#pragma pack(1)
/** beginning header for a journal/j._<n> file
there is nothing important in this header at this time, except perhaps the version #.
@@ -59,11 +61,25 @@ namespace mongo {
/** "Section" header. A section corresponds to a group commit.
len is length of the entire section including header and footer.
header and footer are not compressed, just the stuff in between.
*/
struct JSectHeader {
unsigned len; // length in bytes of the whole section
private:
unsigned _sectionLen; // unpadded length in bytes of the whole section
public:
unsigned long long seqNumber; // sequence number that can be used on recovery to not do too much work
unsigned long long fileId; // matches JHeader::fileId
unsigned sectionLen() const { return _sectionLen; }
// we store the unpadded length so we can use that when we uncompress. to
// get the true total size this must be rounded up to the Alignment.
void setSectionLen(unsigned lenUnpadded) { _sectionLen = lenUnpadded; }
unsigned sectionLenWithPadding() const {
unsigned x = (sectionLen() + (Alignment-1)) & (~(Alignment-1));
dassert( x % Alignment == 0 );
return x;
}
};
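A worked example of the padding math, with illustrative numbers: a section whose unpadded length is 12000 bytes stores sectionLen() == 12000 but occupies sectionLenWithPadding() == 16384 bytes (two 8KB blocks) on disk; recovery advances by the padded size and parses only the unpadded bytes.

    #include <cassert>

    int main() {
        const unsigned Alignment = 8192;
        unsigned lenUnpadded = 12000; // hypothetical section size
        unsigned padded = (lenUnpadded + Alignment - 1) & ~(Alignment - 1);
        assert( padded == 16384 );    // rounded up to two 8KB blocks
    }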
/** an individual write operation within a group commit section. Either the entire section should
@@ -115,6 +131,7 @@ namespace mongo {
/** group commit section footer. md5 is a key field. */
struct JSectFooter {
JSectFooter();
JSectFooter(const void* begin, int len); // needs buffer to compute hash
unsigned sentinel;
unsigned char hash[16];
@@ -127,6 +144,8 @@ namespace mongo {
@return true if buffer looks valid
*/
bool checkHash(const void* begin, int len) const;
void checkMagic() const { assert( *((unsigned*)magic) == 0x0a0a0a0a ); }
};
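A sketch of what checkHash computes, with OpenSSL's MD5 standing in for the md5 implementation mongo bundles (an assumption of this sketch): the digest covers the section from the JSectHeader up to, but not including, the footer itself.

    #include <openssl/md5.h>
    #include <cstring>

    // storedHash is JSectFooter::hash; [begin, begin+len) spans the section
    // header plus everything before the footer.
    bool checkHashSketch(const unsigned char storedHash[16], const void* begin, int len) {
        unsigned char digest[MD5_DIGEST_LENGTH]; // 16 bytes
        MD5((const unsigned char*) begin, (size_t) len, digest);
        return memcmp(storedHash, digest, sizeof(digest)) == 0;
    }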
/** declares "the next entry(s) are for this database / file path prefix" */

View File

@@ -35,7 +35,7 @@ namespace mongo {
/** write to journal
*/
void journal(const AlignedBuilder& b);
void journal(AlignedBuilder& b);
boost::filesystem::path getFilePathFor(int filenumber) const;

View File

@@ -130,7 +130,7 @@ namespace mongo {
// JSectHeader
JSectHeader h;
h.len = (unsigned) 0xffffffff; // total length, will fill in later
h.setSectionLen(0xffffffff); // total length, will fill in later
h.seqNumber = getLastDataFileFlushTime();
h.fileId = j.curFileId();
@@ -153,7 +153,7 @@ namespace mongo {
}
AlignedBuilder& bb = commitJob._ab;
resetLogBuffer(bb);
resetLogBuffer(bb); // adds JSectHeader
// ops other than basic writes (DurOp's)
{
@@ -162,27 +162,7 @@ namespace mongo {
}
}
{
prepBasicWrites(bb);
}
// pad to alignment, and set the total section length in the JSectHeader
assert( 0xffffe000 == (~(Alignment-1)) );
unsigned lenWillBe = bb.len() + sizeof(JSectFooter);
unsigned L = (lenWillBe + Alignment-1) & (~(Alignment-1));
dassert( L >= lenWillBe );
*((unsigned*)bb.atOfs(0)) = L;
{
JSectFooter f(bb.buf(), bb.len());
bb.appendStruct(f);
}
{
unsigned padding = L - bb.len();
bb.skip(padding);
dassert( bb.len() % Alignment == 0 );
}
prepBasicWrites(bb);
return;
}
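Net effect of this hunk: PREPLOGBUFFER no longer appends the footer or pads; it leaves a header whose section length is the 0xffffffff placeholder, followed by the entries, and journal() finishes the section after compressing. The resulting buffer layouts, sketched as comments (field widths illustrative):

    // after PREPLOGBUFFER:
    //   commitJob._ab: [JSectHeader len=0xffffffff][DurOps...][JEntry writes...]
    // after journal():
    //   _ab (uncompressed): [JSectHeader len set][DurOps...][JEntries...][JSectFooter, no checksum]
    //   cb  (compressed):   [JSectHeader len set][snappy(DurOps+JEntries)][JSectFooter w/ md5][pad to 8KB]
    // only cb goes to disk; the uncompressed _ab feeds WRITETODATAFILES.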

View File

@@ -27,6 +27,7 @@
#include "namespace.h"
#include "../util/mongoutils/str.h"
#include "../util/bufreader.h"
#include "../util/concurrency/race.h"
#include "pdfile.h"
#include "database.h"
#include "db.h"
@@ -35,6 +36,7 @@
#include "cmdline.h"
#include "curop.h"
#include "mongommf.h"
#include "../util/compress.h"
#include <sys/stat.h>
#include <fcntl.h>
@@ -92,15 +94,38 @@ namespace mongo {
throws
*/
class JournalSectionIterator : boost::noncopyable {
unique_ptr<BufReader> _br;
const JSectHeader* _sectHead;
const char *_lastDbName; // pointer into mmaped journal file
const bool _doDurOps;
string _uncompressed;
public:
JournalSectionIterator(const void *p, unsigned len, bool doDurOps)
: _br(p, len)
, _sectHead(static_cast<const JSectHeader*>(_br.skip(sizeof(JSectHeader))))
, _lastDbName(NULL)
, _doDurOps(doDurOps)
{}
JournalSectionIterator(const void *p, unsigned len, bool doDurOpsRecovering)
:
_sectHead(static_cast<const JSectHeader*>(p))
, _lastDbName(0)
, _doDurOps(doDurOpsRecovering)
{
if( doDurOpsRecovering ) {
// compressed case
assert( sizeof(JSectHeader) + sizeof(JSectFooter) <= len );
// this could be done in a streaming manner which would be better in terms of memory use - probably not worth the extra code complexity though
bool ok = uncompress((const char *)p + sizeof(JSectHeader), len - sizeof(JSectHeader) - sizeof(JSectFooter), &_uncompressed);
if( !ok ) {
// it should always be ok (i think?) as there is a previous check to see that the JSectFooter is ok
log() << "fatal error couldn't uncompress section during journal recovery" << endl;
mongoAbort("journal recovery uncompress failure");
}
_br = unique_ptr<BufReader>( new BufReader(_uncompressed.c_str(), _uncompressed.size()) );
}
else {
// we work with the uncompressed buffer when doing a WRITETODATAFILES (for speed)
_br = unique_ptr<BufReader>( new BufReader(p, len) );
_br->skip(sizeof(JSectHeader));
}
}
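The recovering branch above, restated as a standalone sketch; it assumes snappy backs dur's uncompress() and takes the header/footer sizes as parameters rather than hard-coding the packed struct layouts:

    #include <snappy.h>
    #include <string>

    // section points at the on-disk JSectHeader; the compressed bytes sit
    // between the header and the footer.
    bool uncompressSection(const char* section, size_t len,
                           size_t headerSize, size_t footerSize, std::string* out) {
        if( len <= headerSize + footerSize )
            return false;                                // too small to be a valid section
        const char* body = section + headerSize;
        size_t bodyLen = len - headerSize - footerSize;
        size_t rawLen;
        if( !snappy::GetUncompressedLength(body, bodyLen, &rawLen) )
            return false;                                // corrupt stream
        return snappy::Uncompress(body, bodyLen, out);
    }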
bool atEof() const { return _br.atEof(); }
bool atEof() const { return _br->atEof(); }
unsigned long long seqNumber() const { return _sectHead->seqNumber; }
@@ -110,20 +135,27 @@ namespace mongo {
*/
bool next(ParsedJournalEntry& e) {
unsigned lenOrOpCode;
_br.read(lenOrOpCode);
_br->read(lenOrOpCode);
if (lenOrOpCode > JEntry::OpCode_Min) {
switch( lenOrOpCode ) {
case JEntry::OpCode_Footer: {
if (_doDurOps) {
const char* pos = (const char*) _br.pos();
const char* pos = (const char*) _br->pos();
pos -= sizeof(lenOrOpCode); // rewind to include OpCode
const JSectFooter& footer = *(const JSectFooter*)pos;
int len = pos - (char*)_sectHead;
if (!footer.checkHash(_sectHead, len)) {
massert(13594, "journal checksum doesn't match", false);
if( _doDurOps ) {
if (!footer.checkHash(_sectHead, len)) {
massert(13594, "journal checksum doesn't match", false);
}
}
else {
// no need to check on WRITETODATAFILES; we check occasionally as a self-test
RARELY assert( footer.checkHash(_sectHead, len) );
}
footer.checkMagic();
}
return false; // false return value denotes end of section
}
@@ -131,7 +163,7 @@ namespace mongo {
case JEntry::OpCode_FileCreated:
case JEntry::OpCode_DropDb: {
e.dbName = 0;
boost::shared_ptr<DurOp> op = DurOp::read(lenOrOpCode, _br);
boost::shared_ptr<DurOp> op = DurOp::read(lenOrOpCode, *_br);
if (_doDurOps) {
e.op = op;
}
@@ -139,12 +171,12 @@ namespace mongo {
}
case JEntry::OpCode_DbContext: {
_lastDbName = (const char*) _br.pos();
const unsigned limit = std::min((unsigned)Namespace::MaxNsLen, _br.remaining());
_lastDbName = (const char*) _br->pos();
const unsigned limit = std::min((unsigned)Namespace::MaxNsLen, _br->remaining());
const unsigned len = strnlen(_lastDbName, limit);
massert(13533, "problem processing journal file during recovery", _lastDbName[len] == '\0');
_br.skip(len+1); // skip '\0' too
_br.read(lenOrOpCode);
_br->skip(len+1); // skip '\0' too
_br->read(lenOrOpCode);
}
// fall through as a basic operation always follows jdbcontext, and we don't have anything to return yet
@@ -156,18 +188,13 @@ namespace mongo {
// JEntry - a basic write
assert( lenOrOpCode && lenOrOpCode < JEntry::OpCode_Min );
_br.rewind(4);
e.e = (JEntry *) _br.skip(sizeof(JEntry));
_br->rewind(4);
e.e = (JEntry *) _br->skip(sizeof(JEntry));
e.dbName = e.e->isLocalDbContext() ? "local" : _lastDbName;
assert( e.e->len == lenOrOpCode );
_br.skip(e.e->len);
_br->skip(e.e->len);
return true;
}
private:
BufReader _br;
const JSectHeader* _sectHead;
const char *_lastDbName; // pointer into mmaped journal file
const bool _doDurOps;
};
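For orientation, the framing next() walks: every record starts with a 32-bit word that is either an opcode (values at or above JEntry::OpCode_Min) or the length field of a basic-write JEntry. A toy parser over that framing; the opcode constants and the JEntry size here are illustrative stand-ins, not the real values:

    #include <cstring>

    const unsigned OpCode_Min    = 0xfffff000; // assumption: stand-in value
    const unsigned OpCode_Footer = 0xffffffff; // assumption: stand-in value
    const unsigned JEntrySize    = 12;         // stand-in for sizeof(JEntry), packed

    // returns the number of basic writes before the footer, or -1 on a
    // record type this toy does not model (DurOps, db context)
    int walkSection(const char* p, const char* end) {
        int n = 0;
        while( p + 4 <= end ) {
            unsigned lenOrOpCode;
            memcpy(&lenOrOpCode, p, 4);
            if( lenOrOpCode >= OpCode_Min )
                return lenOrOpCode == OpCode_Footer ? n : -1; // footer ends the section
            p += JEntrySize + lenOrOpCode; // JEntry struct, then the write's data bytes
            ++n;
        }
        return n;
    }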
static string fileName(const char* dbName, int fileNo) {
@@ -287,9 +314,22 @@ namespace mongo {
}
void RecoveryJob::processSection(const void *p, unsigned len) {
scoped_lock lk(_mx);
JSectHeader *h = (JSectHeader *) p;
scoped_lock lk(_mx);
RACECHECK
// we use a static so that we don't have to reallocate every time through. occasionally we
// go back to a small allocation so that a onetime growth spike won't stick forever.
static vector<ParsedJournalEntry> entries;
entries.clear();
RARELY OCCASIONALLY {
if( entries.capacity() > 2048 ) {
entries.shrink_to_fit();
entries.reserve(2048);
}
}
vector<ParsedJournalEntry> entries;
JournalSectionIterator i(p, len, _recovering);
//DEV log() << "recovery processSection seq:" << i.seqNumber() << endl;
@@ -342,11 +382,11 @@ namespace mongo {
if( h.fileId != fileId ) {
if( debug || (cmdLine.durOptions & CmdLine::DurDumpJournal) ) {
log() << "Ending processFileBuffer at differing fileId want:" << fileId << " got:" << h.fileId << endl;
log() << " sect len:" << h.len << " seqnum:" << h.seqNumber << endl;
log() << " sect len:" << h.sectionLen() << " seqnum:" << h.seqNumber << endl;
}
return true;
}
processSection(br.skip(h.len), h.len);
processSection(br.skip(h.sectionLenWithPadding()), h.sectionLen());
// ctrl c check
killCurrentOp.checkForInterrupt(false);
@@ -440,6 +480,8 @@ namespace mongo {
extern mutex groupCommitMutex;
boost::mutex foo;
/** recover from a crash
called during startup
throws on error

View File

@@ -18,7 +18,10 @@ namespace mongo {
RecoveryJob() :_lastDataSyncedFromLastRun(0), _mx("recovery"), _recovering(false) { _lastSeqMentionedInConsoleLog = 1; }
void go(vector<path>& files);
~RecoveryJob();
/** @param recovering indicates we are doing recovery and not a WRITETODATAFILES */
void processSection(const void *, unsigned len);
void close(); // locks and calls _close()
static RecoveryJob & get() { return _instance; }

View File

@@ -28,8 +28,6 @@ namespace mongo {
namespace dur {
const unsigned Alignment = 8192;
/** DurOp - Operations we journal that aren't just basic writes.
*
* Basic writes are logged as JEntry's, and indicated in ram temporarily as struct dur::WriteIntent.

View File

@@ -75,7 +75,7 @@ namespace mongo {
fileSuffixNo() is 3
if the suffix is "ns", fileSuffixNo -1
*/
RelativePath relativePath() const {
const RelativePath& relativePath() const {
DEV assert( !_p._p.empty() );
return _p;
}

View File

@@ -42,12 +42,14 @@ namespace mongo {
/** reset with a hint as to the upcoming needed size specified */
void AlignedBuilder::reset(unsigned sz) {
_len = 0;
unsigned Q = 64 * 1024 * 1024 - 1;
unsigned Q = 32 * 1024 * 1024 - 1;
unsigned want = (sz+Q) & (~Q);
if( _p._size == want ) {
return;
}
if( _p._size > want ) {
if( _p._size <= 64 * 1024 * 1024 )
return;
bool downsize = false;
RARELY { downsize = true; }
if( !downsize )
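The hunk above halves the rounding quantum to 32MB and keeps an oversized buffer most of the time, downsizing only RARELY (mongo's sampled-execution macro) so a one-off spike doesn't pin memory forever. A standalone sketch of the policy, with a plain counter standing in for RARELY:

    #include <vector>

    void resetReusable(std::vector<char>& buf, size_t sz) {
        static unsigned calls = 0;
        const size_t Q = 32 * 1024 * 1024 - 1;  // 32MB rounding quantum
        size_t want = (sz + Q) & ~Q;
        if( buf.capacity() == want )
            return;
        if( buf.capacity() > want ) {
            if( buf.capacity() <= 64 * 1024 * 1024 )
                return;                          // modest oversize: keep it
            if( ++calls % 128 != 0 )             // "RARELY" stand-in
                return;
            // occasionally fall through and release a very large buffer
        }
        std::vector<char>(want).swap(buf);       // reallocate at the rounded size
    }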

View File

@@ -46,8 +46,12 @@ namespace mongo {
return l;
}
/** if buffer grows pointer no longer valid */
char* atOfs(unsigned ofs) { return _p._data + ofs; }
/** if buffer grows pointer no longer valid */
char* cur() { return _p._data + _len; }
void appendChar(char j) {
*((char*)grow(sizeof(char))) = j;
}