Logo Search packages:      
Sourcecode: ceph version File versions  Download package

journal.cc

// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software 
 * Foundation.  See file COPYING.
 * 
 */

#include "config.h"
#include "events/EString.h"
#include "events/ESubtreeMap.h"
#include "events/ESession.h"
#include "events/ESessions.h"

#include "events/EMetaBlob.h"

#include "events/EUpdate.h"
#include "events/ESlaveUpdate.h"
#include "events/EOpen.h"
#include "events/ECommitted.h"

#include "events/EExport.h"
#include "events/EImportStart.h"
#include "events/EImportFinish.h"
#include "events/EFragment.h"

#include "events/ETableClient.h"
#include "events/ETableServer.h"


#include "LogSegment.h"

#include "MDS.h"
#include "MDLog.h"
#include "MDCache.h"
#include "Server.h"
#include "Migrator.h"

#include "InoTable.h"
#include "MDSTableClient.h"
#include "MDSTableServer.h"

#include "Locker.h"


#include "config.h"

#define DOUT_SUBSYS mds
#undef DOUT_COND
#define DOUT_COND(l) l<=g_conf.debug_mds || l <= g_conf.debug_mds_log || l <= g_conf.debug_mds_log_expire
#undef dout_prefix
#define dout_prefix *_dout << dbeginl << "mds" << mds->get_nodeid() << ".journal "


// -----------------------
// LogSegment

class C_MDL_RetryExpireSegment : public Context {
public:
  MDS *mds;
  LogSegment *ls;
  C_MDL_RetryExpireSegment(MDS *m, LogSegment *l) : mds(m), ls(l) {}
  void finish(int r) {
    ls->try_to_expire(mds);
  }
};

