Creating ISO8583 Client using Spring Integration
Hi, in this post we are going to talk about Creating ISO8583 Client using Spring Integration. We are going to Create ISO8583 Client that support Multiplexing. With multiplexing support, we can send multiple request at the same time without blocking each other. All we need to do is choose message key to identify request message and its response. Usualy, bit 11 and bit 41 used to idenfity corresponding request-response message.
Client Configuration-
Messaging Gateway
Used as proxy between application layer and abstraction over the messaging API -
Message Channel
We are going to use Publish and Subscribe Channel, and have 2 handlers to subscribe this channel -
Bridge Channel
A simple MessageHandler implementation that passes the request Message directly to the output channel without modifying it -
Tcp Sending Message Handler
Tcp outbound channel adapter using a TcpConnection to send data -
Client Connection Factory
Outgoing connection factory used to send and receive message, with serializer and deserializer to get message from stream and put into the stream based on Specification -
Tcp Receiving Message Handler
Tcp inbound channel adapter using a TcpConnection to receive data -
Agregator
Matching request and response message based on correlation key -
Transformer
Transform response message from byte array into IsoMessage
Messaging Gateway: an Interface with send method with Gateway annotation.
@MessagingGateway
public interface Gateway {
@Gateway(requestChannel = "input",
payloadExpression = "#args[0]",
replyTimeoutExpression = "#args[1]")
CustomIsoMessage send(byte[] in, Long timeout);
}
Message Channel: Publish and Subscribe Channel, this will be subscribed by 2 message handlers. Note the bean name and requestChannel on the Gateway.
@Bean("input")
public PublishSubscribeChannel input() {
return new PublishSubscribeChannel();
}
Bridge Handler: Subscribe channel input and put the message directly into "toAggregator.client" message channel consumed by agregator.
@Bean
@ServiceActivator(inputChannel = "input")
public BridgeHandler bridge() {
BridgeHandler bridge = new BridgeHandler();
bridge.setOrder(1);
bridge.setOutputChannelName("toAggregator.client");
return bridge;
}
Tcp Sending Message Handler: Subscribe channel input and sending the message through Client Connection Factory
@Bean("outAdapter.client")
@ServiceActivator(inputChannel = "input")
public TcpSendingMessageHandler outAdapterClient(AbstractClientConnectionFactory client) {
TcpSendingMessageHandler outboundChannelAdapter = new TcpSendingMessageHandler();
outboundChannelAdapter.setOrder(2);
outboundChannelAdapter.setConnectionFactory(client);
return outboundChannelAdapter;
}
Client Connection Factory: Connection factory to establish outgoing connection. Set the serializer and deserializer to get/put the message from/into stream
@Bean("client")
public AbstractClientConnectionFactory client(@Value("${tcp.server.port}") int port,
ByteArrayLengthHeaderSerializer asciiSerializer) {
TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", port);
connectionFactory.setSingleUse(false);
connectionFactory.setSerializer(asciiSerializer);
connectionFactory.setDeserializer(asciiSerializer);
connectionFactory.setSoTimeout(180000);
return connectionFactory;
}
Tcp Receiving Message Handler: Receive incomming message through connection factory and put it into "toAggregator.client" message channel consumed by agregator.
@Bean
public TcpReceivingChannelAdapter inAdapterClient(AbstractClientConnectionFactory client) {
TcpReceivingChannelAdapter inboundChannelAdapter = new TcpReceivingChannelAdapter();
inboundChannelAdapter.setConnectionFactory(client);
inboundChannelAdapter.setOutputChannelName("toAggregator.client");
return inboundChannelAdapter;
}
Agregator: Hold the request message and waiting for response with same message key until timeout.
@Bean("toAggregator.client")
public MessageChannel toAggregatorClient() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "toAggregator.client")
public MessageHandler aggregator(MessageFactory<CustomIsoMessage> messageFactory) {
AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor());
aggregator.setOutputChannelName("toTransformer.client");
aggregator.setExpireGroupsUponCompletion(true);
aggregator.setExpireGroupsUponTimeout(true);
aggregator.setDiscardChannelName("noResponseChannel");
aggregator.setGroupTimeoutExpression(new ValueExpression<>(180000L));
aggregator.setCorrelationStrategy(new CorrelationStrategy() {
@Override
public Object getCorrelationKey(Message<?> message) {
Object payload = message.getPayload();
try {
CustomIsoMessage isoMsg = messageFactory.parseMessage(((byte[])payload), 0);
String terminalId = isoMsg.getObjectValue(41);
String stan = isoMsg.getObjectValue(11);
String remoteHost = String.valueOf(message.getHeaders().get("ip_address", String.class));
Integer remotePort = message.getHeaders().get("ip_tcp_remotePort", Integer.class);
remoteHost = StringUtils.defaultIfBlank(remoteHost, message.getHeaders().get("host", String.class));
remotePort = remotePort != null ? remotePort : message.getHeaders().get("port", Integer.class);
boolean incomming = !message.getHeaders().containsKey("replyChannel");
String msgLog = isoMsg.dumpField(incomming, terminalId+stan, remoteHost, remotePort,
Long.valueOf(message.getHeaders().get("timestamp").toString()));
log.info(msgLog);
return terminalId+stan;
} catch (Exception e) {
log.error("aggregator error", e);
}
return payload;
}
});
aggregator.setReleaseStrategy(new ReleaseStrategy() {
@Override
public boolean canRelease(MessageGroup group) {
return group.getMessages().size() == 2;
}
});
return aggregator;
}
Transformer: Convert response message from byte array into IsoMessage class, result from this transformer will be returned on Gateway method.
@Transformer(inputChannel="toTransformer.client")
public CustomIsoMessage convert(@Payload GenericMessage<List<byte[]>> messages) {
try {
return messageFactory.parseMessage(messages.getPayload().get(1), 0);
} catch (Exception e) {
log.error("ERROR WHEN TRANSFORM MESSAGE [toTransformer.client]", e);
}
return null;
}
For sending the message all we need to do is calling gateway.send methods.
@Autowired
Gateway gateway;
@Test
public void given_validRequest_when_sendingEcho_then_returnRC00() throws Exception {
IsoMessage msg = createEchoMsg(TERMINAL_CODE, MAC_ADDRESS, TERMINAL_VERSION);
CustomIsoMessage responseMsg = gateway.send(msg.writeData(), 10000l);
assertNotNull(responseMsg);
assertEquals("00", responseMsg.getObjectValue(39));
}
Thats all about Creating ISO8583 Client using Spring Integration. hope this help.
Comments