Java教程

python实现java mybatis-plus的雪花算法

本文主要是介绍python实现java mybatis-plus的雪花算法,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
import copy
import os
import random
import time


class Sequence:
    # 时间起始标记点,作为基准,一般取系统的最近时间(一旦确定不能变动)
    twepoch = 1288834974657
    # 机器标识位数
    worker_id_bits = 5
    datacenter_id_bits = 5
    max_worker_id = -1 ^ (-1 << worker_id_bits)
    maxDatacenterId = -1 ^ (-1 << datacenter_id_bits)
    # 毫秒内自增位
    sequenceBits = 12
    workerIdShift = sequenceBits
    datacenterIdShift = sequenceBits + worker_id_bits
    timestampLeftShift = sequenceBits + worker_id_bits + datacenter_id_bits
    sequenceMask = -1 ^ (-1 << sequenceBits)

    workerId = 1
    # 数据标识 ID  部分
    datacenterId = 0
    # 并发控制
    sequence = 0
    # 上次生产ID时间戳
    lastTimestamp = -1

    def __int__(self):
        self.datacenterId = self._get_datacenter_id(self.maxDatacenterId)
        self.workerId = self._get_max_workerId(self.datacenterId, self.max_worker_id)

    # 数据标识id部分
    def _get_datacenter_id(self, maxDatacenterId):
        mac = self.__get_mac_address()
        id = ((0x000000FF & mac[len(mac) - 1]) | (0x0000FF00 & ((mac[len(mac) - 2]) << 8))) >> 6
        id = id % (maxDatacenterId + 1)
        return id

    # 获取maxworkerid
    def _get_max_workerId(self, datacenterId, maxWorkerId):
        mpid = str(datacenterId) + str(os.getpid())
        return (hash(mpid) & 0xffff) % (maxWorkerId + 1)

    def __get_mac_address(self):
        import uuid
        node = uuid.getnode()
        mac = uuid.UUID(int=node).hex[-12:]
        decimalismMac = [
            int(mac[0:2], 16), int(mac[2:4], 16), int(mac[4:6], 16), int(mac[6:8], 16), int(mac[8:10], 16),
            int(mac[10:12], 16)
        ]
        return decimalismMac

    # 获取下一个id
    @classmethod
    def nextId(cls):

        timestamp = int(time.time() * 1000)
        # 闰秒
        if (timestamp < cls.lastTimestamp):
            offset = cls.lastTimestamp - timestamp
            if offset <= 5:
                time.sleep(offset << 1)
                timestamp = int(time.time() * 1000)
                if timestamp < cls.lastTimestamp:
                    return None
            else:
                return None

        if cls.lastTimestamp == timestamp:
            #  相同毫秒内,序列号自增
            cls.sequence = (cls.sequence + 1) & cls.sequenceMask
            if cls.sequence == 0:
                # 同一毫秒的序列数已经达到最大
                timestamp = cls.__til_next_millis(cls.lastTimestamp)
        else:
            # 不同毫秒内,序列号置为 1 - 3 随机数
            cls.sequence = random.randrange(1, 4)

        cls.lastTimestamp = copy.copy(timestamp)

        # 时间戳部分 | 数据中心部分 | 机器标识部分 | 序列号部分
        return ((timestamp - cls.twepoch) << cls.timestampLeftShift) | (
                cls.datacenterId << cls.datacenterIdShift) | (cls.workerId << cls.workerIdShift) | cls.sequence

    def __til_next_millis(self, lastTimestamp):
        timestamp = int(time.time() * 1000)
        while timestamp <= lastTimestamp:
            timestamp = int(time.time() * 1000)
        return timestamp

这篇关于python实现java mybatis-plus的雪花算法的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!