C_Gather *LogSegment::try_to_expire(MDS *mds)
{
  C_Gather *gather = 0;

  set<CDir*> commit;

  dout(6) << "LogSegment(" << offset << ").try_to_expire" << dendl;

  // commit dirs
  for (elist<CDir*>::iterator p = new_dirfrags.begin(); !p.end(); ++p) {
    dout(20) << " new_dirfrag " << **p << dendl;
    assert((*p)->is_auth());
    commit.insert(*p);
  }
  for (elist<CDir*>::iterator p = dirty_dirfrags.begin(); !p.end(); ++p) {
    dout(20) << " dirty_dirfrag " << **p << dendl;
    assert((*p)->is_auth());
    commit.insert(*p);
  }
  for (elist<CDentry*>::iterator p = dirty_dentries.begin(); !p.end(); ++p) {
    dout(20) << " dirty_dentry " << **p << dendl;
    assert((*p)->is_auth());
    commit.insert((*p)->get_dir());
  }
  for (elist<CInode*>::iterator p = dirty_inodes.begin(); !p.end(); ++p) {
    dout(20) << " dirty_inode " << **p << dendl;
    assert((*p)->is_auth());
    if ((*p)->is_base()) {
      if (!gather) gather = new C_Gather;
      (*p)->store(gather->new_sub());
    } else
      commit.insert((*p)->get_parent_dn()->get_dir());
  }

  if (!commit.empty()) {
    if (!gather) gather = new C_Gather;
    
    for (set<CDir*>::iterator p = commit.begin();
       p != commit.end();
       ++p) {
      CDir *dir = *p;
      assert(dir->is_auth());
      if (dir->can_auth_pin()) {
      dout(15) << "try_to_expire committing " << *dir << dendl;
      dir->commit(0, gather->new_sub());
      } else {
      dout(15) << "try_to_expire waiting for unfreeze on " << *dir << dendl;
      dir->add_waiter(CDir::WAIT_UNFREEZE, gather->new_sub());
      }
    }
  }

  // master ops with possibly uncommitted slaves
  for (set<metareqid_t>::iterator p = uncommitted_masters.begin();
       p != uncommitted_masters.end();
       p++) {
    dout(10) << "try_to_expire waiting for slaves to ack commit on " << *p << dendl;
    if (!gather) gather = new C_Gather;
    mds->mdcache->wait_for_uncommitted_master(*p, gather->new_sub());
  }

  // nudge scatterlocks
  for (elist<CInode*>::iterator p = dirty_dirfrag_dir.begin(); !p.end(); ++p) {
    CInode *in = *p;
    dout(10) << "try_to_expire waiting for dirlock flush on " << *in << dendl;
    if (!gather) gather = new C_Gather;
    mds->locker->scatter_nudge(&in->filelock, gather->new_sub());
  }
  for (elist<CInode*>::iterator p = dirty_dirfrag_dirfragtree.begin(); !p.end(); ++p) {
    CInode *in = *p;
    dout(10) << "try_to_expire waiting for dirfragtreelock flush on " << *in << dendl;
    if (!gather) gather = new C_Gather;
    mds->locker->scatter_nudge(&in->dirfragtreelock, gather->new_sub());
  }
  for (elist<CInode*>::iterator p = dirty_dirfrag_nest.begin(); !p.end(); ++p) {
    CInode *in = *p;
    dout(10) << "try_to_expire waiting for nest flush on " << *in << dendl;
    if (!gather) gather = new C_Gather;
    mds->locker->scatter_nudge(&in->nestlock, gather->new_sub());
  }

  // open files
  if (!open_files.empty()) {
    assert(!mds->mdlog->is_capped()); // hmm FIXME
    EOpen *le = 0;
    LogSegment *ls = mds->mdlog->get_current_segment();
    assert(ls != this);
    elist<CInode*>::iterator p = open_files.begin(member_offset(CInode, item_open_file));
    while (!p.end()) {
      CInode *in = *p;
      assert(in->last == CEPH_NOSNAP);
      ++p;
      if (in->is_any_caps()) {
      if (in->is_any_caps_wanted()) {
        dout(20) << "try_to_expire requeueing open file " << *in << dendl;
        if (!le) {
          le = new EOpen(mds->mdlog);
          mds->mdlog->start_entry(le);
        }
        le->add_clean_inode(in);
        ls->open_files.push_back(&in->item_open_file);
      } else {
        // drop inodes that aren't wanted
        dout(20) << "try_to_expire not requeueing and delisting unwanted file " << *in << dendl;
        in->item_open_file.remove_myself();
      }
      } else {
      /*
       * we can get a capless inode here if we replay an open file, the client fails to
       * reconnect it, but does REPLAY an open request (that adds it to the logseg).  AFAICS
       * it's ok for the client to replay an open on a file it doesn't have in it's cache
       * anymore.
       *
       * this makes the mds less sensitive to strict open_file consistency, although it does
       * make it easier to miss subtle problems.
       */
      dout(20) << "try_to_expire not requeueing and delisting capless file " << *in << dendl;
      in->item_open_file.remove_myself();
      }
    }
    if (le) {
      if (!gather) gather = new C_Gather;
      mds->mdlog->submit_entry(le, gather->new_sub());
      dout(10) << "try_to_expire waiting for open files to rejournal" << dendl;
    }
  }

  // parent pointers on renamed dirs
  for (elist<CInode*>::iterator p = renamed_files.begin(); !p.end(); ++p) {
    CInode *in = *p;
    dout(10) << "try_to_expire waiting for dir parent pointer update on " << *in << dendl;
    assert(in->state_test(CInode::STATE_DIRTYPARENT));
    if (!gather) gather = new C_Gather;
    in->store_parent(gather->new_sub());
  }

  // slave updates
  for (elist<MDSlaveUpdate*>::iterator p = slave_updates.begin(member_offset(MDSlaveUpdate,
                                                           item));
       !p.end(); ++p) {
    MDSlaveUpdate *su = *p;
    dout(10) << "try_to_expire waiting on slave update " << su << dendl;
    assert(su->waiter == 0);
    if (!gather) gather = new C_Gather;
    su->waiter = gather->new_sub();
  }

  // idalloc
  if (inotablev > mds->inotable->get_committed_version()) {
    dout(10) << "try_to_expire saving inotable table, need " << inotablev
            << ", committed is " << mds->inotable->get_committed_version()
            << " (" << mds->inotable->get_committing_version() << ")"
            << dendl;
    if (!gather) gather = new C_Gather;
    mds->inotable->save(gather->new_sub(), inotablev);
  }

  // sessionmap
  if (sessionmapv > mds->sessionmap.committed) {
    dout(10) << "try_to_expire saving sessionmap, need " << sessionmapv 
            << ", committed is " << mds->sessionmap.committed
            << " (" << mds->sessionmap.committing << ")"
            << dendl;
    if (!gather) gather = new C_Gather;
    mds->sessionmap.save(gather->new_sub(), sessionmapv);
  }

  // pending commit atids
  for (map<int, hash_set<version_t> >::iterator p = pending_commit_tids.begin();
       p != pending_commit_tids.end();
       ++p) {
    MDSTableClient *client = mds->get_table_client(p->first);
    for (hash_set<version_t>::iterator q = p->second.begin();
       q != p->second.end();
       ++q) {
      if (!gather) gather = new C_Gather;
      dout(10) << "try_to_expire " << get_mdstable_name(p->first) << " transaction " << *q 
             << " pending commit (not yet acked), waiting" << dendl;
      assert(!client->has_committed(*q));
      client->wait_for_ack(*q, gather->new_sub());
    }
  }
  
  // table servers
  for (map<int, version_t>::iterator p = tablev.begin();
       p != tablev.end();
       p++) {
    MDSTableServer *server = mds->get_table_server(p->first);
    if (p->second > server->get_committed_version()) {
      dout(10) << "try_to_expire waiting for " << get_mdstable_name(p->first) 
             << " to save, need " << p->second << dendl;
      if (!gather) gather = new C_Gather;
      server->save(gather->new_sub());
    }
  }

  // truncating
  for (set<CInode*>::iterator p = truncating_inodes.begin();
       p != truncating_inodes.end();
       p++) {
    dout(10) << "try_to_expire waiting for truncate of " << **p << dendl;
    if (!gather) gather = new C_Gather;
    (*p)->add_waiter(CInode::WAIT_TRUNC, gather->new_sub());
  }
  
  // FIXME client requests...?
  // audit handling of anchor transactions?

  // once we are otherwise trimmable, make sure journal is fully safe on disk.
  if (!gather) {
    if (!trimmable_at)
      trimmable_at = mds->mdlog->get_write_pos();

    if (trimmable_at <= mds->mdlog->get_safe_pos()) {
      dout(6) << "LogSegment(" << offset << ").try_to_expire trimmable at " << trimmable_at
            << " <= " << mds->mdlog->get_safe_pos() << dendl;
    } else {
      dout(6) << "LogSegment(" << offset << ").try_to_expire trimmable at " << trimmable_at
            << " > " << mds->mdlog->get_safe_pos()
            << ", waiting for safe journal flush" << dendl;
      if (!gather) gather = new C_Gather;
      mds->mdlog->wait_for_safe(gather->new_sub());
      mds->mdlog->flush();
    }
  }

  if (gather) {
    dout(6) << "LogSegment(" << offset << ").try_to_expire waiting" << dendl;
    assert(!gather->empty());
    mds->mdlog->flush();
  } else {
    dout(6) << "LogSegment(" << offset << ").try_to_expire success" << dendl;
  }
  return gather;
}


