Check synchronized is necessity

project directory structure

src/main
└── scala
    ├── master
    │   ├── server
    │   │   └── MasterServerImpl.scala
    │   └── MasterMain.scala
    └── worker
        ├── io
        │   ├── Data.scala
        │   ├── DataProcessor.scala
        │   └── ...
        ├── server
        │   └── WorkerServerImpl.scala
        └── WorkerMain.scala

Master

object Master extends App {
	
}

Master Server

class MasterServerImpl extends ... {
	/**
	 * 워커들의 ip, port를 저장
	 * ㄴ map으로 관리하면 dead 이후 같은 ip로 register 되었을 때 update 편함!
	 */
	var workerInfosMap: Map[Ip, Port]
	/** register 마다 버전 1 up */
	var version
	/**
	 * 워커 ip를 key로, sample data를 value로 갖는 map
	 * ㄴ map으로 관리하면 wating request 개수가 아닌, ip(key) 개수로 sort 
	 */
	var sampleMap: Map[Ip, KeyData]
	/**
	 * 각각의 partition에서의 (최소, 최대) key List
	 * [(min1, max1), (min2, max2), ...]  (max1 < min2, ...) 인 상황에서
	 * 각각의 partition range는 sort된 ip에 대응됨
	 */
	lazy val partitionRange: List[(str, str)] = { ... }
	
	var waitingRequestForRegister
	var waitingRequestForPartitionRange: Map[Ip, WaitingReq]
	var waitingRequestForUpdatedWorkerInfo

	def register(ip, port) -> (workerInfosMap, version) // 워커 노드의 ip, port 등록

	/**
   * Client : partition 범위 획득
	 * Server : sampleMap += data, waitingRequestForPartitionRange의 key 개수가 
	 *										 worker 수와 같으면 partitionRange를 reply
	 */
	def getPartitionRange(data) -> partitionRange
	/**
	 * Client : partition 요청했는데 응답 없음. 버전 이거 맞음? 업뎃 된거 있음?
	 * Server : version 동일 시 아직 업뎃 x -> waitingRequestForUpdatedWorkerInfo에 추가
	 *                   다를 시 업뎃 o -> update된 정보 제공
	 * -> 그냥 register에 병합하는 것으로 최종 결정
	 */
	def getUpdatedWorkerInfo(version) -> (updatedWorkerInfo, Optional: version)

	def canShutdownWorkerServer()
	
}

MasterServer.proto

syntax = "proto3";

package com.master.server;

import "google/protobuf/empty.proto";

message WorkerInfo {
  string ip = 1;
  uint32 port = 2;
}
message Version {
  uint32 version = 1;
}

message RegisterReply {
  repeated WorkerInfo workerInfos = 1;
  Version version = 2;
}

message SampleKeyData {
  message Key {
    bytes keyDatum = 1;
  }
  repeated Key keyData = 1;
}

message PartitionRanges {
  // [startKey, endKey) === (startKey <= key < endKey)
  message PartitionRange {
    bytes startKey = 1;
    bytes endKey = 2;
  }
  repeated PartitionRange PartitionRanges = 1;
}

message CanShutdownWorkerServerReply {
  bool canShutdownWorkerServer = 1;
}

service MasterServer {
  rpc register (WorkerInfo) returns (RegisterReply);
  rpc getPartitionRange (SampleKeyData) returns (PartitionRanges);
  rpc getUpdatedWorkerInfo (Version) returns (WorkerInfo);
  rpc canShutdownWorkerServer (google.protobuf.Empty) returns (CanShutdownWorkerServerReply);
}

Worker

def sortAndPartition(inputDataPath, outputDataPath, partitionRange)
def merge(dataPaths, outputDataPath)

object Worker extends App {
	... (procedure)
}

Worker Server

class WorkerServerImpl extends ... { 
/** 
* worker2가 worker 1 서버에게 getPartitionData를 실행해서 
* worker 1 서버가 이를 waiting request 에 삽입 & partition이 아직 안끝난 상황. 
* 근데 worker 1이 kill 당함. 
* 그때 worker 2가 알아차릴 수 있는가? 
* 이를 timeout 관리 없이 이를 알아차릴 수 있으면 good 
* 근데 아닐 수도 있고, 그럴 위험성이 존재하는 상황이기에 
* t초마다 계속 getPartitionData를 실행해야 하지 않은가? 
*/  
var isPartitionDone: Boolean // partition 단계가 다 끝났으면 true, 안 끝났으면 false 
var waitingRequestForGetPartitionData 
/** 
* worker n이 partitioning을 끝냈을 때 
* worker n이 worker n server의 (localhost) 
* setPartitionDone을 실행해 worker server의 isPartitionDone을 true로 변경 
*/ 
def setPartitionDone()
 /** 
 * Client : 셔플링 도중 자신의 ip에 해당하는 parition 요구하기 
 * Server : isPartitoinDone이 true이면, 해당 partition 전송 
 * false이면, waitingRequestForGetPartitionData에 request 보관
 */ 
 def getPartitionData(ip) 
 def isAlive() 
 // [서버 닫히는 요청 다 확인 후의 kill] 대비용 -> master client 존재 이유  }

WorkerServer.proto