प्रोग्रामिंगमध्ये डेटा कॅशिंग हे एक सामान्य तंत्र आहे. हे आपल्याला दीर्घकालीन कारवाईशिवाय डेटा त्वरीत पुनर्प्राप्त करण्याची परवानगी देते. परंतु काही दीर्घकालीन कामातून डेटा कॅश करण्यात समस्या आहे. जर कॅशे मूल्य वगळले गेले तर ते आवश्यक असेल. जर हे दीर्घकालीन HTTP विनंती किंवा SQL स्टेटमेंट द्वारे आवश्यक असेल, तर पुढील कॅशे व्हॅल्यू विनंतीमुळे अनेक HTTP क्वेरी / SQL स्टेटमेंट पुन्हा पुन्हा होऊ शकतात. मी कॅशे अंमलबजावणी शोधत होतो जे प्रकल्प वापरून या समस्येचे निराकरण करते प्रकल्प अणुभट्टी, प्रोजेक्ट रिएक्टर रिअॅक्टिव्ह स्ट्रीम स्पेसिफिकेशनच्या वर बांधले गेले आहे, जे रिiveक्टिव applicationsप्लिकेशन तयार करण्यासाठी एक मानक आहे. तुम्हाला कदाचित माहित असेल Mono आणि Flux विषय: स्प्रिंग वेबफ्लक्स:, प्रोजेक्ट रिएक्टर स्प्रिंग वेबफ्लक्सची वैशिष्ट्यीकृत जेट लायब्ररी आहे.

या लेखात, मी एक प्रेरित कॅशे अंमलबजावणी सुचवेन CacheMono रिएक्टरच्या अॅडॉन्स प्रकल्पातून. आम्ही असे गृहीत धरू की दीर्घकालीन HTTP क्वेरी किंवा SQL स्टेटमेंटचा परिणाम a म्हणून दर्शवला जातोMono ऑब्जेक्ट अ Mono ऑब्जेक्ट “भौतिक” आहे आणि “रिएक्टर” म्हणून संग्रहित आहे Signal ऑब्जेक्ट जे a दर्शवते Mono, कॅशे मूल्य आवश्यक असल्यास सिग्नल मोनोच्या “अमूर्त” आहेत lookup पद्धत: समान की सह अनेक शोध समान ठेवतील Mono ऑब्जेक्ट, म्हणून दीर्घकालीन ऑपरेशन फक्त एकदाच केले जाते.

चला एक वर्ग तयार करू CacheMono तीन कारखाना पद्धतींद्वारे.

@Slf4j
public class CacheMono<KEY, IVALUE, OVALUE> {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<KEY, CacheMonoValue<OVALUE>> cache = new HashMap<>();

    /**
     * External value supplier which should be provided if "valuePublisher" with "keyExtractor"
     * are not set
     */
    private final Function<KEY, Mono<OVALUE>> valueSupplier;
    /**
     * External source publisher stream which should be provided if "valueSupplier" is not set
     */
    private final Flux<IVALUE> valuePublisher;
    /**
     * Key extractor for emitted items provided by "valuePublisher"
     */
    private final Function<IVALUE, KEY> keyExtractor;
    /**
     * Value extractor for emitted items provided by "valuePublisher"
     */
    private final Function<IVALUE, OVALUE> valueExtractor;

    private CacheMono(Function<KEY, Mono<OVALUE>> valueSupplier, Flux<IVALUE> valuePublisher,
            Function<IVALUE, KEY> keyExtractor, Function<IVALUE, OVALUE> valueExtractor) {
        this.valueSupplier = valueSupplier;
        this.valuePublisher = valuePublisher;
        this.keyExtractor = keyExtractor;
        this.valueExtractor = valueExtractor;
    }

    /**
     * Factory method to create a CacheMono instance from an external value supplier. The value
     * supplier is called by this CacheMono instance for retrieving values when they are missing
     * in cache ("pull" principle to retrieve not yet cached values).
     */
    public static <KEY, VALUE> CacheMono<KEY, VALUE, VALUE> fromSupplier(
            @NonNull Function<KEY, Mono<VALUE>> valueSupplier) {
        Objects.requireNonNull(valueSupplier);
        return new CacheMono<>(valueSupplier, null, null, null);
    }

