2023年11月29日发(作者:)
RocketMQConnectRuntime:⼀些优化的想法
前⾔
最近整理了⼀下RocketMQ Connect Runtime中处理connector/task 的config的逻辑,然后也尝试提出了⼏个优化RocketMQ Connect
Runtime的想法,其中有不少问题,还希望⼤家⼀起讨论。同时本⽂⾥提出的⼀个可能的办法依赖于我对ServiceThread
的理解是对的:ServiceThread 周期性地执⾏⼀个函数,或者在其他线程调⽤wakeup()时⽴即执⾏⼀次函数。
当前RocketMQ connect runtime的config处理逻辑
如何理解Data Plane和Control Plane的解耦
如何实现强制退出connector/task
如何增强当前有的异常处理机制
资源隔离相关的思考
修改后⽴即触发呢?对于Runtime来说keyValueStore的persist过程应该是透明的,具体什么时候persist不由runtime
来决定,类似于操作系统⾥page的flush过程。
另外startConnectors() 所传进来的connector config永远不可能包含是带有DELETED标志位的config, 这是因为startConnectors() 所使
⽤进来的connector config 是经过了getConnectorConfig() 的filter的,因此这个if 语句⾥有⼀半永远不会被执⾏到。
for (WorkerConnector workerConnector : workingConnectors) {
String connectorName = nectorName();
public void stop() {
(true);
wn();
();
因此由Runtime去监测SinkTask的状态是可⾏的办法,我们再来看看第⼆种由SinkTask通知runtime的实现。
理论上底层runtime的实现应该对上层的SinkTask保持透明,也就是说,如果上层SinkTask想要改变其在runtime中对应的状态的话,只能通过
openmessaging⾥定义的SinkTaskContext 来实现。不幸的是,openmessaing⾥的SinkTaskContext并没有定义相关接⼝。
SinkConnectorContext 或者 SinkTaskContext 可以看作上层connector/task 和底层runtime交互的接⼝, ⽐如在
jdbc-sink-connector ⾥就会通过调⽤context的 ⽅法来通知runtime需要重新划分该
requestTaskReconfiguration()
connector下的task。
因此我们可以这样设计
WorkerSinkTask负责管理SinkTask的状态,有New, Pending, Running, Stopping, Stopped, Error这样六种状态
如何判断某个WorkerSinkTask⽆法正常退出,必须由Runtime强制退出
如果定位到WorkerSinkTask⽆法正常退出的原因,如果有Exception应该怎么处理
⽤户是否应该有权不经过gracefully的stop尝试,直接cancel掉某个WorkerSinkTask
结合之前状态机的思路,如果⼀个task长期处于Stopping状态,那么超过⼀定的实现之后我们可以认为它timeout了,那么我们就可以直接
cancel掉这个正在执⾏的task。这个⽅法要求我们持有每个(workerSinktask) 所返回的Future对象。
WorkerConnector的强制关闭可以采取和WorkerSinkTask相同的办法
如果引⼊状态管理了,我们也许可以增强⼀下当前的RESTful接⼝,使得⽤户可以看到每个task/connector的状态.
其他问题
现在Worker的stop() 似乎并不会清空所有正在跑的task(),⽽是单纯的释放了⼀些资源,我觉得这近似于暴⼒退出。我们可以思考⼀下


发布评论