Cassandraの読み込み
読み込み側
- org.apache.cassandra.cli.CliClient
- org.apache.cassandra.avro.CassandraServer
- get関数=>multigetInternal=>multigetColumns=>readColumnFamilyからStorageProxy内のreadProtocol(commands, thriftConsistencyLevel(consistency))を呼ぶ
- org.apache.cassandra.service.StorageProxy
- readProtocol
- consistency levelをcheck.
- levelがONEなら・・
- commandごとにその取得データの存在するendpointsをStorageServiceから取得
- それらのendpointsの中にlocalアドレスが含まれるか否かで、commandをlocalCommandsとremoteCommandsに分ける
- levelがONE以外なら・・
- strongReadを呼ぶ
- levelがONEなら・・
- consistency levelをcheck.
- weakReadLocal(localCommands)
- weakReadLocalCallableによりCallableを作り、ThreadPoolExecutorにsubmitしてfutureを生成
- futureごとにrowを取得して、commandに応じたrowsにまとめて返す
- weakReadRemote(remoteCommands)
- StorageServiceのfindSuitableEndpoint関数によりcommandデータに応じたendpointを取得
- findSuitableEndPointはendpointsのリストをproximity(距離的な近さで同じaddressかどうか=>同じRackかどうか=>同じデータセンタかどうかという順でcompare)でソートして、最も近いliveなendpointを返す
- commandからmessageを作り(localアドレスなど)、ヘッダを付加して(message,endpoints)のペアをMessagingServiceのsendRR関数により送信。結果をiars(IAsyncResult型のリスト)に追加
- iarsの要素ごとにremoteノードと通信してbodyをget.
- bodyをdeserializeしてrowsを取り出して出力
- StorageServiceのfindSuitableEndpoint関数によりcommandデータに応じたendpointを取得
- strongRead
- N個のendpointを上と同じようにgetし、endpointListを作る
- endpointの数だけmessageを作る
- messageのうち1つを実際のdataを得るためにset. 残りは要約だけ得るためにset.(最適化の為に実際のデータは最も近いToken1ノードから。残りはそのデータとのhash値計算を行う)
- 全てのmessageをsendRR(quorumResponseHandlerを付加)
- 最低Xノードからのレスポンスをまつ。(X<=NかつX内にdata nodeを含む)
- もし、digestとdataがmatchしたらdataを返す
- 違う場合はbackgroundでStorageService.doConsistencyCheckを行い、不整合のcheckを行う(read repair)
- readProtocol
- org.apache.cassandra.net.MessagingService
- sendRR
- sendOneWayによりendpointsに対してmessageを送信。
- 結果をIAsyncResultとして返す
- sendOneWay
- messageをserializeしてbufferを作り、endpointを元にconnection(TCP)を行い、bufferをwrite.
- sendRR
読み込み処理側
- org.apache.cassandra.db.ReadVerbHandler
- MessageDelivaryTaskからdoVerb関数が呼ばれる
- messageをdeserializeしてcommandを取得
- messageの種類(SliceFromReadCommand, SliceByNamesReadCommand)によってシリアライズ方法が異なる
- commandからtableを取得してそれから、rowをgetする
- rowからReadResponseを作り、serializeして送信元にsendOneWay.
- org.apache.cassandra.db.Table
- getRow
- columnFamilyStore.getColumnFamilyを使用してデータを取得する
- getRow
- org.apache.cassandra.db.ColumnFamilyStore
- DatabaseDescriptorを使って、table名とカラムファミリー名を元にそれに関連するファイル(data,filter,index)を全て取得=>files
- filesの各ファイルについて
- orphan(古くなったindex,filterファイル)を見つけ削除
- Dataファイルを圧縮順にソートしてssTableFilesにadd
- ssTableFilesの各ファイルからSSTableReaderオブジェクトを作り、sstablesにadd
- sstablesにはtable名とカラムファミリー名にマッチする様々なversionのファイルが入る
- SSTableTrackerの_ssTablesにsstablesをadd
- getColumnFamily
- クエリが一行のとき(supercolumnでないとき)
- cacheRowよりmemtableまたはsstableからfilter.keyに関する最新のcached(カラムファミリー)を取ってくる
- filter.getMemColumnIteratorによりmemtableにcachedを登録
- クエリが一行のとき(supercolumnでないとき)
- cacheRow
- SSTableTrackerからgetRowCacheによりcachedを取得
- もし、データがnullなら、getTopLevelColumns関数を呼びcachedを取得
- 取得したcachedをgetRowCacheにput
- getTopLevelColumns
- top-levelのカラムを問い合わせ、それらをindexでマージしたCF(カラムファミリ)を返す
- org.apache.cassandra.io.SSTableTracker
- add関数
- replace()を呼ぶ
- replace関数
- sstablesの各sstable(SSTableReaderオブジェクト)に対して
- getIndexPostionsによりindexに対してバイナリサーチを行い、行を特定
- 特定できたsstableのみをaddした新しいsstablesとして返す
- sstablesの各sstable(SSTableReaderオブジェクト)に対して
- add関数