/*
 * Decompiled with CFR 0.152.
 */
package io.intino.ness.datalake.hadoop;

import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.zet.Zet;
import io.intino.alexandria.zet.ZetReader;
import io.intino.alexandria.zet.ZetStream;
import io.intino.ness.datalake.hadoop.HadoopStage;
import io.intino.ness.datalake.hadoop.sessions.SetSessionReader;
import io.intino.ness.ingestion.Fingerprint;
import io.intino.ness.ingestion.Session;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SessionSealer {
    /** File system handle supplied by the caller. */
    private final FileSystem fs;
    /** Stage whose path is used as the input of the sealing job. */
    private final HadoopStage stage;
    /** Location of the event store; registered as a cache file of the job. */
    private final Path events;
    /** Location of the set store; used as job output path and as a cache file. */
    private final Path sets;
    /** Temporary location supplied by the caller. */
    private final Path temp;

    /**
     * Creates a sealer bound to the given file system, stage and store locations.
     *
     * @param fs         file system handle
     * @param stage      stage providing the job input path
     * @param eventStore path of the event store
     * @param setStore   path of the set store
     * @param temp       path of a temporary location
     */
    public SessionSealer(FileSystem fs, HadoopStage stage, Path eventStore, Path setStore, Path temp) {
        // Store locations first, then the collaborators; the assignments are independent.
        events = eventStore;
        sets = setStore;
        this.temp = temp;
        this.stage = stage;
        this.fs = fs;
    }

    /**
     * Configures and runs the "Sealer" MapReduce job and waits for it to finish.
     *
     * <p>The job reads sequence files from the stage path, runs
     * {@link SessionMapper}, {@link ChunksCombiner} and {@link BlobReducer} with two
     * reduce tasks, and writes sequence-file output ({@link Text} keys,
     * {@link BytesWritable} values) to the set store. The set store and the event
     * store are registered as cache files, in that order.
     *
     * <p>The outcome is printed to standard output. Failures are logged and not
     * rethrown; if the calling thread is interrupted while waiting, its interrupt
     * flag is restored before returning.
     *
     * <p>NOTE(review): the job is created with a default configuration rather than
     * the one behind {@code fs}, and the set store is both the output directory and
     * a cache file - confirm both are intended.
     */
    public void seal() {
        try {
            Job job = Job.getInstance();
            job.setJarByClass(this.getClass());
            job.setJobName("Sealer");

            // Input / output wiring.
            FileInputFormat.addInputPath(job, this.stage.path());
            job.setInputFormatClass(SequenceFileInputFormat.class);
            FileOutputFormat.setOutputPath(job, this.sets);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);

            // Processing pipeline.
            job.setMapperClass(SessionMapper.class);
            job.setCombinerClass(ChunksCombiner.class);
            job.setReducerClass(BlobReducer.class);
            job.setNumReduceTasks(2);

            // Order matters: BlobReducer reads the set store from cache file index 0.
            job.addCacheFile(this.sets.toUri());
            job.addCacheFile(this.events.toUri());

            // waitForCompletion reports whether the job succeeded.
            boolean successful = job.waitForCompletion(true);
            System.out.println(successful ? "Job was successful" : "Job was not successful");
        }
        catch (IOException | ClassNotFoundException e) {
            Logger.error(e);
        }
        catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            Logger.error(e);
        }
    }

    public static class BlobReducer
    extends Reducer<Text, BytesWritable, Text, BytesWritable> {
        private final BytesWritable zet = new BytesWritable();
        private Text fingerprint = new Text();

        protected void setup(Reducer.Context context) throws IOException, InterruptedException {
            super.setup(context);
        }

        protected void reduce(Text key, Iterable<BytesWritable> values, Reducer.Context context) throws IOException, InterruptedException {
            Fingerprint fingerprint = new Fingerprint(key.toString());
            Path destination = this.fileOf(context.getConfiguration(), fingerprint, new Path(context.getCacheFiles()[0]));
            SequenceFile.Reader reader = new SequenceFile.Reader(context.getConfiguration(), new SequenceFile.Reader.Option[]{SequenceFile.Reader.file((Path)destination)});
            byte[] bytes = values.iterator().next().getBytes();
        }

        private Path fileOf(Configuration conf, Fingerprint fingerprint, Path sets) {
            Path path = new Path(sets, fingerprint.tank() + "/" + fingerprint.timetag() + ".seq");
            try {
                FileSystem.get((Configuration)conf).mkdirs(path);
            }
            catch (IOException e) {
                Logger.error((Throwable)e);
            }
            return path;
        }
    }

    public static class ChunksCombiner
    extends Reducer<Text, BytesWritable, Text, BytesWritable> {
        private final BytesWritable zet = new BytesWritable();
        private Text fingerprint = new Text();

        protected void reduce(Text key, Iterable<BytesWritable> values, Reducer.Context context) throws IOException, InterruptedException {
            ZetReader sink = new ZetReader(new long[0]);
            for (BytesWritable value : values) {
                ByteBuffer buffer = ByteBuffer.allocate(8).put(value.getBytes(), 0, value.getLength());
                sink = new ZetStream.Merge(new ZetStream[]{sink, new ZetReader(buffer.asLongBuffer().array())});
            }
            ByteBuffer result = ByteBuffer.allocate(8);
            for (long id : new Zet((ZetStream)sink).ids()) {
                result.putLong(id);
            }
            this.zet.set(result.array(), 0, result.array().length);
            context.write((Object)this.fingerprint, (Object)this.zet);
        }
    }

    public static class SessionMapper
    extends Mapper<Text, BytesWritable, Text, BytesWritable> {
        private final BytesWritable zet = new BytesWritable();
        private Text fingerprint = new Text();

        protected void map(Text key, BytesWritable value, Mapper.Context context) throws IOException, InterruptedException {
            if (key.toString().startsWith(Session.Type.event.name())) {
                this.mapEventBlob(value.getBytes());
            } else {
                Map<String, byte[]> fingerprintMap = this.mapSetSessionsToZetStreams(value.getBytes());
                for (String fp : fingerprintMap.keySet()) {
                    this.fingerprint.set(fp);
                    byte[] bytes = fingerprintMap.get(fp);
                    this.zet.set(bytes, 0, bytes.length);
                    context.write((Object)this.fingerprint, (Object)this.zet);
                }
            }
        }

        private Map<String, byte[]> mapSetSessionsToZetStreams(byte[] bytes) {
            try {
                SetSessionReader reader = new SetSessionReader(bytes);
                HashMap<String, byte[]> chunks = new HashMap<String, byte[]>();
                for (Fingerprint fingerprint : reader.fingerprints()) {
                    chunks.put(fingerprint.toString(), this.merge(reader.streamsOf(fingerprint)));
                }
                return chunks;
            }
            catch (IOException e) {
                Logger.error((Throwable)e);
                return Collections.emptyMap();
            }
        }

        private void mapEventBlob(byte[] bytes) {
        }

        private byte[] merge(List<ZetStream> zetStreams) {
            long[] ids;
            ByteBuffer buffer = ByteBuffer.allocate(8);
            for (long id : ids = new Zet((ZetStream)new ZetStream.Merge(zetStreams)).ids()) {
                buffer.putLong(id);
            }
            return buffer.array();
        }
    }
}

