AWS S3
Sending Location Engine SDK Events to S3 via Kinesis Firehose
Amazon Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (S3). For existing and new AWS users, pairing Kinesis with Location Engine allows for easy storage and analysis of Location Engine events. For example, an app can stream circumstance met events to an S3 bucket for both real-time and batch campaign analysis. The steps below illustrate the simplicity of integrating the two services.
Prerequisites
This document assumes familiarity with AWS services and that you have already set up a Kinesis delivery stream with an S3 destination. For background, please review:
- What Is Amazon Kinesis Data Firehose?
- Everything You Ever Wanted to Know about Amazon Kinesis Firehose
Android
Before following the steps below please review:
1: Include the AWS SDK
dependencies {
    // Existing dependencies
    ....
    // Engine Library
    implementation 'com.factual:engine-sdk:[ENGINE_VERSION]'
    // AWS Libraries
    implementation 'com.amazonaws:aws-android-sdk-core:2.12.+'
    implementation 'com.amazonaws:aws-android-sdk-kinesis:2.12.+'
}
2: Create a Firehose Recorder
An instance of the KinesisFirehoseRecorder is the only entry point needed to add data. Note that the code below is an example of setting up a Firehose Recorder and is not intended for production use.
package com.factual.enginekinesisexample;

import android.content.Context;
import android.os.AsyncTask;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.mobileconnectors.kinesis.kinesisrecorder.KinesisFirehoseRecorder;
import com.amazonaws.regions.Regions;

import java.io.File;

public final class KinesisClient {
    private static volatile KinesisClient instance;

    // Name of the Firehose delivery stream that writes to your S3 bucket
    private final String firehoseStreamName = [YOUR_STREAM];
    private final KinesisFirehoseRecorder firehoseRecorder;

    private KinesisClient(Context context) {
        File cacheDirectory = [YOUR_CACHE_DIRECTORY];
        Regions region = [YOUR_REGION];
        AWSCredentialsProvider provider = [YOUR_CREDENTIAL_PROVIDER];
        this.firehoseRecorder = new KinesisFirehoseRecorder(
                cacheDirectory,
                region,
                provider);
    }

    // Lazily creates the singleton using double-checked locking
    public static KinesisClient getInstance(Context context) {
        if (instance == null) {
            synchronized (KinesisClient.class) {
                if (instance == null) instance = new KinesisClient(context);
            }
        }
        return instance;
    }

    // Stores the record locally; call submitKinesisRecords() to upload it
    public void saveKinesisRecord(String record) {
        firehoseRecorder.saveRecord(record, firehoseStreamName);
    }

    // Uploads all locally stored records on a background thread
    public void submitKinesisRecords() {
        new AsyncTask<Void, Void, Void>() {
            @Override
            protected Void doInBackground(Void... voids) {
                firehoseRecorder.submitAllRecords();
                return null;
            }
        }.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
    }
}
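AsyncTask has been deprecated since Android API level 30, so newer apps may prefer to run the flush on an executor from java.util.concurrent. The following is a minimal sketch of that idea, not part of the Engine or AWS SDKs: RecordFlusher and BackgroundFlusher are hypothetical names, and RecordFlusher only stands in for the submitAllRecords call on the recorder so the sketch runs without Android.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for the one recorder call this sketch needs. */
interface RecordFlusher {
    void submitAllRecords();
}

/** Runs flushes on a single background thread so callers never block on network I/O. */
final class BackgroundFlusher {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final RecordFlusher flusher;

    BackgroundFlusher(RecordFlusher flusher) {
        this.flusher = flusher;
    }

    /** Queues a flush; the returned Future completes once the flush has run. */
    Future<?> submitAsync() {
        return executor.submit(flusher::submitAllRecords);
    }

    /** Stops accepting new flushes; flushes that are already queued still run. */
    void shutdown() {
        executor.shutdown();
    }
}
```

In an app, the flusher could be supplied as a method reference to the recorder's submitAllRecords method, with submitAsync() called wherever the example above calls submitKinesisRecords().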
3: Call the Firehose Recorder when a circumstance is met
In the example below, a JSON representation of the CircumstanceResponse is saved for each circumstance met. After all responses have been recorded, they are flushed to AWS using the submitAllRecords method provided by the KinesisFirehoseRecorder.
package com.factual.enginekinesisexample;

import android.util.Log;

import com.factual.engine.api.CircumstanceResponse;
import com.factual.engine.api.FactualClientReceiver;

import org.json.JSONException;

import java.util.List;

public class KinesisFactualClientReceiver extends FactualClientReceiver {
    @Override
    public void onCircumstancesMet(List<CircumstanceResponse> circumstances) {
        KinesisClient kinesisClient = KinesisClient.getInstance(getContext());
        for (CircumstanceResponse circumstanceResponse : circumstances) {
            try {
                // One JSON document per line, so delivered records can be split on newlines
                String circumstanceMetEntry = circumstanceResponse.toJson().toString() + "\n";
                kinesisClient.saveKinesisRecord(circumstanceMetEntry);
            } catch (JSONException e) {
                Log.e("engine", e.getMessage());
            }
        }
        // Flush everything saved above to AWS
        kinesisClient.submitKinesisRecords();
    }
}
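The receiver above appends "\n" to each record. Firehose writes records to S3 back to back without adding a separator of its own, so the trailing newline is what lets a consumer split a delivered object into individual events. The sketch below illustrates this with plain strings. RecordFraming, frame, and splitDelivered are hypothetical names, the payloads are made up, and it assumes each serialized event contains no raw newline characters.

```java
import java.util.ArrayList;
import java.util.List;

final class RecordFraming {
    /** Terminates a serialized event with a newline so consumers can split on lines. */
    static String frame(String json) {
        return json.endsWith("\n") ? json : json + "\n";
    }

    /** Splits the concatenated contents of a delivered object back into records. */
    static List<String> splitDelivered(String objectContents) {
        List<String> records = new ArrayList<>();
        for (String line : objectContents.split("\n")) {
            if (!line.isEmpty()) {
                records.add(line);
            }
        }
        return records;
    }

    public static void main(String[] args) {
        // Made-up payloads; real ones would come from CircumstanceResponse.toJson().
        String delivered = frame("{\"id\":\"a\"}") + frame("{\"id\":\"b\"}");
        System.out.println(splitDelivered(delivered));
    }
}
```

Without the newline, the two payloads would arrive as one unbroken string and the consumer would need a streaming JSON parser to find the record boundaries.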
4: Verify your data
The configured S3 bucket should now contain circumstance events. Delivery may be delayed depending on your delivery stream's "buffer conditions": Firehose writes to S3 once either the buffer size or the buffer interval is reached, whichever comes first. The current Kinesis AWS buffer default is 5 MB or 300 seconds.
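To get a feel for how long events may take to appear, the buffer conditions can be turned into a rough estimate. The sketch below is a simplified model and not how Firehose is implemented: it assumes a steady incoming byte rate, treats 5 MB as 5 * 1024 * 1024 bytes, and uses the hypothetical names BufferEstimate and secondsUntilFlush.

```java
final class BufferEstimate {
    static final long BUFFER_BYTES = 5L * 1024 * 1024; // size condition (assumed to mean 5 MiB)
    static final long BUFFER_SECONDS = 300;            // interval condition

    /** Rough number of seconds before a buffer is delivered at a steady byte rate. */
    static long secondsUntilFlush(long bytesPerSecond) {
        if (bytesPerSecond <= 0) {
            throw new IllegalArgumentException("bytesPerSecond must be positive");
        }
        // Ceiling division: whole seconds needed to reach the size condition
        long secondsToFill = (BUFFER_BYTES + bytesPerSecond - 1) / bytesPerSecond;
        // Delivery happens when the first of the two conditions is met
        return Math.min(BUFFER_SECONDS, secondsToFill);
    }
}
```

For example, at 1 KB per second the size condition would take well over an hour to reach, so the estimate is bounded by the 300 second interval. A mobile app that reports occasional circumstance events will usually fall into this case.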
iOS
Before following the steps below please review:
1: Include the AWS SDK
Install the FactualEngineSDK as well as the AWSCore and AWSKinesis SDKs via CocoaPods.
source 'https://github.com/Factual/cocoapods.git'
source 'https://github.com/CocoaPods/Specs.git'

target 'YourAppTarget' do
  pod 'FactualEngineSDK'
  pod 'AWSCore'
  pod 'AWSKinesis'
end
2: Create a Firehose Recorder
An instance of the AWSFirehoseRecorder is the only entry point needed to add data. First, import the Engine header and implement the FactualEngineDelegate protocol.
#import <UIKit/UIKit.h>
#import "FactualEngine.h"

@interface AppDelegate : UIResponder <UIApplicationDelegate, FactualEngineDelegate>
@property (strong, nonatomic) UIWindow *window;
@end
Next, instantiate the AWS SDK with proper credentials. In the example below, AWS setup occurs in the engineDidStartWithInstance callback.
#import "AppDelegate.h"
#import <AWSKinesis/AWSKinesis.h>

@interface AppDelegate ()
@property AWSFirehoseRecorder *firehoseRecorder;
@end

@implementation AppDelegate

- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions {
    // See https://developer.factual.com/docs/engine-sdk-ios-guide for best practices
    return YES;
}

- (void)engineDidStartWithInstance:(FactualEngine *)engine {
    // Connect to AWS Services
    AWSRegionType region = [YOUR_REGION];
    // AWSCredentialsProvider is a protocol, so the variable is typed as id<...>
    id<AWSCredentialsProvider> credentialsProvider = [YOUR_CREDENTIAL_PROVIDER];
    AWSServiceConfiguration *configuration = [[AWSServiceConfiguration alloc]
        initWithRegion:region credentialsProvider:credentialsProvider];
    [AWSServiceManager defaultServiceManager].defaultServiceConfiguration = configuration;

    // Create FirehoseRecorder
    _firehoseRecorder = [AWSFirehoseRecorder defaultFirehoseRecorder];
}

- (void)engineDidStop {
}

- (void)engineDidFailWithError:(FactualError *)error {
}

- (void)engineDidReportInfo:(NSString *)infoMessage {
}

- (void)engineDidSyncWithGarage {
}

- (void)engineDidLoadConfig:(FactualConfigMetadata *)data {
}

- (void)engineDidReportDiagnosticMessage:(nonnull NSString *)diagnosticMessage {
}

- (void)circumstancesMet:(NSArray<CircumstanceResponse *> *)circumstanceResponses {
    // Implemented in step 3
}

@end
3: Call the Firehose Recorder when a circumstance is met
In the circumstancesMet callback, a JSON representation of the CircumstanceResponse is saved for each circumstance met. After the responses have been recorded, they are flushed to AWS using the submitAllRecords method provided by the AWSFirehoseRecorder.
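- (void)circumstancesMet:(NSArray<CircumstanceResponse *> *)circumstanceResponses {
    NSMutableArray<AWSTask *> *saveTasks = [NSMutableArray array];
    for (CircumstanceResponse *circumstanceResponse in circumstanceResponses) {
        NSError *error = nil;
        NSData *jsonData = [NSJSONSerialization dataWithJSONObject:[circumstanceResponse toDict]
                                                           options:0
                                                             error:&error];
        if (jsonData == nil) {
            NSLog(@"Could not serialize circumstance: %@", error);
            continue;
        }
        [saveTasks addObject:[_firehoseRecorder saveRecord:jsonData streamName:[YOUR_STREAM]]]; // <----- RECORD
    }
    // Flush only after every save task has completed
    [[AWSTask taskForCompletionOfAllTasks:saveTasks] continueWithSuccessBlock:^id(AWSTask *task) {
        return [self->_firehoseRecorder submitAllRecords]; // <----- FLUSH
    }];
}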
Both saveRecord and submitAllRecords are asynchronous operations. Ensure the saveRecord call is complete before you invoke submitAllRecords. Please read the AWS iOS Developer Guide for best practices.
4: Verify your data
The configured S3 bucket should now contain circumstance events. Delivery may be delayed depending on your delivery stream's "buffer conditions": Firehose writes to S3 once either the buffer size or the buffer interval is reached, whichever comes first. The current Kinesis AWS buffer default is 5 MB or 300 seconds.