#undef DOUT_COND
#define DOUT_COND(l) l<=g_conf.debug_mds || l <= g_conf.debug_mds_log


// -----------------------
// EString

void EString::replay(MDS *mds)
{
  dout(10) << "EString.replay " << event << dendl; 
}



// -----------------------
// EMetaBlob

EMetaBlob::EMetaBlob(MDLog *mdlog) : root(NULL),
                             opened_ino(0),
                             inotablev(0), sessionmapv(0),
                             allocated_ino(0),
                             last_subtree_map(mdlog ? mdlog->get_last_segment_offset() : 0),
                             my_offset(mdlog ? mdlog->get_write_pos() : 0) //, _segment(0)
{ }

void EMetaBlob::update_segment(LogSegment *ls)
{
  // atids?
  //for (list<version_t>::iterator p = atids.begin(); p != atids.end(); ++p)
  //  ls->pending_commit_atids[*p] = ls;
  // -> handled directly by AnchorClient

  // dirty inode mtimes
  // -> handled directly by Server.cc, replay()

  // alloc table update?
  if (inotablev)
    ls->inotablev = inotablev;
  if (sessionmapv)
    ls->sessionmapv = sessionmapv;

  // truncated inodes
  // -> handled directly by Server.cc

  // client requests
  //  note the newest request per client
  //if (!client_reqs.empty())
    //    ls->last_client_tid[client_reqs.rbegin()->client] = client_reqs.rbegin()->tid);
}

