Sunday, 31 August 2014

Aws Kinesis Integration (sending messages to kinesis)

I will be sharing here details about integration of Aws Kinesis here. If you have bit time, I would recommend you to visit here


1. Create a properties file for Kinesis.

@Component
public class KinesisProp {

    @Value("${kinesis.access.key}")
    private String  awsAccessKey;
    @Value("${kinesis.secret.key}")
    private String  awsSecretKey;
    @Value("${kinesis.streamName}")
    private String streamName;
    @Value("${kinesis.receive.limit}")
    private Integer receiveLimit;
    @Value("${kinesis.shradCount}")
    private Integer shardCount;
    @Value("${kinesis.region}")
    private String region;
    @Value("${kinesis.steam.initialPosition}")
    private String streamInitialPosition;
    @Value("${kinesis.endPoint}")
    private String endPoint; 


    // getters and setters here

}

2. Create a service call, which will be responsible for sending messages to Kinesis.

@Component
public class KinesisService {
    private static final Logger logger = LoggerFactory.getLogger(KinesisService.class);
    @Autowired
    private KinesisProp kinesisProp;
    private AmazonKinesisClient kinesisClient;
 

    @PostConstruct
    private void init() {
        try {
            instantiateKinesisClient();
        } catch (Exception e) {
            logger.error("Exception occurred while instantiating the Kinesis Client", e);
        }
    }

    private void instantiateKinesisClient() throws IOException {
        BasicAWSCredentials  credentials = new BasicAWSCredentials(this.kinesisProp.getAwsAccessKey(), this.kinesisProp.getAwsSecretKey());
        kinesisClient = new AmazonKinesisClient(credentials);
        final Region apac_east = Region.getRegion(Regions.valueOf(kinesisProp.getRegion()));
        kinesisClient.setRegion(apac_east);
        createStream();
    }

    private void createStream() {
        final ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
        ListStreamsResult listStreamsResult = kinesisClient.listStreams(listStreamsRequest);
        if (listStreamsResult == null || listStreamsResult.getStreamNames().isEmpty() ||
                (!listStreamsResult.getStreamNames().contains(kinesisProp.getStreamName()))) {

            CreateStreamRequest createStreamRequest = new CreateStreamRequest();
            createStreamRequest.setStreamName(kinesisProp.getStreamName());
            createStreamRequest.setShardCount(kinesisProp.getShardCount());

            try {
                kinesisClient.createStream(createStreamRequest);
            } catch (Exception e) {
                logger.error("Error occurred while creating the stream!", e);
            }
        }
    }

    public void sendMessage(final String message) throws AmazonServiceException {
       PutRecordRequest putRecordRequest = new PutRecordRequest();
 putRecordRequest.setStreamName(kinesisProp.getStreamName());
 putRecordRequest.setData(ByteBuffer.wrap(message.getBytes()));                                                                      putRecordRequest.setPartitionKey(Constants.AWS_KINESIS_PARTITION_KEY);
          kinesisClient.putRecord(putRecordRequest);
    }
 

}


3. Fetching the messages can be done in two ways.

(a) Using Aws Kinesis API
(b) Using Aws Kinesis Client Library.

I will update on this in my next blog.

Note:
(a) Spring is used for DI.
(b) When you are creating the access key and secret key, make sure to create for the kinesis app.

No comments:

Post a Comment