1
+ /*
2
+ * Licensed to the Apache Software Foundation (ASF) under one or more
3
+ * contributor license agreements. See the NOTICE file distributed with
4
+ * this work for additional information regarding copyright ownership.
5
+ * The ASF licenses this file to You under the Apache License, Version 2.0
6
+ * (the "License"); you may not use this file except in compliance with
7
+ * the License. You may obtain a copy of the License at
8
+ *
9
+ * http://www.apache.org/licenses/LICENSE-2.0
10
+ *
11
+ * Unless required by applicable law or agreed to in writing, software
12
+ * distributed under the License is distributed on an "AS IS" BASIS,
13
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
+ * See the License for the specific language governing permissions and
15
+ * limitations under the License.
16
+ */
17
+
18
+ package org .apache .rocketmq .namesrv .processor ;
19
+
20
+ import io .netty .channel .ChannelHandlerContext ;
21
+ import java .lang .reflect .Field ;
22
+ import java .util .ArrayList ;
23
+ import java .util .HashMap ;
24
+ import java .util .List ;
25
+ import org .apache .rocketmq .client .ClientConfig ;
26
+ import org .apache .rocketmq .client .exception .MQClientException ;
27
+ import org .apache .rocketmq .client .impl .MQClientAPIImpl ;
28
+ import org .apache .rocketmq .client .impl .MQClientManager ;
29
+ import org .apache .rocketmq .client .impl .factory .MQClientInstance ;
30
+ import org .apache .rocketmq .common .namesrv .NamesrvConfig ;
31
+ import org .apache .rocketmq .common .protocol .ResponseCode ;
32
+ import org .apache .rocketmq .common .protocol .route .BrokerData ;
33
+ import org .apache .rocketmq .common .protocol .route .TopicRouteData ;
34
+ import org .apache .rocketmq .namesrv .NamesrvController ;
35
+ import org .apache .rocketmq .remoting .CommandCustomHeader ;
36
+ import org .apache .rocketmq .remoting .exception .RemotingCommandException ;
37
+ import org .apache .rocketmq .remoting .exception .RemotingException ;
38
+ import org .apache .rocketmq .remoting .netty .NettyServerConfig ;
39
+ import org .apache .rocketmq .remoting .protocol .RemotingCommand ;
40
+ import org .apache .rocketmq .tools .admin .DefaultMQAdminExt ;
41
+ import org .apache .rocketmq .tools .admin .DefaultMQAdminExtImpl ;
42
+ import org .junit .After ;
43
+ import org .junit .Before ;
44
+ import org .junit .Test ;
45
+
46
+ import static org .assertj .core .api .Assertions .assertThat ;
47
+ import static org .mockito .ArgumentMatchers .anyLong ;
48
+ import static org .mockito .ArgumentMatchers .anyString ;
49
+ import static org .mockito .Mockito .mock ;
50
+ import static org .mockito .Mockito .when ;
51
+
52
+ public class ClusterTestRequestProcessorTest {
53
+ private ClusterTestRequestProcessor clusterTestProcessor ;
54
+ private DefaultMQAdminExtImpl defaultMQAdminExtImpl ;
55
+ private MQClientInstance mqClientInstance = MQClientManager .getInstance ().getAndCreateMQClientInstance (new ClientConfig ());
56
+ private MQClientAPIImpl mQClientAPIImpl ;
57
+ private ChannelHandlerContext ctx ;
58
+
59
+ @ Before
60
+ public void init () throws NoSuchFieldException , IllegalAccessException , RemotingException , MQClientException , InterruptedException {
61
+ NamesrvController namesrvController = new NamesrvController (
62
+ new NamesrvConfig (),
63
+ new NettyServerConfig ()
64
+ );
65
+
66
+ clusterTestProcessor = new ClusterTestRequestProcessor (namesrvController , "default-producer" );
67
+ mQClientAPIImpl = mock (MQClientAPIImpl .class );
68
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt ();
69
+ defaultMQAdminExtImpl = new DefaultMQAdminExtImpl (defaultMQAdminExt , 1000 );
70
+ ctx = mock (ChannelHandlerContext .class );
71
+
72
+ Field field = DefaultMQAdminExtImpl .class .getDeclaredField ("mqClientInstance" );
73
+ field .setAccessible (true );
74
+ field .set (defaultMQAdminExtImpl , mqClientInstance );
75
+ field = MQClientInstance .class .getDeclaredField ("mQClientAPIImpl" );
76
+ field .setAccessible (true );
77
+ field .set (mqClientInstance , mQClientAPIImpl );
78
+ field = ClusterTestRequestProcessor .class .getDeclaredField ("adminExt" );
79
+ field .setAccessible (true );
80
+ field .set (clusterTestProcessor , defaultMQAdminExt );
81
+
82
+ TopicRouteData topicRouteData = new TopicRouteData ();
83
+ List <BrokerData > brokerDatas = new ArrayList <>();
84
+ HashMap <Long , String > brokerAddrs = new HashMap <>();
85
+ brokerAddrs .put (1234l , "127.0.0.1:10911" );
86
+ BrokerData brokerData = new BrokerData ();
87
+ brokerData .setCluster ("default-cluster" );
88
+ brokerData .setBrokerName ("default-broker" );
89
+ brokerData .setBrokerAddrs (brokerAddrs );
90
+ brokerDatas .add (brokerData );
91
+ topicRouteData .setBrokerDatas (brokerDatas );
92
+ when (mQClientAPIImpl .getTopicRouteInfoFromNameServer (anyString (), anyLong ())).thenReturn (topicRouteData );
93
+ }
94
+
95
+ @ After
96
+ public void terminate () {
97
+ }
98
+
99
+ @ Test
100
+ public void testGetRouteInfoByTopic () throws RemotingCommandException {
101
+ RemotingCommand request = RemotingCommand .createRequestCommand (12 , new CommandCustomHeader () {
102
+ @ Override public void checkFields () throws RemotingCommandException {
103
+
104
+ }
105
+ });
106
+ RemotingCommand remoting = clusterTestProcessor .getRouteInfoByTopic (ctx , request );
107
+ assertThat (remoting .getCode ()).isEqualTo (ResponseCode .TOPIC_NOT_EXIST );
108
+ assertThat (remoting .getBody ()).isNull ();
109
+ assertThat (remoting .getRemark ()).isNotNull ();
110
+ }
111
+
112
+ }
0 commit comments