void EMetaBlob::replay(MDS *mds, LogSegment *logseg)
{
  dout(10) << "EMetaBlob.replay " << lump_map.size() << " dirlumps" << dendl;

  assert(logseg);

  if (root) {
    CInode *in = mds->mdcache->get_inode(root->inode.ino);
    bool isnew = in ? false:true;
    if (!in)
      in = new CInode(mds->mdcache, true);
    in->inode = root->inode;
    in->xattrs = root->xattrs;
    if (in->inode.is_dir()) {
      in->dirfragtree = root->dirfragtree;
      /*
       * we can do this before linking hte inode bc the split_at would
       * be a no-op.. we have no children (namely open snaprealms) to
       * divy up 
       */
      in->decode_snap_blob(root->snapbl);  
    }
    if (in->inode.is_symlink()) in->symlink = root->symlink;
    if (isnew)
      mds->mdcache->add_inode(in);
    if (root->dirty) in->_mark_dirty(logseg);
    dout(10) << "EMetaBlob.replay " << (isnew ? " added root ":" updated root ") << *in << dendl;    
  }

  // walk through my dirs (in order!)
  for (list<dirfrag_t>::iterator lp = lump_order.begin();
       lp != lump_order.end();
       ++lp) {
    dout(10) << "EMetaBlob.replay dir " << *lp << dendl;
    dirlump &lump = lump_map[*lp];

    // the dir 
    CDir *dir = mds->mdcache->get_dirfrag(*lp);
    if (!dir) {
      // hmm.  do i have the inode?
      CInode *diri = mds->mdcache->get_inode((*lp).ino);
      if (!diri) {
      if (MDS_INO_IS_BASE(lp->ino)) {
        diri = mds->mdcache->create_system_inode(lp->ino, S_IFDIR|0755);
        dout(10) << "EMetaBlob.replay created base " << *diri << dendl;
      } else {
        dout(0) << "EMetaBlob.replay missing dir ino  " << (*lp).ino << dendl;
        assert(0);
      }
      }

      // create the dirfrag
      dir = diri->get_or_open_dirfrag(mds->mdcache, (*lp).frag);

      if (MDS_INO_IS_BASE(lp->ino))
      mds->mdcache->adjust_subtree_auth(dir, CDIR_AUTH_UNKNOWN);

      dout(10) << "EMetaBlob.replay added dir " << *dir << dendl;  
    }
    dir->set_version( lump.fnode.version );
    dir->fnode = lump.fnode;

    if (lump.is_dirty()) {
      dir->_mark_dirty(logseg);
      dir->get_inode()->filelock.mark_dirty();
      dir->get_inode()->nestlock.mark_dirty();

      if (!(dir->fnode.rstat == dir->fnode.accounted_rstat) ||
        !(dir->fnode.fragstat == dir->fnode.accounted_fragstat)) {
      dout(10) << "EMetaBlob.replay      dirty nestinfo on " << *dir << dendl;
      mds->locker->mark_updated_scatterlock(&dir->inode->nestlock);
      logseg->dirty_dirfrag_nest.push_back(&dir->inode->item_dirty_dirfrag_nest);
      } else {
      dout(10) << "EMetaBlob.replay      clean nestinfo on " << *dir << dendl;
      }
    }
    if (lump.is_new())
      dir->mark_new(logseg);
    if (lump.is_complete())
      dir->mark_complete();
    
    dout(10) << "EMetaBlob.replay updated dir " << *dir << dendl;  

    // decode bits
    lump._decode_bits();

    // full dentry+inode pairs
    for (list<fullbit>::iterator p = lump.get_dfull().begin();
       p != lump.get_dfull().end();
       p++) {
      CDentry *dn = dir->lookup_exact_snap(p->dn, p->dnlast);
      if (!dn) {
      dn = dir->add_null_dentry(p->dn, p->dnfirst, p->dnlast);
      dn->set_version(p->dnv);
      if (p->dirty) dn->_mark_dirty(logseg);
      dout(10) << "EMetaBlob.replay added " << *dn << dendl;
      } else {
      dn->set_version(p->dnv);
      if (p->dirty) dn->_mark_dirty(logseg);
      dout(10) << "EMetaBlob.replay for [" << p->dnfirst << "," << p->dnlast << "] had " << *dn << dendl;
      dn->first = p->dnfirst;
      assert(dn->last == p->dnlast);
      }

      CInode *in = mds->mdcache->get_inode(p->inode.ino, p->dnlast);
      if (!in) {
      in = new CInode(mds->mdcache, true, p->dnfirst, p->dnlast);
      in->inode = p->inode;
      in->xattrs = p->xattrs;
      if (in->inode.is_dir()) {
        in->dirfragtree = p->dirfragtree;
        /*
         * we can do this before linking hte inode bc the split_at would
         * be a no-op.. we have no children (namely open snaprealms) to
         * divy up 
         */
        in->decode_snap_blob(p->snapbl);  
      }
      if (in->inode.is_symlink()) in->symlink = p->symlink;
      mds->mdcache->add_inode(in);
      if (!dn->get_linkage()->is_null()) {
        if (dn->get_linkage()->is_primary()) {
          CInode *old_in = dn->get_linkage()->get_inode();
          stringstream ss;
          ss << "EMetaBlob.replay FIXME had dentry linked to wrong inode " << *dn
              << " " << *old_in
              << " should be " << p->inode.ino;
          dout(-10) << ss << dendl;
          mds->logclient.log(LOG_WARN, ss);
          dir->unlink_inode(dn);
          mds->mdcache->remove_inode(old_in);

          //assert(0); // hrm!  fallout from sloppy unlink?  or?  hmmm FIXME investigate further
        }
      }
      dir->link_primary_inode(dn, in);
      if (p->dirty) in->_mark_dirty(logseg);
      dout(10) << "EMetaBlob.replay added " << *in << dendl;
      } else {
      if (dn->get_linkage()->get_inode() != in && in->get_parent_dn()) {
        dout(10) << "EMetaBlob.replay unlinking " << *in << dendl;
        in->get_parent_dn()->get_dir()->unlink_inode(in->get_parent_dn());
      }
      if (in->get_parent_dn() && in->inode.anchored != p->inode.anchored)
        in->get_parent_dn()->adjust_nested_anchors( (int)p->inode.anchored - (int)in->inode.anchored );
      in->inode = p->inode;
      in->xattrs = p->xattrs;
      if (in->inode.is_dir()) {
        in->dirfragtree = p->dirfragtree;
        in->decode_snap_blob(p->snapbl);
      }
      if (in->inode.is_symlink()) in->symlink = p->symlink;
      if (p->dirty) in->_mark_dirty(logseg);
      if (dn->get_linkage()->get_inode() != in) {
        if (!dn->get_linkage()->is_null())  // note: might be remote.  as with stray reintegration.
          dir->unlink_inode(dn);
        dir->link_primary_inode(dn, in);
        dout(10) << "EMetaBlob.replay linked " << *in << dendl;
      } else {
        dout(10) << "EMetaBlob.replay for [" << p->dnfirst << "," << p->dnlast << "] had " << *in << dendl;
      }
      in->first = p->dnfirst;
      }
    }

    // remote dentries
    for (list<remotebit>::iterator p = lump.get_dremote().begin();
       p != lump.get_dremote().end();
       p++) {
      CDentry *dn = dir->lookup_exact_snap(p->dn, p->dnlast);
      if (!dn) {
      dn = dir->add_remote_dentry(p->dn, p->ino, p->d_type, p->dnfirst, p->dnlast);
      dn->set_version(p->dnv);
      if (p->dirty) dn->_mark_dirty(logseg);
      dout(10) << "EMetaBlob.replay added " << *dn << dendl;
      } else {
      if (!dn->get_linkage()->is_null()) {
        dout(10) << "EMetaBlob.replay unlinking " << *dn << dendl;
        dir->unlink_inode(dn);
      }
      dir->link_remote_inode(dn, p->ino, p->d_type);
      dn->set_version(p->dnv);
      if (p->dirty) dn->_mark_dirty(logseg);
      dout(10) << "EMetaBlob.replay for [" << p->dnfirst << "," << p->dnlast << "] had " << *dn << dendl;
      dn->first = p->dnfirst;
      assert(dn->last == p->dnlast);
      }
    }

    // null dentries
    for (list<nullbit>::iterator p = lump.get_dnull().begin();
       p != lump.get_dnull().end();
       p++) {
      CDentry *dn = dir->lookup_exact_snap(p->dn, p->dnlast);
      if (!dn) {
      dn = dir->add_null_dentry(p->dn, p->dnfirst, p->dnlast);
      dn->set_version(p->dnv);
      if (p->dirty) dn->_mark_dirty(logseg);
      dout(10) << "EMetaBlob.replay added " << *dn << dendl;
      } else {
      dn->first = p->dnfirst;
      if (!dn->get_linkage()->is_null()) {
        dout(10) << "EMetaBlob.replay unlinking " << *dn << dendl;
        dir->unlink_inode(dn);
      }
      dn->set_version(p->dnv);
      if (p->dirty) dn->_mark_dirty(logseg);
      dout(10) << "EMetaBlob.replay had " << *dn << dendl;
      assert(dn->last == p->dnlast);
      }
    }
  }

  // table client transactions
  for (list<pair<__u8,version_t> >::iterator p = table_tids.begin();
       p != table_tids.end();
       ++p) {
    dout(10) << "EMetaBlob.replay noting " << get_mdstable_name(p->first)
           << " transaction " << p->second << dendl;
    MDSTableClient *client = mds->get_table_client(p->first);
    client->got_journaled_agree(p->second, logseg);
  }

  // opened ino?
  if (opened_ino) {
    CInode *in = mds->mdcache->get_inode(opened_ino);
    assert(in);
    dout(10) << "EMetaBlob.replay noting opened inode " << *in << dendl;
    logseg->open_files.push_back(&in->item_open_file);
  }

  // allocated_inos
  if (inotablev) {
    if (mds->inotable->get_version() >= inotablev) {
      dout(10) << "EMetaBlob.replay inotable tablev " << inotablev
             << " <= table " << mds->inotable->get_version() << dendl;
    } else {
      dout(10) << "EMetaBlob.replay inotable v " << inotablev
             << " - 1 == table " << mds->inotable->get_version()
             << " allocated+used " << allocated_ino
             << " prealloc " << preallocated_inos
             << dendl;
      if (allocated_ino)
      mds->inotable->replay_alloc_id(allocated_ino);
      if (preallocated_inos.size())
      mds->inotable->replay_alloc_ids(preallocated_inos);

      // [repair bad inotable updates]
      if (inotablev > mds->inotable->get_version()) {
      stringstream ss;
      ss << "journal replay inotablev mismatch " << mds->inotable->get_version() << " -> " << inotablev;
      mds->logclient.log(LOG_ERROR, ss);
      mds->inotable->force_replay_version(inotablev);
      }

      assert(inotablev == mds->inotable->get_version());
    }
  }
  if (sessionmapv) {
    if (mds->sessionmap.version >= sessionmapv) {
      dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv
             << " <= table " << mds->sessionmap.version << dendl;
    } else {
      dout(10) << "EMetaBlob.replay sessionmap v" << sessionmapv
             << " -(1|2) == table " << mds->sessionmap.version
             << " prealloc " << preallocated_inos
             << " used " << used_preallocated_ino
             << dendl;
      Session *session = mds->sessionmap.get_session(client_name);
      assert(session);
      dout(20) << " (session prealloc " << session->prealloc_inos << ")" << dendl;
      if (used_preallocated_ino) {
      inodeno_t next = session->next_ino();
      inodeno_t i = session->take_ino(used_preallocated_ino);
      if (next != i) {
        stringstream ss;
        ss << " replayed op " << client_reqs << " used ino " << i << " but session next is " << next;
        mds->logclient.log(LOG_WARN, ss);
      }
      assert(i == used_preallocated_ino);
      session->used_inos.clear();
      mds->sessionmap.projected = ++mds->sessionmap.version;
      }
      if (preallocated_inos.size()) {
      session->prealloc_inos.insert(preallocated_inos);
      mds->sessionmap.projected = ++mds->sessionmap.version;
      }
      assert(sessionmapv == mds->sessionmap.version);
    }
  }

  // truncating inodes
  for (list<inodeno_t>::iterator p = truncate_start.begin();
       p != truncate_start.end();
       p++) {
    CInode *in = mds->mdcache->get_inode(*p);
    assert(in);
    mds->mdcache->add_recovered_truncate(in, logseg);
  }
  for (map<inodeno_t,uint64_t>::iterator p = truncate_finish.begin();
       p != truncate_finish.end();
       p++) {
    LogSegment *ls = mds->mdlog->get_segment(p->second);
    if (ls) {
      CInode *in = mds->mdcache->get_inode(p->first);
      assert(in);
      ls->truncating_inodes.erase(in);
    }
  }

  // destroyed inodes
  for (vector<inodeno_t>::iterator p = destroyed_inodes.begin();
       p != destroyed_inodes.end();
       p++) {
    CInode *in = mds->mdcache->get_inode(*p);
    if (in) {
      dout(10) << "EMetaBlob.replay destroyed " << *p << ", dropping " << *in << dendl;
      mds->mdcache->remove_inode(in);
    } else {
      dout(10) << "EMetaBlob.replay destroyed " << *p << ", not in cache" << dendl;
    }
  }

  // client requests
  for (list<pair<metareqid_t, uint64_t> >::iterator p = client_reqs.begin();
       p != client_reqs.end();
       ++p)
    if (p->first.name.is_client()) {
      dout(10) << "EMetaBlob.replay request " << p->first << " " << p->second << dendl;
      if (mds->sessionmap.have_session(p->first.name))
      mds->sessionmap.add_completed_request(p->first, p->second);
    }


  // update segment
  update_segment(logseg);
}

