from datetime import datetime
from http import HTTPStatus
from os import access
from flask import request
from flask_jwt_extended import create_access_token, get_jwt, get_jwt_identity, jwt_required
from flask_restful import Resource
from mysql.connector.errors import Error
from config import Config
import boto3
boto3와 라이브러리를 import한다.
class ObjectDetctionResource(Resource):
def get(self):
# 1. 클라이언트로부터 데이터를 받아온다.
filename = request.args['filename']
# 2. 위의 파일은, S3에 저장되어 있어야 한다.
# rekognition 을 이용해서, object detection 한다.
client = boto3.client('rekognition', 'ap-northeast-2', aws_access_key_id = Config.ACCESS_KEY, aws_secret_access_key = Config.SECRET_ACCESS)
response = client.detect_labels(Image = {'S3Object': {'Bucket':Config.S3_BUCKET, 'Name':filename}}, MaxLabels = 10)
for label in response['Labels']:
print ("Label: " + label['Name'])
print ("Confidence: " + str(label['Confidence']))
print ("Instances:")
for instance in label['Instances']:
print (" Bounding box")
print (" Top: " + str(instance['BoundingBox']['Top']))
print (" Left: " + str(instance['BoundingBox']['Left']))
print (" Width: " + str(instance['BoundingBox']['Width']))
print (" Height: " + str(instance['BoundingBox']['Height']))
print (" Confidence: " + str(instance['Confidence']))
print()
print ("Parents:")
for parent in label['Parents']:
print (" " + parent['Name'])
print ("----------")
print ()
return {'result':'success', 'Labels':response['Labels']}, 200
클라이언트로부터 파일이름을 받고, boto3.client로 연결한다. detect_labels의 파라미터에 s3와 파일이름을 넣고 라벨 수를 조정해준 뒤 결과를 받아온다.
from datetime import datetime
from http import HTTPStatus
from os import access
from flask import request
from flask_jwt_extended import create_access_token, get_jwt, get_jwt_identity, jwt_required
from flask_restful import Resource
from mysql.connector.errors import Error
from mysql_connection import get_connection
import mysql.connector
from config import Config
import boto3
pip install boto3로 설치하고 import 한다. boto3는 aws를 파이썬에서 연결할 수 있도록 해주는 라이브러리다.
class PostingResource(Resource):
def post(self):
# 1. 클라이언트로부터 데이터 받아온다.
# photo(file), content(text)
if 'photo' not in request.files:
return {'error', '파일을 업로드하세요'}, 400
file = request.files['photo']
content = request.form['content']
# 2. S3에 파일 업로드
# 파일명을 우리가 변경해 준다.
# 파일명은, 유니크하게 만들어야 한다.
current_time = datetime.now()
new_file_name = current_time.isoformat().replace(':', '_') + '.jpg'
# 유저가 올린 파일의 이름을, 내가 만든 파일명으로 변경
file.filename = new_file_name
# S3 에 업로드 하면 된다.
# AWS 의 라이브러리를 사용해야 한다.
# 이 파이썬 라이브러리를 boto3 라이브러리다!
# boto3 라이브러리 설치
# pip install boto3
s3 = boto3.client('s3', aws_access_key_id = Config.ACCESS_KEY, aws_secret_access_key = Config.SECRET_ACCESS)
try:
s3.upload_fileobj(file, Config.S3_BUCKET, file.filename, ExtraArgs = {'ACL':'public-read', 'ContentType':file.content_type})
except Exception as e:
return {'error':str(e)}, 500
# 3. DB에 저장
try :
# 데이터 insert
# 1. DB에 연결
connection = get_connection()
# 2. 쿼리문 만들기
query = '''insert into posting
(content, imgUrl)
values
( %s, %s);'''
record = (content, new_file_name)
# 3. 커서를 가져온다.
cursor = connection.cursor()
# 4. 쿼리문을 커서를 이용해서 실행한다.
cursor.execute(query, record)
# 5. 커넥션을 커밋해줘야 한다 => 디비에 영구적으로 반영하라는 뜻
connection.commit()
# 6. 자원 해제
cursor.close()
connection.close()
except mysql.connector.Error as e :
print(e)
cursor.close()
connection.close()
return {"error" : str(e)}, 503
return {'result':'success'}
from flask import Flask
from flask_jwt_extended import JWTManager
from flask_restful import Api
from config import Config
from resources.recipe import RecipeListResource
from resources.recipe_info import RecipeResource
from resources.recipe_publish import RecipePublishResource
from resources.user import UserLoginResource, UserLogoutResource, UserRegisterResource, jwt_blacklist
app = Flask(__name__)
# 환경변수 셋팅
app.config.from_object(Config)
# JWT 토큰 라이브러리 만들기
jwt = JWTManager(app)
# 로그아웃 된 토큰이 들어있는 set을, jwt에 알려준다.
@jwt.token_in_blocklist_loader
def check_if_token_is_revoked(jwt_header, jwt_payload):
jti = jwt_payload['jti']
return jti in jwt_blacklist
api = Api(app)
# 경로와 리소스(API 코드)를 연결한다.
api.add_resource(RecipeListResource, '/recipes')
api.add_resource(RecipeResource, '/recipes/<int:recipe_id>')
api.add_resource(RecipePublishResource, '/recipes/<int:recipe_id>/publish')
api.add_resource(UserRegisterResource, '/users/register')
api.add_resource(UserLoginResource, '/users/login')
api.add_resource(UserLogoutResource, '/users/logout')
if __name__ == "__main__":
app.run()
@jwt.token_in_blocklist_loader를 적고 함수를 만든다.
로그아웃된 토큰이 들어있는 set을, jwt에 알려준다.
# 로그아웃 기능을 하는 클래스
class UserLogoutResource(Resource):
@jwt_required()
def post(self):
jti = get_jwt()['jti']
print(jti)
jwt_blacklist.add(jti)
return {'result', 'success'}, 200
로그아웃 기능을 하는 클래스다. @jwt_required를 적어주고 jti를 만들고 집합에 넣어준다.
mysql_connection.py파일에는 mysql에 연결하기위한 코드들이 작성되어 있다.
app.py 코드
from flask import Flask
from flask_restful import Api
from resources.recipe import RecipeListResource
from resources.recipe_info import RecipeResource
app = Flask(__name__)
api = Api(app)
# 경로와 리소스(API 코드)를 연결한다.
api.add_resource(RecipeListResource, '/recipes')
api.add_resource(RecipeResource, '/recipes/<int:recipe_id>')
if __name__ == "__main__":
app.run()
플라스크를 import하고 기본 구조 작성뒤 경로와 리소스를 연결한다.
recipe.py 코드
from http import HTTPStatus
from flask import request
from flask_restful import Resource
from mysql.connector.errors import Error
from mysql_connection import get_connection
import mysql.connector
### API 를 만들기 위한 클래스 작성
### class(클래스) 란?? 변수와 함수로 구성된 묶음!
### 클래스는 상속이 가능하다!
### API를 만들기 위한 클래스는, flask_restful 라이브러리의
### Resource 클래스를 상속해서 만들어야 한다.
class RecipeListResource(Resource):
# restful api 의 method 에 해당하는 함수 작성
def post(self) :
# api 실행 코드를 여기에 작성
# 클라이언트에서, body 부분에 작성한 json 을
# 받아오는 코드
data = request.get_json()
# 받아온 데이터를 디비 저장하면 된다.
try :
# 데이터 insert
# 1. DB에 연결
connection = get_connection()
# 2. 쿼리문 만들기
query = '''insert into recipe
(name, description, cook_time, directions)
values
( %s , %s , %s ,%s);'''
record = (data['name'], data['description'], data['cook_time'], data['directions'] )
# 3. 커서를 가져온다.
cursor = connection.cursor()
# 4. 쿼리문을 커서를 이용해서 실행한다.
cursor.execute(query, record)
# 5. 커넥션을 커밋해줘야 한다 => 디비에 영구적으로 반영하라는 뜻
connection.commit()
# 6. 자원 해제
cursor.close()
connection.close()
except mysql.connector.Error as e :
print(e)
cursor.close()
connection.close()
return {"error" : str(e)}, 503
return {"result" : "success"}, 200
def get(self) :
# 쿼리 스트링으로 오는 데이터는 아래처럼 처리해준다.
offset = request.args.get('offset')
limit = request.args.get('limit')
# 디비로부터 데이터를 받아서, 클라이언트에 보내준다.
try :
connection = get_connection()
query = '''select *
from recipe
limit '''+offset+''' , '''+limit+''';'''
# select 문은, dictionary = True 를 해준다.
cursor = connection.cursor(dictionary = True)
cursor.execute(query)
# select 문은, 아래 함수를 이용해서, 데이터를 가져온다.
result_list = cursor.fetchall()
print(result_list)
# 중요! 디비에서 가져온 timestamp 는
# 파이썬의 datetime 으로 자동 변경된다.
# 문제는! 이데이터를 json 으로 바로 보낼수 없으므로,
# 문자열로 바꿔서 다시 저장해서 보낸다.
i = 0
for record in result_list :
result_list[i]['created_at'] = record['created_at'].isoformat()
result_list[i]['updated_at'] = record['updated_at'].isoformat()
i = i + 1
cursor.close()
connection.close()
except mysql.connector.Error as e :
print(e)
cursor.close()
connection.close()
return {"error" : str(e)}, 503
return { "result" : "success" ,
"count" : len(result_list) ,
"result_list" : result_list }, 200
insert와 select가 post, get 메소드로 구현되어있다.
recipe_info.py 코드
from http import HTTPStatus
from flask import request
from flask_restful import Resource
from mysql.connector.errors import Error
from mysql_connection import get_connection
import mysql.connector
class RecipeResource(Resource):
# 클라이언트로부터 /recipes/3 이런식으로 경로를 처리하므로
# 숫자는 바뀌므로, 변수로 처리해준다.
def get(self, recipe_id):
# 디비에서, recipe_id에 들어있는 값에 해당되는
# 데이터를 select 해온다.
try :
connection = get_connection()
query = '''select *
from recipe
where id = %s;'''
record = (recipe_id, )
# select 문은, dictionary = True 를 해준다.
cursor = connection.cursor(dictionary = True)
cursor.execute(query, record)
# select 문은, 아래 함수를 이용해서, 데이터를 가져온다.
result_list = cursor.fetchall()
print(result_list)
# 중요! 디비에서 가져온 timestamp 는
# 파이썬의 datetime 으로 자동 변경된다.
# 문제는! 이데이터를 json 으로 바로 보낼수 없으므로,
# 문자열로 바꿔서 다시 저장해서 보낸다.
i = 0
for record in result_list :
result_list[i]['created_at'] = record['created_at'].isoformat()
result_list[i]['updated_at'] = record['updated_at'].isoformat()
i = i + 1
cursor.close()
connection.close()
except mysql.connector.Error as e :
print(e)
cursor.close()
connection.close()
return {"error" : str(e)}, 503
return {'result':'success',
'info':result_list[0]}
# 데이터를 업데이트하는 API들은 put 함수를 사용한다.
def put(self, recipe_id):
# body에서 전달된 데이터를 처리
data = request.get_json()
# 디비 업데이트 실행코드
try:
# 데이터 업데이트
# 1. DB에 연결
connection = get_connection()
# 2. 쿼리문 만들기
query = '''update recipe
set name = %s, description = %s,
cook_time = %s,
directions = %s
where id = %s;'''
record = (data['name'], data['description'], data['cook_time'], data['directions'], recipe_id)
# 3. 커서를 가져온다.
cursor = connection.cursor()
# 4. 쿼리문을 커서를 이용해서 실행한다.
cursor.execute(query, record)
# 5. 커넥션을 커밋해줘야 한다 => 디비에 영구적으로 반영하라는 뜻
connection.commit()
# 6. 자원 해제
cursor.close()
connection.close()
except mysql.connector.Error as e:
print(e)
cursor.close()
connection.close()
return {'error':str(e)}, 503
return {'result':'sucess'}, 200
# 삭제하는 delete 함수
def delete(self, recipe_id):
try:
# 데이터 삭제
# 1. DB에 연결
connection = get_connection()
# 2. 쿼리문 만들기
query = '''delete from recipe
where id = %s;'''
record = (recipe_id, )
# 3. 커서를 가져온다.
cursor = connection.cursor()
# 4. 쿼리문을 커서를 이용해서 실행한다.
cursor.execute(query, record)
# 5. 커넥션을 커밋해줘야 한다 => 디비에 영구적으로 반영하라는 뜻
connection.commit()
# 6. 자원 해제
cursor.close()
connection.close()
except mysql.connector.Error as e:
print(e)
cursor.close()
connection.close()
return {'error':str(e)}, 503
return {'result':'success'}, 200
select, update, delete가 get, put, delete 메소드로 구현되어 있다.