20
20
import java .util .HashMap ;
21
21
import java .util .HashSet ;
22
22
import java .util .List ;
23
- import java .util .Map ;
24
23
import java .util .Set ;
25
24
import java .util .UUID ;
26
25
import java .util .concurrent .ArrayBlockingQueue ;
30
29
import java .util .concurrent .atomic .AtomicInteger ;
31
30
import java .util .concurrent .atomic .AtomicLong ;
32
31
33
- import org .apache .commons .lang3 .StringUtils ;
34
32
import org .apache .rocketmq .client .AccessChannel ;
35
33
import org .apache .rocketmq .client .common .ThreadLocalIndex ;
36
34
import org .apache .rocketmq .client .exception .MQClientException ;
@@ -59,22 +57,25 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
59
57
private final int queueSize ;
60
58
private final int batchSize ;
61
59
private final int maxMsgSize ;
60
+ private final long pollingTimeMil ;
61
+ private final long waitTimeThresholdMil ;
62
62
private final DefaultMQProducer traceProducer ;
63
63
private final ThreadPoolExecutor traceExecutor ;
64
64
// The last discard number of log
65
65
private AtomicLong discardCount ;
66
66
private Thread worker ;
67
67
private final ArrayBlockingQueue <TraceContext > traceContextQueue ;
68
+ private final HashMap <String , TraceDataSegment > taskQueueByTopic ;
68
69
private ArrayBlockingQueue <Runnable > appenderQueue ;
69
70
private volatile Thread shutDownHook ;
70
71
private volatile boolean stopped = false ;
71
72
private DefaultMQProducerImpl hostProducer ;
72
73
private DefaultMQPushConsumerImpl hostConsumer ;
73
74
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex ();
74
75
private String dispatcherId = UUID .randomUUID ().toString ();
75
- private String traceTopicName ;
76
+ private volatile String traceTopicName ;
76
77
private AtomicBoolean isStarted = new AtomicBoolean (false );
77
- private AccessChannel accessChannel = AccessChannel .LOCAL ;
78
+ private volatile AccessChannel accessChannel = AccessChannel .LOCAL ;
78
79
private String group ;
79
80
private Type type ;
80
81
@@ -83,8 +84,11 @@ public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCH
83
84
this .queueSize = 2048 ;
84
85
this .batchSize = 100 ;
85
86
this .maxMsgSize = 128000 ;
87
+ this .pollingTimeMil = 100 ;
88
+ this .waitTimeThresholdMil = 500 ;
86
89
this .discardCount = new AtomicLong (0L );
87
90
this .traceContextQueue = new ArrayBlockingQueue <TraceContext >(1024 );
91
+ this .taskQueueByTopic = new HashMap ();
88
92
this .group = group ;
89
93
this .type = type ;
90
94
@@ -243,126 +247,147 @@ class AsyncRunnable implements Runnable {
243
247
@ Override
244
248
public void run () {
245
249
while (!stopped ) {
246
- List <TraceContext > contexts = new ArrayList <TraceContext >(batchSize );
247
250
synchronized (traceContextQueue ) {
248
- for ( int i = 0 ; i < batchSize ; i ++) {
249
- TraceContext context = null ;
251
+ long endTime = System . currentTimeMillis () + pollingTimeMil ;
252
+ while ( System . currentTimeMillis () < endTime ) {
250
253
try {
251
- //get trace data element from blocking Queue - traceContextQueue
252
- context = traceContextQueue .poll (5 , TimeUnit .MILLISECONDS );
253
- } catch (InterruptedException e ) {
254
- }
255
- if (context != null ) {
256
- contexts .add (context );
257
- } else {
258
- break ;
254
+ TraceContext traceContext = traceContextQueue .poll (
255
+ endTime - System .currentTimeMillis (), TimeUnit .MILLISECONDS
256
+ );
257
+
258
+ if (traceContext != null && !traceContext .getTraceBeans ().isEmpty ()) {
259
+ // get the topic which the trace message will send to
260
+ String traceTopicName = this .getTraceTopicName (traceContext .getRegionId ());
261
+
262
+ // get the traceDataSegment which will save this trace message, create if null
263
+ TraceDataSegment traceDataSegment = taskQueueByTopic .get (traceTopicName );
264
+ if (traceDataSegment == null ) {
265
+ traceDataSegment = new TraceDataSegment (traceTopicName , traceContext .getRegionId ());
266
+ taskQueueByTopic .put (traceTopicName , traceDataSegment );
267
+ }
268
+
269
+ // encode traceContext and save it into traceDataSegment
270
+ // NOTE if data size in traceDataSegment more than maxMsgSize,
271
+ // a AsyncDataSendTask will be created and submitted
272
+ TraceTransferBean traceTransferBean = TraceDataEncoder .encoderFromContextBean (traceContext );
273
+ traceDataSegment .addTraceTransferBean (traceTransferBean );
274
+ }
275
+ } catch (InterruptedException ignore ) {
276
+ log .debug ("traceContextQueue#poll exception" );
259
277
}
260
278
}
261
- if (contexts .size () > 0 ) {
262
- AsyncAppenderRequest request = new AsyncAppenderRequest (contexts );
263
- traceExecutor .submit (request );
264
- } else if (AsyncTraceDispatcher .this .stopped ) {
279
+
280
+ // NOTE send the data in traceDataSegment which the first TraceTransferBean
281
+ // is longer than waitTimeThreshold
282
+ sendDataByTimeThreshold ();
283
+
284
+ if (AsyncTraceDispatcher .this .stopped ) {
265
285
this .stopped = true ;
266
286
}
267
287
}
268
288
}
269
289
270
290
}
271
- }
272
291
273
- class AsyncAppenderRequest implements Runnable {
274
- List <TraceContext > contextList ;
292
+ private void sendDataByTimeThreshold () {
293
+ long now = System .currentTimeMillis ();
294
+ for (TraceDataSegment taskInfo : taskQueueByTopic .values ()) {
295
+ if (now - taskInfo .firstBeanAddTime >= waitTimeThresholdMil ) {
296
+ taskInfo .sendAllData ();
297
+ }
298
+ }
299
+ }
275
300
276
- public AsyncAppenderRequest (final List <TraceContext > contextList ) {
277
- if (contextList != null ) {
278
- this .contextList = contextList ;
279
- } else {
280
- this .contextList = new ArrayList <TraceContext >(1 );
301
+ private String getTraceTopicName (String regionId ) {
302
+ AccessChannel accessChannel = AsyncTraceDispatcher .this .getAccessChannel ();
303
+ if (AccessChannel .CLOUD == accessChannel ) {
304
+ return TraceConstants .TRACE_TOPIC_PREFIX + regionId ;
281
305
}
306
+
307
+ return AsyncTraceDispatcher .this .getTraceTopicName ();
282
308
}
309
+ }
283
310
284
- @ Override
285
- public void run () {
286
- sendTraceData (contextList );
311
+ class TraceDataSegment {
312
+ private long firstBeanAddTime ;
313
+ private int currentMsgSize ;
314
+ private final String traceTopicName ;
315
+ private final String regionId ;
316
+ private final List <TraceTransferBean > traceTransferBeanList = new ArrayList ();
317
+
318
+ TraceDataSegment (String traceTopicName , String regionId ) {
319
+ this .traceTopicName = traceTopicName ;
320
+ this .regionId = regionId ;
287
321
}
288
322
289
- public void sendTraceData (List <TraceContext > contextList ) {
290
- Map <String , List <TraceTransferBean >> transBeanMap = new HashMap <String , List <TraceTransferBean >>();
291
- for (TraceContext context : contextList ) {
292
- if (context .getTraceBeans ().isEmpty ()) {
293
- continue ;
294
- }
295
- // Topic value corresponding to original message entity content
296
- String topic = context .getTraceBeans ().get (0 ).getTopic ();
297
- String regionId = context .getRegionId ();
298
- // Use original message entity's topic as key
299
- String key = topic ;
300
- if (!StringUtils .isBlank (regionId )) {
301
- key = key + TraceConstants .CONTENT_SPLITOR + regionId ;
302
- }
303
- List <TraceTransferBean > transBeanList = transBeanMap .get (key );
304
- if (transBeanList == null ) {
305
- transBeanList = new ArrayList <TraceTransferBean >();
306
- transBeanMap .put (key , transBeanList );
307
- }
308
- TraceTransferBean traceData = TraceDataEncoder .encoderFromContextBean (context );
309
- transBeanList .add (traceData );
310
- }
311
- for (Map .Entry <String , List <TraceTransferBean >> entry : transBeanMap .entrySet ()) {
312
- String [] key = entry .getKey ().split (String .valueOf (TraceConstants .CONTENT_SPLITOR ));
313
- String dataTopic = entry .getKey ();
314
- String regionId = null ;
315
- if (key .length > 1 ) {
316
- dataTopic = key [0 ];
317
- regionId = key [1 ];
318
- }
319
- flushData (entry .getValue (), dataTopic , regionId );
323
+ public void addTraceTransferBean (TraceTransferBean traceTransferBean ) {
324
+ initFirstBeanAddTime ();
325
+ this .traceTransferBeanList .add (traceTransferBean );
326
+ this .currentMsgSize += traceTransferBean .getTransData ().length ();
327
+ if (currentMsgSize >= traceProducer .getMaxMessageSize ()) {
328
+ List <TraceTransferBean > dataToSend = new ArrayList (traceTransferBeanList );
329
+ AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask (traceTopicName , regionId , dataToSend );
330
+ traceExecutor .submit (asyncDataSendTask );
331
+
332
+ this .clear ();
320
333
}
321
334
}
322
335
323
- /**
324
- * Batch sending data actually
325
- */
326
- private void flushData (List <TraceTransferBean > transBeanList , String dataTopic , String regionId ) {
327
- if (transBeanList .size () == 0 ) {
336
+ public void sendAllData () {
337
+ if (this .traceTransferBeanList .isEmpty ()) {
328
338
return ;
329
339
}
330
- // Temporary buffer
340
+ List <TraceTransferBean > dataToSend = new ArrayList (traceTransferBeanList );
341
+ AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask (traceTopicName , regionId , dataToSend );
342
+ traceExecutor .submit (asyncDataSendTask );
343
+
344
+ this .clear ();
345
+ }
346
+
347
+ private void initFirstBeanAddTime () {
348
+ if (firstBeanAddTime == 0 ) {
349
+ firstBeanAddTime = System .currentTimeMillis ();
350
+ }
351
+ }
352
+
353
+ private void clear () {
354
+ this .firstBeanAddTime = 0 ;
355
+ this .currentMsgSize = 0 ;
356
+ this .traceTransferBeanList .clear ();
357
+ }
358
+ }
359
+
360
+
361
+ class AsyncDataSendTask implements Runnable {
362
+ private final String traceTopicName ;
363
+ private final String regionId ;
364
+ private final List <TraceTransferBean > traceTransferBeanList ;
365
+
366
+ public AsyncDataSendTask (String traceTopicName , String regionId , List <TraceTransferBean > traceTransferBeanList ) {
367
+ this .traceTopicName = traceTopicName ;
368
+ this .regionId = regionId ;
369
+ this .traceTransferBeanList = traceTransferBeanList ;
370
+ }
371
+
372
+ @ Override
373
+ public void run () {
331
374
StringBuilder buffer = new StringBuilder (1024 );
332
- int count = 0 ;
333
375
Set <String > keySet = new HashSet <String >();
334
-
335
- for (TraceTransferBean bean : transBeanList ) {
336
- // Keyset of message trace includes msgId of or original message
376
+ for (TraceTransferBean bean : traceTransferBeanList ) {
337
377
keySet .addAll (bean .getTransKey ());
338
378
buffer .append (bean .getTransData ());
339
- count ++;
340
- // Ensure that the size of the package should not exceed the upper limit.
341
- if (buffer .length () >= traceProducer .getMaxMessageSize ()) {
342
- sendTraceDataByMQ (keySet , buffer .toString (), dataTopic , regionId );
343
- // Clear temporary buffer after finishing
344
- buffer .delete (0 , buffer .length ());
345
- keySet .clear ();
346
- count = 0 ;
347
- }
348
- }
349
- if (count > 0 ) {
350
- sendTraceDataByMQ (keySet , buffer .toString (), dataTopic , regionId );
351
379
}
352
- transBeanList . clear ( );
380
+ sendTraceDataByMQ ( keySet , buffer . toString (), traceTopicName );
353
381
}
354
382
355
383
/**
356
384
* Send message trace data
357
385
*
358
386
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
359
387
* @param data the message trace data in this batch
388
+ * @param traceTopic the topic which message trace data will send to
360
389
*/
361
- private void sendTraceDataByMQ (Set <String > keySet , final String data , String dataTopic , String regionId ) {
362
- String traceTopic = traceTopicName ;
363
- if (AccessChannel .CLOUD == accessChannel ) {
364
- traceTopic = TraceConstants .TRACE_TOPIC_PREFIX + regionId ;
365
- }
390
+ private void sendTraceDataByMQ (Set <String > keySet , final String data , String traceTopic ) {
366
391
final Message message = new Message (traceTopic , data .getBytes ());
367
392
// Keyset of message trace includes msgId of or original message
368
393
message .setKeys (keySet );
0 commit comments