// -----------------------
// ESession

void ESession::update_segment()
{
  _segment->sessionmapv = cmapv;
  if (inos.size() && inotablev)
    _segment->inotablev = inotablev;
}

void ESession::replay(MDS *mds)
{
  if (mds->sessionmap.version >= cmapv) {
    dout(10) << "ESession.replay sessionmap " << mds->sessionmap.version 
           << " >= " << cmapv << ", noop" << dendl;
  } else {
    dout(10) << "ESession.replay sessionmap " << mds->sessionmap.version
           << " < " << cmapv << " " << (open ? "open":"close") << dendl;
    mds->sessionmap.projected = ++mds->sessionmap.version;
    assert(mds->sessionmap.version == cmapv);
    if (open) {
      Session *session = mds->sessionmap.get_or_add_session(client_inst);
      if (session->is_closed())
      mds->sessionmap.set_state(session, Session::STATE_OPEN);
      dout(10) << "session " << session << " state " << session->get_state() << dendl;
    } else {
      Session *session = mds->sessionmap.get_session(client_inst.name);
      if (session->is_closed())
      mds->sessionmap.remove_session(session);
    }
  }

  if (inos.size() && inotablev) {
    if (mds->inotable->get_version() >= inotablev) {
      dout(10) << "ESession.replay inotable " << mds->inotable->get_version()
             << " >= " << inotablev << ", noop" << dendl;
    } else {
      dout(10) << "ESession.replay inotable " << mds->inotable->get_version()
             << " < " << inotablev << " " << (open ? "add":"remove") << dendl;
      assert(!open);  // for now
      mds->inotable->replay_release_ids(inos);
      assert(mds->inotable->get_version() == inotablev);
    }
  }

  update_segment();
}