    /**
     * Factory method to create a CacheMono instance from an external value publisher.
     * Published values will fill this cache (reactive "push" way).
     */
    public static <KEY, VALUE> CacheMono<KEY, VALUE, VALUE> fromPublisher(
            @NonNull Flux<VALUE> valuePublisher, @NonNull Function<VALUE, KEY> keyExtractor) {
        Objects.requireNonNull(valuePublisher);
        Objects.requireNonNull(keyExtractor);
        return createCacheMono(valuePublisher, keyExtractor, Function.identity());
    }

    /**
     * Factory method to create a CacheMono instance from an external value publisher.
     * Published values will fill this cache (reactive "push" way).
     */
    public static <KEY, IVALUE, OVALUE> CacheMono<KEY, IVALUE, OVALUE> fromPublisher(
            @NonNull Flux<IVALUE> valuePublisher,
            @NonNull Function<IVALUE, KEY> keyExtractor,
            @NonNull Function<IVALUE, OVALUE> valueExtractor) {
        Objects.requireNonNull(valuePublisher);
        Objects.requireNonNull(keyExtractor);
        return createCacheMono(valuePublisher, keyExtractor, valueExtractor);
    }
  
    private static <KEY, IVALUE, OVALUE> CacheMono<KEY, IVALUE, OVALUE> createCacheMono(
            @NonNull Flux<IVALUE> valuePublisher,
            @NonNull Function<IVALUE, KEY> keyExtractor,
            @NonNull Function<IVALUE, OVALUE> valueExtractor) {
        var cacheMono = new CacheMono<>(null, valuePublisher, keyExtractor, valueExtractor);
        valuePublisher.doOnEach(signal -> {
            if (signal.hasValue()) {
                final var inputValue = signal.get();
                final var outputSignal = Signal.next(valueExtractor.apply(inputValue));
                cacheMono.cache.put(keyExtractor.apply(inputValue),
                                    new CacheMonoValue<>(outputSignal));
            } else if (signal.isOnError()) {
                if (signal.getThrowable() == null) {
                    log.error("Error from value publisher");
                } else {
                    log.error("Error from value publisher, message = {}",
                              signal.getThrowable().getMessage());
                }
            }
        }).subscribe();

        return cacheMono;
    }
    
    ...
}

कॅश केलेली मूल्ये अद्याप पुनर्संचयित केली जाणार नाहीत valueSupplier किंवा: valuePublisherपहिला “पुल” तत्त्व वापरतो आणि दुसरा संचयित मूल्ये टिकवून ठेवण्यासाठी “पुश” तत्त्व वापरतो. याचा अर्थ एकतर valueSupplier किंवा: valuePublisher च्या सोबत keyExtractor आणि valueExtractor परिभाषित केले पाहिजे.

लक्षात ठेवा. आपण एकापेक्षा जास्त तयार केल्यास CacheMono समान मूल्य प्रकाशकाकडून, आपण a हस्तांतरित करणे आवश्यक आहे Flux प्रवाह जो इतिहास साठवतो – संभाव्य ग्राहकांना सुरुवातीपासून संग्रहित आयटम रिलीज करतो. हे आवश्यक आहे कारण हे CacheMono अंमलबजावणी फ्लक्स प्रवाहाच्या प्रवाहाची सदस्यता घेतली आहे जेणेकरून स्त्रोत प्रवाह चौरस मूल्यांकडे वाहतो तेव्हा कॅशे स्वयंचलितपणे भरला जातो (इतर कारखाना पद्धतीद्वारे प्रतिक्रियाशील “पुश” pull “पुल”). असे तयार करण्याचा सर्वात सोपा मार्ग Flux विद्यमान प्रवाहावरून कॉल करणे cache() कोणत्याही पद्धती Flux प्रवाह

तुम्ही बघू शकता, आम्ही प्रकरणांचा मागोवा ठेवतो CacheMonoValueहे फक्त गुंडाळले आहे Mono किंवा: Signal, आम्ही हा धडा अंतर्गत धडा म्हणून सादर करू शकतो.

private static class CacheMonoValue<VALUE> {

    private Mono<VALUE> mono;
    private Signal<VALUE> signal;

