AWS S3

Sending Location Engine SDK Events to S3 via Kinesis Firehose

Amazon Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (S3). Pairing Firehose with Location Engine makes it straightforward to store and analyze Location Engine events. For example, an app can stream circumstance-met events to an S3 bucket for both real-time and batch campaign analysis. The steps below show how to integrate the two services.


Prerequisites

This document assumes an understanding of AWS services and that you have set up a Kinesis delivery stream with an S3 destination. For background please review:



Android

Before following the steps below please review:

1: Include the AWS SDK

dependencies {
    // Existing dependencies 
    ....

    // Engine Library
    implementation 'com.factual:engine-sdk:[ENGINE_VERSION]'

    // AWS Libraries
    implementation 'com.amazonaws:aws-android-sdk-core:2.12.+'
    implementation 'com.amazonaws:aws-android-sdk-kinesis:2.12.+'
}

2: Create a Firehose Recorder

An instance of KinesisFirehoseRecorder is the only entry point needed to add data. Note that the code below is an example of setting up a Firehose recorder and is not intended for production use.

package com.factual.enginekinesisexample;

import android.content.Context;
import android.os.AsyncTask;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.mobileconnectors.kinesis.kinesisrecorder.KinesisFirehoseRecorder;
import com.amazonaws.regions.Regions;
import java.io.File;

public final class KinesisClient {
  private static volatile KinesisClient instance;
  private KinesisFirehoseRecorder firehoseRecorder;

  private KinesisClient(){}

  private KinesisClient(Context context) {
    File cacheDirectory = [YOUR_CACHE_DIRECTORY];
    Regions region = [YOUR_REGION];
    AWSCredentialsProvider provider = [YOUR_CREDENTIAL_PROVIDER]; 
    this.firehoseRecorder = new KinesisFirehoseRecorder(
        cacheDirectory,
        region,
        provider);
  }

  public static KinesisClient getInstance(Context context) {
    if (instance == null) {
      synchronized (KinesisClient.class) {
        if (instance == null) instance = new KinesisClient(context);
      }
    }
    return instance;
  }

  public void saveKinesisRecord(String record) {
    // Queue the record on disk; callers flush with submitKinesisRecords()
    // once the whole batch has been saved.
    firehoseRecorder.saveRecord(record, [YOUR_STREAM]);
  }

  public void submitKinesisRecords() {
    new AsyncTask<Void, Void, Void>() {
      @Override
      protected Void doInBackground(Void... voids) {
        firehoseRecorder.submitAllRecords();
        return null;
      }
    }.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
  }

}
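AsyncTask, used above, was deprecated in Android API level 30. The following is a minimal sketch of the same save-then-flush flow using a plain ExecutorService instead. It is not the SDK's own API: RecordSink, BackgroundFlusher, and InMemorySink are hypothetical names introduced here so the example runs without Android, and RecordSink only mirrors the two recorder calls used in this guide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for the two KinesisFirehoseRecorder calls used above. */
interface RecordSink {
  void saveRecord(String record, String streamName);
  void submitAllRecords();
}

/** Saves records on the caller's thread and flushes them on one background thread. */
final class BackgroundFlusher {
  private final RecordSink sink;
  private final String streamName;
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  BackgroundFlusher(RecordSink sink, String streamName) {
    this.sink = sink;
    this.streamName = streamName;
  }

  void save(String record) {
    sink.saveRecord(record, streamName);
  }

  /** Schedules one flush; the Future lets callers wait for it or inspect failures. */
  Future<?> flush() {
    Runnable task = sink::submitAllRecords;
    return executor.submit(task);
  }

  void shutdown() {
    executor.shutdown();
  }
}

/** In-memory sink used only to demonstrate the flow (assumption, not an AWS class). */
final class InMemorySink implements RecordSink {
  final List<String> pending = new ArrayList<>();
  final List<String> delivered = new ArrayList<>();

  @Override
  public synchronized void saveRecord(String record, String streamName) {
    pending.add(record);
  }

  @Override
  public synchronized void submitAllRecords() {
    delivered.addAll(pending);
    pending.clear();
  }
}
```

In an app, an adapter around the real recorder would take the place of InMemorySink. A single-thread executor keeps flushes from overlapping, which is one reasonable choice rather than a requirement.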

3: Call the Firehose Recorder when a circumstance is met

In the example below, a JSON representation of the CircumstanceResponse is saved for each circumstance met. After all responses have been recorded, they are flushed to AWS using the submitAllRecords method provided by KinesisFirehoseRecorder.

package com.factual.enginekinesisexample;

import android.util.Log;
import com.factual.engine.api.CircumstanceResponse;
import com.factual.engine.api.FactualClientReceiver;
import org.json.JSONException;
import java.util.List;

public class KinesisFactualClientReceiver extends FactualClientReceiver {

  @Override
  public void onCircumstancesMet(List<CircumstanceResponse> circumstances) {
    KinesisClient kinesisClient = KinesisClient.getInstance(getContext());

    for(CircumstanceResponse circumstanceResponse : circumstances) {
      try {
        String circumstanceMetEntry = circumstanceResponse.toJson().toString() + "\n";
        kinesisClient.saveKinesisRecord(circumstanceMetEntry);
      } catch (JSONException e) {
        Log.e("engine", e.getMessage());
      }
    }
    kinesisClient.submitKinesisRecords();
  }
}
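The receiver above appends "\n" to each record. Firehose concatenates records when it writes an S3 object and does not add a separator of its own, so the newline is what keeps individual events separable downstream. A minimal sketch of that round trip follows, assuming each record is compact single-line JSON. The class name NewlineDelimitedRecords is hypothetical, as are the field names in the usage example.

```java
import java.util.ArrayList;
import java.util.List;

final class NewlineDelimitedRecords {
  /** Appends the "\n" delimiter so concatenated records stay separable. */
  static String toRecord(String json) {
    return json + "\n";
  }

  /** Splits the content of a delivered object back into one JSON string per record. */
  static List<String> split(String objectContent) {
    List<String> records = new ArrayList<>();
    for (String line : objectContent.split("\n")) {
      if (!line.isEmpty()) {
        records.add(line);
      }
    }
    return records;
  }
}
```

For example, concatenating toRecord("{\"id\":\"a\"}") and toRecord("{\"id\":\"b\"}") and passing the result to split yields two separate JSON strings. Without the delimiter the same object would contain {"id":"a"}{"id":"b"}, which most line-oriented tools cannot parse.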

4: Verify your data

The configured S3 bucket should now contain circumstance events. Note that delivery may be delayed depending on the "buffer conditions" of your delivery stream. At the time of writing, the default buffer is 5 MB or 300 seconds, whichever is reached first.
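As a rough way to reason about that delay, the sketch below estimates how long a buffer takes to flush at a steady incoming byte rate. It is a simplification: it treats 5 MB as 5 * 1024 * 1024 bytes and assumes the buffer flushes as soon as either condition is reached. BufferEstimate and secondsUntilFlush are hypothetical names and are not part of any AWS SDK.

```java
final class BufferEstimate {
  /**
   * Estimated seconds until a flush at a steady byte rate:
   * whichever of the size or interval condition is reached first.
   */
  static double secondsUntilFlush(long bufferBytes, long intervalSeconds, double bytesPerSecond) {
    if (bytesPerSecond <= 0) {
      throw new IllegalArgumentException("bytesPerSecond must be positive");
    }
    double secondsToFill = bufferBytes / bytesPerSecond;
    return Math.min(secondsToFill, (double) intervalSeconds);
  }
}
```

With the defaults, a stream receiving 1 KiB per second would need 5120 seconds to fill the buffer, so the 300-second interval is reached first. At 100 KiB per second the size condition is reached first, after about 51 seconds.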


iOS

Before following the steps below please review:

1: Include the AWS SDK

Install the FactualEngineSDK as well as the AWSCore and AWSKinesis SDKs via CocoaPods.

source 'https://github.com/Factual/cocoapods.git'
source 'https://github.com/CocoaPods/Specs.git'

target 'YourAppTarget' do
  pod 'FactualEngineSDK'
  pod 'AWSCore' 
  pod 'AWSKinesis'
end

2: Create a Firehose Recorder

An instance of AWSFirehoseRecorder is the only entry point needed to add data. First, import the Engine header and adopt the FactualEngineDelegate protocol.

#import <UIKit/UIKit.h>
#import "FactualEngine.h"

@interface AppDelegate : UIResponder <UIApplicationDelegate,
                                      FactualEngineDelegate> 

@property (strong, nonatomic) UIWindow *window;

@end

Next, configure the AWS SDK with proper credentials. In the example below, AWS setup occurs in the engineDidStartWithInstance callback.

#import "AppDelegate.h"
#import <AWSKinesis/AWSKinesis.h>

@interface AppDelegate ()

@property AWSFirehoseRecorder *firehoseRecorder;

@end

@implementation AppDelegate

- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions {
    // See https://developer.factual.com/docs/engine-sdk-ios-guide for best practices 
    return YES;
}

- (void)engineDidStartWithInstance:(FactualEngine *)engine {
    
    // Connect to AWS Services
    AWSRegionType region = [YOUR_REGION];
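    // AWSCredentialsProvider is a protocol, so the variable is typed as id<AWSCredentialsProvider>
    id<AWSCredentialsProvider> credentialsProvider = [YOUR_CREDENTIAL_PROVIDER];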
    AWSServiceConfiguration *configuration = [[AWSServiceConfiguration alloc]
                                              initWithRegion:region credentialsProvider:credentialsProvider];
    [AWSServiceManager defaultServiceManager].defaultServiceConfiguration = configuration;

    // Create FirehoseRecorder
    _firehoseRecorder = [AWSFirehoseRecorder defaultFirehoseRecorder];
}

- (void)engineDidStop{
}

- (void)engineDidFailWithError:(FactualError *)error{
}

- (void)engineDidReportInfo:(NSString *)infoMessage{
}

- (void)engineDidSyncWithGarage{
}

- (void)engineDidLoadConfig:(FactualConfigMetadata *)data{
}

- (void)engineDidReportDiagnosticMessage:(nonnull NSString *)diagnosticMessage {
}
            
- (void)circumstancesMet:(NSArray<CircumstanceResponse *> *)circumstanceResponses{
}

@end

3: Call the Firehose Recorder when a circumstance is met

In the circumstancesMet callback, a JSON representation of the CircumstanceResponse is saved for each circumstance met. After the responses have been recorded, they are flushed to AWS using the submitAllRecords method provided by AWSFirehoseRecorder.

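- (void)circumstancesMet:(NSArray<CircumstanceResponse *> *)circumstanceResponses{
    NSMutableArray<AWSTask *> *saveTasks = [NSMutableArray array];
    for (CircumstanceResponse *circumstanceResponse in circumstanceResponses){
        NSError *error;
        NSData *jsonData = [NSJSONSerialization dataWithJSONObject:[circumstanceResponse toDict]
                                                           options:0
                                                             error:&error];
        if (jsonData == nil) {
            NSLog(@"Failed to serialize circumstance: %@", error);
            continue;
        }
        [saveTasks addObject:[_firehoseRecorder saveRecord:jsonData streamName:[YOUR_STREAM]]]; // <----- RECORD
    }

    // Flush only after every save task has completed
    [[AWSTask taskForCompletionOfAllTasks:saveTasks] continueWithSuccessBlock:^id(AWSTask *task) {
        return [self.firehoseRecorder submitAllRecords]; // <----- FLUSH
    }];
}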

🚧

Both saveRecord and submitAllRecords are asynchronous operations. Ensure that every saveRecord call has completed before you invoke submitAllRecords. Please read the AWS iOS Developer Guide for best practices.


4: Verify your data

The configured S3 bucket should now contain circumstance events. Note that delivery may be delayed depending on the "buffer conditions" of your delivery stream. At the time of writing, the default buffer is 5 MB or 300 seconds, whichever is reached first.