Cassandraの読み込み

読み込み側

  • org.apache.cassandra.cli.CliClient
    • executeGet関数内でクライアントからクエリを構文解析して
    • get関数を使ってCassandraServerにリクエス
  • org.apache.cassandra.avro.CassandraServer
    • get関数=>multigetInternal=>multigetColumns=>readColumnFamilyからStorageProxy内のreadProtocol(commands, thriftConsistencyLevel(consistency))を呼ぶ
  • org.apache.cassandra.service.StorageProxy
    • readProtocol
      • consistency levelをcheck.
        • levelがONEなら・・
          • commandごとにその取得データの存在するendpointsをStorageServiceから取得
          • それらのendpointsの中にlocalアドレスが含まれるか否かで、commandをlocalCommandsとremoteCommandsに分ける
        • levelがONE以外なら・・
          • strongReadを呼ぶ
    • weakReadLocal(localCommands)
      • weakReadLocalCallableによりCallableを作り、ThreadPoolExecutorにsubmitしてfutureを生成
      • futureごとにrowを取得して、commandに応じたrowsにまとめて返す
    • weakReadRemote(remoteCommands)
      • StorageServiceのfindSuitableEndpoint関数によりcommandデータに応じたendpointを取得
        • findSuitableEndPointはendpointsのリストをproximity(距離的な近さで同じaddressかどうか=>同じRackかどうか=>同じデータセンタかどうかという順でcompare)でソートして、最も近いliveなendpointを返す
      • commandからmessageを作り(localアドレスなど)、ヘッダを付加して(message,endpoints)のペアをMessagingServiceのsendRR関数により送信。結果をiars(IAsyncResult型のリスト)に追加
      • iarsの要素ごとにremoteノードと通信してbodyをget.
      • bodyをdeserializeしてrowsを取り出して出力
    • strongRead
      1. N個のendpointを上と同じようにgetし、endpointListを作る
      2. endpointの数だけmessageを作る
      3. messageのうち1つを実際のdataを得るためにset. 残りは要約だけ得るためにset.(最適化の為に実際のデータは最も近いToken1ノードから。残りはそのデータとのhash値計算を行う)
      4. 全てのmessageをsendRR(quorumResponseHandlerを付加)
      5. 最低Xノードからのレスポンスをまつ。(X<=NかつX内にdata nodeを含む)
      6. もし、digestとdataがmatchしたらdataを返す
      7. 違う場合はbackgroundでStorageService.doConsistencyCheckを行い、不整合のcheckを行う(read repair)
  • org.apache.cassandra.net.MessagingService
    • sendRR
      • sendOneWayによりendpointsに対してmessageを送信。
      • 結果をIAsyncResultとして返す
    • sendOneWay
      • messageをserializeしてbufferを作り、endpointを元にconnection(TCP)を行い、bufferをwrite.

読み込み処理側

  • org.apache.cassandra.db.ReadVerbHandler
    • MessageDelivaryTaskからdoVerb関数が呼ばれる
    • messageをdeserializeしてcommandを取得
      • messageの種類(SliceFromReadCommand, SliceByNamesReadCommand)によってシリアライズ方法が異なる
    • commandからtableを取得してそれから、rowをgetする
    • rowからReadResponseを作り、serializeして送信元にsendOneWay.
      • isDigestQueryがtrueの場合はReadResponse(ColumnFamily.digest(row.cf))によってReadResponseを作る
      • digestではMessageDigestクラスを使ってMD5valueハッシュ値を返す
  • org.apache.cassandra.db.Table
    • getRow
      • columnFamilyStore.getColumnFamilyを使用してデータを取得する
  • org.apache.cassandra.db.ColumnFamilyStore
    • DatabaseDescriptorを使って、table名とカラムファミリー名を元にそれに関連するファイル(data,filter,index)を全て取得=>files
    • filesの各ファイルについて
      • orphan(古くなったindex,filterファイル)を見つけ削除
      • Dataファイルを圧縮順にソートしてssTableFilesにadd
    • ssTableFilesの各ファイルからSSTableReaderオブジェクトを作り、sstablesにadd
    • sstablesにはtable名とカラムファミリー名にマッチする様々なversionのファイルが入る
    • SSTableTrackerの_ssTablesにsstablesをadd
    • getColumnFamily
      • クエリが一行のとき(supercolumnでないとき)
        • cacheRowよりmemtableまたはsstableからfilter.keyに関する最新のcached(カラムファミリー)を取ってくる
        • filter.getMemColumnIteratorによりmemtableにcachedを登録
    • cacheRow
      • SSTableTrackerからgetRowCacheによりcachedを取得
      • もし、データがnullなら、getTopLevelColumns関数を呼びcachedを取得
      • 取得したcachedをgetRowCacheにput
    • getTopLevelColumns
      • top-levelのカラムを問い合わせ、それらをindexでマージしたCF(カラムファミリ)を返す
      1. ColumnIterator型のリストiteratorsを用意
      2. current Memtableをlockしてfilter.getMemColumnIteratorでiterを取得し、そのCFをreturnCFに登録。iterをiteratorsに登録.その後unlock
      3. その他のmemtableへの問い合わせを上記と同じように行い、iterをiteratorsに登録(lockはしない)
      4. sstableへの問い合わせを行い、上記と同じように行い、iterをiteratorsに登録
      • =>Iteratorを使うことでフィルタが読み込みを完了したらすぐに停止が可能となり、必要ないカラムを読み込まなくて良い。
        • 潜在的に複数のSSTableのバージョンからカラムをマージする必要があるため、読み込みイテレータはReducingIteratorを使用して組み合わされます。まだ組み合わされていないカラムのイテレータをインプットとして受け取り、組み合わされたバージョンをアウトプットとして生成します。
  • org.apache.cassandra.io.SSTableTracker
    • add関数
      • replace()を呼ぶ
    • replace関数
      • sstablesの各sstable(SSTableReaderオブジェクト)に対して
      • 特定できたsstableのみをaddした新しいsstablesとして返す