package com.neusoft.bsh.boot.redis.queue;

import com.neusoft.bsh.boot.redis.operation.RedisListOperations;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/neusoft/bsh/boot/redis/queue/AbstractRedisQueueConsumer.class */
/**
 * Base class for consumers that repeatedly pop elements from a Redis list and hand them to
 * {@link #process(Object)}.
 *
 * <p>{@link #init()} starts {@link #getConsumerSize()} worker threads. Each worker loops on
 * {@code rightPop} of the {@link RedisListOperations} supplied by the subclass and passes every
 * non-null result to {@link #process(Object)}. Exceptions are logged and never terminate a
 * worker; a worker only stops when its thread is interrupted.
 *
 * @param <T> type of the elements handed to {@link #process(Object)}
 */
public abstract class AbstractRedisQueueConsumer<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractRedisQueueConsumer.class);

    /** Pause after a failed pop so that a persistent failure does not become a busy loop. */
    private static final long POP_FAILURE_BACKOFF_MILLIS = 1000L;

    /**
     * Returns the queue name; it is passed as the first argument of {@code rightPop} and
     * included in log messages.
     *
     * @return the queue name
     */
    protected abstract String getQueueName();

    /**
     * Handles one element popped from the queue. Called on a worker thread; any exception thrown
     * here is logged and the worker carries on with the next pop.
     *
     * @param t the popped element, never {@code null}
     */
    protected abstract void process(T t);

    /**
     * Returns the class object that is forwarded to {@code rightPop}.
     * NOTE(review): presumably used there to deserialize the stored value - confirm against
     * {@link RedisListOperations}.
     *
     * @return the element class
     */
    protected abstract Class<T> getDataClass();

    /**
     * Returns the operations object used to pop elements. It is looked up again before every pop.
     *
     * @return the Redis list operations to pop from
     */
    protected abstract RedisListOperations getRedisListOperations();

    /**
     * Returns how many worker threads {@link #init()} starts. Defaults to {@code 1}.
     *
     * @return the number of worker threads
     */
    protected int getConsumerSize() {
        return 1;
    }

    /**
     * Returns the timeout, in seconds, passed to every {@code rightPop} call.
     *
     * @return the pop timeout in seconds
     */
    private long popDelaySeconds() {
        return 5L;
    }

    /**
     * Starts the worker threads. Queue name, pop timeout, data class and consumer size are read
     * once, before the first worker is started. There is no guard against repeated calls: each
     * call starts another set of workers.
     */
    public void init() {
        int consumerSize = getConsumerSize();
        String queueName = getQueueName();
        long popDelaySeconds = popDelaySeconds();
        Class<T> dataClass = getDataClass();
        for (int i = 0; i < consumerSize; i++) {
            log.info("监听队列【{}】开始", queueName);
            // Named threads make the workers identifiable in thread dumps.
            String threadName = "redis-queue-consumer-" + queueName + "-" + i;
            new Thread(() -> consumeLoop(queueName, popDelaySeconds, dataClass), threadName).start();
        }
    }

    /** Pops and processes elements until the current thread is interrupted. */
    private void consumeLoop(String queueName, long popDelaySeconds, Class<T> dataClass) {
        while (!Thread.currentThread().isInterrupted()) {
            T message;
            try {
                // Unchecked: the element type is only known through dataClass at runtime.
                @SuppressWarnings("unchecked")
                T popped = (T) getRedisListOperations()
                        .rightPop(queueName, popDelaySeconds, TimeUnit.SECONDS, dataClass);
                message = popped;
            } catch (Exception e) {
                log.error("消费消息异常.队列={}", queueName, e);
                // Without a pause, a pop that keeps failing would spin and flood the log.
                if (!pauseAfterPopFailure()) {
                    return;
                }
                continue;
            }
            if (message == null) {
                continue;
            }
            try {
                process(message);
            } catch (Exception e) {
                // A failing element must not stop the worker; log it and move on.
                log.error("消费消息异常.队列={}", queueName, e);
            }
        }
    }

    /**
     * Sleeps for the pop-failure back-off.
     *
     * @return {@code false} if the sleep was interrupted (the interrupt flag is restored),
     *     {@code true} otherwise
     */
    private static boolean pauseAfterPopFailure() {
        try {
            TimeUnit.MILLISECONDS.sleep(POP_FAILURE_BACKOFF_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