    CacheMonoValue(Mono<VALUE> mono) {
        this.mono = mono;
    }

    CacheMonoValue(Signal<VALUE> signal) {
        this.signal = signal;
    }

    Mono<VALUE> toMono() {
        if (mono != null) {
            return mono;
        }
        return Mono.justOrEmpty(signal).dematerialize();
    }

    Optional<VALUE> getValue() {
        if (signal == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(signal.get());
    }
}

ते काही शब्दांत आपण पाहू Mono दीर्घ-अभिनय मूल्य ताबडतोब कॅशेमध्ये साठवले जाते. सारखे Mono नमुना सर्व शोधांसाठी समान की नंतर घेतला जातो. एकदा निकाल लागला Mono उपलब्ध, वाजवी मूल्य म्हणून साठवले जाते: Signal त्याच की अंतर्गत. बरं, स्टेप बाय स्टेप. इथे बघ lookup प्रथम पद्धत हे एक सुप्रसिद्ध नमुना वापरते. मूल्य वगळल्यास, तर्क आत आहे switchIfEmpty ऑपरेटर कार्यान्वित आहे.

/**
 * Finds a value by key in an in-memory cache or load it from a remote source.
 * The loaded value will be cached.
 */
public Mono<OVALUE> lookup(KEY key) {
    return Mono.defer(() -> getValueAsMono(key)
            .switchIfEmpty(Mono.defer(() -> onCacheMissResume(key)))
    );
}

private Mono<OVALUE> getValueAsMono(KEY key) {
    final Lock readLock = lock.readLock();
    readLock.lock();
    try {
        return Mono.justOrEmpty(cache.get(key)).flatMap(CacheMonoValue::toMono);
    } finally {
        readLock.unlock();
    }
}

private Mono<OVALUE> onCacheMissResume(KEY key) {
    final Lock writeLock = lock.writeLock();
    writeLock.lock();
    try {
        // check if value was already cached by another thread
        final var cachedValue = cache.get(key);
        if (cachedValue == null) {
            final Mono<OVALUE> monoValue;
            if (valuePublisher != null) {
                // get value from external value publisher
                monoValue = valuePublisher
                        .filter(value -> Objects.equals(keyExtractor.apply(value), key))
                        .map(valueExtractor)
                        .next();
            } else if (valueSupplier != null) {
                // get value from external supplier
                monoValue = valueSupplier.apply(key);
            } else {
                throw new IllegalStateException("Value can be not determined," +
                        "neither valuePublisher nor valueSupplier were set");
            }
            // cache Mono as value immediately
            cache.put(key, new CacheMonoValue<>(monoValue));

            // cache success and error values encapsulated in signal when it is available
            return monoValue.doOnEach(signal -> {
                if (signal.isOnNext()) {
                    cache.put(key, new CacheMonoValue<>(
                      Signal.next(Objects.requireNonNull(signal.get())))
                    );
                } else if (signal.isOnError()) {
                    final Signal<OVALUE> errorSignal;
                    if (signal.getThrowable() == null) {
                        errorSignal = Signal.error(
                          new Throwable("Getting value from external provider failed"));
                    } else {
                        errorSignal = Signal.error(signal.getThrowable());
                    }
                    cache.put(key, new CacheMonoValue<>(errorSignal));
                }
            });
        }
        return Mono.justOrEmpty(cachedValue).flatMap(CacheMonoValue::toMono);
    } finally {
        writeLock.unlock();
    }
}

मध्ये: onCacheMissResume, चुकलेले मूल्य वरीलद्वारे जतन केले जाईल valueSupplier किंवा: valuePublisher, मी म्हटल्याप्रमाणे, मूल्य ताबडतोब ए म्हणून संग्रहित केले जाते Mono त्यानंतरच्या सर्व शोधांसाठी ऑब्जेक्ट परत केला जातो. एकदा दीर्घकालीन क्रियेचे मूल्य उपलब्ध झाले की तर्कशास्त्र आत असते monoValue.doOnEach(...) घडत आहे. खर्च समाविष्ट Signal : फोन करून परत करता येतेsignal.get(),

चला काही सुलभ पद्धती देखील लागू करूया. विशेषतः अशा पद्धती ज्या कॅशमधून विद्यमान (जतन केलेली) मूल्ये परत करतात.

/**
 * Gets cached values as Java Stream. Returned stream is not sorted.
 */
public Stream<OVALUE> getValues() {
    final Lock readLock = lock.readLock();
    readLock.lock();
    try {
        return cache.values().stream().flatMap(cachedValue -> cachedValue.getValue().stream());
    } finally {
        readLock.unlock();
    }
}

/**
 * Gets cached value as Java Optional.
 */
public Optional<OVALUE> getValue(KEY key) {
    final Lock readLock = lock.readLock();
    readLock.lock();
    try {
        return Optional.ofNullable(cache.get(key)).flatMap(CacheMonoValue::getValue);
    } finally {
        readLock.unlock();
    }
}

/**
 * Removes the mapping for a key from this map if it is present.
 */
public void remove(KEY key) {
    final Lock writeLock = lock.writeLock();
    writeLock.lock();
    try {
        cache.remove(key);
    } finally {
        writeLock.unlock();
    }
}

वापरात आहे CacheMono धडा सोपा आहे. माझ्या सध्याच्या प्रोजेक्टमधून फक्त दोन कोड स्निपेट्स. पहिला एक तयार करतो CacheMono उदाहरणार्थ कॉल करून CacheMono.fromSupplier,

@Service
@Slf4j
@RequiredArgsConstructor
public class TopologyRepository {

