The Calcite Druid Adapter Pagination Infinite-Loop Bug

Problem Description


A while ago, one of our backend applications was found to be leaking memory after going live. By analyzing the JVM heap and thread stacks (with jstat and jstack) and cross-checking the application logs, we concluded that the Calcite Druid Adapter most likely has an infinite loop: it keeps issuing Query requests over and over, even though the Query SQL is just a simple SELECT statement.

Dependencies:


org.apache.calcite:calcite-druid:1.11.0, running against druid-0.9.1.

SQL


SELECT * FROM "alert_message" WHERE "time" >= '1970-01-01 00:00:00' AND "time" <= '2017-04-05 10:00:39' AND "host_id" = 'xxx'

Locating the Problem


To find the root cause, we need to read the Calcite Druid Adapter source code.

Source Code Analysis


First, we need to find out why the query is sent repeatedly, and which condition or scenario triggers it.
So let's start from the code that issues the request.

Where the REST request is issued


In the jstack output, we find the request method of DruidConnectionImpl:

org.apache.calcite.adapter.druid.DruidConnectionImpl#request - (org/apache/calcite/adapter/druid/DruidConnectionImpl.java:94)
|- org.apache.calcite.adapter.druid.DruidQuery.DruidQueryNode#run

It is called by the run method of DruidQueryNode, shown below:

public void run() throws InterruptedException {
  final List<ColumnMetaData.Rep> fieldTypes = new ArrayList<>();
  for (RelDataTypeField field : query.getRowType().getFieldList()) {
    fieldTypes.add(getPrimitive(field));
  }
  final DruidConnectionImpl connection =
      new DruidConnectionImpl(query.druidTable.schema.url,
          query.druidTable.schema.coordinatorUrl);
  final boolean limitQuery = containsLimit(querySpec);
  final DruidConnectionImpl.Page page = new DruidConnectionImpl.Page();
  do {
    final String queryString =
        querySpec.getQueryString(page.pagingIdentifier, page.offset);
    connection.request(querySpec.queryType, queryString, sink,
        querySpec.fieldNames, fieldTypes, page);
  } while (!limitQuery
      && page.pagingIdentifier != null
      && page.totalRowCount > 0);
}
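Stripped of the Druid specifics, the control flow of run() can be sketched as follows. Page and the request callback are simplified stand-ins of our own, not the adapter's real types:

```java
import java.util.function.Consumer;

public class SelectPagingLoop {
    // Simplified stand-in for DruidConnectionImpl.Page: state carried between requests
    static class Page {
        String pagingIdentifier; // last segment id seen in the response, or null
        int totalRowCount;       // rows delivered by the last response
    }

    /**
     * Issues requests until one of the three stop conditions holds;
     * returns how many requests were made.
     */
    static int drive(boolean limitQuery, Consumer<Page> request) {
        Page page = new Page();
        int requests = 0;
        do {
            requests++;
            request.accept(page); // fills pagingIdentifier / totalRowCount
        } while (!limitQuery                  // 1) the query carries no LIMIT
            && page.pagingIdentifier != null  // 2) the server reported a next page
            && page.totalRowCount > 0);       // 3) the last page was non-empty
        return requests;
    }

    public static void main(String[] args) {
        // A well-behaved server: one page with an identifier, then no identifier
        int[] pagesLeft = {1};
        System.out.println(drive(false, page -> {
            page.totalRowCount = 1;
            page.pagingIdentifier = pagesLeft[0]-- > 0 ? "seg" : null;
        })); // prints 2
    }
}
```

With a well-behaved server the loop terminates as soon as the identifier disappears; the bug described below is what prevents that from ever happening.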

The while condition is where the infinite loop lives. So when does the loop actually stop? There are three conditions; let's examine them one by one:

limitQuery is true


final boolean limitQuery = containsLimit(querySpec);

private static boolean containsLimit(QuerySpec querySpec) {
  return querySpec.queryString.contains("\"context\":{\""
      + DRUID_QUERY_FETCH + "\":true");
}

In other words, if the request JSON contains "context":{"druid.query.fetch":true}, then limitQuery is true.
So where does fetch get set?

The Druid documentation does not mention druid.query.fetch; there is only this brief description of context:

context An additional JSON Object which can be used to specify certain flags.

fetch is set at org/apache/calcite/adapter/druid/DruidQuery.java:395. Since it is not the cause of the infinite loop, we will not dwell on it here.
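The check can be reproduced in isolation. In this sketch, containsLimit mirrors the adapter's substring check, while the two sample query strings are made up for illustration:

```java
public class ContainsLimitDemo {
    // The context key the adapter writes when the SQL query carries a LIMIT
    static final String DRUID_QUERY_FETCH = "druid.query.fetch";

    // Mirrors DruidQuery.containsLimit: a plain substring check on the query JSON
    static boolean containsLimit(String queryString) {
        return queryString.contains("\"context\":{\"" + DRUID_QUERY_FETCH + "\":true");
    }

    public static void main(String[] args) {
        // Serialized without a LIMIT: fetch is false, so paging continues
        String noLimit = "{\"queryType\":\"select\",\"context\":{\"druid.query.fetch\":false}}";
        // Serialized with a LIMIT: fetch is true, so a single request suffices
        String withLimit = "{\"queryType\":\"select\",\"context\":{\"druid.query.fetch\":true}}";
        System.out.println(containsLimit(noLimit));   // prints false
        System.out.println(containsLimit(withLimit)); // prints true
    }
}
```

Since our SQL has no LIMIT, fetch is false, limitQuery is false, and this condition never stops the loop.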

page.pagingIdentifier


If page.pagingIdentifier is null, the loop stops.

Druid query results are parsed by the parse() method, org.apache.calcite.adapter.druid.DruidConnectionImpl#parse:

private void parse(QueryType queryType, InputStream in, Sink sink,
    List<String> fieldNames, List<ColumnMetaData.Rep> fieldTypes, Page page) {
  // ...
  case SELECT:
    if (parser.nextToken() == JsonToken.START_ARRAY
        && parser.nextToken() == JsonToken.START_OBJECT) {
      page.pagingIdentifier = null;
      page.offset = -1;
      page.totalRowCount = 0;
      expectScalarField(parser, DEFAULT_RESPONSE_TIMESTAMP_COLUMN);
      if (parser.nextToken() == JsonToken.FIELD_NAME
          && parser.getCurrentName().equals("result")
          && parser.nextToken() == JsonToken.START_OBJECT) {
        if (parser.nextToken() == JsonToken.FIELD_NAME
            && parser.getCurrentName().equals("pagingIdentifiers")
            && parser.nextToken() == JsonToken.START_OBJECT) {
          JsonToken token = parser.nextToken();
          while (parser.getCurrentToken() == JsonToken.FIELD_NAME) {
            page.pagingIdentifier = parser.getCurrentName();
            if (parser.nextToken() == JsonToken.VALUE_NUMBER_INT) {
              page.offset = parser.getIntValue();
            }
            token = parser.nextToken();
          }
          expect(token, JsonToken.END_OBJECT);
        }
        if (parser.nextToken() == JsonToken.FIELD_NAME
            && parser.getCurrentName().equals("events")
            && parser.nextToken() == JsonToken.START_ARRAY) {
          while (parser.nextToken() == JsonToken.START_OBJECT) {
            expectScalarField(parser, "segmentId");
            expectScalarField(parser, "offset");
            if (parser.nextToken() == JsonToken.FIELD_NAME
                && parser.getCurrentName().equals("event")
                && parser.nextToken() == JsonToken.START_OBJECT) {
              parseFields(fieldNames, fieldTypes, posTimestampField, rowBuilder, parser);
              sink.send(rowBuilder.build());
              rowBuilder.reset();
              page.totalRowCount += 1;
            }
            expect(parser, JsonToken.END_OBJECT);
          }
          parser.nextToken();
        }
      }
    }
    break;
  // ...
}

Here, if the response's pagingIdentifiers indicate a next page, the identifier is stored into page.pagingIdentifier:

if (parser.nextToken() == JsonToken.FIELD_NAME
    && parser.getCurrentName().equals("pagingIdentifiers")
    && parser.nextToken() == JsonToken.START_OBJECT) {
  JsonToken token = parser.nextToken();
  while (parser.getCurrentToken() == JsonToken.FIELD_NAME) {
    page.pagingIdentifier = parser.getCurrentName();
    if (parser.nextToken() == JsonToken.VALUE_NUMBER_INT) {
      page.offset = parser.getIntValue();
    }
    token = parser.nextToken();
  }
  expect(token, JsonToken.END_OBJECT);
}

Testing showed that page.pagingIdentifier is always non-null, and that is what keeps the loop running.
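Note that the parse loop above implements "last key wins": every FIELD_NAME it sees overwrites page.pagingIdentifier, so whatever segment is listed last in pagingIdentifiers survives. A minimal sketch of that effect over an already-parsed response (Page here is a stand-in for DruidConnectionImpl.Page, and the map keys are abbreviated):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LastKeyWinsDemo {
    // Stand-in for DruidConnectionImpl.Page
    static class Page {
        String pagingIdentifier;
        int offset = -1;
    }

    // Mirrors the parse() loop: each key of pagingIdentifiers overwrites the
    // previous one, so only the last entry is kept
    static Page readPagingIdentifiers(Map<String, Integer> pagingIdentifiers) {
        Page page = new Page();
        for (Map.Entry<String, Integer> e : pagingIdentifiers.entrySet()) {
            page.pagingIdentifier = e.getKey();
            page.offset = e.getValue();
        }
        return page;
    }

    public static void main(String[] args) {
        // Two segments as they appear in a select response (keys abbreviated)
        Map<String, Integer> ids = new LinkedHashMap<>();
        ids.put("system_alerts_01_..._09:10:11.149Z", 0);
        ids.put("system_alerts_01_..._09:10:11.149Z_1", 0);
        System.out.println(readPagingIdentifiers(ids).pagingIdentifier); // prints the _1 key
    }
}
```

So as long as the server returns a non-empty pagingIdentifiers object, pagingIdentifier stays non-null and the loop keeps going.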

So the next step is to find out why pagingIdentifier is never null.

Analyzing Druid's queries and responses reveals the following:

Query JSON 1:

{"queryType":"select","dataSource":"system_alerts_01","descending":false,"intervals":["1970-01-01T00:00:00.000Z/2017-04-05T10:00:39.001Z"],"filter":{"type":"selector","dimension":"host_id","value":"fadf4b05-c78c-5aa4-a5b3-82b618bd395b"},"dimensions":["host_id","threshold_metric_name","type","threshold_compare","threshold_last_time"],"metrics":["alert_value","threshold_value","threshold_duration","alert_time"],"granularity":"all","pagingSpec":{"pagingIdentifiers":{"system_alerts_01_2017-04-05T09:00:00.000Z_2017-04-05T10:00:00.000Z_2017-04-05T09:10:11.149Z_1":0},"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}}

Query JSON 2:

{"queryType":"select","dataSource":"system_alerts_01","descending":false,"intervals":["1970-01-01T00:00:00.000Z/2017-04-05T09:51:13.001Z"],"filter":{"type":"selector","dimension":"host_id","value":"fadf4b05-c78c-5aa4-a5b3-82b618bd395b"},"dimensions":["host_id","threshold_metric_name","type","threshold_compare","threshold_last_time"],"metrics":["alert_value","threshold_value","threshold_duration","alert_time"],"granularity":"all","pagingSpec":{"pagingIdentifiers":{"system_alerts_01_2017-04-05T09:00:00.000Z_2017-04-05T09:51:13.001Z_2017-04-05T09:10:11.149Z_2":0},"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}}

These two requests were being sent alternately, over and over; the only difference between them is 2017-04-05T09:10:11.149Z_1 versus 2017-04-05T09:10:11.149Z_2.

Combining this with the source code above, we can see exactly why this happens.

The Culprit


Yes, this is a Druid problem.

{"queryType":"select","dataSource":"system_alerts_01","descending":false,"intervals":["1970-01-01T00:00:00.000Z/2017-04-05T10:00:39.001Z"],"filter":{"type":"selector","dimension":"host_id","value":"xxx"},"dimensions":["host_id","threshold_metric_name","type","threshold_compare","threshold_last_time"],"metrics":["alert_value","threshold_value","threshold_duration","alert_time"],"granularity":"all","pagingSpec":{"pagingIdentifiers":{"system_alerts_01_2017-04-05T09:00:00.000Z_2017-04-05T10:00:00.000Z_2017-04-05T09:10:11.149Z_2":0},"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}}


Response: [ {
  "timestamp" : "2017-04-05T09:10:11.000Z",
  "result" : {
    "pagingIdentifiers" : {
      "system_alerts_01_2017-04-05T09:00:00.000Z_2017-04-05T10:00:00.000Z_2017-04-05T09:10:11.149Z" : 0,
      "system_alerts_01_2017-04-05T09:00:00.000Z_2017-04-05T10:00:00.000Z_2017-04-05T09:10:11.149Z_1" : 0
    },

The request asks for the segment whose pagingIdentifier is _2017-04-05T09:10:11.149Z_2:

"pagingSpec":{"pagingIdentifiers":{"system_alerts_01_2017-04-05T09:00:00.000Z_2017-04-05T10:00:00.000Z_2017-04-05T09:10:11.149Z_2":0},"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}}

Yet the response returns the two earlier segments, 2017-04-05T09:10:11.149Z and 2017-04-05T09:10:11.149Z_1 (even though we have in fact already reached the last page). As a result, the next request starts over from the first page, and the cycle repeats indefinitely:

Response: [ {
  "timestamp" : "2017-04-05T09:10:11.000Z",
  "result" : {
    "pagingIdentifiers" : {
      "system_alerts_01_2017-04-05T09:00:00.000Z_2017-04-05T10:00:00.000Z_2017-04-05T09:10:11.149Z" : 0,
      "system_alerts_01_2017-04-05T09:00:00.000Z_2017-04-05T10:00:00.000Z_2017-04-05T09:10:11.149Z_1" : 0
    },
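Since the server keeps handing back identifiers the client has already paged through, one client-side mitigation (a sketch of our own, not the adapter's actual fix) is to remember every pagingIdentifier already requested and stop as soon as one repeats:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

public class PagingLoopGuard {
    /**
     * Pages through results. `nextIdentifier` maps the identifier we request
     * to the identifier the server returns (null starts the scan; a null
     * result means no more pages). The seen-set stops the _1 / _2 ping-pong
     * observed above. Returns the number of requests actually issued.
     */
    static int pageAll(Function<String, String> nextIdentifier) {
        Set<String> seen = new HashSet<>();
        String id = null;
        int requests = 0;
        do {
            requests++;
            id = nextIdentifier.apply(id);
        } while (id != null && seen.add(id)); // stop on exhaustion or a repeat
        return requests;
    }

    public static void main(String[] args) {
        // Simulate the buggy server: _1 and _2 alternate forever
        Function<String, String> buggyServer = id ->
            "seg_1".equals(id) ? "seg_2" : "seg_1";
        // Without the seen-set this would never terminate; with it we stop at 3
        System.out.println(pageAll(buggyServer)); // prints 3
    }
}
```

A guard like this trades a little memory for a hard upper bound on requests; the real fix, of course, belongs on the Druid side.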

Related Druid issues on GitHub


https://github.com/druid-io/druid/pull/2480
https://github.com/druid-io/druid/issues/3618
