Python colorama.Fore 模块,MAGENTA 实例源码
我们从Python开源项目中,提取了以下36个代码示例,用于说明如何使用colorama.Fore.MAGENTA。
def main(self, s):
# Trims input s to be just the city/region name
s = s.replace('time ', '').replace('in ', '')
# Transforms a city name into coordinates using Google Maps API
loc = getLocation(s)
# Gets current date and time using TimeZoneDB API
send_url = (
"http://api.timezonedb.com/v2/get-time-zone?"
"key=BFA6XBCZ8AL5&format=json"
"&by=position&lat={:.6f}&lng={:.6f}".format(*loc)
)
r = requests.get(send_url)
j = json.loads(r.text)
time = j['formatted']
self.dst = j['dst']
# Prints current date and time as YYYY-MM-DD HH:MM:SS
print(Fore.MAGENTA + "The current date and time in " +
str(s).title() + " is: " + str(time) + Fore.RESET)
def radio_play(s):
global prev, song, voice, player
if (len(s) > 4):
song = yt[s]
player = voice.create_ffmpeg_player(ytDir + song['file'], before_options='-hide_banner -loglevel panic', options='-b:a 64k -bufsize 64k')
player.start()
dprint(Fore.MAGENTA + 'Playing:' + Style.NORMAL + ' [YT] ' + song['artist'] + ' - ' + song['title'])
else:
song = songListByID[s]
player = voice.create_ffmpeg_player(musicDir + song['file'], options='-b:a 64k -bufsize 64k')
player.start()
if (song['artist']):
dprint(Fore.MAGENTA + 'Playing:' + Style.NORMAL + ' [' + song['id'] + '] ' + song['artist'] + ' - ' + song['title'])
else:
dprint(Fore.MAGENTA + 'Playing:' + Style.NORMAL + ' [' + song['id'] + '] ' + song['title'])
await client.change_status(game=discord.Game(name=song['title'], url='', type=0), idle=False)
prev.append(song['id'])
if (len(prev) > 5):
prev.remove(prev[0])
return player
def present(self):
for package in self.list_:
tag_str = " ".join(
("{}[{}]".format(k.color, k) for k in self.tags[package])
)
print("{}/{} {}".format(
Fore.MAGENTA + package.name + Style.RESET_ALL,
package.version,
tag_str
))
def status(f):
"""
:param f:
:return:
"""
if f == 0:
print('[' + Fore.GREEN + 'Done' + Fore.RESET + ']') # Python 3:,flush=True)
if f < 0:
print('[' + Fore.RED + 'Error' + Fore.RESET + ']') # Python 3:,flush=True)
if f > 0:
print('[' + Fore.MAGENTA + '???' + Fore.RESET + ']') # Python 3:,flush=True)
sys.stdout.flush() # Force flush in Python 2
def print_msg(msg=u'', color=Fore.MAGENTA):
""":type msg: basestring"""
with print_lock:
click.echo(u'{}{}{}'.format(color, force_unicode(msg), Fore.RESET),
color=True)
sys.stdout.flush()
def get_pipeline_logs(multilog):
"""Gets logs generated by each pipeline.
:param multilog: logger handler instance
"""
def _format_log(name, log):
name, log = force_unicode(name), force_unicode(log)
return center_text_message(name, color=Fore.MAGENTA) + '\n' + log
entries = {
name: _format_log(name, log)
for name, log in multilog.grouped_by_thread.items()
}
try:
url = os.environ['KD_PASTEBIN_URL']
user = os.environ['KD_PASTEBIN_USER']
password = os.environ['KD_PASTEBIN_PASS']
with PastebinClient(url, user, password) as c:
urls = (u'{}: {}'.format(n, c.post(e)) for n, e in entries.items())
msg = '\n' + '\n'.join(urls)
except Exception as e:
# Fallback if pastebin isn't accessible
msg = u'\n!!! Could not upload logs to pastebin,' \
u'falling back to console. Reason:\n{}\n\n{}'.format(
u''.join(traceback.format_exception(*sys.exc_info())),
u'\n'.join(entries.values())
)
msg = center_text_message(
'PIPELINE DETAILED LOGS', fill_char='=', color=Fore.MAGENTA) + msg
return msg.encode('utf-8')
def info(self, message):
print(Fore.MAGENTA + threading.current_thread().name + ': ' + Fore.CYAN + message + Fore.RESET)
def success(self, message):
print(Fore.MAGENTA + threading.current_thread().name + ': ' + Fore.GREEN + message + Fore.RESET)
def log_debug(message):
print(Style.DIM + Fore.MAGENTA + message + Fore.RESET + Style.RESET_ALL)
def info(self, message):
print(Fore.MAGENTA + threading.current_thread().name + ': ' + Fore.CYAN + message + Fore.RESET)
def log_debug(message):
print(Style.DIM + Fore.MAGENTA + message + Fore.RESET + Style.RESET_ALL)
def log(color, m_str, log_this=True):
global log_file
pid = str(os.getpid())
print(Fore.MAGENTA + "[*] [thread {}] {} {} {}".format(Fore.GREEN + pid, color, Style.RESET_ALL))
if len(log_file) > 0 and log_this:
log_strs.append(m_str)
def differenciate_list(custom_list, real_list, l_type):
for k,i in enumerate(custom_list[:]):
if k >= len(real_list):
log(Fore.MAGENTA, "Added {} '{}' to current domain".format(l_type, custom_list[k]), log_this=False)
def formatEntry(e:Entry):
R = _S.RESET_ALL
ID = R + _B.BLUE
P = R + _F.LIGHTGREEN_EX
TAG = R + _F.RED
NAME = R + _B.RED
Bd = R + _F.CYAN
PATH = R + _F.YELLOW
TITLE = R + _F.LIGHTMAGENTA_EX
AUTHORS = R + _F.MAGENTA
prefix = Bd + '| ' + R
comment = ( s + '\n' + prefix for s in e.comment.split('\n') ) if e.comment != '' else ()
return ''.join( (
Bd, '--------------------------------------------------------------------------------', R, '\n',
prefix, ID, 'ID : ', '{:>5}'.format(e.ID or ''), ' '*47, P, '{:>20}'.format(e.priority), NAME, e.name, PATH, e.pathstr(),
*( (
prefix, TITLE, e.bibtex.get('title', ''),
prefix, AUTHORS, e.bibtex.get('author',
) if e.bibtex else (
prefix, '<No Bibtex>', '\n')
), (R + ' ').join(''.join((TAG, '#', t)) for t in e.tags),
prefix, *comment ,
Bd,
))
def give_llama_if_exchanger(dev_name):
stats = get_llama_stats(dev_name)
if stats['Given'] > stats['Received']:
dev_id = re.search(regex['llama_page_dev_id'], stats['HTML']).group(1)
give_llama(dev_id)
else:
llama_counts['not_exchanger'] += 1
echo(Fore.MAGENTA + '{:<5} {:<22} {:<5} {:<5} {:<5}'.format(llama_counts['not_exchanger'],
dev_name,
stats['Received'],
stats['Given'],
stats['Received'] - stats['Given']))
def initialize_git_repo(current):
os.chdir(current)
result = subprocess.Popen(['git', 'init'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
res1 = result.communicate()[0]
subprocess.Popen(['git', 'add', '.'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE).wait()
subprocess.Popen(['git', 'commit', '-m', 'First commit'], 'tag', '-a', '0.1', 'First alpha version'], stderr=subprocess.PIPE).wait()
print(Fore.MAGENTA + " [+] " + str(res1, 'utf-8') + Fore.RESET)
def recv(self, str, mode):
if str: print(Back.MAGENTA + str + Style.RESET_ALL)
if str and mode == 'hex':
print(Fore.MAGENTA + conv().hex(str, ':') + Style.RESET_ALL)
# show information
def debug(self, message):
print(Fore.MAGENTA + "[DEBUG] " + self.timestamp() + " " +
message.encode('ascii', 'ignore').decode('ascii')
+ Fore.RESET)
def f(self, func="basic", message=""):
print(Fore.MAGENTA + "[FUNC/{}] ".format(func.upper())
+ self.timestamp() + " " +
message.encode("ascii", "ignore").decode('ascii') + Fore.RESET)
def color_critical(message, logger_name = "root"):
__log(LEVEL_CRITICAL, '%s%s%s'%(Fore.MAGENTA, message, logger_name)
########################interface#######################################
def open_detail_page(filtered_goods):
"""expect a number or a string which joined by ','
to open the target goods url in a browser window
:param filtered_goods
:return: None
"""
print(colorful_text('which do you prefer? type it\'s index', Fore.MAGENTA))
print(colorful_text('if many,use \',\' to split them', Fore.MAGENTA))
print(colorful_text('use \'control + c\' to exit.', Fore.MAGENTA))
try:
index = input('goods index: ')
result_goods = filter(get_target_goods(
index.split(',')), filtered_goods)
goods_list = [goods for goods in result_goods]
if len(goods_list):
for goods in goods_list:
goods_url = goods["url"]
if goods_url[0] == '/':
goods_url = 'https:{}'.format(goods_url)
webbrowser.open_new(goods_url)
else:
error_message('no such index')
open_detail_page(filtered_goods)
except KeyboardInterrupt:
error_message('exit')
def print_purpul(*args):
""""""
raw = str(args)
init(autoreset=True)
print((Fore.MAGENTA + raw))
#----------------------------------------------------------------------
def print_purpul(*args):
""""""
raw = str(args)
init(autoreset=True)
print((Fore.MAGENTA + raw))
#----------------------------------------------------------------------
def __init__(self):
self.colors = {'red':Fore.RED,'green':Fore.GREEN,'cyan':Fore.CYAN,'yellow':Fore.LIGHTYELLOW_EX,'magenta':Fore.MAGENTA,'bold':Style.BRIGHT,'reset':Style.RESET_ALL}
self.translator = Translator()
self.select_languages = "tr"
self.database = "dictionary.db"
self.connection = sqlite3.connect(self.database)
self.cursor = self.connection.cursor()
self.columns = {'isim':'i_anlam','fiil':'f_anlam','zarf':'z_anlam','edat':'e_anlam','baglac':'b_anlam','sifat':'s_anlam','zamir':'zz_anlam'}
self.names2 = {'isim':'isim','zarf':'zarf','ba?laç':'baglac','s?fat':'sifat','zamir':'zamir','fiil':'fiil','edat':'edat'}
self.c_word = ""
self.c_word_last = ""
self.c_word_new = ""
def print_purpul(*args):
""""""
raw = str(args)
init(autoreset=True)
print((Fore.MAGENTA + raw))
#----------------------------------------------------------------------
def yt_queue(s, m):
global prev, player, yt
ydl_opts = {
'format': 'bestaudio/best',
'noplaylist': True,
'nocheckcertificate': True,
'quiet': True,
'outtmpl': ytDir + '%(id)s',
'default_search': 'auto'
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
meta = ydl.extract_info(s, download=False)
yt[meta['id']] = {
'id': meta['id'],
'title': meta['title'],
'artist': meta['uploader'],
'album': 'YouTube',
'composer': None, # meta['view_count'] / meta['like_count'] / meta['dislike_count']
'length': meta['duration'],
'file': meta['id']
}
if (meta['id'] in prev):
mainMessage = '[' + m.author.display_name + ']?The song [YT] _' + meta['title'] + '_ has already been played recently'
elif (meta['id'] in queue):
mainMessage = '[' + m.author.display_name + ']?The song [YT] _' + meta['title'] + '_ is already in the queue'
elif (meta['duration'] > 900):
mainMessage = '[' + m.author.display_name + ']?The song [YT] _' + meta['title'] + '_ is too long (max 15 minutes)'
else:
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download([s])
queue.append(meta['id'])
dprint(Fore.MAGENTA + m.author.display_name + ' queued:' + Style.NORMAL + ' [YT] ' + meta['uploader'] + ' - ' + meta['title'])
mainMessage = '[' + m.author.display_name + ']?Queued [YT] _' + meta['title'] + '_'
await client.send_message(m.channel, mainMessage)
if (m.server): await client.delete_message(m)
def pretty_print_testcase(testcase, error=''):
""" Pretty print a testcase """
if error:
msg_template = Style.BRIGHT + '{id}' + Style.RESET_ALL + '\t' + \
Fore.MAGENTA + '{status}' + Fore.RESET + '\t' + \
'{name}\t=> ' + str(error)
elif testcase['status'] == 'PASS':
msg_template = Style.BRIGHT + '{id}' + Style.RESET_ALL + '\t' + \
Fore.LIGHTGREEN_EX + '{status}' + Fore.RESET + '\t' + \
'{name}\t'
else:
msg_template = Style.BRIGHT + '{id}' + Style.RESET_ALL + '\t' + \
Fore.LIGHTRED_EX + '{status}' + Fore.RESET + '\t' + \
'{name}\t'
print(msg_template.format(**testcase), end=Style.RESET_ALL)
def print_title( ):
print( "\n\n\t\t\t\t" + Fore.YELLOW + "SAMP HELPER" + Fore.MAGENTA + " PYTHON TOOL " + Fore.GREEN + "BY" + Fore.RED + " SREYAS" );
def print_magenta( str ):
print( Style.BRIGHT + Fore.MAGENTA + str );
def magenta(self, s):
return Fore.MAGENTA + s + Fore.RESET
# ??:? ?:?
def caller(self, proxy):
""" Populate the XML-RPC system.multicall() with the maximum number of predefined
of subrequests and then fire it.
"""
calls = 0
global exit_flag
pbar = tqdm(self.queue.qsize(), desc=self.name, total=self.queue.qsize(), unit='multicall', unit_scale=True, dynamic_ncols=True)
while not self.queue.empty() and not exit_flag:
chunks_size = self.queue.get()
multicall = xmlrpc.client.MultiCall(proxy)
for passwords in chunks_size:
# Can be any other available method that needs auth.
multicall.wp.getUsersBlogs(self.username, passwords.strip())
try:
if self.verbose:
pbar.write(Fore.MAGENTA + "[{}]".format(self.name) + TRAFFIC_OUT + "XML request [#{}]:".format(calls))
pbar.write("{}".format(chunks_size) + Style.RESET_ALL)
res = multicall()
except:
pbar.write(ERROR + "could not make an XML-RPC call" + Style.RESET_ALL)
continue
if self.verbose:
pbar.write(Back.MAGENTA + "[{}]".format(self.name) + TRAFFIC_IN + "XML response [#{}] (200 OK):".format(calls))
pbar.write("{}".format(res.results) + Style.RESET_ALL)
if re.search("isAdmin", str(res.results), re.MULTILINE):
i = 0
for item in res.results:
if re.search(r"'isAdmin': True", str(item)):
exit_flag = True
# let time for the threads to terminate
time.sleep(2)
# pbar.write() seems to be bugged at the moment
pbar.write(RESULT + "found a match: \"{0}:{1}\"".format(self.username, chunks_size[i].strip()) + Style.RESET_ALL)
# Log the password in case sys.stdout acts dodgy
with open("passpot.pot", "a+") as logfile:
logfile.write("{0} - {1}:{2}\n".format(self.xmlrpc_intf, self.username, chunks_size[i].strip()))
break
i += 1
calls += 1
self.queue.task_done()
pbar.update()
pbar.close()
def _report_created(self):
# set up the report data
rows = []
ips = []
data_center = self._get_data_center().get('ABBR')
plan = self._get_plan().get('RAM')
for linode in self._created_linodes:
rows.append((
linode['hostname'],
linode['public'],
linode['private'],
linode['gateway'],
data_center,
plan
))
ips.append(linode['public'])
firewall_command = './apply-firewall.py --private-key /path/to/key/deis --hosts ' + string.join(ips, ' ')
# set up the report constants
divider = Style.BRIGHT + Fore.MAGENTA + ('=' * 109) + Fore.RESET + Style.RESET_ALL
column_format = " {:<20} {:<20} {:<20} {:<20} {:<12} {:>8}"
formatted_header = column_format.format(*('HOSTNAME', 'PUBLIC IP', 'PRIVATE IP', 'GATEWAY', 'DC', 'PLAN'))
# display the report
print('')
print(divider)
print(divider)
print('')
print(Style.BRIGHT + Fore.LIGHTGREEN_EX + ' Successfully provisioned ' + str(self.num_nodes) + ' nodes!' + Fore.RESET + Style.RESET_ALL)
print('')
print(Style.BRIGHT + Fore.CYAN + formatted_header + Fore.RESET + Style.RESET_ALL)
for row in rows:
print(Fore.CYAN + column_format.format(*row) + Fore.RESET)
print('')
print('')
print(Fore.LIGHTYELLOW_EX + ' Finish up your installation by securing your cluster with the following command:' + Fore.RESET)
print('')
print(' ' + firewall_command)
print('')
print(divider)
print(divider)
print('')
def _report_created(self):
# set up the report data
rows = []
ips = []
data_center = self._get_data_center().get('ABBR')
plan = self._get_plan().get('RAM')
for linode in self._created_linodes:
rows.append((
linode['hostname'], 'PLAN'))
# display the report
print('')
print(divider)
print(divider)
print('')
print(Style.BRIGHT + Fore.LIGHTGREEN_EX + ' Successfully provisioned ' + str(self.num_nodes) + ' nodes!' + Fore.RESET + Style.RESET_ALL)
print('')
print(Style.BRIGHT + Fore.CYAN + formatted_header + Fore.RESET + Style.RESET_ALL)
for row in rows:
print(Fore.CYAN + column_format.format(*row) + Fore.RESET)
print('')
print('')
print(Fore.LIGHTYELLOW_EX + ' Finish up your installation by securing your cluster with the following command:' + Fore.RESET)
print('')
print(' ' + firewall_command)
print('')
print(divider)
print(divider)
print('')
def plot_similarity_clusters(desc1, desc2, plot = None):
"""
find similar sounds using Affinity Propagation clusters
:param desc1: first descriptor values
:param desc2: second descriptor values
:returns:
- euclidean_labels: labels of clusters
"""
if plot == True:
print (Fore.MAGENTA + "Clustering")
else:
pass
min_max = preprocessing.scale(np.vstack((desc1,desc2)).T, with_mean=False, with_std=False)
pca = PCA(n_components=2, whiten=True)
y = pca.fit(min_max).transform(min_max)
euclidean = AffinityPropagation(convergence_iter=1800, affinity='euclidean')
euclidean_labels= euclidean.fit_predict(y)
if plot == True:
time.sleep(5)
print (Fore.WHITE + "Cada número representa el grupo al que pertence el sonido como ejemplar de otro/s. El grupo '0' esta coloreado en azul,el grupo '1' esta coloreado en rojo,el grupo '2' esta coloreado en amarillo. Observa el ploteo para ver qué sonidos son ejemplares de otros")
print np.vstack((euclidean_labels,files)).T
time.sleep(6)
plt.scatter(y[euclidean_labels==0,0], y[euclidean_labels==0,1], c='b')
plt.scatter(y[euclidean_labels==1, y[euclidean_labels==1, c='r')
plt.scatter(y[euclidean_labels==2, y[euclidean_labels==2, c='y')
plt.scatter(y[euclidean_labels==3, y[euclidean_labels==3, c='g')
plt.show()
else:
pass
return euclidean_labels
# save clusters files in clusters directory
def hexdump(data, cols=80):
"""
This is the main function which prints everything.
"""
# print the header
print(Fore.MAGENTA + 'pyhexdump: {} bytes'.format(len(data)))
print('ascii characters: GREEN')
print('non-ascii: RED')
print(Fore.BLUE + '{:>6} | {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} | {}'.format(
'Offset(h)',
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
'String'
))
print('-'*cols + Fore.RESET)
# formating string for each line
print_string = Fore.BLUE + '{:09X} | ' + Fore.RESET + '{:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} {:02X} ' + Fore.GREEN + '| {}' + Fore.RESET
# break data up into 16 byte chunks
size = 16
buff = []
line = [0]*size
for i, char in enumerate(data):
if i % size == 0 and i != 0:
buff.append(line)
line = [0]*size
line[0] = char
else:
line[i % size] = char
if i == len(data) - 1:
buff.append(line)
# print data out
for i, line in enumerate(buff):
print(print_string.format(i,
line[0],
line[1],
line[2],
line[3],
line[4],
line[5],
line[6],
line[7],
line[8],
line[9],
line[10],
line[11],
line[12],
line[13],
line[14],
line[15],
recover(line)
)
)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。