I will be sharing here details about integration of Aws Kinesis here. If you have bit time, I would recommend you to visit here
1. Create a properties file for Kinesis.
@Component
public class KinesisProp {
@Value("${kinesis.access.key}")
private String awsAccessKey;
@Value("${kinesis.secret.key}")
private String awsSecretKey;
@Value("${kinesis.streamName}")
private String streamName;
@Value("${kinesis.receive.limit}")
private Integer receiveLimit;
@Value("${kinesis.shradCount}")
private Integer shardCount;
@Value("${kinesis.region}")
private String region;
@Value("${kinesis.steam.initialPosition}")
private String streamInitialPosition;
@Value("${kinesis.endPoint}")
private String endPoint;
// getters and setters here
}
2. Create a service call, which will be responsible for sending messages to Kinesis.
@Component
public class KinesisService {
private static final Logger logger = LoggerFactory.getLogger(KinesisService.class);
@Autowired
private KinesisProp kinesisProp;
private AmazonKinesisClient kinesisClient;
@PostConstruct
private void init() {
try {
instantiateKinesisClient();
} catch (Exception e) {
logger.error("Exception occurred while instantiating the Kinesis Client", e);
}
}
private void instantiateKinesisClient() throws IOException {
BasicAWSCredentials credentials = new BasicAWSCredentials(this.kinesisProp.getAwsAccessKey(), this.kinesisProp.getAwsSecretKey());
kinesisClient = new AmazonKinesisClient(credentials);
final Region apac_east = Region.getRegion(Regions.valueOf(kinesisProp.getRegion()));
kinesisClient.setRegion(apac_east);
createStream();
}
private void createStream() {
final ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
ListStreamsResult listStreamsResult = kinesisClient.listStreams(listStreamsRequest);
if (listStreamsResult == null || listStreamsResult.getStreamNames().isEmpty() ||
(!listStreamsResult.getStreamNames().contains(kinesisProp.getStreamName()))) {
CreateStreamRequest createStreamRequest = new CreateStreamRequest();
createStreamRequest.setStreamName(kinesisProp.getStreamName());
createStreamRequest.setShardCount(kinesisProp.getShardCount());
try {
kinesisClient.createStream(createStreamRequest);
} catch (Exception e) {
logger.error("Error occurred while creating the stream!", e);
}
}
}
public void sendMessage(final String message) throws AmazonServiceException {
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(kinesisProp.getStreamName());
putRecordRequest.setData(ByteBuffer.wrap(message.getBytes())); putRecordRequest.setPartitionKey(Constants.AWS_KINESIS_PARTITION_KEY);
kinesisClient.putRecord(putRecordRequest);
}
}
3. Fetching the messages can be done in two ways.
(a) Using Aws Kinesis API
(b) Using Aws Kinesis Client Library.
I will update on this in my next blog.
Note:
(a) Spring is used for DI.
(b) When you are creating the access key and secret key, make sure to create for the kinesis app.
1. Create a properties file for Kinesis.
@Component
public class KinesisProp {
@Value("${kinesis.access.key}")
private String awsAccessKey;
@Value("${kinesis.secret.key}")
private String awsSecretKey;
@Value("${kinesis.streamName}")
private String streamName;
@Value("${kinesis.receive.limit}")
private Integer receiveLimit;
@Value("${kinesis.shradCount}")
private Integer shardCount;
@Value("${kinesis.region}")
private String region;
@Value("${kinesis.steam.initialPosition}")
private String streamInitialPosition;
@Value("${kinesis.endPoint}")
private String endPoint;
// getters and setters here
}
2. Create a service call, which will be responsible for sending messages to Kinesis.
@Component
public class KinesisService {
private static final Logger logger = LoggerFactory.getLogger(KinesisService.class);
@Autowired
private KinesisProp kinesisProp;
private AmazonKinesisClient kinesisClient;
@PostConstruct
private void init() {
try {
instantiateKinesisClient();
} catch (Exception e) {
logger.error("Exception occurred while instantiating the Kinesis Client", e);
}
}
private void instantiateKinesisClient() throws IOException {
BasicAWSCredentials credentials = new BasicAWSCredentials(this.kinesisProp.getAwsAccessKey(), this.kinesisProp.getAwsSecretKey());
kinesisClient = new AmazonKinesisClient(credentials);
final Region apac_east = Region.getRegion(Regions.valueOf(kinesisProp.getRegion()));
kinesisClient.setRegion(apac_east);
createStream();
}
private void createStream() {
final ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
ListStreamsResult listStreamsResult = kinesisClient.listStreams(listStreamsRequest);
if (listStreamsResult == null || listStreamsResult.getStreamNames().isEmpty() ||
(!listStreamsResult.getStreamNames().contains(kinesisProp.getStreamName()))) {
CreateStreamRequest createStreamRequest = new CreateStreamRequest();
createStreamRequest.setStreamName(kinesisProp.getStreamName());
createStreamRequest.setShardCount(kinesisProp.getShardCount());
try {
kinesisClient.createStream(createStreamRequest);
} catch (Exception e) {
logger.error("Error occurred while creating the stream!", e);
}
}
}
public void sendMessage(final String message) throws AmazonServiceException {
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(kinesisProp.getStreamName());
putRecordRequest.setData(ByteBuffer.wrap(message.getBytes())); putRecordRequest.setPartitionKey(Constants.AWS_KINESIS_PARTITION_KEY);
kinesisClient.putRecord(putRecordRequest);
}
}
3. Fetching the messages can be done in two ways.
(a) Using Aws Kinesis API
(b) Using Aws Kinesis Client Library.
I will update on this in my next blog.
Note:
(a) Spring is used for DI.
(b) When you are creating the access key and secret key, make sure to create for the kinesis app.
No comments:
Post a Comment