Cassandraの書き込み

Commit Log=>MemTable

RowMutationの送信
  • org.apache.cassandra.avro.CassandraServer
    • commit logに書かれた内容をRowMutationとして適当なレプリカノードに送信する
    • StrageProxyからmutate関数を呼ぶ.
  • org.apache.cassandra.service.StorageProxy
    • mutate関数内でgetReplicationStrageを元にreplicaノードを決定
      • getReplicationStrageではHashMap.get(key.toString())でノード位置を取得
RowMutationの受信
  • org.apache.cassandra.db.RowMutationVerbHandler
    • RowMutationを受信
    • handlerはStorageServiceで登録されている
    • Table.open(rm.getTable()).apply(rm, bytes, true)によりMemTableに書き込み
MemTableの書き込み
  • org.apache.cassandra.db.Table
    • apply(RowMutation mutation, Object serializedMutation, boolean writeCommitLog)関数によりmemtableにrmの書き込みを行う
      • writeCommitLogがtrueの場合はmutationに関するfutureオブジェクトを使ってローカルログへ書き込み(Future future = CommitLog.instance().add(mutation, serializedMutation);)
      • カラムファミリーごとにColumnFamilyStoreを介してmemtableオブジェクトを作り、memtablesToFlushというHashMapに書き込み
      • memtablesToFlushがいっぱいならflushする
        • maybeSwitchMemTableを呼ぶ。

MemTable=>SSTable

  • MemTableが一杯になったらColumnFamilyStoreによる処理で非同期でSSTableに書き出される
  • org.apache.cassandra.db.ColumnFamilyStore
    • maybeSwitchMemtable関数により古いmemtableをswapする。
      • memTableのflushAndSignalを呼んでいる
    • flushAndSignal関数
      • cfs.addSSTable(writeSortedContents());=>ソートしてコンテンツを追加
      • cfs.getMemtablesPendingFlush().remove(Memtable.this);=>memtableから削除

SSTableの圧縮

非同期でSSTableを圧縮。ガベージコレクタにファントム参照オブジェクト(PhantomReference)を登録し、SSTableへの参照がなくなった時点で消去する。

  • org.apache.cassandra.db.ColumnFamilyStore
    • addSSTable関数内でCompactionManagersubmitMinorIfNeeded非同期関数を呼ぶ。
  • org.apache.cassandra.db.CompactionManager
    • submitMinorIfNeeded関数
      • sstableをsortしてから、doCompactionによりより古いデータに対して圧縮処理を行う