Python google.appengine.api.urlfetch 模块,POST 实例源码
我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用google.appengine.api.urlfetch.POST。
def pushtxn(raw_tx):
'''Insight send raw tx API'''
url = PUSH_TX_URL
payload = urllib.urlencode({
"rawtx": raw_tx
})
result = urlfetch.fetch(url,
method=urlfetch.POST,
payload=payload
)
if result.status_code == 200:
j = json.loads(result.content)
txid = j.get('txid')
return txid, raw_tx
else:
msg = 'Error accessing insight API:'+str(result.status_code)+" "+str(result.content)
ErrorNotification.new(msg)
return None, msg
def exportItems( module, target, importKey, startCursor, endCursor):
Skel = skeletonByKind( module )
query = Skel().all().cursor( startCursor, endCursor )
for item in query.run(250):
flatItem = DbTransfer.genDict( item )
formFields = {
"e": pickle.dumps(flatItem).encode("HEX"),
"key": importKey
}
result = urlfetch.fetch( url=target,
payload=urllib.urlencode(formFields),
method=urlfetch.POST,
headers={'Content-Type': 'application/x-www-form-urlencoded'})
if startCursor == endCursor:
try:
utils.sendEMailToAdmins("Export of kind %s finished" % module,
"ViUR finished to export kind %s to %s.\n" % (module, target))
except: #OverQuota,whatever
pass
# --- import ---
def check_language(text):
if len(text) > MAX_LENGTH:
logging.info("trimming text from %s to %s", len(text), MAX_LENGTH)
text = text[:MAX_LENGTH]
base_url = 'https://www.googleapis.com/language/translate/v2/detect'
params = {'key': API_KEY, 'q': text}
form_data = urls.urlencode(params)
if urllib2_fallback:
request = urllib2.Request(base_url, form_data, {'X-HTTP-Method-Override': 'GET'})
response_content = urllib2.urlopen(request).read()
else:
result = urlfetch.fetch(url=base_url, payload=form_data, method=urlfetch.POST, headers={'X-HTTP-Method-Override': 'GET'})
if result.status_code != 200:
error = "result status code is %s for content %s" % (result.status_code, result.content)
logging.error(error)
raise Exception("Error in translation: %s" % error)
response_content = result.content
json_content = json.loads(response_content)
real_results = json_content['data']['detections'][0][0]
logging.info("text classification returned %s", real_results)
if real_results['confidence'] > 0.10:
return real_results['language']
else:
return None
def auth_callback_provider( self ):
params = { 'code': self.request.get( 'code' ),
'client_id': self.get_auth_request_client_id(),
'client_secret': self.get_client_secret(),
'redirect_uri': self.domain_name[ :-1 ] + self.get_auth_callback(),
'grant_type': 'authorization_code',
}
urlParams = enki.libutil.urlencode( params )
url = self.token_endpoint()
result = self.urlfetch_safe( url = url,
payload = urlParams,
method = urlfetch.POST,
headers = { 'Content-Type': 'application/x-www-form-urlencoded' }
)
self.process_token_result( result )
def get_access_token(force=False):
"""Tries to obtain access token from memcache and,if it fails,
obtains a new set and stores in memcache.
See https://dev.twitter.com/oauth/application-only.
Deleting the memcache key `access_token` will trigger a token refresh.
"""
token = memcache.get('access_token')
if force or token is None:
logging.warning('Needed to fetch access_token')
encoded_key = urllib.quote_plus(CUSTOMER_KEY)
encoded_secret = urllib.quote_plus(CUSTOMER_SECRET)
encoded_credentials = base64.b64encode(
"{}:{}".format(encoded_key, encoded_secret))
response = urlfetch.fetch(
'https://api.twitter.com/oauth2/token',
payload='grant_type=client_credentials',
method=urlfetch.POST,
headers={'Authorization': 'Basic ' + encoded_credentials})
if response.status_code == urlfetch.httplib.OK:
response_data = json.loads(response.content)
token = response_data['access_token']
memcache.set('access_token', token, 2592000) # 30 days
return token
def urlread(url, data=None, headers=None):
if data is not None:
if headers is None:
headers = {"Content-type": "application/x-www-form-urlencoded"}
method = urlfetch.POST
else:
if headers is None:
headers = {}
method = urlfetch.GET
result = urlfetch.fetch(url, method=method,
payload=data, headers=headers)
if result.status_code == 200:
return result.content
else:
raise urllib2.URLError("fetch error url=%s,code=%d" % (url, result.status_code))
def get_request_token(base):
'''
Get request token
'''
data = urllib.urlencode({
'consumer_key': POCKET_CONSUMER_KEY,
'redirect_uri': base + POCKET_FINISH_REDIRECT
})
logging.debug(data)
res = urlfetch.fetch(
url=POCKET_OAUTH_REQUEST,
method=urlfetch.POST,
payload=data,
validate_certificate=True)
code = redirect = None
logging.debug(res.status_code)
if res.status_code == 200:
result = res.content
if 'code=' in result:
code = result.replace('code=','')
redirect = POCKET_AUTHORIZE_REDIR + '?request_token=%s&redirect_uri=%s' % (code, base + POCKET_FINISH_REDIRECT)
return (code, redirect)
def get_access_token(code):
'''
Get request token
'''
data = urllib.urlencode({
'consumer_key': POCKET_CONSUMER_KEY,
'code': code
})
logging.debug(data)
res = urlfetch.fetch(
url=POCKET_OAUTH_AUTHORIZE,
validate_certificate=True)
code = redirect = None
logging.debug(res.status_code)
if res.status_code == 200:
result = res.content
data = urlparse.parse_qs(result)
access_token = data.get('access_token', [None])[0]
return access_token
def fetch(url, headers=None,
cookie=Cookie.SimpleCookie(),
user_agent='Mozilla/5.0'):
headers = headers or {}
if data is not None:
data = urllib.urlencode(data)
if user_agent:
headers['User-agent'] = user_agent
headers['Cookie'] = ' '.join(
['%s=%s;' % (c.key, c.value) for c in cookie.values()])
try:
from google.appengine.api import urlfetch
except ImportError:
req = urllib2.Request(url, data, headers)
html = urllib2.urlopen(req).read()
else:
method = ((data is None) and urlfetch.GET) or urlfetch.POST
while url is not None:
response = urlfetch.fetch(url=url, payload=data,
method=method, headers=headers,
allow_truncated=False, follow_redirects=False,
deadline=10)
# next request will be a get,so no need to send the data again
data = None
method = urlfetch.GET
# load cookies from the response
cookie.load(response.headers.get('set-cookie', ''))
url = response.headers.get('location')
html = response.content
return html
def obtain_bearer_token(host, path):
"""Given a bearer token,send a GET request to the API.
Args:
host (str): The domain host of the API.
path (str): The path of the API after the domain.
params (dict): An optional set of query parameters in the request.
Returns:
str: OAuth bearer token,obtained using client_id and client_secret.
Raises:
HTTPError: An error occurs from the HTTP request.
"""
url = '{0}{1}'.format(host, quote(path.encode('utf8')))
data = urlencode({
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'grant_type': GRANT_TYPE,
})
print('@@@@@@@@@' + CLIENT_ID)
headers = {
'content-type': 'application/x-www-form-urlencoded',
}
result = urlfetch.fetch(
url=url,
headers=headers)
print('@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@' + result.content)
return "BIO6_LpbIcFkeKDB9SsSAONt3lE2IwrdiTxUeq-Ag1MKOzSc4m-8QyPjdV6WmI27ySuLEKv7czHoJmJjFHrCyjfgxucTvKPpJG9JCsg_08KCz4J-WrEfeaiACoJ2WXYx"
def fetch(url, ''))
url = response.headers.get('location')
html = response.content
return html
def sendData(form_field):
url = "https://api.line.me/v2/bot/message/push"
result = urlfetch.fetch(
url=url,
payload=json.dumps(form_field, ensure_ascii=False),
headers={
'Content-type': 'application/json',
'Authorization': 'Bearer ' + const.ChannelAccesstoken
}
)
if result.status_code == 200:
logging.debug(result.content)
else:
logging.debug(result.content)
def sendData(ifttt_event, ifttt_key, senderID, from_id, event_type, msg_data):
if ifttt_key == None:
send2line.sendText(str(senderID), const.MSG_NONREGISTRATION)
return
if msg_data.count('\n'):
msg_data = u"<pre>" + msg_data + u"</pre>"
form_fields = {
"value1": from_id,
"value2": event_type,
"value3": msg_data,
}
logging.debug(form_fields)
logging.debug(ifttt_key)
requestStr = json.dumps(form_fields, ensure_ascii=False)
# requestStr=requestStr.replace('\n','\\n').replace('\r','')
url = "https://maker.ifttt.com/trigger/" + ifttt_event + "/with/key/" + ifttt_key
result = urlfetch.fetch(
url=url,
payload=requestStr,
headers={
'Content-Type': ' application/json'
}
)
logging.debug(result.status_code)
if result.status_code == 200:
logging.debug(result.content)
elif result.status_code == 401:
send2line.sendText(str(senderID), const.MSG_MAKERKEY_Failed)
logging.debug(result.content)
else:
logging.debug(result.content)
def telegram_post(data, deadline=3):
return urlfetch.fetch(url=TELEGRAM_URL_SEND,
headers=JSON_HEADER, deadline=deadline)
def telegram_query(uid, deadline=3):
data = json.dumps({'chat_id': uid, 'action': 'typing'})
return urlfetch.fetch(url=TELEGRAM_URL_CHAT_ACTION, deadline=deadline)
def telegram_photo(data, deadline=3):
return urlfetch.fetch(url=TELEGRAM_URL_SEND_PHOTO, deadline=deadline)
def send_typing(uid):
data = json.dumps({'chat_id': uid, 'action': 'typing'})
try:
rpc = urlfetch.create_rpc()
urlfetch.make_fetch_call(rpc, url=TELEGRAM_URL_CHAT_ACTION,
method=urlfetch.POST, headers=JSON_HEADER)
except:
return
def fetch(url, ''))
url = response.headers.get('location')
html = response.content
return html
def fromClient(self, valuesCache, name, data):
"""
Reads a value from the client.
If this value is valid for this bone,
store this value and return None.
Otherwise our prevIoUs value is
left unchanged and an error-message
is returned.
:param name: Our name in the skeleton
:type name: String
:param data: *User-supplied* request-data
:type data: Dict
:returns: None or String
"""
if request.current.get().isDevServer: #We dont enforce captchas on dev server
return( None )
user = utils.getCurrentUser()
if user and "root" in user["access"]: # Don't bother trusted users with this (not supported by admin/vi anyways)
return( None )
if not "recaptcha_challenge_field" in data.keys() or not "recaptcha_response_field" in data.keys():
return( u"No Captcha given!" )
data = { "privatekey": self.privateKey,
"remoteip": request.current.get().request.remote_addr,
"challenge": data["recaptcha_challenge_field"],
"response": data["recaptcha_response_field"]
}
response = urlfetch.fetch( url="http://www.google.com/recaptcha/api/verify",
payload=urllib.urlencode( data ),
method=urlfetch.POST,
headers={"Content-Type": "application/x-www-form-urlencoded"} )
if str(response.content).strip().lower().startswith("true"):
return( None )
return( u"Invalid Captcha" )
def get(self):
# [START urlfetch-post]
try:
form_data = urllib.urlencode(UrlPostHandler.form_fields)
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
result = urlfetch.fetch(
url='http://localhost:8080/submit_form',
payload=form_data,
method=urlfetch.POST,
headers=headers)
self.response.write(result.content)
except urlfetch.Error:
logging.exception('Caught exception fetching url')
# [END urlfetch-post]
def fetch(url, ''))
url = response.headers.get('location')
html = response.content
return html
def __init__(self, host, port=None, strict=False, timeout=None):
from google.appengine.api import urlfetch
self._fetch = urlfetch.fetch
self._method_map = {
'GET': urlfetch.GET,
'POST': urlfetch.POST,
'HEAD': urlfetch.HEAD,
'PUT': urlfetch.PUT,
'DELETE': urlfetch.DELETE,
'PATCH': urlfetch.PATCH,
}
self.host = host
self.port = port
self._method = self._url = None
self._body = ''
self.headers = []
if not isinstance(timeout, (float, int, long)):
timeout = None
self.timeout = timeout
def _MakeRemoteSyncCall(self, service, call, request, response):
"""Send an RPC to a remote_api endpoint."""
request_pb = remote_api_pb.Request()
request_pb.set_service_name(service)
request_pb.set_method(call)
request_pb.set_request(request.Encode())
response_pb = remote_api_pb.Response()
encoded_request = request_pb.Encode()
try:
urlfetch_response = urlfetch.fetch(self.remote_url, encoded_request,
urlfetch.POST, self.extra_headers,
follow_redirects=False,
deadline=10)
except Exception, e:
logging.exception('Fetch Failed to %s', self.remote_url)
raise FetchFailed(e)
if urlfetch_response.status_code != 200:
logging.error('Fetch Failed to %s; Status %s; body %s',
self.remote_url,
urlfetch_response.status_code,
urlfetch_response.content)
raise FetchFailed(urlfetch_response.status_code)
response_pb.ParseFromString(urlfetch_response.content)
if response_pb.has_application_error():
error_pb = response_pb.application_error()
raise apiproxy_errors.ApplicationError(error_pb.code(),
error_pb.detail())
elif response_pb.has_exception():
raise pickle.loads(response_pb.exception())
elif response_pb.has_java_exception():
raise UnkNownJavaServerError('An unkNown error has occured in the '
'Java remote_api handler for this call.')
else:
response.ParseFromString(response_pb.response())
def __init__(self, strict=None,
timeout=_GLOBAL_DEFAULT_TIMEOUT, source_address=None,
context=None):
# net.proto.ProcotolBuffer relies on httplib so importing urlfetch at the
# module level causes a failure on prod. That means the import needs to be
# lazy.
from google.appengine.api import urlfetch
self._fetch = urlfetch.fetch
self._method_map = {
'GET': urlfetch.GET,
}
self.host = host
self.port = port
# With urllib2 in Python 2.6,an object can be passed here.
# The default is set to socket.GLOBAL_DEFAULT_TIMEOUT which is an object.
# We only accept float,int or long values,otherwise it can be
# silently ignored.
if not isinstance(timeout, long)):
timeout = None
self.timeout = timeout
# Both 'strict' and 'source_address' are ignored.
self._method = self._url = None
self._body = ''
self.headers = []
def auth_callback_provider( self ):
params = { 'openid.ns': '',
'openid.op_endpoint': '',
'openid.claimed_id': '',
'openid.identity': '',
'openid.return_to': '',
'openid.response_nonce': '',
'openid.assoc_handle': '',
'openid.signed': '',
'openid.sig': '',
}
for key in params:
params[ key ] = self.request.get( key )
params[ 'openid.mode' ] = 'check_authentication'
# param {'openid.claimed_id': u'http://steamcommunity.com/openid/id/7****************'}
claimedId = str( params[ 'openid.claimed_id' ])[ len( 'http://steamcommunity.com/openid/id/' ): ]
loginInfo = { 'provider_name': self.get_provider_name( ),
'provider_uid': claimedId,
'email': '',
'email_verified': '' }
urlParams = enki.libutil.urlencode( params )
fullURL = 'https://steamcommunity.com/openid/login'
result = self.urlfetch_safe( url = fullURL, payload = urlParams, method = urlfetch.POST )
if 'ns:http://specs.openid.net/auth/2.0\nis_valid:true\n' in result.content: # only if is_valid do we trust the loginInfo
self.provider_authenticated_callback( loginInfo )
#===== TWITTER =========================================================================================================
def auth_sign( self, normalised_url, ordered_params, token_secret = '', method_get = False ):
# note: create signature see https://dev.twitter.com/oauth/overview/creating-signatures
params_to_sign = enki.libutil.urlencode( ordered_params )
oauth_signature_string = ''
if method_get:
oauth_signature_string = 'GET&' + percent_encode( normalised_url ) + '&' + percent_encode( params_to_sign )
else:
oauth_signature_string = 'POST&' + percent_encode( normalised_url ) + '&' + percent_encode( params_to_sign )
key = percent_encode( settings.secrets.CLIENT_SECRET_TWITTER ) + '&' + token_secret
hmac_hash = hmac.new( key, oauth_signature_string, hashlib.sha1 )
oauth_signature = base64.b64encode( hmac_hash.digest())
return oauth_signature
def auth_request( self ):
# STEP 1
# note: these parameters need to be sorted alphabetically by key. They are therefore a list of tuples and not a dictionary.
params = [( 'oauth_callback' , self.domain_name[ :-1 ] + self.get_auth_callback()),
( 'oauth_consumer_key' , settings.secrets.CLIENT_ID_TWITTER ),
( 'oauth_nonce' , webapp2_extras.security.generate_random_string( length = 42, pool = webapp2_extras.security.ALPHANUMERIC ).encode( 'utf-8' )),
( 'oauth_signature_method' , "HMAC-SHA1" ),
( 'oauth_timestamp' , str( int( time.time()))),
( 'oauth_version' , "1.0" )]
normalised_url = 'https://api.twitter.com/oauth/request_token/'
oauth_signature = self.auth_sign( normalised_url, params )
params.append(( 'oauth_signature', oauth_signature ))
url_params = enki.libutil.urlencode( params )
result = self.urlfetch_safe( url = normalised_url, payload = url_params, method = urlfetch.POST )
response = self.process_result_as_query_string( result )
# STEP 2
if response.get( 'oauth_callback_confirmed' ) != 'true' :
self.abort( 401 )
return
else:
oauth_token = response.get( 'oauth_token' )
self.session[ 'twitter_oauth_token' ] = oauth_token
self.session[ 'twitter_oauth_token_secret' ] = response.get( 'oauth_token_secret' )
url_redirect_params = enki.libutil.urlencode([( 'oauth_token', oauth_token )])
url_redirect = 'https://api.twitter.com/oauth/authenticate?' + url_redirect_params
self.redirect( url_redirect )
return
def get_download_URL( self, enkiDL_URL, secret, item_to_download, ip_addr ):
form_fields = { 'item' : item_to_download, 'secret' : secret, 'ip_addr' : ip_addr }
form_data = enki.libutil.urlencode( form_fields )
try:
result = urlfetch.fetch( url = enkiDL_URL, payload = form_data, method = urlfetch.POST )
if result.status_code == 200:
token = result.content
self.download_url = enkiDL_URL + 'download?token=' + str( token ) + '&item=' + str( item_to_download )
else:
self.error = 1
return
except urlfetch.DownloadError:
self.error = 2
return
def send_attachment_message(sender, attachment_type, payload):
fb_sender_id = sender['id']
content = {
'recipient': {
'id': fb_sender_id
},
'message': {
'attachment': {
'type': attachment_type,
'payload': payload
}
}
}
headers = {
'Content-Type': 'application/json'
}
payload = json.dumps(content)
logging.debug(payload)
url = 'https://graph.facebook.com/v2.6/me/messages?access_token=' + config.FACEBOOK_PAGE_ACCESS_TOKEN
if config.PRODUCTION:
req = urlfetch.fetch(
url,
payload,
urlfetch.POST,
headers
)
logging.debug(req.content)
def send_fb_message(payload):
if config.PRODUCTION:
try:
req = urlfetch.fetch(
'https://graph.facebook.com/v2.6/me/messages?access_token=' + config.FACEBOOK_PAGE_ACCESS_TOKEN,
payload,
urlfetch.POST,
{'Content-Type': 'application/json'}
)
logging.debug(req.content)
except urlfetch.Error as e:
logging.error(e.message)
def fetch(url, ''))
url = response.headers.get('location')
html = response.content
return html
def request(self, operation, url, headers=None):
"""Performs an HTTP call to the server,supports GET,POST,PUT,and
DELETE.
Usage example,perform and HTTP GET on http://www.google.com/:
import atom.http
client = atom.http.HttpClient()
http_response = client.request('GET','http://www.google.com/')
Args:
operation: str The HTTP operation to be performed. This is usually one
of 'GET','POST','PUT',or 'DELETE'
data: filestream,list of parts,or other object which can be converted
to a string. Should be set to None when performing a GET or DELETE.
If data is a file-like object which can be read,this method will
read a chunk of 100K bytes at a time and send them.
If the data is a list of parts to be sent,each part will be
evaluated and sent.
url: The full URL to which the request should be sent. Can be a string
or atom.url.Url.
headers: dict of strings. HTTP headers which should be sent
in the request.
"""
all_headers = self.headers.copy()
if headers:
all_headers.update(headers)
# Construct the full payload.
# Assume that data is None or a string.
data_str = data
if data:
if isinstance(data, list):
# If data is a list of different objects,convert them all to strings
# and join them together.
converted_parts = [__ConvertDataPart(x) for x in data]
data_str = ''.join(converted_parts)
else:
data_str = __ConvertDataPart(data)
# If the list of headers does not include a Content-Length,attempt to
# calculate it based on the data object.
if data and 'Content-Length' not in all_headers:
all_headers['Content-Length'] = len(data_str)
# Set the content type to the default value if none was set.
if 'Content-Type' not in all_headers:
all_headers['Content-Type'] = 'application/atom+xml'
# Lookup the urlfetch operation which corresponds to the desired HTTP verb.
if operation == 'GET':
method = urlfetch.GET
elif operation == 'POST':
method = urlfetch.POST
elif operation == 'PUT':
method = urlfetch.PUT
elif operation == 'DELETE':
method = urlfetch.DELETE
else:
method = None
return HttpResponse(urlfetch.Fetch(url=str(url), payload=data_str,
method=method, headers=all_headers))
def impersonate(self, user_id=DEFAULT):
"""
To use this make a POST to
`http://..../impersonate request.post_vars.user_id=<id>`
Set request.post_vars.user_id to 0 to restore original user.
requires impersonator is logged in and::
has_permission('impersonate','auth_user',user_id)
"""
request = current.request
session = current.session
auth = session.auth
table_user = self.table_user()
if not self.is_logged_in():
raise HTTP(401, "Not Authorized")
current_id = auth.user.id
requested_id = user_id
user = None
if user_id is DEFAULT:
user_id = current.request.post_vars.user_id
if user_id and user_id != self.user.id and user_id != '0':
if not self.has_permission('impersonate',
self.table_user(),
user_id):
raise HTTP(403, "Forbidden")
user = table_user(user_id)
if not user:
raise HTTP(401, "Not Authorized")
auth.impersonator = pickle.dumps(session, pickle.HIGHEST_PROTOCOL)
auth.user.update(
table_user._filter_fields(user, True))
self.user = auth.user
self.update_groups()
log = self.messages['impersonate_log']
self.log_event(log, dict(id=current_id, other_id=auth.user.id))
self.run_login_onaccept()
elif user_id in (0, '0'):
if self.is_impersonating():
session.clear()
session.update(pickle.loads(auth.impersonator))
self.user = session.auth.user
self.update_groups()
self.run_login_onaccept()
return None
if requested_id is DEFAULT and not request.post_vars:
return sqlFORM.factory(Field('user_id', 'integer'))
elif not user:
return None
else:
return sqlFORM(table_user, user.id, readonly=True)
def impersonate(self, "Not Authorized")
current_id = auth.user.id
requested_id = user_id
if user_id is DEFAULT:
user_id = current.request.post_vars.user_id
if user_id and user_id != self.user.id and user_id != '0':
if not self.has_permission('impersonate', 'integer'))
return sqlFORM(table_user, readonly=True)
def request(self, headers=all_headers))
def request(self, headers=None):
"""Performs an HTTP call to the server,and
DELETE.
Usage example,perform and HTTP GET on http://www.google.com/:
import atom.http
client = atom.http.HttpClient()
http_response = client.request('GET','http://www.google.com/')
Args:
operation: str The HTTP operation to be performed. This is usually one
of 'GET',or 'DELETE'
data: filestream,or other object which can be converted
to a string. Should be set to None when performing a GET or DELETE.
If data is a file-like object which can be read,this method will
read a chunk of 100K bytes at a time and send them.
If the data is a list of parts to be sent,each part will be
evaluated and sent.
url: The full URL to which the request should be sent. Can be a string
or atom.url.Url.
headers: dict of strings. HTTP headers which should be sent
in the request.
"""
all_headers = self.headers.copy()
if headers:
all_headers.update(headers)
# Construct the full payload.
# Assume that data is None or a string.
data_str = data
if data:
if isinstance(data, list):
# If data is a list of different objects,convert them all to strings
# and join them together.
converted_parts = [__ConvertDataPart(x) for x in data]
data_str = ''.join(converted_parts)
else:
data_str = __ConvertDataPart(data)
# If the list of headers does not include a Content-Length,attempt to
# calculate it based on the data object.
if data and 'Content-Length' not in all_headers:
all_headers['Content-Length'] = len(data_str)
# Set the content type to the default value if none was set.
if 'Content-Type' not in all_headers:
all_headers['Content-Type'] = 'application/atom+xml'
# Lookup the urlfetch operation which corresponds to the desired HTTP verb.
if operation == 'GET':
method = urlfetch.GET
elif operation == 'POST':
method = urlfetch.POST
elif operation == 'PUT':
method = urlfetch.PUT
elif operation == 'DELETE':
method = urlfetch.DELETE
else:
method = None
return HttpResponse(urlfetch.Fetch(url=str(url),
method=method, headers=all_headers))
def auth_callback_provider( self ):
# STEP 3
oauth_verifier = self.request.get( 'oauth_verifier' )
params = [( 'oauth_consumer_key' ,
( 'oauth_token', self.session.get( 'twitter_oauth_token' )), "1.0" )]
normalised_url = 'https://api.twitter.com/oauth/access_token/'
oauth_signature = self.auth_sign( normalised_url, params, self.session.get( 'twitter_oauth_token_secret') )
params.append(( 'oauth_signature', oauth_signature ))
params.append(( 'oauth_verifier', oauth_verifier ))
url_params = enki.libutil.urlencode( params )
result = self.urlfetch_safe( url = normalised_url, method = urlfetch.POST )
response = self.process_result_as_query_string( result )
oauth_token = response.get( 'oauth_token' )
oauth_token_secret = response.get('oauth_token_secret')
user_id = response.get( 'user_id')
if user_id and oauth_token:
#get email address if we can
verify_params = [('include_email', 'true'),
('include_entities','false'),
('oauth_consumer_key',
('oauth_nonce',
('oauth_signature_method', "HMAC-SHA1"),
('oauth_timestamp', str(int(time.time()))),
('oauth_token', oauth_token ),
('oauth_version', "1.0"),
('skip_status', 'true')]
verify_oauth_signature = self.auth_sign('https://api.twitter.com/1.1/account/verify_credentials.json', verify_params,oauth_token_secret, method_get=True )
verify_params.append(('oauth_signature', verify_oauth_signature))
verify_url_params = enki.libutil.urlencode( verify_params )
full_url = 'https://api.twitter.com/1.1/account/verify_credentials.json?' + verify_url_params
verify_credentials_result_json = self.urlfetch_safe( url = full_url, method = urlfetch.GET )
verify_credentials_result = self.process_result_as_JSON(verify_credentials_result_json)
response['email'] = verify_credentials_result['email']
response['email_verified'] = True
loginInfoSettings = { 'provider_uid': 'user_id',
'email': 'email',
'email_verified': 'email_verified' }
loginInfo = self.process_login_info( loginInfoSettings, response )
self.provider_authenticated_callback( loginInfo )
else:
self.abort( 401 )
return
def set_welcome_message(fb_page_id):
url = 'https://graph.facebook.com/v2.6/' + fb_page_id + '/thread_settings?access_token=' + config.FACEBOOK_PAGE_ACCESS_TOKEN
if config.PRODUCTION:
content = {
'setting_type': 'call_to_actions',
'thread_state': 'new_thread',
'call_to_actions': [
{
'message': {
'text': 'Hello there!',
'attachment': {
'type': 'template',
'payload': {
'template_type': 'generic',
'elements': [
{
'title': 'Welcome to %s' % config.FACEBOOK_BOT_NAME,
'item_url': 'https://gilacoolbot.appspot.com',
'image_url': 'http://messengerdemo.parseapp.com/img/rift.png',
'subtitle': 'This is a subtitle',
'buttons': [
{
'type': 'web_url',
'title': 'View website',
'url': 'https://gilacoolbot.appspot.com'
},
{
'type': 'postback',
'title': 'Start chatting',
'payload': 'DEVELOPER_DEFINED_PAYLOAD',
}
]
}
]
}
}
}
}
]
}
headers = {
'Content-Type': 'application/json'
}
payload = json.dumps(content)
req = urlfetch.fetch(
url,
headers
)
logging.debug(req.content)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。