/**
 * An end point for the RPC that defines what functions to trigger given a message.
 *
 * It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
 *
 * The life-cycle of an endpoint is:
 *
 * constructor -> onStart -> receive* -> onStop
 *
 * Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
 * [[ThreadSafeRpcEndpoint]].
 *
 * If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
 * invoked with the cause. If `onError` throws an error, [[RpcEnv]] will ignore it.
 */
private[spark] trait RpcEndpoint {

  /**
   * The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
   */
  val rpcEnv: RpcEnv

  /**
   * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
   * called, and `self` will become `null` when `onStop` is called.
   *
   * Note: before `onStart`, the [[RpcEndpoint]] has not yet been registered and there is no
   * valid [[RpcEndpointRef]] for it, so don't call `self` before `onStart` is called.
   *
   * @throws IllegalArgumentException (via `require`) if `rpcEnv` is still `null`
   */
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  /**
   * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply]]. If receiving an
   * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
   *
   * The default implementation matches every message and throws.
   */
  def receive: PartialFunction[Any, Unit] = {
    // Interpolation instead of `self + "..."`: same text, without the implicit any2stringadd.
    case _ => throw new SparkException(s"$self does not implement 'receive'")
  }

  /**
   * Process messages from [[RpcEndpointRef.ask]]. If receiving an unmatched message,
   * [[SparkException]] will be thrown and sent to `onError`.
   *
   * The default implementation matches every message and reports a failure through `context`
   * instead of replying.
   *
   * @param context the call context used to answer the sender
   */
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(s"$self won't reply anything"))
  }

  /**
   * Invoked when any exception is thrown during handling messages.
   *
   * @param cause the exception raised while handling a message
   */
  def onError(cause: Throwable): Unit = {
    // By default, rethrow `cause` and let RpcEnv handle it.
    throw cause
  }

  /**
   * Invoked when `remoteAddress` is connected to the current node.
   */
  def onConnected(remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }

  /**
   * Invoked when `remoteAddress` is lost.
   */
  def onDisconnected(remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }

  /**
   * Invoked when some network error happens in the connection between the current node and
   * `remoteAddress`.
   */
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }

  /**
   * Invoked before [[RpcEndpoint]] starts to handle any message.
   */
  def onStart(): Unit = {
    // By default, do nothing.
  }

  /**
   * Invoked when [[RpcEndpoint]] is stopping. `self` will be `null` in this method and you cannot
   * use it to send or ask messages.
   */
  def onStop(): Unit = {
    // By default, do nothing.
  }

  /**
   * A convenient method to stop [[RpcEndpoint]].
   *
   * Does nothing when `self` is `null`.
   */
  final def stop(): Unit = {
    // `self` is evaluated exactly once; Option(...) turns a null reference into None.
    Option(self).foreach(ref => rpcEnv.stop(ref))
  }
}
/** * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe. */ private[spark] abstractclassRpcEndpointRef(conf: SparkConf) extendsSerializablewithLogging {
// Retry and timeout settings, read from `conf` through RpcUtils.

// Upper bound on the number of attempts made by the retry loop in `askWithRetry`.
private[this] val maxRetries = RpcUtils.numRetries(conf)
// Pause handed to Thread.sleep between two attempts in `askWithRetry`.
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
// Timeout used by the `ask` / `askWithRetry` overloads that take no explicit timeout.
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
/**
 * Returns the [[RpcAddress]] of this [[RpcEndpointRef]].
 */
def address: RpcAddress
/**
 * Returns the name of this [[RpcEndpointRef]].
 *
 * NOTE(review): presumably the name the endpoint was registered under -- confirm against the
 * concrete implementations.
 */
def name: String
/**
 * Sends `message` as a one-way asynchronous message, with fire-and-forget semantics.
 *
 * @param message the message to send
 */
def send(message: Any): Unit
/**
 * Sends `message` to the corresponding [[RpcEndpoint.receiveAndReply]] and returns a [[Future]]
 * that receives the reply within the specified `timeout`.
 *
 * The message is sent only once; this method never retries.
 *
 * @param message the message to send
 * @param timeout the timeout for receiving the reply
 * @tparam T type of the reply message
 * @return a [[Future]] holding the reply
 */
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
/**
 * Sends `message` to the corresponding [[RpcEndpoint.receiveAndReply]] and returns a [[Future]]
 * that receives the reply within the default ask timeout.
 *
 * The message is sent only once; this method never retries.
 *
 * @param message the message to send
 * @tparam T type of the reply message
 * @return a [[Future]] holding the reply
 */
def ask[T: ClassTag](message: Any): Future[T] =
  ask[T](message, defaultAskTimeout) // delegate to the two-argument overload
/**
 * Sends `message` to the corresponding [[RpcEndpoint]] and returns its result, waiting up to the
 * default timeout, or throws a SparkException if this fails even after the default number of
 * retries. The default timeout applies to every individual `ask` attempt. Because this method
 * retries, the message handling on the receiver side should be idempotent.
 *
 * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
 * loop of [[RpcEndpoint]].
 *
 * @param message the message to send
 * @tparam T type of the reply message
 * @return the reply message from the corresponding [[RpcEndpoint]]
 */
def askWithRetry[T: ClassTag](message: Any): T =
  askWithRetry[T](message, defaultAskTimeout) // delegate to the overload with explicit timeout
/** * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a * specified timeout, throw a SparkException if this fails even after the specified number of * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method * retries, the message handling in the receiver side should be idempotent. * * Note: this is a blocking action which may cost a lot of time, so don't call it in a message * loop of [[RpcEndpoint]]. * * @param message the message to send * @param timeout the timeout duration * @tparam T type of the reply message * @return the reply message from the corresponding [[RpcEndpoint]] */ defaskWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = { // TODO: Consider removing multiple attempts var attempts = 0 var lastException: Exception = null while (attempts < maxRetries) { attempts += 1 try { val future = ask[T](message, timeout) val result = timeout.awaitResult(future) if (result == null) { thrownewSparkException("RpcEndpoint returned null") } return result } catch { case ie: InterruptedException => throw ie case e: Exception => lastException = e logWarning(s"Error sending message [message = $message] in $attempts attempts", e) }
if (attempts < maxRetries) { Thread.sleep(retryWaitMs) } }