add etags as used in AWS flow
This commit is contained in:
		
							parent
							
								
									98600ac419
								
							
						
					
					
						commit
						b8f72d2705
					
				
					 1 changed files with 131 additions and 0 deletions
				
			
		
							
								
								
									
										131
									
								
								etags.py
									
										
									
									
									
										Executable file
									
								
							
							
						
						
									
										131
									
								
								etags.py
									
										
									
									
									
										Executable file
									
								
							|  | @ -0,0 +1,131 @@ | |||
| #!/usr/bin/env python3 | ||||
| 
 | ||||
| import boto3 | ||||
| from requests_futures.sessions import FuturesSession | ||||
| 
 | ||||
| from concurrent.futures import as_completed | ||||
| import json | ||||
| import os | ||||
| import sys | ||||
| import traceback | ||||
| 
 | ||||
| session = FuturesSession() | ||||
| ddb = boto3.client('dynamodb') | ||||
| events = boto3.client('events') | ||||
| table = os.getenv('ETAGS_TABLE', 'etags') | ||||
| event_bus_name = os.getenv('ETAGS_BUS_NAME', 'url-content-changes') | ||||
| force_change = os.getenv('ETAGS_FORCE_CHANGE', 'False') != 'False' | ||||
| 
 | ||||
| 
 | ||||
| def printerr(*args, **kwargs): | ||||
|     print(*args, file=sys.stderr, **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| def ddb_url(url): | ||||
|     return { | ||||
|             'PK': {'S': url} | ||||
|             } | ||||
| 
 | ||||
| 
 | ||||
| def all_etags(urls): | ||||
|     response = ddb.batch_get_item(RequestItems={table: { | ||||
|         'Keys': [ddb_url(url) for url in urls], | ||||
|         }}) | ||||
|     rc = {} | ||||
|     for item in response['Responses'][table]: | ||||
|         rc[item['PK']['S']] = item['etag']['S'] | ||||
|     return rc | ||||
| 
 | ||||
| 
 | ||||
| def create_put_request(item): | ||||
|     return { | ||||
|             'PutRequest': { | ||||
|                 'Item': { | ||||
|                     'PK': {'S': item['url']}, | ||||
|                     'etag': {'S': item['etag']}, | ||||
|                 }, | ||||
|             }, | ||||
|             } | ||||
| 
 | ||||
| 
 | ||||
| def create_event(item): | ||||
|     return { | ||||
|             'Source': 'etags', | ||||
|             'Resources': [item['url']], | ||||
|             'Detail': json.dumps({'text': item['text']}), | ||||
|             'DetailType': 'text-content', | ||||
|             'EventBusName': event_bus_name, | ||||
|             } | ||||
| 
 | ||||
| 
 | ||||
| def process_changes(changed): | ||||
|     # TODO: Deal with 25 max requests per call | ||||
|     response = ddb.batch_write_item(RequestItems={ | ||||
|         table: [create_put_request(item) for item in changed] | ||||
|         }) | ||||
|     unprocessed = {} | ||||
|     if table in response['UnprocessedItems']: | ||||
|         for item in response['UnprocessedItems'][table]: | ||||
|             printerr('DDB did not process item with url %s' % | ||||
|                      item['PutRequest']['Item']['PK']['S']) | ||||
|             unprocessed[item['PutRequest']['Item']['PK']['S']] = True | ||||
|    response = events.put_events( | ||||
|         Entries=[create_event(item) for item in | ||||
|                  filter(lambda k: k['url'] not in unprocessed, changed)] | ||||
|     ) | ||||
|     if response['FailedEntryCount'] > 0: | ||||
|         for entry in response['Entries']: | ||||
|             printerr(json.dumps(entry)) | ||||
| 
 | ||||
| 
 | ||||
| def make_requests(urls, existing_etags): | ||||
|     rs = [] | ||||
|     rsdict = {} | ||||
|     for u in urls: | ||||
|         if u in existing_etags and not force_change: | ||||
|             etag = existing_etags[u] | ||||
|             future = session.get(u, headers={'If-None-Match': etag}) | ||||
|         else: | ||||
|             future = session.get(u) | ||||
|         rsdict[future] = {'url': u} | ||||
|         rs.append(future) | ||||
|     return rs, rsdict | ||||
| 
 | ||||
| 
 | ||||
| def lambda_handler(event, context): | ||||
|     existing_etags = all_etags(event['urls']) | ||||
|     (rs, rsdict) = make_requests(event['urls'], existing_etags) | ||||
|     changed = [] | ||||
| 
 | ||||
|     for future in as_completed(rs): | ||||
|         try: | ||||
|             result = future.result() | ||||
|             current_etag = result.headers['etag'] | ||||
|             prior_etag = None | ||||
|             if rsdict[future]['url'] in existing_etags: | ||||
|                 prior_etag = existing_etags[rsdict[future]['url']] | ||||
|             if force_change or current_etag != prior_etag: | ||||
|                 changed.append({ | ||||
|                     'url': rsdict[future]['url'], | ||||
|                     'etag': current_etag, | ||||
|                     'text': result.text, | ||||
|                     }) | ||||
|         except Exception as ex: | ||||
|             printerr('Error loading %s: %s' % (rsdict[future]['url'], ex)) | ||||
|             traceback.print_exc() | ||||
| 
 | ||||
|     if len(changed) > 0: | ||||
|         print('changes detected') | ||||
|         process_changes(changed) | ||||
| 
 | ||||
|     return { | ||||
|         'statusCode': 200, | ||||
|         'body': json.dumps(event) | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     if len(sys.argv) < 2: | ||||
|         print('usage: etags.py <url>') | ||||
|         sys.exit(1) | ||||
|     print(json.dumps(lambda_handler({'urls': sys.argv[1:]}, None))) | ||||
		Loading…
	
	Add table
		
		Reference in a new issue