Python discord 模块,ext() 实例源码
我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用discord.ext()。
def spank(self, ctx, user : discord.Member = None):
"""Spank"""
author = ctx.message.author
if not user:
with aiohttp.ClientSession() as session:
async with session.get("https://images-ext-1.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5kaXNjb3JkYXBwLmNvbS9hdHRhY2htZW50cy8xMDc5NDI2NTIyNzU2MDE0MDgvMTA3OTQ1MDg3MzUwMDc5NDg4L1R1SEdKLmdpZiJ9.-XeFHSFOR0nv53M34HeUBqQc7Wc.gif") as resp:
test = await resp.read()
with open("data/commands/Images/imgres.gif", "wb") as f:
f.write(test)
await self.bot.say("{} spanked someone! :scream:".format(author.mention))
await self.bot.upload("data/commands/Images/imgres.gif")
else:
with aiohttp.ClientSession() as session:
async with session.get("https://images-ext-1.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5kaXNjb3JkYXBwLmNvbS9hdHRhY2htZW50cy8xMDc5NDI2NTIyNzU2MDE0MDgvMTA3OTQ1MDg3MzUwMDc5NDg4L1R1SEdKLmdpZiJ9.-XeFHSFOR0nv53M34HeUBqQc7Wc.gif") as resp:
test = await resp.read()
with open("data/commands/Images/imgres.gif", "wb") as f:
f.write(test)
await self.bot.say("{} spanked {}! :scream:".format(author.mention, user.mention))
await self.bot.upload("data/commands/Images/imgres.gif")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, formatter=DogbotHelpFormatter())
# configuration dict
self.cfg = kwargs.get('cfg', {})
# aiohttp session used for fetching data
self.session = aiohttp.ClientSession(loop=self.loop)
# boot time (for uptime)
self.boot_time = datetime.datetime.utcnow()
# aioredis connection
self.redis = None
# asyncpg
self.database = self.cfg['db']['postgres']['database']
self.pgpool = None
# load core extensions
self._exts_to_load = []
self.load_extensions('dog/core/ext', 'Core recursive load')
def load_extensions(self, directory: str, prefix: str = 'Recursive load'):
"""Loads extensions from a directory recursively."""
IGNORE = {'__init__.py', '__pycache__', '.DS_Store'}
base = directory.replace('/', '.')
# build list of extension stems
extension_stems = [
path.stem for path in Path(directory).resolve().iterdir() \
if path.name not in IGNORE and path.suffix != 'pyc'
]
logger.debug('Extensions to load: %s', extension_stems)
for ext in extension_stems:
load_path = base + '.' + ext
logger.info('%s: %s', prefix, load_path)
self.load_extension(load_path)
# keep track of a list of extensions to load
self._exts_to_load = list(self.extensions.keys()).copy()
def update_statistics(pg: asyncpg.connection.Connection, ctx: commands.Context):
"""
Updates command statistics for a specific `discord.ext.commands.Context`.
If no record was found for a command,it is created. Otherwise,the ``times_used`` and
``last_used`` fields are updated.
"""
row = await get_statistics(pg, str(ctx.command))
if row is None:
# first time command is being used,insert it into the database
insert = 'INSERT INTO command_statistics VALUES ($1,1,$2)'
await pg.execute(insert, str(ctx.command), datetime.datetime.utcnow())
logger.info('First command usage for %s', ctx.command)
else:
# command was used before,increment time_used and update last_used
update = ('UPDATE command_statistics SET times_used = times_used + 1,last_used = $2 '
'WHERE command_name = $1')
await pg.execute(update, datetime.datetime.utcnow())
def emote(self, emote: str):
"""Get a Twitch,FrankerFaceZ,BetterTTV,or Discord emote.
Usage: emote [name of emote]"""
emote = emote.replace(':', '')
with async_timeout.timeout(13):
try:
async with self.bot.cog_http.get('https://static-cdn.jtvnw.net/emoticons/v1/' + str(self.bot.emotes['twitch'][emote]['image_id']) + '/1.0') as resp:
emote_img = await resp.read()
except KeyError: # let's try frankerfacez
try:
async with self.bot.cog_http.get('https://cdn.frankerfacez.com/emoticon/' + str(self.bot.emotes['ffz'][emote]) + '/1') as resp:
emote_img = await resp.read()
except KeyError: # let's try BetterTTV
try:
async with self.bot.cog_http.get(self.bot.emotes['bttv'][emote]) as resp:
emote_img = await resp.read()
except KeyError: # let's try Discord
await ctx.send('**No such emote!** I can fetch from Twitch,or Discord (soon).')
return False
img_bytes = io.BytesIO(emote_img)
ext = imghdr.what(img_bytes)
await ctx.send(file=discord.File(img_bytes, f'emote.{ext}'))
def rtfs(self, *, thing: str):
"""tries to show the source file of a thing
thing may be a non-builtin module,class,method,function,traceback,frame,or code object,
or if surrounded by single or double quotes,
a space separated discord.ext.commands call,
or a period deliminated file/module path as used when importing"""
msg = ctx.message
variables = {
'ctx': ctx,
'bot': self.bot,
'message': msg,
'server': msg.server,
'channel': msg.channel,
'author': msg.author
}
def call(thing, variables):
return self.repl_format_source(eval(thing, variables))
closure = _close(call, thing, variables)
await self.print_results(ctx, _call_catch_fmt(closure, 0))
def add_jose_cog(self, cls: 'class'):
"""Add a cog but load its requirements first."""
requires = cls._cog_metadata.get('requires', [])
log.debug('requirements for %s: %r', cls, requires)
for _req in requires:
req = f'ext.{_req}'
if not self.extensions.get(req):
log.debug('loading %r from requirements', req)
self.load_extension(req)
else:
log.debug('%s already loaded', req)
# We instantiate here because
# instantiating on the old add_cog
# is exactly the cause of the problem
cog = cls(self)
super().add_cog(cog)
def charge_ext(self):
extdiv = "data/biomes/ext/{}.txt"
chemin = extdiv.format(self.system["EXT"])
if os.path.isfile(chemin):
liste = chemin
with open(liste, "r", encoding="ISO-8859-1") as f:
liste = f.readlines()
self.charge = {}
for line in liste:
line = line.replace("\n", "")
tag = line[:3]
item = line[4:]
if tag not in self.charge:
self.charge[tag] = {"TAG" : tag, "ITEMS" : []}
self.charge[tag]["ITEMS"].append(item)
else:
self.save("charge")
return True
else:
return False
def on_command_error(ctx, error):
if isinstance(error, discord.ext.commands.errors.CommandNotFound):
pass # ...don't need to know if commands don't exist
elif isinstance(error, discord.ext.commands.errors.CheckFailure):
await ctx.send("You don't have permission to use this command.")
elif isinstance(error, discord.ext.commands.errors.MissingRequiredArgument):
formatter = commands.formatter.HelpFormatter()
await ctx.send("You are missing required arguments.\n{}".format(formatter.format_help_for(ctx, ctx.command)[0]))
elif isinstance(error, discord.ext.commands.errors.CommandOnCooldown):
await ctx.message.delete()
await ctx.send("This command is on cooldown,don't you dare try it again.", delete_after=10)
else:
if ctx.command:
await ctx.send("An error occurred while processing the `{}` command.".format(ctx.command.name))
print('Ignoring exception in command {0.command} in {0.message.channel}'.format(ctx))
tb = traceback.format_exception(type(error), error, error.__traceback__)
error_trace = "".join(tb)
print(error_trace)
embed = discord.Embed(description=error_trace.translate(bot.escape_trans))
await bot.err_logs_channel.send("An error occurred while processing the `{}` command in channel `{}`.".format(ctx.command.name, ctx.message.channel), embed=embed)
def imgur_remove(self,albumName,imageLink):
"""removes image from album,pass direct image"""
albumName = albumName.lower()
for album in imgurClient.get_account_albums('me'):
if albumName == album.title.lower():
#image = imgurClient.upload_from_url(args[1],config=None,anon=True)
#imgurClient.album_add_images(album.id,image['id'])
images = imgurClient.get_album_images(album.id)
#imageID = urlparse(imageLink).path[1::] #remove / at start
imageID, ext = splitext(urlparse(imageLink).path)
imageID = imageID[1::]
for image in images:
if imageID == image.id:
imgurClient.album_remove_images(album.id,imageID)
await self.bot.say("removed {} from album".format(image.id))
break
else:
#album not found
await self.bot.say("Album: " + albumName + " not found")
def get_ext(url):
"""Return the filename extension from url,or ''."""
parsed = urlparse(url)
root, ext = splitext(parsed.path)
return ext[1:] # or ext if you want the leading '.'
def get_extensions(self, path, excl):
extensions = []
for root, dir, files in os.walk(path):
for items in fnmatch.filter(files, "*"):
temp_extensions = items.rfind(".")
ext = items[temp_extensions+1:]
if ext not in extensions:
if ext in excl:
extensions.append(ext)
pass
return extensions
def on_command_error(error, ctx):
if isinstance(error, discord.ext.commands.errors.CommandNotFound):
pass # ...don't need to know if commands don't exist
if isinstance(error, discord.ext.commands.errors.CheckFailure):
await bot.send_message(ctx.message.channel, "{} You don't have permission to use this command.".format(ctx.message.author.mention))
elif isinstance(error, discord.ext.commands.errors.MissingRequiredArgument):
formatter = commands.formatter.HelpFormatter()
await bot.send_message(ctx.message.channel, "{} You are missing required arguments.\n{}".format(ctx.message.author.mention, formatter.format_help_for(ctx, ctx.command)[0]))
else:
if ctx.command:
await bot.send_message(ctx.message.channel, "An error occured while processing the `{}` command.".format(ctx.command.name))
print('Ignoring exception in command {}'.format(ctx.command), file=sys.stderr)
traceback.print_exception(type(error), error.__traceback__, file=sys.stderr)
def get_server(context: Context) -> discord.Server:
""" Gets the server to which a command was sent,
based on the command's context.
:param context: The context in which the command was sent.
:type context: discord.ext.commands.Context
:return: The server
"""
return context.message.server
def get_server_id(context: Context) -> str:
""" Gets the ID of the server to which a command was sent,
based on the command's context.
:param context: The context in which the command was sent.
:type context: discord.ext.commands.Context
:return: The server's ID
"""
server = get_server(context)
return None if server is None else server.id
def get_channel(context: Context) -> discord.Channel:
""" Gets a channel to which a command was sent,based on the command's
context.
:param context: The context in which the command was sent.
:type context: discord.ext.commands.Context
:return: The channel
"""
return context.message.channel
def get_channel_id(context: Context) -> str:
""" Gets the ID of the channel to which a command was sent,
based on the command's context.
:param context: The context in which the command was sent.
:type context: discord.ext.commands.Context
:return: The channel's ID
"""
return get_channel(context).id
def get_channel_name(context: Context) -> str:
""" Gets the name of the channel to which a command was sent,
based on the command's context.
:param context: The context in which the command was sent.
:type context: discord.ext.commands.Context
:return: The channel's name
"""
return get_channel(context).name
def get_author_name(context: Context, nick=False) -> str:
""" Gets the name of the author of a command,based on the command's
context.
:param context: The context in which the command was sent.
:type context: discord.ext.commands.Context
:param nick: Whether the given value should be the real user name,
or the member's nick (if available). Defaults to False
:type nick: bool
:return: The author's name
"""
return get_name(context.message.author, nick)
def author_has_role(context: commands.Context, role_id: str) -> bool:
""" Checks within a command's authors roles for one that has a matching ID
to the one given.
:param context: The context in which the command was sent.
:type context: discord.ext.commands.Context
:param role_id: The ID of a role to check for.
:type role_id: str
:return: True if the author has a role with the given ID,false otherwise.
"""
return has_role(context.message.author, role_id)
def to_boolean(value) -> bool:
"""Parses a string to boolean. Only meant to be used for command arguments.
:param value: The string to evaluate.
:return: The parsed value.
:raise: discord.ext.commands.errors.BadArgument: If the value
cannot be parsed.
.. note::
The valid values are not just 'True' or 'False'.
It can be either 'true','on','yes' or 'y' for True
or 'false','off','no' or 'n' for False
and is not case-sensitive (so something like TruE is valid).
"""
if isinstance(value, bool):
return value
value = str(value).lower()
if value in ['1', 'true', 'on', 'yes', 'y']:
return True
elif value in ['0', 'false', 'off', 'no', 'n']:
return False
else:
raise TypeError("Could not parse {} to boolean".format(value))
def on_command_error(self, exception, context):
"""
Override bot.on_command_error function.
Error handling function.
"""
if isinstance(exception, discord.ext.commands.errors.CommandNotFound):
msg = """Sorry,this command is unknown to me... :japanese_ogre:\
\nDo you need help? \
\nIf so,just type ***!help*** :sunglasses:"""
elif isinstance(exception,
discord.ext.commands.errors.DisabledCommand):
msg = ":sleeping:"
elif isinstance(exception,
discord.ext.commands.errors.CheckFailure):
msg = """:octagonal_sign: you are not allowed to do this.\
\nOnly an :alien: can wake me up..."""
elif isinstance(exception,
discord.ext.commands.errors.MissingRequiredArgument):
msg = """:thought_balloon: You forgot parameters.\
\nType !help [command] to get help :interrobang:"""
elif isinstance(exception,
APIconnectionError):
msg = """It seems we can't contact the API...:frowning2: \
\nTake a rest and retry later.:play_pause: """
elif isinstance(exception,
discord.ext.commands.errors.CommandInvokeError):
msg = "oups... " + str(exception)
print(msg)
else:
msg = "oups inconnu... " + str(type(exception)) + str(exception)
print(msg)
await self.send_message(context.message.channel, msg)
def kys(self, ctx):
"""Kill Yourself"""
author = ctx.message.author
with aiohttp.ClientSession() as session:
async with session.get("https://images-ext-2.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5tZW1lLmFtL2luc3RhbmNlcy81MDB4LzY3NTY4NTg0LmpwZyJ9.oNix5m3kkZkmQK5V8uDlLD_fVe0.jpg") as resp:
test = await resp.read()
with open("data/commands/Images/imgres.png", "wb") as f:
f.write(test)
await self.bot.upload("data/commands/Images/imgres.png")
def datboi(self, ctx):
"""Dait Boi"""
author = ctx.message.author
foo = ["http://i1.kym-cdn.com/photos/images/newsfeed/001/112/704/8a8.jpg", "http://2static.fjcdn.com/pictures/Origins_3c4898_5920950.jpg", "http://i3.kym-cdn.com/photos/images/original/001/116/633/8e9.jpg", "http://www.kappit.com/img/pics/201605_1023_iagcb_sm.jpg", "https://img0.etsystatic.com/125/1/12078849/il_340x270.969775168_rzq8.jpg", "https://i.ytimg.com/vi/fs5Sudmauks/maxresdefault.jpg", "http://i1.kym-cdn.com/photos/images/original/001/118/006/5b1.jpg", "http://i3.kym-cdn.com/photos/images/newsfeed/001/118/078/22b.jpeg", "http://i3.kym-cdn.com/photos/images/newsfeed/001/118/014/e78.png", "https://images-ext-1.discordapp.net/eyJ1cmwiOiJodHRwOi8vY2RuLnNtb3NoLmNvbS9zaXRlcy9kZWZhdWx0L2ZpbGVzLzIwMTYvMDUvZGF0LWJvaS1tZW1lcy1ib3ktd2hvLWxpdmVkLmpwZyJ9.WBSEhlT69xiEWq0KRgR90j9YwDA.jpg?width=243&height=249", "http://i2.kym-cdn.com/photos/images/newsfeed/001/112/712/650.jpg"]
with aiohttp.ClientSession() as session:
async with session.get("{}".format(random.choice(foo))) as resp:
test = await resp.read()
with open("data/commands/Images/imgres.png", "wb") as f:
f.write(test)
await self.bot.say("Here comes dat boi! o shit waddup!:\n")
await self.bot.upload("data/commands/Images/imgres.png")
def reload_modules(self):
"""Reloads all Dogbot related modules."""
# get applicable modules to reload
modules = {k: m for k, m in sys.modules.items() if ('dog' in k and 'datadog' not in k) and 'ext' not in k and
k != 'dog'}
for name, module in modules.items():
logger.info('Reloading bot module: %s', name)
importlib.reload(module)
logger.info('Finished reloading bot modules!')
def get_statistics(pg: asyncpg.connection.Connection, command_name: str) -> \
Union[List[asyncpg.Record], None]:
"""
Fetches statistics for a specific ``discord.ext.commands.Context``.
If no record was found,``None`` is returned.
"""
return await pg.fetchrow('SELECT * FROM command_statistics WHERE command_name = $1',
command_name)
def get_nitro_embed(self):
emb = discord.Embed(color=0x505e80, description='Discord Nitro is **required** to view this message.')
emb.set_thumbnail(url='https://images-ext-2.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5kaXNjb3'
'JkYXBwLmNvbS9lbW9qaXMvMjY0Mjg3NTY5Njg3MjE2MTI5LnBuZyJ9.2'
'6ZJzd3ReEjyptc_N8jX-00oFGs')
emb.set_author(name='Discord Nitro Message', icon_url='https://images-ext-1.discordapp.net/eyJ1'
'cmwiOiJodHRwczovL2Nkbi5kaXNjb3JkYXBwLmNvbS9lbW9qaXMvMjYz'
'MDQzMDUxMzM5OTA3MDcyLnBuZyJ9.-pis3JTckm9LcASNN16DaKy9qlI')
return emb
def _reload(self, ext: str = None):
"""Reloads a module. Can only be used by the owner."""
if ext:
self.bot.unload_extension(ext)
self.bot.load_extension(ext)
else:
for m in self.bot.initial_extensions:
log.info('Reloading cog %s', m)
self.bot.unload_extension(m)
self.bot.load_extension(m)
def __init__(self):
super().__init__(command_prefix=_callable_prefix,
formatter=_chiaki_formatter,
description=config.description,
pm_help=None)
# loop is needed to prevent outside coro errors
self.session = aiohttp.ClientSession(loop=self.loop)
self.table_base = None
try:
with open('data/command_image_urls.json') as f:
self.command_image_urls = __import__('json').load(f)
except FileNotFoundError:
self.command_image_urls = {}
self.message_counter = 0
self.command_counter = collections.Counter()
self.custom_prefixes = JSONFile('customprefixes.json')
self.cog_aliases = {}
self.reset_requested = False
psql = f'postgresql://{config.psql_user}:{config.psql_pass}@{config.psql_host}/{config.psql_db}'
self.db = asyncqlio.DatabaseInterface(psql)
self.loop.run_until_complete(self._connect_to_db())
self.db_scheduler = DatabaseScheduler(self.db, timefunc=datetime.utcnow)
self.db_scheduler.add_callback(self._dispatch_from_scheduler)
for ext in config.extensions:
# Errors should never pass silently,if there's a bug in an extension,
# better to know now before the bot logs in,because a restart
# can become extremely expensive later on,especially with the
# 1000 IDENTIFYs a day limit.
self.load_extension(ext)
self._game_task = self.loop.create_task(self.change_game())
def on_command_error(ctx, discord.ext.commands.errors.CommandNotFound):
await ctx.author.send(
"{},this command does not exist!```{}```".format(ctx.message.author.mention, ctx.message.content))
elif isinstance(error, discord.ext.commands.errors.NotOwner):
await ctx.author.send("{},only my Owner can ask me to do that,nya!```{}```".format(ctx.message.author.mention,
ctx.message.content))
elif isinstance(error, discord.ext.commands.errors.UserInputError):
await ctx.author.send(
"{},Input error```py\n{}: {}\n```".format(ctx.message.author.mention, type(error).__name__, str(error)))
elif isinstance(error, discord.ext.commands.errors.NoPrivateMessage):
await ctx.author.send(
"{},this command cannot be send in a PM!```{}```".format(ctx.message.author.mention, discord.ext.commands.errors.CheckFailure):
await ctx.author.send(
"You don\'t have the permission to use this command,{}```{}```".format(ctx.message.author.mention,
ctx.message.content))
else:
await ctx.author.send(
"{},error```py\n{}: {}\n```".format(ctx.message.author.mention, str(error)))
has_mention = False
for arg in ctx.args:
if '@' in str(arg):
has_mention = True
break
if has_mention is True:
return False
await ctx.message.delete()
def _get_league_fixtures_timeframe(self, server_id: str, league_id: str, timeframe: str):
"""Retrieves specific league matchday fixtures from API
Optional timeframe parameter:
The value of the timeFrame argument must start with either p(ast) or n(ext),representing a timeframe either in the past or future. It is followed by a number in the range 1..99. It defaults to n7 in the fixture resource and is unset for fixture as a subresource.
For instance: p6 would return all fixtures in the last 6 days,whereas n23 would result in returning all fixtures in the next 23 days."""
params = {'timeFrame': timeframe}
url = self.api_url + 'competitions/{}/fixtures'.format(league_id)
return await self._make_request(url, params, server_id)
def on_message(message):
msgchan = message.channel
if await command(message, "help", True):
cmd = message.content[len(prefix + "help "):]
help_cmds = ""
if cmd == "":
for ext in cmds.keys():
help_cmds += "\n**{}**:\n".format(ext)
for cmda in cmds[ext].keys():
if len(cmda + cmds[ext][cmda]['help']) > 70:
help_cmds += "\t- `{}`: {}...\n".format(cmda, cmds[ext][cmda]['help'][:70])
else:
help_cmds += "\t- `{}`: {}\n".format(cmda, cmds[ext][cmda]['help'])
if len(help_cmds) > 1750:
await say(msgchan, help_cmds)
help_cmds = ""
await say(msgchan, help_cmds)
help_cmds = ""
await say(msgchan, "To get information of a specific command type {}help <command>".format(prefix))
else:
error = 0
for ext in cmds.keys():
try:
temp = cmds[ext][cmd]['help']
await say(msgchan, "`{}` ({}):\n{}\n\nUsage:\n`{}`".format(cmd, ext, cmds[ext][cmd]['help'], prefix + cmds[ext][cmd]['usage']))
temp = None
except:
temp = None
error += 1
if error == len(cmds.keys()):
await say(msgchan, "The command you entered ({}) could not be found.".format(cmd))
def get_source(self, thing):
"""returns a source object of a thing
thing may be a non-builtin module,
or a space separated discord.ext.commands call,
or a period deliminated file/module path as used when importing
"""
if isinstance(thing, str):
if '.' in thing: # import
modules = thing.split('.')
def get_last_attr(prev, attrs):
try:
return prev.callback
except AttributeError:
if not attrs:
return prev
return get_last_attr(getattr(prev, attrs.pop(0)),
attrs)
thing = get_last_attr(__import__(modules.pop(0)), modules)
else: # space delimited command call
names = thing.split()
thing = self.bot.commands[names.pop(0)]
for name in names:
thing = thing.commands[name]
thing = thing.callback
return Source(thing)
def handle_message_mention(self, origin, message):
perms = origin.permissions_in(message.channel)
if not perms.read_messages:
# if you can't read the messages,then you shouldn't get PM'd.
return
messages = []
async for msg in self.bot.logs_from(message.channel, limit=3, before=message):
messages.append(msg)
messages.reverse()
messages.append(message)
if origin.status != discord.Status.online:
# wait 30 seconds for context if it's not always on
await asyncio.sleep(30)
# get an updated reference,server references don't change
member = message.server.get_member(origin.id)
if member is None or member.status == discord.Status.online:
# they've come online,so they might have checked the mention
return
# get the messages after this one
ext = []
async for msg in self.bot.logs_from(message.channel, after=message):
ext.append(msg)
ext.reverse()
messages.extend(ext)
try:
fmt = 'You were mentioned in {0.channel.mention}:\n{1}'
fmt = fmt.format(message, '\n'.join(map(self.format_message, messages)))
await self.bot.send_message(origin, fmt)
except:
# silent failure best failure
pass
def audioplayer(self, error_on_none=True):
# TODO: ACCOUNT FOR WHEN THIS MESSAGE IS A PM
if isinstance(ctx, discord.ext.commands.Context):
if ctx.message.guild is None: # This is a private channel,so give it user
ctx = ctx.message.author
else:
ctx = ctx.message.guild
if isinstance(ctx, discord.User):
author = ctx
for audioplayer in self.audioplayers:
member = audioplayer.guild.get_member(author.id)
if member and member.voice and audioplayer.voice and audioplayer.voice.channel.id == member.voice.channel.id:
if botdata.guildinfo(audioplayer.guild).is_banned(member):
raise AudioPlayerNotFoundError("Nice try,but you're banned in the voice channel that I'm in")
return audioplayer
if error_on_none:
raise AudioPlayerNotFoundError("You're not in any voice channels that I'm in")
else:
return None
elif isinstance(ctx, discord.Guild):
guild = ctx
elif isinstance(ctx, discord.abc.GuildChannel):
guild = ctx.guild
else:
raise ValueError(f"Incorrect type '{type(ctx)}' given to audioplayer function")
for audioplayer in self.audioplayers:
if audioplayer.guild == guild:
return audioplayer
if error_on_none:
raise AudioPlayerNotFoundError(f"I'm not in a voice channel on this server/guild. Have an admin do `{self.bot.command_prefix}summon` to put me in one.")
else:
return None
# Connects an audioplayer for the correct guild to the indicated channel
def load_all(self):
"""Load all extensions in the extensions list,
but make sure all dependencies are matched for every cog."""
for extension in extensions:
try:
self.load_extension(f'ext.{extension}')
except Exception as err:
log.error(f'Failed to load {extension}', exc_info=True)
sys.exit(1)
def midi(self, ctx: commands.Context,
tempo: int=120, data: str=None):
"""
Convert text to MIDI. Multiple channels can be used by splitting text with |.
Letters are converted to their pitch values using a mapping.
Full documentation about it is not provided,read the code at
https://github.com/lnmds/jose/blob/master/ext/midi.py
To give longer input than a discord message allows you may upload a .txt file of up to 20 KiB.
"""
if data is None:
try:
data = await self.download_data(ctx.message)
except Exception as err:
log.exception('error downloading file at midi')
raise self.SayException('We had an error while downloading '
'the file,are you sure it is text?')
before = time.monotonic()
midi_file = await self.make_midi(tempo, data)
duration = (time.monotonic() - before) * 1000
if midi_file is None:
return await ctx.send('Failed to generate a MIDI file!')
file = io.BytesIO()
await self.loop.run_in_executor(None, midi_file.writeFile, file)
file.seek(0)
wrapped = discord.File(file, filename='boop.midi')
await ctx.send(f'Took {duration:.3f}ms!', file=wrapped)
def __init__(self, bot):
self.bot = bot
self.player = dataIO.load_json("data/biomes/player.json")
self.system = dataIO.load_json("data/biomes/system.json")
self.charge = dataIO.load_json("data/biomes/charge.json")
self.sponsor = dataIO.load_json("data/biomes/sponsor.json")
defpar = {"SPONSOR_NB": 0, "STOP" : False, "EXT" : None,"OPEN" : False, "PLAYING" : False,"PLAYERS" : 0,"MAX_PLAYERS" : 12, "SAISON" : 1}
extdiv = "data/biomes/ext/{}.txt"
def ext(self, val:str):
"""Change l'extension de la session."""
nom = val
val += ".txt"
exts = os.listdir("data/biomes/ext/")
if val in exts:
self.system["EXT"] = nom
self.save("system")
await self.bot.say("Extension {} selectionnée ".format(nom.title()))
else:
await self.bot.say("Cette extension n'existe pas.")
def check_folders():
if not os.path.exists("data/biomes"):
print("Creation du fichier Biomes...")
os.makedirs("data/biomes")
if not os.path.exists("data/biomes/ext/"):
print("Creation du fichier Extensions Biomes...")
os.makedirs("data/biomes/ext/")
def build_rtfm_lookup_table(self):
cache = {}
page_types = {
'rewrite': (
'http://discordpy.rtfd.io/en/rewrite/api.html',
'http://discordpy.rtfd.io/en/rewrite/ext/commands/api.html'
),
'latest': (
'http://discordpy.rtfd.io/en/latest/api.html',
)
}
for key, pages in page_types.items():
sub = cache[key] = {}
for page in pages:
async with self.bot.session.get(page) as resp:
if resp.status != 200:
raise RuntimeError('Cannot build rtfm lookup table,try again later.')
text = await resp.text(encoding='utf-8')
root = etree.fromstring(text, etree.HTMLParser())
nodes = root.findall(".//dt/a[@class='headerlink']")
for node in nodes:
href = node.get('href', '')
as_key = href.replace('#discord.', '').replace('ext.commands.', '')
sub[as_key] = page + href
self._rtfm_cache = cache
def steam_error(self, ctx):
if isinstance(error, commands.errors.CommandOnCooldown):
seconds = str(error)[34:]
await self.bot.say(f':alarm_clock: Cooldown! Versuche es in {seconds} erneut')
if isinstance(error, discord.ext.commands.errors.CommandInvokeError):
await self.bot.say(':x: Konnte keine Verbindung zum Steam API aufbauen')
def init_extensions(self):
for ext in os.listdir('extensions'):
if ext.endswith('.py') and not ext.startswith('core'):
try:
self.bot.load_extension(f'extensions.{ext[:-3]}')
self.settings['extensions'].append(f'extensions.{ext[:-3]}')
except:
pass
def download(url, ext : str = "jpg", sizeLimit : int = 8000000, ua : str = 'CorpNewt DeepThoughtBot'):
"""Download the passed URL and return the file path."""
# Set up a temp directory
dirpath = tempfile.mkdtemp()
tempFileName = url.rsplit('/', 1)[-1]
# Strip question mark
tempFileName = tempFileName.split('?')[0]
imagePath = dirpath + "/" + tempFileName
try:
rImage = requests.get(url, stream = True, headers = {'User-agent': ua})
except:
remove(dirpath)
return None
with open(imagePath, 'wb') as f:
for chunk in rImage.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
# Check if the file exists
if not os.path.exists(imagePath):
remove(dirpath)
return None
# Let's make sure it's less than the passed limit
imageSize = os.stat(imagePath)
while int(imageSize.st_size) > sizeLimit:
try:
# Image is too big - resize
myimage = Image.open(imagePath)
xsize, ysize = myimage.size
ratio = sizeLimit/int(imageSize.st_size)
xsize *= ratio
ysize *= ratio
myimage = myimage.resize((int(xsize), int(ysize)), Image.ANTIALIAS)
myimage.save(imagePath)
imageSize = os.stat(imagePath)
except Exception:
# Image too big and can't be opened
remove(dirpath)
return None
try:
# Try to get the extension
img = Image.open(imagePath)
ext = img.format
img.close()
except Exception:
# Not something we understand - error out
remove(dirpath)
return None
if ext:
os.rename(imagePath, '{}.{}'.format(imagePath, ext))
return '{}.{}'.format(imagePath, ext)
else:
return imagePath
def load_cogs(bot):
defaults = ("alias", "audio", "customcom", "downloader", "economy",
"general", "image", "mod", "streams", "trivia")
try:
registry = dataIO.load_json("data/red/cogs.json")
except:
registry = {}
bot.load_extension('cogs.owner')
owner_cog = bot.get_cog('Owner')
if owner_cog is None:
print("The owner cog is missing. It contains core functions without "
"which Red cannot function. Reinstall.")
exit(1)
if bot.settings._no_cogs:
bot.logger.debug("Skipping initial cogs loading (--no-cogs)")
if not os.path.isfile("data/red/cogs.json"):
dataIO.save_json("data/red/cogs.json", {})
return
failed = []
extensions = owner_cog._list_cogs()
if not registry: # All default cogs enabled by default
for ext in defaults:
registry["cogs." + ext] = True
for extension in extensions:
if extension.lower() == "cogs.owner":
continue
to_load = registry.get(extension, False)
if to_load:
try:
owner_cog._load_cog(extension)
except Exception as e:
print("{}: {}".format(e.__class__.__name__, str(e)))
bot.logger.exception(e)
failed.append(extension)
registry[extension] = False
dataIO.save_json("data/red/cogs.json", registry)
if failed:
print("\nFailed to load: {}\n".format(" ".join(failed)))
def __init__(self, bot: commands.Bot, idx_emoji, idx_embed):
"""
Create a new PagedEmbed.
Arguments:
ctx : discord.ext.commands.Context
The context in which the command
to send the PagedEmbed was invoked.
bot : discord.ext.commands.Bot
The bot that the application is
logged in with. In Cogs,this is
usually saved as an instance
attribute `bot`.
idx_emoji : str
The Emoji which is used to navigate
to the initial page,constructed
through arguments and keyword
arguments passed to this constructor.
Example:
embed = PagedEmbed(ctx,bot,"??",
title="Index Page",
description="This page gets shown initially."
)
embed.add_page("??",discord.Embed(
title="Second Page",
description="This page gets shown when you click the ?? emoji."
))
"""
self._ctx = ctx
self._bot = bot
self._msg = None
# Dictionary to contain embed Pages in the following format:
# {
# "idx_emoji": discord.Embed,
# "emoji": discord.Embed,
# ...
# }
self._pages = {
idx_emoji: idx_embed
}
self.current_page = self._pages[idx_emoji]
# Attach our event listener to the bot
self._bot.add_listener(self.on_reaction_add)
def _addimage(self, category: str,
image_url: str=None):
"""Adds a new image to the specified category."""
await self.bot.type()
server = ctx.message.server
if server.id not in os.listdir(self.base):
self.settings[server.id] = default_settings
dataIO.save_json(self.settings_path, self.settings)
os.makedirs(os.path.join(self.base, server.id))
if category not in self._list_image_dirs(server.id):
await self.bot.reply(cf.error("Category not found."))
return
attach = ctx.message.attachments
if len(attach) > 1 or (attach and image_url):
await self.bot.reply(cf.error("Please only provide one file."))
return
url = ""
filename = ""
if attach:
a = attach[0]
url = a["url"]
filename = a["filename"]
elif image_url:
url = image_url
filename = os.path.basename(
"_".join(url.split()).replace("%20", "_"))
else:
await self.bot.reply(cf.error(
"You must provide either a Discord attachment"
" or a direct link to an image."))
return
new_id = str(self.settings[server.id]["next_ids"][category])
self.settings[server.id]["next_ids"][category] += 1
dataIO.save_json(self.settings_path, self.settings)
ext = os.path.splitext(filename)[1]
filepath = os.path.join(
self.base, server.id, category, "{}_{}.{}".format(
category, new_id, ext))
async with aiohttp.get(url) as new_sound:
f = open(filepath, "wb")
f.write(await new_sound.read())
f.close()
await self.bot.reply(cf.info("Image added."))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。