Python json 模块,load() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用json.load()。
def get_named_set(lang_codes, feature_set):
if feature_set == 'id':
return get_id_set(lang_codes)
if feature_set not in FEATURE_SETS:
print("ERROR: Invalid feature set " + feature_set, file=sys.stderr)
sys.exit()
filename, source, prefix = FEATURE_SETS[feature_set]
feature_database = np.load(filename)
lang_codes = [ get_language_code(l, feature_database) for l in lang_codes ]
lang_indices = [ get_language_index(l, feature_database) for l in lang_codes ]
feature_names = get_feature_names(prefix, feature_database)
feature_indices = [ get_feature_index(f, feature_database) for f in feature_names ]
source_index = get_source_index(source, feature_database)
feature_values = feature_database["data"][lang_indices,:,:][:,feature_indices,source_index]
feature_values = feature_values.squeeze(axis=2)
return feature_names, feature_values
def directory_has_smart_contract(location):
# returns bool if there is a tsol contract in said directory
# probably makes more sense to put this inside of the tsol package
code_path = glob.glob(os.path.join(location, '*.tsol'))
example = glob.glob(os.path.join(location, '*.json'))
assert len(code_path) > 0 and len(example) > 0, 'Could not find *.tsol and *.json files in provided directory.'
# pop off the first file name and turn the code into a file object
code = open(code_path[0])
# turn the example into a dict
with open(example[0]) as e:
example = json.load(e)
try:
tsol.compile(code, example)
except Exception as e:
print(e)
return False
return True
def load_prevIoUs(self, path=None):
"""Load prevIoUs copy of config from disk.
In normal usage you don't need to call this method directly - it
is called automatically at object initialization.
:param path:
File path from which to load the prevIoUs config. If `None`,
config is loaded from the default location. If `path` is
specified,subsequent `save()` calls will write to the same
path.
"""
self.path = path or self.path
with open(self.path) as f:
self._prev_dict = json.load(f)
for k, v in copy.deepcopy(self._prev_dict).items():
if k not in self:
self[k] = v
def info_to_db(db, name):
cur = db.cursor()
with open(name) as data_file:
data = json.load(data_file)
subject = data['subject']
code = data['code']
title = data['title']
credit = data['credit']
desc = data['desc']
geneds = data['geneds']
if geneds is None:
geneds = ''
else:
geneds = ','.join(geneds)
url = data['url']
cmd = 'INSERT INTO `CourseExplorer` (`Id`,`Subject`,`Code`,`Title`,`Credit`,`Description`,`GenEd`,' \
'`Url`) VALUES(NULL,%s,%s) '
val = (subject, code, title, credit, desc, geneds, url)
cur.execute(cmd, val)
# if subject == 'PHYS':
# print cmd % val
def _set_asset_paths(self, app):
"""
Read in the manifest json file which acts as a manifest for assets.
This allows us to get the asset path as well as hashed names.
:param app: aiohttp application
:return: None
"""
webpack_stats = app.settings.WEBPACK_MANIFEST_PATH
try:
with open(webpack_stats, 'r') as stats_json:
stats = json.load(stats_json)
if app.settings.WEBPACK_ASSETS_URL:
self.assets_url = app.settings.WEBPACK_ASSETS_URL
else:
self.assets_url = stats['publicPath']
self.assets = stats['assets']
except IOError:
raise RuntimeError(
"'WEBPACK_MANIFEST_PATH' is required to be set and "
"it must point to a valid json file.")
def _get_external_data(url):
result = {}
try:
# urlopen might fail if it runs into redirections,
# because of Python issue #13696. Fixed in locators
# using a custom redirect handler.
resp = urlopen(url)
headers = resp.info()
ct = headers.get('Content-Type')
if not ct.startswith('application/json'):
logger.debug('Unexpected response for JSON request: %s', ct)
else:
reader = codecs.getreader('utf-8')(resp)
#data = reader.read().decode('utf-8')
#result = json.loads(data)
result = json.load(reader)
except Exception as e:
logger.exception('Failed to get external data for %s: %s', url, e)
return result
def save(self, pypi_version, current_time):
# Check to make sure that we own the directory
if not check_path_owner(os.path.dirname(self.statefile_path)):
return
# Now that we've ensured the directory is owned by this user,we'll go
# ahead and make sure that all our directories are created.
ensure_dir(os.path.dirname(self.statefile_path))
# Attempt to write out our version check file
with lockfile.LockFile(self.statefile_path):
if os.path.exists(self.statefile_path):
with open(self.statefile_path) as statefile:
state = json.load(statefile)
else:
state = {}
state[sys.prefix] = {
"last_check": current_time.strftime(SELFCHECK_DATE_FMT),
"pypi_version": pypi_version,
}
with open(self.statefile_path, "w") as statefile:
json.dump(state, statefile, sort_keys=True,
separators=(",", ":"))
def load(self):
# XXX JSON is not a great database
for path in load_config_paths('wheel'):
conf = os.path.join(native(path), self.CONfig_NAME)
if os.path.exists(conf):
with open(conf, 'r') as infile:
self.data = json.load(infile)
for x in ('signers', 'verifiers'):
if not x in self.data:
self.data[x] = []
if 'schema' not in self.data:
self.data['schema'] = self.SCHEMA
elif self.data['schema'] != self.SCHEMA:
raise ValueError(
"Bad wheel.json version {0},expected {1}".format(
self.data['schema'], self.SCHEMA))
break
return self
def __load_layout(self, config):
var = config.get_value('engine/replace-with-kanji-python', 'layout')
if var is None or var.get_type_string() != 's':
path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'layouts')
path = os.path.join(path, 'roomazi.json')
if var:
config.unset('engine/replace-with-kanji-python', 'layout')
else:
path = var.get_string()
logger.info("layout: %s", path)
layout = roomazi.layout # Use 'roomazi' as default
try:
with open(path) as f:
layout = json.load(f)
except ValueError as error:
logger.error("JSON error: %s", error)
except OSError as error:
logger.error("Error: %s", error)
except:
logger.error("Unexpected error: %s %s", sys.exc_info()[0], sys.exc_info()[1])
self.__to_kana = self.__handle_roomazi_layout
if 'Type' in layout:
if layout['Type'] == 'Kana':
self.__to_kana = self.__handle_kana_layout
return layout
def _get_cache(ttl, cache_path):
'''
If url contains valid cache,returns it,else returns empty list.
'''
# Check if we have a valid cached version.
try:
cached_time = os.path.getmtime(cache_path)
except OSError:
return []
if current_time() - cached_time < ttl:
log.debug('%s is less than ttl', cache_path)
try:
with open(cache_path) as json_file:
loaded_json = json.load(json_file)
return loaded_json
except IOError:
return []
except ValueError:
log.error('%s was not json formatted', cache_path)
return []
else:
log.debug('%s was older than ttl', cache_path)
return []
def test_update_text(self):
with open(os.path.join(self.FIXTURES_DIR, 'eyes_in_the_skies.json')) as f:
final_data = json.load(f)
original_text = final_data['RTR']['cards'][0]['originalText']
final_text = final_data['RTR']['cards'][0]['text']
# copy the data and munge it into its original state.
original_data = copy.deepcopy(final_data)
original_data['RTR']['cards'][0]['text'] = original_text
# Import the original data.
parse_data(original_data, ['RTR'])
eyes_in_the_skies = Card.objects.first()
self.assertEqual(eyes_in_the_skies.text, original_text)
# Import the final,updated data.
parse_data(final_data, ['RTR'])
eyes_in_the_skies.refresh_from_db()
self.assertEqual(eyes_in_the_skies.text, final_text)
def test_update_types(self):
with open(os.path.join(self.FIXTURES_DIR, 'jackal_pup.json')) as f:
final_data = json.load(f)
# copy the data and munge the types.
original_data = copy.deepcopy(final_data)
original_subtype = 'Hound'
original_data['TMP']['cards'][0]['subtypes'] = [original_subtype]
# Import the original data.
parse_data(original_data, ['TMP'])
jackal_pup = Card.objects.first()
self.assertEqual(jackal_pup.subtypes.count(), 1)
self.assertEqual(jackal_pup.subtypes.first().name, original_subtype)
# Import the final, ['TMP'])
jackal_pup.refresh_from_db()
self.assertEqual(jackal_pup.subtypes.count(), 'Jackal')
# The Hound subtype has been deleted.
self.assertFalse(CardSubtype.objects.filter(name=original_subtype).exists())
def test_update_loyalty(self):
"""
Simulates the upgrade process from version 0.2 to version 0.4.
"""
with open(os.path.join(self.FIXTURES_DIR, 'vraska_the_unseen.json')) as f:
final_data = json.load(f)
# copy the data and munge it to remove the loyalty.
original_data = copy.deepcopy(final_data)
del original_data['RTR']['cards'][0]['loyalty']
# Import the original data.
parse_data(original_data, ['RTR'])
vraska = Card.objects.first()
self.assertIsNone(vraska.loyalty)
# Import the final, ['RTR'])
vraska.refresh_from_db()
self.assertEqual(vraska.loyalty, 5)
def main():
if len(sys.argv) < 2:
sys.stderr.write("USAGE: %s measurement\n" % sys.argv[0])
sys.exit(1)
path = sys.argv[1]
with open(os.path.join(path, "Metadata.json")) as f:
Metadata = json.load(f)
start = date(Metadata["start"][:-1])
end = date(Metadata["start"][:-1])
print('open measurement "%s" from "%s" to "%s"', Metadata["name"], start, end)
for service in Metadata["services"]:
print('open service "%s"' % service["name"])
with open(os.path.join(path, service["filename"])) as csvfile:
r = csv.DictReader(csvfile, dialect=csv.excel_tab)
for row in r:
print(row["time"])
def configure(self):
# load config values
try:
with open(self.configfile) as configfile_contents:
self.config = json.load(configfile_contents)
except:
self.config = {}
try:
self.agent_services = self.api_session.get(self.api_endpoint + '/agent/services?stale').json()
except:
print_exc()
exit(135)
self.managed_service = self.agent_services[self.service]
if self.managed_service['Tags'] == None:
self.managed_service['Tags'] = []
if self.role_source == "facter":
self.get_facter_state(self.DEFAULT_FACTERFILE)
else:
print("!! unsupported PG role source !!")
exit(140)
def prompt_pick_backup(message):
"""Prompts the user to pick an existing database,and returns the
selected choice database ID and its Metadata"""
# First load all the saved databases (splitting extension and path)
saved_db = [path.splitext(path.split(f)[1])[0] for f in glob('backups/*.tlo')]
# Then prompt the user
print('Available backups databases:')
for i, db_id in enumerate(saved_db):
Metadata = get_Metadata(db_id)
print('{}. {},ID: {}'.format(i + 1,
Metadata.get('peer_name', '???'),
db_id))
db_id = saved_db[get_integer(message, 1, len(saved_db)) - 1]
return db_id, get_Metadata(db_id)
def _get_external_data(url):
result = {}
try:
# urlopen might fail if it runs into redirections, e)
return result
def load(self):
# XXX JSON is not a great database
for path in load_config_paths('wheel'):
conf = os.path.join(native(path), 'verifiers'):
if x not in self.data:
self.data[x] = []
if 'schema' not in self.data:
self.data['schema'] = self.SCHEMA
elif self.data['schema'] != self.SCHEMA:
raise ValueError(
"Bad wheel.json version {0}, self.SCHEMA))
break
return self
def read_store(self):
"""
Get the store from file.
"""
with open(self._path,'r',encoding='ascii') as fr:
outer_data = json.load(fr)
try:
validate(outer_data,OUTER_SCHEMA)
except ValidationError:
raise SSError("Invalid outer schema")
enc_bytes = hex_str_to_bytes(outer_data["enc_blob"])
try:
inner_data = self._Box.decrypt(enc_bytes)
except nacl.exceptions.CryptoError:
raise SSCryptoError("Wrong password")
return json.loads(inner_data.decode('utf-8'))
def __init__(self, path='config.json'):
CONfig_PATH = os.path.join(sys.path[0], path)
dict.__init__(self)
self.path = path
try:
self.load()
except:
print "Creando fichero de configuracion"
self['hs_threshold1'] = 100
self['hs_threshold2'] = 200
self['hs_apertura'] = 1
self['hs_blur'] = 3
self['hs_minLineLength'] = 100
self['hs_maxLineGap'] = 10
self['hs_kernel_dilate'] = 3
self['hs_kernel_erode'] = 3
self['hs_param1'] = 3
self['hs_param2'] = 3
self['hs_param3'] = 3
self['hs_dilate_iteracciones'] = 1
self.save()
def test_match_fetching(self):
"""Check if the server handle correctly a normal request."""
# Forward sample data fetched from the Riot API directly as result
this_dir = os.path.dirname(os.path.abspath(__file__))
with open(os.sep.join([this_dir, "samples", self.MATCH_DATA])) as f:
self.match_data = json.load(f)
# Setup mocks
self.service.cache_manager.find_match.return_value = None
self.service.riot_api_handler.get_match.return_value = self.match_data
response = self.stub.Match(service_pb2.MatchRequest(id=4242,
region=constants_pb2.EUW))
# Check the conversion of the sample is matching the response
converter = JSONConverter(None)
expected = converter.json_match_to_match_pb(self.match_data)
self.assertEqual(response, expected)
self.assertTrue(self.service.cache_manager.save_match.called)
def test_match_from_cache(self):
"""Check if the server returns a match from the cache."""
# Forward sample data fetched from the Riot API directly as result
this_dir = os.path.dirname(os.path.abspath(__file__))
with open(os.sep.join([this_dir, self.MATCH_DATA])) as f:
self.match_data = json.load(f)
# Setup mocks
self.service.cache_manager.find_match.return_value = self.match_data
response = self.stub.Match(service_pb2.MatchRequest(id=4242,
region=constants_pb2.EUW))
# Check the conversion of the sample is matching the response
self.assertTrue(self.service.cache_manager.find_match.called)
self.assertFalse(self.service.riot_api_handler.get_match.called)
self.assertFalse(self.service.cache_manager.save_match.called)
def load(self):
# Prepare + load directory.
super().load()
# Load the files and parse JSON.
parsed_settings = dict()
try:
for file_name in self.files:
file_path = os.path.join(self.directory, file_name)
with open(file_path, 'r') as file_handle:
parsed_settings.update(json.load(file_handle))
except json.JSONDecodeError as e:
raise ImproperlyConfigured(
'Your settings file(s) contain invalid JSON Syntax! Please fix and restart!,{}'.format(str(e))
)
# Loop and set in local settings (+ uppercase keys).
for key, value in parsed_settings.items():
self.settings[key.upper()] = value
def load_endpoints(self):
self.choose_endpoint.endpoints.clear()
for name in listdir(str(BASE_PATH / 'endpoints')):
if name.endswith('.json'):
item = QListWidgetItem(name.split('.json')[0], self.choose_endpoint.endpoints)
item.setFlags(item.flags() & ~Qt.ItemIsEnabled)
pb_msg_to_endpoints = defaultdict(list)
with open(str(BASE_PATH / 'endpoints' / name)) as fd:
for endpoint in load(fd, object_pairs_hook=OrderedDict):
pb_msg_to_endpoints[endpoint['request']['proto_msg'].split('.')[-1]].append(endpoint)
for pb_msg, endpoints in pb_msg_to_endpoints.items():
item = QListWidgetItem(' ' * 4 + pb_msg, self.choose_endpoint.endpoints)
item.setFlags(item.flags() & ~Qt.ItemIsEnabled)
for endpoint in endpoints:
path_and_qs = '/' + endpoint['request']['url'].split('/', 3).pop()
item = QListWidgetItem(' ' * 8 + path_and_qs, self.choose_endpoint.endpoints)
item.setData(Qt.UserRole, endpoint)
self.set_view(self.choose_endpoint)
def load_barcode_dist(filename, barcode_whitelist, gem_group, proportions=True):
""" Load barcode count distribution from a json file """
# Input barcode whitelist must be an ordered type;
# safeguard against it going out of sync with the distribution file
assert barcode_whitelist is None or isinstance(barcode_whitelist, list)
if not os.path.isfile(filename):
return None
with open(filename, 'r') as f:
values = json.load(f)
start = (gem_group-1)*len(barcode_whitelist)
end = gem_group*len(barcode_whitelist)
barcode_counts = {bc: value for bc, value in zip(barcode_whitelist, values[start:end])}
if proportions:
total_barcode_counts = sum(barcode_counts.values())
barcode_dist = {bc: tk_stats.robust_divide(float(value), float(total_barcode_counts))
for bc, value in barcode_counts.iteritems()}
return barcode_dist
else:
return barcode_counts
def split(args):
assert len(args.read1s) == len(args.read2s)
chunks = []
# Determine the number of buckets required to achieve
# the given chunk size.
chunks_per_gem_group = {}
with open(args.reads_summary) as f:
reads_summary = json.load(f)
for gg in args.gem_groups:
readpairs = reads_summary['%d_total_reads_per_gem_group' % gg]
chunks_per_gem_group[str(gg)] = max(2,
int(math.ceil(float(readpairs) / \
args.readpairs_per_chunk)))
for fastq1, fastq2 in itertools.izip(args.read1s, args.read2s):
chunks.append({
'read1s_chunk': fastq1,
'read2s_chunk': fastq2,
'chunks_per_gem_group': chunks_per_gem_group,
})
return {'chunks': chunks}
def main(args, outs):
# Write read_chunk for consumption by Rust
with open("chunk_args.json", "w") as f:
json.dump(args.read_chunk, f)
output_path = martian.make_path("")
prefix = "fastq_chunk"
chunk_reads_args = ['chunk_reads', '--reads-per-fastq', str(args.reads_per_file), output_path, prefix, "--martian-args", "chunk_args.json"]
print "running chunk reads: [%s]" % str(chunk_reads_args)
subprocess.check_call(chunk_reads_args)
with open(os.path.join(output_path, "read_chunks.json")) as f:
chunk_results = json.load(f)
outs.out_chunks = []
# Write out a new chunk entry for each resulting chunk
for chunk in chunk_results:
print args.read_chunk
chunk_copy = args.read_chunk.copy()
print chunk_copy
chunk_copy['read_chunks'] = chunk
outs.out_chunks.append(chunk_copy)
def load_model(file_path):
with open(file_path, "r") as f:
decision_trees, sparse_features, total_feature_count = load(f)
dts = {}
for key in decision_trees.keys():
dts[key] = []
for dt in decision_trees[key]:
d = DecisionTree([])
for k in dt['model'].keys():
setattr(d, k, dt['model'][k])
dt['model'] = d
dts[key].append(dt)
return ClassificationEngine(
decision_trees=dts,
sparse_features=sparse_features,
total_feature_count=total_feature_count
)
def requests_with_cache(dir):
def decorator(func):
def wrapper(**kwargs):
cache_key = str(kwargs.get("param", "default.json"))
cache_url = dir + "/" + cache_key.replace("/", "-").replace("_", "-")
if os.path.isfile(cache_url):
with open(cache_url, 'r') as f:
print(cache_url)
return json.load(f)
with open(cache_url, 'w+') as f:
ret = func(**kwargs)
json.dump(ret, f)
return ret
return wrapper
return decorator
def main():
if len(sys.argv) == 1:
infile = sys.stdin
outfile = sys.stdout
elif len(sys.argv) == 2:
infile = open(sys.argv[1], 'rb')
outfile = sys.stdout
elif len(sys.argv) == 3:
infile = open(sys.argv[1], 'rb')
outfile = open(sys.argv[2], 'wb')
else:
raise SystemExit(sys.argv[0] + " [infile [outfile]]")
try:
obj = json.load(infile)
except ValueError, e:
raise SystemExit(e)
json.dump(obj, outfile, indent=4)
outfile.write('\n')
def import_policy(cb, parser, args):
p = cb.create(Policy)
p.policy = json.load(open(args.policyfile, "r"))
p.description = args.description
p.name = args.name
p.priorityLevel = args.prioritylevel
p.version = 2
try:
p.save()
except ServerError as se:
print("Could not add policy: {0}".format(str(se)))
except Exception as e:
print("Could not add policy: {0}".format(str(e)))
else:
print("Added policy. New policy ID is {0}".format(p.id))
def create_graph():
logfile = 'result/log'
xs = []
ys = []
ls = []
f = open(logfile, 'r')
data = json.load(f)
print(data)
for d in data:
xs.append(d["iteration"])
ys.append(d["main/accuracy"])
ls.append(d["main/loss"])
plt.clf()
plt.cla()
plt.hlines(1, 0, np.max(xs), colors='r', linestyles="dashed") # y=-1,1??????
plt.title(r"loss/accuracy")
plt.plot(xs, ys, label="accuracy")
plt.plot(xs, ls, label="loss")
plt.legend()
plt.savefig("result/log.png")
def associate_lambda(group_config, lambda_config):
"""
Associate the Lambda described in the `lambda_config` with the
Greengrass Group described by the `group_config`
:param group_config: `gg_group_setup.GroupConfigFile` to store the group
:param lambda_config: the configuration describing the Lambda to
associate with the Greengrass Group
:return:
"""
with open(lambda_config, "r") as f:
cfg = json.load(f)
config = GroupConfigFile(config_file=group_config)
lambdas = config['lambda_functions']
lambdas[cfg['func_name']] = {
'arn': cfg['lambda_arn'],
'arn_qualifier': cfg['lambda_alias']
}
config['lambda_functions'] = lambdas
def load_cache(self, file):
import json
try:
with open(file, 'r') as f:
self.cache = json.load(f)
except:
# Fail silently
pass
def editPipeline(args, config):
pipelinedbutils = Pipelinedbutils(config)
request = json.loads(pipelinedbutils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)
_, tmp = mkstemp()
with open(tmp, 'w') as f:
f.write("{data}".format(data=json.dumps(request, indent=4)))
if "EDITOR" in os.environ.keys():
editor = os.environ["EDITOR"]
else:
editor = "/usr/bin/nano"
if subprocess.call([editor, tmp]) == 0:
with open(tmp, 'r') as f:
request = json.load(f)
pipelinedbutils.updateJob(args.jobId, keyName="job_id", setValues={"request": json.dumps(request)})
else:
print "ERROR: there was a problem editing the request"
exit(-1)
def _load_ready_file(self):
if self._ready is not None:
return
if os.path.exists(self._ready_file):
with open(self._ready_file) as fp:
self._ready = set(json.load(fp))
else:
self._ready = set()
def get_matchmaker_map(mm_file='/etc/oslo/matchmaker_ring.json'):
mm_map = {}
if os.path.isfile(mm_file):
with open(mm_file, 'r') as f:
mm_map = json.load(f)
return mm_map
def _git_yaml_load(projects_yaml):
"""
Load the specified yaml into a dictionary.
"""
if not projects_yaml:
return None
return yaml.load(projects_yaml)
def get_id_set(lang_codes):
feature_database = np.load("family_features.npz")
lang_codes = [ get_language_code(l, feature_database) for l in lang_codes ]
all_languages = list(feature_database["langs"])
feature_names = [ "ID_" + l.upper() for l in all_languages ]
values = np.zeros((len(lang_codes), len(feature_names)))
for i, lang_code in enumerate(lang_codes):
feature_index = get_language_index(lang_code, feature_database)
values[i, feature_index] = 1.0
return feature_names, values
def generate(location):
# cli wizard for creating a new contract from a template
if directory_has_smart_contract(location):
example_payload = json.load(open(glob.glob(os.path.join(location, '*.json'))[0]))
print(example_payload)
for k, v in example_payload.items():
value = input(k + ':')
if value != '':
example_payload[k] = value
print(example_payload)
code_path = glob.glob(os.path.join(location, '*.tsol'))
tsol.compile(open(code_path[0]), example_payload)
print('Code compiles with new payload.')
selection = ''
while True:
selection = input('(G)enerate solidity contract or (E)xport implementation:')
if selection.lower() == 'g':
output_name = input('Name your contract file without an extension:')
code = tsol.generate_code(open(code_path[0]).read(), example_payload)
open(os.path.join(location, '{}.sol'.format(output_name)), 'w').write(code)
break
if selection.lower() == 'e':
output_name = input('Name your implementation file without an extension:')
json.dump(example_payload, open(os.path.join(location, '{}.json'.format(output_name)), 'w'))
break
else:
print('Provided directory does not contain a *.tsol and *.json or does not compile.')
def __init__(self, path=None, filename=None):
self.kafka_offset_spec_file = os.path.join(
(path or "/tmp/"), (filename or 'kafka_offset_specs.json'))
self._kafka_offsets = {}
if os.path.exists(self.kafka_offset_spec_file):
try:
f = open(self.kafka_offset_spec_file)
kafka_offset_dict = json.load(f)
for key, value in kafka_offset_dict.items():
log.info("Found offset %s: %s", key, value)
self._kafka_offsets[key] = OffsetSpec(
app_name=value.get('app_name'),
topic=value.get('topic'),
partition=value.get('partition'),
from_offset=value.get('from_offset'),
until_offset=value.get('until_offset'),
batch_time=value.get('batch_time'),
last_updated=value.get('last_updated'),
revision=value.get('revision')
)
except Exception:
log.info('Invalid or corrupts offsets file found at %s,'
' starting over' % self.kafka_offset_spec_file)
else:
log.info('No kafka offsets found at startup')
def load_offset_file_as_json(self, file_path):
with open(file_path, 'r') as f:
json_file = json.load(f)
return json_file
def register(self, name, serializer):
"""Register ``serializer`` object under ``name``.
Raises :class:`AttributeError` if ``serializer`` in invalid.
.. note::
``name`` will be used as the file extension of the saved files.
:param name: Name to register ``serializer`` under
:type name: ``unicode`` or ``str``
:param serializer: object with ``load()`` and ``dump()``
methods
"""
# Basic validation
getattr(serializer, 'load')
getattr(serializer, 'dump')
self._serializers[name] = serializer
def load(cls, file_obj):
"""Load serialized object from open JSON file.
.. versionadded:: 1.8
:param file_obj: file handle
:type file_obj: ``file`` object
:returns: object loaded from JSON file
:rtype: object
"""
return json.load(file_obj)
def load(cls, file_obj):
"""Load serialized object from open pickle file.
.. versionadded:: 1.8
:param file_obj: file handle
:type file_obj: ``file`` object
:returns: object loaded from pickle file
:rtype: object
"""
return cPickle.load(file_obj)
def load(cls, file_obj):
"""Load serialized object from open pickle file.
.. versionadded:: 1.8
:param file_obj: file handle
:type file_obj: ``file`` object
:returns: object loaded from pickle file
:rtype: object
"""
return pickle.load(file_obj)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。