2023年11月29日发(作者:)

RocketMQConnectRuntime:⼀些优化的想法

前⾔

最近整理了⼀下RocketMQ Connect Runtime中处理connector/task 的config的逻辑,然后也尝试提出了⼏个优化RocketMQ Connect

Runtime的想法,其中有不少问题,还希望⼤家⼀起讨论。同时本⽂⾥提出的⼀个可能的办法依赖于我对ServiceThread

的理解是对的:ServiceThread 周期性地执⾏⼀个函数,或者在其他线程调⽤wakeup()时⽴即执⾏⼀次函数。

当前RocketMQ connect runtime的config处理逻辑

如何理解Data Plane和Control Plane的解耦

如何实现强制退出connector/task

如何增强当前有的异常处理机制

资源隔离相关的思考

修改后⽴即触发呢?对于Runtime来说keyValueStore的persist过程应该是透明的,具体什么时候persist不由runtime

来决定,类似于操作系统⾥page的flush过程。

另外startConnectors() 所传进来的connector config永远不可能包含是带有DELETED标志位的config, 这是因为startConnectors() 所使

⽤进来的connector config 是经过了getConnectorConfig() 的filter的,因此这个if 语句⾥有⼀半永远不会被执⾏到。

for (WorkerConnector workerConnector : workingConnectors) {

String connectorName = nectorName();

public void stop() {

(true);

wn();

();

因此由Runtime去监测SinkTask的状态是可⾏的办法,我们再来看看第⼆种由SinkTask通知runtime的实现。

理论上底层runtime的实现应该对上层的SinkTask保持透明,也就是说,如果上层SinkTask想要改变其在runtime中对应的状态的话,只能通过

openmessaging⾥定义的SinkTaskContext 来实现。不幸的是,openmessaing⾥的SinkTaskContext并没有定义相关接⼝。

SinkConnectorContext 或者 SinkTaskContext 可以看作上层connector/task 和底层runtime交互的接⼝, ⽐如在

jdbc-sink-connector ⾥就会通过调⽤context的 ⽅法来通知runtime需要重新划分该

requestTaskReconfiguration()

connector下的task。

因此我们可以这样设计

WorkerSinkTask负责管理SinkTask的状态,有New, Pending, Running, Stopping, Stopped, Error这样六种状态

如何判断某个WorkerSinkTask⽆法正常退出,必须由Runtime强制退出

如果定位到WorkerSinkTask⽆法正常退出的原因,如果有Exception应该怎么处理

⽤户是否应该有权不经过gracefully的stop尝试,直接cancel掉某个WorkerSinkTask

结合之前状态机的思路,如果⼀个task长期处于Stopping状态,那么超过⼀定的实现之后我们可以认为它timeout了,那么我们就可以直接

cancel掉这个正在执⾏的task。这个⽅法要求我们持有每个(workerSinktask) 所返回的Future对象。

WorkerConnector的强制关闭可以采取和WorkerSinkTask相同的办法

如果引⼊状态管理了,我们也许可以增强⼀下当前的RESTful接⼝,使得⽤户可以看到每个task/connector的状态.

其他问题

现在Worker的stop() 似乎并不会清空所有正在跑的task(),⽽是单纯的释放了⼀些资源,我觉得这近似于暴⼒退出。我们可以思考⼀下