initial implementation
This commit is contained in:
parent
e02160e3a2
commit
722b0289b5
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
ddb.db
|
||||||
|
ddbbolt
|
15
README.md
Normal file
15
README.md
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
# DynamoDB Bolt
|
||||||
|
|
||||||
|
This project presents itself as [Amazon DynamoDB](https://aws.amazon.com/dynamodb/),
|
||||||
|
but uses [Bolt](https://github.com/boltdb/bolt) for data storage. It currently
|
||||||
|
only supports a handful of operations, and even then not with full fidelity:
|
||||||
|
|
||||||
|
* CreateTable
|
||||||
|
* BatchGetItem
|
||||||
|
* BatchWriteItem
|
||||||
|
|
||||||
|
UpdateItem, PutItem and GetItem should be trivial to implement.
|
||||||
|
|
||||||
|
It's designed for those times you want [DynamoDB Local](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.html),
|
||||||
|
but don't want a full Java VM, etc. On small data sets, this static executable
|
||||||
|
executable will use <10MB of resident memory.
|
626
ddbbolt.go
Normal file
626
ddbbolt.go
Normal file
|
@ -0,0 +1,626 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"golang.org/x/crypto/sha3"
|
||||||
|
"encoding/json"
|
||||||
|
"encoding/gob"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"github.com/boltdb/bolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// BatchWriteItem Structs
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
type batchWriteItemRequest struct {
|
||||||
|
tables map[string]batchWriteItemTable
|
||||||
|
// TODO: Support ReturnConsumedCapacity
|
||||||
|
// TODO: Support ReturnItemCollectionMetrics
|
||||||
|
}
|
||||||
|
|
||||||
|
type batchWriteItemTable struct {
|
||||||
|
tableName string
|
||||||
|
requests []writeRequest
|
||||||
|
}
|
||||||
|
|
||||||
|
type writeRequest struct {
|
||||||
|
deleteRequest []attributeInfo // Key
|
||||||
|
putRequest []attributeInfo // Item
|
||||||
|
}
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// BatchGetItem Structs
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
// {"RequestItems": {"etags": {"Keys": [{"PK": {"S": "my key"}}]}}}
|
||||||
|
|
||||||
|
type batchGetItemRequest struct {
|
||||||
|
tables map[string]batchGetItemTable
|
||||||
|
// TODO: Support ReturnConsumedCapacity
|
||||||
|
}
|
||||||
|
|
||||||
|
type batchGetItemTable struct {
|
||||||
|
tableName string
|
||||||
|
// TODO: Support AttributesToGet
|
||||||
|
// TODO: Support ConsistentRead
|
||||||
|
// TODO: Support ExpressionAttributeNames
|
||||||
|
keys [][]attributeInfo
|
||||||
|
// TODO: Support ProjectionExpression
|
||||||
|
}
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// Common structs
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
type attributeInfo struct {
|
||||||
|
AttributeName string
|
||||||
|
AttributeType string
|
||||||
|
AttributeValue interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type httpErrorStruct struct {
|
||||||
|
msg string
|
||||||
|
statusCode int
|
||||||
|
}
|
||||||
|
|
||||||
|
// newHttpError is used for when we need to return a specific error
|
||||||
|
// back to the customer. Examples are the table not found, which returns
|
||||||
|
// a 400 status code along with a JSON body to describe the table(s) that
|
||||||
|
// were missing
|
||||||
|
func newHttpError(msg string, statusCode int) error {
|
||||||
|
return &httpErrorStruct {
|
||||||
|
msg: msg,
|
||||||
|
statusCode: statusCode,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (errorStruct *httpErrorStruct) Error() string {
|
||||||
|
return errorStruct.msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (errorStruct *httpErrorStruct) StatusCode() int {
|
||||||
|
return errorStruct.statusCode
|
||||||
|
}
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// CreateTable Structs
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
type createTableRequest struct {
|
||||||
|
TableName string
|
||||||
|
KeySchema []keySchemaInfo
|
||||||
|
AttributeDefinitions map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
type keySchemaInfo struct {
|
||||||
|
AttributeName string
|
||||||
|
KeyType string
|
||||||
|
}
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// Common functions
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
// getCompositeKey creates a hash from the key information that can be used
|
||||||
|
// to read/write from the bucket (DDB calls this a table). Keys in DDB
|
||||||
|
// can be composed of multiple attributes, so we leverage the metadata captured
|
||||||
|
// in a CreateTable call to lookup the KeySchema, find all the attributes, and
|
||||||
|
// concatenate the hashes of each attribute value. There are limitations to
|
||||||
|
// the implementation below. First, we're only currently supporting string
|
||||||
|
// values. This is probably not that big of a deal to expand. Secondly, we
|
||||||
|
// only support a single hash implementation, hardcoded to ShakeSum256. Third,
|
||||||
|
// we only support HASH keys - if a range comes along, this will return an
|
||||||
|
// error. Lastly, there's no coverage for the case where a client fails to
|
||||||
|
// send in the whole key. In this case, a hash will be generated for only
|
||||||
|
// part of the key, and likely no data will be found (read) or an orphan will
|
||||||
|
// be created (write)
|
||||||
|
func getCompositeKey(tableName string, db *bolt.DB, key []attributeInfo) ([]byte, error){
|
||||||
|
var rc []byte
|
||||||
|
var metadata createTableRequest
|
||||||
|
err := db.View(func(tx *bolt.Tx) error {
|
||||||
|
b := tx.Bucket([]byte("_m"))
|
||||||
|
if b == nil {
|
||||||
|
log.Printf("Metadata bucket not found!\n")
|
||||||
|
return errors.New("Metadata bucket '_m' not found")
|
||||||
|
}
|
||||||
|
v := b.Get([]byte(tableName))
|
||||||
|
if v == nil {
|
||||||
|
msg := fmt.Sprintf("No metadata found for table '%s'\n")
|
||||||
|
log.Printf(msg)
|
||||||
|
return errors.New(msg)
|
||||||
|
}
|
||||||
|
buf := bytes.NewReader(v)
|
||||||
|
dec := gob.NewDecoder(buf)
|
||||||
|
err := dec.Decode(&metadata)
|
||||||
|
if err != nil { log.Println(`failed gob Decode`, err); return err }
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Error getting metadata: %s", err)
|
||||||
|
return rc, err
|
||||||
|
}
|
||||||
|
if len(metadata.KeySchema) == 0 {
|
||||||
|
return rc, errors.New("Error getting metadata: key schema length 0")
|
||||||
|
}
|
||||||
|
rc = make([]byte, 512 / 8 * len(metadata.KeySchema))
|
||||||
|
for i, keypart := range metadata.KeySchema {
|
||||||
|
if keypart.KeyType != "HASH" {
|
||||||
|
return rc, errors.New("Unsupported key type " + keypart.KeyType + " for attribute " + keypart.AttributeName)
|
||||||
|
}
|
||||||
|
// TODO: Determine how we could support (if we want to support) other hash types
|
||||||
|
// We're simply going to stitch together hashes of these attributes
|
||||||
|
for _, attribute := range key {
|
||||||
|
if attribute.AttributeName == keypart.AttributeName {
|
||||||
|
var buf []byte
|
||||||
|
switch attribute.AttributeType {
|
||||||
|
case "S":
|
||||||
|
buf = []byte(attribute.AttributeValue.(string))
|
||||||
|
default:
|
||||||
|
return rc, errors.New("Unsupported attribute type " + attribute.AttributeType + " for attribute " + keypart.AttributeName)
|
||||||
|
}
|
||||||
|
h := make([]byte, 64)
|
||||||
|
sha3.ShakeSum256(h, buf)
|
||||||
|
copy(rc[(i * 64):], h)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rc, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseAttributes is useful for anything that is a set of objects of the form
|
||||||
|
// {
|
||||||
|
// "attribute1": { "S": "foo" },
|
||||||
|
// "attribute2": { "S": "bar" }
|
||||||
|
// }
|
||||||
|
// This pattern occurs all over the place in DDB. In the above example,
|
||||||
|
// "attribute1" is the attribute name, "S" is the type, and "foo" is the value
|
||||||
|
func parseAttributes(attributes map[string]interface{}) []attributeInfo {
|
||||||
|
rc := make([]attributeInfo, len(attributes))
|
||||||
|
jnx := 0
|
||||||
|
|
||||||
|
for attributename, attributeValueInt := range attributes {
|
||||||
|
// { PK: {"S": "url"}}
|
||||||
|
var attributetype string
|
||||||
|
var attributevalue interface{}
|
||||||
|
for attributetype, attributevalue = range attributeValueInt.(map[string]interface{}) {
|
||||||
|
// {"S": "url"}
|
||||||
|
// Should only be one value - if it's more, do we care?
|
||||||
|
}
|
||||||
|
rc[jnx] = attributeInfo{
|
||||||
|
AttributeName: attributename,
|
||||||
|
AttributeType: attributetype,
|
||||||
|
AttributeValue: attributevalue,
|
||||||
|
}
|
||||||
|
jnx++
|
||||||
|
}
|
||||||
|
return rc
|
||||||
|
}
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// BatchGetItem implementation
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
// We don't use the built-in parse-to-struct mechanism as the REST API
|
||||||
|
// structures aren't really aligned with what go needs here. We also don't
|
||||||
|
// need quite the complexity that DDB is using, so we make a few shortcuts
|
||||||
|
func parseBatchGetItem(event map[string]interface{}) (req batchGetItemRequest, err error) {
|
||||||
|
defer func() {
|
||||||
|
// recover from panic if one occured. Set err to nil otherwise.
|
||||||
|
if paniced := recover(); paniced != nil {
|
||||||
|
log.Printf("Parse error: %v", paniced)
|
||||||
|
err = errors.New("Error parsing request for batchGetItemRequest")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
req = batchGetItemRequest {
|
||||||
|
tables: map[string]batchGetItemTable{},
|
||||||
|
}
|
||||||
|
for table, requestInt := range event["RequestItems"].(map[string]interface{}) {
|
||||||
|
request := requestInt.(map[string]interface{})
|
||||||
|
keys := request["Keys"].([]interface{})
|
||||||
|
req.tables[table] = batchGetItemTable{
|
||||||
|
tableName: table,
|
||||||
|
keys: make([][]attributeInfo, len(keys)),
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Support AttributesToGet
|
||||||
|
// TODO: Support ConsistentRead
|
||||||
|
// TODO: Support ExpressionAttributeNames
|
||||||
|
for inx, keyInt := range keys {
|
||||||
|
// [{ "PK": {"S": "url"}}, {"PK": {"S": "url2"}}]
|
||||||
|
attributes := keyInt.(map[string]interface{})
|
||||||
|
req.tables[table].keys[inx] = parseAttributes(attributes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Support ProjectionExpression
|
||||||
|
}
|
||||||
|
// TODO: Support ReturnConsumedCapacity
|
||||||
|
return req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// batchGetItem does the heavy lifting. We'll get parseBatchGetItem to parse
|
||||||
|
// out the request, then loop through tables to get data. For each key
|
||||||
|
// (collection of attributes), we'll get the compositeKey hash, then go to
|
||||||
|
// bolt to find the data. If there is data in the DB, we use gob to decode it.
|
||||||
|
// Gob will provide us with an []attributeInfo, so we need to coerce that
|
||||||
|
// structure into what the response should look like, then json.Marshal that
|
||||||
|
// to the correct string. From there, it's basically a bunch of error
|
||||||
|
// handling and string concatenation.
|
||||||
|
func batchGetItem(event map[string]interface{}, db *bolt.DB) (string, error) {
|
||||||
|
req, err := parseBatchGetItem(event)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Error parsing")
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
consumedCapacity := ""
|
||||||
|
response := ""
|
||||||
|
prefix := ""
|
||||||
|
for _, table := range req.tables {
|
||||||
|
log.Printf("got request for table %s\n", table.tableName)
|
||||||
|
keyResponses := ""
|
||||||
|
keyPrefix := ""
|
||||||
|
err = db.View(func(tx *bolt.Tx) error {
|
||||||
|
b := tx.Bucket([]byte(table.tableName))
|
||||||
|
if b == nil {
|
||||||
|
log.Printf("Table '%s' does not exist", table.tableName)
|
||||||
|
msg := fmt.Sprintf("{\"__type\":\"com.amazonaws.dynamodb.v20120810#ResourceNotFoundException\",\"message\":\"Requested resource not found: Table: %s not found\"}", table.tableName)
|
||||||
|
return newHttpError(msg, 400)
|
||||||
|
}
|
||||||
|
for _, key := range table.keys {
|
||||||
|
compositeKey, err := getCompositeKey(table.tableName, db, key)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error getting composite key: %s", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
v := b.Get(compositeKey)
|
||||||
|
if v != nil {
|
||||||
|
buf := bytes.NewReader(v)
|
||||||
|
dec := gob.NewDecoder(buf)
|
||||||
|
var attributes []attributeInfo
|
||||||
|
err = dec.Decode(&attributes)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error decoding value for key: %x", compositeKey)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
respObj := map[string]interface{}{}
|
||||||
|
for _, attribute := range attributes {
|
||||||
|
typedVal := map[string]interface{}{}
|
||||||
|
typedVal[attribute.AttributeType] = attribute.AttributeValue
|
||||||
|
respObj[attribute.AttributeName] = typedVal
|
||||||
|
}
|
||||||
|
respStr, err := json.Marshal(respObj)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error marshalling value to json for key: %x", compositeKey)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// This is only useful for debugging and we probably don't
|
||||||
|
// want all our values showing up in logs
|
||||||
|
// log.Printf("Got value '%s' for key '%s' with hash '%x'", respStr, key, compositeKey)
|
||||||
|
// We're expecting the value to be a json doc, so this becomes simple
|
||||||
|
keyResponses = fmt.Sprintf(`%s%s%s`, keyResponses, prefix, respStr)
|
||||||
|
keyPrefix = ","
|
||||||
|
}else{
|
||||||
|
log.Printf("No value found for key '%s' with hash '%x'", key, compositeKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
response = fmt.Sprintf(`%s%s"%s": [ %s ]`, response, prefix, table.tableName, keyResponses)
|
||||||
|
consumedCapacity = fmt.Sprintf(`%s%s{"TableName": "%s", "CapacityUnits": 1 }`,
|
||||||
|
consumedCapacity, prefix, table.tableName)
|
||||||
|
prefix = ","
|
||||||
|
}
|
||||||
|
// TODO: I think this implementation is incomplete
|
||||||
|
msg := fmt.Sprintf(`
|
||||||
|
{
|
||||||
|
"Responses": { %s },
|
||||||
|
"UnprocessedKeys": {},
|
||||||
|
"ConsumedCapacity": [ %s ]
|
||||||
|
}
|
||||||
|
`, response, consumedCapacity)
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// CreateTable implementation
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
func parseCreateTable(event map[string]interface{}) (req createTableRequest, err error) {
|
||||||
|
// {"TableName": "etags", "AttributeDefinitions": [{"AttributeName": "PK", "AttributeType": "S"}], "KeySchema": [{"AttributeName": "PK", "KeyType": "HASH"}]}
|
||||||
|
defer func() {
|
||||||
|
// recover from panic if one occured. Set err to nil otherwise.
|
||||||
|
if paniced := recover(); paniced != nil {
|
||||||
|
log.Printf("Parse error: %v", paniced)
|
||||||
|
err = errors.New("Error parsing request for batchGetItemRequest")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
req = createTableRequest {}
|
||||||
|
// TODO: Support everything else
|
||||||
|
req.TableName = event["TableName"].(string)
|
||||||
|
keySchema := event["KeySchema"].([]interface{})
|
||||||
|
req.KeySchema = make([]keySchemaInfo, len(keySchema))
|
||||||
|
req.AttributeDefinitions = map[string]string{}
|
||||||
|
for _, definitionInfo := range event["AttributeDefinitions"].([]interface{}) {
|
||||||
|
definition := definitionInfo.(map[string]interface{})
|
||||||
|
req.AttributeDefinitions[definition["AttributeName"].(string)] = definition["AttributeType"].(string)
|
||||||
|
}
|
||||||
|
for inx, definitionInfo := range keySchema {
|
||||||
|
definition := definitionInfo.(map[string]interface{})
|
||||||
|
req.KeySchema[inx] = keySchemaInfo {
|
||||||
|
AttributeName: definition["AttributeName"].(string),
|
||||||
|
KeyType: definition["KeyType"].(string),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// We need a metadata bucket to hold key information from this request, as
|
||||||
|
// DDB supports composite keys (keys with > 1 attribute). Note that multiple
|
||||||
|
// attribute keys have not been tested. The metadata bucket will be created
|
||||||
|
// if it doesn't already exist
|
||||||
|
func createTableMetadata(req createTableRequest, db *bolt.DB) error {
|
||||||
|
return db.Update(func(tx *bolt.Tx) error {
|
||||||
|
var err error
|
||||||
|
b := tx.Bucket([]byte("_m"))
|
||||||
|
if b == nil {
|
||||||
|
log.Printf("Metadata bucket doesn't exist - creating")
|
||||||
|
b, err = tx.CreateBucket([]byte("_m"))
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error creating metadata bucket: %s", req.TableName, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Insert data into metadata bucket
|
||||||
|
buf := bytes.Buffer{}
|
||||||
|
enc := gob.NewEncoder(&buf)
|
||||||
|
err = enc.Encode(req)
|
||||||
|
if err != nil { log.Println(`failed gob Encode`, err); return err }
|
||||||
|
err = b.Put([]byte(req.TableName), buf.Bytes())
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// createTable. This is fairly straightforward, with a bolt bucket = DDB table
|
||||||
|
func createTable(event map[string]interface{}, db *bolt.DB) (string, error) {
|
||||||
|
req, err := parseCreateTable(event)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Error parsing")
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if err = createTableMetadata(req, db); err != nil {
|
||||||
|
log.Printf("Error creating metadata for bucket '%s'\n", err.Error())
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
// Use the transaction...
|
||||||
|
// we only care about table name, and table = bucket in bolt
|
||||||
|
// Start a writable transaction.
|
||||||
|
tx, err := db.Begin(true)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
|
||||||
|
_, err = tx.CreateBucket([]byte(req.TableName))
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error creating bucket '%s': %s", req.TableName, err.Error())
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit the transaction and check for error.
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := fmt.Sprintf(`{
|
||||||
|
"AttributeDefinitions": [],
|
||||||
|
"TableName": "%s",
|
||||||
|
"KeySchema": [],
|
||||||
|
"LocalSecondaryIndexes": []
|
||||||
|
"ProvisionedThroughput": {
|
||||||
|
"ReadCapacityUnits": 5000,
|
||||||
|
"WriteCapacityUnits": 5000
|
||||||
|
},
|
||||||
|
"Tags": []
|
||||||
|
}
|
||||||
|
`, req.TableName)
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// BatchWriteItem implementation
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
func parseBatchWriteItem(event map[string]interface{}) (req batchWriteItemRequest, err error) {
|
||||||
|
defer func() {
|
||||||
|
// recover from panic if one occured. Set err to nil otherwise.
|
||||||
|
if paniced := recover(); paniced != nil {
|
||||||
|
log.Printf("Parse error: %v", paniced)
|
||||||
|
err = errors.New("Error parsing request for batchWriteItemRequest")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
req = batchWriteItemRequest {
|
||||||
|
tables: map[string]batchWriteItemTable{},
|
||||||
|
}
|
||||||
|
for table, tableInt := range event["RequestItems"].(map[string]interface{}) {
|
||||||
|
requests := tableInt.([]interface{})
|
||||||
|
req.tables[table] = batchWriteItemTable{
|
||||||
|
tableName: table,
|
||||||
|
requests: make([]writeRequest, len(requests)),
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("parsing writeitem request for table: " + table)
|
||||||
|
for inx, requestInt := range requests {
|
||||||
|
// [{{ "DeleteRequest": {...}, {"PutRequest": {...}}}]
|
||||||
|
request := requestInt.(map[string]interface{})
|
||||||
|
putRequest, putok := request["PutRequest"]
|
||||||
|
deleteRequest, delok := request["DeleteRequest"]
|
||||||
|
if putok {
|
||||||
|
req.tables[table].requests[inx].putRequest =
|
||||||
|
parseAttributes(putRequest.(map[string]interface{})["Item"].(map[string]interface{}))
|
||||||
|
}
|
||||||
|
if delok {
|
||||||
|
req.tables[table].requests[inx].deleteRequest =
|
||||||
|
parseAttributes(deleteRequest.(map[string]interface{})["Key"].(map[string]interface{}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Support ProjectionExpression
|
||||||
|
}
|
||||||
|
// TODO: Support ReturnConsumedCapacity
|
||||||
|
return req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// {"RequestItems": {"etags": [{"PutRequest": {"Item": {"PK": {"S": "foo"}, "etag": {"S": "bar"}}}}]}}
|
||||||
|
|
||||||
|
// batchWriteItem gets the parsed event, loops through each table and processes
|
||||||
|
// any requests that might exist. This DOES NOT CURRENTLY DO DELETES, although
|
||||||
|
// that should be a trivial implementation. The key is the compositekey hash,
|
||||||
|
// and the value is the gob-encoded []attributeInfo for the passed in data
|
||||||
|
func batchWriteItem(event map[string]interface{}, db *bolt.DB) (string, error) {
|
||||||
|
req, err := parseBatchWriteItem(event)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Could not parse BatchWriteItem: " + err.Error())
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
for tablename, table := range req.tables {
|
||||||
|
log.Println("Processing request for table " + tablename)
|
||||||
|
for _, request := range table.requests {
|
||||||
|
if request.putRequest != nil && len(request.putRequest) > 0 {
|
||||||
|
key, err := getCompositeKey(tablename, db, request.putRequest)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Error getting composite key: " + err.Error())
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
log.Printf("Put request for hashkey %x", key)
|
||||||
|
err = db.Update(func(tx *bolt.Tx) error {
|
||||||
|
var err error
|
||||||
|
b := tx.Bucket([]byte(tablename))
|
||||||
|
if b == nil {
|
||||||
|
log.Printf("Table '%s' does not exist", table.tableName)
|
||||||
|
msg := fmt.Sprintf("{\"__type\":\"com.amazonaws.dynamodb.v20120810#ResourceNotFoundException\",\"message\":\"Requested resource not found: Table: %s not found\"}", table.tableName)
|
||||||
|
return newHttpError(msg, 400)
|
||||||
|
}
|
||||||
|
// Insert data into bucket
|
||||||
|
buf := bytes.Buffer{}
|
||||||
|
enc := gob.NewEncoder(&buf)
|
||||||
|
err = enc.Encode(request.putRequest)
|
||||||
|
if err != nil { log.Println(`failed gob Encode`, err); return err }
|
||||||
|
err = b.Put([]byte(key), buf.Bytes())
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Error updating database: " + err.Error())
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
log.Printf("Put update succeeded for key %x", key)
|
||||||
|
}
|
||||||
|
if request.deleteRequest != nil && len(request.deleteRequest) > 0 {
|
||||||
|
return "", errors.New("Delete requests not yet supported")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := fmt.Sprintf(`
|
||||||
|
{
|
||||||
|
"ConsumedCapacity": [],
|
||||||
|
"ItemCollectionMetrics": {},
|
||||||
|
"UnprocessedItems": {}
|
||||||
|
}`)
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// Web goo
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
// postCommand handles routing requests to the appropriate function based
|
||||||
|
// on the command (X-Amz-Target http header)
|
||||||
|
func postCommand(command string, rawEvent []byte, db *bolt.DB) (string, error) {
|
||||||
|
// Probably better as map[string, func]
|
||||||
|
var event map[string]interface{}
|
||||||
|
if err := json.Unmarshal(rawEvent, &event); err !=nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
switch command {
|
||||||
|
case "DynamoDB_20120810.BatchGetItem":
|
||||||
|
return batchGetItem(event, db)
|
||||||
|
case "DynamoDB_20120810.CreateTable":
|
||||||
|
return createTable(event, db)
|
||||||
|
case "DynamoDB_20120810.BatchWriteItem":
|
||||||
|
return batchWriteItem(event, db)
|
||||||
|
default:
|
||||||
|
return "", errors.New("unrecognized command: " + command)
|
||||||
|
}
|
||||||
|
return "", errors.New("unreachable")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// main - setup http listener and marshall all posts to postCommand for
|
||||||
|
// processing. Will also open up/create our bolt db
|
||||||
|
func main() {
|
||||||
|
port := os.Getenv("PORT")
|
||||||
|
if port == "" { port = ":8080" } else { port = ":" + port }
|
||||||
|
filename := os.Getenv("FILE")
|
||||||
|
if filename == "" { filename = "ddb.db" }
|
||||||
|
db, err := bolt.Open(filename, 0600, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
fmt.Printf("listening on port %s\n", port)
|
||||||
|
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.Method {
|
||||||
|
case "POST":
|
||||||
|
body, err := ioutil.ReadAll(r.Body)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, "Could not read body", 400)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
target := r.Header.Get("X-Amz-Target")
|
||||||
|
log.Println(target)
|
||||||
|
// Dump headers (if needed)
|
||||||
|
// for k, v := range r.Header {
|
||||||
|
// log.Println(k)
|
||||||
|
// log.Println(v)
|
||||||
|
// }
|
||||||
|
if os.Getenv("DEBUG") == "true" {
|
||||||
|
// This could include sensitive data
|
||||||
|
log.Println(string(body[:]))
|
||||||
|
}
|
||||||
|
var resp = ""
|
||||||
|
if resp, err = postCommand(target, body, db); err != nil {
|
||||||
|
switch err.(type) {
|
||||||
|
case *httpErrorStruct:
|
||||||
|
httpError := err.(*httpErrorStruct)
|
||||||
|
w.WriteHeader(httpError.StatusCode())
|
||||||
|
if _, err = w.Write([]byte(httpError.Error())); err != nil {
|
||||||
|
http.Error(w, "Could not write response", 400)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
http.Error(w, "Bad request", 400)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, err = w.Write([]byte(resp)); err != nil {
|
||||||
|
http.Error(w, "Could not write response", 400)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
fmt.Printf("invalid request, method %s\n", r.Method)
|
||||||
|
http.Error(w, "Not found", 404)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
log.Fatal(http.ListenAndServe(port, nil))
|
||||||
|
}
|
||||||
|
|
8
go.mod
Normal file
8
go.mod
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
module lerch.org/ddbbolt
|
||||||
|
|
||||||
|
go 1.15
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/boltdb/bolt v1.3.1
|
||||||
|
golang.org/x/crypto v0.0.0-20201116153603-4be66e5b6582
|
||||||
|
)
|
72
go.sum
Normal file
72
go.sum
Normal file
|
@ -0,0 +1,72 @@
|
||||||
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
|
github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
|
||||||
|
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
||||||
|
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||||
|
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
|
||||||
|
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
|
||||||
|
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
|
||||||
|
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||||
|
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||||
|
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
|
||||||
|
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
|
||||||
|
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
|
||||||
|
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
|
||||||
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/dgraph-io/badger v1.6.2 h1:mNw0qs90GVgGGWylh0umH5iag1j6n/PeJtNvL6KY/x8=
|
||||||
|
github.com/dgraph-io/badger/v2 v2.2007.2 h1:EjjK0KqwaFMlPin1ajhP943VPENHJdEz1KLIegjaI3k=
|
||||||
|
github.com/dgraph-io/badger/v2 v2.2007.2/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE=
|
||||||
|
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA=
|
||||||
|
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
|
||||||
|
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
|
||||||
|
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
|
||||||
|
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
|
||||||
|
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||||
|
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||||
|
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
|
||||||
|
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||||
|
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
|
||||||
|
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||||
|
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
|
||||||
|
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||||
|
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||||
|
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||||
|
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||||
|
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||||
|
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||||
|
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||||
|
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||||
|
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
||||||
|
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
|
||||||
|
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||||
|
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||||
|
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
|
||||||
|
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
|
||||||
|
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
|
||||||
|
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
|
||||||
|
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
|
||||||
|
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
|
||||||
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||||
|
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||||
|
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
|
||||||
|
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
|
||||||
|
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||||
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
|
golang.org/x/crypto v0.0.0-20201116153603-4be66e5b6582 h1:0WDrJ1E7UolDk1KhTXxxw3Fc8qtk5x7dHP431KHEJls=
|
||||||
|
golang.org/x/crypto v0.0.0-20201116153603-4be66e5b6582/go.mod h1:tCqSYrHVcf3i63Co2FzBkTCo2gdF6Zak62921dSfraU=
|
||||||
|
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||||
|
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
|
||||||
|
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
|
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k=
|
||||||
|
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/term v0.0.0-20201113234701-d7a72108b828/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||||
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
Loading…
Reference in New Issue
Block a user