Gao Conghui

好好学习,天天向上

爬虫,视频处理


一种神奇的批量创建类的方法--参照namedtuple

def namedtuple(typename, field_names, verbose=False, rename=False):
  """Returns a new subclass of tuple with named fields.

namedtuple是一个很神奇的东西,在看kafka-python的时候看到了很多地方用到了这玩意。用法也非常简单

from collections import namedtuple

People = namedtuple(typename="people",field_names=["sex","name"],verbose=True)
print People(sex="f",name="abc")

很方便的创建一个父类为tuple的类,且可以用kv对的方式创建对象。

怎么实现的嘞?我能最快想到的方法大概是使用type直接创建一个类。

def __new__(_cls, sex, name):
    return tuple.__new__(_cls, (sex, name))

People2 = type("People2",(tuple,),dict(__new__=__new__))
print People2(sex="f",name="abc")

大概是这样的,但是还有一些问题,比如打印出来的样子啦 之类的,虽然麻烦点,总归是能做的。

又或者是使用metaclass搞个类工厂,会比type方便不少。

带着疑问,点开了namedtuple的源码。

class_definition = _class_template.format(
    typename = typename,
    field_names = tuple(field_names),
    num_fields = len(field_names),
    arg_list = repr(tuple(field_names)).replace("'", "")[1:-1],
    repr_fmt = ', '.join(_repr_template.format(name=name)
                         for name in field_names),
    field_defs = '\n'.join(_field_template.format(index=index, name=name)
                           for index, name in enumerate(field_names))
)

# Execute the template string in a temporary namespace and support
# tracing utilities by setting a value for frame.f_globals['__name__']
namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename,
                 OrderedDict=OrderedDict, _property=property, _tuple=tuple)
try:
    exec class_definition in namespace
except SyntaxError as e:
    raise SyntaxError(e.message + ':\n' + class_definition)
result = namespace[typename]

感觉画风有点不太对了…_class_template是个啥玩意,咋还用exec嘞?

_class_template = '''\
class {typename}(tuple):
    '{typename}({arg_list})'

    __slots__ = ()

    _fields = {field_names!r}

    def __new__(_cls, {arg_list}):
        'Create new instance of {typename}({arg_list})'
        return _tuple.__new__(_cls, ({arg_list}))

    @classmethod
    def _make(cls, iterable, new=tuple.__new__, len=len):
        'Make a new {typename} object from a sequence or iterable'
        result = new(cls, iterable)
        if len(result) != {num_fields:d}:
            raise TypeError('Expected {num_fields:d} arguments, got %d' % len(result))
        return result

    def __repr__(self):
        'Return a nicely formatted representation string'
        return '{typename}({repr_fmt})' % self

    def _asdict(self):
        'Return a new OrderedDict which maps field names to their values'
        return OrderedDict(zip(self._fields, self))

    def _replace(_self, **kwds):
        'Return a new {typename} object replacing specified fields with new values'
        result = _self._make(map(kwds.pop, {field_names!r}, _self))
        if kwds:
            raise ValueError('Got unexpected field names: %r' % kwds.keys())
        return result

    def __getnewargs__(self):
        'Return self as a plain tuple.  Used by copy and pickle.'
        return tuple(self)

    __dict__ = _property(_asdict)

    def __getstate__(self):
        'Exclude the OrderedDict from pickling'
        pass

{field_defs}
'''

哦。

最近的文章

优雅关闭以及机器kubernetes pods

优雅启动很常见的一个场景,一个服务刚启动,可能会有一堆东西要加载(比如我这边需要读数据库中一堆东西)需要一些时间,而这段时间里,我不希望kubernetes 把请求打到这些还没初始化的pod上。kubernetes提供了一个叫探针的东西,可以用来检测pod是否就绪,只有就绪的情况才会把请求打过来,如果非就绪状态,这些pod会从service的load balancer中暂时移除。探针可以是一个command或者是一个HTTP的请求,这边使用的是一个HTTP请求的形式。需要保证程序在正常情况...…

kubernetes继续阅读
更早的文章

kafka-python 获取topic lag值

说真,这个问题看上去很简单,但“得益”与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半天源码。直接上代码吧from kafka import SimpleClient, KafkaConsumerfrom kafka.common import OffsetRequestPayload, TopicPartitiondef get_topic_offset(brokers, topic): """ 获取一个topic的offset值的和 """...…

python kafka继续阅读