Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #22] make codeCov of mqtt.cs.channel more than 80% #45

Merged
merged 1 commit into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,7 @@ public static boolean checkExtDataChange(Channel channel) {
if (!getInfo(channel).containsKey(CHANNEL_EXT_CHANGE_KEY)) {
getInfo(channel).put(CHANNEL_EXT_CHANGE_KEY, false);
}
Object obj = getInfo(channel).get(CHANNEL_EXT_CHANGE_KEY);
if (obj == null) {
return false;
}
return (boolean)obj;
return (boolean) getInfo(channel).get(CHANNEL_EXT_CHANGE_KEY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as the above comment

}

public static String getId(Channel channel) {
Expand All @@ -114,11 +110,7 @@ public static Boolean getCleanSessionFlag(Channel channel) {
if (!getInfo(channel).containsKey(CHANNEL_CLEAN_SESSION_KEY)) {
getInfo(channel).put(CHANNEL_CLEAN_SESSION_KEY, true);
}
Object obj = getInfo(channel).get(CHANNEL_CLEAN_SESSION_KEY);
if (obj == null) {
return true;
}
return (Boolean)obj;
return (Boolean) getInfo(channel).get(CHANNEL_CLEAN_SESSION_KEY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in high concurrency scenarios, when the clear(Channel channel) method is called, NPE will be triggered here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tianliuliu yes, that's my mistake. See you've fixed it, thanks for your time and sorry for the trouble.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tianliuliu yes, that's my mistake. See you've fixed it, thanks for your time and sorry for the trouble.

it does not matter, you are welcome.

}

public static void setCleanSessionFlag(Channel channel, Boolean cleanSessionFalg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause.getMessage() != null && simpleExceptions.contains(cause.getMessage())) {
} else {
if (cause.getMessage() == null || !simpleExceptions.contains(cause.getMessage())) {
logger.error("exceptionCaught {}", ctx.channel(), cause);
}
channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.SERVER, cause.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private void doPing(Timeout timeout, Channel channel) {
TimeUnit.SECONDS);
}
} catch (Exception e) {
logger.error("", e);
logger.error("Exception when doPing: ", e);
}
}

Expand All @@ -109,22 +109,18 @@ public void closeConnect(Channel channel, ChannelCloseFrom from, String reason)
if (clientId == null) {
channelMap.remove(channelId);
sessionLoop.unloadSession(clientId, channelId);
if (channel.isActive()) {
channel.close();
}
return;
} else {
//session maybe null
Session session = sessionLoop.unloadSession(clientId, channelId);
retryDriver.unloadSession(session);
channelMap.remove(channelId);
ChannelInfo.clear(channel);
}

//session maybe null
Session session = sessionLoop.unloadSession(clientId, channelId);
retryDriver.unloadSession(session);
channelMap.remove(channelId);

ChannelInfo.clear(channel);

if (channel.isActive()) {
channel.close();
}
logger.info("Close Connect of channel {} from {} by reason of {}", channel, from, reason);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package org.apache.rocketmq.mqtt.cs.test.channel;

import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;

public class TestChannelInfo {

@Test
public void test() {
NioSocketChannel channel = new NioSocketChannel();
String extDataStr = "{\"test\":\"extData\"}";

// test 'update/check/get/encode' of 'ExtData'
Assert.assertFalse(ChannelInfo.updateExtData(channel, ""));
Assert.assertFalse(ChannelInfo.checkExtDataChange(channel));
Assert.assertEquals(0, ChannelInfo.getExtData(channel).size());
// update 'ExtData'
Assert.assertTrue(ChannelInfo.updateExtData(channel, extDataStr));
Assert.assertTrue(ChannelInfo.checkExtDataChange(channel));
Assert.assertEquals(1, ChannelInfo.getExtData(channel).size());
Assert.assertEquals(extDataStr, ChannelInfo.encodeExtData(channel));

// test 'getId'
Assert.assertNotNull(ChannelInfo.getId(channel));

// test 'set/get CleanSessionFlag'
ChannelInfo.setCleanSessionFlag(channel, Boolean.FALSE);
Assert.assertFalse(ChannelInfo.getCleanSessionFlag(channel));

// test 'set/get ClientId'
String clientId = "testExtData";
ChannelInfo.setClientId(channel, clientId);
Assert.assertEquals(clientId, ChannelInfo.getClientId(channel));

// test 'set/get ChannelLifeCycle'
ChannelInfo.setChannelLifeCycle(channel, System.currentTimeMillis());
Assert.assertNotEquals(Long.MAX_VALUE, ChannelInfo.getChannelLifeCycle(channel));

// test 'set/get/remove Future'
String futureKey = "futureKey";
ChannelInfo.setFuture(channel, futureKey, new CompletableFuture<>());
Assert.assertNotNull(ChannelInfo.getFuture(channel, futureKey));
ChannelInfo.removeFuture(channel, futureKey);
Assert.assertNull(ChannelInfo.getFuture(channel, futureKey));

// test 'touch/getLastTouch'
Assert.assertEquals(0, ChannelInfo.getLastTouch(channel));
ChannelInfo.touch(channel);
Assert.assertNotEquals(0, ChannelInfo.getLastTouch(channel));

// test 'lastActive/getLastActive'
ChannelInfo.lastActive(channel, System.currentTimeMillis());
Assert.assertNotEquals(0, ChannelInfo.getLastActive(channel));

// test 'set/get RemoteIP'
ChannelInfo.setRemoteIP(channel, "");
Assert.assertEquals("", ChannelInfo.getRemoteIP(channel));

// test 'set/get KeepLive/isExpired'
Assert.assertTrue(ChannelInfo.isExpired(channel));
ChannelInfo.setKeepLive(channel, -1);
Assert.assertTrue(ChannelInfo.isExpired(channel));

// test 'set/get Owner/Namespace'
String ownerNamespc = "channelInfo";
ChannelInfo.setOwner(channel, ownerNamespc);
ChannelInfo.setNamespace(channel, ownerNamespc);
Assert.assertEquals(ownerNamespc, ChannelInfo.getOwner(channel));
Assert.assertEquals(ownerNamespc, ChannelInfo.getNamespace(channel));

// test 'clear'
ChannelInfo.clear(channel);
Assert.assertEquals(ownerNamespc, ChannelInfo.getNamespace(channel));
Assert.assertEquals(0, ChannelInfo.getExtData(channel).size());

if (channel.isActive()) {
channel.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package org.apache.rocketmq.mqtt.cs.test.channel;

import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
import org.apache.rocketmq.mqtt.cs.channel.ConnectHandler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

@RunWith(MockitoJUnitRunner.class)
public class TestConnectHandler {

private ConnectHandler connectHandler;

@Mock
private ChannelManager channelManager;

@Mock
private ChannelHandlerContext ctx;

@Before
public void Before() throws IllegalAccessException {
connectHandler = new ConnectHandler();
FieldUtils.writeDeclaredField(connectHandler, "channelManager", channelManager, true);
}

@After
public void After() {
}

@Test
public void testChannelActive() throws Exception {
connectHandler.channelActive(ctx);
verify(channelManager).addChannel(any());
}

@Test
public void testChannelInactive() throws Exception {
connectHandler.channelInactive(ctx);
verify(channelManager).closeConnect(any(), any(), any());
}

@Test
public void testExceptionCaught() throws Exception {
connectHandler.exceptionCaught(ctx, new Throwable("err"));
verify(channelManager).closeConnect(any(), any(), any());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package org.apache.rocketmq.mqtt.cs.test.channel;

import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.mqtt.cs.channel.ChannelCloseFrom;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.channel.DefaultChannelManager;
import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Map;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;

@RunWith(MockitoJUnitRunner.class)
public class TestDefaultChannelManager {
private DefaultChannelManager defaultChannelManager;
private final String clientId = "clientId";
private final String channelId = "channelId";

@Mock
private SessionLoop sessionLoop;

@Mock
private ConnectConf connectConf;

@Mock
private RetryDriver retryDriver;

@Spy
private NioSocketChannel channel;

@Before
public void Before() throws IllegalAccessException {
defaultChannelManager = new DefaultChannelManager();
FieldUtils.writeDeclaredField(defaultChannelManager, "sessionLoop", sessionLoop, true);
FieldUtils.writeDeclaredField(defaultChannelManager, "connectConf", connectConf, true);
FieldUtils.writeDeclaredField(defaultChannelManager, "retryDriver", retryDriver, true);
FieldUtils.writeStaticField(DefaultChannelManager.class, "minBlankChannelSeconds", 0, true);
defaultChannelManager.init();
}

@After
public void After() {
if (channel.isActive()) {
channel.close();
}
}

@Test
public void testAddChannel() {
ChannelInfo.setClientId(channel, clientId);
ChannelInfo.setChannelLifeCycle(channel, 1000L);
defaultChannelManager.addChannel(channel);

// waiting the execution of the 'doPing' TimerTask
try {
Thread.sleep(2000);
} catch (InterruptedException ignored) {}

// verify 'doPing' and 'closeConnect'
verify(sessionLoop).unloadSession(Mockito.eq(clientId), anyString());
verify(retryDriver).unloadSession(Mockito.any());
}

@Test
public void testCloseConnectNullClientId() {
defaultChannelManager.closeConnect(channel, ChannelCloseFrom.CLIENT, "ForTest");
verify(sessionLoop).unloadSession(Mockito.isNull(), anyString());
}

@Test
public void testCloseConnect() {
ChannelInfo.setClientId(channel, clientId);
defaultChannelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "ForTest");
verify(sessionLoop).unloadSession(Mockito.eq(clientId), anyString());
verify(retryDriver).unloadSession(Mockito.any());
}

@Test
public void testCloseConnectNoFrom() throws IllegalAccessException {
defaultChannelManager.closeConnect(channelId, "ForTest");
Object channelMap = FieldUtils.readDeclaredField(defaultChannelManager, "channelMap", true);
Assert.assertEquals(0, ((Map<String, Channel>) channelMap).size());
}

@Test
public void testGetChannelById() {
Assert.assertNull(defaultChannelManager.getChannelById(channelId));
}

@Test
public void testTotalConn() {
Assert.assertEquals(0, defaultChannelManager.totalConn());
defaultChannelManager.addChannel(channel);
Assert.assertEquals(1, defaultChannelManager.totalConn());
}
}