#!/usr/bin/env python

# Copyright 2015 The Kubernetes Authors All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Based on http://peter-hoffmann.com/2012/python-simple-queue-redis-queue.html
# and the suggestion in the redis documentation for RPOPLPUSH, at
# http://redis.io/commands/rpoplpush, which suggests how to implement a work-queue.

import redis
import uuid
import hashlib

class RedisWQ(object):
    """Simple Finite Work Queue with Redis Backend

    This work queue is finite: as long as no more work is added
    after workers start, the workers can detect when the queue
    is completely empty.

    The items in the work queue are assumed to have unique values.

    This object is not intended to be used by multiple threads
    concurrently.
    """
    def __init__(self, name, **redis_kwargs):
        """The default connection parameters are: host='localhost', port=6379, db=0

        The work queue is identified by "name".  The library may create other
        keys with "name" as a prefix.
        """
        self._db = redis.StrictRedis(**redis_kwargs)
        # The session ID will uniquely identify this "worker".
        self._session = str(uuid.uuid4())
        # Work queue is implemented as two queues: main, and processing.
        # Work is initially in main, and moved to processing when a client picks it up.
        self._main_q_key = name
        self._processing_q_key = name + ":processing"
        self._lease_key_prefix = name + ":leased_by_session:"

    def sessionID(self):
        """Return the ID for this session."""
        return self._session

    def _main_qsize(self):
        """Return the size of the main queue."""
        return self._db.llen(self._main_q_key)

    def _processing_qsize(self):
        """Return the size of the processing queue."""
        return self._db.llen(self._processing_q_key)

    def empty(self):
        """Return True if the queue is empty, including work being done, False otherwise.

        False does not necessarily mean that there is work available to work on
        right now; items may all be leased and sitting in the processing queue.
        """
        return self._main_qsize() == 0 and self._processing_qsize() == 0

# TODO: implement this
#    def check_expired_leases(self):
#        """Return items whose leases have expired to the main work queue."""
#        # Processing list should not be _too_ long since it is approximately as long
#        # as the number of active and recently active workers.
#        processing = self._db.lrange(self._processing_q_key, 0, -1)
#        for item in processing:
#          # If the lease key is not present for an item (it expired or was
#          # never created because the client crashed before creating it)
#          # then move the item back to the main queue so others can work on it.
#          if not self._lease_exists(item):
#            TODO: transactionally move the key from the processing queue to
#            the main queue, while detecting if a new lease is created
#            or if either queue is modified.

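    # The method below is a minimal sketch, not part of the original file, of the
    # transactional move described in the TODO above.  It assumes redis-py's
    # WATCH/MULTI pipeline support; the method name _requeue_if_unleased is
    # hypothetical.
    def _requeue_if_unleased(self, item):
        """Sketch: move 'item' back to the main queue unless a lease (re)appears.

        Watches the processing queue and the item's lease key, so the move is
        abandoned if either changes while the transaction is being prepared.
        """
        lease_key = self._lease_key_prefix + self._itemkey(item)
        with self._db.pipeline() as pipe:
            try:
                pipe.watch(self._processing_q_key, lease_key)
                if pipe.exists(lease_key):
                    # A lease appeared after we looked; leave the item alone.
                    return False
                pipe.multi()
                pipe.lrem(self._processing_q_key, 0, item)
                pipe.rpush(self._main_q_key, item)
                pipe.execute()
                return True
            except redis.WatchError:
                # A watched key changed under us; a caller could simply retry later.
                return False
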
    def _itemkey(self, item):
        """Returns a string that uniquely identifies an item (bytes)."""
        return hashlib.sha224(item).hexdigest()

    def _lease_exists(self, item):
        """True if a lease on 'item' exists."""
        return self._db.exists(self._lease_key_prefix + self._itemkey(item))

    def lease(self, lease_secs=60, block=True, timeout=None):
        """Begin working on an item from the work queue.

        Lease the item for lease_secs.  After that time, other
        workers may consider this client to have crashed or stalled
        and pick up the item instead.

        If the optional arg block is true and timeout is None (the default),
        block if necessary until an item is available."""
        if block:
            item = self._db.brpoplpush(self._main_q_key, self._processing_q_key, timeout=timeout)
        else:
            item = self._db.rpoplpush(self._main_q_key, self._processing_q_key)
        if item:
            # Record that we (this session id) are working on a key.  Expire that
            # note after the lease timeout.
            # Note: if we crash at this line of the program, then GC will see no lease
            # for this item and will later return it to the main queue.
            itemkey = self._itemkey(item)
            self._db.setex(self._lease_key_prefix + itemkey, lease_secs, self._session)
        return item

    def complete(self, value):
        """Complete working on the item with 'value'.

        If the lease expired, the item may not have completed, and some
        other worker may have picked it up.  There is no indication
        of what happened.
        """
        self._db.lrem(self._processing_q_key, 0, value)
        # If we crash here, then the GC code will try to move the value, but it will
        # not be here, which is fine.  So this does not need to be a transaction.
        itemkey = self._itemkey(value)
        self._db.delete(self._lease_key_prefix + itemkey)

# TODO: add functions to clean up all keys associated with "name" when
# processing is complete.

# TODO: add a function to add an item to the queue.  Atomically
# check if the queue is empty and if so fail to add the item
# since other workers might think work is done and be in the process
# of exiting.

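# The function below is a rough sketch, not part of the original module, of the
# atomic add described in the TODO above.  It mirrors the key layout used by
# RedisWQ (main queue under "name", processing queue under "name:processing")
# and relies on redis-py's WATCH/MULTI support; the name add_item_unless_done
# is hypothetical.
def add_item_unless_done(db, name, item):
    """Sketch: push 'item' onto the work queue 'name' unless it has drained.

    Returns False without adding when both the main and processing queues are
    empty, since workers may already consider the work finished and be exiting.
    """
    main_q_key = name
    processing_q_key = name + ":processing"
    with db.pipeline() as pipe:
        try:
            # Watch both queues so the emptiness check and the push happen atomically.
            pipe.watch(main_q_key, processing_q_key)
            if pipe.llen(main_q_key) == 0 and pipe.llen(processing_q_key) == 0:
                return False
            pipe.multi()
            pipe.lpush(main_q_key, item)
            pipe.execute()
            return True
        except redis.WatchError:
            # One of the queues changed while we looked; the caller may retry.
            return False
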
# TODO(etune): move to my own github for hosting, e.g. github.com/erictune/rediswq-py and
# make it so it can be pip installed by anyone (see
# http://stackoverflow.com/questions/8247605/configuring-so-that-pip-install-can-work-from-github)

# TODO(etune): finish code to GC expired leases, and call periodically
#  e.g. each time lease times out.
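
# A minimal usage sketch, not part of the original module: a worker loop that
# leases items, "processes" them, and marks them complete.  The host "localhost"
# and the queue name "job" are placeholders; lease_secs and the sleep are
# illustrative only.
if __name__ == "__main__":
    import time

    q = RedisWQ(name="job", host="localhost")
    print("Worker with sessionID: " + q.sessionID())
    while not q.empty():
        # Lease an item for 10 seconds; wait up to 10 seconds for one to appear.
        item = q.lease(lease_secs=10, block=True, timeout=10)
        if item is not None:
            itemstr = item.decode("utf-8")  # items come back as bytes
            print("Working on " + itemstr)
            time.sleep(1)  # stand-in for real work
            q.complete(item)
        else:
            print("Waiting for work")
    print("Queue empty, exiting")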