void ESessions::update_segment()
{
  _segment->sessionmapv = cmapv;
}

void ESessions::replay(MDS *mds)
{
  if (mds->sessionmap.version >= cmapv) {
    dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.version
           << " >= " << cmapv << ", noop" << dendl;
  } else {
    dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.version
           << " < " << cmapv << dendl;
    mds->sessionmap.open_sessions(client_map);
    assert(mds->sessionmap.version == cmapv);
    mds->sessionmap.projected = mds->sessionmap.version;
  }
  update_segment();
}




void ETableServer::update_segment()
{
  _segment->tablev[table] = version;
}

void ETableServer::replay(MDS *mds)
{
  MDSTableServer *server = mds->get_table_server(table);
  if (server->get_version() >= version) {
    dout(10) << "ETableServer.replay " << get_mdstable_name(table)
           << " " << get_mdstableserver_opname(op)
           << " event " << version
           << " <= table " << server->get_version() << dendl;
    return;
  }
  
  dout(10) << " ETableServer.replay " << get_mdstable_name(table)
         << " " << get_mdstableserver_opname(op)
         << " event " << version << " - 1 == table " << server->get_version() << dendl;
  assert(version-1 == server->get_version());

  switch (op) {
  case TABLESERVER_OP_PREPARE:
    server->_prepare(mutation, reqid, bymds);
    server->_note_prepare(bymds, reqid);
    break;
  case TABLESERVER_OP_COMMIT:
    server->_commit(tid);
    server->_note_commit(tid);
    break;
  case TABLESERVER_OP_ROLLBACK:
    server->_rollback(tid);
    server->_note_rollback(tid);
    break;
  case TABLESERVER_OP_SERVER_UPDATE:
    server->_server_update(mutation);
    break;
  default:
    assert(0);
  }
  
  assert(version == server->get_version());
  update_segment();
}


