Spark PRC

  • receive:接收消息并处理,但不需要给客户端回复。
  • receiveAndReply:接收消息并处理,需要给客户端回复。回复是通过 RpcCall-Context来实现的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/**
* An end point for the RPC that defines what functions to trigger given a message.
*
* It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
*
* The life-cycle of an endpoint is:
*
* constructor -> onStart -> receive* -> onStop
*
* Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
* [[ThreadSafeRpcEndpoint]]
*
* If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
* invoked with the cause. If `onError` throws an error, [[RpcEnv]] will ignore it.
*/
private[spark] trait RpcEndpoint {

/**
* The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
*/
val rpcEnv: RpcEnv

/**
* The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
* called. And `self` will become `null` when `onStop` is called.
*
* Note: Because before `onStart`, [[RpcEndpoint]] has not yet been registered and there is not
* valid [[RpcEndpointRef]] for it. So don't call `self` before `onStart` is called.
*/
final def self: RpcEndpointRef = {
require(rpcEnv != null, "rpcEnv has not been initialized")
rpcEnv.endpointRef(this)
}

/**
* Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]. If receiving a
* unmatched message, [[SparkException]] will be thrown and sent to `onError`.
*/
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException(self + " does not implement 'receive'")
}

/**
* Process messages from [[RpcEndpointRef.ask]]. If receiving a unmatched message,
* [[SparkException]] will be thrown and sent to `onError`.
*/
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}

/**
* Invoked when any exception is thrown during handling messages.
*/
def onError(cause: Throwable): Unit = {
// By default, throw e and let RpcEnv handle it
throw cause
}

/**
* Invoked when `remoteAddress` is connected to the current node.
*/
def onConnected(remoteAddress: RpcAddress): Unit = {
// By default, do nothing.
}

/**
* Invoked when `remoteAddress` is lost.
*/
def onDisconnected(remoteAddress: RpcAddress): Unit = {
// By default, do nothing.
}

/**
* Invoked when some network error happens in the connection between the current node and
* `remoteAddress`.
*/
def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
// By default, do nothing.
}

/**
* Invoked before [[RpcEndpoint]] starts to handle any message.
*/
def onStart(): Unit = {
// By default, do nothing.
}

/**
* Invoked when [[RpcEndpoint]] is stopping. `self` will be `null` in this method and you cannot
* use it to send or ask messages.
*/
def onStop(): Unit = {
// By default, do nothing.
}

/**
* A convenient method to stop [[RpcEndpoint]].
*/
final def stop(): Unit = {
val _self = self
if (_self != null) {
rpcEnv.stop(_self)
}
}
}

在Akka中只要你持有了⼀个Actor的引⽤ ActorRef,那么你就可以使⽤此ActorRef向远端的Actor发起请求。 RpcEndpointRef也具有同等的效⽤,要向⼀个远端的RpcEndpoint发起请 求,你就必须持有这个RpcEndpoint的RpcEndpointRef。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
* A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
*/
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {

private[this] val maxRetries = RpcUtils.numRetries(conf)
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

/**
* return the address for the [[RpcEndpointRef]]
*/
def address: RpcAddress

def name: String

/**
* Sends a one-way asynchronous message. Fire-and-forget semantics.
*/
def send(message: Any): Unit

/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
* receive the reply within the specified timeout.
*
* This method only sends the message once and never retries.
*/
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
* receive the reply within a default timeout.
*
* This method only sends the message once and never retries.
*/
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

/**
* Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
* timeout, or throw a SparkException if this fails even after the default number of retries.
* The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
* method retries, the message handling in the receiver side should be idempotent.
*
* Note: this is a blocking action which may cost a lot of time, so don't call it in a message
* loop of [[RpcEndpoint]].
*
* @param message the message to send
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)

/**
* Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
* specified timeout, throw a SparkException if this fails even after the specified number of
* retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
* retries, the message handling in the receiver side should be idempotent.
*
* Note: this is a blocking action which may cost a lot of time, so don't call it in a message
* loop of [[RpcEndpoint]].
*
* @param message the message to send
* @param timeout the timeout duration
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
// TODO: Consider removing multiple attempts
var attempts = 0
var lastException: Exception = null
while (attempts < maxRetries) {
attempts += 1
try {
val future = ask[T](message, timeout)
val result = timeout.awaitResult(future)
if (result == null) {
throw new SparkException("RpcEndpoint returned null")
}
return result
} catch {
case ie: InterruptedException => throw ie
case e: Exception =>
lastException = e
logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
}

if (attempts < maxRetries) {
Thread.sleep(retryWaitMs)
}
}

throw new SparkException(
s"Error sending message [message = $message]", lastException)
}

}
  • RpcEndpoint:RPC端点,即RPC分布式环境中⼀个具体的实例,其可 以对指定的消息进⾏处理。由于RpcEndpoint是⼀个特质,所以需要提供 RpcEndpoint的实现类。特质RpcEndpoint已在前⽂详细介绍,此处不再赘 述。

  • RpcEndpointRef:RPC端点引⽤,即RPC分布式环境中⼀个具体实体 的引⽤,所谓引⽤实际是“spark://host:port/name”这种格式的地址。其中, host为端点所在RPC服务所在的主机IP,port是端点所在RPC服务的端⼜, name是端点实例的名称。抽象类RpcEndpointRef已在前⽂详细介绍,此处 不再赘述。