-
Notifications
You must be signed in to change notification settings - Fork 0
/
meli_sdk.py
144 lines (121 loc) · 5.85 KB
/
meli_sdk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import os
import re
import ssl
from configparser import ConfigParser
from urllib.parse import urlencode

import requests

try:
    from configparser import SafeConfigParser
except ImportError:  # SafeConfigParser was removed in Python 3.12
    SafeConfigParser = ConfigParser
class Meli(object):
    """Thin HTTP client for the REST API configured in ``config.ini``.

    The constructor reads ``config.ini`` from the directory containing this
    module and takes the API root URL, SDK version string, authorization URL
    and OAuth token path from its ``[config]`` section.  The instance keeps
    the OAuth credentials/tokens and exposes:

    * ``auth_url`` / ``authorize`` / ``get_refresh_token`` for the OAuth flow;
    * ``get`` / ``post`` / ``put`` / ``delete`` / ``options`` wrappers that
      return the raw response object of the underlying ``requests`` call.
    """

    def __init__(self, client_id, client_secret, access_token=None, refresh_token=None):
        """Store the credentials and load the endpoint settings from ``config.ini``.

        Args:
            client_id: Application id sent in the OAuth requests.
            client_secret: Application secret sent in the OAuth requests.
            access_token: Optional already-issued access token.
            refresh_token: Optional already-issued refresh token.

        Raises:
            configparser.Error: If ``api_root_url``, ``sdk_version``,
                ``auth_url`` or ``oauth_url`` is missing from the ``[config]``
                section (including when ``config.ini`` itself is missing).
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = access_token
        self.refresh_token = refresh_token
        self.expires_in = None

        # ConfigParser replaces SafeConfigParser, which was removed in Python 3.12.
        parser = ConfigParser()
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.ini')
        parser.read(config_path)

        self._requests = requests.Session()
        try:
            self.SSL_VERSION = parser.get('config', 'ssl_version')
            # NOTE(review): SSLAdapter is not imported in this file, so unless it
            # is provided elsewhere this line raises NameError and the fallback
            # below is what actually runs.
            self._requests.mount('https://', SSLAdapter(ssl_version=getattr(ssl, self.SSL_VERSION)))
        except Exception:
            # Best-effort: without a usable ssl_version setting fall back to the
            # plain module-level requests API (no shared session).
            self._requests = requests

        self.API_ROOT_URL = parser.get('config', 'api_root_url')
        self.SDK_VERSION = parser.get('config', 'sdk_version')
        self.AUTH_URL = parser.get('config', 'auth_url')
        self.OAUTH_URL = parser.get('config', 'oauth_url')

    def _build_headers(self, extra_headers=None):
        """Return the default JSON headers, optionally overridden by ``extra_headers``."""
        headers = {
            'Accept': 'application/json',
            'User-Agent': self.SDK_VERSION,
            'Content-type': 'application/json',
        }
        if extra_headers:
            headers.update(extra_headers)
        return headers

    # AUTH METHODS
    def auth_url(self, redirect_URI):
        """Return the URL the user must visit to grant access to this application.

        Args:
            redirect_URI: Callback URL sent as the ``redirect_uri`` parameter.
        """
        params = {
            'client_id': self.client_id,
            'response_type': 'code',
            'redirect_uri': redirect_URI,
        }
        return self.AUTH_URL + '/authorization' + '?' + urlencode(params)

    def authorize(self, code, redirect_URI):
        """Exchange an authorization ``code`` for tokens and store them on the instance.

        Args:
            code: Authorization code received on the redirect.
            redirect_URI: The same callback URL used to obtain ``code``.

        Returns:
            The new access token.

        Raises:
            requests.HTTPError: If the token endpoint answers with an error status.
            KeyError: If the response lacks ``access_token`` or ``expires_in``;
                the instance state is left untouched in that case.
        """
        params = {
            'grant_type': 'authorization_code',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'code': code,
            'redirect_uri': redirect_URI,
        }
        uri = self.make_path(self.OAUTH_URL)
        # NOTE(review): the credentials travel in the query string here; kept
        # as-is to preserve the wire format callers already rely on.
        response = self._requests.post(uri, params=urlencode(params), headers=self._build_headers())
        response.raise_for_status()

        response_info = response.json()
        # Read the required fields first so a malformed response cannot leave
        # the instance half-updated.
        access_token = response_info['access_token']
        expires_in = response_info['expires_in']

        self.access_token = access_token
        # An absent refresh_token means offline access is not set up.
        self.refresh_token = response_info.get('refresh_token', '')
        self.expires_in = expires_in
        return self.access_token

    def get_refresh_token(self):
        """Use the stored refresh token to obtain and store a new access token.

        If the response does not carry a new ``refresh_token`` the current one
        is kept, so a later refresh is still possible.

        Returns:
            The new access token.

        Raises:
            Exception: If no refresh token is stored on the instance.
            requests.HTTPError: If the token endpoint answers with an error status.
            KeyError: If the response lacks ``access_token`` or ``expires_in``;
                the instance state is left untouched in that case.
        """
        if not self.refresh_token:
            raise Exception("Offline-Access is not allowed.")

        params = {
            'grant_type': 'refresh_token',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'refresh_token': self.refresh_token,
        }
        uri = self.make_path(self.OAUTH_URL)
        # NOTE(review): the parameters are sent both in the query string and as
        # the request body, unlike authorize(); kept as-is to preserve the wire
        # format.
        response = self._requests.post(
            uri, params=urlencode(params), headers=self._build_headers(), data=params
        )
        response.raise_for_status()

        response_info = response.json()
        access_token = response_info['access_token']
        expires_in = response_info['expires_in']

        self.access_token = access_token
        self.refresh_token = response_info.get('refresh_token', self.refresh_token)
        self.expires_in = expires_in
        return self.access_token

    # REQUEST METHODS
    def get(self, path, params=None, extra_headers=None):
        """Send a GET request to ``path`` and return the raw response.

        Args:
            path: API path, with or without a leading ``/``.
            params: Optional mapping of query-string parameters.
            extra_headers: Optional headers merged over the JSON defaults.
        """
        uri = self.make_path(path)
        headers = self._build_headers(extra_headers)
        return self._requests.get(uri, params=urlencode(params or {}), headers=headers)

    def post(self, path, body=None, params=None, extra_headers=None):
        """Send a POST request with ``body`` serialized as JSON and return the raw response.

        Args:
            path: API path, with or without a leading ``/``.
            body: JSON-serializable payload; ``None`` sends no body.  Falsy
                payloads such as ``{}`` or ``[]`` are serialized too.
            params: Optional mapping of query-string parameters.
            extra_headers: Optional headers merged over the JSON defaults.
        """
        uri = self.make_path(path)
        headers = self._build_headers(extra_headers)
        data = json.dumps(body) if body is not None else None
        return self._requests.post(uri, data=data, params=urlencode(params or {}), headers=headers)

    def put(self, path, body=None, params=None, extra_headers=None):
        """Send a PUT request with ``body`` serialized as JSON and return the raw response.

        Args:
            path: API path, with or without a leading ``/``.
            body: JSON-serializable payload; ``None`` sends no body.  Falsy
                payloads such as ``{}`` or ``[]`` are serialized too.
            params: Optional mapping of query-string parameters.
            extra_headers: Optional headers merged over the JSON defaults.
        """
        uri = self.make_path(path)
        headers = self._build_headers(extra_headers)
        data = json.dumps(body) if body is not None else None
        return self._requests.put(uri, data=data, params=urlencode(params or {}), headers=headers)

    def delete(self, path, params=None, extra_headers=None):
        """Send a DELETE request to ``path`` and return the raw response.

        Args:
            path: API path, with or without a leading ``/``.
            params: Optional mapping of query-string parameters.
            extra_headers: Optional headers merged over the JSON defaults.
        """
        uri = self.make_path(path)
        headers = self._build_headers(extra_headers)
        # Unlike the other verbs the mapping is handed over un-encoded, leaving
        # the encoding to the HTTP layer; kept for backward compatibility.
        return self._requests.delete(uri, params=params or {}, headers=headers)

    def options(self, path, params=None, extra_headers=None):
        """Send an OPTIONS request to ``path`` and return the raw response.

        Args:
            path: API path, with or without a leading ``/``.
            params: Optional mapping of query-string parameters.
            extra_headers: Optional headers merged over the JSON defaults.
        """
        uri = self.make_path(path)
        headers = self._build_headers(extra_headers)
        return self._requests.options(uri, params=urlencode(params or {}), headers=headers)

    def make_path(self, path, params=None):
        """Return the absolute URL for ``path`` under ``API_ROOT_URL``.

        Args:
            path: API path; a leading ``/`` is added when missing.
            params: Optional mapping appended as a URL-encoded query string.
                When the URL already contains a ``?`` the parameters are
                appended with ``&`` instead of a second ``?``.
        """
        if not path.startswith("/"):
            path = "/" + path
        url = self.API_ROOT_URL + path
        if params:
            separator = "&" if "?" in url else "?"
            url = url + separator + urlencode(params)
        return url