void ETableClient::replay(MDS *mds)
{
  dout(10) << " ETableClient.replay " << get_mdstable_name(table)
         << " op " << get_mdstableserver_opname(op)
         << " tid " << tid << dendl;
    
  MDSTableClient *client = mds->get_table_client(table);
  assert(op == TABLESERVER_OP_ACK);
  client->got_journaled_ack(tid);
}


// -----------------------
// ESnap
/*
void ESnap::update_segment()
{
  _segment->tablev[TABLE_SNAP] = version;
}

void ESnap::replay(MDS *mds)
{
  if (mds->snaptable->get_version() >= version) {
    dout(10) << "ESnap.replay event " << version
           << " <= table " << mds->snaptable->get_version() << dendl;
    return;
  } 
  
  dout(10) << " ESnap.replay event " << version
         << " - 1 == table " << mds->snaptable->get_version() << dendl;
  assert(version-1 == mds->snaptable->get_version());

  if (create) {
    version_t v;
    snapid_t s = mds->snaptable->create(snap.dirino, snap.name, snap.stamp, &v);
    assert(s == snap.snapid);
  } else {
    mds->snaptable->remove(snap.snapid);
  }

  assert(version == mds->snaptable->get_version());
}
*/



// -----------------------
// EUpdate

void EUpdate::update_segment()
{
  metablob.update_segment(_segment);

  if (had_slaves)
    _segment->uncommitted_masters.insert(reqid);
}

void EUpdate::replay(MDS *mds)
{
  metablob.replay(mds, _segment);

  if (had_slaves) {
    dout(10) << "EUpdate.replay " << reqid << " had slaves, expecting a matching ECommitted" << dendl;
    _segment->uncommitted_masters.insert(reqid);
    set<int> slaves;
    mds->mdcache->add_uncommitted_master(reqid, _segment, slaves);
  }
}


// ------------------------
// EOpen

void EOpen::update_segment()
{
  // ??
}

void EOpen::replay(MDS *mds)
{
  dout(10) << "EOpen.replay " << dendl;
  metablob.replay(mds, _segment);

  // note which segments inodes belong to, so we don't have to start rejournaling them
  for (vector<inodeno_t>::iterator p = inos.begin();
       p != inos.end();
       p++) {
    CInode *in = mds->mdcache->get_inode(*p);
    if (!in) {
      dout(0) << "EOpen.replay ino " << *p << " not in metablob" << dendl;
      assert(in);
    }
    _segment->open_files.push_back(&in->item_open_file);
  }
}


// -----------------------
// ECommitted

void ECommitted::replay(MDS *mds)
{
  if (mds->mdcache->uncommitted_masters.count(reqid)) {
    dout(10) << "ECommitted.replay " << reqid << dendl;
    mds->mdcache->uncommitted_masters[reqid].ls->uncommitted_masters.erase(reqid);
    mds->mdcache->uncommitted_masters.erase(reqid);
  } else {
    dout(10) << "ECommitted.replay " << reqid << " -- didn't see original op" << dendl;
  }
}



// -----------------------
// ESlaveUpdate

