Check synchronized is necessity
project directory structure
src/main
└── scala
├── master
│ ├── server
│ │ └── MasterServerImpl.scala
│ └── MasterMain.scala
└── worker
├── io
│ ├── Data.scala
│ ├── DataProcessor.scala
│ └── ...
├── server
│ └── WorkerServerImpl.scala
└── WorkerMain.scala
object Master extends App {
}
class MasterServerImpl extends ... {
/**
* 워커들의 ip, port를 저장
* ㄴ map으로 관리하면 dead 이후 같은 ip로 register 되었을 때 update 편함!
*/
var workerInfosMap: Map[Ip, Port]
/** register 마다 버전 1 up */
var version
/**
* 워커 ip를 key로, sample data를 value로 갖는 map
* ㄴ map으로 관리하면 wating request 개수가 아닌, ip(key) 개수로 sort
*/
var sampleMap: Map[Ip, KeyData]
/**
* 각각의 partition에서의 (최소, 최대) key List
* [(min1, max1), (min2, max2), ...] (max1 < min2, ...) 인 상황에서
* 각각의 partition range는 sort된 ip에 대응됨
*/
lazy val partitionRange: List[(str, str)] = { ... }
var waitingRequestForRegister
var waitingRequestForPartitionRange: Map[Ip, WaitingReq]
var waitingRequestForUpdatedWorkerInfo
def register(ip, port) -> (workerInfosMap, version) // 워커 노드의 ip, port 등록
/**
* Client : partition 범위 획득
* Server : sampleMap += data, waitingRequestForPartitionRange의 key 개수가
* worker 수와 같으면 partitionRange를 reply
*/
def getPartitionRange(data) -> partitionRange
/**
* Client : partition 요청했는데 응답 없음. 버전 이거 맞음? 업뎃 된거 있음?
* Server : version 동일 시 아직 업뎃 x -> waitingRequestForUpdatedWorkerInfo에 추가
* 다를 시 업뎃 o -> update된 정보 제공
* -> 그냥 register에 병합하는 것으로 최종 결정
*/
def getUpdatedWorkerInfo(version) -> (updatedWorkerInfo, Optional: version)
def canShutdownWorkerServer()
}
syntax = "proto3";
package com.master.server;
import "google/protobuf/empty.proto";
message WorkerInfo {
string ip = 1;
uint32 port = 2;
}
message Version {
uint32 version = 1;
}
message RegisterReply {
repeated WorkerInfo workerInfos = 1;
Version version = 2;
}
message SampleKeyData {
message Key {
bytes keyDatum = 1;
}
repeated Key keyData = 1;
}
message PartitionRanges {
// [startKey, endKey) === (startKey <= key < endKey)
message PartitionRange {
bytes startKey = 1;
bytes endKey = 2;
}
repeated PartitionRange PartitionRanges = 1;
}
message CanShutdownWorkerServerReply {
bool canShutdownWorkerServer = 1;
}
service MasterServer {
rpc register (WorkerInfo) returns (RegisterReply);
rpc getPartitionRange (SampleKeyData) returns (PartitionRanges);
rpc getUpdatedWorkerInfo (Version) returns (WorkerInfo);
rpc canShutdownWorkerServer (google.protobuf.Empty) returns (CanShutdownWorkerServerReply);
}
def sortAndPartition(inputDataPath, outputDataPath, partitionRange)
def merge(dataPaths, outputDataPath)
object Worker extends App {
... (procedure)
}
class WorkerServerImpl extends ... {
/**
* worker2가 worker 1 서버에게 getPartitionData를 실행해서
* worker 1 서버가 이를 waiting request 에 삽입 & partition이 아직 안끝난 상황.
* 근데 worker 1이 kill 당함.
* 그때 worker 2가 알아차릴 수 있는가?
* 이를 timeout 관리 없이 이를 알아차릴 수 있으면 good
* 근데 아닐 수도 있고, 그럴 위험성이 존재하는 상황이기에
* t초마다 계속 getPartitionData를 실행해야 하지 않은가?
*/
var isPartitionDone: Boolean // partition 단계가 다 끝났으면 true, 안 끝났으면 false
var waitingRequestForGetPartitionData
/**
* worker n이 partitioning을 끝냈을 때
* worker n이 worker n server의 (localhost)
* setPartitionDone을 실행해 worker server의 isPartitionDone을 true로 변경
*/
def setPartitionDone()
/**
* Client : 셔플링 도중 자신의 ip에 해당하는 parition 요구하기
* Server : isPartitoinDone이 true이면, 해당 partition 전송
* false이면, waitingRequestForGetPartitionData에 request 보관
*/
def getPartitionData(ip)
def isAlive()
// [서버 닫히는 요청 다 확인 후의 kill] 대비용 -> master client 존재 이유 }