Creating ISO8583 Client using Netty Framework

Creating an ISO8583 client with the Netty framework is a little different from doing it with Spring Integration, and I couldn't find any reference for a correlation mechanism. The best example I found simply creates a new Bootstrap instance every time a request is sent. That opens a new connection, from a different local port, for every request sent to the server.

This post explains how to create an ISO8583 client that keeps a stateful (persistent) connection and reconnects automatically. The correlation strategy is simple: a concurrent map keyed by the combination of bit 11 and bit 41. For the message specification, we reuse the Ascii Decoder, Ascii Encoder, Iso8583 Decoder and Iso8583 Encoder created before.
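To make the correlation key concrete, here is a minimal sketch of how bit 11 and bit 41 could be combined into one map key. The class name, the messageKey helper and the "|" separator are assumptions made for illustration; the real CustomIsoMessage.getMessageKey() may build its key differently.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class CorrelationKeySketch {

    // Hypothetical helper: combines bit 11 (STAN) and bit 41 (terminal ID) into one key.
    static String messageKey(int stan, String terminalId) {
        // Bit 11 is a 6-digit numeric field, so left-pad the STAN with zeros.
        return String.format("%06d", stan) + "|" + terminalId;
    }

    public static void main(String[] args) {
        // Plain strings stand in for CustomIsoMessage to keep the sketch self-contained.
        ConcurrentMap<String, String> correlationMap = new ConcurrentHashMap<>();
        correlationMap.put(messageKey(1, "12345678"), "response for STAN 1");
        correlationMap.put(messageKey(2, "12345678"), "response for STAN 2");

        // Each waiting caller removes only the entry that matches its own request.
        System.out.println(correlationMap.remove(messageKey(2, "12345678")));
    }
}
```

Two requests from the same terminal get different keys as long as their STANs differ, which is what lets concurrent callers share one connection.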

The diagram describes how the IsoClient works:

  1. The caller invokes Channel.writeAndFlush() to send a request message, then waits for the response by polling the concurrent map, using bit 11 and bit 41 as the key.
  2. The Message Encoder transforms the CustomIsoMessage into a byte array.
  3. The Frame Encoder prepends a message length indicator, in ASCII format, to the message.
  4. The message is handed to the socket connection to be put on the wire.
  5. The socket sends the message.
  6. The socket receives an incoming (response) message.
  7. The Frame Decoder extracts the complete message from the stream.
  8. The Message Decoder transforms the ByteBuf into a CustomIsoMessage.
  9. The Inbound Handler puts the received CustomIsoMessage into the concurrent map.
  10. The response message is stored in the concurrent map with bit 11 and bit 41 as the key.
  11. The IsoClient retrieves the response message.
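Steps 3 and 7 can be illustrated without Netty. The sketch below assumes a 4-digit ASCII length header that counts only the payload, which matches how the frame encoder and decoder are configured later in this post; the AsciiFrameSketch class itself is hypothetical and is not the AsciiFrameEncoder or AsciiFrameDecoder from the earlier posts.

```java
import java.nio.charset.StandardCharsets;

final class AsciiFrameSketch {

    private static final int HEADER_LENGTH = 4;

    // Prepends a 4-digit ASCII length header; the header counts the payload only.
    static byte[] encode(byte[] payload) {
        if (payload.length > 9999) {
            throw new IllegalArgumentException("Payload does not fit a 4-digit length header");
        }
        byte[] header = String.format("%04d", payload.length).getBytes(StandardCharsets.US_ASCII);
        byte[] frame = new byte[HEADER_LENGTH + payload.length];
        System.arraycopy(header, 0, frame, 0, HEADER_LENGTH);
        System.arraycopy(payload, 0, frame, HEADER_LENGTH, payload.length);
        return frame;
    }

    // Reads the header, then returns exactly that many payload bytes with the header stripped.
    static byte[] decode(byte[] frame) {
        int length = Integer.parseInt(new String(frame, 0, HEADER_LENGTH, StandardCharsets.US_ASCII));
        byte[] payload = new byte[length];
        System.arraycopy(frame, HEADER_LENGTH, payload, 0, length);
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode("0800".getBytes(StandardCharsets.US_ASCII));
        System.out.println(new String(frame, StandardCharsets.US_ASCII));
        System.out.println(new String(decode(frame), StandardCharsets.US_ASCII));
    }
}
```

A real frame decoder also has to wait until the whole frame has arrived on the stream, which this sketch skips by assuming the complete frame is already in the array.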

Compared to the server, the client uses Bootstrap instead of ServerBootstrap and has a single worker group, since there is no parent/child group split. The handler contains no complex logic: it only puts the response message into the correlation map. Let's take a look at the constructor of the IsoClient class:

  
public IsoClient(String host, int port, long reconnectDelay) {
	this.host = host;
	this.port = port;
	this.reconnectDelay = reconnectDelay;
	this.timer = new Timer();
	this.bootstrap = new Bootstrap();
	this.workGroup = new MultiThreadIoEventLoopGroup(20, NioIoHandler.newFactory());
	bootstrap.group(workGroup);
	bootstrap.channel(NioSocketChannel.class);
	bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
	bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
	bootstrap.handler(new ChannelInitializer<SocketChannel>() {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			ChannelPipeline p = ch.pipeline();
			p.addLast("frameDecoder", new AsciiFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
			p.addLast("decoder", new Iso8583Decoder());
			p.addLast("frameEncoder", new AsciiFrameEncoder(4, false));
			p.addLast("encoder", new Iso8583Encoder());
			p.addLast(new SimpleChannelInboundHandler<CustomIsoMessage>() {
				@Override
				protected void channelRead0(ChannelHandlerContext ctx, CustomIsoMessage msg) throws Exception {
					log.debug("Putting {} into correlation map", msg.getMessageKey());
					correlationMap.put(msg.getMessageKey(), msg);
				}
				
			});
		}
		
	});
}  
  

The sendAndReceive method sends a request and then polls the correlation map until the response arrives or the timeout expires. On timeout it returns null.

  
public CustomIsoMessage sendAndReceive(CustomIsoMessage requestMsg, long timeout) throws IOException {
	sendRequest(requestMsg);
	long start = System.currentTimeMillis();
	while (!correlationMap.containsKey(requestMsg.getMessageKey()) 
			&& (System.currentTimeMillis() - start < timeout) ) {
		log.debug("Getting {} from correlation map", requestMsg.getMessageKey());
		CommonUtil.sleep(100);
	}
	// remove and return the response, or null if it did not arrive before the timeout
	return correlationMap.remove(requestMsg.getMessageKey());
}
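The polling loop above is easy to follow, but it checks the map only every 100 ms, and a response that arrives after the timeout stays in the map. As a possible alternative (not what the repository does), the sketch below keeps one CompletableFuture per pending request. PendingRequests and its method names are hypothetical; in IsoClient the type parameter would be CustomIsoMessage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PendingRequests<T> {

    private final ConcurrentMap<String, CompletableFuture<T>> pending = new ConcurrentHashMap<>();

    // Call before writing the request, so a very fast response cannot be missed.
    CompletableFuture<T> register(String key) {
        CompletableFuture<T> future = new CompletableFuture<>();
        pending.put(key, future);
        return future;
    }

    // Call from the inbound handler. Returns false when nobody is waiting for this key,
    // for example because the response arrived after the caller timed out.
    boolean complete(String key, T response) {
        CompletableFuture<T> future = pending.remove(key);
        return future != null && future.complete(response);
    }

    // Blocks for at most timeoutMillis and returns null on timeout, like the polling version.
    T awaitResponse(String key, CompletableFuture<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        } finally {
            pending.remove(key, future); // timed-out requests must not leak entries
        }
    }

    int size() {
        return pending.size();
    }
}
```

With this shape the waiting thread wakes up as soon as the handler completes the future, and late responses are dropped instead of accumulating in the map.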
  

To connect to the server and track the connection status, we create two ChannelFutureListeners: the first listens for whether the connection was established successfully, and the second listens for the connection being lost.

  
private void connect() {
	try{
		log.info("Connecting to {}:{}", this.host, this.port);
		ChannelFuture channelFuture = bootstrap.connect(this.host, this.port);
		channelFuture.addListener(new ChannelFutureListener() {
			
			@Override
			public void operationComplete(ChannelFuture future) throws Exception {
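				if( !future.isSuccess() ) {
					// Connect failed: close the channel and retry after the configured delay
					// rather than immediately, so an unreachable server does not cause a tight retry loop.
					future.channel().close();
					if (!isShutdown)
						reconnect( reconnectDelay );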
					
				} else {
					channel = future.channel();
					log.info("Connection to {}:{} Established", host, port);
					channel.closeFuture().addListener(new ChannelFutureListener() {							
						@Override
						public void operationComplete(ChannelFuture future) throws Exception {
							log.warn("Connection to {}:{} Lost", host, port);
							if (!isShutdown)
								reconnect( reconnectDelay );
							else
								log.warn("Shutting Channel Down");
						}
					});
				}
			}
			
		});
	} catch (Exception e) {
		log.error("Connection not Established", e);
		if (!isShutdown)
			reconnect( reconnectDelay );
	}
}
  

The next part is the sendRequest method; note that it is synchronized. It first checks whether the channel is null, in which case it triggers the initial connection and waits (up to 10 seconds) until the channel is active. Once the connection is established, channel.writeAndFlush passes the CustomIsoMessage (requestMsg) down the pipeline, where the encoders process it before it goes onto the wire. If the channel is still not active, an IOException is thrown.

  
private synchronized void sendRequest(CustomIsoMessage requestMsg) throws IOException {
	if (channel == null) {
		reconnect(5);
		// wait up to 10 seconds (20 checks x 500 ms) for the channel to become active
		int counter = 1;
		while ( (channel == null || !channel.isActive()) && counter <= 20) {
			CommonUtil.sleep(500L);
			counter++;
		}
	}
	
	if( channel != null && channel.isActive() ) {
		channel.writeAndFlush( requestMsg );
		
	} else {
		throw new IOException("Can't send message to inactive connection");
	}
}
  

For the reconnect mechanism, a Timer executes the connect method on a different thread after the given delay.

  
private void reconnect( long delay ) {
	log.info("Reconnecting in {} millis", delay);
	timer.schedule( new TimerTask() {
		@Override
		public void run() {
			if ((channel == null || !channel.isActive()) 
					&& !isShutdown)
				connect();
		}
	}, delay );
}
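One thing to keep in mind with java.util.Timer is that an unchecked exception thrown by a TimerTask terminates the timer thread, and later schedule calls then fail with IllegalStateException. If that matters for your client, a ScheduledExecutorService is a possible replacement. The sketch below is an assumption about how that could look, not the code from the repository; ReconnectScheduler and connectAction are hypothetical names, with connectAction standing in for the connect method.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ReconnectScheduler {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final Runnable connectAction; // stands in for IsoClient.connect()

    ReconnectScheduler(Runnable connectAction) {
        this.connectAction = connectAction;
    }

    // Runs connectAction once after the delay, unless the client was shut down meanwhile.
    void reconnect(long delayMillis) {
        if (shutdown.get()) {
            return; // client is shutting down, do not schedule anything
        }
        try {
            scheduler.schedule(() -> {
                if (!shutdown.get()) {
                    connectAction.run();
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // shutdown() raced with this call; there is nothing left to reconnect
        }
    }

    void shutdown() {
        shutdown.set(true);
        scheduler.shutdownNow(); // also cancels reconnect attempts that have not started yet
    }
}
```

The channel-state check from the Timer version (channel == null || !channel.isActive()) would live inside connectAction in this sketch.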
  

The last part is a sample usage that sends requests to the server concurrently and makes sure each one gets the correct response. It submits echo requests to the server in parallel through an executor service; the loop bound in the listing (1) can be raised to send more requests at once.

  
public class IsoClientTest {

	private static ConfigurationUtil configurationUtil = ConfigurationUtil.getInstance();
	private static ExecutorService executorService = Executors.newFixedThreadPool(2);
	
	public static void main(String[] args) throws InterruptedException {
		String serverPort = configurationUtil.getConfig(ConfigurationKey.ASCII_SERVER_PORT, "8081");		
		MessageFactoryUtil messageFactory = MessageFactoryUtil.getInstance();
		
		IsoClient isoClient = new IsoClient("localhost", Integer.valueOf(serverPort), 5000);

		AtomicInteger stan = new AtomicInteger();
		List<Callable<Void>> tasks = new ArrayList<>();
		for (int i = 0; i < 1; i++) {
			tasks.add(() -> {
				sendRequest(messageFactory, isoClient, stan);
				return null;
			});
		}
		// invokeAll blocks until every task has completed, so no extra polling is needed
		executorService.invokeAll(tasks);
		isoClient.shutdown();
		// stop the pool's non-daemon threads so the JVM can exit
		executorService.shutdown();
	}

	private static void sendRequest(MessageFactoryUtil messageFactory, IsoClient isoClient, AtomicInteger stan) {
		CustomIsoMessage requestMsg = messageFactory.newMessage(0x800);
		requestMsg.setField(11, new IsoValue<Integer>(IsoType.NUMERIC, stan.addAndGet(1), 6));
		requestMsg.setField(41, new IsoValue<String>(IsoType.ALPHA, "12345678", 8));
		requestMsg.setField(70, new IsoValue<String>(IsoType.ALPHA, "301", 3));
		try {
			CustomIsoMessage responseMsg = isoClient.sendAndReceive(requestMsg, 30000);
			if (responseMsg == null)
				throw new TimeoutException("Timeout for "+requestMsg.getMessageKey());
			
			if (!requestMsg.getMessageKey().equals(responseMsg.getMessageKey()))
				throw new Exception("Invalid Correlation");
			
		} catch (TimeoutException e) {
			System.out.println(e.getMessage());
		}
		catch (Exception e) {
			e.printStackTrace();
		}
	}
}
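The sample starts its AtomicInteger at zero and sends only a few requests, so the STAN never gets large. Since bit 11 is declared as a 6-digit numeric field, a long-running client would eventually need the counter to wrap around. A minimal sketch of one way to do that follows; StanGenerator is a hypothetical class, and restarting at 1 after 999999 is an assumption about what the server accepts.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class StanGenerator {

    private static final int MAX_STAN = 999_999; // largest value that fits in 6 digits

    private final AtomicInteger counter = new AtomicInteger();

    // Returns 1, 2, ..., 999999 and then starts again at 1; safe to call from many threads.
    int next() {
        return counter.updateAndGet(v -> v >= MAX_STAN ? 1 : v + 1);
    }

    public static void main(String[] args) {
        StanGenerator generator = new StanGenerator();
        System.out.println(String.format("%06d", generator.next()));
    }
}
```

Because the key in the correlation map depends on the STAN, a wrapped value is only safe to reuse once the earlier request with the same STAN and terminal ID has completed or timed out.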

  

That's all about creating an ISO8583 client using the Netty framework. The complete source code is available in my GitHub repository (https://github.com/didikhari/middleware-netty).