void ESlaveUpdate::replay(MDS *mds)
{
  switch (op) {
  case ESlaveUpdate::OP_PREPARE:
    dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds" << master 
           << ": applying commit, saving rollback info" << dendl;
    assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid) == 0);
    commit.replay(mds, _segment);
    mds->mdcache->uncommitted_slave_updates[master][reqid] = 
      new MDSlaveUpdate(origop, rollback, _segment->slave_updates);
    break;

  case ESlaveUpdate::OP_COMMIT:
    if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) {
      dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master << dendl;
      delete mds->mdcache->uncommitted_slave_updates[master][reqid];
      mds->mdcache->uncommitted_slave_updates[master].erase(reqid);
      if (mds->mdcache->uncommitted_slave_updates[master].empty())
      mds->mdcache->uncommitted_slave_updates.erase(master);
    } else {
      dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master 
             << ": ignoring, no previously saved prepare" << dendl;
    }
    break;

  case ESlaveUpdate::OP_ROLLBACK:
    if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) {
      dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master
             << ": applying rollback commit blob" << dendl;
      assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid));
      commit.replay(mds, _segment);
      delete mds->mdcache->uncommitted_slave_updates[master][reqid];
      mds->mdcache->uncommitted_slave_updates[master].erase(reqid);
      if (mds->mdcache->uncommitted_slave_updates[master].empty())
      mds->mdcache->uncommitted_slave_updates.erase(master);
    } else {
      dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master 
             << ": ignoring, no previously saved prepare" << dendl;
    }
    break;

  default:
    assert(0);
  }
}


// -----------------------
// ESubtreeMap

void ESubtreeMap::replay(MDS *mds) 
{
  // suck up the subtree map?
  if (mds->mdcache->is_subtrees()) {
    dout(10) << "ESubtreeMap.replay -- ignoring, already have import map" << dendl;
    return;
  }

  dout(10) << "ESubtreeMap.replay -- reconstructing (auth) subtree spanning tree" << dendl;
  
  // first, stick the spanning tree in my cache
  //metablob.print(*_dout);
  metablob.replay(mds, _segment);
  
  // restore import/export maps
  for (map<dirfrag_t, vector<dirfrag_t> >::iterator p = subtrees.begin();
       p != subtrees.end();
       ++p) {
    CDir *dir = mds->mdcache->get_dirfrag(p->first);
    mds->mdcache->adjust_bounded_subtree_auth(dir, p->second, mds->get_nodeid());
  }
  
  mds->mdcache->show_subtrees();
}



// -----------------------
// EFragment

void EFragment::replay(MDS *mds)
{
  dout(10) << "EFragment.replay " << ino << " " << basefrag << " by " << bits << dendl;
  
  CInode *in = mds->mdcache->get_inode(ino);
  assert(in);

  list<CDir*> resultfrags;
  list<Context*> waiters;
  mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true);

  metablob.replay(mds, _segment);
}






// =========================================================================

// -----------------------
// EExport

void EExport::replay(MDS *mds)
{
  dout(10) << "EExport.replay " << base << dendl;
  metablob.replay(mds, _segment);
  
  CDir *dir = mds->mdcache->get_dirfrag(base);
  assert(dir);
  
  set<CDir*> realbounds;
  for (set<dirfrag_t>::iterator p = bounds.begin();
       p != bounds.end();
       ++p) {
    CDir *bd = mds->mdcache->get_dirfrag(*p);
    assert(bd);
    realbounds.insert(bd);
  }

  // adjust auth away
  mds->mdcache->adjust_bounded_subtree_auth(dir, realbounds, pair<int,int>(CDIR_AUTH_UNKNOWN, CDIR_AUTH_UNKNOWN));
  mds->mdcache->try_subtree_merge(dir);
}


// -----------------------
// EImportStart

void EImportStart::update_segment()
{
  _segment->sessionmapv = cmapv;
}

void EImportStart::replay(MDS *mds)
{
  dout(10) << "EImportStart.replay " << base << dendl;
  //metablob.print(*_dout);
  metablob.replay(mds, _segment);

  // put in ambiguous import list
  mds->mdcache->add_ambiguous_import(base, bounds);

  // open client sessions?
  if (mds->sessionmap.version >= cmapv) {
    dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.version 
           << " >= " << cmapv << ", noop" << dendl;
  } else {
    dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.version 
           << " < " << cmapv << dendl;
    map<client_t,entity_inst_t> cm;
    bufferlist::iterator blp = client_map.begin();
    ::decode(cm, blp);
    mds->sessionmap.open_sessions(cm);
    assert(mds->sessionmap.version == cmapv);
    mds->sessionmap.projected = mds->sessionmap.version;
  }
  update_segment();
}

// -----------------------
// EImportFinish

void EImportFinish::replay(MDS *mds)
{
  if (mds->mdcache->have_ambiguous_import(base)) {
    dout(10) << "EImportFinish.replay " << base << " success=" << success << dendl;
    if (success) 
      mds->mdcache->finish_ambiguous_import(base);
    else
      mds->mdcache->cancel_ambiguous_import(base);
  } else {
    dout(10) << "EImportFinish.replay " << base << " success=" << success
           << ", predates my subtree_map start point, ignoring" 
           << dendl;
    // verify that?
  }
}






Generated by  Doxygen 1.6.0   Back to index