    private final CacheMono<TopologyRef, TopologyDto, TopologyDto> cache;
    private final TopologyLoader topologyLoader;
    private final TopologyCreator topologyCreator;

    @Autowired
    public UnoTopologyRepository(TopologyLoader topologyLoader,
                                 TopologyCreator topologyCreator) {
        this.topologyLoader = topologyLoader;
        this.topologyCreator = topologyCreator;
        cache = CacheMono.fromSupplier(this::retrieveTopology);
    }

    /**
     * Finds a topology from this repository by reference.
     */
    public Mono<TopologyDto> findUnoTopology(TopologyRef topologyRef) {
        return cache.lookup(topologyRef)
                .doOnNext(topology ->
                          log.info("Topology was found by lookup with key {}", topologyRef))
                .onErrorResume(err -> {
                    log.error("Error on lookup Topology by key {}, message: {}",
                              topologyRef, err.getMessage());
                    return Mono.empty();
                });
    }

    private Mono<TopologyDto> retrieveTopology(TopologyRef topologyRef) {
        CompletableFuture<UnoTopologyDto> future = CompletableFuture.supplyAsync(() -> {
            final var loaderContext = topologyLoader.retrieveTopology(topologyRef);
            return topologyCreator.createTopology(loaderContext);
        });
        return Mono.fromFuture(future);
    }
}

दुसरा निर्माण करतो a CacheMono उदाहरणार्थ कॉल करून CacheMono.fromPublisher,

@Service
@Slf4j
@RequiredArgsConstructor
public class SspDefinitionenStore implements SspDefinitionConsumer {

    private CacheMono>VersionedId, SspDefinition, SspDefinition> sspDefinitionCache;
    private FluxSink>SspDefinition> sspDefinitionSink;

    @PostConstruct
    public void initialize() {
        sspDefinitionCache = CacheMono.fromPublisher(
                Flux.create(sink -> sspDefinitionSink = sink),
                SspDefinition::getId);
    }

    @Override
    public void accept(SspDefinition sspDefinition) {
        sspDefinitionSink.next(sspDefinition);
    }

    public Mono>SspDefinition> lookupSspDefinition(VersionedId sspId) {
        return sspDefinitionCache.lookup(sspId)
                .doOnNext(sspTopology -> log.info(
                    "SspDefinition was found by lookup with key {}", sspId))
                .onErrorResume(err -> {
                    log.error("Error on lookup SspDefinition by key {}, message: {}",
                              sspId, err.getMessage());
                    return Mono.empty();
                });
    }

    public Optional>SspDefinition> findSspDefinition(VersionedId sspId) {
        return sspDefinitionCache.getValue(sspId);
    }

    public Flux>SspDefinition> findSspDefinitions() {
        return Flux.fromStream(sspDefinitionCache.getValues().filter(Objects::nonNull));
    }

    ...
}

समाप्त: मजा करा!